首页 > 编程语言 >Python elasticsearch 使用示例

Python elasticsearch 使用示例

时间:2023-07-30 23:06:54  浏览次数:36
标签:index name 示例 Python scroll elasticsearch print id es

这里简单地罗列了一些在 ES 自动化运维过程中可能用到的脚本 DEMO。


创建索引并设置shards数

# Part of the code is omitted here: `configs` is not defined in this snippet
# and is presumably a separate config module providing hosts and index names.

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts=configs.es_source_host, maxsize=16)
dest_es = Elasticsearch(hosts=configs.es_dest_host, maxsize=16)


def create_dest_index():
    """Create the destination index with 4 primary shards.

    The shard count is fixed when the index is created; changing it later is
    costly (it requires blocking writes or reindexing into a new index).
    Any exception raised by the create call is printed rather than propagated.
    """
    index_settings = {"index": {"number_of_shards": 4}}
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": index_settings},
        )
    except Exception as exc:
        print(str(exc))


调整索引的settings

# Part of the code is omitted here: `configs` is not defined in this snippet
# and is presumably a separate config module providing hosts and index names.

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts=configs.es_source_host, maxsize=16)
dest_es = Elasticsearch(hosts=configs.es_dest_host, maxsize=16)


def update_dest_index_setting(time_dur, replicas):
    """Update refresh interval and replica count of the destination index.

    Args:
        time_dur: value for ``index.refresh_interval`` (e.g. "60s").
        replicas: value for ``number_of_replicas``.

    The API response is printed on success; any exception is printed instead
    of being propagated.
    """
    new_settings = {
        "index.refresh_interval": time_dur,
        "number_of_replicas": replicas,
    }
    try:
        outcome = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body=new_settings,
        )
    except Exception as exc:
        print(str(exc))
    else:
        print(outcome)


批量造测试数据

# -*- coding: utf-8 -*-
# Generate test data: index DOC_COUNT copies of the same document, with ids
# 0 .. DOC_COUNT - 1.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch('http://127.0.0.1:9200/')

index_name = "your_index"

doc_body = {
    "name": "小王",
    "age": 22,
    "sex": "Male",
    "addr":
        {
            "city": "guangzhou",
            "code": 1678533
        }
}

DOC_COUNT = 5000

# Calling es.index() once per document costs one HTTP round-trip each.
# helpers.bulk sends the same documents batched into _bulk requests, and by
# default raises BulkIndexError if any document fails.
actions = (
    {"_index": index_name, "_id": i, "_source": doc_body}
    for i in range(DOC_COUNT)
)
success, _ = helpers.bulk(es, actions)
print(f"indexed {success} documents into {index_name}")


bulk指定_id的写法

from elasticsearch import Elasticsearch

# Bulk-index documents with explicit ids. The _bulk body alternates an action
# line ({"index": {...}}) with the document source that follows it; "_id" in
# the action line sets the id of that document.

# Create the Elasticsearch client.
es = Elasticsearch('http://192.168.1.181:9200/')

# Bulk operations to run: action line, then document, repeated.
bulk_data = [
    {"index": {"_index": "your_index", "_id": 1111}},
    {"name": "小王", "age": 22, "sex": "Male", "addr": {"city": "beijing", "code": 10012}},
    {"index": {"_index": "your_index", "_id": 2222}},
    {"name": "小李", "age": 32, "sex": "Male", "addr": {"city": "shanghai", "code": 10010}},
    {"index": {"_index": "your_index", "_id": 3333}},
    {"name": "小孙", "age": 13, "sex": "Male", "addr": {"city": "guangzhou", "code": 1678533}},
]

# Execute all operations with a single _bulk request.
response = es.bulk(index='your_index', body=bulk_data)

# "errors" flags that at least one operation failed. Each entry of "items" is
# a one-key dict {action_type: result}. The key is "index" here, but it could
# also be "create", "update" or "delete", so iterate instead of hard-coding it.
if response['errors']:
    for item in response['items']:
        for _action, result in item.items():
            if 'error' in result:
                print(f"Failed operation: {result}")
else:
    print("Bulk operations completed successfully!")


scroll遍历-写法1

# -*- coding: utf-8 -*-
# Scroll traversal, style 1: pass `scroll` to es.search, then page with
# es.scroll. More verbose than helpers.scan, but each batch is directly at
# hand, which suits follow-up processing (e.g. reshaping the documents and
# writing them into another index; see scroll查询-并发写入.py).

import time

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_time = '5m'  # keep-alive of the scroll context between requests
index_name = 'index-test1'  # replace with your index name

query = {
    "query": {
        "match_all": {}  # match every document
    }
}

# Open the scroll context; the response already carries the first batch.
response = es.search(index=index_name, scroll=scroll_time, body=query, size=500)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)

hits = response['hits']['hits']

# Number of documents visited, printed at the end to sanity-check the scroll.
count = 0

try:
    # Process the first batch. The counter is bumped per document here too;
    # otherwise the whole first batch counts as a single document.
    for hit in hits:
        _id = hit["_id"]
        _source = hit["_source"]
        print(_id, _source)

        count += 1

    # Fetch the remaining batches; an empty batch means the scroll is exhausted.
    while len(hits) > 0:
        response = es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        hits = response['hits']['hits']

        for hit in hits:
            _id, _source = hit["_id"], hit["_source"]
            print(_id, _source)

            count += 1

        print('------------------------------------------')
finally:
    # Free the server-side scroll context now, instead of leaving it open
    # until the keep-alive expires.
    es.clear_scroll(scroll_id=scroll_id)


stop_ts = time.time()
print('scroll 遍历的总条数: ', count, '耗时(秒):', int(stop_ts - start_ts))


scroll遍历-写法2

# -*- coding: utf-8 -*-
# Scroll traversal, style 2: the helpers.scan iterator. Use this when you only
# need to read the documents.

import time

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_duration = '5m'  # keep-alive of the scroll context between requests
index = 't1'  # replace with your index name

query = {
    "query": {
        "match_all": {}  # match every document
    }
}

# helpers.scan opens, pages through and finally clears its own scroll context.
# A separate es.search(scroll=...) call is not needed here. An unused one
# would only leave an extra scroll context open on the cluster until its
# keep-alive expires.

count = 0

for hit in helpers.scan(es, query=query, index=index, scroll=scroll_duration):
    _id, _source = hit["_id"], hit["_source"]
    print(_id, _source)
    count += 1

stop_ts = time.time()
print(f'scroll 遍历的总条数: {count} 耗时(秒): {int(stop_ts - start_ts)}')


scroll查询数据后bulk批量写入

# -*- coding: utf-8 -*-
# Copy documents from one index to another: read them with a scroll, and write
# each batch to the destination index with the _bulk API, keeping each _id.
import json
import time

from elasticsearch import Elasticsearch

src_es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
dest_es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_time = '5m'  # keep-alive of the scroll context between requests

index_name = 'index-test1'  # source index; replace with your index name
dest_index_name = 'index-test2'  # index to write into

err_log_name = str(int(time.time())) + '.log'  # NOTE(review): not used below

query = {
    "query": {
        "match_all": {}  # match every document
    }
}


def build_bulk_body(hits, dest_index):
    """Turn search hits into a _bulk body that indexes them into dest_index.

    Each hit contributes an action line carrying its original _id, followed by
    its _source, so documents keep their ids in the destination index.
    """
    body = []
    for hit in hits:
        body.append({"index": {"_index": dest_index, "_id": hit["_id"]}})
        body.append(hit["_source"])
    return body


def write_batch(hits):
    """Bulk-write one batch of hits to the destination index and report failures."""
    if not hits:
        return  # nothing to write; an empty _bulk body is an error
    dest_res = dest_es.bulk(index=dest_index_name, body=build_bulk_body(hits, dest_index_name))
    if dest_res["errors"]:
        # Per-item results are in the *bulk* response, not the search response.
        for item in dest_res["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")


# Open the scroll context; the response already carries the first batch.
response = src_es.search(index=index_name, scroll=scroll_time, body=query, size=1000)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)

hits = response['hits']['hits']

# Number of documents read, printed at the end to sanity-check the scroll.
count = 0

try:
    # Write the current batch, then fetch the next one. An empty batch means
    # the scroll is exhausted.
    while hits:
        write_batch(hits)
        count += len(hits)
        print('------------------------------------------')

        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        print("scroll_id ---> ", scroll_id)
        hits = response['hits']['hits']
finally:
    # Free the server-side scroll context now, instead of leaving it open
    # until the keep-alive expires.
    src_es.clear_scroll(scroll_id=scroll_id)


stop_ts = time.time()
print('scroll 遍历的总条数: ', count, '耗时(秒):', int(stop_ts - start_ts))


ES的SQL语法

# -*- coding: utf-8 -*-
# Reference: https://zhuanlan.zhihu.com/p/341906989
# Querying ES with SQL is less powerful than the native Query DSL, and support
# for nested fields and some functions is limited. It is still good enough for
# everyday data lookups.
# Official docs: https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-sql.html
#
# Newer ES versions ship a built-in SQL endpoint, also usable directly via REST:
#
# 1. Run a query written in SQL:
#    POST /_sql
#    {
#      "query": "SELECT count(*),k FROM sbtest1  WHERE k>954808 group by k LIMIT 10"
#    }
#
# 2. Translate SQL into the equivalent Query DSL:
#    POST /_sql/translate
#    {
#      "query": "SELECT count(*),k FROM sbtest1  WHERE k>954808 group by k LIMIT 10"
#    }

import json

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.181:9200"])

# SQL statement to run.
query_sql = {
    "query": "SELECT count(*),k FROM sbtest1  WHERE k>954808 group by k having count(*)>1 LIMIT 10"
}

# ensure_ascii=False below keeps non-ASCII values (e.g. Chinese text) readable
# instead of printing them as \uXXXX escapes.

# Example 1: run the SQL query directly.
res = es.sql.query(body=query_sql)
print('直接使用SQL语法查出的结果--->\n', json.dumps(res, ensure_ascii=False))


query_sql_2 = {
    "query": "SHOW TABLES"
}
res = es.sql.query(body=query_sql_2)
print('show tables 结果--->\n', json.dumps(res, ensure_ascii=False))

# Sample output of Example 1. NOTE: every count below is 1, so it was
# evidently captured with the query WITHOUT `having count(*)>1` (the form used
# in the REST examples above). With the HAVING clause, these rows would be
# filtered out.
# 直接使用SQL语法查出的结果--->
#  {"columns": [{"name": "count(*)", "type": "long"}, {"name": "k", "type": "long"}], "rows": [[1, 954846], [1, 954847], [1, 954868], [1, 954875], [1, 954900], [1, 954910], [1, 954923], [1, 954948], [1, 954960], [1, 955017]]}

# Example 2: translate the SQL into Query DSL.
res = es.sql.translate(body=query_sql)
print('将SQL翻译成QueryDSL--->\n', json.dumps(res, ensure_ascii=False))

# Sample output of Example 2. It presumably was also captured without the
# HAVING clause: the DSL below contains no filter on the count.
# 将SQL翻译成QueryDSL--->
#  {"size": 0, "query": {"range": {"k": {"from": 954808, "to": null, "include_lower": false, "include_upper": false, "boost": 1.0}}}, "_source": false, "stored_fields": "_none_", "aggregations": {"groupby": {"composite": {"size": 10, "sources": [{"345": {"terms": {"field": "k", "missing_bucket": true, "order": "asc"}}}]}}}}


获取mapping和设置mapping

# -*- coding: utf-8 -*-
# Create a new index, tune its settings, then copy the mapping of an existing
# index onto it.

from elasticsearch import Elasticsearch

# Create the Elasticsearch client.
es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}])
index_name = "index-test1"  # existing index whose mapping is copied
# Must differ from index_name. With the same name, step 1 fails (the index
# already exists), step 2 changes the settings of the *source* index, and
# step 4 just re-applies the mapping to itself.
new_index_name = "index-test2"


# 1 Create the index and set its primary shard count. The shard count is fixed
#   at creation; changing it later requires blocking writes (shrink/split) or
#   reindexing into a new index.
try:
    es.indices.create(
        index=new_index_name,
        body={"settings": {"index": {"number_of_shards": 4}}},
    )

except Exception as e:
    print(str(e))


# 2 Adjust index settings: refresh interval (how often new writes become
#   searchable) and replica count.
try:
    es.indices.put_settings(
        index=new_index_name,
        body={"index.refresh_interval": "60s", "number_of_replicas": 0},
    )
except Exception as e:
    print(str(e))


# 3 Fetch the mapping of the source index. The response is keyed by the
#   concrete index name, which differs from index_name when index_name is an
#   alias. So take the single entry instead of looking it up by name.
mapping = es.indices.get_mapping(index=index_name)
mapping_src = next(iter(mapping.values()))["mappings"]
# print(mapping_src)


# 4 Apply the copied mapping to the new index.
try:
    res = es.indices.put_mapping(body=mapping_src, index=new_index_name)
    print(res)
except Exception as e:
    print(str(e))


标签:index,name,示例,Python,scroll,elasticsearch,print,id,es
From: https://blog.51cto.com/lee90/6903541

相关文章

  • 使用Python脚本实现ElasticSearch的在线数据迁移
    该脚本的功能类似于 elasticsearch-dump,二者都是基于 scroll 来实现的(reindex 底层也是 scroll)。依赖包:# 我这里演示的 ES 是 7.x 的,如果下面的脚本运行报错,请考虑调整 python 的 elasticsearch 包版本:pip install elasticsearch==7.13.1 配置文件:vim configs.py # -*- coding: utf-8......
  • Python安装技术类库模块
    方法1:方法2:用如下命令安装即可(注意都得是英文字符):# 简单粗暴,但是可能安装到了不同的环境 pip install some-package # 复杂但是精准还快速 C:\Python310\python.exe -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple some-package 其中:C:\Python310\python.......
  • Python报错 | ImportError: To be able to use evaluate-metric/seqeval, you need to
    报错信息:使用 metric = evaluate.load("seqeval") 的时候,报如下错误:ImportError: To be able to use evaluate-metric/seqeval, you need to install the following dependencies ['seqeval'] using 'pip install seqeval' for instance' 错误原因:这个错误提示表......
  • Python - 使用 Matplotlib 可视化在 NetworkX 中生成的图形
    介绍Python代表了一种灵活的编码语言,以其易用性和清晰性而闻名。这提供了许多库和组件,用于简化不同的任务,包括创建图形和显示。NetworkX代表了一个高效的Python工具包,用于构建、更改和研究复杂网络的排列、移动和操作。然而,Matplotlib是一个流行的工具包,用于在Python中创建静......
  • python数据分析师入门-学习笔记(爬虫-序言)
    爬虫到底是什么概括爬虫是批量化自动获取既有数据批量化自动既有数据通常获取既有数据特殊批量注册一批账号批量去领取优惠券批量自动下单购物自动做任务(签到)实际应用企业中:竞品调研数据采集办公自动化个人:比如看小说有的网站收费有的网站不收费......
  • python爬虫基础
    前言都说爬虫简介1、首先我们需要知道爬虫是什么?爬虫实际上是一段程序,我们可以通过这段程序从互联网上获取到我们想要的数据,这里还有另外一种解释是我们使用程序来模拟浏览器向服务器发送请求,来获取响应信息2、爬虫的核心:(1)、爬取网页:爬取整个网页,包含网页中的所有内容(2)、解......
  • opencv-python 卷积操作
    1图像卷积图像卷积就是卷积核在图像上按行滑动遍历像素时不断的相乘求和的过程,卷积可以用来提取特征,去噪,平滑等。如下图: ......
  • opencv-python霍夫变换
    1霍夫线检测原理霍夫变换常用来提取图像中的直线和圆等几何形状。在笛卡尔坐标系中,直线可以表示为y=kx+q 也就是说通过变量k,q可以确定一条直线,把直线写成关于k,q的函数,进行空间转换,转换后的空间称为霍夫空间。也就是说:笛卡尔坐标系中的一条线对应了霍夫空间的一个点。反过......
  • python argparse—用于命令行选项、参数和子命令的解析器
    参考:https://docs.python.org/3/library/argparse.htmlargparse.ArgumentParser:创建Parser对象语法格式class argparse.ArgumentParser(prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=argparse.HelpFormatter, prefix_chars='-......
  • 【es】elasticsearch生产数据备份和恢复方案
    https://huaweicloud.csdn.net/637f7ae4dacf622b8df859b1.html?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Eactivity-2-119985243-blog-126509989.235%5Ev38%5Epc_relevant_sort&depth_1-utm_source=distr......