首页 > 其他分享 >第一章、Flink wordcount 入门示例

第一章、Flink wordcount 入门示例

时间:2022-11-02 16:44:47浏览次数:119  
标签:Flink 示例 flink wordcount import apache org

概述

希望通过本示例对flink 有一个轮廓性的认识

本示例实现效果:flink 连接 Socket Server,从Socker Server 中按行读取数据作为数据输入,将输入的数据根据空格切分、分组、求和,最终在控制台输出 各个词组出现的次数

前置条件

  • Flink 集群
  • 开发工具 Intellij idea
  • 开发语言---本示例采用java

示例中采用的版本信息

  • Flink 集群 :flink-1.15.2
  • JDK: 11
  • spring boot版本:2.7.5

WordCount 程序

引入依赖

  • 创建Maven Project,引入依赖
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.7.5</version>
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
    
    
      <groupId>com.bbx</groupId>
      <artifactId>flinkdemo2022</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>flinkdemo2022</name>
      <description>Demo project for Spring Boot</description>
      <properties>
          <java.version>11</java.version>
          <flink.version>1.15.2</flink.version>
          <scala.version></scala.version>
      </properties>
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
    
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <optional>true</optional>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-clients</artifactId>
              <version>${flink.version}</version>
          </dependency>
      </dependencies>
    
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>3.1.1</version>
                  <configuration>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                      <archive>
                          <manifest>
                              <!--注意此处 是要运行的main 方法 -->
                              <mainClass>com.bbx.flinkdemo2022.wordconut.WcUnBoundStream</mainClass>
                          </manifest>
                      </archive>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>single</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    </project>

创建flink 程序

  • Flink 程序整体分为五部分
    1. 创建执行环境
    2. 配置输入数据源
    3. 数据加工处理
    4. 数据输出
    5. 启动执行
  • WordCount 程序示例
    package com.bbx.flinkdemo2022.wordconut;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import java.util.Arrays;
    
    @Slf4j
    public class WcUnBoundStream {
      public static void main(String[] args) throws Exception {
          //获取输入参数
          ParameterTool parameterTool = ParameterTool.fromArgs(args);
          String host = parameterTool.get("host");
          int port = parameterTool.getInt("port");
    
          //创建执行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //获取输入源
          env.socketTextStream(host, port)
                  //按照 空格进行分割
                  .flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                      Arrays.stream(line.split(" ")).forEach(j -> {
                          collector.collect(Tuple2.of(j, 1L));
                      });
                  }).returns(Types.TUPLE(Types.STRING,Types.LONG))
                  .keyBy(data -> data.f0)
                  .sum(1)
                  // 输出
                  .print();
          // 启动执行
          env.execute("word count");
      }
    }
  • 打包程序--将程序进行打包

启动Socket Server

  • 在Linux 环境中通过 nc 命令启动
    nc -lk 3300      #本示例中执行该命令的主机IP为:192.168.10.131

提交job

Flink UI 端提交

  • 访问Flink UI,点击“Submit New Job ”菜单中 “Add New” 按钮 选择程序包(flinkdemo2022-0.0.1-SNAPSHOT-jar-with-dependencies.jar)
    image-20221021183420362
  • 配置必要的信息后提交
    image-20221021184010606
  • 查看运行中jobimage-20221021184312140
  • 在 Socket Server 端输入信息
    aaa bbb
    aaa qwe
    qwe aaa
  • 点击 “Task Managers” 菜单选择 taskmanager 查看 Stdout中输出
    image-20221021185720675
  • 取消任务
    image-20221021190420831

命令提交

  • 提交命令
    ./bin/flink    run -c com.bbx.flinkdemo2022.wordconut.WcUnBoundStream  -p 2 flinkdemo2022-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host 192.168.10.131 --port 3300
  • 查看运行job
    ./bin/flink list
  • 取消命令
    bin/flink cancel  9f4a7fb98ddaae95d3333f4cce46d8a8    # 9f4a7fb98ddaae95d3333f4cce46d8a8 为JobID,可以通过命令
  • 命令指南
    ./bin/flink  -h


来自为知笔记(Wiz)

标签:Flink,示例,flink,wordcount,import,apache,org
From: https://www.cnblogs.com/sxubo/p/16851518.html

相关文章

  • FlinkSQL之Windowing TVF
    WindowingTVF在Flink1.13版本之后出现的替代之前的Groupwindow的产物,官网描述其ismorepowerfulandeffective //TVF中的tumble滚动窗口 //tumble(tablesenso......
  • Qt编写本地摄像头综合应用示例(qcamera/ffmpeg/v4l2等)
    一、功能特点同时支持qcamera、ffmpeg、v4l2三种内核解析本地摄像头。提供函数findCamera自动搜索环境中的所有本地摄像头设备,搜索结果信号发出。支持自动搜索和指......
  • 设计模式-模板模式在Java中的使用示例
    场景模板模式模板模式又叫模板方法模式(TemplateMethodPattern),是指定义一个算法的骨架,并允许子类为一个或者多个步骤提供实现。模板模式使得子类可以在不改变算法......
  • Flink SQL UNNEST/UDTF 如何实现列转行?
    在SQL任务里面经常会遇到一列转多行的需求,今天就来总结一下在FlinkSQL里面如何实现列转行的,先来看下面的一个具体案例. 需求:原始数据格式如下namedata......
  • 编译gRPC相关示例程序,undefined reference to `deflateInit2_'等相关错误解决
    编译gRPC相关示例程序时,出现如下链接错误:/home/suph/.local/lib/libgrpc.a(message_compress.cc.o):Infunction`zlib_compress(grpc_slice_buffer*,grpc_slice_buffer*......
  • 伪分布示例解决方法
    产生问题如下:JAVA_HOME不存在,解决方法:vim/usr/local/hadoop/etc/hadoop/hadoop-env.sh中将JAVA_HOME与HADOOP_HOME进行填充,显示如下:此解决方案的依据是如下: ......
  • Flink集群部署
    集群standalone安装部署下载安装包下载页面:https://archive.apache.org/dist/flink/flink-1.7.2/我这里安装的flink-1.7.2-bin-hadoop27-scala_2.11.tgz版本。......
  • 如何使用容联SDK,以及如何使用回调简单示例
    一、容联SDK如何使用A、收费标准(公有云走网络的都是免费的)     语音会议提供一个房间最大支持32方     视频会议提供一个房间最大支持30方(免费的弊......
  • Qt用Poppler库解析PDF成图片的简单示例
    解析PDF这里用的是Poppler库,与之相关的库还有MuPDF库,参考了这个链接:​​https://people.freedesktop.org/~aacid/docs/qt5/​​相关链接:​​qt显示pdf——poppler-qt问题​......
  • FlinkSql之TableAPI详解
    一、FlinkSql的概念核心概念Flink的TableAPI和SQL是流批统一的API。这意味着TableAPI&SQL在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。......