
What are the ways to load external jars when launching a Flink job?

Posted: 2024-09-20 15:27:57

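When starting a job in Apache Flink, there are several ways to load external jar files. They let you bring in custom UDFs (user-defined functions) or other dependencies. The common ones are: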

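1. Launching directly with `flink run`
You can use the `flink run` command-line tool to specify your job's main class and the location of the external jar. For example:

flink run -d -c com.example.YourMainClass -C file:///path/to/external-jar.jar /path/to/your-job.jar
Here `-c` sets the job's entry class and `-d` runs the job in detached (background) mode. The external jar is added with `-C` (long form `--classpath`, not `--classpaths`), which adds a URL to the user-code classloader on all nodes: the URL must include a protocol such as `file://` and must be reachable at that location on every node (e.g. via an NFS share), and each `-C` takes one URL, so repeat it for every extra jar. All options must come before the job jar, because everything after the jar path is passed to the program's `main()` as arguments.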
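Because `-C` expects a URL rather than a bare path, it can help to generate the flags from local paths. The sketch below uses a hypothetical helper, `ClasspathFlags.toFlags` (JDK only, not part of Flink), that turns each path into an absolute `file:` URL and pairs it with `-C`. It only builds the argument strings; the files still have to exist at the same location on every node.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns local jar paths into "-C <url>" arguments for `flink run`. */
public class ClasspathFlags {

    /** Returns ["-C", url1, "-C", url2, ...] with each path converted to an absolute file: URL. */
    public static List<String> toFlags(List<String> jarPaths) {
        List<String> args = new ArrayList<>();
        for (String p : jarPaths) {
            Path absolute = Paths.get(p).toAbsolutePath().normalize();
            args.add("-C");
            args.add(absolute.toUri().toString()); // e.g. file:///opt/libs/a.jar on Linux
        }
        return args;
    }

    public static void main(String[] args) {
        List<String> flags = toFlags(List.of("/opt/libs/a.jar", "/opt/libs/b.jar"));
        // Place these before the job jar on the `flink run` command line.
        System.out.println(String.join(" ", flags));
    }
}
```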
2. Submitting the job through the Flink Web UI
If you use Flink's Web UI, you can pick the jar to submit directly in the browser: open "Submit New Job", upload the jar you want to run, and fill in the job configuration.
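3. Using Flink's YARN session mode
If you run Flink on YARN in session mode, you can specify an extra classpath through the configuration file or command-line parameters when submitting the job, for example by adding an extra classpath entry to flink-conf.yaml:
```yaml
yarn.application.classpaths: file:///path/to/external-jar.jar
```
Treat this key with caution: it is not among Flink's documented configuration options. The similarly named `yarn.application.classpath` (singular) is a Hadoop YARN property that is normally set in yarn-site.xml, while Flink's own option for sending extra files to the YARN containers is `yarn.ship-files` (command line: `-yt`/`--yarnship`). Check the configuration reference of your Flink version before relying on any of them.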
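4. Using YARN per-job or session clusters from the command line
Flink has no `flink yarn cluster` or `flink yarn session` actions (the `flink` script's actions are `run`, `run-application`, `info`, `list`, `stop`, `cancel` and `savepoint`), and there is no `--classpaths` flag. On YARN, a session cluster is started with `bin/yarn-session.sh`, and jobs are still submitted with `flink run`, so the external jar is passed with `-C` just as in method 1:

flink run -t yarn-per-job -C file:///path/to/external-jar.jar /path/to/your-job.jar

flink run -t yarn-session -Dyarn.application.id=<application id> -C file:///path/to/external-jar.jar /path/to/your-job.jar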
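5. Packaging the dependencies into the main job jar
Another approach is to package all the dependency jars together with the job into a single jar (a so-called fat or uber jar) and submit that as the main job jar. Then no external jar has to be specified at run time.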
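Notes
Make sure the external jars are compatible with your Flink version.
On YARN, make sure the external jars are actually distributed to every TaskManager.
If you run into memory problems, you may need to adjust the YARN or Flink configuration, for example by increasing the available memory or changing the parallelism.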
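A rough way to act on the compatibility note is to check whether a dependency jar bundles Flink's own classes, which would then sit next to the cluster's Flink version; Flink's documentation recommends keeping the core Flink dependencies in `provided` scope so they are not packaged into user jars. The sketch below is a hypothetical helper (`BundledFlinkCheck`, JDK only) that lists class files under `org/apache/flink/` inside a jar. An empty result does not prove compatibility; it only rules out this one source of clashes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical helper: lists class files under org/apache/flink/ packaged inside a jar. */
public class BundledFlinkCheck {

    public static List<String> bundledFlinkClasses(String jarPath) throws IOException {
        List<String> hits = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                // Flink classes inside a user jar may conflict with the cluster's flink-dist.
                if (name.startsWith("org/apache/flink/") && name.endsWith(".class")) {
                    hits.add(name);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // usage: java BundledFlinkCheck <jar> [<jar> ...]
        for (String jarPath : args) {
            List<String> hits = bundledFlinkClasses(jarPath);
            System.out.println(jarPath + ": " + hits.size() + " Flink class files bundled");
        }
    }
}
```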
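You may have noticed that each of the preceding methods specifies one jar at a time. If a job depends on many jars, say 20 or 30, listing them one by one with `-C` is not realistic.
Method 5 can merge several jars into one, but when I tried it, the merged jar was built and yet the job still failed at run time because its classes could not be found. So here is a sixth method.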
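When a job reports that a class cannot be found, a useful first check is whether any of the jars (the merged jar included) actually contains that class file. The sketch below is a hypothetical helper (`ClassLocator`, JDK only): it maps a fully-qualified name such as `com.example.Foo` to the entry name `com/example/Foo.class` and reports which `*.jar` files directly inside a directory contain it.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

/** Hypothetical helper: finds the *.jar files in a directory that contain a given class. */
public class ClassLocator {

    public static List<Path> jarsContaining(Path dir, String className) throws IOException {
        // com.example.Foo becomes com/example/Foo.class (nested classes keep their '$')
        String entryName = className.replace('.', '/') + ".class";
        List<Path> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jarPath : jars) {
                try (JarFile jar = new JarFile(jarPath.toFile())) {
                    if (jar.getEntry(entryName) != null) {
                        hits.add(jarPath);
                    }
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // usage: java ClassLocator <jar-dir> <fully.qualified.ClassName>
        for (Path p : jarsContaining(Paths.get(args[0]), args[1])) {
            System.out.println(p);
        }
    }
}
```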
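6. Modifying the Flink source to accept a jar-directory parameter
The idea is to add a parameter that takes a jar directory: put all the jars, however many, into that directory and pass it with a single option. The new code then loops over the directory and adds every jar in it to the job's classpath. This saves a lot of effort, and because the jars go only onto this job's user-code classpath (the same place `-C` puts them), they do not clash with the jars of the system or of other jobs.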
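The directory-to-classpath idea can be illustrated in plain JDK code. This is not the author's helper `loadAllJarFromPathURl` (its body is not part of this excerpt); `JarDirClasspath` and its methods are hypothetical names. `jarUrls` collects the `*.jar` files in a directory as `file:` URLs, sorted so the classpath order does not depend on the file system's listing order, and `canLoad` builds a throwaway `URLClassLoader` over them as a local pre-flight check that a class you expect from those jars resolves.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the "jar directory" idea, using only the JDK. */
public class JarDirClasspath {

    /** Returns file: URLs for every regular *.jar file directly inside dir, sorted by path. */
    public static List<URL> jarUrls(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            throw new IOException("Not a directory: " + dir);
        }
        List<Path> jars = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    jars.add(p.toAbsolutePath());
                }
            }
        }
        Collections.sort(jars); // listing order is unspecified; keep the classpath order stable
        List<URL> urls = new ArrayList<>();
        for (Path p : jars) {
            urls.add(p.toUri().toURL());
        }
        return urls;
    }

    /** Local check: can className be loaded from the jars in dir (or from the parent loader)? */
    public static boolean canLoad(Path dir, String className) throws IOException {
        List<URL> urls = jarUrls(dir);
        try (URLClassLoader loader =
                new URLClassLoader(urls.toArray(new URL[0]), JarDirClasspath.class.getClassLoader())) {
            Class.forName(className, false, loader); // load without running static initializers
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // usage: java JarDirClasspath <jar-dir> [fully.qualified.ClassName]
        for (URL url : jarUrls(Paths.get(args[0]))) {
            System.out.println(url);
        }
        if (args.length > 1) {
            System.out.println(args[1] + " loadable: " + canLoad(Paths.get(args[0]), args[1]));
        }
    }
}
```

With the patched client, the directory is passed with the new `-jd` option before the job jar, for example `flink run -jd /opt/job-libs -c com.example.YourMainClass /path/to/your-job.jar`, where `/opt/job-libs` is a placeholder directory that must exist at that path on every node.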
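The steps below use Flink 1.13 as the example.
First clone the source from Git: https://github.com/apache/flink.git (check out the `release-1.13` branch, or the tag matching your cluster's version).
Two files need to be changed:
CliFrontendParser.java and ProgramOptions.java.
Both live in the flink-clients module, but at run time the classes are loaded from the flink-dist jar in Flink's lib/ directory, so after rebuilding you have to replace these two classes inside flink-dist.jar (entries under org/apache/flink/client/cli/).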
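Replacing the classes can be done with the JDK `jar` tool (`jar uf`) or programmatically through the JDK's zip file-system provider, as in the sketch below. `JarPatcher.replaceEntry` is a hypothetical helper; the entry names follow from the `org.apache.flink.client.cli` package of the two classes. Back up the original flink-dist jar first, and if the compiler produced extra class files for these sources (names containing `$`), copy those as well.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;

/** Hypothetical helper: overwrites (or adds) one entry inside an existing jar. */
public class JarPatcher {

    public static void replaceEntry(Path jar, String entryName, Path newContent) throws IOException {
        URI uri = URI.create("jar:" + jar.toAbsolutePath().toUri());
        // Open the existing jar in place; "create"=false means it must already exist.
        try (FileSystem zipFs =
                FileSystems.newFileSystem(uri, Collections.singletonMap("create", "false"))) {
            Path target = zipFs.getPath(entryName);
            if (target.getParent() != null) {
                Files.createDirectories(target.getParent());
            }
            Files.copy(newContent, target, StandardCopyOption.REPLACE_EXISTING);
        } // the modified jar is written back when the zip file system is closed
    }

    public static void main(String[] args) throws IOException {
        // usage: java JarPatcher <flink-dist jar> <directory with the compiled classes>
        Path dist = Paths.get(args[0]);
        Path classesDir = Paths.get(args[1]);
        for (String cls : new String[] {"CliFrontendParser.class", "ProgramOptions.class"}) {
            String entry = "org/apache/flink/client/cli/" + cls;
            replaceEntry(dist, entry, classesDir.resolve(entry));
        }
    }
}
```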
CliFrontendParser.java
Note: comments marked `新增地方` ("added here") are the modified spots.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
 * A simple command line parser (based on Apache Commons CLI) that extracts command line options.
 */
public class CliFrontendParser {

    static final Option HELP_OPTION =
            new Option(
                    "h",
                    "help",
                    false,
                    "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION =
            new Option(
                    "c",
                    "class",
                    true,
                    "Class with the program entry point (\"main()\" method). Only needed if the "
                            + "JAR file does not specify the class in its manifest.");

    static final Option CLASSPATH_OPTION =
            new Option(
                    "C",
                    "classpath",
                    true,
                    "Adds a URL to each user code "
                            + "classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");

    public static final Option PARALLELISM_OPTION =
            new Option(
                    "p",
                    "parallelism",
                    true,
                    "The parallelism with which to run the program. Optional flag to override the default value "
                            + "specified in the configuration.");
    public static final Option DETACHED_OPTION =
            new Option("d", "detached", false, "If present, runs " + "the job in detached mode");

    public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
            new Option(
                    "sae",
                    "shutdownOnAttachedExit",
                    false,
                    "If the job is submitted in attached mode, perform a best-effort cluster shutdown "
                            + "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
//**********新增地方 (added)*****************
    static final Option JARDIR_OPTION =
            new Option(
                    "jd",
                    "jardir",
                    true,
                    "Adds all JAR files in the given directory to each user code "
                            + "classloader on all nodes in the cluster. The directory must exist and be "
                            + "accessible under the same path on all nodes (e.g. by means of a NFS share). "
                            + "You can use this option multiple times for specifying more than one directory.");
//**********新增地方 (added)*****************
    /**
     * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN
     *     deployments
     */
    @Deprecated
    public static final Option YARN_DETACHED_OPTION =
            new Option(
                    "yd",
                    "yarndetached",
                    false,
                    "If present, runs "
                            + "the job in detached mode (deprecated; use non-YARN specific option instead)");

    public static final Option ARGS_OPTION =
            new Option(
                    "a",
                    "arguments",
                    true,
                    "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

    public static final Option ADDRESS_OPTION =
            new Option(
                    "m",
                    "jobmanager",
                    true,
                    "Address of the JobManager to which to connect. "
                            + "Use this flag to connect to a different JobManager than the one specified in the configuration.");

    public static final Option SAVEPOINT_PATH_OPTION =
            new Option(
                    "s",
                    "fromSavepoint",
                    true,
                    "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

    public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
            new Option(
                    "n",
                    "allowNonRestoredState",
                    false,
                    "Allow to skip savepoint state that cannot be restored. "
                            + "You need to allow this if you removed an operator from your "
                            + "program that was part of the program when the savepoint was triggered.");

    static final Option SAVEPOINT_DISPOSE_OPTION =
            new Option("d", "dispose", true, "Path of savepoint to dispose.");

    // list specific options
    static final Option RUNNING_OPTION =
            new Option("r", "running", false, "Show only running programs and their JobIDs");

    static final Option SCHEDULED_OPTION =
            new Option("s", "scheduled", false, "Show only scheduled programs and their JobIDs");

    static final Option ALL_OPTION =
            new Option("a", "all", false, "Show all programs and their JobIDs");

    static final Option ZOOKEEPER_NAMESPACE_OPTION =
            new Option(
                    "z",
                    "zookeeperNamespace",
                    true,
                    "Namespace to create the Zookeeper sub-paths for high availability mode");

    static final Option CANCEL_WITH_SAVEPOINT_OPTION =
            new Option(
                    "s",
                    "withSavepoint",
                    true,
                    "**DEPRECATION WARNING**: "
                            + "Cancelling a job with savepoint is deprecated. Use \"stop\" instead. \n Trigger"
                            + " savepoint and cancel job. The target directory is optional. If no directory is "
                            + "specified, the configured default directory ("
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + ") is used.");

    public static final Option STOP_WITH_SAVEPOINT_PATH =
            new Option(
                    "p",
                    "savepointPath",
                    true,
                    "Path to the savepoint (for example hdfs:///flink/savepoint-1537). "
                            + "If no directory is specified, the configured default will be used (\""
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + "\").");

    public static final Option STOP_AND_DRAIN =
            new Option(
                    "d",
                    "drain",
                    false,
                    "Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");

    public static final Option PY_OPTION =
            new Option(
                    "py",
                    "python",
                    true,
                    "Python script with the program entry point. "
                            + "The dependent resources can be configured with the `--pyFiles` option.");

    public static final Option PYFILES_OPTION =
            new Option(
                    "pyfs",
                    "pyFiles",
                    true,
                    "Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. "
                            + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. "
                            + "Files suffixed with .zip will be extracted and added to PYTHONPATH. "
                            + "Comma (',') could be used as the separator to specify multiple files "
                            + "(e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");

    public static final Option PYMODULE_OPTION =
            new Option(
                    "pym",
                    "pyModule",
                    true,
                    "Python module with the program entry point. "
                            + "This option must be used in conjunction with `--pyFiles`.");

    public static final Option PYREQUIREMENTS_OPTION =
            new Option(
                    "pyreq",
                    "pyRequirements",
                    true,
                    "Specify a requirements.txt file which defines the third-party dependencies. "
                            + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. "
                            + "A directory which contains the installation packages of these dependencies could be specified "
                            + "optionally. Use '#' as the separator if the optional parameter exists "
                            + "(e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).");

    public static final Option PYARCHIVE_OPTION =
            new Option(
                    "pyarch",
                    "pyArchives",
                    true,
                    "Add python archive files for job. The archive files will be extracted to the working directory "
                            + "of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory "
                            + "be specified. If the target directory name is specified, the archive file will be extracted to a "
                            + "directory with the specified name. Otherwise, the archive file will be extracted to a "
                            + "directory with the same name of the archive file. The files uploaded via this option are accessible "
                            + "via relative path. '#' could be used as the separator of the archive file path and the target directory "
                            + "name. Comma (',') could be used as the separator to specify multiple archive files. "
                            + "This option can be used to upload the virtual environment, the data files used in Python UDF "
                            + "(e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable "
                            + "py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: "
                            + "f = open('data/data.txt', 'r').");

    public static final Option PYEXEC_OPTION =
            new Option(
                    "pyexec",
                    "pyExecutable",
                    true,
                    "Specify the path of the python interpreter used to execute the python UDF worker "
                            + "(e.g.: --pyExecutable /usr/local/bin/python3). "
                            + "The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), "
                            + "Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). "
                            + "Please ensure that the specified environment meets the above requirements.");

    static {
        HELP_OPTION.setRequired(false);

        JAR_OPTION.setRequired(false);
        JAR_OPTION.setArgName("jarfile");

        CLASS_OPTION.setRequired(false);
        CLASS_OPTION.setArgName("classname");

        CLASSPATH_OPTION.setRequired(false);
        CLASSPATH_OPTION.setArgName("url");

        ADDRESS_OPTION.setRequired(false);
        ADDRESS_OPTION.setArgName("host:port");

        PARALLELISM_OPTION.setRequired(false);
        PARALLELISM_OPTION.setArgName("parallelism");

        DETACHED_OPTION.setRequired(false);
        SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
        YARN_DETACHED_OPTION.setRequired(false);
        JARDIR_OPTION.setRequired(false); // 新增地方 (added)
        JARDIR_OPTION.setArgName("jardir"); // 新增地方 (added)

        ARGS_OPTION.setRequired(false);
        ARGS_OPTION.setArgName("programArgs");
        ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);

        RUNNING_OPTION.setRequired(false);
        SCHEDULED_OPTION.setRequired(false);

        SAVEPOINT_PATH_OPTION.setRequired(false);
        SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

        SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);

        ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
        ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");

        CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
        CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
        CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);

        STOP_WITH_SAVEPOINT_PATH.setRequired(false);
        STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
        STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);

        STOP_AND_DRAIN.setRequired(false);

        PY_OPTION.setRequired(false);
        PY_OPTION.setArgName("pythonFile");

        PYFILES_OPTION.setRequired(false);
        PYFILES_OPTION.setArgName("pythonFiles");

        PYMODULE_OPTION.setRequired(false);
        PYMODULE_OPTION.setArgName("pythonModule");

        PYREQUIREMENTS_OPTION.setRequired(false);

        PYARCHIVE_OPTION.setRequired(false);

        PYEXEC_OPTION.setRequired(false);
    }

    static final Options RUN_OPTIONS = getRunCommandOptions();

    private static Options buildGeneralOptions(Options options) {
        options.addOption(HELP_OPTION);
        // backwards compatibility: ignore verbose flag (-v)
        options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
        return options;
    }

    private static Options getProgramSpecificOptions(Options options) {
        options.addOption(JAR_OPTION);
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(ARGS_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(YARN_DETACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // 新增地方 (added)
        return options;
    }

    private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // 新增地方 (added)
        return options;
    }

    public static Options getRunCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options = getProgramSpecificOptions(options);
        options.addOption(SAVEPOINT_PATH_OPTION);
        return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    static Options getInfoCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return getProgramSpecificOptions(options);
    }

    static Options getListCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(ALL_OPTION);
        options.addOption(RUNNING_OPTION);
        return options.addOption(SCHEDULED_OPTION);
    }

    static Options getCancelCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    static Options getStopCommandOptions() {
        return buildGeneralOptions(new Options())
                .addOption(STOP_WITH_SAVEPOINT_PATH)
                .addOption(STOP_AND_DRAIN);
    }

    static Options getSavepointCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        return options.addOption(JAR_OPTION);
    }

    // --------------------------------------------------------------------------------------------
    //  Help
    // --------------------------------------------------------------------------------------------

    private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
        o.addOption(SAVEPOINT_PATH_OPTION);
        return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(PARALLELISM_OPTION);
        return options;
    }

    private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(RUNNING_OPTION);
        options.addOption(ALL_OPTION);
        options.addOption(SCHEDULED_OPTION);
        return options;
    }

    private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(STOP_WITH_SAVEPOINT_PATH).addOption(STOP_AND_DRAIN);
    }

    private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        options.addOption(JAR_OPTION);
        return options;
    }

    /** Prints the help for the client. */
    public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
        System.out.println();
        System.out.println("The following actions are available:");

        printHelpForRun(customCommandLines);
        printHelpForRunApplication(customCommandLines);
        printHelpForInfo();
        printHelpForList(customCommandLines);
        printHelpForStop(customCommandLines);
        printHelpForCancel(customCommandLines);
        printHelpForSavepoint(customCommandLines);

        System.out.println();
    }

    public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run\" compiles and runs a program.");
        System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"run\" action options:");
        formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, true);

        System.out.println();
    }

    public static void printHelpForRunApplication(
            Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run-application\" runs an application in Application Mode.");
        System.out.println("\n  Syntax: run-application [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"run-application\" action options:");

        // Only GenericCLI works with application mode, the other CLIs will be phased out
        // in the future
        List<CustomCommandLine> filteredCommandLines =
                customCommandLines.stream()
                        .filter((cli) -> cli instanceof GenericCLI)
                        .collect(Collectors.toList());

        printCustomCliOptions(filteredCommandLines, formatter, true);
        System.out.println();
    }

    public static void printHelpForInfo() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"info\" shows the optimized execution plan of the program (JSON).");
        System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"info\" action options:");
        formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));

        System.out.println();
    }

    public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"list\" lists running and scheduled programs.");
        System.out.println("\n  Syntax: list [OPTIONS]");
        formatter.setSyntaxPrefix("  \"list\" action options:");
        formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"stop\" stops a running program with a savepoint (streaming jobs only).");
        System.out.println("\n  Syntax: stop [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix("  \"stop\" action options:");
        formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"cancel\" cancels a running program.");
        System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix("  \"cancel\" action options:");
        formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
        System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
        formatter.setSyntaxPrefix("  \"savepoint\" action options:");
        formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /**
     * Prints custom cli options.
     *
     * @param formatter The formatter to use for printing
     * @param runOptions True if the run options should be printed, False to print only general
     *     options
     */
    private static void printCustomCliOptions(
            Collection<CustomCommandLine> customCommandLines,
            HelpFormatter formatter,
            boolean runOptions) {
        // prints options from all available command-line classes
        for (CustomCommandLine cli : customCommandLines) {
            formatter.setSyntaxPrefix("  Options for " + cli.getId() + " mode:");
            Options customOpts = new Options();
            cli.addGeneralOptions(customOpts);
            if (runOptions) {
                cli.addRunOptions(customOpts);
            }
            formatter.printHelp(" ", customOpts);
            System.out.println();
        }
    }

    public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
        if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
            String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
            boolean allowNonRestoredState =
                    commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
            return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
        } else {
            return SavepointRestoreSettings.none();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Line Parsing
    // --------------------------------------------------------------------------------------------

    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();

        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

    /**
     * Merges the given {@link Options} into a new Options object.
     *
     * @param optionsA options to merge, can be null if none
     * @param optionsB options to merge, can be null if none
     * @return
     */
    public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
        final Options resultOptions = new Options();
        if (optionsA != null) {
            for (Option option : optionsA.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        if (optionsB != null) {
            for (Option option : optionsB.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        return resultOptions;
    }
}

ProgramOptions.java (again, comments marked `新增地方` ("added here") are the modified spots)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;

import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JARDIR_OPTION; // 新增地方 (added)
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.client.cli.ProgramOptionsUtils.containsPythonDependencyOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.createPythonProgramOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.isPythonEntryPoint;

/** Base class for command line options that refer to a JAR file program. */
public class ProgramOptions extends CommandLineOptions {

    private String jarFilePath;

    protected String entryPointClass;

    private final List<URL> classpaths;

    private final String[] programArgs;

    private final int parallelism;

    private final boolean detachedMode;

    private final boolean shutdownOnAttachedExit;

    private final SavepointRestoreSettings savepointSettings;

    protected ProgramOptions(CommandLine line) throws CliArgsException {
        super(line);

        this.entryPointClass =
                line.hasOption(CLASS_OPTION.getOpt())
                        ? line.getOptionValue(CLASS_OPTION.getOpt())
                        : null;

        this.jarFilePath =
                line.hasOption(JAR_OPTION.getOpt())
                        ? line.getOptionValue(JAR_OPTION.getOpt())
                        : null;

        this.programArgs = extractProgramArgs(line);

        List<URL> classpaths = new ArrayList<URL>();
        if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
            for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
                try {
                    classpaths.add(new URL(path));
                } catch (MalformedURLException e) {
                    throw new CliArgsException("Bad syntax for classpath: " + path);
                }
            }
        }
        // *** added: start ***
        // Load every jar found in each directory passed via the jar-directory option.
        if (line.hasOption(JARDIR_OPTION.getOpt())) {
            for (String path : line.getOptionValues(JARDIR_OPTION.getOpt())) {
                try {
                    classpaths.addAll(loadAllJarFromPathURl(path));
                } catch (MalformedURLException e) {
                    throw new CliArgsException("Bad syntax for jar directory: " + path);
                }
            }
        }
        // *** added: end ***
        this.classpaths = classpaths;

        if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
            String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
            try {
                parallelism = Integer.parseInt(parString);
                if (parallelism <= 0) {
                    throw new NumberFormatException();
                }
            } catch (NumberFormatException e) {
                throw new CliArgsException(
                        "The parallelism must be a positive number: " + parString);
            }
        } else {
            parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
        }

        detachedMode =
                line.hasOption(DETACHED_OPTION.getOpt())
                        || line.hasOption(YARN_DETACHED_OPTION.getOpt());
        shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());

        this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
    }
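
    // *** added: start ***
    /**
     * Returns file: URLs for all *.jar files directly inside the given directory (not recursive).
     */
    private List<URL> loadAllJarFromPathURl(String path)
            throws MalformedURLException, CliArgsException {
        System.out.println("jar dir: " + path);
        File directory = new File(path);

        // Keep only files whose names end with ".jar" (case-insensitive).
        File[] jarFiles =
                directory.listFiles(
                        new FilenameFilter() {
                            @Override
                            public boolean accept(File dir, String name) {
                                return name.toLowerCase().endsWith(".jar");
                            }
                        });
        // listFiles() returns null if the path does not exist or is not a directory.
        if (jarFiles == null) {
            throw new CliArgsException(
                    "Jar directory does not exist or is not a directory: " + path);
        }
        // listFiles() guarantees no order; sort so the classpath is stable between runs.
        Arrays.sort(jarFiles);
        System.out.println("jar files found: " + jarFiles.length);

        List<URL> urls = new ArrayList<>();
        for (File jarFile : jarFiles) {
            System.out.println(jarFile.getAbsolutePath());
            urls.add(jarFile.toURI().toURL());
        }
        return urls;
    }
    // *** added: end ***
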
    protected String[] extractProgramArgs(CommandLine line) {
        String[] args =
                line.hasOption(ARGS_OPTION.getOpt())
                        ? line.getOptionValues(ARGS_OPTION.getOpt())
                        : line.getArgs();

        if (args.length > 0 && !line.hasOption(JAR_OPTION.getOpt())) {
            jarFilePath = args[0];
            args = Arrays.copyOfRange(args, 1, args.length);
        }

        return args;
    }

    public void validate() throws CliArgsException {
        // Java program should be specified a JAR file
        if (getJarFilePath() == null) {
            throw new CliArgsException("Java program should be specified a JAR file.");
        }
    }

    public String getJarFilePath() {
        return jarFilePath;
    }

    public String getEntryPointClassName() {
        return entryPointClass;
    }

    public List<URL> getClasspaths() {
        return classpaths;
    }

    public String[] getProgramArgs() {
        return programArgs;
    }

    public int getParallelism() {
        return parallelism;
    }

    public boolean getDetachedMode() {
        return detachedMode;
    }

    public boolean isShutdownOnAttachedExit() {
        return shutdownOnAttachedExit;
    }

    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return savepointSettings;
    }

    public void applyToConfiguration(Configuration configuration) {
        if (getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
            configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
        }

        configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode());
        configuration.setBoolean(
                DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
        ConfigUtils.encodeCollectionToConfig(
                configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
        SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
    }

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            return new ProgramOptions(line);
        }
    }
}
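The core of the patch is a non-recursive scan of one directory for *.jar files. To try that logic without rebuilding flink-dist.jar, here is a minimal stand-alone sketch. The class `JarDirScanner` and method `scanJarDir` are hypothetical names, not Flink API. It assumes a local filesystem path, sorts the result because `File.listFiles` guarantees no order, and throws if the path is not a readable directory.

```java
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone version of the jar-directory scan, for testing outside Flink. */
class JarDirScanner {

    /** Returns file: URLs for every *.jar directly inside dir (not recursive), sorted by path. */
    static List<URL> scanJarDir(String dir) throws MalformedURLException {
        File directory = new File(dir);
        // Two-argument lambda, so this resolves to listFiles(FilenameFilter).
        File[] jars = directory.listFiles((d, name) -> name.toLowerCase().endsWith(".jar"));
        if (jars == null) {
            // null means: path missing, not a directory, or an I/O error occurred
            throw new IllegalArgumentException("Not a readable directory: " + dir);
        }
        Arrays.sort(jars); // File is Comparable: compares abstract pathnames
        List<URL> urls = new ArrayList<>();
        for (File jar : jars) {
            urls.add(jar.toURI().toURL());
        }
        return urls;
    }

    public static void main(String[] args) throws IOException {
        // Throwaway directory with two jars and one non-jar file.
        Path tmp = Files.createTempDirectory("extlib");
        Files.createFile(tmp.resolve("b.jar"));
        Files.createFile(tmp.resolve("A.JAR"));
        Files.createFile(tmp.resolve("notes.txt"));

        List<URL> urls = scanJarDir(tmp.toString());
        for (URL u : urls) {
            System.out.println(u);
        }
        System.out.println("count=" + urls.size());
    }
}
```

Run without arguments, it should list the two jar URLs, skip notes.txt and report count=2.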
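Does it actually work? To find out, we write verification code in two parts:
(1) The Flink job. This is simply the WordCount example copied under a new class name, with a call into a class from the jar built in (2).
TestLoadExtJar.java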
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.examples.java.testloadextjar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import com.test.A;

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 *
 * <ul>
 *   <li>write a simple Flink program.
 *   <li>use Tuple data types.
 *   <li>write and use user-defined functions.
 * </ul>
 */
public class TestLoadExtJar {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
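        // Call into the external (third-party) jar. If that jar is not on the user
        // classpath, this fails with a class-not-found error (typically NoClassDefFoundError).
        A a = new A();
        a.test();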
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

(2) Simulated third-party code
Create a Maven project in IntelliJ IDEA containing one trivial class, A, for TestLoadExtJar to call.
A.java

package com.test;

public class A {
    public void test() {
        System.out.println("A");
    }
}
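Next, package the two projects as jars. Suppose the TestLoadExtJar example is packaged as TestLoadExtJar.jar and the simulated third-party code as testcallextjar-1.0-SNAPSHOT.jar, which is placed in /usr/local/flink-1.13.0/extlib. Then run the job on Flink: first the original way, to see the error message, and then with the -jd parameter pointing at the jar directory, to see whether that solves the problem.

Without the -jd parameter:
![](/i/l/?n=24&i=blog/1849636/202409/1849636-20240920151146409-50927224.png)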

As you can see, it reports that the class cannot be found.
With the -jd parameter added:

the job now runs.
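If a class is still reported missing, it helps to confirm that some jar in the directory really contains it. Below is a minimal sketch using `java.util.jar.JarFile`; the class name `FindClassInJars` is hypothetical, and the defaults (`/usr/local/flink-1.13.0/extlib`, `com.test.A`) are just this example's values. Because the jar-directory URLs end up in the same `pipeline.classpaths` setting as `-C`, they are not shipped with the job, so the directory has to exist at the same path on every node that runs it; running the check on each host is a cheap way to catch a missing copy.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

/** Hypothetical helper: lists the jars in a directory that contain a given class. */
class FindClassInJars {

    /** Returns the names of *.jar files in dir that contain an entry for className. */
    static List<String> jarsContaining(File dir, String className) throws IOException {
        // A class such as "com.test.A" is stored in a jar as the entry "com/test/A.class".
        String entryName = className.replace('.', '/') + ".class";
        File[] jars = dir.listFiles((d, name) -> name.toLowerCase().endsWith(".jar"));
        if (jars == null) {
            throw new IOException("Not a readable directory: " + dir);
        }
        List<String> hits = new ArrayList<>();
        for (File jar : jars) {
            // try-with-resources closes each jar after it has been inspected
            try (JarFile jf = new JarFile(jar)) {
                if (jf.getJarEntry(entryName) != null) {
                    hits.add(jar.getName());
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        File dir = new File(args.length > 0 ? args[0] : "/usr/local/flink-1.13.0/extlib");
        String cls = args.length > 1 ? args[1] : "com.test.A";
        List<String> hits = jarsContaining(dir, cls);
        System.out.println(hits.isEmpty() ? cls + " not found in " + dir : cls + " found in " + hits);
    }
}
```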
[Note] If you cannot download from https://github.com/apache/flink.git, you can download from https://gitee.com/longsebo/flink.git instead (the code in that repository has already been modified).

Finally
If you have any questions or would like to discuss, feel free to leave a comment.

From: https://www.cnblogs.com/longsebo/p/18422523
