首页 > 数据库 >Flink source API定期读取MySQL数据

Flink source API定期读取MySQL数据

时间:2024-12-30 11:32:06浏览次数:1  
标签:return Flink private source API org Override import public

主类MyPeriodQueryDbSource

import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.util.Properties;

/**
 * 定期读取数据source
 *
 * @param <T> 输出对象泛型
 */
public class MyPeriodQueryDbSource<T> implements Source<T, MySplit, NoOpEnumState> {
    /**
     * 定期读取时间,单位ms
     */
    private final long period;

    private final Properties dbProps;

    private final String sql;

    private final Class<T> clazz;
    private final String sourceName;

    public MyPeriodQueryDbSource(String sourceName, long period, Properties dbProps,
                                 String sql, Class<T> clazz) {
        this.period = period;
        this.sourceName = sourceName;
        this.dbProps = dbProps;
        this.sql = sql;
        this.clazz = clazz;
    }

    @Override
    public SplitEnumerator<MySplit, NoOpEnumState> createEnumerator(SplitEnumeratorContext<MySplit> enumContext) throws Exception {
        return new MySplitEnumerator(enumContext);
    }

    @Override
    public Boundedness getBoundedness() {
        // 无界流
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SplitEnumerator<MySplit, NoOpEnumState> restoreEnumerator(SplitEnumeratorContext<MySplit> enumContext, NoOpEnumState checkpoint) throws Exception {
        return new MySplitEnumerator(enumContext);
    }

    @Override
    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
        return new MySplitSerializer();
    }

    @Override
    public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
        return new MyEumStateSerializer();
    }

    @Override
    public SourceReader<T, MySplit> createReader(SourceReaderContext readerContext) throws Exception {
        return new MySourceReader<>(readerContext,sourceName, period, dbProps, sql, clazz);
    }
}

读取Reader


import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.SingleColumnRowMapper;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class MySourceReader<T> implements SourceReader<T, MySplit> {
    /**
     * 定期读取时间,单位ms
     */
    private final long period;
    private final String sourceName;
    private final JdbcTemplate jdbcTemplate;
    private List<MySplit> splits;
    /**
     * 查询sql
     */
    private final String sql;
    private final Class<T> clazz;
    private RowMapper<T> rowMapper;
    private final SourceReaderContext readerContext;
    private Boolean notFirst = false;

    public MySourceReader(SourceReaderContext readerContext, String sourceName, long period, Properties dbProps,
                          String sql, Class<T> clazz) {
        this.readerContext = readerContext;
        this.period = period;
        this.sourceName = sourceName;
        this.sql = sql;
        this.clazz = clazz;
        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(dbProps);
            jdbcTemplate = new JdbcTemplate(dataSource);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void start() {
        if (clazz == String.class || clazz.isPrimitive()) {
            rowMapper = SingleColumnRowMapper.newInstance(clazz);
        } else {
            rowMapper = BeanPropertyRowMapper.newInstance(clazz);
        }
        readerContext.sendSplitRequest();
        splits = new ArrayList<>();
        log.info("数据源:{},sql:{}", sourceName, sql);
    }

    /**
     * 如何读取数据,实现需要保证不阻塞(即,不能sleep,wait等)
     * The implementation must make sure this method is non-blocking.
     *
     * @param output ReaderOutput
     * @return InputStatus
     * @throws Exception Exception
     */
    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        if (notFirst) {
            // 不是第一个直接退出
            log.warn("{}不是第一个reader,退出", readerContext.getIndexOfSubtask());
            return InputStatus.END_OF_INPUT;
        }
        if (splits.isEmpty()) {
            // 向SplitEnumerator申请Split
            readerContext.sendSplitRequest();
            // 无界流,未来可能会有
            return InputStatus.NOTHING_AVAILABLE;
        }
        try {
            List<T> list = jdbcTemplate.query(sql, rowMapper);
            if (!list.isEmpty()) {
                for (T t : list) {
                    output.collect(t);
                }
            }
        } catch (DataAccessException | IllegalArgumentException e) {
            log.warn("无法读取数据源:'{}'", sourceName, e);
            // 无界流,未来可能会有
            return InputStatus.NOTHING_AVAILABLE;
        }
        // 无界流,未来可能会有
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public List<MySplit> snapshotState(long checkpointId) {
        // 定期读,不需要做状态管理,每次都是最新的
        return Collections.emptyList();
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.runAsync(this::sleepPeriod);
    }

    private void sleepPeriod() {
        log.info("等待{} ms之后进行{}数据源的读取", period, sourceName);
        synchronized (this) {
            try {
                wait(period);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void addSplits(List<MySplit> splits) {
        // 申请到Split之后进行回调
        this.splits.addAll(splits);
    }

    @Override
    public void notifyNoMoreSplits() {
        notFirst = true;
    }

    @Override
    public void close() throws Exception {
    }
}

分片Split


import org.apache.flink.api.connector.source.SourceSplit;

import java.util.UUID;

public class MySplit implements SourceSplit {

    @Override
    public String splitId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}

状态

// 无状态,占位
public class NoOpEnumState {
}

Split序列化器


import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

public class MySplitSerializer implements SimpleVersionedSerializer<MySplit> {
    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(MySplit obj) throws IOException {
        return new byte[0];
    }

    @Override
    public MySplit deserialize(int version, byte[] serialized) throws IOException {
        return new MySplit();
    }
}

状态序列化器



import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

public class MyEumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {
    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(NoOpEnumState obj) throws IOException {
        return new byte[0];
    }

    @Override
    public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
        return new NoOpEnumState();
    }
}

标签:return,Flink,private,source,API,org,Override,import,public
From: https://www.cnblogs.com/xixisix/p/18640613

相关文章

  • 如何使用GMail API加载数据:实战指南
    如何使用GMailAPI加载数据:实战指南老铁们,今天我们来聊聊怎么从GMail中加载数据,这是个相当实用的功能,尤其是在构建对话模型时。说白了,这个技术点其实不难,不过有些细节还是需要注意的。我先前踩过这个坑,现在分享下经验。技术背景介绍在处理电子邮件数据时,我们往往需要获取......
  • wx.onApiCategoryChange
    wx.onApiCategoryChange(functionlistener)基础库2.33.0开始支持,低版本需做兼容处理。小程序插件:不支持功能描述监听API类别变化事件参数functionlistenerAPI类别变化事件的监听函数参数Objectres属性类型说明apiCategorynumberAPI类别......
  • wx.offApiCategoryChange
    wx.offApiCategoryChange(functionlistener)基础库2.33.0开始支持,低版本需做兼容处理。小程序插件:不支持功能描述移除API类别变化事件的监听函数参数functionlisteneronApiCategoryChange传入的监听函数。不传此参数则移除所有监听函数。示例代码constlisten......
  • wx.getApiCategory
    stringwx.getApiCategory()基础库2.33.0开始支持,低版本需做兼容处理。小程序插件:不支持功能描述获取当前API类别返回值stringAPI类别不同apiCategory场景下的API限制X表示API被限制无法使用;不在表格中的API不限制。defaultnativeFunctionalized......
  • WebApiDemo
    以下是一个使用ASP.NETWebAPI(基于.NETFramework)的简单示例。1.创建ASP.NETWebAPI项目首先,确保你已经安装了VisualStudio,并且选择了包含ASP.NET和Web开发工作负载的安装选项。打开VisualStudio。选择“创建新项目”。在搜索栏中输入“ASP.NETWeb应用程序(.NETFra......
  • rasa nlu 传递信息及 REST API 替代 rasa nlu
    1.4rasanlu传递信息及RESTAPI替代rasanlu1.4.1获取rasanlu传给rasacore的响应开始想在rasa内进行拦截,经过测试,难以实现所以采用接口方式获取启动rasanlu服务rasarun--enable-api请求:响应:{ "text":"今天天气怎么样", "intent":{ "name":"as......
  • UE4.27, 揣摩源码, 序列化 (三) FLinkerLoad, FLinkerSave
    3.  FLinkerLoad,FLinkerSave分别是UObject的反序列化和序列化的内核3.0.UPackage与UObjectUObject因为涉及与其他UObject的复杂引用关系,如果我们客制化地单独正反序列化每一个UObject,我们会在反序列化的时候惊觉这是繁琐而不可能的。为了满足UObject......
  • Flink状态编程
            Flink处理机制的核心就是“有状态的流处理”,在某些情况下,一条数据的计算不仅要基于当前数据自身,还需要依赖数据流中的一些其他数据。这些在一个任务中,用来辅助计算的数据我们就称之为这个任务的状态。一、按键分区状态(KeyedState)分类        按键分......
  • Hudi数据湖_数据写原理_COW和MOR表Upsert原理_Flink和Spark写入区别_Insert和Overwrit
    可以看到数据写操作,有三种方式upsert就是通过index索引来,对数据到底是insert还是update会做上标记,并且,只有索引到了数据才会update,所以是依赖index索引的.insert就是不停的插入数据,跳过了index,插入快,但是有重复数据,可能需要自己处理bulk_insert 写排序默认......
  • Flink 集群有哪些⻆⾊?各⾃有什么作⽤?
    在Flink集群中有以下几个重要角色:JobManager(作业管理器)作用:作业管理:它是Flink集群的控制中心,负责接收用户提交的作业,协调和管理整个作业的执行过程。例如,当用户提交一个实时数据处理的流计算作业时,JobManager会负责调度该作业在集群中的执行。资源分配:JobManage......