什么是events?
celery中,有几种工具可以用来监视和检查celery集群, 比如在这篇中,提到的命令行工具:inspect/control和web监控工具flower。除此之外,celery还提供了events的概念。在celery中,注册了很多event,这些event将会在Task/Worker的状态发生变化的时候被发出,然后被绑定的event消费者所接收。
有哪些events?
celery内置的事件类型可以大致分为worker和task事件。
worker
worker-online worker-offline worker-heartbeat
task
task-sent task-received task-received task-started task-succeeded task-failed task-rejected task-revoked task-retried
events的作用?
上面说了,event会在Task/Worker的状态发生变化的时候被发出,那么我们就可以利用event来监控task和worker的状态。其实flower本身也是利用events来实现的。如worker发送events,flower捕获events以监控worker状态。
此外,官方也提供了命令行工具,celery events。
celery events是一个简单的curses 监控器,显示task和worker的历史记录。
$ celery -A proj events
复制
执行上面的命令,我们可以看到类似下图所示的screen
什么是curses?
是一种基于字符的终端显示技术。在python中,curses是为文本终端提供一个界面绘图和键盘输入响应的库。
events工作原理
事件是如何产生的?
事件由事件的发送者初始化,并且发送,在celery里事件的发送往往伴随着状态的改变,例如对于task的事件中包括发送,被接收,被执行,执行成功,执行失败等。worker的事件包括下线、心跳等。
在celery中有个EventDispatcher类,它负责发送event messages。

EventDispatcher类中核心的方法即_publish

其中producer.publish中的producer是kombu的producer。
什么是kombu?
kombu是celery生态的一部分,是用来发送和接收消息的library,支持多种不同的消息代理。kombu其目的是为AMQP协议提供一个高级接口,使python中消息传递尽可能简单。
什么是AMQP:Advanced Message Queuing Protocol,高级消息队列协议,是一种开放的标准协议,用于消息定向、队列、路由、可靠性和安全性,RabbitMQ是其最流行的实现。
事件是如何传递的?
如果采用redis作为broker,那么事件是基于redis的pub/sub模式而传递的。
事件是如何捕获的?
同上,如果采用redis作为broker,基于pub/sub模式模式,事件得以被接收。
参考:
https://liqiang.io/post/celery-source-analysis-event-implemention
https://www.jianshu.com/p/b8152e5fe275