Celery in Django | Celery beat | Explained with examples

Introduction

What is Celery?

Celery is a distributed task queue for executing work outside a Python web application's request-response cycle.

A task queue's input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

Celery is written in Python, but the protocol can be implemented in any language. In addition to Python, there are node-celery and node-celery-ts for Node.js, and a PHP client.

Why Use Celery?

There are several reasons why developers want to start using Celery:

  • Third-party API calls
  • Periodic/scheduled tasks (with the help of celery beat)
  • Better user experience (the user gets an immediate response while the work runs in the background)
  • CPU-intensive tasks (for example, a machine learning model that takes a long time to run)

Why Celery if we have multithreading, async and multiprocessing?

Those all work, but at scale we don't want to put that extra load on the Django process itself; Celery moves the heavy work into separate worker processes.

Celery requires a message transport (broker) to send and receive messages. We will use Redis (we could also use RabbitMQ or Amazon SQS), and for storing information about our tasks we will use the Django database. We could also use Redis, MongoDB, Amazon S3, etc.

Redis

Redis can be both a backend and a broker.

As a Broker: Redis works well for rapid transport of small messages. Large messages can congest the system.

See documentation for details

As a Backend: Redis is a super fast K/V store, making it very efficient for fetching the results of a task call. Given Redis's in-memory design, you do have to consider the limited memory available to store your data, and how you handle data persistence. If result persistence is important, consider using another DB for your backend.
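For illustration, here is a minimal sketch of what pointing the result backend at Redis could look like in settings.py (the URL and expiry value are assumptions for a local default-port Redis; later in this post we will use the Django database instead):

# settings.py -- hypothetical Redis result backend
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"   # store results in Redis DB 1
CELERY_RESULT_EXPIRES = 3600                         # expire stored results after an hour to cap memory use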

Serialization

To transfer messages between Django and Celery, the data needs to be transformed into a well-defined structure, so Celery supports several serialization formats such as pickle, JSON and msgpack.
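As a quick illustration, the serializers can be picked through CELERY_-prefixed settings that celery.py (shown later) reads via the CELERY namespace; JSON is the default in recent Celery versions and the values below are just one sensible choice:

# settings.py -- example serializer configuration (assumed values)
CELERY_ACCEPT_CONTENT = ["json"]     # only accept messages serialized as JSON
CELERY_TASK_SERIALIZER = "json"      # how task arguments travel over the broker
CELERY_RESULT_SERIALIZER = "json"    # how results are stored in the backend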

Even though both of these functionalities are part of Celery, they’re often addressed separately:

  1. Celery workers are worker processes that run tasks independently from one another and outside the context of your main service.
  2. Celery beat is a scheduler that orchestrates when to run tasks. You can use it to schedule periodic tasks as well.

Celery workers are the backbone of Celery. Even if you aim to schedule recurring tasks using Celery beat, a Celery worker will pick up your instructions and handle them at the scheduled time. What Celery beat adds to the mix is a time-based scheduler for Celery workers.
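To make the difference concrete, here is a small sketch using the fun task defined later in this post; calling it immediately versus giving the worker a time hint (celery beat goes further and handles full recurring schedules):

from celeryapp.tasks import fun

fun.delay()                    # a worker picks this up as soon as it is free
fun.apply_async(countdown=30)  # a worker executes this roughly 30 seconds from now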

Let’s get started

First we will create the basic setup for Django with Celery.

Check python version (I am using Python 3.11.0)

python --version

In a new folder, open cmd, create a virtual environment and activate it:

virtualenv venv

venv\Scripts\activate

Now install Django in it:

pip install django

Now create a project in that folder:

django-admin startproject celeryproject

Open VS Code (you can use any editor) in the same folder, start a terminal and activate the virtualenv.

Install celery

pip install celery

Create an app in your project:

django-admin startapp celeryapp

Now we will do some configuration in the settings.py file.
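At a minimum, Celery needs to know where the broker lives. A minimal sketch, assuming Redis will run locally on its default port (the CELERY_ prefix works because celery.py below loads Django settings with namespace="CELERY"):

# settings.py -- broker configuration (assuming a local Redis on the default port)
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"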

Install redis

https://github.com/tporadowski/redis/releases

select Redis-x64-5.0.14.1.msi  (use redis version >5.0)

It will get installed in C:\Program Files, where you will find a Redis folder; inside it, run redis-cli.

To check that it is running, type PING and you should get PONG back.

Now create a celery.py file in your project folder (next to settings.py) and add the code below:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celeryproject.settings")
app = Celery("celeryproject")

# we are using Asia/Kolkata time, so disable UTC and set the timezone explicitly
app.conf.enable_utc = False
app.conf.update(timezone='Asia/Kolkata')

# read all CELERY_-prefixed settings from Django's settings.py
app.config_from_object("django.conf:settings", namespace="CELERY")

# discover tasks.py modules in all installed apps
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f"Request: {self.request!r}")

Now create a tasks.py file in your celeryapp app and add the code below:

from celery import shared_task

@shared_task(bind=True)
def fun(self):
    # operations
    print("You are in Fun function")
    return "done"

Now, in the views.py file, create a view in which we will call that task:

from django.shortcuts import render
from .tasks import fun
from django.http import HttpResponse

# Create your views here.

def testView(request):
    fun.delay()
    return HttpResponse("Done")

Set up a URL for the above view.

celeryproject/urls.py

from django.contrib import admin
from django.urls import path
from django.urls.conf import include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include("celeryapp.urls")),
]

In celeryapp, create a urls.py file and add the following to celeryapp/urls.py:

from django.urls import path
from .views import testView

urlpatterns = [
    path('', testView,name="testView"),
]

Now install the Python Redis client with the command below:

pip install redis

Now run the server:

python manage.py runserver

Also start a new terminal, in which we will start our Celery worker:

celery -A celeryproject.celery worker --pool=solo -l INFO

(Don't worry about the flags in this command; everything will be covered later in this post.)

If you want to monitor your tasks, for example whether a task is pending or has been successfully executed by Celery, install django-celery-results:

pip install django-celery-results

and in settings.py add the lines below:

# for storing task results
CELERY_RESULT_BACKEND = "django-db"

And add django_celery_results to your installed apps list in settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'celeryapp',
    'django_celery_results',
]
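django_celery_results stores every run in a TaskResult model, so besides the admin you can also inspect task state from the Django shell (after the migrations below); a rough sketch:

from django_celery_results.models import TaskResult

for tr in TaskResult.objects.order_by("-date_done")[:5]:
    print(tr.task_name, tr.status, tr.result)   # e.g. celeryapp.tasks.fun SUCCESS "done"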

In the __init__.py file of your project folder, add the lines below:

from .celery import app as celery_app

__all__ = ("celery_app",)

Then make migrations and migrate the models to the database:

python manage.py makemigrations

python manage.py migrate

Create a superuser, then run the server:

python manage.py createsuperuser

python manage.py runserver

Go to localhost:8000. If you get the error below,

OperationalError at / Error 11001 connecting to 127.0.0.1.6379:6379. getaddrinfo failed.

then remove the CELERY namespace from celery.py and refresh the page.

See how the fun function was executed by Celery and returned "done", the value we mentioned in our task.

Great!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

Now we will add configurations for celery beat,

pip install django-celery-beat

add that in installed apps,

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'celeryapp',
    'django_celery_results',
    'django_celery_beat'
]

And add below in settings.py file,

#celery beat settings
CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler'

Or pass the scheduler on the command line instead:

celery -A celeryproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

Now in celery.py file

#celery beat settings
app.conf.beat_schedule={

}

We haven't added any periodic task yet, but all the required configuration is now in place.

Now, to check whether the celery beat process runs, start it in a new terminal with the command below:

celery -A celeryproject.celery beat -l INFO

Beat can also be embedded in a regular Celery worker with the -B parameter. However, this is not recommended for production use:

celery -A celeryproject worker -B -l INFO

How to scale our application?

A celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.

By default Celery uses multiprocessing.

When you start a Celery worker on the command line, you only start a supervisor process. That parent process does not execute tasks itself: it spawns child processes (or threads) and handles bookkeeping such as storing and managing results. The child processes (or threads) execute the actual tasks. Together they are known as the execution pool.

The size of the execution pool determines how many tasks a Celery worker can process at once.

It also depends on the number of CPU cores your machine has.

Worker, Pool, Concurrency

When you start a Celery worker you can specify the pool, concurrency, autoscale etc. on the command line.

Pool: decides who actually performs the task – a child process, a thread, the worker process itself, or someone else. (If the task is CPU-bound, use child processes; if it is I/O-bound, go for threads.)

Concurrency: decides the size of the pool (by default, the number of CPU cores).

Autoscale: dynamically resizes the pool based on load. The autoscaler adds more pool processes when there is more work to do and starts removing processes when the workload is low.

celery -A celeryproject.celery worker --pool=prefork --concurrency=5 --autoscale=10,3 -l info

  • pool=prefork (the default) means child processes are used.
  • pool=solo does not spawn any child processes; the single worker process does all the tasks.
  • pool=threads uses Python's multithreading.

We can also use the gevent and eventlet pools, as shown in the sketch below.

autoscale=10,3 means 10 is the maximum pool size and 3 is the minimum.
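For I/O-heavy workloads (lots of waiting on APIs or mail servers), a greenlet pool can push concurrency well past the CPU core count; a sketch, assuming you install gevent first:

pip install gevent

celery -A celeryproject worker --pool=gevent --concurrency=100 -l info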

Exercise

Send emails using Django celery

The idea is to send an email to every user of our application.

We will create a new app in our above project.

django-admin startapp mailfireapp

Then create a tasks.py in that app directory and add the code below:

from django.contrib.auth import get_user_model
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings

@shared_task(bind=True)
def send_mail_func(self):
    users = get_user_model().objects.all()
    for user in users:
        mail_subject = "hello celery"
        message = "subscribe TheCodeSpace Youtube channel."
        to_email = user.email
        send_mail(
            subject=mail_subject,
            message=message,
            from_email=settings.EMAIL_HOST_USER,
            recipient_list=[to_email],
            fail_silently=True,
            )
    return "Task Successful"

Settings for your Gmail account

Go to myaccount.google.com > Security > enable 2-Step Verification > click on App passwords > give the app a name.

It will then give you a password (secret code). Don't share it with anyone. Suppose for now you got "abcde@123"; you need to put it in EMAIL_HOST_PASSWORD.

We also need to configure SMTP settings in settings.py:

#SMTP Settings

EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_USE_TLS = True
EMAIL_PORT = 587
EMAIL_HOST_USER = 'abc@gmail.com'
EMAIL_HOST_PASSWORD = 'abcde@123'
DEFAULT_FROM_EMAIL='Celery Testing | TheCodeSpace <abc@gmail.com>'
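Hard-coding the app password is fine for a demo, but a safer variant reads it from the environment; a minimal sketch (the EMAIL_HOST_PASSWORD variable name is just an example):

# settings.py -- read the Gmail app password from the environment instead of the source
import os
EMAIL_HOST_PASSWORD = os.environ.get("EMAIL_HOST_PASSWORD", "")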

Now we will call send_mail_func from our celeryapp app. Your celeryapp/views.py file will look like this:

from django.shortcuts import render
from .tasks import fun
from django.http import HttpResponse

from mailfireapp.tasks import send_mail_func
# Create your views here.

def send_mail_to_all_users(request):
    send_mail_func.delay()
    return HttpResponse("Email has beed Sent Successfully")

def testView(request):
    fun.delay()
    return HttpResponse("Done")

Now attach a URL for the send_mail_to_all_users function in your celeryapp/urls.py:

from django.urls import path
from .views import testView,send_mail_to_all_users

urlpatterns = [
    path('', testView,name="testView"),
    path('sendmail/', send_mail_to_all_users,name="send_mail_to_all_users"),
]

Now start your server and celery worker.

Cmd 1>celery -A celeryproject.celery worker --pool=solo -l info

Cmd 2>python manage.py runserver

Go to http://localhost:8000/sendmail

Also check your celery terminal: you will see "Task Successful" there, and your users' mailboxes will have the mail we sent.

Send mail periodically with django celery beat

Now suppose we want to send the mail at a specific interval; that is what celery beat is for.

We have already installed django-celery-beat. Remember that in our project's celery.py file we haven't added anything to the app.conf.beat_schedule dictionary yet.

Now add the following to the celery.py file (note the crontab import):

# celery beat settings
from celery.schedules import crontab  # required for the schedule below

app.conf.beat_schedule = {
    'send-mail-everyday-at-7': {
        'task': 'mailfireapp.tasks.send_mail_func',
        'schedule': crontab(hour=7, minute=2),
        # we can pass arguments here and use them in
        # mailfireapp/tasks.py's send_mail_func function; for
        # that you need an extra argument in your function
        # (currently we are not doing that)
        # 'args': (1000,)
    }
}

Start celery beat in a terminal (Cmd3 just indicates that we are in a third terminal):

Cmd3> celery -A celeryproject.celery beat -l INFO

Now you will get the mail at 07:02 AM from our celery worker. See https://docs.celeryq.dev/en/stable/reference/celery.schedules.html for more information about the arguments the crontab function accepts.
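A few crontab variants for reference (the values are only examples):

from celery.schedules import crontab

crontab()                                   # every minute
crontab(minute="*/15")                      # every 15 minutes
crontab(hour=7, minute=30, day_of_week=1)   # every Monday at 07:30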

Instead of specifying all of your tasks in the celery.py file, you can also create periodic tasks dynamically.

Create a new URL in urls.py:

from django.urls import path
from .views import testView,send_mail_to_all_users,sendmailattime

urlpatterns = [
    path('', testView,name="testView"),
    path('sendmail/', send_mail_to_all_users,name="send_mail_to_all_users"),
    path('sendmailattime/', sendmailattime,name="sendmailattime"),
]

Create the sendmailattime view for it, so your views.py becomes:

from django.shortcuts import render
from .tasks import fun
from django.http import HttpResponse
from django_celery_beat.models import PeriodicTask,CrontabSchedule
from mailfireapp.tasks import send_mail_func
import json
# Create your views here.

def send_mail_to_all_users(request):
    send_mail_func.delay()
    return HttpResponse("Email has beed Sent Successfully")

def testView(request):
    fun.delay()     
    return HttpResponse("Done")

def sendmailattime(request):
    schedule, created = CrontabSchedule.objects.get_or_create(hour=20, minute=45)
    # the name needs to be unique; for now it is added manually, like mail_task_6
    task = PeriodicTask.objects.create(crontab=schedule,
        name="mail_task_" + "6", task='mailfireapp.tasks.send_mail_func')
    return HttpResponse("success")

Now go to: http://localhost:8000/sendmailattime/

You will see "success" on the page, and you will get the mail at the defined time.

Note: change the time settings to your zone accordingly.

That's it!!! We are done with the most amazing concept of Django – "CELERY IN DJANGO".


One more thing I want to mention: in one of my interviews,

the interviewer asked me: what is celery in Django?

I understood it as "salary", so I replied: my salary is X LPA.😂😂😂

He just laughed🤣🤣 !!! and I was rejected. From that time I decided to learn this concept in depth.


Thank you for reading this article. Please share your thoughts in the comment section.
