首页 > 其他分享 >sc stream-rabbit20230116

sc stream-rabbit20230116

时间:2023-01-06 19:11:22 浏览次数:44  
标签:rabbit20230116 stream spring springframework import sc org cloud

 

 

 

 

一、Exchanges:testRabbit

 

 

 

 

 

二、MQ生产者

 

 

1、pom.xml
  <!-- Build properties: Java level and the Spring Cloud release train used by the BOM import below. -->
  <properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>2021.0.5</spring-cloud.version>
  </properties>
  <dependencies>
    <!-- Spring MVC / embedded web server; version is not declared in this fragment. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Eureka client; no explicit version here, so it comes from dependency management. -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!-- RabbitMQ binder starter for Spring Cloud Stream. -->
    <!-- The explicit version takes precedence over the one managed by the imported BOM. -->
    <!-- NOTE(review): confirm 3.0.7.RELEASE is compatible with release train ${spring-cloud.version}. -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
      <version>3.0.7.RELEASE</version>
    </dependency>
    <!-- Spring Cloud Stream core, pinned to the same explicit version as the binder starter. -->
    <!-- NOTE(review): presumably already pulled in transitively by the starter above; verify before removing. -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
      <version>3.0.7.RELEASE</version>
    </dependency>
   </dependencies>
  <!-- Imports the Spring Cloud BOM so unversioned Spring Cloud dependencies resolve to managed versions. -->
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

 

2、application.properties
  # Application name
  spring.application.name=providerstream2054
  # HTTP port of this application's web server
  server.port=2054
  ## Eureka
  # URL of the Eureka server this client talks to
  eureka.client.service-url.defaultZone=http://localhost:2013/eureka/
  # Prefer the IP address over the hostname for this instance
  eureka.instance.prefer-ip-address=true
  # Instance id is built as <ip-address>:<port>
  eureka.instance.instance-id=${spring.cloud.client.ip-address}:${server.port}
  # Lease renewal interval, in seconds
  eureka.instance.lease-renewal-interval-in-seconds=5
  # Lease expiration duration, in seconds
  eureka.instance.lease-expiration-duration-in-seconds=10

 

3、application.yml
  # Spring Cloud Stream configuration for the producer:
  # one RabbitMQ binder named "test" and one output binding named "testOutPut".
  spring:
    cloud:
      stream:
        # Stream-level property: binder used by bindings that do not name one themselves.
        default-binder: test
        binders:
          test:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  # host + port instead of "addresses": when "addresses" is set,
                  # the separate "port" property is ignored.
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings:
          # Binding name must match MqMessageSource.TEST_OUT_PUT.
          testOutPut:
            destination: testRabbit
            content-type: application/json


4、MqMessageSource

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * Binding contract for the producer: declares a single output channel.
 */
public interface MqMessageSource {

  /** Name of the output binding. */
  public static final String TEST_OUT_PUT = "testOutPut";

  /**
   * Output channel registered under {@link #TEST_OUT_PUT}.
   *
   * @return the channel messages are sent on
   */
  @Output(value = TEST_OUT_PUT)
  MessageChannel testOutPut();
}

 

5、MqMessageProducer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

  /**
   * Sends text messages on the output channel declared by {@link MqMessageSource}.
   */
  @EnableBinding(MqMessageSource.class)
  public class MqMessageProducer {
    /** Output channel, qualified by the {@code testOutPut} binding name. */
    @Autowired
    @Output(MqMessageSource.TEST_OUT_PUT)
    private MessageChannel channel;

    /**
     * Wraps {@code msg} in a message, sends it on the output channel and
     * reports the outcome on standard error.
     *
     * @param msg payload to send
     */
    public void sendMsg(String msg) {
      // MessageChannel.send returns false when the message was not sent for a
      // non-fatal reason, so only report success when it returns true.
      boolean sent = channel.send(MessageBuilder.withPayload(msg).build());
      if (sent) {
        System.err.println("端口【2054】消息发送成功:"+msg);
      } else {
        System.err.println("端口【2054】消息发送失败:"+msg);
      }
    }
  }

6、sendController
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.web.bind.annotation.GetMapping;
  import org.springframework.web.bind.annotation.RequestParam;
  import org.springframework.web.bind.annotation.RestController;

  /**
   * REST controller that hands the {@code msg} request parameter to
   * {@link MqMessageProducer}.
   */
  @RestController
  public class sendController {

    /** Producer that performs the actual send. */
    @Autowired
    private MqMessageProducer mqMessageProducer;

    /**
     * Handles {@code GET /send}.
     *
     * @param msg value of the required {@code msg} request parameter
     * @return a fixed confirmation string
     */
    @GetMapping("/send")
    public String sendMq(@RequestParam(name = "msg") String msg) {
      final String reply = "发送成功";
      mqMessageProducer.sendMsg(msg);
      return reply;
    }
  }

7、Provderstream2054Application
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

  /**
   * Spring Boot entry point of the producer application.
   */
  @SpringBootApplication
  @EnableEurekaClient
  public class Provderstream2054Application {

    /**
     * Starts the Spring application with the given command-line arguments.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
      final Class<?> bootClass = Provderstream2054Application.class;
      SpringApplication.run(bootClass, args);
    }
  }

 

 

 

 

 

 

 

 

 

 

 

三、MQ消费者

1、pom.xml
  <!-- Build properties: Java level and the Spring Cloud release train used by the BOM import below. -->
  <properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>2021.0.5</spring-cloud.version>
  </properties>
  <dependencies>
    <!-- Spring MVC / embedded web server; version is not declared in this fragment. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Eureka client; no explicit version here, so it comes from dependency management. -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!-- RabbitMQ binder starter for Spring Cloud Stream. -->
    <!-- The explicit version takes precedence over the one managed by the imported BOM. -->
    <!-- NOTE(review): confirm 3.0.7.RELEASE is compatible with release train ${spring-cloud.version}. -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
      <version>3.0.7.RELEASE</version>
    </dependency>
    <!-- Spring Cloud Stream core, pinned to the same explicit version as the binder starter. -->
    <!-- NOTE(review): presumably already pulled in transitively by the starter above; verify before removing. -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
      <version>3.0.7.RELEASE</version>
    </dependency>
  </dependencies>
  <!-- Imports the Spring Cloud BOM so unversioned Spring Cloud dependencies resolve to managed versions. -->
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

 

2、application.properties

# Application name
spring.application.name=consumberstream2055
# HTTP port of this application's web server
server.port=2055
## Eureka
# URL of the Eureka server this client talks to
eureka.client.service-url.defaultZone=http://localhost:2013/eureka/
# Prefer the IP address over the hostname for this instance
eureka.instance.prefer-ip-address=true
# Instance id is built as <ip-address>:<port>
eureka.instance.instance-id=${spring.cloud.client.ip-address}:${server.port}
# Lease renewal interval, in seconds
eureka.instance.lease-renewal-interval-in-seconds=5
# Lease expiration duration, in seconds
eureka.instance.lease-expiration-duration-in-seconds=10

 

3、application.yml
# Spring Cloud Stream configuration for the consumer:
# one RabbitMQ binder named "test" and one input binding named "testInPut".
# "binders" and "bindings" must be nested under "stream"; as siblings of it
# they would resolve to spring.cloud.binders / spring.cloud.bindings instead.
spring:
  cloud:
    stream:
      # Stream-level property: binder used by bindings that do not name one themselves.
      default-binder: test
      binders:
        test:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                # host + port instead of "addresses": when "addresses" is set,
                # the separate "port" property is ignored.
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # Binding name must match MqMessageSource.TEST_IN_PUT.
        testInPut:
          destination: testRabbit
          content-type: application/json

 

4、MqMessageSource

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Binding contract for the consumer: declares a single input channel.
 */
public interface MqMessageSource {

  /** Name of the input binding. */
  public static final String TEST_IN_PUT = "testInPut";

  /**
   * Input channel registered under {@link #TEST_IN_PUT}.
   *
   * @return the channel messages are received on
   */
  @Input(value = TEST_IN_PUT)
  SubscribableChannel testInPut();
}

 

5、MqMessageConsumer
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

  /**
   * Listens on the input channel declared by {@link MqMessageSource} and
   * prints each received payload to standard error.
   */
  @EnableBinding(MqMessageSource.class)
  public class MqMessageConsumer {

    /**
     * Invoked for every message arriving on the {@code testInPut} binding.
     *
     * @param message incoming message with a text payload
     */
    @StreamListener(MqMessageSource.TEST_IN_PUT)
    public void messageInPut(Message<String> message) {
      final String payload = message.getPayload();
      System.err.println("端口【2055】消息接收成功:" + payload);
    }
  }

 

6、Consumberstream2055Application
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  /**
   * Spring Boot entry point of the consumer application.
   */
  @SpringBootApplication
  public class Consumberstream2055Application {

    /**
     * Starts the Spring application with the given command-line arguments.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
      final Class<?> bootClass = Consumberstream2055Application.class;
      SpringApplication.run(bootClass, args);
    }
  }

 

 

 

 

 

 

 

 

 

 四、EK2013(Eureka 注册中心,端口 2013,即上文 defaultZone 所指向的服务)

 

标签:rabbit20230116,stream,spring,springframework,import,sc,org,cloud
From: https://www.cnblogs.com/smallfa/p/17031394.html

相关文章