はじめに
頑張れば、何かがあるって、信じてる。nikkieです。
週末ログの後編です。
PyTorchでモデルを訓練した後、KFServingでサーブする部分です。
目次
1. KFServingが使える環境を用意する
前編を参照ください。
2. PyTorchでテキスト分類するモデルを用意
PyTorchのチュートリアルの中から、テキストの多クラス分類のモデルを選択しました。
AG_NEWS
の記事を4つのカテゴリのいずれかに分類します1。
KFServingのサンプルにあるCIFAR10を使った画像分類から、問題設定とモデルを変えました。
Python環境
$ python -V # venvを使用 Python 3.7.3 $ pip list # 手で入れたパッケージを示します black 19.10b0 flake8 3.7.9 ipython 7.13.0 requests 2.23.0 torch 1.4.0 torchtext 0.5.0
モデルの学習スクリプト
続く3-1のステップを通ったもの(ag_news.py
)を示します2。
import os import time import torch from torchtext.datasets import text_classification import torch.nn as nn from torch.utils.data import DataLoader from torch.utils.data.dataset import random_split from torchtext.data.utils import get_tokenizer, ngrams_iterator NGRAMS = 2 BATCH_SIZE = 16 EMBED_DIM = 32 N_EPOCHS = 5 min_valid_loss = float("inf") class Net(nn.Module): def __init__(self, vocab_size=1308844, embed_dim=EMBED_DIM, num_class=4): super().__init__() self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True) self.fc = nn.Linear(embed_dim, num_class) self.init_weights() def init_weights(self): initrange = 0.5 self.embedding.weight.data.uniform_(-initrange, initrange) self.fc.weight.data.uniform_(-initrange, initrange) self.fc.bias.data.zero_() def forward(self, text, offsets=None): if offsets is None: offsets = torch.tensor([0]) embedded = self.embedding(text, offsets) return self.fc(embedded) def generate_batch(batch): label = torch.tensor([entry[0] for entry in batch]) text = [entry[1] for entry in batch] offsets = [0] + [len(entry) for entry in text] offsets = torch.tensor(offsets[:-1]).cumsum(dim=0) text = torch.cat(text) return text, offsets, label def train_func(sub_train_): train_loss = 0 train_acc = 0 data = DataLoader( sub_train_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=generate_batch, ) for text, offsets, cls_ in data: optimizer.zero_grad() text, offsets, cls_ = ( text.to(device), offsets.to(device), cls_.to(device), ) output = model(text, offsets) loss = criterion(output, cls_) train_loss += loss.item() loss.backward() optimizer.step() train_acc += (output.argmax(1) == cls_).sum().item() scheduler.step() return train_loss / len(sub_train_), train_acc / len(sub_train_) def test(data_): loss = 0 acc = 0 data = DataLoader(data_, batch_size=BATCH_SIZE, collate_fn=generate_batch) for text, offsets, cls_ in data: text, offsets, cls_ = ( text.to(device), offsets.to(device), cls_.to(device), ) with torch.no_grad(): output = model(text, offsets) loss = criterion(output, cls_) loss += loss.item() acc += (output.argmax(1) == cls_).sum().item() return loss / len(data_), acc / len(data_) if __name__ == "__main__": if not os.path.isdir("./.data"): os.mkdir("./.data") train_dataset, test_dataset = text_classification.DATASETS["AG_NEWS"]( root="./.data", ngrams=NGRAMS, vocab=None ) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # VOCAB_SIZE = len(train_dataset.get_vocab()) # NUM_CLASS = len(train_dataset.get_labels()) model = Net().to(device) criterion = torch.nn.CrossEntropyLoss().to(device) optimizer = torch.optim.SGD(model.parameters(), lr=4.0) scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9) train_len = int(len(train_dataset) * 0.95) sub_train_, sub_valid_ = random_split( train_dataset, [train_len, len(train_dataset) - train_len] ) for epoch in range(N_EPOCHS): start_time = time.time() train_loss, train_acc = train_func(sub_train_) valid_loss, valid_acc = test(sub_valid_) secs = int(time.time() - start_time) mins = secs // 60 secs = secs % 60 print(f"Epoch {epoch+1} | time in {mins} minutes, {secs} seconds") print( f"\tLoss: {train_loss:.4f}(train)\t|", f"\tAcc: {train_acc * 100:.1f}%(train)", ) print( f"\tLoss: {valid_loss:.4f}(valid)\t|", f"\tAcc: {valid_acc * 100:.1f}%(valid)", ) print("checking the results of test dataset...") test_loss, test_acc = test(test_dataset) print( f"\tLoss: {test_loss:.4f}(test)\t|\tAcc: {test_acc * 100:.1f}%(test)" ) torch.save(model.state_dict(), "model.pt") ag_news_label = {1: "World", 2: "Sports", 3: "Business", 4: "Sci/Tec"} ex_text_str = "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \ enduring the season’s worst weather conditions on Sunday at The \ Open on his way to a closing 75 at Royal Portrush, which \ considering the wind and the rain was a respectable showing. \ Thursday’s first round at the WGC-FedEx St. Jude Invitational \ was another story. With temperatures in the mid-80s and hardly any \ wind, the Spaniard was 13 strokes better in a flawless round. \ Thanks to his best putting performance on the PGA Tour, Rahm \ finished with an 8-under 62 for a three-stroke lead, which \ was even more impressive considering he’d never played the \ front nine at TPC Southwind."
サーブするモデルに渡すテンソルを用意
今回は小さく試すため、テキストではなく、テンソル化した(=前処理済みの)テキストをモデルに与えるとします3。
$ python -i ag_news.py : Epoch 5 | time in 0 minutes, 22 seconds Loss: 0.0022(train) | Acc: 99.0%(train) Loss: 0.0000(valid) | Acc: 90.7%(valid) checking the results of test dataset... Loss: 0.0002(test) | Acc: 89.6%(test) >>> tokenizer = get_tokenizer("basic_english") >>> ex_tokenized = tokenizer(ex_text_str) # ag_news.py中に定義したテキストをトークナイズ >>> vocab = train_dataset.get_vocab() >>> tokens = [vocab[token] for token in ngrams_iterator(ex_tokenized, NGRAMS)] >>> len(tokens) 237 >>> import json >>> # テンソル化したテキストのJSONを作成 >>> with open('input.json', 'w') as f: ... json.dump({'instances': [tokens]}, f, indent=4)
3. KFServingでサーブを試す
3-1 ローカルでモデルをサーブする
まず、PyTorchのモデルをKFServingでサーブするサンプルで使っているpytorchserver
を入手します(python -m pytorchserver ...
)。
これはPyPIには上がっておらず、リポジトリのコードからインストールする必要がありました。
$ git clone git@github.com:kubeflow/kfserving.git $ cd kfserving/python/pytorchserver/ $ pip install -e . $ cd ../../.. # 元の階層に戻る
ディレクトリ配置
. ├── __pycache__ ├── ag_news.py # 2で作成 ├── env ├── input.json # 2で作成 ├── kfserving # 3-1でclone ├── model.pt # 2で作成 └── pytorch.yaml # 3-2で作る
model.ptがあるディレクトリでpython -m pytorchserver --model_dir ./ --model_name pytorchmodel --model_class_name Net
を実行します。
--model_dir
で指定したディレクトリにあるPythonスクリプト(ここではag_news.py)をimportするようです。
サーバが起動したら、サンプルに沿ってrequests
でlocalhostにinput.jsonの内容をPOSTします。
In [1]: import json In [11]: with open('input.json') as f: ...: form_data = json.load(f) ...: In [14]: import requests In [15]: res = requests.post('http://localhost:8080/v1/models/pytorchmodel:predict', json=form_data) In [16]: res Out[16]: <Response [200]> In [17]: res.text Out[17]: '{"predictions": [[-1.8232978582382202, 7.340854644775391, -1.85544753074646, -4.12541389465332]]}'
ここでつまづいたのは2点:
(1) モデルをロードする際に引数を渡せません4
self.model = model_class().to(self.device)
そこで、Net.__init__
の引数にデフォルト値を指定しました(マジックナンバーの正体です)。
以下のようにデフォルト値を調べましたが、引数のデフォルト値は一度だけ評価されるので、len(train_dataset.get_vocab())
を指定してもいいかもしれません。
In [2]: from torchtext.datasets import text_classification In [3]: train_dataset, test_dataset = text_classification.DATASETS["AG_NEWS"]( ...: root="./.data", ngrams=2, vocab=None) In [4]: len(train_dataset.get_vocab()) Out[4]: 1308844 In [5]: len(train_dataset.get_labels()) Out[5]: 4
モデルのロードで引数を渡したいと、Issueも上がっていました:
inputs = torch.tensor(request["instances"]).to(self.device)
self.model(inputs).tolist()
model(inputs)
と渡すためNet.forward
のoffsets
引数にもデフォルト値が必要でした6。
現状では、POSTしたJSONからoffsets
引数に値は渡せないと思います。
また現状は、POSTするデータとしては、一度に1テキストしかポストできないようです(キーがinstances
ですが、値の実体はinstance)。
2. GKEでモデルをサーブする(ハマり中)
訓練したモデルをGCSに置きます(今回バケットはコンソールから作りましたが、gcloud
コマンドでも作れそうですね)。
$ gsutil cp model.pt gs://nikkie-knative-project/models/pytorch/ag_news/
サンプルに沿ったyamlファイルを準備します。
apiVersion: "serving.kubeflow.org/v1alpha2" kind: "InferenceService" metadata: name: "pytorch-agnews" spec: default: predictor: pytorch: storageUri: "gs://nikkie-knative-project/models/pytorch/ag_news/" modelClassName: "Net"
kubectl apply
でKFServingのリソースをデプロイ!
$ kubectl apply -f pytorch.yaml inferenceservice.serving.kubeflow.org/pytorch-agnews created
ところが、READYがFalseとなって、URLを取得できません😱
$ kubectl get inferenceservices NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE pytorch-agnews False 95s
暗中模索
似た事象のIssue
k8sの練度が低く、Issueの情報が活かせていません。
特に、どのリソースについてログが見られるかがよく分かっておらず、「Issueに書いてある情報はどうやって出すんだろう」という状況です。
別の問題を解消
GitHub - kubeflow/kfserving: Serverless Inferencing on Kubernetes にあったインストール後のテストを試しました。
$ kubectl get po -n kfserving-system $ kubectl apply -f kfserving/docs/samples/sklearn/sklearn.yaml $ kubectl get inferenceservices sklearn-iris NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE sklearn-iris False 19s
READYがFalseだった7ため、テストに続くトラブルシューティング8からk8s 1.15以上でオススメのコマンドを試します(リソースはdelete
しています)。
kubectl patch mutatingwebhookconfiguration inferenceservice.serving.kubeflow.org --patch '{"webhooks":[{"name": "inferenceservice.kfserving-webhook-server.pod-mutator","objectSelector":{"matchExpressions":[{"key":"serving.kubeflow.org/inferenceservice", "operator": "Exists"}]}}]}'
すると、READYがFalseという事象は解決しました。
しかし、リクエストを送ると503が返ります。
$ kubectl get inferenceservices sklearn-iris NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE sklearn-iris http://sklearn-iris.default.example.com/v1/models/sklearn-iris True 100 $ kubectl port-forward --namespace istio-system $(kubectl get pod --namespace istio-system --selector="app=istio-ingressgateway" --output jsonpath='{.items[0].metadata.name}') 8080:80 # 別のターミナルで $ curl -v -H "Host: sklearn-iris.default.example.com" http://localhost:8080/v1/models/sklearn-iris:predict -d @./kfserving/docs/samples/sklearn/iris-input.json * Trying 127.0.0.1... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > POST /v1/models/sklearn-iris:predict HTTP/1.1 > Host: sklearn-iris.default.example.com > User-Agent: curl/7.54.0 > Accept: */* > Content-Length: 76 > Content-Type: application/x-www-form-urlencoded > * upload completely sent off: 76 out of 76 bytes < HTTP/1.1 503 Service Unavailable < date: Sun, 12 Apr 2020 09:28:08 GMT < server: istio-envoy < connection: close < content-length: 0 < * Closing connection 0
CIFAR10の例でも同様のエラー
git clone
していたので試してみました。
しかしながら、READYはFalseのままです。
前掲のトラブルシューティングで知ったログの出力を試すと
$ kubectl logs -l app=networking-istio -n knative-serving : W0412 13:10:04.732258 1 reflector.go:299] runtime/asm_amd64.s:1357: watch of *v1.ConfigMap ended with: too old resource version: 137817 (139711)
too old resource version ... 🤔(分からない。。)
お片付け
逆順にdeleteしていきます。
$ kubectl delete -f pytorch.yaml $ kubectl delete -f $CONFIG_URI $ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-istio.yaml $ kubectl delete -f https://raw.githubusercontent.com/knative/serving/master/third_party/istio-${ISTIO_VERSION}/istio-minimal.yaml $ kubectl delete -f https://raw.githubusercontent.com/knative/serving/master/third_party/istio-${ISTIO_VERSION}/istio-crds.yaml $ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-core.yaml $ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-crds.yaml # 1-1で参照したドキュメントより Cleaning up $ gcloud container clusters delete $CLUSTER_NAME --zone $CLUSTER_ZONE
感想
ローカルでは動くので、GKE上の環境の構築ミスだと思うのですが、k8sの練度が低くて今回は切り分けられませんでした。
k8sの理解が甘いので、ひとまずリソースとログの関係をキャッチアップしたいところです。
IstioやKnativeをそれぞれ触って理解を深めるのもいいかもしれません。
また、今回は中を見ずにapply
したyamlファイルを覗いて9、どうあるべきかを掴むことも考えられます。
KFServingはじめKubeflowは機械学習環境を自力で自由に整えられるツールと理解しています。
好きなように整えられるのは魅力ですが、自由を謳歌するにはやはり技術力が必要ですね。
他方では、マネージドな機械学習環境として、SageMakerなどクラウドベンダー各社が提供するものを使うという選択肢もあります。
KFServingのyamlは非常にシンプルだったので、k8sの練度を上げて、今回の事象を解決したいところです。
読まれた方でピンときた方はコメントやTwitterで教えていただけると大変助かります。
-
PyTorchを使っての発見もいくつかあったのですが、それはまたの機会とします↩
-
今回デプロイするリソースは
predictor
だけですが、pre-processor
やpost-processor
をデプロイすることにより、KFServing側で前処理(例:テキストのテンソル化)や後処理(例:出力されたテンソルをクラスラベルに変換)ができそうです(前掲の101 Slidesより)↩ -
https://github.com/kubeflow/kfserving/blob/8c261457b3ec8017736b882c4ffd3379914471ac/python/pytorchserver/pytorchserver/model.py#L57↩
-
https://github.com/kubeflow/kfserving/blob/8c261457b3ec8017736b882c4ffd3379914471ac/python/pytorchserver/pytorchserver/model.py#L66↩
-
この検証には Saving and Loading Models — PyTorch Tutorials 1.4.0 documentation が参考になりました。対話モードでモデルをロードし試行錯誤しました。↩
-
もしかすると
kubectl get inferenceservices
で確認するタイミングが早かったのかもしれません↩ -
https://github.com/kubeflow/kfserving/blob/master/docs/DEVELOPER_GUIDE.md#troubleshooting↩
-
GitHub - kubeflow/manifests: A repository for Kustomize manifests ↩