Introduction
The performance of RAG (Retrieval-Augmented Generation) applications depends heavily on how documents are processed, how they are split, and how retrieval is performed. This article walks through optimization strategies for each stage of a RAG pipeline to help developers build more efficient RAG systems.
Document Preprocessing Optimization
Non-Splitting Document Transformers
1. QA Transformer
A QA transformer converts documents into question-answer pairs, a transformation that can significantly improve retrieval quality:
```python
class CustomQATransformer:
    def __init__(self, llm):
        self.llm = llm
        self.qa_template = """
Generate 3-5 question-answer pairs based on the following content:
{text}

Format:
Q1: question 1
A1: answer 1
Q2: question 2
A2: answer 2
"""

    def transform_documents(self, documents):
        qa_pairs = []
        for doc in documents:
            # Use the LLM to generate QA pairs
            response = self.llm(self.qa_template.format(text=doc.page_content))
            # Parse the QA pairs out of the raw response
            pairs = self._parse_qa_pairs(response)
            qa_pairs.extend(pairs)
        return qa_pairs

    def _parse_qa_pairs(self, text):
        # Parsing logic: pair each "Qn:" line with the "An:" line that follows it
        pairs = []
        lines = text.split('\n')
        current_q = None
        for line in lines:
            if line.startswith('Q'):
                current_q = line[line.find(':')+1:].strip()
            elif line.startswith('A') and current_q:
                answer = line[line.find(':')+1:].strip()
                pairs.append({
                    'question': current_q,
                    'answer': answer
                })
                current_q = None
        return pairs
```
Optimization tips:
- Use prompt templates to steer the LLM toward high-quality QA pairs
- Batch requests to improve throughput
- Add a caching layer to avoid re-transforming identical content (a combined batching/caching sketch follows this list)
- Tailor question types to the target domain
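As a rough illustration of the batching and caching tips, the sketch below extends CustomQATransformer with a content-hash cache and a thread pool. The batch_size default, the _key helper, and the threaded dispatch are illustrative assumptions, not any particular library's API:

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor

class CachedQATransformer(CustomQATransformer):
    """Adds content-hash caching and threaded batching to CustomQATransformer."""

    def __init__(self, llm, batch_size=8):
        super().__init__(llm)
        self.batch_size = batch_size  # max concurrent LLM calls; tune per provider limits
        self._cache = {}              # content hash -> parsed QA pairs

    def transform_documents(self, documents):
        # Only cache misses are sent to the LLM, dispatched in parallel
        misses = [d for d in documents if self._key(d) not in self._cache]
        with ThreadPoolExecutor(max_workers=self.batch_size) as pool:
            for doc, response in zip(misses, pool.map(self._generate, misses)):
                self._cache[self._key(doc)] = self._parse_qa_pairs(response)
        qa_pairs = []
        for doc in documents:
            qa_pairs.extend(self._cache[self._key(doc)])
        return qa_pairs

    def _generate(self, doc):
        return self.llm(self.qa_template.format(text=doc.page_content))

    @staticmethod
    def _key(doc):
        return hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest()
```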
2. Translation Transformer
For multilingual RAG applications, a translation transformer ensures documents are indexed and retrieved in a single, consistent language:
```python
from langchain.schema import Document

class EnhancedTranslationTransformer:
    def __init__(self, translator_model, source_lang=None, target_lang="en"):
        self.translator = translator_model
        self.source_lang = source_lang
        self.target_lang = target_lang
        self.cache = {}

    def transform_documents(self, documents):
        translated_docs = []
        batch_size = 5  # batch size
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]
            translated_batch = self._translate_batch(batch)
            translated_docs.extend(translated_batch)
        return translated_docs

    def _translate_batch(self, documents):
        translated = []
        for doc in documents:
            cache_key = f"{doc.page_content}_{self.target_lang}"
            if cache_key in self.cache:
                translated.append(self.cache[cache_key])
                continue
            translated_content = self.translator(
                doc.page_content,
                source_lang=self.source_lang,
                target_lang=self.target_lang
            )
            new_doc = Document(
                page_content=translated_content,
                metadata={
                    **doc.metadata,
                    "original_language": self.source_lang,
                    "translated_language": self.target_lang
                }
            )
            self.cache[cache_key] = new_doc
            translated.append(new_doc)
        return translated
```
Optimization tips:
- Add language detection (a sketch follows this list)
- Translate in batches to improve throughput
- Preserve the original text in metadata
- Add a translation quality check
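For the language-detection tip, here is a minimal sketch using the langdetect package (an assumption here; any detector with a similar interface would do) that skips documents already in the target language:

```python
from langdetect import detect, LangDetectException  # pip install langdetect

def detect_source_lang(doc, default="en"):
    # Best-effort detection; short or ambiguous text raises LangDetectException
    try:
        return detect(doc.page_content)
    except LangDetectException:
        return default

# Only translate documents not already in the target language
# (`docs` stands in for any list of Document objects)
docs_to_translate = [d for d in docs if detect_source_lang(d) != "en"]
```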
Document Splitting Strategy Optimization
Best Practices
- Semantic splitting
```python
import numpy as np

class SemanticSplitter:
    def __init__(self, embedding_model, min_similarity=0.7):
        # Assumes a LangChain-style embeddings interface (embed_query)
        self.embedding_model = embedding_model
        self.min_similarity = min_similarity

    def split_documents(self, documents):
        chunks = []
        for doc in documents:
            # Initial split
            initial_chunks = self._initial_split(doc)
            # Merge semantically similar neighbors
            merged_chunks = self._merge_similar_chunks(initial_chunks)
            chunks.extend(merged_chunks)
        return chunks

    def _initial_split(self, doc):
        # Coarse split on blank lines; any base splitter could be used here
        return [p for p in doc.page_content.split('\n\n') if p.strip()]

    def _merge_similar_chunks(self, chunks):
        if not chunks:
            return []
        merged = []
        current_chunk = chunks[0]
        for next_chunk in chunks[1:]:
            similarity = self._calculate_similarity(current_chunk, next_chunk)
            if similarity >= self.min_similarity:
                current_chunk = self._merge_chunks(current_chunk, next_chunk)
            else:
                merged.append(current_chunk)
                current_chunk = next_chunk
        merged.append(current_chunk)
        return merged

    def _calculate_similarity(self, a, b):
        # Cosine similarity between the two chunks' embeddings
        va = np.asarray(self.embedding_model.embed_query(a))
        vb = np.asarray(self.embedding_model.embed_query(b))
        return float(np.dot(va, vb) / (np.linalg.norm(va) * np.linalg.norm(vb)))

    def _merge_chunks(self, a, b):
        return a + '\n\n' + b
```
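A minimal usage sketch, assuming OpenAIEmbeddings as the backing model (any object exposing embed_query works):

```python
from langchain.embeddings import OpenAIEmbeddings  # assumed embedding backend

splitter = SemanticSplitter(OpenAIEmbeddings(), min_similarity=0.8)
chunks = splitter.split_documents(docs)  # `docs`: a list of Document objects
```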
- Context-aware splitting
```python
class ContextAwareSplitter:
    def __init__(self, chunk_size=1000, chunk_overlap=200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap  # reserved; overlap is not applied in this sketch
        self.context_markers = {
            'start': ['# ', '## ', '### ', 'Chapter', 'Section'],
            'end': ['\n\n', '\n---', '\n###']
        }

    def split_text(self, text):
        chunks = []
        current_chunk = ""
        current_context = None
        for line in text.split('\n'):
            # Detect the start of a new context (heading, chapter, section)
            new_context = self._detect_context(line)
            if new_context:
                if current_chunk:
                    chunks.append({
                        'content': current_chunk.strip(),
                        'context': current_context
                    })
                current_chunk = line
                current_context = new_context
            else:
                if len(current_chunk) + len(line) > self.chunk_size:
                    chunks.append({
                        'content': current_chunk.strip(),
                        'context': current_context
                    })
                    current_chunk = line
                else:
                    current_chunk += '\n' + line
        if current_chunk:
            chunks.append({
                'content': current_chunk.strip(),
                'context': current_context
            })
        return chunks

    def _detect_context(self, line):
        # A line beginning with a start marker opens a new context
        for marker in self.context_markers['start']:
            if line.startswith(marker):
                return line.strip()
        return None
```
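A quick usage sketch; `markdown_text` stands in for any heading-structured string:

```python
splitter = ContextAwareSplitter(chunk_size=800)
chunks = splitter.split_text(markdown_text)
for chunk in chunks[:3]:
    # Each chunk carries the heading context it was produced under
    print(chunk['context'], '->', len(chunk['content']), 'chars')
```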
Splitting Strategy Selection Guide
- Document-type driven (a dispatch sketch follows this list)
  - Structured documents: marker-based splitting
  - Unstructured documents: semantic splitting
  - Code documents: split by function/class
- Length control
  - Account for the LLM's context window size
  - Preserve semantic completeness
  - Use moderate overlap to keep chunks coherent
- Special handling
  - Tables: keep rows intact
  - Lists: keep items together
  - Code: keep functions whole
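To make the document-type guideline concrete, here is a dispatch sketch using LangChain's built-in splitters for the structured and code cases (the doc_type labels and the embedding_model variable are assumptions):

```python
from langchain.text_splitter import (
    Language,
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)

def make_splitter(doc_type):
    if doc_type == "markdown":
        # Structured documents: split on heading markers
        return MarkdownHeaderTextSplitter(
            headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")]
        )
    if doc_type == "python":
        # Code documents: split on def/class boundaries first
        return RecursiveCharacterTextSplitter.from_language(
            language=Language.PYTHON, chunk_size=1000, chunk_overlap=200
        )
    # Unstructured documents: fall back to semantic splitting
    return SemanticSplitter(embedding_model)
```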
Retrieval Optimization Strategies
1. Vector Store Optimization
```python
import json
from cachetools import LRUCache  # pip install cachetools

class OptimizedVectorStore:
    def __init__(self, base_store):
        self.store = base_store
        self.cache = LRUCache(maxsize=1000)
        self.metadata_index = {}

    def add_documents(self, documents):
        # Process in batches
        batch_size = 100
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]
            self._process_batch(batch)

    def _process_batch(self, documents):
        # Preprocess
        processed_docs = self._preprocess_documents(documents)
        # Update the metadata index
        self._update_metadata_index(processed_docs)
        # Add to the underlying vector store
        self.store.add_documents(processed_docs)

    def _preprocess_documents(self, documents):
        # Placeholder for cleaning/normalization; passes documents through unchanged
        return documents

    def _update_metadata_index(self, documents):
        # Index documents by their metadata 'source' for quick filtered lookups
        for doc in documents:
            source = doc.metadata.get('source')
            if source:
                self.metadata_index.setdefault(source, []).append(doc)

    def similarity_search(self, query, k=4, **kwargs):
        # Stable, serializable cache key for the query and its options
        cache_key = f"{query}_{k}_{json.dumps(kwargs, sort_keys=True, default=str)}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        results = self.store.similarity_search(query, k=k, **kwargs)
        self.cache[cache_key] = results
        return results
```
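A usage sketch, assuming a FAISS store and OpenAIEmbeddings as the backend (both assumptions; any vector store exposing add_documents and similarity_search fits):

```python
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings

base = FAISS.from_documents(docs, OpenAIEmbeddings())  # `docs`: list of Document objects
store = OptimizedVectorStore(base)
results = store.similarity_search("how should I tune chunk size?", k=4)
```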
2. Hybrid Retrieval Strategy
```python
class HybridSearchRetriever:
    def __init__(self, vector_store, keyword_index, weights=(0.7, 0.3)):
        self.vector_store = vector_store
        self.keyword_index = keyword_index
        self.weights = weights

    def get_relevant_documents(self, query):
        # Vector search
        vector_results = self.vector_store.similarity_search(query, k=10)
        # Keyword search
        keyword_results = self.keyword_index.search(query, k=10)
        # Merge the two result lists
        combined_results = self._merge_results(
            vector_results,
            keyword_results,
            self.weights
        )
        return combined_results[:5]

    def _merge_results(self, vector_results, keyword_results, weights):
        # Weighted merge; assumes each result carries an `id` plus a
        # `similarity` (vector) or `score` (keyword) attribute
        scored_results = {}
        for doc in vector_results:
            scored_results[doc.id] = {
                'doc': doc,
                'score': weights[0] * doc.similarity
            }
        for doc in keyword_results:
            if doc.id in scored_results:
                scored_results[doc.id]['score'] += weights[1] * doc.score
            else:
                scored_results[doc.id] = {
                    'doc': doc,
                    'score': weights[1] * doc.score
                }
        # Sort by combined score, best first
        sorted_results = sorted(
            scored_results.values(),
            key=lambda x: x['score'],
            reverse=True
        )
        return [item['doc'] for item in sorted_results]
```
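When the two backends do not expose comparable scores, rank-based fusion is a common alternative to the weighted merge above; below is a minimal reciprocal rank fusion (RRF) sketch (the k=60 constant is the conventional default, assumed here):

```python
def rrf_merge(result_lists, k=60):
    # Reciprocal rank fusion: score(doc) = sum over lists of 1 / (k + rank)
    scores = {}
    for results in result_lists:
        for rank, doc in enumerate(results, start=1):
            entry = scores.setdefault(doc.id, {'doc': doc, 'score': 0.0})
            entry['score'] += 1.0 / (k + rank)
    ranked = sorted(scores.values(), key=lambda e: e['score'], reverse=True)
    return [e['doc'] for e in ranked]
```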
3. Context-Aware Retrieval
```python
class ContextualRetriever:
    def __init__(self, base_retriever):
        self.retriever = base_retriever
        self.conversation_history = []

    def get_relevant_documents(self, query):
        # Build a context-enriched query
        enhanced_query = self._build_contextual_query(query)
        # Retrieve against the enriched query
        results = self.retriever.get_relevant_documents(enhanced_query)
        # Record this turn in the history
        self.conversation_history.append({
            'query': query,
            'results': results
        })
        return results

    def _build_contextual_query(self, query):
        if not self.conversation_history:
            return query
        recent_context = self.conversation_history[-3:]  # last 3 turns
        context_text = "\n".join([
            f"Q: {item['query']}"
            for item in recent_context
        ])
        return f"""
Context: {context_text}
Current question: {query}
"""
```
Performance Optimization Recommendations
- Caching strategy (a two-level cache sketch follows this list)
  - Use multi-level caches
  - Cache results of frequent queries
  - Refresh caches periodically
- Batch processing
  - Batch document processing
  - Batch embedding computation
  - Coalesce retrieval requests
- Index optimization
  - Build metadata indexes
  - Support incremental updates
  - Rebuild indexes periodically
- Resource management
  - Monitor memory usage
  - Manage connection pools
  - Process asynchronously
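As a sketch of the multi-level caching idea, assuming the cachetools package: a small hot cache sits in front of a larger TTL-bounded one, so stale entries expire on their own:

```python
from cachetools import LRUCache, TTLCache  # pip install cachetools

class TieredQueryCache:
    def __init__(self):
        self.hot = LRUCache(maxsize=100)             # most frequent queries
        self.warm = TTLCache(maxsize=5000, ttl=600)  # entries expire after 10 minutes

    def get(self, key):
        if key in self.hot:
            return self.hot[key]
        if key in self.warm:
            self.hot[key] = self.warm[key]  # promote on repeat access
            return self.hot[key]
        return None

    def put(self, key, value):
        self.warm[key] = value
```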
Monitoring and Evaluation
- Performance metrics
  - Response time
  - Retrieval accuracy
  - Resource utilization
- Quality evaluation
  - Relevance scoring
  - User feedback analysis
  - A/B testing

The RAGMetrics helper below collects these signals and reports basic statistics over a time window.
```python
import statistics
from datetime import datetime

class RAGMetrics:
    def __init__(self):
        self.metrics = {
            'response_times': [],
            'retrieval_accuracy': [],
            'user_feedback': []
        }

    def log_metric(self, metric_type, value):
        if metric_type in self.metrics:
            self.metrics[metric_type].append({
                'value': value,
                'timestamp': datetime.now()
            })

    def get_statistics(self, metric_type, time_range=None):
        if metric_type not in self.metrics:
            return None
        data = self.metrics[metric_type]
        if time_range:
            data = [
                d for d in data
                if d['timestamp'] > datetime.now() - time_range
            ]
        values = [d['value'] for d in data]
        if not values:  # avoid statistics errors on empty windows
            return None
        return {
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'std': statistics.stdev(values) if len(values) > 1 else 0,
            'count': len(values)
        }
```
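A usage sketch wiring RAGMetrics around a retrieval call (the store variable is reused from the vector-store example above, an assumption):

```python
import time
from datetime import timedelta

metrics = RAGMetrics()
start = time.perf_counter()
store.similarity_search("chunk overlap best practices")
metrics.log_metric('response_times', time.perf_counter() - start)
print(metrics.get_statistics('response_times', time_range=timedelta(hours=1)))
```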
Conclusion
Optimizing a RAG application is an ongoing process that spans document processing, splitting strategy, and retrieval. With the right strategies and continuous monitoring and evaluation, you can significantly improve both performance and user experience. The key is to choose optimization strategies that fit your specific scenario and requirements, and to keep adjusting and refining them.