- elasticsearch 是什么?
elasticsearch 简称 es,是一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能。
- 安装
1、下载
2、 解压
tar zxvf elasticsearch-8.13.0-linux-x86_64.tar.gz -C /usr/local/
3、解决JDK依赖问题
新版本的 es 压缩包中包含自带的jdk
,如果当前 Linux 环境中已经安装了 jdk,启动 es 时默认找的是已经装好的jdk,此时可能会由于 jdk 版本不一致而报错!
进入 bin 目录下修改 elasticsearch 配置
# 将jdk修改为es中自带jdk的配置目录
export JAVA_HOME=/usr/local/elasticsearch-7.13.2/jdk
export PATH=$JAVA_HOME/bin:$PATH
if [ -x "$JAVA_HOME/bin/java" ]; then
JAVA="/usr/local/elasticsearch-8.13.0/jdk/bin/java"
else
JAVA=`which java`
fi
4、创建 es 用户
注意root用户不能启动!
#创建用户
useradd user-es
#递归设置 es 目录用户及用户组
chown user-es:user-es -R /usr/local/elasticsearch-8.13.0
#切换用户
su user-es
#进入bin目录
cd /usr/local/elasticsearch-8.13.0/bin
#启动elasticsearch
./elasticsearch
本地访问http://ip:9200
进行测试,默认会需要输入密码
如需取消登录密码,将 /usr/local/elasticsearch-8.13.0/config/elasticsearch.yml
文件中 xpack.security.enabled
参数修改为 false
。
修改日志路径、端口号等信息文件:
/usr/local/elasticsearch-7.13.2/config/elasticsearch.yml
补充:以下问题可能会遇到
- es 内存不足问题
vim /usr/local/elasticsearch-8.13.0/config/jvm.options
## -Xms4g
## -Xmx4g
修改为(注意顶格,不要留空格):
-Xms1g
-Xmx1g
- 使用 Python 操作 es
目的:将某一目录下的大量文件全部导入到 es,之后对某个文件进行查找操作
- 版本一
import os
from datetime import datetime
from elasticsearch import Elasticsearch
#连接到 es
es = Elasticsearch(['http://ip:port'])
file_index = "test1"
directory = "/mnt/dir1"
for root, dirs, files in os.walk(directory):
for filename in files:
file_path = os.path.join(root, filename)
if not es.exists(index=file_index, id=file_path):
file_info = {
"file_path": file_path,
"file_size": os.path.getsize(file_path),
"created_at": datetime.fromtimestamp(os.path.getctime(file_path)),
}
#插入文件信息
es.index(index=file_index, id=file_path, body=file_info)
else:
#print(f"Document with id '{file_path}' already exists in the index.")
pass
#查找
file_name = "file2"
query = {
"query": {
"match": {
"file_path": file_name
}
}
}
result = es.search(index="test1", body=query)
for hit in result['hits']['hits']:
print(hit['_source'])
#es.indices.delete(index=file_index)
实现了基本功能,但效率极低
- 版本二
一条一条将文件信息插入到 es 效率太低,可以将数据打包好再发给 es,使用 es 的 bluk 库一并提交
bulk指令用于批量添加、更新或删除文档。这个指令允许用户在一个请求中提交多个操作
import os
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
#连接到 es
es = Elasticsearch(['http://ip:port'])
file_index = "test1"
directory = "/mnt/dir1"
data = []
for root, dirs, files in os.walk(directory):
for filename in files:
file_path = os.path.join(root, filename)
if not es.exists(index=file_index, id=file_path):
file_info = {
"file_path": file_path,
"file_size": os.path.getsize(file_path),
"created_at": datetime.fromtimestamp(os.path.getctime(file_path)),
}
#打包数据
data.append(file_info)
else:
#print(f"Document with id '{file_path}' already exists in the index.")
pass
#批量提交
success, failed = bulk(es, data)
print(f"成功写入文档数: {success}, 失败文档数: {failed}")
#查找
file_name = "file2"
query = {
"query": {
"match": {
"file_path": file_name
}
}
}
result = es.search(index="test1", body=query)
for hit in result['hits']['hits']:
print(hit['_source'])
#es.indices.delete(index=file_index)
- 版本四
分片 + 线程池 + bulk
import os
import concurrent.futures
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# Elasticsearch连接配置
es = Elasticsearch(['http://ip:port'])
files_index = "test4"
directory = "/mnt/dir1"
# 准备一个函数用于将文件信息导入到Elasticsearch中
def index_files_bulk(file_paths):
actions = []
for file_path in file_paths:
file_info = {
'file_name': os.path.basename(file_path),
'file_path': file_path,
# 这里可以根据需要添加更多的文件信息
}
# 准备批量操作
doc_id = hash(file_path) # 以文件路径的哈希值作为文档ID
action = {
'_op_type': 'update',
'_index': files_index,
'_id': doc_id,
'doc': file_info,
'doc_as_upsert': True # 如果文档不存在则插入
}
actions.append(action)
# 使用批量操作
bulk(es, actions)
def index_files_parallel(directory):
file_paths = []
for root, dirs, files in os.walk(directory):
for file in files:
file_paths.append(os.path.join(root, file))
# 使用多线程处理文件信息导入
with concurrent.futures.ThreadPoolExecutor() as executor:
# 将文件路径切分成小块,每个线程处理一部分
chunk_size = 1000 # 每个线程处理的文件数量
for i in range(0, len(file_paths), chunk_size):
chunk = file_paths[i:i + chunk_size]
executor.submit(index_files_bulk, chunk)
# 调用函数并行导入文件信息到Elasticsearch中
#index_files_parallel(directory)
#查找
file_name = "file4"
query = {
"query": {
"match": {
"file_path": file_name
}
}
}
result = es.search(index="test1", body=query)
for hit in result['hits']['hits']:
print(hit['_source'])
#es.indices.delete(index=file_index)
未完。。。。
标签:index,python,elasticsearch,file,linux,path,os,es From: https://www.cnblogs.com/itsfei/p/18121695