暂无图片
暂无图片
暂无图片
暂无图片
1
暂无图片

oup:springBoot+websocket/Netty+redis/rabbitMq消息推送

loulou分享 2020-11-16
1558

       oup解决方案中,针对互动方式分为客户主动互动,引导客户互动二项方式,目的都是增强平台与客户的交互能力。营销人员为了主动营销,需要主动用优质的服务、内容与客户/潜客建立第一连接关系。对于客户主动互动,客户触发、客户端周期性触发,依托一次性会话、长连接等模式,该模式已较为熟悉。对于引导客户互动方式,需要采集客户信息、初判,然后推送消息给客户。

      另外还有一个场景是实时聊天,实时聊天体现消息流动较全的场景,营销推送的核心是数据分析+信息推送。数据分析暂不展开,消息流动仅仅单向。此文章以点对点聊天为主要场景,说明消息互动的内容。

一、选型

    消息通信上选择websocket,在springboot的websocket上,接入方式直接通过消息类型分离,类http注解方式,url支持参数传递,管理方便。对于高性能通信来说,netty中的零拷贝、多路复用一直是其高性能的核心处理能力,核心线程与工作线程二级处理模式,更增大了其通信处理能力。最终选型如下:

   websocket:springboot-websocket + netty

      消息队列:redis+rabbit

   分布式消息:redis集群


        Ps: 设计结构:分为应用层、公共推送层,目前公共推送层以模块为单位集成在oup-console中,包含了websocket启动配置、接入配置、ws消息转发类。主要涉及redis、rabbitmq、mysql日志表,后期会剥离独立的消息网关。

      分布式环境下,消息接受涉及监听,oup中主要是redis队列监听、rabbitmq队列监听属于redis、rabbit的基本配置,作为监听消息类型之一,列入的redis、rabbitmq模块下,并不在ws模块下,ws模块主要目的是前端通信、接受转发、接受消息ws推送。消息转发后的监听服务属于队列范畴。

     在建立消息队列时,队列名称以固定名称、节点(数据中心+主机+应用)、客户、场景为单位选择,考虑到不断建立通道的维护开销,最终选择二级模式,节点为一级、启动时自动上线,用户登录自动绑定节点上线,场景为二线,节点收到后,马上按场景端处理。


二、模式

     模式框架其实不是刚开始就清晰的,起步就一个设想,追求功能稳定、高复用、易扩展、上手快,应用/底层分离、支持分布式、自发现等原则,最终慢慢细化。

三、建立过程

       在上面已知的设计原则下,优先完成功能,优先完成原生的websocket接入,按boot的逻辑,优先装载启动参数类WebSocketConfig。形成接入springboot的websocket、扩展mq、扩展netty、优化结构四个阶段。具体如下:

接入springboot的websocket(redis队列)

  1. Pom文件引入websocket库。

  2. 构建WebSocketConfig配置类,内含ServerEndpointExporter类,生效ServerEndpoint。

  3. 形成WebSocketServer通信类,内含@ServerEndpoint注解,指定连接地址。包含onOpen/Onclose/OnError/OnMessage、ws过程对应的缓存操作、ws集群推送。Websocket session本地化,无法序列化传输,需通过接入标识进行本地化存储,并记入高可用高速存储redis集群,实现登录绑定(接入id-->节点id),推送时识别推送范围(标签/属性-接入id范围),发送时按节点发送(接收id-->节点id),监听到后推送消息(节点id->接入id->会话)。

    Ps:消息通道以节点为单位,应用启动时即可装入,自动上线。

  4. 转发类MessageProducer,转发消息到实时队列(redis\rabbit),此类为路由类,由该类接入启动配置参数,实现通信消息队列自动转换,也可以通过报错容灾实现通道的自动切换。对应用层无感知容错切换。

  5. 前端:接受到消息后,先转入WebSocketServer进行集群ws定位转发,然后前端进行ws接受消息。对于不同种类的消息需要在WebSocketServer进行通用处理,然后转发到前端在publis.js公用逻辑下进行场景定位,展示。

扩展MQ:

  1. 建议大家搭建docker容器,缩短学习过程,接入rabbitmq。

  2. pom文件引入,因前期引入spring-cloud-starter-bus-amqp包,故直接复用即可。

  3. 构建配置类RabbitConfig类,关键建立默认节点队列,此处由于消息队列没有分离出来,故配置没有以独立消息网关设置,此处Direct模式。


  4. 构建消费类RabbitMQProducer、RabbitMQConsumer,其中生产端无需扩展,消费端RabbitMQConsumer需要根据监听的routerkey进行扩展。

  5. 测试验证,在WebSocketServer收到消息后,切换消息生产者及队列,切换到mq进行验证。切换逻辑如下:

扩展netty

  1. pom文件引入,引入netty-all包。

  2. 按boot约定,构建启动配置类,此处需要注意的是参数配置,以及通过handler先后顺序,支持ws地址参数加载的处理方式。端口放在启动端,支持环境变量,handler支持自定义,支持未来扩展切面逻辑。


  3. 构建自定义消息处理类MyWebScoketHandler,进行消息处理,此处需要与boot的websocket的消息事件类型不一致,启动参数中传入接入id,需要在消息channelRead时捕捉首个FullHttpRequest请求,同时根据源代码,需要跳过channelRead0抽象方法,并在channelRead处理连接建立初始化、消息推送双逻辑。

  4. 对于在自定义Handler中,在连接断开时,按官网及其他建议模式,会造成redis集群中残留对应关系,需要清除,否则会造成其他系统消息分发时,分发到不存在的接入店。

优化结构

  1. 按前端、websocket、消息产生、接入队列、监听、消息消费、推送分点进行代码重构,屏蔽websocket、mq生产/消费、session的切换,实现应用上手简单,公共转换协议通道、分布式存储。

  2. 场景分离,支持实时聊天、配置分发、远程调度、消息推送等场景。

四、提升

     在完整实现分布式可用的前提下,优先扩展以下功能:

  1. 支持移动端的消息推送。

  2. 支持网页展示、C端升级,提示等场景。

  3. 调优,从操作系统、中间件参数、应用等分层进行调优。

五、感想

    从想到做,首先收收集关于了解基础知识、最佳实践、评估自己手里的资源,从简单化向复杂化转变,先功能、后全面、再优化提升。后续该模块会整体分离出完整的消息网关。实践和口述很大差异,勿空想,落地时随便一个问题都是致命的。从想到做,一小步的工作,一大步的差距,努力从一小步做起。




文章转载自loulou分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论

B
big duncan
暂无图片
2年前
评论
暂无图片 0
請問有github可以取code供練習嗎
2年前
暂无图片 点赞
评论