概念:
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
对比:
RabbitMQ对比Kafka mq来说更安全可靠 但大数据处理来说kafka更适合IO高吞吐的处理
安装部署(基于linux):
https://blog.csdn.net/kuaixiao0217/article/details/128593503
基于springboot实现MQ
引入依赖pom.xml 配置文件application.yaml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring: application: name: springboot_rabbitmq rabbitmq: host: 10.15.0.9 port: 5672 username: ems password: 123 virtual-host: /ems
第一种模式 直连
生产者
package com.example; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("hello","hello world"); // 生产端没有指定交换机只有routingKey和Object。 //消费方产生hello队列,放在默认的交换机(AMQP default)上。 //而默认的交换机有一个特点,只要你的routerKey的名字与这个 //交换机的队列有相同的名字,他就会自动路由上。 //生产端routingKey 叫hello ,消费端生产hello队列。 //他们就路由上了 } }
消费者
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component // 生产端没有指定交换机只有routingKey和Object。 //消费方产生hello队列,放在默认的交换机(AMQP default)上。 //而默认的交换机有一个特点,只要你的routerKey的名字与这个 //交换机的队列有相同的名字,他就会自动路由上。 //生产端routingKey 叫hello ,消费端生产hello队列。 //他们就路由上了 @RabbitListener(queuesToDeclare = @Queue(value = "hello")) public class HelloCustomer { @RabbitHandler public void receive1(String message){ System.out.println("message = " + message); } }
第二种模式 work模型
生产者
package com.example; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work","hello work!"); // 生产端没有指定交换机只有routingKey和Object。 //消费方产生work队列,放在默认的交换机(AMQP default)上。 //而默认的交换机有一个特点,只要你的routerKey的名字与这个 //交换机的队列有相同的名字,他就会自动路由上。 //生产端routingKey 叫work ,消费端生产work队列。 //他们就路由上了 } } }
消费者
package com.example; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkCustomer { // 生产端没有指定交换机只有routingKey和Object。 //消费方产生work队列,放在默认的交换机(AMQP default)上。 //而默认的交换机有一个特点,只要你的routerKey的名字与这个 //交换机的队列有相同的名字,他就会自动路由上。 //生产端routingKey 叫work ,消费端生产work队列。 //他们就路由上了 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message){ System.out.println("work message1 = " + message); } @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message){ System.out.println("work message2 = " + message); } }
Fanout 广播模型
生产者
package com.example; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("logs","","这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容 } }
消费者
package com.example; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkCustomer { @RabbitListener(bindings = @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(name = "logs", type = "fanout") )) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(name = "logs", type = "fanout") //绑定交换机类型 )) public void receive2(String message) { System.out.println("message2 = " + message); } }
Route 路由模型
生产者
package com.example; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("directs","error","error 的日志信息"); } }
消费者
package com.example; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 key = {"info", "error"}, // 路由key exchange = @Exchange(type = "direct", name = "directs") )}) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"error"}, exchange = @Exchange(type = "direct", name = "directs") )}) public void receive2(String message) { System.out.println("message2 = " + message); } }
Topic 订阅模型(动态路由模型)
生产者
package com.example; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息"); } }
消费者
package com.example; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.*"}, exchange = @Exchange(type = "topic",name = "topics") ) }) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.#"}, exchange = @Exchange(type = "topic",name = "topics") ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
标签:复习,队列,springframework,RabbitMQ,rabbit,import,org,annotation,amqp From: https://www.cnblogs.com/ziwang520/p/17617522.html