linux下使用hiredis异步API实现subpub消息订阅和发布的功能.doc_第1页
linux下使用hiredis异步API实现subpub消息订阅和发布的功能.doc_第2页
linux下使用hiredis异步API实现subpub消息订阅和发布的功能.doc_第3页
linux下使用hiredis异步API实现subpub消息订阅和发布的功能.doc_第4页
linux下使用hiredis异步API实现subpub消息订阅和发布的功能.doc_第5页
已阅读5页,还剩13页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、linux 下使用 hiredis 异步 API 实现 subpub 消息订阅和发布的功能最近使用 redis 的 c 接口 hiredis,使客户端与 redis 服务器通信,实现消息订阅和发布( PUB/SUB )的功能,我把遇到的一些问题和解决方法列出来供大家学习。redis_publisher.hcpp view plain copy/*> File Name:redis_publisher.h> Created Time: Sat 23 Apr 201610:15:09 PM CST> Description: 封装 hiredis,实现消息发布给 redis 功能

2、*/#ifndefREDIS_PUBLISHER_H#define REDIS_PUBLISHER_H#include#include#include#include#include#include#include#include#includeclassCRedisPublisher public:CRedisPublisher();CRedisPublisher();bool init();bool uninit();bool connect();bool disconnect();boolpublish(const std:string &channel_name,const s

3、td:string &message);private:/ 下面三个回调函数供redis 服务调用/连接回调static void connect_callback(const redisAsyncContext*redis_context,int status);/断开连接的回调static void disconnect_callback(constredisAsyncContext *redis_context,int status);/ 执行命令回调static voidcommand_callback(redisAsyncContext *redis_context,void

4、 *reply, void *privdata);/ 事件分发线程函数static void *event_thread(void *data);void *event_proc();private:/ libevent 事件对象event_base*_event_base;/ 事件线程 IDpthread_t_event_thread;/ 事件线程的信号量sem_t_event_sem;/ hiredis 异步对象redisAsyncContext *_redis_context;#endifredis_publisher.cppcpp view plain copy/*> File

5、Name:redis_publisher.cpp> Created Time: Sat 23 Apr 201610:15:09 PM CST> Description:*/#include#include#include#include redis_publisher.hCRedisPublisher:CRedisPublisher():_event_base(0),_event_thread(0), _redis_context(0) CRedisPublisher:CRedisPublisher()boolCRedisPublisher:init()/ initialize t

6、he event_event_base = event_base_new();/ 创建 libevent 对象if (NULL = _event_base)printf(: Createredis event failed.n);return false;memset(&_event_sem, 0, sizeof(_event_sem);intret = sem_init(&_event_sem, 0, 0);if (ret != 0)printf(: Init sem failed.n);returnfalse;return true;boolCRedisPublisher:

7、uninit() _event_base = NULL;sem_destroy(&_event_sem);return true;bool CRedisPublisher:connect()/ connect redis_redis_context = redisAsyncConnect(, 6379);/异步连接到 redis 服务器上,使用默认端口if (NULL =_redis_context)printf(: Connect redisfailed.n);return false;if(_redis_context->err)printf(: Conne

8、ctredis error: %d, %sn,_redis_context->err,_redis_context->errstr);/ 输出错误信息return false;/ attach the eventredisLibeventAttach(_redis_context, _event_base);/ 将事件绑定到 redis context 上,使设置给redis 的回调跟事件关联/ 创建事件处理线程int ret =pthread_create(&_event_thread, 0,&CRedisPublisher:event_thread, this)

9、;printf(: create event thread failed.n);disconnect();return false;if (ret != 0)/ 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态redisAsyncSetConnectCallback(_redis_context,&CRedisPublisher:connect_callback);/ 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连redisAsyncSetDisconnectCallback(_redis_context,&a

10、mp;CRedisPublisher:disconnect_callback);/ 启动事件线程sem_post(&_event_sem);returntrue;bool CRedisPublisher:disconnect()if(_redis_context) redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; return true; boolCRedisPublisher:publish(const std:string &channel

11、_name,const std:string &message)int ret =redisAsyncCommand(_redis_context,&CRedisPublisher:command_callback, this,PUBLISH %s %s,channel_name.c_str(),message.c_str();if (REDIS_ERR = ret)printf(Publish command failed: %dn, ret);return false;return true;voidCRedisPublisher:connect_callback(cons

12、t redisAsyncContext*redis_context,int status)if (status !=REDIS_OK)printf(: Error: %sn,redis_context->errstr);elseprintf(: Redis connected!n);void CRedisPublisher:disconnect_callback(constredisAsyncContext *redis_context, int status)if(status != REDIS_OK)/这里异常退出,可以尝试重连printf(: Error: %sn,redis_co

13、ntext->errstr);/消息接收回调函数voidCRedisPublisher:command_callback(redisAsyncContext*redis_context,void *reply, void *privdata)printf(command callback.n);/ 这里不执行任何操作 void *CRedisPublisher:event_thread(void*data)if (NULL = data)printf(:Error!n);assert(false);returnNULL;CRedisPublisher *self_this =reinte

14、rpret_cast(data);returnself_this->event_proc();void*CRedisPublisher:event_proc()sem_wait(&_event_sem);/开启事件分发, event_base_dispatch 会阻塞event_base_dispatch(_event_base);return NULL; redis_subscriber.hcpp view plain copy/*> File Name:redis_subscriber.h> Created Time: Sat 23 Apr 201610:15:0

15、9 PM CST> Description:封装 hiredis,实现消息订阅 redis 功能*/#ifndefREDIS_SUBSCRIBER_H#define REDIS_SUBSCRIBER_H#include#include#include#include#include#include#include#include#includeclassCRedisSubscriber public:typedef std:tr1:functionNotifyMessageFn;/ 回调函数对象类型,当接收到消息后调用回调把消息发送出去CRedisSubscriber();CRedisS

16、ubscriber();bool init(const NotifyMessageFn &fn);/ 传入回调对象bool uninit();bool connect();booldisconnect();/ 可以多次调用,订阅多个频道bool subscribe(const std:string &channel_name);private:/ 下面三个回调函数供redis 服务调用/连接回调static void connect_callback(constredisAsyncContext *redis_context,int status);/ 断开连接的回调stati

17、c voiddisconnect_callback(const redisAsyncContext *redis_context,int status);/ 执行命令回调static voidcommand_callback(redisAsyncContext *redis_context,void *reply, void *privdata);/ 事件分发线程函数static void *event_thread(void *data);void *event_proc();private:/ libevent 事件对象event_base*_event_base;/ 事件线程 IDpth

18、read_t_event_thread;/ 事件线程的信号量sem_t_event_sem;/ hiredis 异步对象redisAsyncContext *_redis_context;/通知外层的回调函数对象NotifyMessageFn_notify_message_fn;#endifredis_subscriber.cpp:/*> File Name:redis_subscriber.cpp> Created Time: Sat 23 Apr 201610:15:09 PM CST> Description:*/#include#include#include#inc

19、lude redis_subscriber.hCRedisSubscriber:CRedisSubscriber():_event_base(0),_event_thread(0), _redis_context(0)CRedisSubscriber:CRedisSubscriber()boolCRedisSubscriber:init(const NotifyMessageFn &fn)/ initialize the event_notify_message_fn = fn;_event_base = event_base_new();/创建 libevent 对象if (NULL

20、 = _event_base)printf(: Createredis event failed.n);return false;memset(&_event_sem, 0, sizeof(_event_sem);intret = sem_init(&_event_sem, 0, 0);if (ret != 0)printf(: Init sem failed.n);returnfalse;return true;boolCRedisSubscriber:uninit()_event_base = NULL;sem_destroy(&_event_sem);return

21、 true;bool CRedisSubscriber:connect()/ connect redis_redis_context = redisAsyncConnect(, 6379);/异步连接到 redis 服务器上,使用默认端口if (NULL =_redis_context)printf(: Connect redisfailed.n);return false;if(_redis_context->err)printf(: Connectredis error: %d, %sn,_redis_context->err,_redis_context-&

22、gt;errstr);/ 输出错误信息return false;/ attach the eventredisLibeventAttach(_redis_context, _event_base);/将事件绑定到 redis context 上,使设置给redis 的回调跟事件关联/ 创建事件处理线程int ret =pthread_create(&_event_thread, 0,&CRedisSubscriber:event_thread, this);printf(: create event thread failed.n);disconnect();return fa

23、lse;if (ret != 0)/ 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态redisAsyncSetConnectCallback(_redis_context,&CRedisSubscriber:connect_callback);/ 设置断开连接回调, 当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连redisAsyncSetDisconnectCallback(_redis_context,&CRedisSubscriber:disconnect_callback);/ 启动事件线程sem_post(&a

24、mp;_event_sem);returntrue; bool CRedisSubscriber:disconnect() if(_redis_context)redisAsyncDisconnect(_redis_context);redisAsyncFree(_redis_context);_redis_context =NULL;return true; bool CRedisSubscriber:subscribe(const std:string&channel_name)int ret =redisAsyncCommand(_redis_context,&CRedi

25、sSubscriber:command_callback, this,SUBSCRIBE %s,channel_name.c_str();if (REDIS_ERR = ret)printf(Subscribecommand failed: %dn, ret);return false;printf(: Subscribe success: %sn, channel_name.c_str();return true;voidCRedisSubscriber:connect_callback(const redisAsyncContext*redis_context,int status)if

26、(status !=REDIS_OK)printf(: Error: %sn,redis_context->errstr);elseprintf(: Redis connected!);voidCRedisSubscriber:disconnect_callback(constredisAsyncContext *redis_context, int status)if(status != REDIS_OK)/这里异常退出,可以尝试重连printf(: Error: %sn,redis_context->errstr);/ 消息接收回调函数 voidCRedisSubscriber

27、:command_callback(redisAsyncContext*redis_context,void *reply, void *privdata)if(NULL = reply | NULL = privdata)return ;/静态函数中,要使用类的成员变量,把当前的this 指针传进来,用this 指针间接访问CRedisSubscriber *self_this =reinterpret_cast(privdata);redisReply *redis_reply =reinterpret_cast(reply);/ 订阅接收到的消息是一个带三元素的数组if (redis_r

28、eply->type =REDIS_REPLY_ARRAY &&redis_reply->elements = 3)printf(:Recieve message:%s:%d:%s:%d:%s:%dn,redis_reply->element0->str,redis_reply->element0->len,redis_reply->element1->str,redis_reply->element1->len,redis_reply->element2->str,redis_reply->elem

29、ent2->len);/调用函数对象把消息通知给外层self_this->_notify_message_fn(redis_reply->element1->str,redis_reply->element2->str,redis_reply->element2->len);void*CRedisSubscriber:event_thread(void *data)if(NULL = data)printf(: Error!n);assert(false);return NULL;CRedisSubscriber *self_this = rei

30、nterpret_cast(data);return self_this->event_proc();void*CRedisSubscriber:event_proc()sem_wait(&_event_sem);/ 开启事件分发, event_base_dispatch 会阻塞event_base_dispatch(_event_base);return NULL; 问题 1: hiredis 官网没有异步接口的实现例子。hiredis 提供了几个异步通信的API ,一开始根据API名字的理解, 我们实现了跟redis 服务器建立连接、 订阅和发布的功能,可在实际使用的时候,程

31、序并没有像我们预想的那样,除了能够建立连接外,任何事情都没发生。网上查了很多资料,原来 hiredis 的异步实现是通过事件来分发redis 发送过来的消息的,hiredis 可以使用 libae 、libev 、libuv 和 libevent 中的任何一个实现事件的分发,网上的资料提示使用libae、 libev 和 libuv 可能发生其他问题,这里为了方便就选用libevent 。 hireds 官网并没有对libevent 做任何介绍,也没用说明使用异步机制需要引入事件的接口,所以一开始走了很多弯路。关于 libevent 的使用这里就不再赘述,详情可以见libevent 官网。li

32、bevent 官网: /libevent api 文档:/provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688aeCRedisPublisher 和 CRedisSubscriber 的初始化过程:初始化事件处理,并获得事件处理的实例:cpp view plain copy _event_base = event_base_new();在获得 redisAsyncContext * 之后,

33、调用cpp view plain copy redisLibeventAttach(_redis_context,_event_base);这样就将事件处理和redis 关联起来,最后在另一个线程调用cpp view plain copy event_base_dispatch(_event_base);启动事件的分发,这是一个阻塞函数,因此,创建了一个新的线程处理事件分发, 值得注意的是, 这里用信号灯 _event_sem控制线程的启动,意在程序调用cpp view plain copyredisAsyncSetConnectCallback(_redis_context,&CRe

34、disSubscriber:connect_callback);redisAsyncSetDisconnectCallback(_redis_context,&CRedisSubscriber:disconnect_callback);之后,能够完全捕捉到这两个回调。问题2 奇特的 ERR only(P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in thiscontext错误有些人会觉得这两个类设计有点冗余,我们发现CRedisPublisher 和 CRedisSubscriber 很多逻辑是一样的,为什么不把他们整合到一起成一个类,既能够

35、发布消息也能够订阅消息。 其实一开始我就是这么干的,在使用的时候发现,用同个 redisAsynContex * 对象进行消息订阅和发布,与 redis服务连接会自动断开,disconnect_callback 回调会被调用, 并且返回奇怪的错误:ERRonly (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed inthis context ,因此,不能使用同个redisAsyncContext * 对象实现发布和订阅。这里为了减少设计的复杂性,就将两个类的逻辑分开了。当然,你也可以将相同的逻辑抽象到一个基类里,并实现 publish 和 subscri

36、be 接口。问题3 相关依赖的库编译之前,需要安装hiredis 、 libevent 和 boost 库,我是用的是Ubuntu x64 系统。hiredis 官网: 下载源码解压, 进入解压目录, 执行 make && makeinstall 命令。libevent 官网: / 下载最新的稳定版解压后进入解压目录,执行命令./configure -prefix=/usrsudo make && make installboost 库:直接执行安装:sudo apt-get install libboost-dev如果你不是

37、用std:tr1:function 的函数对象来给外层通知消息,就不需要boost 库。你可以用接口的形式实现回调,把接口传给 CRedisSubscribe 类,让它在接收到消息后调用接口回调,通知外层。问题4 如何使用最后贴出例子代码。publisher.cpp,实现发布消息:cpp view plain copy/*> File Name: publisher.cpp> Author: chenzengba> Mail:chenzengba> Created Time: Sat 23 Apr2016 12:13:24 PM CST*/#include redis_

38、publisher.hint main(int argc, char *argv)CRedisPublisherpublisher;bool ret = publisher.init();if (!ret)printf(Init failed.n);return 0;ret = publisher.connect();if (!ret)printf(connect failed.);return0;while (true)publisher.publish(test-channel, Test message);sleep(1);publisher.disconnect();publisher.uninit();return 0;subscriber.cpp 实现订阅消息:cpp view plain copy/*> Fi

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论