package com.huft.flk117.test;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
//使用Flink进行MD5加密
//1. 读取源文件 2.MD5加密 3.加密内容写入目标文件
public class Md5Demo {
public static void main(String[] args) throws Exception {
// String password = "15057630319";
// String encryptedPassword = md5(password);
// System.out.println("MD5 Encrypted Password: " + encryptedPassword);
t();
}
public static void t() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// env.getConfig().setDeploymentOption(ExecutionConfig.DeploymentOptions.BATCH);
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(
new TextLineInputFormat(),
new Path("D:/temp/p.txt")
)
.build();
DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource");
SingleOutputStreamOperator<String> map = ds.map(s -> md5(s));
FileSink<String> sink = FileSink
.forRowFormat(new Path("D:/temp/p1.txt"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(1024))
.build())
.build();
map.sinkTo(sink);
//map.print();
env.execute();
}
public static String toHexString(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}
public static String md5(String input) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] messageDigest = md.digest(input.getBytes());
return toHexString(messageDigest);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
标签:Flink,加密,String,flink,new,org,apache,import,MD5 From: https://www.cnblogs.com/huft/p/18205790