首页 > 其他分享 >05期:面向业务的消息服务落地实践

05期:面向业务的消息服务落地实践

时间:2023-03-28 19:00:58浏览次数:41  
标签:服务 落地 05 private class String 面向 public 消息

简介:传统的消息队列对业务方提出了更高的要求,我们期望提供的是一种以业务为重心的,面向服务的解决方案。

这里记录的是学习分享内容,文章维护在 Github:studeyang/leanrning-share

我们在上次分享中聊到了领域驱动设计和微服务,在 DDD 中有一个术语叫做领域事件,例如订单模型中的订单已创建、商品已发货。领域事件会触发下一步的业务操作,如果领域事件发生在微服务内,可以通过观察者模式很容易实现消息监听并处理。

<img src="https://technotes.oss-cn-shenzhen.aliyuncs.com/2023/202303242122856.png" style="zoom:50%;" />

如果发生在微服务之间,则需引入事件总线或者消息中间件。

一、消息队列解决方案

经过技术选型后,我们决定使用 Kafka 作为消息中间件,此时微服务间的通信示意图如下:

不过,直接使用消息队列将面临以下问题:

  1. 开发成本大:开发团队成员都需要对消息队列如 Kafka 技术有一定的了解,并且还需要关注连接消息队列的一些配置;
  2. 管理难度大:各团队都使用一个消息队列,其中一个团队使用不当时,例如创建了很多个 topic,造成资源浪费;
  3. 监控难度大:当前只有对 Kafka 集群简单的监控功能;
  4. 运维困难:遇到线上消息没有消费时,很难排查问题,无从下手;
  5. 升级难度大:Kafka-Client 需要升级时,涉及到服务太多,导致升级成本高;

我们期望提供的是一种以业务为重心的,面向服务的解决方案。

也就是说,即使团队中没人了解消息队列技术,也能够收发消息。于是对 Kafka SDK 二次封装,主要就是为了简化消息的接入,无需关注配置。

封装后解决了开发成本大、管理难度大的问题,但是离面向服务的解决方案目标还有一定的差距。比如业务方监听到消息后,执行一系列的业务逻辑异常了,想要做业务补偿,我们的“基于 Kafka SDK 二次封装”的方案就没办法满足,只能要求消息发送方再发一次消息,但这又会影响其他消息监听者。

于是我们决定将消息列队封装成消息服务,对业务方提供切实的服务能力。

二、消息服务解决方案

我们熟知计算机中总线,在计算机系统中,不同的组件和设备需要相互通信以完成各种任务,此时,计算机总线就发挥了重要作用。类似的,微服务系统中,微服务就像是计算机系统中的各个组件和设备,而消息服务充当的就是计算机总线的角色。消息总线由此而来。

本文中出现的消息总线和消息服务指的是同一个东西。

2.1 架构设计

发送消息和接收消息是消息服务最基本的能力,这两项能力分别由消息生产服务、消息消费服务提供。

2.2 消息的流转过程

三、消息服务初体验

微服务架构采用的技术栈是:SpringBoot、Kubernetes。

我们将消息总线取名为 Courier,Courier 的意思是“快递员”,消息的传递类似于快递的收发,消息总线正是快递员的角色。下面开始消息服务的初体验。

3.1 零配置接入消息总线

由于我们的微服务使用的是 SpingBoot 来落地的,因此我们提供了一个接入消息总线的 spring-boot-starter。

<dependency>
    <groupId>com.casstime.open</groupId>
    <artifactId>courier-spring-boot-starter</artifactId>
</dependency>

接入消息总线,微服务只需要一个@EnableMessage注解即可加载所有相关配置:

@EnableMessage
@SpringBootApplication
public class WebApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}

3.2 消息结构定义

下面代码定义了一个消息的基本信息,也称为消息 Header,包括消息 id,分区键 primaryKey,来源服务 service,消息 topic,创建时间 timstamp。

public abstract class Message {
    private String id;
    private String primaryKey;
    private String service;
    private String topic;
    private Date timeStamp;
}

消息可以分为两类,一类是事件,另一类是广播。定义如下:

// 事件
public abstract class Event extends Message {
}
// 广播
public abstract class Event extends Message {
}

业务消息内容称为消息 Body,例如订单已创建这个消息体的定义:

@Topic(name = "order")
public class OrderCreated extends Event {
    private String orderId;
    private String orderName;
    private Date createdAt;
}

3.3 使消息收发变得简单

业务方可以在业务执行方法的任一处,只需要一行代码,即可完成消息的发送。

// 发送消息
EventPublisher.publish(new OrderCreated());

对于消息的监听,业务方只需关注业务逻辑的执行,屏蔽了 Offset 提交、重试等技术实现。

// 接收消息
@EventHandler(topic = "order", consumerGroup = "consumer-group1")
public class OrderMessageHandler {
    public void handle(OrderCreated orderCreated) {
        System.out.println("receive message: " + orderCreated);
    }
}

3.4 提供 5 种功能类型的消息

我们提供了 5 种不同功能类型的消息,满足各类业务场景。

1、事件消息

@Topic(name = "order")
public class OrderCreated extends Event {
    private String orderId;
    private String orderName;
    private Date createdAt;
}

public void send() {
    EventPublisher.publish(new OrderCreated());
}

上面消息定义是事件,这是使用最多的一种消息。

2、广播消息

广播消息的消费示意图如下:

@Topic(name = "order")
public class CacheUpdate extends Broadcast {
    private String orderId;
    private String orderName;
    private Date createdAt;
}

public void send() {
    EventPublisher.publish(new CacheUpdate());
}

上面消息定义时,继承了Broadcast,表示这是一个广播消息,消费服务的每个节点都将会收到这个广播。例如更新本地缓存事件,就需要用到广播消息。

3、顺序消息

@Topic(name = "order")
public class OrderCreated extends Event {
    @PrimaryKey
    private String orderId;
    private String orderName;
    private Date createdAt;
}

public void send() {
    EventPublisher.publish(new OrderCreated());
}

上面消息定义时,在orderId上加了@PrimaryKey注解,表示相同orderId的消息会有序的消费。

4、事务消息

@Topic(name = "order")
public class OrderCreated extends Event {
    private String orderId;
    private String orderName;
    private Date createdAt;
}

@Transactional
public void send() {
    EventPublisher.publish(new OrderCreated());
}

上面消息发送时,在方法上添加了@Transactional注解,这是 Spring 的注解,表示这个方法里的逻辑执行是有事务性的。

5、延迟消息

@Topic(name = "order")
public class OrderCreated extends Event {
    private String orderId;
    private String orderName;
    private Date createdAt;
}

@Transactional
public void send() {
    EventPublisher.publish(new OrderCreated(), 2, TimeUnit.SECONDS);
}

上面消息发送多了两个参数,表示延迟 2 秒接收。

3.5 消息追踪

只要是通过EventPublisher.publish()方法发送的消息,都可以追踪到这条消息记录。

消息定义了 5 种状态:

  • 发送失败(SEND_FAIL):通常消息定义不规范,消息体过大;少数由于网络抖动。
  • 已提交(COMMITED):消息总线已收到消息。
  • 推送失败(PUSH_FAIL):例如服务已下线。
  • 处理失败(HANDLE_FAIL):监听到了消息,但是执行业务逻辑抛出了异常。
  • 已处理(HANDLED)

作为消息的发送方,关注的是消息是否发送成功,可通过下面页面查询。

作为消息的接收方,关注的是消息是否正常消费,可通过下面页面查询。

3.6 消息高可靠

对于 5 种状态的消息,处理策略如下:

  • 发送失败(SEND_FAIL):自动重试+手动重试,可在消息管理中心手动再发送。
  • 已提交(COMMITED):长期处理已提交状态的消息,可能消费方已接收,但状态流转异常,消息总线会定时重试。
  • 推送失败(PUSH_FAIL):自动重试+延迟重试。
  • 处理失败(HANDLE_FAIL):自动重试默认关闭,由消费方决定是否开启重试。
  • 已处理(HANDLED):也可手动重试。

封面

相关文章

也许你对下面文章也感兴趣。

标签:服务,落地,05,private,class,String,面向,public,消息
From: https://blog.51cto.com/u_15894879/6155254

相关文章

  • C#面向对象核心-继承
    继承继承主要实现重用代码,来节省开发时间。1继承基本概念一个类B继承一个类A,被继承的类A称为父类、基类、超类,继承的类B称为子类、派生类。子类会继承父类的所有成......
  • 面向对象设计原则
    1、简介常用的面向对象设计原则包括7个,这些原则并不是孤立存在的,它们相互依赖,相互补充。SRP:就一个类而言,应该只有一个引起它变化的原因,也就是一个类只有一个职责,这个......
  • C#面向对象核心-封装
    封装封装定义为"把一个或多个项目封闭在一个物理的或者逻辑的包中",这个包就是类。在面向对象程序设计方法论中,封装可以防止对实现细节的访问。1类和对象1.1什么是类......
  • (原创)【B4A】一步一步入门09:xCustomListView,加强版列表、双行带图片、复选框按钮等自定
    一、前言上篇((原创)【B4A】一步一步入门08:ListView,列表、单行、双行、双行带图片、列表项样式(控件篇04))我们讲了ListView,目前官方已经不推荐再使用ListView了,而是推荐使用xCu......
  • SpringBoot 整合AOP(面向切面编程)其中@Around失效问题
    1.AOP实现知识点​核心思想:动态代理。​支持技术:反射。2.官方文档名词解释​Aspect(切面):关注点的模块化(新增业务的模块化)。为完成新业务而编写的类对象。(带@Aspect注解......
  • 数据结构-->单链表OJ题--->讲解_05
    本期我们讲解:>1.给定值x为基准将链表分割成两部分,所有小于x的结点排在大于或等于x的结点之前本题的思路是创建两个链表,通过比较,一个存放小于x的结点的链表,另一个存放大于......
  • python-05
    python模块的定义;Python模块(Module),是一个Python文件,以.py结尾,包含了Python对象定义和Python语句。模块让你能够有逻辑地组织你的Python代码段。把相关的代码分配到一......
  • 05 Golang 流程控制
    一、条件判断1.条件判断简介条件语句是用来判断给定的条件是否满足(表达式值是否为true或者false),并根据判断的结果决定执行情况的语句。go语言中的条件语句主要包含如......
  • 就在明天!用友企业数智化财务峰会落地广州,聚焦实现业财合一新价值
    3月28日,以「智能会计价值财务」为主题的“2023企业数智化财务创新峰会”即将登陆广州。一同见证“智能会计”新时代下,用友“价值财务”的新主张,同时在全球化的今天,助力大......
  • 我的收藏周刊052
    编辑寄语:
2022年3月28日收藏周刊创刊号发布,转眼一年的时间过去了,周刊发布了51期,其中由于个人疏忽停刊一期。创建此周刊是收到了阮一峰科技爱好者周刊的启发,且由于时......