报错如下:
...:localhost:9092...
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148)
at java.lang.Thread.run(Thread.java:745)
各软件使用的版本:
kafka版本:kafka_2.12-2.2.1
zookeeper版本:zookeeper-3.4.14
需要注意的是Kafka 2.8.0版本后脱离了zookeeper依赖, 用自管理的Quorum代替ZooKeeper管理元数据。
在Centos 7.9虚拟机上安装了kafka,zookeepe之后,没有修改过其中的任何配置文件,kafka默认端口是9029,zookeeper默认端口是2181,
分别运行以下两个命令,他两都能正常运行:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
启动后,创建Topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
通过命令行工具(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到。
但是
java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息。
仔细检查了下代码中IP、端口都没有写错。
package com.heima.kafka.chapter1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Kafka 消息生产者
*/
public class ProducerFastStart {
// Kafka集群地址
private static final String brokerList = "localhost:9092";
// 主题名称-之前已经创建
private static final String topic = "test1";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置集群地址
properties.put("bootstrap.servers", brokerList);
// 设置key序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//另外一种写法
//properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 设置值序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// KafkaProducer 线程安全
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
try {
producer.send(record);
//RecordMetadata recordMetadata = producer.send(record).get();
//System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic());
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
解决办法
将kafka/config/server.properties文件中advertised.listeners改为如下属性。192.168.230.66是我虚拟机的IP。改完后重启kafaka。
advertised.listeners=PLAINTEXT://192.168.75.137:9092
advertised.listeners上的注释是这样的:
#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
意思就是说:hostname、port都会广播给producer、consumer。如果你没有配置了这个属性的话,则使用listeners的值,如果listeners的值也没有配置的话,则使用
java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了)。
最后不要忘了修改Java代码:
// Kafka集群地址
private static final String brokerList = "192.168.230.66:9092";
运行Java生产者代码,好了,虚拟机端消费者可以接受到消息了。
标签:java,producer,虚拟机,kafka,apache,org,properties From: https://www.cnblogs.com/reallife/p/17279994.html