はじめに
なんとかビーム! nikkieです。
PyCon APAC 2023で興味を持ったApache Beamをローカルで動かしました。
PythonでDSLっぽい書き方ができたわけですが、これがどうやって実現されているのかソースコードを覗いてみました。
目次
PythonでPipelineを定義するのがDSLっぽい!
ローカルで動かしたときのコードの抜粋です
with beam.Pipeline() as pipeline: ( pipeline | beam.io.ReadFromText("kinglear.txt") | "ExtractWords" >> beam.FlatMap(lambda row: re.findall(r"[A-Za-z']+", row)) )
|
や>>
の使い方がふだんは見かけないですよね🤩
Apache Beam SDKのPipelineの操作をするために独自の実装がされていそうです。
Pipelineとは
A pipeline object that manages a DAG of
PValue
s and theirPTransform
s.
PValueとPTransformからなるDAG(有向非巡回グラフ1)を管理するのがPipeline
- PValue:グラフのノード
- PTransform:グラフのエッジ
DAGについても気になりますが、今回はここまで。
以下のように続きます。
All the transforms applied to the pipeline must have distinct full labels.
パイプラインに適用されるtransformすべては、別個の完全なラベルを持たなければならない
新しい名前を割り当てるためにinput | "label" >> my_transform
のような書き方となるという記載もあります。
この書き方がどのように実現できるのか、PTransformの実装を見てみます。
PTransform
https://github.com/apache/beam/blob/v2.51.0/sdks/python/apache_beam/transforms/ptransform.py
>>
で新しい名前を割り当てられるのはなぜ?
結論を言うと、>>
という演算子2に関連するメソッドは__rrshift__
です。
"label" >> my_transform
という式について、
演算は以下のように進みます3
- 左項の
__rshift__
メソッドを呼び出して演算できるかをみる - 1ができない場合、右項の
__rrshift__
メソッドを呼び出して演算できるかをみる
"label" >> my_transform
という式の場合は、2に進みます
def __rrshift__(self, label): return _NamedPTransform(self, label)
つまり"label" >> my_transform
という式は_NamedPTransform
を返すわけですね。
これはPTransform
を継承したクラスです
|
でPTransformを合成できるのはなぜ?
|
には__or__
と__ror__
が関係します。
- https://docs.python.org/ja/3/reference/datamodel.html#object.__or__
- https://docs.python.org/ja/3/reference/datamodel.html#object.__ror__
__or__
は、右項がPTransformならば、左項のPTransformと合成します
def __or__(self, right): if isinstance(right, PTransform): return _ChainedPTransform(self, right) return NotImplemented
Used to compose PTransforms, e.g., ptransform1 | ptransform2. (docstringより)
__ror__
は、左項がPValueでないときにPTransformと演算できるようにするための実装のようです(Pipelineを作っている?)
Used to apply this PTransform to non-PValues, e.g., a tuple. (docstringより)
演算子の優先順位
Apache Beamの書式に沿って定義したPipelineは1つの式なので、演算子の優先順位も確認しましょう。
https://docs.python.org/ja/3/reference/expressions.html#operator-precedence
シフト演算>>
の方が、ビット単位OR|
より優先されますね。
なので、PTransformに名前を与える"label" >> my_transform
がまず全部実行されるわけです。
そして、PTransformが|
演算子で左から合成されていきます。
終わりに
Apache Beam SDKでtransform1 | "label" >> transform2
という式が実行できる実装を見てきました。
- 演算子(
|
や>>
)に対応する特殊メソッド(=ダンダーメソッド)がある - SDKのPTransformクラスでそれらの特殊メソッドを実装しているので、
transform1 | "label" >> transform2
と書ける"label" >> transform
と書けるのは、PTransformの__rrshift__
メソッドによる(新しいPTransformを返す)transform1 | transform2
と書けるのは、PTransformの__or__
(と__ror__
)メソッドによる
ダンダーメソッド使っているのではと予想していた通りでした。
一方宿題事項も残っており、完全に理解まで道半ばです
- PValue・PTransformとそれらからなるDAGについてはまだ掴みきれていない
- ローカルで動かす実装は起点(一番左の項)がPipelineインスタンスではないか。
|
演算子がどう動いている?(PTransformの__ror__
を見れば分かる?) - Pipelineをwith文でコンテキストマネージャとして使ったときに、ブロックで定義した一連のPTransformはどう実行されるんだろう?(withが閉じるのはPipelineで、それがどうPTransformたちに伝わるのか)
この先のソースリーディングはちょっと重そうですが、また時間を見つけて読んでみようと思います
- https://ja.wikipedia.org/wiki/%E6%9C%89%E5%90%91%E9%9D%9E%E5%B7%A1%E5%9B%9E%E3%82%B0%E3%83%A9%E3%83%95↩
- right shift operator(Apache Beamのドキュメントより)↩
- 理解の助けになったのは『ゼロから作るDeep Learning ❸』ステップ20〜22です。また組み込み定数 NotImplemented のドキュメントも仕組みの理解の中で参照しました↩