使用docker安装
docker pull bitnami/kafka
docker run -d -p 9092:9092 --name kafka-server \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=ip:2181 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092 \
-e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_HEAP_OPTS="-Xmx256m -Xms256m" \
bitnami/kafka
ALLOW_PLAINTEXT_LISTENER=yes 允许不加密监听
KAFKA_BROKER_ID=1 在kafka集群中,每个kafka节点都有一个唯一的BROKER_ID来区分自己
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP。
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 配置kafka的监听端口
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true topic不存在时自动创建
KAFKA_HEAP_OPTS JVM堆内存参数:设置过大会导致服务器内存不足,设置过小则kafka无法启动。
安装控制台
docker pull dushixiang/kafka-map
docker run -d \
-p 9093:8080 \
-e DEFAULT_USERNAME=admin \
-e DEFAULT_PASSWORD=szz123 \
--name kafka-console \
dushixiang/kafka-map
注意在防火墙/安全组中放行9093端口,访问路径 http://ip:9093
使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
/**
 * Minimal Kafka producer demo: publishes ten string messages to a fixed topic.
 *
 * <p>Replace the {@code ip} placeholder in the bootstrap address with the broker host
 * before running.
 */
public class Producer {

    /**
     * Builds the producer configuration, sends ten keyed string records and then
     * flushes and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String url = "ip:9092";
        final String topic = "order_pay_topic";

        // Producer configuration.
        Map<String, Object> config = new HashMap<>();
        // Broker address.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        // Wait for acknowledgement from all in-sync replicas.
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        // Response timeout. request.timeout.ms is the setting that bounds how long the
        // client waits for a broker response; transaction.timeout.ms only applies to
        // transactional producers, which this one is not.
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        // Total memory available for buffering records waiting to be sent (10 MiB).
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 10);
        // Upper bound, in bytes, of a single batch sent to a partition (10 KiB).
        // batch.size is the size limit; max.block.ms is a blocking time in milliseconds.
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 10);
        // No retries; acceptable when the send is not required to be idempotent.
        config.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Compress batches with snappy.
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Key and value are both plain strings.
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources closes the producer (releasing its I/O thread and sockets)
        // even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            IntStream.range(0, 10).forEach(value -> {
                String payload =
                        String.format("id: %d, time : %d.", value, System.currentTimeMillis());
                // send() is asynchronous: the record is only queued here.
                producer.send(new ProducerRecord<>(topic, "cur-time", payload));
            });
            // Push every queued record out before the producer is closed.
            producer.flush();
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Minimal Kafka consumer demo: subscribes to a fixed topic, prints every record it
 * receives and commits offsets manually after each poll.
 *
 * <p>Replace the {@code ip} placeholder in the bootstrap address with the broker host
 * before running.
 */
public class Consumer {

    /**
     * Builds the consumer configuration and polls the topic in an endless loop.
     * The loop only ends when the process is terminated or a call throws.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String topic = "order_pay_topic";
        final String group = "springboot_consumer_group2";
        final String url = "ip:9092";

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        // Start from the earliest available offset when the group has no committed offset.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit is off; offsets are committed explicitly below.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // try-with-resources closes the consumer if poll/commit throws, so it leaves the
        // group and releases its sockets instead of leaking them.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                records.forEach(record ->
                        System.out.println(String.format("topic: %s, key: %s, value: %s, offset:%d.",
                                record.topic(), record.key(), record.value(), record.offset())));
                // Commit the offsets of the records returned by this poll.
                consumer.commitSync();
            }
        }
    }
}
同一个消费组内,每个分区的消息只会分配给组内的一个消费者;如果topic只有一个分区,那么组内同一时刻只有一个消费者能接收到消息,其余消费者处于空闲状态。
参考
kafka-docker镜像仓库
Docker安装kafka
快速入门 Kafka (Java客户端版本)