暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

四、RocketMQ的使用实践

241

四、使用实践

4.1 NameServer地址配置

有四种配置方式,优先级从高到低:

  1. 代码硬编码,多个namesrv地址之间用分号分割

    producer.setNamesrvAddr("192.168.10.104:9876;192.168.10.105:9876"); consumer.setNamesrvAddr("192.168.10.104:9876;192.168.10.105:9876");
    复制
  2. JVM参数指定Name Server

    -Drocketmq.namesrv.addr=192.168.10.104:9876;192.168.10.105:9876
    复制
  3. 环境变量指定Name Server

    export NAMESRV_ADDR=192.168.10.104:9876;192.168.10.105:9876
    复制
  4. HTTP静态服务器寻址(默认),但是要自己发布这个接口。

    客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:

    192.168.10.104:9876;192.168.10.105:9876
    复制

    客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

    ${自己暴露的接口ip} jmenv.taobao.net
    复制

4.2 消费者

4.2.1 消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

4.2.2 消费速度慢的处理方式

增加并行

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
  • 提高单个 Consumer 的消费并行线程,starter包默认20的并行度。

批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。starter包这个操作无法操作,只能单条消费。

4.3 Broker

4.3.1 Broker 角色

Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。

如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。

4.3.2 FlushDiskType

SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。

4.3.3 Broker 配置

参数名 默认值 说明
listenPort 10911 接受客户端连接的监听端口
namesrvAddr null nameServer 地址
brokerIP1 网卡的 InetAddress 当前 broker 监听的 IP
brokerIP2 跟 brokerIP1 一样 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步
brokerName null broker 的名称
brokerClusterName DefaultCluster 本 broker 所属的 Cluser 名称
brokerId 0 broker id, 0 表示 master, 其他的正整数表示 slave
storePathCommitLog $HOME/store/commitlog/ 存储 commit log 的路径
storePathConsumerQueue $HOME/store/consumequeue/ 存储 consume queue 的路径
mapedFileSizeCommitLog 1024 1024 1024(1G) commit log 的映射文件大小
deleteWhen 04 在每天的什么时间删除已经超过文件保留时间的 commit log
fileReserverdTime 72 以小时计算的文件保留时间
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在生产者收到确认之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。
最后修改时间:2023-05-16 11:12:42
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论