暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ActiveMQ实战指南:实现发布/订阅(publish-subscribe)消息发送!

追梦Java 2024-08-29
185

发布/订阅(publish-subscribe)消息发送

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点消息发送的方式不同,发布到 topic 的消息会被所有订阅者消费。

发布/订阅的消息模型

发布/订阅消息发送演示过程

1、消息生产者

在生产者工程 producer 里新建一个类 TopicSender 用于发布消息

    package producer;


    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;


    import org.apache.activemq.ActiveMQConnectionFactory;


    /**
    * 发布/订阅(publish-subscribe)消息发送
    *
    * @author JPM
    */
    public class TopicSender {
    public static void main(String[] args) {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    "tcp://localhost:61616");
    Connection connection = null;
    try {
    connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.TRUE,
    Session.AUTO_ACKNOWLEDGE);
    // 创建消息的目的地,createTopic表示创建的是主题消息
    Destination destination = session.createTopic("topic_01");
    MessageProducer producer = session.createProducer(destination);
    TextMessage textMessage = session
    .createTextMessage("hello,topic_01!");
    producer.send(textMessage);
    session.commit();
    session.close();
    } catch (JMSException e) {
    e.printStackTrace();
    } finally {
    if (connection != null) {
    try {
    connection.close();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    }
    }

    2、运行消息生产者类,查看 ActiveMQ 管理界面

    运行消息生产者类 TopicSender,向名称为  “topic_01”的 Topic里发送一条消息 “hello,topic_01!” ,打开 ActiveMQ 管理界面,如下:

    3、消息消费者

    在消费者工程 consumer 里新建二个消费者类 TopicReceiver1 和 TopicReceiver2 用于订阅消息,TopicReceiver2 的代码与 TopicReceiver1 的代码类似,下面给出 TopicReceiver1 的代码:

      package consumer;


      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Destination;
      import javax.jms.JMSException;
      import javax.jms.MessageConsumer;
      import javax.jms.Session;
      import javax.jms.TextMessage;


      import org.apache.activemq.ActiveMQConnectionFactory;


      /**
      * 发布/订阅(publish-subscribe)消息接收者1
      *
      * @author JPM
      */
      public class TopicReceiver1 {
      public static void main(String[] args) {
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
      "tcp://localhost:61616");
      Connection connection = null;
      try {
      connection = connectionFactory.createConnection();
      connection.start();
      // 设置客户端手动签收消息
      Session session = connection.createSession(Boolean.FALSE,
      Session.CLIENT_ACKNOWLEDGE);
      // 创建消息的目的地,createTopic表示创建的是主题消息
      Destination destination = session.createTopic("topic_01");
      MessageConsumer consumer = session.createConsumer(destination);
      TextMessage textMessage = (TextMessage) consumer.receive();
      textMessage.acknowledge(); // 确认消息
      System.out.println("TopicReceiver1--->" + textMessage.getText());
      session.close();
      } catch (JMSException e) {
      e.printStackTrace();
      } finally {
      if (connection != null) {
      try {
      connection.close();
      } catch (JMSException e) {
      e.printStackTrace();
      }
      }
      }
      }
      }

      4、运行2个消费者,查看 ActiveMQ 管理界面

      注意,发布订阅模式默认需要首先运行消费者,然后开启生产者,这样才能消费者才能订阅到生产者发布的消息。

      分别启动2个消费者 TopicReceiver1 和 TopicReceiver2,管理界面如下:

      然后启动生产者 TopicSender,TopicReceiver1 和 TopicReceiver2 的控制台如下:

      TopicReceiver1--->hello,topic_01!

      TopicReceiver2--->hello,topic_01!

      eclipse切换控制台的技巧

      消息消费完成后,管理界面显示如下:

      说明:

      发布订阅的模式默认的情况下,消息的内容不存储在服务器上,当生产者发送了一个消息,如果消费者之前没有订阅,那么该消费者就获取不到这个消息。

      点对点的模式默认的情况下,消息存储在服务器上,消费者随时来取,但是只能有一个消费者获取到这个消息。

      ActiveMQ 管理界面术语解释

      文章转载自追梦Java,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论