首页 > 编程语言 >java api调用kafka

java api调用kafka

时间:2023-05-23 17:23:17浏览次数:46  
标签:java kafka println api props put import

已经启用了zookeeper和kafka

单机的ip为192.168.80.128

加入maven 的pom.xml代码如下 

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
          </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>

 

生产者的KafkaProducerDemo.java代码如下 

package com.anjubao.weixin.web.weChat;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
  @project:standardProject
  @class:KafkaProducerDemo.java
  @author:fuanyu E-mail:[email protected]
  @date:2022年11月9日 下午2:20:14
 */
public class KafkaProducerDemo {
    
     public static void main(String[] args) throws InterruptedException {
            /* 1、连接集群,通过配置文件的方式
             * 2、发送数据-topic:order,value
             */
            Properties props = new Properties(); props.put("bootstrap.servers", "192.168.80.128:9092"); props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432); 
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer"); 
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
     KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
                    (props);
     System.out.println("*222*********");
            for (int i = 0; i < 10; i++) {
    // 发送数据 ,需要一个producerRecord对象,最少参数 String topic, V value 
                 System.out.println("*1*****");
                kafkaProducer.send(new ProducerRecord<String, String>("topic", "订单 息!"+i));
                Thread.sleep(100);
                System.out.println("*2*****");
            }
        }
     

}

消费者代码KafkaConsumerDemo.java

package com.anjubao.weixin.web.weChat;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.kafka.common.utils.CollectionUtils;

/**
  @project:standardProject
  @class:KafkaConsumerDemo.java
  @author:fuanyu E-mail:[email protected]
  @date:2022年11月9日 下午2:22:30
 */
public class KafkaConsumerDemo {
    
     
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "192.168.80.128:2181");
            properties.put("auto.commit.enable", "true");
            properties.put("auto.commit.interval.ms", "60000");
            properties.put("group.id", "test");

            ConsumerConfig consumerConfig = new ConsumerConfig(properties);

            ConsumerConnector javaConsumerConnector = (ConsumerConnector) Consumer.createJavaConsumerConnector(consumerConfig);
            System.out.println("**********");
            //topic的过滤器
            Whitelist whitelist = new Whitelist("topic");
            List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);

            if (partitions==null) {
            System.out.println("empty!");
            TimeUnit.SECONDS.sleep(1);
            }

            //消费消息
            for (KafkaStream<byte[], byte[]> partition : partitions) {

            ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
            while (iterator.hasNext()) {
            MessageAndMetadata<byte[], byte[]> next = iterator.next();
            System.out.println("partiton:" + next.partition());
            System.out.println("offset:" + next.offset());
            System.out.println("接收到message:" + new String(next.message(), "utf-8"));
            }
            }
            }

}

 

测试时看运行KafkaConsumerDemo.java的代码。然后再运行KafkaProducerDemo.java

从结果上看消费者的结果如下

 

标签:java,kafka,println,api,props,put,import
From: https://www.cnblogs.com/fuanyu/p/17425822.html

相关文章

  • Java图像二值化,并裁去白边
    手写签名场景,为更符合签名效果,节省服务器存储空间,将原图二值化后,再将多于空白去裁去。java中图像二值化有个小技巧,能够很方便将图像二值化,不用再重费心思去研究二值化过程,技巧在这个参数BufferedImage.TYPE_BYTE_BINARY 1packagecom.test;23importjava.awt.Color;......
  • 【Java Web】MultipartFile和byte[]互转
    接口接收文件@RequestParam("file")MultipartFilefileMultipartFile转byte[]Stringname=file.getName();StringoriginalFilename=file.getOriginalFilename();StringcontentType=file.getContentType();Stringbytes=file.getBytes();byte[]转Multipar......
  • javascript中的错误类型
    javascript中的错误类型:SyntaxErrorTypeErrorReferenceErrorRangeErrorURLErrorErrorSyntaxError语法错误//当您在编写一个函数时忘记了括号,)来括起您的代码,您将收到一个SyntaxError错误functionsay(text){returntext;}say('shark';//outputUncaug......
  • java.sql.SQLException: Access denied for user 'root'@'localhost' (using password
    org.apache.ibatis.exceptions.PersistenceException:###Errorqueryingdatabase.Cause:java.sql.SQLException:Accessdeniedforuser'root'@'localhost'(usingpassword:YES)###Theerrormayexistincom/itheima/mapper/BrandMapper.j......
  • Uni-app 封装 API 请求
    一、在文件夹API下创建api.jsconstBASE_URL="";//封装请求方法constrequest=(url,method,data)=>{ wx.showLoading({ title:'加载中'//数据请求前loading }) returnnewPromise((resolve,reject)=>{ wx.request({ url:BASE_URL+url,......
  • org.springframework.data.annotation.Transient 和 javax.persistence.Transient 的
    1、org.springframework.data.annotation.Transient 和 javax.persistence.Transient 都是用于标记一个属性不需要被持久化到数据库中的注解。它们的区别在于它们所处的框架和使用场景。org.springframework.data.annotation.Transient 是SpringDataJPA框架提供的注解,用......
  • java入门
    java简介:1.什么是程序:程序通常指完成某些事情的一种既定方式和过程;(方式){细节1;细节2;细节3}(过程){第一步;第二步;第三步}2.java的产生:1995年诞生。java之父高斯林3.java的发展史:java诞生于1995年。java目前在企业开发过程中,使用和占有率最高的是jdk1.84.java的技术平台:java技术平......
  • java 给实体类赋默认值通用方法
    importjava.lang.reflect.Field;importjava.lang.reflect.Modifier;importjava.math.BigDecimal;importjava.sql.Date;importjava.sql.Timestamp;importjava.util.ArrayList;importjava.util.List;/***使用反射给实体类k赋值(默认值)*insertupdate会报null......
  • java中运行指令浅析
    后续业务可能需要在程序中运行指令,所以这里简单探究了一下,分别从win和linux两个平台进行研究,又以为java是跨平台语言,可能二者之间的区别应该只是返回内容与输入指令的不同.(还不是在win上开发)1.如何使用Runtime.getRuntime().exec("notepad");RuntimeUtil.exec(......
  • Java 局部变量
     局部变量声明在方法、构造方法或者语句块中。局部变量在方法、构造方法、或者语句块被执行的时候创建,当它们执行完成后,变量将会被销毁。局部变量必须在使用前声明,并且不能被访问修饰符修饰,因为它们的作用域已经被限制在了声明它们的方法、代码块或构造函数中。局部变量只在......