Asynchronous Distributed Task Execution via Python Celery using Redis and RabbitMQ

Celery Anatomy

Celery Working Process

https://en.proft.me/2013/10/25/celery-periodic-tasks-django-projects/
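
To make the working process concrete, here is a broker-free sketch that uses only the Python standard library. It is not Celery code. The dictionary `task_queues` stands in for the message broker, `results` for the result backend, and the `worker` function for a Celery worker bound to one queue. All of these names, and the `order-1`/`order-2` ids, are illustrative assumptions.

```python
import queue
import threading

# Stand-ins for the real components (all names here are illustrative):
#   task_queues ~ the message broker, with one queue per task type
#   results     ~ the result backend, keyed by task id
task_queues = {"tee": queue.Queue(), "frites": queue.Queue()}
results = {}


def worker(queue_name):
    """Consume messages from one queue until a None sentinel arrives."""
    q = task_queues[queue_name]
    while True:
        message = q.get()
        if message is None:
            break
        task_id, table_num, size = message
        results[task_id] = "{}-sized {} for table {}".format(
            size, queue_name, table_num)


# One worker per queue, similar to starting workers with -Q tee and -Q frites.
threads = [threading.Thread(target=worker, args=(name,)) for name in task_queues]
for t in threads:
    t.start()

# The "application" publishes messages and does not wait for the work itself.
task_queues["tee"].put(("order-1", 3, "small"))
task_queues["frites"].put(("order-2", 3, "small"))

# Ask the workers to stop, then wait for them to drain their queues.
for q in task_queues.values():
    q.put(None)
for t in threads:
    t.join()

print(results["order-1"])
print(results["order-2"])
```

Celery replaces each of these pieces with a real, networked component, so the producer and the workers can run in different processes or on different machines.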

Use Case

Use Case Development

  1. Install the required Celery libraries
  2. Implement the Celery tasks and an application that calls them
  3. Start the message brokers from Docker images
  4. Start the Celery workers in a terminal
  5. Run the application that triggers the Celery workers

Common Requirements

pip install "celery[redis]" flower

Implementing Required Tasks

from time import sleep
from celery import Celery

# Redis configuration as message broker and result backend
# URL format: redis://:password@hostname:port/db_number
BROKER_URL = 'redis://localhost:6379/0'
BACKEND_URL = 'redis://localhost:6379/1'

# RabbitMQ alternative (uncomment to override the Redis settings above)
# BACKEND_URL = "rpc://"
# BROKER_URL = "pyamqp://guest@localhost//"

app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL)

# Route each task to its own queue
app.conf.task_routes = {
    'celery_use_cases.tasks_redis.prepare_tee': {'queue': 'tee'},
    'celery_use_cases.tasks_redis.prepare_frites': {'queue': 'frites'}
}


@app.task
def prepare_tee(table_num, size):
    print('Preparing for the table {} a {}-sized tee!'.format(table_num, size))
    sleep(2)
    return "tee"


@app.task
def prepare_frites(table_num, size):
    print('Preparing for the table {} a {}-sized frites!'.format(table_num, size))
    sleep(6)
    return "frites"
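
The URL format noted in the snippet, `redis://:password@hostname:port/db_number`, is easy to get wrong when the password contains characters such as `@` or `/`. The following is a minimal sketch of one way to build such URLs. The helper `redis_url` and the sample password are hypothetical and not part of Celery or the original project.

```python
from urllib.parse import quote


def redis_url(host, port, db, password=None):
    """Build a redis:// URL. Hypothetical helper, not part of Celery."""
    # Percent-encode the password so characters like '@' or '/'
    # cannot be mistaken for URL delimiters.
    auth = ":{}@".format(quote(password, safe="")) if password else ""
    return "redis://{}{}:{}/{}".format(auth, host, port, db)


BROKER_URL = redis_url("localhost", 6379, 0)
BACKEND_URL = redis_url("localhost", 6379, 1, password="p@ss/word")

print(BROKER_URL)
print(BACKEND_URL)
```

Keeping the broker and the backend on different database numbers, as the article does with `/0` and `/1`, separates queued messages from stored results.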

# celery_use_cases/tasks_redis.py
from time import sleep
from celery import Celery

BROKER_URL = 'redis://localhost:6379/0'
BACKEND_URL = 'redis://localhost:6379/1'

app = Celery('tasks_redis', backend=BACKEND_URL, broker=BROKER_URL)

app.conf.task_routes = {
    'celery_use_cases.tasks_redis.prepare_tee': {'queue': 'tee'},
    'celery_use_cases.tasks_redis.prepare_frites': {'queue': 'frites'}
}


@app.task
def prepare_tee(table_num, size):
    print('Preparing for the table {} a {}-sized tee!'.format(table_num, size))
    sleep(2)
    return "tee"


@app.task
def prepare_frites(table_num, size):
    print('Preparing for the table {} a {}-sized frites!'.format(table_num, size))
    sleep(6)
    return "frites"

# celery_use_cases/tasks_rabbitmq.py
from time import sleep
from celery import Celery

BACKEND_URL = "rpc://"
BROKER_URL = "pyamqp://guest@localhost//"

# Creating a Celery instance with RabbitMQ as message broker.
app = Celery('tasks', backend=BACKEND_URL, broker=BROKER_URL)

app.conf.task_routes = {
    'celery_use_cases.tasks_rabbitmq.prepare_hamburger': {'queue': 'hamburger'},
    'celery_use_cases.tasks_rabbitmq.prepare_spaghetti': {'queue': 'spaghetti'}
}


@app.task
def prepare_hamburger(table_num, size):
    print('Preparing for the table {} a {}-sized hamburger!'.format(table_num, size))
    sleep(5)
    return "hamburger"


@app.task
def prepare_spaghetti(table_num, size):
    print('Preparing for the table {} a {}-sized spaghetti!'.format(table_num, size))
    sleep(6)
    return "spaghetti"
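
from celery_use_cases import tasks_redis as tredis
from celery.result import AsyncResult

# Publish the order; apply_async returns immediately with a result handle
asyncobj = tredis.prepare_frites.apply_async(('3', 'small'))
print("Frites are ordered")

# Look up the result by task id and block until the worker has finished
result = AsyncResult(asyncobj.id, app=tredis.app)
print(result.get())  # the task's return value, here "frites"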

$ docker run --name my-redis -p 6379:6379 --restart always --detach redis
$ docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management
$ celery -A celery_use_cases.tasks_redis worker -l info -Q frites
$ celery -A celery_use_cases.tasks_redis worker -l info -Q tee
$ ./hamburger_chef.sh
$ ./spaghetti_chef.sh

Celery Monitoring

# celery -A [proj_tasks] flower   [example]
$ celery -A celery_use_cases.tasks_rabbitmq flower   # [real usage]
$ celery -A celery_use_cases.tasks_rabbitmq flower --port=5555
$ celery -A celery_use_cases.tasks_redis flower --port=5556
$ celery --broker=redis://localhost:6379/0 flower
$ docker run -p 5555:5555 mher/flower

Conclusion

cem akpolat
