- 什么是消息队列
- 消息队列是一种用于在应用程序之间传递消息的通信方式,消息队列允许应用程序异步地发送和接收消息,并且不需要直接连接到对方;
- 消息是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;
- 队列可以说是一个数据结构,可以存储数据,先进先出;
- 使用消息队列有什么缺点
- 降低系统的可用性:系统引用的外部依赖越多,越容易挂掉;
- 系统复杂度提高:使用MQ后可能需要保证消息没有被重复消费、处理消息丢失的情况,保证消息传递的顺序性等等问题;
- 一致性问题:A系统处理完了直接返回成功了,但问题是:要是B、C、D三个系统那里,B和D两个系统写库成功了,结果C系统写库失败了,就造成数据不一致了。
- Rabbitmq四大核心
- 生产者:发送消息的一个应用程序,将消息发布到特定的队列中,
- 消费者:接受消息的应用程序,从队列中获取消息并进行处理;
- 队列:消息在rabbitmq里的存储区域;
- 交换机:路由的核心组件,生产者发送消息时,消息首先到达交换机,交换机会根据其中的配置,把消息路由到相应的队列中;
- AMQP协议
- 消息:消息头,消息体,消息属性;
- 交换机:消息传递中介,将消息路由到一个或多个队列中;
- 队列:消息容器
- 绑定
- 启动
- 在D:\rabbitmq\rabbitmq\rabbitmq_server-3.13.2\sbin 输入cmd ;
- rabbitmqctl status //查看当前状态
-
rabbitmq-plugins enable rabbitmq_management //开启Web插件
-
rabbitmq-server start //启动服务
-
rabbitmq-server stop //停止服务
-
rabbitmq-server restart //重启服务
- 工作原理
- Broker是rabbitMQ的一个实体,接受分发消息的应用,一个broker可以有多个vhost,vhost下面是exchange交换机,一个vhost可以有多个exchange,一个exchange可以有多个队列,在同一个vhost中交换机和队列不能重名
- pom文件添加
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency>
- 生产者
- 创建连接
//创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //服务地址 factory.setHost("localhost"); //账号 factory.setUsername("guest"); //密码 factory.setPassword("guest"); //端口号 factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel();
- 创建交换机
/** * 创建交换机 * 1,交换机名称 * 2.交换机类型,direct,topic,fanout,headers * 3.指定交换机是否需要持久化,如果设置为true,那么交换机的元数据需要持久化 * 4.指定交换机在没有队列绑定时,是否需要删除,设置false表示不删除 * 5.Map<String,Object>类型,用来指定交换机其他的一些机构化参数,一般直接设置为null */ channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,null);
- 生成一个队列
/** * 生成一个队列 * 1.队列名称 * 2.队列是否需要持久化,但是要注意,只是队列名称等这些元数据的持久化,不是队列中消息的持久化 * 3.表示队列是不是私有化,如果是私有化的,只有创建他的应用程序才能消费消息 * 4.队列在没有消费者订阅的情况下是否自动删除; * 5.队列的一些结构化信息,比如声明死信队列,磁盘队列会用到; */ channel.queueDeclare(queueName,true,false,false,null);
-
将我们的交换机和队列绑定
/** * 将我们的交换机和队列绑定 * 1.队列名称 * 2.交换机名称 * 3.路由键,在直连模式下,可以为我们的队列名称 */ channel.queueBind(queueName,exchangeName,queueName);
- 发送消息
String queueName ="xc_queue_name"; String exchangeName ="xc_exchange_name"; //发送消息 String message = "hello rabbitmq"; System.out.println("消息发送成功"); /** * 发送消息 * 1.发送到哪个交换机 * 2.队列名称 * 3.其他参数信息 * 4,发送消息的消息体 */ channel.basicPublish(exchangeName,queueName,null,message.getBytes());
- 关闭连接和通道
channel.close(); connection.close();
- 创建连接
- 消费者
- 建立连接
//创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //服务地址 factory.setHost("localhost"); //账号 factory.setUsername("guest"); //密码 factory.setPassword("guest"); //端口号 factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel();
- 回调函数
//接受消息的回调函数 DeliverCallback deliverCallback = (consumerTage,message) ->{ System.out.println("接收到消息"+new String(message.getBody())); }; //取消消息的回调函数 CancelCallback cancelCallback = consumerTage ->{ System.out.println("消费信息被中断"); };
- 消费消息
/** * 消费消息 * 1.消费哪个队列 * 2.消费成功之后是否需要自动应答,true 自动应答 * 3.接受消息的回调函数 * 4.取消消息的回调 */ channel.basicConsume("xc_queue_name",true,deliverCallback,cancelCallback);
- 建立连接
- 交换机类型
- direct:路由键与队列完全匹配交换机,通过RoutingKey路由键将交换机和队列进行绑定,消息被发送到exchange时,需要根据消息的RoutingKey,来进行匹配,只将消息发送到完全匹配到此RoutingKey的队列;
- fanout:扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RoutingKey参数无效;
- topic:和direct类似,也是需要通过routingkey路由键进行匹配分发,区别是topic可以进行模糊匹配,direct是完全匹配;
- topic中,将routingkey通过“.”来分为多个部分;
- “*”:代表一个部分;
- “#”:代表0个或多个部分(如果绑定的路由键为“#”时,则接受所有消息,因为路由键所有都匹配);
- headers:匹配AMQP消息的header而不是路由键,此外header交换器和direct交换机完全一致,但性能差很多消费方指定的headers中必须包含一个“x-match”的键;
- x-match = all:表示所有的键值对都匹配才能接受到消息;
- x-match = any:表示只要有键值对匹配就能接受到消息;
-
RabbitMQ
1.什么是生产者
2.什么是消费者,多个消费者消息分发机制
3.生产者和消费者的确认机制
4.Channels ,Exchanges,Queues