.NetCore 集成 Kafka 基本使用
我们先部署Kafka的镜像 可参考 DockerHub
我们这里直接创建一个 docker-compose.yml文件 创建以后 执行 docker-compose up -d
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.1.16
KAFKA_CREATE_TOPICS: "test:3:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- depends_on:zookeeper 指定kafka依赖zookeeper这个service,当启动kafka的时候自动会启动zookeeper。
- KAFKA_ADVERTISED_HOST_NAME 这里要指定宿主机的ip
- KAFKA_CREATE_TOPICS 这个变量只是的默认创建的topic。"test:3:1"代表创建一个名为test的topic并且创建3个分区1个复制。
- zookeeper,kafka的分布式依赖zookeeper,所以我需要先定义它。
创建俩个Console App 名称 分别为Kafka1和Kafka2
Kafka1为生产者 Kafka2为消费者
安装 NuGet包 Confluent.Kafka
生产者
static async Task Main(string[] args)
{
Console.WriteLine("Hello World Producer!");
var config = new ProducerConfig
{
BootstrapServers = "192.168.0.117:9092",
ClientId = Dns.GetHostName(),
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
string topic = "test";
for (int i = 0; i < 100; i++)
{
var msg = "message " + i;
Console.WriteLine($"Send message: value {msg}");
var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
Thread.Sleep(500);
}
}
Console.ReadLine();
}
消费者
static void Main(string[] args)
{
Console.WriteLine("Hello World kafka consumer !");
var config = new ConsumerConfig
{
BootstrapServers = "192.168.0.117:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var cancel = false;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var topic = "test";
consumer.Subscribe(topic);
while (!cancel)
{
var consumeResult = consumer.Consume(CancellationToken.None);
Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
}
consumer.Close();
}
}
标签:集成,Console,NetCore,zookeeper,Kafka,topic,var,new,kafka
From: https://www.cnblogs.com/Sleepy-Person/p/17039716.html