安装
1.下载erlang并安装,地址:http://erlang.org
2.下载mq并安装,地址:http://www.rabbitmq.com/download.html
3.安装完成后,管理后台地址:http://localhost:15672,初始账号和密码:guest/guest
优缺点
优点:解耦、削峰、数据分发
缺点:
- 系统可用性降低;系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响
- 系统复杂度提高;MQ的加入大大增加了系统的复杂性,以前系统间是同步的远程调用,现在是通过进行异步调用。如何保证MQ的高可用?如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
- 一致性问题;A系统处理完业务,通过MQ给BCD三个系统发消息数据,如果B系统,C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?
对比
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单击吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
时效性 | ms级 | us级 | ms级 | ms级以内 |
可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
功能特性 | 成熟的产品,在很多公司得到应用,有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能及其好,延时很低;管理界面丰富,缺点是很难进行二次开发 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
备注:初学,以下内容涉及的队列和交换机,以及绑定关系都是在管理后台进行的操作
//Spring Boot项目中引入amqp包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
//配置文件中加入如下配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
一、直接发送消息到队列,不经过交换机
1、简单队列,只有一个消费者
/**
* 简单队列
*/
@Test
public void testSimpleQueue(){
String queueName ="simple.queue";
String message ="testSimpleQueue";
rabbitTemplate.convertAndSend(queueName,message);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到消息====================="+msg);
}
2、工作队列,多个消费者
/**
* 工作队列
*/
@Test
public void testWorkQueue() throws InterruptedException {
String queueName ="work.queue";
String message ="testWorkQueue";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) {
System.out.println("消费者work.queue 1收到消息====================="+msg);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.out.println("消费者work.queue 2收到消息====================="+msg);
Thread.sleep(100);
}
每个消息只会被消费一次,默认情况下消费者轮流消费,可以通过设置spring.rabbitmq.listener.simple.prefetch=1来确保同一时刻最多投递给消费者1条消息,从而避免消息堆积
二、消息经过交换机
交换机的作用是对消息进行路由,但不具备保存消息的能力。
1.Fanout交换机
会将接收到的消息广播到每一个跟其绑定的队列,所以也叫广播模式
举例,交换机和队列进行绑定如下,
/**
* fanout交换机
*/
@Test
public void testSimpleQueue(){
String exchangeName ="test.fanout";
String message ="hello,everyone!";
rabbitTemplate.convertAndSend(exchangeName,null,message);
}
@RabbitListener(queues = "fanout.queue")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者fanout.queue 收到消息====================="+msg);
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者fanout.queue 1收到消息====================="+msg);
}
结果:
消费者fanout.queue 收到消息=====================hello,everyone!
消费者fanout.queue 1收到消息=====================hello,everyone!
2.Direct交换机
Direct交换机会将接收到的消息根据规则路由到指定的队列,因此称为定向路由。
每一个队列都与交换机设置一个BindingKey;
发布者发送消息时,指定消息的RoutingKey;
交换机将消息路由到BingingKey与消息RoutingKey一致的队列;
举例,交换机与队列的BindingKey如下,
@Test
public void testDirectQueue(){
String exchangeName ="test.direct";
String message ="hello!";
rabbitTemplate.convertAndSend(exchangeName,"red",message);
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者direct.queue1 收到消息====================="+msg);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者direct.queue2 收到消息====================="+msg);
}
结果:
消费者direct.queue1 收到消息=====================hello!
消费者direct.queue2 收到消息=====================hello!
消费者direct.queue1 收到消息=====================hello!
3.Topic交换机
与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,以.分隔。BindingKey可以使用通配符。#代指0个或多个单词,*代指一个单词(比如routingKey是多个单词,就不匹配)
举例,当交换机与队列的BindingKey如下时,
@Test
public void testTopicQueue(){
String exchangeName ="test.topic";
rabbitTemplate.convertAndSend(exchangeName,"japan.news","日本红色预警!");
rabbitTemplate.convertAndSend(exchangeName,"china.news","中国蓝色预警!");
rabbitTemplate.convertAndSend(exchangeName,"china.weather","阳光明媚!");
}
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
System.out.println("消费者topic.queue1 收到消息====================="+msg);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
System.out.println("消费者topic.queue2 收到消息====================="+msg);
}
结果:
消费者topic.queue2 收到消息=====================日本红色预警!
消费者topic.queue1 收到消息=====================中国蓝色预警!
消费者topic.queue2 收到消息=====================中国蓝色预警!
消费者topic.queue1 收到消息=====================阳光明媚!
标签:String,void,笔记,学习,交换机,消息,msg,RabbitMQ,public
From: https://www.cnblogs.com/ginb/p/17891313.html