首页 > 其他分享 >.net core 下使用 Kafka 生产者批量发送给消息处理,使用事务(四)

.net core 下使用 Kafka 生产者批量发送给消息处理,使用事务(四)

时间:2023-01-30 21:34:41浏览次数:47  
标签:core producer 偏移量 Kafka result offset var new net

生产者批量发送消息,使用事务,要么全部失败要么全部成功

重要 说明 事物id必须要设置

producerConfig.TransactionalId =Guid.NewGuid().ToString();//必须设置事物id

 

 1   /// <summary>
 2         /// 批量发送
 3         /// </summary>
 4         /// <param name="msglist"></param>
 5 
 6         public void BatchSends(List<string> msglist)
 7         {
 8             ProducerConfig producerConfig = this.CreateProducerConnection();
 9             producerConfig.MessageTimeoutMs = 5000;//失败重试时间
10             producerConfig.EnableIdempotence = true;///幂等性:如果生产者发送失败不重复发消息失败重试
11             producerConfig.TransactionalId =Guid.NewGuid().ToString();//必须设置事物id
12             var builder = new ProducerBuilder<string, string>(producerConfig);
13             builder.SetDefaultPartitioner(RoundRobinPartitioner);//委托:调用负载均衡方法
14             using (var producer = builder.Build())
15             {
16                 // 1、初始化事务
17                 producer.InitTransactions(TimeSpan.FromSeconds(60));
18                 try
19                 {
20                     // 2、开发事务
21                     producer.BeginTransaction();
22                     foreach (var item in msglist)
23                     {
24                         //这里就是测试
25                        // var OrderJson = JsonConvert.SerializeObject(item);
26                        // var dr = producer.ProduceAsync("order-create", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
27                         var dr = producer.ProduceAsync(KafkaTopic.Topic, new Message<string, string> { Key = "order", Value = item }).GetAwaiter().GetResult();
28                         _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
29                     }
30                     // 3、提交事务
31                     producer.CommitTransaction();
32                 }
33                 catch (ProduceException<string, string> ex)
34                 {
35                     _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
36                     // 4、关闭事务
37                     producer.AbortTransaction();
38                 }
39             }
40         }

 

webapi 项目调用

 1         /// <summary>
 2         /// 批量发送消息
 3         /// </summary>
 4         /// <param name="msglist"></param>
 5         /// <returns></returns>
 6         [HttpPost("BatchSend")]
 7         public string BatchSend(List<string> msglist) {
 8 
 9             this._kafKaSevice.BatchSends(msglist);
10             return "";
11         }    

消息消费还是用上一篇的代码就行不需要更改 还是粘出来吧

 1   /// <summary>
 2         /// 消费消息 redis 存储 offset 偏移量
 3         /// </summary>
 4         public void Reveice()
 5         {
 6             ConsumerConfig consumerConfig = this.CreateConsumerConnection();
 7             //AutoOffsetReset.Latest 这里是kafka 停止重启 从最新产生的数据开始消费
 8             //AutoOffsetReset.Earliest 这里是kafka 停止重启 从第0个索引开始消费
 9             //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
10             //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
11             consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
12             consumerConfig.GroupId = "order";//分组为order组
13             //consumerConfig.EnableAutoCommit = true;
14             //EnableAutoCommit = true//自动确认 如果设置false 会被重复消费
15             //缺陷 当业务逻辑 失败时候 可以自动确认 导致 消息丢失
16             //所以 要手动确认 但是如果手动确认时候 kafka服务宕机 重启时候会导致重复消费
17             //此时就需要 偏移量+1 重置 确保偏移量 是最新的
18             var builder = new ConsumerBuilder<string, string>(consumerConfig);
19             string redisValue = this._distributedCache.GetString(KafkaTopic.Topic);
20             int offset = 0;
21             if (!string.IsNullOrEmpty(redisValue))
22             {
23                 offset = int.Parse(redisValue);
24             }
25             this._distributedCache.SetString(KafkaTopic.Topic, (offset + 1).ToString());
26             using (var consumer = builder.Build())
27             {
28                 //offset重置
29                 //consumer.Assign(new TopicPartition(KafkaTopic.Topic, new Partition(1)));
30                 consumer.Assign(new TopicPartitionOffset(KafkaTopic.Topic, new Partition(0), offset + 1));
31                 while (true)
32                 {
33                     // 订阅 生产者的主题
34                     consumer.Subscribe(KafkaTopic.Topic);
35                     // 消息 获取
36                     var result = consumer.Consume();
37                     // 获取偏移量
38                     _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
39                     // 把kafka队列中偏移量存起来。redis mysql
40                     // redis存储 kafka队列的偏移量
41                     _distributedCache.SetString("create-order", result.Offset.Value.ToString());
42                     string key = result.Key;
43                     string value = result.Value;
44                     //使用了redis 缓存 offset 重置 就不需要手动确认了
45                     //consumer.Commit(result);
46                     //consumer.StoreOffset(result);
47                     this._logger.LogError($" 收到消息了:{value} / offset:{result.Offset}/Partition: {result.Partition} ");
48                 }
49             }
50         }

 

测试

 

标签:core,producer,偏移量,Kafka,result,offset,var,new,net
From: https://www.cnblogs.com/yyxone/p/17077292.html

相关文章

  • API对象--Service(chrono《kubernetes入门实战课》笔记整理)
     【概念解说】当pod被实例化出来,如果运行一段时间会销毁,虽然deployment和ds会注意管理和维护pod的数目,但是pod销毁后再重建,ip会发生变化,这对于服务来说,是很麻烦的。所......
  • 基于.net core的Azure function 如何使用.net framework所支持的编码
    在azurefunction中通过http请求call第三方api时,response返回是一堆中文乱码,发现数据格式使用的是"gb2312"编码因此在StreamReader的时候,增加了“gb2312”的encoding,代码......
  • Quartz.Net 官方教程 Tutorial 3/3
    Schedule相关属性设置扩展属性方式varhost=Host.CreateDefaultBuilder().ConfigureServices(services=>{services.AddQuartz(opt=>......
  • ASP.NET Core RESTful学习理解
    一、了解什么是RESTREST是“REpresentationalStateTransfer”的缩写,表述性状态传递;REST是一种软件架构风格,用于构造简单、可靠、高性能的WEB应用程序;REST中,资源(Resou......
  • EFCore build failure
    今日学习源代码,里面按照业务划分了6个微服务,挨个执行add-migrationinit时提示buildfailure,无其他任何提示。Ctrl+Shift+B生成解决方案后显示出是另外一个类库的问题,......
  • .NET 批量替换关键字
    最近nfx462的项目升级.NET6需要批量替换Ilogger为Ilogger<类名>vs自带的搜索替换其正则表达式好像只能匹配一行,直接扫描文件替换吧//Seehttps://aka.ms/new-console-te......
  • .NET7后端框架:读取配置文件
    前言在项目开发过程中,不可避免的会设置一些全局的可变的参数,如连接字符串、功能开关、Swagger配置、Redis配置等等。.NETCore将这些配置参数统一放在appsettings.json......
  • How to join testnet
    HowtojointestnetThesestepsassumeyouhaveChia installed.Step1.Run chiainitStep2.Ifyoudonotalreadyhavekeysgeneratedthenrun chiakeysg......
  • Theory-guided physics-informed neural networks for boundary layer problems with
    JCP2023  这篇文章聚焦了PINN在处理奇异摄动问题时所面临的困难。(用不同的分支网络去表示内部区域和外部区域中边界层问题的不同阶数的近似)。但本文所提出的方法计算......
  • Net6/SuperSocket2.0课程1,一个Telnet示例
    十年河东,十年河西,莫欺少年穷学无止境,精益求精1、新建控制台程序并引入包dotnetaddpackageSuperSocket.Server 2、书写代码usingSystem;usingSystem.Text;......