首页 > 其他分享 >今日总结

今日总结

时间:2023-12-18 21:57:24浏览次数:23  
标签:总结 java -- flink 如下 所示 apache 今日

Flink是Apache软件基金会的一个顶级项目,是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,并且可以同时支持实时计算和批量计算。Flink起源于Stratosphere 项目,该项目是在2010年到2014年间由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合开展的。2014年4月,Stratosphere代码被贡献给Apache软件基金会,成为Apache软件基金会孵化器项目。之后,团队的大部分创始成员离开大学,共同创办了一家名为Data Artisans的公司。在项目孵化期间,为了避免与另外一个项目发生重名,Stratosphere被重新命名为Flink。在德语中,Flink是“快速和灵巧”的意思,使用这个词作为项目名称,可以彰显流计算框架的速度快和灵活性强的特点。
本教程首先介绍Flink的安装,然后以WordCount程序为实例来介绍Flink编程方法。

一. 安装Flink

Flink的运行需要Java环境的支持,因此,在安装Flink之前,请先参照相关资料安装Java环境(比如Java8)。然后,到Flink官网下载安装包,比如下载得到的安装文件为flink-1.9.1-bin-scala_2.11.tgz,并且保存在Linux系统中(假设当前使用hadoop用户名登录了Linux系统,并且安装文件被保存到了“/home/hadoop/Downloads”目录下)。
也可以直接点击这里从百度云盘下载软件(提取码:ziyu)。进入百度网盘后,进入“软件”目录,找到flink-1.9.1-bin-scala_2.11.tgz文件,下载到本地。

然后,使用如下命令对安装文件进行解压缩:

  1. cd /home/hadoop/Downloads
  2. sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local
Shell 命令

修改目录名称,并设置权限,命令如下:

  1. cd /usr/local
  2. sudo mv ./ flink-1.9.1 ./flink
  3. sudo chown -R hadoop:hadoop ./flink
Shell 命令

Flink对于本地模式是开箱即用的,如果要修改Java运行环境,可以修改“/usr/local/flink/conf/flink-conf.yaml”文件中的env.java.home参数,设置为本地Java的绝对路径。
使用如下命令添加环境变量:

  1. vim ~/.bashrc
Shell 命令

在.bashrc文件中添加如下内容:

export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出.bashrc文件,然后执行如下命令让配置文件生效:

  1. source ~/.bashrc
Shell 命令

使用如下命令启动Flink:

  1. cd /usr/local/flink
  2. ./bin/start-cluster.sh
Shell 命令

使用jps命令查看进程:

$ jps
17942 TaskManagerRunner
18022 Jps
17503 StandaloneSessionClusterEntrypoint

如果能够看到TaskManagerRunner和StandaloneSessionClusterEntrypoint这两个进程,就说明启动成功。
Flink的JobManager同时会在8081端口上启动一个Web前端,可以在浏览器中输入“http://localhost:8081”来访问。
Flink安装包中自带了测试样例,这里可以运行WordCount样例程序来测试Flink的运行效果,具体命令如下:

  1. cd /usr/local/flink/bin
  2. ./flink run /usr/local/flink/examples/batch/WordCount.jar
Shell 命令

执行上述命令以后,如果执行成功,应该可以看到类似如下的屏幕信息:

Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
……

二. 编程实现WordCount程序

编写WordCount程序主要包括以下几个步骤:
(1)安装Maven
(2)编写代码
(3)使用Maven打包Java程序
(4)通过flink run命令运行程序

1. 安装Maven

Ubuntu中没有自带安装Maven,需要手动安装Maven。可以访问Maven官网下载安装文件,下载地址如下:
···
http://apache.fayea.com/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
···
下载到Maven安装文件以后,保存到“~/Downloads”目录下。然后,可以选择安装在“/usr/local/maven”目录中,命令如下:

  1. sudo unzip ~/Downloads/apache-maven-3.3.9-bin.zip -d /usr/local
  2. cd /usr/local
  3. sudo mv apache-maven-3.3.9/ ./maven
  4. sudo chown -R hadoop ./maven
Shell 命令

2. 编写代码

在Linux终端中执行如下命令,在用户主文件夹下创建一个文件夹flinkapp作为应用程序根目录:

  1. cd ~ #进入用户主文件夹
  2. mkdir -p ./flinkapp/src/main/java
Shell 命令

然后,使用vim编辑器在“./flinkapp/src/main/java”目录下建立三个代码文件,即WordCountData.java、WordCountTokenizer.java和WordCount.java。
WordCountData.java用于提供原始数据,其内容如下:

  1. package cn.edu.xmu;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4.  
  5. public class WordCountData {
  6. public static final String[] WORDS=new String[]{"To be, or not to be,--that is the question:--", "Whether \'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--\'tis a consummation", "Devoutly to be wish\'d. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there\'s the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there\'s the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor\'s wrong, the proud man\'s contumely,", "The pangs of despis\'d love, the law\'s delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover\'d country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o\'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember\'d."};
  7. public WordCountData() {
  8. }
  9. public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
  10. return env.fromElements(WORDS);
  11. }
  12. }
Java

WordCountTokenizer.java用于切分句子,其内容如下:

  1. package cn.edu.xmu;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.util.Collector;
  5.  
  6.  
  7. public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
  8.  
  9. public WordCountTokenizer(){}
  10.  
  11.  
  12. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
  13. String[] tokens = value.toLowerCase().split("\\W+");
  14. int len = tokens.length;
  15.  
  16. for(int i = 0; i<len;i++){
  17. String tmp = tokens[i];
  18. if(tmp.length()>0){
  19. out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
  20. }
  21. }
  22. }
  23. }
Java

WordCount.java提供主函数,其内容如下:

  1. package cn.edu.xmu;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.AggregateOperator;
  5. import org.apache.flink.api.java.utils.ParameterTool;
  6.  
  7.  
  8. public class WordCount {
  9.  
  10. public WordCount(){}
  11.  
  12. public static void main(String[] args) throws Exception {
  13. ParameterTool params = ParameterTool.fromArgs(args);
  14. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  15. env.getConfig().setGlobalJobParameters(params);
  16. Object text;
  17. //如果没有指定输入路径,则默认使用WordCountData中提供的数据
  18. if(params.has("input")){
  19. text = env.readTextFile(params.get("input"));
  20. }else{
  21. System.out.println("Executing WordCount example with default input data set.");
  22. System.out.println("Use -- input to specify file input.");
  23. text = WordCountData.getDefaultTextLineDataset(env);
  24. }
  25.  
  26. AggregateOperator counts = ((DataSet)text).flatMap(new WordCountTokenizer()).groupBy(new int[]{0}).sum(1);
  27. //如果没有指定输出,则默认打印到控制台
  28. if(params.has("output")){
  29. counts.writeAsCsv(params.get("output"),"\n", " ");
  30. env.execute();
  31. }else{
  32. System.out.println("Printing result to stdout. Use --output to specify output path.");
  33. counts.print();
  34. }
  35.  
  36. }
  37. }
Java

该程序依赖Flink Java API,因此,我们需要通过Maven进行编译打包。需要新建文件pom.xml,然后,在pom.xml文件中添加如下内容,用来声明该独立应用程序的信息以及与Flink的依赖关系:

<project>
    <groupId>cn.edu.xmu</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>jboss</id>
            <name>JBoss Repository</name>
            <url>http://repository.jboss.com/maven2/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>
</project>

3.使用Maven打包Java程序

为了保证Maven能够正常运行,先执行如下命令检查整个应用程序的文件结构:

  1. cd ~/flinkapp
  2. find .
Shell 命令

文件结构应该是类似如下的内容:

.
./src
./src/main
./src/main/java
./src/main/java/WordCountData.java
./src/main/java/WordCount.java
./src/main/java/WordCountTokenizer.java
./pom.xml

接下来,我们可以通过如下代码将整个应用程序打包成JAR包(注意:计算机需要保持连接网络的状态,而且首次运行打包命令时,Maven会自动下载依赖包,需要消耗几分钟的时间):

  1. cd ~/flinkapp #一定把这个目录设置为当前目录
  2. /usr/local/maven/bin/mvn package
Shell 命令

如果屏幕返回的信息中包含“BUILD SUCCESS”,则说明生成JAR包成功。

4.通过flink run命令运行程序

最后,可以将生成的JAR包通过flink run命令提交到Flink中运行(请确认已经启动Flink),命令如下:

  1. /usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar
Shell 命令

执行成功后,可以在屏幕上看到词频统计结果。

三、使用IntelliJ IDEA开发调试WordCount程序

请参考相关网络资料完成IntelliJ IDEA的安装。
在开始本实验之前,首先要启动Flink。
下面介绍如何使用IntelliJ IDEA工具开发WordCount程序。
启动进入IDEA,如下图所示,新建一个项目。

执行如下图所示的操作。

如下图所示,填写GroupId和ArtifactId。这里的GroupId是dblab,ArtifactId是FlinkWordCount。

如下图所示,设置Project name为FlinkWordCount。

如果出现如下图所示内容,则点击Enable Auto-Import。

这时生成的项目目录结构如下图所示。

打开pom.xml文件,输入如下内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>dblab</groupId>
    <artifactId>FlinkWordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>

如下图所示,创建Package。

如下图所示,输入package的名称为“cn.edu.xmu”。

如下图所示,新建一个java class文件。

如下图所示,输入文件名称WordCountData。

WordCountData.java用于提供原始数据,其内容如下:

  1. package cn.edu.xmu;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4.  
  5. public class WordCountData {
  6. public static final String[] WORDS=new String[]{"To be, or not to be,--that is the question:--", "Whether \'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--\'tis a consummation", "Devoutly to be wish\'d. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there\'s the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there\'s the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor\'s wrong, the proud man\'s contumely,", "The pangs of despis\'d love, the law\'s delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover\'d country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o\'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember\'d."};
  7. public WordCountData() {
  8. }
  9. public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
  10. return env.fromElements(WORDS);
  11. }
  12. }
Java

按照刚才同样的操作,创建第2个文件WordCountTokenizer.java。
WordCountTokenizer.java用于切分句子,其内容如下:

  1. package cn.edu.xmu;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.util.Collector;
  5.  
  6.  
  7. public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
  8.  
  9. public WordCountTokenizer(){}
  10.  
  11.  
  12. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
  13. String[] tokens = value.toLowerCase().split("\\W+");
  14. int len = tokens.length;
  15.  
  16. for(int i = 0; i<len;i++){
  17. String tmp = tokens[i];
  18. if(tmp.length()>0){
  19. out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
  20. }
  21. }
  22. }
  23. }
Java

按照刚才同样的操作,创建第3个文件WordCount.java。
WordCount.java提供主函数,其内容如下:

  1. package cn.edu.xmu;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.AggregateOperator;
  5. import org.apache.flink.api.java.utils.ParameterTool;
  6.  
  7. public class WordCount {
  8. public WordCount(){}
  9. public static void main(String[] args) throws Exception {
  10. ParameterTool params = ParameterTool.fromArgs(args);
  11. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  12. env.getConfig().setGlobalJobParameters(params);
  13. Object text;
  14. //如果没有指定输入路径,则默认使用WordCountData中提供的数据
  15. if(params.has("input")){
  16. text = env.readTextFile(params.get("input"));
  17. }else{
  18. System.out.println("Executing WordCount example with default input data set.");
  19. System.out.println("Use -- input to specify file input.");
  20. text = WordCountData.getDefaultTextLineDataset(env);
  21. }
  22.  
  23. AggregateOperator counts = ((DataSet)text).flatMap(new WordCountTokenizer()).groupBy(new int[]{0}).sum(1);
  24. //如果没有指定输出,则默认打印到控制台
  25. if(params.has("output")){
  26. counts.writeAsCsv(params.get("output"),"\n", " ");
  27. env.execute();
  28. }else{
  29. System.out.println("Printing result to stdout. Use --output to specify output path.");
  30. counts.print();
  31. }
  32. }
  33. }
Java

三个代码文件创建好以后的效果,如下图所示。

如下图所示,在左侧目录树的pom.xml文件上单击鼠标右键,在弹出的菜单中选择Maven,再在弹出的菜单中选择Generate Sources and Update Folders。

如下图所示,在左侧目录树的pom.xml文件上单击鼠标右键,在弹出的菜单中选择Maven,再在弹出的菜单中选择Reimport。

如下图所示,执行编译。

如下图所示,打开WordCount.java代码文件,在这个代码文件的代码区域,鼠标右键单击,弹出菜单中选中“Run WordCount.main()”。

如下图所示,执行成功以后,可以看到词频统计结果。

下面要把代码进行编译打包,打包成jar包。为此,需要做一些准备工作。
如下图所示,进入设置界面。

如下图所示进行设置。

如下图所示进入Project Structure界面。

如下图所示进行设置。

如下图所示进行设置。

如下图所示进行设置。

如下图所示进行设置。

如下图所示进行设置。

如下图所示进行设置。在搜索框中输入“WordCount”就会自动搜索到主类,然后在搜索到的结果条上双击鼠标。

如下图所示,设置META-INF目录。

如下图所示进行设置。

如下图所示,进入编译打包菜单。

如下图所示,开始编译打包。

如下图所示,编译打包成功以后,可以看到生成的FlinkWordCount.jar文件。

最后,到Flink中运行FlinkWordCount.jar。这里一定要注意,要确认已经开启Flink系统。运行的命令和执行结果如下图所示。

标签:总结,java,--,flink,如下,所示,apache,今日
From: https://www.cnblogs.com/zhaoyueheng/p/17912385.html

相关文章

  • pandas常用方法总结
    pandas常用方法总结|pandas是用于数据分析的Python库,包含许多有用的方法,以下是pandas中一些主要的方法和功能:1.数据读取与写入2.DataFrame基本操作3.数据选择4.数据清洗与处理5.数据转换6.数据筛选与排序7.数据汇总与统计8.合并与连接9.缺失数据处理10.重塑与透视熟练掌握这些p......
  • 2023.12.18——每日总结
    学习所花时间(包括上课):9h代码量(行):0行博客量(篇):1篇今天,上午学习,下午学习;我了解到的知识点:1.JFinal明日计划:学习......
  • 12.18每日总结
    软件设计模式简单分类我们在未正式学习设计模式之前先去简单了解一下设计模式的主要三种分类:创建型模式用于描述“怎样创建对象”,它的主要特点是“将对象的创建与使用分离”。书中提供了单例、原型、工厂方法、抽象工厂、建造者等5种创建型模式。结构型模式用于描述如......
  • 总结篇:SpringBoot常用注解总结
    使用springboot开发的优点,就是不用部署war文件因为内部嵌入了tomcat的,允许通过maven来根据需要的starter,非常的方便,可以自动配置spring,为程序员减少大量时间用于写业务逻辑,更不用担心使用某个依赖的版本问题,springboot全部为你自己选择。springboot的常用注解:1、@SpringBootAppl......
  • redis实践经验总结
    Redis内存配置当Redis内存不足时,可能导致Key频繁被删除、响应时间变长、QPS不稳定等问题。当内存使用率达到80%以上时就需要我们警惕,并快速定位到内存占用的原因。一般来说,会有以下几种占用内存的情况:数据内存是Redis最主要的部分,存储Redis的键值信息。主要问题是BigKey问题......
  • border-image用法总结
    border-image支持渐变,可实现虚线边框,斑马纹边框border-image支持在外部显示图像,不占空间,不影响布局,且不受overflow:hidden限制border-image,box-shadow,outline均支持内填充,外填充,可以实现背景,边框,外延border-image内填充border-image:linear-gradient(rgba(0,0,0,.05),......
  • 数据库版本历史的总结-信创部分
    数据库版本历史的总结-信创部分背景总结了开源和国外商业数据库的非常简单的历史.发现想总结一下国产的数据库非常困难.云和恩墨的数据库排行榜上面,国产数据库有接近300种我感觉我这边几乎是无法进行学习和总结的.所以只能够将几种比较常见的数据库进行一些总结.人......
  • ASP.NET WEBAPI 接入微信公众平台总结,Token验证失败解决办法
    首先,请允许我说一句:shit!因为这个问题不难,但是网上有关ASP.NETWEBAPI的资料太少。都是PHP等等的。我也是在看了某位大神的博客后有启发,一点点研究出来的。来看正题!1.微信公众平台的接入方法,无非4个参数(signature,timestamp,nonce,echostr)加1个Token(两边对应)2.Token,timestamp,......
  • 今日头条丨零数科技创始人林乐:区块链或成数字经济转型中流砥柱
    3月31日到4月2日,中国电动汽车百人会论坛(2023)在北京召开。在区块链领域深耕多年的零数科技创始人兼CEO林乐在汽车产业数字化论坛上,发表了主旨演讲《以数据共享网络推动汽车产业数字化转型》。会后,记者就零数科技在区块链领域创新应用及商业落地方面的成果等有关话题对林乐进行了专访......
  • C#中ref关键字的用法总结
    C#中ref关键字的用法总结 ref表示引用的意思,C#中它有多种用法,这里简单总结一下:1、按引用传递参数具体可见:C#中的值传递与引用传递(in、out、ref)2、引用局部变量引用局部变量指的是在变量声明时使用ref关键字(或者使用refreadonly表示未只读),表示这个变......