首页 > 其他分享 >Flink 1.17教程:并行度设置&优先级

Flink 1.17教程:并行度设置&优先级

时间:2023-09-02 11:37:20浏览次数:45  
标签:1.17 Flink flink 并行度 env org apache import


并行度设置&优先级

并行度(Parallelism)

并行度的设置

在Flink中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。

代码中设置

我们在代码中,可以很简单地在算子后跟着调用setParallelism()方法,来设置当前算子的并行度:

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

这种方式设置的并行度,只针对当前算子有效。

另外,我们也可以直接调用执行环境的setParallelism()方法,全局设定并行度:

env.setParallelism(2);

这样代码中所有算子,默认的并行度就都为2了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。

这里要注意的是,由于keyBy不是算子,所以无法对keyBy设置并行度。

提交应用时设置

在使用flink run命令提交应用时,可以增加-p参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:

bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度。

Flink 1.17教程:并行度设置&优先级_apache

Flink 1.17教程:并行度设置&优先级_大数据_02

Flink 1.17教程:并行度设置&优先级_flink_03

Flink 1.17教程:并行度设置&优先级_大数据_04

Flink 1.17教程:并行度设置&优先级_大数据_05

Flink 1.17教程:并行度设置&优先级_apache_06

package com.atguigu.wc;
 
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
 
/**
 * TODO DataStream实现Wordcount:读socket(无界流)
 *
 * @author
 * @version 1.0
 */
public class WordCountStreamUnboundedDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // IDEA运行时,也可以看到webui,一般用于本地测试
        // 需要引入一个依赖 flink-runtime-web
        // 在idea运行,不指定并行度,默认就是 电脑的 线程数
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(3);
 
        // TODO 2.读取数据: socket
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
 
        // TODO 3.处理数据: 切换、转换、分组、聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                // .returns(new TypeHint<Tuple2<String, Integer>>() {})
                .keyBy(value -> value.f0)
                .sum(1);
 
        // TODO 4.输出
        sum.print();
 
        // TODO 5.执行
        env.execute();
    }
}
 
/**
 并行度的优先级:
    代码:算子 > 代码:env > 提交时指定 > 配置文件
 */

并行度的优先级

代码:算子 > 代码:env > 提交时指定 > 配置文件


标签:1.17,Flink,flink,并行度,env,org,apache,import
From: https://blog.51cto.com/zhangxueliang/7331334

相关文章

  • Flink 1.17教程:算子链Operator Chain
    算子链OperatorChain在ApacheFlink中,算子链(OperatorChaining)是将多个操作符(算子)连接在一起形成一个链式结构的优化技术。算子链的作用是将多个操作符合并为一个单一的任务单元,以减少通信开销、提高执行效率和减少资源占用。通俗来说,算子链的作用可以比喻为将多个操作合并成一......
  • Flink 1.17教程:Hadoop yarn会话运行模式
    YARN运行模式_环境准备YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配T......
  • Flink 1.17教程:部署模式介绍及Standalone运行模式
    部署模式介绍在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(SessionMode)、单作业模式(Per-JobMode)、应用模式(ApplicationMode)。它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的mai......
  • Flink 1.17教程:Hadoop yarn运行模式——单作业模式和应用模式
    YARN运行模式_单作业模式单作业模式部署(1)执行命令提交作业YARN运行模式_应用模式应用模式同样非常简单,与单作业模式类似,直接执行flinkrun-application命令即可。如:bin/flinkrun-application-tyarn-application-ccom.atguigu.wc.WordCountStreamUnboundedDemo./FlinkTutorial......
  • Flink 1.17教程:命令行提交作业jar
    命令行提交作业bin/flinkrun-mnode001:8081-ccom.atguigu.wc.WordCountStreamUnboundedDemo../jar/FlinkTutorial-1.17-1.0-SNAPSHOT.jar连接成功Lastlogin:FriJun1614:44:012023from192.168.10.1[atguigu@node001~]$cd/opt/module/flink/flink-1.17.0/[atgu......
  • Flink 1.17教程:集群搭建、运行模式(standalone/yarn/k8s)及历史服务器
    集群角色集群启动如果是部署在本地,本地访问,无需进行任何配置,直接启动即可。如果是部署在服务器,需要远程访问,则需要将flink.conf中的localhost修改为服务器IP地址或是0.0.0.0节点服务器hadoop102hadoop103hadoop104角色JobManagerTaskManagerTaskManagerTaskManager[atguigu@node001......
  • Flink 1.17教程:WebUI提交作业及打jar包maven插件配置
    打jar包maven插件配置<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version>......
  • Flink 1.17教程:集群角色及集群启动
    集群角色集群启动如果是部署在本地,本地访问,无需进行任何配置,直接启动即可。如果是部署在服务器,需要远程访问,则需要将flink.conf中的localhost修改为服务器IP地址或是0.0.0.0节点服务器hadoop102hadoop103hadoop104角色JobManagerTaskManagerTaskManagerTaskManager[atguigu@node001......
  • Flink 1.17教程:wordcount maven工程java代码示例(批、流实现方式)
    批、流实现wordcount代码示例pom.xml<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><......
  • Flink 1.17教程:DataStream实现Wordcount——读socket(无界流)
    pom.xml<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>fli......