一、本文是使用C#引用Confluent.Kafka.dll实现Kafka消息队列的实际开发示例。在实际开发中遇到过使用9094端口时消息生产和消费始终超时的问题,解决办法是配置网络白名单,或者直接改用9092端口。大部分超时(Time Out)问题与代码无关,而是与Kafka的安装和配置有关。
二、在NuGet中添加对Confluent.Kafka的引用
三、消费
/// <summary>
/// Kafka message consumer built on Confluent.Kafka.
/// Broker list, topic list, group id and consumer count are read from the
/// application's AppSettings ("Broker", "ConsumeTopic", "GroupID", "ConsumerCount").
/// </summary>
public class KafkaConsumer
{
    public static string brokerUrl = ConfigurationManager.AppSettings["Broker"];
    public static string topic = ConfigurationManager.AppSettings["ConsumeTopic"];
    public static string groupid = ConfigurationManager.AppSettings["GroupID"];
    public static string consumercount = ConfigurationManager.AppSettings["ConsumerCount"];

    /// <summary>
    /// Entry point: subscribes to the comma-separated topics configured in
    /// <see cref="topic"/> and runs the consume loop until Ctrl+C is pressed.
    /// Any exception is written to the program log instead of being rethrown.
    /// </summary>
    public static void Consume()
    {
        var mode = "consume";
        var brokerList = brokerUrl;

        try
        {
            // Validate inside the try block so a missing configuration entry is
            // logged like every other failure instead of escaping as a
            // NullReferenceException.
            if (string.IsNullOrWhiteSpace(topic))
            {
                LogHelper.WriteProgramLog(DateTime.Now.ToString() + "AppSettings[\"ConsumeTopic\"] is missing or empty.");
                return;
            }

            List<string> topics = new List<string>(topic.Split(','));

            using (var cts = new CancellationTokenSource())
            {
                ConsoleCancelEventHandler onCancelKeyPress = (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };

                Console.CancelKeyPress += onCancelKeyPress;
                try
                {
                    switch (mode)
                    {
                        case "consume":
                            Run_Consume(brokerList, topics, cts.Token);
                            break;
                        case "manual":
                            Run_ManualAssign(brokerList, topics, cts.Token);
                            break;
                        default:
                            PrintUsage();
                            break;
                    }
                }
                finally
                {
                    // Unsubscribe before the token source is disposed so a later
                    // Ctrl+C cannot call Cancel() on a disposed instance.
                    Console.CancelKeyPress -= onCancelKeyPress;
                }
            }
        }
        catch (Exception ex)
        {
            LogHelper.WriteProgramLog(DateTime.Now.ToString() + ex.Message);
        }
    }

    /// <summary>
    ///     In this example
    ///         - offsets are manually committed.
    ///         - no extra thread is created for the Poll (Consume) loop.
    /// </summary>
    /// <param name="brokerList">Bootstrap servers passed to the consumer configuration.</param>
    /// <param name="topics">Topics to subscribe to.</param>
    /// <param name="cancellationToken">Token that stops the consume loop when cancelled.</param>
    public static void Run_Consume(string brokerList, List<string> topics, CancellationToken cancellationToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = brokerList,
            GroupId = groupid,
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetResetType.Earliest
        };

        const int commitPeriod = 5;

        using (var consumer = new Consumer<Ignore, string>(config))
        {
            consumer.Subscribe(topics);

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);

                    if (consumeResult.Offset % commitPeriod == 0)
                    {
                        // The Commit method sends a "commit offsets" request to the Kafka
                        // cluster and synchronously waits for the response. This is very
                        // slow compared to the rate at which the consumer is capable of
                        // consuming messages. A high performance application will typically
                        // commit offsets relatively infrequently and be designed to handle
                        // duplicate messages in the event of failure.
                        var committedOffsets = consumer.Commit(consumeResult);
                        Console.WriteLine(string.Format("Committed offset: {0}", committedOffsets));
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(string.Format("Consume error: {0}", e.Error));
                }
                catch (OperationCanceledException)
                {
                    // Cancellation was requested while blocked in Consume: leave the
                    // loop normally so that Close() below still runs.
                    break;
                }
            }

            consumer.Close();
        }
    }

    /// <summary>
    ///     In this example
    ///         - consumer group functionality (i.e. .Subscribe + offset commits) is not used.
    ///         - the consumer is manually assigned to a partition and always starts consumption
    ///           from a specific offset (0).
    /// </summary>
    /// <param name="brokerList">Bootstrap servers passed to the consumer configuration.</param>
    /// <param name="topics">Topics whose partition 0 is assigned to this consumer.</param>
    /// <param name="cancellationToken">Token that stops the consume loop when cancelled.</param>
    public static void Run_ManualAssign(string brokerList, List<string> topics, CancellationToken cancellationToken)
    {
        var config = new ConsumerConfig
        {
            // the group.id property must be specified when creating a consumer, even
            // if you do not intend to use any consumer group functionality.
            // Guid.NewGuid() is required here: new Guid() always produces the
            // all-zero GUID, which would give every instance the same group id.
            GroupId = Guid.NewGuid().ToString(),
            BootstrapServers = brokerList,
            // partition offsets can be committed to a group even by consumers not
            // subscribed to the group. in this example, auto commit is disabled
            // to prevent this from occurring.
            EnableAutoCommit = false
        };

        using (var consumer = new Consumer<Ignore, string>(config))
        {
            consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());

            consumer.OnError += (_, e) =>
                Console.WriteLine(string.Format("Error: {0}", e.Reason));

            consumer.OnPartitionEOF += (_, topicPartitionOffset) =>
                Console.WriteLine(string.Format("End of partition: {0}", topicPartitionOffset));

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine(string.Format("Received message at {0}: {1}", consumeResult.TopicPartitionOffset, consumeResult.Message.Value));
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(string.Format("Consume error: {0}", e.Error));
                }
                catch (OperationCanceledException)
                {
                    // Cancellation was requested while blocked in Consume: leave the
                    // loop normally so that Close() below still runs.
                    break;
                }
            }

            consumer.Close();
        }
    }

    /// <summary>
    /// Prints the supported modes; reached only when the mode is neither
    /// "consume" nor "manual".
    /// </summary>
    private static void PrintUsage()
    {
        Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]");
    }
}
四、生产
/// <summary>
/// Kafka message producer. A single shared Confluent.Kafka producer instance
/// is created on first use and reused for every call to <see cref="Produce"/>.
/// </summary>
public class KafkaProducer
{
    public static string brokerUrl = ConfigurationManager.AppSettings["Broker"];
    public static string topic = ConfigurationManager.AppSettings["ProduceTopic"];
    public static string groupid = ConfigurationManager.AppSettings["GroupID"];

    private static readonly object Locker = new object();

    // volatile so the unlocked first null check in EnsureProducer never
    // observes a stale reference.
    private static volatile Producer<string, string> _producer;

    /// <summary>
    /// Ensures the shared producer exists. Kept for callers that construct
    /// the class; the instance itself holds no state.
    /// </summary>
    public KafkaProducer()
    {
        EnsureProducer();
    }

    /// <summary>
    /// Returns the shared producer, creating it under the lock on first use.
    /// </summary>
    private static Producer<string, string> EnsureProducer()
    {
        if (_producer == null)
        {
            lock (Locker)
            {
                if (_producer == null)
                {
                    var config = new ProducerConfig
                    {
                        BootstrapServers = brokerUrl
                    };
                    _producer = new Producer<string, string>(config);
                }
            }
        }

        return _producer;
    }

    /// <summary>
    /// Produces a message to the configured topic, prints the delivery outcome
    /// to the console, and flushes the producer for up to 10 seconds.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="message">Message payload; must not be null, empty or whitespace.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="message"/> is null, empty or whitespace.
    /// </exception>
    public static void Produce(string key, string message)
    {
        // Validate before touching the producer so that invalid input never
        // triggers a broker connection. The two-argument constructor is used
        // because the single-argument overload treats its string as the
        // parameter name, not as the message.
        if (string.IsNullOrWhiteSpace(message))
        {
            throw new ArgumentNullException("message", "消息内容不能为空!");
        }

        var producer = EnsureProducer();

        var deliveryReport = producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = message });

        deliveryReport.ContinueWith(task =>
        {
            // Inspect the task state first: reading task.Result on a faulted
            // task would throw inside the continuation and go unobserved.
            if (task.IsFaulted || task.IsCanceled)
            {
                var reason = task.Exception != null
                    ? task.Exception.GetBaseException().Message
                    : "delivery was canceled";

                Console.WriteLine("Producer:" + producer.Name
                    + "\r\nTopic:" + topic
                    + "\r\nResult:" + false
                    + "\r\nError:" + reason);
                return;
            }

            var report = task.Result;
            Console.WriteLine("Producer:" + producer.Name
                + "\r\nTopic:" + topic
                + "\r\nPartition:" + report.Partition
                + "\r\nOffset:" + report.Offset
                + "\r\nMessage:" + report.Value
                + "\r\nResult:" + true);
        });

        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
标签:string,C#,Confluent,kafka,static,var,new,consumer,public From: https://www.cnblogs.com/wl-blog/p/17260546.html