
Java Flink Series, Part 1: Maven Project Setup and an Introductory Java Example Written Several Ways



1. Flink Project Dependency Configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hainiu</groupId>
    <artifactId>hainiuflink</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.9.3</flink.version>
        <parquet.version>1.10.0</parquet.version>
        <hadoop.version>2.7.3</hadoop.version>
        <fastjson.version>1.2.72</fastjson.version>
        <redis.version>2.9.0</redis.version>
        <mysql.version>5.1.35</mysql.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- compile for local runs; switch to provided when the cluster supplies these jars -->
        <project.build.scope>compile</project.build.scope>
        <!-- <project.build.scope>provided</project.build.scope> -->
        <mainClass>com.hainiu.Driver</mainClass>
    </properties>

    <dependencies>
        <!-- logging: slf4j bound to log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink's Hadoop compatibility -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Streaming Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink web UI at runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- RocksDB backend for Flink state -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink HBase connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Elasticsearch 5 connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Kafka 0.10 connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink filesystem connector (writing files to HDFS) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Parquet file format support -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- JSON handling -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <!-- package a fat jar with the main class recorded in the manifest -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- skip unit tests during packaging -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
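
The assembly plugin reads its packaging layout from src/assembly/assembly.xml, a file the post does not show. A minimal descriptor in the spirit of this build (the id, format, and unpack settings below are assumptions, not taken from the original project) bundles all runtime dependencies into a single jar:

<!-- src/assembly/assembly.xml: a minimal sketch; id and settings are assumed -->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

With that file in place, mvn package runs the single goal bound to the package phase and drops a runnable fat jar under target/. Note the role of the project.build.scope property here: compile bakes Flink and Hadoop into that jar, which is convenient for local runs, while the commented-out provided value keeps them out when the target cluster already ships them.

Because slf4j-log4j12 and log4j handle the logging, the job also needs a log4j configuration on the classpath; a minimal, assumed src/main/resources/log4j.properties:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c{1} - %m%n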

2. Java Flink Example

package com.linwj.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment scc = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> socket = scc.socketTextStream("localhost", 6666); // feed input with: nc -lk 6666

        // 1. Lambda style
        // flatMap is handed a collector (Collector<String> out); more Collector examples:
        // https://vimsky.com/examples/detail/java-method-org.apache.flink.util.Collector.collect.html
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap((String a, Collector<String> out) -> {
//            Arrays.stream(a.split(" ")).forEach((x1) -> {
//                out.collect(x1);
//            });
//        }).returns(Types.STRING);
//        t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t2_map =
//                t1_flatmap.map((x1) -> Tuple3.of(x1, 1, "gg")).returns(Types.TUPLE(Types.STRING, Types.INT, Types.STRING));
//        t2_map.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t3_keyBy_sum =
//                t2_map.keyBy(0).sum(1); // no lambda involved, so no explicit return type needs to be declared
//        t3_keyBy_sum.print();
//
//        scc.execute();

        // 2. Function style (anonymous inner classes)
        /*
           1) An anonymous inner class has the form: new ParentClassOrInterfaceName() { define members or
              override methods }.method(); an ordinary inner class, by contrast, has a name of its own.
           2) Omitting the generics on FlatMapFunction<String, String> is a compile error:
              "Class 'Anonymous class derived from FlatMapFunction' must either be declared abstract or
              implement abstract method 'flatMap(T, Collector<O>)' in 'FlatMapFunction'".
              Keep <String, String> consistent with the generics of the interface or class being implemented.
        */
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap(new FlatMapFunction<String, String>() {
//            @Override
//            public void flatMap(String value, Collector<String> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(ss);
//                }
//            }
//        });
//        t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t2_map = t1_flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String s) throws Exception {
//                return Tuple2.of(s, 1);
//            }
//        });
//        t2_map.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t3_keyBy_sum = t2_map.keyBy(0).sum(1);
//        t3_keyBy_sum.print();

        // 3. Single combined function
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(Tuple2.of(ss, 1));
//                }
//            }
//        });
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();

        // 4. Combined rich function
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
//            private String name = null;
//
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    // Rich functions such as RichFlatMapFunction additionally offer getRuntimeContext(),
//                    // which exposes state plus runtime info such as the parallelism and the task name.
//                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
//                    out.collect(Tuple2.of(name + ss, 1));
//                }
//            }
//
//            @Override
//            public void open(Configuration parameters) {
//                name = "linwj_";
//            }
//
//            @Override
//            public void close() {
//                name = null;
//            }
//        });
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();

        // 5. Combined process functions
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            private String name = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                name = "linwj";
            }

            @Override
            public void close() throws Exception {
                name = null;
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] s = value.split(" ");
                for (String ss : s) {
                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                    out.collect(Tuple2.of(name + ss, 1));
                }
            }
        }).keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            // num is a plain instance field, so it is shared by every key routed to this subtask;
            // strict per-key counting would use keyed state (e.g. ValueState) instead.
            private Integer num = 0;

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                num += value.f1;
                out.collect(Tuple2.of(value.f0, num));
            }
        });
        sum.print();

        scc.execute();
    }
}
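
A note on keyBy: positional keys such as keyBy(0) work on Flink 1.9.x but were deprecated in later Flink releases in favor of KeySelector-based keys, which 1.9.3 also supports. The sketch below rewrites the combined word count in that style; the class name KeySelectorWordCount is illustrative and not part of the original post:

package com.linwj.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class KeySelectorWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.socketTextStream("localhost", 6666)
                // split each line into (word, 1) pairs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split(" ")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // key by the word itself rather than by a tuple position
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .sum(1)
                .print();

        env.execute("keyselector word count");
    }
}

Whichever variant you run, start the input source first with nc -lk 6666, launch the job, and type space-separated words into the netcat session; the running counts are printed to the console, and the local web UI started by createLocalEnvironmentWithWebUI is normally reachable at http://localhost:8081.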

Original post: https://blog.csdn.net/qq_44000055/article/details/127424229

From: https://www.cnblogs.com/sunny3158/p/18033613
