Notes - Introduction to Airflow in Python

Intro to Airflow

Introduction to Airflow

Data engineering is: Taking any action involving data and turning it into a reliable, repeatable, and maintainable process.

[[Workflow]] is a set of steps to accomplish a given data engineering task.

[[Apache Airflow]] is a platform to program workflows.

[[DAG]] stands for Directed Acyclic Graph

Airflow DAGs

What is a DAG?

Directed Acyclic Graph

  1. Directed: inherent flow representing dependencies between components
  2. Acyclic: does not loop
  3. Graph: the actual set of components

DAG in Airflow

  1. Are written in Python (but can use components written in other languages)
  2. Are made up of components (typically tasks) to be executed, such as operators, sensors, etc.
  3. Contain dependencies defined explicitly or implicitly
from airflow.models import DAG
from datetime import datetime

default_arguments = {
    'owner': 'jdoe',
    'email': '[email protected]',
    'start_date': datetime(2020, 1, 20)
}

etl_dag = DAG('etl_workflow', default_args=default_arguments)

DAGs on the command line

The airflow command line program contains many subcommands

airflow -h
airflow list_dags

Command line vs Python

| Command line | Python |
| --- | --- |
| Start Airflow processes | Create a DAG |
| Manually run DAGs / Tasks | Edit the individual properties of a DAG |
| Get logging information from Airflow | |
# Import the DAG object
from airflow.models import DAG
from datetime import datetime

# Define the default_args dictionary
default_args = {
    'owner': 'dsmith',
    'start_date': datetime(2020, 1, 14),
    'retries': 2
}

# Instantiate the DAG object
etl_dag = DAG('example_etl', default_args=default_args)

Instantiate

Airflow web interface

Implementing Airflow DAGs

[[Airflow operator]]s

Operators

- Represent a single task in a workflow
- Run independently (usually)
- Generally do not share information between each other
- Various operators are available to perform different tasks

[[BashOperator]]

Executes a given Bash command or script

from airflow.operators.bash_operator import BashOperator

BashOperator(
    task_id='bash_example',
    bash_command='echo "Example!"',
    dag=ml_dag
)

BashOperator(
    task_id='bash_script_example',
    bash_command='runcleanup.sh',
    dag=ml_dag
)

bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                         dag=dag)

- Runs the command in a temporary directory
- Can specify environment variables for the command (see the sketch below)
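
A minimal sketch of passing an environment variable via the operator's env argument (the variable, value, and ml_dag are illustrative):

from airflow.operators.bash_operator import BashOperator

# Pass an environment variable to the command via the env dictionary.
# Note: in many Airflow versions, setting env replaces the inherited
# environment for the command rather than adding to it.
env_example = BashOperator(
    task_id='bash_env_example',
    bash_command='echo "Processing $DATA_DIR"',
    env={'DATA_DIR': '/tmp/data'},
    dag=ml_dag
)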

Operator gotchas

- Not guaranteed to run in the same location / environment
- May require extensive use of environment variables
- Can be difficult to run tasks with elevated privileges


# Import the BashOperator
from airflow.operators.bash_operator import BashOperator

# Define the BashOperator
cleanup = BashOperator(
    task_id='cleanup_task',
    # Define the bash_command
    bash_command='cleanup.sh',
    # Add the task to the dag
    dag=analytics_dag
)

# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag)

# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag=analytics_dag)

[[Airflow task]]s

Tasks

- Instances of operators
- Usually assigned to a variable in Python
- Referred to by the task_id within the Airflow tools

Task dependencies

- Define a given order of task completion
- Are not required for a given workflow, but are usually present in most
- Are referred to as upstream or downstream tasks
- In Airflow 1.8 and later, are defined using the bitshift operators: task1 >> task2 makes task1 upstream of task2 (equivalently, task2 << task1)

- Upstream = before
- Downstream = after

task1 >> task2
# or task2 << task1

Multiple dependencies

![[Pasted image 20220629143605.png]]

- Chained dependencies
- Mixed dependencies (see the sketch below)
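
A minimal sketch of chained and mixed dependencies using the bitshift operators (the DAG and task names are illustrative):

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dependency_dag = DAG('dependency_example',
                     default_args={'start_date': datetime(2020, 1, 1)})

task1 = DummyOperator(task_id='task1', dag=dependency_dag)
task2 = DummyOperator(task_id='task2', dag=dependency_dag)
task3 = DummyOperator(task_id='task3', dag=dependency_dag)
task4 = DummyOperator(task_id='task4', dag=dependency_dag)

# Chained: task1 runs first, then task2, then task3
task1 >> task2 >> task3

# Mixed in: task3 also waits on task4
task4 >> task3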

# Define a new pull_sales task
pull_sales = BashOperator(
    task_id='pullsales_task',
    bash_command='wget https://salestracking/latestinfo?json',
    dag=analytics_dag
)
# Set pull_sales to run prior to cleanup
pull_sales >> cleanup
# Configure consolidate to run after cleanup
consolidate << cleanup
# Set push_data to run last
consolidate >> push_data

Additional operators

PythonOperator

- Executes a Python function / callable
- Operates similarly to the BashOperator, with more options
- Can pass in arguments to the Python code

from airflow.operators.python_operator import PythonOperator

def printme():
    print("This goes in the logs!")

python_task = PythonOperator(
    task_id='simple_print',
    python_callable=printme,
    dag=example_dag
)

Arguments

Supports both positional and keyword arguments to tasks (a hedged op_args sketch follows the op_kwargs example below).

Use the op_kwargs dictionary

import time

def sleep(length_of_time):
    time.sleep(length_of_time)

sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5},
    dag=example_dag
)
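
For positional arguments, a minimal sketch using the op_args list instead of op_kwargs (same assumptions as the example above; example_dag and the task id are illustrative):

from airflow.operators.python_operator import PythonOperator
import time

def sleep(length_of_time):
    time.sleep(length_of_time)

# Positional arguments are passed as a list via op_args
sleep_task_positional = PythonOperator(
    task_id='sleep_positional',
    python_callable=sleep,
    op_args=[5],
    dag=example_dag
)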

EmailOperator

- Found in the airflow.operators library
- Sends an email
- Can contain typical email components: HTML content, attachments
- Requires the Airflow system to be configured with email server details

from airflow.operators.email_operator import EmailOperator

email_task = EmailOperator(
    task_id='email_sales_report',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    files=['latest_sales.xlsx'],
    dag=example_dag
)

import requests

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python_operator import PythonOperator

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'},
    dag=process_sales_dag
)

# Add another Python task
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile': 'latestsales.json', 'outputfile': 'parsedfile.json'},
    # Add the DAG
    dag=process_sales_dag
)

# Import the Operator
from airflow.operators.email_operator import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_manager_task

Airflow scheduling

[[DAG Run]]s

- A specific instance of a workflow at a point in time
- Can be run manually or via the schedule_interval
- Maintain state for each workflow and the tasks within: running, failed, success

Schedule details

- start_date - the date / time to initially schedule the DAG run (a datetime object)
- end_date - optional attribute for when to stop running new DAG instances
- max_tries - optional attribute for how many attempts to make
- schedule_interval - how often to run

Schedule interval

- How often to schedule the DAG
- Applies between the start_date and end_date
- Can be defined via cron-style syntax or via built-in presets

[[cron]] syntax

- Is pulled from the Unix cron format
- Consists of 5 fields separated by a space

![[Pasted image 20220629152115.png]]

- An asterisk * represents running for every interval
- Fields can contain comma-separated values for a list of values
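
For reference, the five fields in order (standard Unix cron; this restates what the screenshot above shows):

# minute        0-59
# hour          0-23
# day of month  1-31
# month         1-12
# day of week   0-6 (0 = Sunday)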

cron examples

0 12 * * * # Run daily at noon
* * 25 2 * # Run once per minute on February 25
0,15,30,45 * * * * # Run every 15 minutes

Airflow scheduler presets

- @hourly = 0 * * * *
- @daily = 0 0 * * *
- @weekly = 0 0 * * 0

- None = don't schedule ever, used for manually triggered DAGs
- @once = only schedule once
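
A minimal sketch of using a preset when instantiating a DAG (the DAG id and start date are illustrative):

from airflow.models import DAG
from datetime import datetime

# '@daily' is equivalent to the cron expression '0 0 * * *'
daily_dag = DAG('daily_example',
                default_args={'start_date': datetime(2020, 1, 1)},
                schedule_interval='@daily')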

schedule_interval issues

When scheduling a DAG, Airflow will:
- Use the start_date as the earliest possible value
- Schedule the task at start_date + schedule_interval (be careful!)
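
A worked example of that gotcha, assuming a daily schedule (dates are illustrative):

from datetime import datetime, timedelta

start_date = datetime(2020, 2, 25)
schedule_interval = timedelta(days=1)   # roughly equivalent to '@daily'

# The run covering the interval that begins at start_date is only triggered
# once that interval has passed, so the earliest actual execution is:
earliest_run = start_date + schedule_interval   # 2020-02-26, not 2020-02-25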

# Update the scheduling arguments as defined
default_args = {
    'owner': 'Engineering',
    'start_date': datetime(2019, 11, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=20)
}

dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')

Maintaining and monitoring Airflow workflows

[[Airflow sensor]]s

Sensors

An operator that waits for a certain condition to be true, for example:
- Creation of a file
- Upload of a database record
- A certain response from a web request

Can define how often to check for the condition to be true

Derived from airflow.sensors.base_sensor_operator

Sensor arguments:

- mode - how to check for the condition
    - mode='poke' - the default; run repeatedly
    - mode='reschedule' - give up the [[worker slot]] and try again later
- poke_interval - how often to wait between checks (should be at least 1 minute)
- timeout - how long to wait before failing the task (make sure it is significantly shorter than the schedule interval)
- Sensors also include the normal operator attributes

File sensor

- Part of the airflow.contrib.sensors library
- Checks for the existence of a file at a certain location
- Can also check if any files exist within a directory

from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_sense',
                              filepath='salesdata.csv',
                              poke_interval=300,  # 300 means check every 300 seconds
                              dag=sales_report_dag)

init_sales_cleanup >> file_sensor_task >> generate_report
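
A hedged variant of the same sensor showing the mode and timeout arguments described above (values, task id, and sales_report_dag are illustrative):

from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_reschedule = FileSensor(task_id='file_sense_reschedule',
                                    filepath='salesdata.csv',
                                    poke_interval=300,   # check every 5 minutes
                                    timeout=3600,        # fail after an hour of waiting
                                    mode='reschedule',   # free the worker slot between checks
                                    dag=sales_report_dag)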

Other sensors

ExternalTaskSensor - wait for a task in another DAG to complete

HttpSensor - Request a web URL and check for content

SqlSensor - Runs a SQL query to check for content

Many others in airflow.sensors and airflow.contrib.sensors libraries

Why sensors?

Sensors are useful when:
- It is uncertain when the condition will be true
- Failure is not immediately desired
- You want to add task repetition without loops

[[Airflow executor]]s

The execution model:
- Executors run tasks
- Different executors handle running the tasks differently

Example executors:
- SequentialExecutor - the default; runs one task at a time; useful for debugging; while functional, not really recommended for production
- LocalExecutor - treats tasks as processes; parallelism defined by the user; can utilize all the resources of a given host system
- CeleryExecutor - uses [[Celery]], a general queuing system written in Python that allows multiple systems to communicate as a basic cluster; multiple worker systems can be defined; significantly more difficult to set up and configure; extremely powerful for organizations with extensive workflows

Determining which executor is in use from the command line:

airflow.cfg

![[Pasted image 20220630114151.png]]

Running airflow list_dags also reports the executor in its output, e.g. INFO - Using SequentialExecutor

Debugging and troubleshooting in Airflow

Typical issues...

- DAG won't run on schedule
- DAG won't load
- Syntax errors

DAG won't run on schedule

  1. Check if the scheduler is running

![[Pasted image 20220630115026.png]]

  Fix by running airflow scheduler from the command-line

  2. At least one schedule_interval hasn't passed - modify the attributes to meet your requirements

  3. Not enough tasks free within the executor to run - possible fixes:
     - Change executor type
     - Add system resources
     - Add more systems
     - Change DAG scheduling

DAG won't load

- DAG not in the web UI
- DAG not in airflow list_dags output

- Verify the DAG file is in the correct folder
- The DAGs folder must be an absolute path
- Determine the DAGs folder via airflow.cfg

Syntax errors

- The most common reason a DAG won't load
- Sometimes difficult to find errors in a DAG

Two quick methods to check:
- airflow list_dags
- python3 <dagfile.py>

![[Pasted image 20220630120041.png]]

SLAs and reporting in Airflow

SLAs

- [[Service Level Agreement]]
- Within Airflow: the amount of time a task or a DAG should require to run
- An [[SLA Miss]] is any time the task / DAG does not meet the expected timing
- When an SLA is missed, an email is sent out and a log is stored
- SLA misses can be viewed in the web UI, under the Browse menu

Defining SLAs

Using the sla argument on the task (a timedelta object):

task1 = BashOperator(task_id='sla_task',
                     bash_command='runcode.sh',
                     sla=timedelta(seconds=30),
                     dag=dag)

On the default_args dictionary

default_args = {'sla': timedelta(minutes=20),
                'start_date': datetime(2020, 2, 20)}

dag = DAG('sla_dag', default_args=default_args)

timedelta object

[[datetime]]

- In the datetime library
- Accessed via from datetime import timedelta
- Takes arguments of days, seconds, minutes, hours, and weeks

timedelta(seconds = 30)
timedelta(weeks = 2)
timedelta(days = 4, hours = 10, minutes = 20, seconds = 30)

General reporting

- Options for success / failure / error
- Keys in the default_args dictionary

default_args = {
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'email_on_success': True,
    ...
}

Within DAGs from the EmailOperator


# Import the timedelta and datetime objects
from datetime import datetime, timedelta

# Create the dictionary entry
default_args = {
    'start_date': datetime(2020, 2, 20),
    'sla': timedelta(minutes=30)
}

# Add to the DAG
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval=None)

# Variant: set the SLA per task instead of in default_args
test_dag = DAG('test_workflow', start_date=datetime(2020, 2, 20), schedule_interval=None)

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)
# Define the email task
email_report = EmailOperator(
    task_id='email_report',
    subject='Airflow Monthly Report',
    html_content="""Attached is your monthly workflow report - please refer to it for more detail""",
    files=['monthly_report.pdf'],
    dag=report_dag
)

# Set the email task to run after the report is generated
email_report << generate_report
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime
default_args={
    'email': ['[email protected]', '[email protected]'],
    'email_on_failure': True,
    'email_on_success': True
}
report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *",
    default_args=default_args
)
precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='reschedule',
    dag=report_dag)
generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)
precheck >> generate_report_task

Building production pipelines in Airflow

Working with templates

What are templates?

- Allow substituting information during a DAG run
- Provide added flexibility when defining tasks
- Are created using the [[Jinja]] templating language

Non-Templated BashOperator example
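
A minimal sketch of the non-templated approach, assuming one BashOperator per hard-coded filename (example_dag and the filenames are illustrative):

from airflow.operators.bash_operator import BashOperator

# Without templates, each filename needs its own, nearly identical task
t1 = BashOperator(task_id='first_task',
                  bash_command='echo "Reading file1.txt"',
                  dag=example_dag)
t2 = BashOperator(task_id='second_task',
                  bash_command='echo "Reading file2.txt"',
                  dag=example_dag)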

Templated BashOperator example

templated_command = """
echo "Reading {{ params.filename }}"
"""
# {{ }} marks information to be substituted at runtime

t1 = BashOperator(task_id='template_task',
                  bash_command=templated_command,
                  params={'filename': 'file1.txt'},
                  dag=example_dag)

Exercise:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
  'start_date': datetime(2020, 4, 15),
}
cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')
# Create a templated command to execute
# 'bash cleandata.sh datestring'
templated_command = """
    bash cleandata.sh {{ ds_nodash }}
"""
# Modify clean_task to use the templated command
# (ds_nodash is a built-in runtime variable, so no params entry is needed here)
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          dag=cleandata_dag)
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
  'start_date': datetime(2020, 4, 15),
}
cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')
# Modify the templated command to handle a
# second argument called filename.
templated_command = """
  bash cleandata.sh {{ ds_nodash }} {{params.filename}}
"""
# Modify clean_task to pass the new argument
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filename': 'salesdata.txt'},
                          dag=cleandata_dag)
# Create a new BashOperator clean_task2
clean_task2 = BashOperator(task_id='cleandata_task2',
                            bash_command=templated_command,
                          params={'filename': 'supportdata.txt'},
                          dag=cleandata_dag)
# Set the operator dependencies
clean_task >> clean_task2

More templates

cement

  • n. a substance that binds things together (mortar, adhesive); figuratively, a bond that strengthens a relationship or view; cementum (of teeth); the matrix binding sedimentary rock; dental filling cement
  • v. to bond or glue together (with cement or adhesive); to strengthen, consolidate; to settle or firmly establish

More advanced template

Jinja templates can also use for loops:

templated_command = """
{% for filename in params.filenames %}
echo "Reading {{ filename }}"
{% endfor %}
"""
# {% endfor %} is necessary to end the loop

t1 = BashOperator(task_id='template_task',
                  bash_command=templated_command,
                  params={'filenames': ['file1.txt', 'file2.txt']},
                  dag=example_dag)

Variables

Airflow provides built-in runtime variables that give assorted information about DAG runs, tasks, and even the system configuration.

Examples:

Execution date: {{ ds }} # YYYY-MM-DD
Execution Date, no dashes: {{ ds_nodash }} # YYYYMMDD
Previous Execution date: {{ prev_ds }}
Previous Execution date, no dashes: {{ prev_ds_nodash }}
DAG object: {{ dag }}
Airflow config object: {{ conf }}
And more

[[Macros]]

{{ macros }} is a reference to the Airflow macros package, which provides various useful objects / methods for Airflow templates.

- {{ macros.datetime }}: the datetime.datetime object
- {{ macros.timedelta }}: the timedelta object
- {{ macros.uuid }}: Python's uuid module
- {{ macros.ds_add('2020-04-15', 5) }}: modify days from a date (a negative number subtracts days)
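
A minimal sketch of using a macro inside a templated command (the command, task id, and example_dag are illustrative):

from airflow.operators.bash_operator import BashOperator

# macros.ds_add shifts the execution date string by a number of days
templated_macro_command = """
echo "Processing data up to {{ macros.ds_add(ds, 5) }}"
"""

macro_task = BashOperator(task_id='macro_task',
                          bash_command=templated_macro_command,
                          dag=example_dag)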


Exercise

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
filelist = [f'file{x}.txt' for x in range(30)]
default_args = {
  'start_date': datetime(2020, 4, 15),
}
cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')
# Modify the template to handle multiple files in a 
# single run.
templated_command = """
  {% for filename in params.filenames %}
  bash cleandata.sh {{ ds_nodash }} {{ filename }};
  {% endfor %}
"""
# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filenames': filelist},
                          dag=cleandata_dag)
from airflow.models import DAG
from airflow.operators.email_operator import EmailOperator
from datetime import datetime
# Create the string representing the html email content
html_email_str = """
Date: {{ ds }}
Username: {{ params.username }}
"""
email_dag = DAG('template_email_test',
                default_args={'start_date': datetime(2020, 4, 15)},
                schedule_interval='@weekly')
email_task = EmailOperator(task_id='email_task',
                           to='[email protected]',
                           subject="{{ macros.uuid.uuid4() }}",
                           html_content= html_email_str,
                           params={'username': 'testemailuser'},
                           dag=email_dag)

[[Branching]]

- Provides conditional logic in Airflow
- The default choice is the BranchPythonOperator: from airflow.operators.python_operator import BranchPythonOperator

Takes a python_callable to return the next task id (or list of ids) to follow

Branching example

def branch_test(**kwargs):
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch_task = BranchPythonOperator(task_id='branch_task',
                                   dag=dag,
                                   provide_context=True,
                                   python_callable=branch_test)

start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2

![[Pasted image 20220701112150.png]]


Exercise:

# Create a function to determine if years are different
def year_check(**kwargs):
    current_year = int(kwargs['ds_nodash'][0:4])
    previous_year = int(kwargs['prev_ds_nodash'][0:4])
    if current_year == previous_year:
        return 'current_year_task'
    else:
        return 'new_year_task'

# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
                                   python_callable=year_check, provide_context=True)

# Define the dependencies
branch_task >> current_year_task
branch_task >> new_year_task

Creating a production pipeline

Running DAGs & Tasks

To run a specific task from command-line: airflow run <dag_id> <task_id> <date>

To run a full DAG: airflow trigger_dag -e <date> <dag_id>

Operators

- BashOperator - bash_command
- PythonOperator - python_callable
- BranchPythonOperator - python_callable and provide_context=True; the callable must accept **kwargs
- FileSensor - requires the filepath argument and might need mode or poke_interval attributes

Template

- Many objects in Airflow can use templates
- Certain fields may use templated strings, while others do not
- One way to check is to use the built-in documentation:

  1. Open the Python3 interpreter
  2. Import the necessary libraries (e.g. from airflow.operators.bash_operator import BashOperator)
  3. At the prompt, run help(<Airflow object>), e.g. help(BashOperator)
  4. Look for a line referencing template_fields

![[Pasted image 20220701113416.png]]
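
A minimal sketch of that check from an interactive session (the exact list of template fields varies by operator and Airflow version):

from airflow.operators.bash_operator import BashOperator

# Scan the help output for a "template_fields" line; fields listed there
# (for BashOperator this typically includes bash_command) accept Jinja templates.
help(BashOperator)

# The attribute can also be inspected directly:
print(BashOperator.template_fields)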


Final Exercise:

from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
# Import the needed operators
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import date, datetime
def process_data(**context):
  file = open('/home/repl/workspace/processed_data.tmp', 'w')
  file.write(f'Data processed on {date.today()}')
  file.close()
dag = DAG(dag_id='etl_update', default_args={'start_date': datetime(2020,4,1)})
sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=5,
                    timeout=15,
                    dag=dag)
bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)
python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             dag=dag)
sensor >> bash_task >> python_task
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from dags.process import process_data
from datetime import timedelta, datetime
# Update the default arguments and apply them to the DAG
default_args = {
  'start_date': datetime(2019,1,1),
  'sla': timedelta(minutes = 90)
}
dag = DAG(dag_id='etl_update', default_args=default_args)
sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval = 45,
                    dag=dag)
bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)
python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context = True,
                             dag=dag)
sensor >> bash_task >> python_task
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta
# Update the default arguments and apply them to the DAG.
default_args = {
  'start_date': datetime(2019,1,1),
  'sla': timedelta(minutes=90)
}
dag = DAG(dag_id='etl_update', default_args=default_args)
sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)
bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)
python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)
email_subject="""
  Email report for {{ params.department }} on {{ ds_nodash }}
"""
email_report_task = EmailOperator(task_id='email_report_task',
                                  to='[email protected]',
                                  subject=email_subject,
                                  html_content='',
                                  params={'department': 'Data subscription services'},
                                  dag=dag)
no_email_task = DummyOperator(task_id='no_email_task', dag=dag)
def check_weekend(**kwargs):
    dt = datetime.strptime(kwargs['execution_date'],"%Y-%m-%d")
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if (dt.weekday() < 5):
        return 'email_report_task'
    else:
        return 'no_email_task'
branch_task = BranchPythonOperator(task_id='check_if_weekend',
                                   provide_context = True,
                                   python_callable = check_weekend,
                                   dag=dag)
sensor >> bash_task >> python_task
python_task >> branch_task >> [email_report_task, no_email_task]