Airflow Celery

CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to set up a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.
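
As a rough sketch, the relevant settings live in airflow.cfg. The values below assume a Redis broker and a PostgreSQL database reused as the Celery result backend; the hostnames, ports and credentials are placeholders, not values from the original text:

[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis-host:6379/0
result_backend = db+postgresql://airflow:airflow@postgres-host:5432/airflow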

For more information about setting up a Celery broker, refer to the exhaustive Celery documentation on the topic.

Thanks to Airflow's modularity, every component can be installed on a separate host or container; the Architecture section below describes how Airflow is laid out when it runs with the Celery executor. Starting from Airflow version 1.10.7, the webserver can be stateless.

Here are a few imperative requirements for your workers:

  • airflow needs to be installed, and the CLI needs to be in the path

  • Airflow configuration settings should be homogeneous across the cluster

  • Operators that are executed on the worker need to have their dependencies met in that context. For example, if you use the HiveOperator, the hive CLI needs to be installed on that box, or if you use the MySqlOperator, the required Python library needs to be available in the PYTHONPATH somehow

  • The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment. If all your boxes have a common mount point, having your pipeline files shared there should work as well

To kick off a worker, you need to set up Airflow and kick off the worker subcommand
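
For example, on each worker machine (the subcommand is airflow worker in the 1.10 series and airflow celery worker from 2.0 onwards):

airflow worker

You can also pass -q to make a worker consume only specific queues, e.g. airflow worker -q my_queue, where my_queue is a hypothetical queue name of your choosing.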

Your worker should start picking up tasks as soon as they get fired in its direction.

Note that you can also run 'Celery Flower', a web UI built on top of Celery, to monitor your workers. You can use the shortcut command airflow flower to start a Flower web server.

Please note that you must have the flower Python library already installed on your system. The recommended way is to install the airflow celery bundle.
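
For instance, something along these lines should work for the 1.10 series (in 2.0 and later the subcommand becomes airflow celery flower); the celery extra is expected to pull in flower as a dependency:

pip install 'apache-airflow[celery]'
airflow flower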

Some caveats:

  • Make sure to use a database backed result backend

  • Make sure to set a visibility timeout in [celery_broker_transport_options] that exceeds the ETA of your longest running task (see the sketch after this list)

  • Tasks can consume resources. Make sure your worker has enough resources to run worker_concurrency tasks

  • Queue names are limited to 256 characters, but each broker backend might have its own restrictions
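
A minimal sketch of that visibility timeout setting in airflow.cfg, assuming Redis or another broker that honours this transport option (the value is in seconds and should exceed the runtime of your longest task):

[celery_broker_transport_options]
visibility_timeout = 21600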

Architecture

Airflow consists of several components:

  • Workers - Execute the assigned tasks

  • Scheduler - Responsible for adding the necessary tasks to the queue

  • Web server - HTTP Server provides access to DAG/task status information

  • Database - Contains information about the status of tasks, DAGs, Variables, connections, etc.

  • Celery - Queue mechanism

Please note that the Celery queue consists of two components:

  • Broker - Stores commands for execution

  • Result backend - Stores status of completed commands

The components communicate with each other in many places:

  • [1] Web server –> Workers - Fetches task execution logs

  • [2] Web server –> DAG files - Reveals the DAG structure

  • [3] Web server –> Database - Fetches the status of the tasks

  • [4] Workers –> DAG files - Reveal the DAG structure and execute the tasks

  • [5] Workers –> Database - Get and store information about connection configuration, variables and XCOM

  • [6] Workers –> Celery's result backend - Save the status of tasks

  • [7] Workers –> Celery's broker - Store commands for execution

  • [8] Scheduler –> Database - Stores a DAG run and related tasks

  • [9] Scheduler –> DAG files - Reveals the DAG structure and executes the tasks

  • [10] Scheduler –> Celery's result backend - Gets information about the status of completed tasks

  • [11] Scheduler –> Celery's broker - Puts the commands to be executed

The life of a distributed task instance

Discover what happens when Apache Airflow performs task distribution on Celery workers through RabbitMQ queues.

Apache Airflow is a tool to create workflows such as an extract-load-transform pipeline on AWS. A workflow is a directed acyclic graph (DAG) of tasks and Airflow has the ability to distribute tasks on a cluster of nodes. Let's see how it does that.

RabbitMQ is a message broker. Its job is to manage communication between multiple services by operating message queues. It provides an API for other services to publish and to subscribe to the queues.

Celery is a task queue. It can distribute tasks on multiple workers by using a protocol to transfer jobs from the main application to Celery workers. It relies on a message broker to transfer the messages.

Inside Apache Airflow, tasks are carried out by an executor. The main types of executors are:

  • Sequential Executor: Each task is run locally (on the same machine as the scheduler) in its own Python subprocess. They are run sequentially, which means that only one task can be executed at a time. It is the default executor.
  • Local Executor: It is the same as the sequential executor except that multiple tasks can run in parallel. It needs a metadata database (where DAG and task statuses are stored) that supports parallelism, like MySQL. Setting up such a database requires some extra work since the default configuration uses SQLite.
  • Celery Executor: The workload is distributed on multiple celery workers which can run on different machines. It is the executor you should use for availability and scalability.

Distributed Apache Airflow Architecture

Apache Airflow is split into different processes which run independently from each other.

When setting up Apache Airflow with the celery executor to use a distributed architecture, you have to launch a bunch of these processes and other services (a launch sketch follows the list):

  • A metadata database (MySQL): it contains the status of the DAG runs and task instances.
  • Airflow web server: a web interface to query the metadata to monitor and execute DAGs.
  • Airflow scheduler: checks the status of the DAGs and tasks in the metadata database, creates new ones if necessary, and sends the tasks to the queues.
  • A message broker (RabbitMQ): it stores the task commands to be run in queues.
  • Airflow Celery workers: they retrieve the commands from the queues, execute them and update the metadata.
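
As a rough sketch, and assuming the 1.10-era CLI (with the database and RabbitMQ already running), bringing up the stack could look like the sequence below, each command typically running on its own host or container. In 2.0 and later, initdb becomes db init, and worker and flower move under the celery subcommand:

airflow initdb
airflow webserver
airflow scheduler
airflow worker
airflow flower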

You can look at the Clairvoyant blog to set up everything.

In the end, a typical architecture combines all of these services, each running as a separate process and possibly on its own node.

Journey of a task instance

Let's now dive deeper into each step to understand the complete process and to know how to monitor and debug them.

1. In the beginning, the scheduler creates a new task instance in the metadata database with the scheduled state:
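
If you want to check this state yourself, one option (assuming the 1.10-era CLI and the tutorial DAG sketched further below) is the task_state subcommand, which reads it straight from the metadata database and prints something like scheduled or queued:

airflow task_state tutorial Hello_World 2019-03-24T13:52:56.271392+00:00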

2. Then the scheduler uses the Celery Executor which sends the task to the message broker. RabbitMQ queues can be explored from the management UI on port 15672. You can see the number of messages in the queue and open each message.
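
Besides the management UI, you can also peek at the queues from the command line on the RabbitMQ host, for example:

rabbitmqctl list_queues name messages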

If you look closely at the payload created by the Celery Executor, you will see that it contains the command that the celery worker should execute:

airflow run tutorial Hello_World 2019-03-24T13:52:56.271392+00:00

3. The celery worker then receives the command from the queue, which is now empty.

4. The celery worker updates the metadata to set the status of the task instance to running.

5. The celery worker executes the command. The worker status can be monitored from the Flower web interface by running airflow flower. It is a simple web server on which celery workers regularly report their status.

6. Once the task is finished, the celery worker updates the metadata to set the status of the task instance to success.

That's it. The task is now complete. If other ones depend on its completion, they will now be created by the scheduler and follow the same process.

Multiple Celery workers

Obviously, what we want to achieve with a Celery Executor is to distribute the workload on multiple nodes. It only makes sense if multiple tasks are running at the same time. Let's try with a simple DAG:
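
The DAG itself appears only as a screenshot in the original post. A minimal sketch of what it might look like, assuming 1.10-era import paths and BashOperator sleeps for the two parallel tasks:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id="tutorial",
    start_date=datetime(2019, 3, 24),
    schedule_interval=None,
) as dag:
    # A trivial first task, after which two tasks can run in parallel.
    hello_world = BashOperator(task_id="Hello_World", bash_command="echo 'Hello World'")
    sleep15 = BashOperator(task_id="sleep15", bash_command="sleep 15")
    sleep20 = BashOperator(task_id="sleep20", bash_command="sleep 20")

    # Both sleep tasks depend only on Hello_World, so with the Celery
    # executor they can be picked up by different workers at the same time.
    hello_world >> [sleep15, sleep20]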

In our example, just after the Hello_World task is completed, the scheduler pushes both sleep15 and sleep20 tasks to the queue to be executed by the celery workers.

We can check with Flower that each celery worker received one task from the queue and that they are executing their assigned task simultaneously.

The scheduler now waits for both tasks to be reported as successful before sending the next one to the queue.

Conclusion

I hope you now have a clearer understanding of all the execution steps of task distribution with Apache Airflow.

If you need help, please post a comment or ask the community on the Apache Airflow Slack.

Thanks to Flavian Hautbois, Florian Carra, and Alexandre Sapet.




