首页 > 编程语言 >Python elasticsearch-py类库基础用法

Python elasticsearch-py类库基础用法

时间:2024-01-28 19:33:09浏览次数:37  
标签:类库 index Python res py test print my es

实践环境

https://pypi.org/project/elasticsearch/

pip install elasticsearch==7.6.0

离线安装包及依赖包下载地址:

https://files.pythonhosted.org/packages/f5/71/45d36a8df68f3ebb098d6861b2c017f3d094538c0fb98fa61d4dc43e69b9/urllib3-1.26.2-py2.py3-none-any.whl#sha256=d8ff90d979214d7b4f8ce956e80f4028fc6860e4431f731ea4a8c08f23f99473

https://files.pythonhosted.org/packages/98/98/c2ff18671db109c9f10ed27f5ef610ae05b73bd876664139cf95bd1429aa/certifi-2023.7.22.tar.gz#sha256=539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082

https://files.pythonhosted.org/packages/1f/12/7919c5d8b9c497f9180db15ea8ead6499812ea8264a6ae18766d93c59fe5/dataclasses-0.8.tar.gz#sha256=8479067f342acf957dc82ec415d355ab5edb7e7646b90dc6e2fd1d96ad084c97

https://files.pythonhosted.org/packages/6b/db/d934d605258d38bd470c83d535c3a73c3d01e4ad357ecb4336300fbb8e88/elastic-transport-8.4.1.tar.gz#sha256=e5548997113c5d9566c9a1a51ed67bce50a4871bc0e44b692166461279e4167e

https://files.pythonhosted.org/packages/94/1a/2369fc9264c655c20908053b59fae7f65ddc47f123d89b533a724ae1d19d/elasticsearch-7.6.0.tar.gz

Python3.6.2

Elasticsearch服务 7.6

注意:elasticsearch-py类库版本必须和Elasticsearch服务器版本保持对应,否则会有兼容性问题,具体如下,

# Elasticsearch 7.x
elasticsearch>=7.0.0,<8.0.0

# Elasticsearch 6.x
elasticsearch>=6.0.0,<7.0.0

# Elasticsearch 5.x
elasticsearch>=5.0.0,<6.0.0

# Elasticsearch 2.x
elasticsearch>=2.0.0,<3.0.0

代码实践

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime


def search_data_by_scroll(es, index, body):
    '''翻页查询'''

    request_timeout = 600
    scroll = '20m' # 用于配置scroll有效期--滚动搜索上下文保留时间,默认不能超过 1d (24 小时). 可使用search.max_keep_alive集群配置修改该值

    query_result = es.search(index=index,
                                  scroll=scroll,
                                  body=body,
                                  request_timeout=request_timeout)
    scroll_id = query_result['_scroll_id']

    while len(query_result['hits']['hits']) > 0:
        yield query_result['hits']['hits']
        query_result = es.scroll(scroll_id=scroll_id,
                                      scroll=scroll,
                                      request_timeout=request_timeout)
        scroll_id = query_result['_scroll_id']


if __name__ == '__main__':
    # ES连接相关配置信息
    hosts = [ # ES 集群服务器信息
        {"host": "10.153.1.4", "port": 9200},
        {"host": "10.153.1.5", "port": 9200},
        {"host": "10.153.1.6", "port": 9200}
    ]
    username = 'myusername'  #  ES用户账号 可选配置, 如果无需访问验证, 则配置为空字符串 ''
    password = 'mypassword'  # ES用户密码 可选配置,如果无需访问验证, 则配置为空字符串 ''


    es = Elasticsearch(hosts=hosts,
                       http_auth=(username, password),
                       # 配置连接前进行探测
                       sniff_on_connection_fail = True,  # 节点无响应时刷新节点
                       sniffer_timeout = 60  # 设置超时时间,单位 秒
                       )

    ############### 增 ###############
    #### 创建索引
    #创建索引,如果索引已存在,则报错
    res = es.indices.create(index='my-test-index')
    print(res) # 输出:{'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-test-index'}

    # 创建索引,如果索引已存在,则忽略400错误, 形如 elasticsearch.exceptions.RequestError: RequestError(400, 'resource_already_exists_exception', 'index [my-test-index/rOk1sVW9R46GZq9o7ACVxQ] already exists')
    res = es.indices.create(index='my-test-index', ignore=400)
    print(res) # 输出索引已存在相关错误信息

    # 定义mapping body
    index_mappings = {
        'mappings': { # key固定为 mappings
            'properties': {
                'name': {
                    'type': 'keyword'
                },
                'age': {
                    'type': 'integer'
                },
                'tags': {
                    'type': 'text'
                },
                "timestamp": {
                    "type": "date"
                }
            }
        },
        'settings': {
            'index': {
                'number_of_shards': '3',
                'number_of_replicas': '0'
            }
        }
    }

    # 创建索引的同时,设置mapping
    es.indices.create(index='my-test-index', body=index_mappings, ignore=400)

    #### 为索引创建别名
    # 为单个索引创建别名
    print(es.indices.put_alias(index='my-test-index', name='my-test-index-alias')) # 输出:{'acknowledged': True}

    res = es.indices.create(index='my-test-index2', ignore=400)
    print(res)
    # 为多个索引创建同一个别名,联合查询时用
    print(es.indices.put_alias(index=['my-test-index', 'my-test-index2'], name='test-index-alias')) # 输出:{'acknowledged': True}

    #### 插入数据
    # 插入单条数据
    data = {
        'name': '张三',
        'age': 18,
        'tags': '勤奋',
        'timestamp': datetime.now()
    }
    res = es.index(index='my-test-index', body=data)
    print(res) # 输出:{'_index': 'my-test-index', '_type': '_doc', '_id': 'ELlNE4sBRHfq82dAAhMz', '_version': 1, 'result': 'created', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}

    # 插入记录时,指定记录ID
    data = {
        'name': '晓晓',
        'age': 19,
        'tags': '尊师',
        'timestamp': datetime.now()
    }
    res = es.index(index='my-test-index', id=1, body=data)
    print(res) # 输出:{'_index': 'my-test-index', '_type': '_doc', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}


    # 插入记录时,指定文档类型
    # res = es.index(index='mytest-index', doc_type='person', id=1, body=data) # 注意:如果索引不存在,会自动创建索引
    # print(res) # {'_index': 'mytest-index', '_type': 'person', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}


    # 批量插入数据
    insert_records = []
    index_name = 'my-test-index'
    data1 = {
        '_index': index_name,
        'name': '李四',
        'age': 20,
        'tags': '和善',
        'timestamp': datetime.now()
    }
    data2 = {
        '_index': index_name,
        'name': '王五',
        'age': 19,
        'tags': '好学',
        'timestamp': datetime.now()
    }
    insert_records.append(data1)
    insert_records.append(data2)
    res = helpers.bulk(client=es, actions=insert_records)
    print(res) # 输出:(2, [])

    ############### 改 ###############
    #### 更新记录
    # 使用 index 方法 # 注意:使用index实现更新时,body数据必须包含记录的全部字段,否则对应记录,未包含的字段将会被删除
    data = {
        'name': '晓晓',
        'age': 23,
        'timestamp': datetime.now()
    }
    res = es.index(index='mytest-index', id='1', body=data) # id为es中记录的_id值
    print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}


    # 使用 update 方法
    body = {
        'doc': {
            'tags': '尊师重教',
        }
    }
    res = es.update(index='mytest-index', id='1', body=body)
    print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 3, 'result': 'updated', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}


    ############### 查 ###############
    # 查看全部索引的信息
    index_info = es.indices.get('*')
    # print(index_info) # 输出数据格式同 查看某个索引的信息 调用输出

    # 查看某个索引的信息
    index_info = es.indices.get('mytest-index')
    print(index_info) # 输出:{'mytest-index': {'aliases': {}, 'mappings': {'properties': {'age': {'type': 'long'}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'tags': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'timestamp': {'type': 'date'}}}, 'settings': {'index': {'search': {'slowlog': {'threshold': {'query': {'warn': '2s', 'info': '1s'}}}}, 'number_of_shards': '1', 'provided_name': 'mytest-index', 'creation_date': '1696837120143', 'number_of_replicas': '1', 'uuid': 'EzxFtEyGQBKPUCu9usa8XA', 'version': {'created': '7060099'}}}}}

    # 查看多个指定索引的信息
    print(es.indices.get(index=['my-test-index', 'mytest-index']))

    # 查询索引相关信息
    body = '''
        {
           "query": {
              "match_all": {}
           }
        }'''
    body = body.strip()
    res = es.search(index=index_name,
                    doc_type='_doc', # 可以配置为None
                    request_timeout=120, # 设置查询超时时间120秒,es默认的查询超时时间是10s
                    body=body) # 注意:这里的body也可以是json,不一定要用字符串
    # print(res)

    # 查看索引是否存在
    print(es.indices.exists('my-test-index'))  # 输出:True

    # 获取指定索引中指定ID的记录信息
    res = es.get(index='mytest-index', id=1)
    print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 3, '_seq_no': 2, '_primary_term': 1, 'found': True, '_source': {'name': '晓晓', 'age': 23, 'timestamp': '2023-10-09T15:49:53.068134', 'tags': '尊师重教'}}

    # # 获取所有索引中指定ID的记录信息
    # res = es.get(index="*", id="1") # 报错了

    # 查询记录数
    doc_count_info = es.count(index=index_name)
    print(doc_count_info) # 输出:{'count': 4, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}}

    # 查询记录信息
    res = es.search(index=index_name,
              request_timeout=120, # 设置查询超时时间120秒,es默认的查询超时时间是10s
              body=body)
    # print(res) # 反回一个字典,同界面查询结果

    # 查询指定索引,指定文档类型,指定ID记录是否存在
    print(es.exists(index='mytest-index', doc_type='doc', id='1')) # 输出:False

    # 查询指定索引中,指定ID记录是否存在
    print(es.exists(index='mytest-index', id='1')) # 输出:True

    # 翻页查询记录信息
    for data in search_data_by_scroll(es, 'index_presale_orderinfo', body):
        print(data) # 输出数据格式为列表, 形如 [{...},{...}]


    # 获取当前集群的基本信息
    print(es.info())

    # 获取集群健康信息
    print(es.cluster.health())

    # 获取整个集群的综合状态信息。
    print(es.cluster.state())

    # 返回群集的当前节点的信息
    print(es.cluster.stats())

    # 获取索引mapping
    print(es.indices.get_mapping(index='my-test-index'))

    # 获取索引设置
    # 获取单个索引的设置
    print(es.indices.get_settings(index='my-test-index'))
    # 获取多个索引的设置
    print(es.indices.get_settings(index=['my-test-index', 'my-test-index2']))
    # 获取所有索引的设置
    print(es.indices.get_settings(index='*'))

    # 获取索引别名
    # 获取单个索引的别名
    print(es.indices.get_alias(index='my-test-index'))
    # 获取多个索引的别名
    print(es.indices.get_alias(index=['my-test-index', 'my-test-index2']))

    # 获取任务列表
    print(es.tasks.list(detailed=True, timeout='60s'))
	
	
    ############### 删 ###############

    # 删除文档记录
    # 按id删除
    res = es.delete(index='mytest-index', id='1')
    print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 4, 'result': 'deleted', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}

    # 按条件删除
    body = {
        'query': {
            'match': {
                'name': '张三'
            }
        }
    }
    res = es.delete_by_query(index=index_name, body=body, ignore=[400, 404])
    print(res) # 输出:{'took': 25, 'timed_out': False, 'total': 1, 'deleted': 1, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}

    # 删除索引别名
    # 删除单个索引的别名
    print(es.indices.delete_alias(index='my-test-index', name='my-test-index-alias')) # 输出:{'acknowledged': True}

    # 删除多个索引的指定别名,如果别名不存在,则忽略错误
    # print(es.indices.delete_alias(index=['my-test-index', 'my-test-index2'], name=['test-index-alias']))

    # 删除多个索引的所有别名
    print(es.indices.delete_alias(index=['my-test-index', 'my-test-index2'], name='_all')) # 如果存在索引别名,则会报错


    # 删除索引
    print(es.indices.delete(index_name)) # 输出:{'acknowledged': True}

    # 删除索引,忽略400,404错误,索引不存在时,会报404错误
    print(es.indices.delete(index_name, ignore = [400, 404]))

注意:笔者实践时,发现运行以下语句,会报错

res = es.get(index="*", id="1") # 报错:elasticsearch.exceptions.AuthorizationException: AuthorizationException(403, 'security_exception', 'action [indices:data/read/get] is unauthorized for user [elastic]')

其它说明:

es.scroll(scroll_id=scroll_id, scroll='10m', request_timeout=20)

scroll 时间单位描述:

d   天
h   小时
m   分钟
s   秒

参考链接

https://pypi.org/project/elasticsearch/7.6.0/#description

https://elasticsearch-py.readthedocs.io/en/v8.10.0/

https://elasticsearch-py.readthedocs.io/en/v8.10.0/api.html

http://runxinzhi.com/bubu99-p-13580687.html

标签:类库,index,Python,res,py,test,print,my,es
From: https://www.cnblogs.com/shouke/p/17993185

相关文章

  • 快乐学Python,如何对数据进行清洗?(缺失值处理和重复值删除)
    上一篇文章中,我们介绍了通过pandas读取数据到DataFrame中之后,对DataFrame中数据的操作方式,这篇文章我们继续来介绍:数据清洗。即:当读取的数据出现缺失或异常时,我们如何对缺失的数据进行预处理。1、缺失值是什么?当我们从数据文件(CSV、Excel等)或者其他数据源加载到DataFrame中时,往......
  • 如何使用Python实现图像识别?
    原文链接:https://blog.csdn.net/qq_61433567/article/details/1311398481OpenCV(OpenSourceComputerVisionLibrary):用于图像处理和计算机视觉方面的函数库。2NumPy:用于处理数组和矩阵。3Matplotlib:用于绘制图像和图表。可以使用以下命令来安装这些库:pipinstallopencv-python......
  • 读论文-基于Python的协同过滤算法的研究与应用实现
    前言今天读的论文为一篇名为《基于Python的协同过滤算法的研究与应用实现》的论文,文章是在2019年9月发表于《电脑知识与技术》的一篇期刊论文。摘要随着科学技术的快速发展和知识产权的日益重要,大多数用户会选择在播放平台上看电影。例如腾讯视频、爱奇艺等,用户迫切需要一个合......
  • Python 实现 zip 分卷解压
    在上传数据备份到云端的时候,由于数据文件过大,可能会遇到各种各样的问题:比如49G的大文件gitlfspush到99%突然失败,你说心态爆炸不爆炸?再比如某些网盘会限制单个文件上传大小限制在4G以内;因此我们可能会用7z之类的压缩软件对数据文件进行分卷压缩,得到多个分卷文件,例如......
  • python 国内各大源列表:
    附,国内各大源列表:名称地址阿里 https://mirrors.aliyun.com/pypi/simple豆瓣 http://pypi.douban.com/simple/清华大学 https://pypi.tuna.tsinghua.edu.cn/simple中国科学技术大学 https://pypi.mirrors.ustc.edu.cn/simple华中理工大学 http://pypi.hustunique.com/simple山东......
  • python第五节:集合set(2)
    集合删除remove方法如果找不到元素则报错(KeyError)。如:setVar.remove(element)setVar:为一个set类型的变量element:集合中要查找并删除的元素函数作用:在集合setVar中查找element元素,如果存在则删除;如果没找到,则报错。例子1:set1={'a','b'}set1.remove('a')#set1.remove('c......
  • Git必知必会基础(18):PyCharm中使用Git
     本系列汇总,请查看这里:https://www.cnblogs.com/uncleyong/p/10854115.html目前主流ide是pycharm、idea等,他们可以集成git,下面简单分享下用法。数据准备远程仓库 演示:在PyCharm中使用Git(IDEA中类似)配置GitSettings——>VersionControl——>Git——>指定git命令的执行......
  • python02-变量及输出
    目标变量的作用定义变量认识数据类型一.变量的作用举例体验:我们去图书馆读书,怎么样快速找到自己想要的书籍呢?是不是管理员提前将书放到固定位置,并把这个位置进行了编号,我们只需要在图书馆中按照这个编号查找指定的位置就能找到想要的书籍。这个编号其实就是把书籍存放的......
  • C-like structures in Python
    bytes转Structuredefconvert_bytes_to_structure(st:object,byte:bytes):assertctypes.sizeof(st)==len(byte),'sizeerror!need:%d,give:%d'%(ctypes.sizeof(st),len(byte))#ctypes.memmove(ctypes.pointer(st),byte,ctypes.sizeof(st))......
  • python-01注释
    一.注释的作用没有注释的代码添加注释的代码通过用自己熟悉的语言,在程序中对某些代码进行标注说明,这就是注释的作用,能够大大增强程序的可读性。二.注释的分类及语法注释分为两类:单行注释和多行注释。单行注释只能注释一行内容,语法如下:#注释内容多行......