# Get back @periodic_task in Celery 5
The `@periodic_task` decorator was deprecated in Celery 4 and completely removed in Celery 5. If you (like me) used it heavily and are frustrated at having to migrate to a different way of defining when your periodic tasks run, here is how to get the `@periodic_task` decorator back.
## @periodic_task deprecated and finally removed in Celery 5
Since Celery 4, `@periodic_task` was marked for deprecation; in Celery 5 it was removed entirely. I loved the `@periodic_task` decorator for its simplicity: I could see both the task logic and its timing in one place, without having to move between two files.
The removal of the decorator means that developers must configure the schedule separately (in a dedicated configuration file, usually your `celeryfile.py`) instead of decorating the task logic in place with its schedule. Celery's documentation states that having all schedules in one place is better. In my opinion, however, for a large codebase, separating the task definition from its schedule makes the code harder to read and maintain, and I am not alone in thinking so (I have seen the same complaint here and here). Let me explain why.
## Starting point: Celery 4.0 code with @periodic_task
Consider a project with multiple Django apps, each containing several periodic tasks. With the `periodic_task` decorator, each task's run schedule was immediately visible in the same place as the task definition, like so:
### App 1: apps/user_engagement
tasks.py
```python
from celery.schedules import crontab
from celery.task import periodic_task  # Celery 4-style import


@periodic_task(run_every=crontab(hour=19, minute=30, day_of_week='sunday'))
def task_send_weekly_newsletter():
    # Send the newsletter on Sunday evening, when open rates are highest.
    # When I am inside this task, it takes my eyes a mere fraction of a
    # second to glance up and find out when it runs (Sunday evening).
    pass


@periodic_task(run_every=crontab(day_of_week='sunday', hour=10))
def task_generate_weekly_report():
    # Similarly, a short glance up shows me this runs at 10am on Sunday.
    pass
```
### App 2: apps/inventory_management
tasks.py
```python
from celery.schedules import crontab
from celery.task import periodic_task


@periodic_task(run_every=crontab(hour=2, minute=0, day_of_week='sunday'))
def task_check_inventory_levels():
    # This runs at 2am on Sunday morning, when server utilisation is at its
    # lowest, so I don't have to focus on optimising this task for CPU/memory.
    pass


@periodic_task(run_every=crontab(minute='*/15'))
def task_update_stock_counts():
    # Stock counts are updated every 15 minutes (the line above tells me so),
    # so my eyes go straight down to check that the querysets and loops are
    # optimised to be efficient.
    pass
```
Without the decorator, you first have to fall back to the vanilla `task` (or `shared_task`) decorator:
```diff
-@periodic_task(run_every=crontab(hour=19, minute=30, day_of_week='sunday'))
+@shared_task
 def task_send_weekly_newsletter():
     # as before
```
and then wire up the schedule using one of the two following approaches.
## Approach 1: Add entries directly
To call a task periodically, you have to add an entry to the beat schedule, as explained [here](https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#entries).
celeryfile.py
```python
from celery import Celery
from celery.schedules import crontab

from apps.user_engagement.tasks import task_send_weekly_newsletter
from apps.user_engagement.tasks import task_generate_weekly_report
# ... a long list of imports, one for every "periodic" task; we had over 80

app = Celery('your_project_name')
# Celery configuration settings here


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # App 1: User Engagement
    sender.add_periodic_task(
        crontab(hour=19, minute=30, day_of_week='sunday'),
        task_send_weekly_newsletter.s(),
        name='Send Weekly Newsletter',
    )
    sender.add_periodic_task(
        crontab(day_of_week='sunday', hour=10),
        task_generate_weekly_report.s(),
        name='Generate Weekly Report',
    )
    # ... and similarly for the two inventory_management tasks. In a large
    # codebase there will be 80 entries here, so good luck understanding each
    # one's scheduled time and code logic together. We actually started
    # keeping a spreadsheet of all 80 tasks with their app/task names,
    # scheduled times, and a high-level summary of the logic, just to get a
    # clear picture.
```
If you want to avoid the boilerplate imports at the top, you can do the following instead:
## Approach 2: Use the beat_schedule dict
To call a task periodically, you instead add an entry to the `beat_schedule` dict, as the Celery docs show [here](https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#entries).
celeryfile.py
```python
from celery import Celery
from celery.schedules import crontab
# No task imports needed, BUT you now have to painstakingly type out the
# fully scoped name of every task.

app = Celery('your_project_name')
# Celery configuration settings here

app.conf.beat_schedule = {
    'send_weekly_newsletter': {
        'task': 'apps.user_engagement.tasks.task_send_weekly_newsletter',
        'schedule': crontab(hour=19, minute=30, day_of_week='sunday'),
    },
    'generate_weekly_report': {
        'task': 'apps.user_engagement.tasks.task_generate_weekly_report',
        'schedule': crontab(hour=10, minute=0, day_of_week='sunday'),
    },
    # In our project this meant typing out 80 such dicts with scoped task
    # name strings, in a place far from the logic of the tasks; this approach
    # still has the same context-switching issue as Approach 1.
}
```
In my opinion, neither of the methods above is elegant, and I should not be forced to centralize schedules at the cost of developer experience and constant context-switching.
Of course, using the Django database for periodic task schedules (e.g. via django-celery-beat) is a different ball game, but our codebase did not use it. We wanted to keep run schedules and logic collocated, for the reasons above: knowing which periodic tasks run most often and when, and always reading the logic with its scheduled run times in mind.
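(For contrast, here is a minimal sketch of what that database-backed route looks like with django-celery-beat, reusing the newsletter task from above; again, we did not go this way.)

```python
# Sketch only: with django-celery-beat, schedules live as rows in the
# Django database rather than in code (we did not use this approach).
from django_celery_beat.models import CrontabSchedule, PeriodicTask

schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='30', hour='19', day_of_week='sunday',
)
PeriodicTask.objects.get_or_create(
    name='Send Weekly Newsletter',
    task='apps.user_engagement.tasks.task_send_weekly_newsletter',
    crontab=schedule,
)
```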
## Resurrecting the @periodic_task decorator
When I started putting together the spreadsheet to bring everything back into one place, I was so uninspired that I instead explored ways of bringing the `periodic_task` decorator back from the grave.
Looking into the Celery internals and at some helpful GitHub issues, I figured out a way to write a custom decorator using the `base` parameter of Celery's `task` (or `shared_task`) decorator, building the `beat_schedule` dict on the fly in the `on_bound` method of the `celery.Task` class. (Extra keyword arguments passed to `shared_task` end up as attributes on the generated task class, which is how `run_every` is available inside `on_bound`.)
```python
from celery import Task, shared_task


class BasePeriodicTask(Task):
    @classmethod
    def on_bound(cls, app):
        # Called when the task class is bound to the Celery app at discovery time.
        # Uncomment these to inspect what happens during on_bound:
        # print("on_bound called ..")
        # from celery.contrib import rdb
        # rdb.set_trace()
        # print(f"cls dict = {cls.__dict__}")
        app.conf.beat_schedule[cls.name] = {
            "task": cls.name,
            "schedule": cls.run_every,
            "args": (),
            "kwargs": {},
            "options": getattr(cls, "options", None) or {},
            "relative": getattr(cls, "relative", False),
        }


def periodic_task(run_every, **kwargs):
    if run_every is None:
        raise ValueError("run_every must be specified in the @periodic_task decorator")

    def decorator(f):
        # Use shared_task with BasePeriodicTask as the base; run_every and any
        # extra keyword arguments become attributes on the generated task
        # class, which is how on_bound can read them.
        return shared_task(base=BasePeriodicTask, run_every=run_every, **kwargs)(f)

    return decorator
```
The decorator above is a drop-in replacement for Celery's (now-removed) `periodic_task` decorator: it works with the original Celery 4 usage from the start of this post without any code changes.
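For completeness, here is the resurrected decorator at a call site, exactly as in the Celery 4 code at the start. A minimal sketch: the `yourproject.celery_utils` module path is just where I assume you saved the decorator above.

```python
from celery.schedules import crontab

# Hypothetical module path; import from wherever you placed the decorator above.
from yourproject.celery_utils import periodic_task


@periodic_task(run_every=crontab(hour=19, minute=30, day_of_week='sunday'))
def task_send_weekly_newsletter():
    # send newsletter on Sunday evening, as before
    pass
```

Once the worker (or beat) configures the app and binds the tasks, `on_bound` fires for each decorated task, and `app.conf.beat_schedule` ends up with one entry per task, keyed by the task's dotted name.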