首页 > 其他分享 >Dubbo——Buffer 缓冲区

Dubbo——Buffer 缓冲区

时间:2022-10-19 11:00:48浏览次数:54  
标签:Dubbo readerIndex Buffer int length ChannelBuffer 缓冲区 方法 public

前言

Buffer 是一种字节容器,在 Netty 等 NIO 框架中都有类似的设计,例如,Java NIO 中的ByteBuffer、Netty4 中的 ByteBuf。Dubbo 抽象出了 ChannelBuffer 接口对底层 NIO 框架中的 Buffer 设计进行统一,其子类如下图所示:

ChannelBuffer 接口

ChannelBuffer 接口的设计与 Netty4 中 ByteBuf 抽象类的设计基本一致,也有 readerIndex 和 writerIndex 指针的概念,如下所示,它们的核心方法也是如出一辙。

  • getBytes()、setBytes() 方法:从参数指定的位置读、写当前 ChannelBuffer,不会修改 readerIndex 和 writerIndex 指针的位置。

  • readBytes() 、writeBytes() 方法:也是读、写当前 ChannelBuffer,但是 readBytes() 方法会从 readerIndex 指针开始读取数据,并移动 readerIndex 指针;writeBytes() 方法会从 writerIndex 指针位置开始写入数据,并移动 writerIndex 指针。

  • markReaderIndex()、markWriterIndex() 方法:记录当前 readerIndex 指针和 writerIndex 指针的位置,一般会和 resetReaderIndex()、resetWriterIndex() 方法配套使用。resetReaderIndex() 方法会将 readerIndex 指针重置到 markReaderIndex() 方法标记的位置,resetwriterIndex() 方法同理。

  • capacity()、clear()、copy() 等辅助方法用来获取 ChannelBuffer 容量以及实现清理、拷贝数据的功能,这里不再赘述。

  • factory() 方法:该方法返回创建 ChannelBuffer 的工厂对象,ChannelBufferFactory 中定义了多个 getBuffer() 方法重载来创建 ChannelBuffer,如下图所示,这些 ChannelBufferFactory的实现都是单例的。

AbstractChannelBuffer

AbstractChannelBuffer 抽象类实现了 ChannelBuffer 接口的大部分方法,其核心是维护了以下四个索引。

  • readerIndex、writerIndex(int 类型):通过 readBytes() 方法及其重载读取数据时,会后移 readerIndex 索引;通过 writeBytes() 方法及其重载写入数据的时候,会后移 writerIndex 索引。

  • markedReaderIndex、markedWriterIndex(int 类型):实现记录 readerIndex(writerIndex)以及回滚 readerIndex(writerIndex)的功能。

AbstractChannelBuffer 中 readBytes() 和 writeBytes() 方法的各个重载最终会通过 getBytes() 方法和 setBytes() 方法实现数据的读写,这些方法在 AbstractChannelBuffer 子类中实现。下面以读写一个 byte 数组为例,进行介绍:

public abstract class AbstractChannelBuffer implements ChannelBuffer {

    private int readerIndex;

    private int writerIndex;
	
	public void readBytes(byte[] dst, int dstIndex, int length) {
		// 检测可读字节数是否足够
	    checkReadableBytes(length);
		// 将readerIndex之后的length个字节数读取到dst数组中dstIndex~
		// dstIndex+length的位置
	    getBytes(readerIndex, dst, dstIndex, length);
		// 将readerIndex后移length个字节
	    readerIndex += length;
	}	
	
	public void writeBytes(byte[] src, int srcIndex, int length) {
		// 将src数组中srcIndex~srcIndex+length的数据写入当前buffer中
		// writerIndex~writerIndex+length的位置
	    setBytes(writerIndex, src, srcIndex, length);
		// 将writeIndex后移length个字节
	    writerIndex += length;
	}
}

Buffer 各实现类解析

了解了 ChannelBuffer 接口的核心方法以及 AbstractChannelBuffer 的公共实现之后,我们再来看 ChannelBuffer 的具体实现。

 

HeapChannelBuffer 是基于字节数组的 ChannelBuffer 实现,我们可以看到其中有一个 array(byte[]数组)字段,它就是 HeapChannelBuffer 存储数据的地方。HeapChannelBuffer 的 setBytes() 以及 getBytes() 方法实现是调用 System.arraycopy() 方法完成数组操作的,具体实现如下:

public class HeapChannelBuffer extends AbstractChannelBuffer {

    protected final byte[] array;
	
    @Override
    public void getBytes(int index, byte[] dst, int dstIndex, int length) {
        System.arraycopy(array, index, dst, dstIndex, length);
    }
	
    @Override
    public void setBytes(int index, byte[] src, int srcIndex, int length) {
        System.arraycopy(src, srcIndex, array, index, length);
    }
}

HeapChannelBuffer 对应的 ChannelBufferFactory 实现是 HeapChannelBufferFactory,其 getBuffer() 方法会通过 ChannelBuffers 这个工具类创建一个指定大小 HeapChannelBuffer 对象,下面简单介绍两个 getBuffer() 方法重载:

public class HeapChannelBufferFactory implements ChannelBufferFactory {

    @Override
    public ChannelBuffer getBuffer(int capacity) {
		// 新建一个HeapChannelBuffer,底层的会新建一个长度为capacity的byte数组
        return ChannelBuffers.buffer(capacity);
    }

    @Override
    public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
		// 新建一个HeapChannelBuffer,并且会拷贝array数组中offset~offset+lenght
		// 的数据到新HeapChannelBuffer中
        return ChannelBuffers.wrappedBuffer(array, offset, length);
    }
}

DynamicChannelBuffer

DynamicChannelBuffer 可以认为是其他 ChannelBuffer 的装饰器,它可以为其他 ChannelBuffer 添加动态扩展容量的功能。DynamicChannelBuffer 中有两个核心字段:

  • buffer(ChannelBuffer 类型),是被修饰的 ChannelBuffer,默认为HeapChannelBuffer。

  • factory(ChannelBufferFactory 类型),用于创建被修饰的 HeapChannelBuffer 对象的 ChannelBufferFactory 工厂,默认为 HeapChannelBufferFactory。

DynamicChannelBuffer 需要关注的是 ensureWritableBytes() 方法,该方法实现了动态扩容的功能,在每次写入数据之前,都需要调用该方法确定当前可用空间是否足够,调用位置如下图所示:

ensureWritableBytes() 方法如果检测到底层 ChannelBuffer 对象的空间不足,则会创建一个新的 ChannelBuffer(空间扩大为原来的两倍),然后将原来 ChannelBuffer 中的数据拷贝到新 ChannelBuffer 中,最后将 buffer 字段指向新 ChannelBuffer 对象,完成整个扩容操作。ensureWritableBytes() 方法的具体实现如下:

public class DynamicChannelBuffer extends AbstractChannelBuffer {

    private ChannelBuffer buffer;
	
    @Override
    public void ensureWritableBytes(int minWritableBytes) {
        if (minWritableBytes <= writableBytes()) {
            return;
        }

        int newCapacity;
        if (capacity() == 0) {
            newCapacity = 1;
        } else {
            newCapacity = capacity();
        }
        int minNewCapacity = writerIndex() + minWritableBytes;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
        newBuffer.writeBytes(buffer, 0, writerIndex());
        buffer = newBuffer;
    }
}

ByteBufferBackedChannelBuffer

ByteBufferBackedChannelBuffer 是基于 Java NIO 中 ByteBuffer 的 ChannelBuffer 实现,其中的方法基本都是通过组合 ByteBuffer 的 API 实现的。下面以 getBytes() 方法和 setBytes() 方法的一个重载为例,进行分析:

public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {

	public void getBytes(int index, byte[] dst, int dstIndex, int length) {
	    ByteBuffer data = buffer.duplicate();
	    try {
	        // 移动ByteBuffer中的指针
	        data.limit(index + length).position(index);
	    } catch (IllegalArgumentException e) {
	        throw new IndexOutOfBoundsException();
	    }
	    // 通过ByteBuffer的get()方法实现读取
	    data.get(dst, dstIndex, length);
	}
	
	public void setBytes(int index, byte[] src, int srcIndex, int length) {
	    ByteBuffer data = buffer.duplicate();
	    // 移动ByteBuffer中的指针
	    data.limit(index + length).position(index);
	    // 将数据写入底层的ByteBuffer中
	    data.put(src, srcIndex, length);
	}
}

NettyBackedChannelBuffer

NettyBackedChannelBuffer 是基于 Netty 中 ByteBuf 的 ChannelBuffer 实现,Netty 中的 ByteBuf 内部维护了 readerIndex 和 writerIndex 以及 markedReaderIndex、markedWriterIndex 这四个索引,所以 NettyBackedChannelBuffer 没有再继承 AbstractChannelBuffer 抽象类,而是直接实现了 ChannelBuffer 接口。

 

NettyBackedChannelBuffer 对 ChannelBuffer 接口的实现都是调用底层封装的 Netty ByteBuf 实现的。

相关 Stream 以及门面类

在 ChannelBuffer 基础上,Dubbo 提供了一套输入输出流。

 

ChannelBufferInputStream 底层封装了一个 ChannelBuffer,其实现 InputStream 接口的 read*() 方法全部都是从 ChannelBuffer 中读取数据。ChannelBufferInputStream 中还维护了一个 startIndex 和一个endIndex 索引,作为读取数据的起止位置。ChannelBufferOutputStream 与 ChannelBufferInputStream 类似,会向底层的 ChannelBuffer 写入数据。

 

最后要介绍 ChannelBuffers 这个门面类,下图展示了 ChannelBuffers 这个门面类的所有方法:

对这些方法进行分类,可归纳出如下这些方法。

  • dynamicBuffer() 方法:创建 DynamicChannelBuffer 对象,初始化大小由第一个参数指定,默认为 256。

  • buffer() 方法:创建指定大小的 HeapChannelBuffer 对象。

  • wrappedBuffer() 方法:将传入的 byte[] 数字封装成 HeapChannelBuffer 对象。

  • directBuffer() 方法:创建 ByteBufferBackedChannelBuffer 对象,需要注意的是,底层的 ByteBuffer 使用的堆外内存,需要特别关注堆外内存的管理。

  • equals() 方法:用于比较两个 ChannelBuffer 是否相同,其中会逐个比较两个 ChannelBuffer 中的前 7 个可读字节,只有两者完全一致,才算两个 ChannelBuffer 相同。其核心实现如下示例代码:

public final class ChannelBuffers {

	public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
	    final int aLen = bufferA.readableBytes();
	    if (aLen != bufferB.readableBytes()) { 
	        return false; // 比较两个ChannelBuffer的可读字节数
	    }
	    final int byteCount = aLen & 7; // 只比较前7个字节
	    int aIndex = bufferA.readerIndex();
	    int bIndex = bufferB.readerIndex();
	    for (int i = byteCount; i > 0; i--) {
	        if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
	            return false; // 前7个字节发现不同,则返回false
	        }
	        aIndex++;
	        bIndex++;
	    }
	    return true;
	}
}
  • compare() 方法:用于比较两个 ChannelBuffer 的大小,会逐个比较两个 ChannelBuffer 中的全部可读字节,具体实现与 equals() 方法类似。

标签:Dubbo,readerIndex,Buffer,int,length,ChannelBuffer,缓冲区,方法,public
From: https://blog.51cto.com/u_14014612/5769027

相关文章

  • Dubbo——ExtensionLoader源码解析
    前言ExtensionLoader,从字面理解,拓展组件加载器,是Dubbo里用来加载器内部SPI(ServiceProviderInterface)的加载器。在dubbo框架中,每一个SPI接口都对应着自己的ExtensionLoade......
  • Dubbo——时间轮(Time Wheel)算法应用
    定时任务Netty、Quartz、Kafka以及Linux都有定时任务功能。 JDK自带的java.util.Timer和DelayedQueue可实现简单的定时任务,底层用的是堆,存取复杂度都是O(nlog(......
  • dubbo 源码解析----- 服务引用
    转自:https://blog.csdn.net/beichen8641/article/details/104815163 在Dubbo中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是......
  • Dubbo——Dubbo中的URL统一资源模型与Dubbo协议
    一、URL简介在互联网领域,每个信息资源都有统一的且在网上唯一的地址,该地址就叫URL(UniformResourceLocator,统一资源定位符),它是互联网的统一资源定位标志,也就是指网络地址......
  • Dubbo——Dubbo SPI解析(上)
    前言Dubbo为了更好地达到OCP原则(即“对扩展开放,对修改封闭”的原则),采用了“微内核+插件”的架构。那什么是微内核架构呢?微内核架构也被称为插件化架构(Plug-inArchitect......
  • StringBuffer方法
    StringBuffer:应用场景:频繁操作字符串内容时比如数据源拉取数据到本地仓库、文件上传下载等。主要方法:append:把字符添加到缓冲区的末端;insert:在指......
  • MYsql中BufferPool缓存机制
    1.当修改一条数据时,会将数据从磁盘文件中读入到缓存中,2.然后将此版本记录到undolog日志文件中生成版本链,便于回滚3.更新bufferpool中的数据4.将缓存的数据记录到redolog......
  • Java NIO——缓冲区Buffer
    基本介绍缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能......
  • MongooseError: Operation `logs.insertOne()` buffering timed out after 10000ms
    我有个model总报错:2022-10-1700:22:15:logadd:MongooseError:Operation`logs.insertOne()`bufferingtimedoutafter10000ms0|tinyurl|atTimeout.<ano......
  • 缓冲区溢出实验
    本次实验采用环境为Ubuntulinux64操作系统,在实验楼云环境进行实验介绍:缓冲区溢出是指程序试图向缓冲区写入超出预分配固定长度数据的情况。这一漏洞可以被恶意用户利用......