首页 > 编程语言 >C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法

C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法

时间:2023-04-11 16:46:08浏览次数:40  
标签:消费 消费者 C# Partition 偏移量 Kafka 消息 Offset

在使用Kafka的过程中,消费者断掉之后,再次开始消费时,消费者会从断掉时的位置重新开始消费。

场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这个时候就需要我们对偏移量进行重置为最新的,以获取最新的数据。

前提,我们使用的AutoOffsetReset配置是Latest,即从连接到Kafka那一刻开始消费之后产生的消息,之前发布的消息不在消费,这也是默认的配置。

关于AutoOffsetReset这个枚举的配置项如下:

    • latest (default) which means consumers will read messages from the tail of the partition
      最新(默认) ,这意味着使用者将从分区的尾部读取消息,只消费最新的信息,即自从消费者上线后才开始推送来的消息。那么会导致忽略掉之前没有处理的消息。
    • earliest which means reading from the oldest offset in the partition
      这意味着从分区中最早的偏移量读取;自动从消费者上次开始消费的位置开始,进行消费。
    • none throw exception to the consumer if no previous offset is found for the consumer's group
      如果没有为使用者的组找到以前的偏移量,则不会向使用者抛出异常。

接下来,我们直接使用下面这一段代码即可:

使用Assign订阅指定的分区,注意最后还需要使用Subscribe方法订阅

consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1)),Offset.End));//从指定的Partition订阅消息使用Assign方法

consumer.Subscribe(topic);//订阅消息使用Subscribe方法

从指定的分区获取数据,并且指定了对应的偏移量

 关于Offset这个枚举不同配置项的说明如下:

Offset 可以被设置为 Beginning、End、Stored 和 Unset。这些值的含义如下:

  1. Beginning:从 Kafka 分区的最早消息(Offset 为 0)开始消费。如果分区中有新消息产生,消费者会继续消费这些消息。

  2. End:从 Kafka 分区的最新消息开始消费。如果消费者在启动后到达了 Kafka 分区的末尾,它将停止消费,并等待新消息的到来。

  3. Stored:从消费者存储的 Offset 开始消费。这个 Offset 通常是消费者在上次停止消费时存储的 Offset。如果存储的 Offset 失效或者已过期,消费者会从最新的消息(End)开始消费。

  4. Unset:在消费者启动时,Offset 没有被设置。在这种情况下,消费者将根据 auto.offset.reset 配置项的值来决定从哪里开始消费。如果 auto.offset.reset 的值为 latest,则从最新的消息开始消费;如果 auto.offset.reset 的值为 earliest,则从最早的消息开始消费。

需要注意的是,如果设置了 Stored 的 Offset,但是在 Kafka 中找不到对应的消息,消费者将会从最新的消息(End)开始消费。

因此,存储的 Offset 必须要有效才能够被正确地使用

标签:消费,消费者,C#,Partition,偏移量,Kafka,消息,Offset
From: https://www.cnblogs.com/goodboydcc/p/17306646.html

相关文章

  • SATA 之 DMA Setup Auto-Activate
     1. 原文在《SATA3.2协议》中的13.3.3有介绍,如下:13.3.3Enable/disableDMASetupFISauto-activateoptimizationACount(7:0)valueof02hisusedbythehosttoenableordisabletheDMASetupFISoptimizationforautomaticallyactivatingtransferofthefirs......
  • Spring-Security
    SecurityConfig@Configuration@EnableWebSecuritypublicclassSecurityConfigimplementsWebMvcConfigurer{@BeanpublicSecurityFilterChainfilterChain(HttpSecurityhttpSecurity)throwsException{//httpSecurity.authorizeHttpRe......
  • C# 控制台应用windows修改host文件
    配置文件修改App.config部分主要是IP与地址<?xmlversion="1.0"encoding="utf-8"?><configuration><startup><supportedRuntimeversion="v4.0"sku=".NETFramework,Version=v4.7.2"/></star......
  • cmake学习
    -DCMAKE_BUILD_TYPE=Debug和Release有什么区别-DCMAKE_BUILD_TYPE是用于指定CMake构建类型的CMake变量。它有两个常见的值,即"Debug"和"Release",它们分别用于在构建C++/C项目时指定不同的构建类型。区别如下:Debug构建类型:Debug构建类型用于在开发阶段进行调试和测......
  • ThreadLocal原理探究
    四大引用是什么,分别有什么特点:1强引用、软引用、弱引用、虚引用强引用:发生gc的时候,只要对象还有引用,就不会被回收软引用:发生gc的时候,内存够用就不会回收,内存不够时,就会回收。可以及时的避免oom。Map<String,SoftReference<BitMap>>imageCache=newHashMap<Str......
  • 使用navigator.geolocation解决h5公众号定位不准确的问题
    封装js(utils/geolocation.min.js):window.qq=window.qq||{},qq.maps=qq.maps||{},window.soso||(window.soso=qq),soso.maps||(soso.maps=qq.maps),qq.maps.Geolocation=function(){"usestrict";vare=[],t=null,o=0,n="_geoIframe_"+Math.ceil(1e7*Mat......
  • docker基础
    docker介绍什么是虚拟化?在计算机中,虚拟化(英语:Virtualization)是一种资源管理技术,是将计算机的各种实体资源,如服务器、网络、内存及存储等,予以抽象、转换后呈现出来,打破实体结构间的不可切割的障碍,使用户可以比原本的组态更好的方式来应用这些资源。这些资源的新虚拟部份是不受现......
  • EasyCVR平台基于GB28181协议的语音对讲配置操作教程
    EasyCVR基于云边端协同,具有强大的数据接入、处理及分发能力,平台可支持海量视频的轻量化接入与汇聚管理,可提供视频监控直播、视频轮播、视频录像、云存储、回放与检索、智能告警、服务器集群、语音对讲、云台控制、电子地图、平台级联等功能。其中,语音对讲功能在视频监控场景中具有......
  • EasyCVR平台如何正确配置设备移动侦测告警信息的上传?
    EasyCVR视频融合平台基于云边端协同架构,支持海量视频汇聚管理,平台融合性强、拓展灵活、视频能力丰富,具体包括:视频监控直播、轮播、录像、视频转码、云存储、检索与回看、告警上报、电子地图、云台控制、语音对讲、集群、级联共享等。用户在现场部署了EasyCVR,需要将设备的移动侦测告......
  • ABC216G
    将区间按照右端点排序,贪心的往最右边填\(1\),不难发现这样一定是正确的。感性理解一下就是越往右的位置对于后面的区间贡献越大。而且每个点最多只会被放置一个\(1\),所以我们可以暴力的找到下一个可以填的位置,并填入\(1\),可以使用线段树维护,复杂度是\(\mathcal{O}(n\logn)\)......