为啥要用rabbitmq
1,松耦合结构(解耦,异步处理,缓冲能力,伸缩性,扩展性)
2,性能是万级的
rabbitmq的生产者:
rabbitmq的生产者如何保证数据安全问题:
1,发送者确认
2,失败者通知
rabbitmq的消费者
rabbitmq的消费者如何保证数据安全问题:
1,手动消费确认
rabbitmq与springboot的集成:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package touchbase.infrastructure.common.config.rabbitmq;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 1,使用顺序
*/
@Configuration
public class RabbitMqConfig {
@Value("${touchbase.rabbitmq.host}")
private String host;
@Value("${touchbase.rabbitmq.port}")
private String port;
@Value("${touchbase.rabbitmq.username}")
private String username;
@Value("${touchbase.rabbitmq.password}")
private String password;
@Value("${touchbase.rabbitmq.virtual-host}")
private String virtualhost;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
return connectionFactory;
}
}
生产者:
package touchbase.infrastructure.common.config.rabbitmq;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class RabbitMqProducerConfig {
@Autowired
private ConnectionFactory connectionFactory;
@PostConstruct
public void init() {
// 如果交换器,队列未创建,可以通过RabbitAdmin进行创建
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
DirectExchange directExchange = new DirectExchange("cmpn.strategy");
rabbitAdmin.declareExchange(directExchange);
Queue weChatQueue = new Queue("touch.channel.wechatassis");
rabbitAdmin.declareQueue(weChatQueue);
rabbitAdmin.declareBinding(BindingBuilder.bind(weChatQueue).to(directExchange)
.with("touch.channel.wechatassis"));
}
//使用template
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//失败通知(在不可路由的情况下)
template.setMandatory(true);
//发送方确认
template.setConfirmCallback(confirmCallback());
//失败回调
template.setReturnCallback(returnCallback());
return template;
}
/*使用了RabbitMq系统的缺省的交换器(direct交换器:完全匹配)*/
//申明队列
@Bean
public Queue weChatAssisQueue() {
return new Queue("touch.channel.wechatassis");
}
/*生产者发送确认*/
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("发送者确认发送给mq成功");
} else {
//处理失败的消息
System.out.println("发送者发送给给mq失败,考虑重发:" + cause);
}
}
};
}
/*失败者通知*/
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message,
int replyCode,
String replyText,
String exchange,
String routingKey) {
System.out.println("无法路由的消息,需要考虑另外处理");
System.out.println("returned replyText:" + replyText);
System.out.println("returned exchange:" + exchange);
System.out.println("returned routingKey:" + routingKey);
String msgJson = new String(message.getBody());
System.out.println("Returned Message:" + msgJson);
}
};
}
}
消费者:
package touchbase.infrastructure.common.config.rabbitmq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConsumerConfig {
@Autowired
private ConnectionFactory connectionFactory;
/**
* @Description: SimpleMessageListenerContainer的工厂類
* @Author rty
* @Date: 2023/10/21 23:42
* @Version 1.0
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置当前的消费者数量
factory.setConcurrentConsumers(1);
//设置当前的消费者数量上限
factory.setMaxConcurrentConsumers(5);
//设置是否重回队列
factory.setDefaultRequeueRejected(true);
return factory;
}
}
package touchbase.adapter.message;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import touchbase.application.dto.common.FlowDataDTO;
import touchbase.application.service.channeltask.ChannelTaskService;
import java.io.IOException;
import java.util.List;
/**
* @Description: 渠道任务的消费
* @Author rty
* @Date: 2023/10/21 15:36
* @Version 1.0
*/
@Component
@Slf4j
public class ChannelTaskConsumer {
public static final String WECHATASSIS = "WECHATASSIS";
@Autowired
private ChannelTaskService channelTaskService;
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory", queues = "touch.channel.wechatassis")
@RabbitHandler
public void dealWeChatChannelTaskConsumer(List<Message> messages, Channel channel) throws IOException {
long deliveryTag = 0l;
try {
for (Message message : messages) {
String flowDataString = new String(message.getBody());
log.info("企微专员任务流转", flowDataString);
deliveryTag = message.getMessageProperties().getDeliveryTag();
channelTaskService.dealChannelChannelTask(JSONObject.parseArray(flowDataString, FlowDataDTO.class),
WECHATASSIS);
}
} catch (Exception e) {
log.error("企微专员任务流转失败,{}", e);
//basicReject一次只能拒绝接收一个消息,basicNack方法可以支持一次0个或多个消息的拒收
} finally {
channel.basicAck(deliveryTag, false);
}
}
}
标签:amqp,rabbitmq,springframework,org,import,RabbitMq,String From: https://www.cnblogs.com/flybirdR/p/17822923.html