Java Flink Series, Part 1: Maven Project Setup and Multiple Ways to Write the Java Starter Example
1. Flink Project Dependency Configuration
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hainiu</groupId>
    <artifactId>hainiuflink</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <!-- Scala binary version suffix used in the Flink artifact names -->
        <scala.version>2.11</scala.version>
        <flink.version>1.9.3</flink.version>
        <parquet.version>1.10.0</parquet.version>
        <hadoop.version>2.7.3</hadoop.version>
        <fastjson.version>1.2.72</fastjson.version>
        <redis.version>2.9.0</redis.version>
        <mysql.version>5.1.35</mysql.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.build.scope>compile</project.build.scope>
        <!-- Switch to provided when the target cluster already ships these jars: -->
        <!-- <project.build.scope>provided</project.build.scope> -->
        <mainClass>com.hainiu.Driver</mainClass>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink's Hadoop compatibility layer -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink streaming Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink web UI for local runs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- RocksDB state backend for Flink state -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink HBase connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink filesystem connector (e.g. writing files to HDFS) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Parquet file format support -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- JSON handling -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
```
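The assembly plugin above reads a descriptor from `src/assembly/assembly.xml`, which the post never shows. Below is a minimal sketch of such a descriptor, assuming a fat jar with unpacked dependencies is wanted; the id, format, and dependency handling are assumptions, not the original author's file:

```xml
<!-- src/assembly/assembly.xml (hypothetical sketch, not from the original project) -->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <!-- Unpack all runtime dependencies into the root of the output jar -->
            <outputDirectory>/</outputDirectory>
            <useProjectArtifact>true</useProjectArtifact>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
```

With a descriptor like this, `mvn package` produces a runnable `hainiuflink-1.0-jar-with-dependencies.jar` whose manifest Main-Class is `com.hainiu.Driver`; switching `project.build.scope` to `provided` keeps the Flink and Hadoop jars out of the bundle when the target cluster already provides them.

Since the POM routes logging through slf4j-log4j12 and packages `src/main/resources`, a `log4j.properties` there keeps local runs from complaining about missing appenders. A minimal example (again an assumption, not shown in the post):

```properties
# src/main/resources/log4j.properties (hypothetical sketch)
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c{1} - %m%n
```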
2. Java Flink Example

The single class below implements the same socket word count in five styles: (1) lambdas, (2) anonymous function classes, (3) a combined flatMap that emits tuples directly, (4) a RichFlatMapFunction, and (5) ProcessFunction plus KeyedProcessFunction. Variants 1-4 are kept commented out; only variant 5 is active.
```java
package com.linwj.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment scc = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> socket = scc.socketTextStream("localhost", 6666); // feed with: nc -lk 6666

//        // 1. Lambda style
//        // flatMap is handed a collector (Collector<String> out) to emit results; more Collector examples:
//        // https://vimsky.com/examples/detail/java-method-org.apache.flink.util.Collector.collect.html
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap((String a, Collector<String> out) -> {
//            Arrays.stream(a.split(" ")).forEach((x1) -> {
//                out.collect(x1);
//            });
//        }).returns(Types.STRING);
//        t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t2_map =
//                t1_flatmap.map((x1) -> Tuple3.of(x1, 1, "gg")).returns(Types.TUPLE(Types.STRING, Types.INT, Types.STRING));
//        t2_map.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t3_keyBy_sum =
//                t2_map.keyBy(0).sum(1); // no lambda here, so no .returns(...) type declaration is needed
//        t3_keyBy_sum.print();
//
//        scc.execute();

//        // 2. Anonymous function-class style
//        /*
//           1) An anonymous inner class has the form: new SuperclassOrInterface() { members / overridden methods };
//              unlike a named inner class, it has no class name of its own.
//           2) Omitting the generics on FlatMapFunction<String, String> causes a compile error:
//              Class 'Anonymous class derived from FlatMapFunction' must either be declared abstract or implement
//              abstract method 'flatMap(T, Collector<O>)' in 'FlatMapFunction'.
//              Keep <String, String> in line with the generics of the interface being implemented.
//        */
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap(new FlatMapFunction<String, String>() {
//            @Override
//            public void flatMap(String value, Collector<String> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(ss);
//                }
//            }
//        });
//        t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t2_map = t1_flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String s) throws Exception {
//                return Tuple2.of(s, 1);
//            }
//        });
//        t2_map.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t3_keyBy_sum = t2_map.keyBy(0).sum(1);
//        t3_keyBy_sum.print();

//        // 3. Combined style: one flatMap that splits and emits (word, 1) tuples directly
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(Tuple2.of(ss, 1));
//                }
//            }
//        });
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();

//        // 4. RichFunction style
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
//            private String name = null;
//
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    // Rich functions additionally provide getRuntimeContext(), which exposes state
//                    // plus runtime info such as the parallelism and the task name.
//                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
//                    out.collect(Tuple2.of(name + ss, 1));
//                }
//            }
//
//            @Override
//            public void open(Configuration parameters) {
//                name = "linwj_";
//            }
//
//            @Override
//            public void close() {
//                name = null;
//            }
//        });
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();

        // 5. ProcessFunction style
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket.process(new ProcessFunction<String, Tuple2<String, Integer>>() {

            private String name = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                name = "linwj";
            }

            @Override
            public void close() throws Exception {
                name = null;
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] s = value.split(" ");
                for (String ss : s) {
                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                    out.collect(Tuple2.of(name + ss, 1));
                }
            }
        }).keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            // Plain instance field: it is shared by every key processed by this subtask,
            // not per-key Flink state (ValueState would make the count keyed and fault tolerant).
            private Integer num = 0;

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                num += value.f1;
                out.collect(Tuple2.of(value.f0, num));
            }
        });
        sum.print();

        scc.execute();
    }
}
```
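To try any of the variants, start the socket source before launching the job, then type lines into it; `createLocalEnvironmentWithWebUI` also serves Flink's local web UI, by default at http://localhost:8081. A sample session for the active variant 5 (exact output interleaving and subtask indexes will vary):

```
$ nc -lk 6666
hello flink hello
```

The job then prints running counts such as `(linwjhello,1)`, `(linwjflink,1)`, `(linwjhello,2)`, interleaved with the subtask indexes written by the `System.out.println` call.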
Original post: https://blog.csdn.net/qq_44000055/article/details/127424229
From: https://www.cnblogs.com/sunny3158/p/18033613