Airflow Part 3 - DAG Scheduling

Data Engineering
MLOps
Airflow
Author

Imad Dabbura

Published

January 24, 2022

Note

Other than my experience and the documentation, the main resource behind this post and figures is the fantastic book: Data Pipelines with Apache Airflow.

  1. Airflow schedules the first execution of a DAG at the end of the schedule interval, i.e., after the last time point in the interval has passed. For example, if we schedule a DAG to run @daily, it runs at midnight of each day, starting from start_date until (optionally) end_date; in other words, as soon as 23:59:59 has passed, which means any time after 00:00:00.
    • Example: if start_date="2022-01-01" and schedule_interval="@daily" -> the first run happens soon after "2022-01-02 00:00", which is midnight of January 2nd (see the sketch below).
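- A minimal sketch of such a DAG (the dag_id and the placeholder task are made up for illustration; imports assume Airflow 2.x):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="daily_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",  # first run lands soon after 2022-01-02 00:00
)

# Placeholder task so the DAG has something to execute.
hello = BashOperator(task_id="hello", bash_command="echo hello", dag=dag)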
  2. We can use convenience strings (such as @daily), timedelta objects (such as timedelta(days=3)), or cron expressions (such as 0 0 * * 0, which means weekly on Sunday at 00:00).
  3. Frequency scheduling intervals (shorthands):
    • @once: Schedule once and only once.
    • @hourly: Run once an hour at the beginning of the hour.
    • @daily: Run once a day at midnight.
    • @weekly: Run once a week at midnight on Sunday morning.
    • @monthly: Run once a month at midnight on the first day of the month.
    • @yearly: Run once a year at midnight on January 1.
  4. Cron-based intervals:
# ┌────────── minute (0 - 59)
# │ ┌──────── hour (0 - 23)
# │ │ ┌────── day of the month (1 - 31)
# │ │ │ ┌──── month (1 - 12)
# │ │ │ │ ┌── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │   7 is also Sunday on some systems)
# * * * * *
- "*" means don't care values.
- Examples:
    1. 0 * * * * means hourly (at the start of every hour)
    2. 0 0 * * * means daily at midnight
    3. 0 0 * * 0 means weekly at midnight on Sunday
- Useful link to check meaning of cron-based intervals: https://crontab.guru/
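- A cron expression plugs into schedule_interval just like a shorthand; a sketch (dag_id invented for illustration):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="weekly_report",
    start_date=datetime(2022, 1, 1),
    schedule_interval="0 0 * * 0",  # weekly at midnight on Sunday; same as "@weekly"
)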
  5. Cron expressions have limitations when trying to specify frequency-based intervals such as "every three days". The reason is that cron expressions are stateless: they don't look at previous runs to determine the next run, only at whether the current time matches the expression.
  6. Airflow allows us to use frequency-based intervals via timedelta objects from the datetime standard library. This way, the previous run determines the next run.
    • Example: schedule_interval=timedelta(days=3) means run every three days after start_date (see the sketch below).
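- A sketch of such a frequency-based schedule (dag_id invented for illustration):

from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    dag_id="every_three_days",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=3),  # a timedelta object, not a string
)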
  7. We can use dynamic time references based on the execution dates, which allow us to do the work incrementally. Airflow passes those dates to the tasks to determine which schedule interval is being executed.
    • execution_date is a timestamp of the start time of the schedule interval
    • next_execution_date is a timestamp of the end time of the schedule interval
    • previous_execution_date is a timestamp of the start time of the previous schedule interval
    • Airflow uses Jinja-based templating such as {{ variable_name }}:
from airflow.operators.bash import BashOperator

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data && "
        "curl -o /data/events.json "
        "http://localhost:5000/events?"
        "start_date={{ execution_date.strftime('%Y-%m-%d') }}"
        "&end_date={{ next_execution_date.strftime('%Y-%m-%d') }}"
    ),
    dag=dag,
)
- Or we can use shorthands:
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data && "
        "curl -o /data/events.json "
        "http://localhost:5000/events?"
        "start_date={{ ds }}"
        "&end_date={{ next_ds }}"
    ),
    dag=dag,
)
- `ds` has `YYYY-MM-DD` format while `ds_nodash` has `YYYYMMDD` format
- Shorthands: ds, ds_nodash, next_ds, next_ds_nodash, prev_ds, prev_ds_nodash. The next_* variants hold the execution date of the next interval, and the prev_* variants that of the previous interval.
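- For instance, for the daily interval starting on 2022-01-01, these templates render as:

# ds         -> "2022-01-01"   (start of the current interval)
# ds_nodash  -> "20220101"
# next_ds    -> "2022-01-02"   (start of the next interval)
# prev_ds    -> "2021-12-31"   (start of the previous interval)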
  8. We can also pass dates or any dynamic parameters to a Python function using the templates_dict argument; the python callable is passed the context that contains templates_dict. For example:
from airflow.operators.python import PythonOperator


def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ ds }}.json",
        "output_path": "/data/stats/{{ ds }}.csv",
    },
    dag=dag,
)
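- Airflow renders the values inside templates_dict with Jinja before calling the function, so by the time _calculate_stats runs, input_path and output_path contain the concrete, date-stamped paths for the current interval.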
  9. Because Airflow follows interval-based scheduling, DAGs run only after the last time point of the schedule interval has passed. If we run the DAG daily starting from 2022-01-01, the first run happens soon after 2022-01-02 00:00:00 has passed, yet the execution_date would be 2022-01-01 even though it is running on 2022-01-02, because the run corresponds to that interval.
    • execution_date marks the start of the current interval, i.e., the end of the previous interval.
    • One caveat: previous_execution_date and next_execution_date are only defined for DAGs that run on a schedule interval. They are undefined when DAGs are triggered manually from the UI or CLI.
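- As a rough timeline for the daily example above:

    interval being processed:   2022-01-01 00:00 ─────────► 2022-01-02 00:00
    actual run time:            shortly after 2022-01-02 00:00
    execution_date:             2022-01-01 00:00 (start of that interval)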
  10. Airflow allows start_date to be in the past, which helps with backfilling. By default, once the DAG is activated, Airflow runs all the schedule intervals from the past until the current time. We can control this behavior with the catchup parameter of the DAG() class; if we set it to False, Airflow won't run previous schedule intervals.
    • Backfilling is also helpful if we change the code for the DAG: after we clear previous runs, Airflow re-runs all of them with the new code (see the sketch below).
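- A sketch of turning catchup off, plus the Airflow 2 CLI command for backfilling an explicit date range (dag_id and dates are illustrative):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="no_catchup_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,  # don't run past intervals; schedule only from activation onward
)

# Backfill a specific range from the command line:
# airflow dags backfill --start-date 2022-01-01 --end-date 2022-01-10 no_catchup_example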

Best Practices: