はじめに
みんな、目黒シネマへ行くんだ。特別上映版『かがみの孤城』が観られるぞ! nikkieです。
ここ数日Pythonの非同期ジェネレータが頭に引っかかっています。
難しそうな概念ですが、「ついにその時が来たか」と腹を決めて素振りしました。
※考え違いをしていたら@ftnextまでご指摘いただけると大変ありがたいです。
目次
- はじめに
- 目次
- ジェネレータ
- 非同期ジェネレータが気になりだしたきっかけ
- はじめて非同期ジェネレータ
- OpenAIのAPIに並行処理でリクエスト with 非同期ジェネレータ
- 終わりに
- ソースコード(OpenAIのAPIに並行処理でリクエスト with 非同期ジェネレータ)
ジェネレータ
ジェネレータとは、yield
を持つ関数です。
def awesome_generator(): yield "さかなー🐟" yield "ちんあなごー🙌"
ジェネレータはリストと比べるとメモリ効率がよく、私がPythonを書く上ではもう必需品です。
リストを返す関数の代わりにジェネレータで実装することを検討します。
もう少し詳しい説明が以下にあります。
過去のLTより
非同期ジェネレータが気になりだしたきっかけ
「非同期ジェネレータ」が気になり始めたきっかけは、OpenAIのAPIに3000件のリクエストを並行処理で送る実装をしたこと。
この実装をする中で、APIを呼び出す関数はレスポンス1件ずつyield
したかったのですが、
async def call_api(...): # この実装はイメージです for chunk in chunked(propmts, chunk_size): coroutines = [ openai.ChatCompletion.acreate(...) for prompt in chunk ] results = await asyncio.gather(*coroutines, return_exceptions=True) for result in results: # resultを元にresponseを組み立て yield response
慣れていないasync/awaitとyield
の組合せ方が分からず断念(経験値不足で、エラーがちょっと何言ってるかわからない状態)。
このときは実装したい事項が山積みだったので、yield
を使うのを諦め、リストに3000件全部を集めて返す実装としました。
async def call_api(...) -> list[OpenAIResponse]: # この実装はイメージです responses: list[OpenAIResponse] = [] for chunk in chunked(propmts, chunk_size): coroutines = [ openai.ChatCompletion.acreate(...) for prompt in chunk ] results = await asyncio.gather(*coroutines, return_exceptions=True) for result in results: # resultを元にresponseを組み立て responses.append(response) return responses
一通り実装を終えた後、「async
の付いた関数でyield
って非同期ジェネレータってやつだよな」と引っかかり始めます。
はじめて非同期ジェネレータ
石本さんブログの例
「非同期ジェネレータ」で検索すると、ぁっぉさん1のブログを発見。
私が初めて触ったPythonは3.6なのですが、そこで非同期ジェネレータもサポートされていたのか!
写経して動かします(Python 3.10.9)。
import asyncio async def spam_generator(): await asyncio.sleep(1) yield "spam1" await asyncio.sleep(2) yield "spam2" async def spam(): async for s in spam_generator(): print(s) if __name__ == "__main__": asyncio.run(spam())
% time python aish_example.py spam1 spam2 python aish_example.py 0.05s user 0.04s system 3% cpu 3.118 total
この例を観察すると
- 非同期ジェネレータ
spam_generator
はasync def
でyield
している - 非同期ジェネレータを使う
spam
はasync def
でasync for
している
ここから、「非同期ジェネレータを使う側はasync for
を使えばいいのでは」という仮説が生まれます。
What's New In Python 3.6の例
もう一例、「Python 3.6で加わったのなら」と公式ドキュメントのWhat's Newを見ます。
https://docs.python.org/ja/3/whatsnew/3.6.html#pep-525-asynchronous-generators
import asyncio async def ticker(delay, to): for i in range(to): yield i await asyncio.sleep(delay) async def print_ticker(delay, to): async for t in ticker(delay, to): print(t) if __name__ == "__main__": asyncio.run(print_ticker(0.5, 5))
% time python whats_new_example.py 0 1 2 3 4 python whats_new_example.py 0.06s user 0.05s system 4% cpu 2.660 total
非同期ジェネレータticker
を使う側はasync for
で使うことになりますね(※使う側の例まではドキュメントにはありません)。
非同期ジェネレータの実装はふつうのfor
で、async for
は不要そうです。
OpenAIのAPIに並行処理でリクエスト with 非同期ジェネレータ
call_api
を非同期ジェネレータとし、使う関数にasync for
を導入します。
使う側はこう書いたら動かせました🙌
async def main(input_path: Path, output_path: Path, chunk_size: int): examples = load_examples(input_path) responses = call_api(tqdm(prompts_generator(examples)), chunk_size) # 非同期ジェネレータ examples_generator = (e for e in examples) with jsonlines.open(output_path, "w") as writer: async for response in responses: example = next(examples_generator) # zipの代わり writer.write(example | response)
zip関数で非同期のイテラブルは扱えないようだったので、ややトリッキーな実装になっています。
コード全体は記事の最後に示します。
上記実装はファイルIO(特にここでは書き込み)がブロッキングですが、Node.jsを思い出すとファイルIOも非同期でした。
上記のコードは、非同期なファイルIOのライブラリ2を導入することで、並行処理の恩恵をより享受できるのではないかと思われます。
終わりに
Pythonの非同期ジェネレータを素振りしました。
async def
とyield
で非同期ジェネレータasync for
ではなく、for
でよい
- 非同期ジェネレータを使う側は
async for
async def
も必要
非同期ジェネレータ、めちゃくちゃいかつい語感ですが、ジェネレータへの習熟、イベントループを使った並行処理(asyncio)の理解によって差分は小さくなっていて、少し背伸びしたら手が届いた感覚です。
非同期ジェネレータのよりよい使い方(ベストプラクティス)は引き続き素振りして探していこうと思います。
現時点では、概念は完全に理解した気がする!