nikkie-ftnextの日記

イベントレポートや読書メモを発信

OpenAIのAPIに並行処理でリクエストを送る(ChatCompletionにasync/await!イベントループ篇)

はじめに

たくさん、どうぞ!1 nikkieです。

毎日夏!って感じですが、今日も元気にOpenAIのAPIを叩いていきたいと思います!
今回はたくさん送るので、並行処理を模索しました。
現時点での考えのバックアップ目的のエントリです。

目次

データセット数千件をChatGPTに固有表現認識させたい

今回の状況の共有です。
論文「Is Information Extraction Solved by ChatGPT?」に興味を持ちました。

この論文は固有表現認識をはじめとするデータセットを何種類も用意し、プロンプトエンジニアリングしてChatGPTに解かせ、性能を確認します。
SOTA(state of the art)の性能に及ぶかを検証しており、その点でタイトル「ChatGPTによって、情報抽出タスクは解かれたのか?」がぴったりですね。

この検証の実装の一部を手元でも再現させたいというのが今回の動機です。
使ったデータセットhuggingface/datasetsにあるconll2003
テストセット3453件をChatGPTのAPIに送り、ChatCompletionさせて固有表現認識を解かせます。
その後は正解ラベルとChatGPTのレスポンス(予測ラベル)から評価指標(Micro F1)を算出するのですが、この記事ではスコープアウトします。

今回の特徴はChatGPTに処理させたいデータの件数の多さ
データ1件につき1つのプロンプトで解かせるので、合計で3453回リクエストを送ります。
これは1つ1つ順番に(逐次)リクエストを送っていたら、結構時間がかかりそうですよね。
応答に1秒かかると仮定すると、逐次処理では少なくとも3453秒(約1時間)です。

件数の多さに対して、論文の実装ではマルチスレッドで対応しています2
マルチスレッドによる並行処理でももちろんよいのですが、個人的にはasyncio(イベントループ)を使った並行処理を練習したいという想いがありました。

ライブラリopenaiをイベントループで利用する

過去のエントリ3でopenaiの実装を眺めていたのですが、そのときにopenai.ChatCompletion.acreateを見つけていました。
https://github.com/openai/openai-python/blob/v0.27.8/openai/api_resources/chat_completion.py#L33

class ChatCompletion(EngineAPIResource):
    @classmethod
    async def acreate(cls, *args, **kwargs):
        ...

というわけでこちらを使っていきます。
イベントループについては昨年のみんなのPython勉強会を機に完全に理解しました。

asyncio.gatherを使います。
https://docs.python.org/ja/3.10/library/asyncio-task.html#asyncio.gather

asyncio.gatherには複数のコルーチンを渡します(*aws引数)。
以下の点が非常に強力です。

全ての awaitable が正常終了した場合、その結果は返り値を集めたリストになります。 返り値の順序は、 aws での awaitable の順序に相当します。

渡したコルーチンと返り値(APIのレスポンス)の順序が揃うわけですね!
今回はプロンプトごとにコルーチンを作ることにしました(openai.ChatCompletion.acreateを呼び出せばよいです)

コード例(コメント歓迎)

動作環境

  • Python 3.10.9
  • openai 0.27.8
  • jsonlines 3.1.0
  • more-itertools 9.1.0
  • tqdm 4.65.0

現時点の考え

  • 探し方が悪かったか、イベントループを使って並行処理でたくさんリクエストを送るサンプルコードは見つからず
    • 1件だけacreateで送るといったコードは見つかりました。こうやったらコルーチンができるのかという参考情報として読みました
  • 3453件から3453個のコルーチンを作り、一度にasyncio.gatherに渡す?
    • ー No(3453個並行になるから、めっちゃ負荷かけますよね)
  • 小さなチャンクに分け(今回は5個)、チャンクごとに並行処理する実装です(並行処理されるリクエストは最大でも5個)
    • tqdmで出したプログレスバー、チャンクが並行に処理されたら次のチャンクに進む様子が見てとれます
    • チャンク分けの実装、他の方がやっているのをまだ見かけていないので、もっといいやり方あったら知りたいな〜

OpenAIのAPIはエラーが返ることが多い感覚なので、asyncio.gatherreturn_exceptions引数をTrueに指定。

return_exceptions が True だった場合、例外は成功した結果と同じように取り扱われ、結果リストに集められます。

結果リストに集めた例外は、今回は握り潰してNone扱い(※悪い例です)としました。
イベントループを使った並行処理が少しストレッチだったので、細部までこだわりきれずに宿題となっています。

なお論文ではAPIが返したエラーのリトライにbackoffを使っています4
よくわかんないけどそれをパクったらなんかいい感じでした(理解を深めて今後アウトプットしたい!)

終わりに

ChatGPTに自然言語処理のタスクを解かせたいシーンで、手元にあるたくさんのラベル付きデータを並行処理でAPIに送り処理させる方法をアウトプットしました。

  • マルチスレッドでやることもできるけれど、今回はイベントループを試した(asyncio, async/await)
  • openai.ChatCompletion.acreateが提供されている
    • プロンプトをチャンクに分け、チャンク分acreateを呼び出してコルーチンを用意
    • asyncio.gatherに渡してコルーチンを並行処理!
  • 例外の扱い、return_exceptions=Trueを指定して握り潰したが、これは伸びしろでbackoffがいい感じ(なお、分かってない)

実装に伸びしろはあると思いますが、「XXタスクのYYデータセットをChatGPTに解かせたら?」と思ったときにどのくらいの性能か(ChatGPT as ベースライン)を確認できるスクリプトを手に入れました🙌

再現実装は現在こんな感じです: