首页 > 其他分享 >Apache/InLong InLong Manager 支持配置 Flink 任务并发度/Adjust sort resources according to data scale心得

Apache/InLong InLong Manager 支持配置 Flink 任务并发度/Adjust sort resources according to data scale心得

时间:2024-07-07 11:08:25浏览次数:1  
标签:sort InLong getInstance get DEFAULT Flink server API Configuration

audit已经实现了对于InLong系统的Agent、DataProxy、Sort模块的入流量、出流量进行实时审计对账。 对账的粒度有分钟、小时、天三种粒度。

  1. audit的数据缓存在org.apache.inlong.audit.cache的各个类中,有DayCache HalfHourCache等等
  2. 请求audit数据的api在org.apache.inlong.audit.config.OpenApiConstants中,通过org.apache.inlong.audit.service.ApiService来调用。调用的时候注意参数的合法性,有合法性检查,以免请求不到数据。居然没有用controller,挺神奇的,贴一段代码上来,便于以后研究

private void initHttpServer() {

点击查看代码
 private void initHttpServer() {
        int bindPort = Configuration.getInstance().get(KEY_HTTP_SERVER_BIND_PORT, DEFAULT_HTTP_SERVER_BIND_PORT);
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress(bindPort),
                    Configuration.getInstance().get(KEY_API_BACKLOG_SIZE, DEFAULT_API_BACKLOG_SIZE));
            server.setExecutor(Executors.newFixedThreadPool(
                    Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE)));
            server.createContext(Configuration.getInstance().get(KEY_API_DAY_PATH, DEFAULT_API_DAY_PATH),
                    new AuditHandler(DAY));
            server.createContext(Configuration.getInstance().get(KEY_API_HOUR_PATH, DEFAULT_API_HOUR_PATH),
                    new AuditHandler(HOUR));
            server.createContext(Configuration.getInstance().get(KEY_API_MINUTES_PATH, DEFAULT_API_MINUTES_PATH),
........省略类似的方法.........
            server.start();
            LOGGER.info("Init http server success. Bind port is: {}", bindPort);
        } catch (Exception e) {
            LOGGER.error("Init http server has exception!", e);
        }
    }
    int bindPort = Configuration.getInstance().get(KEY_HTTP_SERVER_BIND_PORT, DEFAULT_HTTP_SERVER_BIND_PORT);
    try {
        HttpServer server = HttpServer.create(new InetSocketAddress(bindPort),
                Configuration.getInstance().get(KEY_API_BACKLOG_SIZE, DEFAULT_API_BACKLOG_SIZE));
        server.setExecutor(Executors.newFixedThreadPool(
                Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE)));
        server.createContext(Configuration.getInstance().get(KEY_API_DAY_PATH, DEFAULT_API_DAY_PATH),
                new AuditHandler(DAY));
        server.createContext(Configuration.getInstance().get(KEY_API_HOUR_PATH, DEFAULT_API_HOUR_PATH),
                new AuditHandler(HOUR));
        server.createContext(Configuration.getInstance().get(KEY_API_MINUTES_PATH, DEFAULT_API_MINUTES_PATH),

........省略类似的方法.........
server.start();
LOGGER.info("Init http server success. Bind port is: {}", bindPort);
} catch (Exception e) {
LOGGER.error("Init http server has exception!", e);
}
}

  1. 请求方法样例,查询小时数据curl localhost:10080/audit/hour?startTime=2024-07-01T00:00:00&endTime=2024-07-07T23:59:59&inlongGroupId=testGroup&inlongStreamId=testStream&auditId=testAudit

标签:sort,InLong,getInstance,get,DEFAULT,Flink,server,API,Configuration
From: https://www.cnblogs.com/peterzh/p/18288279

相关文章

  • np.argsort
    函数解释np.argsort是NumPy库中的一个函数,用于对数组进行排序并返回排序后的索引。它不会直接对数组进行排序,而是返回一个数组,这个数组中的元素是原数组中元素按升序排序后的索引。numpy.argsort(a,axis=-1,kind=None,order=None)参数如下:a:要排序的数组axis:要排序的轴,默......
  • flink提交yarn 集群模式失败
    flink版本1.14.6在通过./bin/flinkrun-application-tyarn-application模式提交到yarn时失败。报错信息:点击查看代码Causedby:java.lang.ClassCastException:cannotassigninstanceoforg.apache.kafka.clients.consumer.OffsetResetStrategytofieldorg.apache.......
  • 用WSL2+Docker Desktop部署InLong的坑和经验
    WSL的网络坑死了————题记看到腾讯在搞开源,邂逅了ApacheInLong,觉得很有意思,就开始研究。考虑到这是和性能有关的东西,以后说不定还要压测什么的,所以就决定用WSL2+DockerDesktop,不用虚拟机了,感觉这样性能会好一点,正好也熟悉熟悉命令行,毕竟打CTF虽然天天用Linux但基本能用GU......
  • Flink 窗口触发器(Trigger)(一)
    Flink的窗口触发器(Trigger)是流处理中一个非常关键的概念,它定义了窗口何时被触发并决定触发后的行为(如进行窗口数据的计算或清理)。一、基本概念定义:触发器决定了窗口何时被触发以及触发后的行为。在Flink中,窗口的触发是通过设置定时器来实现的。作用:控制窗口数据的聚合时机......
  • 55、Flink 中使用 Java Lambda 表达式详解
    1)概述1.注意Flink支持对JavaAPI的所有算子使用Lambda表达式,但是,当Lambda表达式使用Java泛型时,需要显式地声明类型信息。2.示例和限制示例:map()函数使用Lambda表达式计算输入值的平方。不需要声明map()函数的输入i和输出参数的数据类型,因为Java编......
  • 56、Flink DataStream 的管理执行配置详解
    1)概述1.执行配置StreamExecutionEnvironment包含了ExecutionConfig,它允许在运行时设置作业特定的配置值。StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();ExecutionConfigexecutionConfig=env.getConfig();以下是可用......
  • 从0到1Flink的成长之路(二十)-Flink 高级特性(二)之自动重启策略和恢复 ,固定延迟重启策
    从0到1Flink的成长之路(二十)-Flink高级特性(二)之自动重启策略和恢复,,固定延迟重启策略(开发中使用)自动重启策略和恢复1)、重启策略配置方式配置文件在flink-conf.yml中可以进行配置,示例如下:restart-strategy:fixed-delayrestart-strategy.fixed-delay.attempts:3restart-strat......
  • 大数据面试题之Flink(1)
    目录Flink架构 Flink的窗口了解哪些,都有什么区别,有哪几种?如何定义? Flink窗口函数,时间语义相关的问题 介绍下Flink的watermark(水位线),watermark需要实现哪个实现类,在何处定义?有什么作用? Flink的窗口(实现)机制 说下Flink的CEP 说一说Flink的Checkpoint机制 ......
  • 大数据面试题之Flink(2)
    Flink中Checkpoint超时原因 Flink的ExactlyOnce语义怎么保证? Flink的端到端ExactlyOnce Flink的水印(Watermark),有哪几种? Flink的时间语义 Flink相比于其它流式处理框架的优点? Flink和Spark的区别?什么情况下使用Flink?有什么优点? FlinkbackPressure反压机......
  • 大数据面试题之Flink(3)
    如何确定Flink任务的合理并行度? Flink任务如何实现端到端一致? Flink如何处理背(反)压? Flink解决数据延迟的问题 Flink消费kafka分区的数据时flink件务并行度之间的关系 使用flink-client消费kafka数据还是使用flink-connector消费 如何动态修改Flink的配置,前提......