首页 > 其他分享 >rabbitMQ实战生产者-交换机-队列-消费者细谈

rabbitMQ实战生产者-交换机-队列-消费者细谈

时间:2024-06-21 12:22:11浏览次数:20  
标签:exchange rabbitMQ queue connection 交换机 routing 细谈 message channel

 

生产者

rabbitmq的配置

创建交换机,创建queue,绑定交换机的routingkey到queue

一,默认的exchange列表

 二,将exchange的routingkey绑定到queue

 

三,生产端关心消息将发放哪个交换机,哪个routingkey,

也可以用通配符(如calc.*,calc.#)匹配相应的routingkey

 

mq服务匹配exchange,routingkey,到queue

 

消费者只关心queue

 

以下是部分代码

python连接到mq

# 建立到RabbitMQ的连接
connection_params = pika.ConnectionParameters(
    host=mq['host'],
    port=mq['port'],
    virtual_host='/',
    credentials=pika.PlainCredentials(mq['username'], mq['passwd']),
    channel_max=1024,
    heartbeat=15,
    retry_delay=5,
    connection_attempts=10
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
default_routing_key = mq['routing_key']

 

python端的生产者

 

def send_message(message, retry=3, routing_key=default_routing_key):
    global channel
    global connection
    for i in range(retry):
        try:
            if not channel.is_open:
                channel = connection.channel()
            # 发送消息到 exchange
            channel.basic_publish(exchange=mq['exchange'], routing_key=routing_key, body=json.dumps(message),
                                  mandatory=True)
            logger.info(f"Sent message to MQ: {message}")
            break  # 如果发送成功,跳出循环
        except pika.exceptions.AMQPConnectionError as e:
            logger.error(f"Failed to send message to MQ: {e}, retrying...")
            connection = pika.BlockingConnection(connection_params)
        except pika.exceptions.ChannelWrongStateError as e:
            logger.error(f"Failed to send message to MQ: {e}, retrying...")
            channel = connection.channel()
        except Exception as e:
            logger.exception("MQ异常")
            if i < retry - 1:  # 如果不是最后一次重试,等待一段时间后继续尝试
                time.sleep(2 ** i)

 

python端的消费者,这个写的有点复杂了,消费者不需要指定交换机和routing_key

        def start_consuming():
            with lock:
                sub_flag = subscript_dict.get(func.__qualname__)
                if sub_flag:
                    logger.info(f"Function {func.__qualname__} has been subscribed, skipping...")
                    return
                subscript_dict[func.__qualname__] = True

            logger.info(f'process: {os.getpid()} thread: {threading.current_thread().name}, {func.__qualname__} start consuming ...')
            _connection = pika.BlockingConnection(connection_params)
            _channel = _connection.channel()
            # 声明交换机
            _channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)

            # 声明一个排他队列,名称由 RabbitMQ 自动生成
            result = _channel.queue_declare(queue='', exclusive=True)
            queue_name = result.method.queue

            # 将队列绑定到交换机
            _channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)

            # 订阅队列的消息
            _channel.basic_consume(queue=queue_name, on_message_callback=wrapped_callback, auto_ack=True)

            logger.info(f'Waiting for messages in {queue_name}, server: {mq["host"]}:{mq["port"]}, exchange: {exchange_name},'
                        f' routing_key: {routing_key} ...')
            _channel.start_consuming()

        # 在装饰器中启动一个线程来执行订阅操作
        thread = threading.Thread(target=start_consuming)
        thread.daemon = True
        thread.start()

        return func  # 返回原始函数以保持签名不变

  

 

java端的连接

    @Bean
    @ConfigurationProperties(prefix = "spring.rabbitmq.template")
    public RabbitTemplate calcRabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        rabbitTemplate.setReturnsCallback(returnedMessage -> log.error("无法路由[消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}]",
                returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
                returnedMessage.getExchange(), returnedMessage.getRoutingKey()));

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("交换机异步确认:[相关数据:{},确认情况:{},原因:{}]",
                correlationData, ack, cause));
        return rabbitTemplate;
    }

  

java端的生产者

    
    @Autowired
@Qualifier("calcRabbitTemplate")
private RabbitTemplate rabbitTemplate;

public void sendMessage(Long testId, Integer current) { try { WsCalcDto wsCalcDto = new WsCalcDto(testId, current); WsMessageDto wsMessageDto = new WsMessageDto(wsCalcDto); rabbitTemplate.convertAndSend( mqCalcConfig.getExchangeConfig().getExchangeName(), mqCalcConfig.getRoutingKey(), wsMessageDto, new GorgeMessagePostProcessor(mqCalcConfig.getMessageConfig())); } catch (Exception e) { log.error("发送消息到RabbitMQ失败", e); } }

  

java端的消费者

    @Autowired
    private MqCalcConfig mqCalcConfig;

    @RabbitListener(queues = "queue")
    public void processMessage(Message message, Channel channel) throws Exception {
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("接收RabbitMQ数据, {}", body);
            JSONObject jsonObject = JSON.parseObject(body);

            // ack表示确认消息,第二个参数为批量ack,multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("消息消费异常", e);
            // Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

 

标签:exchange,rabbitMQ,queue,connection,交换机,routing,细谈,message,channel
From: https://www.cnblogs.com/difs/p/18260245

相关文章

  • 不同交换机之间相同VLAN间主机通信
    1、搭建网络拓扑搭建拓扑,分配IP地址,划分vlan,分配端口2、配置交换机//进入全局配置模式Switch>enableSwitch#configterminalEnterconfigurationcommands,oneperline.EndwithCNTL/Z.Switch(config)#hostnameSW1//配置交换机名称,自行配置//创建vl......
  • 小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQL
    代码仓库代码我已经上传到Github,大家需要的可以顺手点个Star!https://github.com/turbo-duck/biquge_fiction_spider背景介绍上一节已经拿到了每个小说的编码:fiction_code,并且写入了数据库表。接下来,我们写一个小工具,将数据表中的数据,都推送到RabbitMQ中。为了保......
  • 【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使
    问题描述原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用AzureEventHub呢? 问题解答RabbitMQ使用的协议是AMQP0-9-1,而AzureEventHub或ServiceBus使用的是AMQP1.0,所以无法直接复用之前的代码。需要使用AzureEventHubSDK来生产/消费消息。Which......
  • RabbitMQ实战宝典:从新手到专家的全面探索
    前言在当今分布式系统架构中,消息队列已成为不可或缺的一部分,而RabbitMQ作为其中的佼佼者,凭借其强大的功能和灵活性,广泛应用于各种规模的应用场景中。本文将带你从基础概念出发,深入探讨RabbitMQ的核心特性,通过实战案例与Java代码示例,引领你踏上成为RabbitMQ大师的旅程。第......
  • 交换机配置(2)--跨交换机VLAN
    实验目的和要求简述数据链路层的主要功能及服务。认识交换机的概念、特点和功能。掌握网络拓扑图的设计过程。初步掌握跨交换机的VLAN配置。知识理论基础简要解释下面网络术语。简述数据链路层的主要功能及服务。答:数据链路层是OSI模型中的第二......
  • 9.华为交换机telnet远程管理配置aaa认证
    目的:telnet远程管理设备LSW1配置[Huawei]intVlanif1[Huawei-Vlanif1]ipadd1.1.1.124[Huawei-Vlanif1]q[Huawei]user-interfacevty04[Huawei-ui-vty0-4]authentication-modeaaa[Huawei-ui-vty0-4]q[Huawei]aaa[Huawei-aaa]local-useradminpasswordciph......
  • rabbitmq实现手动确认
    实现逻辑是:spring容器启动时创建监听容器工厂javaBean,工厂的acknowlegemodel属性设置为手动确认,后续通过该容器产生的所有代理对象都需要手动确认,然后通过对象的反射来调用process方法来完成业务逻辑。但是这里存在一个问题那就是所有通过该让容器产生的监听器都变成了手动确认......
  • 华为交换机配置为ssh登陆
    1、进入特权模式sys2、进入aaaaaa3、创建用户local-user(用户名)passwordcipher(密码)3.1、##各交换机密码位数要求不同、若要求32位以上可以执行一下命令、设置若密码位数要求8位直接忽视即可local-user(用户名)password         ##直接回车......
  • 实验:交换机的基本原理与配置
    1.实验目的掌握交换机的基本原理。2.实验拓扑交换机基本原理的实验拓扑图3.实验步骤(1)PC1的配置如下图所示(2)PC2的配置如下图所示(3)PC3的配置如下图所示(4)PC4的配置如下图所示4.实验调试(1)查看交换机的MAC地址表,命令如下:<Huawei>system-view //进入系统视图[Huaw......
  • 采用java语言+Redis+RabbitMQ开发的 门诊his系统源码 一站式的门诊his系统 门诊业务流
    采用java语言+Redis+RabbitMQ开发的门诊his系统源码一站式的门诊his系统门诊业务流程医院信息系统(HIS系统)门诊业务是医院信息化建设的重要组成部分之一,它涵盖了医院门诊部门涉及的各项业务。HIS系统门诊业务的实施,可以实现医院门诊业务的信息化管理和数据化处理,提高医疗服......