首页 > 其他分享 >SpringCloud Stream集成RabbitMQ

SpringCloud Stream集成RabbitMQ

时间:2023-04-29 09:55:51浏览次数:38  
标签:Stream stream spring SpringCloud springframework RabbitMQ import org cloud

1.概述

SpringCloud Stream框架抽象出了三个最基础的概念来对各种消息中间件提供统一调用:

  • Destination Binders: 负责集成外部消息系统的组件。

  • Destination Binding: 由Binders创建的,负责沟通外部消息系统、消息发送者和消息消费者的桥梁。

  • Message: 消息发送者与消息消费者沟通的简单数据结构。

2.创建生产者项目

创建项目rabbitmq-stream-sender

  • pom.xml添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.olive</groupId>
	<artifactId>rabbitmq-stream-sender</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.7</version>
		<relativePath />
	</parent>
	<dependencies>
		<!--rabbitMQ相关-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>2021.0.6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
</project>
  • application.yml 配置
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: rabbitmqExchange
          content-type: text/plain
          group: stream
  rabbitmq:
    username: admin
    password: admin123
    port: 5672
    host: 127.0.0.1
    virtual-host: /

server:
  port: 8081
  • 生产者
package com.olive.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
public class MessageProducer {

}
  • 发送消息接口
package com.olive.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {
	
	@Autowired
	private Source source;
	
	@GetMapping("/api/send")
	public String send(String message) {
		MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
		source.output().send(messageBuilder.build());
		return "success" ;
	}
}

3.创建消费者者项目

创建项目rabbitmq-stream-revicer

  • pom.xml添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.olive</groupId>
  <artifactId>rabbitmq-stream-revicer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.7</version>
		<relativePath />
	</parent>
	<dependencies>
		<!--rabbitMQ相关-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>2021.0.6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
</project>
  • application.yml 配置
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: rabbitmqExchange
          content-type: text/plain
          group: stream
  rabbitmq:
    username: admin
    password: admin123
    port: 5672
    host: 127.0.0.1
    virtual-host: /

server:
  port: 8082
  • 消费者
package com.olive.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class MessageConsumer {
	
	@StreamListener(Sink.INPUT)
	public void process(Object message) {
		System.out.println("received message: " + message);
	}
	
}

4. 验证

  • 分别启动rabbitmq-stream-senderrabbitmq-stream-revicer项目

  • 访问

http://127.0.0.1:8081/api/send?message=hello world stream

参考:
blog.csdn.net/admin522043032/article/details/124877134
blog.csdn.net/Extraordinarylife/article/details/114208673
www.cnblogs.com/wessonshin/p/12602072.html

标签:Stream,stream,spring,SpringCloud,springframework,RabbitMQ,import,org,cloud
From: https://www.cnblogs.com/happyhuangjinjin/p/17363614.html

相关文章

  • rabbitMQ--类型
    1.五种消息模型1.1基本消息模型 1.2work消息模型 1.3订阅模型1.3.1Fanout,也称为广播。流程说明流程图: 在广播模式下,消息发送流程是这样的:1)可以有多个消费者2)每个消费者有自己的queue(队列)3)每个队列都要绑定到Exchange(交换机)4)生产者发送的消息,只能......
  • 将字节数组输入流拷贝成字节数组输出流,将ByteArrayInputStream转成ByteArrayOutputStr
    /**将ByteArrayInputStream拷贝成ByteArrayOutputStream*将字节数组输入流拷贝成字节数组输出流*/publicstaticByteArrayOutputStreamgetByteArrayOutputStream(ByteArrayInputStreaminputStream)throwsIOException{ByteArrayOutpu......
  • Java1.8 新特性之Stream流
    转:Java1.8新特性之Stream流JDK1.8新特性 ......
  • RabbitMQ _ How to Close a Channel
    https://low-orbit.net/rabbitmq-how-to-close-a-channel RabbitMQHowtoCloseaChannelIfyouhavefoundyourwaytothispageyouareprobablywonderinghowtocloseachannelinRabbitMQ.Channelsshouldbeclosedwhentheyarenolongerinuse.There......
  • springcloud gateway filter 重写response
     importorg.reactivestreams.Publisher;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.cloud.gateway.filter.GatewayFilterChain;importorg.springframework.cloud.gateway.filter.GlobalFilter;importorg.springfram......
  • Linux安装RabbitMQ
    前言:还是和以前一样,linux安装软件的目录都是data目录 1.这次稍微不一样,不过还是进入data目录,创建RabbitMq目录并进入该目录cd/datamkdirrabbitMqcdrabbitMq 2.上传"erlang-21.1-1.el7.x86_64.rpm"文件和 "rabbitmq-server-3.7.7-1.el7.noarch.rpm"文件到当前......
  • SpringCloud微服务架构分析说明!
    SpringCloud是一个基于SpringBoot的微服务框架,它提供了一系列的工具和组件,用于构建分布式系统中各个微服务之间的通信和互联,实现服务发现、负载均衡、分布式配置等功能。下面我们来具体解析一下SpringCloud微服务架构。服务注册与发现在微服务架构中,服务的数量非常多,因此需要一个机......
  • Streamlit
    目录简介优点和缺点(fromClaude)命令运行Streamlit的程序CMD下运行anacondaPowershell下运行代码示例简介一个傻瓜式构建可视化web的程序Streamlit库官方地址:https://streamlit.io/API文档地址:https://docs.streamlit.io/Streamlit库基本介绍Streamlit是一个基于Python的......
  • 小团队真的适合引入SpringCloud微服务吗?
    单体应用时代接口定义持续集成(CI)微服务时代服务拆分原则框架选择架构改造自动化部署链路跟踪运维监控容器化时代架构改造SpringCloud与k8s的融合CI的改造小结微服务是否适合小团队是个见仁见智的问题。回归现象看本质,随着业务......
  • Java8使用Stream API转换Map遇到的2种异常报错和解决思路
    问题java8提供了StreamAPI,配合Lambda表达式,让开发者能对集合对象进行便利、高效的操作。在日常业务开发中,有个经常用到的场景是将List类型对象转换为Map类型对象,方便后续操作。在java8之前,这种转换需要先new一个Map对象,遍历list然后通过Map#put来初始化。使用java8后,可方便的......