首页 > 其他分享 >Flink 生成ParquetFile

Flink 生成ParquetFile

时间:2023-04-08 11:55:20浏览次数:46  
标签:Flink ParquetFile flink avro hadoop 生成 version apache org

前言

这周主要是学习使用Flink, 其中有一部分学习的内容就是生成parquet。 Flink自身提供的文档写了个大概,但是真要自己动手去生成pqrquet文件,发现还是有些小坑,本文就是记录这些坑。

开始

官方文档总是最好的开始的地方, 下面是官方文档上面的内容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
image
从官方文档上面看,似乎很简单, 使用FileSink, 然后设置下格式使用AvroParquetWriters就可以了。
但是按照这个设置后,连FileSink这个类都找不到。
FilkSink需要这个dependency,

org.apache.flink
flink-connector-files
${flink.version}

AvroParquetWriters需要的是这个dependency

org.apache.flink
flink-parquet
${flink.version}
provided

使用AVRO

官方文档中使用了AvroParquetWriters, 那我们就先定义一个AVRO的schema文件MarketPrice.avsc,然后生成对应的类,

{
  "namespace": "com.ken.parquet",
  "type": "record",
  "name": "MarketPrice",
  "fields": [
    {"name":"performance_id", "type":"string"},
    {"name":"price_as_of_date", "type":"int", "logicalType": "date"},
    {"name":"open_price", "type": ["null", "double"], "default": null},
    {"name":"high_price", "type": ["null", "double"], "default": null},
    {"name":"low_price", "type": ["null", "double"], "default": null},
    {"name":"close_price", "type": ["null", "double"], "default": null}
  ]
}

然后加上Maven插件, 通过这个文件来生成Java类

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

添加好后,我们使用maven, compile的时候会生成对应的Java类。

编写代码

我们这里不从外部读取了,直接用env.fromCollection, 然后输出到本地文件系统中


@Component
public class ParquetRunner implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<MarketPrice> marketPriceList = new ArrayList<>();
        MarketPrice marketPrice = new MarketPrice();
        marketPrice.setPerformanceId("123456789");
        marketPrice.setPriceAsOfDate(100);
        marketPrice.setOpenPrice(100d);
        marketPrice.setHighPrice(120d);
        marketPrice.setLowPrice(99d);
        marketPrice.setClosePrice(101.1d);
        marketPriceList.add(marketPrice);

        DataStream<MarketPrice>  marketPriceDataStream = env.fromCollection(marketPriceList);

        String localPath = "C:\\temp\\flink\\";

        File outputParquetFile = new File(localPath);

        String localURI = outputParquetFile.toURI().toString();
        Path outputPath = new Path(localURI);

        final FileSink<MarketPrice> sink = FileSink
                .forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
                .build();

        marketPriceDataStream.sinkTo(sink);

        marketPriceDataStream.print();

        env.execute();

    }
}

代码很简单,就是初始化DataStream, 然后Sink到本地。
运行程序报错
“Caused by: java.lang.RuntimeException: Could not load the AvroTypeInfo class. You may be missing the 'flink-avro' dependency”
添加dependency

org.apache.flink
flink-avro
${flink.version}
provided

继续运行,继续报错
“Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter”
查找了一番,添加这个dependency

org.apache.parquet
parquet-avro
1.12.3

继续运行, 继续报错
“Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration”
看起来还需要hadoop的东西,这里可以添加

org.apache.hadoop
hadoop-core

正好我们后面需要生成到S3,我找到了这个

org.apache.flink
flink-s3-fs-hadoop
${flink.version}

这样可以不用上面hadoop-core了,
继续运行,继续报错
“Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/FileOutputFormat”
加上这个dependency

org.apache.hadoop
hadoop-mapreduce-client-core
3.3.5

运行,成功,生成了parquet file, 如下图
image

如果要生成到AWS的S3上面去,只需要把Path换下, 很简单。 当然你需要有AWS的权限,我这里直接通过IDEA启动Environment variables里面加上AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_SESSION_TOKEN。

        String s3Path = "s3a://yourbucket/yourkey/";
        Path outputPath = new Path(s3Path);
		final FileSink<MarketPrice> sink = FileSink
                .forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
                .build();

总结

这些dependency的依赖,你要是缺少了,运行起来就会缺东少西,然后花时间去找,还蛮废时间的。官方文档往往又没有那么细,所以算是一些小小的坑,好在都解决了,顺利的用Flink生成了Parquet file, 比较完成的POM文件列在这里

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>${flink.version}</version>
        </dependency>

标签:Flink,ParquetFile,flink,avro,hadoop,生成,version,apache,org
From: https://www.cnblogs.com/dk168/p/17298265.html

相关文章

  • 2023-04-07 无向有权图之最小生成树问题
    无向有权图之最小生成树问题前10章我们讲解地都是无向无权图,本章我们将讲解无向有权图,以及无向有权图的经典问题:最小生成树问题(MST:MinimumSpanningTree)1~2无向有权图的实现主要是用TreeMap代替了无向无权图的TreeSet本节用到的图上面的graph.txt对应的图如下:最......
  • JS生成随机颜色
    //传统写法functionrandomColor1(){varr=Math.floor(Math.random()*256),g=Math.floor(Math.random()*256),b=Math.floor(Math.random()*256);return`rgb(${r},${g},${b})`;}//取巧functionrand......
  • Python 之生成验证码
    一、代码importrandomfromioimportBytesIOfromPILimportImage,ImageDraw,ImageFont,ImageFilterclassCaptcha:def__init__(self,width,height,code_num=4,code_type=1,font_size=24,is_blur=True,font='Arial.ttf',x_......
  • 使用模板窗口生成测试数据
    1.准备工作*需要的环境1.Oralce、MySQL、PG等主流数据库2.HHDBCS7.6及以上版本*测试步骤1.建立两张表带有主外键关系2.使用模板窗口生成数据,主键表生成100条,外键表生成10000条3.校验数据生成情况2.建立两张表带有主外键关系--主键表createtabledept(d_id......
  • 详解 Flink Catalog 在 ChunJun 中的实践之路
    我们知道Flink有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink里还有一个Catalog(目录)的概念。本文将为大家带来FlinkCatalog的介绍以及FlinkCatalog在ChunJun中的实践之路。FlinkCatalog简介Catalog提供元数据,如数据......
  • PHP随机生成指定时间段的指定个数时间
    一、生成某个范围内的随机时间 /***生成某个范围内的随机时间*@param<type>$begintime起始时间格式为Y-m-dH:i:s*@param<type>$endtime结束时间格式为Y-m-dH:i:s*@param<type>$now是否是时间戳格式为Boolean*/function......
  • 详解 Flink Catalog 在 ChunJun 中的实践之路
    我们知道Flink有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink里还有一个Catalog(目录)的概念。本文将为大家带来FlinkCatalog的介绍以及FlinkCatalog在ChunJun中的实践之路。FlinkCatalog简介Catalog提供元数据,如......
  • LightOJ - 1041 Road Construction(最小生成树)
    题目大意:给你N条边,看能否形成最小生成树,如果存在,输出值,不存在,另外输出解题思路:模版题#include<cstdio>#include<cstring>#include<algorithm>#include<vector>#include<map>#include<string>#include<iostream>usingnamespacestd;constintMAXNOD......
  • Java serialVersionUID 作用和自动生成设置
    JavaserialVersionUID作用和自动生成设置原文链接:https://blog.csdn.net/plqwf19880902/article/details/129103336一、由来最近在做一个军工的项目,代码提交后,军方用代码安全扫描工具,对代码进行全局扫描,提示一个漏洞,导致原因是实体类实现了Serializable接口,未对serialVer......
  • mybatis-plus 生成器
    依赖<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId><version>3.4.1</version></dependency><dependency>......