首页 > 其他分享 >NetCore 集成 Kafka 基本使用

NetCore 集成 Kafka 基本使用

时间:2023-01-10 11:56:54浏览次数:52  
标签:集成 Console NetCore zookeeper Kafka topic var new kafka

.NetCore 集成 Kafka 基本使用

我们先部署Kafka的镜像 可参考 DockerHub

我们这里直接创建一个 docker-compose.yml文件 创建以后 执行 docker-compose up -d

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.1.16
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  • depends_on:zookeeper 指定kafka依赖zookeeper这个service,当启动kafka的时候自动会启动zookeeper。
  • KAFKA_ADVERTISED_HOST_NAME 这里要指定宿主机的ip
  • KAFKA_CREATE_TOPICS 这个变量只是的默认创建的topic。"test:3:1"代表创建一个名为test的topic并且创建3个分区1个复制。
  • zookeeper,kafka的分布式依赖zookeeper,所以我需要先定义它。

创建俩个Console App 名称 分别为Kafka1和Kafka2

Kafka1为生产者 Kafka2为消费者

安装 NuGet包 Confluent.Kafka

生产者

  static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World Producer!");

            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                ClientId = Dns.GetHostName(),
            };


            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                string topic = "test";
                for (int i = 0; i < 100; i++)
                {
                    var msg = "message " + i;
                    Console.WriteLine($"Send message:   value {msg}");
                    var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
                    Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
                    Thread.Sleep(500);
                }
            }

            Console.ReadLine();

        }

消费者

 static void Main(string[] args)
        {
            Console.WriteLine("Hello World kafka consumer !");

            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var cancel = false;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var topic = "test";
                consumer.Subscribe(topic);

                while (!cancel)
                {
                    var consumeResult = consumer.Consume(CancellationToken.None);

                    Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                }

                consumer.Close();
            }
        }

标签:集成,Console,NetCore,zookeeper,Kafka,topic,var,new,kafka
From: https://www.cnblogs.com/Sleepy-Person/p/17039716.html

相关文章

  • 【转】Vue+springboot集成PageOffice实现在线编辑Word、excel文档
    说明:PageOffice是一款在线的office编辑软件,帮助Web应用系统或Web网站实现用户在线编辑Word、Excel、PowerPoint文档。可以完美实现在线公文流转,领导批阅,盖章。可以给文件......
  • Springboot集成Disruptor做内部消息队列
    一、基本介绍Disruptor的github主页:https://github.com/LMAX-Exchange/disruptor1,什么是Disruptor? (1)Disruptor是英国外汇交易公司LMAX开发的一个高性能的并发框架......
  • 记一次kafka重复消费连带出的JVM问题
    背景:指标对齐服务(CPU密集型)原本指标对齐服务的请求较小,响应速度不会超过1s,采用http同步处理可以解决。但最近接入的业务中有大文件业务,响应时间指数上升,但nginx的超时时间......
  • librdkafka auto.offset.reset 解释
     官网解释:Actiontotakewhenthereisnoinitialoffsetinoffsetstoreorthedesiredoffsetisoutofrange:'smallest','earliest'-automaticallyreset......
  • 【集成开发环境 (IDE)】Dev-Cpp下载与安装 [ 图文教程 ]
    版权声明©本文作者:main工作室本文链接:https://www.cnblogs.com/main-studio/p/17037280.html版权声明:本文为博客园博主「main工作室」的原创文章,遵循署名-非商业性......
  • ActiveMQ和spring集成
    1.消息发送类packagecn.com.biceng.jms.queue;importjavax.jms.Destination;importjavax.jms.JMSException;importjavax.jms.Message;importjavax.jms.Session;import......
  • NETCORE + VUE + SignalR 消息通信
     NETCORE+VUE+SignalR消息通信  分组通信:https://blog.csdn.net/qbc12345678/article/details/125215711 一.创建Net6WebApi项目NETCORE.TSignalR1.......
  • Apache Kafka 的基本概念
    基本概念主题Topictopic是Kafka最基础的组织单位,类似于关系数据库中的数据表。做为使用kafka的开发者,你最应该考虑的是和topoc相关的抽象。创建不同的topic保......
  • Kafka学习笔记(十二):Java Consumer
    JavaConsumerStringboostrapServers="127.0.0.1:9092";StringgroupId="my-second-application";Stringtopic="demo_java";//createconsumerconfigsProp......
  • Kafka学习笔记(十一):Java Producer
    KafkaJavaProgrammingpackageio.conduktor.demos.kafka;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Prod......