Airflow 101: working locally and familiarising yourself with the tool¶
Pre-requisites¶
The following prerequisites are needed:
- The libraries detailed in the Setting up section (installed via either conda or pipenv)
- MySQL installed
- A text editor
- A command line
Getting your environment up and running¶
If you followed the instructions, you should have Airflow installed as well as the rest of the packages we will be using.
So let’s get our environment up and running:
If you are using conda, start your environment via:
$ source activate airflow-env
If using pipenv then:
$ pipenv shell
This will start a shell within a virtual environment; to leave the virtual environment, type exit.
Starting Airflow locally¶
Airflow home lives in ~/airflow by default, but you can change the location before installing Airflow: first set the AIRFLOW_HOME environment variable and then install Airflow. For example, using pip:
export AIRFLOW_HOME=~/mydir/airflow
# install from PyPI using pip
pip install apache-airflow
Once you have completed the installation you should see something like this in the airflow directory (wherever it lives for you):
drwxr-xr-x - myuser 18 Apr 14:02 .
.rw-r--r-- 26k myuser 18 Apr 14:02 ├── airflow.cfg
drwxr-xr-x - myuser 18 Apr 14:02 ├── logs
drwxr-xr-x - myuser 18 Apr 14:02 │ └── scheduler
drwxr-xr-x - myuser 18 Apr 14:02 │ ├── 2019-04-18
lrwxr-xr-x 46 myuser 18 Apr 14:02 │ └── latest -> /Users/myuser/airflow/logs/scheduler/2019-04-18
.rw-r--r-- 2.5k myuser 18 Apr 14:02 └── unittests.cfg
We need to create a local dag folder:
mkdir ~/airflow/dags
As your project evolves, your directory will look something like this:
airflow # the root directory.
├── dags # root folder for all DAGs; Airflow scans this folder (subfolders included) for DAG definitions.
│ ├── my_dag.py # my DAG (definitions of tasks/operators, including precedence).
│ └── ...
├── logs # logs for the various tasks that are run
│ └── my_dag # DAG specific logs
│ │ ├── src1_s3 # folder for task-specific logs (log files are created by date of a run)
│ │ ├── src2_hdfs
│ │ ├── src3_s3
│ │ └── spark_task_etl
├── airflow.db # SQLite database used by Airflow internally to track the status of each DAG.
├── airflow.cfg # global configuration for Airflow (settings here can be overridden with environment variables.)
└── ...
Prepare your database¶
As we mentioned before, Airflow uses a database to keep track of the tasks and their statuses, so it is critical to have one set up.
To initialize the default database we can run:
airflow initdb
This will initialize your database via alembic so that it matches the latest Airflow release.
The default database used is sqlite, which means you cannot parallelize tasks with it. Since we have MySQL and the MySQL client installed, we will set them up so that we can use them with Airflow.
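You can confirm which database your installation currently points at from Python. A quick sketch; settings.SQL_ALCHEMY_CONN is the value Airflow 1.10 reads from airflow.cfg:
# print the connection string Airflow is currently configured to use
from airflow import settings

print(settings.SQL_ALCHEMY_CONN)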
🚦 Create an airflow database
From the command line:
mysql -u root -p
mysql> CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
mysql> CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'python2019';
mysql> GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';
mysql> FLUSH PRIVILEGES;
(the user name and password must match the sql_alchemy_conn setting we add below)
and initialize the database:
airflow initdb
Notice that this will fail with the default airflow.cfg, so we need to update the configuration first.
Update your local configuration¶
Open your Airflow configuration file ~/airflow/airflow.cfg and make the following changes:
executor = CeleryExecutor
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
# needs rabbitmq running
broker_url = amqp://guest:guest@127.0.0.1/
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
sql_alchemy_conn = mysql://airflow:python2019@localhost:3306/airflow
Here we are replacing the default executor (SequentialExecutor) with the CeleryExecutor so that tasks can run in parallel; note that the CeleryExecutor needs a running RabbitMQ instance to act as the broker (see broker_url above). We also replace the default sqlite database with our newly created airflow MySQL database.
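Before re-running initdb, you can sanity-check that the connection string works. A minimal sketch using SQLAlchemy (installed as an Airflow dependency), assuming the airflow user and python2019 password created above:
from sqlalchemy import create_engine

# this should print 1; an exception here means the user, password or
# database name does not match what was created in MySQL
engine = create_engine("mysql://airflow:python2019@localhost:3306/airflow")
with engine.connect() as conn:
    print(conn.execute("SELECT 1").scalar())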
Now we can initialize the database:
airflow initdb
Let’s now start the web server locally:
airflow webserver -p 8080
We can now head over to http://localhost:8080 and you will see that there are a number of example DAGs already there.
🚦 Take some time to familiarise yourself with the UI and get your local instance set up.
Now let’s have a look at the connections: go to http://localhost:8080/admin/connection/ (admin > connections in the UI). You should be able to see a number of available connections; for this tutorial we will use some of them, including mysql.
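As a preview of how a task can use one of these connections, here is a minimal sketch using the MySqlHook shipped with Airflow 1.10 against the stock mysql_default connection (the query_mysql function is purely illustrative):
from airflow.hooks.mysql_hook import MySqlHook

def query_mysql():
    # look up the connection registered in the Airflow UI by its conn_id
    hook = MySqlHook(mysql_conn_id="mysql_default")
    # run a trivial query and return the resulting rows
    return hook.get_records("SELECT 1")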
Commands¶
Let us go over some of the commands. Back on your command line, we can list the available DAGs:
airflow list_dags
We can list the tasks of the tutorial DAG in a tree view:
airflow list_tasks tutorial --tree
We can test the tasks too, but we need to pass a date parameter so that they execute (note that you cannot use a future date or you will get an error):
airflow test tutorial print_date 2019-05-01
airflow test tutorial templated 2019-05-01
Runs started via the test command are not saved in the database.
Now let’s start the scheduler:
airflow scheduler
Behind the scenes, the scheduler monitors and stays in sync with the DAG folder and all the DAG objects it contains. The Airflow scheduler is designed to run as a persistent service in an Airflow production environment.
Now, with the scheduler up and running, we can trigger a task instance:
airflow run example_bash_operator runme_0 2015-01-01
This run will be stored in the database and you will see the status change straight away.
What would happen, for example, if we wanted to run or trigger the tutorial DAG? 🤔
Let’s try from the CLI and see what happens.
airflow trigger_dag tutorial
Writing your first DAG¶
Let’s create our first simple DAG.
Inside the DAG directory (~/airflow/dags) create a simple_dag.py file.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    return "Hello world!"


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2019, 4, 30),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

dag = DAG(
    "hello_world",
    description="Simple tutorial DAG",
    schedule_interval="0 12 * * *",
    default_args=default_args,
    catchup=False,
)

t1 = DummyOperator(task_id="dummy_task", retries=3, dag=dag)
t2 = PythonOperator(task_id="hello_task", python_callable=print_hello, dag=dag)

# set t2 downstream of t1 (t1 runs first)
t1 >> t2
# equivalent:
# t2.set_upstream(t1)
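The bit-shift syntax also accepts lists, which becomes handy once a DAG fans out. A small illustrative sketch (the branch_a and branch_b tasks are hypothetical, not part of the DAG above):
# hypothetical fan-out: dummy_task runs first, then both branches can run in parallel
t2a = DummyOperator(task_id="branch_a", dag=dag)
t2b = DummyOperator(task_id="branch_b", dag=dag)
t1 >> [t2a, t2b]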
If everything is properly set up you should be able to see your new DAG in the web UI straight away.
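You can also check from Python that the file parses cleanly before heading to the UI. A minimal sketch, assuming the default AIRFLOW_HOME of ~/airflow:
from airflow.models import DagBag

# parse every file in the configured DAG folder
dagbag = DagBag()
print(dagbag.import_errors)           # should print an empty dict: {}
print("hello_world" in dagbag.dags)   # should print True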
Now let’s create a DAG from the previous ETL pipeline (kind of)¶
All hands on - check the solutions