首页 > 编程语言 >kafka入门例子 for java

kafka入门例子 for java

时间:2023-04-25 23:05:22浏览次数:39  
标签:java 入门 producer new kafka XX props import


1,生产者

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {  
  
        public static void main(String[] args) {  
            Properties props = new Properties();  
            props.setProperty("metadata.broker.list","10.XX.XX.XX:9092");  
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");  
            props.put("request.required.acks","1");  
            ProducerConfig config = new ProducerConfig(props);  
            Producer<String, String> producer = new Producer<String, String>(config);  
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");  
            try {  
            	int i =1;
            	while(i < 1000){
            		
            		producer.send(data);  
            	}
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
            producer.close();  
        }  
}

2,消费者

import java.util.HashMap;
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector; 

public class TestConsumer extends Thread{  
        private final ConsumerConnector consumer;  
        private final String topic;  
  
        public static void main(String[] args) {  
            TestConsumer consumerThread = new TestConsumer("mykafka");  
            consumerThread.start();  
        }  
        public TestConsumer(String topic) {  
            consumer =kafka.consumer.Consumer  
                    .createJavaConsumerConnector(createConsumerConfig());  
            this.topic =topic;  
        }  
  
    private static ConsumerConfig createConsumerConfig() {  
        Properties props = new Properties();  
        props.put("zookeeper.connect","10.XX.XX.XX:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");  
        props.put("group.id", "0");  
        props.put("zookeeper.session.timeout.ms","10000");  
        return new ConsumerConfig(props);  
    }  
  
    public void run(){  
        Map<String,Integer> topickMap = new HashMap<String, Integer>();  
        topickMap.put(topic, 1);  
        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);  
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);  
        ConsumerIterator<byte[],byte[]> it =stream.iterator();  
        System.out.println("*********Results********");  
        while(true){  
        	if(it.hasNext()){
        		
        		System.err.println("get data:" +new String(it.next().message()));  
        	}
            try {  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}



3,分别启动生产者和消费者,在消费者输出中看到下日志即成功

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
*********Results********
get data:test-kafka




4,启动生产者如果报错如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
	at ProducerTest.main(TestProducer.java:21)



需要改动config文件夹下的server.properties中的以下两个属性


zookeeper.connect=localhost:2181改成zookeeper.connect=10.0.30.221:2181 
以及默认注释掉的
#host.name=localhost改成host.name=10.0.30.221




标签:java,入门,producer,new,kafka,XX,props,import
From: https://blog.51cto.com/u_16088628/6225515

相关文章

  • JAVA
    1编写输出到控制台窗口的程序矩形,其侧面尺寸,宽度:23列,高度:11行;publicclassH1{publicstaticvoidmain(String[]args){introws=11;intcolumns=23;for(inti=0;i<rows;i++){for(intj=0;j<columns;j++......
  • JAVA 基础(学习img)
    Date:2023-04-2418:57:14尚硅谷Java零基础全套视频教程PS:只记重要的,自己不会的P1课程简介P2课程目录P3JAVA学习路线对我重要的step14IDEA安装使用P4谈谈JAVA吹水P5学习路线2023最新Java学习路线学习路线P6计算机硬件P7常见DOS命令P8不同编程语......
  • Rust编程语言入门之最后的项目:多线程 Web 服务器
    最后的项目:多线程Web服务器构建多线程Web服务器在socket上监听TCP连接解析少量的HTTP请求创建一个合适的HTTP响应使用线程池改进服务器的吞吐量优雅的停机和清理注意:并不是最佳实践创建项目~/rust➜cargonewhelloCreatedbinary(application)`......
  • Java模拟实现一个基于文本界面的《记账软件》
    /**@author:Noiimplant*@version:1.0*/1.利用java实现简易记账软件根据尚硅谷java教程进行练习2.实现功能记录家庭支出、收入,打印收支明细表使用分级菜单的方式3.代码实现3.1GuliAccount.javapackageGuliAccount;importjava.text.SimpleDateFormat;import......
  • 大模型入门(五)—— 基于peft微调ChatGLM模型
    ChatGLM是基于 GeneralLanguageModel(GLM) 架构,针对中文问答和对话进行了优化。经过中英双语训练,辅以监督微调、反馈自助、人类反馈强化学习等技术,ChatGLM因为是中文大模型,在中文任务的表现要优于LLaMa,我在一些实体抽取的任务中微调ChatGLM-6B,都取得了很不错的效果。GL......
  • java stream 多重groupingBy
    importcom.alibaba.fastjson.JSON;importlombok.Data;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importjava.util.stream.Collectors;/***@ClassNameTest*@Description*@Authorcsg*@Data2023/4/2521:31*@Version......
  • Java静态代理总结
    总结:真实对象和代理对象都要实现同一个接口代理对象要代理真实角色优点:代理对象可以补充真实对象所要做的事情真实对象只需要关注自己做的事情代码示例:执行结果:......
  • 《Python入门与核心语法》电子书
    《Python入门与核心语法》是由刘永富编写、中国水利水电出版社智博尚书分社编辑整理的电子书,共61页,内容包括Python的下载和安装,代码编写与执行等基础内容。PDF电子书下载: 链接:https://pan.baidu.com/s/1I9HhkKBWuujzk2jBgkR5-w提取码:2022配套视频合集: https://www.bilibili......
  • Java 编程问题:三、使用日期和时间
    本章包括20个涉及日期和时间的问题。这些问题通过Date、Calendar、LocalDate、LocalTime、LocalDateTime、ZoneDateTime、OffsetDateTime、OffsetTime、Instant等涵盖了广泛的主题(转换、格式化、加减、定义时段/持续时间、计算等)。到本章结束时,您将在确定日期和时间方面没有问题,......
  • Java endorsed
    endorsed目录下放置的jar会覆盖JDK。endorsed目录可以通过以下代码获得:System.out.println(System.getProperty("java.endorsed.dirs"))可以通过-Djava.endorsed.dirs指定的目录面放置的jar文件,将有覆盖系统API的功能。可以牵强的理解为,将自己修改后的API打入到虚拟机指定的启动A......