Note
Other than my experience and the documentation, the main resource behind this post and its figures is the fantastic book *Data Pipelines with Apache Airflow*.
- Airflow schedules the first execution of a DAG at the end of the interval, which means after the last time point in the interval has passed. For example, if we schedule it to run `@daily`, it will run at midnight of each day starting from `start_date` until (optionally) `end_date`. In other words, it runs as soon as `23:59:59` has passed, which means any time after `00:00:00`.
  - Example: if `start_date="2022-01-01"` and `schedule_interval="@daily"`, the first run happens soon after `2022-01-02 00:00`, which is midnight of January 2nd (see the sketch below).
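- A minimal sketch of such a DAG definition (the DAG id and task are hypothetical placeholders):
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical daily DAG starting on 2022-01-01.
dag = DAG(
    dag_id="daily_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
)

# The first run triggers shortly after 2022-01-02 00:00,
# covering the interval 2022-01-01 .. 2022-01-02.
say_hello = BashOperator(
    task_id="say_hello",
    bash_command="echo 'running for {{ds}}'",
    dag=dag,
)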
- We can use convenience strings (such as `@daily`), timedelta objects (such as `timedelta(days=3)`), or cron expressions (such as `0 0 * * 0`, which means weekly on Sunday at 00:00).
- Frequency scheduling intervals (shorthands):
  - `@once`: Schedule once and only once.
  - `@hourly`: Run once an hour at the beginning of the hour.
  - `@daily`: Run once a day at midnight.
  - `@weekly`: Run once a week at midnight on Sunday morning.
  - `@monthly`: Run once a month at midnight on the first day of the month.
  - `@yearly`: Run once a year at midnight on January 1.
- Cron-based intervals:
# ┌─────── minute (0 - 59)
# │ ┌────── hour (0 - 23)
# │ │ ┌───── day of the month (1 - 31)
# │ │ │ ┌───── month (1 - 12)
# │ │ │ │ ┌──── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │      7 is also Sunday on some systems)
# * * * * *
- "*" means don't care values.
- Examples:
1. `0 * * * *` means hourly (at the start of every hour)
2. `0 0 * * *` means daily at midnight
3. `0 0 * * 0` means weekly at midnight on Sunday
- Useful link to check meaning of cron-based intervals: https://crontab.guru/
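- A minimal sketch of passing a cron expression as the schedule interval (the DAG id is a hypothetical placeholder):
from datetime import datetime

from airflow import DAG

# Hypothetical weekly DAG: runs at midnight on Sundays.
dag = DAG(
    dag_id="weekly_report",
    start_date=datetime(2022, 1, 1),
    schedule_interval="0 0 * * 0",  # equivalent to "@weekly"
)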
- Cron expressions have limitations when trying to specify frequency-based intervals such as "every three days". The reason is that cron expressions are stateless: they don't look at previous runs to determine the next run, they only check whether the current time matches the expression.
- Airflow allows us to use frequency-based intervals via `timedelta` objects from the `datetime` module. This way, the previous run can be used to determine the next run.
  - Example: `schedule_interval=timedelta(days=3)` means run every 3 days after `start_date` (see the sketch below).
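- A minimal sketch of a frequency-based schedule (the DAG id is a hypothetical placeholder):
from datetime import datetime, timedelta

from airflow import DAG

# Hypothetical DAG that runs every three days, counted from start_date.
dag = DAG(
    dag_id="every_three_days",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=3),
)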
- We can use dynamic time references based on execution dates, which allow us to do the work incrementally. Airflow passes those dates to the tasks so they can determine which schedule interval is being executed:
  - `execution_date` is a timestamp of the start time of the schedule interval.
  - `next_execution_date` is a timestamp of the end time of the schedule interval.
  - `previous_execution_date` is a timestamp of the start time of the previous schedule interval.
- Airflow uses Jinja-based templating such as `{{variable_name}}`:
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data && "
        "curl -o /data/events.json "
        "http://localhost:5000/events?"
        "start_date={{execution_date.strftime('%Y-%m-%d')}}"
        "&end_date={{next_execution_date.strftime('%Y-%m-%d')}}"
    ),
    dag=dag,
)
- Or we can use shorthands:
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data && "
        "curl -o /data/events.json "
        "http://localhost:5000/events?"
        "start_date={{ds}}"
        "&end_date={{next_ds}}"
    ),
    dag=dag,
)
- `ds` has `YYYY-MM-DD` format while `ds_nodash` has `YYYYMMDD` format
- Shorthands: `ds` and `ds_nodash` for the execution date, `next_ds` and `next_ds_nodash` for the execution date of the next interval, and `prev_ds` and `prev_ds_nodash` for the execution date of the previous interval.
- We can also pass dates or any dynamic parameters to a Python function using the `templates_dict` argument; the python callable receives a context that contains `templates_dict`. For example:
def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json",
        "output_path": "/data/stats/{{ds}}.csv",
    },
    dag=dag,
)
- Because Airflow follows interval-based scheduling, DAGs run only after the last time point of the schedule interval has passed. If we run the DAG daily starting from `2022-01-01`, the first run happens soon after `2022-01-02 00:00:00` has passed, and the `execution_date` would be `2022-01-01` even though it is running on `2022-01-02`. This is because it is running for the corresponding interval.
  - In other words, `execution_date` marks the end of the previous interval.
- One caveat: `previous_execution_date` and `next_execution_date` are only defined for DAG runs that are triggered on the schedule interval. This means those values are undefined when DAG runs are triggered from the UI or CLI.
- Airflow allows us to have a `start_date` in the past, which helps with backfilling. By default, once the DAG is activated, Airflow will run all schedule intervals from the past up to the current time. We can control this behavior with the `catchup` parameter of the `DAG()` class; if we set it to `False`, previous schedule intervals won't be run (see the sketch below).
  - Backfilling is also helpful if we change the DAG's code: after we clear previous runs, Airflow will rerun all of them with the new code.
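- A minimal sketch of disabling catch-up (the DAG id is a hypothetical placeholder):
from datetime import datetime

from airflow import DAG

# Hypothetical DAG with a start_date in the past. With catchup=False,
# Airflow does not backfill the missed intervals when the DAG is activated;
# it only schedules runs from the most recent interval onward.
dag = DAG(
    dag_id="no_catchup_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)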
Best Practices:
- Tasks need to be atomic, which means each task is a single coherent unit of work. This allows us to split work into smaller units so that if one fails we know exactly what it is and can recover easily.
- Tasks need to be idempotent, which means rerunning a task has no side effects on the system. If the task is given the same input, it should produce the same output.
  - In database systems, we can use upserts, which allow us to overwrite existing rows.
  - When writing to files, make sure that rerunning the same task for the same interval doesn't write the data again; appending does not make the task idempotent (see the sketch below).
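- A sketch of the file case, assuming a per-interval output path templated with `ds` (the helper name and paths are hypothetical): writing the whole file in overwrite mode makes reruns for the same interval produce the same result.
import json
from pathlib import Path


def _export_events(events, templates_dict):
    """Hypothetical helper: write one interval's events to its own file."""
    # e.g. templates_dict["output_path"] = "/data/events/{{ds}}.json" after templating
    output_path = Path(templates_dict["output_path"])
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Overwriting ("w") instead of appending ("a") keeps the task idempotent:
    # rerunning it for the same interval yields the same file contents.
    with open(output_path, "w") as f:
        json.dump(events, f)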