首页 > 其他分享 >RabbitMQ-消息入队

RabbitMQ-消息入队

时间:2025-01-13 22:23:27浏览次数:1  
标签:durable factory RabbitMQ queue 入队 消息 true channel

1 分布式异步的问题

对于一个业务线的处理,如果是一个完整的处理,应该是消息正 常进入队列,同时消息正常被消费掉。

问题来了:

生产者发送消息,在传输过程中,消息丢失了,咋办?

消息发到RabbitMq队列,RabbitMq宕机了,咋办?

消费者在消费消息的时候,消费异常了,咋办?

方案思路

1、要保证消息一定能够正常的发到队列中去。

2、要保证入队的消息,一定不能丢失。

3、要保证一定是正常消费的消息,才会从队列中删除。

2 消息入队-确认-Confirm模式

Confirm方式有三种模式: 1、普通Confirm模式 2、批量Confirm模式 3、异步Confirm模式

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列 
        channel.QueueDeclare(queue: "EnteringQueueTrans", durable: true, 
            exclusive: false, autoDelete: false, arguments: null);
        channel.ExchangeDeclare(type: ExchangeType.Fanout, 
            exchange: "EnteringQueueTransExChange", durable: true, 
            autoDelete: false, arguments: null);
        channel.QueueBind(queue: "EnteringQueueTrans",
            exchange: "EnteringQueueTransExChange",routingKey: string.Empty);
        #endregion

        //开启Confirm模式
        channel.ConfirmSelect();
        try
        {
            for (int i = 1; i <= 10; i++)
            {
                string msg = $"Confirm 模式===消息入队确认=={i}";
                byte[] bytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish("EnteringQueueTransExChange", string.Empty, null, bytes);
            }

            //WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
            bool isOk = channel.WaitForConfirms();
            if (isOk)//判断 channel.WaitForConfirms() 是否已经发送到队列中去---返回bool值
            {
                Console.WriteLine("消息已发送~~");
            }
            //执行这句话---如果成功了, 就成功了,如果失败,就抛出异常 
            channel.WaitForConfirmsOrDie();
        }
        catch (Exception)
        {
            //如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
            // 配置重试下
            throw;
        }
        finally
        {
            //关闭通道
            channel.Close();
            connection.Close();
        }

    }
}

3 消息入队-确认-事务支持

channel.txSelect(): 将当前信道设置成事务模式

channel.txCommit(): 用于提交事务

channel.txRollback(): 用于回滚事务

通过事务实现机制,只有消息成功被rabbitmq服务器接收, 事务才能提交成功,否则便可在捕获异常之后进行回滚,然后进行消息重发。

但是事务非常影响rabbitmq的性能。还有就是事务机制是阻塞的过程,只有等待服务器回应之后才会处理下一条消息。

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列 
        channel.QueueDeclare(queue: "EnteringQueueTrans", durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        channel.ExchangeDeclare(type: ExchangeType.Fanout, 
            exchange: "EnteringQueueTransExChange", durable: true,
            autoDelete: false, arguments: null);

        channel.QueueBind(queue: "EnteringQueueTrans",
            exchange: "EnteringQueueTransExChange",routingKey: string.Empty);
        #endregion

        //将信道设置为事务模式
        channel.TxSelect();
        try
        {
            for (int i = 1; i <= 10; i++)
            {
                string msg = $"事务模式===消息入队确认=={i}";
                byte[] bytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish("EnteringQueueTransExChange", string.Empty, null, bytes);
            }
            //事务提交 前面写入的消息,一定是在这句话成功执行难后,才会写入到队列中去
            channel.TxCommit();  

        }
        catch (Exception)
        {
            //如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
            channel.TxRollback();
            throw;
        }
        finally
        {
            //关闭通道
            channel.Close();
            connection.Close();
        }

    }
}

4 消息入队后—消息持久化

在前边已经介绍了exchange和queue的持久化,把exchange和 queue的durable属性设置为true.

exchange和queue也会恢复。我们需要注意的是:如果queue设置 durable=true,rabbitmq服务重启后队列虽然会存在,但是队列内 的消息会丢全部丢失。那么怎么实现消息的持久化呢?实现的方法 很简单:将exchange和queue都设置durable=true,然后在消息发 布的时候设置persistent=true即可。

服务器宕机,出现异常的时候, 消息固化到硬盘上----硬盘存储是 可以断电的;

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列 

        //支持持久化队列:durable: true
        channel.QueueDeclare(queue: "PersistenceQueue", durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        //支持持久化交换机durable: true
        channel.ExchangeDeclare(type: ExchangeType.Fanout,
            exchange: "PersistenceQueueExChange", durable: true,
            autoDelete: false, arguments: null);

        channel.QueueBind(queue: "PersistenceQueue",
            exchange: "PersistenceQueueExChange", routingKey: string.Empty);
        #endregion

        //表达发送的是持久化消息
        var props = channel.CreateBasicProperties();
        props.Persistent = true;  //配置的这句话就是--消息是可以支持持久化的

        for (int i = 1; i <= 1000; i++)
        {
            string msg = $"持久化消息--持久化队列===消息入队确认=={i}";
            byte[] bytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish("PersistenceQueueExChange", string.Empty, props, bytes);
        }
    }
}

标签:durable,factory,RabbitMQ,queue,入队,消息,true,channel
From: https://www.cnblogs.com/nullcodeworld/p/18669524

相关文章

  • RabbitMQ-死信队列
    死信,就是无法被消费的消息,一般来说生产者将消息投递到broker或者直接到队列里了,消费者从队列取出消息进行消费。但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列。死信队列还是队列---只是用来接受特......
  • RabbitMQ-集群
    RabbitMQ集群----主备关系,在运行的时候,如果非主要节点宕机,程序操作不受影响;如果主节点宕机了,程序会中断操作。而Rabbitmq集群,会马上让没有宕机的节点参选,选出新的主要节点。程序重试的时候,会进入到新的节点中执行。历史消息不受影响的。基于Docker构建RabbitMQ集群1.启动......
  • RabbitMQ-优先级队列及消息配置
    优先级队列C#数据类型queue----先进先出RabbitMQ---队列-----默认也是先进先出~~RabbitMQ设置优先级----可以配置让消费顺序,不按照先进先出的默认规则;给定的优先级---最终体现在消费者;优先级越高,消费的时候,就优先消费。就在前面消费案例:设置{"vip1","hello2","wor......
  • RabbitMQ 高可用方案:原理、构建与运维全解析
    文章目录前言:1集群方案的原理2RabbitMQ高可用集群相关概念2.1设计集群的目的2.2集群配置方式2.3节点类型3集群架构3.1为什么使用集群3.2集群的特点3.3集群异常处理3.4普通集群模式3.5镜像集群模式前言:在实际生产中,RabbitMQ常以集群方案部署。因选用它......
  • windows消息循环和linux消息循环的异同
     一、消息循环的基本概念消息循环(MessageLoop)是一种编程结构,用于等待和分派消息。在不同的系统或机制下,消息循环有不同的称呼,如事件循环(EventLoop)或运行循环(RunLoop)。它是经典的消息驱动机制的基础‌。 二、相同点事件驱动机制的核心地位在Linux和Windows中......
  • Kafka 是一个分布式流式平台,主要用于处理大规模、高吞吐量的消息传递、日志收集和实时
    Kafka集群是什么?Kafka是一个分布式流式平台,主要用于处理大规模、高吞吐量的消息传递、日志收集和实时数据流。Kafka集群是由多个Kafka服务器(称为Broker)组成的,它们共同工作以实现消息的高可用性、可靠性、可扩展性和容错性。Kafka集群的目的是确保消息的持久化和高效传输,同......
  • Redis 是一个开源的高性能键值对存储数据库,通常被用作缓存、消息队列和持久化数据库。
    Redis服务器是什么?Redis是一个开源的高性能键值对存储数据库,通常被用作缓存、消息队列和持久化数据库。Redis支持多种数据结构,如字符串、哈希、列表、集合、有序集合、位图等。它被广泛用于需要快速读写操作、低延迟的场景。Redis可以作为一个独立的数据库使用,也可以作为缓......
  • JAVA学习之路(九)—— 消息队列MQ
    JAVA学习之路(九)——消息队列MQ说明消息队列MQ什么是消息队列?消息对列有什么用?异步处理削峰/限流降低系统耦合性使用消息队列可能会带来哪些问题?常见的消息队列KafkaRocketmqRabbitMQActiveMQ总结RocketMQ工作流程NameServerProducerProducer启动流程Producer发消息......
  • rust学习十六.2、并发-利用消息传递进行线程间通讯
    通过信道是rust的解决线程之间通信的2个工具之一,另外1个是是共享内存状态。rust推出这个,明显地是因为受到go之类的影响。在书籍中,作者提到go编程文档中的内容:不要通过共享内存来通讯;而是通过通讯来共享内存(Donotcommunicatebysharingmemory;instead,sharememorybyco......
  • VUE +WebSocket+speak-tt 实现在浏览器右下角实时给商家推送订单消息
    先看效果  1、WebSocket服务建立 1.1引入包 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>1.2新建配置类packagecom......