首页 > 编程语言 >使用java代码提交flink job 任务

使用java代码提交flink job 任务

时间:2022-09-21 23:12:47浏览次数:54  
标签:java flink job client org apache import configuration

转:https://blog.csdn.net/pingweicheng/article/details/118223041

以下代码是使用java程序客户端提交flink job的示例代码

package client;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.graph.StreamGraph;
 
import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
 
 
/**
 * @ClassName FlinkClient
 * @Description TODO
 * @Author Getech
 * @Date 2021/6/24 17:59
 */
public class FlinkClient {
 
    public static void main(String[] args) {
        String jarFilePath = "D:\\02develop\\2020workspace\\apache-flink\\example\\WordCountSQL.jar";
        RestClusterClient<StandaloneClusterId> client = null;
        try {
            // 集群信息
            Configuration configuration = new Configuration();
            configuration.setString(JobManagerOptions.ADDRESS, "10.8.4.129");
            configuration.setInteger(JobManagerOptions.PORT, 6123);
            configuration.setInteger(RestOptions.PORT, 8081);
            client = new RestClusterClient<StandaloneClusterId>(configuration, StandaloneClusterId.getInstance());
            int parallelism = 1;
            File jarFile=new File(jarFilePath);
            SavepointRestoreSettings savepointRestoreSettings=SavepointRestoreSettings.none();
            PackagedProgram program = PackagedProgram.newBuilder()
                    .setConfiguration(configuration)
                    .setEntryPointClassName("org.apache.flink.table.examples.java.WordCountSQL")
                    .setJarFile(jarFile)
                    .setSavepointRestoreSettings(savepointRestoreSettings).build();
            JobGraph jobGraph=PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
            CompletableFuture<JobID> result = client.submitJob(jobGraph);
            JobID jobId=  result.get();
            System.out.println("提交完成");
            System.out.println("jobId:"+ jobId.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
 
    }
}

  

标签:java,flink,job,client,org,apache,import,configuration
From: https://www.cnblogs.com/qsds/p/16717561.html

相关文章

  • java如何获取一个文本文件的编码(格式)信息呢?
    转自:http://www.java265.com/JavaJingYan/202110/16350332691561.html 文本文件是我们在windows平台下常用的一种文件格式,这种格式会随着操作系统的语言不同,而出现其......
  • JVM方法调用——java之间
    Java方法之间解释方法到解释方法进入解释方法到解释方法是最为简单的一种情况,最常见的调用是invokevirtual。有关的代码在TemplateTable::invokevirtual中:voidTemplat......
  • [javascript] js如何获取浏览器的语言
    当想要实现多语种时,需要获取浏览器的当前语言最直接的,就是访问浏览器内置的 navigator.language 属性:varlang=navigator.language 根据你的浏览器的设置,这段代码......
  • Java Stream流
    Java8Stream流编程Stream使用一种类似于SQL语句从数据库查询数据的直观方式来提供对Java集合运算和表达的高阶抽象。得益于Lambda所带来的函数式编程,StreamAPI可......
  • javascript: 复制数组时的深拷贝及浅拷贝(chrome 105.0.5195.125)
    一,js代码:<html><head><metacharset="utf-8"/><title>测试</title></head><body><buttononclick="assignCopy()">无效:变量直接赋值</button><br/><br......
  • Javaweb学习笔记第十弹
    本章存在的意义,大概就是为了回顾一下被遗忘不久的html了HTML:超文本标记语言(不区分大小写,语法较为松散,但建议书写时规范一些)HTML标签由浏览器来解析标签展示图片具体详......
  • javaScript 字符串方法,字符串搜索,
     //这是字符串 能够使用单引号或双引号    varmko='helloworedw'    varqwe="hello worasd"    //new 一个字符串   ......
  • Day7 Javase抽象接口以及异常的捕获和抛出
    Day7面向对象编程抽象abstract修饰抽象类,如果修饰方法就是抽象方法。抽象方法可以写方法体,然后让继承抽象类的类去重写抽象方法。java的类是单继承的,但是接口可以实现......
  • JAVA多线程-学习笔记
    1.1概述程序:程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。进程(Porcess):是执行程序的一次执行过程,是一个动态的概念,是系统资源分配的单位。线......
  • Java: Immutable Patterns
     /***版权所有2022涂聚文有限公司*许可信息查看:*描述:*不变模式ImmutablePatterns*历史版本:JDK14.02*2022-09-12创建者geovindu*2022-09-1......