RabbitMQ提供了七种通讯方式,可以去官方查看:https://rabbitmq.com/getstarted.html
一、 RabbitMQ提供的通讯方式
其中通讯方式如下:
- Hello World!:为了入门操作提供的方式
- Work queues:一个队列被多个消费者消费
- Publish/Subscribe:手动创建Exchange(FANOUT)
- Routing:手动创建Exchange(DIRECT)
- Topics:手动创建Exchange(TOPIC)
- RPC:RPC方式
- Publisher Confirms:保证消息可靠性
二、 构建Connection工具类
创建Maven项目,导入依赖如下:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
创建com.augus.util包,在下面创建构建工具类 RabbitMQConnectionUtil
package com.augus.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnectionUtil {
public static final String RABBITMQ_HOST = "192.168.42.136";
public static final int RABBITMQ_PORT = 5672;
public static final String RABBITMQ_USERNAME = "guest";
public static final String RABBITMQ_PASSWORD = "guest";
public static final String RABBITMQ_VIRTUAL_HOST = "/";
public static Connection getConnection() throws IOException, TimeoutException, TimeoutException {
//1.创建Connection工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置RabbitMQ的连接信息
connectionFactory.setHost(RABBITMQ_HOST);
connectionFactory.setPort(RABBITMQ_PORT);
connectionFactory.setUsername(RABBITMQ_USERNAME);
connectionFactory.setPassword(RABBITMQ_PASSWORD);
connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
//返回连接对象
Connection connection = connectionFactory.newConnection();
return connection;
}
}
三、通讯方式实现
3.1.Hello World通讯方式
3.1.1.架构图
通讯方式架构说明:
3.1.2.生产者
在com.augus.helloworld包下创建生产者:Publisher
package com.augus.helloworld;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
//声明了一个名为 QUEUE_NAME 的常量,用于指定要发送消息的队列名称。
public static final String QUEUE_NAME = "hello";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
/**
* QUEUE_NAME:要声明的队列的名称。在这段代码中,使用了常量 QUEUE_NAME,它指定了队列的名称为 "hello"。
* durable:指定队列是否是持久化的。如果设置为 true,则表示队列将在服务器重启后保留下来。如果设置为 false,则表示队列是非持久化的,服务器重启后队列将被删除。在这段代码中,设置为 false,表示队列是非持久化的。
* exclusive:指定队列是否是排他的。如果设置为 true,则表示只有声明该队列的连接可以使用该队列。其他连接将无法访问该队列。在这段代码中,设置为 false,表示队列不是排他的。
* autoDelete:指定队列是否是自动删除的。如果设置为 true,则表示当最后一个消费者断开连接后,队列将被自动删除。在这段代码中,设置为 false,表示队列不会自动删除。
* arguments:指定队列的其他属性。在这段代码中,设置为 null,表示没有指定其他属性。
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4.发布消息
String message = "你好,明天去哪里";
/**
* "":指定消息要发送到的交换机名称。在这里,空字符串表示使用默认的交换机。默认的交换机会将消息直接发送到指定的队列中。
* QUEUE_NAME:指定消息要发送到的队列名称。在这段代码中,使用了常量 QUEUE_NAME,它指定了要发送消息的队列名称为 "hello"。
* null:指定消息的额外属性。在这里,设置为 null,表示没有指定额外的属性。
* message.getBytes():指定要发送的消息内容。message 是一个字符串,通过调用 getBytes() 方法将其转换为字节数组进行发送。
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送成功!");
}
}
3.1.3.消费者
在com.augus.helloworld包下创建消费者:Consumer
package com.augus.helloworld;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
@Test
public void Consume() throws IOException, TimeoutException {
//1.创建连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
//4.监听消息
/**
*
* 推送消息消费最后一般都是采用实现Consumer接口亦或是继承DefaultConsumer类,DefaultConsumer实现了接口Consumer,
* 但是大多数方法都是空实现,需要重写其中的逻辑。
*
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//消费消息逻辑
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, StandardCharsets.UTF_8));
}
};
/**
* Publisher.QUEUE_NAME: 表示要消费的队列的名称。
* true: 表示自动确认消息消费,即消费者在收到消息后会自动向消息代理发送确认消息。
* defaultConsumer: 是一个实现了 DefaultConsumer 抽象类的对象,用于处理接收到的消息。在 handleDelivery 方法中,可以编写自定义的逻辑来处理接收到的消息。
*/
channel.basicConsume(Publisher.QUEUE_NAME,true, defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
}
3.1.4.测试
启动生产者发送消息
然后启动消费者,消费消息
查看控制台,消费者这里也可以看到从队列中获取的消息,如下:
3.2.Work Queues通讯方式
3.2.1.架构图
说明如下:
- 一个队列中的消息,只会被一个消费者成功的消费
- 默认情况下,RabbitMQ的队列会将消息以轮询的方式交给不同的消费者消费
- 消费者拿到消息后,需要给RabbitMQ一个ack,RabbitMQ认为消费者已经拿到消息了
3.2.2.生产者
生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。这里只是增加了一个循环发送10条消息
package com.augus.workqueues;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
//声明了一个名为 QUEUE_NAME 的常量,用于指定要发送消息的队列名称。
public static final String QUEUE_NAME = "WorkQueues";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
/**
* QUEUE_NAME:要声明的队列的名称。在这段代码中,使用了常量 QUEUE_NAME,它指定了队列的名称为 "hello"。
* durable:指定队列是否是持久化的。如果设置为 true,则表示队列将在服务器重启后保留下来。如果设置为 false,则表示队列是非持久化的,服务器重启后队列将被删除。在这段代码中,设置为 false,表示队列是非持久化的。
* exclusive:指定队列是否是排他的。如果设置为 true,则表示只有声明该队列的连接可以使用该队列。其他连接将无法访问该队列。在这段代码中,设置为 false,表示队列不是排他的。
* autoDelete:指定队列是否是自动删除的。如果设置为 true,则表示当最后一个消费者断开连接后,队列将被自动删除。在这段代码中,设置为 false,表示队列不会自动删除。
* arguments:指定队列的其他属性。在这段代码中,设置为 null,表示没有指定其他属性。
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4.发布消息
for (int i = 1; i < 11; i++) {
String message = "你好,明天去哪里"+i;
/**
* "":指定消息要发送到的交换机名称。在这里,空字符串表示使用默认的交换机。默认的交换机会将消息直接发送到指定的队列中。
* QUEUE_NAME:指定消息要发送到的队列名称。在这段代码中,使用了常量 QUEUE_NAME,它指定了要发送消息的队列名称为 "hello"。
* null:指定消息的额外属性。在这里,设置为 null,表示没有指定额外的属性。
* message.getBytes():指定要发送的消息内容。message 是一个字符串,通过调用 getBytes() 方法将其转换为字节数组进行发送。
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
System.out.println("消息发送成功!");
}
}
3.2.3.消费者案例一
消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息,在com.augug.workqueues包下创建两个Consumer,分别启动
package com.augus.workqueues;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
@Test
public void consume1() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1号:" + new String(body, StandardCharsets.UTF_8));
}
};
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consume2() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2号:" + new String(body, StandardCharsets.UTF_8));
}
};
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
}
启动后再调用生产者,发送消息到队列,观察两个消费者,发现是以轮询的方式交给消费者进行消费的,代码如下:
- consume1如下:
- consume2如下:
3.2.4.消费者案例二
修改消费者consume,让consume1消费需要100ms,consume2消费需要1000ms,通过sleep实现代码如下:
public class Consumer {
@Test
public void consume1() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-1号:" + new String(body, StandardCharsets.UTF_8));
}
};
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consume2() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-2号:" + new String(body, StandardCharsets.UTF_8));
}
};
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
}
执行会发现consume1消费能力比较强(休眠时间短),但是受限于consume2的消费能力(休眠时间长),制约了consume1·,导致消费能力强的consume1无法消费
3.2.5.消费者案例三
由于上面的问题,所以在Work Queues中可以设置流控,后面还需要设置手动ACK代码如下:
package com.augus.workqueues;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
@Test
public void consume1() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
//3.5.设置消息的流控
/**
* 在 RabbitMQ 中,channel.basicQos(3) 是用于设置消费者的预取数量(prefetch count)。它的作用是控制消费者从队列中获取消息的速度。
* 参数 3 表示消费者一次可以预取的消息数量。这意味着在消费者处理完这些消息之前,RabbitMQ 不会向其发送更多的消息。当消费者处理完这些消息之一部分后,RabbitMQ 会再次发送相同数量的消息给消费者。
* 这个机制可以用来实现负载均衡和公平分发。通过限制每个消费者一次获取的消息数量,可以确保每个消费者在处理消息时都能够公平地分配资源,避免某个消费者一次性获取过多的消息而导致其他消费者无法及时处理消息。
* 需要注意的是,channel.basicQos(3) 只对同一个连接下的消费者有效。如果有多个连接,每个连接下的消费者都需要单独设置预取数量。
*/
channel.basicQos(3);
//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-1号:" + new String(body, StandardCharsets.UTF_8));
/**
* channel.basicAck(envelope.getDeliveryTag(), false) 是用于确认消息的处理结果,并告知 RabbitMQ 可以将该消息从队列中移除。
* 具体作用如下:
* envelope.getDeliveryTag() 获取消息的交付标签(delivery tag),它是一个唯一的标识符,用于标识消息在队列中的位置。
* channel.basicAck(deliveryTag, false) 通过调用 basicAck 方法,向 RabbitMQ 发送确认消息的指令。
* 第二个参数 false 表示只确认当前的消息,而不是之前的所有未确认消息。
* 这个操作的目的是告知 RabbitMQ 消费者已经成功处理了该消息,并且可以安全地将其从队列中删除。这样可以确保消息不会被重复消费。
* 需要注意的是,basicAck 方法通常在消费者处理完消息后调用。如果消费者在处理消息期间发生异常或者处理失败,可以选择不调用 basicAck 方法,这样 RabbitMQ 将会将该消息重新放回队列中,以便其他消费者重新处理。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consume2() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
//3.5.设置消息的流控
/**
* 在 RabbitMQ 中,channel.basicQos(3) 是用于设置消费者的预取数量(prefetch count)。它的作用是控制消费者从队列中获取消息的速度。
* 参数 3 表示消费者一次可以预取的消息数量。这意味着在消费者处理完这些消息之前,RabbitMQ 不会向其发送更多的消息。当消费者处理完这些消息之一部分后,RabbitMQ 会再次发送相同数量的消息给消费者。
* 这个机制可以用来实现负载均衡和公平分发。通过限制每个消费者一次获取的消息数量,可以确保每个消费者在处理消息时都能够公平地分配资源,避免某个消费者一次性获取过多的消息而导致其他消费者无法及时处理消息。
* 需要注意的是,channel.basicQos(3) 只对同一个连接下的消费者有效。如果有多个连接,每个连接下的消费者都需要单独设置预取数量。
*/
channel.basicQos(3);
//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-2号:" + new String(body, StandardCharsets.UTF_8));
/**
* channel.basicAck(envelope.getDeliveryTag(), false) 是用于确认消息的处理结果,并告知 RabbitMQ 可以将该消息从队列中移除。
* 具体作用如下:
* envelope.getDeliveryTag() 获取消息的交付标签(delivery tag),它是一个唯一的标识符,用于标识消息在队列中的位置。
* channel.basicAck(deliveryTag, false) 通过调用 basicAck 方法,向 RabbitMQ 发送确认消息的指令。
* 第二个参数 false 表示只确认当前的消息,而不是之前的所有未确认消息。
* 这个操作的目的是告知 RabbitMQ 消费者已经成功处理了该消息,并且可以安全地将其从队列中删除。这样可以确保消息不会被重复消费。
* 需要注意的是,basicAck 方法通常在消费者处理完消息后调用。如果消费者在处理消息期间发生异常或者处理失败,可以选择不调用 basicAck 方法,这样 RabbitMQ 将会将该消息重新放回队列中,以便其他消费者重新处理。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Publisher.QUEUE_NAME,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
}
启动两个消费者,然后再次启动生成者,发现消费者能力强的consume1会尽可能消费,不会受制于消费能力弱的consume2
- consume1
- consume2
3.3.Publish/Subscribe通讯方式
3.3.1.通讯架构
怎么构建一个自定义的交换机,并指定类型是FANOUT让交换机和多个Queue绑定到一起?
3.3.2.生产者
常见的交换机类型如下:
在RabbitMQ中,生产者的消息都是通过交换器来接收,然后再从交换器分发到不同的队列中去,在分发的过程中交换器类型会影响分发的逻辑。rabitmq中的交换器有4种类型
- 直连交换机:Direct exchange
- 扇形交换机:Fanout exchange
- 主题交换机:Topic exchange
- 首部交换机:Headers exchange
其中前三种较为常见,后面一种用的比较少。
生产者:自行构建Exchange并绑定指定队列(FANOUT类型),创建包com.augus.PublishSubscribe ,创建 Publisher 生产者,代码如下:
public class Publisher {
public static final String EXCHANGE_NAME = "pubsub";
public static final String QUEUE_NAME1 = "pubsub1";
public static final String QUEUE_NAME2 = "pubsub2";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建交换机(设置名称和类型)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//5.绑定交换机和队列,使用FANOUT类型的交换机,绑定方式是直接绑定(routingKey是绑定方式,直接绑定空着就行)
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
//6.发消息到交换机
String message = "你好,明天去哪里";
/**
* EXCHANGE_NAME:指定消息要发送到的交换机名称。在这里,空字符串表示使用默认的交换机。默认的交换机会将消息直接发送到指定的队列中。
* routingKey:交换机和队列的绑定方式,交换机是FANOUT类型,所以是直接绑定,可以写任意内容,不写也可以
* null:指定消息的额外属性。在这里,设置为 null,表示没有指定额外的属性。
* message.getBytes():指定要发送的消息内容。message 是一个字符串,通过调用 getBytes() 方法将其转换为字节数组进行发送。
*/
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("消息发送成功!");
}
}
3.3.3.消费者
创建包com.augus.PublishSubscribe ,创建两个消费者,分别消费不同的队列中的消息,代码如下:
package com.augus.PublishSubscribe;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
public static final String EXCHANGE_NAME = "pubsub";
public static final String QUEUE_NAME1 = "pubsub1";
public static final String QUEUE_NAME2 = "pubsub2";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建交换机(设置名称和类型)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//5.绑定交换机和队列,使用FANOUT类型的交换机,绑定方式是直接绑定(routingKey是绑定方式,直接绑定空着就行)
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
//6.发消息到交换机
String message = "你好,明天去哪里";
/**
* EXCHANGE_NAME:指定消息要发送到的交换机名称。在这里,空字符串表示使用默认的交换机。默认的交换机会将消息直接发送到指定的队列中。
* routingKey:交换机和队列的绑定方式,交换机是FANOUT类型,所以是直接绑定,可以写任意内容,不写也可以
* null:指定消息的额外属性。在这里,设置为 null,表示没有指定额外的属性。
* message.getBytes():指定要发送的消息内容。message 是一个字符串,通过调用 getBytes() 方法将其转换为字节数组进行发送。
*/
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("消息发送成功!");
}
}
3.3.4.测试
先启动两个消费者,让其处于监听状态,然后再启动生产者创建交换机,绑定队列,两个消费者进行消费,如下:
- consume1:
- consume2
可以暂时测试不写消费者直接在图形化界面中,进行查看消费队列中的消息,如下生产者发送消息,可以看到有两个队列,每一个队列中有一条消息
点击进入任意一个队列
3.4.Routing通讯方式
3.4.1.Routing通讯方式架构
和上一个章节相比,交换机类型不同,同时交换机和队列绑定方式不同,
3.4.2.生产者
之上一个章节注意修改交换机类型和绑定方式,创建com.augus.routing,在创建 Publisher作为生产者:
package com.augus.routing;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
public static final String EXCHANGE_NAME = "routing";
public static final String QUEUE_NAME1 = "routing1";
public static final String QUEUE_NAME2 = "routing2";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建交换机(设置名称和类型)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//5.绑定交换机和队列,使用FANOUT类型的交换机,绑定方式是直接绑定(routingKey是绑定方式,直接绑定空着就行)
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");
//6.发消息到交换机
/**
* EXCHANGE_NAME:指定消息要发送到的交换机名称。在这里,空字符串表示使用默认的交换机。默认的交换机会将消息直接发送到指定的队列中。
* routingKey:交换机和队列的绑定方式,交换机是FANOUT类型,所以是直接绑定,可以写任意内容,不写也可以
* null:指定消息的额外属性。在这里,设置为 null,表示没有指定额外的属性。
* message.getBytes():指定要发送的消息内容。message 是一个字符串,通过调用 getBytes() 方法将其转换为字节数组进行发送。
*/
channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"天霜拳".getBytes());
channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"排云掌".getBytes());
//如果这里的routingKey和步骤5的不一样,消息就无法发送到队列,匹配不上了
channel.basicPublish(EXCHANGE_NAME,"GREEN",null,"风神腿".getBytes());
System.out.println("消息发送成功!");
}
}
3.4.3.消费者
消费者代码这里和上一个章节一致
public class Consumer {
@Test
public void consume1() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME1,false,false,false,null);
//3.5.设置消息的流控
/**
* 在 RabbitMQ 中,channel.basicQos(3) 是用于设置消费者的预取数量(prefetch count)。它的作用是控制消费者从队列中获取消息的速度。
* 参数 3 表示消费者一次可以预取的消息数量。这意味着在消费者处理完这些消息之前,RabbitMQ 不会向其发送更多的消息。当消费者处理完这些消息之一部分后,RabbitMQ 会再次发送相同数量的消息给消费者。
* 这个机制可以用来实现负载均衡和公平分发。通过限制每个消费者一次获取的消息数量,可以确保每个消费者在处理消息时都能够公平地分配资源,避免某个消费者一次性获取过多的消息而导致其他消费者无法及时处理消息。
* 需要注意的是,channel.basicQos(3) 只对同一个连接下的消费者有效。如果有多个连接,每个连接下的消费者都需要单独设置预取数量。
*/
channel.basicQos(4);
//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-1号:" + new String(body, StandardCharsets.UTF_8));
/**
* channel.basicAck(envelope.getDeliveryTag(), false) 是用于确认消息的处理结果,并告知 RabbitMQ 可以将该消息从队列中移除。
* 具体作用如下:
* envelope.getDeliveryTag() 获取消息的交付标签(delivery tag),它是一个唯一的标识符,用于标识消息在队列中的位置。
* channel.basicAck(deliveryTag, false) 通过调用 basicAck 方法,向 RabbitMQ 发送确认消息的指令。
* 第二个参数 false 表示只确认当前的消息,而不是之前的所有未确认消息。
* 这个操作的目的是告知 RabbitMQ 消费者已经成功处理了该消息,并且可以安全地将其从队列中删除。这样可以确保消息不会被重复消费。
* 需要注意的是,basicAck 方法通常在消费者处理完消息后调用。如果消费者在处理消息期间发生异常或者处理失败,可以选择不调用 basicAck 方法,这样 RabbitMQ 将会将该消息重新放回队列中,以便其他消费者重新处理。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Publisher.QUEUE_NAME1,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consume2() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(Publisher.QUEUE_NAME2,false,false,false,null);
//3.5.设置消息的流控
/**
* 在 RabbitMQ 中,channel.basicQos(3) 是用于设置消费者的预取数量(prefetch count)。它的作用是控制消费者从队列中获取消息的速度。
* 参数 3 表示消费者一次可以预取的消息数量。这意味着在消费者处理完这些消息之前,RabbitMQ 不会向其发送更多的消息。当消费者处理完这些消息之一部分后,RabbitMQ 会再次发送相同数量的消息给消费者。
* 这个机制可以用来实现负载均衡和公平分发。通过限制每个消费者一次获取的消息数量,可以确保每个消费者在处理消息时都能够公平地分配资源,避免某个消费者一次性获取过多的消息而导致其他消费者无法及时处理消息。
* 需要注意的是,channel.basicQos(3) 只对同一个连接下的消费者有效。如果有多个连接,每个连接下的消费者都需要单独设置预取数量。
*/
channel.basicQos(4);
//4.监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-2号:" + new String(body, StandardCharsets.UTF_8));
/**
* channel.basicAck(envelope.getDeliveryTag(), false) 是用于确认消息的处理结果,并告知 RabbitMQ 可以将该消息从队列中移除。
* 具体作用如下:
* envelope.getDeliveryTag() 获取消息的交付标签(delivery tag),它是一个唯一的标识符,用于标识消息在队列中的位置。
* channel.basicAck(deliveryTag, false) 通过调用 basicAck 方法,向 RabbitMQ 发送确认消息的指令。
* 第二个参数 false 表示只确认当前的消息,而不是之前的所有未确认消息。
* 这个操作的目的是告知 RabbitMQ 消费者已经成功处理了该消息,并且可以安全地将其从队列中删除。这样可以确保消息不会被重复消费。
* 需要注意的是,basicAck 方法通常在消费者处理完消息后调用。如果消费者在处理消息期间发生异常或者处理失败,可以选择不调用 basicAck 方法,这样 RabbitMQ 将会将该消息重新放回队列中,以便其他消费者重新处理。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* channel: 这是一个表示与 RabbitMQ 服务器之间的通信通道的对象。您可以使用该通道进行消息的发布和消费操作。
* basicConsume: 这是一个方法,用于注册一个消费者来接收消息。它的作用是告诉 RabbitMQ 服务器开始将消息发送给指定的消费者。
* Publisher.QUEUE_NAME2: 这是一个字符串,表示要消费的队列的名称。您需要将其替换为实际的队列名称。
* false: 这是一个布尔值,表示是否启用消息的自动确认。如果设置为 false,则需要手动确认消息的接收。如果设置为 true,则消息会在消费者接收到后自动确认。
* defaultConsumer: 这是一个实现了 Consumer 接口的对象,用于处理接收到的消息。您需要在 defaultConsumer 对象中实现消息处理的逻辑。
*/
channel.basicConsume(Publisher.QUEUE_NAME2,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
}
3.4.4.测试
先启动生产者,查看rabbitMQ的中队列信息,队列1有一条消息,而队列2有两条消息如下:
启动两个消费者,如下图:
- consume1
- consume2
3.5.Topic通讯方式
3.5.1.Topic通讯方式架构图
交换机类型为TOPIC,可以编写带有特殊意义的routingKey的绑定方式
3.5.2.生产者
直连交换机Topic的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key,假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.作为分隔符,其中:
- *表示一个单词
- #表示任意数量(零个或多个)单词。
假设有一条消息的routing_key为fast.rabbit.white,那么带有这样binding_key的几个队列都会接收这条消息:
- 路由键以
.
为分隔符,每一个分隔符的代表一个单词 - 通配符
*
匹配一个单词、通配符#
可以匹配多个单词 -
*
可以在routingKey
和bindKey
上使用,#
只能用于RoutingKey
中
package com.augus.topic;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Publisher {
public static final String EXCHANGE_NAME = "topic";
public static final String QUEUE_NAME1 = "topic1";
public static final String QUEUE_NAME2 = "topic2";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建交换机(设置名称和类型)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//4.构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//5.绑定交换机和队列
/**
* "*.orange.*", "*.*.rabbit", "lazy.#": 这些是路由键(routing key)的模式。路由键是用于将消息从交换机路由到队列的关键信息。
* "*.orange.*": 表示匹配一个单词在第一个位置,一个单词在第三个位置的路由键。例如,"quick.orange.rabbit" 可以匹配该模式
* "*.*.rabbit": 表示匹配两个单词在第二个和第三个位置的路由键。例如,"lazy.brown.rabbit" 可以匹配该模式
* "lazy.#": 表示匹配以 “lazy.” 开头的路由键,后面可以是任意数量的单词。例如,"lazy.pink.rabbit" 和 “lazy.orange.moose” 都可以匹配该模式。
* 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
*/
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");
//6.发消息到交换机
/**
* EXCHANGE_NAME:指定消息要发送到的交换机名称。在这里,空字符串表示使用默认的交换机。默认的交换机会将消息直接发送到指定的队列中。
* routingKey:交换机和队列的绑定方式,交换机是FANOUT类型,所以是直接绑定,可以写任意内容,不写也可以
* null:指定消息的额外属性。在这里,设置为 null,表示没有指定额外的属性。
* message.getBytes():指定要发送的消息内容。message 是一个字符串,通过调用 getBytes() 方法将其转换为字节数组进行发送。
*/
channel.basicPublish(EXCHANGE_NAME,"lazy.orange.moose",null,"天霜拳".getBytes());//符合1 3
channel.basicPublish(EXCHANGE_NAME,"quick.topic.rabbit",null,"排云掌".getBytes());//符合2
channel.basicPublish(EXCHANGE_NAME,"lazy.pink.fanout.fanout",null,"风神腿".getBytes());//符合3
System.out.println("消息发送成功!");
}
}
3.5.3.消费者
消费者和之前一致吗,故而不列出,太繁琐了,
3.5.4.测试
启动生产者代码,根据之前的规则,队列1有1条数据,队列3有两条数据,如下图:
分别启动两个消费者代码
- consume1
- consume2
再次查看,发现数据已经被消费了
3.6.RPC方式
3.6.1.架构说明
因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作,需要让Client发送消息时,携带两个属性:
- replyTo告知Server将相应信息放到哪个队列
- correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息
3.6.2.客户端
客户端代码如下:
package com.augus.rpc;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class Publisher {
public static final String QUEUE_PUBLISHER = "rpc_publisher";
public static final String QUEUE_CONSUMER = "rpc_consumer";
@Test
public void publish() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
//4.发布消息
String message = "明天去哪里玩?";
String uuid = UUID.randomUUID().toString();
/**
* 下面是创建消息的属性对象。在这里,使用 AMQP.BasicProperties.Builder 构建器创建一个消息属性对象
* replyTo(QUEUE_CONSUMER): 设置回复消息的目标队列,即消费者队列的名称。
* correlationId(uuid): 设置消息的关联ID,用于将请求和响应进行关联。
*/
AMQP.BasicProperties build = new AMQP.BasicProperties()
.builder()
.replyTo(QUEUE_CONSUMER)
.correlationId(uuid)
.build();
/**
* channel.basicPublish("", QUEUE_PUBLISHER, build, message.getBytes());: 这是将消息发布到指定的队列。具体参数的含义如下:
* "": 交换机名称,这里使用空字符串表示使用默认的交换机。
* QUEUE_PUBLISHER: 队列名称,将消息发布到名为 QUEUE_PUBLISHER 的队列。
* build: 消息的属性对象,包含了回复队列和关联ID等信息。
* message.getBytes(): 将消息内容转换为字节数组进行发布。
*/
channel.basicPublish("",QUEUE_PUBLISHER,build,message.getBytes());
//接收服务端的响应
/**
* channel.basicConsume(QUEUE_CONSUMER, false, new DefaultConsumer(channel) {...});: 这是注册一个消费者来接收服务端的响应消息。具体参数的含义如下:
* QUEUE_CONSUMER: 队列名称,指定要消费的队列。
* false: 是否启用消息的自动确认,这里设置为 false,需要手动确认消息的接收。
* new DefaultConsumer(channel) {...}: 创建一个继承自 DefaultConsumer 的匿名内部类,用于处理接收到的响应消息。在 handleDelivery 方法中,打印接收到的响
*/
channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到服务端的响应:"+new String(body, StandardCharsets.UTF_8));
}
});
System.out.println("消息发送成功!");
System.in.read();
}
}
3.6.3.服务端
服务端代码如下:
package com.augus.rpc;
import com.augus.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static final String QUEUE_PUBLISHER = "rpc_publisher";
public static final String QUEUE_CONSUMER = "rpc_consumer";
@Test
public void consume() throws IOException, TimeoutException {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2. 构建Channel
Channel channel = connection.createChannel();
//3. 构建队列
channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
//4.监听消息
//创建一个继承自 DefaultConsumer 的匿名内部类,用于处理接收到的消息。在 handleDelivery 方法中,定义了消费者接收到消息后的处理逻辑。
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, StandardCharsets.UTF_8));
//定义响应的消息
String resp = "周末想去游乐场玩";
//获取请求消息中设置的回复队列名称。
String replyTo = properties.getReplyTo();
//获取请求消息中设置的关联ID
String uuid = properties.getCorrelationId();
//创建一个新的消息属性对象,设置关联ID为之前请求消息中的关联ID
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(uuid).build();
//将响应消息发布到回复队列中
channel.basicPublish("", replyTo, props, resp.getBytes());
//手动确认消息的接收,通知 RabbitMQ 服务器已成功处理该消息。
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* 注册一个消费者来接收消息。具体参数的含义如下:
* QUEUE_PUBLISHER:队列名称,指定要消费的队列
* false:是否启用消息的自动确认,这里设置为 false,需要手动确认消息的接收。
* defaultConsumer:消费者对象,用于处理接收到的消息。
*/
channel.basicConsume(QUEUE_PUBLISHER,false,defaultConsumer);
System.out.println("开始监听队列");
System.in.read();
}
}
3.6.4.测试
执行客户端、服务端代码如下: