首页 > 编程语言 >本地kafka安装以及使用java作为客户端

本地kafka安装以及使用java作为客户端

时间:2023-04-11 14:13:33浏览次数:49  
标签:java CONFIG kafka org put import consumer 客户端

1.使用windows下载kafka 地址:

https://kafka.apache.org/

 

 

   下载安装后,使用命令行启动:

  进入kafka所在目录,执行命令:

   

  #启动zookeeper命令:

  bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

#启动kafka命令
bin\windows\kafka-server-start.bat .\config\server.properties

如果在控制台想要查看消费者,但是现实乱码,可以使用下面的命令:

 

Active code page: 65001
bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --group java-group --from-beginning

 

   

2.引入pom依赖

  

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
 

3.编写客户端

添加application.yml文件

 

spring:
  orakafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      acks: all
    consumer:
      # 默认的消费组ID
      group-id: java-group
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量一次最大拉取数据量
      max-poll-records: 65535
      #监测消费端心跳时间
      heartbeat-interval: 30000
      # 批量拉取间隔,要大于批量拉取数据的处理时间,时间间隔太小会有重复消费
      max.poll.interval.ms: 50000
      #会话超时时间。注意:心跳时间必须小于会话时间
      session.timeout.ms: 60000
      listener:
        #手工ack,调用ack后立刻提交offset
        ack-mode: MANUAL_IMMEDIATE
        #容器运行的线程数
        concurrency: 4

添加配置类

package com.gwm.marketing.kafka;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:hongtaofan
 * @Version:1.0
 * @Description: kafka配置
 * @Date: 2023/4/7 11:36
 */
@Configuration
@ConfigurationProperties(prefix = "spring.orakafka")
@Data
public class KafkaConfig {

    //@Value("${spring.orakafka.bootstrap-servers}")
    private String bootstrapServer;

    private KafkaConsumerConfig consumer;

}

 

package com.gwm.marketing.kafka;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:hongtaofan
 * @Version:1.0
 * @Description:
 * @Date: 2023/4/10 16:03
 */
@Configuration
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "consumer")
public class KafkaConsumerConfig {

    /**
     * 消费组id
     */
    //@Value("${spring.orakafka.consumer.group-id}")
    private String groupId;

    /**
     * 偏移量起始点
     */
    //@Value("${spring.orakafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * 一次从kafka服务拉取的数据量
     */
    //@Value("${spring.orakafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    /**
     * 一次从kafka服务拉取的数据量
     */
    //@Value("${spring.orakafka.consumer.max-poll-records}")
    private String maxPollRecords;

    /**
     * 监测消费端心跳时间
     */
    //@Value("${spring.orakafka.consumer.heartbeat-interval}")
    private String heartbeatInterval;

    /**
     * 两次拉取数据的最大时间间隔
     */
    //@Value("${spring.orakafka.consumer.max.poll.interval.ms}")
    private String maxPollIntervalMs;

    /**
     * session会话时间 todo 需要大于消费端心跳时间
     */
    //@Value("${spring.orakafka.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;

}

由于自己使用的是nacos,如果本地单元测试,直接使用@Value。把下面的@Value的注释放开即可

package com.gwm.marketing.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author:hongtaofan
 * @Version:1.0
 * @Description:kafka初始化属性
 * @Date: 2023/4/10 16:29
 */
@Configuration
@EnableKafka
public class KafkaInit {

    @Resource
    private KafkaConsumerConfig kafkaConsumer;

    @Resource
    private KafkaConfig kafkaConfig;

    @Bean
    public Map<String,Object> kafkaProducerInit(){
        Map<String,Object> prop = new HashMap<>(4);
        //Properties prop = new Properties();
        // 设置接入点,请通过控制台获取对应 Topic 的接入点
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaConfig.getBootstrapServer());
        //kafka消息的序列化方式
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        //请求的最长等待时间
        prop.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,30*1000);
        // 构造 Producer 对象,注意,该对象是线程安全的
        // 一般来说,一个进程内一个 Producer 对象即可
        // 如果想提高性能,可构造多个对象,但最好不要超过 5个
        return prop;
    }

    @Bean
    public ProducerFactory<String,String> producerFactory(){
        return new DefaultKafkaProducerFactory<>((Map<String, Object>) kafkaProducerInit());
    }

    @Bean
    public Map<String,Object>   consumerConfigs(){
        Map<String,Object> consumer = new HashMap<>(8);
        consumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaConfig.getBootstrapServer());
        consumer.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumer.getGroupId());
        consumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,kafkaConsumer.getEnableAutoCommit());
        consumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,kafkaConsumer.getAutoOffsetReset());
        consumer.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,kafkaConsumer.getMaxPollRecords());
        consumer.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,kafkaConsumer.getMaxPollIntervalMs());
        consumer.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,kafkaConsumer.getHeartbeatInterval());
        consumer.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,kafkaConsumer.getSessionTimeoutMs());
        consumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return consumer;
    }


    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactoryConfigurer(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }

}
package com.gwm.marketing.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * @Author:hongtaofan
 * @Version:1.0
 * @Description:
 * @Date: 2023/4/10 17:10
 */
@Slf4j
@Component
@RequiredArgsConstructor
@Service
public class KafkaUtils {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息
     * @param topicName
     * @param msg
     */
    public void kafkaSendMsg(String topicName,String msg){
        kafkaTemplate.send(topicName,msg);
        log.info("kafka发送消息成功");
    }

    @KafkaListener(topics = {"test"},groupId = "java-group",containerFactory = "kafkaListenerContainerFactoryConfigurer")
    public void kafkaListener(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {
        log.info("kafka消费消息:" + record.topic() + "--" + record.partition() + "--" + record.value());
        ack.acknowledge();
    }
}

 

编写单元测试:

package com.gwm.marketing.kafka;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.annotation.Resource;

/**
 * @Author:hongtaofan
 * @Version:1.0
 * @Description:
 * @Date: 2023/4/10 17:52
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaTestController.class})
@ComponentScan(value = {"com.gwm.marketing"})
//@ContextConfiguration(classes = KafkaUtils.class)
public class KafkaTestController {
 

    @Resource
    private KafkaUtils kafkaUtils;
   @Test
    public void testSendKafka(){
       System.out.println("********************");
       for (int t = 0;t<10;t++){
           kafkaUtils.kafkaSendMsg("test","发送一条kafka消息能不能被消费" + t);
       }

    }
}

 

kafka消费者类:

package com.gwm.marketing.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author:hongtaofan
 * @Version:1.0
 * @Description:
 * @Date: 2023/4/11 13:06
 */
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Map<String,Object> consumer = new HashMap<>(8);
        consumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        consumer.put(ConsumerConfig.GROUP_ID_CONFIG, "java-group");
        consumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        consumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        consumer.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"65535");
        consumer.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"50000");
        //todo 设置的心跳检测时间需要小于会话时间
        consumer.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"30000");
        consumer.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"60000");
        consumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(consumer);
        kafkaConsumer.subscribe(Collections.singleton("test"));

        try {
            while (true){
                ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
                Thread.sleep(500);
                for (ConsumerRecord<String,String> record : records){
                    System.out.println("=======消息内容=======" + record.toString());
                    System.out.printf("消费消费者:offset = %d,key = %s,value = %s%n", record.offset(),record.key(),record.value());
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

 

 

出现过的问题:

1. KafkaTemplate 使用单元测试时候,一直提示bean注入失败。原因是自己的kafkaTemplate 没有作为@Bean初始化进去。

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
 2.提示  心跳时间必须小于会话时间  需要配置消费者的心跳时间小于会话时间

 

 参考:https://blog.csdn.net/zlfjavahome/article/details/127789860

 

标签:java,CONFIG,kafka,org,put,import,consumer,客户端
From: https://www.cnblogs.com/thinkingandworkinghard/p/17306025.html

相关文章

  • odoo中用javascript调用model中定义好的方法
    odoo中如果前端界面要调用后台model中写好的方法,很简单。使用do_action即可,比如要调用改res.users的默认语言后执行的方法 odoo.define('switch_language.SwitchLanguageMenu',function(require){"usestrict";varModel=require('web.Model');varse......
  • JavaSE05数组
    1.数组概念:指的是一种容器,可以同来存储同种数据类型的多个值。但是数组容器在存储数据的时候,需要结合隐式转换考虑。比如:定义了一个int类型的数组。那么boolean。double类型的数据是不能存到这个数组中的,但是byte类型,short类型,int类型的数据是可以存到这个数组里面的。int类型......
  • Jmeter 启动时报错:Not able to find Java executable or version. Please check your
    安装java环境,cmd窗口中执行java-version可以看到java的版本信息。双击jmeter启动文件,报错:NotabletofindJavaexecutableorversion.PleasecheckyourJavainstallation解决办法:在启动文件jmeter.bat中添加java的环境信息SETJAVA_HOME=D:\Ksoftware\openjdk-19.0.2_w......
  • Java中常用算法及示例-分治、迭代、递归、递推、动态规划、回溯、穷举、贪心
    场景1、分治算法的基本思想是将一个计算复杂的问题分成规模较小、计算简单的小问题求解,然后综合各个小问题,得到最终答案。2、穷举(又称枚举)算法的基本思想是从所有可能的情况中搜索正确的答案。3、迭代法(IterativeMethod)无法使用公式一次求解,而需要使用重复结构(即循环)......
  • java配置环境变量采坑
    用几年java了,环境变量还要采坑注意图中JAVA_HOME最后那个“;”,打开之后编辑环境变量的列表里是没有的,你得点击编辑文本把它删了......
  • 解决javascript调用本地sanic接口报跨域错误的问题
    在py代码中利用middleware()方法修饰request/response即可,无需别的操作。 app=Sanic('Sanic_Server')@app.middleware("request")defcors_middle_req(request:Request):"""路由需要启用OPTIONS方法"""ifrequest.method.lower()==......
  • Studio 3T 2023.3 (macOS, Linux, Windows) - MongoDB 的专业 GUI、IDE 和 客户端,现在
    TheprofessionalGUI,IDEandclientforMongoDB请访问原文链接:https://sysin.org/blog/studio-3t-2023/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgStudio3T,MongoDB的专业GUI、IDE和客户端适用于MongoDB的所有IDE、客户端和GUI工具——在Atlas......
  • JavaScript 的 ==、===、区别
    在JavaScript中,==和===都是用于比较两个值是否相等的运算符。它们之间的主要区别在于类型转换方面。具体来说:== 运算符在检查相等性之前会根据需要进行类型转换,将不同类型的值转换为相同类型。例如,如果一个操作数是字符串类型,另一个是数字类型,那么字符串会被转换成数字后......
  • Java常用的算法
    1.给定一个 n 个元素有序的(升序)整型数组 nums和一个目标值 target ,写一个函数搜索 nums 中的target,如果目标值存在返回下标,否则返回-1。classSolution{publicintsearch(int[]nums,inttarget){intlow=0,high=nums.length-1;while(low<=......
  • java大数加法的一种思路
    packageorg.example;importjava.util.ArrayList;importjava.util.List;importjava.util.Scanner;classSuperNum{publicList<Integer>numList;/***成员变量的set方法*@paramnumList*/publicvoidsetNumList(List<Inte......