首页 > 其他分享 >Kafka简单入门

Kafka简单入门

时间:2023-10-02 13:12:11浏览次数:52  
标签:KAFKA 入门 CONFIG kafka 简单 put import Kafka config

使用docker安装

docker pull bitnami/kafka
docker run -d -p 9092:9092 --name kafka-server \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=ip:2181 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092 \
-e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_HEAP_OPTS="-Xmx256m -Xms256m" \
bitnami/kafka

ALLOW_PLAINTEXT_LISTENER=yes 允许不加密监听
KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP。
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 配置kafka的监听端口
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true topic不存在时自动创建
KAFKA_HEAP_OPTS JVM参数,内存太大,服务器内存不足,内存太小,启动不起来。

安装控制台

docker pull dushixiang/kafka-map
docker run -d \
    -p 9093:8080 \
    -e DEFAULT_USERNAME=admin \
    -e DEFAULT_PASSWORD=szz123 \
    --name kafka-console \
    dushixiang/kafka-map

注意开启端口号9093的限制,访问路径 http://ip:9093

使用

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.1</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

public class Producer {

    public static void main(String[] args) {
        final String url = "ip:9092";
        final String topic = "order_pay_topic";
        // 配置.
        Map<String, Object> config = new HashMap<>();
        // 连接地址
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        // ACK
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        // 相应超时.
        config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5000);
        // 缓冲区大小. (发送给服务器的)
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 10);
        // 每次最多发10K
        config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1024 * 10);
        // 不重试,有些非幂等性可以.
        config.put(ProducerConfig.RETRIES_CONFIG, 0);
        // snappy 压缩..
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // 序列化
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // ok了.
        KafkaProducer<String, String> producer = new KafkaProducer<>(config);
        IntStream.range(0, 10).forEach(value -> {
            // 发送
            producer.send(new ProducerRecord<>(topic, "cur-time", String.format("id: %d, time : %d.", value, System.currentTimeMillis())));
        });
        // 最后记得刷新出去.
        producer.flush();
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class Consumer {

    public static void main(String[] args) {

        final String topic = "order_pay_topic";
        final String group = "springboot_consumer_group2";
        final String url = "ip:9092";

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5));
            poll.forEach(record ->
                    System.out.println(String.format("topic: %s, key: %s, value: %s, offset:%d.",
                            record.topic(), record.key(), record.value(), record.offset())));
            // 提交偏移量.
            consumer.commitSync();
        }
    }
}

同一个消费组,第一个接入到组中的才会接收到消息。

参考

kafka-docker镜像仓库
Docker安装kafka
快速入门 Kafka (Java客户端版本)

标签:KAFKA,入门,CONFIG,kafka,简单,put,import,Kafka,config
From: https://www.cnblogs.com/strongmore/p/17181636.html

相关文章

  • 26、Flink 的SQL之概览与入门示例
    文章目录Flink系列文章一、SQL1、数据类型2、保留关键字二、SQL入门1、FlinkSQL环境准备1)、安装Flink及提交任务方式2)、SQL客户端使用介绍3)、简单示例2、Source表介绍及示例3、连续查询介绍及示例4、Sink表介绍及示例本文简单的介绍了SQL和SQL的入门,并以三个简单的示例进行介......
  • Mybatis入门
    Mybatis入门前言在前面我们学习MySQL数据库时,都是利用图形化客户端工具(如:idea、datagrip),来操作数据库的。在客户端工具中,编写增删改查的SQL语句,发给MySQL数据库管理系统,由数据库管理系统执行SQL语......
  • 01. Kubernetes基础入门
    目录1、前言2、Kubernetes介绍2.1、什么是Kubernetes2.2、主要功能2.3、与Docker的关系2.4、Kubernetes集群架构体系3、Kubernetes组件3.1、核心组件3.2、附加组件4、Kubernetes对象4.1、对象管理4.2、命名空间4.3、标签1、前言Docker容器技术将应用及其依赖打包到镜像中,从而很好......
  • 实验1 C语言输入输出和简单程序编写
    1.实验任务1task1_1.c源代码1#include<stdio.h>2intmain()3{4printf("o\n");5printf("<H>\n");6printf("II\n");7printf("o\n");8printf("<H>\n"......
  • 实验1c语言的简单输入输出和简单程序编写
    实验1#include<stdio.h>#include<stdlib.h>intmain(){printf("0\n");printf("<H>\n");printf("II\n");system("pause");return0;}实验2#include<stdio.h>#include<stdlib.......
  • 掌握这些技巧,让Excel批量数据清洗变得简单高效!
    什么是数据清洗数据清洗是指在数据处理过程中对原始数据进行筛选、转换和修正,以确保数据的准确性、一致性和完整性的过程。它是数据预处理的一部分,旨在处理和纠正可能存在的错误、缺失值、异常值和不一致性等数据质量问题。为什么要数据清洗Excel在数据采集场景中非常常用。作......
  • 实验1C语言输入输出和简单程序编写
    1.实验1实验1.1源代码 1//打印一个字符小人23#include<stdio.h>4intmain()5{6printf("0\n");7printf("<H>\n");8printf("II\n");9printf("0\n");10printf("<H>......
  • aws awswrangler 集成minio 简单试用
    awsawswrangler现在已经改名为aws-sdk-pandas,但是对于python使用的时候安装已经是使用awswrangler名称以下是一个简单的集成minio的测试,核心是配置环境变量,这个也比较符合aws对于相关资源的集成玩法环境准备docker-compose文件 version:'3'services......
  • 【C语言入门】第一天
    [例题1]输入两个正整数a和b,输出a+b的值。其中a,b<10000.#include<stdio.h>intmain(){inta,b;scanf("%d%d",&a,&b);printf("%d",(a+b));return0;}[例题2]先输入一个t(t<100),然后输入t组数据。对于每组数据,输入两个整数a和b,输出a+b值......
  • token+redis的简单使用方式
    以用户登录为例,讲解token+redis的使用方式,环境是vue和springboot。一、用户登录时序图二、前端代码分析1、前端使用vuestore保存token2、在每次发起请求时进行响应拦截,从vuestore取出token,放在每次请求的请求头上三、后端代码分析1、在控制层接收账号,密码,调用服务层代......