代码质量改进报告

版本: 1.0.0
更新日期: 2026-02-08
状态: 进行中

本文档记录代码质量改进计划,包括 TODO/FIXME 标记清理、性能优化和错误处理改进。


📚 目录

  1. TODO/FIXME 标记分析
  2. 性能瓶颈分析
  3. 错误处理改进
  4. 改进计划
  5. 实施进度

TODO/FIXME 标记分析

当前状态

根据代码扫描,发现以下 TODO/FIXME 标记:

高优先级(需要尽快实现)

  1. 支付集成 (private/platform/src/users/payment_service.py)

    • 微信支付 SDK 集成
    • 支付宝 SDK 集成
    • Stripe SDK 集成
    • 影响: 支付功能不完整
    • 优先级: ⭐⭐⭐⭐⭐
  2. 用户认证 (shared/src/api/webhooks.py)

    • 从认证中获取真实 user_id(当前硬编码)
    • 影响: 安全性问题
    • 优先级: ⭐⭐⭐⭐⭐
  3. 邮件通知 (shared/src/tasks/settlement_tasks.py)

    • 从配置获取管理员邮箱
    • 从数据库获取开发者邮箱
    • 影响: 通知功能不完整
    • 优先级: ⭐⭐⭐⭐

中优先级(功能完善)

  1. 缓存优化 (shared/src/tasks/settlement_tasks.py, shared/src/tasks/statistics_tasks.py)

    • 将结果缓存到 Redis
    • 影响: 性能优化
    • 优先级: ⭐⭐⭐
  2. 订阅管理 (shared/src/api/webhook.py)

    • 更新订阅状态、发送通知等业务逻辑
    • 影响: 订阅功能不完整
    • 优先级: ⭐⭐⭐
  3. Celery 任务 (shared/src/tasks/celery_tasks.py)

    • 多个任务需要实际实现
    • 影响: 后台任务功能不完整
    • 优先级: ⭐⭐⭐

低优先级(前端/UI)

  1. 前端功能 (private/platform/src/api_admin/templates/)

    • 编辑功能
    • 对话加载
    • 通知组件改进
    • 影响: UI 体验
    • 优先级: ⭐⭐
  2. 用户信息 (public/education/backend/api/course_review_api.py)

    • 从用户系统获取真实姓名
    • 影响: 显示准确性
    • 优先级: ⭐⭐

处理建议

1. 支付集成

# private/platform/src/users/payment_service.py

# TODO: 集成微信支付SDK
# 实现步骤:
# 1. 安装 wechatpay-python SDK: pip install wechatpay-python
# 2. 配置商户号和API密钥
# 3. 实现统一下单接口
# 4. 实现支付回调处理
# 5. 添加支付状态查询
# 注意事项:
# - 需要HTTPS环境
# - 需要配置支付回调URL
# - 需要处理支付超时和退款

def create_wechat_payment(self, order_id: str, amount: float, description: str):
    """
    创建微信支付订单
    
    Args:
        order_id: 订单ID
        amount: 支付金额(元)
        description: 商品描述
    
    Returns:
        dict: 包含支付URL和订单信息
    """
    from wechatpayv3 import WeChatPay
    
    wechatpay = WeChatPay(
        appid=settings.WECHAT_APP_ID,
        mchid=settings.WECHAT_MCH_ID,
        private_key_path=settings.WECHAT_PRIVATE_KEY_PATH,
        cert_serial_no=settings.WECHAT_CERT_SERIAL_NO,
        notify_url=settings.WECHAT_NOTIFY_URL
    )
    
    # 统一下单
    result = wechatpay.pay(
        description=description,
        out_trade_no=order_id,
        amount={"total": int(amount * 100)},  # 转换为分
        payer={"openid": user_openid}
    )
    
    return {
        "payment_url": result.get("h5_url"),
        "order_id": order_id
    }

2. 用户认证改进 ✅ 已完成

问题: shared/src/api/webhooks.py 中硬编码 user_id = "user_123"

解决方案:

  • ✅ 创建了 get_current_user_id 函数,支持多种认证方式(Bearer Token、API Key、Device ID)
  • ✅ 创建了 require_user_id 依赖函数,要求用户认证
  • ✅ 更新了所有端点使用依赖注入获取真实 user_id
  • ✅ 添加了权限验证,确保用户只能访问自己的 webhook

实现代码:

# shared/src/api/webhooks.py

async def get_current_user_id(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
    x_device_id: Optional[str] = Header(None, alias="X-Device-ID"),
) -> Optional[str]:
    """获取当前用户ID,支持多种认证方式"""
    # 方式1: Bearer Token (JWT)
    if credentials:
        from users.auth import auth_service
        user_id = auth_service.verify_token(credentials.credentials)
        if user_id:
            return str(user_id)
    
    # 方式2: API Key
    if x_api_key:
        from api_clients.service import api_client_service
        key = await api_client_service.verify_api_key(x_api_key)
        if key and key.is_valid:
            return str(key.client_id)
    
    # 方式3: Device ID
    if x_device_id:
        from users.service import user_service
        user = await user_service.get_or_create_user_by_device(x_device_id)
        if user:
            return str(user.id)
    
    return None

async def require_user_id(
    user_id: Optional[str] = Depends(get_current_user_id)
) -> str:
    """要求用户认证"""
    if not user_id:
        raise HTTPException(status_code=401, detail="Authentication required")
    return user_id

@router.post("/endpoints", response_model=WebhookEndpoint)
async def create_webhook_endpoint(
    data: WebhookEndpointCreate,
    user_id: str = Depends(require_user_id),  # ✅ 从认证中获取
):
    """创建Webhook端点"""
    return webhook_manager.create_endpoint(user_id, data)

3. 邮件通知改进

# shared/src/tasks/settlement_tasks.py

# TODO: 从配置获取管理员邮箱
# 改进方案: 使用配置系统

from shared.src.config import settings

async def send_settlement_notification(settlement_id: str):
    """发送结算通知"""
    # 从配置获取管理员邮箱
    admin_email = settings.ADMIN_EMAIL or "admin@mises.ai"
    
    # 从数据库获取开发者邮箱
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(User.email).where(User.id == developer_id)
        )
        developer_email = result.scalar_one_or_none()
    
    if developer_email:
        await send_email(
            recipient_email=developer_email,
            subject="结算通知",
            content=notification_content
        )

性能瓶颈分析

已完成的优化

根据项目文档,以下性能优化已完成:

  1. 数据库索引优化 (migrations/011_add_performance_indexes.sql)

    • 用户查询优化
    • 对话历史查询优化
    • API Key 查询优化
    • 闭环系统索引
  2. 消息队列异步化

    • 支付回调异步化(响应时间减少 98.3%)
    • Token 使用记录异步化(数据库压力减少 60-80%)
  3. 缓存策略

    • Token 余额缓存(Redis,TTL 5 分钟)
    • 用户画像缓存

待优化的性能问题

1. N+1 查询问题

问题位置: private/platform/src/users/router.py

# 问题代码示例
users = await db.execute(select(User))
for user in users:
    conversations = user.conversations  # N+1 查询!

改进方案:

from sqlalchemy.orm import joinedload, selectinload

# 使用 joinedload 或 selectinload
users = await db.execute(
    select(User)
    .options(selectinload(User.conversations))
)
# 一次性加载所有关联数据

2. 数据库连接池优化

当前状态: 需要检查连接池配置

改进建议:

# shared/src/storage/database.py

# 优化连接池配置
engine = create_async_engine(
    database_url,
    pool_size=20,           # 连接池大小
    max_overflow=10,        # 最大溢出连接数
    pool_pre_ping=True,     # 连接前检查
    pool_recycle=3600,      # 连接回收时间(秒)
    echo=settings.debug,
)

3. Redis 批量操作优化

问题: 多次单独操作 Redis

改进方案:

# 使用 Pipeline 批量操作
async def batch_update_cache(data: Dict[str, Any]):
    """批量更新缓存"""
    r = await get_redis()
    pipe = r.pipeline()
    
    for key, value in data.items():
        pipe.setex(key, 3600, json.dumps(value))
    
    await pipe.execute()  # 一次性执行所有操作

4. API 响应压缩

当前状态: 未启用 Gzip 压缩

改进方案:

# shared/src/main.py

from fastapi.middleware.gzip import GZipMiddleware

# 添加 Gzip 压缩中间件
app.add_middleware(
    GZipMiddleware,
    minimum_size=1000  # 大于1KB才压缩
)

5. 慢查询监控

当前状态: 已有慢查询监控工具

改进建议: 定期审查慢查询日志,优化超过 200ms 的查询

# 查看慢查询
# GET /api/performance/slow-queries?limit=50

# 优化建议:
# 1. 添加缺失的索引
# 2. 优化查询语句(避免全表扫描)
# 3. 使用 EXPLAIN ANALYZE 分析查询计划

错误处理改进

当前问题

扫描发现大量使用 except Exception 的通用异常处理:

  1. 过于宽泛的异常捕获

    • 位置: private/platform/src/users/router.py (30+ 处)
    • 问题: 捕获所有异常,难以定位具体问题
  2. 缺少错误日志

    • 部分异常处理没有记录日志
    • 缺少错误上下文信息
  3. 错误响应不统一

    • 不同模块返回的错误格式不一致

改进方案

1. 使用具体的异常类型

# ❌ 不好的做法
try:
    result = await some_operation()
except Exception as e:
    logger.error(f"Error: {e}")
    return {"error": "操作失败"}

# ✅ 好的做法
from sqlalchemy.exc import SQLAlchemyError
from httpx import HTTPError, TimeoutException

try:
    result = await some_operation()
except SQLAlchemyError as e:
    logger.error(f"Database error: {e}", exc_info=True)
    raise HTTPException(status_code=500, detail="数据库操作失败")
except HTTPError as e:
    logger.error(f"HTTP error: {e}", exc_info=True)
    raise HTTPException(status_code=503, detail="外部服务不可用")
except TimeoutException as e:
    logger.error(f"Timeout error: {e}", exc_info=True)
    raise HTTPException(status_code=504, detail="请求超时")
except ValueError as e:
    logger.warning(f"Validation error: {e}")
    raise HTTPException(status_code=400, detail=str(e))

2. 统一错误响应格式

# shared/src/utils/error_handler.py

from typing import Optional
from fastapi import HTTPException
from pydantic import BaseModel

class ErrorResponse(BaseModel):
    """统一错误响应格式"""
    error: str
    error_code: Optional[str] = None
    detail: Optional[str] = None
    request_id: Optional[str] = None

class APIError(HTTPException):
    """自定义 API 错误"""
    def __init__(
        self,
        status_code: int,
        error: str,
        error_code: Optional[str] = None,
        detail: Optional[str] = None
    ):
        self.error_code = error_code
        super().__init__(
            status_code=status_code,
            detail=ErrorResponse(
                error=error,
                error_code=error_code,
                detail=detail
            ).dict()
        )

# 使用示例
raise APIError(
    status_code=400,
    error="Invalid request",
    error_code="INVALID_PARAMETER",
    detail="Email format is invalid"
)

3. 添加错误追踪

# 使用现有的错误追踪系统
from monitoring.error_tracking import error_tracker

try:
    result = await risky_operation()
except Exception as e:
    # 记录错误到追踪系统
    error_id = error_tracker.capture_exception(
        exception=e,
        request=request,
        user_id=user_id,
        extra={"operation": "risky_operation"}
    )
    
    logger.error(f"Operation failed: {error_id}")
    raise APIError(
        status_code=500,
        error="Internal server error",
        error_code="OPERATION_FAILED",
        detail=f"Error ID: {error_id}"
    )

4. 改进特定模块的错误处理

用户服务模块 (private/platform/src/users/router.py):

# 改进前
except Exception as e:
    logger.debug(f"Failed to get user role: {e}")

# 改进后
except SQLAlchemyError as e:
    logger.error(f"Database error while fetching user role: {e}", exc_info=True)
    # 使用默认值,不中断流程
    role = "user"
except Exception as e:
    logger.error(f"Unexpected error while fetching user role: {e}", exc_info=True)
    # 记录到错误追踪系统
    error_tracker.capture_exception(e, extra={"user_id": str(user_id)})
    role = "user"

Token 计费模块 (private/platform/src/users/token_billing.py):

# 改进前
except Exception:
    pass  # 静默失败

# 改进后
except RedisError as e:
    logger.debug(f"Redis cache error (non-critical): {e}")
    # 继续执行,从数据库读取
except Exception as e:
    logger.error(f"Unexpected error in token billing: {e}", exc_info=True)
    # 记录错误但不中断流程
    error_tracker.capture_exception(e, extra={"user_id": str(user_id)})

改进计划

第一阶段:高优先级(1-2周)

  1. 支付集成 ⭐⭐⭐⭐⭐

    • 集成微信支付 SDK
    • 集成支付宝 SDK
    • 集成 Stripe SDK
    • 添加支付测试
  2. 用户认证改进 ⭐⭐⭐⭐⭐

    • 修复硬编码 user_id
    • 添加认证中间件
    • 添加认证测试
  3. 错误处理统一 ⭐⭐⭐⭐

    • 创建统一错误响应格式
    • 改进关键模块的错误处理
    • 添加错误追踪集成

第二阶段:中优先级(2-4周)

  1. 性能优化 ⭐⭐⭐

    • 修复 N+1 查询问题
    • 优化数据库连接池
    • 添加 Redis Pipeline 批量操作
    • 启用 API 响应压缩
  2. 缓存优化 ⭐⭐⭐

    • 实现结算结果缓存
    • 实现统计结果缓存
    • 添加缓存失效策略
  3. 邮件通知完善 ⭐⭐⭐

    • 从配置获取管理员邮箱
    • 从数据库获取开发者邮箱
    • 添加邮件模板

第三阶段:低优先级(持续改进)

  1. 前端功能完善 ⭐⭐

    • 实现编辑功能
    • 实现对话加载
    • 改进通知组件
  2. 代码审查和重构 ⭐⭐

    • 定期代码审查
    • 重构重复代码
    • 提升代码可读性

实施进度

已完成 ✅

  • TODO/FIXME 标记扫描和分析
  • 性能瓶颈识别
  • 错误处理问题分析
  • 改进计划制定

进行中 🚧

  • 支付集成(计划中)
  • 用户认证改进 ✅ 已完成 - 修复了硬编码 user_id 安全问题
  • 错误处理统一(计划中)

待开始 📋

  • 性能优化实施
  • 缓存优化实施
  • 前端功能完善

代码质量指标

目标指标

指标 当前值 目标值 状态
TODO/FIXME 标记 ~20 < 10 🚧
代码覆盖率 13.09% 70% 🚧
平均响应时间 - < 200ms 📋
错误处理覆盖率 ~60% 90% 🚧
慢查询数量 (>200ms) - < 5/天 📋

监控方法

  1. TODO 标记: 定期扫描代码库
  2. 代码覆盖率: 运行测试并查看覆盖率报告
  3. 响应时间: APM 监控系统
  4. 错误处理: 错误追踪系统
  5. 慢查询: 数据库慢查询日志

最佳实践建议

1. 异常处理

# ✅ 推荐做法
try:
    result = await operation()
except SpecificError as e:
    # 处理特定错误
    logger.error(f"Specific error: {e}", exc_info=True)
    raise APIError(status_code=400, error="Operation failed")
except Exception as e:
    # 处理未知错误
    logger.error(f"Unexpected error: {e}", exc_info=True)
    error_tracker.capture_exception(e)
    raise APIError(status_code=500, error="Internal server error")

2. 性能优化

# ✅ 使用批量操作
# 批量查询
users = await db.execute(
    select(User)
    .options(selectinload(User.conversations))
    .where(User.status == "active")
)

# 批量更新
await db.execute(
    update(User)
    .where(User.id.in_(user_ids))
    .values(status="inactive")
)

3. 缓存策略

# ✅ 使用缓存装饰器
from functools import lru_cache
from storage.redis import cache_result

@cache_result(expire_seconds=3600)
async def get_expensive_data(key: str):
    """获取昂贵的数据(带缓存)"""
    return await compute_expensive_data(key)

相关资源


文档版本: 1.0.0
最后更新: 2026-02-08
维护者: MBE 开发团队