首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2024-01-25 09:35:17浏览次数:26  
标签:oa springframework OA org import RabbitMQ public

RabbitMQ

配置

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
​
​
    @Configuration
    public class OARabbitMqConfig {
​
        //
        //更新OA 交换机
        public static final String OA_EXCHANGE = "oa.info.exchange";
        //更新OA 路由的 key
        public static final String OA_ROUTING_KEY = "oa.info.key";
        //更新OA 队列名称
        public static final String OA_QUEUE = "oa.info.queue";
​
​
        //更新OA 死信交换机
        public static final String OA_ERROR_EXCHANGE = "oa.error.exchange";
        public static final String OA_ERROR_ROUTING_KEY = "oa.error.key";
        public static final String OA_ERROR_QUEUE = "oa.error.queue";
​
​
        //OA 队列声明
        @Bean
        public Queue oaQueue() {
            return QueueBuilder.durable(OA_QUEUE)
                    .deadLetterExchange(OA_ERROR_EXCHANGE)
                    .deadLetterRoutingKey(OA_ERROR_ROUTING_KEY)
                    .ttl(1296000000)
                    .build();
        }
​
        //OA 直连交换机
        @Bean
        public DirectExchange oaExchange() {
            return ExchangeBuilder.directExchange(OA_EXCHANGE).delayed().durable(true).build();
        }
​
​
        //OA 队列绑定
        @Bean
        public Binding oaBinding() {
            return BindingBuilder.bind(oaQueue()).to(oaExchange()).with(OA_ROUTING_KEY);
        }
​
​
        @Bean
        public DirectExchange oaErrorExchange() {
            return new DirectExchange(OA_ERROR_EXCHANGE);
        }
​
​
        @Bean
        public Queue oaErrorQueue() {
            return QueueBuilder
                    //指定队列名称,并持久化
                    .durable(OA_ERROR_QUEUE)
                    //设置队列的超时时间,15天
                    .ttl(1296000000)
                    //指定死信交换机
                    .build();
        }
​
        @Bean
        public Binding oaErrorBinding() {
            return BindingBuilder.bind(oaErrorQueue()).to(oaErrorExchange()).with(OA_ERROR_ROUTING_KEY);
        }
​
​
    }

 

消费者

获取rabbitMQ消息队列里面的数据

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.List;
​
@Component
@Slf4j
public class OAConsumer {
​
​
    @RabbitHandler
    @RabbitListener(queues = OARabbitMqConfig.OA_QUEUE)
    public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        try {
            ObjectMapper objectMapper = SpringUtils.getBean(ObjectMapper.class);
            OAQueueMessage oaQueueMessage = objectMapper.readValue(message.getBody(), OAQueueMessage.class);
            if (OAProducer.FLOW_TYPE_RCV.intValue() == oaQueueMessage.getFlowType().intValue()) {
                ObjectMapper mapper = SpringUtils.getBean(ObjectMapper.class);
                List<Long> oaIds = mapper.readValue(oaQueueMessage.getJsonParams(), new TypeReference<List<Long>>() {
                });
                IUfCgddtxmService ufCgddtxmService = SpringUtils.getBean(IUfCgddtxmService.class);
                //sql server in 只支持2100的数量
                int batchSize = 2000;
                int totalData = oaIds.size();
                int startIndex = 0;
                while (startIndex < totalData) {
                    int endIndex = Math.min(startIndex + batchSize, totalData);
                    List<Long> subOAIds = oaIds.subList(startIndex, endIndex);
                    ufCgddtxmService.updateSfyshByIds(subOAIds);
                    startIndex += batchSize;
                }
            }
            log.info("oa:timeAt:[{}];deliveryTag:[{}];message:[{}]", DateUtils.dateTimeNow(), deliveryTag, oaQueueMessage);
            //不批量的ACK该消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.info("oa:timeAt:[{}];deliveryTag:[{}];errorMsh:[{}]", DateUtils.dateTimeNow(), deliveryTag, e.getMessage());
            //不批量的NACK该消息,并且投入死信队列
            channel.basicNack(deliveryTag, false, false);
        }
    }
​
}

 

生产者

/**
 *
 */
@Component
@Slf4j
public class OAProducer {
​
    //已收货
    public static final Integer FLOW_TYPE_RCV = 1;
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @Autowired
    private ConfirmCallback confirmCallback;
​
    @Autowired
    ObjectMapper objectMapper;
​
​
    @PostConstruct
    public void init() {
        //消息发送给交换机会调用该类的confirm方法
        rabbitTemplate.setConfirmCallback(confirmCallback);
    }
​
    /**
     *
     */
    public void sendMessage(String jsonParams, Integer flowType) {
        try {
            OAQueueMessage oaQueueMessage = new OAQueueMessage();
            oaQueueMessage.setJsonParams(jsonParams);
            oaQueueMessage.setFlowType(flowType);
            byte[] bytes = objectMapper.writeValueAsString(oaQueueMessage).getBytes(StandardCharsets.UTF_8);
            CorrelationData correlationData = new CorrelationData();
            Message message = MessageBuilder.withBody(bytes)
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
            correlationData.setReturnedMessage(message);
            rabbitTemplate.convertAndSend(OARabbitMqConfig.OA_EXCHANGE, OARabbitMqConfig.OA_ROUTING_KEY, message, correlationData);
        } catch (JsonProcessingException e) {
            throw new UtilException("发送消息异常" + e.getMessage());
        }
    }
}

 

应用层

直接调用生产者的sendMessage方法

//发送更新oa数据消息<已收货>
oaProducer.sendMessage(BeanUtils.toJson(oaIds), OAProducer.FLOW_TYPE_RCV);

 

标签:oa,springframework,OA,org,import,RabbitMQ,public
From: https://www.cnblogs.com/jessi200/p/17986344

相关文章

  • RabbitMQ的几种应用场景
    之前的几篇文章介绍了一下RabbitMQ的概念以及环境的搭建和配置,有了RabbitMQ环境就可以基于其实现一些特殊的任务场景了。RabbitMQ官方有个很好的Tutorials基本覆盖了RabbitMQ的各中常见应用场景,现以代码加注释的方式以其Python客户端pika为例简单介绍如下。更详尽的信息可参阅:htt......
  • Python三方库:Pika(RabbitMQ基础使用)
    Python有多种插件都支持RabbitMQ,本文介绍的是RabbitMQ推荐的Pika插件。使用pip直接安装即可pipinstallpika。一、RabbitMQ简介1.MQ简介MQ(MessageQueue,消息队列),是一个在消息传输过程中保存消息的容器,多用在分布式系统之间进行通信。MQ优势应用解耦:提高系统容错性和可......
  • ERROR:Only one ConfirmCallback is supported by each RabbitTemplate] with root cau
     错误:OnlyoneConfirmCallbackissupportedbyeachRabbitTemplate]withrootcause 原因:因为Spring的Bean默认都是单例;而RabbitTemplate对象同样支持一个回调。 解决:使用@Scope("prototype")可通知Spring将被注解的Bean变为多例。代码: //改Ra......
  • RabbitMq批量删除队列
    RabbitMq批量删除队列​ 由于部分公司同事使用RabbitMq时,没有将Client设置为autodelete,导致大量冗余队列。其中这些队列又是无routekey队列,收到了批量的订阅消息,占用服务器内存。​ 如何将这些无用的队列删除成为一个问题?经过多次摸索,在rabbitmqmanagementapi里面找到了方案:u......
  • RabbitMQ学习八 消费者可靠性
    一、消费者确认机制消费者的可靠性是靠消费者确认机制来保证。RabbitMQ提供了消费者确认机制(consumerAcknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己处理状态。回执有三种可选值:ack:成功处理消息,RabbitMQ从队列中删除该消息nack:......
  • RabbitMQ学习六 生产者可靠性
    一、生产者重连由于网络波动可能造成客户端连接MQ失败的情况,通过配置可以开启连接失败后的重连机制:spring:rabbitmq:addresses:xxx.xx.xx.xxport:5672username:xxxxxpassword:xxxxvirtual-host:/xxxxconnection-timeout:1s#设置MQ的连......
  • RabbitMQ学习五 springboot连接RabbitMQ
    一、入门引入依赖在springboot中引入spring-amqp-starter<!--amqp的起步依赖--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>编写配置文件spring:rabbitmq......
  • RabbitMQ安装-Windows
      Windows安装RabbitMQ配置:Eralng:opt-20.2RabbitMQ-server-3.7.4(习惯安装到无中文且无空格目录下) 1.安装erlang并配置环境变量安装:otp_win64_20.2.exeotp_win64_20.2.exe配置环境变量变量名:ERLANG_HOME变量值:(安......
  • RabbitMq基础版
    微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同......
  • RabbitMQ学习四 java客户端连接RabbitMQ
    RabbitMQ的工作模式,可以参考官网:https://www.rabbitmq.com/getstarted.html一、简单方式以下两种都是生产者直接发消息给队列,不通过交换机。且发送给队列的消息只能被消费一次。比如一个队列被C1和C2消费,在队列中的消息只会被一个消费者消费。生产者代码逻辑图代码如下:p......