通常我们使用 es,是通过 http 接口进行访问,es 在处理各种业务请求时遵循一个编程的范式(套路),如果了解了这个套路,对于阅读调试 es 的代码会非常轻松。
在 es 中,一个操作被称为 action,基类是 ActionType,如何处理这个操作的逻辑代码放在 TransportAction 中,这也是一个基类。
1 public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> { 2 3 public final String actionName; 4 private final ActionFilter[] filters; 5 protected final TaskManager taskManager; 6 /** 7 * @deprecated declare your own logger. 8 */ 9 @Deprecated 10 protected Logger logger = LogManager.getLogger(getClass()); 11 12 protected TransportAction(String actionName, ActionFilters actionFilters, TaskManager taskManager) { 13 this.actionName = actionName; 14 this.filters = actionFilters.filters(); 15 this.taskManager = taskManager; 16 } 17 18 /** 19 * Use this method when the transport action should continue to run in the context of the current task 20 */ 21 public final void execute(Task task, Request request, ActionListener<Response> listener) { 22 final ActionRequestValidationException validationException; 23 try { 24 validationException = request.validate(); 25 } catch (Exception e) { 26 assert false : new AssertionError("validating of request [" + request + "] threw exception", e); 27 logger.warn("validating of request [" + request + "] threw exception", e); 28 listener.onFailure(e); 29 return; 30 } 31 if (validationException != null) { 32 listener.onFailure(validationException); 33 return; 34 } 35 if (task != null && request.getShouldStoreResult()) { 36 listener = new TaskResultStoringActionListener<>(taskManager, task, listener); 37 } 38 39 RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger); 40 requestFilterChain.proceed(task, actionName, request, listener); 41 } 42 43 protected abstract void doExecute(Task task, Request request, ActionListener<Response> listener); 44 45 private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> 46 implements 47 ActionFilterChain<Request, Response> { 48 49 private final TransportAction<Request, Response> action; 50 private final AtomicInteger index = new AtomicInteger(); 51 private final Logger logger; 52 53 private RequestFilterChain(TransportAction<Request, Response> action, Logger logger) { 54 this.action = action; 55 this.logger = logger; 56 } 57 58 @Override 59 public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) { 60 int i = index.getAndIncrement(); 61 try { 62 if (i < this.action.filters.length) { 63 this.action.filters[i].apply(task, actionName, request, listener, this); 64 } else if (i == this.action.filters.length) { 65 this.action.doExecute(task, request, listener); 66 } else { 67 listener.onFailure(new IllegalStateException("proceed was called too many times")); 68 } 69 } catch (Exception e) { 70 logger.trace("Error during transport action execution.", e); 71 listener.onFailure(e); 72 } 73 } 74 75 } 76 77 /** 78 * Wrapper for an action listener that stores the result at the end of the execution 79 */ 80 private static class TaskResultStoringActionListener<Response extends ActionResponse> implements ActionListener<Response> { 81 private final ActionListener<Response> delegate; 82 private final Task task; 83 private final TaskManager taskManager; 84 85 private TaskResultStoringActionListener(TaskManager taskManager, Task task, ActionListener<Response> delegate) { 86 this.taskManager = taskManager; 87 this.task = task; 88 this.delegate = delegate; 89 } 90 91 @Override 92 public void onResponse(Response response) { 93 try { 94 taskManager.storeResult(task, response, delegate); 95 } catch (Exception e) { 96 delegate.onFailure(e); 97 } 98 } 99 100 @Override 101 public void onFailure(Exception e) { 102 try { 103 taskManager.storeResult(task, e, delegate); 104 } catch (Exception inner) { 105 inner.addSuppressed(e); 106 delegate.onFailure(inner); 107 } 108 } 109 } 110 }View Code
TransportAction 中抽象了 doExecute 方法,供具体的类实现。
以 getById 为例,对应的操作是 GetAction,处理这个操作的逻辑则放在 TransportGetAction 中,接下来着重分析 TransportGetAction。
public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> { private final IndicesService indicesService; private final ExecutorSelector executorSelector; @Inject public TransportGetAction( ClusterService clusterService, TransportService transportService, IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ExecutorSelector executorSelector ) { super( GetAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, GetRequest::new, ThreadPool.Names.GET ); this.indicesService = indicesService; this.executorSelector = executorSelector; } @Override protected boolean resolveIndex(GetRequest request) { return true; } @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { return clusterService.operationRouting() .getShards( clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference() ); } @Override protected void resolveRequest(ClusterState state, InternalRequest request) { // update the routing (request#index here is possibly an alias) request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index())); } @Override protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (request.realtime()) { // we are not tied to a refresh cycle here anyway super.asyncShardOperation(request, shardId, listener); } else { indexShard.awaitShardSearchActive(b -> { try { super.asyncShardOperation(request, shardId, listener); } catch (Exception ex) { listener.onFailure(ex); } }); } } @Override protected GetResponse shardOperation(GetRequest request, ShardId shardId) throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (request.refresh() && request.realtime() == false) { indexShard.refresh("refresh_flag_get"); } GetResult result = indexShard.getService() .get( request.id(), request.storedFields(), request.realtime(), request.version(), request.versionType(), request.fetchSourceContext(), request.isForceSyntheticSource() ); return new GetResponse(result); } @Override protected Writeable.Reader<GetResponse> getResponseReader() { return GetResponse::new; } @Override protected String getExecutor(GetRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) { return executorSelector.executorForGet(shardId.getIndexName()); } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { return ThreadPool.Names.SEARCH_THROTTLED; } else { return super.getExecutor(request, shardId); } } }View Code
TransportGetAction 继承自 TransportSingleShardAction,对于 getById 的请求,从 TransportSingleShardAction 的 doExecute 方法开始进入:
public abstract class TransportSingleShardAction<Request extends SingleShardRequest<Request>, Response extends ActionResponse> extends TransportAction<Request, Response> { protected final ThreadPool threadPool; protected final ClusterService clusterService; protected final TransportService transportService; protected final IndexNameExpressionResolver indexNameExpressionResolver; private final String transportShardAction; private final String executor; protected TransportSingleShardAction( String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request, String executor ) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.transportShardAction = actionName + "[s]"; this.executor = executor; if (isSubAction() == false) { transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, request, new TransportHandler()); } transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, request, new ShardTransportHandler()); } /** * Tells whether the action is a main one or a subaction. Used to decide whether we need to register * the main transport handler. In fact if the action is a subaction, its execute method * will be called locally to its parent action. */ protected boolean isSubAction() { return false; } @Override protected void doExecute(Task task, Request request, ActionListener<Response> listener) { new AsyncSingleAction(request, listener).start(); } protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException { threadPool.executor(getExecutor(request, shardId)).execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId))); } protected abstract Writeable.Reader<Response> getResponseReader(); protected abstract boolean resolveIndex(Request request); protected static ClusterBlockException checkGlobalBlock(ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.READ); } protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) { return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.concreteIndex()); } protected void resolveRequest(ClusterState state, InternalRequest request) { } /** * Returns the candidate shards to execute the operation on or <code>null</code> the execute * the operation locally (the node that received the request) */ @Nullable protected abstract ShardsIterator shards(ClusterState state, InternalRequest request); class AsyncSingleAction { private final ActionListener<Response> listener; private final ShardsIterator shardIt; private final InternalRequest internalRequest; private final DiscoveryNodes nodes; private volatile Exception lastFailure; private AsyncSingleAction(Request request, ActionListener<Response> listener) { this.listener = listener; ClusterState clusterState = clusterService.state(); if (logger.isTraceEnabled()) { logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version()); } nodes = clusterState.nodes(); ClusterBlockException blockException = checkGlobalBlock(clusterState); if (blockException != null) { throw blockException; } String concreteSingleIndex; if (resolveIndex(request)) { concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName(); } else { concreteSingleIndex = request.index(); } this.internalRequest = new InternalRequest(request, concreteSingleIndex); resolveRequest(clusterState, internalRequest); blockException = checkRequestBlock(clusterState, internalRequest); if (blockException != null) { throw blockException; } this.shardIt = shards(clusterState, internalRequest); } public void start() { if (shardIt == null) { // just execute it on the local node final Writeable.Reader<Response> reader = getResponseReader(); transportService.sendRequest( clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() { @Override public Response read(StreamInput in) throws IOException { return reader.read(in); } @Override public void handleResponse(final Response response) { listener.onResponse(response); } @Override public void handleException(TransportException exp) { listener.onFailure(exp); } } ); } else { perform(null); } } private void onFailure(ShardRouting shardRouting, Exception e) { if (e != null) { logger.trace(() -> format("%s: failed to execute [%s]", shardRouting, internalRequest.request()), e); } perform(e); } private void perform(@Nullable final Exception currentFailure) { Exception lastFailure = this.lastFailure; if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) { lastFailure = currentFailure; this.lastFailure = currentFailure; } final ShardRouting shardRouting = shardIt.nextOrNull(); if (shardRouting == null) { Exception failure = lastFailure; if (failure == null || isShardNotAvailableException(failure)) { failure = new NoShardAvailableActionException( null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure ); } else { logger.debug(() -> format("%s: failed to execute [%s]", null, internalRequest.request()), failure); } listener.onFailure(failure); return; } DiscoveryNode node = nodes.get(shardRouting.currentNodeId()); if (node == null) { onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId())); } else { internalRequest.request().internalShardId = shardRouting.shardId(); if (logger.isTraceEnabled()) { logger.trace( "sending request [{}] to shard [{}] on node [{}]", internalRequest.request(), internalRequest.request().internalShardId, node ); } final Writeable.Reader<Response> reader = getResponseReader(); transportService.sendRequest( node, transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() { @Override public Response read(StreamInput in) throws IOException { return reader.read(in); } @Override public void handleResponse(final Response response) { listener.onResponse(response); } @Override public void handleException(TransportException exp) { onFailure(shardRouting, exp); } } ); } } } private class TransportHandler implements TransportRequestHandler<Request> { @Override public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception { // if we have a local operation, execute it on a thread since we don't spawn execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } } private class ShardTransportHandler implements TransportRequestHandler<Request> { @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { if (logger.isTraceEnabled()) { logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); } asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request)); } } /** * Internal request class that gets built on each node. Holds the original request plus additional info. */ protected class InternalRequest { final Request request; final String concreteIndex; InternalRequest(Request request, String concreteIndex) { this.request = request; this.concreteIndex = concreteIndex; } public Request request() { return request; } public String concreteIndex() { return concreteIndex; } } protected String getExecutor(Request request, ShardId shardId) { return executor; } }View Code
跟读代码,发现由 transportService 向数据节点 sendRequest 请求数据,因为调试环境只有一个节点,协调节点和数据节点都是自身,这个请求的处理在当前节点,但是我们可以更进一步来看。
es 使用 netty 监听 2 个端口,http 的 9200 端口,tcp 的 9300 端口,http 用于处理来自外部的请求,tcp 处理来自节点之间的请求,上述的 TransportAction 处理 http 请求,而 TransportService 则处理 tcp 请求。
回到我们的调用栈,数据节点接收到请求后,自然走的是 netty 的 handler:
1 final class Netty4MessageChannelHandler extends ChannelDuplexHandler { 2 3 private final Netty4Transport transport; 4 5 private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>(); 6 7 private WriteOperation currentWrite; 8 private final InboundPipeline pipeline; 9 10 Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) { 11 this.transport = transport; 12 final ThreadPool threadPool = transport.getThreadPool(); 13 final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers(); 14 this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis, 15 transport.getInflightBreaker(), requestHandlers::getHandler, transport::inboundMessage); 16 } 17 18 @Override 19 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 20 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 21 assert Transports.assertTransportThread(); 22 assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass(); 23 24 final ByteBuf buffer = (ByteBuf) msg; 25 Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); 26 final BytesReference wrapped = Netty4Utils.toBytesReference(buffer); 27 try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release)) { 28 pipeline.handleBytes(channel, reference); 29 } 30 } 31 32 @Override 33 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 34 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 35 ExceptionsHelper.maybeDieOnAnotherThread(cause); 36 final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); 37 final Throwable newCause = unwrapped != null ? unwrapped : cause; 38 Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); 39 if (newCause instanceof Error) { 40 transport.onException(tcpChannel, new Exception(newCause)); 41 } else { 42 transport.onException(tcpChannel, (Exception) newCause); 43 } 44 } 45 46 @Override 47 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 48 assert msg instanceof ByteBuf; 49 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 50 final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise)); 51 assert queued; 52 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 53 } 54 55 @Override 56 public void channelWritabilityChanged(ChannelHandlerContext ctx) { 57 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 58 if (ctx.channel().isWritable()) { 59 doFlush(ctx); 60 } 61 ctx.fireChannelWritabilityChanged(); 62 } 63 64 @Override 65 public void flush(ChannelHandlerContext ctx) { 66 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 67 if (doFlush(ctx) == false) { 68 ctx.flush(); 69 } 70 } 71 72 @Override 73 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 74 assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); 75 doFlush(ctx); 76 Releasables.closeExpectNoException(pipeline); 77 super.channelInactive(ctx); 78 } 79 80 private boolean doFlush(ChannelHandlerContext ctx) { 81 assert ctx.executor().inEventLoop(); 82 final Channel channel = ctx.channel(); 83 if (channel.isActive() == false) { 84 failQueuedWrites(); 85 return false; 86 } 87 boolean needsFlush = true; 88 while (channel.isWritable()) { 89 if (currentWrite == null) { 90 currentWrite = queuedWrites.poll(); 91 } 92 if (currentWrite == null) { 93 break; 94 } 95 final WriteOperation write = currentWrite; 96 final int readableBytes = write.buf.readableBytes(); 97 final int bufferSize = Math.min(readableBytes, 1 << 18); 98 final int readerIndex = write.buf.readerIndex(); 99 final boolean sliced = readableBytes != bufferSize; 100 final ByteBuf writeBuffer; 101 if (sliced) { 102 writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize); 103 write.buf.readerIndex(readerIndex + bufferSize); 104 } else { 105 writeBuffer = write.buf; 106 } 107 final ChannelFuture writeFuture = ctx.write(writeBuffer); 108 needsFlush = true; 109 if (sliced == false) { 110 currentWrite = null; 111 writeFuture.addListener(future -> { 112 assert ctx.executor().inEventLoop(); 113 if (future.isSuccess()) { 114 write.promise.trySuccess(); 115 } else { 116 write.promise.tryFailure(future.cause()); 117 } 118 }); 119 } else { 120 writeFuture.addListener(future -> { 121 assert ctx.executor().inEventLoop(); 122 if (future.isSuccess() == false) { 123 write.promise.tryFailure(future.cause()); 124 } 125 }); 126 } 127 if (channel.isWritable() == false) { 128 // try flushing to make channel writable again, loop will only continue if channel becomes writable again 129 ctx.flush(); 130 needsFlush = false; 131 } 132 } 133 if (needsFlush) { 134 ctx.flush(); 135 } 136 if (channel.isActive() == false) { 137 failQueuedWrites(); 138 } 139 return true; 140 } 141 142 private void failQueuedWrites() { 143 if (currentWrite != null) { 144 final WriteOperation current = currentWrite; 145 currentWrite = null; 146 current.failAsClosedChannel(); 147 } 148 WriteOperation queuedWrite; 149 while ((queuedWrite = queuedWrites.poll()) != null) { 150 queuedWrite.failAsClosedChannel(); 151 } 152 } 153 154 private static final class WriteOperation { 155 156 private final ByteBuf buf; 157 158 private final ChannelPromise promise; 159 160 WriteOperation(ByteBuf buf, ChannelPromise promise) { 161 this.buf = buf; 162 this.promise = promise; 163 } 164 165 void failAsClosedChannel() { 166 promise.tryFailure(new ClosedChannelException()); 167 buf.release(); 168 } 169 } 170 }View Code
数据节点处理请求的逻辑重新回到了具体的 TransportAction 中,这里是
org.elasticsearch.action.support.single.shard.TransportSingleShardAction.ShardTransportHandler
最终到了查找文档的环节
//org.elasticsearch.action.get.TransportGetAction#shardOperation @Override protected GetResponse shardOperation(GetRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (request.refresh() && request.realtime() == false) { indexShard.refresh("refresh_flag_get"); } GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(), request.realtime(), request.version(), request.versionType(), request.fetchSourceContext()); return new GetResponse(result); }
数据节点将 GetRespose 返回给协调节点,协调节点需要对数据进行组装后,最终返回给客户端。
过程看似复杂,需要抓住主路:TransportAction 中同时提供了 netty 客户端和 netty 服务端的代码,协调节点发起请求的代码在此处,数据节点处理请求的代码也在此处,导致于跟读代码时似乎绕了一圈。
标签:请求,void,request,private,listener,protected,rest,final,es From: https://www.cnblogs.com/allenwas3/p/17933141.html