Compare commits

...

10 Commits

Author SHA1 Message Date
2ce9a31883 update service data 2023-04-19 11:46:42 +08:00
1b90edee59 add demo to download optimized model on client 2023-04-19 11:46:33 +08:00
e1f143736e update service data 2023-04-18 17:13:15 +08:00
3c39c48242 script to optimize pytorch model on server 2023-04-18 17:13:05 +08:00
ac6400e93a add one more layer to model dir 2023-04-11 11:56:56 +08:00
0e6a5b8925 update service 2023-04-11 11:56:07 +08:00
578f0ceea1 update service 2023-03-10 15:03:31 +08:00
4bf037de15 package unsupported ml model 2023-03-10 15:03:21 +08:00
1f86146b12 replace bash script to docker-compose to build server 2023-03-07 16:33:45 +08:00
ec19042d0d update file description and reference 2023-03-01 17:13:17 +08:00
14 changed files with 322 additions and 18 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
.DS_Store .DS_Store
fortune_predict_model __pycache__
model

View File

@ -1,3 +1,35 @@
# test_mlflow # Abstract
測試使用MLflow紀錄Pytorch模型訓練以及從Model registry中拉取Production model進行推論。 Try to use [MLflow](https://mlflow.org) platform to log PyTorch model training, and pull production model from model registry to run inference⛩
# Requirements
* MacOS 12.5
* Docker 20.10
# Dirs
* **service**
* House MLflow service data, including MLflow artifacts, backend store and model registry
* **env**
* **mlflow.yaml**
* conda env yaml to run this repo
# Files
* **docker-compose.yaml**
* a yaml to apply docker-compose to start MLflow service with basic configuration (run ```docker-compose -f docker-compose.yaml up```)
* **test_pytorch_m1.py**
* a script to test PyTorch on Apple M1 platform with GPU acceleration
* **train.py**
* a sample code to apply PyTorch to train a small neural network to predict fortune with MLflow logging
* **predict.py**
* a sample code to call registered model to predict testing data and save model to local file system
* **get_registered_model_via_rest_api.py**
* a script to test MLflow REST api
* **log_unsupported_model.py**
* a sample script to apply mlflow.pyfunc to package unsupported ml model which can be logged and registered by mlflow
* **optimize_model.py**
* a sample script to demonstrate how to use MLflow and TensorRT libs to optimize Pytorch model on edge devices and fetch it out on client
###### tags: `MLOps`

18
docker-compose.yaml Normal file
View File

@ -0,0 +1,18 @@
version: '3.7'
services:
mlflow_server:
image: ghcr.io/mlflow/mlflow:v2.1.1
restart: always
ports:
- 5001:5001
volumes:
- ~/python/test_mlflow/service:/home
healthcheck:
test: ["CMD", "curl", "-f", "http://0.0.0.0:5001"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
command: bash -c "apt update && apt install -y curl && mlflow server --host 0.0.0.0 --port 5001 --backend-store-uri sqlite:////home/backend.db --registry-store-uri sqlite:////home/registry.db --artifacts-destination /home/artifacts --serve-artifacts"

View File

View File

@ -12,7 +12,7 @@ def main():
production_model_version = None production_model_version = None
query = {'name': registered_model_name} query = {'name': registered_model_name}
res = requests.get('http://127.0.0.1:5000/api/2.0/mlflow/registered-models/get', params=query) res = requests.get('http://127.0.0.1:5001/api/2.0/mlflow/registered-models/get', params=query)
content = json.loads(res.text) content = json.loads(res.text)
print(content) print(content)
@ -23,7 +23,7 @@ def main():
if production_model_version is not None: if production_model_version is not None:
query = {'name': registered_model_name, 'version': production_model_version} query = {'name': registered_model_name, 'version': production_model_version}
res = requests.get('http://127.0.0.1:5000/api/2.0/mlflow/model-versions/get-download-uri', params=query) res = requests.get('http://127.0.0.1:5001/api/2.0/mlflow/model-versions/get-download-uri', params=query)
print(res.text) print(res.text)
if __name__ == '__main__': if __name__ == '__main__':

77
log_unsupported_model.py Normal file
View File

@ -0,0 +1,77 @@
# log_unsupported_model.py
#
# author: deng
# date : 20230309
import mlflow
import pandas as pd
class CustomModel(mlflow.pyfunc.PythonModel):
""" A mlflow wrapper to package unsupported model """
def __init__(self):
self.model = None
def load_model(self):
# load your custom model here
self.model = lambda value: value * 2
def predict(self,
model_input: pd.DataFrame) -> pd.DataFrame:
if self.model is None:
self.load_model()
output = model_input.apply(self.model)
return output
def log_model(server_uri:str,
exp_name: str,
registered_model_name: str) -> None:
# init mlflow
mlflow.set_tracking_uri(server_uri)
mlflow.set_experiment(exp_name)
# register custom model
model = CustomModel()
mlflow.pyfunc.log_model(
artifact_path='model',
python_model=model,
registered_model_name=registered_model_name
)
def pull_model(server_uri: str,
exp_name: str,
registered_model_name: str) -> None:
# init mlflow
mlflow.set_tracking_uri(server_uri)
mlflow.set_experiment(exp_name)
# pull model from registry
model = mlflow.pyfunc.load_model(f'models:/{registered_model_name}/latest')
model = model.unwrap_python_model() # get CustomModel object
print(f'Model loaded. model type: {type(model)}')
# test model availability
fake_data = pd.DataFrame([1, 3, 5])
output = model.predict(fake_data)
print(f'input data: {fake_data}, predictions: {output}')
# save it to local file system
mlflow.pyfunc.save_model(path=f'./model/{exp_name}', python_model=model)
if __name__ == '__main__':
server_uri = 'http://127.0.0.1:5001'
exp_name = 'custom_model'
registered_model_name = 'custom_model'
log_model(server_uri, exp_name, registered_model_name)
pull_model(server_uri, exp_name, registered_model_name)

75
optimize_model.py Normal file
View File

@ -0,0 +1,75 @@
# optimize_model.py
#
# author: deng
# date : 20230418
import shutil
from pathlib import Path
import torch
import mlflow
def optimize_pytorch_model(run_id: str,
model_artifact_dir: str = 'model') -> None:
"""Optimize Pytorch model from MLflow server on edge device
Args:
run_id (str): mlflow run id
model_artifact_dir (str, optional): model dir of run on server. Defaults to 'model'.
"""
download_path = Path('./model/downloaded_pytorch_model')
if download_path.is_dir():
print(f'Remove existed dir: {download_path}')
shutil.rmtree(download_path)
# Download model artifacts to local file system
mlflow_model = mlflow.pytorch.load_model(Path(f'runs:/{run_id}').joinpath(model_artifact_dir).as_posix())
mlflow.pytorch.save_model(mlflow_model, download_path)
# Optimize model
model = torch.load(download_path.joinpath('data/model.pth'))
dummy_input = torch.randn(5)
torch.onnx.export(model, dummy_input, download_path.joinpath('data/model.onnx'))
# we can not call TensorRT on macOS, so imagine we get a serialized model😘
download_path.joinpath('data/model.trt').touch()
# Sent optimized model back to given run
with mlflow.start_run(run_id=run_id):
mlflow.log_artifact(download_path.joinpath('data/model.trt'), 'model/data')
print(f'Optimized model had been uploaded to server: {mlflow.get_tracking_uri()}')
def download_optimized_model(run_id: str,
save_dir: str,
model_artifact_path: str = 'model/data/model.trt') -> None:
"""Download optimized model from MLflow server on clent
Args:
run_id (str): mlflow run id
save_dir (str): dir of local file system to save model
model_artifact_path (str, optional): artifact path of model on server. Defaults to 'model/data/model.trt'.
"""
mlflow.artifacts.download_artifacts(
run_id= run_id,
artifact_path=model_artifact_path,
dst_path=save_dir
)
print(f'Optimized model had been saved, please check: {Path(save_dir).joinpath(model_artifact_path)}')
if __name__ == '__main__':
mlflow.set_tracking_uri('http://127.0.0.1:5001')
optimize_pytorch_model(
run_id='f1b7b9a5ba934f158c07975a8a332de5'
)
download_optimized_model(
run_id='f1b7b9a5ba934f158c07975a8a332de5',
save_dir='./model/download_tensorrt'
)

View File

@ -7,10 +7,10 @@ import torch
import mlflow import mlflow
if __name__ == '__main__': def main():
# set MLflow server # set MLflow server
mlflow.set_tracking_uri('http://127.0.0.1:5000') mlflow.set_tracking_uri('http://127.0.0.1:5001')
# load production model # load production model
model = mlflow.pytorch.load_model('models:/fortune_predict_model/production') model = mlflow.pytorch.load_model('models:/fortune_predict_model/production')
@ -21,4 +21,8 @@ if __name__ == '__main__':
print(my_fortune) print(my_fortune)
# save model and env to local file system # save model and env to local file system
mlflow.pytorch.save_model(model, './fortune_predict_model') mlflow.pytorch.save_model(model, './model/fortune_predict_model')
if __name__ == '__main__':
main()

View File

@ -0,0 +1,98 @@
# train.py
#
# author: deng
# date : 20230221
import torch
import torch.nn as nn
from torch.optim import SGD
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
from tqdm import tqdm
class Net(nn.Module):
""" define a simple neural network model """
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(5, 3)
self.fc2 = nn.Linear(3, 1)
def forward(self, x):
x = self.fc1(x)
x = torch.relu(x)
x = self.fc2(x)
return x
def train(model, dataloader, criterion, optimizer, epochs):
""" define the training function """
for epoch in tqdm(range(epochs), 'Epochs'):
for batch, (inputs, labels) in enumerate(dataloader):
# forwarding
outputs = model(inputs)
loss = criterion(outputs, labels)
# update gradient
optimizer.zero_grad()
loss.backward()
optimizer.step()
# log loss
mlflow.log_metric('train_loss', loss.item(), step=epoch)
return loss
if __name__ == '__main__':
# set hyper parameters
learning_rate = 1e-2
batch_size = 10
epochs = 20
# create a dataloader with fake data
dataloader = [(torch.randn(5), torch.randn(1)) for _ in range(100)]
dataloader = torch.utils.data.DataLoader(dataloader, batch_size=batch_size)
# create the model, criterion, and optimizer
model = Net()
criterion = nn.MSELoss()
optimizer = SGD(model.parameters(), lr=learning_rate)
# set the tracking URI to the model registry
mlflow.set_tracking_uri('http://127.0.0.1:5001')
mlflow.set_experiment('train_fortune_predict_model')
# start a new MLflow run
with mlflow.start_run():
# train the model
loss = train(model, dataloader, criterion, optimizer, epochs)
# log some additional metrics
mlflow.log_metric('final_loss', loss.item())
mlflow.log_param('learning_rate', learning_rate)
mlflow.log_param('batch_size', batch_size)
# create a signature to record model input and output info
input_schema = Schema([
ColSpec('float', 'age'),
ColSpec('float', 'mood level'),
ColSpec('float', 'health level'),
ColSpec('float', 'hungry level'),
ColSpec('float', 'sexy level')
])
output_schema = Schema([ColSpec('float', 'fortune')])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
# log trained model
mlflow.pytorch.log_model(model, 'model', signature=signature)
# log training code
mlflow.log_artifact('./train.py', 'code')
print('Completed.')

Binary file not shown.

Binary file not shown.

View File

@ -1,7 +0,0 @@
#!/bin/bash
# start_mlflow_server.sh
#
# author: deng
# date : 20230221
mlflow server --backend-store-uri sqlite:///service/backend.db --registry-store-uri sqlite:///service/registry.db --default-artifact-root ./service/artifacts --host 127.0.0.1 --port 5000 --serve-artifacts

View File

@ -1,8 +1,14 @@
# test_pytorch_m1.py
# Ref: https://towardsdatascience.com/installing-pytorch-on-apple-m1-chip-with-gpu-acceleration-3351dc44d67c
#
# author: deng
# date : 20230301
import torch import torch
import math import math
print('this ensures that the current MacOS version is at least 12.3+') print('This ensures that the current MacOS version is at least 12.3+')
print(torch.backends.mps.is_available()) print(torch.backends.mps.is_available())
print('\nthis ensures that the current current PyTorch installation was built with MPS activated.') print('\nThis ensures that the current current PyTorch installation was built with MPS activated.')
print(torch.backends.mps.is_built()) print(torch.backends.mps.is_built())

View File

@ -64,7 +64,7 @@ if __name__ == '__main__':
optimizer = SGD(model.parameters(), lr=learning_rate) optimizer = SGD(model.parameters(), lr=learning_rate)
# set the tracking URI to the model registry # set the tracking URI to the model registry
mlflow.set_tracking_uri('http://127.0.0.1:5000') mlflow.set_tracking_uri('http://127.0.0.1:5001')
mlflow.set_experiment('train_fortune_predict_model') mlflow.set_experiment('train_fortune_predict_model')
# start a new MLflow run # start a new MLflow run