はじめに
未来で待ってる。 nikkieです1。
ここ最近イベントループを使った並行処理(asyncio)を素振りしました。
これはI/Oバウンドな処理に有効です。
I/Oバウンドな処理にはマルチスレッドも有効と聞くので、少しだけ素振りしました。
目次
concurrent.futuresによるマルチスレッド
I/Oバウンドな処理の一例は、通信処理(具体的には、Web APIの大量呼び出し2)。
マルチスレッドを使っても並行処理の恩恵にあずかれます。
並行処理は『Python実践入門』の解説が詳しいです3。
複数の処理がある場合は非同期実行で並行化すると、通信中の待ち時間を有効活用できて合計時間を短縮できます (Kindle の位置No.4454-4455)
並行処理の実装にはマルチスレッドの他にマルチプロセスがあり(※有効なケースが異なります)、concurrent.futuresではこれらをほとんど同じコードで扱える利点があります。
ですが、concurrent.futuresを使って書かれたコードを初めて見たとき、新しい概念しかなさすぎて「全く読めない... Executorってなに? Future? as_completed??」となったのでした。
今回の素振りはそのリベンジ、少しでも理解することを目指します。
例で見るThreadPoolExecutor
動作環境:Python 3.10.9
『Python実践入門』を参考にした例
ThreadPoolExecutor
は図10.4が特に分かりやすかったです。
公式ドキュメント側に寄せて素振りします
from urllib.request import urlopen from concurrent.futures import as_completed, ThreadPoolExecutor URLS = [ "http://www.foxnews.com/", "http://www.cnn.com/", "http://europe.wsj.com/", "http://www.bbc.co.uk/", "http://nonexistant-subdomain.python.org/", ] def load_url(url: str, timeout: float) -> bytes: with urlopen(url, timeout=timeout) as conn: return conn.read() if __name__ == "__main__": with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(load_url, url, 60) for url in URLS] for future in as_completed(futures): try: print(len(future.result())) except Exception as exc: print(str(exc))
- I/Oバウンドな通信処理
load_url
をマルチスレッドで並行処理する submit
メソッドが返すFutureオブジェクトをリストに集める(futures
)- https://docs.python.org/ja/3/library/concurrent.futures.html#concurrent.futures.Executor.submit
submit
メソッドの引数は、load_url
関数を2つの引数(url
とtimeout
)で呼び出すことを表す- 呼び出しは先送り(future)
- イテラブルを
as_completed
関数に渡すと、完了したFutureから返る- https://docs.python.org/ja/3/library/concurrent.futures.html#concurrent.futures.as_completed
- 完了は終了またはキャンセル(finished or cancelled futures)
- (順番を保持しないのは
asyncio.gather
と違うところ)
- 完了したFutureは
result
メソッドで(load_url
呼び出しの)返り値が得られる- https://docs.python.org/ja/3/library/concurrent.futures.html#concurrent.futures.Future.result
- 呼び出しで例外が送出された場合は
result
メソッドも同じ例外を送出する(なのでtry
文が必要)
実行結果です。
% time python book_example.py <urlopen error [Errno 8] nodename nor servname provided, or not known> 594994 HTTP Error 403: Forbidden 2572624 597976 python book_example.py 0.10s user 0.06s system 27% cpu 0.563 total
公式ドキュメント
https://docs.python.org/ja/3/library/concurrent.futures.html#threadpoolexecutor-example
さらに工夫した実装になっています。
Futureの結果(resultメソッドの返り値)と呼び出しの引数(url)を対応付けています
# URLSやload_url関数は共通です if __name__ == "__main__": with ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in as_completed(future_to_url): url = future_to_url[future] try: print(url, len(future.result())) except Exception as exc: print(url, str(exc))
future_to_url
という辞書がポイント!- Futureオブジェクトは辞書のキーになれるんですね(ハッシュ可能)
as_completed
関数で完了したFutureオブジェクトから返ってきますが、辞書を使って、呼び出したURLを取得できます- URLと合わせて、わかりやすい出力となりました
% time python docs_example.py http://nonexistant-subdomain.python.org/ <urlopen error [Errno 8] nodename nor servname provided, or not known> http://europe.wsj.com/ HTTP Error 403: Forbidden http://www.bbc.co.uk/ 595786 http://www.foxnews.com/ 587429 http://www.cnn.com/ 2572624 python docs_example.py 0.10s user 0.05s system 32% cpu 0.485 total
終わりに
Pythonのconcurrent.futuresのThreadPoolExecutorを素振りしました。
- I/Oバウンドな処理に有効なマルチスレッド
- Executorインスタンスのsubmitメソッドの返り値(Futureオブジェクト)をイテラブルに集める
- 辞書のキーにFutureオブジェクト、値に後で取り出したい実引数
- as_completed関数に渡すと完了したFutureから返り、個々のFutureはresultメソッドで返り値がわかる
イベントループと比べて新しい概念が多かったので「うげっ」となっていましたが、今回腰を据えて小さな一歩を踏み出したことで、なかよくやっていけそうな印象です。
近い将来、完全理解したいですね。
- このセリフ、2つの作品が胸に去来します↩
- イベントループによる実装例です ↩
- イベントループのときは大変助けになりました また『Python実践レシピ』にも並行処理の解説があります↩