nikkie-ftnextの日記

イベントレポートや読書メモを発信

非同期ジェネレータ 素振りの記(OpenAIのAPIに並行処理でリクエストを送る処理を書き直す)

はじめに

みんな、目黒シネマへ行くんだ。特別上映版『かがみの孤城』が観られるぞ! nikkieです。

ここ数日Python非同期ジェネレータが頭に引っかかっています。
難しそうな概念ですが、「ついにその時が来たか」と腹を決めて素振りしました。

※考え違いをしていたら@ftnextまでご指摘いただけると大変ありがたいです。

目次

ジェネレータ

ジェネレータとは、yieldを持つ関数です。

def awesome_generator():
    yield "さかなー🐟"
    yield "ちんあなごー🙌"

ジェネレータはリストと比べるとメモリ効率がよく、私がPythonを書く上ではもう必需品です。
リストを返す関数の代わりにジェネレータで実装することを検討します。
もう少し詳しい説明が以下にあります。

過去のLTより

非同期ジェネレータが気になりだしたきっかけ

「非同期ジェネレータ」が気になり始めたきっかけは、OpenAIのAPIに3000件のリクエストを並行処理で送る実装をしたこと。

この実装をする中で、APIを呼び出す関数はレスポンス1件ずつyieldしたかったのですが、

async def call_api(...):  # この実装はイメージです
    for chunk in chunked(propmts, chunk_size):
        coroutines = [
            openai.ChatCompletion.acreate(...) for prompt in chunk
        ]
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        for result in results:
            # resultを元にresponseを組み立て
            yield response    

慣れていないasync/awaitとyieldの組合せ方が分からず断念(経験値不足で、エラーがちょっと何言ってるかわからない状態)。
このときは実装したい事項が山積みだったので、yieldを使うのを諦め、リストに3000件全部を集めて返す実装としました。

async def call_api(...) -> list[OpenAIResponse]:  # この実装はイメージです
    responses: list[OpenAIResponse] = []
    for chunk in chunked(propmts, chunk_size):
        coroutines = [
            openai.ChatCompletion.acreate(...) for prompt in chunk
        ]
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        for result in results:
            # resultを元にresponseを組み立て
            responses.append(response)
    return responses

一通り実装を終えた後、「asyncの付いた関数でyieldって非同期ジェネレータってやつだよな」と引っかかり始めます。

はじめて非同期ジェネレータ

石本さんブログの例

「非同期ジェネレータ」で検索すると、ぁっぉさん1のブログを発見。

私が初めて触ったPythonは3.6なのですが、そこで非同期ジェネレータもサポートされていたのか!

写経して動かします(Python 3.10.9)。

import asyncio


async def spam_generator():
    await asyncio.sleep(1)
    yield "spam1"
    await asyncio.sleep(2)
    yield "spam2"


async def spam():
    async for s in spam_generator():
        print(s)


if __name__ == "__main__":
    asyncio.run(spam())
% time python aish_example.py
spam1
spam2
python aish_example.py  0.05s user 0.04s system 3% cpu 3.118 total

この例を観察すると

  • 非同期ジェネレータspam_generatorasync defyieldしている
  • 非同期ジェネレータを使うspamasync defasync forしている

ここから、「非同期ジェネレータを使う側はasync forを使えばいいのでは」という仮説が生まれます。

What's New In Python 3.6の例

もう一例、「Python 3.6で加わったのなら」と公式ドキュメントのWhat's Newを見ます。
https://docs.python.org/ja/3/whatsnew/3.6.html#pep-525-asynchronous-generators

import asyncio


async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def print_ticker(delay, to):
    async for t in ticker(delay, to):
        print(t)


if __name__ == "__main__":
    asyncio.run(print_ticker(0.5, 5))
% time python whats_new_example.py
0
1
2
3
4
python whats_new_example.py  0.06s user 0.05s system 4% cpu 2.660 total

非同期ジェネレータtickerを使う側はasync forで使うことになりますね(※使う側の例まではドキュメントにはありません)。
非同期ジェネレータの実装はふつうのforで、async forは不要そうです。

OpenAIのAPIに並行処理でリクエスト with 非同期ジェネレータ

call_apiを非同期ジェネレータとし、使う関数にasync forを導入します。
使う側はこう書いたら動かせました🙌

async def main(input_path: Path, output_path: Path, chunk_size: int):
    examples = load_examples(input_path)
    responses = call_api(tqdm(prompts_generator(examples)), chunk_size)  # 非同期ジェネレータ

    examples_generator = (e for e in examples)
    with jsonlines.open(output_path, "w") as writer:
        async for response in responses:
            example = next(examples_generator)  # zipの代わり
            writer.write(example | response)

zip関数で非同期のイテラブルは扱えないようだったので、ややトリッキーな実装になっています。
コード全体は記事の最後に示します。

上記実装はファイルIO(特にここでは書き込み)がブロッキングですが、Node.jsを思い出すとファイルIOも非同期でした。
上記のコードは、非同期なファイルIOのライブラリ2を導入することで、並行処理の恩恵をより享受できるのではないかと思われます。

終わりに

Pythonの非同期ジェネレータを素振りしました。

  • async defyieldで非同期ジェネレータ
    • async forではなく、forでよい
  • 非同期ジェネレータを使う側はasync for
    • async defも必要

非同期ジェネレータ、めちゃくちゃいかつい語感ですが、ジェネレータへの習熟、イベントループを使った並行処理(asyncio)の理解によって差分は小さくなっていて、少し背伸びしたら手が届いた感覚です。
非同期ジェネレータのよりよい使い方(ベストプラクティス)は引き続き素振りして探していこうと思います。
現時点では、概念は完全に理解した気がする!

ソースコード(OpenAIのAPIに並行処理でリクエスト with 非同期ジェネレータ)


  1. 石本さん == ぁっぉさん
  2. 過去にaiofilesを触ったことがあります。