首页 > 其他分享 >aaaa

aaaa

时间:2022-11-02 16:59:38浏览次数:48  
标签:flinkdemo2022 flink boot aaaa import apache org

概述

希望通过本示例对flink 有一个轮廓性的认识

本示例实现效果:flink 连接 Socket Server,从Socker Server 中按行读取数据作为数据输入,将输入的数据根据空格切分、分组、求和,最终在控制台输出 各个词组出现的次数

前置条件

  • Flink 集群
  • 开发工具 Intellij idea
  • 开发语言---本示例采用java

示例中采用的版本信息

  • Flink 集群 :flink-1.15.2

  • JDK: 11

  • spring boot版本:2.7.5

WordCount 程序

引入依赖

  • 创建Maven Project,引入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.5</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
    
        <groupId>com.bbx</groupId>
        <artifactId>flinkdemo2022</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>flinkdemo2022</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>11</java.version>
            <flink.version>1.15.2</flink.version>
            <scala.version></scala.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.1</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!--注意此处 是要运行的main 方法 -->
                                <mainClass>com.bbx.flinkdemo2022.wordconut.WcUnBoundStream</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
  • Flink 程序整体分为五部分

    1. 创建执行环境
    2. 配置输入数据源
    3. 数据加工处理
    4. 数据输出
    5. 启动执行
  • WordCount 程序示例

    package com.bbx.flinkdemo2022.wordconut;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import java.util.Arrays;
    
    @Slf4j
    public class WcUnBoundStream {
        public static void main(String[] args) throws Exception {
            //获取输入参数
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            String host = parameterTool.get("host");
            int port = parameterTool.getInt("port");
    
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //获取输入源
            env.socketTextStream(host, port)
                    //按照 空格进行分割
                    .flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                        Arrays.stream(line.split(" ")).forEach(j -> {
                            collector.collect(Tuple2.of(j, 1L));
                        });
                    }).returns(Types.TUPLE(Types.STRING,Types.LONG))
                    .keyBy(data -> data.f0)
                    .sum(1)
                    // 输出
                    .print();
            // 启动执行
            env.execute("word count");
        }
    }
    
  • 打包程序--将程序进行打包

启动Socket Server

  • 在Linux 环境中通过 nc 命令启动

    nc -lk 3300      #本示例中执行该命令的主机IP为:192.168.10.131
    

提交job

  • 访问Flink UI,点击“Submit New Job ”菜单中 “Add New” 按钮 选择程序包(flinkdemo2022-0.0.1-SNAPSHOT-jar-with-dependencies.jar)
    image-20221021183420362

  • 配置必要的信息后提交
    image-20221021184010606

  • 查看运行中jobimage-20221021184312140

  • 在 Socket Server 端输入信息

    aaa bbb
    aaa qwe
    qwe aaa
    
  • 点击 “Task Managers” 菜单选择 taskmanager 查看 Stdout中输出
    image-20221021185720675

  • 取消任务
    image-20221021190420831

命令提交

  • 提交命令

    ./bin/flink    run -c com.bbx.flinkdemo2022.wordconut.WcUnBoundStream  -p 2 flinkdemo2022-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host 192.168.10.131 --port 3300
    
  • 查看运行job

    ./bin/flink list
    
  • 取消命令

    bin/flink cancel  9f4a7fb98ddaae95d3333f4cce46d8a8    # 9f4a7fb98ddaae95d3333f4cce46d8a8 为JobID,可以通过命令
    
  • 命令指南

    ./bin/flink  -h
    

标签:flinkdemo2022,flink,boot,aaaa,import,apache,org
From: https://www.cnblogs.com/sxubo/p/16851526.html

相关文章

  • aaaa
    704.二分查找1.题目给定一个n个元素有序的(升序)整型数组nums和一个目标值target,写一个函数搜索nums中的target,如果目标值存在返回下标,否则返回-1。示例1:输......
  • AAAAAAAAAA
    name_score=-beta3*fuzz.ratio(name,dete_tokens_list[i])#dete_tokens_list=问题的文字形式if(head_id,name)intuple_topic:#如果预测的主语是......
  • FileNotFoundError: [Errno 2] No such file or directory: 'save\\txt\\aaaaa_po
    Python:使用Numpy.savetxt保存文件时,如果文件名过长,显示文件路径不存在错误首先排除文件路径是否真的不存在,加一句判断然后新建文件目录  如果文件路径存在,就可能是......