暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spring + ActiveMQ 整合实现点对点(point to point)消息发送案例

追梦Java 2024-09-04
29

本节演示点对点模式的消息发送的 Spring + ActiveMQ 代码。

Spring + ActiveMQ 整合

1、依懒包

spring:4.2.5.RELEASE,activemq-all:5.15.0

    <dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>javax.servlet-api</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
    </dependency>


    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.9.1</version>
    </dependency>


    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.9.1</version>
    </dependency>


    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.0</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>4.2.5.RELEASE</version>
    </dependency>


    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.0</version>
    </dependency>


    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.4.2</version>
    </dependency>
    复制

    2、工程介绍

    • producer 生产者工程

    • consumer 消费者工程

    3、整合关键类 JmsTemplate

    Spring 官方提供了一个 JmsTemplate 的类,这个类就专门用来处理JMS的,在该类的Bean配置标签中有两个属性connectionFactory-ref 和 defaultDestination-ref 正好对应 JMS 中的 ConnectionFactory 和 Destination。

    点对点(point to point)消息发送

    1、生产者代码

    • spring 关键代码:springContext-activemq.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd">


      <!-- 创建一个ConnectionFactory,为了提升性能用了连接池 -->
      <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
      <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL">
      <value>tcp://localhost:61616</value>
      </property>
      </bean>
      </property>
      <property name="maxConnections" value="50" >
      </bean>


      <!-- 创建消息目的地,constructor-arg是目的地的名称,此处为spring-queue -->
      <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg index="0" value="spring-queue" >
      </bean>


      <!-- 构建JmsTemplate -->
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="connectionFactory" >
      <property name="defaultDestination" ref="destination" >
      <property name="messageConverter">
      <bean
      class="org.springframework.jms.support.converter.SimpleMessageConverter" >
      </property>
      </bean>


      </beans>
      复制
      • 生产者关键代码:SpringMessageSender

        package producer;


        import javax.jms.JMSException;
        import javax.jms.Message;
        import javax.jms.Session;
        import javax.jms.TextMessage;


        import org.springframework.context.support.ClassPathXmlApplicationContext;
        import org.springframework.jms.core.JmsTemplate;
        import org.springframework.jms.core.MessageCreator;


        /**
        * 点对点(point to point)消息发送,spring整合
        *
        * @author JPM
        */
        public class SpringMessageSender {
        public static void main(String[] args) {


        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
        "classpath:spring/springContext-activemq.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
        jmsTemplate.send(new MessageCreator() {


        public Message createMessage(Session session) throws JMSException {
        TextMessage message = session.createTextMessage();
        message.setText("hello,spring-queue!");
        return message;
        }
        });
        context.close();
        }
        }
        复制

        运行 SpringMessageSender 类,查看 ActiveMQ 管理界面

        说明消息已经发送到了 spring-queue 中。

        2、消费者代码(receive方法获取消息)

        • spring 关键代码:springContext-activemq.xml

          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans.xsd">


          <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
          <property name="connectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
          <property name="brokerURL">
          <value>tcp://localhost:61616</value>
          </property>
          </bean>
          </property>
          <property name="maxConnections" value="50" >
          </bean>
          <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
          <constructor-arg index="0" value="spring-queue" >
          </bean>


          <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
          <property name="connectionFactory" ref="connectionFactory" >
          <property name="defaultDestination" ref="destination" >
          <property name="messageConverter">
          <bean
          class="org.springframework.jms.support.converter.SimpleMessageConverter" >
          </property>
          </bean>


          </beans>
          复制
          • 消费者关键代码:SpringMessageReceiver

            package consumer;


            import org.springframework.context.support.ClassPathXmlApplicationContext;
            import org.springframework.jms.core.JmsTemplate;


            /**
            * 点对点(point to point)消息接收,spring整合
            *
            * @author JPM
            */
            public class SpringMessageReceiver {


            public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
            "classpath:spring/springContext-activemq.xml");
            JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
            String message = (String) jmsTemplate.receiveAndConvert();
            System.out.println(message);
            context.close();
            }
            }
            复制

            运行 SpringMessageReceiver 类,查看控制台和 ActiveMQ 管理界面

            说明消费者读取到了消息,并打印到控制台显示。

            3、消费者代码(使用消息监听器获取消息)

            • 使用刚才的生产者,再次发送一条消息

            spring 关键代码:springContext-activemq.xml

              <?xml version="1.0" encoding="UTF-8"?>
              <beans xmlns="http://www.springframework.org/schema/beans"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans.xsd">


              <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
              <property name="connectionFactory">
              <bean class="org.apache.activemq.ActiveMQConnectionFactory">
              <property name="brokerURL">
              <value>tcp://localhost:61616</value>
              </property>
              </bean>
              </property>
              <property name="maxConnections" value="50" />
              </bean>
              <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg index="0" value="spring-queue" />
              </bean>


              <bean id="jmsContainer"
              class="org.springframework.jms.listener.DefaultMessageListenerContainer">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="destination" ref="destination" />
              <property name="messageListener" ref="messageListener" />
              </bean>


              <bean id="messageListener" class="consumer.SpringMessageListener" />


              </beans>
              复制

              消费者关键代码:SpringMessageListener

                package consumer;


                import java.io.IOException;


                import javax.jms.JMSException;
                import javax.jms.Message;
                import javax.jms.MessageListener;
                import javax.jms.TextMessage;


                import org.springframework.context.support.ClassPathXmlApplicationContext;


                /**
                * 点对点(point to point)消息接收,spring整合,使用Listener
                *
                * @author JPM
                */
                public class SpringMessageListener implements MessageListener {


                public void onMessage(Message message) {
                String msg = null;
                try {
                msg = ((TextMessage) message).getText();
                } catch (JMSException e) {
                e.printStackTrace();
                }
                System.out.println(msg);
                }


                public static void main(String[] args) {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "classpath:spring/springContext-activemq.xml");
                try {
                System.in.read();
                } catch (IOException e) {
                e.printStackTrace();
                }
                context.close();
                }


                }
                复制

                运行 SpringMessageListener 类,查看控制台和 ActiveMQ 管理界面

                说明消费者读取到了消息,并打印到控制台显示。

                使用 Listener 和 receive方法的区别在于 Listener 会一直运行,主动监听消息的变化,及时消费。

                Listener 验证,再次运行生产者,观察 ActiveMQ 管理界面

                文章转载自追梦Java,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论