暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ActiveMq 实现点对点(point to point)消息发送

追梦Java 2021-03-03
639

点对点(point to point)消息发送

消息生产者生产消息发送到 queue 中,然后消息消费者从 queue 中取出并且消费消息。消息被消费以后,queue 中的消息被删掉,一个 queue 可以有多个消费者,但是对一个消息而言,只会有一个消费者可以消费到该消息。

点对点消息发送演示过程

1、启动 ActiveMQ

打开 ActiveMQ 管理界面,点开 Queues 选项卡,没有队列,如下:

2、ActiveMQ 的依懒包

ActiveMQ 开发只需引入 activemq-all-5.15.0.jar 这一个包即可,因为它集成了所有开发需要的jar包。

    <!-- activemq 依懒包 -->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.0</version>
    </dependency>
    复制

    3、建立消息生产者的工程,producer

    新建一个类 MessageSender 用于发送点对点消息

      package producer;


      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Destination;
      import javax.jms.JMSException;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.jms.TextMessage;


      import org.apache.activemq.ActiveMQConnectionFactory;


      /**
      * 点对点(point to point)消息发送
      *
      * @author JPM
      */
      public class MessageSender {


      public static void main(String[] args) {
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
      "tcp://localhost:61616"); // 创建连接工厂
      Connection connection = null;
      try {
      connection = connectionFactory.createConnection(); // 创建连接
      connection.start(); // 启动连接


      // 创建session,两个参数分别表示:是否启动事务,消息确认模式
      Session session = connection.createSession(Boolean.TRUE,
      Session.AUTO_ACKNOWLEDGE);
      // 创建消息的目的地,createQueue表示创建的是队列消息
      Destination destination = session.createQueue("queue_01");
      // 创建消息生产者
      MessageProducer producer = session.createProducer(destination);
      // 创建需要发送的消息
      TextMessage textMessage = session
      .createTextMessage("hello,queue_01!");
      // 发送消息
      producer.send(textMessage);
      // 开启事务的时候,消息发送必须使用commit提交
      session.commit();
      session.close();
      } catch (JMSException e) {
      e.printStackTrace();
      } finally {
      if (connection != null) {
      try {
      connection.close();
      } catch (JMSException e) {
      e.printStackTrace();
      }
      }
      }
      }
      }
      复制

      4、运行消息生产者类,查看 ActiveMQ 管理界面

      运行消息生产者类 MessageSender,向队列 “queue_01” 发送一条消息 “hello,queue_01!” ,打开 ActiveMQ 管理界面,如下:

      此时说明消息生产者向队列写入消息成功。

      5、建立消息消费者的工程,consumer

      新建一个类 MessageReceiver 用于接收点对点消息

        package consumer;


        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.Destination;
        import javax.jms.JMSException;
        import javax.jms.MessageConsumer;
        import javax.jms.Session;
        import javax.jms.TextMessage;


        import org.apache.activemq.ActiveMQConnectionFactory;


        /**
        * 点对点(point to point)消息接收
        *
        * @author JPM
        */
        public class MessageReceiver {


        public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        "tcp://localhost:61616");
        Connection connection = null;
        try {
        connection = connectionFactory.createConnection();
        connection.start();
        // 表示消息由客户端自动确认
        Session session = connection.createSession(Boolean.FALSE,
        Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue_01");
        // 创建消息消费者
        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage textMessage = (TextMessage) consumer.receive(); // 接收消息
        System.out.println("MessageReceiver--->" + textMessage.getText());
        session.close();
        } catch (JMSException e) {
        e.printStackTrace();
        } finally {
        if (connection != null) {
        try {
        connection.close();
        } catch (JMSException e) {
        e.printStackTrace();
        }
        }
        }
        }
        }
        复制

        6、运行消息消费者类,查看 ActiveMQ 管理界面

        运行消息生产者类 MessageReceiver,从队列 “queue_01” 后去一条消息,并输出到控制台 ,控制台显示:

          MessageReceiver--->hello,queue_01!
          复制

          打开 ActiveMQ 管理界面,如下:

          至此说明队列里的消息已经被消费掉。


          **7、消息消费者监听消息的关键代码**

            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            // 监听消息
            consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message msg) {
            TextMessage textMessage = (TextMessage)msg;
            try {
            System.out.println("MessageReceiver--->" + textMessage.getText());
            } catch (JMSException e) {
            e.printStackTrace();
            }
            }
            });
            复制


            文章转载自追梦Java,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论