1: Using BucketingSink
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.conf.Configuration;

import java.time.ZoneId;
import java.util.Properties;

public class BucketingSinkDemo {
    public static void main(String[] args) throws Exception {
        long rolloverInterval = 2 * 60 * 1000;
        long batchSize = 1024 * 1024 * 100;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        String topic = "ods_lark_order";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "ip:port");
        prop.setProperty("group.id", "groupid");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        kafkaConsumer.setStartFromGroupOffsets(); // default consumption strategy
        DataStreamSource<String> source = env.addSource(kafkaConsumer);

        BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://ip:port/flink/order_sink");
        // HDFS configuration
        Configuration configuration = new Configuration();
        // 1. Set the block replication factor
        configuration.set("dfs.replication", "1");
        hadoopSink.setFSConfig(configuration);
        // 2. Name the bucket (partition) directories
        hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
        // 3. Roll new files by batch size and by time interval
        hadoopSink.setBatchSize(batchSize);
        hadoopSink.setBatchRolloverInterval(rolloverInterval);
        // 4. Set the prefix/suffix of pending files and the prefix of in-progress files
        hadoopSink.setPendingPrefix("order_sink");
        hadoopSink.setPendingSuffix("");
        hadoopSink.setInProgressPrefix(".in");

        source.addSink(hadoopSink);
        env.execute();
    }
}

Advantages of this approach:
1. You can set the block replication factor.
2. You can name the bucket (partition) directories (a custom Bucketer sketch follows below).
3. New files are rolled based on batch size and on a time interval.
4. You can set the prefix and suffix of generated files and the prefix of in-progress files.

Drawback:
BucketingSink is deprecated, and newer Flink versions recommend StreamingFileSink instead. When I first found this class I could write to HDFS successfully, but I could not find a way to compress the output, e.g. as Parquet or ORC.
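For point 2, the bucket layout is not limited to DateTimeBucketer: BucketingSink accepts any implementation of the Bucketer interface. Below is a minimal sketch assuming the same flink-connector-filesystem dependency as the demo above; FirstCharBucketer and its one-character bucketing rule are invented purely for illustration.

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;

// Hypothetical bucketer: places each record under basePath/<first character of the record>.
public class FirstCharBucketer implements Bucketer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        String bucket = element.isEmpty() ? "empty" : element.substring(0, 1);
        return new Path(basePath, bucket);
    }
}

It would be plugged in with hadoopSink.setBucketer(new FirstCharBucketer()) in place of the DateTimeBucketer above.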
2: Using StreamingFileSink with row encoding [forRowFormat]

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class StreamingFileSinkForRowFormatDemo {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // StreamingFileSink can only finalize (commit) part files on successful checkpoints;
        // without checkpointing, output stays in the in-progress/pending state forever.
        env.enableCheckpointing(60000);
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        String topic = "ods_lark_order";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "ip:port");
        prop.setProperty("group.id", "first");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets(); // default consumption strategy
        DataStreamSource<String> source = env.addSource(myConsumer);

        // Custom rolling policy
        DefaultRollingPolicy<String, String> rollPolicy = DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))   /* roll a new file after this interval */
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) /* default 60s; roll when the bucket has received no data for this long */
                .withMaxPartSize(128 * 1024 * 1024)                   /* maximum size per file; the default is 128MB */
                .build();

        // Prefix/suffix configuration for the output files
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();

        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .forRowFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                // apply the rolling policy defined above
                .withRollingPolicy(rollPolicy)
                // bucket check interval; note the unit is milliseconds, so 1 means 1ms, not 1s
                .withBucketCheckInterval(1)
                // apply the prefix/suffix configuration
                .withOutputFileConfig(config)
                .build();

        source.addSink(streamingFileSink);
        env.execute("StreamingFileSinkTest");
    }
}

Advantages of this approach:
1. You can name the bucket (partition) directories (a custom BucketAssigner sketch follows below).
2. New files are rolled based on file size, a rollover interval, and an inactivity timeout.
3. You can set the prefix and suffix of the generated files.

Drawback:
Since data is written row by row, the output cannot be compressed into columnar formats such as Parquet.
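StreamingFileSink offers the same kind of customization through the BucketAssigner interface. Below is a minimal sketch; FirstCharBucketAssigner and its bucketing rule are again invented for illustration.

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Hypothetical assigner: buckets each record by its first character.
public class FirstCharBucketAssigner implements BucketAssigner<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String getBucketId(String element, Context context) {
        return element.isEmpty() ? "empty" : element.substring(0, 1);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // Serializer used to persist bucket ids in checkpoints
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

It would replace new DateTimeBucketAssigner<>() in the withBucketAssigner(...) call above.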
3: Using StreamingFileSink with bulk encoding and compression [forBulkFormat]

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class StreamingFileSinkDemo {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint configuration
        env.enableCheckpointing(60000);
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        String topic = "ods_lark_order";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "ip:port");
        prop.setProperty("group.id", "first");

        // Build the stream
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets();
        DataStreamSource<String> source = env.addSource(myConsumer);
        // Map each Kafka message onto an Order POJO (a sketch of Order follows at the end of this section)
        DataStream<Order> nameDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                Order order = new Order();
                JSONObject jsonObject = JSONObject.parseObject(s);
                order.setName(jsonObject.getString("name"));
                return order;
            }
        });

        // 1. Prefix/suffix configuration for the output files
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();

        // 2. Write the output as Parquet
        StreamingFileSink<Order> streamingFileSink = StreamingFileSink
                .forBulkFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"),
                        ParquetAvroWriters.forReflectRecord(Order.class))
                /* Default bucket assigner DateTimeBucketAssigner: time based, one bucket per hour, named yyyy-MM-dd--HH */
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                .build();

        nameDS.addSink(streamingFileSink);
        env.execute("StreamingFileSinkTest");
    }
}

Advantages of this approach:
1. You can configure the prefix and suffix of the output files.
2. The output is written as Parquet, a compressed columnar format.

Drawbacks:
Files are only finalized when a checkpoint fires, so overly frequent checkpoints produce many small files; a high number of parallel tasks likewise produces many small files, which pushes a small-file merging problem downstream.
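The original post does not show the Order class referenced above. Below is a minimal sketch that satisfies ParquetAvroWriters.forReflectRecord (which ships in the flink-parquet module and uses Avro reflection, so a plain POJO with a no-arg constructor suffices); the single name field is inferred from the map function, and the real class presumably carries more order fields.

// Assumed POJO for the demo; only the "name" field is visible in the original code.
public class Order {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}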