nikkie-ftnextの日記

イベントレポートや読書メモを発信

はじめてThreadPoolExecutor(Pythonのconcurrent.futuresの素振り一歩目)

はじめに

未来で待ってる。 nikkieです1

ここ最近イベントループを使った並行処理(asyncio)を素振りしました。
これはI/Oバウンドな処理に有効です。
I/Oバウンドな処理にはマルチスレッドも有効と聞くので、少しだけ素振りしました。

目次

concurrent.futuresによるマルチスレッド

I/Oバウンドな処理の一例は、通信処理(具体的には、Web APIの大量呼び出し2)。
マルチスレッドを使っても並行処理の恩恵にあずかれます。

並行処理は『Python実践入門』の解説が詳しいです3

複数の処理がある場合は非同期実行で並行化すると、通信中の待ち時間を有効活用できて合計時間を短縮できます (Kindle の位置No.4454-4455)

並行処理の実装にはマルチスレッドの他にマルチプロセスがあり(※有効なケースが異なります)、concurrent.futuresではこれらをほとんど同じコードで扱える利点があります。

ですが、concurrent.futuresを使って書かれたコードを初めて見たとき、新しい概念しかなさすぎて「全く読めない... Executorってなに? Future? as_completed??」となったのでした。
今回の素振りはそのリベンジ、少しでも理解することを目指します。

例で見るThreadPoolExecutor

動作環境:Python 3.10.9

Python実践入門』を参考にした例

ThreadPoolExecutorは図10.4が特に分かりやすかったです。
公式ドキュメント側に寄せて素振りします

from urllib.request import urlopen
from concurrent.futures import as_completed, ThreadPoolExecutor

URLS = [
    "http://www.foxnews.com/",
    "http://www.cnn.com/",
    "http://europe.wsj.com/",
    "http://www.bbc.co.uk/",
    "http://nonexistant-subdomain.python.org/",
]


def load_url(url: str, timeout: float) -> bytes:
    with urlopen(url, timeout=timeout) as conn:
        return conn.read()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(load_url, url, 60) for url in URLS]
        for future in as_completed(futures):
            try:
                print(len(future.result()))
            except Exception as exc:
                print(str(exc))

実行結果です。

% time python book_example.py
<urlopen error [Errno 8] nodename nor servname provided, or not known>
594994
HTTP Error 403: Forbidden
2572624
597976
python book_example.py  0.10s user 0.06s system 27% cpu 0.563 total

公式ドキュメント

https://docs.python.org/ja/3/library/concurrent.futures.html#threadpoolexecutor-example

さらに工夫した実装になっています。
Futureの結果(resultメソッドの返り値)と呼び出しの引数(url)を対応付けています

# URLSやload_url関数は共通です
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print(url, len(future.result()))
            except Exception as exc:
                print(url, str(exc))
  • future_to_urlという辞書がポイント!
    • Futureオブジェクトは辞書のキーになれるんですね(ハッシュ可能)
  • as_completed関数で完了したFutureオブジェクトから返ってきますが、辞書を使って、呼び出したURLを取得できます
    • URLと合わせて、わかりやすい出力となりました
% time python docs_example.py
http://nonexistant-subdomain.python.org/ <urlopen error [Errno 8] nodename nor servname provided, or not known>
http://europe.wsj.com/ HTTP Error 403: Forbidden
http://www.bbc.co.uk/ 595786
http://www.foxnews.com/ 587429
http://www.cnn.com/ 2572624
python docs_example.py  0.10s user 0.05s system 32% cpu 0.485 total

終わりに

Pythonのconcurrent.futuresのThreadPoolExecutorを素振りしました。

  • I/Oバウンドな処理に有効なマルチスレッド
  • Executorインスタンスsubmitメソッドの返り値(Futureオブジェクト)をイテラブルに集める
    • 辞書のキーにFutureオブジェクト、値に後で取り出したい実引数
  • as_completed関数に渡すと完了したFutureから返り、個々のFutureはresultメソッドで返り値がわかる

イベントループと比べて新しい概念が多かったので「うげっ」となっていましたが、今回腰を据えて小さな一歩を踏み出したことで、なかよくやっていけそうな印象です。
近い将来、完全理解したいですね。


  1. このセリフ、2つの作品が胸に去来します
  2. イベントループによる実装例です
  3. イベントループのときは大変助けになりました また『Python実践レシピ』にも並行処理の解説があります