主要由生产者、Broker、消费者三方共同保证
1 生产者
常用发送消息分为同步发送和异步发送两种(还有一种单向发送, 自行了解哈)
同步发送
消息发送会同步阻塞等待Broker返回结果。Broker确认收到消息后才会返回sendResult, 这个过程中发生异常就需要生产者重新发送。(代码片段如下)
String topic = "topic";
String tags = "tag";
String keys = "key";
String msgBody = "msgJsonStr";
Message msg = new Message(topic, tags, keys, msgBody.getBytes());
try{
SendResult sendResult = producer.send(mag);
//正常的业务逻辑
}cach(Exception e){
//可以重试,也可以先存起来后续处理
}
异步发送
异步发送需要生产者重写SendCallback的onSuccess和onException方法, 用于给Broker进行回调(代码片段如下)
String topic = "topic";
String tags = "tag";
String keys = "key";
String msgBody = "msgJsonStr";
Message msg = new Message(topic, tags, keys, msgBody.getBytes());
SendResult sendResult = producer.send(mag, new SendCallback(){
@Override
public void onSuccess(SendResult sendResult){
//正常的业务逻辑
}
@Override
public void onException(Throwable e){
//可以重试,也可以先存起来后续处理
}
});
当然也可以让RocketMQ自己去重试
producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试3次
producer.setRetryTimesWhenSendAsyncFailed(3); //异步发送失败重试3次
2 Broker
默认情况, Broker在接收消息会先存到内存, 内存存储成功就直接返回给用户, 然后再异步刷盘, 这个过程中宕机就会丢数据, 所以可以将保存机制改成同步刷盘
flushDiskType=SYNC_FLUSH ##默认是ASYNC_FLUSH
通常RockerMQ都是以集群方式进行部署, Broker采用一主多从进行部署, 通过主从同步方式进行数据复制, 且一般都会进行Broker备份(RockerMQ支持Broker配置多实例), 多个Broker之间进行冗余备份, 保证数据的可靠, 默认情况, Broker接收到消息, 写入master成功就可以返回响应生产者, 接着消息会异步复制到slave节点, 这个中master挂了就会丢数据, 所以可以将同步机制改为同步复制方式 (master将数据同步到slave节点后再返回给生产者结果)
brokerRole=SYNC_MASTER ##默认是ASYNC_MASTER
3 消费者
设置手动提交, 消费完手动返回ack给Bocker, 在业务逻辑处理完后返回 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS, 注意要做好幂等避免重复消费
DefaultMQPushConsumer consume = new DefaultMQPushConsumer("groupName");
consume.setNamesrvAddr("nameserver:9898");
....//各种配置赋值
consume.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs...){
for(MessageExt msg : msgs){
String msgId = msg.getMsgId();
if(isDeal(msgId)){
//重复消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try{
//业务逻辑
....
//业务处理完手动提交
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}cache(Exception ex){
//业务处理失败, 要重试
return ConsumeConcurrentlyStatus.CONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
});
consume.start();
标签:topic,String,重试,Broker,发送,ConsumeConcurrentlyStatus,保证,丢失,RocketMQ
From: https://blog.csdn.net/weixin_44541808/article/details/143570330