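Reference request URLs
Judging from the UI, a search behaves like a job execution: it supports both asynchronous and synchronous query processing, and it covers relative as well as absolute time-range queries.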
/api/views/search/<id>/execute
/api/views/search
/api/views/search/metadata
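Query adaptation
- Reference implementation
The adapter mainly handles compatibility between ES6 and ES7. Queries can run in scroll mode or in regular search mode.
- Query processing
As the code below shows, a single query may span multiple indices.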
public SearchResult search(Set<String> indices, Set<IndexRange> indexRanges, SearchesConfig config) {
    final SearchSourceBuilder searchSourceBuilder = searchRequestFactory.create(config);
    if (indexRanges.isEmpty()) {
        return SearchResult.empty(config.query(), searchSourceBuilder.toString());
    }
    // build a single request that targets multiple indices
    final SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[0]))
            .source(searchSourceBuilder);
    final SearchResponse searchResult = client.search(searchRequest, "Unable to perform search query");
    final List<ResultMessage> resultMessages = extractResultMessages(searchResult);
    final long totalResults = searchResult.getHits().getTotalHits().value;
    final long tookMs = searchResult.getTook().getMillis();
    final String builtQuery = searchSourceBuilder.toString();
    return new SearchResult(resultMessages, totalResults, indexRanges, config.query(), builtQuery, tookMs);
}
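Query job execution
Execution is built on multiple threads. Graylog offers both a synchronous and an asynchronous mode, and in both cases the work is driven by SearchExecutor.
SearchExecutor relies on the query capability provided by QueryEngine. Because one search may cover several streams, QueryEngine iterates over the queries of the search and runs each of them.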
public SearchJob execute(SearchJob searchJob) {
    searchJob.getSearch().queries().forEach(query -> searchJob.addQueryResultFuture(query.id(),
            // generate and run each query, making sure we never let an exception escape
            // if need be we default to an empty result with a failed state and the wrapped exception
            // each query runs asynchronously through a CompletableFuture
            CompletableFuture.supplyAsync(() -> prepareAndRun(searchJob, query), queryPool)
                    .handle((queryResult, throwable) -> {
                        if (throwable != null) {
                            final Throwable cause = throwable.getCause();
                            final SearchError error;
                            if (cause instanceof SearchException) {
                                error = ((SearchException) cause).error();
                            } else {
                                error = new QueryError(query, cause);
                            }
                            LOG.debug("Running query {} failed: {}", query.id(), cause);
                            searchJob.addError(error);
                            return QueryResult.failedQueryWithError(query, error);
                        }
                        return queryResult;
                    })
    ));
    searchJob.getSearch().queries().forEach(query -> {
        final CompletableFuture<QueryResult> queryResultFuture = searchJob.getQueryResultFuture(query.id());
        if (!queryResultFuture.isDone()) {
            // this is not going to throw an exception, because we will always replace it with a placeholder "FAILED" result above
            final QueryResult result = queryResultFuture.join();
        } else {
            LOG.debug("[{}] Not generating query for query {}", defaultIfEmpty(query.id(), "root"), query);
        }
    });
    LOG.debug("Search job {} executing", searchJob.getId());
    return searchJob.seal();
}
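The fan-out pattern above can be tried in isolation. The following is a minimal sketch, not Graylog code: `QueryFanOutSketch`, `Result`, and `run` are hypothetical names made up for illustration, and query ids starting with "bad" are assumed to fail. It shows how `supplyAsync` plus `handle` turns every exception into a failed placeholder result, so that the later `join()` never throws.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class QueryFanOutSketch {
    // Hypothetical stand-in for a query result: either a value or an error message.
    static final class Result {
        final String value;
        final String error;
        Result(String value, String error) { this.value = value; this.error = error; }
        boolean failed() { return error != null; }
    }

    // Hypothetical query runner; ids starting with "bad" simulate a failing query.
    static Result run(String queryId) {
        if (queryId.startsWith("bad")) {
            throw new IllegalStateException("query " + queryId + " failed");
        }
        return new Result("rows-for-" + queryId, null);
    }

    static Map<String, Result> executeAll(List<String> queryIds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Start one future per query; handle() maps any exception to a failed Result.
            Map<String, CompletableFuture<Result>> futures = new LinkedHashMap<>();
            for (String id : queryIds) {
                futures.put(id, CompletableFuture
                        .supplyAsync(() -> run(id), pool)
                        .handle((result, throwable) -> {
                            if (throwable == null) {
                                return result;
                            }
                            // supplyAsync wraps the original exception in a CompletionException
                            Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
                            return new Result(null, cause.getMessage());
                        }));
            }
            // join() cannot throw here, because failures were already replaced above.
            Map<String, Result> results = new LinkedHashMap<>();
            futures.forEach((id, future) -> results.put(id, future.join()));
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        executeAll(Arrays.asList("q1", "bad-q2", "q3")).forEach((id, r) ->
                System.out.println(id + " -> " + (r.failed() ? "FAILED: " + r.error : r.value)));
    }
}
```

Keeping the error handling inside `handle` means the caller only ever deals with result objects, which matches the comment in the Graylog snippet about never letting an exception escape.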
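The actual query results are read through the results method of SearchJob. It collects whatever the asynchronous threads have produced so far, using the convenient stream utilities from StreamEx.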
@JsonProperty("results")
public Map<String, QueryResult> results() {
    return EntryStream.of(queryResults)
            .mapValues(future -> future.getNow(QueryResult.incomplete()))
            .filterKeys(queryId -> !queryId.isEmpty()) // the root query result is meaningless, so we don't include it here
            .filterValues(r -> (r.state() == QueryResult.State.COMPLETED) || (r.state() == QueryResult.State.FAILED))
            .toMap();
}
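The non-blocking snapshot idea behind results() can be sketched with the JDK alone. This is an illustrative sketch under assumptions: `ResultSnapshotSketch` and its `State` enum are hypothetical stand-ins, and a plain loop replaces the StreamEx pipeline. The key call is `CompletableFuture.getNow`, which returns the supplied default instead of blocking when a future has not finished yet.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class ResultSnapshotSketch {
    // Hypothetical stand-in for the state of a query result.
    enum State { INCOMPLETE, COMPLETED, FAILED }

    // Returns only the results that are available right now, without blocking.
    static Map<String, State> snapshot(Map<String, CompletableFuture<State>> futures) {
        Map<String, State> out = new LinkedHashMap<>();
        futures.forEach((queryId, future) -> {
            if (queryId.isEmpty()) {
                return; // skip the root entry, as the original filterKeys does
            }
            State state = future.getNow(State.INCOMPLETE); // default when still running
            if (state == State.COMPLETED || state == State.FAILED) {
                out.put(queryId, state);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<State>> futures = new LinkedHashMap<>();
        futures.put("", CompletableFuture.completedFuture(State.COMPLETED));
        futures.put("done", CompletableFuture.completedFuture(State.COMPLETED));
        futures.put("pending", new CompletableFuture<>()); // never completed in this demo
        System.out.println(snapshot(futures));
    }
}
```

Because unfinished queries are simply left out, a client polling an asynchronous job sees the result map grow as queries finish.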
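Notes
Although scroll is designed for retrieving large result sets, Elasticsearch no longer recommends it for deep pagination. The preferred approach is search_after, combined with a point in time (PIT) so that paging runs against a consistent view of the index.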
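To make the search_after plus PIT flow more concrete, here is a rough sketch of the request body for each page. It is not taken from Graylog: the class `SearchAfterBodySketch`, the `timestamp` sort field, the `match_all` query, and the one-minute keep-alive are all assumptions chosen for illustration. The PIT id would come from a prior `POST /<index>/_pit?keep_alive=1m` call, and the body is sent to `GET /_search` without an index in the path.

```java
final class SearchAfterBodySketch {
    // Builds the JSON body for one page.
    // pitId: id returned when the PIT was opened (assumed to need no JSON escaping).
    // searchAfterJson: the "sort" array of the last hit of the previous page,
    //                  or null when requesting the first page.
    static String body(String pitId, int size, String searchAfterJson) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"size\":").append(size)
          .append(",\"query\":{\"match_all\":{}}")
          .append(",\"pit\":{\"id\":\"").append(pitId).append("\",\"keep_alive\":\"1m\"}")
          // _shard_doc acts as a tiebreaker so that the sort order is total
          .append(",\"sort\":[{\"timestamp\":\"desc\"},{\"_shard_doc\":\"asc\"}]");
        if (searchAfterJson != null) {
            sb.append(",\"search_after\":").append(searchAfterJson);
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        // "example-pit-id" and the sort values are placeholders, not real data.
        System.out.println(body("example-pit-id", 100, null));
        System.out.println(body("example-pit-id", 100, "[1650000000000, 42]"));
    }
}
```

Unlike scroll, no server-side cursor is kept per request: each page only carries the sort values of the previous page's last hit, while the PIT keeps the view of the data stable between pages.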
References
https://docs.graylog.org/docs/elasticsearch
https://docs.graylog.org/docs/index-model
https://docs.graylog.org/docs/rest-api
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/indexer/searches/SearchesAdapter.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/indexer/searches/Searches.java
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/paginate-search-results.html#search-after
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/point-in-time-api.html
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog/plugins/views/search/engine/QueryEngine.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog/plugins/views/search/rest/SearchResource.java