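Overview
This example is meant to give a rough, end-to-end picture of Flink.
What the example does: Flink connects to a socket server, reads data from it line by line as input, splits each line on spaces, groups the resulting words, sums their occurrences, and prints the number of times each word appears to the console.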
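To make the split, group, and sum steps concrete before bringing in Flink, here is a minimal plain-Java sketch of the same logic. The class name `RunningWordCount` and the method `count` are hypothetical and not part of the original project. The sketch emits a running total after every word, which is roughly what a keyed streaming sum produces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; it does not use Flink at all.
final class RunningWordCount {

    /** Splits each line on single spaces and records (word,runningCount) after every word. */
    static List<String> count(List<String> lines) {
        Map<String, Long> totals = new HashMap<>();   // one counter per word ("group")
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {     // "split on spaces"
                long updated = totals.merge(word, 1L, Long::sum); // "sum"
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Prints (aaa,1) (bbb,1) (aaa,2) (qwe,1) (qwe,2) (aaa,3), one per line.
        count(List.of("aaa bbb aaa qwe qwe aaa")).forEach(System.out::println);
    }
}
```

On a real cluster the printed lines may carry a subtask prefix and the relative order of different words can vary with parallelism; the per-word running totals should be the same.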
Prerequisites
- A Flink cluster
- IDE: IntelliJ IDEA
- Programming language: this example uses Java
Versions used in this example
- Flink cluster: flink-1.15.2
- JDK: 11
- Spring Boot: 2.7.5
WordCount program
Add dependencies
- Create a Maven project and add the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bbx</groupId>
    <artifactId>flinkdemo2022</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flinkdemo2022</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
        <flink.version>1.15.2</flink.version>
        <scala.version></scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Note: this is the class whose main method will be run -->
                            <mainClass>com.bbx.flinkdemo2022.wordconut.WcUnBoundStream</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
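Create the Flink program
- A Flink program has five parts overall:
  - Create the execution environment
  - Configure the input data source
  - Process the data
  - Output the results
  - Start execution
- WordCount program example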
package com.bbx.flinkdemo2022.wordconut;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

@Slf4j
public class WcUnBoundStream {
    public static void main(String[] args) throws Exception {
        // Read the input parameters (--host and --port)
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the input source: read lines from the socket server
        env.socketTextStream(host, port)
                // Split each line on spaces and emit (word, 1) for every word
                .flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                    Arrays.stream(line.split(" ")).forEach(j -> {
                        collector.collect(Tuple2.of(j, 1L));
                    });
                })
                // Declare the output type explicitly for the lambda
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Group by the word (field f0) and sum the counts (field 1)
                .keyBy(data -> data.f0)
                .sum(1)
                // Output to stdout
                .print();
        // Start execution
        env.execute("word count");
    }
}
- Package the program into a jar
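The program above reads `host` and `port` from its command-line arguments via `ParameterTool.fromArgs`, so they are passed as `--host <value> --port <value>`. The following plain-Java sketch shows that mapping in a simplified form. It is an illustrative stand-in, not Flink's implementation: the class `ArgsSketch` and its `parse` method are hypothetical, and it only handles strict `--key value` pairs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for reading "--key value" program arguments.
final class ArgsSketch {

    /** Parses strict "--key value" pairs into a map; anything else is rejected. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("expected --key value, got: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]); // drop the leading "--"
        }
        return params;
    }

    public static void main(String[] args) {
        // Sample arguments using the host and port from this example.
        Map<String, String> params =
                parse(new String[] {"--host", "192.168.10.131", "--port", "3300"});
        String host = params.get("host");
        int port = Integer.parseInt(params.get("port"));
        System.out.println(host + ":" + port); // prints 192.168.10.131:3300
    }
}
```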
Start the Socket Server
- On Linux, start it with the nc command
nc -lk 3300 # In this example, the host running this command has the IP 192.168.10.131
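If `nc` is not available, a small Java program can play a similar role for a quick test. The sketch below is an assumption-laden stand-in, not an equivalent of `nc -lk`: the class `LineSocketServer` is hypothetical, it accepts only a single client, and it stops when standard input is closed. It forwards each line typed on stdin to the connected client, terminated by `\n`.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
final class LineSocketServer {

    /** Copies every line from source to sink, ending each with a newline and flushing. */
    static void forwardLines(Reader source, Writer sink) {
        try {
            BufferedReader in = new BufferedReader(source);
            String line;
            while ((line = in.readLine()) != null) {
                sink.write(line);
                sink.write('\n');
                sink.flush(); // push each line to the client immediately
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java LineSocketServer <port>");
            return;
        }
        // Listen on the given port, wait for one client, then forward stdin to it.
        try (ServerSocket server = new ServerSocket(Integer.parseInt(args[0]));
             Socket client = server.accept();
             Writer out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            forwardLines(new InputStreamReader(System.in, StandardCharsets.UTF_8), out);
        }
    }
}
```

With JDK 11 it can be started directly from the source file, for example `java LineSocketServer.java 3300`, on the host whose IP is passed to the job as `--host`.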
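Submit the job
Submit from the Flink UI
- Open the Flink UI, click the "Add New" button under the "Submit New Job" menu, and select the program package (flinkdemo2022-0.0.1-SNAPSHOT-jar-with-dependencies.jar)
- Fill in the required settings (for example, the program arguments --host and --port that the program reads) and submit
- View the running job
- Enter some text on the Socket Server side
aaa bbb aaa qwe qwe aaa
- Click the "Task Managers" menu, select the taskmanager, and check the output under Stdout
- Cancel the job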
Submit from the command line
- Submit command
./bin/flink run -c com.bbx.flinkdemo2022.wordconut.WcUnBoundStream -p 2 flinkdemo2022-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host 192.168.10.131 --port 3300
- View running jobs
./bin/flink list
- Cancel command
./bin/flink cancel 9f4a7fb98ddaae95d3333f4cce46d8a8 # 9f4a7fb98ddaae95d3333f4cce46d8a8 is the JobID, which can be looked up with a command (for example ./bin/flink list)
- Command help
./bin/flink -h