首页 > 其他分享 >Flink WordCount入门

Flink WordCount入门

时间:2022-10-18 16:33:26浏览次数:69  
标签:flatMap word 入门 Flink WordCount executionEnvironment public String

下面通过一个单词统计的案例,快速上手应用 Flink,进行流处理(Streaming)和批处理(Batch)

单词统计(批处理)

  1. 引入依赖
<!--flink核心包-->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.7.2</version>
  </dependency>
  <!--flink流处理包-->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.7.2</version>
  </dependency>
  1. 代码实现
public class WordCountBatch {

    public static void main(String[] args) throws Exception {
        String inputFile= "E:\\data\\word.txt";
        String outPutFile= "E:\\data\\wordResult.txt";
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        //1. 读取数据
        DataSource<String> dataSource = executionEnvironment.readTextFile(inputFile);
        //2. 对数据进行处理,转成word,1的格式
        FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });
        //3. 对数据分组,相同word的一个组
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = flatMapOperator.groupBy(0);
        //4. 对分组后的数据求和
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);
        //5. 写出数据
        sum.writeAsCsv(outPutFile).setParallelism(1);
        //执行
        executionEnvironment.execute("wordcount batch process");
    }
}

执行 main 方法,得出结果。我测试的 word.txt 内容如下:

ni hao hi
wang mei mei
liu mei
ni hao
wo hen hao
this is a good idea
Apache Flink

输出的文件结果:

a,1
mei,3
Apache,1
Flink,1
good,1
hen,1
hi,1
idea,1
ni,2
is,1
liu,1
this,1
wo,1
hao,3
wang,1

单词统计(流数据)

需求:Socket 模拟实时发送单词,使用 Flink 实时接收数据,对指定时间窗口内(如 5s)的数据进行聚合统计,每隔 1s 汇总计算一次,并且把时间窗口内计算结果打印出来

public class WordCountStream {

    public static void main(String[] args) throws Exception {
        int port = 7000;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textStream = executionEnvironment.socketTextStream("192.168.56.103", port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split("\\s");
                for (String word : split) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> word = tuple2SingleOutputStreamOperator.keyBy(0)
                .timeWindow(Time.seconds(5),Time.seconds(1)).sum(1);
        word.print();
        executionEnvironment.execute("wordcount stream process");
    }
}

运行起来之后,我们就可以开始发送 socket 请求过去。我们测试可以使用 netcat 工具。
在 linux 上安装好后,使用下面的命令:

nc -lk 7000

然后发送数据即可。

标签:flatMap,word,入门,Flink,WordCount,executionEnvironment,public,String
From: https://www.cnblogs.com/javammc/p/16803040.html

相关文章

  • C语言零基础入门-文件
    C语言零基础入门-文件这节课的主要内容:1,文件的基本知识。2,文件操作实例。1.基础知识1.1基础概念1.1.1文件这个概念不用说大家应该都知道是什么,虽然自己的定义可能不是很......
  • C语言零基础入门-结构体-01
    C语言零基础入门-结构体-01这节课的主要内容:1,什么是结构体。2,结构体的定义以及使用。1.什么是结构体这个问题就变得有意思了,为什么呢?因为他可以与我们之前的学习紧密相关了......
  • C语言零基础入门-06-习题
    C语言零基础入门-06-习题本节课的任务是: 布置5道指针的习题。习题01给定的5个整数(9,7,5,6,2),书写子函数进行排序,按照由小较大的顺序输出。要求:子函数之间数据传递使用指针进行。......
  • C语言零基础入门-指针-03
    C语言零基础入门-指针-03本节要点:1,指针操作int类型数据。2,指针操作float,char类型数据。01.指针操作int类型数据其实这一小节主要是带大家回顾一下,因为前边我们已经讲了很多......
  • C语言零基础入门-指针-04
    C语言零基础入门-指针-04本节要点:1,指向一维数组的指针。2,指向二维数组的指针。3,指针的输出练习。01.指向一维数组的指针有意思的来了,指针指向数组的情况非常常见,同样也是一......
  • 长链剖分入门
    模拟赛考到了,但是完全不会。填个坑。P5903【模板】树上\(k\)级祖先#include<bits/stdc++.h>#defineempemplace_backusingnamespacestd;constintN=5e5+......
  • 【2022.10.18】Linux入门基础(1)
    内容概要主题:linux运维(记)linux基础几乎以记忆为主(理论知识)运维的本质服务器介绍服务器品牌服务器参数服务器组件磁盘阵列虚拟化技术虚拟化软件安装虚......
  • HM-SCAli4【服务治理介绍、nacos入门】
    1服务治理介绍先来思考一个问题通过上一章的操作,我们已经可以实现微服务之间的调用。但是我们把服务提供者的网络地址(ip,端口)等硬编码到了代码中,这种做法存在许多问题:......
  • 一篇文章带你了解网页框架——Vue简单入门
    一篇文章带你了解网页框架——Vue简单入门这篇文章将会介绍我们前端入门级别的框架——Vue的简单使用如果你以后想从事后端程序员,又想要稍微了解前端框架知识,那么这篇文......
  • Golang入门:Linux上的go语言安装与配置
    Tips:本文以本文撰写时的Go语言最新版本,也就是go.1.19.2版本为例。Linux发行版本使用Ubuntu22.04.1LTS为例来做演示。安装C工具Go的工具链是用C语言编写......