Note
Other than my experience and the documentation, the main resource behind this post and its figures is the fantastic book *Data Pipelines with Apache Airflow*.
- Workflows are most commonly triggered based on schedule intervals provided using `start_date`, `end_date`, and `schedule_interval` (see the sketch just below). Airflow calculates when the next scheduled run should be and starts the first task(s) at that date/time.
- However, sometimes we want the workflow to run based on the occurrence of external events, such as a file becoming available in a specific location or code being changed in a git repo.
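As a minimal sketch of the schedule-based case (the `dag_id`, dates, and cron expression below are made up for illustration):

```python
import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Runs daily at 16:00 for schedule intervals between start_date and end_date.
dag = DAG(
    dag_id="example_scheduled_dag",            # hypothetical id
    start_date=datetime.datetime(2021, 1, 1),  # hypothetical dates
    end_date=datetime.datetime(2021, 6, 1),
    schedule_interval="0 16 * * *",
)

start = DummyOperator(task_id="start", dag=dag)
```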
- One way to execute workflows based on the occurrence of external events is to use Airflow's sensors. A sensor is a subclass of operator that checks whether a certain condition is true. If true, the step (and the rest of the workflow) executes. If false, the sensor waits for a given period (default 60 seconds) and tries again, and it keeps doing so until the timeout period is reached. This is a form of poking; in the case of `FileSensor`, poking means checking for the existence of a file.
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file_1 = FileSensor(
    task_id="wait_for_file_1", filepath="/data/file_1.csv"
)
```
- We can also use globbing with `FileSensor` by using wildcards to check for the existence of file(s) matching a pattern, as in the sketch below.
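A sketch of a globbing `FileSensor` (the directory and pattern are hypothetical):

```python
from airflow.sensors.filesystem import FileSensor

# Succeeds once at least one file matching the wildcard pattern exists.
wait_for_any_csv = FileSensor(
    task_id="wait_for_any_csv",
    filepath="/data/supermarket_1/data-*.csv",  # wildcard (globbing) pattern
)
```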
- We can also use `PythonSensor`, which checks for a certain condition and must return a Boolean. It is more flexible and easier to read than using globbing within `FileSensor`, and it is the same as `PythonOperator` in that it takes a Python callable.
```python
from pathlib import Path

from airflow.sensors.python import PythonSensor


# Check whether there is any data for a given supermarket
# and whether the _SUCCESS file exists, which indicates that the
# data for the given supermarket has all been uploaded
def _wait_for_supermarket(supermarket):
    supermarket_path = Path("/data") / supermarket
    success_path = supermarket_path / "_SUCCESS"
    data_files = supermarket_path.glob("*.csv")
    return data_files and success_path.exists()


wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    op_kwargs={"supermarket": "supermarket_1"},
    dag=dag,
)
```
- All sensors take a `timeout` argument, which has a default value of 7 days.
- There is also a limit on the number of tasks Airflow can run concurrently per DAG (the default is 16). The DAG takes a `concurrency` argument that can change this number, as shown below. There are also limits on the number of tasks that can run concurrently across the whole Airflow installation and on the number of running DAG runs per DAG.
```python
dag = DAG(
    dag_id="ingest_supermarket_data",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
    concurrency=20,  # Default is 16
)
```
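As a hedged sketch, the `timeout` (and the poke interval mentioned earlier) can also be set explicitly on a sensor; the values below are made up, and `dag` refers to a DAG object like the one above:

```python
from airflow.sensors.filesystem import FileSensor

wait_for_file_1 = FileSensor(
    task_id="wait_for_file_1",
    filepath="/data/file_1.csv",
    poke_interval=120,     # poke every 2 minutes instead of the default 60 seconds
    timeout=24 * 60 * 60,  # give up after 1 day instead of the default 7 days
    dag=dag,               # assumes the DAG defined above
)
```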
- There is a snowball effect when sensors don't succeed. They occupy the slots the DAG has (which is determined by the `concurrency` argument). From the above figure, if only task 1 succeeds while the rest keep polling, and the DAG is scheduled daily with the default concurrency of 16 slots and the default timeout of 7 days, this is what will happen (sensor deadlock):
- Day 1: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 3 tasks.
- Day 2: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 6 tasks.
- Day 3: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 9 tasks.
- Day 4: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 12 tasks.
- Day 5: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 15 tasks.
- Day 6: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 16 tasks; two new tasks cannot run, and any other task trying to run is blocked.
- This also counts toward the global Airflow limit on the maximum number of tasks that can run concurrently, which may lead to the whole system getting stalled.
- A sensor task pokes to check its condition and blocks if the condition is false. So it runs for a little bit and waits for the most part. It keeps poking until the timeout period is completed, which means it keeps occupying its slot until the condition becomes true or the timeout is reached.
- Sensors take a `mode` argument which has two values: {`poke`, `reschedule`}. The default is `poke`. `reschedule` can solve the sensor deadlock and snowball effect because it releases the slot the sensor task is occupying after each poke. In other words, the sensor task pokes, and if the condition is false, the system reschedules it and frees its slot, making it available to other tasks. It is the same concept as the process scheduling an OS does when a process makes a blocking system call.
```python
wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    op_kwargs={"supermarket": "supermarket_1"},
    mode="reschedule",
    dag=dag,
)
```
- We can trigger one DAG from inside another DAG using `TriggerDagRunOperator`. This causes the other DAG to run once the trigger operator runs, which is useful if we want to split DAGs and make some DAGs available to other DAGs instead of repeating functionality. See below for both the triggering DAG (dag1) and the triggered DAG (dag2):
```python
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor

dag1 = DAG(
    dag_id="ingest_supermarket_data",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
)
dag2 = DAG(
    dag_id="create_metrics",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,  # Since it will be triggered
)


def _wait_for_supermarket(supermarket_id_):
    supermarket_path = Path("/data/" + supermarket_id_)
    data_files = supermarket_path.glob("data-*.csv")
    success_file = supermarket_path / "_SUCCESS"
    return data_files and success_file.exists()


for supermarket_id in range(1, 5):
    wait = PythonSensor(
        task_id=f"wait_for_supermarket_{supermarket_id}",
        python_callable=_wait_for_supermarket,
        op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
        dag=dag1,
    )
    copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag1)
    process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
    trigger_create_metrics_dag = TriggerDagRunOperator(
        task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
        trigger_dag_id="create_metrics",  # Has to be the same dag_id as dag2
        dag=dag1,
    )
    wait >> copy >> process >> trigger_create_metrics_dag

compute_differences = DummyOperator(task_id="compute_differences", dag=dag2)
update_dashboard = DummyOperator(task_id="update_dashboard", dag=dag2)
notify_new_data = DummyOperator(task_id="notify_new_data", dag=dag2)
compute_differences >> update_dashboard
```
- Each DAG run has a run_id that starts with one of the following (a sketch of inspecting it follows this list):
    - `scheduled__` to indicate the DAG run started because of its schedule
    - `backfill__` to indicate the DAG run started by a backfill job
    - `manual__` to indicate the DAG run started by a manual action (e.g., pressing the Trigger Dag button, or being triggered by a TriggerDagRunOperator)
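As a hedged sketch, the run_id can be inspected from inside a task through the context; the task name here is hypothetical and `dag2` refers to a DAG object like the ones above:

```python
from airflow.operators.python import PythonOperator


def _print_run_type(**context):
    # run_id looks like "scheduled__2021-01-01T16:00:00+00:00",
    # "backfill__..." or "manual__..." depending on how the run started.
    run_id = context["dag_run"].run_id
    print(f"This DAG run was started as: {run_id.split('__')[0]}")


print_run_type = PythonOperator(
    task_id="print_run_type",         # hypothetical task
    python_callable=_print_run_type,
    dag=dag2,                         # assumes a DAG object such as dag2 above
)
```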
- In the UI, scheduled DAG runs have their task instances shown with a black border, while triggered DAG runs don't.
- Clearing a task in a DAG will clear the task and all its downstream tasks and trigger a run (backfill).
    - It only clears tasks within the same DAG, NOT downstream tasks in another DAG triggered by a TriggerDagRunOperator.
- If the triggered DAG depends on multiple triggering DAGs completing before it can run, we can use `ExternalTaskSensor`, which checks whether a task in another DAG has completed successfully (a sensor poking the state of tasks in other DAGs). Each `ExternalTaskSensor` checks for only one task, by querying the metastore database.
    - By default, it looks for a task with the same execution_date as its own.
    - If that task runs on a different schedule, we need to provide a timedelta object to the `execution_delta` argument to get what the execution_date of the task it tries to sense would be.
```python
import datetime

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

dag1 = DAG(
    dag_id="ingest_supermarket_data",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
)
dag2 = DAG(
    dag_id="create_metrics",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 18 * * *",
)

DummyOperator(task_id="copy_to_raw", dag=dag1) >> DummyOperator(
    task_id="process_supermarket", dag=dag1
)

wait = ExternalTaskSensor(
    task_id="wait_for_process_supermarket",
    external_dag_id="ingest_supermarket_data",  # Has to be the same dag_id as dag1
    external_task_id="process_supermarket",
    # A positive value is subtracted from the sensor's execution_date
    # to get the execution_date of the task it is trying to sense
    # (dag2 runs at 18:00 and dag1 at 16:00, hence 2 hours).
    execution_delta=datetime.timedelta(hours=2),
    dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
wait >> report
```
- We can also trigger DAGs from the CLI; such runs get an execution_date of the current date and time:

```bash
airflow dags trigger dag1
```

- We can also pass a configuration, which will be available in the context of each task via `context["dag_run"].conf` (see the sketch after these commands):

```bash
airflow dags trigger -c '{"supermarket_id": 1}' dag1
airflow dags trigger --conf '{"supermarket_id": 1}' dag1
```
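A minimal sketch of reading that configuration inside a task (the task name is hypothetical and `dag1` refers to a DAG object like the ones above):

```python
from airflow.operators.python import PythonOperator


def _print_conf(**context):
    # conf holds whatever JSON was passed with `airflow dags trigger --conf ...`
    # (it is empty for scheduled runs).
    conf = context["dag_run"].conf or {}
    print(f"Triggered with supermarket_id={conf.get('supermarket_id')}")


print_conf = PythonOperator(
    task_id="print_conf",         # hypothetical task
    python_callable=_print_conf,
    dag=dag1,                     # assumes a DAG object such as dag1 above
)
```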