下载:wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
注意:kafka正常运行,必须配置zookeeper,kafka安装包已经包括zookeeper服务
解压:tar -zxvf kafka_2.12-3.3.1.tgz
修改config 目录下 server.properties文件
修改config 目录下 zookeeper.properties 文件
启动Kafka,kafka2.12目录下运行 zookeeper 和 kafka
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
或
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties
检查是否启动成功:
ps -ef|grep kafka 或 lsof -i:2181
创建一个 topic 命名为:my-topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic
查看已创建的topic信息:
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
命令行的工具生产消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
This is a message
消费工具消费消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
使用Ctrl-C停止生产者和消费者客户端。
【啰嗦一下】:
若要模拟分布式可以复制多个server.properties 文件,并修改 broker.id,port,log.dirs
再一一启动时指定不同的server.properties文件启动即可
API地址:https://kafka.apache.org/documentation/#api
Windos 环境启动kafka:
与linux相同的压缩包,解压后修改server.properties 文件,zookeeper.properties文件
到bin/windows目录下执行cmd 打开两个窗口,再分别执行以下两个命令,具体目录根据自己的文件目录修改
zookeeper-server-start.bat D:/ProgramFiles/kafka2.12/config/zookeeper.properties
kafka-server-start.bat D:/ProgramFiles/kafka2.12/config/server.properties
其创建生产消费等命令与linux类似,把sh文件换成bat文件执行命令
生产消息代码:
public class KafkaMyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);
//生产数据
for (int i = 0; i < 5; i++) {
// topic: 消息队列的名称,可以先行在kafka服务中进行创建。如果kafka中并未创建该topic,那么便会自动创建!
// key:键值,value:要发送的数据,数据格式为String类型的。
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("mytopic", Integer.toString(i), "Hello kafka A" + i);
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (Objects.nonNull(e)) {
System.out.println("发送消息失败:" + e.getMessage());
}
if (Objects.nonNull(metadata)) {
System.out.println("同步发送消息结果:topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
});
}
producer.close();
}
}
消费消息代码:
public class KafkaMyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");//组名 不同组名可以重复消费
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("mytopic")); //需要先订阅一个topic,也就是指定消费哪一个topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(20));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
标签:示例,Windows,zookeeper,server,kafka,topic,--,Kafka,properties From: https://www.cnblogs.com/zhey/p/17016134.html