commit c7af8b202205c05cac3b69d0d4c086a88cf66b6f Author: deng Date: Wed Mar 29 08:45:56 2023 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..496ee2c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store \ No newline at end of file diff --git a/.prefectignore b/.prefectignore new file mode 100644 index 0000000..786b18c --- /dev/null +++ b/.prefectignore @@ -0,0 +1,2 @@ +service +docker-compose.yaml \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d6eb066 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# Abstract + +This is a demo to use [Prefect 2.0](https://github.com/PrefectHQ/prefect) and [Minio](https://github.com/minio/minio) to build a service for data pipeline orchestration🎼 + +# Requirements + +* MacOS 12.5 +* Docker 20.10 + +# Dirs + +* **service** + * House Prefect server data and Minio storage +* **env** + * **prefect.yaml** + * conda env yaml to run this repo + +# Files + +* **docker-compose.yaml** + * a yaml to apply docker-compose to start data pipeline service with basic configuration (run ```docker-compose -f docker-compose.yaml up```) +* **build_flow.py** + * a script to create a small flow and deploy it to the data pipeline server + +###### tags: `Data pipeline` `Object Storage` `Data Engineering` \ No newline at end of file diff --git a/build_flow.py b/build_flow.py new file mode 100644 index 0000000..96a9984 --- /dev/null +++ b/build_flow.py @@ -0,0 +1,82 @@ +# build_flow.py +# +# author: deng +# date : 20230328 + +import time + +from prefect import flow +from prefect import task +from prefect import get_run_logger +from prefect.deployments import Deployment +from prefect.filesystems import RemoteFileSystem +from prefect.infrastructure import DockerContainer +from prefect.task_runners import ConcurrentTaskRunner + + +@task(name='stop_at_floor', + description='moving stage', + retries=0, + retry_delay_seconds=1) +def stop_at_floor(floor: int) -> None: + + logger = get_run_logger() + + logger.info(f'elevator moving to floor {floor}') + time.sleep(floor) + logger.info(f'elevator stops on floor {floor}') + + +@flow(name='elevator', + description='this is a cute elevator', + task_runner=ConcurrentTaskRunner(), + timeout_seconds=60) +def elevator(): + for floor in range(5, 0, -1): + stop_at_floor.submit(floor) + + +def build_deployment() -> None: + """ Deploy flow to docker-based Prefect server """ + + infra_block = DockerContainer( + image='test_prefect:20230328-1712', + image_pull_policy='IF_NOT_PRESENT', + auto_remove=True, + networks=['prefect'], + env={ + 'USE_SSL': False, + 'AWS_ACCESS_KEY_ID': 'root', + 'AWS_SECRET_ACCESS_KEY': 'minio_password', + 'ENDPOINT_URL': 'http://127.0.0.1:9000' + } + ) + + storage_block = RemoteFileSystem( + basepath='s3://prefect-deployment/test_prefect', + settings={ + 'use_ssl': False, + 'key': 'root', + 'secret': 'minio_password', + 'client_kwargs': {'endpoint_url': 'http://127.0.0.1:9000'} + } + ) + + infra_block.save('test-prefect-infra', overwrite=True) + storage_block.save('test-prefect-storage', overwrite=True) + + deployment = Deployment.build_from_flow( + flow=elevator, + name='test_prefect', + output='test_prefect-deployment.yaml', + tags=['test_prefect'], + work_queue_name='default', + parameters={}, + infrastructure=infra_block, + storage=storage_block + ) + + deployment.apply() + +if __name__ == '__main__': + build_deployment() \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..7bef743 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,71 @@ +version: '3.7' + +networks: + prefect: + name: prefect + +services: + minio: + image: minio/minio:latest + restart: always + environment: + MINIO_ROOT_USER: root + MINIO_ROOT_PASSWORD: minio_password + ports: + - 9000:9000 + - 9001:9001 + volumes: + - ~/python/test_prefect/service/minio:/data + command: + - server + - --console-address + - :9001 + - /data + networks: + - prefect + + prefect-server: + image: prefecthq/prefect:2.8.7-python3.10 + restart: always + ports: + - 4200:4200 + volumes: + - ~/python/test_prefect/service/prefect:/root/.prefect + healthcheck: + test: ["CMD", "curl", "-f", "http://0.0.0.0:4200"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 20s + command: bash -c "apt update && apt install -y curl && prefect orion start" + depends_on: + minio: + condition: service_started + environment: + PREFECT_DEBUG_MODE: 'True' + PREFECT_ORION_API_HOST: 0.0.0.0 + PREFECT_ORION_API_PORT: 4200 + PREFECT_API_URL: http://127.0.0.1:4200/api + networks: + - prefect + + prefect-agent: + image: prefecthq/prefect:2.8.7-python3.10 + restart: always + volumes: + - /var/run/docker.sock:/var/run/docker.sock + command: + - prefect + - agent + - start + - -q + - default + depends_on: + prefect-server: + condition: service_started + environment: + PREFECT_API_URL: http://prefect-server:4200/api + PREFECT_LOGGING_LEVEL: DEBUG + DOCKER_HOST: unix://var/run/docker.sock + networks: + - prefect \ No newline at end of file diff --git a/env/Dockerfile b/env/Dockerfile new file mode 100644 index 0000000..037b43a --- /dev/null +++ b/env/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.10-slim + +WORKDIR /prefect + +RUN pip install s3fs +RUN pip install prefect==2.8.7 \ No newline at end of file diff --git a/env/build_infra.sh b/env/build_infra.sh new file mode 100755 index 0000000..b5c49dd --- /dev/null +++ b/env/build_infra.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# build_infra.sh +# +# author: deng +# date : 20230328 + +buildtime=$(date +"%Y%m%d-%H%M") +docker build --no-cache -f Dockerfile -t test_prefect:${buildtime} . \ No newline at end of file diff --git a/service/minio/.minio.sys/buckets/.bloomcycle.bin/xl.meta b/service/minio/.minio.sys/buckets/.bloomcycle.bin/xl.meta new file mode 100644 index 0000000..d272ec2 Binary files /dev/null and b/service/minio/.minio.sys/buckets/.bloomcycle.bin/xl.meta differ diff --git a/service/minio/.minio.sys/buckets/.usage-cache.bin/xl.meta b/service/minio/.minio.sys/buckets/.usage-cache.bin/xl.meta new file mode 100644 index 0000000..a3e443f Binary files /dev/null and b/service/minio/.minio.sys/buckets/.usage-cache.bin/xl.meta differ diff --git a/service/minio/.minio.sys/buckets/.usage.json/xl.meta b/service/minio/.minio.sys/buckets/.usage.json/xl.meta new file mode 100644 index 0000000..6716534 Binary files /dev/null and b/service/minio/.minio.sys/buckets/.usage.json/xl.meta differ diff --git a/service/minio/.minio.sys/buckets/prefect-deployment/.metadata.bin/xl.meta b/service/minio/.minio.sys/buckets/prefect-deployment/.metadata.bin/xl.meta new file mode 100644 index 0000000..746691d Binary files /dev/null and b/service/minio/.minio.sys/buckets/prefect-deployment/.metadata.bin/xl.meta differ diff --git a/service/minio/.minio.sys/buckets/prefect-deployment/.usage-cache.bin/xl.meta b/service/minio/.minio.sys/buckets/prefect-deployment/.usage-cache.bin/xl.meta new file mode 100644 index 0000000..495bac0 Binary files /dev/null and b/service/minio/.minio.sys/buckets/prefect-deployment/.usage-cache.bin/xl.meta differ diff --git a/service/minio/.minio.sys/config/config.json/xl.meta b/service/minio/.minio.sys/config/config.json/xl.meta new file mode 100644 index 0000000..3cc42c4 Binary files /dev/null and b/service/minio/.minio.sys/config/config.json/xl.meta differ diff --git a/service/minio/.minio.sys/config/iam/format.json/xl.meta b/service/minio/.minio.sys/config/iam/format.json/xl.meta new file mode 100644 index 0000000..7a9b2aa Binary files /dev/null and b/service/minio/.minio.sys/config/iam/format.json/xl.meta differ diff --git a/service/minio/.minio.sys/config/iam/sts/WUYXZEOZV3475NY440O7/identity.json/xl.meta b/service/minio/.minio.sys/config/iam/sts/WUYXZEOZV3475NY440O7/identity.json/xl.meta new file mode 100644 index 0000000..265a86e Binary files /dev/null and b/service/minio/.minio.sys/config/iam/sts/WUYXZEOZV3475NY440O7/identity.json/xl.meta differ diff --git a/service/minio/.minio.sys/format.json b/service/minio/.minio.sys/format.json new file mode 100644 index 0000000..f3f8e9d --- /dev/null +++ b/service/minio/.minio.sys/format.json @@ -0,0 +1 @@ +{"version":"1","format":"xl-single","id":"d3e5292e-7b32-44b2-b4ec-545e359fc2db","xl":{"version":"3","this":"88e54d68-0e10-4257-825d-eab6afe6bf5c","sets":[["88e54d68-0e10-4257-825d-eab6afe6bf5c"]],"distributionAlgo":"SIPMOD+PARITY"}} \ No newline at end of file diff --git a/service/minio/.minio.sys/pool.bin/xl.meta b/service/minio/.minio.sys/pool.bin/xl.meta new file mode 100644 index 0000000..efbcd70 Binary files /dev/null and b/service/minio/.minio.sys/pool.bin/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/.gitignore/xl.meta b/service/minio/prefect-deployment/test_prefect/.gitignore/xl.meta new file mode 100644 index 0000000..03b01f6 Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/.gitignore/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/.prefectignore/xl.meta b/service/minio/prefect-deployment/test_prefect/.prefectignore/xl.meta new file mode 100644 index 0000000..f6c6456 Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/.prefectignore/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/README.md/xl.meta b/service/minio/prefect-deployment/test_prefect/README.md/xl.meta new file mode 100644 index 0000000..b5c8b5e Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/README.md/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/build_flow.py/xl.meta b/service/minio/prefect-deployment/test_prefect/build_flow.py/xl.meta new file mode 100644 index 0000000..9a039db Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/build_flow.py/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/env/Dockerfile/xl.meta b/service/minio/prefect-deployment/test_prefect/env/Dockerfile/xl.meta new file mode 100644 index 0000000..907e2df Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/env/Dockerfile/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/env/build_infra.sh/xl.meta b/service/minio/prefect-deployment/test_prefect/env/build_infra.sh/xl.meta new file mode 100644 index 0000000..1e82cc3 Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/env/build_infra.sh/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/test_asyncio.py/xl.meta b/service/minio/prefect-deployment/test_prefect/test_asyncio.py/xl.meta new file mode 100644 index 0000000..30fa69a Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/test_asyncio.py/xl.meta differ diff --git a/service/minio/prefect-deployment/test_prefect/test_prefect-deployment.yaml/xl.meta b/service/minio/prefect-deployment/test_prefect/test_prefect-deployment.yaml/xl.meta new file mode 100644 index 0000000..c97558c Binary files /dev/null and b/service/minio/prefect-deployment/test_prefect/test_prefect-deployment.yaml/xl.meta differ diff --git a/service/prefect/memo_store.toml b/service/prefect/memo_store.toml new file mode 100644 index 0000000..e87622b --- /dev/null +++ b/service/prefect/memo_store.toml @@ -0,0 +1 @@ +block_auto_registration = "0016c72a6d80786d0b6c25d0bf5fed0752a58b96c7b3643cd9c27c2b250c6a75" diff --git a/service/prefect/prefect.db b/service/prefect/prefect.db new file mode 100644 index 0000000..40eeb20 Binary files /dev/null and b/service/prefect/prefect.db differ diff --git a/test_asyncio.py b/test_asyncio.py new file mode 100644 index 0000000..56c09de --- /dev/null +++ b/test_asyncio.py @@ -0,0 +1,15 @@ +# test_asyncio.py +# +# author: deng +# date : 20230328 + +import asyncio + + +async def main(): + await asyncio.sleep(1) + print('hello') + + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/test_prefect-deployment.yaml b/test_prefect-deployment.yaml new file mode 100644 index 0000000..a79dc74 --- /dev/null +++ b/test_prefect-deployment.yaml @@ -0,0 +1,70 @@ +### +### A complete description of a Prefect Deployment for flow 'elevator' +### +name: test_prefect +description: this is a cute elevator +version: 44e21b51f98000011b5960b821f33b31 +# The work queue that will handle this deployment's runs +work_queue_name: default +work_pool_name: default-agent-pool +tags: +- test_prefect +parameters: {} +schedule: null +is_schedule_active: true +infra_overrides: {} + +### +### DO NOT EDIT BELOW THIS LINE +### +flow_name: elevator +manifest_path: null +infrastructure: + type: docker-container + env: + USE_SSL: 'False' + AWS_ACCESS_KEY_ID: root + AWS_SECRET_ACCESS_KEY: minio_password + ENDPOINT_URL: http://127.0.0.1:9000 + labels: {} + name: null + command: null + image: test_prefect:20230328-1712 + image_pull_policy: IF_NOT_PRESENT + image_registry: null + networks: + - prefect + network_mode: null + auto_remove: true + volumes: [] + stream_output: true + memswap_limit: null + mem_limit: null + privileged: false + _is_anonymous: false + _block_document_name: test-prefec-infra + _block_document_id: c0336aea-3d1e-4c77-b2fe-66e53263a1cc + block_type_slug: docker-container + _block_type_slug: docker-container +storage: + basepath: s3://prefect-deployment/test_prefect + settings: + use_ssl: false + key: root + secret: minio_password + client_kwargs: + endpoint_url: http://127.0.0.1:9000 + _is_anonymous: false + _block_document_name: test-prefect-storage + _block_document_id: 9d763296-4114-4a72-8fa7-c9c7c2464b9c + block_type_slug: remote-file-system + _block_type_slug: remote-file-system +path: null +entrypoint: build_flow.py:elevator +parameter_openapi_schema: + title: Parameters + type: object + properties: {} + required: null + definitions: null +timestamp: '2023-03-28T09:22:52.437835+00:00'