首页 > 其他分享 >【Flink metric(3)】chunjun是如何实现脏数据管理的

【Flink metric(3)】chunjun是如何实现脏数据管理的

时间:2024-06-30 19:55:43浏览次数:3  
标签:metric Flink chunjun dirty mysql 线程 数据 void DirtyDataCollector

文章目录

一. 基础逻辑

脏数据管理模块的基本逻辑是:

  1. 当数据消费失败时,将脏数据拦截并保存到dirtyDataCollector中;
  2. 全局metric判断:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中、或mysql的配置中。

对于代码实现:

DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期,DirtyDataCollector主要用于收集脏数据并输出(到日志中,mysql中),脏数据数量达到设定值之后,flink停止运行。

 
具体的DataCollector实现有:

在这里插入图片描述

分别用于输出到taskmanager的日志、(最后报错时)jobmanager日志、输出到mysql表中。

所以这里有三层代码结构:

  • DirtyManager:管理DirtyDataCollector
  • DirtyDataCollector:主要用于收集脏数据并输出,并判断脏数据是否达到临界值
  • 具体的DataCollector的实现:具体的输出实现:输出到日志,输出到mysql。

接下来我们逐个看每层的具体实现逻辑
 

二. DirtyManager

DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期(open、run、close),主要流程如下:

  1. 设置系统配置给DirtyDataCollector
  2. 开启DirtyManager线程,主要用于DirtyDataCollector消费脏数据(收集脏数据)
  3. 关闭资源:DirtyDataCollector、DirtyManager的线程资源。

1. 初始化

初始化DirtyManager

  • 根据配置加载特定的DirtyDataCollector:用于脏数据的收集
  • 获取系统信息:jobId、jobName、operationName
  • 获取脏数据metric,用于定期合并脏数据为全局脏数据。
public DirtyManager(DirtyConfig dirtyConfig, RuntimeContext runtimeContext) {  
    //通过反射注册DirtyDataCollector
    this.consumer = DataSyncFactoryUtil.discoverDirty(dirtyConfig);  
    Map<String, String> allVariables = runtimeContext.getMetricGroup().getAllVariables();  
    this.jobId = allVariables.get(JOB_ID);  
    this.jobName = allVariables.getOrDefault(JOB_NAME, "defaultJobName");  
    this.operationName = allVariables.getOrDefault(OPERATOR_NAME, "defaultOperatorName");  
    this.errorCounter = runtimeContext.getLongCounter(Metrics.NUM_ERRORS);  
}

 

2. 收集脏数据并check

被具体的连接器调用:
具体当连接器生产数据或写数据到数据源报错时,调用此方法收集脏数据

  1. 创建线程,用于异步执行DirtyDataCollector,开始消费脏数据到日志或mysql表中
  2. 添加脏数据条数,同步到全局脏数据metric中
  3. 脏数据信息,存到队列中,等待具体的脏数据收集器消费
  4. 子流程:判断脏数据条数是否大于总脏数据条数
public void collect(Object data, Throwable cause, String field, long globalErrors) {  
    if (executor == null) {  
        execute();  
    }  
  
    DirtyDataEntry entity = new DirtyDataEntry();  
    entity.setJobId(jobId);  
    entity.setJobName(jobName);  
    entity.setOperatorName(operationName);  
    entity.setCreateTime(new Timestamp(System.currentTimeMillis()));  
    entity.setDirtyContent(toString(data));  
    entity.setFieldName(field);  
    entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause));  
  
    //积累metric:errorCounter,这里直接同步到jobmanager?
    errorCounter.add(1L);  
    //将脏数据添加到队列,等待消费。
    consumer.offer(entity, globalErrors);  
}

/**  
 * 创建线程,用于异步执行DirtyDataCollector  
 */
 public void execute() {  
    if (executor == null) {  
        executor =  
                new ThreadPoolExecutor(  
                        MAX_THREAD_POOL_SIZE,  
                        MAX_THREAD_POOL_SIZE,  
                        0,  
                        TimeUnit.MILLISECONDS,  
                        new LinkedBlockingQueue<>(),  
                        new ChunJunThreadFactory(  
                                "dirty-consumer",  
                                true,  
                                (t, e) -> {  
                                    log.error(  
                                            String.format(  
                                                    "Thread [%s] consume failed.", t.getName()),  
                                            e);  
                                }),  
                        new ThreadPoolExecutor.CallerRunsPolicy());  
    }  
  
    //初始化DirtyDataCollector:比如脏数据定时发送到mysql时的线程注册  
    consumer.open();  
    //拿出一个线程执行DirtyDataCollector的execute方法  
    executor.execute(consumer);  
}

 

3. 关闭资源

/** Close manager. */  
public void close() {  
    if (!isAlive.get()) {  
        return;  
    }  
    //先关闭datacollector的资源
    if (consumer != null) {  
        consumer.close();  
    }  
    //再关闭executor线程
    if (executor != null) {  
        executor.shutdown();  
    }  
  
    isAlive.compareAndSet(true, false);  
}

 

三. DirtyDataCollector

处于第二层的dirtyDataCollector实现了脏数据的临时保存并等待具体DataCollector的消费,
它的基本逻辑是:

  1. 当脏数据消费失败时,将脏数据拦截并保存到consumeQueue中,等待被消费
  2. 全局的metric:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中。

 

1. 初始化

在DirtyManager实例化时,注册DirtyDataCollector时的操作,

这里获取脏数据最大值,允许消费脏数据失败的条数,以及对具体DataCollector的初始化,我们下节分析。


public void initializeConsumer(DirtyConfig conf) {  
    this.maxConsumed = conf.getMaxConsumed();  
    this.maxFailedConsumed = conf.getMaxFailedConsumed();  
  
    this.init(conf);  
}

被DirtyManager调用:在开启脏数据收集器线程之前执行

初始化具体脏数据收集器:目前之后mysql脏数据收集器实现了此方法:消费线程、mysql连接

public void open() {  
}

 

2. 收集脏数据并check

offer方法被DirtyManager的collect方法调用

  • 用于存储具体脏数据并更新单个slot的脏数据条数。
  • 每添加一条脏数据,就判断脏数据是否达到了设定值,如果是则抛出异常。

其中:globalErrors是上文AccumulatorCollector定期更新的结果。


//存储脏数据具体内容,并更新单个slot的脏数据条数
public synchronized void offer(DirtyDataEntry dirty, long globalErrors) {  
    consumeQueue.offer(dirty);  
    addConsumed(1L, dirty, globalErrors);  
}


/**  
 * 添加脏数据  
 * 通过metric判断此时的脏数据条数,是否已经超过全局设置的脏数据条数  
 * @param count  
 * @param dirty  
 * @param globalErrors  
 */  
protected void addConsumed(long count, DirtyDataEntry dirty, long globalErrors) {  
    consumedCounter.add(count);  
    // 因为总体的脏数据需要tm和jm进行通讯(每tm心跳+1s),会有延迟,且当单slot运行时误差将达到最大  
    // 所以这里需要判断延迟情况  
    long max =  
            consumedCounter.getLocalValue() >= globalErrors  
                    ? consumedCounter.getLocalValue()  
                    : globalErrors;  
    // 但这里仍然有误差:此时如果所有的slot都消费了脏数据那么其他slot的脏数据就记录不到。也就是会多消费脏数据  
    // 所以这里要有取舍:是否要消费完全准确的脏数据  
    if (max >= maxConsumed) {  
        StringJoiner dirtyMessage =  
                new StringJoiner("\n")  
                        .add("\n****************Dirty Data Begin****************\n")  
                        .add(dirty.toString())  
                        .add("\n****************Dirty Data End******************\n");  
        throw new NoRestartException(  
                String.format(  
                        "The dirty consumer shutdown, due to the consumed count exceed the max-consumed [%s]",  
                        maxConsumed)  
                        + dirtyMessage);  
    }  
}

 

3. run:消费脏数据

由DirtyManager开启脏数据消费线程,

具体的DataCollector(log、mysql)消费脏数据,发送到Taskmanager日志或mysql表中。

/**  
 * 开启脏数据消费线程  
 * 定时消费脏数据,发送到执行脏数据管理器中:log、mysql等  
 */  
@Override  
public void run() {  
    while (isRunning.get()) {  
        try {  
            //指定的DataCollector消费脏数据
            DirtyDataEntry dirty = consumeQueue.take();  
            consume(dirty);  
        } catch (Exception e) {  
            //未成功将脏数据收集到脏数据管理模块中  
            addFailedConsumed(e, 1L);  
        }  
    }  
}

/**  
 *  消费脏数据用于输出到日志、mysql等  
 */
 protected abstract void consume(DirtyDataEntry dirty) throws Exception;

 

4. 释放资源

不同的DataCollector有不同的操作,下节分析

public abstract void close();

 

四. LogDirtyDataCollector

实现比较简单:拿到的数据直接打印到Taskmanager中,关闭时,设定isRunning为false

/**  
 * 没有线程,调用即输出到日志中  
 */  
@Slf4j  
public class LogDirtyDataCollector extends DirtyDataCollector {  
  
    private static final long serialVersionUID = 7366317208451727471L;  
    private Long printRate;  
  
    @Override  
    protected void init(DirtyConfig conf) {  
        this.printRate = conf.getPrintRate();  
    }  
  
    /**  
     * 输出脏数据到taskmanager  
     * @param dirty dirty-data which should be consumed.  
     */    @Override  
    protected void consume(DirtyDataEntry dirty) {  
        if (consumedCounter.getLocalValue() % printRate == 0) {  
            StringJoiner dirtyMessage =  
                    new StringJoiner("\n")  
                            .add("\n====================Dirty Data=====================")  
                            .add(dirty.toString())  
                            .add("\n===================================================");  
            log.warn(dirtyMessage.toString());  
        }  
    }  
  
    @Override  
    public void close() {  
        isRunning.compareAndSet(true, false);  
        log.info("Print consumer closed.");  
    }  
}

 

下篇分析MysqlDirtyDataCollector是如何消费数据。

标签:metric,Flink,chunjun,dirty,mysql,线程,数据,void,DirtyDataCollector
From: https://blog.csdn.net/hiliang521/article/details/139940708

相关文章

  • 从工具产品体验对比spark、hadoop、flink
    作为一名大数据开发,从工具产品的角度,对比一下大数据工具最常使用的框架spark、hadoop和flink。工具无关好坏,但人的喜欢有偏好。目录评价标准1效率2用户体验分析从用户的维度来看从市场的维度来看从产品的维度来看3用户体验的基本原则成本和产出是否成正比操作是否“......
  • Apache Flink 和 Apache Spark详细介绍、优缺点、使用场景以及选型抉择?
    ApacheFlink和ApacheSpark我该投入谁的怀抱?ApacheFlink简介:ApacheFlink是一个用于分布式流处理和批处理的开源框架。它以实时数据处理和事件驱动的流处理著称,提供高吞吐量和低延迟的处理能力。功能:流处理:Flink可以处理实时数据流,支持低延迟和高吞吐量的流处理......
  • flinksql API StreamTableEnvironment StreamStatementSet应用
    1.问题描述在应用flink实时消费kafka数据多端中,一般会使用flink原生的addsink或flinkSQL利用SqlDialect,比如消费kafka数据实时写入hive和kafka一般用两种方式:第一种方式是写入hive利用SqlDialect,写入kafka利用flink的旁路输出流+原生addSink第二种方式是写入hive和kafka都利用S......
  • Flink报错 java.lang.IllegalArgumentException: too many arguments
    错误信息/Library/Java/JavaVirtualMachines/zulu-21.jdk/Contents/Home/bin/java-javaagent:/Users/liuyu/Applications/IntelliJIDEAUltimate.app/Contents/lib/idea_rt.jar=51748:/Users/liuyu/Applications/IntelliJIDEAUltimate.app/Contents/bin-Dfile.encoding=UTF-......
  • Flink Sink中jdbc sink
    这里介绍一下FlinkSink中jdbcsink的使用方法,以mysql为例,这里代码分为两种,事务和非事务此处sink只处理append,主要是事务和非事务的调用方法,upsert等未实现非事务代码importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdb......
  • IDEA 2024 配置 Flink Scala开发环境
    IDEA2024配置FlinkScala开发环境一、环境IntelliJIDEA2024.1(UltimateEdition)项目JDK版本:ZuluJDK11Scala2.12.19Scala编译ServerJDK版本:JDK21Flink1.19.1二、步骤、创建Java项目安装Scala插件,安装后重启位置:Settings-->Plugins-->Marketplace......
  • flink版本: 1.14.6 flink水位生成以及基于水位触发窗口的计算
    Flink是间断性(punctuate)或者周期性(periodic)生成水位线的1.定义和用途*punctuate:为每条消息都尝试生成watermark,这提供了更细粒度的控制,但增加了不必要的计算开销*periodic:周期性的生成watermark,可以通过env.getConfig().setAutoWatermarkInterval(1*1000L)设置周期间......
  • flink 如果是有序流,还需要 forMonotonousTimestamps吗
    如果数据是有序的,即数据完全按照时间发生的顺序到达,那么在flink中,虽然理论上不需要额外的Watermark策略来标识数据的有序性,但使用forMonotonousTimestamps策略仍然有其必要性。以下是详细解释:水位的作用即使数据完全有序,flink的窗口计算仍然需要watermark来触发。watermark提......
  • Flink状态(一)
    key状态和算子状态key状态key状态总是与key有关,只能被用于keyedStream类型的函数与算子。你可以认为key状态是一种被分区的算子状态,每一个key有一个状态分区。每一个key状态逻辑上由<parellel-operator-instance,key>唯一确定,由于每一个key只分布在key算子的多个并发实例中的一......
  • Flink状态(二)
    Flink提供了不同的状态存储方式,并说明了状态如何存和存储在哪里。状态可以被存储在Jvm的堆和堆外。根据状态存储方式的不同,Flink也能代替应用管理状态,意思是Flink能够进行内存管理(有必要的时候,可能会溢出到硬盘),允许应用保存非常大的状态。默认情况下,在配置文件flink-conf.yam......