1. 如何保证消息不丢失?
消息确认机制
2. 如何保证消息只被消费一次?
为了保证消息丢失,需要付出两方面的代价:一方面是性能的损耗,一方面可能造成消息重复消费。为了保证消息只被消费一次,我们需要保证消费多条消息时所得到的结果就是相同的,即幂等的。消息在生产和消费的过程中都可能会产生重复,所以你要做的是在生产过程和消费过程中增加消息幂等性的保证。
-
生产者保证消息唯一性
给每一个生产者和消息赋予唯一的ID,消息存储时以<生产者ID,最后一条消息ID>存储,当某一个生产者产生新的消息时,消息队列服务端会比对存储的最后一条,如果一致就认为是重复的消息,服务端会自动丢弃。
-
消费者保证消费的唯一性
-
设置全局唯一的消息ID。在消息被生产的时候给它生成一个全局唯一的消息ID,消息被处理之后把这个ID存储在数据库/缓存中,在处理下一条消息之前先从数据库里面检查这个全局ID是否被消费过,如果被消费过就放弃消费。
问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑。
解决:这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败。
适用:全局唯一的消息ID+事务的范式使得消息处理的成本就更高了,所以如果对于消息重复没有特别严格的要求,可以直接使用全局唯一的消息ID,而不考虑引入事务。
-
适用乐观锁。给数据中增加一个版本号的字段,在生产消息时先查询版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额SQL的时候带上版本号
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
SQL可以执行成功,version+1;在执行第二条相同的消息时,由于version值不再是1,所以这条SQL不能执行成功,也就保证了消息的幂等性。
-
3. 减少消息延迟
消息队列已经堆积了大量的消息,消费者由于性能问题不能及时消费消息,就容易造成消息堆积。
如何增加增加消费者的消息处理能力?
- 消息队列: 为Topic(话题)配置多个Partition(分区),过增加分区来提高消费者的处理能力。
-
消费者:
- 优化消费代码提升性能;
- 增加消费者的数量。消费者使用线程池,在接收到消息之后把消息丢到线程池中来异步地处理,这样,原本串行的消费消息的流程就变成了并行的消费,可以提高消息消费的吞吐量