Compare commits

...

8 Commits

SHA1 Message Date
2ce9a31883 update service data 2023-04-19 11:46:42 +08:00
1b90edee59 add demo to download optimized model on client 2023-04-19 11:46:33 +08:00
e1f143736e update service data 2023-04-18 17:13:15 +08:00
3c39c48242 script to optimize pytorch model on server 2023-04-18 17:13:05 +08:00
ac6400e93a add one more layer to model dir 2023-04-11 11:56:56 +08:00
0e6a5b8925 update service 2023-04-11 11:56:07 +08:00
578f0ceea1 update service 2023-03-10 15:03:31 +08:00
4bf037de15 package unsupported ml model 2023-03-10 15:03:21 +08:00
8 changed files with 262 additions and 4 deletions

.gitignore vendored

@@ -1,2 +1,3 @@
 .DS_Store
-fortune_predict_model
+__pycache__
+model

README.md

@@ -27,6 +27,9 @@ Try to use [MLflow](https://mlflow.org) platform to log PyTorch model training,
   * a sample code to call registered model to predict testing data and save model to local file system
 * **get_registered_model_via_rest_api.py**
   * a script to test MLflow REST api
+* **log_unsupported_model.py**
+  * a sample script that uses mlflow.pyfunc to package an unsupported ML model so it can be logged and registered by MLflow
+* **optimize_model.py**
+  * a sample script demonstrating how to use MLflow and TensorRT to optimize a PyTorch model on edge devices and fetch the optimized model on a client
 ###### tags: `MLOps`

log_unsupported_model.py Normal file

@@ -0,0 +1,77 @@
# log_unsupported_model.py
#
# author: deng
# date : 20230309

import mlflow
import pandas as pd


class CustomModel(mlflow.pyfunc.PythonModel):
    """ A mlflow wrapper to package unsupported ml model """

    def __init__(self):
        self.model = None

    def load_model(self):
        # load your custom model here
        self.model = lambda value: value * 2

    def predict(self,
                model_input: pd.DataFrame) -> pd.DataFrame:
        if self.model is None:
            self.load_model()
        output = model_input.apply(self.model)
        return output


def log_model(server_uri: str,
              exp_name: str,
              registered_model_name: str) -> None:

    # init mlflow
    mlflow.set_tracking_uri(server_uri)
    mlflow.set_experiment(exp_name)

    # register custom model
    model = CustomModel()
    mlflow.pyfunc.log_model(
        artifact_path='model',
        python_model=model,
        registered_model_name=registered_model_name
    )


def pull_model(server_uri: str,
               exp_name: str,
               registered_model_name: str) -> None:

    # init mlflow
    mlflow.set_tracking_uri(server_uri)
    mlflow.set_experiment(exp_name)

    # pull model from registry
    model = mlflow.pyfunc.load_model(f'models:/{registered_model_name}/latest')
    model = model.unwrap_python_model()  # get CustomModel object
    print(f'Model loaded. model type: {type(model)}')

    # test model availability
    fake_data = pd.DataFrame([1, 3, 5])
    output = model.predict(fake_data)
    print(f'input data: {fake_data}, predictions: {output}')

    # save it to local file system
    mlflow.pyfunc.save_model(path=f'./model/{exp_name}', python_model=model)


if __name__ == '__main__':

    server_uri = 'http://127.0.0.1:5001'
    exp_name = 'custom_model'
    registered_model_name = 'custom_model'

    log_model(server_uri, exp_name, registered_model_name)
    pull_model(server_uri, exp_name, registered_model_name)
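
Once pull_model() has run, the packaged model also lives on the local file system, so a client can reload it without contacting the registry. A minimal sketch of that client side, assuming the ./model/custom_model directory produced by the script above (the file name load_local_model.py is hypothetical):

# load_local_model.py -- hypothetical client-side loader; assumes
# log_unsupported_model.py above has already saved the packaged model
# to ./model/custom_model on this machine
import mlflow
import pandas as pd

# load the pyfunc model straight from the local path (no server needed)
model = mlflow.pyfunc.load_model('./model/custom_model')

# unwrap to the CustomModel instance, mirroring pull_model() above,
# and call its predict() directly
custom_model = model.unwrap_python_model()
print(custom_model.predict(pd.DataFrame([2, 4, 6])))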

optimize_model.py Normal file

@@ -0,0 +1,75 @@
# optimize_model.py
#
# author: deng
# date : 20230418

import shutil
from pathlib import Path

import torch
import mlflow


def optimize_pytorch_model(run_id: str,
                           model_artifact_dir: str = 'model') -> None:
    """Optimize a PyTorch model from the MLflow server on an edge device

    Args:
        run_id (str): mlflow run id
        model_artifact_dir (str, optional): model dir of the run on the server. Defaults to 'model'.
    """
    download_path = Path('./model/downloaded_pytorch_model')
    if download_path.is_dir():
        print(f'Removing existing dir: {download_path}')
        shutil.rmtree(download_path)

    # Download model artifacts to local file system
    mlflow_model = mlflow.pytorch.load_model(Path(f'runs:/{run_id}').joinpath(model_artifact_dir).as_posix())
    mlflow.pytorch.save_model(mlflow_model, download_path)

    # Optimize model
    model = torch.load(download_path.joinpath('data/model.pth'))
    dummy_input = torch.randn(5)
    torch.onnx.export(model, dummy_input, download_path.joinpath('data/model.onnx'))

    # we can not call TensorRT on macOS, so imagine we get a serialized model😘
    download_path.joinpath('data/model.trt').touch()

    # Send optimized model back to given run
    with mlflow.start_run(run_id=run_id):
        mlflow.log_artifact(download_path.joinpath('data/model.trt'), 'model/data')
    print(f'Optimized model has been uploaded to server: {mlflow.get_tracking_uri()}')


def download_optimized_model(run_id: str,
                             save_dir: str,
                             model_artifact_path: str = 'model/data/model.trt') -> None:
    """Download the optimized model from the MLflow server on a client

    Args:
        run_id (str): mlflow run id
        save_dir (str): dir of local file system to save model
        model_artifact_path (str, optional): artifact path of model on server. Defaults to 'model/data/model.trt'.
    """
    mlflow.artifacts.download_artifacts(
        run_id=run_id,
        artifact_path=model_artifact_path,
        dst_path=save_dir
    )
    print(f'Optimized model has been saved, please check: {Path(save_dir).joinpath(model_artifact_path)}')


if __name__ == '__main__':

    mlflow.set_tracking_uri('http://127.0.0.1:5001')

    optimize_pytorch_model(
        run_id='f1b7b9a5ba934f158c07975a8a332de5'
    )
    download_optimized_model(
        run_id='f1b7b9a5ba934f158c07975a8a332de5',
        save_dir='./model/download_tensorrt'
    )
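
On a device where TensorRT is actually installed, the touch() placeholder above would become a real ONNX-to-TensorRT conversion. A sketch of that step using the trtexec CLI bundled with TensorRT (paths follow the script above; exact flags can vary by TensorRT version, so treat this as an assumption to verify):

# convert_with_tensorrt.py -- hypothetical replacement for the touch()
# placeholder; assumes the trtexec binary (bundled with TensorRT) is on PATH
import subprocess

subprocess.run(
    [
        'trtexec',
        '--onnx=./model/downloaded_pytorch_model/data/model.onnx',       # exported ONNX model
        '--saveEngine=./model/downloaded_pytorch_model/data/model.trt',  # serialized TensorRT engine
    ],
    check=True  # raise if the conversion fails
)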


@@ -7,7 +7,7 @@ import torch
 import mlflow

-if __name__ == '__main__':
+def main():

     # set MLflow server
     mlflow.set_tracking_uri('http://127.0.0.1:5001')

@@ -21,4 +21,8 @@ if __name__ == '__main__':
     print(my_fortune)

     # save model and env to local file system
-    mlflow.pytorch.save_model(model, './fortune_predict_model')
+    mlflow.pytorch.save_model(model, './model/fortune_predict_model')
+
+
+if __name__ == '__main__':
+    main()

train.py Normal file

@@ -0,0 +1,98 @@
# train.py
#
# author: deng
# date : 20230221

import torch
import torch.nn as nn
from torch.optim import SGD
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
from tqdm import tqdm


class Net(nn.Module):
    """ define a simple neural network model """

    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(5, 3)
        self.fc2 = nn.Linear(3, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        return x


def train(model, dataloader, criterion, optimizer, epochs):
    """ define the training function """
    for epoch in tqdm(range(epochs), 'Epochs'):
        for batch, (inputs, labels) in enumerate(dataloader):

            # forwarding
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # update gradient
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # log loss
        mlflow.log_metric('train_loss', loss.item(), step=epoch)

    return loss


if __name__ == '__main__':

    # set hyper parameters
    learning_rate = 1e-2
    batch_size = 10
    epochs = 20

    # create a dataloader with fake data
    dataloader = [(torch.randn(5), torch.randn(1)) for _ in range(100)]
    dataloader = torch.utils.data.DataLoader(dataloader, batch_size=batch_size)

    # create the model, criterion, and optimizer
    model = Net()
    criterion = nn.MSELoss()
    optimizer = SGD(model.parameters(), lr=learning_rate)

    # set the tracking URI to the model registry
    mlflow.set_tracking_uri('http://127.0.0.1:5001')
    mlflow.set_experiment('train_fortune_predict_model')

    # start a new MLflow run
    with mlflow.start_run():

        # train the model
        loss = train(model, dataloader, criterion, optimizer, epochs)

        # log some additional metrics
        mlflow.log_metric('final_loss', loss.item())
        mlflow.log_param('learning_rate', learning_rate)
        mlflow.log_param('batch_size', batch_size)

        # create a signature to record model input and output info
        input_schema = Schema([
            ColSpec('float', 'age'),
            ColSpec('float', 'mood level'),
            ColSpec('float', 'health level'),
            ColSpec('float', 'hungry level'),
            ColSpec('float', 'sexy level')
        ])
        output_schema = Schema([ColSpec('float', 'fortune')])
        signature = ModelSignature(inputs=input_schema, outputs=output_schema)

        # log trained model
        mlflow.pytorch.log_model(model, 'model', signature=signature)

        # log training code
        mlflow.log_artifact('./train.py', 'code')

        print('Completed.')
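
Because the run logs the model with a signature, a client can load it back and score a DataFrame whose columns match the five declared inputs. A minimal sketch, assuming a finished run of train.py above (the file name and run id are placeholders, not real values):

# score_fortune.py -- hypothetical client; replace <run_id> with a real
# run id from the tracking server used in train.py
import mlflow
import pandas as pd

mlflow.set_tracking_uri('http://127.0.0.1:5001')

# load the logged model through the generic pyfunc interface
model = mlflow.pyfunc.load_model('runs:/<run_id>/model')

# column names and dtypes must match the ModelSignature in train.py,
# which declares float (float32) inputs
data = pd.DataFrame([{
    'age': 30.0,
    'mood level': 0.7,
    'health level': 0.9,
    'hungry level': 0.2,
    'sexy level': 1.0,
}]).astype('float32')

print(model.predict(data))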

Binary file not shown.

Binary file not shown.