首页 > 其他分享 >Spring Boot集成Spring Cloud Stream实现消息驱动微服务

Spring Boot集成Spring Cloud Stream实现消息驱动微服务

时间:2024-08-15 22:06:10浏览次数:18  
标签:stream Stream Spring Boot springframework import org cloud

Spring Boot集成Spring Cloud Stream实现消息驱动微服务

大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!

在构建微服务架构时,消息驱动的微服务是一种常见的设计模式。Spring Cloud Stream提供了一种简单而强大的模型来发送和接收消息,从而实现解耦和异步处理。

Spring Cloud Stream 简介

Spring Cloud Stream是用于构建消息驱动微服务的框架。它通过抽象消息代理(如RabbitMQ、Kafka等)的操作,简化了消息的生产者和消费者开发。

集成 Spring Cloud Stream

首先,需要在Spring Boot项目中添加Spring Cloud Stream的依赖。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

配置绑定器

Spring Cloud Stream使用绑定器来连接消息代理。需要在配置文件中定义绑定器的配置。

spring.cloud.stream.bindings.input=queue:myQueue
spring.cloud.stream.bindings.output=queue:myQueue
spring.cloud.stream.rabbit.bindings.input.consumer.queueName=myQueue
spring.cloud.stream.rabbit.bindings.output.producer.queueName=myQueue

消息生产者

使用MessageChannel发送消息。

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import cn.juwatech.common.messaging.MyMessageChannel;

@EnableBinding(Source.class)
public class MyProducer {

    private final MessageChannel output;

    public MyProducer(Source source) {
        this.output = source.output();
    }

    public void sendMessage(String payload) {
        output.send(MessageBuilder.withPayload(payload).build());
    }
}

消息消费者

使用@StreamListener注解来接收消息。

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import cn.juwatech.common.messaging.MyMessageListener;

@Component
public class MyConsumer {

    @StreamListener("input")
    public void receiveMessage(String payload) {
        // 处理接收到的消息
    }
}

自定义消息头

Spring Cloud Stream支持自定义消息头。

import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.Message;

public class CustomHeaderUtil {

    public static Message<?> addCustomHeader(Message<?> message, String headerName, String headerValue) {
        MessageHeaderAccessor accessor = MessageHeaderAccessor.create(message);
        accessor.setHeader(headerName, headerValue);
        return accessor.getMessage();
    }
}

消息分区

对于需要高吞吐量的场景,可以使用消息分区。

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.type
spring.cloud.stream.instanceCount=3

消息事务

Spring Cloud Stream支持消息的事务性处理。

import org.springframework.transaction.annotation.Transactional;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;

@EnableBinding(Processor.class)
public class MyTransactionalProcessor {

    private final MyService myService;

    public MyTransactionalProcessor(MyService myService) {
        this.myService = myService;
    }

    @Transactional
    public void processMessage(String payload) {
        myService.doSomething(payload);
    }
}

错误处理

Spring Cloud Stream提供了错误处理机制。

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.Message;
import cn.juwatech.common.errorhandling.MyErrorHandler;

@Component
public class MyConsumerWithErrorHandler {

    private final MyErrorHandler errorHandler;

    public MyConsumerWithErrorHandler(MyErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    @StreamListener("input")
    public void receiveMessage(@Payload String payload, Message<?> message) {
        try {
            // 尝试处理消息
        } catch (Exception e) {
            errorHandler.handleError(e, message);
        }
    }
}

消息追踪

Spring Cloud Stream可以集成Spring Cloud Sleuth来实现消息追踪。

spring.zipkin.base-url=http://localhost:9411
spring.sleuth.sampler.probability=1.0

消息监控

Spring Boot Actuator可以用来监控消息的发送和接收状态。

management.endpoints.web.exposure.include=stream

消息的测试

在开发过程中,对消息驱动的功能进行测试是非常重要的。

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.boot.test.context.TestConfiguration;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    properties = "spring.main.web-application-type=none")
@TestConfiguration
public class StreamTest {

    @Value("${spring.cloud.stream.bindings.input}")
    private String input;

    @Autowired
    private MyConsumer myConsumer;

    @Autowired
    private InputDestination inputDestination;

    @Test
    public void testStream() {
        inputDestination.send(MessageBuilder.withPayload("test message").build());
        // 验证消息是否被正确处理
    }
}

总结

本文详细介绍了Spring Boot集成Spring Cloud Stream实现消息驱动微服务的方法,包括消息的发送和接收、自定义消息头、消息分区、事务性处理、错误处理、消息追踪、监控和测试。通过这些内容,开发者可以快速掌握如何在Spring Boot应用中实现消息驱动的微服务,提高系统的响应性和可扩展性。

本文著作权归聚娃科技微赚淘客系统开发者团队,转载请注明出处!

标签:stream,Stream,Spring,Boot,springframework,import,org,cloud
From: https://www.cnblogs.com/szk123456/p/18361877

相关文章

  • Spring Boot应用的版本控制与发布流程
    SpringBoot应用的版本控制与发布流程大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在软件开发过程中,版本控制和发布流程是确保软件质量和高效协作的关键环节。SpringBoot作为当前流行的Java开发框架,与版本控制和持续集成/持续部署(CI/CD)工具的集成......
  • Spring Boot应用的微服务链路追踪
    SpringBoot应用的微服务链路追踪大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在微服务架构中,一个请求可能会经过多个服务节点,链路追踪成为了定位问题和优化性能的关键技术。SpringBoot提供了多种方式来实现微服务的链路追踪。链路追踪的基本概......
  • Spring Boot集成Spring Cloud Bus进行消息总线通信
    SpringBoot集成SpringCloudBus进行消息总线通信大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在微服务架构中,服务之间的通信是一个常见需求。SpringCloudBus提供了一种基于消息总线的通信机制,可以用于服务间的配置更新、事件发布和订阅等场景......
  • Spring Boot中的跨域资源共享(CORS)处理
    SpringBoot中的跨域资源共享(CORS)处理大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在Web应用开发中,跨域资源共享(CORS)是一个常见的问题。当一个Web应用需要与另一个域下的Web服务进行交互时,浏览器出于安全考虑,会默认阻止这种跨域请求。SpringBoot......
  • Spring Boot集成Zuul API网关
    SpringBoot集成ZuulAPI网关大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在微服务架构中,API网关是一个不可或缺的组件,它负责请求的路由、负载均衡、认证、监控等任务。Zuul是一个高性能的API网关服务,由Netflix开源。SpringBoot集成Zuul可以快速......
  • [Spring]springboot
    简单介绍一下Spring?有啥缺点?Spring是重量级企业开发框架EnterpriseJavaBean(EJB)的替代品,Spring为企业级Java开发提供了一种相对简单的方法,通过依赖注入和面向切面编程,用简单的Java对象(PlainOldJavaObject,POJO)实现了EJB的功能虽然Spring的组件代码是轻量......
  • SpringBoot统一异常处理
    简介在SpringBoot项目中实现统一的异常处理是一种常见的做法,这有助于保持代码的整洁并提供一致的错误响应格式。SpringBoot中的统一异常处理是一种机制,用于集中管理和格式化应用程序中抛出的所有异常。这种机制可以提高程序的健壮性和用户体验,同时简化开发过程。统一异常处理......
  • 基于Spring AOP与Redisson的令牌桶限流注解实践
    1.什么是限流举个例子......
  • 全面掌握 Spring Cloud LoadBalancer:从自定义到策略优化的实战教程
    引言在微服务架构中,负载均衡是保障系统高效运行的关键技术之一。无论是服务端负载均衡还是客户端负载均衡,合理的负载均衡策略都能显著提升系统的稳定性和响应速度。本文将从基础概念入手,详细讲解如何在SpringCloud中实现和优化负载均衡,并结合实际案例,帮助读者快速上手并......