はじめに
頑張れば、何かがあるって、信じてる。nikkieです。
週末ログの後編です。
PyTorchでモデルを訓練した後、KFServingでサーブする部分です。
目次
1. KFServingが使える環境を用意する
前編を参照ください。
2. PyTorchでテキスト分類するモデルを用意
PyTorchのチュートリアルの中から、テキストの多クラス分類のモデルを選択しました。
AG_NEWS
の記事を4つのカテゴリのいずれかに分類します1。
KFServingのサンプルにあるCIFAR10を使った画像分類から、問題設定とモデルを変えました。
$ python -V # venvを使用
Python 3.7.3
$ pip list # 手で入れたパッケージを示します
black 19.10b0
flake8 3.7.9
ipython 7.13.0
requests 2.23.0
torch 1.4.0
torchtext 0.5.0
続く3-1のステップを通ったもの(ag_news.py
)を示します2。
import os
import time
import torch
from torchtext.datasets import text_classification
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from torchtext.data.utils import get_tokenizer, ngrams_iterator
NGRAMS = 2
BATCH_SIZE = 16
EMBED_DIM = 32
N_EPOCHS = 5
min_valid_loss = float("inf")
class Net(nn.Module):
def __init__(self, vocab_size=1308844, embed_dim=EMBED_DIM, num_class=4):
super().__init__()
self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
self.fc = nn.Linear(embed_dim, num_class)
self.init_weights()
def init_weights(self):
initrange = 0.5
self.embedding.weight.data.uniform_(-initrange, initrange)
self.fc.weight.data.uniform_(-initrange, initrange)
self.fc.bias.data.zero_()
def forward(self, text, offsets=None):
if offsets is None:
offsets = torch.tensor([0])
embedded = self.embedding(text, offsets)
return self.fc(embedded)
def generate_batch(batch):
label = torch.tensor([entry[0] for entry in batch])
text = [entry[1] for entry in batch]
offsets = [0] + [len(entry) for entry in text]
offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
text = torch.cat(text)
return text, offsets, label
def train_func(sub_train_):
train_loss = 0
train_acc = 0
data = DataLoader(
sub_train_,
batch_size=BATCH_SIZE,
shuffle=True,
collate_fn=generate_batch,
)
for text, offsets, cls_ in data:
optimizer.zero_grad()
text, offsets, cls_ = (
text.to(device),
offsets.to(device),
cls_.to(device),
)
output = model(text, offsets)
loss = criterion(output, cls_)
train_loss += loss.item()
loss.backward()
optimizer.step()
train_acc += (output.argmax(1) == cls_).sum().item()
scheduler.step()
return train_loss / len(sub_train_), train_acc / len(sub_train_)
def test(data_):
loss = 0
acc = 0
data = DataLoader(data_, batch_size=BATCH_SIZE, collate_fn=generate_batch)
for text, offsets, cls_ in data:
text, offsets, cls_ = (
text.to(device),
offsets.to(device),
cls_.to(device),
)
with torch.no_grad():
output = model(text, offsets)
loss = criterion(output, cls_)
loss += loss.item()
acc += (output.argmax(1) == cls_).sum().item()
return loss / len(data_), acc / len(data_)
if __name__ == "__main__":
if not os.path.isdir("./.data"):
os.mkdir("./.data")
train_dataset, test_dataset = text_classification.DATASETS["AG_NEWS"](
root="./.data", ngrams=NGRAMS, vocab=None
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Net().to(device)
criterion = torch.nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=4.0)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)
train_len = int(len(train_dataset) * 0.95)
sub_train_, sub_valid_ = random_split(
train_dataset, [train_len, len(train_dataset) - train_len]
)
for epoch in range(N_EPOCHS):
start_time = time.time()
train_loss, train_acc = train_func(sub_train_)
valid_loss, valid_acc = test(sub_valid_)
secs = int(time.time() - start_time)
mins = secs // 60
secs = secs % 60
print(f"Epoch {epoch+1} | time in {mins} minutes, {secs} seconds")
print(
f"\tLoss: {train_loss:.4f}(train)\t|",
f"\tAcc: {train_acc * 100:.1f}%(train)",
)
print(
f"\tLoss: {valid_loss:.4f}(valid)\t|",
f"\tAcc: {valid_acc * 100:.1f}%(valid)",
)
print("checking the results of test dataset...")
test_loss, test_acc = test(test_dataset)
print(
f"\tLoss: {test_loss:.4f}(test)\t|\tAcc: {test_acc * 100:.1f}%(test)"
)
torch.save(model.state_dict(), "model.pt")
ag_news_label = {1: "World", 2: "Sports", 3: "Business", 4: "Sci/Tec"}
ex_text_str = "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \
enduring the season’s worst weather conditions on Sunday at The \
Open on his way to a closing 75 at Royal Portrush, which \
considering the wind and the rain was a respectable showing. \
Thursday’s first round at the WGC-FedEx St. Jude Invitational \
was another story. With temperatures in the mid-80s and hardly any \
wind, the Spaniard was 13 strokes better in a flawless round. \
Thanks to his best putting performance on the PGA Tour, Rahm \
finished with an 8-under 62 for a three-stroke lead, which \
was even more impressive considering he’d never played the \
front nine at TPC Southwind."
サーブするモデルに渡すテンソルを用意
今回は小さく試すため、テキストではなく、テンソル化した(=前処理済みの)テキストをモデルに与えるとします3。
$ python -i ag_news.py
:
Epoch 5 | time in 0 minutes, 22 seconds
Loss: 0.0022(train) | Acc: 99.0%(train)
Loss: 0.0000(valid) | Acc: 90.7%(valid)
checking the results of test dataset...
Loss: 0.0002(test) | Acc: 89.6%(test)
>>> tokenizer = get_tokenizer("basic_english")
>>> ex_tokenized = tokenizer(ex_text_str) # ag_news.py中に定義したテキストをトークナイズ
>>> vocab = train_dataset.get_vocab()
>>> tokens = [vocab[token] for token in ngrams_iterator(ex_tokenized, NGRAMS)]
>>> len(tokens)
237
>>> import json
>>> # テンソル化したテキストのJSONを作成
>>> with open('input.json', 'w') as f:
... json.dump({'instances': [tokens]}, f, indent=4)
3. KFServingでサーブを試す
3-1 ローカルでモデルをサーブする
まず、PyTorchのモデルをKFServingでサーブするサンプルで使っているpytorchserver
を入手します(python -m pytorchserver ...
)。
これはPyPIには上がっておらず、リポジトリのコードからインストールする必要がありました。
$ git clone git@github.com:kubeflow/kfserving.git
$ cd kfserving/python/pytorchserver/
$ pip install -e .
$ cd ../../.. # 元の階層に戻る
ディレクトリ配置
.
├── __pycache__
├── ag_news.py # 2で作成
├── env
├── input.json # 2で作成
├── kfserving # 3-1でclone
├── model.pt # 2で作成
└── pytorch.yaml # 3-2で作る
model.ptがあるディレクトリでpython -m pytorchserver --model_dir ./ --model_name pytorchmodel --model_class_name Net
を実行します。
--model_dir
で指定したディレクトリにあるPythonスクリプト(ここではag_news.py)をimportするようです。
- Pythonスクリプトが 複数あるとエラー になりました
if __name__ == "__main__":
を書いていなかったら、学習を1から実行しました
サーバが起動したら、サンプルに沿ってrequests
でlocalhostにinput.jsonの内容をPOSTします。
In [1]: import json
In [11]: with open('input.json') as f:
...: form_data = json.load(f)
...:
In [14]: import requests
In [15]: res = requests.post('http://localhost:8080/v1/models/pytorchmodel:predict', json=form_data)
In [16]: res
Out[16]: <Response [200]>
In [17]: res.text
Out[17]: '{"predictions": [[-1.8232978582382202, 7.340854644775391, -1.85544753074646, -4.12541389465332]]}'
ここでつまづいたのは2点:
(1) モデルをロードする際に引数を渡せません4
self.model = model_class().to(self.device)
そこで、Net.__init__
の引数にデフォルト値を指定しました(マジックナンバーの正体です)。
以下のようにデフォルト値を調べましたが、引数のデフォルト値は一度だけ評価されるので、len(train_dataset.get_vocab())
を指定してもいいかもしれません。
In [2]: from torchtext.datasets import text_classification
In [3]: train_dataset, test_dataset = text_classification.DATASETS["AG_NEWS"](
...: root="./.data", ngrams=2, vocab=None)
In [4]: len(train_dataset.get_vocab())
Out[4]: 1308844
In [5]: len(train_dataset.get_labels())
Out[5]: 4
モデルのロードで引数を渡したいと、Issueも上がっていました:
(2) 送られてくるJSONは以下のように操作されます5
inputs = torch.tensor(request["instances"]).to(self.device)
self.model(inputs).tolist()
model(inputs)
と渡すためNet.forward
のoffsets
引数にもデフォルト値が必要でした6。
現状では、POSTしたJSONからoffsets
引数に値は渡せないと思います。
また現状は、POSTするデータとしては、一度に1テキストしかポストできないようです(キーがinstances
ですが、値の実体はinstance)。
2. GKEでモデルをサーブする(ハマり中)
訓練したモデルをGCSに置きます(今回バケットはコンソールから作りましたが、gcloud
コマンドでも作れそうですね)。
$ gsutil cp model.pt gs://nikkie-knative-project/models/pytorch/ag_news/
サンプルに沿ったyamlファイルを準備します。
apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
name: "pytorch-agnews"
spec:
default:
predictor:
pytorch:
storageUri: "gs://nikkie-knative-project/models/pytorch/ag_news/"
modelClassName: "Net"
kubectl apply
でKFServingのリソースをデプロイ!
$ kubectl apply -f pytorch.yaml
inferenceservice.serving.kubeflow.org/pytorch-agnews created
ところが、READYがFalseとなって、URLを取得できません😱
$ kubectl get inferenceservices
NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE
pytorch-agnews False 95s
暗中模索
似た事象のIssue
k8sの練度が低く、Issueの情報が活かせていません。
特に、どのリソースについてログが見られるかがよく分かっておらず、「Issueに書いてある情報はどうやって出すんだろう」という状況です。
別の問題を解消
GitHub - kubeflow/kfserving: Serverless Inferencing on Kubernetes にあったインストール後のテストを試しました。
$ kubectl get po -n kfserving-system
$ kubectl apply -f kfserving/docs/samples/sklearn/sklearn.yaml
$ kubectl get inferenceservices sklearn-iris
NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE
sklearn-iris False 19s
READYがFalseだった7ため、テストに続くトラブルシューティング8からk8s 1.15以上でオススメのコマンドを試します(リソースはdelete
しています)。
kubectl patch mutatingwebhookconfiguration inferenceservice.serving.kubeflow.org --patch '{"webhooks":[{"name": "inferenceservice.kfserving-webhook-server.pod-mutator","objectSelector":{"matchExpressions":[{"key":"serving.kubeflow.org/inferenceservice", "operator": "Exists"}]}}]}'
すると、READYがFalseという事象は解決しました。
しかし、リクエストを送ると503が返ります。
$ kubectl get inferenceservices sklearn-iris
NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE
sklearn-iris http://sklearn-iris.default.example.com/v1/models/sklearn-iris True 100
$ kubectl port-forward --namespace istio-system $(kubectl get pod --namespace istio-system --selector="app=istio-ingressgateway" --output jsonpath='{.items[0].metadata.name}') 8080:80
# 別のターミナルで
$ curl -v -H "Host: sklearn-iris.default.example.com" http://localhost:8080/v1/models/sklearn-iris:predict -d @./kfserving/docs/samples/sklearn/iris-input.json
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /v1/models/sklearn-iris:predict HTTP/1.1
> Host: sklearn-iris.default.example.com
> User-Agent: curl/7.54.0
> Accept: */*
> Content-Length: 76
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 76 out of 76 bytes
< HTTP/1.1 503 Service Unavailable
< date: Sun, 12 Apr 2020 09:28:08 GMT
< server: istio-envoy
< connection: close
< content-length: 0
<
* Closing connection 0
CIFAR10の例でも同様のエラー
git clone
していたので試してみました。
しかしながら、READYはFalseのままです。
前掲のトラブルシューティングで知ったログの出力を試すと
$ kubectl logs -l app=networking-istio -n knative-serving
:
W0412 13:10:04.732258 1 reflector.go:299] runtime/asm_amd64.s:1357: watch of *v1.ConfigMap ended with: too old resource version: 137817 (139711)
too old resource version ... 🤔(分からない。。)
お片付け
逆順にdeleteしていきます。
$ kubectl delete -f pytorch.yaml
$ kubectl delete -f $CONFIG_URI
$ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-istio.yaml
$ kubectl delete -f https://raw.githubusercontent.com/knative/serving/master/third_party/istio-${ISTIO_VERSION}/istio-minimal.yaml
$ kubectl delete -f https://raw.githubusercontent.com/knative/serving/master/third_party/istio-${ISTIO_VERSION}/istio-crds.yaml
$ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-core.yaml
$ kubectl delete --filename https://github.com/knative/serving/releases/download/v0.13.0/serving-crds.yaml
# 1-1で参照したドキュメントより Cleaning up
$ gcloud container clusters delete $CLUSTER_NAME --zone $CLUSTER_ZONE
感想
ローカルでは動くので、GKE上の環境の構築ミスだと思うのですが、k8sの練度が低くて今回は切り分けられませんでした。
k8sの理解が甘いので、ひとまずリソースとログの関係をキャッチアップしたいところです。
IstioやKnativeをそれぞれ触って理解を深めるのもいいかもしれません。
また、今回は中を見ずにapply
したyamlファイルを覗いて9、どうあるべきかを掴むことも考えられます。
KFServingはじめKubeflowは機械学習環境を自力で自由に整えられるツールと理解しています。
好きなように整えられるのは魅力ですが、自由を謳歌するにはやはり技術力が必要ですね。
他方では、マネージドな機械学習環境として、SageMakerなどクラウドベンダー各社が提供するものを使うという選択肢もあります。
KFServingのyamlは非常にシンプルだったので、k8sの練度を上げて、今回の事象を解決したいところです。
読まれた方でピンときた方はコメントやTwitterで教えていただけると大変助かります。