Key Takeaways
- Master parallel processing strategies for LLM applications
- Implement efficient batch processing mechanisms
- Build a scalable document processing system
- Optimize system performance and resource utilization
Scenarios Suited to Parallel Processing
In LLM applications, the following scenarios benefit most from parallel processing:
- Batch document processing
- Parallel inference across multiple models
- Large-scale data analysis
- Real-time stream processing
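For independent requests like these, the core idiom is fanning requests out with `asyncio.gather`. A minimal sketch, with `fake_llm_call` standing in for a real async model client:

```python
import asyncio
from typing import List


async def fake_llm_call(prompt: str) -> str:
    # Stand-in for a real async LLM request; replace with your client's call.
    await asyncio.sleep(0.01)
    return f"response to: {prompt}"


async def run_parallel(prompts: List[str]) -> List[str]:
    # Fan out all requests concurrently; gather preserves input order.
    return await asyncio.gather(*(fake_llm_call(p) for p in prompts))


results = asyncio.run(run_parallel(["a", "b", "c"]))
```

Because the calls are I/O-bound, total latency approaches that of the slowest single request rather than the sum of all of them.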
Batch Processing Strategy Design
1. Basic Architecture
from typing import List, Dict, Any
from dataclasses import dataclass
import asyncio

from langchain.chat_models import ChatOpenAI


@dataclass
class BatchConfig:
    """Batch processing configuration."""
    batch_size: int = 5
    max_concurrent_tasks: int = 3
    timeout_seconds: int = 30
    retry_attempts: int = 2


class BatchProcessor:
    def __init__(self, config: BatchConfig):
        self.config = config
        self.llm = ChatOpenAI(
            temperature=0,
            request_timeout=config.timeout_seconds
        )
        # Caps how many items may be processed concurrently.
        self.semaphore = asyncio.Semaphore(config.max_concurrent_tasks)

    async def process_batch(self, items: List[Any]) -> List[Dict]:
        """Main entry point for batch processing."""
        batches = self._create_batches(items)
        results = []
        for batch in batches:
            batch_results = await self._process_batch_with_semaphore(batch)
            results.extend(batch_results)
        return results
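`_create_batches` is referenced above but never shown; a plausible implementation is a simple chunking helper, written here as a standalone function for clarity:

```python
from typing import Any, List


def create_batches(items: List[Any], batch_size: int) -> List[List[Any]]:
    # Split items into consecutive chunks of at most batch_size elements;
    # the final chunk may be shorter.
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


batches = create_batches(list(range(7)), batch_size=3)
```

Inside the class this would become `self._create_batches(items)` using `self.config.batch_size`.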
2. Asynchronous Processing Implementation
class AsyncBatchProcessor(BatchProcessor):
    async def _process_single_item(self, item: Any) -> Dict:
        """Process a single item with retries and backoff."""
        async with self.semaphore:
            for attempt in range(self.config.retry_attempts):
                try:
                    return await self._execute_processing(item)
                except Exception as e:
                    if attempt == self.config.retry_attempts - 1:
                        return self._create_error_response(item, e)
                    # Exponential backoff between retries: 1s, 2s, 4s, ...
                    await asyncio.sleep(2 ** attempt)

    async def _execute_processing(self, item: Any) -> Dict:
        """Run the actual LLM call, enforcing a timeout."""
        task = asyncio.create_task(self.llm.agenerate([item]))
        try:
            result = await asyncio.wait_for(
                task, timeout=self.config.timeout_seconds
            )
            return {
                "status": "success",
                "input": item,
                "result": result
            }
        except asyncio.TimeoutError:
            # Cancel the in-flight request before surfacing the timeout.
            task.cancel()
            raise
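`_process_batch_with_semaphore` is likewise left undefined. One possible shape, sketched as a standalone coroutine where the `asyncio.sleep` stands in for the real per-item work:

```python
import asyncio
from typing import Any, Dict, List


async def process_batch_with_semaphore(
    batch: List[Any],
    semaphore: asyncio.Semaphore,
) -> List[Dict]:
    async def worker(item: Any) -> Dict:
        # The semaphore caps how many items are in flight at once,
        # even though all workers are launched together.
        async with semaphore:
            await asyncio.sleep(0.01)  # placeholder for real processing
            return {"status": "success", "input": item}

    return await asyncio.gather(*(worker(item) for item in batch))


async def main() -> List[Dict]:
    # Create the semaphore inside the running loop for portability
    # across Python versions.
    semaphore = asyncio.Semaphore(2)
    return await process_batch_with_semaphore(["doc1", "doc2", "doc3"], semaphore)


results = asyncio.run(main())
```

In the class above, the per-item body would call `self._process_single_item`, which already acquires the shared semaphore.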
Case Study: A Batch Document Processing System
1. System Architecture
class DocumentBatchProcessor:
    def __init__(self):
        self.config = BatchConfig(
            batch_size=10,
            max_concurrent_tasks=5
        )
        self.processor = AsyncBatchProcessor(self.config)
        self.results_manager = ResultsManager()

    async def process_documents(self, documents: List[str]) -> Dict:
        """Process a batch of documents end to end."""
        try:
            preprocessed = await self._preprocess_documents(documents)
            results = await self.processor.process_batch(preprocessed)
            return await self.results_manager.merge_results(results)
        except Exception as e:
            return self._handle_batch_error(e, documents)
2. Resource Control
class ResourceController:
    def __init__(self):
        self.token_limit = 4096
        self.request_limit = 100
        self._request_count = 0
        self._token_count = 0
        self._reset_time = None

    async def check_limits(self) -> bool:
        """Check whether we are still within resource limits."""
        await self._update_counters()
        return (
            self._request_count < self.request_limit and
            self._token_count < self.token_limit
        )

    async def track_usage(self, tokens_used: int):
        """Record resource usage for one request."""
        self._token_count += tokens_used
        self._request_count += 1

    async def wait_if_needed(self):
        """Wait for resources to free up when limits are hit."""
        if not await self.check_limits():
            wait_time = self._calculate_wait_time()
            await asyncio.sleep(wait_time)
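`_update_counters` and `_calculate_wait_time` are left undefined above. One common approach is a fixed-window counter that resets periodically and waits out the remainder of the window; a synchronous sketch (class and method names are illustrative):

```python
import time


class FixedWindowLimiter:
    """Fixed-window request counter: counts reset every window_seconds."""

    def __init__(self, request_limit: int, window_seconds: float = 60.0):
        self.request_limit = request_limit
        self.window_seconds = window_seconds
        self._request_count = 0
        self._window_start = time.monotonic()

    def _update_counters(self) -> None:
        # Start a fresh window once the current one has elapsed.
        if time.monotonic() - self._window_start >= self.window_seconds:
            self._request_count = 0
            self._window_start = time.monotonic()

    def check_limits(self) -> bool:
        self._update_counters()
        return self._request_count < self.request_limit

    def track_usage(self) -> None:
        self._request_count += 1

    def calculate_wait_time(self) -> float:
        # Sleep until the current window rolls over.
        elapsed = time.monotonic() - self._window_start
        return max(0.0, self.window_seconds - elapsed)


limiter = FixedWindowLimiter(request_limit=2, window_seconds=60.0)
limiter.track_usage()
limiter.track_usage()
blocked = not limiter.check_limits()
```

A sliding-window or token-bucket scheme smooths bursts better, at the cost of slightly more bookkeeping.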
3. Result Merging Strategy
class ResultsManager:
    def __init__(self):
        self.merge_strategies = {
            "text": self._merge_text_results,
            "embeddings": self._merge_embedding_results,
            "classifications": self._merge_classification_results
        }

    async def merge_results(self, results: List[Dict]) -> Dict:
        """Merge per-item results into a single summary."""
        merged = {
            "success_count": 0,
            "error_count": 0,
            "results": []
        }
        for result in results:
            if result["status"] == "success":
                merged["success_count"] += 1
                merged["results"].append(
                    await self._process_result(result)
                )
            else:
                merged["error_count"] += 1
        return merged
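The strategy methods registered in `merge_strategies` are not shown; a minimal text-merge sketch (the function name and result shape are assumptions) might look like:

```python
from typing import Dict, List


def merge_text_results(results: List[Dict]) -> Dict:
    # Concatenate successful text outputs and tally failures.
    texts = [r["result"] for r in results if r["status"] == "success"]
    return {
        "text": "\n".join(texts),
        "success_count": len(texts),
        "error_count": len(results) - len(texts),
    }


merged = merge_text_results([
    {"status": "success", "result": "summary A"},
    {"status": "error", "result": None},
    {"status": "success", "result": "summary B"},
])
```

Embedding results would instead be stacked into a matrix, and classification results tallied into label counts, but the same success/error split applies.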
Performance Optimization Guide
1. Memory Management
import gc

import psutil


class MemoryManager:
    def __init__(self, max_memory_mb: int = 1024):
        self.max_memory = max_memory_mb * 1024 * 1024
        self.current_usage = 0

    async def monitor_memory(self):
        """Monitor resident memory of the current process."""
        process = psutil.Process()
        memory_info = process.memory_info()
        if memory_info.rss > self.max_memory:
            await self._trigger_memory_cleanup()

    async def _trigger_memory_cleanup(self):
        """Trigger garbage collection to reclaim memory."""
        gc.collect()
2. Performance Monitoring
class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "processing_times": [],
            "error_rates": [],
            "throughput": []
        }

    async def record_metrics(
        self,
        batch_size: int,
        duration: float,
        errors: int
    ):
        """Record performance metrics for one batch."""
        self.metrics["processing_times"].append(duration)
        self.metrics["error_rates"].append(errors / batch_size)
        self.metrics["throughput"].append(batch_size / duration)
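Raw metric lists become useful once aggregated. A small helper, sketched here with a crude nearest-rank p95 (the function name and summary keys are illustrative):

```python
import statistics
from typing import Dict, List


def summarize_metrics(metrics: Dict[str, List[float]]) -> Dict[str, float]:
    times = metrics["processing_times"]
    # Nearest-rank p95: index 95% of the way into the sorted latencies.
    p95 = sorted(times)[min(len(times) - 1, int(len(times) * 0.95))]
    return {
        "avg_latency": statistics.fmean(times),
        "p95_latency": p95,
        "avg_throughput": statistics.fmean(metrics["throughput"]),
        "avg_error_rate": statistics.fmean(metrics["error_rates"]),
    }


summary = summarize_metrics({
    "processing_times": [1.0, 2.0, 3.0, 4.0],
    "error_rates": [0.0, 0.1, 0.0, 0.1],
    "throughput": [10.0, 9.0, 11.0, 10.0],
})
```

With real traffic you would use many more samples, making the percentile estimate far less lumpy.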
Best Practices
- Batch optimization
  - Adjust batch size dynamically based on system resources
  - Implement intelligent retry mechanisms
  - Monitor and optimize memory usage
- Concurrency control
  - Use semaphores to cap concurrency
  - Implement request rate limiting
  - Set sensible timeouts
- Error handling
  - Implement tiered error handling
  - Log detailed error information
  - Provide graceful degradation paths
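The "intelligent retry" suggestion above usually means exponential backoff plus random jitter, so concurrent clients do not retry in lockstep. A synchronous sketch (function and parameter names are illustrative):

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    # Double the delay each attempt and scale by a random jitter factor;
    # re-raise the final failure so callers can handle it.
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) * (1 + random.random()))


calls = {"n": 0}


def flaky() -> str:
    # Fails twice, then succeeds: a stand-in for a transiently failing API.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = retry_with_backoff(flaky)
```

In production you would typically retry only on retryable errors (timeouts, rate limits), not on all exceptions.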
Performance Tuning Essentials
- System level
  - Monitor system resource usage
  - Optimize memory management
  - Implement load balancing
- Application level
  - Optimize the batching strategy
  - Tune concurrency parameters
  - Implement caching
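For the caching suggestion, deduplicating identical prompts is one of the cheapest wins. A minimal in-memory sketch keyed by a prompt hash (class and method names are illustrative):

```python
import hashlib
from typing import Callable, Dict


class ResponseCache:
    """In-memory cache keyed by a hash of the prompt text."""

    def __init__(self):
        self._store: Dict[str, str] = {}
        self.hits = 0

    def get_or_compute(self, prompt: str, compute: Callable[[str], str]) -> str:
        # Hash the prompt so keys stay fixed-size regardless of prompt length.
        key = hashlib.sha256(prompt.encode("utf-8")).hexdigest()
        if key in self._store:
            self.hits += 1
        else:
            self._store[key] = compute(prompt)
        return self._store[key]


cache = ResponseCache()
cache.get_or_compute("summarize X", lambda p: f"result for {p}")
answer = cache.get_or_compute("summarize X", lambda p: f"result for {p}")
```

A real deployment would add eviction (e.g. LRU with a size cap) and likely a shared store such as Redis so all workers benefit.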
Summary
Parallel processing is a key technique for improving LLM application performance. The main takeaways:
- Master the core strategies of parallel processing
- Implement an efficient batching mechanism
- Optimize system performance
- Ensure sensible resource utilization