首页 > 其他分享 >八、activemq整合springmvc之topic

八、activemq整合springmvc之topic

时间:2022-11-08 16:14:28浏览次数:71  
标签:11 springmvc list topic TextMessage org message activemq

一、前言

spring代码基于 SSM整合(spring-springmvc-mybatis)之CRUD ;代码地址:(基础版本:https://gitee.com/joy521125/ssm-senior-base.gitmaven版:https://gitee.com/joy521125/ssm-senior.git

代码地址:https://gitee.com/joy521125/ssm-senior.git(activemq 分支)

测试代码:在/emp/list中,设置了一个推送;

jar包:

<properties>
        <org.muses.spring.versin>5.3.18</org.muses.spring.versin>
        <org.muses.activemq.versin>5.16.5</org.muses.activemq.versin>
        <org.muses.pagehelper.versin>5.3.0</org.muses.pagehelper.versin>
    </properties>

pom.xml

 1 <!-- activemq所需要的jar包 -->
 2         <dependency>
 3             <groupId>org.apache.activemq</groupId>
 4             <artifactId>activemq-all</artifactId>
 5             <version>${org.muses.activemq.versin}</version>
 6         </dependency>
 7         <dependency>
 8             <groupId>org.apache.xbean</groupId>
 9             <artifactId>xbean-spring</artifactId>
10             <version>4.21</version>
11         </dependency>
12 
13         <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
14         <!-- activeMQ对JMS的支持,整合Spring和ActiveMQ -->
15         <dependency>
16             <groupId>org.springframework</groupId>
17             <artifactId>spring-jms</artifactId>
18             <version>${org.muses.spring.versin}</version>
19         </dependency>
20 
21         <dependency>
22             <groupId>org.apache.activemq</groupId>
23             <artifactId>activemq-pool</artifactId>
24             <version>${org.muses.activemq.versin}</version>
25         </dependency>

二、activem 主题topic配置(非持久化)

消费者:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 3         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
 4     <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 5         <!-- tcp://自己的服务器地址:端口 -->
 6         <property name="brokerURL" value="tcp://192.168.0.109:61616"></property>
 7         <property name="userName" value="admin"></property>
 8         <property name="password" value="admin"></property>
 9         <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 -->
10         <property name="useAsyncSend" value="true"></property>
11         <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 -->
12         <property name="alwaysSessionAsync" value="false"></property>
13     </bean>
14 
15     <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
16         <property name="connectionFactory" ref="mQConnectionFactory"></property>
17         <property name="maxConnections" value="100"></property>
18     </bean>
19 <!-- 这个是队列目的地,订阅 -->
20     <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
21         <constructor-arg index="0" value="spring-active-topic"></constructor-arg>
22     </bean>
23     <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 -->
24     <bean id="mqMessageListener" class="org.muses.ssm.senior.mgt.core.utiles.MqMessageListener">
25     </bean>
26     <!-- 消费者配置,可配置多个消费者; 实际项目中消费者配置和发布者配置分开 -->
27     <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
28         <property name="connectionFactory" ref="connectionFactory"></property>
29         <property name="destination" ref="destinationTopic"></property>
30         <!-- public class myMessageListener implements MessageListener -->
31         <property name="messageListener" ref="mqMessageListener"></property>
32     </bean>
33 </beans>

 

 

发布者:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 3         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
 4     <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 5         <!-- tcp://自己的服务器地址:端口 -->
 6         <property name="brokerURL" value="tcp://192.168.0.109:61616"></property>
 7         <property name="userName" value="admin"></property>
 8         <property name="password" value="admin"></property>
 9         <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 -->
10         <property name="useAsyncSend" value="true"></property>
11         <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 -->
12         <property name="alwaysSessionAsync" value="false"></property>
13     </bean>
14 
15     <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
16         <property name="connectionFactory" ref="mQConnectionFactory"></property>
17         <property name="maxConnections" value="100"></property>
18     </bean>
19 <!-- 这个是队列目的地,订阅 -->
20     <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
21         <constructor-arg index="0" value="spring-active-topic"></constructor-arg>
22     </bean>
23      <!-- 发布者配置 实际项目中消费者配置和发布者配置分开-->
24     <bean id="mQConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
25      <!-- 发布者配置 实际项目中消费者配置和发布者配置分开-->
26     <bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate">
27         <property name="connectionFactory" ref="connectionFactory" />
28         <property name="defaultDestination" ref="destinationTopic" />
29         <property name="messageConverter" ref="mQConverter" />
30         <property name="sessionTransacted" value="false" /><!-- 关闭事务 -->
31          <property name="sessionAcknowledgeMode" value="4"></property><!-- 设置手动签收 -->
32     </bean>
33      <!-- 发布者配置 实际项目中消费者配置和发布者配置分开-->
34     <bean class="org.muses.ssm.senior.mgt.core.entity.MessagePublisher">
35         <property name="jmsTemplate" ref="topicTemplate"></property>
36     </bean>
37     
38 </beans>

 

效果:

分两个项目启动,先启动消费者的服务;再启动发布者的服务;当发布者发布一条消息(即执行http://localhost:8080/test/emp/list; 里面添加了推送消息的方法),那么消费者就消费一条消息;

消费者监听:

 1 package org.muses.ssm.senior.mgt.core.utiles;
 2 
 3 import javax.jms.Message;
 4 import javax.jms.MessageListener;
 5 import javax.jms.TextMessage;
 6 
 7 public class MqMessageListener implements MessageListener {
 8 
 9     @Override
10     public void onMessage(Message message) {
11         if (null != message && message instanceof TextMessage) {
12             TextMessage textMessage = (TextMessage) message;
13             System.out.println("textMessage:" + textMessage);
14         }
15 
16     }
17 
18 }

发布者发布消息的方法:

 1   /**
 2      * 列表页面
 3      *
 4      * @param employee
 5      * @return
 6      */
 7     @RequestMapping("/list")
 8     public ModelAndView list(Employee employee) {
 9         ModelAndView mv = new ModelAndView("index");
10         PageInfo<Employee> list = employeeService.list(employee);
11         mv.addObject("pages", list);
12         MessagePublisher.sendMessage("test_list");
13         return mv;
14     }

 

 

三、事务:

 只需要更改activemq_queue.xml 配置信息;增加属性sessionTransacted 属性值设置为true;表示jms的事务由spring托管,但是发送消息中如果又异常,依然会推送。需要在消费者端配置事务。如果消费者端有异常,该条消息被重复发送多次(默认重发6次,redeliveryCounter=6);消息最后还是显示发送成功。

1  <!-- 消费者配置,可配置多个消费者; 实际项目中消费者配置和发布者配置分开 -->
2     <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
3         <property name="connectionFactory" ref="connectionFactory"></property>
4         <property name="destination" ref="destinationTopic"></property>
5         <!-- public class myMessageListener implements MessageListener -->
6         <property name="messageListener" ref="mqMessageListener"></property>
7         <property name="sessionTransacted" value="true" /><!-- 开启事务 -->
8     </bean>

 四、签收:

 配置同queue的一样;

 1 @Override
 2     public void onMessage(Message message) {
 3         if (null != message && message instanceof TextMessage) {
 4             TextMessage textMessage = (TextMessage) message;
 5             System.out.println("textMessage:" + textMessage);
 6             try {
 7                 message.acknowledge();// 手动签收
 8             } catch (JMSException e) {
 9                 // TODO Auto-generated catch block
10                 e.printStackTrace();
11             }
12         }
13     }

 

 消费者对的事务关闭,且设置为手动签收;配置同queue。

1 <!-- 消费者配置,可配置多个消费者; 实际项目中消费者配置和发布者配置分开 -->
2     <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
3         <property name="connectionFactory" ref="connectionFactory"></property>
4         <property name="destination" ref="destinationTopic"></property>
5         <!-- public class myMessageListener implements MessageListener -->
6         <property name="messageListener" ref="mqMessageListener"></property>
7         <property name="sessionTransacted" value="false" /><!-- 关闭事务 -->
8         <property name="sessionAcknowledgeMode" value="4"></property><!-- 设置手动签收 -->
9     </bean>

 五、持久化:

代码如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 3         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
 4     <bean id="mQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 5         <!-- tcp://自己的服务器地址:端口 -->
 6         <property name="brokerURL" value="tcp://192.168.154.14:61616"></property>
 7         <property name="userName" value="admin"></property>
 8         <property name="password" value="admin"></property>
 9         <!-- 强制异步发送大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失 -->
10         <property name="useAsyncSend" value="true"></property>
11         <!-- 如果未设置此标志,则不会使用单独的线程为连接中的每个会话分派消息。但是,如果有多个会话,或者会话未处于自动确认或重复 ok 模式,则始终使用单独的线程。 默认情况下,此值设置为 true,并且会话分派异步发生。 -->
12         <property name="alwaysSessionAsync" value="false"></property>
13     </bean>
14 
15     <bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
16         <property name="connectionFactory" ref="mQConnectionFactory"></property>
17         <property name="maxConnections" value="100"></property>
18     </bean>
19 <!-- 这个是队列目的地,订阅 -->
20     <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
21         <constructor-arg index="0" value="spring-active-topic"></constructor-arg>
22     </bean>
23     <!-- 消费者配置 :监听器  -->
24     <bean id="mqMessageListener" class="org.muses.ssm.senior.mgt.core.utiles.MqMessageListener">
25     </bean>
26     <!-- 消费者配置 实际项目中消费者配置和发布者配置分开 -->
27     <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
28         <property name="connectionFactory" ref="connectionFactory"></property>
29         <property name="destination" ref="destinationTopic"></property>
30         <!-- public class myMessageListener implements MessageListener -->
31         <property name="messageListener" ref="mqMessageListener"></property>
32         <!-- 持久化订阅者 -->
33         <property name="pubSubDomain" value="true"></property>
34         <property name="clientId" value="client_001" />  
35     <property name="durableSubscriptionName" value="client_001_dura"/>
36     </bean>
37     <!-- 发布者配置 实际项目中消费者配置和发布者配置分开-->
38     <bean id="mQConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
39     <!-- 发布者配置 实际项目中消费者配置和发布者配置分开-->
40     <bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate">
41         <property name="connectionFactory" ref="connectionFactory" />
42         <property name="defaultDestination" ref="destinationTopic" />
43         <property name="messageConverter" ref="mQConverter" />
44         <property name="sessionTransacted" value="true" />
45     </bean>
46     <!-- 发布者配置 实际项目中消费者配置和发布者配置分开-->
47     <bean class="org.muses.ssm.senior.mgt.core.entity.MessagePublisher">
48         <property name="jmsTemplate" ref="topicTemplate"></property>
49     </bean>
50     
51 </beans>

 

标签:11,springmvc,list,topic,TextMessage,org,message,activemq
From: https://www.cnblogs.com/lixiuming521125/p/16777596.html

相关文章

  • SpringMVC添加依赖包
    `4.0.0<groupId>org.example</groupId><artifactId>ssmbuild</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.com......
  • 鉴权kafka创建新用户和给topic赋权限等相关Acl命令失效问题定位
    问题现象背景:收到客户反馈使用kafka鉴权sdk并填入平台新建的用户组和topic相关参数,消费报鉴权失败。   排查过程:1、首先是怀疑客户使用的入参或用法有误,因此向客......
  • SpringMVC源码-文件上传
    一、环境配置IndexController.java@GetMapping("/file")publicStringfile(){ return"fileUpload";}@PostMapping("/fileUpload")publicvoidfileUpload(Mul......
  • SpringMVC源码-获取HandleAdapter和调用HandleAdapter.handle
    DispatcherServlet.getHandlerAdapter(Objecthandler)protectedHandlerAdaptergetHandlerAdapter(Objecthandler)throwsServletException{ if(this.handlerAdapt......
  • SpringMVC源码-getHandler
    DispatcherServlet.getHandler(HttpServletRequestrequest)protectedHandlerExecutionChaingetHandler(HttpServletRequestrequest)throwsException{ if(this.ha......
  • SpringMvc——拦截器学习
    1、拦截器(Interceptor)是一种动态拦截方法调用的机制作用:在指定的方法调用前后执行预先设定后的的代码阻止原始方法的执行2、拦截器与过滤器区别归属不同:Filter属于Servle......
  • SpringMVC 原理
    1.原理图(实现代表自动装配完成,虚线我们需要实现的地方)   2.实现步骤  3.实现  HelloController.javapackagecom.zxy.contorller;importorg.springfr......
  • 解决在idea中使用springMvc向mysql写入中文数据乱码
    相关设置:1、idea编码格式设置:   2、MySQL的相关编码格式设置:修改前编码:无用操作:之前通过命令行修改编码格式:setcharacter_set_client=utf8......
  • SpringMVC源码-DispatcherServlet处理请求概述
    请求由Servlet的doService处理。DispatcherServlet.doService(HttpServletRequestrequest,HttpServletResponseresponse)protectedvoiddoService(HttpServletReques......
  • SpringMVC
    《解释一下tomcat服务器中ApplicationContext与thymeleaf在解析路径时的问题》《关于tomcat》在我们将项目部署到tomcat服务器上的时候,经常要配置如下的:  我们知道......