```python
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(dag_id="python-operator-context", start_date=airflow.utils.dates.days_ago(1))


def _print_context(**kwargs):
    print(kwargs)


print_context = PythonOperator(task_id="print-context", python_callable=_print_context, dag=dag)
```
Note
Other than my experience and the documentation, the main resource behind this post and its figures is the fantastic book: Data Pipelines with Apache Airflow.
- Airflow uses the `Pendulum` library for datetimes. It is a drop-in replacement for the Python standard library `datetime`, but with a much nicer API and more features.
- Not all arguments can be templated. By default, arguments are not templated and `{{ name }}` will be read as the literal string `name`, unless the argument is included in `template_fields`, the list of attributes that can be templated in the operator.
- Elements in `template_fields` are names of class attributes. The arguments passed to `__init__` match the class attributes.
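To make this concrete, here is a minimal sketch of a custom operator (my own example, not from the post; the operator name and fields are made up). The entries in `template_fields` name instance attributes set in `__init__`, and only those attributes are rendered by Jinja before `execute()` runs:

```python
from airflow.models import BaseOperator


class FetchRatingsOperator(BaseOperator):  # hypothetical operator, for illustration only
    # Names of *instance attributes* that Jinja renders before execute() is called.
    template_fields = ("_start_date", "_end_date", "_output_path")

    def __init__(self, conn_id, start_date, end_date, output_path, **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id          # not listed above, so never templated
        self._start_date = start_date    # may be passed as "{{ ds }}"
        self._end_date = end_date        # may be passed as "{{ next_ds }}"
        self._output_path = output_path

    def execute(self, context):
        # By this point the templated attributes hold the rendered strings.
        self.log.info(
            "Fetching %s..%s into %s", self._start_date, self._end_date, self._output_path
        )
```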
- All operators, such as BashOperator, take their arguments as strings, except PythonOperator, which takes a `python_callable`: any callable object in Python. The context and parameters are made available to this callable.
- The context variable is a dictionary that holds all the instance variables for this task.
  - We can accept it with the default `**kwargs`, or make it easier to read by naming it `**context`.
- If we specify argument names in the `python_callable`, then Airflow will call the `python_callable` with the matching variables from the context:
  - If a context variable is named as an argument of the callable, it is passed to the callable directly.
  - Otherwise, it is added to the context dictionary. If the callable does not take the context dictionary as an argument, all other context variables that are not named as arguments are discarded.
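For example (a sketch of my own, reusing the `dag` and the `PythonOperator` import from the snippet at the top): naming `ds` as a parameter makes Airflow pass just that context variable to the callable, while everything else lands in `**context`:

```python
def _print_start(ds, **context):
    # "ds" is taken from the task context by name; the remaining
    # context variables are collected in **context.
    print(f"execution date: {ds}")
    print(f"other context keys: {sorted(context)}")


print_start = PythonOperator(task_id="print-start", python_callable=_print_start, dag=dag)
```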
- Some arguments of operators can be templated.
- Templating happens at run time.
- We can provide arguments to PythonOperator using:
  - `op_args`: a list of positional arguments that are passed to the callable
  - `op_kwargs`: a dictionary of keyword arguments
- We can inspect the templated arguments either in the UI or with the CLI:
  - CLI: `airflow tasks render [dag id] [task id] [desired execution date]`
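As an illustration (my own sketch; `_fetch_events` and its task are made up): `op_kwargs` is itself listed in PythonOperator's `template_fields`, so its values are rendered before they reach the callable:

```python
def _fetch_events(year, **context):
    # "year" arrives already rendered, e.g. "2023".
    print(f"fetching events for year {year}")


fetch_events = PythonOperator(
    task_id="fetch-events",
    python_callable=_fetch_events,
    op_kwargs={"year": "{{ execution_date.year }}"},  # rendered at run time
    dag=dag,
)
```

Running `airflow tasks render python-operator-context fetch-events 2023-01-01` would then show the rendered value of `op_kwargs` for that date.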
- There are two ways to pass data between tasks:
  - Read/write to the metastore; this mechanism is called `XCom`:
    - It works by pickling the objects we want to share and writing them to the metastore. Downstream tasks can then read the pickled objects (and unpickle them).
    - It is only recommended for small objects, because the objects are stored as blobs in the metastore. Don't use it for large objects.
  - Read/write to persistent storage such as disk or a database.
- Tasks are independent and may run on completely different machines -> they can't share memory -> sharing has to go through persistent storage.
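A small sketch of the `XCom` pattern (my own example, again reusing the earlier `dag`): the upstream task's return value is pushed to the metastore automatically, and the downstream task pulls it by task id:

```python
def _produce(**context):
    # The return value is pushed to XCom automatically (under the key "return_value").
    return {"row_count": 42}


def _consume(task_instance, **context):
    # Pull whatever the "produce" task stored in the metastore.
    payload = task_instance.xcom_pull(task_ids="produce")
    print(f"upstream reported {payload['row_count']} rows")


produce = PythonOperator(task_id="produce", python_callable=_produce, dag=dag)
consume = PythonOperator(task_id="consume", python_callable=_consume, dag=dag)
produce >> consume
```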
- Most operators are installed via separate pip packages. For example, PostgresOperator allows us to work with a PostgreSQL database.
  - We can install such operators with `pip install apache-airflow-providers-*`.
  - We can import the operator as `from airflow.providers.postgres.operators.postgres import PostgresOperator`.
  - We can add connections using the UI or the CLI, and Airflow stores them encrypted in the metastore, for example: `airflow connections add --conn-type postgres --conn-host localhost --conn-login postgres --conn-password mysecretpassword my_postgres`
  - We can later refer to those credentials by name when connecting to any database.
  - Airflow takes care of setting up the connection and closing it once done.
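Putting those pieces together (my own sketch; the table and query are made up), the operator only needs the connection id we registered above:

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

write_to_postgres = PostgresOperator(
    task_id="write-to-postgres",
    postgres_conn_id="my_postgres",  # the connection added via the CLI above
    sql="INSERT INTO pageview_counts VALUES ('python', 1, '{{ ds }}')",  # hypothetical table
    dag=dag,
)
```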
- Postgres is an external system and Airflow supports connecting to a wide range of external systems with the help of many operators in its ecosystem. This does have an implication: connecting to an external system often requires specific dependencies to be installed, which allow connecting and communicating with the external system. This also holds for Postgres; we must install the package apache-airflow-providers-postgres to install additional Postgres dependencies in our Airflow installation.
- Upon execution of the PostgresOperator, a number of things happen. The PostgresOperator will instantiate a so-called hook to communicate with Postgres. The hook deals with creating a connection, sending queries to Postgres and closing the connection afterward. The operator is merely passing through the request from the user to the hook in this situation.
- An operator determines what has to be done; a hook determines how to do something. When building pipelines like these, you will only deal with operators and have no notion of any hooks, because hooks are used internally in operators.
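To illustrate that split (a sketch of my own, not from the post): the hook is the piece that actually talks to the database, which is why you only touch it directly when writing your own operator or callable:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook


def _count_rows(**context):
    # The hook resolves "my_postgres" from the metastore, opens the connection,
    # runs the query, and closes the connection afterwards.
    hook = PostgresHook(postgres_conn_id="my_postgres")
    rows = hook.get_records("SELECT COUNT(*) FROM pageview_counts")  # hypothetical table
    print(f"row count: {rows[0][0]}")
```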
- There are a number of things to point out in this last step. The DAG has an additional argument: `template_searchpath`. Besides a string INSERT INTO …, the contents of files can also be templated. Each operator can read and template files with specific extensions by providing the file path to the operator. In the case of the PostgresOperator, the `sql` argument can be templated, so a path to a file holding a SQL query can also be provided. Any file path ending in `.sql` will be read, the templates in the file will be rendered, and the queries in the file will be executed by the PostgresOperator. Again, refer to the documentation of the operators and check the field `template_ext`, which holds the file extensions that can be templated by the operator.
- Jinja requires you to provide the path to search for files that can be templated. By default, only the path of the DAG file is searched, but since we've stored the SQL file in /tmp, Jinja won't find it there. To add paths for Jinja to search, set the `template_searchpath` argument on the DAG, and Jinja will traverse the default path plus the additional provided paths.
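A minimal sketch of that setup (my own example; the DAG id, file name, and query are made up): the query lives in /tmp/insert_query.sql and the DAG tells Jinja where to find it:

```python
import airflow.utils.dates
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(
    dag_id="postgres-template-searchpath",
    start_date=airflow.utils.dates.days_ago(1),
    template_searchpath="/tmp",  # extra path for Jinja to search, besides the DAG folder
)

# /tmp/insert_query.sql could contain, for example:
#   INSERT INTO pageview_counts VALUES ('python', 1, '{{ ds }}');
write_to_postgres = PostgresOperator(
    task_id="write-to-postgres",
    postgres_conn_id="my_postgres",
    sql="insert_query.sql",  # ends in .sql, so the file is read and its templates rendered
    dag=dag,
)
```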