主类MyPeriodQueryDbSource
import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.util.Properties;
/**
* 定期读取数据source
*
* @param <T> 输出对象泛型
*/
/**
 * A Flink source that periodically executes a fixed SQL query against a database
 * and emits the resulting rows.
 *
 * @param <T> type of the records emitted by this source
 */
public class MyPeriodQueryDbSource<T> implements Source<T, MySplit, NoOpEnumState> {

    /** Interval between two consecutive reads, in milliseconds. */
    private final long period;
    /** Connection-pool properties handed to the reader's DataSource factory. */
    private final Properties dbProps;
    /** The query executed on every cycle. */
    private final String sql;
    /** Target type each result row is mapped to. */
    private final Class<T> clazz;
    /** Human-readable name used in logs. */
    private final String sourceName;

    public MyPeriodQueryDbSource(String sourceName, long period, Properties dbProps,
                                 String sql, Class<T> clazz) {
        this.sourceName = sourceName;
        this.period = period;
        this.dbProps = dbProps;
        this.sql = sql;
        this.clazz = clazz;
    }

    @Override
    public Boundedness getBoundedness() {
        // Periodic polling never terminates, hence an unbounded stream.
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<T, MySplit> createReader(SourceReaderContext readerContext) throws Exception {
        return new MySourceReader<>(readerContext, sourceName, period, dbProps, sql, clazz);
    }

    @Override
    public SplitEnumerator<MySplit, NoOpEnumState> createEnumerator(SplitEnumeratorContext<MySplit> enumContext) throws Exception {
        return new MySplitEnumerator(enumContext);
    }

    @Override
    public SplitEnumerator<MySplit, NoOpEnumState> restoreEnumerator(SplitEnumeratorContext<MySplit> enumContext, NoOpEnumState checkpoint) throws Exception {
        // The enumerator is stateless, so restoring is identical to creating a fresh one.
        return new MySplitEnumerator(enumContext);
    }

    @Override
    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
        return new MySplitSerializer();
    }

    @Override
    public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
        return new MyEumStateSerializer();
    }
}
读取Reader
import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.SingleColumnRowMapper;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
public class MySourceReader<T> implements SourceReader<T, MySplit> {
/**
* 定期读取时间,单位ms
*/
private final long period;
private final String sourceName;
private final JdbcTemplate jdbcTemplate;
private List<MySplit> splits;
/**
* 查询sql
*/
private final String sql;
private final Class<T> clazz;
private RowMapper<T> rowMapper;
private final SourceReaderContext readerContext;
private Boolean notFirst = false;
public MySourceReader(SourceReaderContext readerContext, String sourceName, long period, Properties dbProps,
String sql, Class<T> clazz) {
this.readerContext = readerContext;
this.period = period;
this.sourceName = sourceName;
this.sql = sql;
this.clazz = clazz;
try {
DataSource dataSource = DruidDataSourceFactory.createDataSource(dbProps);
jdbcTemplate = new JdbcTemplate(dataSource);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void start() {
if (clazz == String.class || clazz.isPrimitive()) {
rowMapper = SingleColumnRowMapper.newInstance(clazz);
} else {
rowMapper = BeanPropertyRowMapper.newInstance(clazz);
}
readerContext.sendSplitRequest();
splits = new ArrayList<>();
log.info("数据源:{},sql:{}", sourceName, sql);
}
/**
* 如何读取数据,实现需要保证不阻塞(即,不能sleep,wait等)
* The implementation must make sure this method is non-blocking.
*
* @param output ReaderOutput
* @return InputStatus
* @throws Exception Exception
*/
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
if (notFirst) {
// 不是第一个直接退出
log.warn("{}不是第一个reader,退出", readerContext.getIndexOfSubtask());
return InputStatus.END_OF_INPUT;
}
if (splits.isEmpty()) {
// 向SplitEnumerator申请Split
readerContext.sendSplitRequest();
// 无界流,未来可能会有
return InputStatus.NOTHING_AVAILABLE;
}
try {
List<T> list = jdbcTemplate.query(sql, rowMapper);
if (!list.isEmpty()) {
for (T t : list) {
output.collect(t);
}
}
} catch (DataAccessException | IllegalArgumentException e) {
log.warn("无法读取数据源:'{}'", sourceName, e);
// 无界流,未来可能会有
return InputStatus.NOTHING_AVAILABLE;
}
// 无界流,未来可能会有
return InputStatus.NOTHING_AVAILABLE;
}
@Override
public List<MySplit> snapshotState(long checkpointId) {
// 定期读,不需要做状态管理,每次都是最新的
return Collections.emptyList();
}
@Override
public CompletableFuture<Void> isAvailable() {
return CompletableFuture.runAsync(this::sleepPeriod);
}
private void sleepPeriod() {
log.info("等待{} ms之后进行{}数据源的读取", period, sourceName);
synchronized (this) {
try {
wait(period);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void addSplits(List<MySplit> splits) {
// 申请到Split之后进行回调
this.splits.addAll(splits);
}
@Override
public void notifyNoMoreSplits() {
notFirst = true;
}
@Override
public void close() throws Exception {
}
}
分片Split
import org.apache.flink.api.connector.source.SourceSplit;
import java.util.UUID;
/**
 * Split for the periodic DB source. It carries no payload; its only role is to
 * mark which reader is allowed to poll the database.
 */
public class MySplit implements SourceSplit {

    /**
     * Generated once per instance: the Source API requires {@code splitId()} to
     * return a stable identifier for the lifetime of the split. The previous
     * implementation returned a fresh UUID on every call, so the runtime could
     * never track the split consistently.
     */
    private final String id = UUID.randomUUID().toString().replace("-", "");

    @Override
    public String splitId() {
        return id;
    }
}
状态
// Stateless placeholder: the split enumerator keeps no checkpointable state.
public class NoOpEnumState {
}
Split序列化器
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
/**
 * Serializer for {@link MySplit}. A split carries no fields, so the wire format
 * is an empty byte array and deserialization simply constructs a new split.
 */
public class MySplitSerializer implements SimpleVersionedSerializer<MySplit> {

    /** Shared zero-length payload: splits have nothing to encode. */
    private static final byte[] EMPTY_PAYLOAD = new byte[0];

    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(MySplit obj) throws IOException {
        return EMPTY_PAYLOAD;
    }

    @Override
    public MySplit deserialize(int version, byte[] serialized) throws IOException {
        return new MySplit();
    }
}
状态序列化器
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
/**
 * Serializer for the empty {@link NoOpEnumState}. Nothing is written during
 * checkpoints; restore simply produces a fresh placeholder instance.
 */
public class MyEumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {

    /** Shared zero-length payload: the state has nothing to encode. */
    private static final byte[] EMPTY_PAYLOAD = new byte[0];

    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(NoOpEnumState obj) throws IOException {
        return EMPTY_PAYLOAD;
    }

    @Override
    public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
        return new NoOpEnumState();
    }
}
标签:return,Flink,private,source,API,org,Override,import,public
From: https://www.cnblogs.com/xixisix/p/18640613