WordCount
WordCountPojo.java
代码
package wordCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;
@SuppressWarnings("serial")
public class WordCountPojo {
/**
* This is the POJO (Plain Old Java Object) that is being used
* for all the operations.
* As long as all fields are public or have a getter/setter, the system can handle them
*/
public static class Word {
// fields
private String word;
private int frequency;
// constructors
public Word() {}
public Word(String word, int i) {
this.word = word;
this.frequency = i;
}
// getters setters
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getFrequency() {
return frequency;
}
public void setFrequency(int frequency) {
this.frequency = frequency;
}
@Override
public String toString() {
return "Word=" + word + " freq=" + frequency;
}
}
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
// get default test text data
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet<Word> counts =
// split up the lines into Word objects (with frequency = 1)
text.flatMap(new Tokenizer())
// group by the field word and sum up the frequency
.groupBy("word")
.reduce(new ReduceFunction<Word>() {
@Override
public Word reduce(Word value1, Word value2) throws Exception {
return new Word(value1.word, value1.frequency + value2.frequency);
}
});
if (params.has("output")) {
counts.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("WordCount-Pojo Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple Word objects.
*/
public static final class Tokenizer implements FlatMapFunction<String, Word> {
@Override
public void flatMap(String value, Collector<Word> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Word(token, 1));
}
}
}
}
}
代码解释
这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 POJO 类 WordCountPojo.Word
和一个 main
方法。
代码解释如下:
-
@SuppressWarnings("serial")
是一个注解,用于告诉编译器忽略特定类型的警告信息。 -
WordCountPojo
类是一个包含main
方法的公共类。 Word
是一个静态嵌套类,用于表示单词及其频率的 POJO(Plain Old Java Object)。它包含了以下成员:
-
word
:表示单词的字符串类型字段。 -
frequency
:表示单词出现频率的整数类型字段。 -
Word()
:默认构造函数。 -
Word(String word, int i)
:带参数的构造函数,用于设置单词和频率的初始值。 -
getWord()
和setWord()
:用于获取和设置单词字段的方法。 -
getFrequency()
和setFrequency()
:用于获取和设置频率字段的方法。 -
toString()
:重写的toString()
方法,返回包含单词和频率的字符串表示。
main
方法是程序的入口点。它使用 Apache Flink 的执行环境ExecutionEnvironment
来设置和执行 Flink 作业。下面是主要步骤:
- 创建
ParameterTool
对象params
,用于从命令行参数中获取配置参数。 - 获取执行环境
env
。 - 将参数设置为全局作业参数,以便在 Web 接口中使用。
- 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
- 对文本进行处理,首先使用
Tokenizer
函数将每行文本拆分为单词,并转换为Word
对象(频率初始值为1)。 - 将转换后的数据按照单词字段进行分组,并使用
ReduceFunction
对相同单词的频率进行累加。 - 如果参数中指定了输出路径,则将结果写入到指定的文件中,并执行作业。
- 如果没有指定输出路径,则将结果打印到标准输出。
Tokenizer
是一个实现了FlatMapFunction
接口的静态内部类,用于将输入的文本行拆分为单词并转换为Word
对象。它包含以下方法:
-
flatMap(String value, Collector<Word> out)
:接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的Word
对象,并将其添加到输出集合中。
总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的频率,最后将结果输出到文件或标准输出。
WordCount.java
代码
package wordCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;
public class WordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
// get default test text data
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
*/
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws InterruptedException {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
// Sleep 10s while processing each for word
Thread.sleep(1_000L);
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
代码解释
这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 main
方法和一个 Tokenizer
内部类。
代码解释如下:
-
WordCount
类是一个公共类,包含了main
方法。 main
方法是程序的入口点。它使用 Apache Flink 的执行环境ExecutionEnvironment
来设置和执行 Flink 作业。以下是主要步骤:
- 创建
ParameterTool
对象params
,用于从命令行参数中获取配置参数。 - 获取执行环境
env
。 - 将参数设置为全局作业参数,以便在 Web 接口中使用。
- 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
- 对文本进行处理,首先使用
Tokenizer
函数将每行文本拆分为单词,并转换为(word, 1)
的元组。 - 将转换后的数据按照单词字段进行分组,并对元组的第二个字段进行求和。
- 如果参数中指定了输出路径,则将结果以 CSV 格式写入到指定的文件中,并执行作业。
- 如果没有指定输出路径,则将结果打印到标准输出。
Tokenizer
是一个实现了FlatMapFunction
接口的静态内部类,用于将输入的文本行拆分为单词并转换为(word, 1)
的元组。它包含以下方法:
-
flatMap(String value, Collector<Tuple2<String, Integer>> out)
:接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的(word, 1)
的元组,并将其添加到输出集合中。在每个单词处理时,线程会休眠1秒钟,以模拟一个耗时的操作。
总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的次数,最后将结果输出到文件或标准输出。在 Tokenizer
类中,还模拟了一个耗时的操作,以展示在处理数据时可以执行一些自定义操作的能力。
maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>batch</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--JDK版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!-- flink 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>wordCount.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
links:
https://github.com/will-che/flink-simple-tutorial