功能
Pulsar是一种用于服务器到服务器消息传递的多租户、高性能解决方案。Pulsar 最初由 Yahoo 开发,由 Apache 软件基金会管理。
Pulsar是由Apache软件基金会开发的分布式发布/订阅消息系统,它结合了传统消息系统(如RabbitMQ)和发布-订阅系统(如Apache Kafka)的最佳特性,为可伸缩性、可靠性和灵活性提供了很大的优势。
主要特点
- 原生支持 Pulsar 实例中的多个集群,并跨集群无缝地复制消息。
- 非常低的发布和端到端延迟。
- 无缝扩展到超过 100 万个主题。
- 一个简单的客户端 API,带有 Java、Go、Python 和 C++ 的绑定。
- 主题的多种订阅类型(独占、共享和故障转移)。
- 使用 Apache BookKeeper 提供的持久消息存储保证消息传递。Pulsar Functions 是一个无服务器轻量级计算框架,提供了流原生数据处理的能力。
- 基于 Pulsar Functions 构建的无服务器连接器框架 Pulsar IO 可以更轻松地将数据移入和移出 Apache Pulsar。
- 分层存储 当数据老化时,将数据从热/暖存储卸载到冷/长期存储(例如 S3 和 GCS)。
主题的多种订阅类型(独占、共享和故障转移)是指在消息队列或发布-订阅系统中,客户端可以以不同的方式订阅主题并接收消息的机制。以下是这些订阅类型的详细解释:
- 独占订阅
- 定义:独占订阅意味着一个主题只能有一个订阅者。如果有多个客户端尝试以独占方式订阅同一个主题,只有第一个成功订阅的客户端会接收到消息,其他客户端将被拒绝。
- 特点:
- 单一接收者:确保每条消息只被一个客户端接收。
- 防止重复处理:适用于需要确保消息不被重复处理的场景。
- 简单性:实现和管理相对简单。
- 共享订阅
- 定义:共享订阅允许多个客户端订阅同一个主题,并且所有订阅者都能接收到消息的副本。
- 特点:
- 多接收者:每个消息都会被发送到所有订阅者。
- 负载均衡:适用于需要将消息分发给多个处理单元的场景。
- 灵活性:可以根据需求动态增加或减少订阅者。
- 故障转移订阅
- 定义:故障转移订阅是一种特殊的共享订阅,其中一个客户端被指定为主订阅者,其他客户端作为备用。当主订阅者不可用时,备用客户端会接管并开始接收消息。
- 特点:
- 高可用性:确保即使在主订阅者失败的情况下,消息处理也不会中断。
- 自动切换:系统会自动检测主订阅者的状态并进行切换。
- 顺序保证:通常能保证消息的顺序性,因为只有一个客户端在处理消息。
- 应用场景
- 独占订阅:适用于需要严格控制消息处理流程的场景,如金融交易系统。
- 共享订阅:适用于需要并行处理消息以提高吞吐量的场景,如日志分析系统。
- 故障转移订阅:适用于需要高可用性和容错能力的场景,如关键业务应用。
通过这些不同的订阅类型,系统设计者可以根据具体的业务需求和系统特性选择最合适的订阅模式,以实现最佳的性能和可靠性。
架构和组件
Pulsar的整体架构采用了分层设计,主要包含以下几个关键组件:
- Broker(代理):
- Pulsar的核心组件,负责接收、存储和传递消息。
- Pulsar集群包含多个Broker,它们分布在整个系统中,形成一个高度可伸缩的消息传递网络。
- ZooKeeper:
- 用于进行集群协调和元数据管理。
- 维护了Pulsar集群的配置信息、主题信息以及活跃的Broker列表等关键元数据。
- 负责检测Pulsar集群中的节点故障,并进行相应的故障恢复。
- BookKeeper:
- Pulsar的存储层,用于持久化消息。
- 提供了高度可靠的分布式日志存储,确保消息的持久性和可靠性。
- 使用分布式日志来记录消息的写入操作,使得多个Broker能够协同工作,同时确保消息的有序性。
- Pulsar Proxy(可选组件):
- 允许客户端通过代理访问Pulsar集群。
- 提供了负载均衡、安全性和协议转换等功能。
- Pulsar Functions:
- 一个用于处理和转换消息的Serverless计算框架。
- 允许用户以简单的方式编写和部署函数来处理消息流。
传送消息
Pulsar 构建在 publish-subscribe 模式(通常缩写为 pub-sub)之上。在此模式中,创建者将消息发布到主题;使用者订阅这些主题,处理传入消息,并在处理完成后向代理发送确认。
创建订阅时,Pulsar **会保留**所有消息,即使 consumer 断开连接。只有当使用者确认所有这些消息都已成功处理时,才会丢弃保留消息。如果消息使用失败,并且您希望再次使用该消息,则可以启用消息重新传递机制以请求代理重新发送此消息。
-
**消息重新传递:**Apache Pulsar 中的消息重投使用至少一次的消息传递语义来避免异步消息传递失败和其他消息传递失败,以确保 Pulsar 多次处理消息。要使用消息重新投递,你需要在 broker 可以在 Apache Pulsar 客户端中重新发送未确认的消息之前启用此机制。你可以使用三种方法在 Apache Pulsar 中激活消息重投机制。1.否定确认 2.确认超时 3.重试信件主题
-
消息保留和过期:默认情况下brokers中会立即删除 Consumer 已确认的所有消息,以及将所有未确认的消息持久存储在 Message backlog 中。但是,Pulsar 有两个功能,使您能够覆盖此默认行为:消息保留使您能够存储使用者已确认的消息,消息过期 使您能够为尚未确认的消息设置生存时间 (TTL)。所有消息保留和过期都在命名空间级别进行管理。
-
**消息去重:**当 Pulsar 多次持久化消息时,会发生消息重复。消息重复数据删除可确保在 Pulsar 主题上生成的每条消息只持久化到磁盘一次,即使该消息被多次生成。消息重复数据删除在服务器端自动处理。每个 Pulsar 消息都属于其主题的有序序列。消息的序列 ID 最初由其生成者分配,指示其在该序列中的顺序,并且还可以自定义。序列 ID 可用于消息重复数据删除。如果
brokerDeduplicationEnabled
设置为true
,则每条消息的序列 ID 在主题(非分区)或分区的创建者中是唯一的。 -
延迟消息传递:通过延迟消息传送,您可以稍后使用消息。在这种机制中,一条消息存储在 BookKeeper 中。在将消息发布到 broker 后,
DelayedDeliveryTracker
会在内存中维护时间索引 (time -> messageId)。一旦指定的延迟结束,此消息将传递给使用者。
架构概念
在最高级别,Pulsar 实例由一个或多个 Pulsar 集群组成。实例中的集群可以在它们之间复制数据。
Pulsar 集群由以下组件组成:一个或多个 broker 处理和负载均衡来自 producer 的传入消息,将消息分派给消费者,与 Pulsar 配置存储通信以处理各种协调任务,将消息存储在 BookKeeper 实例(又名 bookie)中,依赖特定于集群的 ZooKeeper 集群来完成某些任务,等等。由一个或多个 bookie 组成的 BookKeeper 集群处理消息的持久存储。特定于该集群的 ZooKeeper 集群处理 Pulsar 集群之间的协调任务。
-
Brokers:Pulsar 消息代理是一个无状态组件,主要负责运行其他两个组件:一个 HTTP 服务器,它为创建者和使用者的管理任务和主题查找公开 REST API。创建者连接到代理以发布消息,使用者连接到代理以使用消息。调度程序,它是基于自定义二进制协议的异步 TCP 服务器,用于所有数据传输。
-
**Clusters 集群:**一个 Pulsar 实例由一个或多个 Pulsar 集群组成。集群又包括:一个或多个Pulsar broker,用于集群级配置和协调的 ZooKeeper 仲裁,用于持久存储消息的 bookie 集合。群集可以使用异地复制在它们之间进行复制。
-
**Metadata store 元素数据存储:**Pulsar 元数据存储维护 Pulsar 集群的所有元数据,例如主题元数据、schema、broker 加载数据等。Pulsar 使用 Apache ZooKeeper 进行元数据存储、集群配置和协调。Pulsar 元数据存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上。您可以将一个 ZooKeeper 集群用于 Pulsar 元数据存储和 BookKeeper 元数据存储。如果要部署连接到现有 BookKeeper 集群的 Pulsar broker,则需要分别为 Pulsar 元数据存储和 BookKeeper 元数据存储部署单独的 ZooKeeper 集群。Pulsar 还支持更多元数据后端服务,包括 etcd 和 RocksDB(仅适用于独立的 Pulsar)。在 Pulsar 实例中:配置存储 quorum 存储租户、命名空间和其他需要全局一致的实体的配置。每个集群都有自己的本地 ZooKeeper ensemble,用于存储特定于集群的配置和协调,例如哪些代理负责哪些主题以及所有权元数据、代理负载报告、BookKeeper 账本元数据等。
-
**Configuration store 配置存储:**配置存储是 ZooKeeper 仲裁,用于特定于配置的任务,它维护 Pulsar 实例的所有配置,例如集群、租户、命名空间、分区主题相关配置等。一个 Pulsar 实例可以有单个本地集群、多个本地集群或多个跨区域集群。因此,配置存储可以在 Pulsar 实例下的多个集群之间共享配置。配置存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上。
-
**Persistent storage持久存储:**Pulsar 为应用程序提供有保障的消息传递。如果消息成功到达 Pulsar broker,它将被传送到其预期目标。此保证要求将未确认的消息持久存储,直到它们可以传送给使用者并得到确认。这种消息传递模式通常称为持久性消息传递。在 Pulsar 中,所有消息的 N 个副本都存储在磁盘上并同步,例如,在两台服务器上存储 4 个副本,每台服务器上都有镜像 RAID 卷。
客户端
Pulsar 公开了一个客户端 API,其中包含 Java、C++、Go、Python、Node.js 和 C# 的语言绑定。客户端 API 优化和封装了 Pulsar 的客户端-代理通信协议,并公开了一个简单直观的 API 供应用程序使用。Pulsar 客户端库支持透明重新连接和/或连接故障转移到 broker,消息排队直到 broker 确认,以及启发式方法,例如使用回退进行连接重试。
- Client setup phase客户端设置阶段
在应用程序创建 producer/consumer 之前,Pulsar 客户端库需要启动一个设置阶段,包括两个步骤:
- 客户端尝试通过向代理发送 HTTP 查找请求来确定主题的所有者。该请求可以到达其中一个活动代理,该代理通过查看(缓存的)Zookeeper 元数据知道谁在为主题提供服务,或者在没有人为主题提供服务的情况下,尝试将其分配给负载最少的代理。
- 客户端库获得代理地址后,它会创建一个 TCP 连接(或重用池中的现有连接)并对其进行身份验证。在此连接中,客户端和代理交换来自自定义协议的二进制命令。此时,客户端向代理发送创建创建者/使用者的命令,代理将在验证授权策略后遵守。
- Producer生产者
生产者 (producer) 是附加到主题并将消息发布到 Pulsar broker 的进程。Pulsar broker 处理消息。
- Send mode 发送模式
发送模式是一种机制,用于确定创建者是同步 (sync) 还是异步 (async) 向代理发送消息。同步:创建者在发送每条消息后等待代理的确认。如果未收到确认,则创建者会将发送操作视为失败。异步:创建者将消息放入阻塞队列中并立即返回。客户端库在后台将消息发送到代理。如果队列已满(您可以配置最大大小),则创建者在调用 API 时将被阻止或立即失败,具体取决于传递给创建者的参数。
- Access mode访问模式
访问模式是一种确定创建者对 Topic 的权限的机制。默认:Shared 共享,多个生产者可以发布一个主题。Exclusive独家:一个生产者只可以发布一个主题。如果已经连接了生产者,则尝试在此主题上发布内容的其他生产者会立即收到错误。如果“旧”生产者遇到与代理的网络分区,则“旧”生产者将被逐出,并且“新”生产者被选为下一个独占生产者。**ExclusiveWithFencing独家带围栏:**一个生产者只可以发布一个主题,如果已经连接了生产者,它将被立即删除并失效。**WaitForExclusive等待排他:**成功成为独家生产商的生产商被视为领导者。因此,如果您想为您的应用程序实现 leader election scheme,则可以使用此访问模式。请注意,提到的 leader 模式方案是指使用 Pulsar 作为预写日志 (WAL),这意味着 leader 将其 “决策” 写入主题。在错误情况下,只有当 broker 尝试编写消息并因适当的错误而失败时,leader 才会收到通知它不再是 leader。
在Pulsar中,Producer(生产者)的访问模式主要有以下几种:
- 独占(Exclusive)模式:
- 在这种模式下,一旦某个Producer成功创建并连接到主题,它将成为该主题的唯一访问者。
- 如果其他Producer尝试访问该主题,它们将立即出错或必须等待,直到获得主题的独占访问权。
- 等待独占(WaitForExclusive)模式:
- 类似于独占模式,但在这种模式下,如果主题已被其他Producer独占,则后续尝试访问的Producer将等待,直到该独占权被释放。
- 共享(Shared)模式:
- 在这种模式下,多个Producer可以并发地访问并发送消息到同一个主题。
- 消息将被均匀地分发到主题的不同分区(如果主题被分区)上,或者按照某种路由策略进行分发。
- 故障转移(Failover)模式:
- 在这种模式下,如果某个Producer发生故障或断开连接,则其他指定的Producer(通常是故障转移组中的下一个Producer)将接管其消息发送任务。
- 这有助于确保在Producer故障时,消息发送的连续性和可靠性。
这些访问模式允许Pulsar根据不同的应用场景和需求,灵活地配置Producer的访问权限和消息分发策略。选择哪种模式取决于具体的业务需求和系统架构。例如,在需要确保消息顺序和一致性的场景中,可能会选择独占模式;而在需要高可用性和负载均衡的场景中,可能会选择共享或故障转移模式。
- Consumer消费者
使用者是通过订阅附加到主题,然后接收消息的进程。
消费者向 broker 发送 flow permit 请求以获取消息。消费者端有一个队列,用于接收从 broker 推送的消息。您可以使用 receiverQueueSize
参数配置队列大小。默认大小为 1000
)。每次调用 consumer.receive()
时,都会从缓冲区中取消一条消息。
- Receive mode 接收模式
接收模式是一种机制,用于确定是从代理同步 (sync) 还是异步 (async) 接收消息。**Sync receive 同步接收:**在消息可用之前,同步接收将被阻止。Async receive 异步接收:异步接收会立即返回一个 future 值(例如,Java 中的 CompletableFuture
),该值在新消息可用时完成。
- Listener听者
客户端库为使用者提供侦听器实现。例如,Java 客户端提供了一个 MesssageListener 接口。在此接口中,每当收到新消息时,都会调用 received
方法。
- Reader读者
在 Pulsar 中,“标准”消费者接口涉及使用 Consumer 来监听主题、处理传入的消息,并最终在处理这些消息时确认这些消息。每当创建新订阅时,它最初都会位于主题的末尾(默认情况下),与该订阅关联的使用者会从之后创建的第一条消息开始阅读。每当使用者使用预先存在的订阅连接到主题时,它就会从该订阅中最早的未确认消息开始读取。总之,使用 consumer 接口,订阅游标由 Pulsar 自动管理以响应消息确认。Pulsar 的读取器接口使应用程序能够手动管理游标。当您使用读取器连接到主题时—而不是使用使用者—您需要指定读取器在连接到主题时从哪条消息开始读取。连接到主题时,Reader 界面允许您从以下位置开始:
- 主题中最早的可用消息。
- 主题中的最新可用消息。
- 最早和最新的一些其他消息。如果您选择此选项,则需要明确提供消息 ID。您的应用程序将负责提前 “知道” 此消息 ID,可能会从持久性数据存储或缓存中获取它。
reader 接口对于使用 Pulsar 为流处理系统提供 effective-once 处理语义等用例非常有用。对于此使用案例,流处理系统必须能够将主题“倒回”到特定消息并开始阅读。reader 接口为 Pulsar 客户端提供了在 topic 中“手动定位”自身所需的低级抽象。在内部,reader 接口作为使用者实现,使用对具有随机分配名称的主题的独占、非持久订阅。
异地复制
无论哪个行业,当发生不可预见的事件并导致日常运营停止时,组织都需要一个准备充分的灾难恢复计划,以快速恢复对客户的服务。但是,灾难恢复计划通常需要具有地理位置分散的数据中心的多数据中心部署。这种多数据中心部署需要异地复制机制,以便在数据中心发生故障时提供额外的冗余。
Pulsar 的异地复制机制通常用于灾难恢复,支持跨多个数据中心复制持久存储的消息数据。例如,您的应用程序正在一个区域中发布数据,并且您希望对其进行处理以供其他区域使用。借助 Pulsar 的异地复制机制,可以在不同的地理位置生产和使用消息。
- Replication mechanisms复制机制
异地复制机制可分为同步异地复制和异步异地复制策略。Pulsar 支持这两种复制机制。
- 异步异地复制
异步异地复制群集由在不同数据中心设置的多个物理群集组成。在 Pulsar 主题上生成的消息首先持久化到本地集群,然后由 broker 异步复制到远程集群。
在正常情况下,当没有连接问题时,消息会立即复制,同时将消息分派给本地使用者。通常,端到端交付延迟由数据中心之间的网络往返时间 (RTT) 定义。应用程序可以在任何集群中创建创建者和使用者,即使无法访问远程集群时(例如,在网络分区期间)。异步异地复制提供较低的延迟,但由于某些数据尚未复制的潜在复制滞后,可能会导致一致性保证较弱。
- 同步异地复制
在同步异地复制中,数据将同步复制到多个数据中心,客户端必须等待其他数据中心的确认。如下图所示,当客户端向一个集群发出写入请求时,写入的数据将被复制到其他两个数据中心。只有当大多数数据中心(在本例中,至少 2 个数据中心)已确认写入已被保留时,才会向客户端确认写入请求。
Pulsar 中的同步异地复制是通过 BookKeeper 实现的。同步异地复制集群由一个在多个数据中心运行的 bookie 集群和一个代理集群以及一个全局 Zookeeper 安装(ZooKeeper 集成在多个数据中心运行)组成。您需要配置 BookKeeper 区域感知的放置策略,以跨多个数据中心存储数据并保证写入时的可用性约束。
同步异地复制提供了最高的可用性,并保证了不同数据中心之间更强的数据一致性。但是,您的应用程序必须跨数据中心支付额外的延迟损失。
- Replication patterns复制模式
Pulsar 为自定义复制策略提供了极大的灵活性。您可以设置不同的复制模式,以便为多个数据中心之间的应用程序提供复制策略。
- 全网状复制
使用全网状复制并应用选择性消息复制,您可以在任意数量的数据中心之间自定义复制策略和拓扑。
- 主动-主动复制
主动-主动复制是全网状复制的一种变体,只有两个数据中心。创建者可以在任何数据中心运行以生成消息,而使用者可以使用来自所有数据中心的所有消息。
- 聚合复制
在将消息从边缘复制到云时,通常使用聚合复制模式。例如,假设您在 3 个前端数据中心中有 3 个集群,在一个中央数据中心有 1 个聚合集群,并且您希望将消息从多个前端数据中心复制到中央数据中心以进行聚合。然后,您可以为每个前端数据中心使用的主题创建单独的命名空间,并将聚合数据中心分配给这些命名空间。
故障转移
- 自动集群级故障转移:集群级自动故障转移支持 Pulsar 客户端根据用户设置的配置检测策略,在检测到故障转移事件时,自动无缝地从主集群切换到一个或多个备份集群。
- 受控集群级故障转移:受控集群级故障转移支持 Pulsar 客户端从主集群切换到一个或多个备份集群。切换由管理员手动设置。
一旦主集群再次运行,Pulsar 客户端就可以切换回主集群。大多数时候,用户甚至不会注意到任何事情。用户可以继续使用应用程序和服务,而不会中断或超时。
- 为什么使用集群级故障转移
集群级故障转移同时提供容错、持续可用性和高可用性。它带来了许多好处,包括但不限于:
- Reduced cost 降低成本:服务可以自动切换和恢复,而不会丢失数据。
- Simplified management 简化管理:企业可以在 “始终在线” 的基础上运营,因为不需要用户立即干预。
- Improved stability and robustness 更高的稳定性和稳健性:它可确保持续性能并最大限度地减少服务停机时间。
- 何时使用集群级故障转移
集群级故障转移以多种方式保护您的环境,包括但不限于:
-
Disaster recovery 灾难恢复:集群级故障转移可以自动无缝地将主集群上的生产工作负载转移到一个或多个备份集群,从而确保最大限度地减少数据丢失并缩短恢复时间。
-
Planned migration 计划迁移:如果您想将生产工作负载从旧集群迁移到新集群,可以通过集群级故障转移来提高迁移效率。例如,您可以测试在发生故障转移事件时数据迁移是否顺利,在迁移之前识别可能的问题和风险。
-
何时触发集群级故障转移
- 自动集群级故障转移:Network failure 网络故障,Internet 连接丢失。Power failure 电源故障,主集群的关闭时间超过时间限制。Service error 服务错误,主集群上发生错误(例如,由于时间限制,主集群无法运行)。Crashed storage space 存储空间崩溃,主集群没有足够的存储空间,但备份服务器上相应的存储空间运行正常。
- 为什么集群级故障转移会失败
显然,如果活动的 Pulsar 客户端无法访问备份集群,则集群级别的故障转移不会成功。发生这种情况的原因有很多,包括但不限于:
- Power failure 电源故障:备份集群已关闭或无法正常运行。
- Crashed storage space 存储空间崩溃:主集群和备份集群没有足够的存储空间。
- 如果启动了故障转移,但由于错误导致没有集群能够承担可用集群的角色,并且主集群无法正常提供服务。
- 如果您手动启动切换,但服务无法切换到备份集群服务器,则系统将尝试将服务切换回主集群。
- 无法在主集群和备份集群之间或两个备份集群之间进行身份验证或授权。
- 集群级故障转移有哪些限制
目前,集群级别的故障转移可以执行探测以防止数据丢失,但无法检查备份集群的状态。如果备份集群运行状况不佳,则无法生成或使用数据。
- 集群级故障转移的工作原理是什么
- 自动集群级故障转移:Pulsar 客户端以
checkInterval
中定义的间隔运行探测任务。如果探测任务发现主集群的故障时间超过failoverDelay
参数中设置的时间,则会在备份集群中搜索可用的运行状况良好的集群。 如果有健康的备份集群,Pulsar 客户端会按照secondary
中定义的顺序切换到备份集群。如果没有健康的备份集群,Pulsar 客户端不会执行切换,探测任务会继续寻找可用的备份集群。探测任务检查主集群是否运行良好。如果主集群恢复并且持续健康时间超过switchBackDelay
中设置的时间,则 Pulsar 客户端会切换回主集群。 如果主集群没有返回,则 Pulsar 客户端不会执行切换。 - 自控集群级故障转移:Pulsar 客户端以
checkInterval
中定义的间隔运行探测任务。探测任务从 URL 提供程序服务获取服务 URL 配置,该服务由urlProvider
配置。 如果服务 URL 配置发生更改,则探测任务将切换到目标集群,而不检查目标集群的运行状况。 如果服务 URL 配置没有改变,Pulsar 客户端不会进行切换。如果 Pulsar 客户端切换到目标集群,探测任务会继续按照checkInterval
中定义的时间间隔从 URL provider 服务获取服务 URL 配置。如果服务 URL 配置发生更改,则探测任务将切换到目标集群,而不检查目标集群的运行状况。 如果服务 URL 配置未更改,则不会执行切换。
身份验证和授权
Pulsar 支持可在 proxy 和/或 broker 上配置的可插拔身份验证机制。Pulsar 还支持可插拔授权机制。这些机制协同工作以识别客户端及其对主题、命名空间和租户的访问权限。
主题压缩
Pulsar 以高度可扩展的消息数据持久存储为主要目标。Pulsar 主题使你能够根据需要持久存储任意数量的未确认消息,同时保留消息顺序。默认情况下,Pulsar 会存储 topic 上产生的所有未确认/未处理的消息。对于许多 Pulsar 用例来说,在一个主题上积累许多未确认的消息是必要的,但对于 Pulsar 使用者来说,“倒带”整个消息日志也可能非常耗时。
对于某些使用案例,使用者不需要主题日志的完整 “image”。它们可能只需要几个值来构建日志的更“浅”图像,甚至可能只是最新的值。对于这类用例,Pulsar 提供了主题压缩。当你对一个 topic 运行 compaction 时,Pulsar 会遍历 topic 的 backlog 并删除被后面的消息掩盖的消息,即 topic compaction 在每个 key 的基础上遍历 topic,只留下与该 key 关联的最新消息。
- compaction feature功能
- Allows for faster “rewind” through topic logs 允许更快地“倒带”主题日志
- Applies only to persistent topics:仅适用于持久性主题
- 当积压达到一定大小时自动触发,或者可以通过命令行手动触发
- 在概念和操作上与保留和到期不同。但是,主题压缩确实会考虑保留。如果保留已从主题的消息积压中删除了消息,则该消息也将无法从压缩的主题账本中读取。
- Topic Compaction的工作原理
- Pulsar 将从头到尾迭代整个主题。对于它遇到的每个键,压缩例程将保留该键的最新出现记录。
- 之后,broker 将创建一个新的 BookKeeper 账本,并对该主题的每条消息进行第二次迭代。对于每条消息:如果密钥与该密钥的最新匹配项匹配,则密钥的数据负载、消息 ID 和元数据将写入新创建的账本。如果密钥与最新密钥不匹配,则将跳过该消息并保留该消息。如果任何给定的消息具有空有效负载,则将被跳过并被视为已删除(类似于键值数据库中的逻辑删除概念)。
- 在主题的第二次迭代结束时,新创建的 BookKeeper 账本被关闭,并且有两件事被写入主题的元数据中:BookKeeper 账本的 ID,最后压缩的消息的消息 ID(这称为主题的压缩范围)。写入此元数据后,压缩即完成。
- 在初始 compaction 操作之后,每当未来对 compaction horizon 和 compacted backlog 进行任何更改时,都会通知拥有该主题的 Pulsar broker。发生此类更改时:启用了 read compacted 的客户端(使用者和读取者)将尝试从主题中读取消息,并且:像往常一样从主题中读取(如果消息 ID 大于或等于压缩范围)或从压缩范围开始读取(如果消息 ID 低于压缩范围)。
消息调度限制
- 什么是消息调度限制
较大的消息负载可能会导致内存使用峰值,从而导致性能下降。Pulsar 采用限速机制进行消息分发,避免了流量激增,提高了消息送达率。您可以设置阈值来限制可传送到客户端的消息数量和条目的字节大小,从而在每单位时间的流量超过阈值时阻止后续传送。例如,当您将调度速率限制配置为每秒 10 条消息时,每秒可传送到客户端的消息数最多为 10。
- 为什么要使用消息调度限制
消息调度限制详细带来了以下好处:
- 将 broker 的读取请求负载限制为 BookKeeper
- 在主题/订阅级别平衡 broker 的硬件资源分配
- 在主题/订阅级别限制客户端硬件资源的分配
当有大量积压的消息需要消费时,客户端可能会在短时间内收到大量数据,从而垄断了客户端的计算资源。由于客户端没有主动限制消费速率的机制,因此使用消息调度限制功能也可以调节客户端硬件资源的分配。
- 消息调度限制的工作原理是什么
- 代理通过计算剩余配额来估计要从 bookies 读取的条目数。
- 代理从 bookie 读取消息。
- 代理将消息分派给客户端并更新计数器以减少配额。计划任务在限制期结束时刷新配额。