94 lines
2.4 KiB
Python
94 lines
2.4 KiB
Python
# build_flow.py
|
|
#
|
|
# author: deng
|
|
# date : 20230328
|
|
|
|
import time
|
|
|
|
from prefect import flow
|
|
from prefect import task
|
|
from prefect import get_run_logger
|
|
from prefect.deployments import Deployment
|
|
from prefect.filesystems import RemoteFileSystem
|
|
from prefect.infrastructure import DockerContainer
|
|
from prefect.task_runners import ConcurrentTaskRunner
|
|
|
|
|
|
@task(
    name='stop_at_floor',
    description='moving stage',
    retries=0,
    retry_delay_seconds=1,
)
def stop_at_floor(floor: int) -> None:
    """Simulate the elevator travelling to ``floor`` and stopping there.

    Logs one message before and one after blocking for ``floor`` seconds.

    Args:
        floor: Target floor number; it is also used directly as the
            sleep duration in seconds.
    """
    run_log = get_run_logger()

    run_log.info(f'elevator moving to floor {floor}')
    time.sleep(floor)  # the floor number doubles as the travel time in seconds
    run_log.info(f'elevator stops on floor {floor}')
|
|
|
|
|
|
@flow(name='elevator',
      description='this is a cute elevator',
      task_runner=ConcurrentTaskRunner(),
      timeout_seconds=60)
def elevator():
    """Run the elevator sample: visit floors 5 down to 1 as concurrent tasks.

    This is a small sample from the official docs on running tasks
    concurrently.
    ref: https://docs.prefect.io/latest/concepts/task-runners/

    Each floor is submitted to the flow's task runner, and the flow then
    waits for every submitted task before logging 'Elevator stop.', so the
    final message is emitted after the floor tasks have finished rather
    than right after they were queued.
    """
    logger = get_run_logger()
    logger.info('Elevator start.')

    # Submit all floors first so they can run concurrently; keep the
    # returned futures so completion can be awaited below.
    futures = [stop_at_floor.submit(floor) for floor in range(5, 0, -1)]

    # wait() blocks until the task run reaches a final state and does not
    # raise on task failure, so the flow's own state handling is unchanged.
    for future in futures:
        future.wait()

    logger.info('Elevator stop.')
|
|
|
|
|
|
def build_deployment() -> None:
    """Deploy the ``elevator`` flow to a docker-based Prefect server.

    Steps, in order:
      1. Build a ``DockerContainer`` infrastructure block.
      2. Build a ``RemoteFileSystem`` storage block on an ``s3://`` base path.
      3. Save both blocks (overwriting any existing blocks of the same name).
      4. Build a deployment from the flow, which also writes
         'test_prefect-deployment.yaml', and apply it.
    """
    # Environment variables handed to the flow-run container.
    # NOTE(review): the container uses 172.28.0.2:9000 while the storage
    # block below uses localhost:9000 -- presumably the same object store
    # seen from inside vs. outside the 'prefect' Docker network; confirm.
    container_env = {
        'USE_SSL': False,
        'AWS_ACCESS_KEY_ID': 'root',
        'AWS_SECRET_ACCESS_KEY': 'minio_password',
        'ENDPOINT_URL': 'http://172.28.0.2:9000',
    }
    docker_infra = DockerContainer(
        image='test_prefect:20230328-1712',
        image_pull_policy='IF_NOT_PRESENT',
        auto_remove=True,
        networks=['prefect'],
        env=container_env,
    )

    # Connection settings used by this process to reach the remote storage.
    storage_settings = {
        'use_ssl': False,
        'key': 'root',
        'secret': 'minio_password',
        'client_kwargs': {'endpoint_url': 'http://localhost:9000'},
    }
    remote_storage = RemoteFileSystem(
        basepath='s3://prefect-deployment/test_prefect',
        settings=storage_settings,
    )

    # Persist both blocks before the deployment references them.
    docker_infra.save('test-prefect-infra', overwrite=True)
    remote_storage.save('test-prefect-storage', overwrite=True)

    deployment_options = {
        'flow': elevator,
        'name': 'test_prefect',
        'output': 'test_prefect-deployment.yaml',
        'tags': ['test_prefect'],
        'work_queue_name': 'default',
        'parameters': {},
        'infrastructure': docker_infra,
        'storage': remote_storage,
    }
    Deployment.build_from_flow(**deployment_options).apply()
|
|
|
|
|
|
# Script entry point: build and apply the deployment only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    build_deployment()