1.引入spring cloud, spring cloud alibaba, spring boot依赖
<!-- Spring Cloud BOM -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2020.0.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<!-- Spring Cloud Alibaba BOM -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2021.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<!-- Spring Boot BOM (left disabled, as in the original snippet) -->
<!--<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.4.2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>-->
2.引入spring cloud stream rabbit依赖
<!-- Spring Cloud Stream binder for RabbitMQ; no version is declared here -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3. 配置
spring:
  cloud:
    stream:
      binders:
        # Named binder referenced by every binding below
        default-rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: ${RABBITMQ_HOST:xx}
                port: ${RABBITMQ_PORT:5672}
                username: xx
                password: xx
      bindings:
        # MQ producer
        test-out-0:
          destination: test-topic
          content-type: application/json
          binder: default-rabbit
        # MQ consumer
        test1-in-0:
          destination: test-topic
          group: test1-group
          content-type: application/json
          binder: default-rabbit
        # MQ consumer
        test2-in-0:
          destination: test-topic
          group: test2-group
          content-type: application/json
          binder: default-rabbit
      rabbit:
        bindings:
          test-out-0:
            producer:
              # Routing key is taken from the "type" message header
              routingKeyExpression: headers.type
              exchange-type: direct
          test1-in-0:
            consumer:
              bindingRoutingKey: k1
              exchange-type: direct
              acknowledge-mode: AUTO
          test2-in-0:
            consumer:
              bindingRoutingKey: k2
              exchange-type: direct
              acknowledge-mode: AUTO
    # Names of the consumer function beans
    function:
      definition: test1;test2;
4. 生产者代码
@Autowired
private StreamBridge streamBridge;

/**
 * Builds a fixed two-entry sample payload and sends it through {@link StreamBridge}
 * to the binding/destination named in the request path.
 *
 * <p>The routing key from the path is copied into the {@code type} message header,
 * which is the header the producer binding's {@code routingKeyExpression: headers.type}
 * setting reads.
 *
 * @param destination name of the output binding or destination to send to
 * @param routingKey  value stored in the {@code type} header of the message
 * @throws IllegalStateException if {@code StreamBridge#send} reports that the message was not sent
 */
@ApiOperation("动态解析并发送消息到指定名称的通道")
@PostMapping("test/send/{destination}/{routingKey}")
@ResponseStatus(HttpStatus.ACCEPTED)
public void send(@PathVariable String destination, @PathVariable String routingKey) {
    Map<String, String> payload = new HashMap<>();
    payload.put("key1", "value1");
    payload.put("key2", "value2");

    Message<Map<String, String>> message = MessageBuilder
            .withPayload(payload)
            .setHeader("type", routingKey)
            .build();

    // Do not answer 202 ACCEPTED when the bridge reports the message was not sent.
    boolean sent = streamBridge.send(destination, message);
    if (!sent) {
        throw new IllegalStateException(
                "Failed to send message to destination '" + destination + "'");
    }
}
// Example request: POST http://localhost/test/send/test-out-0/k2
5. 消费者代码
/**
 * Consumer function bean named {@code test1}; it logs every payload it receives.
 *
 * <p>Typed as {@code Consumer<String>} rather than a raw {@code Consumer} so the
 * framework has a concrete target type for the incoming payload and the log line
 * shows the message body as text.
 *
 * @return a consumer that logs the received payload
 */
@Bean
public Consumer<String> test1() {
    return request -> log.info("test1 MQ消费 req:{}", request);
}

/**
 * Consumer function bean named {@code test2}; it logs every payload it receives.
 *
 * <p>Typed as {@code Consumer<String>} for the same reason as {@code test1}.
 *
 * @return a consumer that logs the received payload
 */
@Bean
public Consumer<String> test2() {
    return request -> log.info("test2 MQ消费 req:{}", request);
}
6. 源码分析
生产者发送源码分析:
org.springframework.cloud.stream.function.StreamBridge#send
  SubscribableChannel messageChannel = org.springframework.cloud.stream.function.StreamBridge#resolveDestination
    封装 channel:SubscribableChannel messageChannel = new DirectWithAttributesChannel();
    Binding<T> binding = org.springframework.cloud.stream.binding.BindingService#bindProducer
      org.springframework.cloud.stream.binding.BindingService#getBinder
        org.springframework.cloud.stream.binder.DefaultBinderFactory#getBinder
          org.springframework.cloud.stream.binder.DefaultBinderFactory#doGetBinder
            org.springframework.cloud.stream.binder.DefaultBinderFactory#getBinderInstance
              1. 得到 RabbitMessageChannelBinder,启动时注入的 org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration
              2. 得到 DefaultBinderFactory,启动时注入的 org.springframework.cloud.stream.config.BindingServiceConfiguration
      org.springframework.cloud.stream.binding.BindingService#doBindProducer
        org.springframework.cloud.stream.binder.AbstractMessageChannelBinder#doBindProducer
          org.springframework.cloud.stream.binder.AbstractMessageChannelBinder#createProducerMessageHandler
            org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder#createProducerMessageHandler
              MessageHandler endpoint = new AmqpOutboundEndpoint()
标签:stream,spring,springframework,binder,3.1,org,cloud From: https://www.cnblogs.com/smileblogs/p/18011781