首页 > 其他分享 >七、activemq整合springmvc之queue

七、activemq整合springmvc之queue

时间:2022-12-15 13:35:37浏览次数:58  
标签:xml springmvc spring queue TextMessage org message activemq

一、前言

spring代码基于 ​​SSM整合(spring-springmvc-mybatis)之CRUD ​​;代码地址:(基础版本:​​https://gitee.com/joy521125/ssm-senior-base.git​​​maven版:​​https://gitee.com/joy521125/ssm-senior.git​​)

代码地址:​​https://gitee.com/joy521125/ssm-senior.git​​(activemq 分支)

测试代码:在/emp/list中,设置了一个推送;

jar包:

 

版本:

1 <properties>
2 <org.muses.spring.versin>5.3.18</org.muses.spring.versin>
3 <org.muses.activemq.versin>5.16.5</org.muses.activemq.versin>
4 <org.muses.pagehelper.versin>5.3.0</org.muses.pagehelper.versin>
5 </properties>

View Code

jar包:

 

1 <!-- activemq所需要的jar包 -->
2 <dependency>
3 <groupId>org.apache.activemq</groupId>
4 <artifactId>activemq-all</artifactId>
5 <version>${org.muses.activemq.versin}</version>
6 </dependency>
7 <dependency>
8 <groupId>org.apache.xbean</groupId>
9 <artifactId>xbean-spring</artifactId>
10 <version>4.21</version>
11 </dependency>
12
13 <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
14 <!-- activeMQ对JMS的支持,整合Spring和ActiveMQ -->
15 <dependency>
16 <groupId>org.springframework</groupId>
17 <artifactId>spring-jms</artifactId>
18 <version>${org.muses.spring.versin}</version>
19 </dependency>
20
21 <dependency>
22 <groupId>org.apache.activemq</groupId>
23 <artifactId>activemq-pool</artifactId>
24 <version>${org.muses.activemq.versin}</version>
25 </dependency>

View Code

二、activem 队列queue配置

默认是持久化;

 activemq_queue.xml(生产者)配置信息:

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
3 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
4 <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
5 <!-- tcp://自己的服务器地址:端口 -->
6 <property name="brokerURL" value="tcp://localhost:61616"></property>
7 <property name="userName" value="admin"></property>
8 <property name="password" value="admin"></property>
9 <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 -->
10 <property name="useAsyncSend" value="true"></property>
11 <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 -->
12 <property name="alwaysSessionAsync" value="false"></property>
13 </bean>
14
15 <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
16 <property name="connectionFactory" ref="mQConnectionFactory"></property>
17 <property name="maxConnections" value="100"></property>
18 </bean>
19 <!-- 这个是队列目的地,点对点的 -->
20 <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
21 <constructor-arg index="0" value="spring-active-queue"></constructor-arg>
22 </bean>
23
24 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开 -->
25 <bean id="mQConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
26 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开 -->
27 <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
28 <property name="connectionFactory" ref="connectionFactory" />
29 <property name="defaultDestination" ref="destinationQueue" />
30 <property name="messageConverter" ref="mQConverter" />
31 <property name="sessionTransacted" value="true" />
32 <!-- 消息持久化配置 这里配置的是持久化 -->
33 <!-- 消息持久化 explicitQosEnabled 必须配置 -->
34 <property name="explicitQosEnabled" value="true"></property>
35 <property name="deliveryPersistent" value="true" /><!-- 非持久化配置为 false -->
36 <property name="deliveryMode" value="2"></property><!-- 非持久化配置为 1 -->
37 </bean>
38 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开 -->
39 <bean class="org.muses.ssm.senior.mgt.core.entity.MessagePublisher">
40 <property name="jmsTemplate" ref="queueTemplate"></property>
41 </bean>
42
43 </beans>

发布消息MessagePublisher.java:

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
3 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
4 <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
5 <!-- tcp://自己的服务器地址:端口 -->
6 <property name="brokerURL" value="tcp://localhost:61616"></property>
7 <property name="userName" value="admin"></property>
8 <property name="password" value="admin"></property>
9 <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 -->
10 <property name="useAsyncSend" value="true"></property>
11 <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 -->
12 <property name="alwaysSessionAsync" value="false"></property>
13 </bean>
14
15 <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
16 <property name="connectionFactory" ref="mQConnectionFactory"></property>
17 <property name="maxConnections" value="100"></property>
18 </bean>
19 <!-- 这个是队列目的地,点对点的 -->
20 <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
21 <constructor-arg index="0" value="spring-active-queue"></constructor-arg>
22 </bean>
23 <!-- 消费者配置 :监听器 -->
24 <bean id="mqMessageListener" class="org.muses.ssm.senior.mgt.core.utiles.MqMessageListener">
25 </bean>
26 <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 -->
27 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
28 <property name="connectionFactory" ref="connectionFactory"></property>
29 <property name="destination" ref="destinationQueue"></property>
30 <property name="messageListener" ref="mqMessageListener"></property>
31 </bean>
32 </beans>

 

web.xml中添加 activemq_queue.xml配置:

1     <context-param>
2 <param-name>contextConfigLocation</param-name>
3 <param-value>classpath:spring.xml,classpath:activemq_queue.xml</param-value>
4 </context-param>

完整web.xml:

1 <?xml version="1.0" encoding="UTF-8"?>
2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xmlns="http://xmlns.jcp.org/xml/ns/javaee"
4 xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
5 version="4.0">
6 <filter>
7 <filter-name>CharacterEncodingFilter</filter-name>
8 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
9 <init-param>
10 <param-name>encoding</param-name>
11 <param-value>utf-8</param-value>
12 </init-param>
13 </filter>
14 <filter-mapping>
15 <filter-name>CharacterEncodingFilter</filter-name>
16 <url-pattern>/*</url-pattern>
17 </filter-mapping>
18 <context-param>
19 <param-name>contextConfigLocation</param-name>
20 <param-value>classpath:spring.xml,classpath:activemq_queue.xml</param-value>
21 </context-param>
22 <listener>
23 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
24 </listener>
25 <servlet>
26 <servlet-name>springDispatcherServlet</servlet-name>
27 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
28 <init-param>
29 <param-name>contextConfigLocation</param-name>
30 <param-value>classpath:springmvc.xml</param-value>
31 </init-param>
32 <load-on-startup>1</load-on-startup>
33 </servlet>
34
35 <servlet-mapping>
36 <servlet-name>springDispatcherServlet</servlet-name>
37 <url-pattern>/</url-pattern>
38 </servlet-mapping>
39 <!-- 配置HiddenHttpMethodFilter :可以把post请求转为delete or put请求 -->
40
41 <filter>
42 <filter-name>HiddenHttpMethodFilter</filter-name>
43 <filter-class>org.springframework.web.filter.HiddenHttpMethodFilter</filter-class>
44 </filter>
45 <filter-mapping>
46 <filter-name>HiddenHttpMethodFilter</filter-name>
47 <url-pattern>/*</url-pattern>
48 </filter-mapping>
49 </web-app>

View Code

MqMessageListener:

1 package org.muses.ssm.senior.mgt.core.utiles;
2
3 import javax.jms.Message;
4 import javax.jms.MessageListener;
5 import javax.jms.TextMessage;
6
7 public class MqMessageListener implements MessageListener {
8
9 @Override
10 public void onMessage(Message message) {
11 if (null != message && message instanceof TextMessage) {
12 TextMessage textMessage = (TextMessage) message;
13 System.out.println("textMessage:" + textMessage);
14 }
15 }
16
17 }

View Code

 推送方法MessagePublisher :

1 package org.muses.ssm.senior.mgt.core.entity;
2
3 import org.springframework.jms.core.JmsTemplate;
4
5 public class MessagePublisher {
6
7 private static JmsTemplate jmsTemplate;
8
9 public static JmsTemplate getJmsTemplate() {
10 return jmsTemplate;
11 }
12
13 public static void setJmsTemplate(JmsTemplate jmsTemplate) {
14 MessagePublisher.jmsTemplate = jmsTemplate;
15 }
16
17 public static void sendMessage(Object message) {
18 jmsTemplate.convertAndSend(message);
19 }
20
21 }

View Code

 

三、事务配置

在Spring+JMS的设计里面,通过sessionTransacted=true可以满足jms的事务,但是这个事务说的是在消费者端,也就是消费不成功会消息会回滚再次被消费。

事务配置

只需要更改activemq_queue.xml 配置信息;增加属性sessionTransacted 属性值设置为true;表示jms的事务由spring托管,但是发送消息中如果又异常,依然会推送。需要在消费者端配置事务,才能达到回滚的效果;

即:

1 <!-- 发布者配置 实际项目中消费者配置和发布者配置分开 -->
2 <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
3 <property name="connectionFactory" ref="connectionFactory" />
4 <property name="defaultDestination" ref="destinationQueue" />
5 <property name="messageConverter" ref="mQConverter" />
6 <!-- 开启事务配置 -->
7 <property name="sessionTransacted" value="true" />
8 <!-- 消息持久化配置 这里配置的是持久化 -->
9 <!-- 消息持久化 explicitQosEnabled 必须配置 -->
10 <property name="explicitQosEnabled" value="true"></property>
11 <property name="deliveryPersistent" value="true" /><!-- 非持久化配置为 false -->
12 <property name="deliveryMode" value="2"></property><!-- 非持久化配置为 1 -->
13 </bean>

官方文档地址:​​https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms​

1 @Override
2 public void onMessage(Message message) {
3 if (null != message && message instanceof TextMessage) {
4 TextMessage textMessage = (TextMessage) message;
5 System.out.println("textMessage:" + textMessage);
6 int a = 1 / 0;
7 }
8 }
1  <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 -->
2 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
3 <property name="connectionFactory" ref="connectionFactory"></property>
4 <property name="destination" ref="destinationQueue"></property>
5 <property name="messageListener" ref="mqMessageListener"></property>
6 <property name="sessionTransacted" value="true" />
7 </bean>

 

那么效果就是:

七、activemq整合springmvc之queue_mq

 

四、签收

  1. AUTO_ACKNOWLEDGE = 1 :自动确认
  2. CLIENT_ACKNOWLEDGE = 2:客户端手动确认
  3. DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
  4. SESSION_TRANSACTED = 0:事务提交并确认
  5. INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认   但是在activemq补充了一个自定义的ACK模式:

设置签收方式,必须关闭事务,否则签收方式的配置无效;

1     <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 -->
2 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
3 <property name="connectionFactory" ref="connectionFactory"></property>
4 <property name="destination" ref="destinationQueue"></property>
5 <property name="messageListener" ref="mqMessageListener"></property>
6 <property name="sessionTransacted" value="false"></property><!-- 设置签收方式,必须关闭事务 -->
7 <property name="sessionAcknowledgeMode" value="4"></property><!-- 设置手动签收 -->
8 </bean>

消息监听,设置手动签收:

1  @Override
2 public void onMessage(Message message) {
3 if (null != message && message instanceof TextMessage) {
4 TextMessage textMessage = (TextMessage) message;
5 System.out.println("textMessage:" + textMessage);
6 try {
7 message.acknowledge();// 手动签收
8 } catch (JMSException e) {
9 // TODO Auto-generated catch block
10 e.printStackTrace();
11 }
12 }
13 }

 

 如果消息监听没有设置了签收:

七、activemq整合springmvc之queue_xml_02

 

sessionAcknowledgeMode 设置为2时,不管客户端设置不设置签收都会签收,原因如下:

七、activemq整合springmvc之queue_mq_03

七、activemq整合springmvc之queue_spring_04

 

我从来不相信什么懒洋洋的自由。我向往的自由是通过勤奋和努力实现的更广阔的人生。 我要做一个自由又自律的人,靠势必实现的决心认真地活着。



标签:xml,springmvc,spring,queue,TextMessage,org,message,activemq
From: https://blog.51cto.com/u_10632206/5939318

相关文章

  • SpringMVC学习
    SpringMVC学习1.回顾MVC1.1什么是MVCMVC是模型(Model)、视图(View)、控制器(Controller)的简写,是一种软件设计规范。是将业务逻辑、数据、显示分离的方法来组织代码......
  • SpringMVC中HelloWorld实现(三)
    我机器的开发环境为:开发工具:EclipseForJavaEE;数据库:MySql5.5.35;运行环境:TomCatV7.0;JDK:JDK1.7.0_45;项目工程为:DynamicWebProject; 一、项目依赖的jar包:[ht......
  • springMVC06(1-响应,2-类返回成JSON数据)
    一、大纲二、响应JSON数据(把你给的"类"转化成"JSON"数据)2.1:需要有"@ResponseBody"这个注解2.2:需要导入JSON坐标<dependency><groupId>com.fasterxml.......
  • springMvc05(“日期型”参数传值)
    一、@DateTimeFormat注解的解析:二、解析:2.1-@DateTimeFormat(pattern="yyyy/MM/ddHH:mm:ss")Datedate3自定义传入时间的类型2.2-它所对应的传参值:在PostMan中:3......
  • 解决SpringMVC重定向参数无法携带问题
    解决SpringMVC重定向参数无法携带问题场景重定向时请求参数会丢失,我们往往需要重新携带请求参数,我们可以进⾏⼿动参数拼接如下:return"redirect:handle01?name="+na......
  • SpringMVC-day01
    SpringMVC_day01今日内容理解SpringMVC相关概念完成SpringMVC的入门案例学会使用PostMan工具发送请求和数据掌握SpringMVC如何接收请求、数据和响应结果掌握RESTfu......
  • springMvc4-第一个spring mvc项目
    一个SpringMVC的项目如何创建?请看这里。代码编辑器:IntellijIDEA请提前在电脑上配置好自己的tomcat!该文属于小白教程,适合初学者。1创建SpringMVC项目第一步,点击新建项目......
  • springMvc32-原生apiSpring MVC过滤器-HiddenHttpMethodFilter
    浏览器form​​表单​​只支持GET与POST请求,而DELETE、PUT等method并不支持,spring3.0添加了一个过滤器,可以将这些请求转换为标准的http方法,使得支持GET、POST、PUT与DELETE......
  • springmvc常用标签库
    input标签相当于text<form:inputpath="username"/></td>password标签<form:passwordpath="password"/>hidden标签<form:hiddenpath="id"/>textarea标签是一个支持多行输......
  • 使用Eclipse构建Maven的SpringMVC项目
    使用Eclipse构建Maven的SpringMVC项目首先Eclipse需要安装Maven的插件,地址:http://m2eclipse.sonatype.org/sites/m2e。用MyEclipse安装Maven插件,......