首页 > 其他分享 >Springboot 整合RabbitMQ.

Springboot 整合RabbitMQ.

时间:2022-12-08 16:06:24浏览次数:31  
标签:Springboot boot rabbitmq springframework RabbitMQ 整合 org import com


先看一下项目结构:

Springboot 整合RabbitMQ._spring boot


首先创建 rabbitmq-publisher:

pom.xml

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.niezhiliang</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>

yml文件:

spring:
rabbitmq:
addresses: 192.168.8.144:5672
username: chaowei
password: 123456
#虚拟主机地址
virtual-host: /
#连接超时时间
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
#重新投递时间(分钟)
overtime: 1
package com.niezhiliang.springboot.rabbitmq.publisher.producer;

import com.niezhiliang.springboot.rabbitmq.common.domain.BrokerMessageLog;
import com.niezhiliang.springboot.rabbitmq.common.domain.BrokerMessageLogExample;
import com.niezhiliang.springboot.rabbitmq.common.domain.Order;
import com.niezhiliang.springboot.rabbitmq.common.mapper.BrokerMessageLogMapper;
import com.niezhiliang.springboot.rabbitmq.publisher.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Author NieZhiLiang
* @Email [email protected]
* @Date 2019/1/24 上午9:56
*/
@Component
public class OrderSender {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;



final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

/**
*
* @param correlationData 唯一标识,有了这个唯一标识,我们就知道可以确认(失败)哪一条消息了
* @param ack 是否投递成功
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String messageId = correlationData.getId();

BrokerMessageLogExample brokerMessageLogExample =
new BrokerMessageLogExample();
brokerMessageLogExample.createCriteria()
.andMessage_idEqualTo(messageId);

BrokerMessageLog brokerMessageLog = null;
try {
brokerMessageLog =
brokerMessageLogMapper.selectByExample(brokerMessageLogExample).get(0);

} catch (IndexOutOfBoundsException e) {
logger.error("不存在messageId:{}的日志记录",messageId);
}

//返回成功,表示消息被正常投递
if (ack) {
brokerMessageLog.setStatus(Constants.ORDER_SEND_SUCCESS);
brokerMessageLog.setUpdate_time(new Date());
brokerMessageLogMapper.updateByPrimaryKeySelective(brokerMessageLog);

logger.info("信息投递成功,messageId:{}",brokerMessageLog.getMessage_id());

} else {
logger.error("消费信息失败,messageId:{} 原因:{}",brokerMessageLog.getMessage_id(),cause);
}
}
};


/**
* 信息投递的方法
* @param order
* @throws Exception
*/
public void send(Order order) throws Exception{
//设置投递回调
rabbitTemplate.setConfirmCallback(confirmCallback);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(order.getMessage_id());

rabbitTemplate.convertAndSend("order-exchange",
"order.abcd",
order,
correlationData);
}

}

接下来,创建rabbitmq-consumer项目:
pom.xml里的jar依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后是 application.yml:

spring:
rabbitmq:
#基本配置
addresses: 192.168.8.144:5672
username: chaowei
password: 123456
virtual-host: /
connection-timeout: 15000
#消费端配置
listener:
simple:
#消费端
concurrency: 5
#最大消费端数
max-concurrency: 10
#自动签收auto 手动 manual
acknowledge-mode: manual
#限流(海量数据,同时只能过来一条)
prefetch: 1


server:
port: 8000
package com.niezhiliang.springboot.rabbitmq.customer.consumer;

import com.niezhiliang.springboot.rabbitmq.common.domain.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Author NieZhiLiang
* @Email [email protected]
* @Date 2019/1/24 上午10:29
*/
@Component
public class OrderReceiver {


@RabbitListener(
bindings = @QueueBinding( //数据是否持久化
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name = "order-exchange",
durable = "true",type = "topic"),
key="order.*"
)
)
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws Exception {
System.out.println("----收到消息,开始消费-----");
System.out.println("d订单id:"+order.getId());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

/**
* 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
* 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
*/
channel.basicAck(deliveryTag,false);
System.out.println("--------消费完成--------");
}

}


标签:Springboot,boot,rabbitmq,springframework,RabbitMQ,整合,org,import,com
From: https://blog.51cto.com/u_15906694/5922597

相关文章

  • springboot最核心的三个特有注解
    SpringBoot最大的特点是无需XML配置文件,能自动扫描包路径装载并注入对象,并能做到根据classpath下的jar包自动配置。所以SpringBoot最核心的3个注解就是:@Config......
  • springboot jpa创建表自动添加注释
    @Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE,ElementType.METHOD,ElementType.FIELD})public@interfaceComment{Stringvalue()default......
  • RabbitMQ安装教程
    RabbitMQ安装教程安装RabbitMQ需要依赖erlang语言环境,所以需要我们下载erlang的环境安装程序。1:下载安装erlangerlang环境安装程序下载路径:https://www.erlang.org/down......
  • JAX-RS之resteasy跟spring整合
    其实,在JAX-RS标准下,jboss的resteasy跟spring结合的话,无非是如何去取得spring中的bean而已.两个方法,例子如下1比如有个接口和实现类J......
  • flex3+blazeds+spring+hibernate整合小结
       近来flex盛行,因此这两天也借了本书看了两天,发觉作为非页面设计人员,flex还是很好的,flexbuilder很好用,拖拉就有很COOL的界面了,而且flex总的来说基本东西不难学,有编程基础......
  • java springboot 大文件分片上传处理
    ​  1 背景用户本地有一份txt或者csv文件,无论是从业务数据库导出、还是其他途径获取,当需要使用蚂蚁的大数据分析工具进行数据加工、挖掘和共创应用的时候,首先要将本地......
  • AMQP (RabbitMQ) 支持
    Spring集成提供了通道适配器,用于使用高级消息队列协议(AMQP)接收和发送消息。您需要将此依赖项包含在项目中:<dependency><groupId>org.springframework.integration</g......
  • 黑马程序员2022新版SSM框架Spring+SpringMVC+Maven高级+SpringBoot+MyBatisPlus企业实
    Spring为什么要学Spring?1.专业角度:简化开发,降低企业级开发的复杂性框架整合,高效整合其他计算,提高企业级应用开发与运行效率2.学什么?简化开发IOCAOP事务......
  • 使用SpringBoot时出现了找不到测试类的情况或There are test failures
    出现场景:在使用SpringBoot做单元测试时在Maven编译或打包项目时具体bug描述:Therearetestfailures或者找不到测试类解决方案:首先去运行控制台看causeby后面的......
  • springboot项目集成xxl-job
    一、xxl-job简介xxl-job是一个开源的分布式定时任务框架,它可以与其他微服务组件一起构成微服务集群。它的调度中心(xxl-job)和执行器(自己的springboot项目中有@XxlJob("......