首页 > 其他分享 >[Flink] Flink 序列化器

[Flink] Flink 序列化器

时间:2024-08-20 12:04:24浏览次数:18  
标签:flink Flink kafka org apache import 序列化 public

1 概述:Flink (反)序列化器

简述

  • 序列化器:多用于 Sink 输出时
  • 反序列化器:多用于 Source 读取时

依赖包及版本

  • 依赖包及版本信息(汇总)
org.apache.kafka:kafka-clients:${kafka-clients.version=2.4.1}

org.apache.flink:flink-java:${flink.version=1.12.6}
org.apache.flink:flink-clients_${scala.version=2.11}:${flink.version}
org.apache.flink:flink-streaming-java_${scala.version}:${flink.version}
org.apache.flink:flink-connector-kafka_${scala.version}:${flink.version}
org.apache.flink:flink-statebackend-rocksdb_${scala.version}:${flink.version}

//org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}
//org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}

//com.alibaba.ververica:flink-connector-mysql-cdc:1.3.0
...

2 Flink (反)序列化器的种类

Kafka 反序列化器

Deserializer + KafkaConsumer 【推荐/普通JAVA应用】

  • 核心API:
  • org.apache.kafka.common.serialization.Deserializer
  • org.apache.kafka.clients.consumer.KafkaConsumer
  • org.apache.kafka.clients.consumer.ConsumerRecords / org.apache.kafka.clients.consumer.ConsumerRecord
  • 依赖库 : kafka-clients:2.4.1

  • 使用案例

  • 定义反序列化器

public class CompanyDeserializer implements Deserializer<Company>

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new SerializationException("Error:"+ex.getMessage());
        }
        return new Company(name,address);

    }

    @Override
    public void close() {

    }
}
  • 使用反序列化器
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CompanyConsumer {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"debug-group");
        KafkaConsumer<String, Company> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));
        while(true){
            ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){
                System.out.println(consumerRecord.value());
            }
        }
    }
}

补充:使用案例2

//org.apache.kafka.clients.consumer.ConsumerConfig
//org.apache.kafka.clients.consumer.KafkaConsumer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaDeserializerType.STRING_DESERIALIZER.getDeserializer());//key.deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaDeserializerType.BYTE_ARRAY_DESERIALIZER.getDeserializer());//value.deserializer

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(properties)
...
//org.apache.kafka.clients.consumer.ConsumerRecords
ConsumerRecords<String, byte[]> records = onsumer.poll(1000);
  • 核心API:
  • org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
  • org.apache.flink.connector.kafka.source.KafkaSourceBuilder : Flink 社区推荐使用
  • org.apache.flink.connector.kafka.source.KafkaSource : Flink 社区推荐使用
  • org.apache.kafka.clients.consumer.ConsumerRecord

相关文章

  • (八)Flink Join 连接
    在分布式数据处理中,JOIN是一个非常重要的操作。Flink的JOIN是用于将两个数据流按照一定的条件进行连接,生成新的数据流。Flink双流JOIN主要分为两大类:一类是基于窗口的JOIN操作,另一类是基于原生State的Connect算子操作。其中基于窗口的JOIN可细分为WindowJoin......
  • 【第68课】Java安全&原生反序列化&SpringBoot攻防&heapdump提取&CVE
    免责声明本文发布的工具和脚本,仅用作测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。如果任何单位或个人认为该项目的脚本可能涉嫌侵犯其权利,则应及时通知并提供身份证明,所有权证明,我们将在收到认证文件后删除相关内容。文中所涉......
  • 反序列化刷题(一)
    反序列化刷题web255将isvip改为true然后序列化echourlencode($v=serialize($f=newctfShowUser()));Cookie:O%3A11%3A%22ctfShowUser%22%3A3%3A%7Bs%3A8%3A%22username%22%3Bs%3A6%3A%22xxxxxx%22%3Bs%3A8%3A%22password%22%3Bs%3A6%3A%22xxxxxx%22%3Bs%3A5%3A%22isVip%22%3......
  • 易优flink 友情链接-EyouCms手册
    【基础用法】名称:flink功能:用于获取友情链接列表。语法:{eyou:flinktype='text'row='30'titlelen='15'}{$field.title}{/eyou:flink}参数:type=''链接类型,text为文字链接,image为图片链接,all为全部链接row='30'链接类型数量titlelen='100'标题长度......
  • 对象流,序列化和反序列化 day18
    packagecom.shujia.day18.ketang;importjava.io.*;/*序列化流:序列化:将一个对象转换成网络中传输的流对象输出流:ObjectOutputStream将一个类的对象写进文本中反序列化:将网络中传输的流还原成一个对象对象输入流:Object......
  • JAVA中的序列化
    Java序列化是一种机制,它可以将对象状态转换为可存储或可传输的形式。序列化后的对象可以在网络上传输,或者保存到文件、数据库等存储介质中。在Java中,序列化通过实现 java.io.Serializable接口来实现。本文将详细介绍Java序列化的概念、实现方式、优缺点以及代码示例。一、序......
  • JAVA中的反序列化
    Java反序列化是将之前序列化存储的对象状态信息重新恢复为Java对象的过程。这个过程与序列化是相反的,它允许程序从字节流中重建对象,这对于网络传输、对象持久化以及分布式系统中的对象传递至关重要。下面将详细介绍Java反序列化的概念、实现方式、安全注意事项,并通过一个......
  • flink车联网项目:业务实现2(维表开发)(第68天)
    系列文章目录3.2维表开发3.2.1创建库3.2.2示例3.2.2.1类型转换3.2.2.2创建mysql映射表3.2.2.3创建paimon映射表3.2.2.4从mysql插入到paimon表3.2.2.5结果查看3.2.2.6测试3.2.3其他表开发3.2.4部署文章目录系列文章目录前言3.2维表开发3.2.1创建......
  • flink + iceberg 快速搭建指南
    flink+iceberg快速搭建theenvironmentincludes:minioicebergflinkCentos更换tencent的yum源备份系统旧配置文件mv/etc/yum.repos.d/CentOS-Base.repo/etc/yum.repos.d/CentOS-Base.repo.backup获取对应版本的CentOS-Base.repo到/etc/yum.repos.d/目录各版......