# build_flow.py
#
# author: deng
# date  : 20230328

import time

from prefect import flow
from prefect import task
from prefect import get_run_logger
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem
from prefect.infrastructure import DockerContainer
from prefect.task_runners import ConcurrentTaskRunner


@task(name='stop_at_floor',
      description='moving stage',
      retries=0,
      retry_delay_seconds=1)
def stop_at_floor(floor: int) -> None:
    """Log a move to ``floor``, sleep for ``floor`` seconds, then log the stop.

    Args:
        floor: Target floor number; also used as the sleep duration in seconds.
    """
    log = get_run_logger()
    log.info(f'elevator moving to floor {floor}')
    # The travel time is simulated: the sleep length equals the floor number.
    time.sleep(floor)
    log.info(f'elevator stops on floor {floor}')


@flow(name='elevator',
      description='this is a cute elevator',
      task_runner=ConcurrentTaskRunner(),
      timeout_seconds=60)
def elevator():
    """Submit one ``stop_at_floor`` task for each floor from 5 down to 1.

    Tasks are handed to the flow's ConcurrentTaskRunner via ``submit``; the
    loop does not wait on one submission before issuing the next.
    """
    for level in reversed(range(1, 6)):
        stop_at_floor.submit(level)


def build_deployment() -> None:
    """Register the ``elevator`` flow as a Docker-based Prefect deployment.

    Builds a DockerContainer infrastructure block and a RemoteFileSystem
    storage block, saves both under fixed block names (overwriting any
    existing ones), then builds a deployment from the flow and applies it.
    """
    # Object-store connection values shared by the container env and the
    # storage block settings.
    # NOTE(review): credentials are hard-coded here — confirm this file is
    # only used against a local test setup.
    access_key = 'root'
    secret_key = 'minio_password'
    # NOTE(review): inside the flow-run container 127.0.0.1 presumably refers
    # to the container itself — verify the object store is reachable there.
    endpoint = 'http://127.0.0.1:9000'

    container_env = {
        'USE_SSL': False,
        'AWS_ACCESS_KEY_ID': access_key,
        'AWS_SECRET_ACCESS_KEY': secret_key,
        'ENDPOINT_URL': endpoint
    }
    docker_infra = DockerContainer(
        image='test_prefect:20230328-1712',
        image_pull_policy='IF_NOT_PRESENT',
        auto_remove=True,
        networks=['prefect'],
        env=container_env
    )

    storage_settings = {
        'use_ssl': False,
        'key': access_key,
        'secret': secret_key,
        'client_kwargs': {'endpoint_url': endpoint}
    }
    remote_storage = RemoteFileSystem(
        basepath='s3://prefect-deployment/test_prefect',
        settings=storage_settings
    )

    # Persist both blocks under stable names before wiring them into the
    # deployment; overwrite=True replaces blocks saved by earlier runs.
    docker_infra.save('test-prefect-infra', overwrite=True)
    remote_storage.save('test-prefect-storage', overwrite=True)

    flow_deployment = Deployment.build_from_flow(
        flow=elevator,
        name='test_prefect',
        output='test_prefect-deployment.yaml',
        tags=['test_prefect'],
        work_queue_name='default',
        parameters={},
        infrastructure=docker_infra,
        storage=remote_storage
    )
    flow_deployment.apply()


if __name__ == '__main__':
    build_deployment()