The official documentation on serializing Avro Logical Types is quite thin, so this post spells out the serialization and deserialization steps in detail.
1. Local usage
1.1 How to declare logical types in an Avro schema file
{
  "type": "record",
  "name": "User",
  "namespace": "org.example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "date", "type": {"type": "int", "logicalType": "date"}},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "value", "type": {"type": "bytes", "logicalType": "decimal", "precision": 22, "scale": 2}}
  ]
}
Take date as an example: after serialization Avro actually stores an int, and deserialization converts it back to a date. In other words, int is the underlying type of the date logical type; the underlying type of every logical type is listed in the official specification.
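To make the mapping concrete, here is a small sketch (not from the original post; it assumes Avro 1.9+, which ships the java.time-based conversions) showing what the built-in DateConversion does: a date is stored as the number of days since the Unix epoch (1970-01-01).

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import java.time.LocalDate;

TimeConversions.DateConversion conv = new TimeConversions.DateConversion();
Schema dateSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
// LocalDate -> int (days since 1970-01-01)
int days = conv.toInt(LocalDate.of(1970, 1, 2), dateSchema, LogicalTypes.date());
System.out.println(days); // prints 1
// int -> LocalDate on the way back
System.out.println(conv.fromInt(days, dateSchema, LogicalTypes.date())); // 1970-01-02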
1.2 Serialization and deserialization
// Imports used below (Avro 1.9+, which ships the java.time-based conversions):
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;

// The snippet assumes an enclosing method declared `throws IOException`.
// Load the schema
Schema schema = new Schema.Parser().parse(Main.class.getClassLoader().getResourceAsStream("avro/view.avsc"));
// Build a record
GenericData.Record record = new GenericData.Record(schema);
record.put("id", "001");
record.put("value", BigDecimal.valueOf(67.78));
record.put("date", LocalDate.now());
record.put("timestamp", Instant.now());
// Serialize
GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(schema);
// The crucial step: register a conversion for every logical type in the schema,
// otherwise the writer rejects the BigDecimal/LocalDate/Instant values at runtime
writer.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
writer.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
writer.getData().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
writer.write(record, encoder);
encoder.flush();
// Serialization done: the record is now a byte[] and can be shipped anywhere
byte[] bytes = outputStream.toByteArray();
// Deserialize
GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
// The reader needs the conversions as well; without any registered conversion the
// logical-type fields come back as their raw underlying types (int, long, ByteBuffer)
reader.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
reader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
reader.getData().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
GenericData.Record newRecord = null;
try {
    newRecord = reader.read(null, decoder);
} catch (IOException e) {
    throw new RuntimeException(e);
}
System.out.println(newRecord);
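With the conversions in place, the logical-type fields round-trip as real Java objects instead of raw primitives. A quick sanity check (not from the original post, but it follows from the conversions registered above):

System.out.println(newRecord.get("date").getClass());      // class java.time.LocalDate
System.out.println(newRecord.get("value").getClass());     // class java.math.BigDecimal
System.out.println(newRecord.get("timestamp").getClass()); // class java.time.Instant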
2. Integrating with Kafka
The main work is to implement Kafka's Serializer and Deserializer interfaces and register them in the Kafka properties.
2.1 Producer side
Implement the Serializer interface:
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class GenericRecordSerializer implements Serializer<GenericData.Record> {
    protected GenericDatumWriter<GenericData.Record> writer;

    @Override
    public byte[] serialize(String topic, GenericData.Record record) {
        writer = new GenericDatumWriter<>(record.getSchema());
        // Register the logical-type conversions, just as in the local example
        writer.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
        writer.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
        writer.getData().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        try {
            writer.write(record, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return outputStream.toByteArray();
    }
}
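Note that this implementation builds a fresh GenericDatumWriter and re-registers the conversions for every message; for a high-throughput producer you may want to cache one writer per schema. Kafka also instantiates the serializer reflectively, so the class must keep a public no-arg constructor (the implicit default constructor here is fine).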
Then register it in the Kafka producer properties:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericRecordSerializer.class);
Producer<String, GenericData.Record> sender = new KafkaProducer<>(props);
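With the producer in hand, sending the record built in section 1.2 looks like this (a minimal sketch; the topic name "record-2" matches the consumer below and the key choice is illustrative):

import org.apache.kafka.clients.producer.ProducerRecord;

sender.send(new ProducerRecord<>("record-2", record.get("id").toString(), record));
sender.flush();
sender.close();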
2.2 Consumer side
Implement the Deserializer interface:
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class GenericRecordDeserializer implements Deserializer<GenericData.Record> {
    @Override
    public GenericData.Record deserialize(String topic, byte[] bytes) {
        // Obtain the schema (left open here; in practice it might be loaded from the
        // classpath or a schema registry)
        Schema schema = ....
        GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
        // The deserializer also needs the converters! Register them on the reader's
        // GenericData so they are in effect when reading.
        reader.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
        reader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
        reader.getData().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
        GenericData.Record record;
        try {
            record = reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return record;
    }
}
Then register it in the Kafka consumer properties:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "record-consumer-14");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GenericRecordDeserializer.class);
KafkaConsumer<String, GenericData.Record> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("record-2"));
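A minimal poll loop then receives fully converted records (a sketch; the timeout value is arbitrary):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;

while (true) {
    ConsumerRecords<String, GenericData.Record> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, GenericData.Record> r : records) {
        System.out.println(r.value()); // date/timestamp/value arrive as LocalDate/Instant/BigDecimal
    }
}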