yieldやジェネレータの周りの知識や使い所まとめ[Python]

先日バックエンドの開発をしていてyieldを使うと便利な場面があり、他にも使えるところないかなと思って調べたので、ついでにyieldの周りの知識や使い所についてまとめてみました。

どんな場面で使えるか

yield(というよりはジェネレータ)の使える状況を簡単に説明すると、主に次の1点だと思われます。

何らかの状態(ステート)を保持する必要があるが、クラスを作るまでもない、すぐにいらなくなるようなものを保持しないといけない時

具体的なケースでいうと

  • メモリを圧迫するぐらい大容量のcsvファイルを一行ずつ扱いたい時
  • 無限データストリームの生成
  • 再帰処理

といったことに使うことができます。


yieldとは

英語で「産み出す」や「もたらす」という意味があります。
yieldは、関数においてreturnの代わりにyieldを使用することによってその関数をジェネレータ関数にすることができます。
またasync def 関数で使用するとそのコルーチン関数は非同期ジェネレータになります。(今回非同期ジェネレータの話は割愛します)

# ジェネレータ関数
def generator_func():
    yield 1
    yield 2
    yield 3

# 非同期ジェネレータ
async def AsyncGenerator(num: int):
    for item in range(num):
        await asyncio.sleep(2)
        yield item

ジェネレータという言葉が出てきました。 ジェネレータの説明が必要ですね。

ジェネレータとは

Pythonにおける「ジェネレータ」とはジェネレータ関数を指す場合と、ジェネレータイテレータを指す場合があります。(言葉の定義があいまい。。)

ジェネレータ関数とは

ジェネレータ関数イテレータを返す、特殊な種類の関数のことです。
ジェネレータ関数によって作成されたイテレータのことを特にジェネレータイテレータと呼びます。
ジェネレータ関数はイテレータを簡単に作るためのものと言われています。(それだけではないですが)

# ジェネレータ関数
def generator_func():
    yield 1
    yield 2

# ジェネレータイテレータ
generator_iter = generator_func()
print(type(generator_iter)) # <class 'generator'>
print('__iter__' in dir(generator_iter)) # True
print('__next__' in dir(generator_iter)) # True

# 組み込みメソッドnext()もしくは特殊メソッド.__next__()で値を取得
print(next(generator_iter)) # 1
print(generator_iter.__next__())  # 2
# 要素を出し切ってからnextするとStopIterationエラーがraiseされる
print(next(generator_iter)) # raise StopIteration

# リストにも変換可能
# generator_iterとgenerator_iter_2の内部状況は違いに独立
generator_iter_2 = generator_func()
print(list(generator_iter_2))  # [1, 2]
# ジェネレータは一度使うとリストのように複数回使うことは不可
print(list(generator_iter_2)) # []

イテレータとは

イテレータ(オブジェクト)は「データの流れを表現」するオブジェクトで、値を 1 つずつ返してくれるオブジェクトのことです。
簡単には自身を戻り値とする__iter__()と次の要素を返す __next__() の 2 つのメソッドを持っています。
イテレータはリストやタプルなどのイテレータブルから作ることもできますし、クラスからも作ることができます。
イテレータイテレータブルの違いはこちら(Python公式Docs)

# イテレータブル(リスト)からのイテレータの生成
tmp_list = [1, 2, 3, 4]
print(type(tmp_list)) # <class 'list'>
new_iter = iter(tmp_list)
print(type(new_iter)) # <class 'list_iterator'>
print('__iter__' in dir(new_iter)) # True
print('__next__' in dir(new_iter))  # True
# クラスでのイテレータの定義
class PowTwo:
    def __init__(self, max=0):
        self.n = 0
        self.max = max

    def __iter__(self):
        return self

    def __next__(self):
        if self.n > self.max:
            raise StopIteration

        result = 2 ** self.n
        self.n += 1
        return result

pow_two_iter = PowTwo(2)
print('__iter__' in dir(pow_two_iter)) # True
print('__next__' in dir(pow_two_iter)) # True
print(next(pow_two_iter)) # 1
print(next(pow_two_iter)) # 2
print(next(pow_two_iter)) # 4
print(next(pow_two_iter)) # raise StopIteration

上と同様のものをジェネレータで実装するととても簡潔です。

def PowTwoGen(max=0):
    n = 0
    while n <= max:
        yield 2 ** n
        n += 1

pow_two_gen = PowTwoGen(2)
print(type(pow_two_gen)) # <class 'generator'>
print('__iter__' in dir(pow_two_gen)) # True
print('__next__' in dir(pow_two_gen)) # True
print(next(pow_two_gen)) # 1
print(next(pow_two_gen)) # 2
print(next(pow_two_gen)) # 4
print(next(pow_two_gen)) # raise StopIteration

ジェネレータイテレータとは(別名:ジェネレータオブジェクト)

ジェネレータ関数によって作成されたイテレータのことで、イテレータと同様__iter__()と次の要素を返す __next__() の 2 つのメソッドを持っています。
しかし、通常のイテレータとは異なる特徴として、yield文までの実行された、局所実行状態を保存して、処理を一時的に中断することができます。(ローカル変数、プログラムカウンタや評価スタック、そして例外処理のを含むすべてのローカルの状態が保持されます。)
そしてまたジェネレータイテレータが再開されると、中断した位置を取得し実行を再開します。
これはコルーチンとよく似た機構です。

このようにジェネレータは必要に応じて、計算を止めたり、再開することで、リストのように全ての要素を保持するのではなく、一つずつの要素をその都度、計算し生成するのでメモリを節約することができます。

その他の知識

ジェネレータについての簡単な説明は終わりますが、その他にもPythonのジェネレータには色々な機構があるのでそちらを紹介します。興味のない方は使用例まで飛ばしていただいて大丈夫です。

ジェネレータとコルーチン

ジェネレータ(イテレータ)は、中断と実行をしながら一時的にローカル状態を保存しつつ、値を返していくという性質を持っていて、それがコルーチンの機構と似ているということでした。
しかし、コルーチンはただ値を返すことだけではなくコルーチンに対して値を送信する機構があります。
Pythonのジェネレータではコルーチンと同様に値を送信することができ、これをジェネレータベースのコルーチンと言います(他の言語ではジェネレータにこの機構は存在しないのでこの特別な名称がついていると思われます。)

ジェネレータベースのコルーチン

ジェネレータに値を送信する場合は、next() 関数ではなくsend()メソッドを利用します。

※現在、ジェネレータベースのコルーチンはPython 3.10で廃止予定で、代わりにネイティブコルーチンの使用が推奨されています。
https://docs.python.org/ja/3/library/asyncio-task.html#generator-based-coroutines

def sumup():
    n = 0
    while True:
        n += yield n


coroutine = sumup()
# next() 関数を実行して、ジェネレータを実行状態に遷移
print(next(coroutine)) # 0

print(coroutine.send(1)) # 1
print(coroutine.send(2)) # 3
print(coroutine.send(3)) # 6

coroutine.close()

値を渡すsendのほかにもPythonのジェネレータはthrowとcloseというコルーチン特有のメソッドを使うことができます。

yield from

yield fromPython 3.3から登場した構文で、ジェネレータを作る時のfor文をyield fromを使うことでより簡潔に書くことができます。

# for文を使う場合
def generator_func():
    for i in range(1, 3):
        yield i

# yield forを使用(上と等価)
def generator_func_2():
    yield from range(1, 3)

しかしyield fromは上の例のようにfor文のシンタックスシュガーとして機能するだけではありません。
yield from を使うことでジェネレータの入れ子を簡単に実現することができ、ジェネレータオブジェクトをそのまま結合したようなふるまいをさせることができます。

def generator_A():
    for n in 'ABCDE':
        yield n

def generator_B():
    for n in '12345678':
        yield n

def generator_all():
    # 他のジェネレータ関数で作られたジェネレータオブジェクトが
    # yield した値がそのままyieldされる
    yield from generator_A()
    yield from generator_B()

print(list(generator_A())) # ['A', 'B', 'C', 'D', 'E']

print(list(generator_B())) # ['1', '2', '3', '4', '5', '6', '7', '8']

print(list(generator_all())) # ['A', 'B', 'C', 'D', 'E', '1', '2', '3', '4', '5', '6', '7', '8']

ジェネレータ式

ジェネレータ式とは簡単なジェネレータを作る時に使うことのできる書き方で、yieldを使用することなくジェネレータを作成できます。

# 普通のジェネレータの生成方法
def sample_generator():
    for x in range(5):
        yield x * 2
# ジェネレータ式を使った場合
generator_formula = (x * 2 for x in range(5))

print(next(generator_formula)) # 0
print(next(generator_formula)) # 2

ジェネレータ式はよくリスト内包表記と比較されます。ジェネレータ式はジェネレータイテレータを生成するのでリストとは違い、必要に応じてだけ値を算出するので、無限長ストリームや膨大なデータを返すようなイテレータを扱うことができます。

# https://docs.python.org/ja/3/howto/functional.html#generator-expressions-and-list-comprehensions

# 大規模データ
line_list = ['  line 1\n', 'line 2  \n', ...]

# ジェネレータ式
stripped_iter = (line.strip() for line in line_list)

# リスト内包表記
stripped_list = [line.strip() for line in line_list]

使い所

ここから本格的に使い方について紹介していきます。

大容量データの処理

すでにジェネレータ式で大容量のテキストデータを扱う例を書きましたが、よく使われるのでもう一度例をあげておきます。

# 大容量のファイルの場合以下はMemoryErrorが発生する場合がある。
def csv_reader(file_name):
    file = open(file_name)
    result = file.read().split("\n")
    return result

# ジェネレータ関数での書き換えを行い一行ずつ処理することができる
def csv_reader(file_name):
    for row in open(file_name, "r"):
        yield row

# ジェネレータ式の場合
csv_gen = (row for row in open(file_name))

無限のデータストリームの生成

あまり使うことはないかもしれませんが、無限長のデータストリームを生成することができます。 以下の例では理論的には全ての偶数を生成することができます。

def all_even():
    n = 0
    while True:
        yield n
        n += 2

処理のパイプライン化

複数のジェネレータを使って処理をパイプライン化することができます。よくわからないと思うので具体的な例を示します。

フィボナッチ数列を生成するジェネレータと数字を2乗するための別のジェネレータがあるとします。 フィボナッチ数列の二乗和を求めたい場合は、ジェネレータ関数の処理をパイプライン化することで、以下のように求めることができます。

def fibonacci_numbers(nums):
    x, y = 0, 1
    for _ in range(nums):
        x, y = y, x+y
        yield x

def square(nums):
    for num in nums:
        yield num**2

print(sum(square(fibonacci_numbers(15)))) # 602070

パイプライン化することによってきちんと処理が明示的に分割された可読性の高いコードが書くことができます。

再帰処理

ジェネレータはリスト全体を保持することなく、逐次処理ができるので、再帰処理にも利用されます。
以下の例はジェネレータを再帰的に使用し、木の走査を行う例です。

class Node:
    def __init__(self,val):
        self.val = val
        self.left = None
        self.right = None

# ジェネレータを使った木の走査
def inorder(node):
    if node:
        for x in inorder(node.left):
            yield x
        yield t.val
        for x in inorder(node.right):
            yield x

yield fromを使用するとより簡潔になります。

def inorder(node):
    if node:
        yield from inorder(node.left):
        yield t.val
        yield from inorder(node.right):

終わりに

コルーチンやら、非同期やらジェネレータとは関係なさそうな言葉が色々出てきました。言葉の定義の境目があいまいなものが多かったので、正確でない部分があるかもしれません。間違っていましたらフィードバックお願いします。 今回出てきた非同期ジェネレータなど非同期処理に関する内容が出てきたので、また自分の知識の整理のためにも非同期についての記事を書こうと思います。

参照一覧

https://www.lifewithpython.com/2015/11/python-create-iterator-protocol-class.html
https://docs.python.org/ja/3/howto/functional.html#generator-expressions-and-list-comprehensions
https://postd.cc/python-generators-coroutines-native-coroutines-and-async-await/
https://qiita.com/koshigoe/items/054383a89bd51d099f10
https://qiita.com/keitakurita/items/5a31b902db6adfa45a70#%E5%86%8D%E5%B8%B0
https://www.programiz.com/python-programming/generator

AWS ソリューションアーキテクト-プロフェッショナルに合格したのでその勉強方法まとめ

f:id:okiyasi:20210218150304p:plain

AWS ソリューションアーキテクト-プロフェッショナルに合格したので備忘録として勉強方法をまとめます。
(※勉強にかまけてブログの記事の作成をサボってました)

私の経歴

社会人3年目のWebエンジニアで、フロントからサーバーサイドからインフラまでなんでもやりますエンジニアやってます。

  • サーバーサイドはもうすぐ3年
  • フロントエンドは2年弱ぐらい
  • インフラは基本AWSを使っていて、業務でも設計・運用をやっている。(運用歴は1年ちょっと)
  • AWS Solution Architect Associateは1年半ぐらい前に取得済み
  • 主要サービスは大体使ってる(EC2はあんまり使ってない)

試験の概用

AWSの認定試験の中で一番難しいと言われてる試験で実際結構難しかったです。
試験時間は180分 試験の料金は3万円です。 いつでも何回でも受けるのは良いんですが、試験の料金が高いので、しっかり準備してやらないと3万吹っ飛ぶという、金銭的なリスクがあります。
※記事を書いてて気づいたのですが、そういえばアソシエイト合格の時に50%のバウチャーもらってたのに完全に使うの忘れてました。(死)

最近では家でも試験が受けられるようになったらしいですが、自分は子供に邪魔される未来しか見えなかったので素直にテストセンターで受けました。
テストセンターの中でも遠隔でオンライン監督がみるやつは自分はアソシエイトの時の苦い経験があるので(英語は問題なかったのですが、会場のカメラのピントが合わなくて証明書がちゃんと見れないとかなんとかで20分ぐらい、カメラにパスポートを掲げてました)、 テストセンターの受付で身分証明書を見てくれる場所で受験しました。

私の試験結果

f:id:okiyasi:20210218145006p:plain
750点で合格で873点取れてたので、割と安全に受かったぽいです。
また5つの分野で問題が構成されていて、一応全部の分野が合格ラインでした。
でも試験中は受かった確信は全くなかったです。

勉強期間について

勉強時間は4ヶ月半ぐらいで 基本的に3~5時間ほど土日に勉強して、平日は気が向いたらblackbeltの動画を視聴してました。
トータルの勉強時間は130時間ほどじゃないかと思います。
※途中、なんでAWSばっかり勉強してるんだと虚無を感じて半月ぐらい何もしてなかった時期もありました。。

効率の良い人は2ヶ月半から3ヶ月ぐらいでも受かるかと思います。

勉強する意義

普段からAWSを運用している人からしたら、勉強する過程で実際の運用におけるよくあるハマりどころが知れたり。(VPC内に配置したLambdaはそのままだとインターネットを介して通信できないとか ※勉強する前にハマってたけどね)
もっとこのサービスに置き換えればコスト抑えられたり、効率的じゃんみたいなことが発見できたので割と個人的には有意義だったんではないかと思います。

もちろん、資格を取ることによって報奨金もらえたりする会社もあるので(前職はそうだったけど、今の会社はもらえない笑)勉強する意義は人それぞれありそうです。

試験の難しさについて

この試験には一般の試験とは違う、独特な難しさがあります。

  • まとまった情報が少ない(後述の本が出たので比較的マシになりました)
  • 単純に勉強範囲が大きすぎるので使ったことのないサービスは細かいとこまで覚えてないし、マイナーサービスは調べてもそんなに情報が出てこない
  • 選択肢がどれもほぼ正解だが、その中からベストを選ばないといけないという正解の確信がない問題が多い
  • 問題文・選択肢が割と長文で、ちょっと日本語がおかしい問題が75問もあり、3時間ギリギリかかる
  • 複数選択肢選ぶ時に「または」の選択肢を選べば良いのか「かつ」の選択肢を選べば良いのかわかりにくい

というような、試験中に問題を解いていても「本当に受かるかな」と不安になるような感じでした。
確実に取れてるだろうと思った問題は75問中20問ぐらいです。 最近の他の合格記見てても同じ感じなのでみんな同じぐらいだと思います。

おすすめの勉強方法

まずはこちらのトレーニングでざっくりとした試験の全体像を掴みます。

AWS training and certification

そして次に、去年出たこの有名な本を解いていきます。 www.amazon.co.jp

この本は割と解説がしっかりしてるのでわかりやすいのですが、メジャーだけどあまり使ったことのないサービス(Glue,Config,System Manager等) の全体像は掴みづらいので
下のBlackBeltのサービス別資料の動画などを見て理解して、自分なりにそれぞれのサービスについてのまとめを作っていきました。

aws.amazon.com

そして本の後ろの模擬試験を解きます。 問題の演習量が少ないと思ったら、Udemyの問題集を課金して解きます。(一回目の点数は大体50%ぐらいでした。。) 自分は通しで解くのは見直しが大変すぎて嫌になったので15問ずつぐらいで区切って解いてました。

www.udemy.com

基本的には問題を解きながら気になったところを調べて、ホワイトペーパーだったりクラスメソッドさんのDevelopers.IOの記事をまとめながら理解する作業を繰り返します。(マイナーサービスはホワイトペーパーの訳がとてもひどく全然頭に入ってこないので、クラスメソッドさんが救世主でした。)
udemyやって忘れたころに本の模擬試験やると良い感じでした。(本の問題の方が本番に生きる問題は多かったように感じます。)

本の模擬試験やudemyの練習問題を2回目解いて大体75~90%を取れるようになったらほぼ受かるのではないかと思います。(ただ、問題集に出てくる問題で本番とほぼ同じような問題は20問ぐらいしか出てこなかった印象なので油断は禁物。)

試験の解き方

個人的には以下の解き方でやっていきました。

  1. まず、問題文を割としっかりに読む。問題文の最後の方にどのような選択肢を選べば良いのか書いてあるのでそこに注目
    可用性なのか、コストなのか、スピードなのか
  2. 全ての選択肢をざっくり読んで違いを見つける 単純に使用しているサービスが違うのか、どこに注目
  3. なんとなく合ってそうな選択肢を一つ読んでみて、内容を理解する
  4. 選択肢を比較しながら問題文に合った選択肢を選択する
  5. 書いてる内容が複雑でわからない問題や日本語がとても怪しい問題はもらえる紙に番号だけ書いてあとで見直せるようにしておく
  6. 25問50分のペースで解いていき、30分ほど余らせる。
  7. わからなかった問題で日本語が怪しい問題を英語で見てみる (環境によっては切り替えにやたらと時間がかかる場合があるので一通り解いて見直しの時にやる方がおすすめ)
    自分の時は日本語訳がひどいやつが2問あって、英語でみたら一瞬で解けたりしました(選択肢1の訳になぜか選択肢2の訳が混じってたりしました笑)

合格したら

この記事の一番上にあるAWSのバッチがもらえます。特に使えるところもないのでこの記事に貼らせていただきました。 あと、また50%offバウチャーもらえました。

最後に・あとがき

AWS認定プログラムの規約で試験の内容についてはあんまり言ってはいけないので、突っ込んだ話はできなかったですが、以上で自分の体験談は終わりです。
次はAWSみたいな1サービスの勉強じゃなくて、もっと汎用的なものの勉強をしたいので、IPAデータベーススペシャリストを取ろうと思いますが、如何せんまだ8ヶ月あるので、 AWS機械学習スペシャリストでも取ろうかなと思ってます(一応AI系のサービス作ってるし、50%offバウチャー2枚余ってるので)


参考

なんとなく問題集を解きながらAWSサービスの構造を理解する上でわかりやすかったドキュメントたちを置いていきます。

マルチアカウント

https://d0.awsstatic.com/events/jp/2017/summit/slide/D4T2-2.pdf
「組織の複雑さに対応する設計」でマルチアカウントの問題はよく出てくるし、おそらくそこまで大規模組織の運用を経験したことある人はいないと思うのでこれは一通り目を通した方が良いと思います。

APIGatewayのREST APIとHTTP APIの違い

APIGatewayはREST APIしかなかった頃に触ったことある程度で、RESTってあのRESTだよな?と思いながら一体何が違うねんとなったのでこの記事ありがたかったです。(名称が機能と一致してない気が。。)
最近はHTTP APIも機能が強化されてるみたいで、HTTP APIを使えることが多くなったそうですね。
Amazon API Gatewayは「HTTP API」と「REST API」のどちらを選択すれば良いのか? #reinvent | DevelopersIO

DynamoDBのグローバルセカンダリインデックスとローカルセカンダリインデックス

Dynamoの基本です。でも、理解しにくいのでとても参考になりました DynamoDBの概要 - Qiita

AWS公式

自分はEC2じゃなくてECSのFargateでばっかり運用してるのでEC2の運用をサポートしてるサービスはあんまり触れてなかったし、System Managerは機能が多すぎてわけわからないので役立ちました。
BlackBelt AWS Config
20190618 AWS Black Belt Online Seminar AWS Config
BlackBelt AWS System Manager
https://www.youtube.com/watch?v=UXSbh4Wsp7c&feature=youtu.be
サポートされないVPCピアリング(マイナー問題ですが、問題集で図なしで解説されても全くなんのこっちゃわからなかったのでこの図で理解しました。)
サポートされていない VPC ピア接続設定 - Amazon Virtual Private Cloud
あとはAI系のサービスやConnectとかAppStream2.0とかほぼ使わないマイナー系のサービスはまじで印象なさすぎて覚えられないのでBlackBeltの動画をみて無理矢理頭の中に入れました。

Blobって一体何者?使い方まとめ(JavaScript/TypeScript)

フロントでファイルを扱おうとして、JavaScriptやTypescriptを書いているとnew Blobしたり型でBlobを書いたりすることが必要になったりするのですが、このBlobについてあまりよくついて知らないなと思ったので今回調べてみてみることにしました。

Blobとは

BlobとはBinary Large OBjectの略で、単にバイナリデータの塊を表現したもので、ウェブブラウザ(WEB API)ではデータを保持する役割を担うBlobクラスを実装しています。
BlobにはWEB APIFileが継承されていてプロパティにはデータサイズやMIMEタイプを持っています。

ちなみに他にもJavaScriptにはバイナリデータを扱うクラスが用意されていてArrayBuffer / TypedArrays(型付き配列)などがありますが、ArrayBuffer / TypedArraysは主に直接操作できるバイナリデータを扱うのに用いられるのに対し、Blobはイミュータブル(不変)なバイナリデータを扱います。
そのためこのBlobのバイナリデータは、File APIを介してのみアクセスされることが想定されています。

Blobの作成・Blobのコンテンツの読み込み

new Blob(source, option)でBlobオブジェクトを生成します。
sourceにはテキストやバイナリのデータを指定することができ、optionにはMIMEタイプを指定します。

stackblitz.com

Blobの読み込みにはFileReaderを用います。FileReaderはFile APIの機能の一つです。(File APIHTML5の機能) FileReaderを使用することで、ユーザーのPCに保存されているファイル(またはデータバッファ)のコンテンツを非同期に読み取ることができます。 ここでは読み込みで使用していますが、ファイルのアップロードでも使用可能です。

Blob URL Scheme

バイナリデータを保持するURLの一種で、BlobからBlob URL Schemeに変換することが可能です。
URL.createObjectURL()を使用することで簡単に変換できます。
Blob URL Schemeにおいてバイナリデータ自身はBlob URL Schemeの文字列に埋め込まれているのではなく、ブラウザで保持されています。
そのため、バイナリデータをHTMLへ直接埋め込むようなことはできませんが、大容量のバイナリデータを扱うことが可能です。

用途としてはURLなのでaタグに指定してファイルをダウンロードさせたり、imgタグに指定して画像表示したりすることができます。以下がその例です。

stackblitz.com

blob:https://js-thmf3i.stackblitz.io/a3f2610b-3462-404f-af65-471f6dc73743

Blob URL Schemeでは先頭にblob:がついていて、https://js-thmf3i.stackblitz.io/の後の文字列部分にはUUIDのようなものしか記述されていないことが分かります。

比較対象としてのData URL Scheme(おまけ)

よくBlob URL Schemeと比較されるのがData URL Schemeです。
blob:ではなくdata:が文字列の先頭に付きます。
こちらはBlob URL Schemeと同様にimgタグのsrcに指定することが可能ですが、Blob URL Schemeと違い直接バイナリデータをHTMLに埋め込むことが可能です。
バイナリデータをBase64の変換方式で文字列に変換しています。 そうすることによって、通常別データに別れている画像などのデータを一度の通信で取得できるメリットがあります。
しかし、

  • キャッシュされないためリロードするたび画像データを読み直さないといけない
  • 100Mを超えるようなサイズだと、非常に処理が重たい
  • HTMLにとても長い文字列を記述しないといけない

などの結構なデメリットがあり、あまり使われてはいないようです。

以下がData URL Schemeの例です。

<!-- 赤い小さな点の画像を表示するだけのData URL Scheme -->
<img src="
ANSUhEUgAAAAUAAAAFCAYAAACNbyblAAAAHElEQVQI12P4
//8/w38GIAXDIBKE0DHxgljNBAAO9TXL0Y4OHwAAAABJRU
5ErkJggg==" alt="Red dot" />

終わりに

今回はBlobについて少し掘り下げてみました。バイナリデータの扱いに関しては普段コードを書いていてそこまで意識することがないので、また記事を書いて深堀りしていきたいです。

参考

Blob - Web API | MDN

What are Blobs used for in JavaScript? | by <Andrew Rymaruk /> | JavaScript In Plain English | Medium

ワクガンス | JavaScriptによるファイルとバイナリデータの扱い

Data URI scheme - Wikipedia

Pythonにおける並行・並列処理について調べてみた

f:id:okiyasi:20200922031123p:plain 開発をしていると要求されている処理時間より時間がかかってしまうことがあり、処理を高速化しないといけない場面に遭遇すると思います。

その場合は、まずボトルネックとなっている処理を探し、ボトルネックとなっている場所のアルゴリズムやデータ構造の改善やライブラリの使用による処理の代替を行います。

しかし、それだけでは高速化できななかったり、もっと高速化しないと要件を満たせないことがあります。その時に一つの高速化の手段として並列処理や並行処理があります。 今回はその並列処理・並行処理について解説していきたいと思います。

並行処理と並列処理とは

まずは、並行処理と並列処理の定義です。

並行処理と並列処理は似たような言葉ですが違いがあります。細かな定義は人によってまちまちですが、以下のような違いがあります。

並行処理

システムが複数の動作を同時に実行状態に保てる状態にあること
わかりにくいので端的に言うと、2 つ以上のタスクが存在する時に、複数のタスク処理を高速に切り替えてあたかも同時に処理されているかのように見せることです。

並列処理

2 つ以上のタスクが存在する時、それらを実際に同時に処理すること。


これらの並列処理や並行処理を利用して、プログラムを高速化しますが、ここでボトルネックなっている処理の種類が重要となってきます。

ボトルネックとなる処理の種類

ボトルネックとなる処理は主に以下の2つの種類に分けることができます。

CPUバウンドな処理

数値計算のようにCPUに負荷をかけるようなような処理のことで、処理速度がCPUの計算速度に依存する処理です。

I/Oバウンドな処理

ファイルの読み書き、ネットワーク通信、DBへの接続などの処理のことで、ディスクの読み出しなど、主にCPUとは関係ないI/O部分に負荷がかかる処理のことです。


CPUバウンドな処理は並列処理により、複数のタスクを同時に処理することによって高速化することができます。(※並行処理では高速化できません。)

一方で、I/Oバウンドな処理はCPUはディスクの処理(I/O処理)が終わるまで待ち状態になります。そこでI/Oバウンドな処理では、並行処理により、その待ち時間の間にCPUが他のタスクをこなすことで高速化を実現します。(もちろん並列処理でも高速化できます。)

並列処理・並行処理を行う際には複数のタスク処理を実行する・実行できる状態にすることが必要になりますが、このタスク処理を行ってくれるものについて理解する必要があります。それがプロセススレッドです。

プロセスとスレッド

プロセス

プロセスとは、OSが実行しているプログラム(のインスタンス)のことです。

スレッド

スレッドとは、プロセス内の「実行単位」のことで、このスレッドを使用して、CPUのコアに命令を与えて計算処理を行っています。


おそらく、ざっくり過ぎてなんのことを言っているのかわからないかもしれないので、詳しくは こちらの記事を読んでいただけると幸いです。

このスレッドやプロセスを複数用意し、並列的に同時に動かすことによって、並列処理を実現し高速化します。

GoやJavaなどの言語ではメモリの節約の観点などから、プロセスよりも、スレッドを複数用意して並列処理を行うことで高速化を実現するのですが、Pythonの場合は少し勝手が違います。

Pythonにおけるマルチスレッド

マルチスレッドで処理を行う場合は基本的にスレッドセーフである必要があります。スレッドセーフとは複数のスレッドを並列的に使用しても問題が発生しないことを意味します。

具体的にはスレッドが複数存在していても共有しているデータに対してが一度に1つのスレッドのみがアクセスするようにしておくことで、処理中に他のスレッドにデータを上書きされたりするのを防ぎます。

しかし、Python(正確にはPythonの中でC言語で実装されている部分CPython)はスレッドセーフではありません

そこでPythonGIL(Global Interpreter lock)という排他ロックを使用することによってこの問題を回避しています。GILにより、Pythonインタプリタのプロセスは1スレッドしか実行できません。

つまり1つのプロセスに複数スレッドが存在してもロックを持つ単一スレッドでしかコードが実行できずに、その他のスレッドは待機状態になります。

そのためPythonにおけるマルチスレッドでは並列処理ではなく並行処理になってしまうため、CPUバウンドな処理ではマルチスレッドでは高速化が期待できません。むしろロックの切り替えのために遅くなる場合があります。

CPUバウンドな処理での速度の比較

本当にCPUバウンドな処理でマルチスレッドで処理を行っても速度が変わらないのか検証するために、大きな数値に対してfizz_buzzを行うCPUバウンドな処理のサンプルプログラムを用意します。 リストの中の5つの大きな値について1つずつ逐次的に、1からその数値までfizz_buzzしていき、最後にどのくらいの時間がかかったのかを出力しています。

逐次実行のサンプルプログラム

用意した5つ大きな数字に対してfizz_buzzを逐次的に行っています。

import time

def fizz_buzz(num: int):
    result_list = []
    for i in range(1, num + 1):
        result = ''
        if i % 3 == 0:
            result += 'fizz'
        if i % 5 == 0:
            result += 'buzz'
        if not result:
            result = str(i)
        result_list.append(result)
    return result_list


start = time.time()
num_list = [22000000, 19000000, 25000000, 24500000, 21300000]
for n in num_list:
    fizz_buzz(n)
stop = time.time()
print(f'Sequential processing: {stop - start:.3f} seconds')

実行結果
f:id:okiyasi:20200922005229p:plain

マルチスレッドによる実装

マルチスレッドを実現するための主にthread・threading・concurrent.futuresという3つのPythonのモジュールがあります。 基本的にはPython3ではthreadingかconcurrent.futuresを使います。今回は単純にマルチスレッドを実現したいだけなので一旦threadingを使用します。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__()
        self.__num = num

    def fizz_buzz(self, num: int):
        result_list = []
        for i in range(1, num + 1):
            result = ''
            if i % 3 == 0:
                result += 'fizz'
            if i % 5 == 0:
                result += 'buzz'
            if not result:
                result = str(i)
            result_list.append(result)
        return result_list

    def run(self):
        self.fizz_buzz(self.__num)


start = time.time()
threads = []
num_list = [22000000, 19000000, 25000000, 24500000, 21300000]
for n in num_list:
    thread = MyThread(n)
    thread.start()
    threads.append(thread)
for th in threads:
    th.join()
stop = time.time()
print(f'multi threads: {stop - start:.3f} seconds')

実行結果
f:id:okiyasi:20200922005357p:plain

全然早くなっていません。 むしろ、逐次的に実行した時よりも少しマルチスレッドで実装した場合の方がロックの切り替え分、2秒程度時間がかかっているのがわかります。

マルチプロセスによる実装

マルチプロセスにはGIL制約などは存在しないので、並行処理ではなく並列処理になります。 マルチプロセスにはmultiprocessingと先ほどスレッドでも登場したconcurrent.futuresのどちらかのモジュールを使用しますが、今回はmultiprocessingを使用します。

from multiprocessing import Process
import time

class MyProcessor(Process):

    def __init__(self, num):
        super().__init__()
        self.__num = num

    def fizz_buzz(self, num: int):
        result_list = []
        for i in range(1, num + 1):
            result = ''
            if i % 3 == 0:
                result += 'fizz'
            if i % 5 == 0:
                result += 'buzz'
            if not result:
                result = str(i)
            result_list.append(result)
        return result_list

    def run(self):
        self.fizz_buzz(self.__num)


start = time.time()
processes = []
num_list = [22000000, 19000000, 25000000, 24500000, 21300000]
for n in num_list:
    process = MyProcessor(n)
    process.start()
    processes.append(process)
for p in processes:
    p.join()
stop = time.time()
print(f'multi process: {stop - start:.3f} seconds')

実行結果
f:id:okiyasi:20200922005430p:plain

スレッドや逐次的に処理した場合よりもかなり早くなっていることがわかります。 CPUバウンドな処理に関してはマルチプロセスを使用することで高速化できることがわかりました。

I/Oバウンドな処理の速度比較

今度はI/Oバウンドな処理に対して速度を比較していきます。

逐次実行のサンプルプログラム

サンプルプログラムとしてwebページからコンテンツをダウンロードするプログラムを用意します。

import requests
import time

def download_site(url, session):
    with session.get(url) as response:
        count_list = []
        for i in range(len(response.content)):
            count_list.append(i)


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start = time.time()
download_all_sites(sites)
stop = time.time()
print(f"Sequential processing: {stop - start:.3f} seconds")

実行結果
f:id:okiyasi:20200922023123p:plain

マルチスレッドによる実装

前回はCPUバウンドの時はthreadingを使用しましたが、今回はconcurrent.futuresを使用します。(そこまで変わらないと思いますがconcurrent.futuresの方がメジャーらしいです) ThreadPoolというスレッド数を一定に保ちながらスレッドを使い回してくれる機能を使います。スレッド数は5を指定しています。

import threading
import concurrent.futures
import requests
import time

thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        len(response.content)


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start = time.time()
download_all_sites(sites)
stop = time.time()
print(f"multi threads: {stop - start:.3f} seconds")

実行結果
f:id:okiyasi:20200922023106p:plain

今回はCPUバウンドの処理とは違い、格段に処理が高速化されています。スレッド数分だけ約5倍程度早くなっています。

これはI/O処理自体はGILの制約を受けないので処理が並列化されていて、尚且つCPU処理も並行化によってI/O待ち時間に次のCPU処理が行われているため早くなっています。(主にI/O処理の並列化が寄与しています)

マルチプロセスによる実装

マルチスレッド時のスレッド数と揃えるためにプロセスの数を5にしています。デフォルトではos.cup_count()の数だけProcessが起動されます。

import requests
import time
import multiprocessing

session = None

def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        multiprocessing.current_process().name
        len(response.content)


def download_all_sites(sites):
    with multiprocessing.Pool(processes=5, initializer=set_global_session) as pool:
        pool.map(download_site, sites)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start = time.time()
download_all_sites(sites)
stop = time.time()
print(f"multi process: {stop - start:.3f} seconds")

実行結果
f:id:okiyasi:20200922030125p:plain

マルチスレッドの時よりも速度がさらに早いという結果になりました。 これは単純にプロセスが増えたことによってCPU処理もI/O処理も両方とも並列化されるために、マルチスレッドの時よりCPU処理時間分だけ早くなったと予想されます。(スレッド数とプロセス数を同じにして比較することがナンセンス感ありますが)

マルチプロセス・マルチスレッドの問題点

今回はPythonにおける並列処理・並行処理を実現するマルチプロセス・マルチスレッドについてみていきました。

しかしマルチスレッドやマルチプロセスについても問題点があります。

それは、プロセス数やスレッド数が増えすぎると、メモリを食いつぶしたり、コンテキストスイッチするコストが増大してサーバがパンクしてしまうという問題です。(いわゆるC10K問題)

そのため、マルチスレッドやマルチプロセスを使わずに、シングルスレッドでも多くの処理を捌く必要が出てきました。

そこで登場してきたのが非同期処理です。非同期処理とはタスクを止めず(ブロックせず)に、別のタスクを実行する手法のことで、非同期処理によってより多くのI/O処理を捌くことが可能になりました。Pythonにおいてはasyncioが非同期処理モジュールとして有名です。

今回は非同期処理については書きませんが、近々、非同期処理についての記事を書こうと思います。

参考

実践Python
【図解】CPUのコアとスレッドとプロセスの違い・関係性、同時マルチスレッディング、コンテキストスイッチについて
Speed Up Your Python Program With Concurrency
Pythonをとりまく並行/非同期の話

デコレータで例外処理を共通化する[Python]

業務でSaas機械学習サービスを開発しているのですが、そこである決済サービスを使用しています。

その決済サービスのライブラリ(Python)を利用していると、ライブラリで定義されたExceptionが返ってくるのですが、

ライブラリ定義のExceptionに対して、同じような例外処理を業務コードのあちこちに書かないといけなく、とても冗長でした。

そこでPythonデコレータを使って例外処理を共通化したので、今回はそのことについての記事を少し書きます。

決済サービスの話はマイナーケースだとは思いますが、サービス開発する上での例外処理を共通化したい場面はあると思うので参考になれば幸いです。

例外処理を共通化をしない場合

Pythonで普通に例外処理を書こうとするとこのようになると思います。

ログ処理やAPIでHttpExceptionを投げないといけない時などはかなり冗長になってしまいます。

例外処理だけでなくtryの中の処理もそれぞれ同じ処理内容だとメソッドを切って共通化するだけで終わりますが、

例外処理だけ共通化したい場合は工夫が必要です。

def pay(product: Product):
    try:
        ...
    except HogeHogeException as e
        ...
    except FugaFugaException as e
        ...
    except Exception as e:
        logging.critical(e)
    ...
    return ...


def subscribe_to(product: Product):
    try:
        ...
    except HogeHogeException as e
        ...
    except FugaFugaException as e
        ...
    except Exception as e:
        logging.critical(e)
    ...
    return ...

デコレータを使って例外処理を共通化した場合

デコレータを使うと上のコードを以下のように記述することができます。

例外処理を一箇所にまとめて切り出すことができました。例外処理に修正が入る場合も簡単に修正できそうです。

def payment_exception(func):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except HogeHogeException as e
            ...
        except FugaFugaException as e
            ...
        except Exception as e:
            ...

    return wrapper


@payment_exception
def pay(product: Product):
    ...
    return ...


@payment_exception
def subscribe_to(product: Product):
    ...
    return ...

ちなみに、デコレータとは引数として関数を受け取り、別の関数を返す関数のことです。

デコレータはシンタックスシュガー@payment_exceptionと簡単に書くことができていますが

pay = payment_exception(pay)と動作的に等価です。

終わりに

今回はデコレータによる例外処理の共通化を行いましたが、例外処理だけでなく、ロギングなどいろいろな処理の共通化できるので試してみてください。

サクッとWSGI・ASGIに触れてみる

普段FlaskやFastAPIなどのpythonフレームワークを使っている方は、起動時のメッセージやエラーメッセージなどでWSGIやASGIという言葉をよく目にすることがあると思います。でもフレームワークを使っているだけではWSGIやASGIについてあまり意識する必要はありません。私も言葉だけ知ってるだけで何者かよく知らない状態だったので、今回調べてことにしました。

WSGIとは

WSGIとはWeb Server Gateway Interfaceの略で、Pythonにおいて、WebサーバとWebアプリケーションが通信するための、標準化されたインタフェース定義のことです。アプリケーション(またはフレームワークやツールキット)がWSGI仕様で書かれていれば、WSGI をサポートするサーバ(gunicornなど)上であればどこでも動作させることができます。
サーバ側とアプリケーション側の両方のインタフェース定義については、PEP 3333で規定されています。

WSGIに沿ったアプリケーションの実装

よりWSGIを理解するためにWSGIアプリケーションを実装したいと思います。 WSGIアプリケーションは、呼び出し可能オブジェクトとして実装されます。(関数・クラス・object.call()メソッドを持つインスタンスなどが呼び出し可能オブジェクトです。) これはリクエストを受けてレスポンスを返す単一の同期呼び出し可能なもので、非同期処理やWebSocketのようなプロトコルは許可されていません。

WSGIアプリケーションについて以下のことが定義されてます。

  1. 次の2つの引数を持つ
    • 環境変数を含む辞書(environ)
    • HTTP ステータスコード/メッセージと HTTP ヘッダをサーバに送信するためにアプリケーションが使用するコールバック関数(start_response)
  2. ResposeBodyをイテレータブルな文字列としてサーバに返す

上の定義にしたがって、環境変数(environ)を羅列したtextを返すだけのWSGIアプリケーションを簡単に実装してみました。

#app.py
from wsgiref.simple_server import make_server

# WSGIアプリケーションを作成(今回は関数)
# 環境変数environとコールバック関数start_responseの二つの引数を持つ
def application(environ, start_response):

    response_body = [
        '%s: %s' % (key, value) for key, value in sorted(environ.items())
    ]
    response_body = '\n'.join(response_body)

    status = '200 OK'
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Length', str(len(response_body)))
    ]
    start_response(status, response_headers)

    return [response_body.encode('utf-8')]

# applicationへのコネクションを受け付けるWSGIサーバを作成。戻り値はserver_classのインスタンス
httpd = make_server(
    'localhost', # ホストネーム
    8051, # ポート番号
    application # 上で実装しているapplication classを使ってリクエストを処理
)

# 一つのリクエストだけ処理します。killするまでずっと立たせる場合はserve_foreverを使います。
httpd.handle_request()

作成したapp.pyを実行して Postmanなどを利用してhttp://localhost:8051/にGETリクエストを送信してみます。
以下のような環境変数が羅列されたテキストが返ってきたら成功です。

f:id:okiyasi:20200809230046p:plain

Pythonの標準モジュールであるwsgirefを使用してWEBサーバを立てましたが、WSGIに則っているのでもちろんgunicornなどを使ってもアプリケーションを動作させることができます。 より本格的なフレームワークやアプリケーションを作成したい場合にはもっと細かい仕様について理解する必要があります。詳しくはPEP 3333をご覧ください。

ASGIとは

ASGIとはAsynchronous Server Gateway Interfaceの略です。WSGIの精神的な後継仕様であり、asyncioを介して非同期で動作するように設計されていて、またWebSocketなど複数のプロトコルをサポートしています。 Djangoの開発組織がASGIのドキュメントを管理しており、2020年8月現在、ASGIはまだPEP化はされてないようです。 DjangoやFastapiがASGIに対応しており、非同期処理を簡単に実装できます。

ASGIに沿ったアプリケーションの実装

WSGIと同じようにASGIアプリケーションを実装していきます。

ASGIアプリケーションでは以下のことが定義されています。

  1. 次の3つの引数を持つ
    • 受信したリクエストに関する情報を含む辞書(scope)
    • ASGI イベントメッセージを受信するために使用される非同期関数(receive)
    • ASGI イベントメッセージを送信するために使用する非同期関数(send)

上の定義にしたがって、実装していきます。 ここでは時間を測るTimingMiddlewareというものでappをラップしています。 また、starletteというASGI frameworkを使用します。starletteを使うことで簡潔にASGIアプリケーションを実装することができます。

# app.py
import asyncio
from starlette.responses import PlainTextResponse
import time


class TimingMiddleware:
    def __init__(self, application):
        self.application = application

    # ちゃんとsleepされているか検証
    async def __call__(self, scope, receive, send):
        start_time = time.time()
        await self.application(scope, receive, send)
        end_time = time.time()
        print(f"Took {end_time - start_time:.2f} seconds")


async def application(scope, receive, send):
    await asyncio.sleep(3)
    response_body = [
        '%s: %s' % (key, value) for key, value in sorted(scope.items())
    ]
    response_body = '\n'.join(response_body)
    response = PlainTextResponse(response_body)
    await response(scope, receive, send)


application = TimingMiddleware(application)

今回はASGIサーバとしてuvicornを使用します。uvicornをインストールしてapp.pyがあるディレクトリでuvicorn app:applicationを実行します。するとASGIサーバーが立ち上がりASGIアプリケーションが動作します。
WSGIの場合と同様にPostmanを使ってhttp://localhost:8000/GETリクエストを送信します。3秒待った後にtextが返ってきたら成功です。

f:id:okiyasi:20200810181927p:plain
コンソールには次のように出力されているはずです。
f:id:okiyasi:20200810182638p:plain:w600

ASGIについてより詳しい仕様を知りたい方は公式ドキュメンテーションをご覧ください。

まとめ

今回はWSGIとASGIの簡単なアプリケーションを実装しました。 いつもはあまり意識しないWSGIとASGIについて少しは知れたかと思います。Django version3になってASGIに対応したり、Fastapiが少しずつ人気になってきてたりフレームワークのASGIへの移行はトレンドになっているので今後も注目していきたいですね。

参考

wsgiref --- WSGI ユーティリティとリファレンス実装 — Python 3.9.3 ドキュメント Introduction — WSGI Tutorial ASGI Documentation — ASGI 3.0 documentation

Lambda+Aurora serverless+WebhookでTeamsに毎日analyticsを通知するようにしてみた

社内で新しいサービスがローンチしたのですが、どれぐらい伸びてるか毎日統計情報を取りたいとなり、LambdaでAurora Serverlessに接続しSQLを発行して分析し、その結果をWebhookでTeamsに通知するようなものを作ったので作り方を記事にまとめてみます。

以下の手順で作成していきます。

  1. Temasチャンネルにwebhook用コネクタを追加
  2. Aurora serverlessのData API設定
  3. Lambda関数作成
  4. CloudWatch Eventsをトリガーに追加
  5. チャンネルに通知

Temasチャンネルにwebhook用コネクタを追加

通知をしたいTeamsチャンネルの右上の3点ボタンをクリックし、メニューを表示して「コネクタ」を選択します。

f:id:okiyasi:20200802231435p:plain:w200
コネクタの中から「Incoming Webhook」を探し「構成」をクリックします。すると以下のような画面が出てくるので適当な名前を入力して「作成」でWebhookを作成します。
f:id:okiyasi:20200802232108p:plain:w600

作成するとこのようにWebhookのURLが表示されるのでこのURLをコピーしておきます。(後に使うのでどこかに保存しておいてください。)

f:id:okiyasi:20200802232602p:plain:w600

Aurora serverlessのData API設定

Aurora Serverlessとは(簡単に)

Aurora ServerlessはAuroraをオンデマンドで自動スケーリングしてくれるようなもので、一定時間リクエストがないときはDBインスタンスが停止状態になり、逆に負荷が高まったりした時は自動でスケーリングしてくれるので普通のAuroraに比べてとてもコストパフォーマンスが高いです。まだユーザーの少ない新規サービスや開発環境などの場合はServerlessを使うとAuroraに比べてかなりコストを抑えられます。Auroraへの移行も比較的簡単です。

Data APIとは(簡単に)

Data APIはAurora Serverless の エンドポイントとして機能するもので、Lambdaなどを使用する場合にはAurora Serverlessに直接接続せずに、Data APIを介して接続することでRDBの最大接続数の問題などを回避することができたり、VPC内にLambdaを設置しなくてもAurora Serverlessに接続できたりするのでとても便利な機能になります。

設定するには

AWSのコンソールで接続したいAurora Serverlessのクラスターを選択して「変更」を行います。変更の中の「ネットワーク & セキュリティ」という箇所があるのでその中にわかりにくく、Data APIチェックボックスがあるのでそれをオンにして、変更を適用して終わりです。設定するとシークレットマネージャーに自動で接続情報が登録されます。
MySQL Version 5.6以上・PostgreSQL Version 10.7以上と互換性のあるAuroraで使用可能です ※2020年8月時点)

f:id:okiyasi:20200803001012p:plain

Lambda関数作成

次にLambda関数を作成します。 まず通常のコンソール画面から、Lambdaデフォルトのポリシーがアタッチされたロールを付与されている新しい関数を作成します。その次にIAMロールの編集を行います。

ポリシーの作成

Data APIに接続するためのポリシーの作成を行います。
ポリシーについてはこののクラスメソッドさんの記事を参考に作成しました。 (※ちなみに2020年7月時点ではもうLayerはなくてもData APIは叩けました)
https://dev.classmethod.jp/articles/aurora-sl-dataapi-with-lambda-layer/

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SecretsManagerDbCredentialsAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue",
                "secretsmanager:PutResourcePolicy",
                "secretsmanager:PutSecretValue",
                "secretsmanager:DeleteSecret",
                "secretsmanager:DescribeSecret",
                "secretsmanager:TagResource"
            ],
            "Resource": [
                "arn:aws:secretsmanager:*:*:secret:rds-db-credentials/*",
                "--ここに自動作成されたのシークレットマネージャーのarnを記入(シークレットマネージャーのコンソールに載っている)--"
            ]
        },
        {
            "Sid": "RDSDataServiceAccess",
            "Effect": "Allow",
            "Action": [
                "dbqms:CreateFavoriteQuery",
                "dbqms:DescribeFavoriteQueries",
                "dbqms:UpdateFavoriteQuery",
                "dbqms:DeleteFavoriteQueries",
                "dbqms:GetQueryString",
                "dbqms:CreateQueryHistory",
                "dbqms:DescribeQueryHistory",
                "dbqms:UpdateQueryHistory",
                "dbqms:DeleteQueryHistory",
                "dbqms:DescribeQueryHistory",
                "rds-data:ExecuteSql",
                "rds-data:ExecuteStatement",
                "rds-data:BatchExecuteStatement",
                "rds-data:BeginTransaction",
                "rds-data:CommitTransaction",
                "rds-data:RollbackTransaction",
                "secretsmanager:CreateSecret",
                "secretsmanager:ListSecrets",
                "secretsmanager:GetRandomPassword",
                "tag:GetResources"
            ],
            "Resource": "*"
        }
    ]
}

このポリシーを作成したLambdaのロールにアタッチします。

Lambdaのコードを作成

今回はzipで外部ライブラリをLambdaに持ってくるとかの説明は省きたいので、pythonの標準ライブラリだけでできるような、ユーザーの数だけを取得して通知する簡単なコードを紹介します。

import json
import boto3
import urllib
from datetime import date

def lambda_handler(event, context):

    rdsData = boto3.client('rds-data')

    # Aurora ServerlessのクラスターのARN(RDSのコンソールから取得できる)
    cluster_arn = 'arn:aws:rds〜〜〜〜〜'
    # シークレットマネージャーのARN
    secret_arn = 'arn:aws:secretsmanager〜〜〜〜〜'

    response = rdsData.execute_statement(
                resourceArn = cluster_arn, 
                secretArn = secret_arn, 
                database = '自分の作ったデータベースの名前', 
                # ユーザーの数を取得するだけのSQL
                sql = 'select count(*) from user情報のテーブル名') 

    user_number = response['records'][0][0]['longValue']
    today = date.today().strftime('%Y年%m月%d日')
    message = f'{today}の累計ユーザー数:{user_number}人'
    
    print(message)
    
    # 1で作成したwebhookのURL
    webhook_url = 'https://outlook.office.com〜〜〜〜〜〜'

    data = {
        "text": message,
    }
    headers = {'Content-Type': 'application/json'}
    request = urllib.request.Request(
        webhook_url,
        json.dumps(data).encode("utf-8"), 
        headers
    )
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode("utf-8")

Lambdaで外部ライブラリを使ってもっと複雑なことをしたい場合はこちらの記事をお読みください。 【AWS・Lambda】Python外部ライブラリ読み込み方法 - Qiita

CloudWatch Eventsをトリガーに追加

Lambdaの画面にある「デザイナー」の中の「トリガーを追加」を選択し、トリガーを選択で「Event Bridge(CloudWatch Events)」を選択します。

f:id:okiyasi:20200803011039p:plain:w300
ルールで「新規のルールの作成」を選び名前を記入し、ルールタイプはスケジュールタイプを選択します。
f:id:okiyasi:20200803012009p:plain
スケジュール式にcron(0 0 ? * MON-FRI *)と記入することによって平日のUTC時間の午前0時0分にLambdaを実行することが可能です。 cron式については詳しくはこちらをご覧ください。 docs.aws.amazon.com 最後に「追加」を押すとトリガーの追加が完了します。

テスト

以上の作業でTeamsのチャンネルに毎日通知を行うことが可能になりました! ちゃんとできてるかUTCの午前0時0分まで待てないので一旦Lambdaのテストを使って、通知を実施してみます。今回はパラメータは使わないのでデフォルトのhello worldのテストをそのまま使ってテストをすると以下のように指定したチャンネルに通知がくると思います。

f:id:okiyasi:20200803013403p:plain