首页 > 其他分享 >graylog 索引数据查询处理简单说明

graylog 索引数据查询处理简单说明

时间:2022-11-01 20:12:56浏览次数:93  
标签:search graylog2 查询处理 索引 graylog https query searchJob final

参考请求地址

从ui 看到的,类似一个job 的执行,支持异步以及同步数据查询处理,还包含了相对时间、绝对时间查询的

/api/views/search/<id>/execute
/api/views/search
/api/views/search/metadata

查询适配

  • 参考实现
    主要是对于es6 以及es7 不同版本的兼容处理,对于查询支持了scroll 模式以及普通的search 模式

 

 

  • 查询处理
    如下图,对于查询可能会包含多个索引
 
public SearchResult search(Set<String> indices, Set<IndexRange> indexRanges, SearchesConfig config) {
    final SearchSourceBuilder searchSourceBuilder = searchRequestFactory.create(config);
 
    if (indexRanges.isEmpty()) {
        return SearchResult.empty(config.query(), searchSourceBuilder.toString());
    }
    // 构造多索引的查询
    final SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[0]))
            .source(searchSourceBuilder);
    final SearchResponse searchResult = client.search(searchRequest, "Unable to perform search query");
 
    final List<ResultMessage> resultMessages = extractResultMessages(searchResult);
    final long totalResults = searchResult.getHits().getTotalHits().value;
    final long tookMs = searchResult.getTook().getMillis();
    final String builtQuery = searchSourceBuilder.toString();
    return new SearchResult(resultMessages, totalResults, indexRanges, config.query(), builtQuery, tookMs);
}

查询任务执行

实际上是基于了多线程支持模式,graylog 同时提供了同步以及异步模式,具体执行都是SearchExecutor 提供的
具体会使用QueryEngine提供的查询能力,同时一次查询可能会处理多个stream 所以需要遍历查询

 
public SearchJob execute(SearchJob searchJob) {
        searchJob.getSearch().queries().forEach(query -> searchJob.addQueryResultFuture(query.id(),
                // generate and run each query, making sure we never let an exception escape
                // if need be we default to an empty result with a failed state and the wrapped exception
                // 使用了CompletableFuture
                CompletableFuture.supplyAsync(() -> prepareAndRun(searchJob, query), queryPool)
                        .handle((queryResult, throwable) -> {
                            if (throwable != null) {
                                final Throwable cause = throwable.getCause();
                                final SearchError error;
                                if (cause instanceof SearchException) {
                                    error = ((SearchException) cause).error();
                                } else {
                                    error = new QueryError(query, cause);
                                }
                                LOG.debug("Running query {} failed: {}", query.id(), cause);
                                searchJob.addError(error);
                                return QueryResult.failedQueryWithError(query, error);
                            }
                            return queryResult;
                        })
        ));
 
        searchJob.getSearch().queries().forEach(query -> {
            final CompletableFuture<QueryResult> queryResultFuture = searchJob.getQueryResultFuture(query.id());
            if (!queryResultFuture.isDone()) {
                // this is not going to throw an exception, because we will always replace it with a placeholder "FAILED" result above
                final QueryResult result = queryResultFuture.join();
 
            } else {
                LOG.debug("[{}] Not generating query for query {}", defaultIfEmpty(query.id(), "root"), query);
            }
        });
 
        LOG.debug("Search job {} executing", searchJob.getId());
        return searchJob.seal();
    }

具体的查询结果是通过SearchJob 的results 获取的(获取数据异步线程的数据结果,利用了streamex 方便的stream 工具)

@JsonProperty("results")
public Map<String, QueryResult> results() {
    return EntryStream.of(queryResults)
            .mapValues(future -> future.getNow(QueryResult.incomplete()))
            .filterKeys(queryId -> !queryId.isEmpty()) // the root query result is meaningless, so we don't include it here
            .filterValues(r -> (r.state() == QueryResult.State.COMPLETED) || (r.state() == QueryResult.State.FAILED))
            .toMap();
}

说明

尽管scroll 是一个请求大量数据的模式,但是es 并不是很推荐了,更推荐的是使用search-after,同时需要基于pit 进行更高效的进行数据查询处理

参考资料

https://docs.graylog.org/docs/elasticsearch
https://docs.graylog.org/docs/index-model
https://docs.graylog.org/docs/rest-api
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/indexer/searches/SearchesAdapter.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog2/indexer/searches/Searches.java
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/paginate-search-results.html#search-after
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/point-in-time-api.html
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog/plugins/views/search/engine/QueryEngine.java
https://github.com/Graylog2/graylog2-server/blob/626be1f0d80506705b5ba41fbea33c2ec0164bc0/graylog2-server/src/main/java/org/graylog/plugins/views/search/rest/SearchResource.java

标签:search,graylog2,查询处理,索引,graylog,https,query,searchJob,final
From: https://www.cnblogs.com/rongfengliang/p/16848961.html

相关文章

  • 企业网站如何快速被搜索引擎收录
    对SEO推广很多人并不陌生,很多站长遇到类似的问题,就是网站的排名没有,特别是一个刚刚接手的新站,网站排名都没有。因此,要怎样才可实现新站排名和收录增长?开源字节将与大家......
  • SQL查询索引失效之谜
    1、使用!=或者<> SELECT*FROMuserWHEREname!='andrew';2、字段类型不同3、索引列使用函数如:SELECT*FROMuserWHEREDATE(create_at)='2020-09-03';如果使用......
  • 面试之基础算法题:求一个数字在给定的已排序数组中出现的起始、终止索引号(Java版)
    题目给定一个升序的整数数组,查找某一个值在数组中出现的索引号,例如,输入数组​​[2,3,3,4,4,5]​​​;查找的数是3,则返回​​[1,2]​​。时间复杂度要求为O(logN)。思路......
  • MongoDB 索引笔记
    索引(Index)合适的索引可以大大提高数据库搜索性能集合层面的索引支持复合键索引可以对多个字段进行排序复合索引:(A,B,C)可以支持的索引:{A},{A,B},{A,B,C}......
  • graylog 索引模版处理
    graylog默认分词只支持对应几个固定的字段,如果需要自定义索引信息,就可以使用模版能力,默认包含了一个graylog-internal,order为-1但是我们可以扩展默认索引信息查询信......
  • 索引和联合索引
    (1)对一张表来说,如果有一个复合索引on  (col1,col2),就没有必要同时建立一个单索引oncol1。(2)如果查询条件需要,可以在已有单索引 oncol1的情况下,添加复合索引on  (c......
  • tf.gather,取指定维度多个索引的数据
    tensorflow和numpy在数据处理上语法相似但又不完全一样,比如在numpy中想取指定维度的多个指定索引所指向的数据时,直接用一个列表保存索引就能直接取,比如:#b的shape为[2,3,2......
  • 总结一下使用索引的一些建议
    在区分度高的字段上建立索引可以有效的使用索引,区分度太低,无法有效的利用索引,可能需要扫描所有数据页,此时和不使用索引差不多联合索引注意最左匹配原则:必须按照从左......
  • MYSQL索引
    索引的优点索引大大减少了服务器需要扫描的数据量索引可以帮助服务器避免排序和临时表索引可以将随机I/O变成顺序I/O索引只要帮助存储引擎快速查找到记录,带来......
  • 索引接口汇总整理
    ##Neo4j###索引-Btreeindex-Rangeindex-Pointindex-Lookupindex-FullTextindex-Textindex其中Btree为当前默认索引,在neo4j的文档中rangeindex与pointin......