nikkie-ftnextの日記

イベントレポートや読書メモを発信

Apache BeamのPython SDKで transform1 | "label" >> transform2 のように書ける実装を読む

はじめに

なんとかビーム! nikkieです。

PyCon APAC 2023で興味を持ったApache Beamをローカルで動かしました。

PythonDSLっぽい書き方ができたわけですが、これがどうやって実現されているのかソースコードを覗いてみました。

目次

PythonでPipelineを定義するのがDSLっぽい!

ローカルで動かしたときのコードの抜粋です

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.io.ReadFromText("kinglear.txt")
        | "ExtractWords" >> beam.FlatMap(lambda row: re.findall(r"[A-Za-z']+", row))
    )

|>>の使い方がふだんは見かけないですよね🤩
Apache Beam SDKのPipelineの操作をするために独自の実装がされていそうです。

Pipelineとは

https://beam.apache.org/releases/pydoc/2.51.0/apache_beam.pipeline.html#apache_beam.pipeline.Pipeline

A pipeline object that manages a DAG of PValues and their PTransforms.

PValueとPTransformからなるDAG(有向非巡回グラフ1)を管理するのがPipeline

  • PValue:グラフのノード
  • PTransform:グラフのエッジ

DAGについても気になりますが、今回はここまで。
以下のように続きます。

All the transforms applied to the pipeline must have distinct full labels.

パイプラインに適用されるtransformすべては、別個の完全なラベルを持たなければならない

新しい名前を割り当てるためにinput | "label" >> my_transformのような書き方となるという記載もあります。
この書き方がどのように実現できるのか、PTransformの実装を見てみます。

PTransform

https://github.com/apache/beam/blob/v2.51.0/sdks/python/apache_beam/transforms/ptransform.py

>>で新しい名前を割り当てられるのはなぜ?

結論を言うと、>>という演算子2に関連するメソッドは__rrshift__です。

"label" >> my_transformという式について、

演算は以下のように進みます3

  1. 左項の__rshift__メソッドを呼び出して演算できるかをみる
  2. 1ができない場合、右項の__rrshift__メソッドを呼び出して演算できるかをみる

"label" >> my_transformという式の場合は、2に進みます

  def __rrshift__(self, label):
    return _NamedPTransform(self, label)

つまり"label" >> my_transformという式は_NamedPTransformを返すわけですね。
これはPTransformを継承したクラスです

|でPTransformを合成できるのはなぜ?

|には__or____ror__が関係します。

__or__は、右項がPTransformならば、左項のPTransformと合成します

  def __or__(self, right):
    if isinstance(right, PTransform):
      return _ChainedPTransform(self, right)
    return NotImplemented

Used to compose PTransforms, e.g., ptransform1 | ptransform2. (docstringより)

__ror__は、左項がPValueでないときにPTransformと演算できるようにするための実装のようです(Pipelineを作っている?)

Used to apply this PTransform to non-PValues, e.g., a tuple. (docstringより)

演算子の優先順位

Apache Beamの書式に沿って定義したPipelineは1つの式なので、演算子の優先順位も確認しましょう。
https://docs.python.org/ja/3/reference/expressions.html#operator-precedence

シフト演算>>の方が、ビット単位OR|より優先されますね。
なので、PTransformに名前を与える"label" >> my_transformがまず全部実行されるわけです。
そして、PTransformが|演算子で左から合成されていきます。

終わりに

Apache Beam SDKtransform1 | "label" >> transform2という式が実行できる実装を見てきました。

  • 演算子|>>)に対応する特殊メソッド(=ダンダーメソッド)がある
  • SDKのPTransformクラスでそれらの特殊メソッドを実装しているので、transform1 | "label" >> transform2と書ける
    • "label" >> transform と書けるのは、PTransformの__rrshift__メソッドによる(新しいPTransformを返す)
    • transform1 | transform2と書けるのは、PTransformの__or__(と__ror__)メソッドによる

ダンダーメソッド使っているのではと予想していた通りでした。

一方宿題事項も残っており、完全に理解まで道半ばです

  • PValue・PTransformとそれらからなるDAGについてはまだ掴みきれていない
  • ローカルで動かす実装は起点(一番左の項)がPipelineインスタンスではないか。|演算子がどう動いている?(PTransformの__ror__を見れば分かる?)
  • Pipelineをwith文でコンテキストマネージャとして使ったときに、ブロックで定義した一連のPTransformはどう実行されるんだろう?(withが閉じるのはPipelineで、それがどうPTransformたちに伝わるのか)

この先のソースリーディングはちょっと重そうですが、また時間を見つけて読んでみようと思います


  1. https://ja.wikipedia.org/wiki/%E6%9C%89%E5%90%91%E9%9D%9E%E5%B7%A1%E5%9B%9E%E3%82%B0%E3%83%A9%E3%83%95
  2. right shift operator(Apache Beamのドキュメントより)
  3. 理解の助けになったのは『ゼロから作るDeep Learning ❸』ステップ20〜22です。また組み込み定数 NotImplemented のドキュメントも仕組みの理解の中で参照しました