首页 > 其他分享 >SpringCloud-Stream整合RabbitMQ

SpringCloud-Stream整合RabbitMQ

时间:2023-01-06 16:23:48浏览次数:60  
标签:Stream SpringCloud RabbitMQ binder MqMessageSource spring msg public cloud

我们知道,当微服务越来越来多的时候,仅仅是feign的http调用方式已经满足不了我们的使用场景了。这个时候系统就需要接入消息中间件了。相比较于传统的Spring项目、SpringBoot项目使用消息中间件的很多配置不同,SpringCloud Stream抽象了中间件产品的不同,在SpringCloud中你仅仅需要修改几行配置文件就可以灵活的切换中间件产品而不需要修改任何代码。

现在我们以SpringCloud Stream整合RabbitMQ为例来学习一下

创建生产者

1. 引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定义配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testOutPut:
destination: testRabbit
content-type: application/json
default-binder: test

现在来解释一下这些配置的含义

  1. binders: 这是一组binder的集合,这里配置了一个名为test的binder,这个binder中是包含了一个rabbit的连接信息
  2. bindings:这是一组binding的集合,这里配置了一个名为testOutPut的binding,这个binding中配置了指向名test的binder下的一个交换机testRabbit。
  3. 扩展: 如果我们项目中不仅集成了rabbit还集成了kafka那么就可以新增一个类型为kafka的binder、如果项目中会使用多个交换机那么就使用多个binding,

3.创建通道

1
2
3
4
5
6
7
8
public interface  MqMessageSource {

String TEST_OUT_PUT = "testOutPut";

@Output(TEST_OUT_PUT)
MessageChannel testOutPut();

}

这个通道的名字就是上方binding的名字

4. 发送消息

1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(MqMessageSource.class)
public class MqMessageProducer {
@Autowired
@Output(MqMessageSource.TEST_OUT_PUT)
private MessageChannel channel;


public void sendMsg(String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
System.err.println("消息发送成功:"+msg);
}
}

这里就是使用上方的通道来发送到指定的交换机了。需要注意的是withPayload方法你可以传入任何类型的对象,但是需要实现序列化接口

5. 创建测试接口

EnableBinding注解绑定的类默认是被Spring管理的,我们可以在controller中注入它

1
2
3
4
5
6
7
8
@Autowired
private MqMessageProducer mqMessageProducer;

@GetMapping(value = "/testMq")
public String testMq(@RequestParam("msg")String msg){
mqMessageProducer.sendMsg(msg);
return "发送成功";
}

生产者的代码到此已经完成了。

创建消费者

1. 引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定义配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testInPut:
destination: testRabbit
content-type: application/json
default-binder: test

这里与生产者唯一不同的地方就是testIntPut了,相信你已经明白了,它是binding的名字,也是通道与交换机绑定的关键

3.创建通道

1
2
3
4
5
6
7
8
public interface  MqMessageSource {

String TEST_IN_PUT = "testInPut";

@Input(TEST_IN_PUT)
SubscribableChannel testInPut();

}

4. 接受消息

1
2
3
4
5
6
7
8
@EnableBinding(MqMessageSource.class)
public class MqMessageConsumer {
@StreamListener(MqMessageSource.TEST_IN_PUT)
public void messageInPut(Message<String> message) {
System.err.println(" 消息接收成功:" + message.getPayload());
}

}

这个时候启动Eureka、消息生产者和消费者,然后调用生产者的接口应该就可以接受到来自mq的消息了。

GitHub地址:https://github.com/shiyujun/spring-cloud-demo。代码所在模块:cloud-demo-consumer,cloud-demo-provider-2

 

转https://www.cnblogs.com/zhixiang-org-cn/p/10093367.html

标签:Stream,SpringCloud,RabbitMQ,binder,MqMessageSource,spring,msg,public,cloud
From: https://www.cnblogs.com/smallfa/p/17030810.html

相关文章

  • Linux-Windows-Mac-RabbitMQ安装教程
    本说明分一下三种方式安装,请根据具体环境选择相关版本。linux服务器安装win版服务器安装mac版安装linux服务器rabbitmq安装1、RabbitMQ是基于Erlang的,所以首先必须配置Erlan......
  • RabbitMQ:消息丢失、消息重复、消息积压的原因和解决方案(3)
    两个概念RabbitMQ避免消息丢失的方法主要是利用消息确认机制和手动签收机制,所以有必要把这两个概念搞清楚。1、消息确认机制主要是生产者使用的机制,用来确认消息是否......
  • RabbitMQ 学习笔记
    文章目录​​1.中间件​​​​1.1特点​​​​1.2消息中间件​​​​1.3消息组成部分​​​​1.4协议​​​​1.5协议组成​​​​1.6`AMQP`​​​​1.6.1生产者流转过......
  • Redis实现秒杀功能 lua脚本判断库存、判断一人一单、添加到stream队列、异步处理订单
    需求:新增秒杀商品-将秒杀商品的id和秒杀数量添加到秒杀表中数据库操作将秒杀信息保存到Redis中基于Lua脚本,判断秒杀库存、一人一单,决定用户是否有下单资格如果抢购......
  • RabbitMQ 启动报错 ERROR: epmd error for host xxx: timeout (timed out)
    文章目录​​问题描述:​​​​解决方案:​​问题描述:Rabbitmq-serverrestart启动报错解决方案:编辑​​/etc/hosts​​文件。添加对应host的映射即可。......
  • rabbitmq目录
    回到占占推荐博客索引最近整理了一下近10年自己写的关于rabbitmq的文章,希望对各位有所帮助.rabbitMQ目录RabbitMQ~开篇与环境部署Rabbitmq~linux环境的部署Rabbi......
  • RabbitMQ 持久化 可靠性投递 可靠性消费(2)
    默认情况下,exchange、queue、message等数据都是存储在内存中的,这意味着如果RabbitMQ重启、关闭、宕机时所有的信息都将丢失。RabbitMQ提供了持久化来解决这个问题,持久......
  • SpringCloud 2020.x.x工程bootstrap引导配置不生效的解决方案
      关注公众号风色年代(itfantasycc)500GJava微服务资料合集送上~注意:2020版本以后,添加spring-cloud-context是没有用的,因为官方重构了bootstrap引导配置的加载方式需要增......
  • docker安装rabbitmq
    创建挂载目录mkdir-p/opt/docker/rabbitmq启动dockerrun-d--restart=always--name=rabbitmq-p5672:5672-p15672:15672\-v/opt/docker/rabbitmq:/var/lib/rab......
  • 尚硅谷SpringCloud(H版&alibaba)
    0.SpringCloud升级,部分组件停用:Eureka停用,可以使用zk作为服务注册中心服务调用,Ribbon准备停更,代替为LoadBalanceFeign改为OpenFeignHystrix停更,改为resilence4j或者......