Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
3.1 轮询分发消息
该 demo 中一个线程发消息,两个工作线程处理消息
抽取获取连接的工具类
package com.kwin.rabbitmq.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("119.91.219.72"); factory.setUsername("admin"); factory.setPassword("a123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
启动两个工作线程
package com.kwin.rabbitmq.two; import com.kwin.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import java.sql.DriverManager; public class Worker01 { //队列名称 private static final String QUEUE_NAME = "hello1"; //消息消费的方法 private static void consumeMessage(String threadName) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(threadName + ",接收到消息:" + new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println(threadName + ",消息消费被中断"); }; System.out.println(threadName + "启动,等待消费......"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } public static void main(String[] args) { for (int i = 1; i <= 2; i++) { new Thread(()->{ try { consumeMessage(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } }, "工作线程" + i).start(); } } }
启动一个发送线程
package com.kwin.rabbitmq.two; import com.kwin.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.util.Scanner; public class Task01 { //队列名称 private static final String QUEUE_NAME = "hello1"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化(磁盘) 默认情况下存储在内存中 * 3.该队列是否只供一个消费者进行消费,是否可以消息共享,true可以多个消费者消费, false只能一个消费者消费 * 4.是否自动删除 最后一个消费者断开连接之后,该队列是否自动删除 true自动删除 false不自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完成:" + message); } } }
测试
先启动两个工作线程
启动消息发送的线程,控制台输入AA、BB、CC…
查看工作线程日志