Java代码:flink wordcount代码示例及解读

package wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;

public class WordCountPojo {

     * This is the POJO (Plain Old Java Object) that is being used
     * for all the operations.
     * As long as all fields are public or have a getter/setter, the system can handle them
    public static class Word {

        // fields
        private String word;
        private int frequency;

        // constructors
        public Word() {}

        public Word(String word, int i) {
            this.word = word;
            this.frequency = i;

        // getters setters
        public String getWord() {
            return word;

        public void setWord(String word) {
            this.word = word;

        public int getFrequency() {
            return frequency;

        public void setFrequency(int frequency) {
            this.frequency = frequency;

        public String toString() {
            return "Word=" + word + " freq=" + frequency;

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);

        DataSet<Word> counts =
                // split up the lines into Word objects (with frequency = 1)
                text.flatMap(new Tokenizer())
                        // group by the field word and sum up the frequency
                        .reduce(new ReduceFunction<Word>() {
                            public Word reduce(Word value1, Word value2) throws Exception {
                                return new Word(value1.word, value1.frequency + value2.frequency);

        if (params.has("output")) {
            counts.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE);
            // execute program
            env.execute("WordCount-Pojo Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");


    // *************************************************************************
    // *************************************************************************

     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple Word objects.
    public static final class Tokenizer implements FlatMapFunction<String, Word> {

        public void flatMap(String value, Collector<Word> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Word(token, 1));



这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 POJO 类 WordCountPojo.Word 和一个 main 方法。


  1. @SuppressWarnings("serial") 是一个注解,用于告诉编译器忽略特定类型的警告信息。
  2. WordCountPojo 类是一个包含 main 方法的公共类。
  3. Word 是一个静态嵌套类,用于表示单词及其频率的 POJO(Plain Old Java Object)。它包含了以下成员:
  • word:表示单词的字符串类型字段。
  • frequency:表示单词出现频率的整数类型字段。
  • Word():默认构造函数。
  • Word(String word, int i):带参数的构造函数,用于设置单词和频率的初始值。
  • getWord()setWord():用于获取和设置单词字段的方法。
  • getFrequency()setFrequency():用于获取和设置频率字段的方法。
  • toString():重写的 toString() 方法,返回包含单词和频率的字符串表示。
  1. main 方法是程序的入口点。它使用 Apache Flink 的执行环境 ExecutionEnvironment 来设置和执行 Flink 作业。下面是主要步骤:
  • 创建 ParameterTool 对象 params,用于从命令行参数中获取配置参数。
  • 获取执行环境 env
  • 将参数设置为全局作业参数,以便在 Web 接口中使用。
  • 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
  • 对文本进行处理,首先使用 Tokenizer 函数将每行文本拆分为单词,并转换为 Word 对象(频率初始值为1)。
  • 将转换后的数据按照单词字段进行分组,并使用 ReduceFunction 对相同单词的频率进行累加。
  • 如果参数中指定了输出路径,则将结果写入到指定的文件中,并执行作业。
  • 如果没有指定输出路径,则将结果打印到标准输出。
  1. Tokenizer 是一个实现了 FlatMapFunction 接口的静态内部类,用于将输入的文本行拆分为单词并转换为 Word 对象。它包含以下方法:
  • flatMap(String value, Collector<Word> out):接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的 Word 对象,并将其添加到输出集合中。

总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的频率,最后将结果输出到文件或标准输出。



package wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;

public class WordCount {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");


    // *************************************************************************
    // *************************************************************************

     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws InterruptedException {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    // Sleep 10s while processing each for word
                    out.collect(new Tuple2<>(token, 1));



这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 main 方法和一个 Tokenizer 内部类。


  1. WordCount 类是一个公共类,包含了 main 方法。
  2. main 方法是程序的入口点。它使用 Apache Flink 的执行环境 ExecutionEnvironment 来设置和执行 Flink 作业。以下是主要步骤:
  • 创建 ParameterTool 对象 params,用于从命令行参数中获取配置参数。
  • 获取执行环境 env
  • 将参数设置为全局作业参数,以便在 Web 接口中使用。
  • 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
  • 对文本进行处理,首先使用 Tokenizer 函数将每行文本拆分为单词,并转换为 (word, 1) 的元组。
  • 将转换后的数据按照单词字段进行分组,并对元组的第二个字段进行求和。
  • 如果参数中指定了输出路径,则将结果以 CSV 格式写入到指定的文件中,并执行作业。
  • 如果没有指定输出路径,则将结果打印到标准输出。
  1. Tokenizer 是一个实现了 FlatMapFunction 接口的静态内部类,用于将输入的文本行拆分为单词并转换为 (word, 1) 的元组。它包含以下方法:
  • flatMap(String value, Collector<Tuple2<String, Integer>> out):接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的 (word, 1) 的元组,并将其添加到输出集合中。在每个单词处理时,线程会休眠1秒钟,以模拟一个耗时的操作。

总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的次数,最后将结果输出到文件或标准输出。在 Tokenizer 类中,还模拟了一个耗时的操作,以展示在处理数据时可以执行一些自定义操作的能力。

maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">


        <!-- Flink dependencies -->

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->


            <!--JDK版本 -->
            <!--  flink 打包插件    -->
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">




