什么是celery beat?
celery beat是一个调度程序(scheduler)。它定期启动任务,然后由集群中可用的工作节点执行。默认情况下,任务配置取自beat_schedule设置, 其默认情况下是celery.beat:PersistentScheduler,这种调度器将配置存储在本地shelve文件中,我们也可以使用自定义存储,比如将任务配置存储在MySQL,redis或MongoDB中等。自定义调度器可以通过celery beat -S
设置,示例:
celery beat -S celerybeatmongo.schedulers.MongoScheduler
复制
可参考celery之scheduler调度器,以上内容,我在这篇中介绍过。
什么是celerybeat-mongo?
celerybeat-mongo是一种自定义scheduler。源码见:https://github.com/zmap/celerybeat-mongo。它将任务配置信息存储在MongoDB数据库中。与之相关的配置有:
mongodb_scheduler_db = celery
mongodb_scheduler_url = mongodb://root:123456@localhost:27017/celery?authsource=admin复制
这种调度器,将任务信息存储在mongodb_scheduler_db库下的schedules表中。
使用celerybeat-mongo中遇到的问题。
见issues: https://github.com/zmap/celerybeat-mongo/issues/69 将其中一个任务的enabled字段设置为false, 将导致所有任务都不再被调度。
要找出问题的根源,就需要了解celery beat的工作原理。我在上篇celery beat实现原理分析中做过简单的梳理。
celery beat工作原理
celery beat的定时任务的实现是利用的最小堆,任务按时间顺序排序,堆顶元素是最先要被执行的任务。同时会有一个线程不断的轮询判断,每次都取堆顶元素,判断其是否达到执行时间,如果达到,则执行任务,如果没到则sleep相应时间后,进入下次循环。
注:关于当我们执行celery beat启动命令的时候,到底发生了什么?我一直认为,要了解一个组件的工作原理,最好的方式是通过“调试”代码,通过打断点“step by step”的方式,可以帮助我们理清代码的执行逻辑。
使用pycharm调试celery beat。
再回到celerybeat-mongo存在的问题上:
将其中一个任务的enabled字段设置为false, 将导致所有任务都不再被调度。
经过调试,我发现其问题出在堆的生成上,当我们把其中任务的enabled字段设置为false后,这个任务会一直在堆顶元素。
我们来看下最小堆的生成规则:
event_t = namedtuple('event_t', ('time', 'priority', 'entry'))
...
def populate_heap(self, event_t=event_t, heapify=heapq.heapify):
"""Populate the heap with the data contained in the schedule."""
priority = 5
self._heap = []
for entry in self.schedule.values():
is_due, next_call_delay = entry.is_due()
info('entry: %s', entry)
info('is_due: %s', is_due)
info('next_call_delay: %s', next_call_delay)
self._heap.append(event_t(
self._when(
entry,
0 if is_due else next_call_delay
) or 0,
priority, entry
))
heapify(self._heap)
...
is_due, next_time_to_run = self.is_due(entry)复制
heapq.heapify用于将list转换为最小堆。我们看到如上所示代码片断中self._heap是要转换的list。
list中的每一项是个event_t,event_t是个命名元组,包含三项time、priority、entry。
所以排序的关键是time字段,即next_time_to_run,它是self.is_due(entry)函数返回值,这时就需要进入了celerybeat_mongo的代码去看了,看下celerybeat_mongo的is_due方法。
def is_due(self):
if not self._task.enabled:
return False, 5.0 # 5 second delay for re-enable.
if hasattr(self._task, 'start_after') and self._task.start_after:
if datetime.datetime.now() < self._task.start_after:
return False, 5.0
if hasattr(self._task, 'max_run_count') and self._task.max_run_count:
if (self._task.total_run_count or 0) >= self._task.max_run_count:
return False, 5.0
if self._task.run_immediately:
# figure out when the schedule would run next anyway
_, n = self.schedule.is_due(self.last_run_at)
return True, n
return self.schedule.is_due(self.last_run_at)复制
问题就出在
if not self._task.enabled:
return False, 5.0 # 5 second delay for re-enable.复制
当task的enabled设为False后,next_time_to_run的值是5...这个值太小了导致它一直处在最小堆的堆顶。应该将其设为一个超大值,比如sys.maxsize。
参考:
https://github.com/zmap/celerybeat-mongo
https://liqiang.io/post/deep-in-celery-periodic-task