课程同步功能实现总结
✅ 已实现功能
1. 事件驱动的课程同步服务
文件: src/services/course_sync_service.py
核心功能:
- ✅ 监听知识库更新事件
- ✅ 自动触发课程同步
- ✅ TITANS+MIRAS 分析
- ✅ 更新课程映射文件
- ✅ 支持异步处理
关键方法:
# 事件处理
async def on_knowledge_base_updated(kb_id, expert_id, update_type, metadata)
# 课程同步
async def sync_course_for_kb(kb_id, expert_id)
2. Webhook 事件扩展
文件: src/api/webhooks.py
新增事件类型:
- ✅
KNOWLEDGE_BASE_CREATED - ✅
KNOWLEDGE_BASE_UPDATED - ✅
KNOWLEDGE_BASE_CHUNKS_UPDATED - ✅
KNOWLEDGE_BASE_METADATA_UPDATED - ✅
COURSE_SYNC_REQUIRED - ✅
COURSE_UPDATED
3. 知识库更新集成
文件: src/api/knowledge.py
集成点:
- ✅ 文件上传完成后触发事件
- ✅ 后台处理完成后触发事件
- ✅ 支持同步和异步两种模式
代码示例:
# 在 upload_file 和 _process_upload 中添加
from src.services.course_sync_service import get_course_sync_service
sync_service = get_course_sync_service()
await sync_service.on_knowledge_base_updated(...)
4. 教育应用 API
文件: opensource/mbe-education/backend/api/course_sync_api.py
API 端点:
- ✅
POST /api/course-sync/webhook/knowledge-base-updated- 接收 Webhook 事件 - ✅
POST /api/course-sync/manual/{kb_id}- 手动触发同步 - ✅
GET /api/course-sync/status/{kb_id}- 查询同步状态
5. 后端服务集成
文件: opensource/mbe-education/scripts/run_simple_api.py
集成:
- ✅ 注册课程同步 API 路由
- ✅ 自动加载课程同步服务
🏗️ 架构设计
事件流程
知识库更新 (API)
↓
发布事件 (course_sync_service)
↓
├─→ Webhook 事件 (可选)
└─→ 直接触发同步 (默认)
↓
加载知识库 chunks
↓
TITANS+MIRAS 分析
↓
生成知识点
↓
更新 course_mappings.json
专家ID查找策略
- 事件中提供的 expert_id(优先级最高)
- 从专家管理器查找(通过 kb_id)
- 从课程映射中查找(回退方案)
- 使用默认值
dynamic_{kb_id}
路径解析
支持多个可能的路径,自动查找:
- 知识库 chunks:
{kb_id}_chunks.json或{kb_id}/chunks.json - 专家索引:
knowledge_bases/experts/index.json - 课程映射:
opensource/mbe-education/backend/knowledge_bases/course_mappings.json
📊 工作流程
自动同步流程
用户上传文件到知识库
POST /api/knowledge/upload/{kb_id}知识库处理完成
# 在 _process_upload 中 await sync_service.on_knowledge_base_updated( kb_id=kb_id, expert_id=expert_id, update_type="chunks_updated" )课程同步服务处理
# 异步执行 await sync_service.sync_course_for_kb(kb_id, expert_id)更新课程映射
- 加载 chunks
- TITANS 分析
- 生成知识点
- 更新 JSON 文件
手动同步流程
# API 调用
POST /api/course-sync/manual/{kb_id}
# Python 调用
from src.services.course_sync_service import get_course_sync_service
sync_service = get_course_sync_service()
await sync_service.sync_course_for_kb(kb_id="your_kb_id")
🔧 配置选项
启用/禁用
sync_service = get_course_sync_service()
sync_service.enabled = True # 默认启用
同步模式
当前实现:直接触发同步(同步模式)
- 优点:简单、实时
- 缺点:可能阻塞(已使用异步任务)
未来可扩展:
- 消息队列模式(Redis Queue / RabbitMQ)
- 数据库事件表模式
📝 使用示例
示例1:知识库更新自动同步
# 用户上传文件
response = await upload_file(kb_id="e22dff89-51f", file=file)
# 系统自动触发同步(无需手动操作)
# 课程映射会在后台更新
示例2:手动触发同步
from src.services.course_sync_service import get_course_sync_service
sync_service = get_course_sync_service()
await sync_service.sync_course_for_kb(
kb_id="e22dff89-51f",
expert_id="civil_service_expert"
)
示例3:检查同步状态
# API 调用
GET /api/course-sync/status/e22dff89-51f
# 响应
{
"kb_id": "e22dff89-51f",
"synced": true,
"course_name": "考公专家",
"knowledge_points_count": 12,
"last_updated": "2026-02-05T10:30:00",
"analysis_method": "titans"
}
⚠️ 注意事项
1. 异步处理
- 同步是异步执行的,不会阻塞知识库更新
- 使用
asyncio.create_task或BackgroundTasks
2. 错误处理
- 同步失败不会影响知识库更新
- 错误会记录到日志
- 支持手动重试
3. 性能考虑
- TITANS 分析耗时较长(可能需要几秒到几分钟)
- 建议异步处理,避免阻塞 API 响应
- 大批量更新考虑批量处理
4. 数据一致性
- 同步过程中,课程映射可能短暂不一致
- 建议在同步完成后再读取课程数据
- 使用状态 API 检查同步完成
🚀 未来扩展
阶段1:消息队列支持(计划中)
# 使用 Redis Queue
from rq import Queue
queue = Queue('course_sync', connection=redis_conn)
queue.enqueue('sync_course', kb_id=kb_id)
阶段2:批量同步(计划中)
# 批量同步多个知识库
await sync_service.sync_multiple_courses(kb_ids=[...])
阶段3:监控面板(计划中)
- 同步状态仪表板
- 成功率统计
- 失败告警
📚 相关文件
src/services/course_sync_service.py- 核心同步服务src/api/webhooks.py- Webhook 事件定义src/api/knowledge.py- 知识库更新集成opensource/mbe-education/backend/api/course_sync_api.py- 教育应用 APIdocs/architecture/COURSE_SYNC_ARCHITECTURE.md- 架构设计文档docs/architecture/COURSE_SYNC_USAGE.md- 使用指南
✅ 测试建议
单元测试
- 测试专家ID查找逻辑
- 测试路径解析
- 测试事件处理
集成测试
- 测试知识库更新触发同步
- 测试手动同步API
- 测试状态查询API
端到端测试
- 上传文件 → 自动同步 → 验证课程更新
🎯 总结
✅ 已解决的问题:
- 知识库更新后课程不会自动更新
- 需要手动运行迁移脚本
- 课程内容与知识库不一致
✅ 实现方案:
- 事件驱动的自动同步
- 基于现有 Webhook 系统
- 支持手动触发和状态查询
✅ 优势:
- 自动化,无需手动操作
- 实时同步,保持数据一致
- 可扩展,支持未来增强