首页 > 其他分享 >RabbitMQ通讯方式第二讲:Work Queues

RabbitMQ通讯方式第二讲:Work Queues

时间:2024-09-24 22:54:46浏览次数:10  
标签:WORK false AMQP Queues Work RabbitMQ QUEUE channel NAME

了解Work Queues

    1.1官网中的图片:通过官网里的图片,我们可以看到Word Queues 与Hello World 的区别,这里的消费者增加,但是时多个消费者消费单个队列,在这里我们依然要注意,这里面使用的是默认的交换机,并不是直接连接的队列。

    1.2直观的图片:

更好的理解每次的连接都是需要交换机的,对于后续的学习是有帮助的

二.消费者消费的过程。

2.1 :了解AMQP:

RabbitMQ是AMQP协议的一个实现者,它是一个开源的消息队列系统,基于AMQP协议实现。

AMQP协议将消息传递分为两个部分:交换(exchanges)和消息队列(message queues)。消息队列是基本的,专注于将消息传递给消费者。交换则根据一组规则将新消息路由到正确的消息队列。RabbitMQ支持多种类型的交换,如直接交换(Direct exchange)、扇出交换(Fanout exchange)和主题交换(Topic exchange)。

AMQP还引入了消息确认(message acknowledgements)的概念,以确保可靠的消息传递。消息确认有两种模式:自动确认和显式确认。自动确认模式下,消息一旦发送就被视为已确认;显式确认模式则允许消费者决定何时确认消息,例如在接收或处理消息后

2.1 :了解在work queue 下的消息的默认分配

    2.1在资源的非配上采取的是轮询(round-robin) 的方式将消息平均分配给所有消费者。在这里,我们想一个问题:因为消费者处理的能力不同,会影响效率,我们该如何解决呢?

三.代码出发,演示如何使用Work Queues

    3.1 提供者:这里的代码与hello world  并没有什么不同

         这里面连接工厂的代码在我的文章:RabbitMQ通讯方式第一讲:Hello World-CSDN博客  在这里就不再赘述了。

         注意这里面的交换机是默认的,与队列连接的方式式直接连接。

public class Provider {
    public static final String WORK_QUEUE_NAME="work";
    @Test
    public void p() throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel=connection.createChannel();
        channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
        //workQueues 模式下还是默认的交换机;
        for (int i = 0; i < 10; i++) {
            String msg=("开始发送第"+i+"条消息");
            channel.basicPublish("",WORK_QUEUE_NAME,null,msg.getBytes());
        }
        System.out.println("消息发送成功");
    }

    3.2消费者代码

@Test
    public void consumer1() throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        DefaultConsumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("得到数据"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //将autoACk设置为非自动
        channel.basicConsume(WORK_QUEUE_NAME,false,callback);
        System.out.println("获取数据成功");
        System.in.read();
    }
@Test
    public void consumer2() throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(WORK_QUEUE_NAME,false,false,false,null);
        //设置消息的流控
        channel.basicQos(3);
        DefaultConsumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
                System.out.println("得到数据"+new String(body,"utf-8"));
            }
        };
        channel.basicConsume(WORK_QUEUE_NAME,false,callback);
        System.out.println("获取数据成功");
        System.in.read();
    }

在这段代码里面,我们解决了处理效率不同的问题。在消费者一的代码里面,线程休眠了100ms

 try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

在消费者二的代码里面,线程休眠了1000ms 显然一的速度更快。在这里的运行时我们关闭了自动确认

channel.basicConsume(WORK_QUEUE_NAME,false,callback);

    3.3介绍流控:

在这段代码里面,我们开启的流控起到的作用是:

channel.basicQos(3);

在 RabbitMQ 中,basicQos 方法用于设置消息的流控,即消费者预取(prefetch)设置。这是 AMQP 0-9-1 协议中的一个特性,允许消费者告诉 RabbitMQ 它能够处理多少条未被确认(unacknowledged)的消息。这种机制可以防止 RabbitMQ 向消费者发送过多消息,从而避免消费者过载。

标签:WORK,false,AMQP,Queues,Work,RabbitMQ,QUEUE,channel,NAME
From: https://blog.csdn.net/hdk5855/article/details/142470934

相关文章

  • C# .net 8 used Pomelo.EntityFrameworkCore.MySql
    1.dotnetaddpackagePomelo.EntityFrameworkCore.MySqlusingMicrosoft.EntityFrameworkCore;namespaceConsoleApp84{internalclassProgram{staticvoidMain(string[]args){using(varcontext=newDbBookDataContex......
  • 告别页面卡顿:Web Worker 助你解决前端性能瓶颈
    背景随着现代前端开发的复杂度不断提升,网页应用变得越来越丰富,用户期望更加流畅的交互体验。然而,JavaScript是单线程的,意味着它不能同时处理多个任务。一旦有耗时的任务执行,例如大量数据处理、复杂算法的计算、或是繁重的文件解析,页面的主线程很容易被阻塞,导致界面卡顿或无响应,严......
  • 日新月异 PyTorch - pytorch 基础: 通过卷积神经网络(Convolutional Neural Networks,
    源码https://github.com/webabcd/PytorchDemo作者webabcd日新月异PyTorch-pytorch基础:通过卷积神经网络(ConvolutionalNeuralNetworks,CNN)做图片分类-通过ResNet50做图片分类的学习(对cifar10数据集做训练和测试),保存训练后的模型,加载训练后的模型并评估指定的......
  • RabbitMQ在.net core中的应用
    RabbitMQ是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。1.基本概念生产者(Producer)生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够发送消息到RabbitM......
  • 【Unity】 HTFramework框架(五十六)MarkdownText:支持运行时解析并显示Markdown文本
    更新日期:2024年9月15日。Github源码:[点我获取源码]Gitee源码:[点我获取源码]索引MarkdownText支持的Markdown语法标题强调文本表格嵌入图像超链接使用MarkdownText设置项运行时属性解析使用ID模式嵌入图像MarkdownTextMarkdownText为UGUIText的扩展加强版,支持在运行时解析并显......
  • COMPSCI 315 Web Server Workload Characterization
    WebServerWorkloadCharacterizationAssignment3and4,COMPSCI315Due:RefertothedeadlineonCanvas;SubmissionviaCanvas1IntroductionInternettrafficmeasurementinvolvescollectingnetworkdatathatcanbeanalyzedforseveralpurposessuchas......
  • 万象更新 Html5 - h5: h5 通过 web worker 实现多线程
    源码https://github.com/webabcd/Html5作者webabcd万象更新Html5-h5:h5通过webworker实现多线程示例如下:h5\webWorker\worker.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>通过web......
  • 万象更新 Html5 - h5: h5 通过 web worker 实现多线程(演示如何转移数据的所有权)
    源码https://github.com/webabcd/Html5作者webabcd万象更新Html5-h5:h5通过webworker实现多线程(演示如何转移数据的所有权)示例如下:h5\webWorker\worker_transferable.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"&......
  • 万象更新 Html5 - h5: h5 通过 Service Worker 拦截和处理网络请求(可以实现网络资源的
    源码https://github.com/webabcd/Html5作者webabcd万象更新Html5-h5:h5通过ServiceWorker拦截和处理网络请求(可以实现网络资源的缓存)示例如下:h5\serviceWorker\worker.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8">......
  • Rabbitmq 集群+镜像队列的搭建
    目录概念普通集群模式镜像列队模式概念任何一个服务,如果仅仅是单机部署,那么性能总是有上限的,RabbitMQ也不例外,当单台RabbitMQ服务处理消息的能力到达瓶颈时,可以通过集群来实现高可用和负载均衡。通常情况下,在集群中我们把每一个服务称之为一个节点,在RabbitMQ集群中......