首页 > 编程语言 >java向kafka发送消息(生产者)

java向kafka发送消息(生产者)

时间:2022-11-15 08:56:18浏览次数:48  
标签:maven java String producer plugin kafka 发送 props

代码如下,简单记录一下

package org.example;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class test {

    private final KafkaProducer<String, String> producer;

    public final static String TOPIC = "lijian12";

    private test() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.87:9092");//xxx服务器ip
        props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"
        props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)
        props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
        //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
        props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
        props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);
    }

    public void produce() throws InterruptedException {
        int messageNo = 1;
        final int COUNT = 20000;
        while(messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            //String data = String.format("hello KafkaProducer message %s from hubo 06291018 ", key);
            String data = String.format("testdatccc|kk");
            Thread.sleep(1);  //1000代表1秒
            System.out.println(messageNo);
            try {
                producer.send(new ProducerRecord<String, String>(TOPIC, data));
            } catch (Exception e) {
                e.printStackTrace();
            }
            messageNo++;
        }
        producer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        new test().produce();
    }
}

 

pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>untitled3</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>untitled3</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>

  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

 

标签:maven,java,String,producer,plugin,kafka,发送,props
From: https://www.cnblogs.com/lxqtest/p/16891238.html

相关文章

  • Java反应式编程(2)
    您好,我是湘王,这是我的博客园,欢迎您来,欢迎您再来~ 看了前面对反应式编程的说明,可能很多人都会打退堂鼓了。Lambda都已经够难理解的了,又来了一个反应式编程!实在卷不动了~其......
  • Java 同步锁ReentrantLock与抽象同步队列AQS
    AbstractQueuedSynchronizer抽象同步队列,它是个模板类提供了许多以锁相关的操作,常说的AQS指的就是它。AQS继承了AbstractOwnableSynchronizer类,AOS用于保存线程对象,保存什......
  • JAVA学习
    JAVA问题解决综合问题语句FILEf=newFILE("Grades.txt");在IDEA中运行时,读取的目标文件不一定是源码所在目录下的"Grades.txt",而是源码类所属的父模块中最高级的模......
  • javaScript书写位置
    1.内部javaScript   直接写在html文件里,用script标签包住  规范:script标签写在</body>上面2.外部js:代码写在以.js结尾的文件里语法:通过scr......
  • Javascript的字符串不可变性
    JS的字符串不可变指的是字符串值是不变的,只是变量指向的地址变化了,重新在内存中开辟了空间.vara='111';vara='222';此时字符串'111'的空间依然存在内存中,重新开辟......
  • 使用JAVA实现循环相克令
    一、问题描述:循环相克令是说有两个人在玩游戏,分别出猎人、枪、狗熊中的一个,其中猎人胜枪,枪胜狗熊,狗熊胜猎人。他们两个人进行若干次比赛,然后我们判断一下每次的胜负。二、......
  • 【Java】Springboot + Redis + AOP切面实现字典翻译
     使用案例演示:先开发了一个简单的Demo:普通DTO类注解翻译的字段和翻译来源  在需要翻译的方法上注解@Translate  接口返回结果:  框架思路:1、标记的......
  • java调用WPS或pdfcreator的com接口实现doc转pdf
    使用了jacob.jar来调用activex控件,本机需安装WPS或pdfcreator。还需要jacob.jar以及jacob.dll请看附件jacob.dll需要放置在系统system32下,如果系统是c盘:C://windows/sys......
  • java——接口作为方法的参数和返回值
    接口作为方法的参数和返回值packagecn.itcast.day11.demo07;importjava.util.ArrayList;importjava.util.List;/*java.util.List正是ArrayList所实现的接口。......
  • 云服务器(Linux)安装部署Kafka
    云服务器(Linux)安装部署Kafka前期准备kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。下载安装包官网地址:较新的版本已自带Zookeeper,无......