Airflow XComs

What Are XComs in Airflow?
XComs, short for Cross-Communications, allow tasks in an Airflow DAG to share data with each other. They store small pieces of data (key-value pairs) in Airflow’s metadata database, making it possible for one task to push data and another task to retrieve it later.
When using a PythonOperator, Airflow automatically creates an XCom if the function returns a value. That XCom is stored under the default key return_value, which is not always the most descriptive name. To have more control over the XCom, you can use the task instance (ti) object and specify a custom key when pushing data.
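To make the key naming concrete, here is a toy in-memory model of how a returned value and an explicitly pushed value end up under different keys. It is only a sketch for illustration: ToyXComStore is a hypothetical class, not part of Airflow, and real XComs are also scoped by DAG and run, which this model leaves out.

```python
# Toy in-memory model of XCom storage; NOT Airflow's implementation.
RETURN_KEY = "return_value"  # the default key Airflow uses for returned values


class ToyXComStore:
    """Hypothetical store that keeps values in a dict keyed by (task_id, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, task_id, value, key=RETURN_KEY):
        self._rows[(task_id, key)] = value

    def pull(self, task_id, key=RETURN_KEY):
        # A missing entry yields None instead of raising
        return self._rows.get((task_id, key))


store = ToyXComStore()

# Returning a value from a callable is modelled as a push under the default key
store.push("training_model_A", 7.3)

# Pushing with an explicit key keeps the value under a name you choose
store.push("training_model_A", 7.3, key="model_accuracy")

print(store.pull("training_model_A"))                        # default key
print(store.pull("training_model_A", key="model_accuracy"))  # custom key
print(store.pull("training_model_B"))                        # nothing was pushed
```

The point of the sketch is that the key is part of the lookup: a value pushed under model_accuracy is not found by a pull that uses the default key, and the reverse is also true.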
Let's look at a simple example:
# dags/xcom_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}


def _training_model(ti):
    # Simulate a training run with a random score
    accuracy = uniform(0.1, 10.0)
    print(f"model's accuracy: {accuracy}")


def _choose_best_model(ti):
    print('choose best model')


with DAG('xcom_dag',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3'
    )

    # One training task per model: training_model_A, _B and _C
    training_model_task = [
        PythonOperator(
            task_id=f'training_model_{task}',
            python_callable=_training_model
        ) for task in ['A', 'B', 'C']]

    choose_model = PythonOperator(
        task_id='choose_model',
        python_callable=_choose_best_model
    )

    downloading_data >> training_model_task >> choose_model
Once this DAG has run, opening the XCom list in the Airflow UI (Admin > XComs) shows an XCom that was created by default, as shown below:
By default, BashOperator pushes the last line of the command's standard output to XCom. If you don't need this behavior, set do_xcom_push=False on the task to prevent unnecessary data storage. This helps keep the Airflow metadata database clean.
Using XComs for Data Sharing
We can modify the code by explicitly pushing and pulling XComs using the task instance (ti). This allows tasks to share data efficiently:
# dags/xcom_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}


def _training_model(ti):
    # Simulate a training run with a random score
    accuracy = uniform(0.1, 10.0)
    print(f"model's accuracy: {accuracy}")
    # Store the score under an explicit key
    ti.xcom_push(key='model_accuracy', value=accuracy)


def _choose_best_model(ti):
    print('choose best model')
    # Pull the score pushed by each training task
    accuracies = ti.xcom_pull(
        key='model_accuracy',
        task_ids=['training_model_A', 'training_model_B', 'training_model_C']
    )
    print(accuracies)


with DAG('xcom_dag',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False  # Prevents unnecessary XCom storage
    )

    # One training task per model: training_model_A, _B and _C
    training_model_task = [
        PythonOperator(
            task_id=f'training_model_{task}',
            python_callable=_training_model
        ) for task in ['A', 'B', 'C']]

    choose_model = PythonOperator(
        task_id='choose_model',
        python_callable=_choose_best_model
    )

    downloading_data >> training_model_task >> choose_model
This DAG demonstrates how XComs facilitate data sharing between tasks in Airflow. It starts with a BashOperator task (downloading_data) that simulates data preparation. Next, three PythonOperator tasks (training_model_A, training_model_B, and training_model_C) each generate a random accuracy score and push it to XCom using ti.xcom_push(key='model_accuracy', value=accuracy). Finally, the choose_model task retrieves these values using ti.xcom_pull(key='model_accuracy', task_ids=['training_model_A', 'training_model_B', 'training_model_C']) and prints them. Because the values are stored in Airflow’s metadata database, the tasks exchange data without calling each other directly: the downstream task simply reads what the upstream tasks wrote.
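The choose_model task above only prints the pulled scores. As a possible next step, the sketch below actually selects the best one. It pulls one task at a time so that each score stays tied to its task_id. The function choose_best_model and the StubTaskInstance class are hypothetical names introduced here; the stub exists only so the logic can be run outside Airflow and is not how Airflow implements task instances.

```python
TRAINING_TASK_IDS = ["training_model_A", "training_model_B", "training_model_C"]


def choose_best_model(ti, task_ids=TRAINING_TASK_IDS):
    """Return (task_id, accuracy) for the highest pushed accuracy."""
    # Pull per task so every value is explicitly paired with its task_id
    accuracies = {
        task_id: ti.xcom_pull(key="model_accuracy", task_ids=task_id)
        for task_id in task_ids
    }
    # Ignore tasks that pushed nothing (the pull gives None for those)
    available = {t: a for t, a in accuracies.items() if a is not None}
    if not available:
        raise ValueError("no model_accuracy XComs found")
    best_task = max(available, key=available.get)
    return best_task, available[best_task]


class StubTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for local runs only."""

    def __init__(self, values):
        self._values = values  # {(task_id, key): value}

    def xcom_pull(self, key, task_ids):
        return self._values.get((task_ids, key))


stub = StubTaskInstance({
    ("training_model_A", "model_accuracy"): 4.2,
    ("training_model_B", "model_accuracy"): 9.1,
    ("training_model_C", "model_accuracy"): 6.7,
})
print(choose_best_model(stub))  # ('training_model_B', 9.1)
```

Inside the DAG, the same function could serve as the python_callable for choose_model, since Airflow passes the real ti when the callable declares it as a parameter.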
As proof of success, the screenshot below shows the logs for the choose_model task, where the accuracy scores are printed:
XCom Limitations
While XComs are powerful, they do have limitations. Since Airflow is an orchestration tool, not a data processing tool, handling large volumes of data via XComs can be inefficient. The size limitations depend on the metadata database being used:
- MySQL: ~64 KB
- PostgreSQL: ~1 GB
- SQLite: ~2 GB
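One way to stay clear of these limits is to measure a value before pushing it. The sketch below assumes the value is serialized as JSON, and it uses a hypothetical budget (MAX_XCOM_BYTES) chosen to sit below the MySQL figure in the list. Both helper functions are illustrative names, not Airflow APIs.

```python
import json

# Hypothetical budget, kept below the ~64 KB MySQL figure listed above
MAX_XCOM_BYTES = 48 * 1024


def xcom_size_bytes(value):
    """Size of the value once JSON-encoded as UTF-8."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value, limit=MAX_XCOM_BYTES):
    """True when the encoded value is within the chosen byte budget."""
    return xcom_size_bytes(value) <= limit


small = {"model": "training_model_A", "accuracy": 7.3}
large = [{"row": i, "score": i / 10} for i in range(10_000)]

print(fits_in_xcom(small))  # a single score is tiny
print(fits_in_xcom(large))  # a full result set is not
```

A task could call fits_in_xcom before ti.xcom_push and fall back to external storage when the check fails.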
For larger data transfers, consider using external storage solutions like Amazon S3, Google Cloud Storage, or Azure Blob Storage, and pass only references (e.g., file paths) via XComs.
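The reference-passing pattern can be sketched with a local file standing in for an object store. The function names and the file name predictions.json are hypothetical. A local path only works when the tasks share a filesystem, so on a multi-worker setup the path would be replaced by an object-store URI.

```python
import json
import tempfile
from pathlib import Path


def save_predictions(rows, directory):
    """Write rows to a JSON file and return its path as a string."""
    path = Path(directory) / "predictions.json"
    path.write_text(json.dumps(rows))
    # This short string is what would travel through XCom
    return str(path)


def load_predictions(path):
    """Read the rows back from the path received via XCom."""
    return json.loads(Path(path).read_text())


with tempfile.TemporaryDirectory() as tmp:
    rows = [{"id": i, "score": i / 10} for i in range(1000)]
    reference = save_predictions(rows, tmp)   # upstream task
    restored = load_predictions(reference)    # downstream task

print(len(reference) < len(json.dumps(rows)))  # the reference is far smaller
print(restored == rows)                        # the data survives the round trip
```

In a DAG, the upstream callable would return or push reference, and the downstream callable would pull it and call load_predictions.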
With this understanding, you can use XComs in Airflow to share small values between tasks while keeping larger data out of the metadata database.