1 @Override 2 public final void read() { 3 final ChannelConfig config = config(); 4 if (shouldBreakReadReady(config)) { 5 clearReadPending(); 6 return; 7 } 8 final ChannelPipeline pipeline = pipeline(); 9 final ByteBufAllocator allocator = config.getAllocator(); 10 //获取自适应缓冲区分配器对象(第一次进来才会创建) 11 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); 12 //重置分配器对象 13 allocHandle.reset(config); 14 15 ByteBuf byteBuf = null; 16 boolean close = false; 17 try { 18 do { 19 //通过分配器分配默认大小为1024的ByteBuf(其大小会自适应调整,具体变化规则下面解析) 20 byteBuf = allocHandle.allocate(allocator); 21 //1.doReadBytes(byteBuf):先根据byteBuf大小来设置attemptedBytesRead属性(在while判断中会用到此属性),然后byteBuf尝试读取最大为attemptedBytesRead的数据 22 //2.allocHandle.lastBytesRead():记录lastBytesRead和totalBytesRead,这里可能会触发一次自适应调整 23 allocHandle.lastBytesRead(doReadBytes(byteBuf)); 24 if (allocHandle.lastBytesRead() <= 0) { 25 // nothing was read. release the buffer. 26 byteBuf.release(); 27 byteBuf = null; 28 close = allocHandle.lastBytesRead() < 0; 29 if (close) { 30 // There is nothing left to read as we received an EOF. 31 readPending = false; 32 } 33 break; 34 } 35 36 //增加已经读取消息的次数 37 allocHandle.incMessagesRead(1); 38 readPending = false; 39 //将已经读取到的数据抛给处理器链的channelRead处理(正常业务逻辑在这里处理) 40 pipeline.fireChannelRead(byteBuf); 41 byteBuf = null; 42 } while (allocHandle.continueReading()); //判断是否需要继续读取数据 43 44 //触发一次自适应调整 45 allocHandle.readComplete(); 46 //读取完成后触发处理器链的channelReadComplete 47 pipeline.fireChannelReadComplete(); 48 49 if (close) { 50 closeOnRead(pipeline); 51 } 52 } catch (Throwable t) { 53 handleReadException(pipeline, byteBuf, t, close, allocHandle); 54 } finally { 55 // Check if there is a readPending which was not processed yet. 56 // This could be for two reasons: 57 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 58 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 59 // 60 // See https://github.com/netty/netty/issues/2254 61 if (!readPending && !config.isAutoRead()) { 62 removeReadOp(); 63 } 64 } 65 }
public RecvByteBufAllocator.Handle recvBufAllocHandle() { //为空创建 if (recvHandle == null) { //这里创建的是AdaptiveRecvByteBufAllocator.HandleImpl实例 recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
public void reset(ChannelConfig config) { this.config = config; //默认16 maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }
public ByteBuf allocate(ByteBufAllocator alloc) { //创建一个由AdaptiveRecvByteBufAllocator.HandleImpl推测的容量大小的ByteBuf return alloc.ioBuffer(guess()); } public int guess() { //这里返回的是AdaptiveRecvByteBufAllocator.HandleImpl里推测容量,自适应调整变化的就是此值的大小 return nextReceiveBufferSize; } public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { //分配直接内存(堆外内存) return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); }
1 protected int doReadBytes(ByteBuf byteBuf) throws Exception { 2 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); 3 //设置attemptedBytesRead属性,大小为byteBuf的可写大小 4 allocHandle.attemptedBytesRead(byteBuf.writableBytes()); 5 //尝试读取最大为attemptedBytesRead的数据 6 return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); 7 } 8 9 AdaptiveRecvByteBufAllocator.HandleImpl 10 public void lastBytesRead(int bytes) { 11 // If we read as much as we asked for we should check if we need to ramp up the size of our next guess. 12 // This helps adjust more quickly when large amounts of data is pending and can avoid going back to 13 // the selector to check for more data. Going back to the selector can add significant latency for large 14 // data transfers. 15 //判断当前读取到的数据与推测大小是否一致,一致的话进行一次扩容处理 16 if (bytes == attemptedBytesRead()) { 17 record(bytes); 18 } 19 //调用父类的lastBytesRead方法 20 super.lastBytesRead(bytes); 21 } 22 23 private void record(int actualReadBytes) { 24 //SIZE_TABLE里保存着有序的递增的16-1073741824(到512前,每次递增16;512后每次*2) 25 //INDEX_DECREMENT 默认 1, INDEX_INCREMENT 默认 4 26 //真实读取到数据的大小小于等于SIZE_TABLE索引前两位的值大小时,第一次触发不会缩容,第二次触发会缩容为SIZE_TABLE索引前一位的值(若index - 1 < minIndex,index值为minIndex) 27 if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) { 28 if (decreaseNow) { 29 index = max(index - INDEX_DECREMENT, minIndex); 30 nextReceiveBufferSize = SIZE_TABLE[index]; 31 decreaseNow = false; 32 } else { 33 //第一次触发不会缩容 34 decreaseNow = true; 35 } 36 } else if (actualReadBytes >= nextReceiveBufferSize) { //真实读取到数据大于等于推测大小时,直接扩容为SIZE_TABLE索引后四位的值(若index + 4 > maxIndex,index值为maxIndex) 37 index = min(index + INDEX_INCREMENT, maxIndex); 38 nextReceiveBufferSize = SIZE_TABLE[index]; 39 decreaseNow = false; 40 } 41 } 42 43 DefaultMaxMessagesRecvByteBufAllocator 44 public void lastBytesRead(int bytes) { 45 //将读取到的数据赋值给lastBytesRead 46 lastBytesRead = bytes; 47 if (bytes > 0) { 48 //将读取到的数据叠加到totalBytesRead 49 totalBytesRead += bytes; 50 } 51 }
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; } DefaultChannelConfig public boolean isAutoRead() { //autoRead默认为1,因此该判断为true return autoRead == 1; } DefaultMaxMessagesRecvByteBufAllocator public boolean get() { //当本次读操作读取到的字节数与AdaptiveRecvByteBufAllocator推测出的ByteBuf容量大小不一样时,就会返回false;否则返回true。如果本次读操作可读取的字节大于了attemptedBytesRead的话,一次读操作也只会先读取attemptedBytesRead的字节数 return attemptedBytesRead == lastBytesRead; } //totalMessages < maxMessagePerRead:根据上面的流程我们可以知道,maxMessagePerRead为16,totalMessages为读循环已经执行的读操作次数(即,循环次数)。 //totalBytesRead > 0:当本次读操作有读取到字节数时,或者以读取到的字节数小于Integer.MAX_VALUE,那么该判断都会大于0,即,为true;否则为false。
public void readComplete() { //读结束后,触发一次自适应调整 record(totalBytesRead()); } protected final int totalBytesRead() { //本次已读取数据总和 return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead; }
扩展:ByteToMessageDecoder.channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; //判断累加区是否为空 first = cumulation == null; if (first) { //为空,直接将字节容器的指针指向新读取的数据 cumulation = data; } else { //不为空,调用累加器累加数据。可能会触发一次扩容 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //数据传递给业务 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { //没有累加区中有数据可读时,清空 if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; //如果连续16次(discardAfterReads的默认值),累加区中仍然有未被业务拆包器读取的数据,那就做一次压缩,有效数据段整体移到容器首部(粘包后拆包时会存在此情况) } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; //压缩,即丢弃已读数据 discardSomeReadBytes(); } //传递业务数据包给业务解码器处理(拆包时,在callDecode()里就会交给业务解码器处理了,这边通常是处理的是最后一次拆包,即数据只有一个完整包时,直接走这边) int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }
private Cumulator cumulator = MERGE_CUMULATOR; public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { final ByteBuf buffer; //容器不够本次追加 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain() or if its read-only. // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 //追加扩容 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } //将新数据累加到字节容器中 buffer.writeBytes(in); return buffer; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) in.release(); } } }; static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ByteBuf oldCumulation = cumulation; //扩容也是一个内存拷贝操作,新增的大小即是新读取数据的大小 cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); cumulation.writeBytes(oldCumulation); oldCumulation.release(); return cumulation; }
/** * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @param out the {@link List} to which decoded messages should be added */ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); //若有已经拆出来的数据包,交给后面的处理器链处理 if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } //记录字节容器中有多少字节 int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { //拆包器未读取任何数据 if (oldInputLength == in.readableBytes()) { break; } else { continue; } } //拆包器未读取任何数据,已经解到了数据包 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } } final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; try { //待实现的业务处理逻辑 decode(ctx, in, out); } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { handlerRemoved(ctx); } } }
标签:netty,读取,read,ctx,源码,ByteBuf,cumulation,public,out From: https://www.cnblogs.com/gumanlou/p/16603043.html