一、Kafka简介
1. 概念
Kafka是一个分布式的、基于发布/订阅的消息队列,最初由LinkedIn开发,并于2011年成为Apache项目的一部分。Kafka具有高吞吐量、可扩展性、持久性和容错性等特性,被广泛应用于大数据领域,如数据采集、数据传输、数据处理等场景。
Kafka主要由以下三个部分组成:
-
Producer:生产者,负责生产消息并将其发送到Kafka集群。
-
Broker:Kafka集群中的服务器,负责存储消息并进行消息的路由和传输。
-
Consumer:消费者,负责从Kafka集群中订阅并消费消息。
Kafka的消息模型基于发布/订阅模式,可以将消息分为多个主题(Topic),每个主题可以分为多个分区(Partition),每个分区可以分配到不同的Broker上进行存储。Kafka采用了基于时间的存储机制,即消息一旦被写入分区,就会被保留一定的时间,即使已经被消费也不会立即删除。
Kafka的使用步骤主要包括以下几个方面:
-
安装部署Kafka集群:根据实际需求安装部署Kafka集群。
-
创建Topic:使用Kafka提供的工具创建需要的Topic。
-
生产者生产消息:使用Kafka提供的API或者客户端工具,向指定Topic中生产消息。
-
消费者消费消息:使用Kafka提供的API或者客户端工具,从指定的Topic中消费消息。
-
监控和管理Kafka集群:使用Kafka提供的工具监控和管理Kafka集群,如监控集群健康状态、处理消息堆积等。
2. 简单示例
- 创建Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 生产者生产消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 消费者消费消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
二、详细示例
- 安装Kafka
首先,需要下载并安装Kafka。在安装完成之后,需要启动Kafka集群和Zookeeper服务器。可以通过以下命令启动:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
- 创建Topic
在使用Kafka发送和消费消息之前,需要创建一个Topic。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
上述命令将创建一个名为“test”的Topic,它有1个副本和1个分区。可以通过以下命令来查看已创建的Topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
- 发送消息
可以使用以下命令向Topic发送消息:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在控制台输入消息后,按“Enter”键即可将消息发送到Topic中。
- 消费消息
可以使用以下命令从Topic中消费消息:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
上述命令将从Topic“test”中消费消息,并将它们输出到控制台。如果要停止消费消息,请按“Ctrl-C”。
- 使用Java API发送消息
可以使用Java API来发送消息。以下是一个发送消息的示例代码:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception{
String topicName = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
System.out.println("Message sent successfully");
producer.close();
}
}
- 使用Java API消费消息
可以使用Java API来消费消息。以下是一个消费消息的示例代码:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topicName = "test";
Properties props = new Properties();
props
标签:--,kafka,Topic,详解,消息,test,Kafka,快速
From: https://www.cnblogs.com/arek/p/17255107.html