// Requires: using System; using System.Threading.Tasks; using Confluent.Kafka;

/// <summary>
/// Starts a consumer in the specified group and consumes messages from the specified topic.
/// </summary>
/// <param name="broker">Address of the Kafka broker(s)</param>
/// <param name="topic">Kafka topic to consume from</param>
/// <param name="groupID">Consumer group the consumer belongs to</param>
public void Consume(string broker, string topic, string groupID)
{
    Task.Run(() =>
    {
        try
        {
            // string.IsNullOrWhiteSpace already covers null, empty, and whitespace-only values.
            if (string.IsNullOrWhiteSpace(broker))
            {
                throw new ArgumentException("The Kafka broker address cannot be empty!", nameof(broker));
            }
            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentException("The Kafka topic cannot be empty!", nameof(topic));
            }
            if (string.IsNullOrWhiteSpace(groupID))
            {
                throw new ArgumentException("The consumer group ID cannot be empty!", nameof(groupID));
            }

            var config = new ConsumerConfig
            {
                GroupId = groupID,
                BootstrapServers = broker,
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.Plain,
                StatisticsIntervalMs = 5000,
                EnableAutoCommit = false,                   // commit offsets manually
                //EnableAutoCommit = true,                  // or commit automatically
                //AutoCommitIntervalMs = 5000,              // auto-commit interval, default 5,000 ms (5 s)
                HeartbeatIntervalMs = 3000,                 // consumer heartbeat interval, default 3,000 ms (3 s)
                SessionTimeoutMs = 10000,                   // session timeout between consumer and broker, default 10,000 ms (10 s)
                MaxPollIntervalMs = 300000,                 // maximum interval between two poll calls, default 300,000 ms (5 min)
                AutoOffsetReset = AutoOffsetReset.Latest,   // start from the latest messages
                //AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true,
                PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
                SaslUsername = "*****",                     // SASL username
                SaslPassword = "***************",           // SASL password
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe(topic); // subscribe to the topic

                // Note: despite its name, IsCancelled is used here as the "keep running" switch;
                // the catch block sets it to false to stop the loop.
                while (IsCancelled)
                {
                    WriteLog("KafkaLog", "Kafka Consumer Starting...");
                    OnKafkaEventHandler("Kafka Consumer Starting...");

                    var consumerResult = consumer.Consume(); // blocks until a message (or partition EOF) arrives
                    //var consumerResult = consumer.Consume(60000); // or use a timeout when no messages are available

                    if (consumerResult.Message != null)
                    {
                        var key = consumerResult.Message.Key;
                        var message = consumerResult.Message.Value;

                        // show the Kafka payload
                        //OnKafkaEventHandler(message);

                        // write a log entry
                        WriteLog("KafkaLog", "Kafka Consumer Succeed");
                        OnKafkaEventHandler("Kafka Consumer Succeed");

                        // insert into the database
                        InsertOracleDataBase(message);

                        // manual commit
                        consumer.Commit(consumerResult);
                        WriteLog("KafkaLog", "Kafka Commit Succeed");
                        OnKafkaEventHandler("Kafka Commit Succeed");
                    }
                    else
                    {
                        // Message is null, e.g. when EnablePartitionEof signals the end of a partition
                        WriteLog("KafkaLog", "Kafka Consumer NoData");
                        OnKafkaEventHandler("Kafka Consumer NoData");
                    }
                }
            }
        }
        catch (Exception ex)
        {
            IsCancelled = false; // stop the consume loop
            WriteLog("KafkaLog", ex.Message);
            OnKafkaEventHandler(ex.Message);
        }
    });
}
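For context, here is a minimal sketch of the members the method above relies on (IsCancelled, WriteLog, OnKafkaEventHandler, InsertOracleDataBase) and how consuming might be started and stopped. The class name and the placeholder bodies are assumptions for illustration only; they are not from the original post, and the sketch assumes the Consume method above is pasted into this class with the Confluent.Kafka NuGet package referenced.

using System;

public class KafkaConsumerService
{
    // Despite the name, this flag acts as the "keep running" switch for the consume loop;
    // setting it to false lets the while loop exit.
    public bool IsCancelled { get; set; } = true;

    private void WriteLog(string category, string text) =>
        Console.WriteLine($"[{category}] {DateTime.Now:O} {text}");

    private void OnKafkaEventHandler(string text) =>
        Console.WriteLine(text); // e.g. raise an event or refresh the UI

    private void InsertOracleDataBase(string message)
    {
        // Placeholder: persist the message to an Oracle table here.
    }

    // The Consume(broker, topic, groupID) method from above goes here.

    public static void Main()
    {
        var service = new KafkaConsumerService();
        service.Consume("broker1:9092", "my-topic", "my-group"); // starts the background Task
        Console.ReadLine();                                       // keep the process alive while consuming
        service.IsCancelled = false;                              // ask the consume loop to stop
    }
}

Because Consume() without a timeout blocks until a message arrives, the loop only re-checks the flag after the next message or partition EOF; the commented-out Consume(60000) overload is the option to use if the loop should wake up periodically even when the topic is idle.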