首页 > 编程语言 >Java代码:flink wordcount代码示例及解读

Java代码:flink wordcount代码示例及解读

时间:2023-09-02 11:34:12浏览次数:38  
标签:Java String 示例 word 代码 flink 单词 org public


WordCount

WordCountPojo.java

代码

package wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;

@SuppressWarnings("serial")
public class WordCountPojo {

    /**
     * This is the POJO (Plain Old Java Object) that is being used
     * for all the operations.
     * As long as all fields are public or have a getter/setter, the system can handle them
     */
    public static class Word {

        // fields
        private String word;
        private int frequency;

        // constructors
        public Word() {}

        public Word(String word, int i) {
            this.word = word;
            this.frequency = i;
        }

        // getters setters
        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getFrequency() {
            return frequency;
        }

        public void setFrequency(int frequency) {
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "Word=" + word + " freq=" + frequency;
        }
    }

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Word> counts =
                // split up the lines into Word objects (with frequency = 1)
                text.flatMap(new Tokenizer())
                        // group by the field word and sum up the frequency
                        .groupBy("word")
                        .reduce(new ReduceFunction<Word>() {
                            @Override
                            public Word reduce(Word value1, Word value2) throws Exception {
                                return new Word(value1.word, value1.frequency + value2.frequency);
                            }
                        });

        if (params.has("output")) {
            counts.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE);
            // execute program
            env.execute("WordCount-Pojo Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple Word objects.
     */
    public static final class Tokenizer implements FlatMapFunction<String, Word> {

        @Override
        public void flatMap(String value, Collector<Word> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Word(token, 1));
                }
            }
        }
    }

}

代码解释

这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 POJO 类 WordCountPojo.Word 和一个 main 方法。

代码解释如下:

  1. @SuppressWarnings("serial") 是一个注解,用于告诉编译器忽略特定类型的警告信息。
  2. WordCountPojo 类是一个包含 main 方法的公共类。
  3. Word 是一个静态嵌套类,用于表示单词及其频率的 POJO(Plain Old Java Object)。它包含了以下成员:
  • word:表示单词的字符串类型字段。
  • frequency:表示单词出现频率的整数类型字段。
  • Word():默认构造函数。
  • Word(String word, int i):带参数的构造函数,用于设置单词和频率的初始值。
  • getWord()setWord():用于获取和设置单词字段的方法。
  • getFrequency()setFrequency():用于获取和设置频率字段的方法。
  • toString():重写的 toString() 方法,返回包含单词和频率的字符串表示。
  1. main 方法是程序的入口点。它使用 Apache Flink 的执行环境 ExecutionEnvironment 来设置和执行 Flink 作业。下面是主要步骤:
  • 创建 ParameterTool 对象 params,用于从命令行参数中获取配置参数。
  • 获取执行环境 env
  • 将参数设置为全局作业参数,以便在 Web 接口中使用。
  • 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
  • 对文本进行处理,首先使用 Tokenizer 函数将每行文本拆分为单词,并转换为 Word 对象(频率初始值为1)。
  • 将转换后的数据按照单词字段进行分组,并使用 ReduceFunction 对相同单词的频率进行累加。
  • 如果参数中指定了输出路径,则将结果写入到指定的文件中,并执行作业。
  • 如果没有指定输出路径,则将结果打印到标准输出。
  1. Tokenizer 是一个实现了 FlatMapFunction 接口的静态内部类,用于将输入的文本行拆分为单词并转换为 Word 对象。它包含以下方法:
  • flatMap(String value, Collector<Word> out):接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的 Word 对象,并将其添加到输出集合中。

总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的频率,最后将结果输出到文件或标准输出。

WordCount.java

代码

package wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;

public class WordCount {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws InterruptedException {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    // Sleep 10s while processing each for word
                    Thread.sleep(1_000L);
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

代码解释

这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 main 方法和一个 Tokenizer 内部类。

代码解释如下:

  1. WordCount 类是一个公共类,包含了 main 方法。
  2. main 方法是程序的入口点。它使用 Apache Flink 的执行环境 ExecutionEnvironment 来设置和执行 Flink 作业。以下是主要步骤:
  • 创建 ParameterTool 对象 params,用于从命令行参数中获取配置参数。
  • 获取执行环境 env
  • 将参数设置为全局作业参数,以便在 Web 接口中使用。
  • 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
  • 对文本进行处理,首先使用 Tokenizer 函数将每行文本拆分为单词,并转换为 (word, 1) 的元组。
  • 将转换后的数据按照单词字段进行分组,并对元组的第二个字段进行求和。
  • 如果参数中指定了输出路径,则将结果以 CSV 格式写入到指定的文件中,并执行作业。
  • 如果没有指定输出路径,则将结果打印到标准输出。
  1. Tokenizer 是一个实现了 FlatMapFunction 接口的静态内部类,用于将输入的文本行拆分为单词并转换为 (word, 1) 的元组。它包含以下方法:
  • flatMap(String value, Collector<Tuple2<String, Integer>> out):接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的 (word, 1) 的元组,并将其添加到输出集合中。在每个单词处理时,线程会休眠1秒钟,以模拟一个耗时的操作。

总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的次数,最后将结果输出到文件或标准输出。在 Tokenizer 类中,还模拟了一个耗时的操作,以展示在处理数据时可以执行一些自定义操作的能力。

maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>batch</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>

        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>


    </dependencies>


    <build>
        <plugins>
            <!--JDK版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <!--  flink 打包插件    -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>wordCount.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

links:

https://github.com/will-che/flink-simple-tutorial


标签:Java,String,示例,word,代码,flink,单词,org,public
From: https://blog.51cto.com/zhangxueliang/7331362

相关文章

  • Java集合面试之Queue篇
    Java集合面试之Queue篇(qq.com)1、队列是什么?队列是常用数据结构之一。是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,故为先进先出(FIFO,firstinfirstout)线性表。和栈一样,队列是一种操作受限制的线性表。2、队列的分类?Qu......
  • 从零开发Java入门项目--十天掌握
    简介这是一个靠谱的Java入门项目实战,名字叫蚂蚁爱购。从零开发项目,视频加文档,十天就能学会开发Java项目,教程路线是:搭建环境=>安装软件=>创建项目=>添加依赖和配置=>通过表生成代码=>编写Java代码=>代码自测=>前后端联调=>准备找工作。学完即可成为合格的Java开发,心里有底,再......
  • 说说你知道哪些Java集合吧
    Java集合,主要由两大接口派生而来一个是Collection接口,主要用于存放单一元素;下面有三个主要的子接口,List、Set、Queue。List实现类主要有ArrayList、LinkedList、Vector、StackSet实现类主要是HashSet、LinkedHashSet、TreeSetQueue主要是实现类有ArrayDeque、PriorityQueue......
  • Java List常见面试题
    Java集合面试之List篇你好,面试官|我用JavaList狂怼面试官~(qq.com)本文涉及ArrayList与LinkedList区别、ArrayList扩容机制、CopyOnWriteArrayList特点、场景、思想ArrayList:基于数组实现的非线程安全的集合。实现RandomAccess接口,支持随机访问,查询元素快,插入,......
  • 代码随想录——数组篇
    二分查找题目链接注意:求均值防溢出,left+(right-left)/2等价于(left+right)/2。原地移除元素题目链接给你一个数组nums和一个值val,你需要原地移除所有数值等于val的元素,并返回移除后数组的新长度。不要使用额外的数组空间,你必须仅使用O(1)额外空间并原地修改输入......
  • Java入门
    Java初识Java发展史时间节点1991年,Sun公司进军嵌入式开发,让电视、冰箱、微波炉等设备能够用上编程语言,成立了Green项目小组;1992年,由于C++语言的繁琐且不支持跨平台,研发团队基于C++开发了Oak语言;1995年,互联网大爆发,跨平台的特性使得Oak语言得到飞速发展,同时正式更名为Java(爪......
  • mybatis中的UserMapper.xml模板与测试mybatis的代码
    2023-09-02UserMapper.xml模板<?xmlversion="1.0"encoding="UTF-8"?><!DOCTYPEmapperPUBLIC"-//mybatis.org//DTDMapper3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mappe......
  • 全局多项式(趋势面)与IDW逆距离加权插值:MATLAB代码
      本文介绍基于MATLAB实现全局多项式插值法与逆距离加权法的空间插值的方法,并对不同插值方法结果加以对比分析。目录1背景知识2实际操作部分2.1空间数据读取2.2异常数据剔除2.3验证集筛选2.4最小二乘法求解2.5逆距离加权法求解2.6插值精度检验2.7数据导出与专题地图制......
  • 220230825-纯js实现以下代码
    题目<ul><li>1<li><li>2<li><li>3<li></ul>代码<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"con......
  • android面试题:谈谈对Java中多态的理解
     Java中的多态是面向对象编程的一个重要特征,它允许同一个类型的对象在不同的情况下表现出不同的行为。多态是Java语言中实现代码复用、提高代码可维护性和可扩展性的重要手段。 多态的实现基于两个核心概念:继承和方法重写。在Java中,子类可以继承父类的方法,并且可以重写(覆......