Airflow Part 5 - Dependencies Between Tasks

Data Engineering

Imad Dabbura


February 14, 2022


Other than my experience and the documentation, the main resource behind this post and its figures is the fantastic book: Data Pipelines with Apache Airflow.

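import pendulum
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

# `dag` and the downstream `deploy_model` task are defined elsewhere in the DAG file.


def _latest_only(**context):
    # execution_date marks the start of this run's schedule interval, so
    # following_schedule(execution_date) is the end of that interval
    # (i.e., the next execution_date).
    left_window = context["dag"].following_schedule(context["execution_date"])
    right_window = context["dag"].following_schedule(left_window)
    now = pendulum.now("UTC")
    # A run is triggered only after its interval ends (at left_window), so for
    # the most recent run "now" falls within the next interval. Older
    # (e.g. backfilled) runs fail this check and the task is skipped.
    if not left_window < now <= right_window:
        raise AirflowSkipException("Not the most recent run!")


latest_only = PythonOperator(
    task_id="latest_only",
    python_callable=_latest_only,
    dag=dag,
)
# With the default all_success trigger rule, skipping latest_only
# also skips deploy_model.
latest_only >> deploy_model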
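To see the window check in isolation, here is a minimal pure-Python sketch that runs without Airflow. It assumes a fixed-length schedule (e.g. daily), so adding a `timedelta` stands in for `dag.following_schedule()`; the helper name `is_latest_run` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def is_latest_run(execution_date: datetime, now: datetime, interval: timedelta) -> bool:
    """Hypothetical helper mimicking the window check in _latest_only.

    Assumes a fixed-length schedule: adding `interval` plays the role of
    dag.following_schedule().
    """
    left_window = execution_date + interval  # end of this run's interval
    right_window = left_window + interval    # end of the following interval
    return left_window < now <= right_window


daily = timedelta(days=1)
now = datetime(2022, 2, 14, 12, 0, tzinfo=timezone.utc)

# Run covering 2022-02-13, triggered at midnight on 2022-02-14: most recent run.
print(is_latest_run(datetime(2022, 2, 13, tzinfo=timezone.utc), now, daily))  # True
# Backfilled run for 2022-02-01: "now" lies far past its window, so it is skipped.
print(is_latest_run(datetime(2022, 2, 1, tzinfo=timezone.utc), now, daily))   # False
```

Only the run whose interval closed most recently passes the check; every earlier run sees `now` beyond its `right_window`.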