1、GraphRAG是什么?
GraphRAG(Graph-based Relation-Aware Grouping)是一种基于图的关系感知分组方法,通常用于计算机视觉和机器学习领域。它的核心思想是利用图结构来表示和处理实体之间的关系,从而更有效地进行分组和识别任务。
2、本地部署
在autodl上进行运行,4090单显卡+24GB内存+pytorch框架2.3.0+python3.12
2.1创建虚拟环境
创建两个环境ollama和graphrag,避免环境产生冲突
conda create -n ollama python==3.11
conda create -n graphrag python==3.11
第一次进入环境可能会出现CondaError: Run 'conda init' before 'conda activate',具体解释见vscode虚拟环境-CSDN博客
conda init
#关闭终端再重新打开来激活环境
2.2 ollama及工具包的安装
首先进入ollama环境安装ollama
conda activate ollama
pip install --user ollama -i https://pypi.mirrors.ustc.edu.cn/simple/
还要安装一下ollama工具包,官网给出的方法是:
curl -fsSL https://ollama.com/install.sh | sh
开梯子尝试安装显示网络连接异常,安装失败(也可以自行尝试,有几率能够成功),参考进行【AI基础】租用云GPU之autoDL部署大模型ollama+llama3_autodl ollama-CSDN博客进行安装。
首先使用autodl自带的学术资源加速AutoDL帮助文档,
source /etc/network_turbo
执行命令
curl -fsSL https://ollama.com/install.sh | sh
可能直接安装成功,也可能会显示Warning,要求安装一些依赖包,按照要求进行安装和更新即可。
#示例
#WARNING: Unable to detect NVIDIA/AMD GPU.Install lspci or lshw to automatically detect and install GPU dependencies,
sudo apt-get update
sudo apt-get install lshw
然后重新执行安装命令即可。
ollama安装成功后默认以服务形式运行,可运行以下命令检查状态:
systemctl status ollama.service
若成功,显示:
ollama.service - Ollama Service
Loaded: loaded (/etc/systemd/system/ollama.service, enabled)
Active: activate (running)
若出现错误“System has not been booted with systemd as init system (PID 1). Can't operate.
Failed to connect to bus: Host is down”,说明systemctl 命令异常,需要安装 systemd和systemctl,使用命令:
apt-get install systemd -y
apt-get install systemctl -y
安装成功后,可重新运行systemctl,检查状态。
2.3 安装graphrag
新建一个终端,进入graphrag环境,安装graphrag
conda activate graphrag
pip install --user graphrag -i https://pypi.mirrors.ustc.edu.cn/simple/
然后创建一下工作目录
mkdir -p ./ragtest/input
#ragtest为大文件名,input为放语料的文件名,可自行修改,后面运行时也要注意修改
将自己的语料放在input下面,文件格式为utf-8编码的txt文件。
初始化工作目录,生成setting.yaml、
python -m graphrag.index --init --root ./ragtest
2.4 修改.env内容如下
GRAPHRAG_API_KEY=ollama
GRAPHRAG_CLAIM_EXTRACTION_ENABLED=True
.env文件一般隐藏,可使用vim命令进入编辑,在vim中,按下i键进入INSERT模式,这时可以编辑文件内容,修改完毕后ESC退出插入格式,输入:wq并按回车保存更改并退出vim。
vim .env #进入.env
i #进入编辑模式
esc #修改完成后esc退出insert模式
:wq #保存修改内容并退出
2.5 setting.yaml修改
主要修改为model:qwen2和model: nomic-embed-text,按照自己选择的model和embedding进行修改即可。
encoding_model: cl100k_base
skip_workflows: []
llm:
api_key: ollama
type: openai_chat # or azure_openai_chat
model: qwen2
model_supports_json: true # recommended if this is available for your model.
# max_tokens: 4000
# request_timeout: 180.0
api_base: http://localhost:11434/v1
# api_version: 2024-02-15-preview
# organization: <organization_id>
# deployment_name: <azure_model_deployment_name>
# tokens_per_minute: 150_000 # set a leaky bucket throttle
# requests_per_minute: 10_000 # set a leaky bucket throttle
# max_retries: 10
# max_retry_wait: 10.0
# sleep_on_rate_limit_recommendation: true # whether to sleep when azure suggests wait-times
# concurrent_requests: 25 # the number of parallel inflight requests that may be made
parallelization:
stagger: 0.3
# num_threads: 50 # the number of threads to use for parallel processing
async_mode: threaded # or asyncio
embeddings:
## parallelization: override the global parallelization settings for embeddings
async_mode: threaded # or asyncio
llm:
api_key: ollama
type: openai_embedding # or azure_openai_embedding
model: nomic-embed-text
api_base: http://localhost:11434/api
# api_version: 2024-02-15-preview
# organization: <organization_id>
# deployment_name: <azure_model_deployment_name>
# tokens_per_minute: 150_000 # set a leaky bucket throttle
# requests_per_minute: 10_000 # set a leaky bucket throttle
# max_retries: 10
# max_retry_wait: 10.0
# sleep_on_rate_limit_recommendation: true # whether to sleep when azure suggests wait-times
# concurrent_requests: 25 # the number of parallel inflight requests that may be made
# batch_size: 16 # the number of documents to send in a single request
# batch_max_tokens: 8191 # the maximum number of tokens to send in a single request
# target: required # or optional
chunks:
size: 200
overlap: 100
group_by_columns: [id] # by default, we don't allow chunks to cross documents
input:
type: file # or blob
file_type: text # or csv
base_dir: "input"
file_encoding: utf-8
file_pattern: ".*\\.txt$"
cache:
type: file # or blob
base_dir: "cache"
# connection_string: <azure_blob_storage_connection_string>
# container_name: <azure_blob_storage_container_name>
storage:
type: file # or blob
base_dir: "output/${timestamp}/artifacts"
# connection_string: <azure_blob_storage_connection_string>
# container_name: <azure_blob_storage_container_name>
reporting:
type: file # or console, blob
base_dir: "output/${timestamp}/reports"
# connection_string: <azure_blob_storage_connection_string>
# container_name: <azure_blob_storage_container_name>
entity_extraction:
## llm: override the global llm settings for this task
## parallelization: override the global parallelization settings for this task
## async_mode: override the global async_mode settings for this task
prompt: "prompts/entity_extraction.txt"
entity_types: [organization,person,geo,event]
max_gleanings: 0
summarize_descriptions:
## llm: override the global llm settings for this task
## parallelization: override the global parallelization settings for this task
## async_mode: override the global async_mode settings for this task
prompt: "prompts/summarize_descriptions.txt"
max_length: 500
claim_extraction:
## llm: override the global llm settings for this task
## parallelization: override the global parallelization settings for this task
## async_mode: override the global async_mode settings for this task
# enabled: true
prompt: "prompts/claim_extraction.txt"
description: "Any claims or facts that could be relevant to information discovery."
max_gleanings: 0
community_report:
## llm: override the global llm settings for this task
## parallelization: override the global parallelization settings for this task
## async_mode: override the global async_mode settings for this task
prompt: "prompts/community_report.txt"
max_length: 2000
max_input_length: 8000
cluster_graph:
max_cluster_size: 10
embed_graph:
enabled: false # if true, will generate node2vec embeddings for nodes
# num_walks: 10
# walk_length: 40
# window_size: 2
# iterations: 3
# random_seed: 597832
umap:
enabled: false # if true, will generate UMAP embeddings for nodes
snapshots:
graphml: yes
raw_entities: yes
top_level_nodes: yes
local_search:
# text_unit_prop: 0.5
# community_prop: 0.1
# conversation_history_max_turns: 5
# top_k_mapped_entities: 10
# top_k_relationships: 10
# max_tokens: 12000
global_search:
# max_tokens: 12000
# data_max_tokens: 12000
# map_max_tokens: 1000
# reduce_max_tokens: 2000
# concurrency: 32
2.6 中文语料库,需要对官方的部分代码进行修改
文件位置按照自己安装的位置进行修改,可使用find进行文件位置检索。
find / -name "文件名"
(1) 修改文件/root/miniconda3/envs/graphrag/lib/python3.11/site-packages/graphrag/llm/openai/openai_embeddings_llm.py内容,调用ollama服务
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""The EmbeddingsLLM class."""
from typing_extensions import Unpack
import ollama
from graphrag.llm.base import BaseLLM
from graphrag.llm.types import (
EmbeddingInput,
EmbeddingOutput,
LLMInput,
)
from .openai_configuration import OpenAIConfiguration
from .types import OpenAIClientTypes
class OpenAIEmbeddingsLLM(BaseLLM[EmbeddingInput, EmbeddingOutput]):
"""A text-embedding generator LLM."""
_client: OpenAIClientTypes
_configuration: OpenAIConfiguration
def __init__(self, client: OpenAIClientTypes, configuration: OpenAIConfiguration):
self.client = client
self.configuration = configuration
async def _execute_llm(
self, input: EmbeddingInput, **kwargs: Unpack[LLMInput]
) -> EmbeddingOutput | None:
args = {
"model": self.configuration.model,
**(kwargs.get("model_parameters") or {}),
}
'''
embedding = await self.client.embeddings.create(
input=input,
**args,
)
return [d.embedding for d in embedding.data]
'''
embedding_list = []
for inp in input:
#model="nomic-embed-text"根据自己的embedding模型进行修改
embedding = ollama.embeddings(model="nomic-embed-text",prompt=inp)
embedding_list.append(embedding["embedding"])
return embedding_list
(2)修改文件/root/miniconda3/envs/graphrag/lib/python3.11/site-packages/graphrag/query/llm/oai/embedding.py , 调用ollama提供的模型服务
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""OpenAI Embedding model implementation."""
import asyncio
from collections.abc import Callable
from typing import Any
import ollama
import numpy as np
import tiktoken
from tenacity import (
AsyncRetrying,
RetryError,
Retrying,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from graphrag.query.llm.base import BaseTextEmbedding
from graphrag.query.llm.oai.base import OpenAILLMImpl
from graphrag.query.llm.oai.typing import (
OPENAI_RETRY_ERROR_TYPES,
OpenaiApiType,
)
from graphrag.query.llm.text_utils import chunk_text
from graphrag.query.progress import StatusReporter
class OpenAIEmbedding(BaseTextEmbedding, OpenAILLMImpl):
"""Wrapper for OpenAI Embedding models."""
def __init__(
self,
api_key: str | None = None,
azure_ad_token_provider: Callable | None = None,
model: str = "text-embedding-3-small",
deployment_name: str | None = None,
api_base: str | None = None,
api_version: str | None = None,
api_type: OpenaiApiType = OpenaiApiType.OpenAI,
organization: str | None = None,
encoding_name: str = "cl100k_base",
max_tokens: int = 8191,
max_retries: int = 10,
request_timeout: float = 180.0,
retry_error_types: tuple[type[BaseException]] = OPENAI_RETRY_ERROR_TYPES, # type: ignore
reporter: StatusReporter | None = None,
):
OpenAILLMImpl.__init__(
self=self,
api_key=api_key,
azure_ad_token_provider=azure_ad_token_provider,
deployment_name=deployment_name,
api_base=api_base,
api_version=api_version,
api_type=api_type, # type: ignore
organization=organization,
max_retries=max_retries,
request_timeout=request_timeout,
reporter=reporter,
)
self.model = model
self.encoding_name = encoding_name
self.max_tokens = max_tokens
self.token_encoder = tiktoken.get_encoding(self.encoding_name)
self.retry_error_types = retry_error_types
def embed(self, text: str, **kwargs: Any) -> list[float]:
"""
Embed text using OpenAI Embedding's sync function.
For text longer than max_tokens, chunk texts into max_tokens, embed each chunk, then combine using weighted average.
Please refer to: https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
"""
token_chunks = chunk_text(
text=text, token_encoder=self.token_encoder, max_tokens=self.max_tokens
)
chunk_embeddings = []
chunk_lens = []
for chunk in token_chunks:
try:
#embedding, chunk_len = self._embed_with_retry(chunk, **kwargs)
#model="nomic-embed-text"根据自己的embedding模型进行修改
embedding = ollama.embeddings(model='nomic-embed-text', prompt=chunk)['embedding']
chunk_embeddings.append(embedding)
chunk_lens.append(chunk_len)
# TODO: catch a more specific exception
except Exception as e: # noqa BLE001
self._reporter.error(
message="Error embedding chunk",
details={self.__class__.__name__: str(e)},
)
continue
# chunk_embeddings = np.average(chunk_embeddings, axis=0, weights=chunk_lens)
# chunk_embeddings = chunk_embeddings / np.linalg.norm(chunk_embeddings)
# return chunk_embeddings.tolist()
return chunk_embeddings
async def aembed(self, text: str, **kwargs: Any) -> list[float]:
"""
Embed text using OpenAI Embedding's async function.
For text longer than max_tokens, chunk texts into max_tokens, embed each chunk, then combine using weighted average.
"""
token_chunks = chunk_text(
text=text, token_encoder=self.token_encoder, max_tokens=self.max_tokens
)
chunk_embeddings = []
chunk_lens = []
embedding_results = await asyncio.gather(*[
self._aembed_with_retry(chunk, **kwargs) for chunk in token_chunks
])
embedding_results = [result for result in embedding_results if result[0]]
chunk_embeddings = [result[0] for result in embedding_results]
chunk_lens = [result[1] for result in embedding_results]
# chunk_embeddings = np.average(chunk_embeddings, axis=0, weights=chunk_lens) # type: ignore
# chunk_embeddings = chunk_embeddings / np.linalg.norm(chunk_embeddings)
# return chunk_embeddings.tolist()
return chunk_embeddings
def _embed_with_retry(
self, text: str | tuple, **kwargs: Any
) -> tuple[list[float], int]:
try:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries),
wait=wait_exponential_jitter(max=10),
reraise=True,
retry=retry_if_exception_type(self.retry_error_types),
)
for attempt in retryer:
with attempt:
embedding = (
self.sync_client.embeddings.create( # type: ignore
input=text,
model=self.model,
**kwargs, # type: ignore
)
.data[0]
.embedding
or []
)
return (embedding, len(text))
except RetryError as e:
self._reporter.error(
message="Error at embed_with_retry()",
details={self.__class__.__name__: str(e)},
)
return ([], 0)
else:
# TODO: why not just throw in this case?
return ([], 0)
async def _aembed_with_retry(
self, text: str | tuple, **kwargs: Any
) -> tuple[list[float], int]:
try:
retryer = AsyncRetrying(
stop=stop_after_attempt(self.max_retries),
wait=wait_exponential_jitter(max=10),
reraise=True,
retry=retry_if_exception_type(self.retry_error_types),
)
async for attempt in retryer:
with attempt:
embedding = (
await self.async_client.embeddings.create( # type: ignore
input=text,
model=self.model,
**kwargs, # type: ignore
)
).data[0].embedding or []
return (embedding, len(text))
except RetryError as e:
self._reporter.error(
message="Error at embed_with_retry()",
details={self.__class__.__name__: str(e)},
)
return ([], 0)
else:
# TODO: why not just throw in this case?
return ([], 0)
(3)修改文件/root/miniconda3/envs/graphrag/lib/python3.11/site-packages/graphrag/query/llm/text_utils.py里关于chunk_text()函数的定义
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""Text Utilities for LLM."""
from collections.abc import Iterator
from itertools import islice
import tiktoken
def num_tokens(text: str, token_encoder: tiktoken.Encoding | None = None) -> int:
"""Return the number of tokens in the given text."""
if token_encoder is None:
token_encoder = tiktoken.get_encoding("cl100k_base")
return len(token_encoder.encode(text)) # type: ignore
def batched(iterable: Iterator, n: int):
"""
Batch data into tuples of length n. The last batch may be shorter.
Taken from Python's cookbook: https://docs.python.org/3/library/itertools.html#itertools.batched
"""
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
value_error = "n must be at least one"
raise ValueError(value_error)
it = iter(iterable)
while batch := tuple(islice(it, n)):
yield batch
def chunk_text(
text: str, max_tokens: int, token_encoder: tiktoken.Encoding | None = None
):
"""Chunk text by token length."""
if token_encoder is None:
token_encoder = tiktoken.get_encoding("cl100k_base")
tokens = token_encoder.encode(text) # type: ignore
tokens = token_encoder.decode(tokens) # 将tokens解码成字符串
chunk_iterator = batched(iter(tokens), max_tokens)
yield from chunk_iterator
2.7 ollama下载模型
在ollama环境下开启ollama serve,保持该窗口,后续过程都不要关闭!
ollama serve
打开一个新的窗口,再次进入ollama环境中,拉取模型和embedding。可在ollama官网library (ollama.com)中查看自己所需的模型是否可拉取:
ollama pull qwen2:7b
ollama pull mxbai-embed-large
pull完可以输入ollama list查看模型服务,也可以通过ollama rm -modelname删除指定模型。
pull之后,serve的窗口不要关闭。
2.8 建立GraphRAG.index
在graphrag环境中建立索引,等待建立完成即可
python -m graphrag.index --root ./ragtest
成功建立后出现All workflows completed successfully.
若报错,可查看ragtest/output/20241024-104731/reports/indexing-engine.log最后的报错看具体是什么问题,根据报错进行修改即可。
2.9 查询
有global查询和local查询两种查询方式
(1) global查询
python -m graphrag.query \
--root ./ragtest \
--method global \
"问题内容"
(2) local查询
python -m graphrag.query \
--root ./ragtest \
--method local \
"问题内容"
标签:embedding,GraphRAG,text,流程,self,本地,max,ollama,chunk
From: https://blog.csdn.net/qq_65509025/article/details/143431389