Airflow Part 6 - Sharing Data Between Tasks

Data Engineering, MLOps, Airflow

Author: Imad Dabbura

Published: February 28, 2022

Note

Other than my experience and the documentation, the main resource behind this post and its figures is the fantastic book: Data Pipelines with Apache Airflow.
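XCom is Airflow's built-in mechanism for sharing small pieces of data between tasks. Before looking at custom backends and the Taskflow API below, here is a minimal sketch of the explicit XCom API: a task pushes a value with xcom_push, and a downstream task pulls it with xcom_pull. The task names are illustrative and the DAG arguments are elided.

import uuid

from airflow import DAG
from airflow.operators.python import PythonOperator


def _train_model(**context):
    model_id = str(uuid.uuid4())
    # Explicitly push the value to XCom under the key "model_id"
    context["ti"].xcom_push(key="model_id", value=model_id)


def _deploy_model(**context):
    # Pull the value pushed by the upstream train_model task
    model_id = context["ti"].xcom_pull(task_ids="train_model", key="model_id")
    print(f"Deploying model {model_id}")


with DAG(...) as dag:
    train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
    deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
    train_model >> deploy_model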

from typing import Any

from airflow.models.xcom import BaseXCom


class CustomXComBackend(BaseXCom):

    @staticmethod
    def serialize_value(value: Any):
        # Called whenever a task pushes a value to XCom; e.g. write the
        # value to external storage and return a reference to it
        ...

    @staticmethod
    def deserialize_value(result):
        # Called whenever a task pulls a value from XCom; e.g. resolve
        # the stored reference and load the original value
        ...
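Whatever serialize_value returns must still fit in the metastore, so the common pattern is to write the actual value to external storage and persist only a reference to it. Here is a minimal sketch of that pattern, assuming a local directory that all workers can reach; the class name, storage path, and JSON encoding are illustrative choices, not Airflow APIs.

import json
import uuid
from pathlib import Path
from typing import Any

from airflow.models.xcom import BaseXCom


class LocalFileXComBackend(BaseXCom):
    # Assumption: this directory is shared across all workers
    STORAGE_DIR = Path("/tmp/airflow_xcom")

    @staticmethod
    def serialize_value(value: Any):
        LocalFileXComBackend.STORAGE_DIR.mkdir(parents=True, exist_ok=True)
        path = LocalFileXComBackend.STORAGE_DIR / f"{uuid.uuid4()}.json"
        # Write the real value to disk...
        path.write_text(json.dumps(value))
        # ...and store only the path in the metastore, via the default serializer
        return BaseXCom.serialize_value(str(path))

    @staticmethod
    def deserialize_value(result):
        # Recover the stored path, then load the original value from disk
        path = BaseXCom.deserialize_value(result)
        return json.loads(Path(path).read_text())

A custom backend only takes effect once Airflow is pointed at it through the xcom_backend option in the [core] section of airflow.cfg (or the AIRFLOW__CORE__XCOM_BACKEND environment variable), e.g. xcom_backend = includes.custom_xcom.LocalFileXComBackend, where the module path depends on where you place the class.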

import uuid

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator


with DAG(...) as dag:
    # fetch_sales, fetch_weather, clean_sales, clean_weather, and
    # join_datasets are assumed to be defined as in the earlier parts
    # of this series
    start = DummyOperator(task_id="start")
    start >> fetch_sales
    start >> fetch_weather
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        # Airflow recognizes that the return value should be shared,
        # so it pushes it to XCom automatically
        return model_id

    @task
    def deploy_model(model_id: str):
        # Airflow sees that this argument comes from another task's
        # XCom, so it pulls the value automatically
        print(f"Deploying model {model_id}")

    # Calling the decorated functions creates two new tasks (of type
    # _PythonDecoratedOperator) with an explicit dependency between them
    model_id = train_model()
    deploy_model(model_id)

    join_datasets >> model_id
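Under the hood, calling train_model() does not return a plain string but an XComArg, a reference to the task's future XCom value. Passing it to deploy_model is what lets Airflow infer the train_model >> deploy_model dependency from the data dependency, and a line like join_datasets >> model_id works because an XComArg forwards dependency operators to the task that produces it.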