一、RabbitMQ的基本概念,以及6种工作模式,消息确认机制
RabbitMQ 简介:RabbitMQ 基于 AMQP 标准,采用 Erlang 语言开发的消息中间件。
基本概念:
●Producer:作为消息的生成者。
●Consumer:作为消息的消费者。
●Connection:消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。
●Channel:Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。
●Broker:接收和分发消息的应用,RabbitMQ服务就是Message Broker。
●Virtual host:虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
●Queue:队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。
●Binding:绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
●Exchange:交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
交换机常用的类型有: Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定routing key 的队列 Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
二、6 种工作模式
1.理论
RabbitMQ 提供了 6 种工作模式,简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算消息队列)
简单模式
一个生产者生产消息发送到队列里面,一个消费者从队列里面拿消息,进行消费消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)说明:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
Work queues 工作队列模式
一个生产者生产消息发送到队列里面,一个或者多个消费者从队列里面拿消息,进行消费消息。一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)说明:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。应用场景:过年过节12306抢票,发短信给用户,可以接入多个短信服务进行发送,提供任务的处理速度。
Pub/Sub 订阅模式
一个生产者生产消息发送到交换机里面,由交换机处理消息,队列与交换机的任意绑定,将消息指派给某个队列,一个或者多个消费者从队列里面拿消息,进行消费消息。需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。说明:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
Routing 路由模式
一个生产者生产消息发送到交换机里面,并且指定一个路由key,队列与交换机的绑定是通过路由key进行绑定的,消费者在消费的时候需要根据路由key从交换机里面拿消息,进行消费消息。需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。说明:Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
Topics 通配符模式
一个生产者生产消息发送到交换机里面,并且使用通配符的形式(类似mysql里面的模糊查询,比如想获取一批带有item前缀的数据),队列与交换机的绑定是通过通配符进行绑定的,消费者在消费的时候需要根据根据通配符从交换机里面拿消息,进行消费消息。需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
说明:通配符规则:# 匹配一个或多个词, 匹配不多不少恰好1个词。例如:Lazy.# 能够匹配 Lazy.insert.content或者 Lazy.insert,Lazy. 只能匹配Lazy.insert。
2.代码
创建一个Maven工程,引入pom依赖 <dependencies> <!--rabbitmq客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.3.0</version> </dependency> <!--json转换工具包--> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> </dependencies>创建一个连接Rabbitmq的工具类: import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitUtils { private static ConnectionFactory connectionFactory = new ConnectionFactory(); static { connectionFactory.setHost("你的rabbitmq的ip地址"); connectionFactory.setPort(5672);//RabbitMQ的默认端口号,根据实际情况修改 connectionFactory.setUsername("你的rabbitmq的用户名称"); connectionFactory.setPassword("你的rabbitmq的用户密码"); connectionFactory.setVirtualHost("你的rabbitmq的虚拟机"); } public static Connection getConnection(){ Connection conn = null; try { conn = connectionFactory.newConnection(); return conn; } catch (Exception e) { throw new RuntimeException(e); } } }
简单模式
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
修改工具类的虚拟机:
生产者:
import com.liao.rabbitmq.utils.RabbitConstant;消费者:
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
public static void main(String[] args) throws Exception {
//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//channel.queueDeclare的五个参数
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null);
String message = "要发送的message";
//channel.basicPublish的四个参数
//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("", RabbitConstant.QUEUE_TEST, null,message.getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");
}
}
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception{
//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume(RabbitConstant.QUEUE_TEST, false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:" + message);
System.out.println("消息的TagId:" + envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
我先启动消费者后启动生产者,这样只要生产者一生产消息,消费者就可以立马消费。
Work queues 工作队列模式
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
修改工具类的虚拟机
为了模拟某些业务,这里使用自定义实体类发送消息,所以我新建了一个自定义实体类
public class SenderContent { private String name; private String content; public SenderContent(String name, String content) { this.name = name; this.content = content; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
生产者: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.google.gson.Gson; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生成者 */ public class Producer { public static void main(String[] args) throws Exception { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null); for(int i = 1 ; i <= 100 ; i++) { SenderContent senderContent = new SenderContent("姓名:" + i, "内容:" + i); String jsonSMS = new Gson().toJson(senderContent); channel.basicPublish("" , RabbitConstant.QUEUE_SENDER_CONTENT , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } } 消费者一: /** * 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("ConsumerOne-发送成功:" + jsonSMS); try { Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } //确认签收 channel.basicAck(envelope.getDeliveryTag() , false); } }); } } 消费者二: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者2 */ public class ConsumerTwo { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("ConsumerTwo-发送成功:" + jsonSMS); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //确认签收 channel.basicAck(envelope.getDeliveryTag() , false); } }); } } 消费者三: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者3 */ public class ConsumerThree { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("ConsumerThree-发送成功:" + jsonSMS); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } //确认签收 channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
这里对每个消费者设置不同的休眠时间演示每个消费者处理业务的时间不同,查看消息消费的情况
可以看出消费者一消费的最多,消费者三消费的最少,因为代码中设置了这个
channel.basicQos(1);//处理完一个取一个
消费者处理完一个消息后(确认后),在从队列中获取一个新的。
Pub/Sub 订阅模式
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
创建一个交换机:这里用广播模式作为交换机的类型用来演示
修改工具类的虚拟机
生产者 import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.Scanner; /** * 发布者 */ public class Producer { public static void main(String[] args) throws Exception { Connection connection = RabbitUtils.getConnection(); //键盘输入 String input = new Scanner(System.in).next(); Channel channel = connection.createChannel(); //第一个参数交换机名字 其他参数和之前的一样 channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT,"" , null , input.getBytes()); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException;
/** * 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者一收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者2 */ public class ConsumerTwo { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者二收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
演示效果:
Routing 路由模式
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
修改工具类的虚拟机
创建交换机:这里的交换机type类型一定要改成routing模式,如果还是广播模式的fanout的话,跟上面发布和订阅模式出现的效果会是一样的。
错误实例:
正确的实例:
生产者:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; /** * 发布者 */ public class Producer { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("routing.one.a.20201127", "中国湖南长沙20201127私密数据"); area.put("routing.one.d.20201128", "中国河北石家庄20201128私密数据"); area.put("routing.two.b.20201127", "中国湖北武汉20201127私密数据"); area.put("routing.two.e.20201128", "中国湖北武汉20201128私密数据"); area.put("routing.three.c.20201127", "中国湖南株洲20201128私密数据"); area.put("routing.three.f.20201128", "中国河南郑州20201128私密数据"); area.put("us.one.a.20201127", "美国加州洛杉矶20201127私密数据"); area.put("us.two.b.20201128", "美国加州洛杉矶20201128私密数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_ROUTING,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "routing.one.a.20201127"); channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "us.one.a.20201127"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者2 */ public class ConsumerTwo { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "routing.one.d.20201128"); channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "routing.two.e.20201128"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
效果:
路由模式需要消费者指定路由key
Topics 通配符模式
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
修改工具类的虚拟机
创建交互机,类型为topic
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; /** * 发布者 */ public class Producer { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("routing.one.a.20201127", "中国湖南长沙20201127私密数据"); area.put("routing.one.d.20201128", "中国河北石家庄20201128私密数据"); area.put("routing.two.b.20201127", "中国湖北武汉20201127私密数据"); area.put("routing.two.e.20201128", "中国湖北武汉20201128私密数据"); area.put("routing.three.c.20201127", "中国湖南株洲20201128私密数据"); area.put("routing.three.f.20201128", "中国河南郑州20201128私密数据"); area.put("us.one.a.20201127", "美国加州洛杉矶20201127私密数据"); area.put("us.two.b.20201128", "美国加州洛杉矶20201128私密数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_TOPIC,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC, "*.*.*.20201127"); // channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC, "us.two.b.20201128"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者2 */ public class ConsumerTwo { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_TOPIC, "us.#"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
效果:
说明:如果想切换模式进行测试,只需要修改工具类中的虚拟机即可。前面的命名都是一样的,就是为了在这个时候体现出每个虚拟机都是隔离的,所以那么key是一样的也没关系。
三、消息确认机制:confirm状态和return状态
1.理论
confirm状态:表示生产者将消息投递到Broker时产生的状态,会出现二种情况:
- ack:表示已经被Broker签收
- nack:表示表示已经被Broker拒收,原因可能有队列满了,限流,IO异常...
return状态:表示生产者将消息投递到Broker,被Broker签收,但是没有对应的队列进行投递,将消息回退给生产者的状态。
说明:这二种状态都只和生产者有关,于消费者没关系。
2.代码
沿用之前的topic虚拟机,就不在创建新的虚拟机了
创建一个交换机:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; /** * 发布者 */ public class Producer { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("routing.one.a.20211001", "中国长沙20211001私密数据"); area.put("routing.two.b.20211001", "中国武汉20211001私密数据"); area.put("routing.three.c.20211001", "中国株洲20211001私密数据"); area.put("routing.one.d.20211002", "中国石家庄20211002私密数据"); area.put("routing.two.e.20211002", "中国武汉20211002私密数据"); area.put("routing.three.f.20211002", "中国郑州20211002私密数据"); area.put("routing.error.f.aaa", "未成功投递的私密数据"); area.put("us.one.a.20211001", "美国洛杉矶20211001私密数据"); area.put("us.two.b.20211002", "美国洛杉矶20211002私密数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); //开启confirm监听模式 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long l, boolean b) throws IOException { //第二个参数代表接收的数据是否为批量接收,一般我们用不到。 System.out.println("消息已被Broker接收,Tag:" + l ); } public void handleNack(long l, boolean b) throws IOException { System.out.println("消息已被Broker拒收,Tag:" + l); } }); channel.addReturnListener(new ReturnCallback() { public void handle(Return r) { System.err.println("==========================="); System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText()); System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() ); System.err.println("Return主题:" + new String(r.getBody())); System.err.println("==========================="); } }); Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //Routing key 第二个参数相当于数据筛选的条件 //第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。 channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_TOPIC_CONFIRM,me.getKey() ,true, null , me.getValue().getBytes()); } //如果关闭则无法进行监听,因此此处不需要关闭 /*channel.close(); connection.close();*/ } }
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC_CONFIRM, "*.*.*.20211001"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1收到信息:" + new String(body)); //channel.basicNack的三个参数 //第一个参数:long deliveryTag:唯一标识 ID。 //第二个参数:boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。 //第三个参数:boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。 // channel.basicNack(envelope.getDeliveryTag() , false,false); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * 消费者2 */ public class ConsumerTwo { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_TOPIC_CONFIRM, "us.#"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2收到信息:" + new String(body)); // channel.basicNack(envelope.getDeliveryTag() , false,false); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
演示效果:
可以看到打印return都是key中没有20211001后缀或者没有us前缀的数据
四、RabbitMQ集群搭建
1.集群搭建步骤
设置服务器别名
- 服务器1:hostnamectl set‐hostname m1 - 服务器2:hostnamectl set‐hostname m2
在m1服务器中统一 erlang.cookie 文件中 cookie 值 将m1中的 .erlang.cookie 同步到 m2中
scp /var/lib/rabbitmq/.erlang.cookie m2:/var/lib/rabbitmq/.erlang.cookie
说明:上面的命令中m2使用ip也行
Rabbitmq 集群添加节点:重启m2机器中rabbitmq 的服务在m2执行
#停止用户请求 rabbitmqctl stop_app #将m2合并到集群中 rabbitmqctl join_cluster ‐‐ram rabbit@m2 #开启用户请求 rabbitmqctl start_app #开启管理页面 rabbitmq‐plugins enable rabbitmq_management #重启服务 systemctl restart rabbitmq‐server.service
查看集群信息
rabbitmqctl cluster_status
2.集群搭建负载均衡-HAProxy搭建
1.执行安装
#1、安装 yum install haproxy
#2、配置haproxy.cfg文件 具体参照 如下配置HAProxy vim /etc/haproxy/haproxy.cfg。进入文件找到maxconn 3000把后面的内容都删除,添加集群监听,开启haproxy监控服务,代码如下:
#对MQ集群进行监听 listen rabbitmq_cluster bind 0.0.0.0:5672 option tcplog mode tcp option clitcpka timeout connect 1s timeout client 10s timeout server 10s balance roundrobin server node1 节点1 ip地址:5672 check inter 5s rise 2 fall 3 server node2 节点2 ip地址:5672 check inter 5s rise 2 fall 3 #开启haproxy监控服务 listen http_front bind 0.0.0.0:1080 stats refresh 30s stats uri /haproxy_stats stats auth admin:admin
#3、启动haproxy systemctl start haproxy
#4、查看haproxy进程状态 systemctl status haproxy.service
#状态如下说明已经启动成功 Active: active (running)
#访问如下地址对mq节点进行监控 http://服务器IP:1080/haproxy_stats
#代码中访问mq集群地址,则变为访问haproxy地址:5672
2.haproxy.cfg配置详解
listen rabbitmg cluster bind 0.0.0.0:5672#通过5672对M1, M2进行映射 option tcplog #记录tcp连接的状态和时间 mode tcp#四层协议代理,即对TCP协议转发 option clitcpka #开启TCP的Keep Alive. (长连接模式) timeout connect 1s #haproxy与mq建立连接的超时时间 timeout client 10s#客户端与haproxy最大空闲时间。 timeout server 10s #服务器与haproxy最大空闲时间 balance roundrobin #采用轮询转发消息 #每5秒发送一次心跳包,如连续两次有响应则代表状态良好。 #如连续三次没有响应,则视为服务故障,该节点将被剔除。 server node1 ip1:5672 check inter 5s rise 2 fall 3 server node2 ip2:5672 check inter 5s rise 2 fall 3 listen http front #监听端口 bind 0.0.0.0:1080 #统计页面自动刷新时间stats refresh 30s #统计页面url stats uri /haproxy?stats #指定HAproxy访问用户名和密码设置 stats auth admin:admin
这个时候连接就可以通过haproxy代理连接啦,当然啦,HAProxy也有自己的管理页面,就是你安装服务器的ip加配置的1080端口,可以直接访问啦,当然HA也可以配置多台的。
五、常见问题
1.消息可靠性投递
消息可靠性实现需要保证以下几点:
- 持久化
exchange要持久化
queue要持久化
message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用
1.rabbitmq 整个消息投递的路径
producer--->rabbitmq broker--->exchange--->queue--->consumer
- 消息从producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从exchange-->queue 投递失败则会返回一个 returnCallback 。
2.实现消息可靠性投递的步骤
- 生产者设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。
- 生产者设置ConnectionFactory的publisher-returns="true" 开启 退回模式。
- 生产者使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
- 生产者使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
- 消费者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认(none自动确认模式很危险,当生产者发送多条消息,消费者接收到一条信息时,会自动认为当前发送的消息已经签收了,这个时候消费者进行业务处理时出现了异常情况,也会认为消息已经正常签收处理了,而队列里面显示都被消费掉了。所以真实开发都会改为手动签收,可以防止消息丢失)
- 消费者如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
- 消费者如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
2.消息在消费端限流
1.限流示例图
2.实现步骤
- 在rabbit:listener-container中配置 prefetch属性设置消费端一次拉取多少消息
- 消费端的确认模式一定为手动确认:acknowledge="manual"
3.TTL
1.业务场景
比如我们在下订单的时候,如果超过30分钟未支付,就取消这个订单,把当前商品的库存加回去。
2.定义
TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
3.实现步骤
- 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
- 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
- 如果两者都进行了设置,以时间短的为准。
4.死信队列
1.定义
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
说明:死信交换机和死信队列和普通的没有区别,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。
2.消息成为死信的三种情况
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
3.队列绑定死信交换机
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
5.延迟队列
1.定义
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。在RabbitMQ中并未提供延迟队列功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
2.场景
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
6.消息积压
1.场景
- 消费者宕机消息积压
- 消费者消费能力不足
- 发送这发送流量太大
2.解决方案
上线更多的消费者,进行正常消费,上线专门的队列消费访问,先将消息批量取出来,记录到数据库中,再慢慢处理。
7.消息幂等性
1.定义
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
2.解决方案
消息幂等性保障--乐观锁机制
标签:false,队列,基础,实践,RabbitMQ,RabbitConstant,rabbitmq,import,channel From: https://www.cnblogs.com/mushang/p/17541453.html