Elasticsearch 全文检索引擎复习笔记
Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式、多租户的全文搜索引擎,能够为应用程序提供实时的、结构化和非结构化数据搜索。
ES 是用 Java 开发的,并作为 Apache 许可条款下的开源软件发布。它的特点包括:
-
分布式架构:Elasticsearch使用分布式架构,可以在多台机器上部署,提供水平扩展能力。
-
文档导向:Elasticsearch是文档导向的搜索引擎,每一个存储的记录都是一个文档,文档由多个字段组成。
-
实时搜索:Elasticsearch支持实时搜索,意味着当你添加、修改或删除文档,这些更改将立即反映在搜索结果中。
-
多租户:Elasticsearch支持多租户,意味着一个Elasticsearch集群可以被多个应用程序使用。
Elasticsearch 的主要用途包括全文搜索、结构化搜索、分析以及数据挖掘等。它常用于日志分析、实时分析、搜索引擎、数据仓库等应用场景。
初识 Elasticsearch
传统数据库使用的是正向索引,即为每个文档建立一个索引,并记录这个文档中包含哪些词条。这样,当用户搜索某个词条时,数据库需要扫描整个文档集合,检查每个文档是否包含这个词条。这种索引查找方式在数据量大时显得效率低下,因为它需要遍历整个文档集合。
相比之下Elasticsearch (ES) 使用倒排索引来支持快速搜索。倒排索引是一种数据结构,它为每个词条(即文档中的单词)建立一个索引,并记录这个词条出现在哪些文档中。这样,当用户搜索某个词条时,搜索引擎可以快速查找包含这个词条的文档。
总的来说,倒排索引的优势在于它可以大大缩短搜索的时间,使得搜索变得更加快速。它还提供了其他一些优势,比如可以支持多种搜索操作(如通配符搜索和布尔搜索),并且可以很方便地执行分析和聚合操作。
倒排索引中包含两部分内容:
- 词条词典(Term Dictionary):记录所有词条,以及词条与倒排列表(Posting List)之间的关系,会给词条创建索引,提高查询和插入效率
- 倒排列表(Posting List):记录词条所在的文档id、词条出现频率 、词条在文档中的位置等信息
我们可以统一地把 mysql 与 elasticsearch 的概念做一下对比:
MySQL | Elasticsearch | 说明 |
---|---|---|
Table | Index | 索引(index),就是文档的集合,类似数据库的表(table) |
Row | Document | 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式 |
Column | Field | 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column) |
Schema | Mapping | Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema) |
SQL | DSL | DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD |
另外,两者各自有自己擅长的业务:
- Mysql:擅长事务类型操作,可以确保数据的安全和一致性
- Elasticsearch:擅长海量数据的搜索、分析、计算
因此在企业中,往往是两者结合使用:
- 对安全性要求较高的写操作,使用 mysql 实现
- 对查询性能要求较高的搜索需求,使用 elasticsearch 实现
- 两者再基于某种方式,实现数据的同步,保证一致性
ES 及其 IK 分词器插件的安装可以参考这篇博客:Elasticsearch、IK分词器安装 (docker)。
一、索引库操作
索引库就类似数据库表,mapping 映射就类似表的结构。我们要向 es 中存储数据,必须先创建“库”和“表”。
Ⅰ、DSL语法操作
POST
mapping 是对索引库中文档的约束,常见的 mapping 属性包括:
- type:字段数据类型,常见的简单类型有:
- 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
- 数值:long、integer、short、byte、double、float、
- 布尔:boolean
- 日期:date
- 对象:object
- index:是否创建索引,默认为true
- analyzer:使用哪种分词器
- properties:该字段的子字段
例如下面的 json 文档:
{
"age": 21,
"weight": 52.1,
"isMarried": false,
"info": "黑马程序员Java讲师",
"email": "zy@itcast.cn",
"score": [99.1, 99.5, 98.9],
"name": {
"firstName": "云",
"lastName": "赵"
}
}
对应的每个字段映射(mapping):
- age:类型为 integer;参与搜索,因此需要index为true;无需分词器
- weight:类型为float;参与搜索,因此需要index为true;无需分词器
- isMarried:类型为boolean;参与搜索,因此需要index为true;无需分词器
- info:类型为字符串,需要分词,因此是text;参与搜索,因此需要index为true;分词器可以用ik_smart
- email:类型为字符串,但是不需要分词,因此是keyword;不参与搜索,因此需要index为false;无需分词器
- score:虽然是数组,但是我们只看元素的类型,类型为float;参与搜索,因此需要index为true;无需分词器
- name:类型为object,需要定义多个子属性
- name.firstName;类型为字符串,但是不需要分词,因此是keyword;参与搜索,因此需要index为true;无需分词器
- name.lastName;类型为字符串,但是不需要分词,因此是keyword;参与搜索,因此需要index为true;无需分词器
由于 ES 使用 RESTFul 的接口风格,因此创建一个索引库只需要在 Kibana 中编写 PUT /索引库名称{...}
即可:
PUT /heima
{
"mappings": {
"properties": {
"info":{
"type": "text",
"analyzer": "ik_smart"
},
"email":{
"type": "keyword",
"index": "falsae"
},
"name":{
"properties": {
"firstName": {
"type": "keyword"
}
}
},
// ... 略
}
}
}
GET、DELETE
查看索引库语法:
GET /索引库名称
删除索引库的语法:
DELETE /索引库名称
PUT
倒排索引结构虽然不复杂,但是一旦数据结构改变(比如改变了分词器),就需要重新创建倒排索引,这简直是灾难。因此索引库一旦创建,无法修改mapping。
虽然无法修改 mapping 中已有的字段,但是却允许添加新的字段到 mapping 中,因为不会对倒排索引产生影响。
语法示例:
PUT /索引库名/_mapping
{
"properties": {
"新字段名":{
"type": "integer"
}
}
}
Ⅱ、JavaAPI操作
Elasticsearch 官方提供了各种不同语言的客户端,用来操作 ES。这些客户端的本质就是组装DSL语句,通过 http 请求发送给ES。
在 ES 提供的API中,与 ES 一切交互都封装在一个名为 RestHighLevelClient
的类中,必须先完成这个对象的初始化,建立与 elasticsearch 的连接。
- 引入 es 的 RestHighLevelClient 依赖:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
- 因为 SpringBoot 默认的ES版本是7.6.2,所以我们需要覆盖默认的ES版本:
<properties>
<java.version>11</java.version>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
- 初始化 RestHighLevelClient,初始化的代码如下:
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://localhost:9200")
));
创建索引库
使用 RestClient 创建索引库有以下三步骤:
- 创建 Request 对象。因为是创建索引库的操作,因此 Request 是
CreateIndexRequest
。 - 添加请求参数,其实就是DSL的 JSON 参数部分。
- 发送请求,
client.indices()
方法的返回值是 IndicesClient 类型,封装了所有与索引库操作有关的方法。
使用下列代码可通过 RestClient 创建索引库,其中 HotelConstants.MAPPING_TEMPLATE
是为简化书写定义的一个 mapping 映射静态字符串常量。
@Test
void testCreateHotelIndex() throws IOException {
//创建Request对象
CreateIndexRequest request = new CreateIndexRequest("hotel");
//准备请求参数——DSL语句
request.source(HotelConstants.MAPPING_TEMPLATE, XContentType.JSON);
//发送创建索引库请求
client.indices().create(request, RequestOptions.DEFAULT);
}
删除索引库
和创建的语法类似,只需要创建 DeleteIndexRequest
对象,发送请求即可。
@Test
void testDeleteHotelIndex() throws IOException {
// 1.创建Request对象
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
// 2.发送请求
client.indices().delete(request, RequestOptions.DEFAULT);
}
判断索引库是否存在
判断索引库是否存在,本质就是查询,因此与删除的 Java 代码流程是类似的。依然是三步走:
@Test
void testExistsHotelIndex() throws IOException {
// 1.创建Request对象
GetIndexRequest request = new GetIndexRequest("hotel");
// 2.发送请求
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
// 3.输出
System.err.println(exists ? "索引库已经存在!" : "索引库不存在!");
}
RestClient 操作 ES 的流程基本类似。核心是 client.indices()
方法来获取索引库的操作对象。
二、文档操作
Ⅰ、DSL语法操作
POST
新增文档的 DSL 语法如下:
POST /索引库名/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
"字段3": {
"子属性1": "值3",
"子属性2": "值4"
},
// ...
}
GET、DELETE
根据 RESTFul 风格,新增是 post,查询应该是 get,删除是 delete,不过查询一般都需要条件,这里我们把文档id带上。
查看文档语法:
GET /索引库名称/_doc/文档id
删除文档语法:
DELETE /索引库名称/_doc/文档id
PUT
修改有两种方式:
- 全量修改:直接覆盖原来的文档
- 增量修改:修改文档中的部分字段
全量修改是覆盖原来的文档,其本质是根据指定的 id 删除文档,随后新增一个相同 id 的文档:
PUT /索引库名/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
// ... 略
}
增量修改是只修改指定id匹配的文档中的部分字段,使用 _update
:
POST /索引库名/_update/文档id
{
"doc": {
"字段名": "新的值",
}
}
Ⅱ、JavaAPI操作
我们预先在 mysql 数据库中准备了 tb_hotel 的信息表,并在 IHotelService
提供对数据库的 CRUD 操作方法。我们要将数据库的酒店数据查询出来,写入 elasticsearch 中。
数据库查询后的结果是一个 Hotel
类型的对象。结构如下:
@Data
@TableName("tb_hotel")
public class Hotel {
@TableId(type = IdType.INPUT)
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String longitude;
private String latitude;
private String pic;
}
与我们的索引库结构存在差异:
- longitude和latitude需要合并为location
因此,我们需要定义一个新的实体类 HotelDoc
,与索引库结构吻合:
package cn.itcast.hotel.pojo;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
}
}
新增文档
新增文档与创建索引库类似,同样是三步走:
- 创建Request对象
- 准备请求参数,也就是DSL中的JSON文档
- 发送请求
变化的地方在于,这里直接使用 client.*() 的 API,不再需要 client.indices() 了。另外需要注意的是:
- 酒店数据来自于数据库,我们需要先查询出来,得到 Hotel 对象
- Hotel 对象需要转化为 HotelDoc 对象
- HotelDoc 需要序列化为 json 格式
@Test
void testAddDocument() throws IOException {
//获取单条旅馆数据
Hotel hotel = hotelService.getById(61083L);
//转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
//添加文档至ES
IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
client.index(request, RequestOptions.DEFAULT);
}
获取和删除文档
从 ES 获取数据获得的是一个 GetResponse
对象,其 getSourceAsString()
方法将返回一个 json 字符串,而在解析结果的过程中应将其转换为一个 Java 对象。
@Test
void testGetDocumentById() throws IOException {
//通过文档id查询索引数据
GetRequest request = new GetRequest("hotel", "61083");
GetResponse response = client.get(request, RequestOptions.DEFAULT);
//将JSON转化为java对象
String source = response.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(source, HotelDoc.class);
System.out.println(hotelDoc);
}
删除文档操作类似且更为简单,创建 DeleteRequest
发送请求即可。
@Test
void testDeleteDocument() throws IOException {
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel", "61083");
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
}
更新文档
在RestClient的API中,全量修改与新增的API完全一致,判断依据是ID是否存在:
- 如果新增时,ID已经存在,则修改
- 如果新增时,ID不存在,则新增
这里不再赘述,我们主要关注增量修改。
@Test
void testUpdateDocument() throws IOException {
// 1.准备Request
UpdateRequest request = new UpdateRequest("hotel", "61083");
// 2.准备请求参数
request.doc(
"price", "952",
"starName", "四钻"
);
// 3.发送请求
client.update(request, RequestOptions.DEFAULT);
}
request.doc()
方法中的两个参数为一组,分别对应 key 和 value 值。
批量导入文档
我们可以利用 BulkRequest 批量将数据库数据导入到索引库中。批量处理 BulkRequest,其本质就是将多个普通的 CRUD 请求组合在一起发送,因此 Bulk 中添加了多个 IndexRequest,就是批量新增功能了。
我们在导入酒店数据时,将代码改造成 for 循环处理即可。
@Test
void testBulkRequest() throws IOException {
// 批量查询酒店数据
List<Hotel> hotels = hotelService.list();
// 1.创建Request
BulkRequest request = new BulkRequest();
// 2.准备参数,添加多个新增的Request
for (Hotel hotel : hotels) {
// 2.1.转换为文档类型HotelDoc
HotelDoc hotelDoc = new HotelDoc(hotel);
// 2.2.创建新增文档的Request对象
request.add(new IndexRequest("hotel")
.id(hotelDoc.getId().toString())
.source(JSON.toJSONString(hotelDoc), XContentType.JSON));
}
// 3.发送请求
client.bulk(request, RequestOptions.DEFAULT);
}
三、搜索操作
elasticsearch 的查询依然是基于 RESTFul 风格的DSL来实现的。
Ⅰ、DSL语法操作
Elasticsearch 提供了基于JSON的DSL(Domain Specific Language)来定义查询。常见的查询类型包括:
- 查询所有:查询出所有数据,一般测试用。例如:match_all
- 全文检索(full text)查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
- match_query
- multi_match_query
- 精确查询:根据精确词条值查找数据,一般是查找keyword、数值、日期、boolean等类型字段。例如:
- ids
- range
- term
- 地理(geo)查询:根据经纬度查询。例如:
- geo_distance
- geo_bounding_box
- 复合(compound)查询:复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:
- bool
- function_score
查询的语法基本一致:
GET /indexName/_search
{
"query": {
"查询类型": {
"查询条件": "条件值"
}
}
}
我们以查询所有为例,其中:
- 查询类型为 match_all
- 没有查询条件
// 查询所有
GET /indexName/_search
{
"query": {
"match_all": {
}
}
}
其它查询无非就是查询类型、查询条件的变化。
全文检索查询
全文检索查询的基本流程如下:
- 对用户搜索的内容做分词,得到词条
- 根据词条去倒排索引库中匹配,得到文档id
- 根据文档id找到文档,返回给用户
比较常用的场景包括:
- 商城的输入框搜索
- 百度输入框搜索
因为是拿着词条去匹配,因此参与搜索的字段也必须是可分词的 text 类型的字段。
常见的全文检索查询包括:
- match 查询:单字段查询
- multi_match 查询:多字段查询,任意一个字段符合条件就算符合查询条件
match 查询语法如下:
GET /indexName/_search
{
"query": {
"match": {
"FIELD": "TEXT"
}
}
}
mulit_match 语法如下:
GET /indexName/_search
{
"query": {
"multi_match": {
"query": "TEXT",
"fields": ["FIELD1", " FIELD12"]
}
}
}
搜索字段越多,对查询性能影响越大,因此建议在创建索引库时采用 copy_to 拷贝到一个新字段,然后使用单字段查询的方式查询这个组合字段。
精确查询
精确查询一般是查找keyword、数值、日期、boolean等类型字段。所以不会对搜索条件分词。常见的有:
- term:根据词条精确值查询
- range:根据值的范围查询
因为精确查询的字段搜是不分词的字段,因此查询的条件也必须是不分词的词条。查询时,用户输入的内容跟字段值完全匹配时才认为符合条件。如果用户输入的内容过多,反而搜索不到数据。
// term查询
GET /indexName/_search
{
"query": {
"term": {
"FIELD": {
"value": "VALUE"
}
}
}
}
范围查询,一般应用在对数值类型做范围过滤的时候。比如做价格范围过滤。
// range查询
GET /indexName/_search
{
"query": {
"range": {
"FIELD": {
"gte": 10, // 这里的gte代表大于等于,gt则代表大于
"lte": 20 // lte代表小于等于,lt则代表小于
}
}
}
}
地理坐标查询
所谓的地理坐标查询,其实就是根据经纬度查询,官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/geo-queries.html
常见的使用场景包括:
- 携程:搜索我附近的酒店
- 滴滴:搜索我附近的出租车
- 微信:搜索我附近的人
- 矩形范围查询,也就是
geo_bounding_box
查询,查询坐标落在某个矩形范围的所有文档:
查询时,需要指定矩形的左上、右下两个点的坐标,然后画出一个矩形,落在该矩形内的都是符合条件的点。
// geo_bounding_box查询
GET /indexName/_search
{
"query": {
"geo_bounding_box": {
"FIELD": {
"top_left": { // 左上点
"lat": 31.1,
"lon": 121.5
},
"bottom_right": { // 右下点
"lat": 30.9,
"lon": 121.7
}
}
}
}
}
- 附近查询,也叫做距离查询(geo_distance):查询到指定中心点小于某个距离值的所有文档。
换句话来说,在地图上找一个点作为圆心,以指定距离为半径,画一个圆,落在圆内的坐标都算符合条件:
// geo_distance 查询
GET /indexName/_search
{
"query": {
"geo_distance": {
"distance": "15km", // 半径
"FIELD": "31.21,121.5" // 圆心
}
}
}
复合查询
复合(compound)查询:复合查询可以将其它简单查询组合起来,实现更复杂的搜索逻辑。常见的有两种:
- fuction score:算分函数查询,可以控制文档相关性算分,控制文档排名
- bool query:布尔查询,利用逻辑关系组合多个其它的查询,实现复杂搜索
1. 文档相关度及算分函数查询:
当我们利用 match 查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列。
在 elasticsearch 中,早期使用的打分算法是 TF-IDF 算法,在后来的5.1版本升级中, elasticsearch 将算法改进为 BM25 算法。
TF-IDF 算法有一各缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而BM25则会让单个词条的算分有一个上限,曲线更加平滑。
根据相关度打分是比较合理的需求,但合理的不一定是产品经理需要的。
以百度为例,你搜索的结果中,并不是相关度越高排名越靠前,而是谁掏的钱多排名就越靠前。
要想控制相关性算分,就需要利用 ES 中的 function score 查询了。
function score 查询中包含四部分内容:
- 原始查询条件:query 部分,基于这个条件搜索文档,并且基于BM25算法给文档打分,原始算分(query score)
- 过滤条件:filter 部分,符合该条件的文档才会重新算分
- 算分函数:符合 filter 条件的文档要根据这个函数做运算,得到的函数算分(function score),有四种函数
- weight:函数结果是常量
- field_value_factor:以文档中的某个字段值作为函数结果
- random_score:以随机数作为函数结果
- script_score:自定义算分函数算法
- 运算模式:算分函数的结果、原始查询的相关性算分,两者之间的运算方式,包括:
- multiply:相乘
- replace:用function score替换query score
- 其它,例如:sum、avg、max、min
示例:给“如家”这个品牌的酒店排名靠前一些。翻译一下这个需求,转换为之前说的四个要点:
- 原始条件:不确定,可以任意变化
- 过滤条件:brand = "如家"
- 算分函数:可以简单粗暴,直接给固定的算分结果,weight
- 运算模式:比如求和
因此最终的DSL语句如下:
GET /hotel/_search
{
"query": {
"function_score": {
"query": {}, // 原始查询,可以是任意条件
"functions": [ // 算分函数
{
"filter": { // 满足的条件,品牌必须是如家
"term": {
"brand": "如家"
}
},
"weight": 2 // 算分权重为2
}
],
"boost_mode": "sum" // 加权模式,求和
}
}
}
2. 布尔查询:
布尔查询是一个或多个查询子句的组合,每一个子句就是一个子查询。子查询的组合方式有:
- must:必须匹配每个子查询,类似“与”
- should:选择性匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{"term": {"city": "上海" }}
],
"should": [
{"term": {"brand": "皇冠假日" }},
{"term": {"brand": "华美达" }}
],
"must_not": [
{ "range": { "price": { "lte": 500 } }}
],
"filter": [
{ "range": {"score": { "gte": 45 } }}
]
}
}
}
排序、分页与高亮
1. 搜索结果排序:
keyword、数值、日期类型排序的语法基本一致。
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"FIELD": "desc" // 排序字段、排序方式ASC、DESC
}
]
}
排序条件是一个数组,也就是可以写多个排序条件。按照声明的顺序,当第一个条件相等时,再按照第二个条件排序,以此类推。
地理坐标排序略有不同。
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance" : {
"FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点
"order" : "asc", // 排序方式
"unit" : "km" // 排序的距离单位
}
}
]
}
2. 搜索结果分页:
elasticsearch 默认情况下只返回 top10 的数据。而如果要查询更多数据就需要修改分页参数了。elasticsearch中通过修改from、size参数来控制要返回的分页结果:
- from:从第几个文档开始
- size:总共查询几个文档
类似于mysql中的limit ?, ?
GET /hotel/_search
{
"query": {
"match_all": {}
},
"from": 0, // 分页开始的位置,默认为0
"size": 10, // 期望获取的文档总数
"sort": [
{"price": "asc"}
]
}
3. 指定字段高亮:
什么是高亮显示呢?
我们在百度,京东搜索时,关键字会变成红色,比较醒目,这叫高亮显示,高亮显示的实现分为两步:
- 给文档中的所有关键字都添加一个标签,例如
<em>
标签 - 页面给
<em>
标签编写CSS样式
GET /hotel/_search
{
"query": {
"match": {
"FIELD": "TEXT" // 查询条件,高亮一定要使用全文检索查询
}
},
"highlight": {
"fields": { // 指定要高亮的字段
"FIELD": {
"pre_tags": "<em>", // 用来标记高亮字段的前置标签
"post_tags": "</em>" // 用来标记高亮字段的后置标签
}
}
}
}
注意:
- 高亮是对关键字高亮,因此搜索条件必须带有关键字,而不能是范围这样的查询。
- 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
- 如果要对非搜索字段高亮,则需要添加一个属性:
required_field_match=false
数据聚合
聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如:
- 什么品牌的手机最受欢迎?
- 这些手机的平均价格、最高价格、最低价格?
- 这些手机每月的销售情况如何?
实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近实时搜索效果。
聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- 管道(pipeline)聚合:其它聚合的结果为基础做聚合
注意:参加聚合的字段必须是keyword、日期、数值、布尔类型
1. Bucket 桶聚合语法:
现在,我们要统计所有数据中的酒店品牌有几种,其实就是按照品牌对数据分组。此时可以根据酒店品牌的名称做聚合,也就是 Bucket 聚合。
默认情况下,Bucket 聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合必须是对搜索结果聚合。那么聚合必须添加限定条件。
我们可以限定要聚合的文档范围,只要添加 query 条件即可:
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 // 只对200元以下的文档聚合
}
}
},
"size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"order": {
"_count": "asc" // 按照_count升序排列
},
"size": 20 // 希望获取的聚合结果数量
}
}
}
}
2. Metric聚合语法:
我们对酒店按照品牌分组,形成了一个个桶。现在我们需要对桶内的酒店做运算,获取每个品牌的用户评分的min、max、avg等值。
这就要用到Metric聚合了,例如stat聚合:就可以获取min、max、avg等结果。
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min、max、avg等
"field": "score" // 聚合字段,这里是score
}
}
}
}
}
}
自动补全
当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,这种根据用户输入的字母,提示完整词条的功能,就是自动补全了。
如果需要根据拼音字母来推断,因此要用到拼音分词功能。而要实现根据字母做补全,就必须对文档按照拼音分词。在 GitHub 上恰好有 ES 的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin。
默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器。
elasticsearch 中分词器(analyzer)的组成包含三部分:
- character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
- tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
声明自定义分词器的语法如下:
PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定义分词器
"my_analyzer": { // 分词器名称
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": { // 自定义tokenizer filter
"py": { // 过滤器名称
"type": "pinyin", // 过滤器类型,这里是pinyin
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
// 自动补全查询
GET /test/_search
{
"suggest": {
"title_suggest": {
"text": "s", // 关键字
"completion": {
"field": "title", // 补全查询的字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
Ⅱ、JavaAPI操作
快速入门
我们以match_all查询为例
- 第一步,创建
SearchRequest
对象,指定索引库名 - 第二步,利用
request.source()
构建DSL,DSL中可以包含查询、分页、排序、高亮等query()
:代表查询条件,利用QueryBuilders.matchAllQuery()
构建一个match_all查询的DSL
- 第三步,利用
client.search()
发送请求,得到响应
client.search()
方法将返回一个 SearchResponse
对象,我们还需要对响应数据进行结果解析。我们将解析结果的测试代码封装为 handleResponse()
方法,方便重复调用。
@Test
void testMatchAll() throws IOException {
// 1.准备Request
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
request.source().query(QueryBuilders.matchAllQuery());
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
handleResponse(response);
}
private void handleResponse(SearchResponse response) {
// 4.解析响应
SearchHits searchHits = response.getHits();
// 4.1.获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("共搜索到" + total + "条数据");
// 4.2.文档数组
SearchHit[] hits = searchHits.getHits();
// 4.3.遍历
for (SearchHit hit : hits) {
// 获取文档source
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println("hotelDoc = " + hotelDoc);
}
}
其他的查询步骤基本类似,在构建查询条件对象时使用不同的方法构建即可:
- match:
QueryBuilders.matchQuery(FILED, TEXT)
- term:
QueryBuilders.termQuery(FILED, TEXT)
- range:
QueryBuilders.rangeQuery(FILED).lte(number)
布尔复合查询
布尔查询是用must、must_not、filter等方式组合其它查询。
与其它查询的差别同样是在查询条件的构建——QueryBuilders,结果解析等其他代码完全不变。完整代码如下:
@Test
void testBool() throws IOException {
// 1.准备Request
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
// 2.1.准备BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 2.2.添加term
boolQuery.must(QueryBuilders.termQuery("city", "杭州"));
// 2.3.添加range
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(250));
request.source().query(boolQuery);
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
handleResponse(response);
}
排序与分页
搜索结果的排序与分页是与 query 同级的参数,因此同样是使用 request.source()
来设置。
@Test
void testPageAndSort() throws IOException {
//创建SearchRequest对象获取数据并封装到SearchResponse对象
SearchRequest request = new SearchRequest("hotel");
//创建BoolQuery对象并添加bool条件
request.source().query(QueryBuilders.matchAllQuery());
//排序API,按价格升序
request.source().sort("price", SortOrder.ASC);
//分页API,查询大小为5条的第0页
request.source().from(0).size(5);
//发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//获取解析结果对象
SearchHits searchHits = response.getHits();
//处理查询数据
handleResponse(searchHits);
}
搜索结果高亮
高亮的代码与之前代码差异较大,有两点:
- 查询的DSL:其中除了查询条件,还需要添加高亮条件,同样是与query同级。
- 结果解析:结果除了要解析_source文档数据,还要解析高亮结果
@Test
void testHighlighter() throws IOException {
//创建SearchRequest对象获取数据并封装到SearchResponse对象
SearchRequest request = new SearchRequest("hotel");
//创建BoolQuery对象并添加bool条件
request.source().query(QueryBuilders.matchQuery("all", "如家"));
//高亮显示关键词
request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
//发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//获取解析结果对象
SearchHits searchHits = response.getHits();
//处理查询数据
SearchHit[] hits = searchHits.getHits();
for (SearchHit hit : hits) {
//封装HotelDoc对象
String json = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
//获取高亮部分并封装
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
if (!CollectionUtils.isEmpty(highlightFields)) {
HighlightField highlightField = highlightFields.get("name");
if (highlightField != null) {
String name = highlightField.getFragments()[0].string();
hotelDoc.setName(name);
}
}
System.out.println("hotelDoc=" + hotelDoc);
}
}
数据聚合
聚合条件与query条件同级别,因此需要使用 request.source()
来指定聚合条件。
需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的。
使用聚合功能,利用 Bucket 聚合,对搜索结果中的文档基于品牌分组、基于城市分组,就能得知包含哪些品牌、哪些城市了。
因为是对搜索结果聚合,因此聚合是限定范围的聚合,也就是说聚合的限定条件跟搜索文档的条件一致。
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
// 1.准备Request
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
// 2.1.query
buildBasicQuery(params, request);
// 2.2.设置size
request.source().size(0);
// 2.3.聚合
buildAggregation(request);
// 3.发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1.根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations, "brandAgg");
result.put("品牌", brandList);
// 4.2.根据品牌名称,获取品牌结果
List<String> cityList = getAggByName(aggregations, "cityAgg");
result.put("城市", cityList);
// 4.3.根据品牌名称,获取品牌结果
List<String> starList = getAggByName(aggregations, "starAgg");
result.put("星级", starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1.根据聚合名称获取聚合结果
Terms brandTerms = aggregations.get(aggName);
// 4.2.获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3.遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 4.4.获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
自动补全
现在,我们的 hotel 索引库还没有设置拼音分词器,需要修改索引库中的配置。但是我们知道索引库是无法修改的,只能删除然后重新创建。
另外,我们需要添加一个字段,用来做自动补全,将brand、suggestion、city等都放进去,作为自动补全的提示。
在完成修改酒店映射结构、修改 HotelDoc 实体类属性后,以下代码可以实现依照拼音提示自动补全:
@Override
public List<String> getSuggestions(String prefix) {
try {
// 1.准备Request
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(prefix)
.skipDuplicates(true)
.size(10)
));
// 3.发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析结果
Suggest suggest = response.getSuggest();
// 4.1.根据补全查询名称,获取补全结果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
// 4.2.获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
// 4.3.遍历
List<String> list = new ArrayList<>(options.size());
for (CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
list.add(text);
}
return list;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
四、数据同步
elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步。
Ⅰ、思路分析
常见的数据同步方案有三种:
- 同步调用
- 异步通知
- 监听binlog
1. 同步调用:
基本步骤如下:
- hotel-demo对外提供接口,用来修改elasticsearch中的数据
- 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
2. 异步通知:
流程如下:
- hotel-admin 对 mysql 数据库数据完成增、删、改后,发送MQ消息
- hotel-demo 监听MQ,接收到消息后完成 elasticsearch 数据修改
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
3. 监听binlog:
流程如下:
- 给 mysql 开启 binlog 功能
- mysql完成增、删、改操作都会记录在 binlog 中
- hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容
- 优点:完全解除服务间耦合
- 缺点:开启 binlog 增加数据库负担、实现复杂度高
Ⅱ、MQ 方案
我们选择基于消息队列实现数据同步,并在对应的服务引入 SpringAMQP 的依赖。
1. 定义静态常量声明队列交换机名称:
/**
* 定义RabbitMQ常量
*/
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
2. 使用 @Bean 注入的方式声明交换机:
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
3. 发送和接收消息:
在服务的中的增、删、改业务中分别发送MQ消息,并在 Listening 类中监听队列,处理消息。
@Component
public class HotelListener {
/**
* 监听酒店新增或修改的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
try {
// 0.根据id查询酒店数据
Hotel hotel = getById(id);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备Request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2.准备Json文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 监听酒店删除的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
try {
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
相关文章索引
SpringCloud微服务框架复习笔记
微服务异步通讯——RabbitMQ消息队列复习笔记