Airflow Part 8 - Best Practices

Data Engineering
MLOps
Airflow
Author

Imad Dabbura

Published

March 28, 2022

Note

Other than my own experience and the official documentation, the main resource behind this post and its figures is the fantastic book: Data Pipelines with Apache Airflow.

# Equivalent ways of declaring that task1 runs before task2;
# pick one style and use it consistently across your DAGs.
task1 >> task2
task2 << task1
[task1] >> task2
task1.set_downstream(task2)
task2.set_upstream(task1)
import os

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator


def generate_tasks(dataset_name, raw_dir, processed_dir, preprocess_script, output_dir, dag):
    # Each dataset gets its own raw/processed/output path.
    raw_path = os.path.join(raw_dir, dataset_name, "{ds_nodash}.json")
    processed_path = os.path.join(processed_dir, dataset_name, "{ds_nodash}.json")
    output_path = os.path.join(output_dir, dataset_name, "{ds_nodash}.json")

    fetch_task = BashOperator(
        task_id=f"fetch_{dataset_name}",
        bash_command=f"echo 'curl http://example.com/{dataset_name}.json > {raw_path}.json'",
        dag=dag,
    )
    preprocess_task = BashOperator(
        task_id=f"preprocess_{dataset_name}",
        bash_command=f"echo '{preprocess_script} {raw_path} {processed_path}'",
        dag=dag,
    )
    export_task = BashOperator(
        task_id=f"export_{dataset_name}",
        bash_command=f"echo 'cp {processed_path} {output_path}'",
        dag=dag,
    )
    fetch_task >> preprocess_task >> export_task

    # Return the first and last task of the chain so callers can wire
    # other tasks before/after the generated set.
    return fetch_task, export_task

with DAG(
    dag_id="01_task_factory",
    start_date=airflow.utils.dates.days_ago(5),
    schedule_interval="@daily",
) as dag:
    for dataset in ["sales", "customers"]:
        generate_tasks(
            dataset_name=dataset,
            raw_dir="/data/raw",
            processed_dir="/data/processed",
            output_dir="/data/output",
            preprocess_script=f"preprocess_{dataset}.py",
            dag=dag,
        )
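
Because generate_tasks returns the first and last task of each generated chain, the caller can also attach dependencies around the generated tasks. Below is a minimal sketch of that pattern reusing the factory above; the notify task and the 02_task_factory_chained DAG id are illustrative, not from the original post.

with DAG(
    dag_id="02_task_factory_chained",
    start_date=airflow.utils.dates.days_ago(5),
    schedule_interval="@daily",
) as dag:
    # Hypothetical downstream task that should only run once every
    # dataset has been exported.
    notify = BashOperator(
        task_id="notify",
        bash_command="echo 'all exports finished'",
        dag=dag,
    )
    for dataset in ["sales", "customers"]:
        fetch, export = generate_tasks(
            dataset_name=dataset,
            raw_dir="/data/raw",
            processed_dir="/data/processed",
            output_dir="/data/output",
            preprocess_script=f"preprocess_{dataset}.py",
            dag=dag,
        )
        # The returned export task becomes an upstream of notify.
        export >> notify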