nikkie-ftnextの日記

イベントレポートや読書メモを発信

OpenAIのChat completions APIからstreamで「data: JSON形式文字列」というデータが返ってくるのですが、openaiライブラリはどこでprefixのdataを除いてJSONを取り出しているの?

はじめに

サンシャIN!(朝までラーニング見てます) nikkieです。

ここ最近、OpenAIのAPIでstream呼び出しをしましたが、その中で気になった小さな疑問に取り組んでいきます。

目次

OpenAIのChat completions APIをstream呼び出し

openaiライブラリのクライアントは(同期処理も並行処理も)stream呼び出しをサポートしています1

openaiライブラリが中で使っているHTTPXもstream呼び出しをサポートしていました。

2つの返り値を比較したとき、

  • openai
ChatCompletionChunk(id='chatcmpl-AWjPGQ6exHoP0NWXH33IkBoOCwPeo', choices=[Choice(delta=ChoiceDelta(content='1', function_call=None, refusal=None, role=None, tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1732364402, model='gpt-4o-mini-2024-07-18', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_0705bf87c0', usage=None)
  • HTTPX
data: {"id":"chatcmpl-AWmbogNqdsbF6W5f74CtUtJQr6sWd","object":"chat.completion.chunk","created":1732376712,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_3de1288069","choices":[{"index":0,"delta":{"content":"1"},"logprobs":null,"finish_reason":null}]}

OpenAIのAPIの返り値自体は data:というprefixで始まります(HTTPXの例)。
一方、openaiライブラリを通すと、このprefixは除かれてJSON部分がパースされています(Pydanticでやっていますね)

data:というprefixを除く実装は、openaiライブラリのどこにあるのでしょう?
私、気になります!

結論:SSEDecoderで除かれる

SSEDecoderdecode()メソッド
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L346

fieldname, _, value = line.partition(":")

linestrです。
str.partition()
https://docs.python.org/ja/3/library/stdtypes.html#str.partition

文字列を sep の最初の出現位置で区切り、 3 要素のタプルを返します。
タプルの内容は、区切りの前の部分、区切り文字列そのもの、そして区切りの後ろの部分です。
もし区切れなければ、タプルには元の文字列そのものとその後ろに二つの空文字列が入ります。

続く実装により(prefixのdata:が除かれた)valueself._dataに溜まっていきます。

# https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L353-L354
elif fieldname == "data":
    self._data.append(value)

最初の手がかり:テストケース

一発でSSEDecoderと分かったわけではありません。
手がかりになったのはテストケースでした。
https://github.com/openai/openai-python/blob/v1.54.3/tests/test_streaming.py#L29-L42

@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_data_missing_event(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
    def body() -> Iterator[bytes]:
        yield b'data: {"foo":true}\n'
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event is None
    assert sse.json() == {"foo": True}

    await assert_empty_iter(iterator)

調査スクリプト

このケースから調査スクリプトを書き起こします。
各種ヘルパー関数をインライン化したものです。

import httpx
import openai


def body():
    yield b'data: {"foo":true}\n'
    yield b"\n"


client = openai.Client()

response = httpx.Response(200, content=body())
stream = openai.Stream(cast_to=object, client=client, response=response)
iterator = stream._iter_events()
sse = next(iterator)

assert sse.json() == {"foo": True}

デバッグのために(inline script metadataではなく)手元の仮想環境を使いました。

  • Python 3.12.6
  • openai 1.54.3
  • httpx 0.27.2

このスクリプトの実行から分かることは

  • httpx.Responseではdata:のprefixが付いている
>>> response2 = httpx.Response(200, content=body())
>>> list(response2.iter_bytes())
[b'data: {"foo":true}\n', b'\n']
  • sse.json()data:は付いていない(JSON形式のデータが取り出せている)

iterator = stream._iter_events()の行について見ていきます

stream._iter_events()

実装はこちら
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L49-L50

def _iter_events(self) -> Iterator[ServerSentEvent]:
    yield from self._decoder.iter_bytes(self.response.iter_bytes())

調査スクリプトをインライン化します。

-iterator = stream._iter_events()
+iterator = stream._decoder.iter_bytes(stream.response.iter_bytes())

_decoder属性の型を確認し

>>> type(stream._decoder)
<class 'openai._streaming.SSEDecoder'>

SSEDecoderクラスのiter_bytes()メソッドにprintデバッグすることにしました。
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L278-L286

def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
    for chunk in self._iter_chunks(iterator):
+            print(f"{chunk=}")
        for raw_line in chunk.splitlines():
+                print(f"{raw_line=}")
            line = raw_line.decode("utf-8")
            sse = self.decode(line)
            if sse:
                yield sse

調査スクリプト実行結果

chunk=b'data: {"foo":true}\n\n'
raw_line=b'data: {"foo":true}'
raw_line=b''

まだdata:とついていますね。
この出力からdecode()メソッドと当たりがつきました。

落穂拾い

Q: str.partition(":")で、data: JSON形式文字列のうちdata:は除かれるけれど、JSON形式文字列の先頭の半角スペースは?

str.partition(":")のすぐ後で除かれています。
https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L348-L349

if value.startswith(" "):
    value = value[1:]

終わりに

OpenAIのChat completions APIからstreamで、すなわち、server-sent eventsで返るレスポンス(data: ...)をopenaiライブラリがどう処理しているかを見てきました。
mdnのドキュメントにあるevent, data, id, retryの4つのフィールドを扱っているのですね!
サーバー送信イベントの使用 - Web API | MDN
decode()メソッドにコメントしてあった2リンク先にserver-sent eventsの扱いが記載されているようです(積ん読)。
https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation

SSEDecoder.decode()メソッドの実装はなかなか面白いと思います。
値を返すのは空行が来たときですからね。

>>> stream._decoder.decode('data: {"foo":true}')
>>> stream._decoder.decode('')
ServerSentEvent(event=None, data={"foo":true}, id=None, retry=None)

dataはSSEDecoderの中に溜めていて、dataが揃ったら返しています。
iter_bytes()の実装のifの分岐も合わさって

def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
    for chunk in self._iter_chunks(iterator):
        for raw_line in chunk.splitlines():
            line = raw_line.decode("utf-8")
            sse = self.decode(line)
            if sse:
                yield sse

外からは一連のデータが取り出されている(すなわち、Iterator)ように見えます


  1. stream呼び出しにより1トークンずつ返るので、すべてのトークンが生成されるまでユーザを待たせずに、テキストを表示し始めることができます
  2. https://github.com/openai/openai-python/blob/v1.54.3/src/openai/_streaming.py#L323