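How to load external JARs when starting a Flink job
Apache Flink offers several ways to load external JAR files when a job starts, so that the job can bring in custom UDFs (user-defined functions) or other dependencies. The common ones are: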
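1. Start the job directly with the flink run command
With the flink run command-line tool you specify the job's main class and the location of the external JAR. For example:
flink run -d -c com.example.YourMainClass -C file:///path/to/external-jar.jar /path/to/your-job.jar
Here -c specifies the job's entry class, -d runs the job in detached (background) mode, and -C (long form --classpath) adds the external JAR to the user-code class path. The value of -C must be a URL with a protocol (for example file://) that is reachable from every node of the cluster, and the option can be repeated, once per JAR. All options must come before the job JAR, because everything after the JAR path is passed to the job as program arguments.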
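The ProgramOptions constructor listed later in this post turns each -C value into a java.net.URL, which is why a bare file-system path is rejected. The following is a minimal sketch of that check using only the JDK; the class name ClasspathUrlCheck and the helper isValidClasspathUrl are made up for illustration and are not part of Flink.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class ClasspathUrlCheck {

    /** Returns true if the entry parses as a URL, which is what a -C value has to be. */
    static boolean isValidClasspathUrl(String entry) {
        try {
            new URL(entry);
            return true;
        } catch (MalformedURLException e) {
            // A bare path such as /path/to/x.jar has no protocol and ends up here.
            return false;
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            "file:///path/to/external-jar.jar",
            "/path/to/external-jar.jar"
        };
        for (String candidate : candidates) {
            String verdict = isValidClasspathUrl(candidate) ? "accepted" : "rejected";
            System.out.println(candidate + " -> " + verdict);
        }
    }
}
```

Running a check like this on the values you plan to pass can save a round trip through a failed submission.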
2. Submit the job through the Flink Web UI
If you use the Flink Web UI, open "Submit New Job", upload the JAR you want to run, and configure the job there.
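3. Use Flink's YARN session mode
If you run Flink in session mode on YARN, you can set the extra class path in the configuration file or on the command line when submitting the job. For example, add the class path to flink-conf.yaml (pipeline.classpaths is the configuration key behind the -C option; separate several URLs with a semicolon):
```
pipeline.classpaths: file:///path/to/external-jar.jar
```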
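4. Pass the JARs on the command line when deploying to YARN
When you deploy to a YARN cluster, the external JARs can also be given on the command line. For a per-job cluster, combine the YARN target with -C:
flink run -t yarn-per-job -C file:///path/to/external-jar.jar /path/to/your-job.jar
or
when starting a session with yarn-session.sh, ship a whole directory of JARs to the YARN containers with -t (long form --ship):
yarn-session.sh -t /path/to/external-jars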
5. Bundle the dependencies into the job JAR
Another option is to package all dependency JARs together with the job into a single JAR (a fat JAR). Then no external JAR has to be specified at run time.
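When a merged JAR still produces class-not-found errors, a first step is to check whether the class file really sits at the expected path inside the JAR. One possible cause (only a guess, not something confirmed for any particular build) is that the dependency JARs were nested inside the job JAR as .jar entries instead of being unpacked, and the standard class loader does not look inside nested JARs. The sketch below uses only the JDK; JarClassFinder and containsClass are names made up for this example.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarClassFinder {

    /** Returns true if the JAR has a .class entry for the fully qualified class name. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // com.test.A is stored in the JAR as com/test/A.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: JarClassFinder <jar-file> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]) ? "found" : "missing");
    }
}
```

If the class is reported as missing while a .jar entry for its library is present, the build most likely nested the dependency instead of merging its classes.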
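Notes
Make sure the external JARs are compatible with your Flink version.
In YARN mode, make sure the external JARs are actually distributed to every TaskManager.
If you run into memory problems, you may need to adjust the YARN or Flink configuration, for example by increasing the available memory or changing the parallelism.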
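As you may have noticed, the methods above specify one JAR at a time. If a job depends on many JARs, say 20 or 30, passing each one with its own -C option is not practical.
Method 5 can merge many JARs into one, but when I tried it, the merged JAR was built and yet the classes still could not be found at run time. So here is a sixth method.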
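6. Modify the Flink source to accept a JAR directory option
With a JAR directory option, you put all JARs, however many there are, into one directory and pass a single command-line option. The added code loops over the directory and adds every JAR it finds to the class path. This saves a lot of effort, and because the JARs are added only to this job's user-code class path, they do not conflict with the JARs of the system or of other jobs.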
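The core idea, independent of Flink, can be sketched with the JDK alone: list the JAR files of a directory, turn each into a URL, and hand the URLs to a class loader. JarDirLoader, listJarUrls and loaderFor are hypothetical names used only for this illustration; the actual change to Flink is shown in the listings further down.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class JarDirLoader {

    /** Collects a URL for every regular file ending in .jar directly inside dir. */
    static List<URL> listJarUrls(Path dir) throws IOException {
        List<URL> urls = new ArrayList<>();
        // Throws an IOException (instead of returning null) if dir is missing or not a directory.
        try (DirectoryStream<Path> entries =
                Files.newDirectoryStream(
                        dir,
                        p -> Files.isRegularFile(p)
                                && p.getFileName().toString().toLowerCase().endsWith(".jar"))) {
            for (Path jar : entries) {
                urls.add(jar.toUri().toURL());
            }
        }
        return urls;
    }

    /** Builds a class loader that searches all JARs of the directory. */
    static URLClassLoader loaderFor(Path dir) throws IOException {
        List<URL> urls = listJarUrls(dir);
        return new URLClassLoader(urls.toArray(new URL[0]), JarDirLoader.class.getClassLoader());
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (URL url : listJarUrls(dir)) {
            System.out.println(url);
        }
    }
}
```

Only the top level of the directory is scanned here; subdirectories are ignored, which matches the single-directory idea described above.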
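The following uses Flink 1.13 as the example.
First get the source from Git: https://github.com/apache/flink.git
Here is how to change the code. Two files are modified:
CliFrontendParser.java and ProgramOptions.java
Both belong to the flink-clients module, but at run time the classes are loaded from flink-dist.jar, so after the change the two corresponding classes inside flink-dist.jar have to be replaced.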
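Replacing class files inside a JAR is usually done with the jar tool, but it can also be done from Java through the JDK's zip file system. The sketch below is one way to do it under some assumptions: JarClassReplacer and replaceEntry are made-up names, the compiled classes are assumed to be in a directory that mirrors the package layout, and I assume the anonymous FilenameFilter in the modified ProgramOptions compiles to an extra ProgramOptions$1.class that must be copied as well. Back up the original JAR before trying anything like this.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;

public class JarClassReplacer {

    /** Overwrites (or adds) one entry of an existing JAR with the content of a file. */
    static void replaceEntry(Path jar, String entryName, Path newContent) throws IOException {
        // "jar:file:///..." opens the JAR as a writable zip file system.
        URI jarUri = URI.create("jar:" + jar.toUri());
        try (FileSystem fs =
                FileSystems.newFileSystem(jarUri, Collections.<String, Object>emptyMap())) {
            Path target = fs.getPath(entryName);
            if (target.getParent() != null) {
                Files.createDirectories(target.getParent());
            }
            Files.copy(newContent, target, StandardCopyOption.REPLACE_EXISTING);
        } // closing the file system writes the updated JAR back to disk
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: JarClassReplacer <flink-dist jar> <compiled classes dir>");
            return;
        }
        Path jar = Paths.get(args[0]);
        Path classesDir = Paths.get(args[1]);
        String pkg = "org/apache/flink/client/cli/";
        // Assumed set of class files produced by the two modified sources.
        String[] names = {
            "CliFrontendParser.class", "ProgramOptions.class", "ProgramOptions$1.class"
        };
        for (String name : names) {
            replaceEntry(jar, "/" + pkg + name, classesDir.resolve(pkg + name));
        }
    }
}
```

Rebuilding flink-dist with Maven achieves the same result; patching the JAR is merely quicker when only two classes changed.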
CliFrontendParser.java
Note: the modified places are marked with comments ("新增地方", i.e. "added").
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* A simple command line parser (based on Apache Commons CLI) that extracts command line options.
*/
public class CliFrontendParser {
static final Option HELP_OPTION =
new Option(
"h",
"help",
false,
"Show the help message for the CLI Frontend or the action.");
static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
static final Option CLASS_OPTION =
new Option(
"c",
"class",
true,
"Class with the program entry point (\"main()\" method). Only needed if the "
+ "JAR file does not specify the class in its manifest.");
static final Option CLASSPATH_OPTION =
new Option(
"C",
"classpath",
true,
"Adds a URL to each user code "
+ "classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
+ "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
+ "times for specifying more than one URL. The protocol must be supported by the "
+ "{@link java.net.URLClassLoader}.");
public static final Option PARALLELISM_OPTION =
new Option(
"p",
"parallelism",
true,
"The parallelism with which to run the program. Optional flag to override the default value "
+ "specified in the configuration.");
public static final Option DETACHED_OPTION =
new Option("d", "detached", false, "If present, runs " + "the job in detached mode");
public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
new Option(
"sae",
"shutdownOnAttachedExit",
false,
"If the job is submitted in attached mode, perform a best-effort cluster shutdown "
+ "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
// ********** added: start **********
static final Option JARDIR_OPTION =
new Option(
"jd",
"jardir",
true,
"Adds every JAR file in a directory to each user code "
+ "classloader on all nodes in the cluster. The directory is listed on the client and "
+ "the same path must be accessible on all nodes (e.g. by means of a NFS share). "
+ "You can use this option multiple times for specifying more than one directory.");
// ********** added: end **********
/**
* @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN
* deployments
*/
@Deprecated
public static final Option YARN_DETACHED_OPTION =
new Option(
"yd",
"yarndetached",
false,
"If present, runs "
+ "the job in detached mode (deprecated; use non-YARN specific option instead)");
public static final Option ARGS_OPTION =
new Option(
"a",
"arguments",
true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
public static final Option ADDRESS_OPTION =
new Option(
"m",
"jobmanager",
true,
"Address of the JobManager to which to connect. "
+ "Use this flag to connect to a different JobManager than the one specified in the configuration.");
public static final Option SAVEPOINT_PATH_OPTION =
new Option(
"s",
"fromSavepoint",
true,
"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");
public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
new Option(
"n",
"allowNonRestoredState",
false,
"Allow to skip savepoint state that cannot be restored. "
+ "You need to allow this if you removed an operator from your "
+ "program that was part of the program when the savepoint was triggered.");
static final Option SAVEPOINT_DISPOSE_OPTION =
new Option("d", "dispose", true, "Path of savepoint to dispose.");
// list specific options
static final Option RUNNING_OPTION =
new Option("r", "running", false, "Show only running programs and their JobIDs");
static final Option SCHEDULED_OPTION =
new Option("s", "scheduled", false, "Show only scheduled programs and their JobIDs");
static final Option ALL_OPTION =
new Option("a", "all", false, "Show all programs and their JobIDs");
static final Option ZOOKEEPER_NAMESPACE_OPTION =
new Option(
"z",
"zookeeperNamespace",
true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
static final Option CANCEL_WITH_SAVEPOINT_OPTION =
new Option(
"s",
"withSavepoint",
true,
"**DEPRECATION WARNING**: "
+ "Cancelling a job with savepoint is deprecated. Use \"stop\" instead. \n Trigger"
+ " savepoint and cancel job. The target directory is optional. If no directory is "
+ "specified, the configured default directory ("
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+ ") is used.");
public static final Option STOP_WITH_SAVEPOINT_PATH =
new Option(
"p",
"savepointPath",
true,
"Path to the savepoint (for example hdfs:///flink/savepoint-1537). "
+ "If no directory is specified, the configured default will be used (\""
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+ "\").");
public static final Option STOP_AND_DRAIN =
new Option(
"d",
"drain",
false,
"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
public static final Option PY_OPTION =
new Option(
"py",
"python",
true,
"Python script with the program entry point. "
+ "The dependent resources can be configured with the `--pyFiles` option.");
public static final Option PYFILES_OPTION =
new Option(
"pyfs",
"pyFiles",
true,
"Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. "
+ "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. "
+ "Files suffixed with .zip will be extracted and added to PYTHONPATH. "
+ "Comma (',') could be used as the separator to specify multiple files "
+ "(e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
public static final Option PYMODULE_OPTION =
new Option(
"pym",
"pyModule",
true,
"Python module with the program entry point. "
+ "This option must be used in conjunction with `--pyFiles`.");
public static final Option PYREQUIREMENTS_OPTION =
new Option(
"pyreq",
"pyRequirements",
true,
"Specify a requirements.txt file which defines the third-party dependencies. "
+ "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. "
+ "A directory which contains the installation packages of these dependencies could be specified "
+ "optionally. Use '#' as the separator if the optional parameter exists "
+ "(e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).");
public static final Option PYARCHIVE_OPTION =
new Option(
"pyarch",
"pyArchives",
true,
"Add python archive files for job. The archive files will be extracted to the working directory "
+ "of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory "
+ "be specified. If the target directory name is specified, the archive file will be extracted to a "
+ "directory with the specified name. Otherwise, the archive file will be extracted to a "
+ "directory with the same name of the archive file. The files uploaded via this option are accessible "
+ "via relative path. '#' could be used as the separator of the archive file path and the target directory "
+ "name. Comma (',') could be used as the separator to specify multiple archive files. "
+ "This option can be used to upload the virtual environment, the data files used in Python UDF "
+ "(e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable "
+ "py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: "
+ "f = open('data/data.txt', 'r').");
public static final Option PYEXEC_OPTION =
new Option(
"pyexec",
"pyExecutable",
true,
"Specify the path of the python interpreter used to execute the python UDF worker "
+ "(e.g.: --pyExecutable /usr/local/bin/python3). "
+ "The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), "
+ "Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). "
+ "Please ensure that the specified environment meets the above requirements.");
static {
HELP_OPTION.setRequired(false);
JAR_OPTION.setRequired(false);
JAR_OPTION.setArgName("jarfile");
CLASS_OPTION.setRequired(false);
CLASS_OPTION.setArgName("classname");
CLASSPATH_OPTION.setRequired(false);
CLASSPATH_OPTION.setArgName("url");
ADDRESS_OPTION.setRequired(false);
ADDRESS_OPTION.setArgName("host:port");
PARALLELISM_OPTION.setRequired(false);
PARALLELISM_OPTION.setArgName("parallelism");
DETACHED_OPTION.setRequired(false);
SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
YARN_DETACHED_OPTION.setRequired(false);
JARDIR_OPTION.setRequired(false); // added
ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
RUNNING_OPTION.setRequired(false);
SCHEDULED_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setArgName("savepointPath");
SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);
ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");
CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
STOP_WITH_SAVEPOINT_PATH.setRequired(false);
STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);
STOP_AND_DRAIN.setRequired(false);
PY_OPTION.setRequired(false);
PY_OPTION.setArgName("pythonFile");
PYFILES_OPTION.setRequired(false);
PYFILES_OPTION.setArgName("pythonFiles");
PYMODULE_OPTION.setRequired(false);
PYMODULE_OPTION.setArgName("pythonModule");
PYREQUIREMENTS_OPTION.setRequired(false);
PYARCHIVE_OPTION.setRequired(false);
PYEXEC_OPTION.setRequired(false);
}
static final Options RUN_OPTIONS = getRunCommandOptions();
private static Options buildGeneralOptions(Options options) {
options.addOption(HELP_OPTION);
// backwards compatibility: ignore verbose flag (-v)
options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
return options;
}
private static Options getProgramSpecificOptions(Options options) {
options.addOption(JAR_OPTION);
options.addOption(CLASS_OPTION);
options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(ARGS_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
options.addOption(YARN_DETACHED_OPTION);
options.addOption(PY_OPTION);
options.addOption(PYFILES_OPTION);
options.addOption(PYMODULE_OPTION);
options.addOption(PYREQUIREMENTS_OPTION);
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
options.addOption(JARDIR_OPTION); // added
return options;
}
private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
options.addOption(PY_OPTION);
options.addOption(PYFILES_OPTION);
options.addOption(PYMODULE_OPTION);
options.addOption(PYREQUIREMENTS_OPTION);
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
options.addOption(JARDIR_OPTION); // added
return options;
}
public static Options getRunCommandOptions() {
Options options = buildGeneralOptions(new Options());
options = getProgramSpecificOptions(options);
options.addOption(SAVEPOINT_PATH_OPTION);
return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
}
static Options getInfoCommandOptions() {
Options options = buildGeneralOptions(new Options());
return getProgramSpecificOptions(options);
}
static Options getListCommandOptions() {
Options options = buildGeneralOptions(new Options());
options.addOption(ALL_OPTION);
options.addOption(RUNNING_OPTION);
return options.addOption(SCHEDULED_OPTION);
}
static Options getCancelCommandOptions() {
Options options = buildGeneralOptions(new Options());
return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
}
static Options getStopCommandOptions() {
return buildGeneralOptions(new Options())
.addOption(STOP_WITH_SAVEPOINT_PATH)
.addOption(STOP_AND_DRAIN);
}
static Options getSavepointCommandOptions() {
Options options = buildGeneralOptions(new Options());
options.addOption(SAVEPOINT_DISPOSE_OPTION);
return options.addOption(JAR_OPTION);
}
// --------------------------------------------------------------------------------------------
// Help
// --------------------------------------------------------------------------------------------
private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
o.addOption(SAVEPOINT_PATH_OPTION);
return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
}
private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(PARALLELISM_OPTION);
return options;
}
private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(RUNNING_OPTION);
options.addOption(ALL_OPTION);
options.addOption(SCHEDULED_OPTION);
return options;
}
private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
}
private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
return options.addOption(STOP_WITH_SAVEPOINT_PATH).addOption(STOP_AND_DRAIN);
}
private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(SAVEPOINT_DISPOSE_OPTION);
options.addOption(JAR_OPTION);
return options;
}
/** Prints the help for the client. */
public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
System.out.println();
System.out.println("The following actions are available:");
printHelpForRun(customCommandLines);
printHelpForRunApplication(customCommandLines);
printHelpForInfo();
printHelpForList(customCommandLines);
printHelpForStop(customCommandLines);
printHelpForCancel(customCommandLines);
printHelpForSavepoint(customCommandLines);
System.out.println();
}
public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"run\" compiles and runs a program.");
System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"run\" action options:");
formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, true);
System.out.println();
}
public static void printHelpForRunApplication(
Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"run-application\" runs an application in Application Mode.");
System.out.println("\n Syntax: run-application [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"run-application\" action options:");
// Only GenericCLI works with application mode, the other CLIs will be phased out
// in the future
List<CustomCommandLine> filteredCommandLines =
customCommandLines.stream()
.filter((cli) -> cli instanceof GenericCLI)
.collect(Collectors.toList());
printCustomCliOptions(filteredCommandLines, formatter, true);
System.out.println();
}
public static void printHelpForInfo() {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println(
"\nAction \"info\" shows the optimized execution plan of the program (JSON).");
System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"info\" action options:");
formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
System.out.println();
}
public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"list\" lists running and scheduled programs.");
System.out.println("\n Syntax: list [OPTIONS]");
formatter.setSyntaxPrefix(" \"list\" action options:");
formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println(
"\nAction \"stop\" stops a running program with a savepoint (streaming jobs only).");
System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"stop\" action options:");
formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"cancel\" cancels a running program.");
System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"cancel\" action options:");
formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println(
"\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
formatter.setSyntaxPrefix(" \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
/**
* Prints custom cli options.
*
* @param formatter The formatter to use for printing
* @param runOptions True if the run options should be printed, False to print only general
* options
*/
private static void printCustomCliOptions(
Collection<CustomCommandLine> customCommandLines,
HelpFormatter formatter,
boolean runOptions) {
// prints options from all available command-line classes
for (CustomCommandLine cli : customCommandLines) {
formatter.setSyntaxPrefix(" Options for " + cli.getId() + " mode:");
Options customOpts = new Options();
cli.addGeneralOptions(customOpts);
if (runOptions) {
cli.addRunOptions(customOpts);
}
formatter.printHelp(" ", customOpts);
System.out.println();
}
}
public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
} else {
return SavepointRestoreSettings.none();
}
}
// --------------------------------------------------------------------------------------------
// Line Parsing
// --------------------------------------------------------------------------------------------
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
throws CliArgsException {
final DefaultParser parser = new DefaultParser();
try {
return parser.parse(options, args, stopAtNonOptions);
} catch (ParseException e) {
throw new CliArgsException(e.getMessage());
}
}
/**
* Merges the given {@link Options} into a new Options object.
*
* @param optionsA options to merge, can be null if none
* @param optionsB options to merge, can be null if none
* @return
*/
public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
final Options resultOptions = new Options();
if (optionsA != null) {
for (Option option : optionsA.getOptions()) {
resultOptions.addOption(option);
}
}
if (optionsB != null) {
for (Option option : optionsB.getOptions()) {
resultOptions.addOption(option);
}
}
return resultOptions;
}
}
ProgramOptions.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JARDIR_OPTION; // added
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.client.cli.ProgramOptionsUtils.containsPythonDependencyOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.createPythonProgramOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.isPythonEntryPoint;
/** Base class for command line options that refer to a JAR file program. */
public class ProgramOptions extends CommandLineOptions {
private String jarFilePath;
protected String entryPointClass;
private final List<URL> classpaths;
private final String[] programArgs;
private final int parallelism;
private final boolean detachedMode;
private final boolean shutdownOnAttachedExit;
private final SavepointRestoreSettings savepointSettings;
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
this.entryPointClass =
line.hasOption(CLASS_OPTION.getOpt())
? line.getOptionValue(CLASS_OPTION.getOpt())
: null;
this.jarFilePath =
line.hasOption(JAR_OPTION.getOpt())
? line.getOptionValue(JAR_OPTION.getOpt())
: null;
this.programArgs = extractProgramArgs(line);
List<URL> classpaths = new ArrayList<URL>();
if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
try {
classpaths.add(new URL(path));
} catch (MalformedURLException e) {
throw new CliArgsException("Bad syntax for classpath: " + path);
}
}
}
// ***** added: start *****
// Add every JAR found in each -jd/--jardir directory to the user-code class path.
if (line.hasOption(JARDIR_OPTION.getOpt())) {
for (String path : line.getOptionValues(JARDIR_OPTION.getOpt())) {
try {
classpaths.addAll(loadAllJarFromPathURl(path));
} catch (MalformedURLException e) {
throw new CliArgsException("Bad syntax for jardir: " + path);
}
}
}
// ***** added: end *****
this.classpaths = classpaths;
if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
try {
parallelism = Integer.parseInt(parString);
if (parallelism <= 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new CliArgsException(
"The parallelism must be a positive number: " + parString);
}
} else {
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
}
detachedMode =
line.hasOption(DETACHED_OPTION.getOpt())
|| line.hasOption(YARN_DETACHED_OPTION.getOpt());
shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
}
// ***** added: start *****
/** Returns a URL for every *.jar file directly inside the given directory. */
private List<URL> loadAllJarFromPathURl(String path) throws MalformedURLException {
List<URL> urls = new ArrayList<>();
System.out.println("jar dir:" + path);
File directory = new File(path);
// Keep only the files whose name ends with .jar (case-insensitive).
File[] jarFiles =
directory.listFiles(
new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().endsWith(".jar");
}
});
// listFiles() returns null if the path does not exist or is not a directory,
// so check for null before touching the array.
if (jarFiles == null) {
System.out.println("jar dir not found or not a directory: " + path);
return urls;
}
System.out.println("jarFiles len:" + jarFiles.length);
for (File jarFile : jarFiles) {
System.out.println(jarFile.getAbsolutePath());
urls.add(jarFile.toURI().toURL());
}
return urls;
}
// ***** added: end *****
protected String[] extractProgramArgs(CommandLine line) {
String[] args =
line.hasOption(ARGS_OPTION.getOpt())
? line.getOptionValues(ARGS_OPTION.getOpt())
: line.getArgs();
if (args.length > 0 && !line.hasOption(JAR_OPTION.getOpt())) {
jarFilePath = args[0];
args = Arrays.copyOfRange(args, 1, args.length);
}
return args;
}
public void validate() throws CliArgsException {
// Java program should be specified a JAR file
if (getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}
public String getJarFilePath() {
return jarFilePath;
}
public String getEntryPointClassName() {
return entryPointClass;
}
public List<URL> getClasspaths() {
return classpaths;
}
public String[] getProgramArgs() {
return programArgs;
}
public int getParallelism() {
return parallelism;
}
public boolean getDetachedMode() {
return detachedMode;
}
public boolean isShutdownOnAttachedExit() {
return shutdownOnAttachedExit;
}
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
public void applyToConfiguration(Configuration configuration) {
if (getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
}
configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode());
configuration.setBoolean(
DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
ConfigUtils.encodeCollectionToConfig(
configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
}
public static ProgramOptions create(CommandLine line) throws CliArgsException {
if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
return createPythonProgramOptions(line);
} else {
return new ProgramOptions(line);
}
}
}
TestLoadExtJar.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.java.testloadextjar;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.test.A;
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text
* files.
*
* <p>The input is a plain text file with lines separated by newline characters.
*
* <p>Usage: <code>WordCount --input <path> --output <path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>This example shows how to:
*
* <ul>
* <li>write a simple Flink program.
* <li>use Tuple data types.
* <li>write and use user-defined functions.
* </ul>
*/
public class TestLoadExtJar {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
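// com.test.A lives in an external JAR; this call fails with a class-not-found
// error unless that JAR is on the job's class path.
A a = new A();
a.test();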
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text = null;
if (params.has("input")) {
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
Preconditions.checkNotNull(text, "Input DataSet should not be null.");
} else {
// get default test text data
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
* form of "(word,1)" ({@code Tuple2<String, Integer>}).
*/
public static final class Tokenizer
implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
(2) Simulate third-party code
Create a Maven project in IDEA and add a very simple class A for TestLoadExtJar to call.
A.java
package com.test;
public class A {
public void test() {
System.out.println("A");
}
}
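Run without the -jd option, the job fails because class A cannot be found.
With the -jd option pointing at the directory that holds the JAR containing A,
the job runs.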
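To check outside of Flink whether a class would be visible from a given JAR directory, a small probe like the following can help. It is only a sketch built on the JDK; ClassProbe and canLoad are hypothetical names, and the probe says nothing about how Flink itself orders its class loaders.

```java
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class ClassProbe {

    /** Returns true if className loads from the JARs in jarDir or from the normal class path. */
    static boolean canLoad(String className, File jarDir) throws IOException {
        List<URL> urls = new ArrayList<>();
        File[] jars = jarDir.listFiles((dir, name) -> name.toLowerCase().endsWith(".jar"));
        if (jars != null) { // null means jarDir is missing or not a directory
            for (File jar : jars) {
                urls.add(jar.toURI().toURL());
            }
        }
        try (URLClassLoader loader = new URLClassLoader(urls.toArray(new URL[0]))) {
            // initialize=false: only locate and load the class, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: ClassProbe <fully.qualified.ClassName> <jar-dir>");
            return;
        }
        System.out.println(canLoad(args[0], new File(args[1])) ? "loadable" : "not found");
    }
}
```

For the test above, probing com.test.A against the directory passed to -jd should report it as loadable once the JAR with class A has been copied there.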
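Note: if you cannot download from https://github.com/apache/flink.git, you can download from https://gitee.com/longsebo/flink.git instead (the code in that repository already contains the changes).
Finally
If you have questions or want to discuss, leave a comment.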