首页 > 其他分享 >12.25每日总结2

12.25每日总结2

时间:2023-12-25 18:01:12浏览次数:43  
标签:总结 每日 flink 12.25 api import apache org public

今天中午接着做大数据的实验

实验8

Flink初级编程实践

 

1.实验目的

(1)通过实验掌握基本的Flink编程方法。

(2)掌握用IntelliJ IDEA工具编写Flink程序的方法。

2.实验平台

(1)Ubuntu18.04(或Ubuntu16.04)。

(2)IntelliJ IDEA。

(3)Flink1.9.1。

3.实验步骤

(1)使用IntelliJ IDEA工具开发WordCount程序

在Linux系统中安装IntelliJ IDEA,然后使用IntelliJ IDEA工具开发WordCount程序,并打包成JAR文件,提交到Flink中运行。

 

WordCountData

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {
    public static final String[] WORDS=new String[]{"My name is LCZ, I am a college student studying at Shijiazhuang Railway University."};
    public WordCountData() {
    }
    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
        return env.fromElements(WORDS);
    }
}


WordCountTokenizer

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
    public WordCountTokenizer(){}
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;
        for(int i = 0; i<len;i++){
            String tmp = tokens[i];
            if(tmp.length()>0){
                out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
            }
        }
    }
}

WordCount

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.utils.ParameterTool;
public class WordCount {
    public WordCount(){}
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        Object text;

        if(params.has("input")){
            text = env.readTextFile(params.get("input"));
        }else{
            text = WordCountData.getDefaultTextLineDataset(env);
        }
        AggregateOperator counts = ((DataSet)text).flatMap(new WordCountTokenizer()).groupBy(new int[]{0}).sum(1);

        if(params.has("output")){
            counts.writeAsCsv(params.get("output"),"\n", " ");
            env.execute();
        }else{
            counts.print();
        }
    }
}

 

  

 

 

(2)数据流词频统计

使用Linux系统自带的NC程序模拟生成数据流,不断产生单词并发送出去。编写Flink程序对NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在IntelliJ IDEA中开发和调试程序,然后,再打成JAR包部署到Flink中运行。

 

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class TongJi {
    public static void main(String[] args) throws Exception {
//定义socket的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("指定port参数,默认值为9000");
            port = 9001;
        }
//获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");
//计算数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })//打平操作,把每行的单词转为<word,count>类型的数据
                .keyBy("word")//针对相同的word数据进行分组
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
                .sum("count");
//把数据打印到控制台
        windowCount.print()
                .setParallelism(1);//使用一个并行度
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");
    }
    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {
        }
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>TongJi</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.2</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

</project>

 

  

 

标签:总结,每日,flink,12.25,api,import,apache,org,public
From: https://www.cnblogs.com/louwangshayu/p/17926667.html

相关文章

  • 2023-2024-1学期20232412《网络空间安全导论》第六周学习总结
    教材学习总结初步认知应用安全在不同领域的应用了解身份认证与信任管理的方式认识隐私的定义及隐私保护方法了解云计算、物联网、人工智能的相关知识思维导图教材学习中的问题及解决方法问题1:对差分隐私的知识不够理解解决方式:向ChatGPT询问,寻求清晰的解释问题2:对比特......
  • Windows 11 绕过 TPM 方法总结,通用免 TPM 镜像下载 (2023 年 12 月更新)
    Windows11绕过TPM方法总结,通用免TPM镜像下载(2023年12月更新)在虚拟机、Mac电脑和TPM不符合要求的旧电脑上安装Windows11的通用方法总结请访问原文链接:https://sysin.org/blog/windows-11-no-tpm/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org本文......
  • 12/25每日总结
    树与二叉树的转化树要变成二叉树,那就将树中的所有兄弟结点进行链接,然后每一层与上一层的连接只留下第一个结点的连接二叉树要变成树,那就反方向来一次,将除了第一个结点的其他结点与根节点连接上,然后将兄弟结点连接,这时候二叉树就变回了原来的树森林与二叉树的转化森林转化为二叉树,森......
  • 每日总结2023年12月25日
    临近期末,最近事比较多,对之前软件构造实验做一个汇总软件构造实验作业实验名称:百度机器翻译SDK实验,百度图像增强与特效SDK实验,JFinal极速开发框架实验班级:信2105-1 学号:20214309 姓名:陈俊杰 实验一:百度机器翻译SDK实验一、实验要求任务一:下载配置百度翻译Java相关库及......
  • 每日导数17
    很难的放缩:对数均值不等式已知函数\(f(x)=-2x-2\sinx+2m\lnx,m>0\)若存在\(f(x_1)=f(x_2)(x_1\neqx_2)\)\((1)\)判断\(2(x-\sinx)\)的单调性\((2)\)证明:\(x_1+x_2>1+\lnm\)解\((1)\)令\(g(x)=2(x-\sinx)\),\(g^{\prime}(x)=2-2\cosx>0\)从而其单调递增\((2)\......
  • 12.25
    今日写大作业实验三:JFinal极速开发框架实验 (2023.12.13日完成)    根据参考资料,学习JFinal极速开发框架的使用并如下任务:    任务一:了解Maven及其使用方法,总结其功能作用(占20%)    任务二:学习JFinal框架,基于Maven建立JFinal工程,并对JFinal框架功能进行总结介绍(占30%......
  • 南外集训 2023.12.25 T1
    给定一个图,求\(s\)到\(t\)的最短路,其中路径的长度是其长度前\(k\)大边的长度和。\(n,k\le1000,m\le2000\)。做法枚举被算入的最小边权\(w\),所有小于\(w\)的边权都可以视为\(0\),而我们需要确保大于等于\(w\)的边至少走了\(k\)条。如何实现这一点呢?通过记录已......
  • 12月24每日打卡
    实验8Flink初级编程实践 1.实验目的(1)通过实验掌握基本的Flink编程方法。(2)掌握用IntelliJIDEA工具编写Flink程序的方法。2.实验平台(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。3.实验步骤(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装In......
  • 12.24每日总结1
    今天早上做了大数据的实验三(一)编程实现以下指定功能,并用Hadoop提供的HBaseShell命令完成相同任务:列出HBase所有的表的相关信息,例如表名;  在终端打印出指定的表的所有记录数据;  向已经创建好的表添加和删除指定的列族或列;  清空指定的表的所有记录数据;  ......
  • 12.24每日总结2
    今天下午做了大数据实验四1实验内容与完成情况:根据上面给出的Student表,在MySQL数据库中完成如下操作:(1)在MySQL中创建Student表,并录入数据;   (2)用SQL语句输出Student表中的所有记录;  (3)查询zhangsan的Computer成绩;  (4)修改lisi的Math成绩,改为95。     ......