文件上传:并行 vs 串行策略分析

📊 当前实现分析

当前架构

  1. API层面(upload_files_batch

    • 串行读取文件for file in files
    • 小文件(<1MB):同步处理
    • 大文件(>1MB):异步处理(Celery队列)
  2. 任务处理层面(process_batch_upload_task

    • ⚠️ 串行处理:逐个提交任务并等待完成
    • ⚠️ 阻塞等待result.get(timeout=30) 等待每个任务完成
  3. 脚本工具(batch_upload_pdfs.py

    • 并行解析:使用 ThreadPoolExecutor(max_workers=4)

🔍 并行 vs 串行对比

串行上传(当前实现)

优点

  • 内存占用低:一次只处理一个文件
  • 错误隔离好:一个文件失败不影响其他文件
  • 资源可控:不会同时占用大量CPU/内存
  • 实现简单:逻辑清晰,易于调试
  • 数据库压力小:避免并发写入冲突

缺点

  • 速度慢:总时间 = 所有文件处理时间之和
  • 吞吐量低:无法充分利用多核CPU
  • 用户体验差:大量文件时等待时间长

适用场景

  • 文件数量少(<10个)
  • 文件很大(>100MB)
  • 资源受限环境
  • 需要严格错误控制

并行上传(推荐优化)

优点

  • 速度快:总时间 ≈ 最慢文件的时间
  • 吞吐量高:充分利用多核CPU和I/O
  • 用户体验好:快速完成批量上传
  • 资源利用率高:CPU、内存、I/O并行工作

缺点

  • 内存占用高:同时处理多个文件
  • 错误处理复杂:需要管理多个并发任务
  • 资源竞争:可能造成数据库/文件系统压力
  • 实现复杂:需要处理并发、锁、错误恢复

适用场景

  • 文件数量多(>10个)
  • 文件较小(<50MB)
  • 资源充足环境
  • 需要快速完成上传

💡 推荐方案:混合策略

策略1:基于文件大小的混合策略(推荐)

# 伪代码
async def upload_files_batch_optimized(files):
    small_files = [f for f in files if f.size < 5MB]
    large_files = [f for f in files if f.size >= 5MB]
    
    # 小文件:并行处理(最多4个并发)
    if small_files:
        await asyncio.gather(*[
            process_file(f) for f in small_files[:4]
        ])
    
    # 大文件:串行处理(避免内存压力)
    for file in large_files:
        await process_file(file)

优点

  • ✅ 小文件快速并行处理
  • ✅ 大文件避免内存压力
  • ✅ 平衡速度和资源

策略2:基于队列的并行处理(当前优化方向)

# 当前实现的问题
for file_info in files:
    result = process_document_upload_task.apply_async(...)
    task_result = result.get(timeout=30)  # ❌ 阻塞等待

# 优化后:并行提交,批量等待
tasks = []
for file_info in files:
    task = process_document_upload_task.apply_async(...)
    tasks.append(task)

# 并行等待所有任务完成
results = []
for task in tasks:
    try:
        result = task.get(timeout=300)  # 增加超时时间
        results.append(result)
    except Exception as e:
        results.append({"error": str(e)})

优点

  • ✅ 所有文件同时提交到队列
  • ✅ Celery Worker 并行处理
  • ✅ 不阻塞API响应

策略3:分阶段并行处理(最佳性能)

# 阶段1:并行解析文件(CPU密集型)
with ThreadPoolExecutor(max_workers=4) as executor:
    parse_futures = {
        executor.submit(parse_file, f): f 
        for f in files
    }
    parsed_results = [
        future.result() 
        for future in as_completed(parse_futures)
    ]

# 阶段2:并行生成向量(GPU/CPU密集型)
with ThreadPoolExecutor(max_workers=2) as executor:
    embed_futures = {
        executor.submit(generate_embeddings, chunks): chunks
        for chunks in parsed_results
    }
    embedded_results = [
        future.result()
        for future in as_completed(embed_futures)
    ]

# 阶段3:串行保存到数据库(避免并发冲突)
for result in embedded_results:
    save_to_database(result)

优点

  • ✅ 最大化并行度
  • ✅ 避免数据库并发冲突
  • ✅ 充分利用CPU/GPU

🎯 针对MBE系统的具体建议

当前问题

  1. process_batch_upload_task 串行等待

    # 当前代码:串行等待
    for file_info in files:
        result = process_document_upload_task.apply_async(...)
        task_result = result.get(timeout=30)  # 等待30秒
    

    问题:如果每个文件处理需要10秒,10个文件需要100秒

  2. API层面串行读取

    # 当前代码:串行读取
    for file in files:
        content = await file.read()  # 逐个读取
    

    问题:大文件会阻塞后续文件读取

推荐优化方案

方案A:优化任务处理(最简单,推荐)

修改 process_batch_upload_task

@celery_app.task(
    name="src.tasks.document_tasks.process_batch_upload",
    bind=True,
    max_retries=3,
    default_retry_delay=120,
    queue="normal_priority",
    priority=5,
)
def process_batch_upload_task(
    self, kb_id: str, files: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    批量处理文档上传任务(并行优化版)
    """
    results = {
        "total": len(files),
        "success": 0,
        "failed": 0,
        "processing": 0,
        "errors": [],
        "file_results": [],
    }
    
    # ✅ 优化:并行提交所有任务
    tasks = []
    for file_info in files:
        filename = file_info.get("filename", "unknown")
        try:
            task = process_document_upload_task.apply_async(
                args=(
                    kb_id,
                    filename,
                    file_info.get("file_content_base64", ""),
                    file_info.get("use_ocr", False),
                    file_info.get("ocr_lang", "ch"),
                ),
                queue="normal_priority",
            )
            tasks.append((task, filename, file_info))
        except Exception as e:
            results["failed"] += 1
            results["errors"].append({"filename": filename, "error": str(e)})
    
    # ✅ 优化:批量等待结果(不阻塞)
    for task, filename, file_info in tasks:
        try:
            # 增加超时时间,允许长时间处理
            task_result = task.get(timeout=300)  # 5分钟超时
            
            if task_result and task_result.get("success"):
                results["success"] += 1
                results["file_results"].append({
                    "filename": filename,
                    "status": "success",
                    "chunks_count": task_result.get("chunks_count", 0),
                })
            else:
                results["failed"] += 1
                results["errors"].append({
                    "filename": filename,
                    "error": task_result.get("errors", ["Task failed"]) if task_result else ["Task failed"],
                })
        except Exception as e:
            results["failed"] += 1
            results["errors"].append({"filename": filename, "error": str(e)})
            logger.error(f"Batch upload file {filename} failed: {e}")
    
    return results

关键改进

  • ✅ 所有任务并行提交到队列
  • ✅ Celery Worker 可以并行处理多个任务
  • ✅ 不阻塞API响应
  • ✅ 增加超时时间(300秒)

方案B:优化API层面(中等复杂度)

修改 upload_files_batch

@router.post("/upload-batch/{kb_id}")
async def upload_files_batch(
    kb_id: str,
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...),
    use_queue: bool = True,
    max_concurrent: int = 4,  # 新增:最大并发数
):
    """
    批量上传多个文件到知识库(并行优化版)
    """
    # ... 前置检查 ...
    
    # ✅ 优化:并行读取文件(限制并发数)
    import asyncio
    from asyncio import Semaphore
    
    semaphore = Semaphore(max_concurrent)
    
    async def read_file_with_limit(file):
        async with semaphore:
            content = await file.read()
            return file.filename, content
    
    # 并行读取所有文件
    file_contents = await asyncio.gather(*[
        read_file_with_limit(file) for file in files
    ])
    
    # 处理文件内容
    batch_files = []
    for filename, content in file_contents:
        # ... 处理逻辑 ...

关键改进

  • ✅ 并行读取文件(限制并发数)
  • ✅ 避免大文件阻塞小文件
  • ✅ 控制内存占用

方案C:完全并行处理(最复杂,性能最好)

使用 asyncio.gatherThreadPoolExecutor

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def upload_files_batch_fully_parallel(
    kb_id: str,
    files: List[UploadFile],
    max_workers: int = 4,
):
    """
    完全并行上传(适合小文件)
    """
    manager = get_knowledge_manager()
    
    async def process_single_file(file):
        try:
            content = await file.read()
            filename = file.filename or "unknown"
            
            # 小文件直接处理
            if len(content) < 1024 * 1024:  # < 1MB
                kb = await manager.upload_file(
                    kb_id=kb_id,
                    file_path=filename,
                    file_content=content,
                )
                return {"filename": filename, "status": "success"}
            else:
                # 大文件提交到队列
                # ...
                return {"filename": filename, "status": "queued"}
        except Exception as e:
            return {"filename": file.filename, "status": "error", "error": str(e)}
    
    # 并行处理所有文件(限制并发数)
    semaphore = asyncio.Semaphore(max_workers)
    
    async def process_with_limit(file):
        async with semaphore:
            return await process_single_file(file)
    
    results = await asyncio.gather(*[
        process_with_limit(file) for file in files
    ])
    
    return results

📈 性能对比

场景:上传10个文件,每个处理10秒

方案 总时间 内存占用 复杂度 推荐度
当前串行 100秒 简单 ⭐⭐
方案A:队列并行 10-20秒 简单 ⭐⭐⭐⭐⭐
方案B:API并行 15-25秒 中等 ⭐⭐⭐⭐
方案C:完全并行 10-15秒 复杂 ⭐⭐⭐

✅ 最终推荐

短期优化(立即实施)

方案A:优化 process_batch_upload_task

  • ✅ 修改简单(只需改一个函数)
  • ✅ 风险低(不改变API接口)
  • ✅ 效果明显(10倍性能提升)
  • ✅ 利用现有Celery Worker并行能力

中期优化(1-2周)

方案B:优化API层面

  • ✅ 并行读取文件
  • ✅ 控制并发数
  • ✅ 改善用户体验

长期优化(1个月+)

方案C:完全并行处理

  • ✅ 最大化性能
  • ✅ 需要重构代码
  • ✅ 需要充分测试

🎯 实施建议

  1. 立即实施方案A:修改 process_batch_upload_task,并行提交任务
  2. 监控性能:观察Celery Worker的并发处理能力
  3. 根据结果决定:是否需要方案B或C

⚠️ 注意事项

  1. Celery Worker并发数

    • 确保Worker配置了足够的并发数(--concurrency=4
    • 监控Worker负载
  2. 数据库连接池

    • 并行处理会增加数据库连接数
    • 确保连接池足够大
  3. 内存监控

    • 并行处理会增加内存占用
    • 监控容器内存使用
  4. 错误处理

    • 并行处理时错误处理更复杂
    • 确保错误不会影响其他文件
  5. 超时设置

    • 增加任务超时时间(300秒)
    • 避免大文件超时失败