Source Connector
This article mainly introduces SourceReader, the component responsible for reading data.

SourceReader
Each SourceReader runs in its own thread. As long as the splits that SourceSplitCoordinator assigns to different SourceReaders have no overlap, we never need to consider any concurrency details during a SourceReader's execution lifecycle.

SourceReader interface
```java
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

  void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
   * Check whether the source reader has more elements.
   */
  boolean hasMoreElements();

  /**
   * No more splits will be sent to this source reader.
   * The source reader may exit after processing all assigned splits.
   */
  default void notifyNoMoreSplits() {
  }

  /**
   * Process events sent from the {@link SourceSplitCoordinator}.
   */
  default void handleSourceEvent(SourceEvent sourceEvent) {
  }

  /**
   * Store the splits in an external system so they can be recovered when the task fails.
   */
  List<SplitT> snapshotState(long checkpointId);

  /**
   * Invoked once all tasks have finished their snapshots for the given checkpoint.
   */
  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  interface Context {

    TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();
  }
}
```
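To make the contract concrete, here is a simplified sketch of the order in which the framework drives these methods. This is illustrative only, not the engine's actual scheduling loop; in reality addSplits and notifyNoMoreSplits are invoked asynchronously whenever the SourceSplitCoordinator sends splits or signals.

```java
// Illustrative driver sketch, not BitSail's real runtime loop.
reader.start();                      // open connections / clients
// reader.addSplits(...) is called whenever the coordinator assigns splits.
while (reader.hasMoreElements()) {   // checked before every poll
  reader.pollNext(pipeline);         // read assigned splits and emit Rows
}
// On each checkpoint: List<SplitT> state = reader.snapshotState(checkpointId);
reader.close();                      // SourceReader extends AutoCloseable
```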
Constructor
The constructor is where we extract the configuration needed to access the data source, such as the database and table names, the message queue's cluster and topic, authentication credentials, and so on.

Example:
```java
public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}
```
start method
Initialize the access objects for the data source, such as a database execution object, a message queue consumer, or a file system connection.

Examples:
Message queue:

```java
public void start() {
  try {
    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(
          new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {
      consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}
```

Database:
```java
public void start() {
  this.connection = connectionHolder.connect();

  // Construct statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {
    this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}
```

FTP:
```java
public void start() {
  this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {
    this.skipFirstLine = true;
  }
}
```
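Since SourceReader extends AutoCloseable, whatever start() opens should be released again in close(). A minimal sketch for the RocketMQ reader above, assuming the consumer field initialized in its start() method:

```java
@Override
public void close() throws Exception {
  // Release the resources acquired in start().
  if (consumer != null) {
    consumer.shutdown();
  }
}
```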
addSplits method
Add the list of splits that SourceSplitCoordinator assigned to this Reader to its own processing queue (Queue) or set (Set).

Example:
```java
public void addSplits(List<RocketMQSplit> splits) {
  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);
  assignedRocketMQSplits.addAll(splits);
}
```
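Readers that work through splits strictly one at a time often keep a queue rather than a set, so pollNext can drain splits in arrival order. A hedged sketch, with HadoopSourceSplit as a hypothetical split type:

```java
// Fields of a hypothetical queue-based reader.
private final Deque<HadoopSourceSplit> splits = new ConcurrentLinkedDeque<>();

@Override
public void addSplits(List<HadoopSourceSplit> newSplits) {
  // Enqueue new splits; pollNext() takes them from the head in FIFO order.
  splits.addAll(newSplits);
}
```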
hasMoreElements method
In an unbounded streaming scenario, this method always returns true, ensuring the Reader thread is never torn down. In a batch scenario, it returns false once all splits assigned to this Reader have been processed, marking the end of the Reader's lifecycle.

```java
public boolean hasMoreElements() {
  if (boundedness == Boundedness.UNBOUNDEDNESS) {
    return true;
  }
  if (noMoreSplits) {
    return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}
```
pollNext method
This method is called once addSplits has populated the split processing queue and hasMoreElements returns true; it is where developers implement the actual interaction with the data. When implementing pollNext, pay attention to the following:

- Reading split data: read the data from the splits that have been constructed.
- Data type conversion: convert the external data into BitSail's Row type.

Example
Take RocketMQSourceReader as an example: pick a split from the split queue and read its data, then convert what was read into BitSail's Row type and send it downstream for processing.

```java
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {
    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {
      continue;
    }

    for (MessageExt message : pullResult.getMsgFoundList()) {
      Row deserialize = deserializationSchema.deserialize(message.getBody());
      pipeline.output(deserialize);
      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {
        LOG.info("Subtask {} rocketmq split {} in end of stream.",
            context.getIndexOfSubtask(),
            rocketmqSplit);
        finishedRocketMQSplits.add(rocketmqSplit);
        break;
      }
    }

    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());
    if (!commitInCheckpoint) {
      consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());
    }
  }
  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}
```
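In a bounded job, a Reader does not have to wait passively for assignments: the Context handed to it (see the interface above) exposes sendSplitRequest(), so it can ask the SourceSplitCoordinator for more work once its local queue is drained. A hedged sketch of that pattern; the field names are illustrative:

```java
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  if (assignedSplits.isEmpty()) {
    if (!noMoreSplits) {
      // Nothing left locally; ask the coordinator for another split.
      context.sendSplitRequest();
    }
    return;
  }
  // ... otherwise read the current split and emit Rows as shown above ...
}
```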
Common ways to convert to the BitSail Row type
Custom RowDeserializer class
Apply a different converter to each column according to its format, and set the result on the corresponding field of the Row.

```java
public class ClickhouseRowDeserializer {

  interface FiledConverter {
    Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FiledConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {
      converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {
    Row row = new Row(fieldSize);
    try {
      for (int i = 0; i < fieldSize; ++i) {
        row.setField(i, converters.get(i).apply(resultSet));
      }
    } catch (SQLException e) {
      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());
    }
    return row;
  }

  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {
    if (!(typeInfo instanceof BasicTypeInfo)) {
      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
          typeInfo.getTypeClass().getName() + " is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {
        BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();
      };
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> null;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);
  }
}
```
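Wiring this deserializer into the ClickHouse reader's pollNext then comes down to one call per JDBC row. A sketch assuming the statement prepared in the start() example earlier:

```java
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  ClickhouseRowDeserializer rowDeserializer =
      new ClickhouseRowDeserializer(context.getTypeInfos());
  try (ResultSet resultSet = statement.executeQuery()) {
    while (resultSet.next()) {
      // Each JDBC row becomes one BitSail Row.
      pipeline.output(rowDeserializer.convert(resultSet));
    }
  }
}
```

In a real reader the deserializer would be built once, in the constructor, rather than on every poll.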
Implement the DeserializationSchema interface
Rather than implementing a RowDeserializer, we prefer that developers implement a class derived from the DeserializationSchema interface, which converts data in a given format, such as JSON or CSV, into the BitSail Row type. In a concrete application, we can then create the matching implementation through a unified interface:

```java
public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;
  private TypeInfo<?>[] typeInfos;
  private String[] fieldNames;
  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;

    ContentType contentType = ContentType.valueOf(
        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING,
            "unsupported parser type: " + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}
```

You can also write a DeserializationSchema dedicated to the specific class you need to parse:
```java
public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;
  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                                       TypeInfo<?>[] typeInfos,
                                                       String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {
      row.setField(i, converters.get(i).convert(writables[i].toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private interface DeserializationConverter extends Serializable {
    Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {
    Class<?> typeClass = typeInfo.getTypeClass();
    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {
      return field -> null;
    }
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {
      return this::convertToBoolean;
    }
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {
      return this::convertToInt;
    }
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Csv format converter not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {
    return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {
    return Integer.parseInt(field.trim());
  }
}
```
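With either variant, the reader itself stays format-agnostic: it holds a DeserializationSchema reference and simply forwards each record to it. A hedged usage sketch; the surrounding reader fields such as readerConfiguration and writable are illustrative:

```java
// Built once, e.g. in the reader's constructor:
DeserializationSchema<Writable, Row> schema = new TextInputFormatDeserializationSchema(
    readerConfiguration, context.getTypeInfos(), context.getFieldNames());

// Inside pollNext(), for each record just read from the split:
Row row = schema.deserialize(writable);
pipeline.output(row);
```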
snapshotState method
Generate and store a snapshot of the state, which is used for checkpointing.

Example:
```java
public List<RocketMQSplit> snapshotState(long checkpointId) {
  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {
    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {
        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.",
            context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {
        throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}
```
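Its counterpart hook is notifyCheckpointComplete: once every task's snapshot for a checkpoint is durable, a reader can safely discard bookkeeping for splits that finished before that checkpoint. A hedged sketch reusing the finishedRocketMQSplits field from the examples above:

```java
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  LOG.info("Subtask {} checkpoint {} completed.", context.getIndexOfSubtask(), checkpointId);
  // Offsets committed in snapshotState() are now durable,
  // so finished splits no longer need to be tracked.
  finishedRocketMQSplits.clear();
}
```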
hasMoreElements method
Before every call to pollNext, the framework evaluates sourceReader.hasMoreElements(); pollNext is invoked if and only if this check passes.

Example:
```java
public boolean hasMoreElements() {
  if (noMoreSplits) {
    return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}
```
notifyNoMoreSplits method
This method is invoked once no more splits will be sent to this Reader; the Reader can exit after processing all of its assigned splits.

Example:
```java
public void notifyNoMoreSplits() {
  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}
```

About BitSail:
⭐️ Star us: https://github.com/bytedance/bitsail
File issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: bitsail+subscribe@googlegroups.com
Join the BitSail technical community