首页 > 其他分享 >springboot配置activemq

springboot配置activemq

时间:2023-01-30 18:11:58浏览次数:43  
标签:springboot true jms 配置 configuration activemq class pool

前言

网上有好多介绍springboot集成activemq的文章,看了一些文章感觉比较零散,还是抽时间自己详细总结一个如何使用,需要注意哪些点。尤其是关于连接池的配置,需要重点关注,否则在消息量大的情况下会把服务器搞挂。

快速配置

如果你只是连接一个activemq集群或节点,那么配置非常简单(这也是springboot便捷的原因)。

如下:

1
2
3
spring.activemq.broker-url=tcp://127.0.0.1:61616?connectionTimeout=3000&soTimeout=500&tcpNoDelay=true&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=10
spring.activemq.user=admin
spring.activemq.password=admin

就这么简单!有了上面的配置你就可以发送消息了(通过JmsTemplate)。这背后的原理是通过springboot提供的ActiveMQAutoConfiguration来实现的。

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@AutoConfigureBefore(JmsAutoConfiguration.class)
@AutoConfigureAfter({ JndiConnectionFactoryAutoConfiguration.class })
@ConditionalOnClass({ ConnectionFactory.class, ActiveMQConnectionFactory.class })
@ConditionalOnMissingBean(ConnectionFactory.class)
@EnableConfigurationProperties(ActiveMQProperties.class)
@Import({ ActiveMQXAConnectionFactoryConfiguration.class,
truetrueActiveMQConnectionFactoryConfiguration.class })
public class ActiveMQAutoConfiguration {

}

ActiveMQAutoConfiguration的代码能得知,只要你的classpath里面存在ConnectionFactory.class和ActiveMQConnectionFactory.class 并且容器里面没有类型为ConnectionFactory.class的Bean,那么该自动配置组件就会生效。

通过ActiveMQAutoConfiguration,我们在spring容器中就能自动获取一个类型为ConnectionFactory.class的Bean 和 JmsTemplate.class的Bean。

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(profiles = {"dev"})
public class TestSendAmqMsg {

@Resource
private JmsTemplate jmsTemplate;

@Test
public void testSendMsgDefault() throws Exception {
jmsTemplate.convertAndSend("java");
//等待消费,因为是自发自消费
Thread.sleep(1000 * 20);
}
}

上面的代码其实是不能正常工作的。原因是jmsTemplate.convertAndSend没有指定Destination

Destination的指定有两种方式,一种是通过方法参数指定。如下所示:

1
jmsTemplate.convertAndSend("hello-jms-queue", "java");

一种是通过在application.properties文件中指定一个默认值:

1
spring.jms.template.default-destination=hello-jms-default

还有一点需要注意的是Destination类型,是Topic还是Queue。默认是Queue。Destination类型也可以通过两种方式设置。

一种是通过在application.properties文件中指定一个默认值:

1
2
# false 表示是Queue
spring.jms.pub-sub-domain=false

一种是通过API

1
jmsTemplate.setPubSubDomain(false);

注意:上面的例子虽然能实现消息的发送和接收,但是非常有局限性。一个ActiveMQ上既有Topic也有Queue,我们通过JmsTemplate发送和消费消息时,最好是通过参数Destination来指定目的地,热不是一个字符串(不知道是具体是什么类型,只能通过全局配置)。

消费消息

有了上面的配置,我们可以有两种消费消息的方式。

  1. 通过JmsTemplate的API来主动消费。这个就不详细讲了。
  2. 通过@JmsListener来被动消费

通过@JmsListener来实现消息消费,配置如下。

1
2
3
4
5
6
7
8
9
@Configuration
@EnableJms
public class JmsConfig {

@JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "hello-jms")
public void consumerMsg(String msg) {
System.out.println("############# Received message is : [" + msg + "]*************");
}
}

@EnableJms的作用是启用spring的Jms的注解驱动能力。注册了JmsListenerAnnotationBeanPostProcessorBean。原理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(JmsBootstrapConfiguration.class)
public @interface EnableJms {
}

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class JmsBootstrapConfiguration {

@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
truetruereturn new JmsListenerAnnotationBeanPostProcessor();
true}

true@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
truepublic JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
truetruereturn new JmsListenerEndpointRegistry();
true}

}

JmsAnnotationDrivenConfiguration该配置类非常关键:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@ConditionalOnClass(EnableJms.class)
class JmsAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean
public DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() {
DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer();
configurer.setDestinationResolver(this.destinationResolver.getIfUnique());
configurer.setTransactionManager(this.transactionManager.getIfUnique());
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setJmsProperties(this.properties);
return configurer;
}

true@Bean
true@ConditionalOnMissingBean(name = "jmsListenerContainerFactory")
truepublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
truetrueDefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
truetrueconfigurer.configure(factory, connectionFactory);
truetruereturn factory;
true}
}

配置JmsTemplate属性

1
2
3
4
5
6
7
8
spring.jms.template.default-destination=hello-jms-default
spring.jms.template.delivery-mode=non_persistent
spring.jms.template.priority=100
spring.jms.template.qos-enabled=true
spring.jms.template.time-to-live=50
# 设置消息延迟投递时间 需要jms 2.0 支持
#spring.jms.template.delivery-delay=1
spring.jms.template.receive-timeout=100

配置消费属性

1
2
3
4
5
# 消息消费
spring.jms.listener.acknowledge-mode=client
spring.jms.listener.auto-startup=true
spring.jms.listener.concurrency=10
spring.jms.listener.max-concurrency=20

连接池配置

通过上面的学习,我们已经能实现消息的发送和消费了。但是有一个问题就是,我们会和ActiveMQ Broker建立大量的短连接。在高并发下肯定是不可以的。通过在application.properties中简单配置,我们就能获得连接池能力。

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.3</version>
</dependency>

修改配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# 只有配置了该选项,才会启用activemq连接池功能,参见:ActiveMQConnectionFactoryConfiguration
spring.activemq.pool.enabled=true
# 配置连接池参数
spring.activemq.pool.configuration.max-connections=10
spring.activemq.pool.configuration.idle-timeout=30000
spring.activemq.pool.configuration.expiry-timeout=0

spring.activemq.pool.configuration.create-connection-on-startup=false
spring.activemq.pool.configuration.time-between-expiration-check-millis=60000
spring.activemq.pool.configuration.maximum-active-session-per-connection=100
spring.activemq.pool.configuration.reconnect-on-exception=true
spring.activemq.pool.configuration.block-if-session-pool-is-full=true
spring.activemq.pool.configuration.block-if-session-pool-is-full-timeout=3000

连接池自动配置实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@ConditionalOnClass(PooledConnectionFactory.class)
static class PooledConnectionFactoryConfiguration {

true@Bean(destroyMethod = "stop")
true@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true", matchIfMissing = false)
true@ConfigurationProperties(prefix = "spring.activemq.pool.configuration")
truepublic PooledConnectionFactory pooledJmsConnectionFactory(
ActiveMQProperties properties) {
truetruePooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(
truetruetruetruenew ActiveMQConnectionFactoryFactory(properties)
truetruetruetruetruetrue.createConnectionFactory(ActiveMQConnectionFactory.class));

truetrueActiveMQProperties.Pool pool = properties.getPool();
truetruepooledConnectionFactory.setMaxConnections(pool.getMaxConnections());
truetruepooledConnectionFactory.setIdleTimeout(pool.getIdleTimeout());
truetruepooledConnectionFactory.setExpiryTimeout(pool.getExpiryTimeout());
truetruereturn pooledConnectionFactory;
true}
}

有了这个实现作为参考,如果我们不想使用springboot提供的ActiveMQ自动配置功能,我们自己写代码配置,也能实现连接池的功能,无非就是普通的ActiveMQConnectionFactoryFactory进行包装而已。

有一个细节需要注意:前置为spring.activemq.pool.configuration的配置属性是如何设置到PooledConnectionFactory的呢?但是是通过ConfigurationPropertiesBindingPostProcessor 该类会处理注解ConfigurationProperties 指定的属性,通过反射设置到生成的Bean中(在Bean初始化前)。

标签:springboot,true,jms,配置,configuration,activemq,class,pool
From: https://www.cnblogs.com/telwanggs/p/17076892.html

相关文章

  • springboot整合activemq(三)配置文件
    #服务端口,8080被另一服务占用server.port=9090spring.activemq.broker-url=tcp://127.0.0.1:61616#在考虑结束之前等待的时间#spring.activemq.close-timeout=15s#默认代......
  • 一、开发环境配置
    1.开发工具配置1.1开发工具版本服务端开发基础工具版本列表开发工具版本号IntelliJ-IDEA2021.x以上版本JavaJDK-1.8.xMaven3.6.x以上版本Centos7.......
  • 随笔(十五)『SpringBoot 整合 Redis』
    一、添加依赖<!--redis启动器--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>......
  • springboot集成swagger的坑
    1、端口问题无法访问此网站网址为 http://localhost:6666/swagger-ui.html 的网页可能暂时无法连接,或者它已永久性地移动到了新网址。ERR_UNSAFE_PORT 如图:......
  • SQL Server 安装、配置、迁移 三
    本片博文记录数据迁移。两台电脑,一台IP地址为:10.15.66.168,有名为TestprgDB的数据库,另一台IP地址10.15.66.169,无数据库。本次任务目的为把168电脑的数据库拷贝到169电脑。......
  • springboot~openfeign开启熔断之后MDC为null的解决
    上一篇说了关于MDC跨线程为null的理解,而本讲主要说一下,如何去解决它,事实上,Hystrix为我们留了这个口,我们只需要继承HystrixConcurrencyStrategy,然后重写wrapCallable方法,再......
  • Fitter-细节-过滤器拦截方式配置 Fitter-细节-过滤器链(多个过滤器)
    Fitter-细节-过滤器拦截方式配置  拦截方式配置:资源被访问的方式注解配置:设置dispatcherTypes属性1.REQUEST:默认值。浏览器直接请求资源......
  • mysql8.0远程访问配置
    mysql8.0远程访问配置一、修改bind-address查找配置文件my.confwhichmysql/usr/bin/mysql--verbose--help|grep-A1'Defaultoptions'vimy.cnfbind-addres......
  • .NET7后端框架:读取配置文件
    前言在项目开发过程中,不可避免的会设置一些全局的可变的参数,如连接字符串、功能开关、Swagger配置、Redis配置等等。.NETCore将这些配置参数统一放在appsettings.json......
  • SQL Server 安装、配置、迁移 二
    本篇博客记录网络配置。本机IP地址:10.15.66.169打开配置界面,如图所示设置TCP/IP为Enabled。双击TCP/IP,出现TCP/IPProperties窗口。选择一个指向本......