1、批量规则生成代码
1、随机IP生成代码
2、指定时间范围内随机日期生成代码
3、随机中文名生成代码。
package com.wfg.flink.connector.utils;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
* @author wfg
*/
public class RandomGeneratorUtils {
/**
* 生成随机的IP地址
* @return ip
*/
public static String generateRandomIp() {
Random random = new Random();
// 生成IP地址的四个部分 0-255
int part1 = random.nextInt(256);
int part2 = random.nextInt(256);
int part3 = random.nextInt(256);
int part4 = random.nextInt(256);
// 将每个部分转换为字符串,并用点连接
return part1 + "." + part2 + "." + part3 + "." + part4;
}
/**
* 在指定的开始和结束日期之间生成一个随机的日期时间。
*
* @param startDate 开始日期
* @param endDate 结束日期
* @return 随机生成的日期时间
*/
public static LocalDateTime generateRandomDateTime(LocalDate startDate, LocalDate endDate) {
long startEpochDay = startDate.toEpochDay();
long endEpochDay = endDate.toEpochDay();
Random random = new Random();
LocalDate randomDate = startDate;
if (startEpochDay != endEpochDay) {
long randomDay = startEpochDay + random.nextLong(endEpochDay - startEpochDay);
randomDate = LocalDate.ofEpochDay(randomDay);
}
LocalTime randomTime = LocalTime.of(
// 小时
random.nextInt(24),
// 分钟
random.nextInt(60),
// 秒
random.nextInt(60)
);
return LocalDateTime.of(randomDate, randomTime);
}
private static final List<String> FIRST_NAMES = new ArrayList<>();
private static final List<String> LAST_NAMES = new ArrayList<>();
static {
// 初始化一些常见的名字和姓氏
FIRST_NAMES.addAll(Arrays.asList("王","李","赵","刘","张", "钱", "孙", "周", "吴", "郑", "王", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许", "何", "吕", "施",
"张", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹", "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎",
"鲁","韦", "昌", "马", "苗", "凤", " 刚", "文", "张", "桂", "富", "生", "龙", "功", "夫", "周", "建", "余", "冲", "程", "沙", "阳", "江", "潘"));
LAST_NAMES.addAll(Arrays.asList("伟","芳","丽","强","明","风","阳","海","天","藏","霞","刚", "夫", "勇", "毅", "俊", "峰", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义",
"义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春",
"明", "文", "辉", "建", "永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山",
"仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建",
"永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁",
"贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军",
"平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生",
"龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军", "平", "保", "东"));
}
/**
* 生成一个随机的全名,由随机的姓和名组成。
*
* @return 随机全名
*/
public static String generateRandomFullName() {
Random random = new Random();
String firstName = FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size()));
String lastName = LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
return firstName + lastName;
}
}
2. 生成对象体
package com.wfg.flink.connector.dto;
import lombok.Data;
/**
* @author wfg
*/
@Data
public class KafkaPvDto {
// uuid
private String uuid;
// 用户名
private String userName;
// 访问时间
private String visitTime;
// 访问地址
private String visitIp;
// 访问服务IP
private String visitServiceIp;
}
3. 批量数据生成推送代码
package com.wfg.flink.test.kafka;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.LocalDate;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;
public class KafkaTestProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_BROKERS);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (Producer<String, String> producer = new KafkaProducer<>(props)){
int times = 100000;
for (int i = 0; i < times; i++) {
System.out.println("Send No. :" + i );
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),
CompletableFuture.runAsync(() -> sendKafkaMsg(producer))
).join();
producer.flush();
}
}
}
private static void sendKafkaMsg (Producer < String, String > producer){
String msg = createMsg();
System.out.println(msg);
producer.send(new ProducerRecord<>(TEST_TOPIC_PV, UUID.randomUUID().toString().replaceAll("-", ""), msg));
}
private static String createMsg () {
KafkaPvDto dto = new KafkaPvDto();
dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));
dto.setUserName(RandomGeneratorUtils.generateRandomFullName());
dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
DateTime begin = DateUtil.beginOfDay(new Date());
String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");
dto.setVisitTime(timeStr);
dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());
return JSONObject.toJSONString(dto);
}
}
标签:sendKafkaMsg,String,producer,千万级,kafka,CompletableFuture,import,推送,runAsync From: https://blog.csdn.net/mqiqe/article/details/139781555注意:
kafka本机运行请参考: kafka本地部署链接