背景
消息队列(Message Queue,MQ)的三大特性:异步、解耦、削峰
在很多场景下,我们的业务是没有那么强的实时性的。
比如登录成功后发送短信,我们不必在登录的逻辑里实时的去调用发送短信,而是可以先“记下来”我们等下需要发送短信,然后快速的给出响应。
串行的登录+发送短信逻辑,会导致必须做完所有操作用户才能收到响应,体验十分不好。
消息队列诞生的背景之一,就是为了解决这个的问题。
1 MQ的特性
1.1 异步
概念
异步就类似于上面的例子,为了体会深刻一点,我一般都习惯用夸张的手法描述。
假设我们需要做的逻辑是登录、发短信、发微信、发邮件,每个耗时1s。
同步的场景下,用户需要等待所有接口执行完成,等待4s。
现在呢,你也知道,核心功能在于登录,微信通知能不能立马发给他,好像也没有那么重要。
我们只要确保,还记得要做发短信这个事情就好了,所以,这个流程可以改成异步。
现在我们先不管MQ后面的逻辑,来看下整个耗时,已经变成1.1s了。
所以说,关键点在于这个事情有没有必要同步的做,没必要的时候,异步可以很好的优化整个流程。
示例
用Java代码演示下,直接来4个类,每个类1个方法,sleep模拟耗时1s。
package cn.yang37.queue;
import lombok.SneakyThrows;
/**
* @description:
* @class: A
* @author: yang37z@qq.com
* @date: 2023/7/13 20:56
* @version: 1.0
*/
public class RunDemo {
static class A {
@SneakyThrows
public static String doA() {
Thread.sleep(1000);
return "ok";
}
}
static class B {
@SneakyThrows
public static String doB() {
Thread.sleep(1000);
return "ok";
}
}
static class C {
@SneakyThrows
public static String doC() {
Thread.sleep(1000);
return "ok";
}
}
static class D {
@SneakyThrows
public static String doD() {
Thread.sleep(1000);
return "ok";
}
}
}
现在,我们依次执行这几个方法,记录下耗时。
@Test
void name1() {
long start = System.currentTimeMillis();
String doA = RunDemo.A.doA();
String doB = RunDemo.B.doB();
String doC = RunDemo.C.doC();
String doD = RunDemo.D.doD();
log.info("doA: {}", doA);
log.info("doB: {}", doB);
log.info("doC: {}", doC);
log.info("doD: {}", doD);
long end = System.currentTimeMillis();
log.info("执行耗时: {}", end - start);
}
程序输出
2023-07-13 21:15:21.569 -- [main] INFO cn.yang37.queue.RunDemoTest.name1 - doA: ok
2023-07-13 21:15:21.570 -- [main] INFO cn.yang37.queue.RunDemoTest.name1 - doB: ok
2023-07-13 21:15:21.570 -- [main] INFO cn.yang37.queue.RunDemoTest.name1 - doC: ok
2023-07-13 21:15:21.571 -- [main] INFO cn.yang37.queue.RunDemoTest.name1 - doD: ok
2023-07-13 21:15:21.571 -- [main] INFO cn.yang37.queue.RunDemoTest.name1 - 执行耗时: 4034
嗯,每个方法耗时1s,总体耗时超过4s。
现在,我们只做A,异步的去操作B和C。
@Test
void name2() throws Exception {
long start = System.currentTimeMillis();
// 操作A
String doA = RunDemo.A.doA();
log.info("doA: {}", doA);
// 等待A执行完成后,BCD不同步执行,异步处理.
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(RunDemo.B::doB);
CompletableFuture<String> futureC = CompletableFuture.supplyAsync(RunDemo.C::doC);
CompletableFuture<String> futureD = CompletableFuture.supplyAsync(RunDemo.D::doD);
// futureB/C/D都执行完成后,通知combinedFuture对象
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureB, futureC, futureD);
// combinedFuture等待并获取执行结果
combinedFuture.thenRun(() -> {
try {
String resultB = futureB.get();
String resultC = futureC.get();
String resultD = futureD.get();
log.info("doB: {}", resultB);
log.info("doC: {}", resultC);
log.info("doD: {}", resultD);
} catch (Exception e) {
log.error("执行出错!", e);
}
});
// 阻塞等待所有异步任务完成
combinedFuture.get();
long end = System.currentTimeMillis();
log.info("执行耗时: {}", end - start);
}
程序输出
2023-07-13 21:22:46.081 -- [main] INFO cn.yang37.queue.RunDemoTest.name2 - doA: ok
2023-07-13 21:22:47.100 -- [ForkJoinPool.commonPool-worker-2] INFO cn.yang37.queue.RunDemoTest.lambda$name2$0 - doB: ok
2023-07-13 21:22:47.100 -- [ForkJoinPool.commonPool-worker-2] INFO cn.yang37.queue.RunDemoTest.lambda$name2$0 - doC: ok
2023-07-13 21:22:47.100 -- [ForkJoinPool.commonPool-worker-2] INFO cn.yang37.queue.RunDemoTest.lambda$name2$0 - doD: ok
2023-07-13 21:22:47.100 -- [main] INFO cn.yang37.queue.RunDemoTest.name2 - 执行耗时: 2027
你看,同样都能输出B、C、D,我们只是改成了异步操作,耗时已经来到2s了。
对比之前4s的耗时,时间节省了大把。
1.2 解耦
解耦怎么理解呢,比如说,刚才还是执行A、B、C、D这4个方法,现在的逻辑是。
A、B、C、D这4个操作都需要执行下。
假设某天,我们的场景变成了邮件通知不需要发送,哪怕还是刚才优化后的异步代码,类比不执行C操作,也会有一个问题。
我得去掉调用这个方法的代码。
// 等待A执行完成后,BCD不同步执行,异步处理.
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(RunDemo.B::doB);
CompletableFuture<String> futureC = CompletableFuture.supplyAsync(RunDemo.C::doC);
CompletableFuture<String> futureD = CompletableFuture.supplyAsync(RunDemo.D::doD);
很简单,不调用就不执行了呗。
// 等待A执行完成后,BCD不同步执行,异步处理.
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(RunDemo.B::doB);
CompletableFuture<String> futureD = CompletableFuture.supplyAsync(RunDemo.D::doD);
哇靠,随便去掉下,就又来两个报错。
你看,只是这么个简单的操作,就要修改这么多代码。而且,实际开发中,肯定比这复杂的多。
最关键的是你改完代码还要发布个版本重启下服务。
又或者发短信的方法执行的时候,我想让他再加个发送登录时间啥的,能不能让他执行的时候帮我改下呀。
这个时候问题就出现了,就是异步操作和我们关心的主要业务逻辑耦合在了一起。
就像上面那个地方,你就在想,要是我可以动态的更改帮我异步操作的那个东西就行了。
通过MQ,可以将主要业务逻辑和不太重要的异步操作逻辑拆开,达到解耦的效果。
就比如不执行D操作,我可以告诉MQ,你别执行xx操作了,这样也实现了去掉的效果。
怎么实现的暂时不是重点,关键是,要不要执行xx操作,怎么执行xx操作,这件事情已经和你的主要逻辑分开了。
1.3 削峰
削峰比较好理解,以上面的例子为例,登录+发送短信邮件那些通知,不管怎么样,发通知执行的时候总是要耗费服务器资源的。
夸张一下,执行进来一千万个请求,如果让你同时执行,是不是开始担心了,直接登录异常了。
为了不影响我们登录的操作,可以暂时将这些不重要的操作先存到MQ里面,后续由实际的消费服务来执行,缓解我们核心程序的压力。
2 队列和主题模型
为啥要叫消息队列?
2.1 队列
很废话,这玩意不就是存放消息的队列吗,不叫队列叫什么?
嗯,关键字来了,什么是队列?
队列就是我们说的那个队列,跟日常生活中的队列一样的。
食堂排队吃饭,这是个队列。
景区门点检票,这也是个队列。
那么队列最核心的概念是什么?
先进先出,FIFO(First-In-First-Out,FIFO)。
即最先进入队列的元素将首先被处理,而最后进入队列的元素将被保留在队列末尾等待处理。
MQ做的事情很专注,就是告诉某某某需要做啥事了。
还是刚才那个场景。
现在,我们把后面3个发通知的操作告诉队列,它就会帮我去依次的去找人处理。
现在换个场景,买票,一次只能买一张,假设我们都是通知小黄帮我们去买票,买一张的时候,一个队列就够用了。
哇靠,可是我是两个人,我想要两张票,我准备找小李也帮我买下。
这个时候,你就发现,好像得变成这样。
哇靠,只有一个队列,好像不能同时处理。没办法呀,得排队,通知完黄哥再通知我李哥。
问题就来了,我就想快点操作,就想同时通知他们咋办。
问题的关键点就是:怎么将同一个消息,同时通知给多个人。
嗯,很简单嘛,我再搞一个队列进来。
哈哈,是不是很easy,一个黄哥专属队列,一个李哥专属队列,这不就能一下子通知两个人了?
这个时候就带来了一个成本的问题。问题确实解决了,但是你发现没有,那个绿色的正方形框框被复制了一个在上面的李哥队列,假设我要买它个几十张呢。
现在,可以整理出来问题点了。
-
需要创建多个队列,复制多份消息。
是会很影响资源和性能的。
-
生产者需要知道具体消费者个数,然后去创建队列和复制消息。
我本人得知道有几个人在帮忙,黄哥李哥两个人我就搞2个队列,再来个张哥我就搞3个队列。
我明明就是找别人帮忙买个票,咋还要关心这么多破事啊我靠。
不解耦,都自己搞,就是这么烦恼。
可不可以不管上面这些破事,我就在那里吼一声"帮忙买票,给两倍的米!",然后就行了啊?
当然可以,不然后面这么多东西怎么写。
这个时候就需要引出一个主题模型,也叫作发布-订阅模型。
2.2 观察者模式
我先给你手搓一个设计模式里的观察者模式。为啥突然要手搓一个,主要是我要顺带复习下。
你敢相信这玩意一年前我研究过?我现在看代码人都是懵的。
好,先搞一个观察者模式,实现上面这个功能,逻辑很简单,就是这样。
根据这个背景来编码一下,项目结构是这样的。
首先定义一个观察者接口,因为我们知道,每个观察者,肯定要能接收到通知和知道价格。
package cn.yang37.design.patterns24.observe.practice.p1;
/**
* @description: 观察者-股民
* @class: StockObserver
* @author: yang37z@qq.com
* @date: 2022/3/8 23:26
* @version: 1.0
*/
public interface StockObserver {
/**
* 股价波动通知
* @param msg 通知信息
*/
void obtainStockChangeMsg(String msg);
/**
* 获得股价
* @param stockPrice 股价
*/
void obtainStockPrice(float stockPrice);
}
股民来个小王和小宋,实现下StockObserver接口。
package cn.yang37.design.patterns24.observe.practice.p1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @description: 股民
* @class: StockInvestor
* @author: yang37z@qq.com
* @date: 2022/3/8 23:49
* @version: 1.0
*/
public class StockInvestor implements StockObserver {
private static final Logger log = LoggerFactory.getLogger(StockInvestor.class);
@Override
public void obtainStockChangeMsg(String msg) {
log.info("当前对象: {},通知信息: {}", this.getClass().getSimpleName(), msg);
}
@Override
public void obtainStockPrice(float stockPrice) {
log.info("当前对象: {},当前股价: {}", this.getClass().getSimpleName(), stockPrice);
}
}
小王和小宋继承下这个StockInvestor类,就默认有这两个方法了。
package cn.yang37.design.patterns24.observe.practice.p1.stock;
import cn.yang37.design.patterns24.observe.practice.p1.StockInvestor;
/**
* @description: 股民A
* @class: Wang
* @author: yang37z@qq.com
* @date: 2022/3/8 23:48
* @version: 1.0
*/
public class Wang extends StockInvestor {
}
package cn.yang37.design.patterns24.observe.practice.p1.stock;
import cn.yang37.design.patterns24.observe.practice.p1.StockInvestor;
/**
* @description: 股民宋
* @class: Song
* @author: yang37z@qq.com
* @date: 2022/3/8 23:48
* @version: 1.0
*/
public class Song extends StockInvestor {
}
股民部分就搞定了。
现在再来搞一个主题(消息),抽象一手,应该具备以下几个功能,增删和通知。
package cn.yang37.design.patterns24.observe.practice.p1;
/**
* @description:
* @class: StockSubject
* @author: yang37z@qq.com
* @date: 2022/3/8 23:19
* @version: 1.0
*/
public interface StockSubject {
/**
* 注册
*
* @param observe 股民对象
*/
void registerObserver(StockObserver observe);
/**
* 移除
*
* @param observe 股民对象
*/
void removeObserver(StockObserver observe);
/**
* 通知: 状态改变时,通知所有观察者.
*/
void notifyObserver();
}
然后做一下这个类的具体实现,搞一个股价变动的实现类(假设有个别的场景,比如钱亏完了的通知,你还是可以实现这个类,核心的方法就这几个)。
这里面的代码就是具体的实现操作了。
package cn.yang37.design.patterns24.observe.practice.p1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Vector;
/**
* @description: 股价改变
* @class: PriceChangeSubject
* @author: yang37z@qq.com
* @date: 2022/3/8 23:28
* @version: 1.0
*/
public class StockSubject_PriceChange implements StockSubject {
private static final Logger log = LoggerFactory.getLogger(StockSubject_PriceChange.class);
/**
* 当前股价
*/
private float nowPrice = 2.0f;
/**
* 波动标志
*/
private boolean priceChange = false;
/**
* 通知信息
*/
private String msg = "[股价变动较大通知] ";
/**
* 维护一份观察者对象
*/
private final Vector<StockObserver> stockObserverVector = new Vector<>();
@Override
public void registerObserver(StockObserver observe) {
stockObserverVector.add(observe);
}
@Override
public void removeObserver(StockObserver observe) {
stockObserverVector.remove(observe);
}
@Override
public void notifyObserver() {
stockObserverVector.forEach(observer -> {
// 通知当前价格
observer.obtainStockPrice(nowPrice);
// 股价波动标志
if (priceChange) {
// 通知波动信息
observer.obtainStockChangeMsg(msg);
}
});
// 恢复标志位
priceChange = false;
}
/**
* 股价改变
*
* @param newPricce 新价格
*/
public void stockPriceChange(float newPricce) {
// 波动情况
float differencePrice = newPricce - nowPrice;
float absDifferencePrice = Math.abs(differencePrice);
float diff = absDifferencePrice / nowPrice;
log.info("原股价 -> 新股价:{} -> {},差值: {},差值/原股价: {}", nowPrice, newPricce, differencePrice, diff);
log.info("差值: {},差值/原股价: {}\n", differencePrice, diff);
// 波动
if (diff >= 0.05) {
priceChange = true;
msg = msg + diff;
}
// 更新当前股价信息
nowPrice = newPricce;
// 通知所有股民
notifyObserver();
}
}
增加和删除订阅者信息没啥好说的,集合stockObserverVector做下增删就行了,看下这个notifyObserver()
方法,用处就是通知所有的订阅方。
@Override
public void notifyObserver() {
stockObserverVector.forEach(observer -> {
// 通知当前价格
observer.obtainStockPrice(nowPrice);
// 股价波动标志
if (priceChange) {
// 通知波动信息
observer.obtainStockChangeMsg(msg);
}
});
// 恢复标志位
priceChange = false;
}
这个通知方法,其实就是for循环了一手,取出来内部的每个订阅者信息,都通知下他们当前的价格obtainStockPrice()
,发现变动差距超过xx后,就就额外通知下价格已经变动了,反正就是发通知呗。
stockPriceChange()方法使我们测试用的,传入一个新的价格,触发下通知。
/**
* 股价改变
*
* @param newPricce 新价格
*/
public void stockPriceChange(float newPricce) {
// 波动情况
float differencePrice = newPricce - nowPrice;
float absDifferencePrice = Math.abs(differencePrice);
float diff = absDifferencePrice / nowPrice;
log.info("原股价 -> 新股价:{} -> {},差值: {},差值/原股价: {}", nowPrice, newPricce, differencePrice, diff);
log.info("差值: {},差值/原股价: {}\n", differencePrice, diff);
// 波动
if (diff >= 0.05) {
priceChange = true;
msg = msg + diff;
}
// 更新当前股价信息
nowPrice = newPricce;
// 通知所有股民
notifyObserver();
}
测试代码
@Test
void name1() {
StockSubject_PriceChange stockSubject_priceChange = new StockSubject_PriceChange();
// 添加观察者
stockSubject_priceChange.registerObserver(new Wang());
stockSubject_priceChange.registerObserver(new Song());
// 模拟股价变动
stockSubject_priceChange.stockPriceChange(2.0f);
}
运行结果
2023-07-13 22:59:36.546 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 原股价 -> 新股价:2.0 -> 2.0,差值: 0.0,差值/原股价: 0.0
2023-07-13 22:59:36.547 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 差值: 0.0,差值/原股价: 0.0
2023-07-13 22:59:36.547 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Wang,当前股价: 2.0
2023-07-13 22:59:36.548 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Song,当前股价: 2.0
可以看到,新股价传入2.0时,没有变动,大家只是收到了当前股价的通知。
// 模拟股价变动
stockSubject_priceChange.stockPriceChange(2.5f);
运行结果
2023-07-13 23:02:27.181 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 原股价 -> 新股价:2.0 -> 2.5,差值: 0.5,差值/原股价: 0.25
2023-07-13 23:02:27.182 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 差值: 0.5,差值/原股价: 0.25
2023-07-13 23:02:27.183 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Wang,当前股价: 2.5
2023-07-13 23:02:27.183 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockChangeMsg - 当前对象: Wang,通知信息: [股价变动较大通知] 0.25
2023-07-13 23:02:27.183 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Song,当前股价: 2.5
2023-07-13 23:02:27.183 -- [main] INFO cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockChangeMsg - 当前对象: Song,通知信息: [股价变动较大通知] 0.25
修改成2.5f后,股价变动0.25超过0.05,所有人额外还会收到一条价格变动过大的通知。
通过上面的逻辑,主要实现了下面的功能。
- 新用户主要通过
registerObserver(StockObserver observe)
方法添加进来,就能收到我们的通知。 - 股价变动的时候
stockPriceChange(float newPricce)
,只管调用通知就行notifyObserver()
,不必关注有哪些人都在订阅我的信息。
那么回到上面找小黄小李买票的例子,好像,似乎,也许,已经变得简单了?
我只管在那里吼一声"帮忙买票,给两倍的米!"。
你说那你不也是for循环搞出来的,你先别管MQ是咋实现的,问题的关键点在于。
对于我们本身,现在不需要管有几个订阅方,只管吼一声就行了。
2.3 主题模型
主题模型 也可以称为 发布-订阅模型 。
在主题模型中,消息的生产者称为 发布者(Publisher) ,消息的消费者称为 订阅者(Subscriber) ,存放消息的容器称为 主题(Topic) 。
标签:13,yang37,cn,队列,背景,observe,通知,Rocketmq From: https://www.cnblogs.com/yang37/p/17552489.html