package com.ecarx.sumatra.data.tab.conf; import org.apache.flink.api.java.utils.ParameterTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Optional; public class ConfigManager { private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class); private static ParameterTool parameterTool; /** * 初始化配置管理器并加载指定环境的配置文件。 * * @param env 环境名称(例如 "dev", "test", "prod") */ public static void init(String env) { String configFilePath = String.format("/application-%s.properties", env); try (var configStream = ConfigManager.class.getResourceAsStream(configFilePath)) { if (configStream == null) { throw new IOException("Configuration file not found: " + configFilePath); } parameterTool = ParameterTool.fromPropertiesFile(configStream); LOG.info("Application running on environment: {}", env); LOG.debug("Loaded configuration from: {}", configFilePath); } catch (IOException e) { LOG.error("Failed to load configuration files.", e); throw new RuntimeException("Initialization failed due to configuration loading error.", e); } } /** * 获取配置项的值。 * * @param key 配置项的键 * @return 对应的值,如果没有找到则返回 null */ public static String getValue(String key) { return Optional.ofNullable(parameterTool).map(tool -> tool.get(key)).orElse(null); } /** * 获取指定类型的配置项值。 * * @param <T> 配置项类型 * @param key 配置项的键 * @param clazz 配置项的目标类型 * @return 对应的值,如果没有找到则返回 null */ public static <T> T getValue(String key, Class<T> clazz) { if (clazz == Integer.class) { return clazz.cast(Optional.ofNullable(parameterTool) .map(tool -> tool.getInt(key)) .orElse(null)); } else if (clazz == Boolean.class) { return clazz.cast(Optional.ofNullable(parameterTool) .map(tool -> tool.getBoolean(key)) .orElse(null)); } else { return clazz.cast(getValue(key)); } } /** * 主方法仅用于测试目的。 */ public static void main(String[] args) { // 直接在代码中指定环境 init("dev"); // 你可以在这里切换环境,如 "test" 或 "prod" if (parameterTool != null) { parameterTool.toMap().forEach((key, value) -> System.out.println(key + "=" + value)); } } }
使用:
public class MyFlinkJob { public static void main(String[] args) throws Exception { // 在代码中直接指定环境 ConfigManager.init("dev"); // 你可以在这里切换环境,如 "test" 或 "prod" // 加载配置后继续执行其他逻辑... } }
获取配置:
String jobName = ConfigManager.getValue("flink.job.name"); int parallelism = ConfigManager.getValue("flink.parallelism", Integer.class); String kafkaServers = ConfigManager.getValue("kafka.bootstrap.servers"); String kafkaTopic = ConfigManager.getValue("kafka.topic");
标签:String,配置文件,flink,getValue,ConfigManager,static,key,public From: https://www.cnblogs.com/jiangbei/p/18624115