はじめに
俺もー!!ノ nikkieです。
Pythonのasyncio(イベントループによる並行処理)まわりで長い間宿題だった事項に、ついに答えを得ました!
目次
前提:asyncioを使った私の実装の伸びしろ
Pythonのasync
/await
を使った実装には分かっていない点がありました。
上のコードは全部一度にgatherするので、tweetsが千や万あるとすると、短時間に大量のリクエストが立て続けに送られるように思われます。
単純なサンプルコードで補足します。
% python -V Python 3.11.8
import asyncio async def single_request(i: int) -> int: print("start", i) await asyncio.sleep(i) print("end", i) return i async def main(): return await asyncio.gather(*[single_request(i) for i in range(5, 0, -1)]) if __name__ == "__main__": print(asyncio.run(main()))
コルーチンsingle_request()
は、引数i
秒だけ待ってからi
を返します。
HTTP通信のイメージです。
これを5個の引数で呼び出します:
>>> list(range(5, 0, -1)) [5, 4, 3, 2, 1]
5つのコルーチンオブジェクトをasyncio.gather()
で並行実行します。
https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.gather
start 5 start 4 start 3 start 2 start 1 end 1 end 2 end 3 end 4 end 5 [5, 4, 3, 2, 1]
待ち(asyncio.sleep()
)に入ると、次のコルーチンオブジェクトを実行するように動くので、5 -> 4 -> 3 -> 2 -> 1 と実行されています。
待ち時間が短いコルーチンオブジェクト(i=1
)から返り値が返っていきますが、asyncio.gather()
は
返り値の順序は、 aws での awaitable の順序に相当します。
そのため、[5, 4, 3, 2, 1]
となっているわけです。
今はrange(5, 0, -1)
ですが、5を1000や10000にしたときに「全部実行される状態になるよね」という点を懸念に思っていました1。
どう解決すればよいか長らくわかっていませんでした2が、セマフォを使って対処できることを知りました。
概念は知っていたセマフォ
#pythontipslt 「セマフォでタスクの同時実行数制限」
— nikkie / にっきー (@ftnext) 2022年5月25日
セマフォを使えば、並列プログラミングでタスクを同時にN個までできる!https://t.co/YLKXHKBbZd
セマフォ、とっつきづらそうという印象でしたが、例を見てスクレイピングするときに使ってみようと思いました。
さらば秘伝のtime.sleep👋
「HTTP通信処理にtime.sleep()
を入れるの、なんかカッコ悪いな〜(もっといいやり方あるんじゃないか)」と私は美意識上の引っ掛かりを感じていたので、セマフォ(threading.Semaphore
)の存在はとても興味を惹かれました(ふおおおお!)
https://docs.python.org/ja/3/library/threading.html#semaphore-example
セマフォを使って、同時処理数を制御する
「asyncioでもセマフォ使えるよ」と教えていただいたのがこちらの記事。
並列数の制限には
Semaphore
を使います。
こちらを参考に、上記のサンプルコードを変更します。
実行すると、非同期処理であっても、それに対応するセマフォによって同時処理数が制限されているんです!!
start 5 start 4 end 4 start 3 end 5 start 2 end 2 end 3 start 1 end 1 [5, 4, 3, 2, 1]
最初に表示されるのは「start 5」「start 4」なので同時に処理されるのは2つなんです!(Semaphore(2)
)
なお、async with semaphore:
の行をコメントアウトすると、同時処理数の制限がない状態(=記事最初で示した状態)です
セマフォの書き方素振りゾーン
上は関数内関数(正確には、コルーチン内コルーチン?)でしたが、別の書き方も試します。
こちらはクラスの属性にasyncio.Semaphore
を持たせる例。
このZennの記事からセマフォインスタンスを引数で渡せるとヒントを得て、コルーチンを抽出しました。
-async def main(): - semaphore = asyncio.Semaphore(2) - - async def coroutine(i: int) -> int: - async with semaphore: - return await single_request(i) - - return await asyncio.gather(*[coroutine(i) for i in range(5, 0, -1)]) +async def coroutine(sem, i: int) -> int: + async with sem: + return await single_request(i) + + +async def main(): + semaphore = asyncio.Semaphore(2) + return await asyncio.gather( + *[coroutine(semaphore, i) for i in range(5, 0, -1)] + )
関数内関数を解消して同様に動かせました(リファクタリング成功🙌)
終わりに
Pythonでasync/awaitによる並行処理の実行数を制御できるasyncio.Semaphore
を知りました。
HTTP通信の並行処理の最後のピースがハマった感があります。
並行処理でもセマフォで制限をかけられるんだ!
よ う や く 完全に理解したぞ!
しかし、コンテキストマネージャーとして使うだけで処理数に制限がかかるセマフォ、いったい全体どういう仕組み(実装)なんでしょう?
- 直面した記事です。↩
-
過去の記事では、more_itertoolsでchunkに分けて、chunkごとに
asyncio.gather()
のようなことをしています↩