一、前言:
好久没写笔记了,最近忙一个项目,用到ES查询,以往的笔记写ES都是搭建环境,用Kibana玩一玩
这次是直接调用API操作了,话不多说,进入主题
二、环境前提:
公司用的还是纯ElasticSearch的API库,并没有Spring-Data-ES的包装
ElasticSearch版本是7.3.1
这是封装的包:
<!-- es start --> <dependency> <groupId>cn.ymcd.comm</groupId> <artifactId>comm-elasticsearch</artifactId> <version>1.0.3</version> </dependency>
然后看下里面的ES依赖:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.3.0</version><!--$NO-MVN-MAN-VER$--> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.3.0</version><!--$NO-MVN-MAN-VER$--> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>7.3.0</version><!--$NO-MVN-MAN-VER$--> </dependency> <dependency> <groupId>org.elasticsearch.plugin</groupId> <artifactId>transport-netty4-client</artifactId> <version>7.3.0</version><!--$NO-MVN-MAN-VER$--> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.3.0</version><!--$NO-MVN-MAN-VER$--> </dependency>
包源码只有一个客户端类:
类似JDBC的连接,提供主机,账户密码信息,调用客户端对象方法获取连接资源
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package cn.ymcd.comm.elasticsearch;

import cn.ymcd.comm.base.log.LogFactory;
import cn.ymcd.comm.base.log.YmcdLogger;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Spring-registered factory (bean name {@code esClient}) for Elasticsearch
 * {@link RestHighLevelClient} instances.
 *
 * <p>Connection settings are injected from the {@code elasticsearch.*} properties. When
 * {@code elasticsearch.cluster} is non-blank it is split on commas and every entry is parsed
 * with {@link HttpHost#create(String)}; otherwise a single node built from
 * {@code elasticsearch.host-name} and {@code elasticsearch.port} (default 9200) is used.
 * Basic authentication is configured only when both user name and password are non-blank.
 */
@Configuration
@Component("esClient")
public class EsRestClient implements AutoCloseable {
    private YmcdLogger logger = LogFactory.getLogger(this.getClass());
    @Value("${elasticsearch.host-name}")
    private String hostName;
    @Value("${elasticsearch.port:9200}")
    private int port;
    @Value("${elasticsearch.cluster:}")
    private String cluster;
    @Value("${elasticsearch.userName:}")
    private String userName;
    @Value("${elasticsearch.password:}")
    private String password;
    /** The client created by the most recent {@link #getEsClient()} call, if any. */
    protected RestHighLevelClient client;

    public EsRestClient() {
    }

    /**
     * Builds a new high-level client from the injected settings, stores it in
     * {@link #client} and returns it.
     *
     * <p>Every call creates a fresh client; a previously stored client is replaced
     * without being closed here.
     *
     * @return the newly created client
     */
    public RestHighLevelClient getEsClient() {
        final RestClientBuilder restBuilder;
        if (StringUtils.isBlank(this.cluster)) {
            this.logger.debug("connect to single node server...");
            restBuilder = RestClient.builder(new HttpHost(this.hostName, this.port));
        } else {
            this.logger.debug("connect to cluster server...");
            HttpHost[] nodes = Arrays.stream(this.cluster.split(","))
                    .map(HttpHost::create)
                    .toArray(HttpHost[]::new);
            restBuilder = RestClient.builder(nodes);
        }

        boolean hasCredentials = StringUtils.isNotBlank(this.userName) && StringUtils.isNotBlank(this.password);
        if (hasCredentials) {
            final CredentialsProvider credentials = new BasicCredentialsProvider();
            credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.userName, this.password));
            restBuilder.setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentials));
        }

        this.client = new RestHighLevelClient(restBuilder);
        this.logger.debug("connected to server!");
        return this.client;
    }

    /**
     * Closes the client stored in {@link #client}, if there is one. Failures are logged
     * and not rethrown.
     */
    public void close() {
        if (this.client == null) {
            return;
        }
        this.logger.debug("close es client...");
        try {
            this.client.close();
        } catch (Exception closeFailure) {
            this.logger.error("close es client error!", closeFailure);
        }
    }
}
三、API封装:
封装了,但是没完全封装
1、我要做一个翻页查询都没有,还得我自己加上去整一个,麻了
2、有提供一个ES的索引名称注解和泛型声明,为什么返回类型没有一个按泛型返回的,还得是自己写
package cn.ymcd.perception.common.service;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortBuilder;

import java.util.List;

/**
 * Elasticsearch service interface.
 *
 * @projectName: perception-task-server
 * @author: panx
 * @date: 2023-09-12 10:04
 * @version: 1.0
 */
public interface IEsBaseService<T> {

    /**
     * Basic query: looks up data by id.
     *
     * @param entity entity whose index is queried
     * @param id     id to look up
     * @return java.lang.String
     * @author panx
     * @createTime 2023/9/12 0012 17:14
     */
    String getById(T entity, String id);

    /**
     * Queries data by multiple ids.
     *
     * @param entity entity whose index is queried
     * @param list   ids to look up
     * @return org.elasticsearch.action.search.SearchResponse
     * @author panx
     * @createTime 2023/9/12 0012 17:14
     */
    SearchResponse findByIdsList(T entity, List<String> list);

    /**
     * Queries all data.
     *
     * @param from   value passed as the search {@code from}
     * @param size   value passed as the search {@code size}
     * @param entity entity whose index is queried
     * @return org.elasticsearch.search.SearchHit[]
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    SearchHit[] queryAll(int from, int size, T entity);

    /**
     * Queries by conditions.
     *
     * @param indexName        index to search
     * @param page             paging information
     * @param boolQueryBuilder query conditions
     * @param highlightBuilder highlight settings
     * @param sortBuilder      sort settings
     * @return org.elasticsearch.action.search.SearchResponse
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    SearchResponse whereQuery(String indexName, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder);

    /**
     * Paged query.
     *
     * @author OnCloud9
     * @date 2023/9/14 13:36
     * @param tClass           entity class; also the element type of the returned page
     * @param page             paging information
     * @param boolQueryBuilder query conditions
     * @param highlightBuilder highlight settings
     * @param sortBuilder      sort settings
     * @return com.baomidou.mybatisplus.extension.plugins.pagination.Page&lt;Entity&gt;
     */
    <Entity> Page<Entity> pageQuery(Class<Entity> tClass, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder);

    /**
     * Sets the fields to highlight.
     *
     * @param fields fields to highlight
     * @return org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    HighlightBuilder highlightBuilder(String... fields);

    /**
     * Gets the sort rule.
     *
     * @param fieldSort field to sort by
     * @param isDesc    whether to sort in descending order
     * @return org.elasticsearch.search.sort.SortBuilder
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    SortBuilder getSortBuilder(String fieldSort, Boolean isDesc);

    /**
     * Gets the highlighted value.
     *
     * @param hit   search hit
     * @param field highlighted field
     * @return java.lang.String
     * @author panx
     * @createTime 2023/9/12 0012 17:15
     */
    String getHighlightContent(SearchHit hit, String field);

    /**
     * Queries all matching data in the index using a scroll (cursor) query.
     *
     * @param response            search response to continue from
     * @param restHighLevelClient client used for the scroll requests
     * @return java.util.List&lt;org.elasticsearch.search.SearchHits&gt;
     * @author panx
     * @createTime 2023/9/12 0012 17:16
     */
    List<SearchHits> getAllData(SearchResponse response, RestHighLevelClient restHighLevelClient);

    /**
     * Gets the total count in ES.
     *
     * @param indexName        index to count in
     * @param boolQueryBuilder query conditions
     * @return long
     * @author panx
     * @createTime 2023/9/12 0012 17:16
     */
    long getCount(String indexName, BoolQueryBuilder boolQueryBuilder);

    /**
     * Gets the index name from the annotation.
     *
     * @param entity entity to read the index name for
     * @return java.lang.String
     * @author panx
     * @createTime 2023/9/12 0012 17:16
     */
    String getIndexName(T entity);

    /**
     * Aggregation query.
     *
     * @author OnCloud9
     * @date 2023/9/17 17:10
     * @params [tClass, aggregationBuilder, resultName]
     * @return java.util.List&lt;org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket&gt;
     */
    <Entity> List<Terms.Bucket> getAggregationQuery(Class<Entity> tClass, AggregationBuilder aggregationBuilder, String resultName);

    /**
     * Aggregation query with conditions.
     *
     * @author OnCloud9
     * @date 2023/9/18 15:40
     * @params [tClass, boolQueryBuilder, aggregationBuilder, resultName]
     * @return java.util.List&lt;org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket&gt;
     */
    <Entity> List<Terms.Bucket> getConditionAggregationQuery(
        Class<Entity> tClass,
        BoolQueryBuilder boolQueryBuilder,
        AggregationBuilder aggregationBuilder,
        String resultName
    );
}
首先是实体的索引名称注解:
我们项目的mysql表名 直接对应到es的索引名上,数据来源也是mysql推到es上面,统一规范了
package cn.ymcd.perception.base;

import java.lang.annotation.*;

/**
 * Annotation carrying Elasticsearch index information for a type.
 *
 * <p>Retained at runtime, applicable to types only, and inherited by subclasses.
 *
 * @author gaof
 * @createTime 2020/3/17 17:05
 * @version: 1.0
 */
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface EsIndex {

    /**
     * Index name.
     *
     * @return java.lang.String
     * @author gaof
     * @createTime 2020/3/17 17:06
     * @version: 1.0
     */
    String indexName();
}
注解的获取方式,因为做ES的CRUD都需要知晓是在哪个索引下操作
所以每个方法必定需要索引的参数传入,
同事的实现是从对象实例反射获取注解;我加了一个重载方法,直接从Class对象(字节码对象)上读取注解即可
这里说下接口上的泛型声明很不好,因为我在写业务的时候发现并不是只有一个索引要操作,需要多个索引操作,这个泛型声明限定死了
所以我写的方法都改用方法泛型,这样调用的时候才支持不同的业务实体
/**
 * Resolves the index name from the {@link EsIndex} annotation on the runtime class of
 * the given entity.
 *
 * @param entity entity instance whose class carries {@link EsIndex}
 * @return the non-blank index name declared on the annotation
 * @throws ElasticsearchException if {@code entity} is null, its class has no
 *         {@link EsIndex} annotation, or the declared index name is blank
 */
@Override
public String getIndexName(T entity) {
    if (entity == null) {
        logger.error("实体对象为空,无法获取索引名称");
        throw new ElasticsearchException("实体对象为空,无法获取索引名称");
    }
    return getIndexName(entity.getClass());
}

/**
 * Resolves the index name from the {@link EsIndex} annotation on the given class.
 *
 * @param tClass class carrying {@link EsIndex}
 * @return the non-blank index name declared on the annotation
 * @throws ElasticsearchException if {@code tClass} is null, has no {@link EsIndex}
 *         annotation, or the declared index name is blank
 */
public <Entity> String getIndexName(Class<Entity> tClass) {
    EsIndex index = (tClass == null) ? null : tClass.getAnnotation(EsIndex.class);
    // Fail with a descriptive exception instead of a bare NullPointerException when the
    // class is missing or not annotated.
    if (index == null) {
        logger.error("未找到EsIndex注解");
        throw new ElasticsearchException("未找到EsIndex注解,无法获取indexName(索引名称)");
    }
    String indexName = index.indexName();
    if (StringUtils.isBlank(indexName)) {
        logger.error("注解索引名称为空");
        throw new ElasticsearchException("注解indexName(索引名称)为空");
    }
    return indexName;
}
其它的就是如何操作API了
这里重点说下PageQuery这个方法,要解决几个问题:
1、怎么接收返回的结果,ES的结果叫命中对象,放在一个数组里面,存的是JSON串
这里根据入参的实体类Class对象,交给JSON工具把JSON串反序列化成对象就行了,这里用的FastJson
2、解决翻页问题,这里我不想冗余代码了,所以直接在同事写的whereQuery基础上套参写
在查询前算好from + size的参数值,返回的结果集塞回Page翻页对象交出去,
可以不传Page对象,那我默认认为调用者需要查询全部记录,就按一般索引支持的最大记录数翻页
3、索引翻页问题,因为在2上面说过,索引存在一个最大记录数的限制,有可能这个索引存了一万五千条数据,但是翻页查询只能翻到前一万条数据
在这个封装的工具方法中可以使用getCount方法获取真实的总记录数,也可以通过查询响应的命中对象获取总共的命中数量
解决的方法可以参考下链接: https://zhuanlan.zhihu.com/p/489562200
无非就三种, 1 调参数加大、2 Scroll滚动查询、3 SearchAfter标记查询
而在我的业务场景就是把ES数据带到功能上,要翻页查询,经理说不能调参数,后面两种办法又不能实现分页功能
所以折中的办法就是不调整,查到1万位置,默认认为用户不需要再看后面的内容
/**
 * Runs a term query on the {@code id} field of the entity's index and returns the
 * source JSON of the matching document.
 *
 * <p>If several documents match, the source of the last hit in the response wins.
 * An {@link IOException} is logged and not rethrown.
 *
 * @param entity entity used to resolve the index name
 * @param id     value matched against the {@code id} field
 * @return the hit's source as a JSON string, or an empty string when nothing was found
 *         or the search failed with an {@link IOException}
 */
@Override
public String getById(T entity, String id) {
    SearchRequest searchRequest = new SearchRequest(getIndexName(entity))
            .source(new SearchSourceBuilder().query(QueryBuilders.termQuery("id", id)));

    String sourceJson = "";
    try (RestHighLevelClient esClient = getEsClient()) {
        SearchHits hits = esClient.search(searchRequest, RequestOptions.DEFAULT).getHits();
        for (SearchHit hit : hits) {
            sourceJson = hit.getSourceAsString();
        }
    } catch (IOException e) {
        logger.error("根据id查询数据获取索引异常", e);
    }
    return sourceJson;
}
/**
 * Runs a terms query on the {@code id} field of the entity's index for the given ids.
 *
 * <p>An {@link IOException} is logged and not rethrown.
 *
 * @param entity entity used to resolve the index name
 * @param list   ids matched against the {@code id} field
 * @return the search response, or {@code null} when the search failed with an
 *         {@link IOException}
 */
@Override
public SearchResponse findByIdsList(T entity, List<String> list) {
    SearchRequest searchRequest = new SearchRequest(getIndexName(entity));
    searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.termsQuery("id", list)));

    SearchResponse result = null;
    try (RestHighLevelClient esClient = getEsClient()) {
        result = esClient.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        logger.error("根据多个id查询数据异常", e);
    }
    return result;
}
/**
 * Returns one window of documents from the entity's index without any query
 * condition.
 *
 * <p>No query is set on the request, so the search is unfiltered. An
 * {@link IOException} is logged and not rethrown.
 *
 * @param from   value passed as the search {@code from}
 * @param size   value passed as the search {@code size}
 * @param entity entity used to resolve the index name
 * @return the hits of the response, or {@code null} when the search failed with an
 *         {@link IOException}
 */
@Override
public SearchHit[] queryAll(int from, int size, T entity) {
    SearchRequest searchRequest = new SearchRequest(getIndexName(entity));
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // The former bare searchSourceBuilder.query() call was only a getter whose result
    // was discarded, so it has been dropped.
    searchSourceBuilder.from(from);
    searchSourceBuilder.size(size);
    searchRequest.source(searchSourceBuilder);

    SearchHit[] hitsArr = null;
    // try-with-resources closes the client; the former explicit close() call inside
    // this block was redundant (presumably it closed the same client a second time).
    try (RestHighLevelClient restHighLevelClient = getEsClient()) {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        hitsArr = searchResponse.getHits().getHits();
    } catch (IOException e) {
        this.logger.error("查询数据失败", e);
    }
    return hitsArr;
}
@Override public SearchResponse whereQuery(String indexName, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder) { if (StringUtils.isBlank(indexName)) { return null; } SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 设置查询条数 if (null != page) { Long current = page.getCurrent(); Long size = page.getSize(); searchSourceBuilder.from(current.intValue()); searchSourceBuilder.size(size.intValue()); } //设置需要排序的字段 if (null != sortBuilder) { searchSourceBuilder.sort(sortBuilder); } // 设置高亮,使用默认的highlighter高亮器 if (null != highlightBuilder) { searchSourceBuilder.highlighter(highlightBuilder); } searchSourceBuilder.query(boolQueryBuilder); try (RestHighLevelClient restHighLevelClient = getEsClient()) { searchRequest.source(searchSourceBuilder); logger.info("query ES where... indexName = " + indexName + ":" + searchSourceBuilder.toString()); return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); } catch (Exception e) { logger.error("查询ES数据信息失败", e); } return null; }
/**
 * Paged query that maps every hit's source JSON onto {@code tClass} and returns the
 * result inside the given page object.
 *
 * <p>{@code whereQuery} uses {@code page.getCurrent()} as the record offset, so the
 * 1-based page number is temporarily replaced by {@code (pageNumber - 1) * size} for the
 * duration of the search and restored afterwards; the returned page therefore still
 * carries the page number the caller asked for. Page numbers below 1 are treated as
 * page 1. When {@code page} is {@code null}, a page of 10000 records starting at page 1
 * is used.
 *
 * @param tClass           entity class; used to resolve the index name and as the JSON target type
 * @param page             paging information, may be {@code null}
 * @param boolQueryBuilder query conditions
 * @param highlightBuilder highlight settings, may be {@code null}
 * @param sortBuilder      sort settings, may be {@code null}
 * @return the page filled with the mapped records and the total hit count; empty
 *         records and a total of 0 when the underlying search returned no response
 */
@Override
public <Entity> Page<Entity> pageQuery(Class<Entity> tClass, Page page, BoolQueryBuilder boolQueryBuilder, HighlightBuilder highlightBuilder, SortBuilder sortBuilder) {
    String indexName = getIndexName(tClass);
    if (Objects.isNull(page)) {
        page = new Page<>(1, 10000);
    }

    long pageNumber = page.getCurrent();
    SearchResponse searchResponse;
    try {
        // Clamp so that a page number < 1 cannot produce a negative search offset.
        long offset = Math.max(pageNumber - 1L, 0L) * page.getSize();
        page.setCurrent(offset);
        searchResponse = whereQuery(indexName, page, boolQueryBuilder, highlightBuilder, sortBuilder);
    } finally {
        // Hand the caller back the page number, not the offset.
        page.setCurrent(pageNumber);
    }

    List<Entity> records = new ArrayList<>();
    long total = 0L;
    // whereQuery returns null when the search failed; report an empty page then.
    if (searchResponse != null) {
        SearchHits searchHits = searchResponse.getHits();
        total = searchHits.getTotalHits().value;
        for (SearchHit searchHit : searchHits) {
            String recordJson = searchHit.getSourceAsString();
            logger.info("recordJson " + recordJson);
            records.add(JSON.parseObject(recordJson, tClass));
        }
    }

    page.setRecords(records);
    page.setTotal(total);
    return page;
}
/**
 * Creates a highlight builder for the given fields, wrapping matches in a red
 * {@code <span>}.
 *
 * @param fields fields to highlight
 * @return the configured builder, or {@code null} when {@code fields} is null or empty
 */
@Override
public HighlightBuilder highlightBuilder(String... fields) {
    if (fields == null || fields.length == 0) {
        return null;
    }
    HighlightBuilder builder = new HighlightBuilder();
    for (String fieldName : fields) {
        builder.field(fieldName);
    }
    builder.preTags("<span style=\"color:red;\">");
    builder.postTags("</span>");
    return builder;
}
/**
 * Creates a field sort on {@code fieldSort}.
 *
 * @param fieldSort field to sort by
 * @param isDesc    {@code true} for descending order; {@code false} or {@code null}
 *                  for ascending order
 * @return the sort builder
 */
@Override
public SortBuilder getSortBuilder(String fieldSort, Boolean isDesc) {
    SortOrder direction = Boolean.TRUE.equals(isDesc) ? SortOrder.DESC : SortOrder.ASC;
    return SortBuilders.fieldSort(fieldSort).order(direction);
}
/**
 * Concatenates all highlight fragments of one field of a hit.
 *
 * @param hit   search hit to read the highlight fields from
 * @param field name of the highlighted field
 * @return the joined fragments; an empty string when the hit has no highlight (or no
 *         fragments) for the field; {@code null} when {@code field} is blank
 */
@Override
public String getHighlightContent(SearchHit hit, String field) {
    if (StringUtils.isBlank(field)) {
        return null;
    }
    HighlightField highlighted = hit.getHighlightFields().get(field);
    if (highlighted == null) {
        return "";
    }
    Text[] fragments = highlighted.getFragments();
    if (fragments == null) {
        return "";
    }
    StringBuilder joined = new StringBuilder();
    for (Text fragment : fragments) {
        joined.append(fragment);
    }
    return joined.toString();
}
@Override public List<SearchHits> getAllData(SearchResponse response, RestHighLevelClient restHighLevelClient) { boolean succeeded = false; List<SearchHits> hitList = new ArrayList<>(); try { String scrollId = response.getScrollId(); SearchHits searchHits = response.getHits(); hitList.add(searchHits); // 根据游标查询所有数据 while (searchHits.getHits() != null && searchHits.getHits().length > 0) { SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); scrollRequest.scroll(TimeValue.timeValueMillis(30)); response = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT); scrollId = response.getScrollId(); searchHits = response.getHits(); hitList.add(searchHits); } // 查询完清除游标 ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); succeeded = clearScrollResponse.isSucceeded(); } catch (IOException e) { logger.error("统一方法索引查询所有数据发生异常", e); } if (!succeeded) { return new ArrayList<>(); } return hitList; }
/**
 * Counts the documents of an index that match the given query.
 *
 * @param indexName        index to count in
 * @param boolQueryBuilder query conditions
 * @return the count reported by Elasticsearch, or {@code 0} when the request threw an
 *         exception (which is logged)
 */
@Override
public long getCount(String indexName, BoolQueryBuilder boolQueryBuilder) {
    CountRequest request = new CountRequest(indexName);
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(boolQueryBuilder);

    long matched = 0L;
    try (RestHighLevelClient esClient = getEsClient()) {
        request.source(sourceBuilder);
        CountResponse counted = esClient.count(request, RequestOptions.DEFAULT);
        return counted.getCount();
    } catch (Exception e) {
        logger.error("统计ES数据失败", e);
    }
    return matched;
}
/**
 * Runs a terms aggregation over the whole index of {@code tClass} and returns its
 * buckets.
 *
 * <p>The request asks for zero hits ({@code size(0)}), so only aggregation results are
 * returned by the search.
 *
 * @param tClass             entity class used to resolve the index name
 * @param aggregationBuilder aggregation to execute
 * @param resultName         name under which the terms aggregation is looked up in the response
 * @return a new list holding the buckets, or an empty list when the search or the
 *         result extraction threw an exception (which is logged)
 */
@SuppressWarnings("Duplicates")
@Override
public <Entity> List<Terms.Bucket> getAggregationQuery(Class<Entity> tClass, AggregationBuilder aggregationBuilder, String resultName) {
    String indexName = getIndexName(tClass);
    SearchRequest request = new SearchRequest(indexName);
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.aggregation(aggregationBuilder);
    sourceBuilder.size(0);

    try (RestHighLevelClient esClient = getEsClient()) {
        request.source(sourceBuilder);
        logger.info("query ES where... indexName = " + indexName + ":" + sourceBuilder.toString());
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        Terms terms = response.getAggregations().get(resultName);
        return new ArrayList<>(terms.getBuckets());
    } catch (Exception e) {
        logger.error("查询ES数据信息失败", e);
        return Collections.emptyList();
    }
}
@SuppressWarnings("Duplicates") @Override public <Entity> List<Terms.Bucket> getConditionAggregationQuery(Class<Entity> tClass, BoolQueryBuilder boolQueryBuilder, AggregationBuilder aggregationBuilder, String resultN String indexName = getIndexName(tClass); SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.aggregation(aggregationBuilder); searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.size(0); try (RestHighLevelClient restHighLevelClient = getEsClient()) { searchRequest.source(searchSourceBuilder); logger.info("query ES where... indexName = " + indexName + ":" + searchSourceBuilder.toString()); SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); Aggregations aggregations = searchResponse.getAggregations(); Terms terms = aggregations.get(resultName); List<? extends Terms.Bucket> buckets = terms.getBuckets(); List<Terms.Bucket> returnBuckets = new ArrayList<>(buckets.size()); returnBuckets.addAll(buckets); return returnBuckets; } catch (Exception e) { logger.error("查询ES数据信息失败", e); return Collections.emptyList(); } }
标签:return,String,indexName,org,ElasticSearch,elasticsearch,应用,new,Java From: https://www.cnblogs.com/mindzone/p/17722894.html