首页 > 编程语言 >C#实现kafka消息队列-Confluent.Kafka

C#实现kafka消息队列-Confluent.Kafka

时间:2023-03-27 10:15:45浏览次数:37  
标签:string C# Confluent kafka static var new consumer public

一、本文是C#引用Confluent.Kafka.dll实现kafka消息队列的实际开发例子。在实际开发中遇到9094端口始终消息生产和消费超时的问题,需要对网络白名单进行配置或者直接使用9092端口。大部分Time Out情况与代码无关,跟kafka的安装配置有关。

二、、Nuget中添加引用Confluent.Kafka

 

 

三、消费

public class KafkaConsumer
    {
        public static string brokerUrl = ConfigurationManager.AppSettings["Broker"];
        public static string topic = ConfigurationManager.AppSettings["ConsumeTopic"];
        public static string groupid = ConfigurationManager.AppSettings["GroupID"];
        public static string consumercount = ConfigurationManager.AppSettings["ConsumerCount"];
        public static void Consume()
        {
            var mode = "consume";
            var brokerList = brokerUrl;
            List<string> topics = new List<string>(topic.Split(','));
 
            try
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
 
                switch (mode)
                {
                    case "consume":
                        Run_Consume(brokerList, topics, cts.Token);
                        break;
                    case "manual":
                        Run_ManualAssign(brokerList, topics, cts.Token);
                        break;
                    default:
                        PrintUsage();
                        break;
                }
            }
            catch (Exception ex)
            {
                LogHelper.WriteProgramLog(DateTime.Now.ToString() + ex.Message);
            }
 
        }
        /// <summary>
        ///     In this example
        ///         - offsets are manually committed.
        ///         - no extra thread is created for the Poll (Consume) loop.
        /// </summary>
        public static void Run_Consume(string brokerList, List<string> topics, CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                GroupId = groupid,
                EnableAutoCommit = false,
                StatisticsIntervalMs = 5000,
                SessionTimeoutMs = 6000,
                AutoOffsetReset = AutoOffsetResetType.Earliest
            };
 
            const int commitPeriod = 5;
 
            using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Subscribe(topics);
 
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        if (consumeResult.Offset % commitPeriod == 0)
                        {
                            // The Commit method sends a "commit offsets" request to the Kafka
                            // cluster and synchronously waits for the response. This is very
                            // slow compared to the rate at which the consumer is capable of
                            // consuming messages. A high performance application will typically
                            // commit offsets relatively infrequently and be designed handle
                            // duplicate messages in the event of failure.
                            var committedOffsets = consumer.Commit(consumeResult);
                            Console.WriteLine(string.Format("Committed offset: {0}", committedOffsets));
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine(string.Format("Consume error: {0}", e.Error));
                    }
                }
 
                consumer.Close();
            }
        }
        /// <summary>
        ///     In this example
        ///         - consumer group functionality (i.e. .Subscribe + offset commits) is not used.
        ///         - the consumer is manually assigned to a partition and always starts consumption
        ///           from a specific offset (0).
        /// </summary>
        public static void Run_ManualAssign(string brokerList, List<string> topics, CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                // the group.id property must be specified when creating a consumer, even 
                // if you do not intend to use any consumer group functionality.
                GroupId = new Guid().ToString(),
                BootstrapServers = brokerList,
                // partition offsets can be committed to a group even by consumers not
                // subscribed to the group. in this example, auto commit is disabled
                // to prevent this from occuring.
                EnableAutoCommit = true
            };
 
            using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
 
                consumer.OnError += (_, e)
                    => Console.WriteLine(string.Format("Error: {0}", e.Reason));
 
                consumer.OnPartitionEOF += (_, topicPartitionOffset)
                    => Console.WriteLine(string.Format("End of partition: {0}", topicPartitionOffset));
 
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        Console.WriteLine(string.Format("Received message at {0}:${1}", consumeResult.TopicPartitionOffset, consumeResult.Message));
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine(string.Format("Consume error: {0}", e.Error));
                    }
                }
 
                consumer.Close();
            }
        }
        private static void PrintUsage()
        {
            Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]");
        }
    }

四、生产

/// <summary>
    /// Kafka消息生产者
    /// </summary>
    public class KafkaProducer
    {
        public static string brokerUrl = ConfigurationManager.AppSettings["Broker"];
        public static string topic = ConfigurationManager.AppSettings["ProduceTopic"];
        public static string groupid = ConfigurationManager.AppSettings["GroupID"];
        private static readonly object Locker = new object();
        private static Producer<string, string> _producer;
        /// <summary>
        /// 单例生产
        /// </summary>
        public KafkaProducer()
        {
            if (_producer == null)
            {
                lock (Locker)
                {
                    if (_producer == null)
                    {
                        var config = new ProducerConfig
                        {
                            BootstrapServers = brokerUrl
                        };
                        _producer = new Producer<string, string>(config);
                    }
                }
            }
        }
 
        /// <summary>
        /// 生产消息并发送消息
        /// </summary>
        /// <param name="key">key</param>
        /// <param name="message">需要传送的消息</param>
        public static void Produce(string key, string message)
        {
            bool result = false;
            new KafkaProducer();
            if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0)
            {
                throw new ArgumentNullException("消息内容不能为空!");
            }
            var deliveryReport = _producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = message });
            deliveryReport.ContinueWith(task =>
            {
                Console.WriteLine("Producer:" + _producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value + "\r\nResult:" + result);
            });
            _producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

 

标签:string,C#,Confluent,kafka,static,var,new,consumer,public
From: https://www.cnblogs.com/wl-blog/p/17260546.html

相关文章

  • C#:C#终结器(析构函数)
    C#终结器是一种特殊的方法,它在对象被垃圾回收之前被调用。它可以用来释放非托管资源,如文件句柄、数据库连接等。在C#中,终结器是通过在类的定义中添加一个名为“~类名”的......
  • linux环境下tomcat日志切割
    1、打开Tomcat的配置文件catalina.sh(或catalina.bat,取决于你的操作系统),在其中找到以下配置项:#LoggingJAVA_OPTS="$JAVA_OPTS-Djava.util.logging.manager=org.apache......
  • 快慢指针-leetcode141-判断链表中是否有环。
    LeetCode#141题目描述:给定一个链表,判断链表中是否有环。如果链表中存在环,则返回true。否则,返回false。进阶:你能用O(1)(即,常量)内存解决此问题吗?示例1:example1......
  • Go语言:一文看懂什么是DI依赖注入(dependency injection)设计模式
    前言:本文主要介绍的是Goalng中关于DI的部分,前一部分会先通过典型的面向对象语言Java引入DI这个概念仅供初学者理解使用,文章如有纰漏敬请指出本文涉及到的知识面较为......
  • bing-create怎么样?如何申请注册?
    bing-Creator怎么样?ImageCreator是由微软推出的一款人工智能应用程序,它可以使用GPT-3模型生成图片。应用程序使用了DALL·E模型,它是OpenAI开发的一个图像生成......
  • Oracle问题:ORA-01565
    问题oracle启动时报错,找不到spfile文件。ORA-01078:failureinprocessingsystemparametersORA-01565:errorinidentifyingfile'+datadg/prod/parameterfile/spf......
  • 滤波器 CNN
    加入滤波器的作用?保持形状不变名字:滤波层(filter)也叫kernelCNN的计算实际上,就是滤波器的计算不用自己计算,计算机会自动的“学习”,设置层数就行......
  • 房产中介管理软件第13课:使用AutoFac做依赖注入处理
    使用了AutoFac做依赖注入的处理一、nuget加载AutoFac二、Program.cs中注入代码#regionAutofac自动注册builder.Host.UseServiceProviderFactory(newAutofacServic......
  • css-6个可以在css属性中使用的函数
    1-calc计算结果div{width:calc(100vm-50px);}2-var它可以将CSS变量的值赋予属性:root{--main-bg-color:coral;--main-padding:15px;}div{backgro......
  • 依赖注入之IConfiguration
    publicclassStartup{publicStartup(IConfigurationconfiguration){Configuration=configuration;}pu......