1.RocketMq历史
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用
2 Producer
RocketMq消息生产者,提供多种方式方式,如同步,异步和单向
2.1.Produce.SendStatus
我们在发送消息时会拿到一个SendResult 从它里面我们可以获取到SendStatus,假设Message的isWaitStoreMsgOK=true(默认也是true),如果不发生异常我们得到的状态总是SEND_OK,下面列举了一些其它情况
2.1.1.FLUSH_DISK_TIMEOUT
如果broker设置的消息刷盘策略为同步刷盘(FlushDiskType=SYNC_FLUSH(默认是 ASYNC_FLUSH)),Broker没有在设置的规定时间(syncFlushTimeout 默认是5秒)完成刷盘,将会得到此状态
2.1.2.FLUSH_SLAVE_TIMEOUT
如果Boker的角色是同步复制(SYNC_MASTER 默认是ASYNC_MASTER),slave没有在规定的时间内syncFlushTimeout(默认是5秒)完成复制,将会得到此状态
2.1.2.3.SLAVE_NOT_AVAILABLE
如果broker的角色是同步复制,但是没有配置slave,将会得到此状态
2.1.2.4.SEND_OK
当得到此状态时,并不意味着一定可靠,如果想一定可靠,我们的设置为同步刷盘,同步复制
2.1.2.5.Duplication or Missing
消息重复和消息丢失
如果你得到的状态是FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,broker此时恰好又宕机了,意味着此时你的消息就丢了,此时我们有两种处理方式,一,如果认为消息真的不重要,直接丢弃,二,也是建议的方式,建议重新发送,这样可能会造成消息重新,我们的提供一种机制来保证剔除重复的消费,如果此时得到的状态是SLAVE_NOT_AVAILABLE,重发是没用的,更加建议,报警发邮件
2.2.Timeout
我们发送消息到broker,并等待响应,如果在规定的时间内(默认是3秒),还没有返回我们将得到一个RemotingTimeoutException,你可以使用send(msg,timeout)代替send(msg),通常不建议不这个值设的太小,因为broker需要一些时间来刷盘和同步slave,当然如果超过syncFlushTimeout这个值很多,基本上这个值的作用就很小了,因为会在这个时间期间内就返回FLUSH_SLAVE_TIMEOUT or FLUSH_SLAVE_TIMEOUT
2.3.Message Size
通常建议不要超过512k
2.3.Async Sending
默认情况下发消息会阻塞,如果我们为了提高性能,可以采用异步发送send(msg,callback)
2.3.Producer Group
简单来说,就是多个生产者实例,通常下,它们之间是互相不影响的,但是如果你开启了一个事物,默认情况下,你只能在相同的JVM中创建一个具有相同生产者组的生产者,这通常就足够了。
如果在某个事物中,原始生产者crash掉了,rocketMq此时会通过broker联系其它的生产者来提交或者回滚事物
2.3.Thread Safety
生产者是线程安全的,可以放心使用
2.4.big data
如果你想发送大量数据,通过建议
使用生产者异步发送(3-5个就足够了)
为每个生产者设置实例名称
3 Consumner
3.1.PullConsumer
简单来说,就是主动从broker拉取消息
3.2.PushConsumer
封装了消息提取、消耗进度和维护内部的其他工作,将一个回调接口留给最终用户来实现,该接口将在消息到达时执行
3.3.Consumer Group
消费者组在消息消费方面实现了负载均衡,集群容错
在同一个消费者中,请保持订阅关系一致【可以先简单理解为,消费者的订阅关系是已group为单位,存在map中,如果订阅关系不一致,后面的启动消费者,会覆盖前面的】
3.4.MessageListener(消息监听器)
3.4.1.Orderly(顺序消费)
消费者会锁住消费队列(MessageQueue),来确保是one by one,这会损失性能,但是在那种如果比较关注顺序消费的场景,将非常有用
如果消费失败,我们不建议抛异常,可以用ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来代替
3.4.2.Concurrently(并发消费)
消费者并发消费消息,通常性能较高,如果消费失败,不建议抛异常,可以用ConsumeConcurrentlyStatus.RECONSUME_LATER代替
3.4.3.Consumer Status
在并发消费时,你可以返回RECONSUME_LATER 来告诉消费者,你消费这条消费失败了,想重新消费,接下来你会继续消费其它消息
在顺序消费时,因为你比较注重消费的顺序,你不希望跳过这条消息,此时你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT。来告诉消费者等一会
3.4.4.Blocking
不建议使用阻塞监听器。因为它会阻塞线程池,最终可能会导致消费进程的停止。
3.5.Thread Number
消费者使用ThreadPoolExecutor来处理内部消费,因此您可以通过setConsumeThreadMin或者setConsumeThreadMax来改变最大消费线程数和最大消费线程数
3.5.ConsumeFromWhere(从哪里消费)
当一个新的消费组建立时,它将需要决定是否需要消费已经存在与代理中的历史消息。
CONSUME_FROM_LAST_OFFSET: 这个配置将会忽略历史消息,并消费之后产生的任何内容。CONSUME_FROM_FIRST_OFFSET: 这个配置将会消费代理中存在的所有消息。
CONSUME_FROM_TIMESTAMP: 这个配置将消费指定时间戳之后再生成消息。
3.6.Duplication(消费重复)
许多情况可能导致重复,例如:
生产者重复发送(例如: 在FLUSH_SLAVE_TIMEOUT的情况下)。消费者关闭,某些补偿机制未及时更新到代理。
因此,如果应用程序不能容忍重复,你可能需要做一些外部工作来处理这一问题,例如: 通过检查数据库的主键。
4 Broker
Broker的角色有ASYNC_MASTER, SYNC_MASTER or SLAVE.
如果你不能容忍消息丢失,你可以部署为SYNC_MASTER,并添加一个SLAVE,
如果能够容忍消息丢失,但是需要保证高可用,你可以使用SLAVE部署ASYNC_MASTER
如果你只想一个很简单的功能,你只需要使用ASYNC_MASTER,不带有SLAVE
5 NameServer
提供路由信息,让消费者和生产者找到对于的broker列表
管理两部分数据:
broker定时更新到每一个nameSever
为producers, consumers 和 command line提供最新的路由信息
因为在启动broker和client之前,我们需要告诉怎么获取nameSever的地址列表,在rocketMq中提供了4种方式
5.1.Programmatic Way(程序化的方法)
如这样
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
如果你是使用的命令行可以通过这个方式
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
如果你已经集成了管理工具,你可以使用
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt("please_rename_unique_group_name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
5.2.Java Options(java配置项)
可以在启动之前使用环境变量rocketmq.namesrv.addr
5.3.Environment Variable(环境变量)
您可以导出NAMESRV_ADDR环境变量。如果设置了它的值,Broker和客户端将会使用它
5.4.HTTP Endpoint(http端口)
如果没有使用前面提到的方法指定名称服务器地址列表,Apache RocketMQ将访问以下HTTP端点,以每两分钟获取和更新名称服务器地址列表,初始延迟为10秒
默认情况下:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
你可以使用java命令行替换jmenv.tbsite.net,如rocketmq.namesrv.domain你也可以覆盖nsaddr用的java命令行是:rocketmq.namesrv.domain.subgroup
如果你在生产环境中运行Apache RocketMQ,建议使用此方法,因为它提供了最大的灵活性——您可以动态添加或删除名称服务器节点,而不必根据名称服务器的系统负载重新启动代理和客户机。
5.5.几种获取nameServer地址的优先级为
Programmatic Way > Java Options > Environment Variable > HTTP Endpoint
6.rocketMq建议的jvm参数
-server -Xms8g -Xmx8g -Xmn4g
-XX:+AlwaysPreTouch
-XX:-UseBiasedLocking
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
-Xloggc:/dev/shm/mq_gc_%p.log
6 结构图





