1、怎么把业务数据存入es
连接ES
@Service @Slf4j public class ElasticConnectUtil { @Value("${elastic.host}") private String elasticHost; @Value("${elastic.port}") private Integer elasticPort; private ElasticsearchClient client = null; public ElasticsearchClient getClient(){ // ElasticsearchClient client = null ; if(null != client){ return client; } try{ RestClient restClient = RestClient.builder(new HttpHost(elasticHost,elasticPort)).build(); ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); client = new ElasticsearchClient(transport); }catch (Exception e){ log.error("连接elastic失败:",e); throw new BaseException("连接elastic失败:",e); } return client; } }
保存数据到es
public void saveData(String index,String id,String data){ Reader reader = new StringReader(data); IndexRequest<JsonData> indexRequest = IndexRequest.of(a -> a.index(index).id(id).withJson(reader)); IndexResponse response = null; try { response = elasticConnectUtil.getClient().index(indexRequest); }catch (Exception e){ String errMsg = "新增数据失败:索引:"+index+",id:"+id+",数据:"+data; log.error(errMsg,e); throw new BaseException(errMsg,e); } log.info("新增数据成功:索引:"+index+",id:"+id+",数据:"+data+",返回结果:"+response.toString()); }
2、select t.* from ord_send_info t where t.is_valid=1 and (t.corp_id in () or t.corp_id in () ); 原来需要这样查询数据的地方,改到es里,提高响应速度,特别是大数据量的时候,千万及亿级时
报错:"error": "no handler found for uri [/customer/doc/1?pretty=&pretty=true] and method [POST]"
解决:好像是语法问题,下图是官网上的的
正确的语法是:
POST /website/_doc { "title":"My first blog entry", "text":"Just trying this out..." } get /website/_doc/dWIsVIMBqVRaOii8I0Q- get /_search PUT /test2/_doc/1 { "name":"李华", "age":18 } PUT /customer/_doc/1 { "name":"John Doe" } GET /customer/_doc/1 post /customer/_update/1 { "doc":{"name":"John Doe-update"} }
如下图:官网中,中文的文档都是基于2.x版本的,,
官网关于8.4的语法示例:
自己弄个浏览器翻译插件翻一下
动态映射|弹性搜索指南 [8.4] |弹性的 (elastic.co)
显式映射|弹性搜索指南 [8.4] |弹性的 (elastic.co)
springboot项目中使用 es,官方文档参考: 安装|弹性搜索 Java API 客户端 [8.4] |弹性的 (elastic.co)
测试过程中发现一个问题啊
代码如下:
@Test public void saveDataTest(){ String id = "eWJ7XoMBqVRaOii8nkQn"; String index = "products"; Product p = new Product(); p.setName("社保啊公积金"); p.setPrice(BigDecimal.valueOf(0.00006)); operations.saveData(index,id, JSONUtil.toJsonStr(p)); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } operations.search(index); }
以上仅是个单元测试啊,现象是保存数据之后立刻就去查询,是查询不到刚保存成功的那个结果的,但是如果保存成功之后延迟一秒再查询,就是能查到最新结果的,
这个问题不知道是不是跟ES的配置有关。
问题2:对象中有Long类型的id,转成json存到es后值比原来小1,转成string后再存值就不会变化
问题3:往同一个index存10w条数据时,当存了1000多条时,报错
2022-10-09 18:04:30.775 ERROR 19956 --- [l-1600-thread-1] o.a.h.i.n.c.InternalHttpAsyncClient : I/O reactor terminated abnormally org.apache.http.nio.reactor.IOReactorException: Failure opening selector at org.apache.http.impl.nio.reactor.AbstractIOReactor.<init>(AbstractIOReactor.java:103) ~[httpcore-nio-4.4.12.jar:4.4.12] at org.apache.http.impl.nio.reactor.BaseIOReactor.<init>(BaseIOReactor.java:85) ~[httpcore-nio-4.4.12.jar:4.4.12] at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:321) ~[httpcore-nio-4.4.12.jar:4.4.12] at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) ~[httpasyncclient-4.1.4.jar:4.1.4] at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) ~[httpasyncclient-4.1.4.jar:4.1.4] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201] Caused by: java.io.IOException: Unable to establish loopback connection at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94) ~[na:1.8.0_201] at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61) ~[na:1.8.0_201] at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_201] at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171) ~[na:1.8.0_201] at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50) ~[na:1.8.0_201] at java.nio.channels.Pipe.open(Pipe.java:155) ~[na:1.8.0_201] at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127) ~[na:1.8.0_201] at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44) ~[na:1.8.0_201] at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.8.0_201] at org.apache.http.impl.nio.reactor.AbstractIOReactor.<init>(AbstractIOReactor.java:101) ~[httpcore-nio-4.4.12.jar:4.4.12] ... 5 common frames omitted Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): connect at sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_201] at sun.nio.ch.Net.connect(Net.java:454) ~[na:1.8.0_201] at sun.nio.ch.Net.connect(Net.java:446) ~[na:1.8.0_201] at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[na:1.8.0_201] at java.nio.channels.SocketChannel.open(SocketChannel.java:189) ~[na:1.8.0_201] at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127) ~[na:1.8.0_201] at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76) ~[na:1.8.0_201] ... 14 common frames omittedy
问题3原因:连接一次es就创建了一个ElasticsearchClient对象,创建的多了短时间未释放掉,就把资源占满了,
问题4:报错 co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/search] failed: [search_phase_execution_exception] all shards failed
问题4原因:查询语法及类型不对导致,参考:身份证|弹性搜索指南 [8.4] |弹性的 (elastic.co) 这个例子里查的string类型的数据,实际需求查的是Long类型的数据,
错误代码:
get /_search { "query":{ "bool": { "should":[ {"match":{"ord_send_info.corpId":[1,2,3]}}, {"match":{"ord_send_info.chgCorpId":[1,2,3]}} ] } } }
正确代码如下:
get /ord_send_info/_search { "query":{ "bool": { "should":[ {"match":{"corpId":2}}, {"match":{"corpId":2}}, {"match":{"corpId":2}}, {"match":{"chgCorpId":2}}, {"match":{"chgCorpId":2}}, {"match":{"chgCorpId":2}} ] } } }
依照错误代码写的java查询代码就报了上述错误,按照正确的代码重写java代码就可以了
代码参考如下:
public void search(String index,List<Integer> corpIdList){ List<Query> queryList = new ArrayList<>(); for(int i:corpIdList){ Query qCorpId = Query.of(q -> q.match(m->m.field("corpId").query(i))); Query qChgCorpId = Query.of(q -> q.match(m -> m.field("chgCorpId").query(i))); Query qInsSuppId = Query.of(q -> q.match(m -> m.field("insSuppId").query(i))); Query qAccuSuppId = Query.of(q -> q.match(m -> m.field("accuSuppId").query(i))); queryList.add(qCorpId); queryList.add(qChgCorpId); queryList.add(qInsSuppId); queryList.add(qAccuSuppId); } Long startTime = System.currentTimeMillis(); SearchRequest searchRequest = SearchRequest.of(s -> s.index(index).query(q -> q.bool(b -> b.should(queryList)))); SearchResponse<OrdSendInfoEs> searchResponse = null; try { searchResponse = elasticConnectUtil.getClient().search(searchRequest,OrdSendInfoEs.class); } catch (IOException e) { log.error("批量查询异常",e); } log.info("耗时:"+(System.currentTimeMillis() - startTime)); log.info("查询到总数:"+searchResponse.hits().total().value()); }
select t.* from ord_send_info t where t.is_valid=1 and (t.corp_id in () or t.corp_id in () );
这里就解决了最初的问题,解决原来在数据库查询慢的问题
问题5:最大返回1w条
标签:201,java,8.4,Elastic,解决方案,1.8,na,id,nio From: https://www.cnblogs.com/feiye512/p/16779006.html