Note
Other than my experience and the documentation, the main resource behind this post and figures is the fantastic book: Data Pipelines with Apache Airflow.
- XComs are meant to exchange messages between tasks, which is a form of shared state
- We can use the task instance from the task's context to push/pull data between tasks (see the sketch below):
    - `context["task_instance"].xcom_push(key="data_name", value="value")` pushes data to the metastore. Airflow also stores the `dag_id`, `task_id`, and `execution_date` with it.
    - `context["task_instance"].xcom_pull(key="data_name")` pulls the shared data. We can also specify `dag_id` and `execution_date`.
- We can also access the push/pull methods in templates using `task_instance.xcom_push()` or `task_instance.xcom_pull()`
- We can view the shared data on the UI by going to Admin -> XComs
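For concreteness, here is a minimal sketch of the push/pull API above. The DAG id, task ids, the `_push`/`_pull` callables, and the pushed value are made up for illustration:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def _push(**context):
    # Push a value under an explicit key; Airflow records dag_id, task_id,
    # and execution_date along with it in the metastore.
    context["task_instance"].xcom_push(key="data_name", value="value")


def _pull(**context):
    # Pull the value back; task_ids (and optionally dag_id/execution_date)
    # narrow down which XCom to read.
    value = context["task_instance"].xcom_pull(task_ids="push_data", key="data_name")
    print(value)


with DAG(dag_id="xcom_example", start_date=datetime(2024, 1, 1),
         schedule_interval=None) as dag:
    push_data = PythonOperator(task_id="push_data", python_callable=_push)
    pull_data = PythonOperator(task_id="pull_data", python_callable=_pull)

    # The same methods are also available inside templates via task_instance:
    echo_data = BashOperator(
        task_id="echo_data",
        bash_command=(
            "echo {{ task_instance.xcom_pull(task_ids='push_data', key='data_name') }}"
        ),
    )

    push_data >> [pull_data, echo_data]
```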
- Limitations:
    - XCom data will be pickled and stored in the database -> the objects have to be serializable
    - Size limitations:
        - SQLite: stored as a BLOB, 2 GB limit
        - PostgreSQL: stored as BYTEA, 1 GB limit
        - MySQL: stored as a BLOB, 64 KB limit
    - It creates a hidden dependency between tasks: the task that pushes the shared state has to run before the task that pulls it. Airflow won't manage or respect this dependency; the developer has to document it and make sure it is not an issue given the tasks' order (see the sketch right after this list)
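To illustrate the hidden-dependency pitfall, here is a small hypothetical sketch: `push_data` writes an XCom that `pull_data` reads, but the declared DAG structure does not say so.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="hidden_xcom_dependency", start_date=datetime(2024, 1, 1),
         schedule_interval=None) as dag:
    start = DummyOperator(task_id="start")
    # Stand-ins for tasks that call xcom_push / xcom_pull internally.
    push_data = DummyOperator(task_id="push_data")
    pull_data = DummyOperator(task_id="pull_data")

    # As declared, both tasks only depend on `start`, so the scheduler is free
    # to run pull_data before push_data; the pull would then return None.
    start >> [push_data, pull_data]

    # The hidden data dependency has to be made explicit by the developer:
    # push_data >> pull_data
```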
- Due to XCom's size limitations, we can create a custom XCom backend by defining a class that inherits from `BaseXCom` and implements two static methods; Airflow will then use this class. It is enabled via the `xcom_backend` parameter in the Airflow config. This lets us use cheap/large cloud storage services such as Amazon S3, Azure Blob Storage, or Google GCS.
```python
from typing import Any

from airflow.models.xcom import BaseXCom


class CustomXComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value: Any):
        # Turn the value into what gets stored in the metastore
        ...

    @staticmethod
    def deserialize_value(result) -> Any:
        # Reconstruct the original value from what was stored
        ...
```
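As a rough sketch of the cloud-storage idea, the backend below stores each value in Amazon S3 and keeps only a small reference string in the metastore. The bucket name, key scheme, and class name are assumptions, and the exact `serialize_value` signature varies slightly between Airflow versions (hence the `**kwargs`):

```python
import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "xcom_s3://"           # marker so we know the stored value is a reference
    BUCKET_NAME = "my-xcom-bucket"  # assumed bucket name

    @staticmethod
    def serialize_value(value, **kwargs):
        # Upload the real payload to S3 ...
        hook = S3Hook()
        key = f"xcoms/{uuid.uuid4()}.json"
        hook.load_string(json.dumps(value), key=key,
                         bucket_name=S3XComBackend.BUCKET_NAME, replace=True)
        # ... and keep only a small reference string in the Airflow database.
        return BaseXCom.serialize_value(S3XComBackend.PREFIX + key)

    @staticmethod
    def deserialize_value(result):
        reference = BaseXCom.deserialize_value(result)
        if isinstance(reference, str) and reference.startswith(S3XComBackend.PREFIX):
            hook = S3Hook()
            key = reference[len(S3XComBackend.PREFIX):]
            return json.loads(hook.read_key(key=key,
                                            bucket_name=S3XComBackend.BUCKET_NAME))
        return reference
```

The class is then referenced from the `xcom_backend` option in the Airflow config, e.g. `xcom_backend = my_package.S3XComBackend` (module path is an example).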
- If most of the tasks are PythonOperators, we can use the Taskflow API, which takes care of passing state between tasks and avoids the boilerplate code we have to write with the regular API. We just need to decorate the function that we would use in the PythonOperator with `@task`, and Airflow takes care of the rest by passing XCom data between tasks. Example:
```python
import uuid

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator

with DAG(...) as dag:
    # fetch_sales, fetch_weather, clean_sales, clean_weather, and
    # join_datasets are assumed to be defined elsewhere in the DAG.
    start = DummyOperator(task_id="start")
    start >> [fetch_sales, fetch_weather]
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets

    @task
    def train_model():
        # Airflow will figure out that the return value is an XCom
        # and will take care of pushing it
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        # Airflow realizes that this task uses an XCom, so it pulls
        # the value in from XCom
        print(f"Deploying model {model_id}")

    # Now train_model and deploy_model are new tasks
    # with an explicit dependency between them.
    # The task type is PythonDecoratedOperator.
    model_id = train_model()
    deploy_model(model_id)

    join_datasets >> model_id
```
- Any data passed between Taskflow-style tasks will be stored as XComs and is subject to the same XCom limitations
- The main limitation of the Taskflow API is that it is still only for PythonOperators