目录
1、什么是消息队列?
- 消息队列一般简称为 MQ (Messges Queue),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器。消息队列本质上是一个队列,而队列中存放的是一个个消息。
- 队列是一个数据结构,具有先进先出的特点。而消息队列就是将消息放到队列里,用队列做存储消息的介质。消息的发送放称为生产者,消息的接收方称为消费者。
- 消息队列由 Broker(消息服务器,核心部分)、Producer(消息生产者)、Consumer(消息消费者)、Topic(主题)、Queue(队列)和Message(消息体)组成。
2、消息队列有哪些功能?
消息队列主要有三个功能,分别是流量消峰、应用解耦和消息分发(异步)。
2.1 流量消峰
流量削峰主要用于在高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
举个例子,假设Qunar订单系统每秒最多能处理1000次下单,这个处理能力应对正常时段的下单是绰绰有余的。但在五一、十一期间,下单QPS峰值达到了5000甚至更高,此时如果没有消息队列这种缓冲机制,为了保证系统稳定,我们只能在一秒内订单超过1000次后就不允许用户下单了;如果有消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时虽然有些用户可能在下单后十几秒才能收到下单成功的状态,但是也比不能下单的体验要好
2.2 应用解耦
应用解耦主要用于当一个业务需要多个模块共同实现时,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
复杂的应用里会存在多个子系统,比如在电商应用中有订单系统、库存系统、物流系统、支付系统等。这个时候如果各个子系统之间的耦合性太高,整体系统的可用性就会大幅降低。多个低错误率的子系统强耦合在一起,得到的是一个高错误率的整体系统
用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验
当转变成基于消息队列的方式后,系统可用性就高多了,比如物流系统因为发生故障,需要几分钟的时间来修复,在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列里,用户的下单操作可以正常完成。当物流系统恢复后,补充处理存储在消息队列里的订单信息即可,用户感知不到物流系统发生过几分钟的故障。
2.3 消息分发(异步)
消息分发主要用于某些数据需要被多个系统使用的场景。数据的产生方只需要把自己的数据写入一个消息队列即可,数据使用方根据各自需求订阅感兴趣的数据,不同系统所订阅的数据可以重复也可以不重复,互不干扰,也不必和数据产生方关联。
此外,消息分发也实现了异步的功能,对于一些实时性要求不高的业务,主系统可以将数据放入消息队列中后由其他系统来处理,主系统就可以继续执行后续操作,从而提高系统相应时间。
3、RocketMQ
3.1 RocketMQ简介
- RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
- RocketMQ由四部分组成,分布式消息队列是用来高效地传输消息的,它的功能和现实生活中的邮局收发信件很类似,我们类比地说一下相应的模块。现实生活中的邮政系统要正常运行,离不开下面这四个角色,一是发信者,二是收信者,三是负责暂存、传输的邮局,四是负责协调各个地方邮局的管理机构。对应到RocketMQ中,这四个角色就是Producer、Consumer、Broker和NameServer。
3.2 RocketMQ使用
启动RocketMQ的顺序是先启动NameServer,再启动Broker,这时候消息队列已经可以提供服务了,想发送消息就使用Producer来发送,想接收消息就使用Consumer来接收。
代码中导入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.1</version> </dependency> |
启动NameServer:
start mqnamesrv -n 127.0.0.1:9876
启动Broker:
start mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
消费者接收消息:
public class RocketMqConsumer { public static void main(String[] args) throws MQClientException { new RocketMqConsumer().defaultMQPushConsumer(); } public void defaultMQPushConsumer() throws MQClientException { //定义消费者,可以指定消费集群 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name"); //同样的,指定name server 的地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //订阅topicA下的所有消息 consumer.subscribe("topicA","*"); //一个consumer可以订阅多个topic consumer.subscribe("topicB","*"); consumer.subscribe("topicC","*"); //程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //注册订阅消息(消息监听) consumer.registerMessageListener( (MessageListenerConcurrently) (list, Context) -> { MessageExt msg = list.get(0); System.out.println("-收到消息:id-"+msg.getMsgId() +","+ new String(msg.getBody(), StandardCharsets.UTF_8) +","+"keys: "+msg.getKeys() ); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } ); consumer.start(); System.out.println("consumer消费者启动"); } } |
生产者发送消息:
public class RocketMqProducer { public static void main(String[] args) throws MQBrokerException, RemotingException, UnsupportedEncodingException, InterruptedException, MQClientException { new RocketMqProducer().defaultMQProducer(); } public void defaultMQProducer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException { //生产者,可以指定producer集群 DefaultMQProducer producer = new DefaultMQProducer("producer_group_name"); //设置name server的地址 producer.setNamesrvAddr("127.0.0.1:9876"); /* 设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分, 不设置的话系统使用默认名称“DEFAULT” */ producer.setInstanceName("instance1"); /* 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。 想保证不丢消息,可以设置多重试几次。 */ producer.setRetryTimesWhenSendFailed(3); producer.start(); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); System.out.println("启动了生产者producer"); //message必须指定topic,和消息体body // 可以选择指定tag,key来进行细分message Message msgA = new Message("topicA", "这是topicA的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); msgA.setDelayTimeLevel(3); Message msgB = new Message("topicB", "这是topicB的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgC = new Message("topicC","tag-a","这是topicC的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgD = new Message("topicC","tag-b","key2","这是topicC的消息,指定了tag-b".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgE = new Message("topicC","tag-a","key1","这是topicC的消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message[] messages =new Message[]{msgA,msgB,msgC,msgD,msgE}; //发送消息 for (Message message : messages) { SendResult result = producer.send(message); SendResult result1 = producer.send(message, (mqs, msg, arg) -> { int id = Integer.parseInt(arg.toString()); int idMainIndex = id / 100; int size = mqs.size(); int index = idMainIndex % size; return mqs.get(index); }, "100"); System.out.println("消息发送成功:id:" + result.getMsgId() + " result:" + result.getSendStatus()); } } } |
3.3 RocketMQ四大组件
上文提到,RocketMQ最重要的四大组件,他们之间的关系如下图:
3.3.1 Consumer
消费者是 RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
在RocketMQ中,消费者主要可分为两种类型:DefaultMQPushConsumer和DefaultMQPullConsumer
3.3.1.1 DefaultMQPushConsumer
使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。
DefaultMQPushConsumer需要设置三个参数:GroupName、NameServer的ip+port和Topic
1、GroupName
Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式配合使用。
RocketMQ支持两种消息模式:Clustering和Broadcasting。
- 在Clustering模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。
- 在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。
2、NameServer的ip和port
NameServer的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port; ip2:port; ip3:port”
3、Topic
Topic用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。
DefaultMQPushConsumer虽然是以push命名的,但它真实的消息拉取方式并不是单纯的push方式。
Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:首先是加大Server端的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。
Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。
RocketMQ中,使用了“长轮询”的方式进行消息拉取。长轮询与传统的轮询方式不同,它允许消费者在没有消息可供拉取时等待一段时间,从而降低了不必要的轮询频率,减少了资源消耗,同时保持了实时性。它提供了一种更有效的方法来获取消息,减少资源浪费,并在实时性与效率之间取得平衡。
“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。
流量控制:
PushConsumer有个线程池,消息处理逻辑在各个线程里同时执行,Pull获得的消息,如果直接提交到线程池里执行,很难监控和控制。
RocketMQ定义了一个快照类ProcessQueue来解决这些问题,在PushConsumer运行的时候,每个Message Queue都会有个对应的ProcessQueue对象,保存了这个Message Queue消息处理状态的快照。
ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到,但是还未被处理的消息;读写锁控制着多个线程对TreeMap对象的并发访问,
客户端在每次Pull请求之前会做下面三个判断来控制流量:消息个数、消息总大小以及Offset的跨度,任何一个值超过设定的大小就隔一段时间(默认50毫秒)再拉取消息,由此来达到流量控制的目的。
Offset存储位置:
在集群消费模式下,Consumer Offset 是存储在 Broker 上的。每个消费者组(Consumer Group)和每个消息队列都有一个 Consumer Offset。消费者组中的不同消费者实例会共享相同的消费进度(Consumer Offset),这允许消费者组内的不同实例协同工作以提供高可用性和负载均衡。
在广播消费模式下,Consumer Offset 通常是存储在消费者客户端本地的,而不是存储在 Broker 上。这是因为 Broadcasting 模式下,不同的消费者实例独立消费消息,没有协同工作的需要,所以不需要在 Broker 上存储 offset 信息
3.3.1.2 DefaultMQPullConsumer
相对于DefaultMQPushConsumer,DefaultMQPullConsumer使用起来更复杂,拉取方式也有所不同。
DefaultMQPullConsumer需要显式地调用 pull 操作来从消息队列中获取消息。消费者控制何时拉取消息,可以选择一次拉取一个消息或一批消息,但需要编写额外的逻辑来实现拉取消息和消息处理。
在使用场景上,DefaultMQPullConsumer通常用于需要更精确控制消息拉取过程的场景,例如处理特定的定时任务、数据同步等。它需要使用者编写更多的代码来处理消息拉取逻辑;而DefaultMQPushConsumer 更适用于普通的消息消费。
3.3.2 Producer
生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。 生产者通常被集成在业务系统中,将业务消息按照要求封装成 RocketMQ 的消息(Message)并发送至服务端。
3.3.2.1 三种消息发送方式
- 同步(sync):发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
- 异步(async):发送者向MQ执行发送消息API时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
- 单向(oneway):消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。
3.3.2.2 消息发送基本流程
消息发送流程主要的步骤:验证消息、查找路由、消息发送(包含异常处理机制)。
1)消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于0且默认不能超过允许发送消息的最大长度4M(maxMessageSize=1024 * 1024 * 4)。
2)消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。如果生产者中缓存了topic的路由信息,且该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。如果最终未找到路由信息,则抛出异常:无法找到主题相关路由信息异常。
3)消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。如果消息重试次数超过允许的最大重试次数,消息将进入到DLQ延迟队列。
3.3.2.3三种消息类型
普通消息发送
生产者发送消息默认使用的是DefaultMQProducer类,发送消息要经过五个步骤(结合代码看):
1)设置Producer的GroupName
2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”
3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次
4)设置NameServer地址
5)组装消息并发送
消息发送的返回状态有如下四种,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:
- FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘。
- FLUSH_SLAVE_TIMEOUT:表示没有在设定时间内完成主从同步。
- SLAVE_NOT_AVAILABLE:表示没有找到被配置成Slave的Broker。
- SEND_OK:表示发送成功。
延时消息发送
RocketMQ支持发送延迟消息,Broker收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间后生效。
延迟消息的使用方法是在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个消息发送出去。
目前延迟的时间不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。比如setDelayTimeLevel(3)表示延迟10s
事务消息发送
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。它采用两阶段提交的方式实现事务消息。
具体流程如下:
1)发送方向RocketMQ发送“待确认”消息。
2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
3)发送方开始执行本地事件逻辑。
4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。
5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。
6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。
7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。
3.3.3 NameServer
NameServer是整个消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息。同时,各个角色的机器都要定期向NameServer上报自己的状态,超时不上报的话,NameServer会认为某个机器出故障不可用了,其他的组件会把这个机器从可用列表里移除。
NameServer本身的高可用是通过部署多台NameServer服务器来实现,但彼此之间互不通讯,也就是NameServer服务器之间在某一时刻的数据并不完全相同,但这对消息发送并不会造成任何影响。
3.3.3.1 NameServer的集群状态存储结构
集群的状态就保存于五个变量中,NameServer 的主要工作就是维护这五个变量中存储的信息。
①.private final HashMap topicQueueTable
Key 是 Topic 的名称,它存储了所有Topic 的属性信息 。 Value 是个 QueueData 队列,队里的长度等于这个 Topic 数据存储的 MasterBroker的个数。 QueueData里存储着 Broker的名称、读写queue的数量、 同步标识等。
②.private final HashMap Broker- AddrTable
这个结构存储着一个 BrokerName 对应的属性信息, 包括所属的 Cluster 名称,Master Broker 和多个 Slave Broker 的地址信息。
③.private final HashMap ClusterAddrTable
存储的是集群中Cluster的信息 Cluster名称对应一个由BrokerName组成的集合
④.private final HashMap Broker-LiveTable
BrokerLiveTable 存储的内容是这台Broker机器的实时状态, 包括上次更新状态的时间戳, NameServer会定期检查这个时间戳,超时没有更新就认为这个 Broker无效了,将其从 Broker列表里清除。
⑤.private fina l HashMap filterServerTable
Filter Server是过滤服务器,是 RocketMQ 的一种服务端过滤方式。 一个Broker可以有一个或多个Filter Server。 Key 是 Broker 的地址 Value 是和这个 Broker关联的多个 Filter Server 的地址。
3.3.3.2 NameServer的状态维护逻辑
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer每隔10s检查一次,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。
3.3.3.3 新建Topic
创建Topic的代码是在org.apache.rocketmq.tools.command.topic里的UpdateTopicSubCommand类中,创建Topic的命令是updateTopic
updateTopic参数:
Option("b", "BrokerAddr", true, "create topic to which Broker"); Option("c", "ClusterName", true, "create topic to which Cluster"); |
b和c参数比较重要,而且他们俩只有一个会起作用(-b优先), b参数指定在哪个Broker上创建本Topic的Message Queue, c参数表示在这个Cluster下面所有的Master Broker上创建这个Topic的Message Queue,从而达到高可用性的目的
具体的创建动作是通过发送命令触发的,创建Topic的命令被发往对应的Broker, Broker接到创建Topic的请求后,执行具体的创建逻辑:
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { ... this .BrokerController.getTopicConfigManager().updateTopicConfig(topicConfig); //更新本地的topicConfig this.BrokerController.registerBrokerAll(false, true); //向NameServer发送registerBroker请求 return null; } |
最后一步是向NameServer发送注册信息,NameServer完成创建Topic的逻辑后,其他客户端才能发现新增的Topic。首先更新Broker信息,然后对每个Master角色的Broker,创建一个QueueData对象。如果是新建Topic,就是添加QueueData对象;如果是修改Topic,就是把旧的QueueData删除,加入新的QueueData。
3.3.3.4 为何不用ZooKeeper
ZooKeeper的功能很强大,包括自动Master选举等,RocketMQ的架构设计决定了它不需要进行Master选举, 用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。
RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本。
NameServer在RocketMQ集群中扮演调度中心的角色。各个Producer、Consumer 上报自己的状态上去,同时从NameServer获取其他角色的状态信息。NameServer 的功能虽然非常重要,但是被设计得很轻量级,代码量少并且几乎无磁盘存储,所有的功能都通过内存高效完成。
3.3.4 Broker
在RocketMQ中,broker是消息的中转和存储节点,负责消息的存储、分发和管理。
整体架构图:
3.3.4.1 消息存储机制
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog, ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
存储机制这样设计有以下几个好处:
- CommitLog顺序写,可以大大提高写入效率。
- 虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
- 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message Key、Tag等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。
3.3.4.2 刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。
消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:
- 异步刷盘:消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入PageCache 缓存中,然后用后台线程异步把消息刷入磁盘。异步刷盘策略就是消息写入 PageCache 后立即返回成功,这样写入效率非常高。如果能容忍消息丢失,异步刷盘是最好的选择。
- 同步刷盘:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
3.3.4.3 复制机制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式:
- 同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态,在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
- 异步复制方式是只要Master写成功即可反馈给客户端写成功状态,在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失。
一个不错的选择:
把Master和Save配置成异步的刷盘方式,主从之间配置成同步的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢。