首页 > 其他分享 >activemq - ack机制

activemq - ack机制

时间:2024-11-04 09:10:01浏览次数:3  
标签:ack TYPE broker public ACK 消息 机制 activemq message

疑问:

  • 在写 demo 的时候,如果 client 被强制中断,消息来不及处理,这时候消息又出队列了,这样不是会产生严重的问题嘛?
  • 一个会话中,可以同时处理一批数据,如果一条失败了,之前的也要求回滚的话,要怎么处理?
  • 获取一个消息之后,发现程序无法处理这条消息,想要退还回去,该怎么办?

方案:

实际上,activemq 中还有一个消息确认机制。

消息被 consumer 接收之后,仅仅只是成功收到消息而已;

broker 只有接收到 ACK 指令,才会认为消息被正确的消费了;


在写 demo 的时候,一般设置 AUTO_ACKNOWLEDGE,消息确认过程被自动处理了。

要解决前面提出的这些问题,别让程序自动处理,我们需要手动处理,即可解决问题。

brocker 是啥?

就是一个 MQ 服务实例,消息队列服务端。

可以在代码中单独启动 brocker,与使用安装包启动的程序相比,只包含核心部分,缺少日志、数据存储等功能‌。

此外,像是 RabbitMQ,有集群的概念,此时 borcker 就是集群的一个节点。

public class LocalBroker {

    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

消息确认机制(ACK)

ACK(Acknowledgement)是确认字符的缩写,JMS API 中约定了四种 ACK_MODE(javax.jms.Session);

‌* 自动ACK(AUTO_ACKNOWLEDGE)‌:这是默认的确认模式,只要消息被发送或接收,即视为完成;
‌* 手动ACK(CLIENT_ACKNOWLEDGE)‌:对一条消息进行 ACK,ACK 的时候,会话上的所有消息,都会被 ACK;
‌* 批量ACK(DUPS_OK_ACKNOWLEDGE)‌:允许批量确认消息,因为线程不安全,可能会重复消费,该种方式很少使用到;
‌* 事务ACK(SESSION_TRANSACTED)‌:事务提交的时候确认‌;

此外 AcitveMQ 补充了一个自定义的 ACK_MODE(org.apache.activemq.ActiveMQSession)

  • 单条确认(INDIVIDUAL_ACKNOWLEDGE):对一条消息进行 ACK,只会 ACK 那一条消息;

手动 ACK

有 3 个 ACK 的方法,只要关注第一个即可;

后面两个方法,需要特殊写法才能调用,用的机会并不多。

  1. message.acknowledge(),
  2. ActiveMQMessageConsumer.acknowledege(),
  3. ActiveMQSession.acknowledge();

class Test{
	public void a(){	
        ActiveMQSession mqSession = (ActiveMQSession) session;		
        activeMQSession.acknowledge();

		
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) mqSession.createConsumer(topic);
        consumer.acknowledge();
	}
}

ACK_TYPE

client 与 broker 在交换 ACK 指令的时候,还需要告知 ACK_TYPE,ACK_TYPE 表示此确认指令的类型,不同的 ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE 对消息进行不同的操作。

在 JMS API 中并没有定义 ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者,简单了解一下即可。

  • DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
  • STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
  • POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
  • REDELIVERED_ACK_TYPE = 3 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
  • INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何ACK_MODE下
  • UNMATCHED_ACK_TYPE = 5 BROKER间转发消息时,接收端"拒绝"消息
  • EXPIRED_ACK_TYPE = 6 消息发生过程中已经过期
package org.apache.activemq.command;

public class MessageAck extends BaseCommand {

    /**
     * Used to let the broker know that the message has been delivered to the
     * client. Message will still be retained until an standard ack is received.
     * This is used get the broker to send more messages past prefetch limits
     * when an standard ack has not been sent.
     */
    public static final byte DELIVERED_ACK_TYPE = 0;

    /**
     * The standard ack case where a client wants the message to be discarded.
     */
    public static final byte STANDARD_ACK_TYPE = 2;

    /**
     * In case the client want's to explicitly let the broker know that a
     * message was not processed and the message was considered a poison
     * message.
     */
    public static final byte POISON_ACK_TYPE = 1;

    /**
     * In case the client want's to explicitly let the broker know that a
     * message was not processed and it was re-delivered to the consumer
     * but it was not yet considered to be a poison message.  The messageCount 
     * field will hold the number of times the message was re-delivered. 
     */
    public static final byte REDELIVERED_ACK_TYPE = 3;
    
    /**
     * The  ack case where a client wants only an individual message to be discarded.
     */
    public static final byte INDIVIDUAL_ACK_TYPE = 4;

	/**
     * The ack case where a durable topic subscription does not match a selector.
     */
    public static final byte UNMATCHED_ACK_TYPE = 5;

    /**
     * the case where a consumer does not dispatch because message has expired inflight
     */
    public static final byte EXPIRED_ACK_TYPE = 6;
}

标签:ack,TYPE,broker,public,ACK,消息,机制,activemq,message
From: https://www.cnblogs.com/chenss15060100790/p/18524416

相关文章

  • activemq - jms规范
    什么是JMS?‌ActiveMQJMS是JavaMessageService的缩写。‌JMS是Java平台上的一个标准API,用于实现应用程序之间的消息传递和通信。它是JavaEE规范的一部分,旨在提供一个与厂商无关的API,以便访问不同的消息中间件系统‌。JMS的组成结构和特点很多内容之前已经提到......
  • activemq - mqttv3
    相比于mqtt-client,mqttv3使用的人相对多些,如果出现问题,好排查一些。activemq部署MQTT服务查看文件:conf\activemq.xml,如果包含下面内容,activemq本身已经包含MQTT服务,不需要任何其它配置。activemq不局限于下面这些,还可以继续扩展,比如:NIO、SSL。前往官网查看:https://a......
  • Webpack 项目构建与优化指南
    文章目录Webpack项目构建与优化指南简介一、创建基本项目1.初始化项目2.创建项目结构3.引入React和TypeScript二、Webpack配置1.基础配置2.开发环境配置3.生产环境配置三、环境变量配置四、文件别名配置五、构建速度优化1.Webpack进度条2.开启持久化存储......
  • Differences Between Datasets Crack
    DifferencesBetweenDatasetsCrackDatacomparisontoolsenableuserstocomparedatavaluesinSQLServerdatabasetables,identifyingdiscrepancies,inconsistencies,andanomalies.Datacomparisontoolsarespecializedsoftwareapplications......
  • 自然语言处理进阶手册--Seq2seq 和注意力机制
    Seq2seq和注意力机制Seq2seqSeq2seq是一种框架结构,它接受一个序列(单词、字母、图像特征等)并输出另一个序列,由编码和解码两部分构成。如在机器翻译任务中,一个序列指的是一系列的词,一个接一个地被处理编码,同样,输出的也是一系列单词,一个接一个地进行解码。具体地,编码器处......
  • 【Unity魔法音效包】SPELLS MAGIC 1 SOUND FX PACK 施法、技能释放、法术命中
    SPELLSMAGIC1:SOUNDFXPACK是UnityAssetStore上的一个音效包,专为奇幻类游戏中的魔法和法术效果设计。该插件提供了丰富的魔法音效,适用于施法、技能释放、法术命中、环境音效等场景。该音效包旨在帮助开发者提升游戏中的法术表现力,使玩家在施展魔法时获得更强的沉浸感......