
FlinkSQL Real-Time Data Warehouse Development


POM file

<groupId>com.ssi</groupId>
<artifactId>datalake</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<name>DataLake</name>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.15.1</flink.version>
    <flink.binary.version>1.15</flink.binary.version>
    <iceberg.version>0.14.0</iceberg.version>
    <flink.shaded.guava.version>30.1.1-jre-15.0</flink.shaded.guava.version>
    <flink.cdc.version>2.2.1</flink.cdc.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.7</scala.version>
    <log4j.version>2.18.0</log4j.version>
    <hadoop.version>3.0.0-cdh6.3.0</hadoop.version>
</properties>

<modules>
    <module>common</module>
    <module>sz</module>
    <module>compass</module>
    <module>ne</module>
</modules>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.12</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.json4s/json4s-jackson -->
    <dependency>
        <groupId>org.json4s</groupId>
        <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
        <version>4.0.5</version>
    </dependency>

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
        <scope>provided</scope>
    </dependency>

    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-hive-2.2.0_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Iceberg -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime-${flink.binary.version}</artifactId>
        <version>${iceberg.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-guava -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-guava</artifactId>
        <version>${flink.shaded.guava.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-orc</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.12.3</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libfb303</artifactId>
        <version>0.9.3</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.21</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr-runtime</artifactId>
        <version>3.5.2</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1-cdh6.3.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Hadoop -->
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
    <!-- <dependency> -->
    <!--     <groupId>org.apache.hadoop</groupId> -->
    <!--     <artifactId>hadoop-hdfs-client</artifactId> -->
    <!--     <version>${hadoop.version}</version> -->
    <!--     <scope>provided</scope> -->
    <!-- </dependency> -->
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-jobclient -->
    <!-- <dependency> -->
    <!--     <groupId>org.apache.hadoop</groupId> -->
    <!--     <artifactId>hadoop-mapreduce-client-jobclient</artifactId> -->
    <!--     <version>${hadoop.version}</version> -->
    <!--     <scope>provided</scope> -->
    <!-- </dependency> -->

    <!-- <dependency> -->
    <!--     <groupId>org.apache.flink</groupId> -->
    <!--     <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> -->
    <!--     <version>${flink.version}</version> -->
    <!--     <scope>provided</scope> -->
    <!-- </dependency> -->
    <!-- <dependency> -->
    <!--     <groupId>org.apache.hadoop</groupId> -->
    <!--     <artifactId>hadoop-hdfs</artifactId> -->
    <!--     <version>${hadoop.version}</version> -->
    <!--     <scope>provided</scope> -->
    <!-- </dependency> -->
    <!-- <dependency> -->
    <!--     <groupId>org.apache.hadoop</groupId> -->
    <!--     <artifactId>hadoop-common</artifactId> -->
    <!--     <version>${hadoop.version}</version> -->
    <!--     <scope>provided</scope> -->
    <!-- </dependency> -->
</dependencies>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<build>
    <plugins>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <!-- Run the shade goal on the package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.example.StreamingJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <!-- Java compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${target.java.version}</source>
                <target>${target.java.version}</target>
            </configuration>
        </plugin>

        <!-- Scala compiler -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <args>
                    <arg>-nobootcp</arg>
                    <arg>-target:jvm-${target.java.version}</arg>
                </args>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.7</version>
            <executions>
                <!-- Add src/main/scala to the Eclipse build path -->
                <execution>
                    <id>add-source</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>add-source</goal>
                    </goals>
                    <configuration>
                        <sources>
                            <source>src/main/scala</source>
                        </sources>
                    </configuration>
                </execution>
                <!-- Add src/test/scala to the Eclipse build path -->
                <execution>
                    <id>add-test-source</id>
                    <phase>generate-test-sources</phase>
                    <goals>
                        <goal>add-test-source</goal>
                    </goals>
                    <configuration>
                        <sources>
                            <source>src/test/scala</source>
                        </sources>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Workflow:
Data is received from Kafka and registered as temporary tables. A catalog can be specified for these tables; in this walkthrough the default one is used, so tables live under default_catalog.default_database.
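If a non-default catalog is wanted (for example the Hive catalog, for which the POM above already pulls in flink-sql-connector-hive), it can be created and selected up front. A minimal sketch; the hive-conf-dir path is an assumption for illustration:

-- Register a Hive catalog and make it the current one;
-- the directory must contain a valid hive-site.xml.
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/etc/hive/conf'
);
USE CATALOG myhive;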
1. Real-time ingestion from Kafka

CREATE TABLE source_table (
    col_name col_type COMMENT ''
) WITH (
    'connector' = 'kafka',
    'topic' = 'source topic name',
    'properties.bootstrap.servers' = 'host:port,host:port,host:port',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);
With the table above, the data is ingested; other data sources can be defined by analogy. The intermediate analysis steps are written almost entirely in plain SQL, as in the sketch below.
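As a sketch of such an intermediate step (the view name and the device_id column are hypothetical, standing in for whatever columns source_table actually has), an aggregation can be kept as a temporary view and reused by later statements:

-- Hypothetical intermediate aggregation over the Kafka source table:
-- running event counts per device, reusable in later queries.
CREATE TEMPORARY VIEW dwd_event_stats AS
SELECT
    device_id,
    COUNT(*) AS event_cnt
FROM source_table
GROUP BY device_id;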
Note: if the ingested data is CDC data, the format in the WITH clause must be changed to debezium-json.
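A minimal sketch of such a CDC source table (the topic and columns are assumptions for illustration), reading Debezium change events from Kafka:

CREATE TABLE ods_orders_cdc (
    order_id     BIGINT,
    order_status STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_orders_topic',                 -- hypothetical CDC topic
    'properties.bootstrap.servers' = 'host:port',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'                    -- instead of 'json' for CDC data
);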
2. Writing to Kafka

CREATE TABLE dws_sz_to_kafka_table (
    col_name col_type,
    PRIMARY KEY (sz_dws_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'topic name',
    'properties.bootstrap.servers' = 'host:port,host:port,host:port',
    'properties.group.id' = 'consumer group',
    'key.format' = 'json',
    'value.format' = 'json'
);
This is an upsert write into Kafka. The key points are that the connector is upsert-kafka rather than the plain kafka connector, and that a format must be given for both the key and the value (key.format and value.format). Details are in the official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/upsert-kafka/
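The upsert-kafka sink expects an updating (changelog) stream keyed on the declared primary key, which a grouped aggregation naturally produces. A sketch of loading the table above, with the column mapping assumed for illustration:

-- Each device_id yields one logical row keyed on sz_dws_id;
-- updates to the count are written to Kafka as upserts.
INSERT INTO dws_sz_to_kafka_table
SELECT
    device_id AS sz_dws_id,
    COUNT(*)  AS event_cnt
FROM source_table
GROUP BY device_id;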
3. Processing time

cast(PROCTIME() as string) as dwd_time

PROCTIME() is Flink SQL's built-in processing-time function; casting it to STRING stamps each row with the time at which it was processed.
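For example, stamping rows while loading a hypothetical DWD-layer table (dwd_orders is assumed; ods_orders_cdc is the CDC source sketched earlier):

INSERT INTO dwd_orders
SELECT
    order_id,
    order_status,
    CAST(PROCTIME() AS STRING) AS dwd_time  -- processing-time stamp
FROM ods_orders_cdc;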


