首页 > 编程语言 >Spark 源码分析(一) SparkRpc中序列化与反序列化Serializer的抽象类解读 (java序列化部分完结,正在更新Kryo序列化部分~)

Spark 源码分析(一) SparkRpc中序列化与反序列化Serializer的抽象类解读 (java序列化部分完结,正在更新Kryo序列化部分~)

时间:2025-01-09 12:32:17浏览次数:3  
标签:java val deserializeStream loader 源码 new 序列化 def

目录

(3) JavaSerializerInstance 定义了一个Java序列化实例

(1) 构造方法参数

(2) 方法1:serializeStream

(3) 方法2:deserializeStream defaultClassLoader

(4) 方法3:deserializeStream loader

(5) 方法4:serialize

(6) 方法5 : deserialize loader

(7) 方法6:deserialize defaultClassLoader


(3) JavaSerializerInstance 定义了一个Java序列化实例

private[spark] class JavaSerializerInstance(

counterReset: Int,

extraDebugInfo: Boolean,

defaultClassLoader: ClassLoader)

extends SerializerInstance {



override def serialize[T: ClassTag](t: T): ByteBuffer = {

val bos = new ByteBufferOutputStream()

val out = serializeStream(bos)

out.writeObject(t)

out.close()

bos.toByteBuffer

}



override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

val bis = new ByteBufferInputStream(bytes)

val in = deserializeStream(bis)

in.readObject()

}



override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {

val bis = new ByteBufferInputStream(bytes)

val in = deserializeStream(bis, loader)

in.readObject()

}



override def serializeStream(s: OutputStream): SerializationStream = {

new JavaSerializationStream(s, counterReset, extraDebugInfo)

}



override def deserializeStream(s: InputStream): DeserializationStream = {

new JavaDeserializationStream(s, defaultClassLoader)

}



def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {

new JavaDeserializationStream(s, loader)

}

}

这个类,事实上我们在昨天的代码示例中已经自己编写运行了,【这是一个链接】 主要完成的工作就是把java序列化进行实例化,封装好各种方法,下面简单讲述封装的方法,如果有好好编写昨天的代码的同学,那对于今天的代码理解肯定是不困难的。

带着问题看:这个实例类都构造了什么东西?

(1) 构造方法参数

counterReset: Int :一个计数器,用于判断是否reset()(用于清除数据流缓存的方法)

extraDebugInfo: Boolean :于判断是否输出多余日志信息

defaultClassLoader: ClassLoader :用于确定类加载器,防止出现版本问题

这些传入的构造参数,在后面的过程中,会成为构造JavaSerializationStream的参数

(2) 方法1:serializeStream
override def serializeStream(s: OutputStream): SerializationStream = {

new JavaSerializationStream(s, counterReset, extraDebugInfo)

}

重写了SerializerInstance【前文跳转】中的方法,内容是构造一个JavaSerializationStream【前文跳转

(3) 方法2:deserializeStream defaultClassLoader
override def deserializeStream(s: InputStream): DeserializationStream = {

new JavaDeserializationStream(s, defaultClassLoader)

}

类似的,重写了deserializeStream【前文跳转】方法,构造了一个JavaDeserializationStream
前文跳转

略有不同的是,类加载器是默认也就是defaultClassLoader

(4) 方法3:deserializeStream loader
def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {

new JavaDeserializationStream(s, loader)

}

类似的,重写了deserializeStream 方法【前文跳转
构造了一个JavaDeserializationStream 【前文跳转

略有不同的是,类加载器是指定的传入的参数确定的类加载器

(5) 方法4:serialize
override def serialize[T: ClassTag](t: T): ByteBuffer = {

val bos = new ByteBufferOutputStream()

val out = serializeStream(bos)

out.writeObject(t)

out.close()

bos.toByteBuffer

}

请看前文“代码例子1”【前文跳转】

(6) 方法5 : deserialize loader
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {

val bis = new ByteBufferInputStream(bytes)

val in = deserializeStream(bis, loader)

in.readObject()

}

请看前文“代码例子2”【前文跳转】略有不同的是,指定类加载器

(7) 方法6:deserialize defaultClassLoader
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

val bis = new ByteBufferInputStream(bytes)

val in = deserializeStream(bis)

in.readObject()

}

请看前文“代码例子2”【前文跳转】类加载器是默认加载器

好了,Spark中Java序列化的部分就结束了,希望有所收获

接下来展开的是Spark中的另一种序列化实现方式,带着问题看,为什么新方式效率更高?

Java 序列化的性能问题:
  • 开销较大:Java 默认序列化使用反射机制,因此相对较慢,特别是当序列化的数据结构较复杂时,反射的性能瓶颈尤为突出。
  • 数据体积大:Java 默认序列化生成的数据文件较大,会导致内存占用较高,并增加网络传输的延迟。
Kryo 序列化的优势:
  • 小巧高效:Kryo 序列化生成的数据更加紧凑,减少了数据传输的开销,也减轻了内存压力。
  • 适合大数据量:在处理大量数据时,Kryo 能够显著提升性能,尤其是在 Shuffle 操作中,Kryo 的优势更加明显。

让我们一起来看一看吧

有什么问题可以直接私信博主,或者评论区说一说,我会一条条看的

还是那句话,多敲代码!!!

标签:java,val,deserializeStream,loader,源码,new,序列化,def
From: https://blog.csdn.net/Peng909157372/article/details/145030772

相关文章

  • 蓝易云 - 用JAVA实现文件分隔功能教程
    当你需要用Java实现文件分隔功能时,你可以按照以下步骤进行操作。文件分隔功能指将一个大文件分隔成多个小文件,以方便处理和传输。步骤1:导入JavaIO包首先,在Java程序中导入JavaIO包,以便后续处理文件操作。importjava.io.*;​步骤2:实现文件分隔功能接下来,你可以创建一个J......
  • 普通的maven项目将main函数打包实现java -jar来运行
    一、创建一个maven项目假设groupId为org.example则在src\main\java目录下创建一个org包,在其下创建一个example包,然后创建我们打包后要执行java类MyThread.java二、在pom文件中添加以下build插件声明在<project>标签内添加<build><plugins><plugin><g......
  • springboot高校实验室安全管理系统-计算机毕业设计源码73839
    目 录摘要1绪论1.1研究背景1.2 选题意义1.3研究方案1.4论文章节安排2相关技术介绍2.1B/S结构2.2SpringBoot框架2.3Java语言2.4MySQL数据库3系统分析3.1可行性分析3.2 系统功能性分析3.3.非功能性分析3.4 系统用例分析3.5系统流......
  • 【源码+文档+调试讲解】微信小程序的英语学习激励系统
    摘要网络技术的快速发展给各行各业带来了很大的突破,也给各行各业提供了一种新的管理技术,对于微信小程序的英语学习激励系统将又是一个传统管理到智能化信息管理的典型案例,对于传统的英语学习激励管理,所包括的信息内容比较多,用户想要对这些数据进行管理维护需要花费很大的时间......
  • linux下启动第二个RocketMQ,报错java.lang.RuntimeException: Lock failed,MQ already
    报错如下图: 这种情况下启动两个broker,基本都会在第二个broker,报lockfailed,MQalreadystarted因为使用了相同的默认配置(只启动一个broker不受影响) 不同的配置,需求满足最基本的配置不同brokerName不同brokerId不同listenPort不同storePathRootDir......
  • Java Druid 面试题
    Druid连接池在项目中有哪些优势?性能优越:Druid采用了高效的连接管理机制,可以快速地创建和回收数据库连接,减少了连接的创建和销毁带来的性能开销。监控与统计:Druid提供了详细的监控信息,包括连接池的状态、SQL执行的统计信息等,这有助于性能调优和问题诊断。SQL日志记录:Druid内置......
  • 【毕业设计】A049-基于Java的大学城水电管理系统的设计与实现
    ......
  • springboot少儿体育培训机构管理系统源码毕设+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着社会经济的迅速发展和人们对健康意识的提升,少儿体育培训逐渐成为家长们关注的热点。近年来,各类少儿体育培训机构如雨后春笋般涌现,不仅丰富了孩子......
  • springboot基于web的在线视频网站设计与实现源码毕设+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展,网络视频已成为人们日常生活中不可或缺的一部分。从短视频分享到长篇影视观看,视频内容以其直观、生动、信息量大等特点,深受......
  • 2025毕设springboot 病人跟踪治疗系统设计与实现论文+源码
    系统程序文件列表开题报告内容研究背景在现代医疗体系中,病人跟踪治疗是确保患者康复的重要环节。随着人口老龄化和慢性病患者的增多,医疗资源的分配和管理变得日益复杂。传统的纸质病历记录方式不仅效率低下,还容易出错,无法满足现代医疗对高效、精准、全面跟踪治疗的需求。因......