首页 > 编程语言 >kafka(java客户端)生产者消费者不能连接虚拟机kafka

kafka(java客户端)生产者消费者不能连接虚拟机kafka

时间:2023-04-02 10:23:07浏览次数:33  
标签:java producer 虚拟机 kafka apache org properties

报错如下:

...:localhost:9092...
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148)
    at java.lang.Thread.run(Thread.java:745)

各软件使用的版本:
kafka版本:kafka_2.12-2.2.1
zookeeper版本:zookeeper-3.4.14
需要注意的是Kafka 2.8.0版本后脱离了zookeeper依赖, 用自管理的Quorum代替ZooKeeper管理元数据。

在Centos 7.9虚拟机上安装了kafka,zookeepe之后,没有修改过其中的任何配置文件,kafka默认端口是9029,zookeeper默认端口是2181,
分别运行以下两个命令,他两都能正常运行:

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

启动后,创建Topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

通过命令行工具(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到。

但是
java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息。
仔细检查了下代码中IP、端口都没有写错。

package com.heima.kafka.chapter1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Kafka 消息生产者
 */
public class ProducerFastStart {
    // Kafka集群地址
    private static final String brokerList = "localhost:9092";
    // 主题名称-之前已经创建
    private static final String topic = "test1";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置集群地址
        properties.put("bootstrap.servers", brokerList);
        // 设置key序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //另外一种写法
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 设置值序列化器
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer 线程安全
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
        try {
            producer.send(record);
            //RecordMetadata recordMetadata = producer.send(record).get();
            //System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic());
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

解决办法
将kafka/config/server.properties文件中advertised.listeners改为如下属性。192.168.230.66是我虚拟机的IP。改完后重启kafaka。
advertised.listeners=PLAINTEXT://192.168.75.137:9092

advertised.listeners上的注释是这样的:

#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

意思就是说:hostname、port都会广播给producer、consumer。如果你没有配置了这个属性的话,则使用listeners的值,如果listeners的值也没有配置的话,则使用
java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了)。

最后不要忘了修改Java代码:

// Kafka集群地址
private static final String brokerList = "192.168.230.66:9092";

运行Java生产者代码,好了,虚拟机端消费者可以接受到消息了。

标签:java,producer,虚拟机,kafka,apache,org,properties
From: https://www.cnblogs.com/reallife/p/17279994.html

相关文章

  • java面向对象编程-方法回顾
    方法回顾和加深方法的定义修饰符返回类型方法名:注意规范,见名知意参数列表:参数类型参数名异常抛出:后面讲解  方法的调用静态方法非静态方法形参和实参值传递和引用传递this关键字    ......
  • Java的运行时数据区域
    本文从概念上介绍Java虚拟机内存的各个区域,讲解这些区域的作用、服务对象以及其中可能产生的问题。Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域有各自的用途,以及创建和销毁的时间,有些区域随着虚拟机进程的启动而一直存在,有......
  • java题目集1~3的总结性Blog
    一、前言经历前三次java题目集的话,应该也算是相对了解了java这门语言了,因为有了c语言的基础在里面,所以一部分基础的语法部分就理解来说的话不算是很困难,但从面向过程到面向对象的一个思想观念的改变让我有些不适应。如果把编写代码比作下棋的话,那么面向过程就是一步一步的下......
  • Java创建线程的三种方式
    创建线程的三种方式1.继承Thread类,重写run方法publicclassMyThreadextendsThread{publicvoidrun(){System.out.println("HellofromMyThread!");}}publicclassMain{publicstaticvoidmain(String[]args){MyThreadthread......
  • java 中required_通过实例学习Spring @Required注释原理
    @Required注释应用于bean属性的setter方法,它表明受影响的bean属性在配置时必须放在XML配置文件中,否则容器就会抛出一个BeanInitializationException异常。下面显示的是一个使用@Required注释的示例。示例:让我们使EclipseIDE处于工作状态,请按照下列步骤创建一个......
  • java 中required_通过实例学习Spring @Required注释原理
    @Required注释应用于bean属性的setter方法,它表明受影响的bean属性在配置时必须放在XML配置文件中,否则容器就会抛出一个BeanInitializationException异常。下面显示的是一个使用@Required注释的示例。示例:让我们使EclipseIDE处于工作状态,请按照下列步骤创建一个......
  • 每日总结--2023/3/31(解决了数据库连接不正常的问题,完成了javaweb暂时的配置)
    今天耗费一天的时间总算是找到了问题所在.问题出在mysqlServlet的版本上。在重装系统前,我所装的mysqlSevlet版本是5.0左右的,所以连接数据库的驱动也是5.0,包括url,而在重装系统后我的mysqlSevlet版本是8.0以上的,所以用原来的语句是不正确的,要修改为8.0版本的才能够运行,同......
  • Java面向对象--接口和多态
    final关键字最终修饰符可以修饰类方法变量被final修饰后不能被继承重写二次赋值修饰类时该类不可以被继承修饰方法时该方法不能被重写修饰变量时,该变量只能赋值一次,不可以二次赋值是常量修饰引用变量时,地址值不能改变但对象中的属性值可以改变修饰成员变......
  • [VM virtual Box] vbox虚拟机使用问题处理
    虚拟机报错提示“一个键加速配置页中已启动硬件虚拟化,但主机并不支持。需要警用硬件虚拟化才能启动虚拟机”“虚拟电脑草藏系统提示设为64位。64位系统需要硬件虚拟。若设置则更改时自动选择”解决方法:关闭系统的虚拟化设置控制面板启用Hyper-V功能找回VirtualBox的......
  • Java线程:wait()和notify()
    一、wait()和notify()含义二、标准代码示例创建两个线程Thread0和Thread1。代码实现:运行流程详解三、什么时候释放锁—wait()、notify()四、用生活故事讲懂线程的等待唤醒1.老王和老李(专家程序员):2.王哥和李哥(普通程序员):3.小王和小李(新手程序员):五、问题理解1、执行wait()的......