首页 > 其他分享 >kafka

kafka

时间:2023-07-25 16:00:14浏览次数:28  
标签:OnKafkaEventHandler KafkaLog string Kafka consumerResult kafka Consumer

 1         /// <summary>
 2         /// 指定的组别的消费者开始消费指定主题的消息
 3         /// </summary>
 4         /// <param name="broker">Kafka消息服务器的地址</param>
 5         /// <param name="topic">Kafka消息所属的主题</param>
 6         /// <param name="groupID">Kafka消费者所属的组别</param> 
 7         public void Consume(string broker, string topic, string groupID)
 8         {
 9             Task.Run(() =>
10             {
11                 try
12                 {
13                     if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
14                     {
15                         throw new ArgumentException("Kafka消息服务器的地址不能为空!");
16                     }
17                     if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
18                     {
19                         throw new ArgumentNullException("Kafka消息所属的主题不能为空!");
20                     }
21                     if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
22                     {
23                         throw new ArgumentException("用户分组ID不能为空!");
24                     }
25                     var config = new ConsumerConfig
26                     {
27                         GroupId = groupID,
28                         BootstrapServers = broker,
29                         SecurityProtocol = SecurityProtocol.SaslPlaintext,
30                         SaslMechanism = SaslMechanism.Plain,
31                         StatisticsIntervalMs = 5000,
32                         EnableAutoCommit = false,      //手动提交 
33                         //EnableAutoCommit = true,     //自动提交
34                         //AutoCommitIntervalMs = 5000,   //自动提交偏移量的间隔时间,默认值为 5,000 毫秒(即5秒)
35                         HeartbeatIntervalMs = 3000,    //消费者发送心跳的间隔时间,默认值为 3,000 毫秒(即3秒)
36                         SessionTimeoutMs = 10000,      //消费者与 Kafka broker 之间的会话超时时间,默认值为 10,000 毫秒(即10秒)
37                         MaxPollIntervalMs = 300000,    //两次 poll 调用之间的最大时间间隔,默认值为 300,000 毫秒(即5分钟)
38                         AutoOffsetReset = AutoOffsetReset.Latest,     //最新消息
39                         //AutoOffsetReset = AutoOffsetReset.Earliest,
40                         EnablePartitionEof = true,
41                         PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
42                         SaslUsername = "*****",                //SASL账户 
43                         SaslPassword = "***************",  //SASL密码 
44                     };
45 
46                     using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
47                     {
48                         consumer.Subscribe(topic);  //订阅主题 
49                         while (IsCancelled)         //循环消费
50                         {
51                             WriteLog("KafkaLog", "Kafka Consumer Starting...");
52                             OnKafkaEventHandler("Kafka Consumer Starting...");
53                             var consumerResult = consumer.Consume();         //消费Kafka  
54                             //var consumerResult = consumer.Consume(60000);  //消费Kafka  没有消息时设置超时时间
55 
56                             if (consumerResult.Message != null)
57                             {
58                                 var key = consumerResult.Message.Key;
59                                 var message = consumerResult.Message.Value;
60 
61                                 //显示Kafka数据
62                                 //OnKafkaEventHandler(message); 
63 
64                                 //记录Log 
65                                 WriteLog("KafkaLog", "Kafka Consumer Succeed");
66                                 OnKafkaEventHandler("Kafka Consumer Succeed");
67 
68                                 //插入数据库
69                                 InsertOracleDataBase(message);
70 
71                                 //手动提交
72                                 consumer.Commit(consumerResult);
73                                 WriteLog("KafkaLog", "Kafka Commit Succeed");
74                                 OnKafkaEventHandler("Kafka Commit Succeed");
75                             }
76                             else
77                             {
78                                 WriteLog("KafkaLog", "Kafka Consumer NoData");
79                                 OnKafkaEventHandler("Kafka Consumer NoData");
80                             }
81                         }
82                     }
83                 }
84                 catch (Exception ex)
85                 {
86                     IsCancelled = false;
87                     WriteLog("KafkaLog", ex.Message);
88                     OnKafkaEventHandler(ex.Message);
89                 }
90             });
91         }

 

标签:OnKafkaEventHandler,KafkaLog,string,Kafka,consumerResult,kafka,Consumer
From: https://www.cnblogs.com/fl-1998/p/17580083.html

相关文章

  • Kafka
    目录Kafka组成Kafka的配置简单的Kafka生产和消费者生产者消费者Kafka组成基础组成生产者会将信息推送给topic并由topic决定要将该消息发送给哪一个分区轮询key值分区:生产者根据key的值来决定将信息发送给哪个分区消费者组组中的消费者不会接收到全量信息但是全量......
  • 【项目实战】Kafka 的 Leader 选举和负载均衡
    ......
  • 【项目实战】Kafka 重平衡 Consumer Group Rebalance 机制
    ......
  • Kafka核心API -- Connect
    Connect基本概念KafkaConnect是Kafka流式计算的一部分KafkaConnect主要用来与其他中间件建立流式通道KafkaConnect支持流式和批量处理集成 环境准备创建两个表createtableusers_bak(`uuid`intprimarykeyauto_increment,`name`VARCHAR(20),`ag......
  • Kafka客户端操作
    五类API Kafka客户端API类型AdminClientAPI:允许管理和检测Topic、broker以及其它Kafka对象(类似于命令行的createtopic)ProducerAPI:发送消息到1个或多个TopicConsumerAPI:订阅一个或多个Topic,并处理产生的消息StreamsAPI:高效地将输入流转换到输出流ConnectorAPI:从一......
  • debezium同步postgresql数据至kafka
    0实验环境全部部署于本地虚拟机debeziumdocker部署postgresql、kafka本机部署1postgresql1.1配置设置postgres密码为123仿照example,创建databasepostgres,schemeinventory,tablecustomers因为postgres用户有replication权限,所以可以直接使用修改postgresql.conf文......
  • kafka基础操作
    什么是kafkakafka本身并不是消息队列,而是一份分布式流平台(高并发,低延迟。高吞吐量)。kafka是基于zookeeper的分布式消息系统。kafka具有高吞吐率、高性能、实时及高可靠等特点。kafka基本概念Topic:一个虚拟的概念,由一个到多个Partitions组成Partition:实际消息存储单位P......
  • Kafka - kafka为啥这么快?(基于磁盘存储的,为何还能拥有高性能)
    总结1.顺序读写磁盘读写有两种方式:顺序读写或者随机读写。Kafka是磁盘顺序读写,利用了一种分段式的、只追加(Append-Only)的日志,基本上把自身的读写操作限制为顺序I/O,磁盘的顺序读写速度和内存持平(见图1.1)。kafkatopic的每一个Partition其实都是一个文件,收到消息后Kaf......
  • MQTT 与 Kafka|物联网消息与流数据集成实践
    MQTT如何与Kafka一起使用?MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,专为受限网络环境下的设备通信而设计。ApacheKafka是一个分布式流处理平台,旨在处理大规模的实时数据流。Kafka和MQTT是实现物联网数据端到端集成的互补技术。通过结合使用......
  • 【项目实战】Kafka 生产者幂等性和事务
    ......