nikkie-ftnextの日記

イベントレポートや読書メモを発信

Apache Beam Python SDKのGet startedにあるWordCountの例をローカルで動かす

はじめに

海風とカスタネット🤗 nikkieです♪

Apache Beamなるものを触ってみました。なんとかビーム!1

目次

Apache Beamとの出会い

先日のPyCon APAC 2023で聞いたトークがきっかけです2

GCPのDataflowのドキュメントを見たところ、まずローカルで動かせそうでした。 https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python?hl=ja#local-terminal
そこで「新規要素は最小限」とまずはローカルで動かしました。

Apache Beam Python SDKのドキュメントでは以下です。

リア王に登場する英単語の数をApache Beamで集計

スクリプト

動作環境

リア王のファイルの取得:
gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt .

python minimal_wordcount_example.pyスクリプトを実行すると、outputs-00000-of-00001というファイルができます。
中身は、「単語: 出現回数」です(以下に抜粋)

KING: 243
LEAR: 236
king: 65

現時点の理解メモ

リア王のテキストファイルを以下のように加工しています

  1. テキストファイル -> 1行ずつ取り出す
  2. 1行 -> 単語を取り出す(正規表現使用)
  3. 単語ごとに出現回数を数える
  4. 単語と出現回数を「単語: 出現回数」という1行に整形
  5. 整形した行をファイルに出力

https://github.com/apache/beam/blob/v2.51.0/website/www/site/static/images/wordcount-pipeline.svg (スクショ撮りました)

apache_beamのPipelineをコンテキストマネージャとして使うと、withの中で処理を適用するフローを定義した後、withを抜けるのが契機になって、Pipelineのrunメソッドが実行されるようです(__exit__を使っているのかな?)

Pipelineは|>>を使って定義する独特の書き方。
DSLっぽいですね。
これらの演算子に対応するダンダーメソッドを使って実装していそう

実装の解説は以下にあります。
https://beam.apache.org/get-started/wordcount-example/#applying-pipeline-transforms

  • ReadFromTextPCollectionを出力
    • PCollectionの要素はテキストファイルの1行
  • 1行を要素とするPCollectionから単語を要素とするPCollectionへ変換
>>> re.findall(r"[A-Za-z']+", "Knights of Lear's train,")
['Knights', 'of', "Lear's", 'train']

この正規表現の実装は、カンマやピリオドを除けるので、半角スペースで分割するという実装(ぱっと浮かぶ)よりも賢いですね

  • Countにより、単語を要素とするPCollectionをキーと値のペアを要素とするPCollectionに変換
    • キーは重複が除かれた単語、値はキーの出現回数
  • キーと値のペアを要素とするPCollectionを文字列を要素とするPCollectionに変換
    • 単語の出現回数が表せるように整形
  • 文字列を要素とするPCollectionを1要素1行としてファイル出力

終わりに

Apache BeamのPython SDKのドキュメントにある、ファイル中の単語の出現回数を数える例で手を動かしました。
これはApache Beamを使わずに実装したこともある処理ですが、Apache Beamの概念を使ってDSLでパイプラインを書くというのは、なかなかに興味深かったです。
PCollectionを続けて変換していくんだ!

今回はminimalなexampleなので、word countの他の例も見てみたいのと、PyConで見たようにDataflowで動かしてみたいなと思っています。


  1. PyCon APAC 2023で初めて出会ったわけではないのですが、使い所があるかもと興味を持ちました。