Intro to Airflow
Introduction to Airflow
Data engineering is: Taking any action involving data and turning it into a reliable, repeatable, and maintainable process.
[[Workflow]] is a set of steps to accomplish a given data engineering task.
[[Apache Airflow]] is a platform to program workflows.
[[DAG]] stands for Directed Acyclic Graph
Airflow DAGs
What is a DAG?
Directed Acyclic Graph
- Directed: inherent flow representing dependencies between components
- Acyclic: does not loop
- Graph: the actual set of components
DAG in Airflow
- Are written in Python (but can use components written in other languages)
- Are made up of components (typically tasks) to be executed, such as operators, sensors, etc.
- Contain dependencies defined explicitly or implicitly
from airflow.models import DAG
from datetime import datetime
default_arguments = {
'owner':'jdoe',
'start_date': datetime(2020, 1, 20)
}
etl_dag = DAG('etl_workflow', default_args = default_arguments)
DAGs on the command line
The airflow command line program contains many subcommands
airflow -h
airflow list_dags
Command line vs Python
| Cmd line | Python |
|---|---|
| Start Airflow processes | Create a DAG |
| Manually run DAGs / Tasks | Edit the individual properties of a DAG |
| Get logging information from Airflow | |
# Import the DAG object and datetime (used for start_date)
from airflow.models import DAG
from datetime import datetime
# Define the default_args dictionary
default_args = {
'owner': 'dsmith',
'start_date': datetime(2020, 1, 14),
'retries': 2
}
# Instantiate the DAG object
etl_dag = DAG('example_etl', default_args=default_args)
Airflow web interface
Implementing Airflow DAGs
[[Airflow operator]]s
Operators
- Represent a single task in a workflow
- Run independently (usually)
- Generally do not share information between each other
- Various operators exist to perform different tasks
[[BashOperator]]
Executes a given Bash command or script
from airflow.operators.bash_operator import BashOperator
BashOperator(
task_id = 'bash_example',
bash_command = 'echo "Example!"',
dag = ml_dag
)
BashOperator(
task_id = 'bash_script_example',
bash_command = 'runcleanup.sh',
dag = ml_dag
)
bash_task = BashOperator(task_id = 'clean_addresses',
bash_command = 'cat addresses.txt | awk "NF==10" > cleaned.txt',
dag = dag)
- Runs the command in a temporary directory
- Can specify environment variables for the command
Operator gotchas
- Not guaranteed to run in the same location / environment
- May require extensive use of environment variables (see the sketch below)
- Can be difficult to run tasks with elevated privileges
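A minimal sketch of passing environment variables to a BashOperator via its env argument; the variable names and values are assumptions, and ml_dag refers to the same DAG used in the examples above:

```python
from airflow.operators.bash_operator import BashOperator

# Sketch: the command reads its settings from environment variables
# supplied through the operator's env argument (values are assumed).
env_example_task = BashOperator(
    task_id='bash_env_example',
    bash_command='echo "Processing $DATA_DIR in $RUN_ENV mode"',
    env={'DATA_DIR': '/tmp/data', 'RUN_ENV': 'staging'},
    dag=ml_dag
)
```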
# Import the BashOperator
from airflow.operators.bash_operator import BashOperator
# Define the BashOperator
cleanup = BashOperator(
task_id='cleanup_task',
# Define the bash_command
bash_command='cleanup.sh',
# Add the task to the dag
dag=analytics_dag
)
# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
task_id='consolidate_task',
bash_command='consolidate_data.sh',
dag=analytics_dag)
# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
task_id='pushdata_task',
bash_command='push_data.sh',
dag=analytics_dag)
[[Airflow task]]s
Tasks
- Instances of operators
- Usually assigned to a variable in Python
- Referred to by the task_id within the Airflow tools
Task dependencies
- Define a given order of task completion
- Are not required for a given workflow, but usually present in most
- Are referred to as upstream or downstream tasks
- In Airflow 1.8 and later, are defined using the bitshift operators: >> (the upstream operator) and << (the downstream operator)
- Upstream = before; Downstream = after
task1 >> task2
# or task2 << task1
Multiple dependencies
![[Pasted image 20220629143605.png]]
Chained dependencies and mixed dependencies (see the sketch below)
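A minimal sketch of chained and mixed dependencies; the DAG id, task ids, and echo commands are placeholders for illustration:

```python
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dep_dag = DAG('dependency_example', default_args={'start_date': datetime(2020, 1, 1)})

# Four placeholder tasks used only to illustrate dependency patterns.
task1, task2, task3, task4 = [
    BashOperator(task_id=f'task{i}', bash_command=f'echo task{i}', dag=dep_dag)
    for i in range(1, 5)
]

# Chained: task1 runs first, then task2, then task3
task1 >> task2 >> task3

# Mixed: task4 must also finish before task3 starts
task1 >> task2 >> task3 << task4
```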
# Define a new pull_sales task
pull_sales = BashOperator(
task_id='pullsales_task',
bash_command = 'wget https://salestracking/latestinfo?json',
dag=analytics_dag
)
# Set pull_sales to run prior to cleanup
pull_sales >> cleanup
# Configure consolidate to run after cleanup
consolidate << cleanup
# Set push_data to run last
consolidate >> push_data
Additional operators
PythonOperator
- Executes a Python function / callable
- Operates similarly to the BashOperator, with more options
- Can pass in arguments to the Python code
from airflow.operators.python_operator import PythonOperator
def printme():
print("This goes in the logs!")
python_task = PythonOperator(
task_id = 'simple_print',
python_callable = printme,
dag = example_dag
)
Arguments
- Supports positional and keyword arguments to tasks
- Use the op_kwargs dictionary
import time

def sleep(length_of_time):
time.sleep(length_of_time)
sleep_task = PythonOperator(
task_id = 'sleep',
python_callable = sleep,
op_kwargs = {'length_of_time':5},
dag = example_dag
)
EmailOperator
- Found in the airflow.operators library
- Sends an email
- Can contain typical components: HTML content, attachments
- Requires the Airflow system to be configured with email server details
from airflow.operators.email_operator import EmailOperator
email_task = EmailOperator(
task_id = 'email_sales_report',
subject = 'Automated Sales Report',
html_content = 'Attached is the latest sales report',
files = ['latest_sales.xlsx'],
dag = example_dag
)
import requests

def pull_file(URL, savepath):
r = requests.get(URL)
with open(savepath, 'wb') as f:
f.write(r.content)
# Use the print method for logging
print(f"File pulled from {URL} and saved to {savepath}")
from airflow.operators.python_operator import PythonOperator
# Create the task
pull_file_task = PythonOperator(
task_id='pull_file',
# Add the callable
python_callable=pull_file,
# Define the arguments
op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'},
dag=process_sales_dag
)
# Add another Python task
parse_file_task = PythonOperator(
task_id='parse_file',
# Set the function to call
python_callable = parse_file,
# Add the arguments
op_kwargs ={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'},
# Add the DAG
dag=process_sales_dag
)
# Import the Operator
from airflow.operators.email_operator import EmailOperator
# Define the task
email_manager_task = EmailOperator(
task_id='email_manager',
subject='Latest sales JSON',
html_content='Attached is the latest sales JSON file as requested.',
files=['parsedfile.json'],
dag=process_sales_dag
)
# Set the order of tasks
pull_file_task >> parse_file_task >> email_manager_task
Airflow scheduling
[[DAG Run]]s
- A specific instance of a workflow at a point in time
- Can be run manually or via schedule_interval
- Maintain state for each workflow and the tasks within: running, failed, success
Schedule details
- start_date: the date / time to initially schedule the DAG run (a datetime object)
- end_date: optional attribute for when to stop running new DAG instances
- max_tries: optional attribute for how many attempts to make
- schedule_interval: how often to run
Schedule interval
- How often to schedule the DAG
- Runs between the start_date and end_date
- Can be defined via cron-style syntax or via built-in presets
[[cron]] syntax
- Pulled from the Unix cron format
- Consists of 5 fields separated by a space: minute, hour, day of month, month, day of week
![[Pasted image 20220629152115.png]]
- An asterisk * represents running for every interval
- Fields can take comma-separated values for a list of values
cron examples
0 12 * * * # Run daily at noon
* * 25 2 * # Run once per minute on February 25
0,15,30,45 * * * * # Run every 15 minutes
Airflow scheduler presets (a usage sketch follows the list):
- @hourly = 0 * * * *
- @daily = 0 0 * * *
- @weekly = 0 0 * * 0
- None = don't schedule ever; used for manually triggered DAGs
- @once = only schedule once
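A small sketch, with assumed DAG ids and start_date, showing a preset and its cron equivalent used as the schedule_interval:

```python
from airflow.models import DAG
from datetime import datetime

default_args = {'start_date': datetime(2020, 1, 1)}

# These two schedules are equivalent: midnight every Sunday.
weekly_preset_dag = DAG('weekly_preset_example', default_args=default_args,
                        schedule_interval='@weekly')
weekly_cron_dag = DAG('weekly_cron_example', default_args=default_args,
                      schedule_interval='0 0 * * 0')
```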
schedule_interval issues
When scheduling a DAG, Airflow will:
- Use the start_date as the earliest possible value
- Schedule the task at start_date + schedule_interval (be careful! see the sketch below)
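A short sketch of this behavior, with an assumed DAG id: given a start_date of February 25, 2020 and a daily schedule, the first run is not triggered until February 26, 2020 - one full schedule_interval after the start_date.

```python
from airflow.models import DAG
from datetime import datetime

# Sketch: the earliest run executes at start_date + schedule_interval,
# i.e. on 2020-02-26, covering the 2020-02-25 execution date.
example_schedule_dag = DAG(
    'schedule_gotcha_example',
    default_args={'start_date': datetime(2020, 2, 25)},
    schedule_interval='@daily'
)
```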
# Update the scheduling arguments as defined
default_args = {
'owner': 'Engineering',
'start_date': datetime(2019, 11, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=20)
}
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')
Maintaining and monitoring Airflow workflows
[[Airflow sensor]]s
Sensors
- An operator that waits for a certain condition to be true, for example:
  - Creation of a file
  - Upload of a database record
  - A certain response from a web request
- Can define how often to check for the condition to be true
- Derived from airflow.sensors.base_sensor_operator
Sensor arguments:
- mode: how to check for the condition
  - mode = 'poke': the default; run repeatedly
  - mode = 'reschedule': give up the [[worker slot]] and try again later
- poke_interval: how often to wait between checks (at least 1 minute)
- timeout: how long to wait before failing the task (make sure it is significantly shorter than the schedule interval)
- Also includes normal operator attributes
File sensor
- Part of the airflow.contrib.sensors library
- Checks for the existence of a file at a certain location
- Could also check if any files exist within a directory
from airflow.contrib.sensors.file_sensor import FileSensor
file_sensor_task = FileSensor(task_id = 'file_sense',
filepath = 'salesdata.csv',
poke_interval = 300,
dag = sales_report_dag)
# poke_interval = 300 means the sensor checks every 300 seconds
init_sales_cleanup >> file_sensor_task >> generate_report
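A variant sketch of the same sensor that also uses the mode and timeout arguments; the specific values are assumptions, and sales_report_dag is the DAG from the example above:

```python
from airflow.contrib.sensors.file_sensor import FileSensor

# Sketch: reschedule mode frees the worker slot between checks, and the
# timeout fails the task if the file never appears within an hour.
file_sensor_resched = FileSensor(
    task_id='file_sense_reschedule',
    filepath='salesdata.csv',
    mode='reschedule',
    poke_interval=300,
    timeout=3600,
    dag=sales_report_dag
)
```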
Other sensors
- ExternalTaskSensor: wait for a task in another DAG to complete
- HttpSensor: request a web URL and check for content
- SqlSensor: run a SQL query to check for content
- Many others in the airflow.sensors and airflow.contrib.sensors libraries
Why sensors?
- Uncertain when the condition will be true
- If failure is not immediately desired
- To add task repetition without loops
[[Airflow executor]]s
- The execution model
- Executors run tasks
- Different executors handle running the tasks differently
Examples:
- SequentialExecutor: the default; runs one task at a time; useful for debugging; while functional, not really recommended for production
- LocalExecutor: treats tasks as processes; parallelism defined by the user; can utilize all the resources of a given host system
- CeleryExecutor: uses [[Celery]], a general queuing system written in Python that allows multiple systems to communicate as a basic cluster; multiple worker systems can be defined; significantly more difficult to set up and configure; extremely powerful for organizations with extensive workflows
Determining the executor from the command line:
- Via airflow.cfg (check the executor setting)
![[Pasted image 20220630114151.png]]
- Via airflow list_dags, e.g. INFO - Using SequentialExecutor
Debugging and troubleshooting in Airflow
Typical issues:
- DAG won't run on schedule
- DAG won't load
- Syntax errors
DAG won't run on schedule
- Check if the scheduler is running
![[Pasted image 20220630115026.png]]
  - Fix by running airflow scheduler from the command line
- At least one schedule_interval hasn't passed: modify the attributes to meet your requirements
- Not enough tasks free within the executor to run:
  - Change executor type
  - Add system resources
  - Add more systems
  - Change DAG scheduling
DAG won't load
- DAG not in the web UI
- DAG not in the airflow list_dags output
- Verify the DAG file is in the correct folder
- Determine the DAGs folder via airflow.cfg (the folder must be an absolute path)
Syntax errors
- The most common reason a DAG won't load
- Sometimes difficult to find errors in a DAG
- Two quick methods to check:
  - airflow list_dags
  - python3 <dagfile.py>
![[Pasted image 20220630120041.png]]
SLAs and reporting in Airflow
SLAs
- [[Service Level Agreement]]
- Within Airflow: the amount of time a task or a DAG should require to run
- An [[SLA Miss]] is any time the task / DAG does not meet the expected timing
- On an SLA miss, an email is sent out and a log is stored
- SLA misses can be viewed in the web UI, under Browse
Defining SLAs
Using the sla argument on the task (a timedelta object):
task1 = BashOperator(task_id = 'sla_task',
bash_command = 'runcode.sh',
sla = timedelta(seconds = 30),
dag = dag)
On the default_args dictionary:
default_args = {'sla':timedelta(minutes = 20),
'start_date': datetime(2020,2,20)}
dag = DAG('sla_dag', default_args = default_args)
timedelta object
[[datetime]]
- In the datetime library
- Accessed via from datetime import timedelta
- Takes arguments of days, seconds, minutes, hours, and weeks
timedelta(seconds = 30)
timedelta(weeks = 2)
timedelta(days = 4, hours = 10, minutes = 20, seconds = 30)
General reporting
- Options for success / failure / error
- Keys in the default_args dictionary
default_args = {
'email_on_failure': True,
'email_on_retry': False,
'email_on_success': True,
...
}
Within DAGs from the EmailOperator
# Import the timedelta object
from datetime import timedelta
# Create the dictionary entry
default_args = {
'start_date': datetime(2020, 2, 20),
'sla': timedelta(minutes = 30)
}
# Add to the DAG
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval=None)
# Import the timedelta object
from datetime import timedelta
test_dag = DAG('test_workflow', start_date=datetime(2020,2,20), schedule_interval=None)
# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
sla =timedelta(hours = 3),
bash_command='initialize_data.sh',
dag=test_dag)
# Define the email task
email_report = EmailOperator(
task_id='email_report',
subject='Airflow Monthly Report',
html_content="""Attached is your monthly workflow report - please refer to it for more detail""",
files=['monthly_report.pdf'],
dag=report_dag
)
# Set the email task to run after the report is generated
email_report << generate_report
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime
default_args={
'email_on_failure': True,
'email_on_success': True
}
report_dag = DAG(
dag_id = 'execute_report',
schedule_interval = "0 0 * * *",
default_args=default_args
)
precheck = FileSensor(
task_id='check_for_datafile',
filepath='salesdata_ready.csv',
start_date=datetime(2020,2,20),
mode='reschedule',
dag=report_dag)
generate_report_task = BashOperator(
task_id='generate_report',
bash_command='generate_report.sh',
start_date=datetime(2020,2,20),
dag=report_dag
)
precheck >> generate_report_task
Building production pipelines in Airflow
Working with templates
What are templates?
- Allow substituting information during a DAG run
- Provide added flexibility when defining tasks
- Are created using the [[Jinja]] templating language
Templated BashOperator example (in a non-templated version, the filename would be hard-coded into bash_command):
templated_command = """
echo "Reading {{ params.filename }}"
"""
# {{}} means information to be substituted
t1 = BashOperator(task_id = 'template_task',
bash_command = templated_command,
params = {'filename': 'file1.txt'},
dag = example_dag)
Exercise:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'start_date': datetime(2020, 4, 15),
}
cleandata_dag = DAG('cleandata',
default_args=default_args,
schedule_interval='@daily')
# Create a templated command to execute
# 'bash cleandata.sh datestring'
templated_command = """
bash cleandata.sh {{ ds_nodash }}
"""
# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          # ds_nodash is supplied by Airflow at runtime, so no params are needed
                          dag=cleandata_dag)
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'start_date': datetime(2020, 4, 15),
}
cleandata_dag = DAG('cleandata',
default_args=default_args,
schedule_interval='@daily')
# Modify the templated command to handle a
# second argument called filename.
templated_command = """
bash cleandata.sh {{ ds_nodash }} {{params.filename}}
"""
# Modify clean_task to pass the new argument
clean_task = BashOperator(task_id='cleandata_task',
bash_command=templated_command,
params={'filename': 'salesdata.txt'},
dag=cleandata_dag)
# Create a new BashOperator clean_task2
clean_task2 = BashOperator(task_id='cleandata_task2',
bash_command=templated_command,
params={'filename': 'supportdata.txt'},
dag=cleandata_dag)
# Set the operator dependencies
clean_task >> clean_task2
More templates
cement
- n. cement; an adhesive or binding agent; a bond that strengthens relationships or views; cementum (of teeth); the matrix binding sedimentary rock; dental filling cement
- v. to bond or glue together (with cement or an adhesive); to cement, strengthen; to settle, establish
More advanced templates
- Jinja templates can use for loops
templated_command = """
{% for filename in params.filenames %}
echo "Reading {{ filename }}"
{% endfor %}
"""
# Necessary to end the loop
t1 = BashOperator(task_id = 'template_task',
bash_command = templated_command,
params = {'filenames': ['file1.txt', 'file2.txt']},
dag = example_dag)
Variables
- Airflow built-in runtime variables
- Provide assorted information about DAG runs, tasks, and even the system configuration
Examples:
- Execution date: {{ ds }} # YYYY-MM-DD
- Execution date, no dashes: {{ ds_nodash }} # YYYYMMDD
- Previous execution date: {{ prev_ds }}
- Previous execution date, no dashes: {{ prev_ds_nodash }}
- DAG object: {{ dag }}
- Airflow config object: {{ conf }}
- And more (a short usage sketch follows below)
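A minimal sketch of using these runtime variables inside a bash_command; the task id and echo command are assumptions, and example_dag refers to a previously defined DAG as in the earlier examples:

```python
from airflow.operators.bash_operator import BashOperator

# Sketch: {{ ds }} and {{ prev_ds }} are substituted by Airflow at runtime.
date_range_task = BashOperator(
    task_id='echo_date_range',
    bash_command='echo "This run covers {{ prev_ds }} through {{ ds }}"',
    dag=example_dag
)
```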
[[Macros]]
{{ macros }}
A reference to the Airflow macros package which provides various useful objects / methods for Airflow templates
- {{ macros.datetime }}: the datetime.datetime object
- {{ macros.timedelta }}: the timedelta object
- {{ macros.uuid }}: Python's uuid module
- {{ macros.ds_add('2020-04-15', 5) }}: modify days from a date (could subtract if the number is negative); a usage sketch follows below
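A small sketch of calling a macro inside a template; the 5-day offset and task id are assumptions, and example_dag refers to a previously defined DAG as in the earlier examples:

```python
from airflow.operators.bash_operator import BashOperator

# Sketch: macros.ds_add shifts the execution date string by a number of days,
# here pointing at data from 5 days before the run's execution date.
lookback_task = BashOperator(
    task_id='five_day_lookback',
    bash_command='echo "Processing data since {{ macros.ds_add(ds, -5) }}"',
    dag=example_dag
)
```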
Exercise
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
filelist = [f'file{x}.txt' for x in range(30)]
default_args = {
'start_date': datetime(2020, 4, 15),
}
cleandata_dag = DAG('cleandata',
default_args=default_args,
schedule_interval='@daily')
# Modify the template to handle multiple files in a
# single run.
templated_command = """
{% for filename in params.filenames %}
bash cleandata.sh {{ ds_nodash }} {{ filename }};
{% endfor %}
"""
# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
bash_command=templated_command,
params={'filenames': filelist},
dag=cleandata_dag)
from airflow.models import DAG
from airflow.operators.email_operator import EmailOperator
from datetime import datetime
# Create the string representing the html email content
html_email_str = """
Date: {{ ds }}
Username: {{ params.username }}
"""
email_dag = DAG('template_email_test',
default_args={'start_date': datetime(2020, 4, 15)},
schedule_interval='@weekly')
email_task = EmailOperator(task_id='email_task',
subject="{{ macros.uuid.uuid4() }}",
html_content= html_email_str,
params={'username': 'testemailuser'},
dag=email_dag)
[[Branching]]
Provides conditional logic in Airflow
By default: BranchPythonOperator
from airflow.operators.python_operator import BranchPythonOperator
Takes a python_callable to return the next task id (or list of ids) to follow
Branching example
def branch_test(**kwargs):
if int(kwargs['ds_nodash']) % 2 == 0:
return 'even_day_task'
else:
return 'odd_day_task'
branch_task = BranchPythonOperator(task_id = 'branch_task',
dag = dag,
provide_context = True,
python_callable = branch_test)
start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2
![[Pasted image 20220701112150.png]]
Exercise:
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs['ds_nodash'][0:4])
previous_year = int(kwargs['prev_ds_nodash'][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=year_check, provide_context=True)
# Define the dependencies
branch_task >> current_year_task
branch_task >> new_year_task
Creating a production pipeline
Running DAGs & Tasks
To run a specific task from command-line:
airflow run <dag_id> <task_id> <date>
To run a full DAG:
airflow trigger_dag -e <date> <dag_id>
Operators
- BashOperator: requires a bash_command
- PythonOperator: requires a python_callable
- BranchPythonOperator: requires a python_callable and provide_context = True; the callable must accept **kwargs
- FileSensor: requires a filepath argument and might need mode or poke_interval attributes
Template
- Many objects in Airflow can use templates
- Certain fields may use templated strings, while others do not
- One way to check is the built-in documentation:
  - Open a Python 3 interpreter
  - Import the necessary libraries (e.g. from airflow.operators.bash_operator import BashOperator)
  - At the prompt, run help(<Airflow object>), e.g. help(BashOperator)
  - Look for a line referencing template_fields (a shortcut sketch follows the screenshot below)
![[Pasted image 20220701113416.png]]
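A quick sketch of a shortcut for the same check: rather than scanning the full help() output, read the operator's template_fields class attribute directly (the exact contents vary by operator and Airflow version):

```python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator

# Each operator class lists its templated fields in a class attribute.
print(BashOperator.template_fields)   # includes 'bash_command'
print(EmailOperator.template_fields)  # includes 'subject' and 'html_content'
```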
Final Exercise:
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
# Import the needed operators
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import date, datetime
def process_data(**context):
file = open('/home/repl/workspace/processed_data.tmp', 'w')
file.write(f'Data processed on {date.today()}')
file.close()
dag = DAG(dag_id='etl_update', default_args={'start_date': datetime(2020,4,1)})
sensor = FileSensor(task_id='sense_file',
filepath='/home/repl/workspace/startprocess.txt',
poke_interval=5,
timeout=15,
dag=dag)
bash_task = BashOperator(task_id='cleanup_tempfiles',
bash_command='rm -f /home/repl/*.tmp',
dag=dag)
python_task = PythonOperator(task_id='run_processing',
python_callable=process_data,
dag=dag)
sensor >> bash_task >> python_task
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from dags.process import process_data
from datetime import timedelta, datetime
# Update the default arguments and apply them to the DAG
default_args = {
'start_date': datetime(2019,1,1),
'sla': timedelta(minutes = 90)
}
dag = DAG(dag_id='etl_update', default_args=default_args)
sensor = FileSensor(task_id='sense_file',
filepath='/home/repl/workspace/startprocess.txt',
poke_interval = 45,
dag=dag)
bash_task = BashOperator(task_id='cleanup_tempfiles',
bash_command='rm -f /home/repl/*.tmp',
dag=dag)
python_task = PythonOperator(task_id='run_processing',
python_callable=process_data,
provide_context = True,
dag=dag)
sensor >> bash_task >> python_task
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta
# Update the default arguments and apply them to the DAG.
default_args = {
'start_date': datetime(2019,1,1),
'sla': timedelta(minutes=90)
}
dag = DAG(dag_id='etl_update', default_args=default_args)
sensor = FileSensor(task_id='sense_file',
filepath='/home/repl/workspace/startprocess.txt',
poke_interval=45,
dag=dag)
bash_task = BashOperator(task_id='cleanup_tempfiles',
bash_command='rm -f /home/repl/*.tmp',
dag=dag)
python_task = PythonOperator(task_id='run_processing',
python_callable=process_data,
provide_context=True,
dag=dag)
email_subject="""
Email report for {{ params.department }} on {{ ds_nodash }}
"""
email_report_task = EmailOperator(task_id='email_report_task',
subject=email_subject,
html_content='',
params={'department': 'Data subscription services'},
dag=dag)
no_email_task = DummyOperator(task_id='no_email_task', dag=dag)
def check_weekend(**kwargs):
dt = datetime.strptime(kwargs['execution_date'],"%Y-%m-%d")
# If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
if (dt.weekday() < 5):
return 'email_report_task'
else:
return 'no_email_task'
branch_task = BranchPythonOperator(task_id='check_if_weekend',
provide_context = True,
python_callable = check_weekend,
dag=dag)
sensor >> bash_task >> python_task
python_task >> branch_task >> [email_report_task, no_email_task]