A short guide to DAGs in Airflow 2.0 using the TaskFlow API

Prince Arora
2 min read · Feb 17, 2021

I recently started using Airflow for one of my projects and really liked how it is designed and how it handles different use cases in the domain of ETL, data syncing, etc.

While trying to figure out the code, I realized that the current documentation is quite fragmented and the code examples online are a mix of different implementations: the TaskFlow API (the dag decorator) and the V1 API using the DAG module.

In this article I will give a brief intro to Airflow, and then we will create a DAG and trigger it from another DAG, with some code examples using the TaskFlow API.

What is Airflow, and when should you use it?

Apache Airflow is an open-source workflow management platform.

Airflow is a job scheduler: you can use it for tasks that depend on each other and have to wait for the previous task(s) to complete.
I will not go through Airflow basics or installation in this article.
You can learn more about Airflow here: https://airflow.apache.org/

I would recommend having a basic understanding of Airflow concepts like hooks, operators and tasks before jumping into the code examples.

https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
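
As a quick refresher on those concepts: an operator describes a single unit of work, a task is an operator instantiated inside a DAG, and hooks are the lower-level clients that operators use to talk to external systems. The snippet below is illustrative only and not part of the examples later in this post; the dag_id and task_id are made up.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# A DAG is a container for tasks; each task here is an instantiated operator.
with DAG(
    dag_id="refresher_dag",            # made-up dag_id for this sketch
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    say_hello = BashOperator(          # one operator instance = one task
        task_id="say_hello",
        bash_command="echo hello",
    )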

Writing your DAGs and tasks
Let us go through a brief code example here and understand how it works.

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}


@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["test", "me"])
def degree_to_fahrenheit(**kwargs):
    @task()
    def multiply_by_9(value):
        return value * 9

    @task()
    def divide_by_5(value):
        return value / 5

    @task()
    def add_32(value):
        temp_in_fahrenheit = value + 32
        print(f"Temperature in Fahrenheit: {temp_in_fahrenheit}")
        return temp_in_fahrenheit

    multiplied_value = multiply_by_9(37)           # task 1
    divided_value = divide_by_5(multiplied_value)  # task 2
    add_32(divided_value)                          # task 3


degree_to_fahrenheit = degree_to_fahrenheit()

In the above code example:
The name/dag_id of the DAG on Airflow will be degree_to_fahrenheit.
It has 3 tasks,
multiply_by_9 >> divide_by_5 >> add_32
which depend on each other: TaskFlow infers this ordering because each task consumes the previous task's return value.
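
For comparison, here is a rough sketch of how the same chain might be wired up with the classic (pre-TaskFlow) API. This is not from the original example: the dag_id, task ids and XCom plumbing are illustrative, and it mainly shows the boilerplate that the @task decorator handles for you (pushing/pulling XComs and declaring dependencies with >>).

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def multiply_by_9(**context):
    return 37 * 9


def divide_by_5(**context):
    # Pull the previous task's return value from XCom explicitly.
    value = context["ti"].xcom_pull(task_ids="multiply_by_9")
    return value / 5


def add_32(**context):
    value = context["ti"].xcom_pull(task_ids="divide_by_5")
    print(f"Temperature in Fahrenheit: {value + 32}")


with DAG(
    dag_id="degree_to_fahrenheit_classic",  # made-up dag_id for this sketch
    default_args={'owner': 'airflow'},
    schedule_interval=None,
    start_date=days_ago(2),
) as dag:
    t1 = PythonOperator(task_id="multiply_by_9", python_callable=multiply_by_9)
    t2 = PythonOperator(task_id="divide_by_5", python_callable=divide_by_5)
    t3 = PythonOperator(task_id="add_32", python_callable=add_32)

    # Dependencies have to be declared by hand here.
    t1 >> t2 >> t3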

Triggering one DAG from another using TriggerDagRunOperator:

from datetime import datetime, timezone
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args,
     schedule_interval=None,
     start_date=days_ago(0),
     tags=["trigger", "dag"])
def trigger_degree_to_fahrenheit(**kwargs):

    @task()
    def trigger_another_dag():
        TriggerDagRunOperator(
            task_id="a_unique_task_id",
            execution_date=datetime.now().replace(tzinfo=timezone.utc),
            trigger_dag_id="degree_to_fahrenheit",  # dag to trigger
            # conf={"k": "v"},  # optional payload for the triggered dag
        ).execute(kwargs)  # kwargs here comes from the enclosing dag function's **kwargs

    trigger_another_dag()


trigger_degree_to_fahrenheit = trigger_degree_to_fahrenheit()

In the above code example:
The name/dag_id of the DAG on Airflow will be trigger_degree_to_fahrenheit.
It has one task, trigger_another_dag.
Using TriggerDagRunOperator we can kick off a run of another DAG (here, degree_to_fahrenheit).
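
If you do pass a conf payload (like the commented-out conf={"k": "v"} above), the triggered DAG can read it from its dag_run. Below is a minimal sketch of that, assuming a TaskFlow task and Airflow 2.0's get_current_context helper; the dag_id read_trigger_conf is made up for illustration.

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago


@dag(schedule_interval=None, start_date=days_ago(1), tags=["example"])
def read_trigger_conf():
    @task()
    def show_conf():
        # get_current_context() returns the runtime context inside a TaskFlow task
        context = get_current_context()
        conf = context["dag_run"].conf or {}
        print(f"Received conf: {conf}")

    show_conf()


read_trigger_conf = read_trigger_conf()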

I will try to cover a few more Airflow concepts soon. Please let me know if you have any suggestions!
