首页 > 编程语言 >Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)

Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)

时间:2023-09-02 11:34:54浏览次数:41  
标签:1.17 java 示例 flink api import apache org TODO


批、流实现wordcount代码示例

pom.xml

<properties>
        <flink.version>1.17.0</flink.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

代码

DataSet批处理实现Wordcount

package com.atguigu.wc;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
/**
 * TODO DataSet API 实现 wordCount
 */
public class WordCountBatchDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
        // TODO 2.读取文件:从文件中读取
        DataSource<String> lineDS = env.readTextFile("input/word.txt");
 
        // TODO 3.切分、转换(word, 1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // TODO 3.1 按照空格切分单词
                String[] words = value.split(" ");
                // TODO 3.2 将单词转换为(word, 1)格式
                for (String word : words) {
                    Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);
                    // TODO 3.3 使用Collector向下游发送数据
                    out.collect(wordTuple2);
                }
            }
        });
 
        // TODO 4.按照word分组
        UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);
 
        // TODO 5.各分组内聚合
        AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBy.sum(1); //1是位置,表示第二个元素
 
        // TODO 6.输出
        sum.print();
    }
}

ctrl + p:查看传参方式。

Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)_maven

ctrl + p:查看传参方式。

Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)_java_02

src同级根目录:input/word.txt

hello flink
hello world
hello java

执行结果

Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)_apache_03

DataStream有界流实现Wordcount

package com.atguigu.wc;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
 
/**
 * TODO DataStream实现Wordcount:读文件(有界流)
 *
 */
public class WordCountStreamDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // TODO 2.读取数据:从文件读
        DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");
 
        // TODO 3.处理数据: 切分、转换、分组、聚合
        // TODO 3.1 切分、转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS //<输入类型, 输出类型>
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // 按照 空格 切分
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // 转换成 二元组 (word,1)
                            Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                            // 通过 采集器 向下游发送数据
                            out.collect(wordsAndOne);
                        }
                    }
                });
        // TODO 3.2 分组
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );
        // TODO 3.3 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);
 
        // TODO 4.输出数据
        sumDS.print();
 
        // TODO 5.执行:类似 sparkstreaming最后 ssc.start()
        env.execute();
    }
}
 
/**
 * 接口 A,里面有一个方法a()
 * 1、正常实现接口步骤:
 * <p>
 * 1.1 定义一个class B  实现 接口A、方法a()
 * 1.2 创建B的对象:   B b = new B()
 * <p>
 * <p>
 * 2、接口的匿名实现类:
 * new A(){
 * a(){
 * <p>
 * }
 * }
 */

批、流代码对比

Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)_apache_04

  1. 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment。
  2. 转换处理之后,得到的数据对象类型不同。
  3. 分组操作调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector), 指定当前分组的 key 是什么。
  4. 代码末尾需要调用 env 的 execute 方法,开始执行任务。

标签:1.17,java,示例,flink,api,import,apache,org,TODO
From: https://blog.51cto.com/zhangxueliang/7331352

相关文章

  • Flink 1.17教程:DataStream实现Wordcount——读socket(无界流)
    pom.xml<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>fli......
  • Java代码:flink wordcount代码示例及解读
    WordCountWordCountPojo.java代码packagewordCount;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.Executio......
  • Java集合面试之Queue篇
    Java集合面试之Queue篇(qq.com)1、队列是什么?队列是常用数据结构之一。是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,故为先进先出(FIFO,firstinfirstout)线性表。和栈一样,队列是一种操作受限制的线性表。2、队列的分类?Qu......
  • 从零开发Java入门项目--十天掌握
    简介这是一个靠谱的Java入门项目实战,名字叫蚂蚁爱购。从零开发项目,视频加文档,十天就能学会开发Java项目,教程路线是:搭建环境=>安装软件=>创建项目=>添加依赖和配置=>通过表生成代码=>编写Java代码=>代码自测=>前后端联调=>准备找工作。学完即可成为合格的Java开发,心里有底,再......
  • 说说你知道哪些Java集合吧
    Java集合,主要由两大接口派生而来一个是Collection接口,主要用于存放单一元素;下面有三个主要的子接口,List、Set、Queue。List实现类主要有ArrayList、LinkedList、Vector、StackSet实现类主要是HashSet、LinkedHashSet、TreeSetQueue主要是实现类有ArrayDeque、PriorityQueue......
  • Java List常见面试题
    Java集合面试之List篇你好,面试官|我用JavaList狂怼面试官~(qq.com)本文涉及ArrayList与LinkedList区别、ArrayList扩容机制、CopyOnWriteArrayList特点、场景、思想ArrayList:基于数组实现的非线程安全的集合。实现RandomAccess接口,支持随机访问,查询元素快,插入,......
  • Java入门
    Java初识Java发展史时间节点1991年,Sun公司进军嵌入式开发,让电视、冰箱、微波炉等设备能够用上编程语言,成立了Green项目小组;1992年,由于C++语言的繁琐且不支持跨平台,研发团队基于C++开发了Oak语言;1995年,互联网大爆发,跨平台的特性使得Oak语言得到飞速发展,同时正式更名为Java(爪......
  • android面试题:谈谈对Java中多态的理解
     Java中的多态是面向对象编程的一个重要特征,它允许同一个类型的对象在不同的情况下表现出不同的行为。多态是Java语言中实现代码复用、提高代码可维护性和可扩展性的重要手段。 多态的实现基于两个核心概念:继承和方法重写。在Java中,子类可以继承父类的方法,并且可以重写(覆......
  • 剑指 Offer 48. 最长不含重复字符的子字符串 java
    请从字符串中找出一个最长的不包含重复字符的子字符串,计算该最长子字符串的长度。示例1:输入:"abcabcbb"输出:3解释:因为无重复字符的最长子串是"abc",所以其长度为3。示例2:输入:"bbbbb"输出:1解释:因为无重复字符的最长子串是"b",所以其长度为1。示例3:输入......
  • java基础-流程控制-day04
    目录1.if单分支2.ifelse多分支3.ifelse双分支4.随机生成一定区间的整数5switch1.if单分支publicclassTestIf01{ publicstaticvoidmain(String[]args){ //对三个数(1-6)求和 intnum1=6; intnum2=6; intnum3=5; intsum=0; sum+=nu......