课程同步架构设计
📋 问题背景
当前问题:
- 知识库更新后,课程映射不会自动更新
- 需要手动运行迁移脚本
- 可能导致课程内容与知识库不一致
用户需求:
- 知识库更新时,自动同步课程
- 使用队列消息机制确保可靠性
🏗️ 架构设计
方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Webhook 事件 | 轻量、已集成 | 需要接收端在线 | 实时同步 |
| 消息队列 | 可靠、支持重试 | 需要额外基础设施 | 高可靠性要求 |
| 数据库事件 | 简单、无依赖 | 轮询延迟 | 简单场景 |
推荐方案:混合架构
知识库更新
↓
发布事件(Webhook + 队列)
↓
├─→ Webhook → 教育应用(实时)
└─→ 消息队列 → 后台任务(可靠)
🔧 实现方案
方案1:基于现有 Webhook 系统(推荐)
优点:
- 无需引入新基础设施
- 利用现有 WebSocket 推送
- 实现简单
实现步骤:
- 扩展 Webhook 事件类型
# src/api/webhooks.py
class WebhookEvent(str, Enum):
# ... 现有事件
KNOWLEDGE_BASE_CREATED = "knowledge_base.created"
KNOWLEDGE_BASE_UPDATED = "knowledge_base.updated"
KNOWLEDGE_BASE_CHUNKS_UPDATED = "knowledge_base.chunks_updated"
EXPERT_COURSE_SYNC_REQUIRED = "expert.course_sync_required"
- 在知识库更新时发布事件
# src/api/knowledge.py
@router.post("/{kb_id}/chunks")
async def update_knowledge_chunks(kb_id: str, ...):
# ... 更新 chunks
# 发布事件
await publish_webhook_event(
WebhookEvent.KNOWLEDGE_BASE_CHUNKS_UPDATED,
{
"kb_id": kb_id,
"expert_id": expert_id,
"chunks_count": len(chunks),
"timestamp": datetime.now().isoformat()
}
)
- 教育应用订阅事件
# opensource/mbe-education/backend/api/course_sync.py
@router.post("/webhook/knowledge-base-updated")
async def handle_kb_update(payload: dict):
"""接收知识库更新事件,触发课程同步"""
kb_id = payload["kb_id"]
expert_id = payload.get("expert_id")
# 异步同步课程
await sync_course_for_kb(kb_id, expert_id)
方案2:基于消息队列(高可靠性)
使用 Redis Queue 或 RabbitMQ
优点:
- 消息持久化
- 自动重试
- 支持批量处理
实现:
# src/services/message_queue.py
import redis
from rq import Queue
redis_conn = redis.Redis()
queue = Queue('course_sync', connection=redis_conn)
def enqueue_course_sync(kb_id: str, expert_id: str):
"""将课程同步任务加入队列"""
queue.enqueue(
'scripts.sync_course',
kb_id=kb_id,
expert_id=expert_id,
job_timeout=300 # 5分钟超时
)
方案3:数据库事件表(简单可靠)
使用数据库触发器 + 后台轮询
-- 创建事件表
CREATE TABLE kb_update_events (
id SERIAL PRIMARY KEY,
kb_id VARCHAR(255) NOT NULL,
expert_id VARCHAR(255),
event_type VARCHAR(50),
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT NOW(),
processed_at TIMESTAMP
);
-- 后台任务轮询
SELECT * FROM kb_update_events
WHERE status = 'pending'
ORDER BY created_at
LIMIT 10;
📊 完整流程
事件发布流程
1. 知识库更新(API调用)
↓
2. 保存 chunks/embeddings
↓
3. 发布事件
├─→ Webhook(实时推送)
├─→ 消息队列(可靠投递)
└─→ 数据库事件表(持久化)
↓
4. 教育应用接收事件
↓
5. 触发课程同步
├─→ 加载知识库 chunks
├─→ TITANS+MIRAS 分析
├─→ 生成知识点
└─→ 更新 course_mappings.json
错误处理
- Webhook 失败:降级到消息队列
- 队列失败:写入数据库事件表
- 同步失败:记录错误,支持手动重试
🔄 集成点
1. 知识库更新 API
# src/api/knowledge.py
@router.post("/{kb_id}/chunks")
async def update_knowledge_chunks(...):
# ... 更新逻辑
# 触发课程同步
from src.services.course_sync_service import get_course_sync_service
sync_service = get_course_sync_service()
await sync_service.on_knowledge_base_updated(
kb_id=kb_id,
expert_id=expert_id,
update_type="chunks_updated"
)
2. 专家创建/更新 API
# src/api/knowledge.py
@router.post("/expert/publish")
async def publish_expert(...):
# ... 发布逻辑
# 如果知识库已存在,触发课程同步
if kb_id:
await sync_service.on_knowledge_base_updated(
kb_id=kb_id,
expert_id=expert_id,
update_type="expert_published"
)
⚙️ 配置选项
# config.py
COURSE_SYNC_ENABLED = True
COURSE_SYNC_MODE = "webhook" # webhook, queue, database
COURSE_SYNC_DELAY = 5 # 延迟5秒同步(避免频繁更新)
COURSE_SYNC_BATCH_SIZE = 10 # 批量处理大小
📈 监控和日志
- 记录同步事件
- 监控同步成功率
- 告警同步失败
- 统计同步延迟
✅ 实施建议
阶段1:快速实现(基于 Webhook)
- 扩展 Webhook 事件类型
- 在知识库更新时发布事件
- 教育应用接收并处理
阶段2:增强可靠性(消息队列)
- 引入 Redis Queue
- 实现队列消费者
- 添加重试机制
阶段3:完善监控
- 添加同步状态追踪
- 实现监控面板
- 设置告警规则
🔍 注意事项
- 性能考虑:TITANS 分析耗时,应异步处理
- 幂等性:确保重复事件不会重复处理
- 错误恢复:失败事件应支持手动重试
- 数据一致性:同步过程中避免读取旧数据