Airflow Part 6 - Sharing Data Between Tasks

Imad Dabbura


February 28, 2022


Other than my experience and the documentation, the main resource behind this post and figures is the fantastic book: Data Pipelines with Apache. Airflow.

from typing import Any
from airflow.models.xcom import BaseXCom

class CustomXComBackend(BaseXCom):
    def serialize(value: Any):
    def deserialize(result):

from airflow.decorators import task

with DAG(...) as dag:
    start = DummyOperator(task_id="start")
    start >> fetch_sales
    start >> fetch_weather
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets
    def train_model():
        model_id = str(uuid.uuid4())
        # Airflow will figure out that the return value is XCom
        # and would take care of pushing it
        return model_id

    def deploy_model(model_id: str):
        # Airflow would realize that this task uses XCom so it passes
        # it from XCom
        print(f"Deploying model {model_id}")

model_id = train_model()

# Now train_model and deploy_model will be new tasks
# with explicit dependeny. 
# The task type is PythonDecoratedOperator
join_datasets >> model_id