1 Overview: Flink (De)serializers
In brief
- Serializer: mostly used on the Sink (output) side
- Deserializer: mostly used on the Source (read) side
Dependencies and versions
- Dependency packages and versions (summary)
org.apache.kafka:kafka-clients:${kafka-clients.version=2.4.1}
org.apache.flink:flink-java:${flink.version=1.12.6}
org.apache.flink:flink-clients_${scala.version=2.11}:${flink.version}
org.apache.flink:flink-streaming-java_${scala.version}:${flink.version}
org.apache.flink:flink-connector-kafka_${scala.version}:${flink.version}
org.apache.flink:flink-statebackend-rocksdb_${scala.version}:${flink.version}
//org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}
//org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}
//com.alibaba.ververica:flink-connector-mysql-cdc:1.3.0
...
2 Types of Flink (De)serializers
Kafka deserializers
Deserializer
+ KafkaConsumer
【Recommended / plain Java application】
- Core APIs:
org.apache.kafka.common.serialization.Deserializer
org.apache.kafka.clients.consumer.KafkaConsumer
org.apache.kafka.clients.consumer.ConsumerRecords
org.apache.kafka.clients.consumer.ConsumerRecord
- Dependency: kafka-clients:2.4.1
- Usage example
- Define the deserializer
public class CompanyDeserializer implements Deserializer<Company>
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Wire format: [nameLen:int][nameBytes][addressLen:int][addressBytes]
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        int addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        String name;
        String address;
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new SerializationException("Error: " + ex.getMessage());
        }
        return new Company(name, address);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
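The notes show only the read side. The `Company` POJO and the write-side encoding are not in the source, so the following is a hedged sketch (the class and method names `Company` and `CompanySerializerSketch.serialize` are my own) of the length-prefixed layout that `CompanyDeserializer` expects; in a real producer this logic would sit inside an org.apache.kafka.common.serialization.Serializer<Company> implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical POJO assumed by CompanyDeserializer (not shown in the notes).
class Company {
    final String name;
    final String address;
    Company(String name, String address) {
        this.name = name;
        this.address = address;
    }
}

public class CompanySerializerSketch {
    // Produce the byte layout the deserializer reads back:
    // [nameLen:int][nameBytes][addressLen:int][addressBytes]
    static byte[] serialize(Company company) {
        byte[] nameBytes = company.name.getBytes(StandardCharsets.UTF_8);
        byte[] addressBytes = company.address.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + nameBytes.length + 4 + addressBytes.length);
        buffer.putInt(nameBytes.length);
        buffer.put(nameBytes);
        buffer.putInt(addressBytes.length);
        buffer.put(addressBytes);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] data = serialize(new Company("Acme", "Beijing"));
        // 4 (length) + 4 ("Acme") + 4 (length) + 7 ("Beijing")
        System.out.println(data.length); // prints 19
    }
}
```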
- Use the deserializer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CompanyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "debug-group");

        KafkaConsumer<String, Company> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));
        while (true) {
            ConsumerRecords<String, Company> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, Company> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
Supplement: usage example 2
//org.apache.kafka.clients.consumer.ConsumerConfig
//org.apache.kafka.clients.consumer.KafkaConsumer
// KafkaDeserializerType is presumably a project-specific helper enum returning the deserializer class name
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaDeserializerType.STRING_DESERIALIZER.getDeserializer()); // key.deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaDeserializerType.BYTE_ARRAY_DESERIALIZER.getDeserializer()); // value.deserializer
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(properties);
...
//org.apache.kafka.clients.consumer.ConsumerRecords
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
KafkaRecordDeserializer + KafkaSource(Builder) 【Recommended / Flink】
- Core APIs:
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
org.apache.flink.connector.kafka.source.KafkaSourceBuilder
: the Flink community recommends using org.apache.flink.connector.kafka.source.KafkaSource
: the Flink community recommends using org.apache.kafka.clients.consumer.ConsumerRecord
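The notes list these core APIs without an example. Below is a hedged sketch against Flink 1.12.x (the version in the dependency summary above), where the interface is still named KafkaRecordDeserializer — later Flink releases renamed it KafkaRecordDeserializationSchema. The class names `ValueStringDeserializer` and `KafkaSourceDemo` are my own; the broker address, topic, and group id are placeholders reused from the consumer example. Not verified against a running cluster.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class KafkaSourceDemo {

    // Hypothetical deserializer: receives the raw ConsumerRecord and emits
    // the record value as a UTF-8 String via the Collector.
    public static class ValueStringDeserializer implements KafkaRecordDeserializer<String> {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
            out.collect(new String(record.value(), StandardCharsets.UTF_8));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSourceBuilder wires the consumer config and the record deserializer together.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("xxx.xxx.xxx.xxx:9092") // placeholder, as in the notes above
                .setTopics("companyTopic")
                .setGroupId("debug-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new ValueStringDeserializer())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .print();
        env.execute("KafkaSourceDemo");
    }
}
```

Unlike the plain-Java KafkaConsumer loop above, here the deserializer receives the full ConsumerRecord (key, value, headers, offset), which is why the Flink community recommends this pair of APIs.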