Code for batch-generating tens of millions of records and pushing them to Kafka

Posted: 2024-06-18 19:30:37
Tags: sendKafkaMsg String producer tens-of-millions kafka CompletableFuture import push runAsync

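1. Utility code for generating random field values

1. Random IP address generator
2. Random date-time within a given date range
3. Random Chinese name generator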

package com.wfg.flink.connector.utils;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author wfg
 */
public class RandomGeneratorUtils {
    /**
     * Generates a random IPv4 address.
     * ThreadLocalRandom avoids creating a new Random on every call and avoids
     * contention when many producer threads call these methods concurrently.
     * @return ip
     */
    public static String generateRandomIp() {
        ThreadLocalRandom random = ThreadLocalRandom.current();

        // generate the four octets of the address, each in 0-255
        int part1 = random.nextInt(256);
        int part2 = random.nextInt(256);
        int part3 = random.nextInt(256);
        int part4 = random.nextInt(256);

        // join the octets with dots
        return part1 + "." + part2 + "." + part3 + "." + part4;
    }
    /**
     * Generates a random date-time between the start and end dates, both inclusive.
     *
     * @param startDate start date
     * @param endDate end date, must not be before startDate
     * @return the randomly generated date-time
     */
    public static LocalDateTime generateRandomDateTime(LocalDate startDate, LocalDate endDate) {
        if (endDate.isBefore(startDate)) {
            throw new IllegalArgumentException("endDate must not be before startDate");
        }
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // nextLong(origin, bound) excludes bound, so "+ 1" makes endDate reachable and
        // also covers startDate == endDate (java.util.Random#nextLong(long) needs Java 17+)
        long randomDay = random.nextLong(startDate.toEpochDay(), endDate.toEpochDay() + 1);
        LocalDate randomDate = LocalDate.ofEpochDay(randomDay);

        LocalTime randomTime = LocalTime.of(
                // hour
                random.nextInt(24),
                // minute
                random.nextInt(60),
                // second
                random.nextInt(60)
        );

        return LocalDateTime.of(randomDate, randomTime);
    }


    // FIRST_NAMES holds surnames (the first character of a Chinese name), LAST_NAMES holds given names
    private static final List<String> FIRST_NAMES = new ArrayList<>();
    private static final List<String> LAST_NAMES = new ArrayList<>();

    static {
        // initialize some common surnames and given names; duplicated entries are picked more often
        FIRST_NAMES.addAll(Arrays.asList("王","李","赵","刘","张", "钱", "孙", "周", "吴", "郑", "王", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许", "何", "吕", "施",
                "张", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹", "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎",
                "鲁","韦", "昌", "马", "苗", "凤", "刚", "文", "张", "桂", "富", "生", "龙", "功", "夫", "周", "建", "余", "冲", "程", "沙", "阳", "江", "潘"));

        LAST_NAMES.addAll(Arrays.asList("伟","芳","丽","强","明","风","阳","海","天","藏","霞","刚", "夫", "勇", "毅", "俊", "峰", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义",
                "义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春",
                "明", "文", "辉", "建", "永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山",
                "仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建",
                "永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁",
                "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军",
                "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生",
                "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军", "平", "保", "东"));
    }
    /**
     * Generates a random full name: a random surname followed by a random given name.
     *
     * @return a random full name
     */
    public static String generateRandomFullName() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String firstName = FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size()));
        String lastName = LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
        return firstName + lastName;
    }
}
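A note on picking a random day: `nextLong(bound)` returns values in `[0, bound)`, so an expression like `start + nextLong(end - start)` can never return `end` itself, and on `java.util.Random` the one-argument `nextLong(long)` only exists from Java 17 onward. The self-contained sketch below shows an inclusive alternative built on `ThreadLocalRandom.nextLong(origin, bound)`, which is available since Java 7. The class and method names are illustrative only, not part of the original project.

```java
import java.time.LocalDate;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative helper: picks a random date with both endpoints included. */
final class InclusiveDateRange {
    private InclusiveDateRange() {
    }

    static LocalDate randomDate(LocalDate start, LocalDate end) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        // nextLong(origin, bound) draws from [origin, bound), so bound = end + 1 includes end;
        // when start == end the range holds exactly one day, so no special case is needed
        long day = ThreadLocalRandom.current().nextLong(start.toEpochDay(), end.toEpochDay() + 1);
        return LocalDate.ofEpochDay(day);
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2024, 6, 16);
        LocalDate end = LocalDate.of(2024, 6, 18);
        Set<LocalDate> seen = new TreeSet<>();
        for (int i = 0; i < 10_000; i++) {
            seen.add(randomDate(start, end));
        }
        // with 10,000 draws over 3 days, every date (including the end date) appears in practice
        System.out.println(seen);
    }
}
```

The single `nextLong(origin, bound)` call replaces the `if (startEpochDay != endEpochDay)` branch: a one-day range is just the degenerate case `[day, day + 1)`.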

2. Message object (DTO)

package com.wfg.flink.connector.dto;

import lombok.Data;

/**
 * @author wfg
 */
@Data
public class KafkaPvDto {
    // UUID of the message
    private String uuid;
    // user name
    private String userName;
    // visit time
    private String visitTime;
    // visitor's IP address
    private String visitIp;
    // IP of the service being visited
    private String visitServiceIp;
}
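Lombok's `@Data` generates getters, setters, `equals`, `hashCode` and `toString` at compile time. If Lombok is not on the classpath, a Java 16+ `record` is one compact stand-in, sketched below under that assumption. Note that a record is immutable and has no setters, so the producer's `setXxx` calls would become a single constructor call; the name `KafkaPvRecord` is hypothetical.

```java
/** Hypothetical Lombok-free variant of KafkaPvDto (requires Java 16+). */
record KafkaPvRecord(
        String uuid,           // UUID of the message
        String userName,       // user name
        String visitTime,      // visit time, formatted as "yyyy-MM-dd HH:mm:ss"
        String visitIp,        // visitor's IP address
        String visitServiceIp  // IP of the service being visited
) {
}
```

Usage: `new KafkaPvRecord(uuid, name, time, ip, serviceIp).userName()` returns the name; accessors drop the `get` prefix, and `equals`/`hashCode`/`toString` are generated from all five components.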

3. Code that generates the batch data and pushes it to Kafka

package com.wfg.flink.test.kafka;

import com.alibaba.fastjson2.JSON;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;

public class KafkaTestProducer {
    /** Number of rounds. */
    private static final int TIMES = 100_000;
    /** Messages sent concurrently in each round. TIMES x TASKS_PER_ROUND = 5,100,000 records in total. */
    private static final int TASKS_PER_ROUND = 51;
    private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // KafkaProducer is thread-safe, so all tasks share one instance
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < TIMES; i++) {
                System.out.println("Send No. :" + i);
                CompletableFuture<?>[] tasks = new CompletableFuture<?>[TASKS_PER_ROUND];
                for (int j = 0; j < TASKS_PER_ROUND; j++) {
                    tasks[j] = CompletableFuture.runAsync(() -> sendKafkaMsg(producer));
                }
                // wait until every task of this round has handed its record to the producer
                CompletableFuture.allOf(tasks).join();
                // block until this round's buffered records have been sent
                producer.flush();
            }
        }
    }

    private static void sendKafkaMsg(Producer<String, String> producer) {
        String msg = createMsg();
        // printing every message slows down a multi-million-record run considerably
        // System.out.println(msg);
        String key = UUID.randomUUID().toString().replace("-", "");
        // send() is asynchronous; without a callback, delivery errors go unnoticed
        producer.send(new ProducerRecord<>(TEST_TOPIC_PV, key, msg), (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Failed to send record: " + exception.getMessage());
            }
        });
    }

    private static String createMsg() {
        KafkaPvDto dto = new KafkaPvDto();
        dto.setUuid(UUID.randomUUID().toString().replace("-", ""));
        dto.setUserName(RandomGeneratorUtils.generateRandomFullName());
        dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
        // random visit time within today (start date and end date are both today)
        LocalDate today = LocalDate.now();
        dto.setVisitTime(RandomGeneratorUtils.generateRandomDateTime(today, today).format(TIME_FORMAT));
        dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());
        return JSON.toJSONString(dto);
    }
}
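How many records does the loop produce? Each round sends 51 messages, so 100,000 rounds give 100,000 × 51 = 5,100,000 records; reaching ten million at 51 per round takes ⌈10,000,000 / 51⌉ = 196,079 rounds. Also, `CompletableFuture.runAsync(Runnable)` without an executor argument runs on `ForkJoinPool.commonPool()`, which is shared across the JVM and sized from the number of available processors. The stdlib-only sketch below reproduces the round structure on a dedicated fixed-size pool, with an `AtomicLong` counter standing in for `producer.send`. The class name, pool size and counter are illustrative assumptions, not the author's code, and no Kafka client is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative simulation of the round-based send loop (no Kafka involved). */
final class RoundPlanner {
    private RoundPlanner() {
    }

    /** Rounds needed to reach at least {@code target} records (ceiling division). */
    static long roundsFor(long target, int perRound) {
        return (target + perRound - 1) / perRound;
    }

    /** Runs {@code rounds} rounds of {@code perRound} tasks on a fixed pool; returns how many tasks ran. */
    static long simulate(int rounds, int perRound, int poolSize) {
        AtomicLong sent = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < rounds; i++) {
                CompletableFuture<?>[] tasks = new CompletableFuture<?>[perRound];
                for (int j = 0; j < perRound; j++) {
                    // stand-in for sendKafkaMsg(producer), submitted to the dedicated pool
                    tasks[j] = CompletableFuture.runAsync(sent::incrementAndGet, pool);
                }
                // the same per-round barrier as allOf(...).join() in the producer
                CompletableFuture.allOf(tasks).join();
            }
        } finally {
            pool.shutdown();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println(roundsFor(10_000_000L, 51)); // 196079
        System.out.println(simulate(1_000, 51, 8));     // 51000
    }
}
```

Passing an explicit executor as the second argument of `runAsync` keeps message generation off the shared common pool and makes the degree of parallelism a visible, tunable number.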

Note:
To run Kafka on your local machine, see: Kafka local deployment link
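The producer statically imports `KAFKA_BROKERS` and `TEST_TOPIC_PV` from `com.wfg.flink.connector.constants.Constants`, which the post does not show. For a single local broker, a minimal stand-in might look like the sketch below: `localhost:9092` is Kafka's conventional default listener, and the topic name `test_pv` is a hypothetical placeholder. The `linger.ms`, `batch.size` and `compression.type` entries are standard producer settings, but the values here are illustrative starting points for bulk loading, not tuned numbers; they trade a little latency for larger batches.

```java
import java.util.Properties;

/** Hypothetical stand-in for the Constants class the producer imports. */
final class Constants {
    /** Assumed: a single local broker on the default port. */
    public static final String KAFKA_BROKERS = "localhost:9092";
    /** Hypothetical topic name. */
    public static final String TEST_TOPIC_PV = "test_pv";

    private Constants() {
    }

    /** Producer settings; the batching values are illustrative and should be tuned per cluster. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // wait up to 20 ms so that more records share a batch
        props.put("linger.ms", "20");
        // allow batches of up to 64 KiB per partition
        props.put("batch.size", "65536");
        // compress each batch before it is sent
        props.put("compression.type", "lz4");
        return props;
    }
}
```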

From: https://blog.csdn.net/mqiqe/article/details/139781555
