はじめに
サンシャIN!(朝までラーニング見てます) nikkieです。
ここ最近、OpenAIのAPIでstream呼び出しをしましたが、その中で気になった小さな疑問に取り組んでいきます。
目次
- はじめに
- 目次
- OpenAIのChat completions APIをstream呼び出し
- 結論:SSEDecoderで除かれる
- 最初の手がかり:テストケース
- 調査スクリプト
- stream._iter_events()
- 落穂拾い
- 終わりに
OpenAIのChat completions APIをstream呼び出し
openaiライブラリのクライアントは(同期処理も並行処理も)stream呼び出しをサポートしています1。
openaiライブラリが中で使っているHTTPXもstream呼び出しをサポートしていました。
2つの返り値を比較したとき、
- openai
ChatCompletionChunk(id='chatcmpl-AWjPGQ6exHoP0NWXH33IkBoOCwPeo', choices=[Choice(delta=ChoiceDelta(content='1', function_call=None, refusal=None, role=None, tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1732364402, model='gpt-4o-mini-2024-07-18', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_0705bf87c0', usage=None)
- HTTPX
data: {"id":"chatcmpl-AWmbogNqdsbF6W5f74CtUtJQr6sWd","object":"chat.completion.chunk","created":1732376712,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_3de1288069","choices":[{"index":0,"delta":{"content":"1"},"logprobs":null,"finish_reason":null}]}
OpenAIのAPIの返り値自体は data:
というprefixで始まります(HTTPXの例)。
一方、openaiライブラリを通すと、このprefixは除かれてJSON部分がパースされています(Pydanticでやっていますね)
data:
というprefixを除く実装は、openaiライブラリのどこにあるのでしょう?
私、気になります!
結論:SSEDecoder
で除かれる
SSEDecoder
のdecode()
メソッド
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L346
fieldname, _, value = line.partition(":")
line
はstr
です。
str.partition()
https://docs.python.org/ja/3/library/stdtypes.html#str.partition
文字列を sep の最初の出現位置で区切り、 3 要素のタプルを返します。
タプルの内容は、区切りの前の部分、区切り文字列そのもの、そして区切りの後ろの部分です。
もし区切れなければ、タプルには元の文字列そのものとその後ろに二つの空文字列が入ります。
続く実装により(prefixのdata:
が除かれた)value
がself._data
に溜まっていきます。
# https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L353-L354 elif fieldname == "data": self._data.append(value)
最初の手がかり:テストケース
一発でSSEDecoder
と分かったわけではありません。
手がかりになったのはテストケースでした。
https://github.com/openai/openai-python/blob/v1.54.3/tests/test_streaming.py#L29-L42
@pytest.mark.asyncio @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) async def test_data_missing_event(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: def body() -> Iterator[bytes]: yield b'data: {"foo":true}\n' yield b"\n" iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) sse = await iter_next(iterator) assert sse.event is None assert sse.json() == {"foo": True} await assert_empty_iter(iterator)
調査スクリプト
このケースから調査スクリプトを書き起こします。
各種ヘルパー関数をインライン化したものです。
import httpx import openai def body(): yield b'data: {"foo":true}\n' yield b"\n" client = openai.Client() response = httpx.Response(200, content=body()) stream = openai.Stream(cast_to=object, client=client, response=response) iterator = stream._iter_events() sse = next(iterator) assert sse.json() == {"foo": True}
デバッグのために(inline script metadataではなく)手元の仮想環境を使いました。
- Python 3.12.6
- openai 1.54.3
- httpx 0.27.2
このスクリプトの実行から分かることは
httpx.Response
ではdata:
のprefixが付いている
>>> response2 = httpx.Response(200, content=body()) >>> list(response2.iter_bytes()) [b'data: {"foo":true}\n', b'\n']
sse.json()
はdata:
は付いていない(JSON形式のデータが取り出せている)
iterator = stream._iter_events()
の行について見ていきます
stream._iter_events()
実装はこちら
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L49-L50
def _iter_events(self) -> Iterator[ServerSentEvent]: yield from self._decoder.iter_bytes(self.response.iter_bytes())
調査スクリプトをインライン化します。
-iterator = stream._iter_events() +iterator = stream._decoder.iter_bytes(stream.response.iter_bytes())
_decoder
属性の型を確認し
>>> type(stream._decoder) <class 'openai._streaming.SSEDecoder'>
SSEDecoder
クラスのiter_bytes()
メソッドにprintデバッグすることにしました。
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L278-L286
def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: for chunk in self._iter_chunks(iterator): + print(f"{chunk=}") for raw_line in chunk.splitlines(): + print(f"{raw_line=}") line = raw_line.decode("utf-8") sse = self.decode(line) if sse: yield sse
調査スクリプト実行結果
chunk=b'data: {"foo":true}\n\n' raw_line=b'data: {"foo":true}' raw_line=b''
まだdata:
とついていますね。
この出力からdecode()
メソッドと当たりがつきました。
落穂拾い
Q: str.partition(":")
で、data: JSON形式文字列
のうちdata:
は除かれるけれど、JSON形式文字列の先頭の半角スペースは?
str.partition(":")
のすぐ後で除かれています。
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L348-L349
if value.startswith(" "): value = value[1:]
終わりに
OpenAIのChat completions APIからstreamで、すなわち、server-sent eventsで返るレスポンス(data: ...
)をopenaiライブラリがどう処理しているかを見てきました。
mdnのドキュメントにあるevent, data, id, retryの4つのフィールドを扱っているのですね!
サーバー送信イベントの使用 - Web API | MDN
decode()
メソッドにコメントしてあった2リンク先にserver-sent eventsの扱いが記載されているようです(積ん読)。
https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
SSEDecoder.decode()
メソッドの実装はなかなか面白いと思います。
値を返すのは空行が来たときですからね。
>>> stream._decoder.decode('data: {"foo":true}') >>> stream._decoder.decode('') ServerSentEvent(event=None, data={"foo":true}, id=None, retry=None)
dataはSSEDecoderの中に溜めていて、dataが揃ったら返しています。
iter_bytes()
の実装のif
の分岐も合わさって
def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: for chunk in self._iter_chunks(iterator): for raw_line in chunk.splitlines(): line = raw_line.decode("utf-8") sse = self.decode(line) if sse: yield sse
外からは一連のデータが取り出されている(すなわち、Iterator)ように見えます
- stream呼び出しにより1トークンずつ返るので、すべてのトークンが生成されるまでユーザを待たせずに、テキストを表示し始めることができます↩
- https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L323↩