一、生产者【2062】
1、pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
2、application.yml
server:
port: 2062
spring:
application:
name: providerstream2062
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #(默认邦定器:output)内置的获取信息的通道,从交换机(rabbit-exchang01)获取
destination: rabbit-exchang01
binders: #默认邦定器
defaultRabbit:
type: rabbit
3、MessageSender 发送者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 负责向中间件发送数据
*/
@Component
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private MessageChannel output;
//发信息
public void send(Object obj){
output.send(MessageBuilder.withPayload(obj).build());
}
}
4、Providerstream2062Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Providerstream2062Application {
public static void main(String[] args) {
SpringApplication.run(Providerstream2062Application.class, args);
}
}
5、Controller发送信息
1)、
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2)、
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ControllerTest {
@Autowired
private MessageSender messageSender;
@GetMapping(value = "/send")
public void test(String msg){
messageSender.send(msg);
} }
二、消费者【2058】
1、pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
2、MessageListener监听
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class MessageListener {
//监听binding中的消息,绑定器input
@StreamListener(Sink.INPUT)
public void input(String message){
System.out.println("获取到信息:"+message);
}
}
3、application.yml
server:
port: 2058
spring:
application:
name: consumberstream2058
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #(默认邦定器:input)内置的获取信息的通道,从交换机(rabbit-exchang01)获取
destination: rabbit-exchang01
binders: #默认邦定器
defaultRabbit:
type: rabbit
4、Consumberstream2058Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Consumberstream2058Application {
public static void main(String[] args) {
SpringApplication.run(Consumberstream2058Application.class, args);
}
}
三、Rabbit[15671] web客户端
#交换机rabbit-exchange01 直接发送信息
四、绑定器(自定义)
1、生产者【2062】
2)、application.yml
server:
port: 2062
spring:
application:
name: providerstream2062
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output: #(默认邦定器:output)内置的获取信息的通道,从交换机(rabbit-exchang01)获取
destination: rabbit-exchang01outputMy: #配置绑定器outputMy(自定义)
destination: rabbit-exchang01
binders: #默认邦定器
defaultRabbit:
type: rabbit
3)、MessageSender 发送者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 负责向中间件发送数据
*/
@Component
//@EnableBinding(Source.class)
@EnableBinding(MyProcessor.class)
public class MessageSender {
@Autowired
private MessageChannel outputMy;
//发信息
public void send(Object obj){
outputMy.send(MessageBuilder.withPayload(obj).build());
}
}
5)、Controller发送信息
i)、
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
ii)、
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ControllerTest {
@Autowired
private MessageSender messageSender; @GetMapping(value = "/send")
public void test(String msg){
messageSender.send(msg);
} }
6)、MyProcessor 绑定器(自定义)
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel; public interface MyProcessor {
String INPUT_MY="inputMy";
String OUPUT_MY="outputMy";
@Input(INPUT_MY)
SubscribableChannel inputMy();
@Output(OUPUT_MY)
MessageChannel outputMy();
}
2、消费者【2058】
2)、MessageListener监听
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {
//监听binding中的消息
//@StreamListener(Sink.INPUT)
@StreamListener(MyProcessor.INPUT_MY)
public void input(String message){
System.out.println("获取到信息:"+message);
} }
3)、application.yml
server:
port: 2058
spring:
application:
name: consumberstream2058
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #(默认邦定器:input)内置的获取信息的通道,从交换机(rabbit-exchang01)获取
destination: rabbit-exchang01inputMy: #配置绑定器inputMy(自定义)
destination: rabbit-exchang01
binders: #默认邦定器
defaultRabbit:
type: rabbit
5)、MyProcessor 绑定器(自定义)
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
String INPUT_MY="inputMy";
String OUPUT_MY="outputMy";
@Input(INPUT_MY)
SubscribableChannel inputMy();
@Output(OUPUT_MY)
MessageChannel outputMy();
}
3、Rabbit[15671] web客户端
#交换机rabbit-exchange01 直接发送信息
标签:stream,20230112,springframework,rabbit,import,org,cloud From: https://blog.51cto.com/smallfa/6010512