test_prefect/build_flow.py

94 lines
2.4 KiB
Python

# build_flow.py
#
# author: deng
# date : 20230328
import os
import time

from prefect import flow
from prefect import task
from prefect import get_run_logger
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem
from prefect.infrastructure import DockerContainer
from prefect.task_runners import ConcurrentTaskRunner
@task(
    name='stop_at_floor',
    description='moving stage',
    retries=0,
    retry_delay_seconds=1,
)
def stop_at_floor(floor: int) -> None:
    """ Simulate the elevator travelling to one floor and stopping there

    Logs a "moving" message, sleeps for ``floor`` seconds, then logs a
    "stops" message.

    Args:
        floor: Floor number; it is also used as the sleep time in seconds.
    """
    run_log = get_run_logger()

    run_log.info(f'elevator moving to floor {floor}')
    # The travel time is simulated: the sleep duration equals the floor number.
    time.sleep(floor)
    run_log.info(f'elevator stops on floor {floor}')
@flow(name='elevator',
      description='this is a cute elevator',
      task_runner=ConcurrentTaskRunner(),
      timeout_seconds=60)
def elevator():
    """ Run one ``stop_at_floor`` task per floor, concurrently

    This is a small sample from official doc to run tasks concurrently
    ref: https://docs.prefect.io/latest/concepts/task-runners/

    A task is submitted for each floor from 5 down to 1. The flow then waits
    for every submitted task to finish before logging 'Elevator stop.', so
    that message marks the end of the run and not merely the end of task
    submission.
    """
    logger = get_run_logger()
    logger.info('Elevator start.')

    # submit() hands the task to the task runner and returns a future
    # right away, so all floors are dispatched before any of them finishes.
    futures = [stop_at_floor.submit(floor) for floor in range(5, 0, -1)]

    # Block until each task run has reached a final state; without this the
    # closing log line would be emitted while floors are still being served.
    for future in futures:
        future.wait()

    logger.info('Elevator stop.')
def build_deployment() -> None:
    """ Deploy flow to docker-based Prefect server

    Saves a DockerContainer infrastructure block ('test-prefect-infra') and a
    RemoteFileSystem storage block ('test-prefect-storage'), overwriting any
    existing blocks with those names, then builds a deployment of the
    ``elevator`` flow that uses both blocks and applies it.

    The object-store credentials are taken from the MINIO_ROOT_USER and
    MINIO_ROOT_PASSWORD environment variables. When they are unset, the
    sample values 'root' / 'minio_password' are used, which is the previous
    hard-coded behaviour.
    """
    # Resolve the credentials once: both blocks must agree on them, and real
    # secrets can then be supplied from the environment instead of the source.
    access_key = os.environ.get('MINIO_ROOT_USER', 'root')
    secret_key = os.environ.get('MINIO_ROOT_PASSWORD', 'minio_password')

    # NOTE(review): the container env points at 172.28.0.2 while the storage
    # block points at localhost -- presumably the same object store seen from
    # inside the 'prefect' Docker network vs. from the host; confirm.
    infra_block = DockerContainer(
        image='test_prefect:20230328-1712',
        image_pull_policy='IF_NOT_PRESENT',
        auto_remove=True,
        networks=['prefect'],
        env={
            # Environment variable values are strings; spell the value out
            # instead of relying on a bool being coerced to 'False'.
            'USE_SSL': 'False',
            'AWS_ACCESS_KEY_ID': access_key,
            'AWS_SECRET_ACCESS_KEY': secret_key,
            'ENDPOINT_URL': 'http://172.28.0.2:9000',
        },
    )
    storage_block = RemoteFileSystem(
        basepath='s3://prefect-deployment/test_prefect',
        settings={
            'use_ssl': False,
            'key': access_key,
            'secret': secret_key,
            'client_kwargs': {'endpoint_url': 'http://localhost:9000'},
        },
    )

    # Persist the blocks under fixed names, replacing earlier versions.
    infra_block.save('test-prefect-infra', overwrite=True)
    storage_block.save('test-prefect-storage', overwrite=True)

    deployment = Deployment.build_from_flow(
        flow=elevator,
        name='test_prefect',
        output='test_prefect-deployment.yaml',
        tags=['test_prefect'],
        work_queue_name='default',
        parameters={},
        infrastructure=infra_block,
        storage=storage_block,
    )
    deployment.apply()
# Script entry point: build and apply the deployment only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    build_deployment()