Compare commits: a0ac14d0f7...master
10 commits

SHA1
2ce9a31883
1b90edee59
e1f143736e
3c39c48242
ac6400e93a
0e6a5b8925
578f0ceea1
4bf037de15
1f86146b12
ec19042d0d
3 .gitignore (vendored)
@@ -1,2 +1,3 @@
 .DS_Store
-fortune_predict_model
+__pycache__
+model
36 README.md
@@ -1,3 +1,35 @@
-# test_mlflow
+# Abstract
 
-Test using MLflow to log PyTorch model training, and pull the production model from the model registry for inference.
+Try to use [MLflow](https://mlflow.org) platform to log PyTorch model training, and pull the production model from the model registry to run inference⛩
+
+# Requirements
+
+* macOS 12.5
+* Docker 20.10
+
+# Dirs
+
+* **service**
+  * Houses MLflow service data, including MLflow artifacts, the backend store and the model registry
+* **env**
+  * **mlflow.yaml**
+    * conda environment yaml for running this repo
+
+# Files
+
+* **docker-compose.yaml**
+  * a docker-compose file to start the MLflow service with a basic configuration (run ```docker-compose -f docker-compose.yaml up```)
+* **test_pytorch_m1.py**
+  * a script to test PyTorch on the Apple M1 platform with GPU acceleration
+* **train.py**
+  * sample code applying PyTorch to train a small fortune-predicting neural network, with MLflow logging
+* **predict.py**
+  * sample code that calls the registered model to predict on test data and saves the model to the local file system
+* **get_registered_model_via_rest_api.py**
+  * a script to test the MLflow REST API
+* **log_unsupported_model.py**
+  * a sample script applying mlflow.pyfunc to package an unsupported ML model so that it can be logged and registered by MLflow
+* **optimize_model.py**
+  * a sample script demonstrating how to use MLflow and TensorRT to optimize a PyTorch model on an edge device and fetch it on the client
+
+###### tags: `MLOps`
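The README lists the pieces but not the order to run them. As a rough end-to-end smoke test (a sketch, not part of the repo: it assumes the compose service below is already up on localhost:5001 and that the MLflow 2.x tracking server's `/health` endpoint is available):

```python
# smoke_test.py -- hypothetical helper, not part of this repo.
import requests
import mlflow

# MLflow's tracking server exposes a /health endpoint (assumption: v2.x behaviour)
res = requests.get('http://127.0.0.1:5001/health', timeout=5)
print(f'server health: {res.status_code} {res.text}')

# point the client at the service before running train.py / predict.py
mlflow.set_tracking_uri('http://127.0.0.1:5001')
print(f'tracking URI: {mlflow.get_tracking_uri()}')
```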
18 docker-compose.yaml (new file)
@@ -0,0 +1,18 @@
+version: '3.7'
+
+services:
+  mlflow_server:
+    image: ghcr.io/mlflow/mlflow:v2.1.1
+    restart: always
+    ports:
+      - 5001:5001
+    volumes:
+      - ~/python/test_mlflow/service:/home
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://0.0.0.0:5001"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 20s
+    command: bash -c "apt update && apt install -y curl && mlflow server --host 0.0.0.0 --port 5001 --backend-store-uri sqlite:////home/backend.db --registry-store-uri sqlite:////home/registry.db --artifacts-destination /home/artifacts --serve-artifacts"
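Once the service is up, a quick way to confirm the SQLite backend store is reachable is to list experiments from Python. A minimal sketch (assuming mlflow 2.x is installed locally, matching the image tag above):

```python
# check_server.py -- a minimal sketch, not part of this repo.
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri='http://127.0.0.1:5001')

# even an empty server returns the "Default" experiment
for exp in client.search_experiments():
    print(exp.experiment_id, exp.name)
```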
0 conda.yaml → env/mlflow.yaml (renamed, vendored)

4 get_registered_model_via_rest_api.py
@@ -12,7 +12,7 @@ def main():
     production_model_version = None
 
     query = {'name': registered_model_name}
-    res = requests.get('http://127.0.0.1:5000/api/2.0/mlflow/registered-models/get', params=query)
+    res = requests.get('http://127.0.0.1:5001/api/2.0/mlflow/registered-models/get', params=query)
     content = json.loads(res.text)
     print(content)
 
@@ -23,7 +23,7 @@ def main():
 
     if production_model_version is not None:
         query = {'name': registered_model_name, 'version': production_model_version}
-        res = requests.get('http://127.0.0.1:5000/api/2.0/mlflow/model-versions/get-download-uri', params=query)
+        res = requests.get('http://127.0.0.1:5001/api/2.0/mlflow/model-versions/get-download-uri', params=query)
         print(res.text)
 
 if __name__ == '__main__':
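This script only reads the registry; it finds a production version only if one was promoted beforehand. A hedged sketch of that promotion using MLflow's documented REST endpoint (the model name and version here are assumptions for illustration):

```python
# promote_via_rest.py -- hypothetical sketch, not part of this repo.
import requests

payload = {
    'name': 'fortune_predict_model',  # assumed registered model name
    'version': '1',                   # assumed version to promote
    'stage': 'Production'
}
res = requests.post(
    'http://127.0.0.1:5001/api/2.0/mlflow/model-versions/transition-stage',
    json=payload
)
print(res.status_code, res.text)
```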
77 log_unsupported_model.py (new file)
@@ -0,0 +1,77 @@
+# log_unsupported_model.py
+#
+# author: deng
+# date  : 20230309
+
+import mlflow
+import pandas as pd
+
+
+class CustomModel(mlflow.pyfunc.PythonModel):
+    """ A mlflow wrapper to package an unsupported model """
+
+    def __init__(self):
+        self.model = None
+
+    def load_model(self):
+        # load your custom model here
+        self.model = lambda value: value * 2
+
+    def predict(self,
+                model_input: pd.DataFrame) -> pd.DataFrame:
+
+        if self.model is None:
+            self.load_model()
+
+        output = model_input.apply(self.model)
+
+        return output
+
+
+def log_model(server_uri: str,
+              exp_name: str,
+              registered_model_name: str) -> None:
+
+    # init mlflow
+    mlflow.set_tracking_uri(server_uri)
+    mlflow.set_experiment(exp_name)
+
+    # register custom model
+    model = CustomModel()
+    mlflow.pyfunc.log_model(
+        artifact_path='model',
+        python_model=model,
+        registered_model_name=registered_model_name
+    )
+
+
+def pull_model(server_uri: str,
+               exp_name: str,
+               registered_model_name: str) -> None:
+
+    # init mlflow
+    mlflow.set_tracking_uri(server_uri)
+    mlflow.set_experiment(exp_name)
+
+    # pull model from registry
+    model = mlflow.pyfunc.load_model(f'models:/{registered_model_name}/latest')
+    model = model.unwrap_python_model()  # get CustomModel object
+    print(f'Model loaded. model type: {type(model)}')
+
+    # test model availability
+    fake_data = pd.DataFrame([1, 3, 5])
+    output = model.predict(fake_data)
+    print(f'input data: {fake_data}, predictions: {output}')
+
+    # save it to local file system
+    mlflow.pyfunc.save_model(path=f'./model/{exp_name}', python_model=model)
+
+
+if __name__ == '__main__':
+
+    server_uri = 'http://127.0.0.1:5001'
+    exp_name = 'custom_model'
+    registered_model_name = 'custom_model'
+
+    log_model(server_uri, exp_name, registered_model_name)
+    pull_model(server_uri, exp_name, registered_model_name)
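After `pull_model()` runs, the packaged model also lives on the local file system, so it can be reloaded without the server. A short sketch, assuming the save path `./model/custom_model` produced by the script above:

```python
# reload_local_model.py -- a sketch, not part of this repo.
import mlflow
import pandas as pd

model = mlflow.pyfunc.load_model('./model/custom_model')
custom_model = model.unwrap_python_model()  # same unwrap as pull_model()

# the packaged lambda doubles each value
print(custom_model.predict(pd.DataFrame([2, 4, 6])))
```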
75 optimize_model.py (new file)
@@ -0,0 +1,75 @@
+# optimize_model.py
+#
+# author: deng
+# date  : 20230418
+
+import shutil
+from pathlib import Path
+
+import torch
+import mlflow
+
+
+def optimize_pytorch_model(run_id: str,
+                           model_artifact_dir: str = 'model') -> None:
+    """Optimize a PyTorch model from the MLflow server on an edge device
+
+    Args:
+        run_id (str): mlflow run id
+        model_artifact_dir (str, optional): model dir of the run on the server. Defaults to 'model'.
+    """
+
+    download_path = Path('./model/downloaded_pytorch_model')
+    if download_path.is_dir():
+        print(f'Remove existing dir: {download_path}')
+        shutil.rmtree(download_path)
+
+    # Download model artifacts to local file system
+    mlflow_model = mlflow.pytorch.load_model(Path(f'runs:/{run_id}').joinpath(model_artifact_dir).as_posix())
+    mlflow.pytorch.save_model(mlflow_model, download_path)
+
+    # Optimize model
+    model = torch.load(download_path.joinpath('data/model.pth'))
+    dummy_input = torch.randn(5)
+    torch.onnx.export(model, dummy_input, download_path.joinpath('data/model.onnx'))
+    # we can not call TensorRT on macOS, so imagine we get a serialized model😘
+    download_path.joinpath('data/model.trt').touch()
+
+    # Send optimized model back to the given run
+    with mlflow.start_run(run_id=run_id):
+        mlflow.log_artifact(download_path.joinpath('data/model.trt'), 'model/data')
+        print(f'Optimized model has been uploaded to server: {mlflow.get_tracking_uri()}')
+
+
+def download_optimized_model(run_id: str,
+                             save_dir: str,
+                             model_artifact_path: str = 'model/data/model.trt') -> None:
+    """Download the optimized model from the MLflow server on the client
+
+    Args:
+        run_id (str): mlflow run id
+        save_dir (str): dir of local file system to save the model
+        model_artifact_path (str, optional): artifact path of the model on the server. Defaults to 'model/data/model.trt'.
+    """
+
+    mlflow.artifacts.download_artifacts(
+        run_id=run_id,
+        artifact_path=model_artifact_path,
+        dst_path=save_dir
+    )
+
+    print(f'Optimized model has been saved, please check: {Path(save_dir).joinpath(model_artifact_path)}')
+
+
+if __name__ == '__main__':
+
+    mlflow.set_tracking_uri('http://127.0.0.1:5001')
+
+    optimize_pytorch_model(
+        run_id='f1b7b9a5ba934f158c07975a8a332de5'
+    )
+
+    download_optimized_model(
+        run_id='f1b7b9a5ba934f158c07975a8a332de5',
+        save_dir='./model/download_tensorrt'
+    )
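Since TensorRT is unavailable on macOS, the `.trt` file above is only a placeholder, but the intermediate ONNX export can still be sanity-checked. A hedged sketch, assuming `onnxruntime` is installed (it is not a dependency of this repo):

```python
# verify_onnx.py -- hypothetical check, not part of this repo.
import numpy as np
import onnxruntime as ort

sess = ort.InferenceSession(
    './model/downloaded_pytorch_model/data/model.onnx',
    providers=['CPUExecutionProvider']
)
input_name = sess.get_inputs()[0].name

# the net in train.py takes a 5-dim float vector
dummy = np.random.randn(5).astype(np.float32)
print(sess.run(None, {input_name: dummy}))
```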
10 predict.py
@@ -7,10 +7,10 @@ import torch
 import mlflow
 
 
-if __name__ == '__main__':
+def main():
 
     # set MLflow server
-    mlflow.set_tracking_uri('http://127.0.0.1:5000')
+    mlflow.set_tracking_uri('http://127.0.0.1:5001')
 
     # load production model
     model = mlflow.pytorch.load_model('models:/fortune_predict_model/production')
@@ -21,4 +21,8 @@ if __name__ == '__main__':
     print(my_fortune)
 
     # save model and env to local file system
-    mlflow.pytorch.save_model(model, './fortune_predict_model')
+    mlflow.pytorch.save_model(model, './model/fortune_predict_model')
+
+
+if __name__ == '__main__':
+    main()
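predict.py resolves `models:/fortune_predict_model/production`, which only works after some version has been moved to the Production stage. A sketch of that step with the client API (the version number is an assumption):

```python
# promote.py -- a sketch, not part of this repo.
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri='http://127.0.0.1:5001')
client.transition_model_version_stage(
    name='fortune_predict_model',
    version='1',  # assumed: the version produced by train.py
    stage='Production'
)
```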
98 train.py (new file)
@@ -0,0 +1,98 @@
+# train.py
+#
+# author: deng
+# date  : 20230221
+
+import torch
+import torch.nn as nn
+from torch.optim import SGD
+import mlflow
+from mlflow.models.signature import ModelSignature
+from mlflow.types.schema import Schema, ColSpec
+from tqdm import tqdm
+
+
+class Net(nn.Module):
+    """ define a simple neural network model """
+    def __init__(self):
+        super(Net, self).__init__()
+        self.fc1 = nn.Linear(5, 3)
+        self.fc2 = nn.Linear(3, 1)
+
+    def forward(self, x):
+        x = self.fc1(x)
+        x = torch.relu(x)
+        x = self.fc2(x)
+        return x
+
+
+def train(model, dataloader, criterion, optimizer, epochs):
+    """ define the training function """
+    for epoch in tqdm(range(epochs), 'Epochs'):
+
+        for batch, (inputs, labels) in enumerate(dataloader):
+
+            # forwarding
+            outputs = model(inputs)
+            loss = criterion(outputs, labels)
+
+            # update gradient
+            optimizer.zero_grad()
+            loss.backward()
+            optimizer.step()
+
+        # log loss
+        mlflow.log_metric('train_loss', loss.item(), step=epoch)
+
+    return loss
+
+
+if __name__ == '__main__':
+
+    # set hyper parameters
+    learning_rate = 1e-2
+    batch_size = 10
+    epochs = 20
+
+    # create a dataloader with fake data
+    dataloader = [(torch.randn(5), torch.randn(1)) for _ in range(100)]
+    dataloader = torch.utils.data.DataLoader(dataloader, batch_size=batch_size)
+
+    # create the model, criterion, and optimizer
+    model = Net()
+    criterion = nn.MSELoss()
+    optimizer = SGD(model.parameters(), lr=learning_rate)
+
+    # set the tracking URI to the model registry
+    mlflow.set_tracking_uri('http://127.0.0.1:5001')
+    mlflow.set_experiment('train_fortune_predict_model')
+
+    # start a new MLflow run
+    with mlflow.start_run():
+
+        # train the model
+        loss = train(model, dataloader, criterion, optimizer, epochs)
+
+        # log some additional metrics
+        mlflow.log_metric('final_loss', loss.item())
+        mlflow.log_param('learning_rate', learning_rate)
+        mlflow.log_param('batch_size', batch_size)
+
+        # create a signature to record model input and output info
+        input_schema = Schema([
+            ColSpec('float', 'age'),
+            ColSpec('float', 'mood level'),
+            ColSpec('float', 'health level'),
+            ColSpec('float', 'hungry level'),
+            ColSpec('float', 'sexy level')
+        ])
+        output_schema = Schema([ColSpec('float', 'fortune')])
+        signature = ModelSignature(inputs=input_schema, outputs=output_schema)
+
+        # log trained model
+        mlflow.pytorch.log_model(model, 'model', signature=signature)
+
+        # log training code
+        mlflow.log_artifact('./train.py', 'code')
+
+    print('Completed.')
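train.py logs the model under the run's `model` artifact path but does not register it. A one-step sketch of the registration that predict.py's `models:/...` URI relies on (`<run_id>` is a placeholder for the run created above):

```python
# register.py -- a minimal sketch, not part of this repo.
import mlflow

mlflow.set_tracking_uri('http://127.0.0.1:5001')
result = mlflow.register_model('runs:/<run_id>/model', 'fortune_predict_model')
print(result.name, result.version)
```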
Binary file not shown.
7 start_mlflow_server.sh (deleted)
@@ -1,7 +0,0 @@
-#!/bin/bash
-# start_mlflow_server.sh
-#
-# author: deng
-# date  : 20230221
-
-mlflow server --backend-store-uri sqlite:///service/backend.db --registry-store-uri sqlite:///service/registry.db --default-artifact-root ./service/artifacts --host 127.0.0.1 --port 5000 --serve-artifacts
10 test_pytorch_m1.py
@@ -1,8 +1,14 @@
+# test_pytorch_m1.py
+# Ref: https://towardsdatascience.com/installing-pytorch-on-apple-m1-chip-with-gpu-acceleration-3351dc44d67c
+#
+# author: deng
+# date  : 20230301
+
 import torch
 import math
 
-print('this ensures that the current MacOS version is at least 12.3+')
+print('This ensures that the current macOS version is at least 12.3+')
 print(torch.backends.mps.is_available())
 
-print('\nthis ensures that the current current PyTorch installation was built with MPS activated.')
+print('\nThis ensures that the current PyTorch installation was built with MPS activated.')
 print(torch.backends.mps.is_built())
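When both checks print True, tensors can be placed on the MPS device directly. A short sketch (runs only on Apple-silicon Macs):

```python
# mps_compute.py -- a sketch, not part of this repo.
import torch

device = torch.device('mps')
x = torch.randn(5, device=device)
w = torch.randn(5, device=device)
print((x * w).sum().item())  # computed on the GPU via Metal
```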
2 train.py
@@ -64,7 +64,7 @@ if __name__ == '__main__':
     optimizer = SGD(model.parameters(), lr=learning_rate)
 
     # set the tracking URI to the model registry
-    mlflow.set_tracking_uri('http://127.0.0.1:5000')
+    mlflow.set_tracking_uri('http://127.0.0.1:5001')
     mlflow.set_experiment('train_fortune_predict_model')
 
     # start a new MLflow run