首页 > 其他分享 >Flink 1.17教程:输出算子之输出到文件

Flink 1.17教程:输出算子之输出到文件

时间:2023-09-10 10:02:47浏览次数:46  
标签:输出 编码 1.17 Flink 文件 env FileSink new


输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
• 行编码: FileSink.forRowFormat(basePath,rowEncoder)
• 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)
示例:

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);

        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}


标签:输出,编码,1.17,Flink,文件,env,FileSink,new
From: https://blog.51cto.com/zhangxueliang/7423607

相关文章

  • flink kerberos认证源码剖析
    文章目录01引言02flink的安全机制03源码流程分析3.1程序入口3.2安全模块安装3.3模块安装源码04文末01引言官方的文档:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-kerberos/我们都知道,如果某个大数据组件(如:hadoop、flink等)......
  • FMC DA 子卡 14bit 2.5GS/s 双通道输出
    概要QT7225是一款具有2通道输出的转换速率最高为2.5GSPS的DAC回放板,DAC位数14bit;板卡基于ADI的DAC芯片AD9739BBC和时钟芯片AD9516设计;板卡支持3路触发输出/输入通道;DAC的时钟支持内部参考时钟、外部参考时钟、外部采样时钟三种方式;QT7225设计了风冷和导冷版本,能够满足用户在......
  • Java语言怎么把输出的字符串用绿色来输出
    importjava.util.Scanner;publicclassGreenConsoleOutput{publicstaticvoidmain(String[]args){//创建一个Scanner对象用于接收用户输入Scannerscanner=newScanner(System.in);//提示用户输入要输出的文本System.out......
  • python实现输入一个字符串,输出第m个只出现过n次的字符
    功能需求输入一个字符串str,输出第m个只出现过n次的字符功能分析1:定义一个函数,函数传入三个参数,分别是输入的字符串、第m个、n次。2:统计每个字符在字符串中出现的次数,然后按照出现次数进行排序。3:找到第m个只出现n次的字符并输出。程序实现deffind_char(str,m,n):#统......
  • python 格式输出
    格式化输出目录格式化输出1使用"%"1.1格式符1.2字符串输出(%s)1.3浮点数输出(%f)2使用format2.1位置匹配2.2格式转换2.3高阶用法python格式有两种方法:"%"和format1使用"%"1.1格式符格式符描述%s字符串(采用str()的显示)%r字符串(采用repr()的显示......
  • (J-Link)HC32F460JETA SEGGER RTT打印输入输出调试信息
    完美解决https://blog.csdn.net/qq_40675506/article/details/127005532起初最后输出部分费了好大劲在填(setRTTAddr)的时候,找地址很不容易。 不过之后很长一段时间了,直接勾选的auto就直接可以了。很神奇 ......
  • MLPClassifier 隐藏层不包括输入和输出
    多层感知机(MLP)原理简介多层感知机(MLP,MultilayerPerceptron)也叫人工神经网络(ANN,ArtificialNeuralNetwork),除了输入输出层,它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构,如下图: 从上图可以看到,多层感知机层与层之间是全连接的(全连接的意思就是:上一层的任何一个神经元与......
  • stm32 pwm输出比较模式,和pwm输出模式的区别
    输出比较模式     STM32定时器输出比较模式是STM32定时器的一种工作模式,它可以通过改变ARR(自动重装载寄存器)和CCR(捕获比较寄存器)寄存器的值来控制输出的占空比,从而控制输出的电平。在输出比较模式下,CCR寄存器的值是固定的,而占空比是通过改变ARR寄存器(自动重装载值)......
  • 输入输出(io)控制方式
        ......
  • 使用ABAP输出:Hello World!
    WRITE:'HelloWorld!'.  ......