首页 > 其他分享 >Flink实验

Flink实验

时间:2023-12-28 11:48:32浏览次数:20  
标签:count Flink word String flink 实验 import public

 

题目:

实验八

姓名

 

日期12.8

实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。

(2)IntelliJ IDEA。

(3)Flink1.9.1。

 

实验内容与完成情况:(1)使用IntelliJ IDEA工具开发WordCount程序

在Linux系统中安装IntelliJ IDEA,然后使用IntelliJ IDEA工具开发WordCount程序,并打包成JAR文件,提交到Flink中运行。

 

 

 

 

 

(2)数据流词频统计

使用Linux系统自带的NC程序模拟生成数据流,不断产生单词并发送出去。编写Flink程序对NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在IntelliJ IDEA中开发和调试程序,然后,再打成JAR包部署到Flink中运行。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WordCount {
    public static void main(String[] args) throws Exception {
    //定义socket的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("指定port参数,默认值为9000");
            port = 9000;
        }
//获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream("node1", port, "\n");
//计算数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
                })//打平操作,把每行的单词转为<word,count>类型的数据
                .keyBy("word")//针对相同的word数据进行分组
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
                .sum("count");
//把数据打印到控制台
        windowCount.print()
                .setParallelism(1);//使用一个并行度
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");
    }
    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {
        }
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

 

 

出现的问题:1.在Idea中编写代码时,遇到无法找到依赖库和配置不正确的问题。2.maven打包出现问题

 

解决方案(列出遇到的问题和解决办法,列出没有解决的问题):1.网上查找资料,正确导包2.查阅资料解决

 

标签:count,Flink,word,String,flink,实验,import,public
From: https://www.cnblogs.com/jy-all-bug/p/17932362.html

相关文章

  • 05 读写I2C接口EEPROM实验
    软件版本:VIVADO2021.1操作系统:WIN1064bit硬件平台:适用XILINXA7/K7/Z7/ZU/KU系列FPGA登录米联客(MiLianKe)FPGA社区-www.uisrc.com观看免费视频课程、在线答疑解惑!1概述我们知道I2C总线具备广泛的用途,比如寄存器的配置,EEPROM的使用,更重要的是I2C总线上可以挂载非常多的外......
  • 软件构造实验三——调用JFinal框架实现增删改查的学生信息管理系统
    项目结构具体代码_JFinalDemoGenerator.javapackageorg.example.common.model;importcom.jfinal.plugin.activerecord.dialect.MysqlDialect;importcom.jfinal.plugin.activerecord.generator.Generator;importcom.jfinal.plugin.activerecord.generator.TypeMapping......
  • 软件构造实验二——图像增强与特效
    项目结构具体代码BaseConvert.java--将base64编码解码成我们能看懂的东西(功能)packagecom.example.testDong;importjava.io.*;importjava.util.Base64;publicclassBaseConvert{/***图片转化成base64字符串*@paramimgPath*@return*......
  • 01 FPGA流水灯实验
    软件版本:VIVADO2021.1操作系统:WIN1064bitaa硬件平台:适用XILINXA7/K7/Z7/ZU/KU系列FPGA登录米联客(MiLianKe)FPGA社区-www.uisrc.com观看免费视频课程、在线答疑解惑!1概述本章课程以大家熟悉的流水灯为例子,详细讲解了VIVADO软件的使用,包括创建FPGA工程,编写Verilog代码,添加......
  • OSPF GR(第14个实验)
    1、GR的作用技术保证了设备再重启过程中转发层面能过继续知道数据转发,同时控制层面邻居关系的重建以及路由计算等动作不会影像转发层面的功能,从而避免了路由震荡引发的业务中断,保证了关键业务的数据转发,提高了整个网络的可靠性。2、GR增加了Type9OpaqueLSA关键参数(TLV类型Typel......
  • Impala与Flink开发应用_tyt2023
    本实验基于MRS环境,Impala部分主要介绍基本操作。假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,使用Impala客户端实现A业务操作流程。Flink部分主要介绍如何实现Flink与Kafka的连接以满足实时计算场景应用。购买MRS集群选择“自定义购买”区域:华北-北京四......
  • flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用
    此文档是参照flink-cdc文档(https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html)案例 的最佳实践1.下载flinkrelease最新版本1.18.0并解压, https://repo.maven.apache.org/maven2/org/apache/flink/flink-......
  • Flink计算TopN
    在ApacheFlink中实现高效的TopN数据处理,尤其是涉及时间窗口和多条件排序时,需要精细地控制数据流和状态管理。普通计算TopN:1.定义数据源(Source)首先,我们需要定义数据源。这可能是Kafka流、文件、数据库或任何其他支持的数据源。valstream:DataStream[YourType]=en......
  • 【Flink从入门到精通 05】Source&Sink
    【Flink从入门到精通05】Source&SinkFlink用于处理有状态的流式计算,需要对Source端的数据进行加工处理,然后写入到Sink端,下图展示了在Flink中数据所经历的过程,今天就根据这张图分别给大家分享下。01EnvironmentFlink所有的程序都从这一步开始,只有创建了执行环境,才能开......
  • Java版Flink(一)概述和入门案例
    一、概述1、Flink是什么ApacheFlinkisaframeworkanddistributedprocessingengineforstatefulcomputationsoverunboundedandboundeddatastreams.ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。官网地址2、Flink特点......