首页 > 其他分享 >RabbitMq

RabbitMq

时间:2023-11-09 21:46:22浏览次数:31  
标签:amqp rabbitmq springframework org import RabbitMq String

为啥要用rabbitmq

1,松耦合结构(解耦,异步处理,缓冲能力,伸缩性,扩展性)

2,性能是万级的

rabbitmq的生产者:
rabbitmq的生产者如何保证数据安全问题:

1,发送者确认

2,失败者通知
rabbitmq的消费者
rabbitmq的消费者如何保证数据安全问题:

1,手动消费确认
rabbitmq与springboot的集成:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

package touchbase.infrastructure.common.config.rabbitmq;


import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 1,使用顺序
*/
@Configuration
public class RabbitMqConfig {
@Value("${touchbase.rabbitmq.host}")
private String host;
@Value("${touchbase.rabbitmq.port}")
private String port;
@Value("${touchbase.rabbitmq.username}")
private String username;
@Value("${touchbase.rabbitmq.password}")
private String password;
@Value("${touchbase.rabbitmq.virtual-host}")
private String virtualhost;

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
return connectionFactory;
}
}

生产者:

package touchbase.infrastructure.common.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitMqProducerConfig {

@Autowired
private ConnectionFactory connectionFactory;

@PostConstruct
public void init() {
// 如果交换器,队列未创建,可以通过RabbitAdmin进行创建
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
DirectExchange directExchange = new DirectExchange("cmpn.strategy");
rabbitAdmin.declareExchange(directExchange);

Queue weChatQueue = new Queue("touch.channel.wechatassis");
rabbitAdmin.declareQueue(weChatQueue);
rabbitAdmin.declareBinding(BindingBuilder.bind(weChatQueue).to(directExchange)
.with("touch.channel.wechatassis"));
}

//使用template
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//失败通知(在不可路由的情况下)
template.setMandatory(true);
//发送方确认
template.setConfirmCallback(confirmCallback());
//失败回调
template.setReturnCallback(returnCallback());
return template;
}

/*使用了RabbitMq系统的缺省的交换器(direct交换器:完全匹配)*/
//申明队列
@Bean
public Queue weChatAssisQueue() {
return new Queue("touch.channel.wechatassis");
}

/*生产者发送确认*/
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("发送者确认发送给mq成功");
} else {
//处理失败的消息
System.out.println("发送者发送给给mq失败,考虑重发:" + cause);
}

}
};
}

/*失败者通知*/
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message,
int replyCode,
String replyText,
String exchange,
String routingKey) {
System.out.println("无法路由的消息,需要考虑另外处理");
System.out.println("returned replyText:" + replyText);
System.out.println("returned exchange:" + exchange);
System.out.println("returned routingKey:" + routingKey);
String msgJson = new String(message.getBody());
System.out.println("Returned Message:" + msgJson);
}
};
}
}

消费者:
package touchbase.infrastructure.common.config.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConsumerConfig {

@Autowired
private ConnectionFactory connectionFactory;

/**
* @Description: SimpleMessageListenerContainer的工厂類
* @Author rty
* @Date: 2023/10/21 23:42
* @Version 1.0
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置当前的消费者数量
factory.setConcurrentConsumers(1);
//设置当前的消费者数量上限
factory.setMaxConcurrentConsumers(5);
//设置是否重回队列
factory.setDefaultRequeueRejected(true);
return factory;
}
}

package touchbase.adapter.message;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import touchbase.application.dto.common.FlowDataDTO;
import touchbase.application.service.channeltask.ChannelTaskService;

import java.io.IOException;
import java.util.List;

/**
* @Description: 渠道任务的消费
* @Author rty
* @Date: 2023/10/21 15:36
* @Version 1.0
*/
@Component
@Slf4j
public class ChannelTaskConsumer {

public static final String WECHATASSIS = "WECHATASSIS";

@Autowired
private ChannelTaskService channelTaskService;

@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory", queues = "touch.channel.wechatassis")
@RabbitHandler
public void dealWeChatChannelTaskConsumer(List<Message> messages, Channel channel) throws IOException {
long deliveryTag = 0l;
try {
for (Message message : messages) {
String flowDataString = new String(message.getBody());
log.info("企微专员任务流转", flowDataString);
deliveryTag = message.getMessageProperties().getDeliveryTag();
channelTaskService.dealChannelChannelTask(JSONObject.parseArray(flowDataString, FlowDataDTO.class),
WECHATASSIS);
}
} catch (Exception e) {
log.error("企微专员任务流转失败,{}", e);
//basicReject一次只能拒绝接收一个消息,basicNack方法可以支持一次0个或多个消息的拒收
} finally {
channel.basicAck(deliveryTag, false);
}
}
}




标签:amqp,rabbitmq,springframework,org,import,RabbitMq,String
From: https://www.cnblogs.com/flybirdR/p/17822923.html

相关文章

  • Spring Boot中使用RabbitMQ完成延迟功能
    MQ-消息队列简单来说就是将“消息”放到“队列”中,然后慢慢处理队列中的消息。完成延迟功能总体的思路是将消息放到队列中,为消息设置过期时间,不直接处理这个队列中的消息,等到消息过期,将它转到另一个队列进行处理,从而完成延迟功能。基本概念1.队列队列是RabbitMQ的内部对象,用......
  • Springboot整合RabbitMQ值Direct交换机
    常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:DirectExchange 直连型交换机,根据消息携带的路由键将消息投递给对应队列。大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键routingkey......
  • 安装RabbitMQ
    1)安装虚拟机环境:Mac+ParallesDesktop16+Debian10Arm64IP地址:166.166.166.92配置源:debhttps://mirror.sjtu.edu.cn/debian/bullseyemaincontribnon-freedeb-srchttps://mirror.sjtu.edu.cn/debian/bullseyemaincontribnon-freedebhttps://mirror.sjtu.edu.cn/d......
  • Rabbitmq消息队列:Topic话题模式简单应用
    一、生产者声明topic话题模式的交换机,分别发送几条消息到不同的路由key。packagetest.topic;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutils.RabbitmqConUtil;publicclassGive{//定义交换机privatefinals......
  • Rabbitmq消息队列:Route路由模式简单应用
    一、生产者在发布订阅模式的代码基础上,进行一定的调整,将声明交换机的路由模式调整为direct路由模式。这个时候需要用到路由key,类似于给消息用来分类的标签。分别发送三条消息,发向GetOne、GetTwo和GetThree三个路由key://声明交换机(类型direct->路由模式)channel......
  • Rabbitmq消息队列:Publish/Subscribe模式简单应用
    一、生产者packagetest.publish;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutils.RabbitmqConUtil;publicclassGive{//定义交换机privatefinalstaticStringEXCHANGE="test-publish";publicstati......
  • Rabbitmq消息队列:Work模式简单应用
    一、生产者直接使用HelloWorld模式下的应用案例依赖和代码,将生产者Give类拷贝一份。将发送消息部分调整为遍历发送,连发10次://遍历发送多条消息for(inti=0;i<10;i++){//发送内容channel.basicPublish("",QUEUE,null,("这是第"+(i+1)+"条消息")......
  • linux 安装rabbitmq流程记录
    Linux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为el8)Erlang:erlang-22.3.4.12-1.el7.x86_64.rpmRabbitMQ:rabbitmq-server-3.8.13-1.el7.noarch.rpm1安装erlangLinux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为el8......
  • RabbitMQ安装——window10 64位
    一、下载并安装erlang环境  RabbitMQ是由erlang编程语言开发的消息队列,因此需要在电脑上安装erlang的环境。1、官网下载erlang环境在RabbitMQ的最新版本详情页面RabbitMQ Project Announcements — RabbitMQ查看最新版本支持的erlang环境,目前支持erlang的版本号为OTP25.x......
  • RabbitMQ教程
    1.同步通讯和异步通信(1)同步通信另外,有服务停机时,影响其他服务的调用,级联失败 问题: (2)异步调用 优势一:服务解耦 ,现在只需要订阅事件即可 优势二:性能提升,吞吐量提高,完成一个订单任务,只需要支付服务执行消耗即可优势三:服务没有强依赖,不担心级联失败问题优势四:流量削峰 ,服务请求......