暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

celery之自定义scheduler:celerybeat-mongo中配置失效的问题

1443

  1. 什么是celery beat?

celery beat是一个调度程序(scheduler)。它定期启动任务,然后由集群中可用的工作节点执行。默认情况下,任务配置取自beat_schedule设置, 其默认情况下是celery.beat:PersistentScheduler,这种调度器将配置存储在本地shelve文件中,我们也可以使用自定义存储,比如将任务配置存储在MySQL,redis或MongoDB中等。自定义调度器可以通过celery beat -S
设置,示例:

celery beat -S celerybeatmongo.schedulers.MongoScheduler

复制

可参考celery之scheduler调度器,以上内容,我在这篇中介绍过。

  1. 什么是celerybeat-mongo?

celerybeat-mongo是一种自定义scheduler。源码见:https://github.com/zmap/celerybeat-mongo。它将任务配置信息存储在MongoDB数据库中。与之相关的配置有:

mongodb_scheduler_db = celery
mongodb_scheduler_url = mongodb://root:123456@localhost:27017/celery?authsource=admin

复制

这种调度器,将任务信息存储在mongodb_scheduler_db库下的schedules表中。

  1. 使用celerybeat-mongo中遇到的问题。

见issues: https://github.com/zmap/celerybeat-mongo/issues/69 将其中一个任务的enabled字段设置为false, 将导致所有任务都不再被调度。

要找出问题的根源,就需要了解celery beat的工作原理。我在上篇celery beat实现原理分析中做过简单的梳理。

celery beat工作原理

celery beat的定时任务的实现是利用的最小堆,任务按时间顺序排序,堆顶元素是最先要被执行的任务。同时会有一个线程不断的轮询判断,每次都取堆顶元素,判断其是否达到执行时间,如果达到,则执行任务,如果没到则sleep相应时间后,进入下次循环。

注:关于当我们执行celery beat启动命令的时候,到底发生了什么?我一直认为,要了解一个组件的工作原理,最好的方式是通过“调试”代码,通过打断点“step by step”的方式,可以帮助我们清代码的执行逻辑。

使用pycharm调试celery beat。

再回到celerybeat-mongo存在的问题上:

将其中一个任务的enabled字段设置为false, 将导致所有任务都不再被调度。

经过调试,我发现其问题出在堆的生成上,当我们把其中任务的enabled字段设置为false后,这个任务会一直在堆顶元素。

我们来看下最小堆的生成规则:

    event_t = namedtuple('event_t', ('time''priority''entry'))
    
    ...
    
    def populate_heap(self, event_t=event_t, heapify=heapq.heapify):
        """Populate the heap with the data contained in the schedule."""
        priority = 5
        self._heap = []
        for entry in self.schedule.values():
            
            is_due, next_call_delay = entry.is_due()
            info('entry: %s', entry)
            info('is_due: %s', is_due)
            info('next_call_delay: %s', next_call_delay)
            self._heap.append(event_t(
                self._when(
                    entry,
                    0 if is_due else next_call_delay
                ) or 0,
                priority, entry
            ))
        heapify(self._heap)
        
...

is_due, next_time_to_run = self.is_due(entry)


复制

heapq.heapify用于将list转换为最小堆。我们看到如上所示代码片断中self._heap是要转换的list。

list中的每一项是个event_t,event_t是个命名元组,包含三项time、priority、entry。

所以排序的关键是time字段,即next_time_to_run,它是self.is_due(entry)函数返回值,这时就需要进入了celerybeat_mongo的代码去看了,看下celerybeat_mongo的is_due方法。

    def is_due(self):
        if not self._task.enabled:
            return False, 5.0   # 5 second delay for re-enable.
        if hasattr(self._task, 'start_after') and self._task.start_after:
            if datetime.datetime.now() < self._task.start_after:
                return False, 5.0
        if hasattr(self._task, 'max_run_count') and self._task.max_run_count:
            if (self._task.total_run_count or 0) >= self._task.max_run_count:
                return False, 5.0
        if self._task.run_immediately:
            # figure out when the schedule would run next anyway
            _, n = self.schedule.is_due(self.last_run_at)
            return True, n
        return self.schedule.is_due(self.last_run_at)

复制

问题就出在

        if not self._task.enabled:
            return False, 5.0   # 5 second delay for re-enable.

复制

当task的enabled设为False后,next_time_to_run的值是5...这个值太小了导致它一直处在最小堆的堆顶。应该将其设为一个超大值,比如sys.maxsize。

参考:

https://github.com/zmap/celerybeat-mongo 

https://liqiang.io/post/deep-in-celery-periodic-task

文章转载自PostgreSQL运维技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论