首页 > 其他分享 >Kafka 序列化

Kafka 序列化

时间:2023-02-24 14:11:06浏览次数:52  
标签:String encoding Kafka 序列化 data public name

序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象

消息的 key 和 value 都使用字符串,对应程序中的序列化器也使用了客户端自带的 org.apache.kafka.common.serialization.StringSerializer,除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,此接口有3个方法:

public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()

configure() 方法用来配置当前类,serialize() 方法用来执行序列化操作。而 close() 方法用来关闭当前的序列化器,一般情况下 close() 是一个空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被 KafkaProducer 调用多次

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如 StringSerializer,而消费者使用了另一种序列化器,比如 IntegerSerializer,那么是无法解析出想要的数据的。

下面就以 StringSerializer 为例来看看 Serializer 接口中的3个方法的使用方法。

//代码清单4-1 StringSerializer的代码实现
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" :
                "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing " +
                    "string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

首先是 configure() 方法,这个方法是在创建 KafkaProducer 实例的时候调用的,主要用来确定编码类型,不过一般客户端对于 key.serializer.encoding、value.serializer. encoding 和 serializer.encoding 这几个参数都不会配置,在 KafkaProducer 的参数集合(ProducerConfig)里也没有这几个参数(它们可以看作用户自定义的参数),所以一般情况下 encoding 的值就为默认的“UTF-8”。serialize() 方法非常直观,就是将 String 类型转为 byte[] 类型。

如果 Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。下面就以一个简单的例子来介绍自定义类型的使用方法。

假设我们要发送的消息都是 Company 对象,这个 Company 的定义很简单,只有名称 name 和地址 address,示例代码参考如下(为了构建方便,示例中使用了 lombok 工具):

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
    private String name;
    private String address;
}

下面我们再来看一下 Company 对应的序列化器 CompanySerializer。

//代码清单4-2 自定义的序列化器CompanySerializer
public class CompanySerializer implements Serializer<Company> {
    @Override
    public void configure(Map configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.
                    allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {}
}

上面的这段代码的逻辑很简单,configure() 和close() 方法也都为空。如何使用自定义的序列化器 CompanySerializer 呢?只需将 KafkaProducer 的 value.serializer 参数设置为 CompanySerializer 类的全限定名即可。

//代码清单4-3 自定义序列化器使用示例
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        CompanySerializer.class.getName());
properties.put("bootstrap.servers", brokerList);

KafkaProducer<String, Company> producer =
        new KafkaProducer<>(properties);
Company company = Company.builder().name("hiddenkafka")
        .address("China").build();
ProducerRecord<String, Company> record =
        new ProducerRecord<>(topic, company);
producer.send(record).get();

标签:String,encoding,Kafka,序列化,data,public,name
From: https://www.cnblogs.com/fxh0707/p/17151280.html

相关文章

  • 使用JsonTextReader提高Json.NET反序列化的性能
    一、碰到的问题在服务器的文件系统上有一个业务生成的BigTable.json文件,其可能包含的JSON字符串很大,同时里边的集合会包含很多的记录;我们使用以下的代码来反序列化,虽然使......
  • Kafka 上手实战
    参数配置bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以......
  • 33-DRF框架-反序列化使用
    #增加修改#把参数转成model对象,操作数据库#步骤:#1.创建序列化对象data传递参数进行验证#2.is_validate()函数验证#3.通过可以使用validated......
  • 深入学习jquery源码之序列化表单
    深入学习jquery源码之序列化表单serialize()概述序列表表格内容为字符串。序列表表格内容为字符串,用于Ajax请求。<pid="results"><b>Results:</b></p><form><select......
  • 想完全弄懂kafka?看这篇就够了
    有人说世界上有三个伟大的发明:火,轮子,以及Kafka。发展到现在,ApacheKafka无疑是很成功的,Confluent公司曾表示世界五百强中有三分之一的企业在使用Kafka。今天便和大家分......
  • 32-DRF框架-序列化器ModelSerializer
    #如果我们想要使用序列化器对应的是Django的模型类,DRF为我们提供了ModelSerializer模型类序列化器来帮助我们快速创建一个Serializer类创建modelserializer序列化器#......
  • c#如何使用MemoryStream和BinaryFormatter进行对象的序列化和返序列化
    c#如何使用MemoryStream和BinaryFormatter进行对象的序列化和返序列化  1下面是我写的一个序列化的类public static classObjSerialize{///<summar......
  • kafka常用操作
    如果把一个项目/微服务当成一个消费组,那么一个topic可能在多个消费组【一个topic被多个项目订阅】,一个消费组可能有多个topic。【一个项目订阅了多个topic】。一个消费组内......
  • Serializers 序列化 新增一个字段(处理后赋值) 扔给前端 (实现表和表解偶)
    #模型层classtbl_project_category(models.Model):depart_id=models.IntegerField(verbose_name='部门id',default=0)pro_name=models.CharField(max_len......
  • 30-DRF框架-Serializer序列化器
    #作用:序列化器可以进行数据的校验,对数据对象进行转换#DjangoRESTframework中的序列化使用类来定义,须继承自rest_framework.serializers.Serializer#Serializer......