课程同步架构设计

📋 问题背景

当前问题

  • 知识库更新后,课程映射不会自动更新
  • 需要手动运行迁移脚本
  • 可能导致课程内容与知识库不一致

用户需求

  • 知识库更新时,自动同步课程
  • 使用队列消息机制确保可靠性

🏗️ 架构设计

方案对比

方案 优点 缺点 适用场景
Webhook 事件 轻量、已集成 需要接收端在线 实时同步
消息队列 可靠、支持重试 需要额外基础设施 高可靠性要求
数据库事件 简单、无依赖 轮询延迟 简单场景

推荐方案:混合架构

知识库更新
    ↓
发布事件(Webhook + 队列)
    ↓
    ├─→ Webhook → 教育应用(实时)
    └─→ 消息队列 → 后台任务(可靠)

🔧 实现方案

方案1:基于现有 Webhook 系统(推荐)

优点

  • 无需引入新基础设施
  • 利用现有 WebSocket 推送
  • 实现简单

实现步骤

  1. 扩展 Webhook 事件类型
# src/api/webhooks.py
class WebhookEvent(str, Enum):
    # ... 现有事件
    KNOWLEDGE_BASE_CREATED = "knowledge_base.created"
    KNOWLEDGE_BASE_UPDATED = "knowledge_base.updated"
    KNOWLEDGE_BASE_CHUNKS_UPDATED = "knowledge_base.chunks_updated"
    EXPERT_COURSE_SYNC_REQUIRED = "expert.course_sync_required"
  1. 在知识库更新时发布事件
# src/api/knowledge.py
@router.post("/{kb_id}/chunks")
async def update_knowledge_chunks(kb_id: str, ...):
    # ... 更新 chunks
    
    # 发布事件
    await publish_webhook_event(
        WebhookEvent.KNOWLEDGE_BASE_CHUNKS_UPDATED,
        {
            "kb_id": kb_id,
            "expert_id": expert_id,
            "chunks_count": len(chunks),
            "timestamp": datetime.now().isoformat()
        }
    )
  1. 教育应用订阅事件
# opensource/mbe-education/backend/api/course_sync.py
@router.post("/webhook/knowledge-base-updated")
async def handle_kb_update(payload: dict):
    """接收知识库更新事件,触发课程同步"""
    kb_id = payload["kb_id"]
    expert_id = payload.get("expert_id")
    
    # 异步同步课程
    await sync_course_for_kb(kb_id, expert_id)

方案2:基于消息队列(高可靠性)

使用 Redis Queue 或 RabbitMQ

优点

  • 消息持久化
  • 自动重试
  • 支持批量处理

实现

# src/services/message_queue.py
import redis
from rq import Queue

redis_conn = redis.Redis()
queue = Queue('course_sync', connection=redis_conn)

def enqueue_course_sync(kb_id: str, expert_id: str):
    """将课程同步任务加入队列"""
    queue.enqueue(
        'scripts.sync_course',
        kb_id=kb_id,
        expert_id=expert_id,
        job_timeout=300  # 5分钟超时
    )

方案3:数据库事件表(简单可靠)

使用数据库触发器 + 后台轮询

-- 创建事件表
CREATE TABLE kb_update_events (
    id SERIAL PRIMARY KEY,
    kb_id VARCHAR(255) NOT NULL,
    expert_id VARCHAR(255),
    event_type VARCHAR(50),
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT NOW(),
    processed_at TIMESTAMP
);

-- 后台任务轮询
SELECT * FROM kb_update_events 
WHERE status = 'pending' 
ORDER BY created_at 
LIMIT 10;

📊 完整流程

事件发布流程

1. 知识库更新(API调用)
   ↓
2. 保存 chunks/embeddings
   ↓
3. 发布事件
   ├─→ Webhook(实时推送)
   ├─→ 消息队列(可靠投递)
   └─→ 数据库事件表(持久化)
   ↓
4. 教育应用接收事件
   ↓
5. 触发课程同步
   ├─→ 加载知识库 chunks
   ├─→ TITANS+MIRAS 分析
   ├─→ 生成知识点
   └─→ 更新 course_mappings.json

错误处理

  • Webhook 失败:降级到消息队列
  • 队列失败:写入数据库事件表
  • 同步失败:记录错误,支持手动重试

🔄 集成点

1. 知识库更新 API

# src/api/knowledge.py
@router.post("/{kb_id}/chunks")
async def update_knowledge_chunks(...):
    # ... 更新逻辑
    
    # 触发课程同步
    from src.services.course_sync_service import get_course_sync_service
    sync_service = get_course_sync_service()
    await sync_service.on_knowledge_base_updated(
        kb_id=kb_id,
        expert_id=expert_id,
        update_type="chunks_updated"
    )

2. 专家创建/更新 API

# src/api/knowledge.py
@router.post("/expert/publish")
async def publish_expert(...):
    # ... 发布逻辑
    
    # 如果知识库已存在,触发课程同步
    if kb_id:
        await sync_service.on_knowledge_base_updated(
            kb_id=kb_id,
            expert_id=expert_id,
            update_type="expert_published"
        )

⚙️ 配置选项

# config.py
COURSE_SYNC_ENABLED = True
COURSE_SYNC_MODE = "webhook"  # webhook, queue, database
COURSE_SYNC_DELAY = 5  # 延迟5秒同步(避免频繁更新)
COURSE_SYNC_BATCH_SIZE = 10  # 批量处理大小

📈 监控和日志

  • 记录同步事件
  • 监控同步成功率
  • 告警同步失败
  • 统计同步延迟

✅ 实施建议

阶段1:快速实现(基于 Webhook)

  1. 扩展 Webhook 事件类型
  2. 在知识库更新时发布事件
  3. 教育应用接收并处理

阶段2:增强可靠性(消息队列)

  1. 引入 Redis Queue
  2. 实现队列消费者
  3. 添加重试机制

阶段3:完善监控

  1. 添加同步状态追踪
  2. 实现监控面板
  3. 设置告警规则

🔍 注意事项

  1. 性能考虑:TITANS 分析耗时,应异步处理
  2. 幂等性:确保重复事件不会重复处理
  3. 错误恢复:失败事件应支持手动重试
  4. 数据一致性:同步过程中避免读取旧数据