暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ActiveMQ 的网络连接及消息回流机制

追梦Java 2024-09-09
40

1、ActiveMQ 的网络连接

activeMQ 如果要实现扩展性和高可用性的要求的话,就需要用用到网络连接模式。

NetworkConnector:主要用来配置 broker 与 broker 之间的通信连接

如上图所示,MQ 服务器1 和MQ 服务器2 通过 NewworkConnector 相连,则生产者1和生产者2发送消息,消费者3和消费者4都可以接收到,而生产者3和生产者4发送的消息,消费者1和消费者2同样也可以接收到 ,这样就增强了消息服务器的可用性。

静态网络连接

假设 MQ 服务器1 的 ip 地址是192.168.110.60, MQ 服务器2 的 ip 地址是192.168.110.61

配置说明

修改 MQ 服务器1 的 activeMQ.xml,在 broker 节点里增加如下配置:

    <networkConnectors> 
    <networkConnector uri="static://(tcp://192.168.110.60:61616,tcp://192.168.110.61:61616)"/>
    </networkConnectors>

    修改 MQ 服务器2 的 activeMQ.xml,在 broker 节点里增加如下配置:

      <networkConnectors> 
      <networkConnector uri="static://(tcp://192.168.110.61:61616,tcp://192.168.110.60:61616)"/>
      </networkConnectors>

      重启两台服务器。

      生产者连接到 MQ 服务器1(tcp://192.168.110.60:61616)发送消息,消费者连接到 MQ 服务器2(tcp://192.168.110.61:61616)可以获取消息,同时,生产者连接到 MQ 服务器2(tcp://192.168.110.61:61616)发送消息,消费者连接到 MQ 服务器1(tcp://192.168.110.60:61616)也可以获取消息。

      两个 broker 通过一个 static 的协议来进行网络连接,一个 Consumer 连接到 broker2 的一个地址上,当 Producer 在 broker1 上以相同的地址发送消息时,此时消息会被转移到Broker2上,也就是说 broker1 会转发消息到 broker2 上。

      2、消息回流机制

      上面的配置,存在一个问题,如果生产者1向 MQ 服务器1发送10条消息,消费者3从 MQ 服务器2获取了10条消息,此时,万一 MQ 服务器2宕机了,此时这10条消息就会丢失,MQ 服务器1上的消息被 MQ 服务器2拿走了,MQ 服务器1上的消费者读取不到这些消息了。

      activemq 从5.6版本开始,在 destinationPolicy 上新增了一个选项 replayWhenNoConsumers 属性设置为 true,这个属性可以用来解决当一个 broker上有需要转发的消息但是没有消费者时,把消息回流到它原始的 broker。同时把enableAudit 设置为false,为了防止消息回流后被当作重复消息而不被分发。

      在两台服务器的 activeMQ.xml中,通过如下配置,即可完成消息回流处理。

        <policyEntry queue=">" enableAudit="false"> 
        <networkBridgeFilterFactory>
        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory>
        </policyEntry>

        3、容错的链接

        前面讲述的都是 Client 配置连接到指定的 broker 上,但是如果 broker 的连接失败怎么办呢?默认的情况下,这种协议用于随机的去选择一个链接去连接,如果连接失败了,那么会连接到其他的broker上。默认配置定义了延迟重新连接,意味着传输将会在10s后自动去重新连接可用的broker.当然所以有的重新连接参数都可以根据应用的需要而配置的。

        在客户端程序中创建 ConnectionFactory 时使用如下代码即可做到容错的连接配置:

          //随机连接不同的broker
          ConnectionFactory factory = new ActiveMQConnectionFacory("failover:(tcp://192.168.110.60:61616,tcp://192.168.110.61:61616)?randomize=true")

          Failover 协议可用的配置参数

          4、动态网络连接

          ActiveMQ 使用 Multicast (组播)协议建立服务与远程的 broker 服务的网络链接,实现 broker 的发现机制。

          配置一个组播的ip地址,当前网络内所有的 broker 回向组播的服务端发送消息,证明自己是存活的,并且建立连接,这种动态连接的方式会一直不断地进行连接,会消耗网络资源,因此,不建议使用动态网络连接。

          文章转载自追梦Java,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论