Redis 是一个事件驱动程序,需要处理两类事件:文件事件(file event)和事件事件(time event)。
文件事件:用于处理 Redis 服务器和客户端之间的网络IO。
时间事件:Redis 服务器中的一些操作(比如serverCron函数)需要在给定的时间点执行,而时间事件就是处理这类定时操作的。
文件事件
Redis 基于 Reactor 模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler):
文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字, 并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时, 与操作相对应的文件事件就会产生, 这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
虽然文件事件处理器以单线程方式运行, 但通过使用 I/O 多路复用程序来监听多个套接字, 文件事件处理器既实现了高性能的网络通信模型, 又保持了 Redis 内部单线程设计的简单性。
文件事件处理器构成
文件事件处理器由四个部分组成:套接字、I/O 多路复用程序、 文件事件分派器(dispatcher)和事件处理器。
文件事件是对套接字操作的抽象,每当一个套接字准备好执行连接应答(accept)、写入、读取、关闭等操作时,就会产生一个文件事件。I/O 多路复用程序负责监听套接字,并向文件事件分派器传送那些产生了事件的套接字。
I/O 多路复用程序会将所有产生事件的套接字都入队到一个队列里面,然后通过这个队列,以有序(sequentially)、同步(synchronously)、每次一个套接字的方式向文件事件分派器传送套接字:当上一个套接字产生的事件被处理完毕之后(该套接字为事件所关联的事件处理器执行完毕),I/O 多路复用程序才会继续向文件事件分派器传送下一个套接字。
文件事件分派器接收 I/O 多路复用程序传来的套接字,并根据套接字产生的事件的类型,调用相应的事件处理器。
服务器会为执行不同任务的套接字关联不同的事件处理器,这些处理器是一个个函数,它们定义了某个事件发生时,服务器应该执行的动作。
多路复用
Redis 的 I/O 多路复用程序的所有功能都是通过包装常见的 select 、 epoll 、 evport 和 kqueue 这些 I/O 多路复用函数库来实现的。每个多路复用函数库在 Redis 源码中都对应一个单独的文件,比如 ae_select.c、ae_epoll.c、ae_kqueue.c等。Redis 会根据不同的操作系统,按照不同的优先级选择多路复用技术。
时间事件
Redis 的时间事件分为以下两类:
定时事件:指定时间之间执行程序
周期事件:指定间隔事件循环执行
时间事件结构:
typedef struct aeTimeEvent {
/* 全局唯一ID */
long long id; /* time event identifier. */
/* 秒精确的UNIX时间戳,记录时间事件到达的时间*/
long when_sec; /* seconds */
/* 毫秒精确的UNIX时间戳,记录时间事件到达的时间*/
long when_ms; /* milliseconds */
/* 时间处理器(一个函数):时间事件到达时,服务器调用处理事件 */
aeTimeProc *timeProc;
/* 事件结束回调函数,析构一些资源*/
aeEventFinalizerProc *finalizerProc;
/* 私有数据 */
void *clientData;
/* 前驱节点 */
struct aeTimeEvent *prev;
/* 后继节点 */
struct aeTimeEvent *next;
} aeTimeEvent;
复制
一个时间事件是定时事件还是周期性事件取决于时间处理器的返回值:
如果返回值是 AE_NOMORE,那么这个事件是一个定时事件,该事件在达到后删除,之后不会再重复。
如果返回值是非 AE_NOMORE 的值,那么这个事件为周期性事件,当一个时间事件到达后,服务器会根据时间处理器的返回值,对时间事件的 when_sec 属性和 when_ms 属性进行更新,让这个事件在一段时间后再次达到。
服务器将所有事件事件放在一个无序(没有按事件排序)链表中,每当时间事件执行器运行时,它就遍历整个链表,查找所有已经到达的事件时间,并调用相应的事件处理器。
serverCron
serverCron 是 Redis 中定期处理函数,负责定期检查服务器资源和状态,保障服务器可以长期、稳定的运行,主要工作如下:
更新服务器各类统计信息,如内存占用。
清理数据库中过期键值对。
关闭和清理连接失效的客户端。
尝试 RDB 或 AOF 持久化操作。
如果是主服务器,尝试对从服务器定期同步。
如果处于集群模式,对集群进行定期同步和连接测试。
Redis 以周期性事件的方式运行 serverCron 函数,直到服务器关闭。
通信过程
一次客户端和服务器通信过程:
客户端向服务端发起建立 socket 连接的请求,那么监听套接字将产生 AE_READABLE 事件,触发连接应答处理器执行。处理器会对客户端的连接请求进行应答,然后创建客户端套接字,以及客户端状态,并将客户端套接字的 AE_READABLE 事件与命令请求处理器关联。
客户端建立连接后,向服务器发送命令,那么客户端套接字将产生 AE_READABLE 事件,触发命令请求处理器执行,处理器读取客户端命令,然后传递给相关程序去执行。
执行命令获得相应的命令回复,为了将命令回复传递给客户端,服务器将客户端套接字的 AE_WRITEABLE 事件与命令回复处理器关联。当客户端试图读取命令回复时,客户端套接字产生 AE_WRITEABLE 事件,触发命令回复处理器将命令回复全部写入到套接字中。
事件管理器
服务器中同时存在文件事件和时间事件两种事件类型,Redis 需要对这两种事件进行调度,下面是 Redis 事件管理器。
aeEventLoop(事件管理器)是整个事件驱动模型的核心,它管理着文件事件列表和时间事件列表,不断地循环处理着就绪的文件事件和到期的时间事件。
aeEventLoop 结构:
typedef struct aeEventLoop {
// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
// 在处理事件后要执行的函数
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
复制
Redis 服务端在启动服务器过程中会同 AeEventLoop *aeCreateEventLoop(int setsize) 函数创建事件管理器 aeEventLoop 对象,主要是初始化 aeEventLoop 的各个属性值,如 events、fired、timeEventHead 和 apidata。
先创建aeEventLoop对象。
初始化未就绪文件事件表、就绪文件事件表。events指针指向未就绪文件事件表、fired指针指向就绪文件事件表。
初始化时间事件列表,设置 timeEventHead 和 timeEventNextId 属性。
调用 aeApiCreate 函数创建 epoll 实例(生产环境大多 Linux,对应的是 epoll),并初始化 apidata。
其中要关注的是 aeApiCreate 函数:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
/* 初始化 epoll 就绪事件表 */
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
/* 创建 epoll 实例 */
state->epfd = epoll_create(1024);
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
/* 事件管理器与 epoll 关联 */
eventLoop->apidata = state;
return 0;
}
复制
aeApiCreate 函数首先创建了 aeApiState 对象,初始化了 epoll 就绪事件表,然后调用 epoll_create 创建了 epoll 实例,最后将该 aeApiState 赋值给 apidata 属性。
aeApiState 结构:
typedef struct aeApiState {
/* epoll_event 实例描述符*/
int epfd;
/* 存储epoll就绪事件表 */
struct epoll_event *events;
} aeApiState;
复制
aeApiState 对象中 epfd 存储 epoll 的标识,events 是一个 epoll 就绪事件数组,当有 epoll 事件发生时,所有发生的 epoll 事件和其描述符将存储在这个数组中。这个就绪事件数组由应用层开辟空间、内核负责把所有发生的事件填充到该数组。
文件事件结构:
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
复制
文件事件处理与套接字相关的工作,在 aeEventLoop 中以数组的形式保存,redis.conf 中可以设置 maxClients,根据这个配置分配数组的大小,fd 会按照从小到大分配,对应 fd 的事件就保存在数组 events[fd] 中。
时间事件结构:
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
复制
时间事件在 aeEventLoop 中以链表保存,aeCreateTimeEvent() 会将新创建的时间事件添加在链表头部。
事件处理
事件管理器初始化完成后,Redis 就一直在 aeMain() 中处理事件循环。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
复制
aeProcessEvents() 为处理事件的函数,流程如下:
查找最近的一个 TimeEvent,因为以链表形式保存,耗时 O(N)。
以最近的 TimeEvent 时间间隔为参数调用 aeApiPoll(),保证能及时处理。
处理 FileEvents。
处理 TimeEvents: processTimeEvents() 会处理所有到时的事件,并重新添加到链表头。
aeProcessEvents() 伪代码:
def aeProcessEvents():
# 获取到达时间离当前时间最接近的时间事件
time_event = aeSearchNearestTimer()
# 计算最接近的时间事件距离到达还有多少毫秒
remaind_ms = time_event.when - unix_ts_now()
# 如果事件已到达,那么remaind_ms 的值可能为负数,将它设定为0
if remaind_ms < 0:
remaind_ms = 0
# 根据remaind_ms 的值,创建timeval 结构
timeval = create_timeval_with_ms(remaind_ms)
# 阻塞并等待文件事件产生,最大阻塞时间由传入的timeval 结构决定
# 如果remaind_ms 的值为0 ,那么aeApiPoll 调用之后马上返回,不阻塞
aeApiPoll(timeval)
# 处理所有已产生的文件事件
processFileEvents()
# 处理所有已到达的时间事件
processTimeEvents()
复制
文章同步github。
地址:github.com/lazecoding/Note/blob/main/note/articles/redis/事件驱动.md