文件上传:并行 vs 串行策略分析
📊 当前实现分析
当前架构
API层面(
upload_files_batch):- ✅ 串行读取文件:
for file in files - ✅ 小文件(<1MB):同步处理
- ✅ 大文件(>1MB):异步处理(Celery队列)
- ✅ 串行读取文件:
任务处理层面(
process_batch_upload_task):- ⚠️ 串行处理:逐个提交任务并等待完成
- ⚠️ 阻塞等待:
result.get(timeout=30)等待每个任务完成
脚本工具(
batch_upload_pdfs.py):- ✅ 并行解析:使用
ThreadPoolExecutor(max_workers=4)
- ✅ 并行解析:使用
🔍 并行 vs 串行对比
串行上传(当前实现)
优点:
- ✅ 内存占用低:一次只处理一个文件
- ✅ 错误隔离好:一个文件失败不影响其他文件
- ✅ 资源可控:不会同时占用大量CPU/内存
- ✅ 实现简单:逻辑清晰,易于调试
- ✅ 数据库压力小:避免并发写入冲突
缺点:
- ❌ 速度慢:总时间 = 所有文件处理时间之和
- ❌ 吞吐量低:无法充分利用多核CPU
- ❌ 用户体验差:大量文件时等待时间长
适用场景:
- 文件数量少(<10个)
- 文件很大(>100MB)
- 资源受限环境
- 需要严格错误控制
并行上传(推荐优化)
优点:
- ✅ 速度快:总时间 ≈ 最慢文件的时间
- ✅ 吞吐量高:充分利用多核CPU和I/O
- ✅ 用户体验好:快速完成批量上传
- ✅ 资源利用率高:CPU、内存、I/O并行工作
缺点:
- ❌ 内存占用高:同时处理多个文件
- ❌ 错误处理复杂:需要管理多个并发任务
- ❌ 资源竞争:可能造成数据库/文件系统压力
- ❌ 实现复杂:需要处理并发、锁、错误恢复
适用场景:
- 文件数量多(>10个)
- 文件较小(<50MB)
- 资源充足环境
- 需要快速完成上传
💡 推荐方案:混合策略
策略1:基于文件大小的混合策略(推荐)
# 伪代码
async def upload_files_batch_optimized(files):
small_files = [f for f in files if f.size < 5MB]
large_files = [f for f in files if f.size >= 5MB]
# 小文件:并行处理(最多4个并发)
if small_files:
await asyncio.gather(*[
process_file(f) for f in small_files[:4]
])
# 大文件:串行处理(避免内存压力)
for file in large_files:
await process_file(file)
优点:
- ✅ 小文件快速并行处理
- ✅ 大文件避免内存压力
- ✅ 平衡速度和资源
策略2:基于队列的并行处理(当前优化方向)
# 当前实现的问题
for file_info in files:
result = process_document_upload_task.apply_async(...)
task_result = result.get(timeout=30) # ❌ 阻塞等待
# 优化后:并行提交,批量等待
tasks = []
for file_info in files:
task = process_document_upload_task.apply_async(...)
tasks.append(task)
# 并行等待所有任务完成
results = []
for task in tasks:
try:
result = task.get(timeout=300) # 增加超时时间
results.append(result)
except Exception as e:
results.append({"error": str(e)})
优点:
- ✅ 所有文件同时提交到队列
- ✅ Celery Worker 并行处理
- ✅ 不阻塞API响应
策略3:分阶段并行处理(最佳性能)
# 阶段1:并行解析文件(CPU密集型)
with ThreadPoolExecutor(max_workers=4) as executor:
parse_futures = {
executor.submit(parse_file, f): f
for f in files
}
parsed_results = [
future.result()
for future in as_completed(parse_futures)
]
# 阶段2:并行生成向量(GPU/CPU密集型)
with ThreadPoolExecutor(max_workers=2) as executor:
embed_futures = {
executor.submit(generate_embeddings, chunks): chunks
for chunks in parsed_results
}
embedded_results = [
future.result()
for future in as_completed(embed_futures)
]
# 阶段3:串行保存到数据库(避免并发冲突)
for result in embedded_results:
save_to_database(result)
优点:
- ✅ 最大化并行度
- ✅ 避免数据库并发冲突
- ✅ 充分利用CPU/GPU
🎯 针对MBE系统的具体建议
当前问题
process_batch_upload_task串行等待:# 当前代码:串行等待 for file_info in files: result = process_document_upload_task.apply_async(...) task_result = result.get(timeout=30) # 等待30秒问题:如果每个文件处理需要10秒,10个文件需要100秒
API层面串行读取:
# 当前代码:串行读取 for file in files: content = await file.read() # 逐个读取问题:大文件会阻塞后续文件读取
推荐优化方案
方案A:优化任务处理(最简单,推荐)
修改 process_batch_upload_task:
@celery_app.task(
name="src.tasks.document_tasks.process_batch_upload",
bind=True,
max_retries=3,
default_retry_delay=120,
queue="normal_priority",
priority=5,
)
def process_batch_upload_task(
self, kb_id: str, files: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
批量处理文档上传任务(并行优化版)
"""
results = {
"total": len(files),
"success": 0,
"failed": 0,
"processing": 0,
"errors": [],
"file_results": [],
}
# ✅ 优化:并行提交所有任务
tasks = []
for file_info in files:
filename = file_info.get("filename", "unknown")
try:
task = process_document_upload_task.apply_async(
args=(
kb_id,
filename,
file_info.get("file_content_base64", ""),
file_info.get("use_ocr", False),
file_info.get("ocr_lang", "ch"),
),
queue="normal_priority",
)
tasks.append((task, filename, file_info))
except Exception as e:
results["failed"] += 1
results["errors"].append({"filename": filename, "error": str(e)})
# ✅ 优化:批量等待结果(不阻塞)
for task, filename, file_info in tasks:
try:
# 增加超时时间,允许长时间处理
task_result = task.get(timeout=300) # 5分钟超时
if task_result and task_result.get("success"):
results["success"] += 1
results["file_results"].append({
"filename": filename,
"status": "success",
"chunks_count": task_result.get("chunks_count", 0),
})
else:
results["failed"] += 1
results["errors"].append({
"filename": filename,
"error": task_result.get("errors", ["Task failed"]) if task_result else ["Task failed"],
})
except Exception as e:
results["failed"] += 1
results["errors"].append({"filename": filename, "error": str(e)})
logger.error(f"Batch upload file {filename} failed: {e}")
return results
关键改进:
- ✅ 所有任务并行提交到队列
- ✅ Celery Worker 可以并行处理多个任务
- ✅ 不阻塞API响应
- ✅ 增加超时时间(300秒)
方案B:优化API层面(中等复杂度)
修改 upload_files_batch:
@router.post("/upload-batch/{kb_id}")
async def upload_files_batch(
kb_id: str,
background_tasks: BackgroundTasks,
files: List[UploadFile] = File(...),
use_queue: bool = True,
max_concurrent: int = 4, # 新增:最大并发数
):
"""
批量上传多个文件到知识库(并行优化版)
"""
# ... 前置检查 ...
# ✅ 优化:并行读取文件(限制并发数)
import asyncio
from asyncio import Semaphore
semaphore = Semaphore(max_concurrent)
async def read_file_with_limit(file):
async with semaphore:
content = await file.read()
return file.filename, content
# 并行读取所有文件
file_contents = await asyncio.gather(*[
read_file_with_limit(file) for file in files
])
# 处理文件内容
batch_files = []
for filename, content in file_contents:
# ... 处理逻辑 ...
关键改进:
- ✅ 并行读取文件(限制并发数)
- ✅ 避免大文件阻塞小文件
- ✅ 控制内存占用
方案C:完全并行处理(最复杂,性能最好)
使用 asyncio.gather 或 ThreadPoolExecutor:
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def upload_files_batch_fully_parallel(
kb_id: str,
files: List[UploadFile],
max_workers: int = 4,
):
"""
完全并行上传(适合小文件)
"""
manager = get_knowledge_manager()
async def process_single_file(file):
try:
content = await file.read()
filename = file.filename or "unknown"
# 小文件直接处理
if len(content) < 1024 * 1024: # < 1MB
kb = await manager.upload_file(
kb_id=kb_id,
file_path=filename,
file_content=content,
)
return {"filename": filename, "status": "success"}
else:
# 大文件提交到队列
# ...
return {"filename": filename, "status": "queued"}
except Exception as e:
return {"filename": file.filename, "status": "error", "error": str(e)}
# 并行处理所有文件(限制并发数)
semaphore = asyncio.Semaphore(max_workers)
async def process_with_limit(file):
async with semaphore:
return await process_single_file(file)
results = await asyncio.gather(*[
process_with_limit(file) for file in files
])
return results
📈 性能对比
场景:上传10个文件,每个处理10秒
| 方案 | 总时间 | 内存占用 | 复杂度 | 推荐度 |
|---|---|---|---|---|
| 当前串行 | 100秒 | 低 | 简单 | ⭐⭐ |
| 方案A:队列并行 | 10-20秒 | 中 | 简单 | ⭐⭐⭐⭐⭐ |
| 方案B:API并行 | 15-25秒 | 中 | 中等 | ⭐⭐⭐⭐ |
| 方案C:完全并行 | 10-15秒 | 高 | 复杂 | ⭐⭐⭐ |
✅ 最终推荐
短期优化(立即实施)
方案A:优化 process_batch_upload_task
- ✅ 修改简单(只需改一个函数)
- ✅ 风险低(不改变API接口)
- ✅ 效果明显(10倍性能提升)
- ✅ 利用现有Celery Worker并行能力
中期优化(1-2周)
方案B:优化API层面
- ✅ 并行读取文件
- ✅ 控制并发数
- ✅ 改善用户体验
长期优化(1个月+)
方案C:完全并行处理
- ✅ 最大化性能
- ✅ 需要重构代码
- ✅ 需要充分测试
🎯 实施建议
- 立即实施方案A:修改
process_batch_upload_task,并行提交任务 - 监控性能:观察Celery Worker的并发处理能力
- 根据结果决定:是否需要方案B或C
⚠️ 注意事项
Celery Worker并发数:
- 确保Worker配置了足够的并发数(
--concurrency=4) - 监控Worker负载
- 确保Worker配置了足够的并发数(
数据库连接池:
- 并行处理会增加数据库连接数
- 确保连接池足够大
内存监控:
- 并行处理会增加内存占用
- 监控容器内存使用
错误处理:
- 并行处理时错误处理更复杂
- 确保错误不会影响其他文件
超时设置:
- 增加任务超时时间(300秒)
- 避免大文件超时失败