Airflow 101: working locally and familiarising yourself with the tool

Pre-requisites

The following prerequisites are needed:

  • Libraries detailed in the Setting up section (either via conda or pipenv)
  • MySQL installed
  • A text editor
  • A command line / terminal

Getting your environment up and running

If you followed the instructions in the Setting up section, you should have Airflow installed as well as the rest of the packages we will be using.

So let’s get our environment up and running:

If you are using conda, start your environment via:

$ source activate airflow-env

If using pipenv then:

$ pipenv shell

This will start a shell within a virtual environment. To exit the shell, type exit; this will also exit the virtual environment.

Starting Airflow locally

Airflow home lives in ~/airflow by default, but you can change the location before installing Airflow: first set the AIRFLOW_HOME environment variable and then install Airflow. For example, using pip:

export AIRFLOW_HOME=~/mydir/airflow

# install from PyPI using pip
pip install apache-airflow

Once you have completed the installation, you should see something like this in the Airflow home directory (wherever it lives for you):

drwxr-xr-x    - myuser 18 Apr 14:02 .
.rw-r--r--  26k myuser 18 Apr 14:02 ├── airflow.cfg
drwxr-xr-x    - myuser 18 Apr 14:02 ├── logs
drwxr-xr-x    - myuser 18 Apr 14:02 │  └── scheduler
drwxr-xr-x    - myuser 18 Apr 14:02 │     ├── 2019-04-18
lrwxr-xr-x   46 myuser 18 Apr 14:02 │     └── latest -> /Users/myuser/airflow/logs/scheduler/2019-04-18
.rw-r--r-- 2.5k myuser 18 Apr 14:02 └── unittests.cfg

We need to create a local DAG folder:

mkdir ~/airflow/dags

As your project evolves, your directory will look something like this:

airflow                  # the Airflow home (root) directory
├── dags                 # root folder for all DAGs (subdirectories are scanned for DAG files too)
│   ├── my_dag.py        # my DAG (definitions of tasks/operators, including precedence)
│   └── ...
├── logs                 # logs for the various tasks that are run
│   └── my_dag           # DAG-specific logs
│       ├── src1_s3      # folders for task-specific logs (log files are created per run date)
│       ├── src2_hdfs
│       ├── src3_s3
│       └── spark_task_etl
├── airflow.db           # database used by Airflow internally to track the status of each DAG (only present with the default SQLite backend)
├── airflow.cfg          # global configuration for Airflow (settings here can be overridden by environment variables)
└── ...

Prepare your database

As we mentioned before, Airflow uses a database to keep track of tasks and their statuses, so it is critical to have one set up.

To initialise the default database we can run airflow initdb. This will create and migrate your database via Alembic so that it matches the version of Airflow you installed.

The default database is SQLite, which only supports the SequentialExecutor, so you cannot parallelise tasks with it. Since we have MySQL and the MySQL client installed, we will set them up so that we can use them with Airflow.

🚦 Create an airflow database

From the command line, create the database and a user matching the credentials we will put in airflow.cfg:

mysql -u root -p
mysql> CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
mysql> CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'python2019';
mysql> GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';
mysql> FLUSH PRIVILEGES;

and initialize the database:

airflow initdb

Notice that this will fail with the default airflow.cfg, so we need to update the configuration first.

Update your local configuration

Open your Airflow configuration file ~/airflow/airflow.cfg and make the following changes:

executor = CeleryExecutor
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
# needs rabbitmq running
broker_url = amqp://guest:guest@127.0.0.1/


# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://airflow:python2019@localhost:3306/airflow

sql_alchemy_conn = mysql://airflow:python2019@localhost:3306/airflow

Here we are replacing the default executor (SequentialExecutor) with the CeleryExecutor so that we can run multiple tasks and DAGs in parallel. We also replace the default SQLite database with our newly created airflow MySQL database. Note that the CeleryExecutor needs the broker (RabbitMQ in the configuration above) to be running, and at least one worker started with airflow worker, before it can actually execute tasks.

Now we can initialize the database:

airflow initdb

Let’s now start the web server locally:

airflow webserver -p 8080

We can now head over to http://localhost:8080, where you will see a number of example DAGs already loaded.

🚦 Take some time to familiarise yourself with the UI and get your local instance set up

Now let’s have a look at the connections: go to Admin > Connections (http://localhost:8080/admin/connection/). You should see a number of connections available by default. For this tutorial, we will use some of these connections, including mysql.
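
As a quick illustration of how these connections are used, here is a minimal sketch (not part of the workshop code), assuming the mysql_default connection shown in the UI has been edited to point at a database you can actually reach:

# Minimal sketch: use the "mysql_default" connection from Admin > Connections.
from airflow.hooks.mysql_hook import MySqlHook

# mysql_default is one of the connections listed in the UI; edit its host,
# schema, login and password there so it points at a real database.
hook = MySqlHook(mysql_conn_id="mysql_default")
records = hook.get_records("SELECT 1")  # run a trivial query through the connection
print(records)

Operators such as MySqlOperator take the same mysql_conn_id argument, so credentials live in the connection rather than being hard-coded in the DAG file.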

Commands

Let us go over some of the commands. Back on your command line, list the available DAGs:

airflow list_dags

We can list the tasks of the tutorial DAG in a tree view:

airflow list_tasks tutorial --tree

We can test individual tasks too, but we will need to pass an execution date so that this runs:

airflow test tutorial print_date 2019-05-01

(note that you cannot use a future date or you will get an error)

airflow test tutorial templated 2019-05-01

Note that the test command runs task instances locally, so their state is not saved in the database.

Now let’s start the scheduler:

airflow scheduler

Behind the scenes, the scheduler monitors the DAG folder and stays in sync with all the DAG objects it contains. The Airflow scheduler is designed to run as a persistent service in an Airflow production environment.

Now, with the scheduler up and running, we can trigger a task instance:

$ airflow run example_bash_operator runme_0 2015-01-01

This run will be stored in the database, and you can see the status change straight away in the UI.

What would happen for example if we wanted to run or trigger the tutorial task? 🤔

Let’s try from the CLI and see what happens.

airflow trigger_dag tutorial

Writing your first DAG

Let’s create our first simple DAG. Inside the DAG directory (~/airflow/dags), create a simple_dag.py file.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    return "Hello world!"


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2019, 4, 30),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

dag = DAG(
    "hello_world",
    description="Simple tutorial DAG",
    schedule_interval="0 12 * * *",  # run once a day at 12:00
    default_args=default_args,
    catchup=False,  # do not backfill runs between start_date and today
)

t1 = DummyOperator(task_id="dummy_task", retries=3, dag=dag)

t2 = PythonOperator(task_id="hello_task", python_callable=print_hello, dag=dag)

# sets t2 downstream of t1
t1 >> t2

# equivalent
# t2.set_upstream(t1)

If everything is properly set up, you should be able to see the hello_world DAG straight away on your instance.
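
If it does not show up, a quick way to check that the file at least parses is to load it through a DagBag from a Python shell (a sanity-check sketch, not part of the workshop code):

# Minimal sanity check: parse the configured DAG folder and look for our DAG.
from airflow.models import DagBag

dagbag = DagBag()                      # parses the files in the configured DAG folder
print(dagbag.import_errors)            # any syntax or import errors show up here
dag = dagbag.get_dag("hello_world")    # the dag_id we defined in simple_dag.py
print(dag.task_ids if dag else "hello_world not found")

Simply running python ~/airflow/dags/simple_dag.py and checking that it raises no exception is another quick test.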

Now let’s create a DAG from the previous ETL pipeline (kind of)
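
If you want a starting point, here is a bare-bones sketch with hypothetical extract/transform/load placeholders (replace them with the logic from the previous ETL pipeline); the structure, one PythonOperator per step chained with >>, is the part that matters:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


# Hypothetical placeholders: replace with the extract/transform/load
# logic from the previous ETL pipeline.
def extract():
    pass


def transform():
    pass


def load():
    pass


etl_dag = DAG(
    "etl_pipeline",  # hypothetical dag_id
    schedule_interval="@daily",
    start_date=datetime(2019, 4, 30),
    catchup=False,
)

extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=etl_dag)
transform_task = PythonOperator(task_id="transform", python_callable=transform, dag=etl_dag)
load_task = PythonOperator(task_id="load", python_callable=load, dag=etl_dag)

# run extract, then transform, then load
extract_task >> transform_task >> load_task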

All hands on - check the solutions