一、前言
spring代码基于 SSM整合(spring-springmvc-mybatis)之CRUD ;代码地址:(基础版本:https://gitee.com/joy521125/ssm-senior-base.gitmaven版:https://gitee.com/joy521125/ssm-senior.git)
代码地址:https://gitee.com/joy521125/ssm-senior.git(activemq 分支)
测试代码:在/emp/list中,设置了一个推送;
jar包:
<properties> <org.muses.spring.versin>5.3.18</org.muses.spring.versin> <org.muses.activemq.versin>5.16.5</org.muses.activemq.versin> <org.muses.pagehelper.versin>5.3.0</org.muses.pagehelper.versin> </properties>
pom.xml
1 <!-- activemq所需要的jar包 --> 2 <dependency> 3 <groupId>org.apache.activemq</groupId> 4 <artifactId>activemq-all</artifactId> 5 <version>${org.muses.activemq.versin}</version> 6 </dependency> 7 <dependency> 8 <groupId>org.apache.xbean</groupId> 9 <artifactId>xbean-spring</artifactId> 10 <version>4.21</version> 11 </dependency> 12 13 <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms --> 14 <!-- activeMQ对JMS的支持,整合Spring和ActiveMQ --> 15 <dependency> 16 <groupId>org.springframework</groupId> 17 <artifactId>spring-jms</artifactId> 18 <version>${org.muses.spring.versin}</version> 19 </dependency> 20 21 <dependency> 22 <groupId>org.apache.activemq</groupId> 23 <artifactId>activemq-pool</artifactId> 24 <version>${org.muses.activemq.versin}</version> 25 </dependency>
二、activem 主题topic配置(非持久化)
消费者:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 3 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"> 4 <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 5 <!-- tcp://自己的服务器地址:端口 --> 6 <property name="brokerURL" value="tcp://192.168.0.109:61616"></property> 7 <property name="userName" value="admin"></property> 8 <property name="password" value="admin"></property> 9 <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 --> 10 <property name="useAsyncSend" value="true"></property> 11 <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 --> 12 <property name="alwaysSessionAsync" value="false"></property> 13 </bean> 14 15 <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"> 16 <property name="connectionFactory" ref="mQConnectionFactory"></property> 17 <property name="maxConnections" value="100"></property> 18 </bean> 19 <!-- 这个是队列目的地,订阅 --> 20 <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> 21 <constructor-arg index="0" value="spring-active-topic"></constructor-arg> 22 </bean> 23 <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 --> 24 <bean id="mqMessageListener" class="org.muses.ssm.senior.mgt.core.utiles.MqMessageListener"> 25 </bean> 26 <!-- 消费者配置,可配置多个消费者; 实际项目中消费者配置和发布者配置分开 --> 27 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 28 <property name="connectionFactory" ref="connectionFactory"></property> 29 <property name="destination" ref="destinationTopic"></property> 30 <!-- public class myMessageListener implements MessageListener --> 31 <property name="messageListener" ref="mqMessageListener"></property> 32 </bean> 33 </beans>
发布者:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 3 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"> 4 <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 5 <!-- tcp://自己的服务器地址:端口 --> 6 <property name="brokerURL" value="tcp://192.168.0.109:61616"></property> 7 <property name="userName" value="admin"></property> 8 <property name="password" value="admin"></property> 9 <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 --> 10 <property name="useAsyncSend" value="true"></property> 11 <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 --> 12 <property name="alwaysSessionAsync" value="false"></property> 13 </bean> 14 15 <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"> 16 <property name="connectionFactory" ref="mQConnectionFactory"></property> 17 <property name="maxConnections" value="100"></property> 18 </bean> 19 <!-- 这个是队列目的地,订阅 --> 20 <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> 21 <constructor-arg index="0" value="spring-active-topic"></constructor-arg> 22 </bean> 23 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开--> 24 <bean id="mQConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean> 25 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开--> 26 <bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate"> 27 <property name="connectionFactory" ref="connectionFactory" /> 28 <property name="defaultDestination" ref="destinationTopic" /> 29 <property name="messageConverter" ref="mQConverter" /> 30 <property name="sessionTransacted" value="false" /><!-- 关闭事务 --> 31 <property name="sessionAcknowledgeMode" value="4"></property><!-- 设置手动签收 --> 32 </bean> 33 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开--> 34 <bean class="org.muses.ssm.senior.mgt.core.entity.MessagePublisher"> 35 <property name="jmsTemplate" ref="topicTemplate"></property> 36 </bean> 37 38 </beans>
效果:
分两个项目启动,先启动消费者的服务;再启动发布者的服务;当发布者发布一条消息(即执行http://localhost:8080/test/emp/list; 里面添加了推送消息的方法),那么消费者就消费一条消息;
消费者监听:
1 package org.muses.ssm.senior.mgt.core.utiles; 2 3 import javax.jms.Message; 4 import javax.jms.MessageListener; 5 import javax.jms.TextMessage; 6 7 public class MqMessageListener implements MessageListener { 8 9 @Override 10 public void onMessage(Message message) { 11 if (null != message && message instanceof TextMessage) { 12 TextMessage textMessage = (TextMessage) message; 13 System.out.println("textMessage:" + textMessage); 14 } 15 16 } 17 18 }
发布者发布消息的方法:
1 /** 2 * 列表页面 3 * 4 * @param employee 5 * @return 6 */ 7 @RequestMapping("/list") 8 public ModelAndView list(Employee employee) { 9 ModelAndView mv = new ModelAndView("index"); 10 PageInfo<Employee> list = employeeService.list(employee); 11 mv.addObject("pages", list); 12 MessagePublisher.sendMessage("test_list"); 13 return mv; 14 }
三、事务:
只需要更改activemq_queue.xml 配置信息;增加属性sessionTransacted 属性值设置为true;表示jms的事务由spring托管,但是发送消息中如果又异常,依然会推送。需要在消费者端配置事务。如果消费者端有异常,该条消息被重复发送多次(默认重发6次,redeliveryCounter=6);消息最后还是显示发送成功。
1 <!-- 消费者配置,可配置多个消费者; 实际项目中消费者配置和发布者配置分开 --> 2 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 3 <property name="connectionFactory" ref="connectionFactory"></property> 4 <property name="destination" ref="destinationTopic"></property> 5 <!-- public class myMessageListener implements MessageListener --> 6 <property name="messageListener" ref="mqMessageListener"></property> 7 <property name="sessionTransacted" value="true" /><!-- 开启事务 --> 8 </bean>
四、签收:
配置同queue的一样;
1 @Override 2 public void onMessage(Message message) { 3 if (null != message && message instanceof TextMessage) { 4 TextMessage textMessage = (TextMessage) message; 5 System.out.println("textMessage:" + textMessage); 6 try { 7 message.acknowledge();// 手动签收 8 } catch (JMSException e) { 9 // TODO Auto-generated catch block 10 e.printStackTrace(); 11 } 12 } 13 }
消费者对的事务关闭,且设置为手动签收;配置同queue。
1 <!-- 消费者配置,可配置多个消费者; 实际项目中消费者配置和发布者配置分开 --> 2 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 3 <property name="connectionFactory" ref="connectionFactory"></property> 4 <property name="destination" ref="destinationTopic"></property> 5 <!-- public class myMessageListener implements MessageListener --> 6 <property name="messageListener" ref="mqMessageListener"></property> 7 <property name="sessionTransacted" value="false" /><!-- 关闭事务 --> 8 <property name="sessionAcknowledgeMode" value="4"></property><!-- 设置手动签收 --> 9 </bean>
五、持久化:
代码如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 3 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"> 4 <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 5 <!-- tcp://自己的服务器地址:端口 --> 6 <property name="brokerURL" value="tcp://192.168.154.14:61616"></property> 7 <property name="userName" value="admin"></property> 8 <property name="password" value="admin"></property> 9 <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 --> 10 <property name="useAsyncSend" value="true"></property> 11 <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 --> 12 <property name="alwaysSessionAsync" value="false"></property> 13 </bean> 14 15 <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"> 16 <property name="connectionFactory" ref="mQConnectionFactory"></property> 17 <property name="maxConnections" value="100"></property> 18 </bean> 19 <!-- 这个是队列目的地,订阅 --> 20 <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> 21 <constructor-arg index="0" value="spring-active-topic"></constructor-arg> 22 </bean> 23 <!-- 消费者配置 :监听器 --> 24 <bean id="mqMessageListener" class="org.muses.ssm.senior.mgt.core.utiles.MqMessageListener"> 25 </bean> 26 <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 --> 27 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 28 <property name="connectionFactory" ref="connectionFactory"></property> 29 <property name="destination" ref="destinationTopic"></property> 30 <!-- public class myMessageListener implements MessageListener --> 31 <property name="messageListener" ref="mqMessageListener"></property> 32 <!-- 持久化订阅者 --> 33 <property name="pubSubDomain" value="true"></property> 34 <property name="clientId" value="client_001" /> 35 <property name="durableSubscriptionName" value="client_001_dura"/> 36 </bean> 37 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开--> 38 <bean id="mQConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean> 39 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开--> 40 <bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate"> 41 <property name="connectionFactory" ref="connectionFactory" /> 42 <property name="defaultDestination" ref="destinationTopic" /> 43 <property name="messageConverter" ref="mQConverter" /> 44 <property name="sessionTransacted" value="true" /> 45 </bean> 46 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开--> 47 <bean class="org.muses.ssm.senior.mgt.core.entity.MessagePublisher"> 48 <property name="jmsTemplate" ref="topicTemplate"></property> 49 </bean> 50 51 </beans>
标签:11,springmvc,list,topic,TextMessage,org,message,activemq From: https://www.cnblogs.com/lixiuming521125/p/16777596.html