参考:BV15k4y1k7Ep
RabbitMQ 相关概念及简述中简单介绍了 RabbitMQ 提供的 6 种工作模式。下面以简单模式为例,介绍 RabbitMQ 的使用。
新建工程
先新建 Maven 工程 RabbitMQ 作为父工程,在父工程下新建三个子模块:
- common:公共包
- producer:生产者
- consumer:消费者
在三个模块中添加 amqp-client 依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
在 producer 和 consumer 中添加 common 依赖:
<dependency>
<groupId>com.zhangmingge.rabbitmq</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
编写 common
在 common 中添加用于获取 connection 的工具类,后面 producer 和 consumer 都会用到:
package com.zhangmingge.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 主机地址;默认为 localhost
connectionFactory.setHost("192.168.88.128");
// 连接端口;默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/vhost");
// 连接用户名;默认为 guest
connectionFactory.setUsername("admin");
// 连接密码;默认为 guest
connectionFactory.setPassword("123456");
// 创建连接
return connectionFactory.newConnection();
}
}
编写 producer
package com.zhangmingge.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/*
* 参数 1:队列名称
* 参数 2:是否定义持久化队列
* 参数 3:是否独占本次连接
* 参数 4:是否在不使用的时候自动删除队列
* 参数 5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
/*
* 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange
* 参数 2:路由 key,简单模式可以传递队列名称
* 参数 3:消息其它属性
* 参数 4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
在执行上述的消息发送之后,登录 RabbitMQ 的管理控制台,可以发现队列和其中的消息:
编写 consumer
package com.zhangmingge.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/*
* 参数 1:队列名称
* 参数 2:是否定义持久化队列
* 参数 3:是否独占本次连接
* 参数 4:是否在不使用的时候自动删除队列
* 参数 5:队列其它参数
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
// 创建消费者:并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
/*
* consumerTag 消息者标签,在 channel.basicConsume 时候可以指定
* envelope 消息包的内容,可从中获取消息 id,消息 routingKey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 路由 key
System.out.println("路由 key 为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息 id
System.out.println("消息 id 为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
// 监听消息
/*
* 参数 1:队列名称
* 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认
* 参数 3:消息接收到后回调
*/
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
// 不关闭资源,应该一直监听消息
// channel.close();
// connection.close();
}
}
运行 consumer 后,可以看到 consumer 打印的日志消息,每运行一次 producer,consumer 就会对应打印一次消息。
小结
上述的入门案例中使用的是如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序。
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。