Airflow Part 5 - Dependencies Between Tasks

Data Engineering
MLOps
Airflow
Author

Imad Dabbura

Published

February 14, 2022

Note

Other than my own experience and the official documentation, the main resource behind this post and its figures is the fantastic book Data Pipelines with Apache Airflow.

The following task skips all downstream tasks for any DAG run that is not the most recent one:

import pendulum

from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator


def _latest_only(**context):
    # execution_date marks the start of the schedule interval, so
    # following_schedule(execution_date) is the end of the current interval
    # (i.e. the next execution_date).
    left_window = context["dag"].following_schedule(context["execution_date"])
    right_window = context["dag"].following_schedule(left_window)
    now = pendulum.now("UTC")
    # A DAG run only starts after its schedule interval has passed, so "now"
    # falls inside (left_window, right_window] only for the most recent run;
    # older (e.g. backfilled) runs are skipped.
    if not left_window < now <= right_window:
        raise AirflowSkipException("Not the most recent run!")

latest_only = PythonOperator(task_id="latest_only", python_callable=_latest_only, dag=dag)
latest_only >> deploy_model
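
As an aside, Airflow also ships a built-in LatestOnlyOperator that implements the same "run only for the most recent interval" behavior, so the custom callable above is mainly useful for understanding how the skip mechanism works. A minimal sketch, assuming the same dag object and deploy_model task as above:

from airflow.operators.latest_only import LatestOnlyOperator

# Skips all downstream tasks whenever the current run is not the latest one
latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
latest_only >> deploy_model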