目录
(3) JavaSerializerInstance 定义了一个Java序列化实例
(3) 方法2:deserializeStream defaultClassLoader
(4) 方法3:deserializeStream loader
(7) 方法6:deserialize defaultClassLoader
(3) JavaSerializerInstance 定义了一个Java序列化实例
private[spark] class JavaSerializerInstance(
counterReset: Int,
extraDebugInfo: Boolean,
defaultClassLoader: ClassLoader)
extends SerializerInstance {
override def serialize[T: ClassTag](t: T): ByteBuffer = {
val bos = new ByteBufferOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
bos.toByteBuffer
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val bis = new ByteBufferInputStream(bytes)
val in = deserializeStream(bis)
in.readObject()
}
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
val bis = new ByteBufferInputStream(bytes)
val in = deserializeStream(bis, loader)
in.readObject()
}
override def serializeStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s, counterReset, extraDebugInfo)
}
override def deserializeStream(s: InputStream): DeserializationStream = {
new JavaDeserializationStream(s, defaultClassLoader)
}
def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
new JavaDeserializationStream(s, loader)
}
}
这个类,事实上我们在昨天的代码示例中已经自己编写运行了,【这是一个链接】 主要完成的工作就是把java序列化进行实例化,封装好各种方法,下面简单讲述封装的方法,如果有好好编写昨天的代码的同学,那对于今天的代码理解肯定是不困难的。
带着问题看:这个实例类都构造了什么东西?
(1) 构造方法参数
counterReset: Int :一个计数器,用于判断是否reset()(用于清除数据流缓存的方法)
extraDebugInfo: Boolean :于判断是否输出多余日志信息
defaultClassLoader: ClassLoader :用于确定类加载器,防止出现版本问题
这些传入的构造参数,在后面的过程中,会成为构造JavaSerializationStream的参数
(2) 方法1:serializeStream
override def serializeStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s, counterReset, extraDebugInfo)
}
重写了SerializerInstance【前文跳转】中的方法,内容是构造一个JavaSerializationStream【前文跳转】
(3) 方法2:deserializeStream defaultClassLoader
override def deserializeStream(s: InputStream): DeserializationStream = {
new JavaDeserializationStream(s, defaultClassLoader)
}
类似的,重写了deserializeStream【前文跳转】方法,构造了一个JavaDeserializationStream
【前文跳转】
略有不同的是,类加载器是默认,也就是defaultClassLoader
(4) 方法3:deserializeStream loader
def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
new JavaDeserializationStream(s, loader)
}
类似的,重写了deserializeStream 方法【前文跳转】
构造了一个JavaDeserializationStream 【前文跳转】
略有不同的是,类加载器是指定的,传入的参数确定的类加载器
(5) 方法4:serialize
override def serialize[T: ClassTag](t: T): ByteBuffer = {
val bos = new ByteBufferOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
bos.toByteBuffer
}
请看前文“代码例子1”【前文跳转】
(6) 方法5 : deserialize loader
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
val bis = new ByteBufferInputStream(bytes)
val in = deserializeStream(bis, loader)
in.readObject()
}
请看前文“代码例子2”【前文跳转】略有不同的是,指定类加载器
(7) 方法6:deserialize defaultClassLoader
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val bis = new ByteBufferInputStream(bytes)
val in = deserializeStream(bis)
in.readObject()
}
请看前文“代码例子2”【前文跳转】类加载器是默认加载器
好了,Spark中Java序列化的部分就结束了,希望有所收获
接下来展开的是Spark中的另一种序列化实现方式,带着问题看,为什么新方式效率更高?
Java 序列化的性能问题:
- 开销较大:Java 默认序列化使用反射机制,因此相对较慢,特别是当序列化的数据结构较复杂时,反射的性能瓶颈尤为突出。
- 数据体积大:Java 默认序列化生成的数据文件较大,会导致内存占用较高,并增加网络传输的延迟。
Kryo 序列化的优势:
- 小巧高效:Kryo 序列化生成的数据更加紧凑,减少了数据传输的开销,也减轻了内存压力。
- 适合大数据量:在处理大量数据时,Kryo 能够显著提升性能,尤其是在 Shuffle 操作中,Kryo 的优势更加明显。
让我们一起来看一看吧
有什么问题可以直接私信博主,或者评论区说一说,我会一条条看的
还是那句话,多敲代码!!!
标签:java,val,deserializeStream,loader,源码,new,序列化,def From: https://blog.csdn.net/Peng909157372/article/details/145030772