首页 > 其他分享 >Flink读取Kafka数据下沉到HDFS

Flink读取Kafka数据下沉到HDFS

时间:2022-09-24 12:58:11浏览次数:56  
标签:HDFS 文件 Flink prop hadoopSink env setProperty new Kafka

1:采用BucketingSink的方式

public class BucketingSinkDemo {
    public static void main(String[] args) throws Exception {
    
        long rolloverInterval = 2 * 60 * 1000;
        long batchSize = 1024 * 1024 * 100;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        String topic = "ods_lark_order";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","ip:port");
        prop.setProperty("group.id","groupid");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        kafkaConsumer.setStartFromGroupOffsets();//默认消费策略
        DataStreamSource<String> source = env.addSource(kafkaConsumer);
        //
        BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://ip:port/flink/order_sink");
        // HDFS的配置
        Configuration configuration = new Configuration();
        // 1.能够指定block的副本数
        configuration.set("dfs.replication","1");
        hadoopSink.setFSConfig(configuration);
        // 2.指定分区文件夹的命名
        hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
        // 3.指定块大小和时间间隔生成新的文件
        hadoopSink.setBatchSize(batchSize);
        hadoopSink.setBatchRolloverInterval(rolloverInterval);
        // 4.指定生成文件的前缀,后缀,正在运行文件前缀
        hadoopSink.setPendingPrefix("order_sink");
        hadoopSink.setPendingSuffix("");
        hadoopSink.setInProgressPrefix(".in");
        source.addSink(hadoopSink);
        env.execute();
    }
}
采用这种方式的好处: 1.能够指定block的副本数 2.指定分区文件夹的命名 3.指定块大小和时间间隔生成新的文件 4.指定生成文件的前缀,后缀,正在运行文件前缀 缺点: 该方法已经过期,新版建议采用StreamingFileSink,笔者第一次找到该类发现能够写入成功,但是没有找到如何能够对写入HDFS进行压缩,比如parquet或者orc   2:采用StreamingFileSink的方式-行编码【forRowFormat】
public class StreamingFileSinkForRowFormatDemo {
    public static void main(String[] args) throws Exception {

        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        String topic = "ods_lark_order";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","ip:port");
        prop.setProperty("group.id","first");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets();//默认消费策略
        DataStreamSource<String> source = env.addSource(myConsumer);


        // 自定义滚动策略
        DefaultRollingPolicy<String, String> rollPolicy = DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))/*每隔多长时间生成一个文件*/
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))/*默认60秒,未写入数据处于不活跃状态超时会滚动新文件*/
                .withMaxPartSize(128 * 1024 * 1024)/*设置每个文件的最大大小 ,默认是128M*/
                .build();
        // 输出文件的前、后缀配置
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();
        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .forRowFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"),new SimpleStringEncoder<String>("UTF-8") )
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                // 设置指定的滚动策略
                .withRollingPolicy(rollPolicy)
                // 桶检查间隔,这里设置为1s
                .withBucketCheckInterval(1)
                // 指定输出文件的前、后缀
                .withOutputFileConfig(config)
                .build();
        source.addSink(streamingFileSink);
        env.execute("StreamingFileSinkTest");
    }
}
采用这种方式的好处: 1.能够指定block的副本数 2.指定分区文件夹的命名 3.指定块大小和时间间隔生成新的文件 4.指定生成文件的前缀,后缀,正在运行文件前缀 缺点: 由于是按照行进行的,所以不能进行压缩   3:采用StreamingFileSink的方式-bucket压缩 【forBulkFormat】
public class StreamingFileSinkDemo {
    public static void main(String[] args) throws Exception {

        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // checkpoint配置
        env.enableCheckpointing(60000);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        String topic = "ods_lark_order";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","ip:port");
        prop.setProperty("group.id","first");
        // 获取流
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets();
        DataStreamSource<String> source = env.addSource(myConsumer);
        DataStream<Order> nameDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                Order order = new Order();
                JSONObject jsonObject = JSONObject.parseObject(s);
                order.setName(jsonObject.getString("name"));
                return order;
            }
        });

        // 1.输出文件的前、后缀配置
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();
        // 设置为Parquet的压缩方式
        StreamingFileSink<Order> streamingFileSink = StreamingFileSink
                .forBulkFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"), ParquetAvroWriters.forReflectRecord(Order.class))
                /*这里是采用默认的分桶策略DateTimeBucketAssigner,它基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH*/
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                .build();
                
        nameDS.addSink(streamingFileSink);
        env.execute("StreamingFileSinkTest");
    }
}
采用这种方式的好处: 1.输出文件的前、后缀配置 2.设置为Parquet的压缩方式 缺点: 文件生成是通过checkpoint时候触发的,当checkpoint 过于频繁的话会生成很多的小文件,同时任务数过多,也会生成很多小文件,涉及到后续的小文件合并的情况

标签:HDFS,文件,Flink,prop,hadoopSink,env,setProperty,new,Kafka
From: https://www.cnblogs.com/glblog/p/16725392.html

相关文章

  • 我的 Kafka 旅程 - Linux下的安装 & 基础命令
    准备工作安装解压缩工具tar#检查是否安装了解压缩工具taryumlisttar#如未安装taryuminstalltar-y安装必备的java#检查是否安装了java-openjdk,这......
  • kafka
    1.生产者发送流程   外部数据发送到kafka集群,创建一个main线程,创建一个kafkaproducer对象发送,首先调用一个send方法,把这批数据用send方法发送。数据到拦截器,可以......
  • 大数据(Flink)—数据写入数据库篇
    Flink写入mysql的几种方式,废话不多说直接上代码:相关jar包<dependency><groupId>org.apache.flink</groupId><artifactId>flink-co......
  • HDFS基本架构与副本备份
    HDFS官方架构图,清晰明了主角色,要注意的是NameNode因为它的特性使得它是HDFS的唯一访问入口主角色辅助角色,要注意的是SecondaryNameNode不是NameNode的备份,而是它的"......
  • 一:kafka集群 基础、概念、安装了解
    官网下载:jdk-17_linux-x64_bin.tar.gz、apache-zookeeper-3.6.3-bin.tar.gz、kafka_2.13-3.1.0.tgz需知:ApacheKafka3.0.0正式发布:已弃用对Java8和Scala2.12的支......
  • Kafka Broker HA机制(高可用)
    名词解释要想说明白kafka的HA机制,我们必须先搞明白几个缩写名词,1、AR、ISR、OSRAR:AssignedReplicas,某分区的所有副本(这里所说的副本包括leader和follower)统称为AR。......
  • BigData——HDFS操作
    HDFSshell操作配置hadoop环境变量vi/etc/profileexportHADOOP_HOME=/usr/local/soft/hadoop-2.6.0exportPATH=.:\$JAVA_HOME/bin:\$HADOOP_HOME/bin:$PATH然后执......
  • 我眼中的大数据(二)——HDFS
    Hadoop的第一个产品是HDFS,可以说分布式文件存储是分布式计算的基础,也可见分布式文件存储的重要性。如果我们将大数据计算比作烹饪,那么数据就是食材,而Hadoop分布式文件系统H......
  • Kafka报错ERROR Shutdown broker because all log dirs
    Kafka报错ERRORShutdownbrokerbecausealllogdirsin...havefailed在使用Kafka时,删除了topic后出现问题:Kafka服务开始报错:ERRORShutdownbrokerbecauseall......
  • kafka之单节点多Broker部署及使用
    (1)单节点建立多Broker的集群多个server.properties文件表示多个Broker,一个server.properties文件对应一个Brokercpserver.propertiesserver-1.properties(复制一个......