Asynchronous Distributed Task Execution via Python Celery using Redis and RabbitMQ
Asynchronous task execution is needed in nearly every piece of software whose callers face an uncertain response time. Almost every programming language offers its own approaches and libraries, but the underlying task-queue concept is well known. In the Python ecosystem, Celery stands out as a distributed asynchronous task queue that eases the execution and management of task distribution, with a focus on real-time operation and scheduling. The point of such an approach is to execute tasks outside the application context: resource-hungry tasks can be offloaded to the task queue, which may run in different threads or even on different machines, so the main application keeps running without worrying about them.
Asynchronous distributed task execution is not required for every program; however, if a part of the code cannot deliver its results to other components on time or within a specific time interval, an asynchronous implementation becomes necessary, e.g. sending an email, processing an image or video file, etc.
The Celery framework does not limit itself to the Python ecosystem and can interoperate with different programming languages such as Java, JavaScript or Node.js. The whole point here is to find a suitable programming API that can call the Python functions, as given in this example for Java, in this link for C++, and another link for Node.js. Developers can break their code into smaller pieces/batches so that all tasks are processed by Celery workers.
Although asynchronous task execution is the highlight here, job scheduling can be handled through celery beat, which is responsible for scheduling tasks, and real-time worker monitoring can be performed via celery flower.
Celery Anatomy
Celery Clients: These trigger the execution of celery workers via the message broker. A web request or any other interface on the terminal can be seen as a celery client.
Celery Workers: These represent the background tasks and the logic part of celery. The logic required by the developers is processed in the worker. The results can be visualized on the terminal or returned to the calling function, as in many web applications.
Message Broker: The role of the broker is to establish the communication between the celery client and the celery worker through message queues. RabbitMQ and Redis are commonly used message brokers.
Celery Working Process
The Celery client uses the functionalities of the Celery worker by importing them and triggering worker processes that are bound to the message queue. The client transmits the task to the message broker, and the broker assigns the task to the related worker. Whenever the task is received by the Celery worker, it is executed and the result is transmitted back to the client through another queue for further processing.
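Expressed in code, this round trip looks roughly as follows; this is a minimal sketch assuming a local Redis broker, with the module name demo and the task add chosen purely for illustration:
from celery import Celery

app = Celery('demo',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    # Runs inside a worker process, not in the client
    return x + y

# Client side (requires a running worker, e.g. `celery -A demo worker`):
result = add.delay(2, 3)  # publish the task to the broker
print(result.get())       # block until the worker stores 5 in the backend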
Celery is mostly used in web applications; the flow diagram is typically depicted for a Django web application and reflects almost the same steps for other web applications as well.
Use Case
We usually go to a restaurant and order a meal. Each product takes a different amount of time to prepare, and each task is separate from the others. Assume clients order two different meals. The two orders are sent to the kitchen through the waiters, who take on the message broker role. The cooking chefs in the kitchen start preparing the meals, each of which requires a different time due to its ingredients and cooking technique. The chefs inform the waiter whenever the ordered meals are ready, and the meals are delivered to the clients in the restaurant. The following figure depicts this process.
Considering this analogy, all the items in the figure above can also be mapped to the celery components, namely clients, message broker and celery workers. Restaurant clients ordering meals correspond to the celery clients, while the waiters play the message broker role, since they are responsible for carrying messages between clients and chefs. Finally, the cooking chefs play the role of celery workers. The same scenario also works for online meal ordering: in that case the message broker is replaced with a digital interface and the meals are delivered by food logistics firms, while the remaining components stay the same.
Use Case Development
The aforementioned scenario is developed below with two different message brokers, RabbitMQ and Redis. The two processes are quite similar; the only change between the message brokers is the distinct URL addresses in the configuration. Instead of installing the two message brokers locally, their Docker images are deployed in order to reduce the configuration steps. The following steps are performed for both cases:
- Install required celery libraries
- Implement celery tasks and an application that calls the celery tasks
- Start message broker via docker images
- Initiate celery workers on the terminal
- Execute application that triggers the celery workers
It is worth mentioning that the following examples use two different queues for different tasks. It is possible to increase the number of queues, which isolates the tasks from each other.
Common Requirements
Development Environment: macOS Catalina
Required Libraries: celery, flower
pip install celery flower
Implementing Required Tasks
A tasks file is created to import all required libraries, create the application, configure the message broker and define the required task functions.
The celery and time libraries are imported: Celery for the main task machinery, and time merely to simulate the preparation duration.
from time import sleep
from celery import Celery
In order to establish the communication between the client and the celery workers, a message broker instance should be created. The Redis and RabbitMQ databases are provided as URLs and should follow the format below. If the results are not required on the client side, the BACKEND_URL can be omitted. In this tutorial, it is used to show how result retrieval works.
# Redis Database Configuration as Message Broker
BROKER_URL = 'redis://localhost:6379/0'
BACKEND_URL = 'redis://localhost:6379/1'
# General format: redis://:password@hostname:port/db_number
If RabbitMQ should be integrated instead, the URL addresses differ from the Redis configuration as shown below:
BACKEND_URL = "rpc://"
BROKER_URL ="pyamqp://guest@localhost//"
Based on the selected configuration approach, a celery app instance should be created with the above URL addresses. The first argument represents the actual module name, the broker keyword indicates the message broker URL, and backend specifies the URL used for transmitting the results back.
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL)
In this tutorial, a multi-queue approach is implemented; therefore, each queue ought to be added as a task route with its name to the app instance. Below, two queues are added as an example; a third, hypothetical queue is sketched right after the snippet. Each queue will be directed to its own task. This approach can be quite useful when implementing complex and distributed applications. If you plan to use only a single queue, the code below can be removed and the queue name omitted when starting the celery worker.
app.conf.task_routes = {
    'celery_use_cases.tasks_redis.prepare_tee': {'queue': 'tee'},
    'celery_use_cases.tasks_redis.prepare_frites': {'queue': 'frites'}
}
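As an illustration of scaling the number of queues, a hypothetical third task (prepare_salad, not part of this tutorial's code base) could be isolated in its own queue with one more route entry; its dedicated worker would then be started with -Q salad:
app.conf.task_routes = {
    'celery_use_cases.tasks_redis.prepare_tee': {'queue': 'tee'},
    'celery_use_cases.tasks_redis.prepare_frites': {'queue': 'frites'},
    # hypothetical additional task, isolated in its own queue
    'celery_use_cases.tasks_redis.prepare_salad': {'queue': 'salad'}
}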
Once all these configuration steps are completed, it is time to define the logic of the tasks. The following tasks are decorated with @app.task, since the celery framework identifies tasks through these decorators. Two simple functions are added: one prepares meal/drink X, whereas the other prepares meal/drink Y.
@app.task
def prepare_tee(table_num, size):
    print('Preparing for the table {} a {}-sized tee!'.format(table_num, size))
    sleep(2)
    return "tee"

@app.task
def prepare_frites(table_num, size):
    print('Preparing for the table {} a {}-sized frites!'.format(table_num, size))
    sleep(6)
    return "frites"
In the end, the whole code for the Redis example should look as below:
from time import sleep
from celery import Celery

BROKER_URL = 'redis://localhost:6379/0'
BACKEND_URL = 'redis://localhost:6379/1'

app = Celery('tasks_redis', backend=BACKEND_URL, broker=BROKER_URL)

app.conf.task_routes = {
    'celery_use_cases.tasks_redis.prepare_tee': {'queue': 'tee'},
    'celery_use_cases.tasks_redis.prepare_frites': {'queue': 'frites'}
}

@app.task
def prepare_tee(table_num, size):
    print('Preparing for the table {} a {}-sized tee!'.format(table_num, size))
    sleep(2)
    return "tee"

@app.task
def prepare_frites(table_num, size):
    print('Preparing for the table {} a {}-sized frites!'.format(table_num, size))
    sleep(6)
    return "frites"
For the RabbitMQ example:
from time import sleep
from celery import Celery

BACKEND_URL = "rpc://"
BROKER_URL = "pyamqp://guest@localhost//"

# Creating a celery instance with RabbitMQ as message broker.
app = Celery('tasks_rabbitmq', backend=BACKEND_URL, broker=BROKER_URL)

app.conf.task_routes = {
    'celery_use_cases.tasks_rabbitmq.prepare_hamburger': {'queue': 'hamburger'},
    'celery_use_cases.tasks_rabbitmq.prepare_spaghetti': {'queue': 'spaghetti'}
}

@app.task
def prepare_hamburger(table_num, size):
    print('Preparing for the table {} a {}-sized hamburger!'.format(table_num, size))
    sleep(5)
    return "hamburger"

@app.task
def prepare_spaghetti(table_num, size):
    print('Preparing for the table {} a {}-sized spaghetti!'.format(table_num, size))
    sleep(6)
    return "spaghetti"
Creating Celery Client Application
Assume that the tasks are created and that they need to be called by a client-side application. Since the largest usage area of the celery framework is web applications, these requests are mostly sent to celery over a REST interface such as Flask or Django. To keep things simple, a plain Python script calls these tasks and receives their responses here.
First the described task should be imported; then the AsyncResult class is needed to receive the responses from the celery worker asynchronously.
from celery_use_cases import tasks_redis as tredis
from celery.result import AsyncResult
asyncobj = tredis.prepare_frites.apply_async(('3', 'small'))
print("Frites is ordered")
Executing the task above returns an AsyncResult instance that can be used to observe the task state, wait for it to finish, and retrieve its return value, or the exception traceback if the task failed.
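For instance, the handle can be inspected without blocking; a small sketch using the standard AsyncResult attributes:
print(asyncobj.state)      # e.g. 'PENDING', 'STARTED' or 'SUCCESS'
if asyncobj.ready():       # True once the worker has finished
    print(asyncobj.get())  # the return value, or re-raises the task's exception
else:
    print("Frites are still being prepared...")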
Retrieving Results from Celery Worker
In order to return responses and keep track of celery tasks, celery requires a storage mechanism to transmit every task state; the BACKEND_URL given above serves this purpose. For more information, I encourage you to read thoroughly through the Result Backends documentation for more detail.
result = AsyncResult(asyncobj.id, app=tredis.app)
print(result.get()) # 'return value'
The whole client app code:
from celery_use_cases import tasks_redis as tredis
from celery.result import AsyncResult
asyncobj = tredis.prepare_frites.apply_async(('3', 'small'))
print("Frites is ordered")
result = AsyncResult(asyncobj.id, app=tredis.app)
print(result.get()) # 'return value'
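To see the benefit of the separated queues, both meals can be ordered back to back; since each queue has its own worker, the total waiting time is roughly that of the slowest task rather than the sum. A short sketch reusing the tasks defined above:
from celery_use_cases import tasks_redis as tredis

# Fire both orders without waiting in between
tee_order = tredis.prepare_tee.apply_async(('5', 'large'))        # ~2 s on the 'tee' queue
frites_order = tredis.prepare_frites.apply_async(('5', 'small'))  # ~6 s on the 'frites' queue

# Collect the results; the total wait is about max(2 s, 6 s), not 8 s
print(tee_order.get(), frites_order.get())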
Run Redis/RabbitMQ Message Broker as Docker
Redis and RabbitMQ are two popular pieces of software, and their Docker images are stable enough to use, which allows us to keep them separate from our local machine.
Redis can be started via
$docker run --name my-redis -p 6379:6379 --restart always --detach redis
The last two parameters can be removed if Redis should not be restarted after an OS boot and if you want to see the console output.
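To confirm that the broker is reachable, the redis-cli bundled in the container can be used as an optional sanity check:
$docker exec -it my-redis redis-cli ping
PONG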
RabbitMQ can be executed via
$docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management
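The second port mapping exposes the RabbitMQ management web UI, which can be reached at http://localhost:15672 with the default guest/guest credentials.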
Some code examples make use of both tools: Redis is preferred as the broker, whereas RabbitMQ is often selected for the backend URL.
Starting Celery Workers
$celery -A celery_use_cases.tasks_redis worker -l info -Q frites
$celery -A celery_use_cases.tasks_redis worker -l info -Q tee
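Alternatively, a single worker can consume from both queues by passing a comma-separated list to -Q, trading queue isolation for fewer processes:
$celery -A celery_use_cases.tasks_redis worker -l info -Q tee,frites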
For the sake of simplicity, all of the above commands are converted into bash scripts and can be executed as below (shown here for the RabbitMQ workers):
./hamburger_chef.sh
./spaghetti_chef.sh
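Such a script is presumably nothing more than a thin wrapper around the corresponding worker command; a sketch of what hamburger_chef.sh might contain (hypothetical contents, the actual scripts are in the linked repository):
#!/bin/bash
# Start a worker that only consumes the 'hamburger' queue
celery -A celery_use_cases.tasks_rabbitmq worker -l info -Q hamburger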
Celery Monitoring
Celery monitoring simplifies keeping track of Celery tasks and many other operations. Flower is one of the web-based monitoring tools for Celery; it was already installed at the beginning of this tutorial.
To run flower, it requires the full path of the tasks module, i.e.
# celery -A [proj_tasks] flower  (example)
$celery -A celery_use_cases.tasks_rabbitmq flower  # real usage
Once this command is executed, the operations can be accessed via the http://localhost:5555 URL.
Monitoring more than one application is also possible; two approaches are provided, as described in this link.
Run a celery monitoring instance for each application, if needed:
celery -A celery_use_cases.tasks_rabbitmq flower --port=5555
celery -A celery_use_cases.tasks_redis flower --port=5556
If the applications are connected to the same message broker, then celery flower can be connected to the broker as below:
celery --broker=redis://localhost:6379/0 flower
Dockerized Celery Flower: In this tutorial, the celery flower library is installed directly on the OS; nevertheless, you can run its dockerized version directly via
$docker run -p 5555:5555 mher/flower
Conclusion
Celery is much more than the aforementioned use case; its main role is, above all, to execute tasks through workers, and its specialty is executing them with distinct configurations via celery decorators. Given the number of features, a separate article on this topic makes more sense. In particular, designing workflows using celery workers, an approach also used in Airflow, is quite interesting. While writing this article, the main source was the official website; for more details, it is highly recommended to see this link as well.
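As a small taste of such workflows, Celery's canvas primitives allow tasks to be composed; the sketch below uses the standard group primitive to run the two Redis-backed tasks from this tutorial in parallel and collect both results at once:
from celery import group
from celery_use_cases.tasks_redis import prepare_tee, prepare_frites

# Build signatures with .s() and run them as one parallel group
order = group(prepare_tee.s('7', 'small'), prepare_frites.s('7', 'large'))
print(order.apply_async().get())  # ['tee', 'frites']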
Source Code: https://github.com/cemakpolat/celery-examples