nikkie-ftnextの日記

イベントレポートや読書メモを発信

週末ログ | GKEでKFServingを使ってPyTorchのモデルのサーブを試しハマりました(後編:デプロイしたら暗中模索)

はじめに

頑張れば、何かがあるって、信じてる。nikkieです。

週末ログの後編です。
PyTorchでモデルを訓練した後、KFServingでサーブする部分です。

目次

1. KFServingが使える環境を用意する

前編を参照ください。

2. PyTorchでテキスト分類するモデルを用意

PyTorchのチュートリアルの中から、テキストの多クラス分類のモデルを選択しました。
AG_NEWSの記事を4つのカテゴリのいずれかに分類します1
KFServingのサンプルにあるCIFAR10を使った画像分類から、問題設定とモデルを変えました。

Python環境

$ python -V  # venvを使用
Python 3.7.3
$ pip list  # 手で入れたパッケージを示します
black                    19.10b0
flake8                   3.7.9
ipython                  7.13.0
requests                 2.23.0
torch                    1.4.0
torchtext                0.5.0

モデルの学習スクリプト

続く3-1のステップを通ったもの(ag_news.py)を示します2

import os
import time

import torch
from torchtext.datasets import text_classification
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from torchtext.data.utils import get_tokenizer, ngrams_iterator


NGRAMS = 2
BATCH_SIZE = 16
EMBED_DIM = 32
N_EPOCHS = 5
min_valid_loss = float("inf")


class Net(nn.Module):
    def __init__(self, vocab_size=1308844, embed_dim=EMBED_DIM, num_class=4):
        super().__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets=None):
        if offsets is None:
            offsets = torch.tensor([0])
        embedded = self.embedding(text, offsets)
        return self.fc(embedded)


def generate_batch(batch):
    label = torch.tensor([entry[0] for entry in batch])
    text = [entry[1] for entry in batch]
    offsets = [0] + [len(entry) for entry in text]
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
    text = torch.cat(text)
    return text, offsets, label


def train_func(sub_train_):
    train_loss = 0
    train_acc = 0
    data = DataLoader(
        sub_train_,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=generate_batch,
    )
    for text, offsets, cls_ in data:
        optimizer.zero_grad()
        text, offsets, cls_ = (
            text.to(device),
            offsets.to(device),
            cls_.to(device),
        )
        output = model(text, offsets)
        loss = criterion(output, cls_)
        train_loss += loss.item()
        loss.backward()
        optimizer.step()
        train_acc += (output.argmax(1) == cls_).sum().item()
    scheduler.step()
    return train_loss / len(sub_train_), train_acc / len(sub_train_)


def test(data_):
    loss = 0
    acc = 0
    data = DataLoader(data_, batch_size=BATCH_SIZE, collate_fn=generate_batch)
    for text, offsets, cls_ in data:
        text, offsets, cls_ = (
            text.to(device),
            offsets.to(device),
            cls_.to(device),
        )
        with torch.no_grad():
            output = model(text, offsets)
            loss = criterion(output, cls_)
            loss += loss.item()
            acc += (output.argmax(1) == cls_).sum().item()
    return loss / len(data_), acc / len(data_)


if __name__ == "__main__":
    if not os.path.isdir("./.data"):
        os.mkdir("./.data")

    train_dataset, test_dataset = text_classification.DATASETS["AG_NEWS"](
        root="./.data", ngrams=NGRAMS, vocab=None
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # VOCAB_SIZE = len(train_dataset.get_vocab())
    # NUM_CLASS = len(train_dataset.get_labels())
    model = Net().to(device)

    criterion = torch.nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=4.0)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

    train_len = int(len(train_dataset) * 0.95)
    sub_train_, sub_valid_ = random_split(
        train_dataset, [train_len, len(train_dataset) - train_len]
    )

    for epoch in range(N_EPOCHS):
        start_time = time.time()
        train_loss, train_acc = train_func(sub_train_)
        valid_loss, valid_acc = test(sub_valid_)

        secs = int(time.time() - start_time)
        mins = secs // 60
        secs = secs % 60

        print(f"Epoch {epoch+1} | time in {mins} minutes, {secs} seconds")
        print(
            f"\tLoss: {train_loss:.4f}(train)\t|",
            f"\tAcc: {train_acc * 100:.1f}%(train)",
        )
        print(
            f"\tLoss: {valid_loss:.4f}(valid)\t|",
            f"\tAcc: {valid_acc * 100:.1f}%(valid)",
        )

    print("checking the results of test dataset...")
    test_loss, test_acc = test(test_dataset)
    print(
        f"\tLoss: {test_loss:.4f}(test)\t|\tAcc: {test_acc * 100:.1f}%(test)"
    )

    torch.save(model.state_dict(), "model.pt")

    ag_news_label = {1: "World", 2: "Sports", 3: "Business", 4: "Sci/Tec"}

    ex_text_str = "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \
        enduring the season’s worst weather conditions on Sunday at The \
        Open on his way to a closing 75 at Royal Portrush, which \
        considering the wind and the rain was a respectable showing. \
        Thursday’s first round at the WGC-FedEx St. Jude Invitational \
        was another story. With temperatures in the mid-80s and hardly any \
        wind, the Spaniard was 13 strokes better in a flawless round. \
        Thanks to his best putting performance on the PGA Tour, Rahm \
        finished with an 8-under 62 for a three-stroke lead, which \
        was even more impressive considering he’d never played the \
        front nine at TPC Southwind."

サーブするモデルに渡すテンソルを用意

今回は小さく試すため、テキストではなく、テンソル化した(=前処理済みの)テキストをモデルに与えるとします3

$ python -i ag_news.py
 :
Epoch 5 | time in 0 minutes, 22 seconds
    Loss: 0.0022(train) |   Acc: 99.0%(train)
    Loss: 0.0000(valid) |   Acc: 90.7%(valid)
checking the results of test dataset...
    Loss: 0.0002(test)  |   Acc: 89.6%(test)
>>> tokenizer = get_tokenizer("basic_english")
>>> ex_tokenized = tokenizer(ex_text_str)  # ag_news.py中に定義したテキストをトークナイズ
>>> vocab = train_dataset.get_vocab()
>>> tokens = [vocab[token] for token in ngrams_iterator(ex_tokenized, NGRAMS)]
>>> len(tokens)
237
>>> import json
>>> # テンソル化したテキストのJSONを作成
>>> with open('input.json', 'w') as f:
...     json.dump({'instances': [tokens]}, f, indent=4)

3. KFServingでサーブを試す

3-1 ローカルでモデルをサーブする

まず、PyTorchのモデルをKFServingでサーブするサンプルで使っているpytorchserverを入手します(python -m pytorchserver ...)。
これはPyPIには上がっておらず、リポジトリのコードからインストールする必要がありました。

$ git clone git@github.com:kubeflow/kfserving.git
$ cd kfserving/python/pytorchserver/
$ pip install -e .
$ cd ../../..  # 元の階層に戻る

ディレクトリ配置

.
├── __pycache__
├── ag_news.py  # 2で作成
├── env
├── input.json  # 2で作成
├── kfserving  # 3-1でclone
├── model.pt  # 2で作成
└── pytorch.yaml  # 3-2で作る

model.ptがあるディレクトリでpython -m pytorchserver --model_dir ./ --model_name pytorchmodel --model_class_name Netを実行します。
--model_dirで指定したディレクトリにあるPythonスクリプト(ここではag_news.py)をimportするようです。

  • Pythonスクリプト複数あるとエラー になりました
  • if __name__ == "__main__":を書いていなかったら、学習を1から実行しました

サーバが起動したら、サンプルに沿ってrequestslocalhostにinput.jsonの内容をPOSTします。

In [1]: import json

In [11]: with open('input.json') as f:
    ...:     form_data = json.load(f)
    ...:

In [14]: import requests

In [15]: res = requests.post('http://localhost:8080/v1/models/pytorchmodel:predict', json=form_data)

In [16]: res
Out[16]: <Response [200]>

In [17]: res.text
Out[17]: '{"predictions": [[-1.8232978582382202, 7.340854644775391, -1.85544753074646, -4.12541389465332]]}'

ここでつまづいたのは2点:

(1) モデルをロードする際に引数を渡せません4

self.model = model_class().to(self.device)

そこで、Net.__init__の引数にデフォルト値を指定しました(マジックナンバーの正体です)。
以下のようにデフォルト値を調べましたが、引数のデフォルト値は一度だけ評価されるので、len(train_dataset.get_vocab())を指定してもいいかもしれません。

In [2]: from torchtext.datasets import text_classification

In [3]: train_dataset, test_dataset = text_classification.DATASETS["AG_NEWS"](
   ...:         root="./.data", ngrams=2, vocab=None)

In [4]: len(train_dataset.get_vocab())
Out[4]: 1308844

In [5]: len(train_dataset.get_labels())
Out[5]: 4

モデルのロードで引数を渡したいと、Issueも上がっていました:

(2) 送られてくるJSONは以下のように操作されます5

inputs = torch.tensor(request["instances"]).to(self.device)
self.model(inputs).tolist()

model(inputs)と渡すためNet.forwardoffsets引数にもデフォルト値が必要でした6
現状では、POSTしたJSONからoffsets引数に値は渡せないと思います。

また現状は、POSTするデータとしては、一度に1テキストしかポストできないようです(キーがinstancesですが、値の実体はinstance)。

2. GKEでモデルをサーブする(ハマり中)

訓練したモデルをGCSに置きます(今回バケットはコンソールから作りましたが、gcloudコマンドでも作れそうですね)。

$ gsutil cp model.pt  gs://nikkie-knative-project/models/pytorch/ag_news/

サンプルに沿ったyamlファイルを準備します。

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "pytorch-agnews"
spec:
  default:
    predictor:
      pytorch:
        storageUri: "gs://nikkie-knative-project/models/pytorch/ag_news/"
        modelClassName: "Net"

kubectl applyでKFServingのリソースをデプロイ!

$ kubectl apply -f pytorch.yaml
inferenceservice.serving.kubeflow.org/pytorch-agnews created

ところが、READYがFalseとなって、URLを取得できません😱

$ kubectl get inferenceservices
NAME             URL   READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
pytorch-agnews         False                                      95s

暗中模索

似た事象のIssue

k8sの練度が低く、Issueの情報が活かせていません。
特に、どのリソースについてログが見られるかがよく分かっておらず、「Issueに書いてある情報はどうやって出すんだろう」という状況です。

別の問題を解消

GitHub - kubeflow/kfserving: Serverless Inferencing on Kubernetes にあったインストール後のテストを試しました。

$ kubectl get po -n kfserving-system
$ kubectl apply -f kfserving/docs/samples/sklearn/sklearn.yaml
$ kubectl get inferenceservices sklearn-iris
NAME           URL   READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
sklearn-iris         False                                      19s

READYがFalseだった7ため、テストに続くトラブルシューティング8からk8s 1.15以上でオススメのコマンドを試します(リソースはdeleteしています)。

kubectl patch mutatingwebhookconfiguration inferenceservice.serving.kubeflow.org --patch '{"webhooks":[{"name": "inferenceservice.kfserving-webhook-server.pod-mutator","objectSelector":{"matchExpressions":[{"key":"serving.kubeflow.org/inferenceservice", "operator": "Exists"}]}}]}'

すると、READYがFalseという事象は解決しました。
しかし、リクエストを送ると503が返ります。

$ kubectl get inferenceservices sklearn-iris
NAME           URL                                                              READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
sklearn-iris   http://sklearn-iris.default.example.com/v1/models/sklearn-iris   True    100
$ kubectl port-forward --namespace istio-system $(kubectl get pod --namespace istio-system --selector="app=istio-ingressgateway" --output jsonpath='{.items[0].metadata.name}') 8080:80

# 別のターミナルで
$ curl -v -H "Host: sklearn-iris.default.example.com" http://localhost:8080/v1/models/sklearn-iris:predict -d @./kfserving/docs/samples/sklearn/iris-input.json
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /v1/models/sklearn-iris:predict HTTP/1.1
> Host: sklearn-iris.default.example.com
> User-Agent: curl/7.54.0
> Accept: */*
> Content-Length: 76
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 76 out of 76 bytes
< HTTP/1.1 503 Service Unavailable
< date: Sun, 12 Apr 2020 09:28:08 GMT
< server: istio-envoy
< connection: close
< content-length: 0
<
* Closing connection 0

CIFAR10の例でも同様のエラー

git cloneしていたので試してみました。
しかしながら、READYはFalseのままです。

前掲のトラブルシューティングで知ったログの出力を試すと

$ kubectl logs -l app=networking-istio -n knative-serving
 :
W0412 13:10:04.732258       1 reflector.go:299] runtime/asm_amd64.s:1357: watch of *v1.ConfigMap ended with: too old resource version: 137817 (139711)

too old resource version ... 🤔(分からない。。)

お片付け

逆順にdeleteしていきます。

$ kubectl delete -f pytorch.yaml
$ kubectl delete -f $CONFIG_URI
$ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-istio.yaml
$ kubectl delete -f https://raw.githubusercontent.com/knative/serving/master/third_party/istio-${ISTIO_VERSION}/istio-minimal.yaml
$ kubectl delete -f https://raw.githubusercontent.com/knative/serving/master/third_party/istio-${ISTIO_VERSION}/istio-crds.yaml
$ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-core.yaml
$ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-crds.yaml

# 1-1で参照したドキュメントより Cleaning up
$ gcloud container clusters delete $CLUSTER_NAME --zone $CLUSTER_ZONE

感想

ローカルでは動くので、GKE上の環境の構築ミスだと思うのですが、k8sの練度が低くて今回は切り分けられませんでした。
k8sの理解が甘いので、ひとまずリソースとログの関係をキャッチアップしたいところです。
IstioやKnativeをそれぞれ触って理解を深めるのもいいかもしれません。
また、今回は中を見ずにapplyしたyamlファイルを覗いて9、どうあるべきかを掴むことも考えられます。

KFServingはじめKubeflowは機械学習環境を自力で自由に整えられるツールと理解しています。
好きなように整えられるのは魅力ですが、自由を謳歌するにはやはり技術力が必要ですね。
他方では、マネージドな機械学習環境として、SageMakerなどクラウドベンダー各社が提供するものを使うという選択肢もあります。

KFServingのyamlは非常にシンプルだったので、k8sの練度を上げて、今回の事象を解決したいところです。
読まれた方でピンときた方はコメントやTwitterで教えていただけると大変助かります。


  1. torchtext.datasets — torchtext 0.5.1 documentation

  2. PyTorchを使っての発見もいくつかあったのですが、それはまたの機会とします

  3. 今回デプロイするリソースはpredictorだけですが、pre-processorpost-processorをデプロイすることにより、KFServing側で前処理(例:テキストのテンソル化)や後処理(例:出力されたテンソルをクラスラベルに変換)ができそうです(前掲の101 Slidesより)

  4. https://github.com/kubeflow/kfserving/blob/8c261457b3ec8017736b882c4ffd3379914471ac/python/pytorchserver/pytorchserver/model.py#L57

  5. https://github.com/kubeflow/kfserving/blob/8c261457b3ec8017736b882c4ffd3379914471ac/python/pytorchserver/pytorchserver/model.py#L66

  6. この検証には Saving and Loading Models — PyTorch Tutorials 1.4.0 documentation が参考になりました。対話モードでモデルをロードし試行錯誤しました。

  7. もしかするとkubectl get inferenceservicesで確認するタイミングが早かったのかもしれません

  8. https://github.com/kubeflow/kfserving/blob/master/docs/DEVELOPER_GUIDE.md#troubleshooting

  9. GitHub - kubeflow/manifests: A repository for Kustomize manifests