异步API的实现细节
异步API的数据结构定义
在deps/hiredis/async.h头文件之中定义了关于异步API使用的redisAsyncContext
数据结构。与同步API类似,redisAsyncContext
表示用户与Redis服务器之间的连接。
typedef struct redisAsyncContext
{
redisContext c;
int err;
char *errstr;
void *data;
struct {
void *data;
void (*addRead)(void *privdata);
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
} ev;
redisDisConnectCallback *onDisConnect;
redisConnectCallback *onConnect;
redisCallbackList replies;
struct {
redisCCallbackList invalid;
struct dict *channels;
struct dict *patterns;
} sub;
} redisAsyncContext;
复制
在redisAsyncContext
这个数据接口之中:
redisAsyncContext.c
,这个字段是一个redisContext
类型的数据,用于实际地存储用户与Redis服务器之间的连接信息。redisAsyncContext.err
与redisAsyncContext.errstr
,当连接或者对于redisAsyncContext
对象数据的操作出现错误时,这两个字段存储了对应的错误信息。而实际上这两个字段是从redisAsyncContext.c
这个redisContext
类型数据中的redisContext.err
以及redisContext.errstr
字段之中复制过来的。redisAsyncContext.ev
,这个字段是redisAsyncContext
对象与事件驱动库交互的接口,需要Hiredis的用户按照声明的函数原型自行定义相关接口函数的实现,并赋值给redisAsyncContext.ev
这个字段。redisAsyncContext.onDisConnect
,redisAsyncContext
对象在连接断开时的回调函数。redisAsyncContext.onConnect
,由于redisAyncContext
对象建立连接时也是采用异步的方式建立的,而这个字段存储的便是连接被真正建立起来时,需要调用的回调函数。redisAsyncContext.replies
,这里以单链表的形式存储着查询命令返回数据的回调函数,由于Redis服务器在处理核心数据的时候,采用的是单线程的方式进行处理,因此用户执行的查询命令都是按照顺序执行的,而redisAsyncContext.replies
字段之中也是按照先进先出的顺序存储查询命令对应的回调函数的。redisAsyncContext.sub
,这里存储在订阅/发布模式下的消息回调函数。对于普通的查询命令,无论执行成功与否,都是一条查询命令对应一条返回数据的形式;而在订阅/发布模式中,在执行过订阅命令后,可能会收到多条返回数据,也就是对一个频道或者模式的订阅命令对应多个返回数据的一对多的形式。因此异步API之中采用了redisAsyncContext.sub
这个字段独立存储订阅/发布模式下的消息回调函数。
对于redisAsyncContext.onDisConnect
以及redisAsyncContext.onConnect
这两个函数指针对应的回调函数,需要调用者自行实现,不过函数的定义应该遵循下面的函数原型:
void(const redisAsyncContext *c, int status);
复制
这两个回调函数应该根据status
参数的值来判断连接是否正常建立,或者说导致连接断开的具体的原因,并作出相应的处理。
而对于前面介绍的回调函数队列redisAsyncContext.replies
,其数据结构的定义为:
typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
typedef struct redisCallback {
struct redisCallback *next;
redisCallbackFn *fn;
void *privdata;
} redisCallback;
typedef struct redisCallbackList {
redisCallback *head, *tail;
} redisCallbackList;
复制
这里面redisCallbackFn
为查询命令返回数据所触发的回调函数,Hiredis库的用户可以按照自己的需求为查询命令定义自己的回调函数,以处理查询命令的返回数据,回调函数应该接收一个redisAsyncContext
对象、一个redisReply
对象以及一个用户自定义的数据privdata
做为参数。
redisCallbackList
这个单链表中按照顺寻存储了查询命令的回调数据redisCallback
。redisCallback
对象包含了对应的回调函数,以及用户自定义的数据privdata
。
异步API接口与事件驱动库的绑定
Hiredis异步API的高效之处就在,用户通过异步API执行查询命令之后,不需要原地阻塞地等待服务器的返回数据。用户在执行完查询命令之后,可以继续去处理其他的业务逻辑,在查询命令的返回数据就绪之后,Hiredis在通知用户读取数据,通过回调函数来处理返回数据。而这种通知机制就是整个异步API的核心,事件驱动库主要的工作便是执行上述的通知机制。在前面的文章之中我们介绍了Redis服务器所使用的自己实现的事件驱动库,相信各位读者对事件驱动库的核心概念已经有了一个了解。
除了Redis服务器自身实现的事件驱动库之外,业内比较流行开源的事件驱动库包括Libevent、Libev等等,同时也包括了各个公司内部自己实现的事件驱动库。作为Hiredis库的使用者,如果你想在你的项目之中集成Hiredis的异步API,那么你需要将Hiredis与自己项目之中事件驱动库进行绑定。本小节会就如何与事件驱动库进行绑定进行介绍。
为了实现与事件驱动库的绑定,Hiredis库的用户首先需要定义一个事件对象类型,用于在事件驱动库之中表示这个redisAsyncContext
对象。我们来看一下,针对Redis自己的事件驱动库以及Libevent这个事件驱动库来说,这个事件对象类型的定义:
typedef struct redisAeEvents {
redisAsyncContext *context;
aeEventLoop *loop;
int fd;
int reading, writing;
} redisAeEvents;
typedef struct redisLibeventEvents {
redisAsyncContext *context;
struct event *rev, *wer;
} redisLibeventEvents;
复制
这里我们可以发现,Hiredis异步API所需要的事件驱动类型的定义中,至少应该包含有其对应的redisAsyncContext
对象的信息;区分输入或者输出的标记,以及对应的事件驱动库对象的数据,例如redisAeEvents.loop
以及redisLibeventEvents.rev
和redisLibeventEvents.wer
(由于在Libevent库之中,struct event
本身就携带着事件驱动库对象struct event_base
的信息,因此在结构体之中包含了struct event
数据也就相当于包含了struct event_base
这个事件驱动库的数据)。
在Hiredis异步API之中已经定义了两个函数接口作为可读事件以及可写事件的回调处理函数:
void redisAsyncHandleRead(redisAsyncContext *ac);
void redisAsyncHandleWrite(redisAsyncContext *ac);
复制
对于这两个函数接口,我们需要根据自己绑定的事件驱动库所对应的Events
上进行一个简单的封装,以Libevents对应可读事件的处理函数为例:
static void redisLibeventReadEvents(int fd, short event. void *arg)
{
redisLibeventEvents *e = (redisLibeventEvents)arg;
redisAsyncHandleRead(e->context);
}
复制
也就是说,实际上注册到事件驱动库之中的可读事件处理函数为redisLibeventReadEvents
,而这个函数最终是通过调用redisAsyncHandleRead
来完成对可读事件的处理的。
这样一来,我们可以自定义一个函数接口,将事件驱动库以及redisAsyncContext
对象绑定起来,还是以Libevent库为例:
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base)
{
redisContext *c = &(ac->c);
redisLibeventEvents *e;
if (ac->ev.data != NULL)
return REDIS_ERR;
e = (redisLibeventEvents*)malloc(sizeof(*e));
e->context = ac;
...
ac->ev.data = e;
e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e);
e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e);
event_add(e->rev, NULL);
event_add(e->wev, NULL);
return REDIS_OK;
}复制
通过上面这个函数接口,我们便初始化了一个redisLibeventEvents
对象,并将其赋值给redisAsyncContext.ev.data
对象,这样就建立起了redisAsyncContext
对象与事件驱动库的绑定关系。
回头我们再来看看异步API的数据结构定义:
struct {
void *data;
void (*addRead)(void *privdata);
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
} ev;复制
这里处理ev.data
字段用于储存我们自定义的Events
对象之外,还有五个函数指针,同样的这也需要我们根据所使用的事件驱动库来实现这些函数接口,这些函数接口的含义为:
ev.addRead
,我们需要自定义这个函数接口用于向事件驱动之中注册监听可读事件。ev.delRead
,我们需要自定义这个函数接口从事件驱动之中移除对可读事件的监听。ev.addWrite
,我们需要自定义这个函数接口用于向事件驱动之中注册监听可写事件。ev.delWrite
,我们需要自定义这个函数接口从事件驱动之中移除对可写事件的监听。ev.cleanup
,这个函数用清理redisAsyncContext
对象,会从事件驱动之中移除可读事件以及可写事件的监听。
这样,当我们向服务器发送一条查询命令之后,便可以通过调用redisAsyncContext.ev.addRead
接口注册可读事件,进而在查询数据返回时触发可读事件回调函数,来处理命令的返回数据。