首页 > 其他分享 >Elasticsearch 实战

Elasticsearch 实战

时间:2022-08-21 21:57:31浏览次数:160  
标签:实战 msgSearchVO keyword 报文 private Elasticsearch new type

需求

假设现在有这么一个需求,系统接了很多的报文,需要提供全文检索,为了简化,报文目前只有类型,流水号,内容这三个字段。

索引设计

建立msg索引,映射规则如下

PUT /msg
{
	"mappings" : {
      "properties" : {
        "traceNo" : {
          "type" : "keyword"
        },
        "type" : {
          "type" : "keyword"
        },
        "content" : {
          "type" : "text"
        }
      }
    }
}

代码实现

交易VO

@Getter
@Setter
public class TradeVO {

    /**
     * 交易编号
     */
    private String tradeNo;

    /**
     * 成交金额
     */
    private Integer matchAmt;

    /**
     * 成交数量
     */
    private Integer matchQty;

}

报文实体

package com.wangtao.msgsearch.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class Msg {

    /**
     * 0001: 交易所报文
     * 0002: 银行间报文
     * 0003: 场外报文
     * es类型: keyword
     */
    private String type;

    /**
     * 流水号
     * es类型: keyword
     */
    private String traceNo;

    /**
     * 报文内容
     * es类型: text
     */
    private String content;

}

查询VO

package com.wangtao.msgsearch.vo;

import lombok.Data;
@Data
public class MsgSearchVO {

    /**
     * 报文类型
     */
    private String type;

    /**
     * 流水号
     */
    private String traceNo;

    /**
     * 关键字, 从报文内容搜索
     */
    private String keyword;

    private Integer pageNo = 1;

    private Integer pageSize = 20;
}

索引创建

/**
 * 创建索引并指定映射
 */
@GetMapping("/createIndex")
public void createIndex() throws IOException {
    TypeMapping typeMapping = new TypeMapping.Builder()
        .properties("type", p -> p.keyword(k -> k))
        .properties("traceNo", p -> p.keyword(k -> k))
        .properties("content", p -> p.text(t -> t))
        .build();
    CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
        .index(INDEX_NAME)
        .mappings(typeMapping)
        .build();
    BooleanResponse booleanResponse = elasticsearchClient.indices().exists(e -> e.index(INDEX_NAME));
    if (booleanResponse.value()) {
        // 如果存在则删除
        elasticsearchClient.indices().delete(d -> d.index(INDEX_NAME));
    }
    elasticsearchClient.indices().create(createIndexRequest);
}

插入测试数据

/**
 * 准备数据
 */
private List<Msg> generateData() {
    Random random = new Random();
    List<Msg> msgList = new ArrayList<>();
    for (int i = 1; i <= 50; i++) {
        TradeVO tradeVO = new TradeVO();
        tradeVO.setTradeNo("T20220821" + String.format("%03d", i));
        tradeVO.setMatchAmt(random.nextInt(10000));
        tradeVO.setMatchQty(tradeVO.getMatchAmt());

        Msg msg = new Msg();
        msg.setTraceNo("M20220821" + String.format("%03d", i));
        msg.setType(MsgTypeEnum.ofOrdinal(i % 3).getValue());
        try {
            msg.setContent(objectMapper.writeValueAsString(tradeVO));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        msgList.add(msg);
    }
    return msgList;
}

/**
 * 批量插入数据
 */
@GetMapping("/addData")
public List<Msg> addData() throws IOException {
    List<Msg> msgList = generateData();
    List<BulkOperation> bulkOperations = new ArrayList<>();
    for (int i = 1; i <= msgList.size(); i++) {
        Msg msg = msgList.get(i - 1);
        String finalI = i + "";
        BulkOperation bulkOperation = new BulkOperation.Builder()
            .index(o -> o.id(finalI).document(msg))
            .build();
        bulkOperations.add(bulkOperation);
    }
    BulkRequest bulkRequest = new BulkRequest.Builder()
        .index(INDEX_NAME)
        .operations(bulkOperations)
        .build();
    BulkResponse response = elasticsearchClient.bulk(bulkRequest);
    if (response.errors()) {
        log.error("batch insert has error!");
    }
    return msgList;
}

根据条件搜索

@PostMapping("/searchByCondition")
public List<Msg> searchByCondition(@RequestBody MsgSearchVO msgSearchVO) throws IOException {
    /*
     * 类型为keyword时, 记录建立倒排索引时不会进行分词
     * 查询时使用term关键字, 条件不会被分词
     *
     * 类型为text时, 记录建立倒排索引时会进行分词
     * 查询时使用match关键字, 条件也会被分词
     */
    log.info("args: {}", msgSearchVO);
    Integer from = (msgSearchVO.getPageNo() - 1) * msgSearchVO.getPageSize();
    List<Query> andQueryList = new ArrayList<>();
    if (StringUtils.isNotBlank(msgSearchVO.getType())) {
        Query byType = new TermQuery.Builder()
            .field("type")
            .value(msgSearchVO.getType())
            .build()._toQuery();
        andQueryList.add(byType);
    }
    if (StringUtils.isNotBlank(msgSearchVO.getTraceNo())) {
        Query byTraceNo = new TermQuery.Builder()
            .field("traceNo")
            .value(msgSearchVO.getTraceNo())
            .build()._toQuery();
        andQueryList.add(byTraceNo);
    }
    if (StringUtils.isNotBlank(msgSearchVO.getKeyword())) {
        Query byContent = new MatchQuery.Builder()
            .field("content")
            .query(msgSearchVO.getKeyword())
            .build()._toQuery();
        andQueryList.add(byContent);
    }
    Query query = new BoolQuery.Builder()
        .must(andQueryList)
        .build()._toQuery();
    SearchResponse<Msg> response = elasticsearchClient.search(
        s -> s.index(INDEX_NAME).from(from).size(msgSearchVO.getPageSize())
         .query(query),
         Msg.class
    );
    List<Hit<Msg>> hits = response.hits().hits();
    assert response.hits().total() != null;
    log.info("page count: {}", hits.size());
    log.info("total count: {}", response.hits().total().value());
    return hits.stream().map(Hit::source).collect(Collectors.toList());
}

源码

https://github.com/wangtaoj/elasticsearch-learning

标签:实战,msgSearchVO,keyword,报文,private,Elasticsearch,new,type
From: https://www.cnblogs.com/wt20/p/16610987.html

相关文章