Dynamic Task Mapping (Airflow)

1. Contexto Durante o desenvolvimento de uma DAG no Airflow surgiu uma necessidade de criar múltiplas tasks de forma dinâmica, as tasks deveriam ser criadas em tempo de execução e não se sabe previamente a quantidade de tasks. Um caso prático seria ter um conjunto com n números e esses números devem passar por um processamento. Supondo que esse conjunto de valores tem 3 elementos, portanto seria a necessária a declaração de 3 tasks. Para valores pré-definidos a declaração dessas tasks são simples. Abaixo um exemplo de código que processa números multiplicando-os por 2: # imports def get_values(): return [10, 20, 30] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_1_task = PythonOperator( task_id=f"double_value_1", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"] ) double_value_2_task = PythonOperator( task_id=f"double_value_2", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"] ) double_value_3_task = PythonOperator( task_id=f"double_value_3", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"] ) get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task] Entretanto há 2 limitações: A criação de tasks no Airflow é declarativa. Necessariamente se precisa declarar um Operator e definir a ordem de execução dessa task. Se surgir a necessidade de criar tasks que tem a mesma finalidade com valores dos parâmetros diferentes é necessário declarar um Operator individualmente. É possível a criação das tasks com um loop, entretanto é necessário saber previamente quantas tasks é necessário criar. Resgatar os valores via XCOM não são acessáveis fora de uma Task Instance. Esse fato acontece pois os valores via XCOM são serealizados/deserealizados e renderizados em tempo de execução ao inicializar a execução da task, a renderização faz parte da execução da task. Reutilizando o exemplo acima da função double_value, se os valores necessários para esse processamento estiver na XCOM então só seria possível acessá-los durante a execução de outra task. Criar uma task dentro de uma outra task não é recomendável. Então como posso criar tasks dinamicamente baseado nos valores de tasks anteriores? No Airflow há um recurso para resolver esse problema: Dynamic Task Mapping. 2. O que é Dynamic Task Mapping? O Dynamic Task Mapping permite criar um conjuntos de tasks em tempo de execução baseado em parâmetros, sem o autor da DAG saber quantas tasks são necessárias previamente. É similar a definir as tasks num loop, entretanto o scheduler do Airflow usa como base o output da task anterior. Com esse recurso é possível paralelizar a execução da task, onde a task declarada terá uma lista de subtasks, cada substasks será criada dinamicamente baseado no parâmetro passado. Esse recurso no Airflow tornou-se disponível a partir da versão 2.3.0. 3. Um exemplo prático A ideia do Dynamic Task Mapping é definir os parâmetros onde um desses parâmetros será utilizado para a criação das tasks dinâmicas. A sintaxe de declaração do Operator é semelhante a um Operator convencional, haverá a adição de 2 métodos: partial e expand. expand Como o próprio nome diz significa expandir o parâmetro utilizado. Dado a lista do parâmetro definido então sera expandido essa lista. Para cada expansão é criado uma nova task. É possível definir 1 ou mais parâmetros para expandir. Caso haja mais de 1 parâmetro então será feito o produto cartesiano dos parâmetros. PythonOperator.partial(...).expand(parameter_1, parameter_2,...) partial Valores intermediários onde serão fixos para cada expansão dos parâmetros do expand, esses valores não são expandidos. No partial terá os parâmetros da task como task_id, python_callable, parâmetros definidos e etc. PythonOperator.partial(task_id="task_id", python_callable=...).expand(...) Abrindo um parênteses, há uma forma de pegar o output de uma task da seguinte maneira: # Task arbitraty arbitraty_task = PythonOperator(...) arbitraty_task.output Juntando essas informações e definindo a Dynamic Task Mapping para o primeiro exemplo dado de processa os números multiplicando-os por 2: double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output) Na prática não se torna necessário declarar explicitamente a task para cada valor retornado da task anterior. Como se interpreta o PythonOperator criado com Dynamic Task Mapping? Para cada valor retornado da task get_values será invocado a função double_value. Como o Airflow mostra essa informação na UI? O grafo mostrado será diferente, onde terá apenas 1 PythonOperator declarado. Para acessar a informação de cada

Mar 31, 2025 - 16:04
 0
Dynamic Task Mapping (Airflow)

1. Contexto

Durante o desenvolvimento de uma DAG no Airflow surgiu uma necessidade de criar múltiplas tasks de forma dinâmica, as tasks deveriam ser criadas em tempo de execução e não se sabe previamente a quantidade de tasks.

Um caso prático seria ter um conjunto com n números e esses números devem passar por um processamento. Supondo que esse conjunto de valores tem 3 elementos, portanto seria a necessária a declaração de 3 tasks. Para valores pré-definidos a declaração dessas tasks são simples.

Abaixo um exemplo de código que processa números multiplicando-os por 2:

# imports

def get_values():
    return [10, 20, 30]

def double_value(value):
    return int(value) * 2

# DAG code
get_values_task = PythonOperator(
    task_id=f"get_values",
    python_callable=get_values
)

double_value_1_task = PythonOperator(
    task_id=f"double_value_1",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"]
)

double_value_2_task = PythonOperator(
    task_id=f"double_value_2",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"]
)
double_value_3_task = PythonOperator(
    task_id=f"double_value_3",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"]
)

get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]

Entretanto há 2 limitações:

  1. A criação de tasks no Airflow é declarativa. Necessariamente se precisa declarar um Operator e definir a ordem de execução dessa task. Se surgir a necessidade de criar tasks que tem a mesma finalidade com valores dos parâmetros diferentes é necessário declarar um Operator individualmente.
    É possível a criação das tasks com um loop, entretanto é necessário saber previamente quantas tasks é necessário criar.

  2. Resgatar os valores via XCOM não são acessáveis fora de uma Task Instance. Esse fato acontece pois os valores via XCOM são serealizados/deserealizados e renderizados em tempo de execução ao inicializar a execução da task, a renderização faz parte da execução da task.
    Reutilizando o exemplo acima da função double_value, se os valores necessários para esse processamento estiver na XCOM então só seria possível acessá-los durante a execução de outra task. Criar uma task dentro de uma outra task não é recomendável.

Então como posso criar tasks dinamicamente baseado nos valores de tasks anteriores? No Airflow há um recurso para resolver esse problema: Dynamic Task Mapping.

2. O que é Dynamic Task Mapping?

O Dynamic Task Mapping permite criar um conjuntos de tasks em tempo de execução baseado em parâmetros, sem o autor da DAG saber quantas tasks são necessárias previamente. É similar a definir as tasks num loop, entretanto o scheduler do Airflow usa como base o output da task anterior.

Com esse recurso é possível paralelizar a execução da task, onde a task declarada terá uma lista de subtasks, cada substasks será criada dinamicamente baseado no parâmetro passado.

Esse recurso no Airflow tornou-se disponível a partir da versão 2.3.0.

3. Um exemplo prático

A ideia do Dynamic Task Mapping é definir os parâmetros onde um desses parâmetros será utilizado para a criação das tasks dinâmicas. A sintaxe de declaração do Operator é semelhante a um Operator convencional, haverá a adição de 2 métodos: partial e expand.

  • expand

Como o próprio nome diz significa expandir o parâmetro utilizado. Dado a lista do parâmetro definido então sera expandido essa lista. Para cada expansão é criado uma nova task.

É possível definir 1 ou mais parâmetros para expandir. Caso haja mais de 1 parâmetro então será feito o produto cartesiano dos parâmetros.

PythonOperator.partial(...).expand(parameter_1, parameter_2,...)
  • partial

Valores intermediários onde serão fixos para cada expansão dos parâmetros do expand, esses valores não são expandidos.

No partial terá os parâmetros da task como task_id, python_callable, parâmetros definidos e etc.

PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)

Abrindo um parênteses, há uma forma de pegar o output de uma task da seguinte maneira:

# Task arbitraty
arbitraty_task = PythonOperator(...)
arbitraty_task.output

Juntando essas informações e definindo a Dynamic Task Mapping para o primeiro exemplo dado de processa os números multiplicando-os por 2:

double_value_task = PythonOperator.partial(
    task_id=f"double_value",
    python_callable=double_value,
).expand(op_args=get_values_task.output)

Na prática não se torna necessário declarar explicitamente a task para cada valor retornado da task anterior.

Como se interpreta o PythonOperator criado com Dynamic Task Mapping?

Para cada valor retornado da task get_values será invocado a função double_value.

Como o Airflow mostra essa informação na UI?

O grafo mostrado será diferente, onde terá apenas 1 PythonOperator declarado.

Image description

Para acessar a informação de cada task criada dinamicamente será necessário clicar na task e acessar o campo Mapped Tasks. O Map Index é criado de 0 até n, onde n é n-ésimo índice que indica a quantidade de elementos retornados da task anterior.

Image description

Por fim a DAG descrita no primeiro exemplo, que processa números multiplicando-os por 2, pode ser reduzida para:

# imports

def get_values():
    return [[10], [20], [30]]

def double_value(value):
    return int(value) * 2

# DAG code
get_values_task = PythonOperator(
    task_id=f"get_values",
    python_callable=get_values
)

double_value_task = PythonOperator.partial(
    task_id=f"double_value",
    python_callable=double_value,
).expand(op_args=get_values_task.output)

get_values_task >> double_value_task

Em resumo o Dynamic Task Mapping pode ser utilizado para criar tasks de forma dinâmica e paralelizar processamentos. Abaixo é possível acessar a documentação para entender com mais detalhes.

4. Referências

[1] Airflow. Dynamic Task Mapping. Disponível em: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html. Acessado em: 09/12/2024.