nikkie-ftnextの日記

イベントレポートや読書メモを発信

同時実行数を制限できるセマフォ、イベントループによる並行処理でも使えるのか!(Pythonのasyncio.Semaphoreの素振り)

はじめに

俺もー!!ノ nikkieです。

Pythonのasyncio(イベントループによる並行処理)まわりで長い間宿題だった事項に、ついに答えを得ました!

目次

前提:asyncioを使った私の実装の伸びしろ

Pythonasync/awaitを使った実装には分かっていない点がありました。

上のコードは全部一度にgatherするので、tweetsが千や万あるとすると、短時間に大量のリクエストが立て続けに送られるように思われます。

単純なサンプルコードで補足します。

% python -V
Python 3.11.8
import asyncio


async def single_request(i: int) -> int:
    print("start", i)
    await asyncio.sleep(i)
    print("end", i)
    return i


async def main():
    return await asyncio.gather(*[single_request(i) for i in range(5, 0, -1)])


if __name__ == "__main__":
    print(asyncio.run(main()))

コルーチンsingle_request()は、引数i秒だけ待ってからiを返します。
HTTP通信のイメージです。
これを5個の引数で呼び出します:

>>> list(range(5, 0, -1))
[5, 4, 3, 2, 1]

5つのコルーチンオブジェクトをasyncio.gather()並行実行します。
https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.gather

start 5
start 4
start 3
start 2
start 1
end 1
end 2
end 3
end 4
end 5
[5, 4, 3, 2, 1]

待ち(asyncio.sleep())に入ると、次のコルーチンオブジェクトを実行するように動くので、5 -> 4 -> 3 -> 2 -> 1 と実行されています。
待ち時間が短いコルーチンオブジェクト(i=1)から返り値が返っていきますが、asyncio.gather()

返り値の順序は、 aws での awaitable の順序に相当します。

そのため、[5, 4, 3, 2, 1]となっているわけです。

今はrange(5, 0, -1)ですが、5を1000や10000にしたときに「全部実行される状態になるよね」という点を懸念に思っていました1
どう解決すればよいか長らくわかっていませんでした2が、セマフォを使って対処できることを知りました。

概念は知っていたセマフォ

「HTTP通信処理にtime.sleep()を入れるの、なんかカッコ悪いな〜(もっといいやり方あるんじゃないか)」と私は美意識上の引っ掛かりを感じていたので、セマフォthreading.Semaphore)の存在はとても興味を惹かれました(ふおおおお!)
https://docs.python.org/ja/3/library/threading.html#semaphore-example

セマフォを使って、同時処理数を制御する

「asyncioでもセマフォ使えるよ」と教えていただいたのがこちらの記事。

並列数の制限にはSemaphoreを使います。

こちらを参考に、上記のサンプルコードを変更します。

実行すると、非同期処理であっても、それに対応するセマフォによって同時処理数が制限されているんです!!

start 5
start 4
end 4
start 3
end 5
start 2
end 2
end 3
start 1
end 1
[5, 4, 3, 2, 1]

最初に表示されるのは「start 5」「start 4」なので同時に処理されるのは2つなんです!(Semaphore(2)

なお、async with semaphore:の行をコメントアウトすると、同時処理数の制限がない状態(=記事最初で示した状態)です

セマフォの書き方素振りゾーン

上は関数内関数(正確には、コルーチン内コルーチン?)でしたが、別の書き方も試します。

こちらはクラスの属性にasyncio.Semaphoreを持たせる例。

このZennの記事からセマフォインスタンスを引数で渡せるとヒントを得て、コルーチンを抽出しました。

-async def main():
-    semaphore = asyncio.Semaphore(2)
-
-    async def coroutine(i: int) -> int:
-        async with semaphore:
-            return await single_request(i)
-
-    return await asyncio.gather(*[coroutine(i) for i in range(5, 0, -1)])
+async def coroutine(sem, i: int) -> int:
+    async with sem:
+        return await single_request(i)
+
+
+async def main():
+    semaphore = asyncio.Semaphore(2)
+    return await asyncio.gather(
+        *[coroutine(semaphore, i) for i in range(5, 0, -1)]
+    )

関数内関数を解消して同様に動かせました(リファクタリング成功🙌)

終わりに

Pythonでasync/awaitによる並行処理の実行数を制御できるasyncio.Semaphoreを知りました。
HTTP通信の並行処理の最後のピースがハマった感があります。
並行処理でもセマフォで制限をかけられるんだ!
よ う や く 完全に理解したぞ!

しかし、コンテキストマネージャーとして使うだけで処理数に制限がかかるセマフォ、いったい全体どういう仕組み(実装)なんでしょう?


  1. 直面した記事です。
  2. 過去の記事では、more_itertoolsでchunkに分けて、chunkごとにasyncio.gather()のようなことをしています