一、生产者
直接使用HelloWorld模式下的应用案例依赖和代码,将生产者Give类拷贝一份。
将发送消息部分调整为遍历发送,连发10次:
//遍历发送多条消息 for (int i = 0; i < 10; i++) { //发送内容 channel.basicPublish("",QUEUE,null,("这是第" + (i+1) + "条消息").getBytes()); }
二、消费者
1、普通分发
直接使用HelloWorld模式下的应用案例依赖和代码,将消费者Get类拷贝两份创建两个消费者,分别为GetOne和GetTwo,将其接受消息展示的内容加上前缀(XXX接收:)表示区分:
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String str = new String(body); System.out.println("GetOne接收:" + str); channel.basicAck(envelope.getDeliveryTag(),false); }
同时开启两个消费者的情况下,启动生产者,结果如下:
两个消费者根据启动接收方法的先后顺序,也依次地从消息队列中拿到对应的数据。
将GetTwo的消息处理逻辑调整,添加一个睡眠,模拟实际操作的时间错位:
String str = new String(body); System.out.println("GetTwo接收:" + str); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false);
再次按照上面的方式发送消息,结果如下:
第一个消费者不受影响,第二个消费者每睡眠一段时间拿一次消息。但是依旧如之前一般,GetOne只拿奇数,GetTwo只拿偶数。
也就是说在消息发送的时候,就已经按照两个消费者去分配了,不管谁接收的快,谁接收的慢,都会按照固有的工作量去分配。即便第一个已经接收完五个消息了,而第二个还在缓慢的接收第二个消息。
在实际应用中,这肯定不合适,消息的处理量肯定不是按照这么去分。万一有一个消费者慢了,就会拖慢整个工作任务,很影响应用的整体性能。所以要对规则进行调整,让消费者能者多劳。
2、能者多劳式分发
为了提升整体的运行效率和性能,要对两个消费者做分发规则的调整,在两个消费者的通道上加上:
//在消费者确认消息前,让服务器暂时不要给自己分发消息 channel.basicQos(1);
这样在其中某些消费者运行慢的时候,消息就会更多地向运行快的消费者分发。
再次按照上面的方式运行,结果如下:
在GetTwo接收一次消息之后,陷入了三秒的沉睡,这期间GetOne已经默默帮懒人GetTwo做完了本该属于他的工作。最后结果就是GetOne分发了9条消息,而GetTwo只有一条。
但从整体的角度来看,本次消息分发完所需要的时间,相比之前大大降低了。
之前光是GetTwo接收五条消息所需的四次睡眠间隔就需要12秒,而这次整体只需要三秒多。
标签:分发,消费者,队列,Work,Rabbitmq,GetTwo,消息,接收,String From: https://www.cnblogs.com/guobin-/p/17805603.html