MBE 模型市场平台设计
一、平台愿景
将 MBE 打造成 AI 专家模型市场:
- 开发者上传知识 → 训练专家模型 → 发布到市场
- 用户按需调用 → 按 Token 计费 → 开发者获得分成
二、核心功能架构
┌─────────────────────────────────────────────────────────────────────────┐
│ MBE 模型市场平台 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 开发者门户 │ │ 模型市场 │ │ 用户终端 │ │
│ │ Developer │ │ Model Market │ │ Consumer │ │
│ │ Portal │ │ │ │ Portal │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ • 文件上传 │ │ • 模型浏览 │ │ • 专家调用 │ │
│ │ • 知识库管理 │ │ • 评分/评论 │ │ • 对话历史 │ │
│ │ • 模型训练 │ │ • 分类检索 │ │ • Token 余额 │ │
│ │ • 质量测试 │ │ • 热门排行 │ │ • 使用统计 │ │
│ │ • 发布上架 │ │ • 价格对比 │ │ │ │
│ │ • 收益统计 │ │ │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 核心处理层 │ │
│ ├─────────────────────────────────────────────────────────────────┤ │
│ │ │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ 文件处理 │ │ 向量索引 │ │ 模型训练 │ │ 推理服务 │ │ │
│ │ │ Pipeline │ │ Service │ │ Service │ │ Service │ │ │
│ │ ├───────────┤ ├───────────┤ ├───────────┤ ├───────────┤ │ │
│ │ │断点续传 │ │增量索引 │ │检查点恢复 │ │负载均衡 │ │ │
│ │ │分块队列 │ │并行构建 │ │分布式训练 │ │模型缓存 │ │ │
│ │ │格式转换 │ │热更新 │ │自动调参 │ │GPU 调度 │ │ │
│ │ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 资源调度层 │ │
│ ├─────────────────────────────────────────────────────────────────┤ │
│ │ 任务队列 (Redis) │ 分布式锁 │ 资源池管理 │ 优先级调度 │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
三、断点续传设计
3.1 文件上传断点续传
# 已实现的分块上传 API
POST /admin/knowledge/upload-chunk/init/{kb_id} # 初始化分块上传
POST /admin/knowledge/upload-chunk/{upload_id}/{chunk_index} # 上传分块
POST /admin/knowledge/upload-chunk/complete/{upload_id} # 完成上传
GET /admin/knowledge/upload-chunk/status/{upload_id} # 查询状态(断点续传用)
断点续传流程:
1. 客户端调用 init → 获取 upload_id
2. 分块上传(每块 5MB)
3. 网络中断 → 重连后调用 status 查询已上传的块
4. 从断点继续上传剩余块
5. 调用 complete 完成合并
3.2 续分块设计(新增)
class ChunkingJob:
"""分块任务状态"""
job_id: str
kb_id: str
source_file: str
total_chars: int
processed_chars: int
chunks_created: int
status: str # pending, processing, paused, completed, error
checkpoint: dict # 断点信息
# API 设计
POST /admin/knowledge/{kb_id}/chunking/start # 开始分块
GET /admin/knowledge/{kb_id}/chunking/status # 查询进度
POST /admin/knowledge/{kb_id}/chunking/pause # 暂停
POST /admin/knowledge/{kb_id}/chunking/resume # 从断点继续
实现要点:
- 每处理 100 个 chunk 保存一次 checkpoint
- checkpoint 包含:文件偏移量、已创建 chunk 数、上下文状态
- 暂停/恢复时从 checkpoint 加载状态继续
3.3 续向量索引设计(新增)
class IndexingJob:
"""索引任务状态"""
job_id: str
kb_id: str
total_chunks: int
indexed_chunks: int
status: str
checkpoint: dict
# API 设计
POST /admin/knowledge/{kb_id}/indexing/start # 开始索引
GET /admin/knowledge/{kb_id}/indexing/status # 查询进度
POST /admin/knowledge/{kb_id}/indexing/pause # 暂停
POST /admin/knowledge/{kb_id}/indexing/resume # 继续
实现要点:
- 向量生成后批量写入 FAISS 索引(每 1000 条)
- 定期保存索引快照
- 支持增量索引(新增文件不重建全量)
3.4 续训练设计(新增)
class TrainingJob:
"""训练任务状态"""
job_id: str
kb_id: str
model_type: str # expert, titans, moe
current_epoch: int
total_epochs: int
current_step: int
total_steps: int
best_loss: float
status: str
checkpoint_path: str
# API 设计
POST /admin/training/start # 开始训练
GET /admin/training/{job_id}/status # 查询进度
POST /admin/training/{job_id}/pause # 暂停(保存检查点)
POST /admin/training/{job_id}/resume # 从检查点恢复
POST /admin/training/{job_id}/cancel # 取消
实现要点:
- 每个 epoch 结束保存 checkpoint
- checkpoint 包含:模型权重、优化器状态、学习率调度器状态
- 支持跨机器恢复训练
四、多用户资源调度设计
4.1 任务队列架构
┌─────────────────────────────────────────────────────────────┐
│ 任务调度系统 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ 任务提交入口 │ │
│ │ (API Gateway) │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Redis 任务队列 │ │
│ ├─────────────────────────────────────────┤ │
│ │ high_priority │ 付费用户/紧急任务 │ │
│ │ normal_priority │ 普通用户任务 │ │
│ │ low_priority │ 后台/批量任务 │ │
│ │ training_queue │ 训练任务(独立) │ │
│ └────────┬────────┴────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Worker Pool (Celery) │ │
│ ├─────────────────────────────────────────┤ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Worker 1 │ │Worker 2 │ │Worker N │ │ │
│ │ │CPU 分块 │ │CPU 分块 │ │CPU 分块 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │GPU Wkr 1│ │GPU Wkr 2│ (训练/推理) │ │
│ │ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
4.2 资源配额管理
class UserResourceQuota:
"""用户资源配额"""
user_id: str
plan: str # free, basic, pro, enterprise
# 并发限制
max_concurrent_uploads: int # 同时上传数
max_concurrent_indexing: int # 同时索引数
max_concurrent_training: int # 同时训练数
max_concurrent_queries: int # 同时查询数
# 资源上限
max_storage_gb: int # 存储上限
max_knowledge_bases: int # 知识库数量
max_models: int # 模型数量
# 计算资源
cpu_hours_monthly: int # CPU 小时/月
gpu_hours_monthly: int # GPU 小时/月
# 套餐配置
PLAN_QUOTAS = {
"free": UserResourceQuota(
max_concurrent_uploads=1,
max_concurrent_indexing=1,
max_concurrent_training=0, # 免费用户不能训练
max_concurrent_queries=5,
max_storage_gb=1,
max_knowledge_bases=3,
max_models=0,
cpu_hours_monthly=10,
gpu_hours_monthly=0
),
"basic": UserResourceQuota(
max_concurrent_uploads=3,
max_concurrent_indexing=2,
max_concurrent_training=1,
max_concurrent_queries=20,
max_storage_gb=10,
max_knowledge_bases=10,
max_models=5,
cpu_hours_monthly=100,
gpu_hours_monthly=10
),
"pro": UserResourceQuota(
max_concurrent_uploads=10,
max_concurrent_indexing=5,
max_concurrent_training=3,
max_concurrent_queries=100,
max_storage_gb=100,
max_knowledge_bases=50,
max_models=20,
cpu_hours_monthly=500,
gpu_hours_monthly=50
),
"enterprise": UserResourceQuota(
# 无限制或自定义
...
)
}
4.3 任务优先级调度
class TaskScheduler:
"""任务调度器"""
def calculate_priority(self, task: Task, user: User) -> int:
"""计算任务优先级"""
base_priority = TASK_TYPE_PRIORITY[task.type] # 任务类型基础优先级
# 用户等级加成
if user.plan == "enterprise":
base_priority += 100
elif user.plan == "pro":
base_priority += 50
elif user.plan == "basic":
base_priority += 20
# 等待时间加成(防止饥饿)
wait_seconds = (now - task.created_at).total_seconds()
wait_bonus = min(wait_seconds // 60, 50) # 每分钟+1,最高+50
# 紧急标记
if task.urgent:
base_priority += 200
return base_priority + wait_bonus
async def dispatch_task(self, task: Task):
"""分发任务到 Worker"""
# 1. 检查用户配额
if not await self.check_quota(task.user_id, task.type):
raise QuotaExceededException()
# 2. 计算优先级
priority = self.calculate_priority(task, user)
# 3. 选择队列
queue = self.select_queue(task)
# 4. 提交任务
await queue.submit(task, priority=priority)
# 5. 占用配额
await self.acquire_quota(task.user_id, task.type)
4.4 GPU 资源池管理
class GPUResourcePool:
"""GPU 资源池"""
def __init__(self):
self.gpus = self._discover_gpus()
self.allocations = {} # job_id -> GPU assignment
self.lock = asyncio.Lock()
async def allocate(self, job_id: str, gpu_memory_gb: int) -> GPUAllocation:
"""分配 GPU 资源"""
async with self.lock:
for gpu in self.gpus:
available = gpu.total_memory - gpu.used_memory
if available >= gpu_memory_gb:
allocation = GPUAllocation(
job_id=job_id,
gpu_id=gpu.id,
memory_gb=gpu_memory_gb
)
gpu.used_memory += gpu_memory_gb
self.allocations[job_id] = allocation
return allocation
# 无可用 GPU,加入等待队列
return await self.wait_for_gpu(job_id, gpu_memory_gb)
async def release(self, job_id: str):
"""释放 GPU 资源"""
async with self.lock:
if job_id in self.allocations:
alloc = self.allocations.pop(job_id)
gpu = self.gpus[alloc.gpu_id]
gpu.used_memory -= alloc.memory_gb
# 通知等待中的任务
await self.notify_waiting_jobs()
五、模型发布与市场
5.1 模型发布流程
┌─────────────────────────────────────────────────────────────┐
│ 模型发布流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 开发者完成训练 │
│ └─ 模型状态: trained │
│ ↓ │
│ 2. 质量测试 (自动) │
│ ├─ 准确率测试 │
│ ├─ 响应时间测试 │
│ ├─ 安全性检查 │
│ └─ 模型状态: tested (通过) / rejected (未通过) │
│ ↓ │
│ 3. 填写上架信息 │
│ ├─ 模型名称、描述 │
│ ├─ 定价策略 (Token 单价) │
│ ├─ 领域分类 │
│ ├─ 使用示例 │
│ └─ 模型状态: pending_review │
│ ↓ │
│ 4. 平台审核 (人工) │
│ ├─ 内容合规检查 │
│ ├─ 功能验证 │
│ └─ 模型状态: approved / rejected │
│ ↓ │
│ 5. 上架市场 │
│ └─ 模型状态: published │
│ ↓ │
│ 6. 用户调用 → 计费 → 开发者分成 │
│ │
└─────────────────────────────────────────────────────────────┘
5.2 模型市场数据模型
class PublishedModel(Base):
"""已发布模型"""
__tablename__ = "published_models"
id = Column(String(36), primary_key=True)
developer_id = Column(String(36), ForeignKey("users.id"))
kb_id = Column(String(36)) # 关联知识库
# 基本信息
name = Column(String(100))
description = Column(Text)
category = Column(String(50)) # 法律/健康/金融/教育/...
tags = Column(JSON)
icon_url = Column(String(500))
# 定价
price_per_1k_tokens = Column(Float) # 每 1000 Token 价格
min_tokens_per_call = Column(Integer, default=100)
# 质量指标
quality_score = Column(Float)
accuracy_score = Column(Float)
response_time_p95 = Column(Float) # 95分位响应时间
# 统计
total_calls = Column(Integer, default=0)
total_tokens = Column(BigInteger, default=0)
total_revenue = Column(Float, default=0)
rating_avg = Column(Float, default=0)
rating_count = Column(Integer, default=0)
# 状态
status = Column(String(20)) # draft/testing/pending/published/suspended
published_at = Column(DateTime)
# 版本
version = Column(String(20))
model_checkpoint = Column(String(500)) # 模型文件路径
class ModelReview(Base):
"""模型评价"""
__tablename__ = "model_reviews"
id = Column(String(36), primary_key=True)
model_id = Column(String(36), ForeignKey("published_models.id"))
user_id = Column(String(36), ForeignKey("users.id"))
rating = Column(Integer) # 1-5
comment = Column(Text)
created_at = Column(DateTime)
class ModelUsageLog(Base):
"""模型使用日志"""
__tablename__ = "model_usage_logs"
id = Column(String(36), primary_key=True)
model_id = Column(String(36))
user_id = Column(String(36))
tokens_used = Column(Integer)
cost = Column(Float)
response_time_ms = Column(Integer)
created_at = Column(DateTime)
5.3 收益分成机制
class RevenueSharing:
"""收益分成"""
# 分成比例
PLATFORM_RATE = 0.20 # 平台 20%
DEVELOPER_RATE = 0.80 # 开发者 80%
async def process_call(self, model_id: str, user_id: str, tokens: int):
"""处理一次调用的计费"""
model = await get_model(model_id)
# 计算费用
cost = (tokens / 1000) * model.price_per_1k_tokens
# 扣除用户余额
await deduct_user_balance(user_id, cost)
# 分成
platform_share = cost * self.PLATFORM_RATE
developer_share = cost * self.DEVELOPER_RATE
# 记录
await record_revenue(model_id, model.developer_id, cost, developer_share)
# 更新统计
model.total_calls += 1
model.total_tokens += tokens
model.total_revenue += developer_share
六、实施路线图
Phase 1: 基础设施 ✅ 已完成
| 任务 | 优先级 | 状态 | 实现文件 |
|---|---|---|---|
| 任务队列 (Celery + Redis) | P0 | ✅ | src/tasks/celery_app.py |
| 断点续传完善 | P0 | ✅ | src/tasks/*_tasks.py |
| 资源配额系统 | P0 | ✅ | src/tasks/quota_manager.py |
| 任务状态 API | P0 | ✅ | src/api/tasks.py, src/api/quota.py |
Phase 2: 训练系统 ✅ 已完成
| 任务 | 优先级 | 状态 | 实现文件 |
|---|---|---|---|
| 训练任务管理 | P0 | ✅ | src/training/config.py |
| 检查点管理 | P0 | ✅ | src/training/checkpoint_manager.py |
| 训练监控 | P1 | ✅ | src/training/monitor.py |
| 自动质量评估 | P1 | ✅ | src/training/evaluator.py |
| 训练API | P0 | ✅ | src/api/training_v2.py |
Phase 3: 模型市场 ✅ 已完成
| 任务 | 优先级 | 状态 | 实现文件 |
|---|---|---|---|
| 模型发布流程 | P0 | ✅ | src/market/publisher.py |
| 模型数据模型 | P0 | ✅ | src/market/models.py |
| 计费系统 | P0 | ✅ | src/market/billing.py |
| 评价系统 | P1 | ✅ | src/market/store.py |
| 市场 API | P0 | ✅ | src/api/market.py |
Phase 4: 扩展能力 ✅ 已完成
| 任务 | 优先级 | 状态 | 实现文件 |
|---|---|---|---|
| GPU 集群支持 | P1 | ✅ | src/cluster/gpu_pool.py |
| 分布式训练 | P2 | ✅ | src/cluster/distributed.py |
| A/B 测试 | P2 | ✅ | src/cluster/ab_testing.py |
| API 网关 | P2 | ✅ | src/cluster/gateway.py |
| 集群 API | P1 | ✅ | src/api/cluster.py |
七、技术选型
| 组件 | 推荐方案 | 备选 |
|---|---|---|
| 任务队列 | Celery + Redis | RQ, Dramatiq |
| GPU 调度 | NVIDIA Triton | Ray Serve |
| 对象存储 | MinIO / S3 | 阿里云 OSS |
| 向量数据库 | FAISS + PostgreSQL | Milvus, Qdrant |
| 监控 | Prometheus + Grafana | DataDog |
| 日志 | ELK Stack | Loki |
八、当前系统差距分析
| 功能 | 当前状态 | 目标 | 差距 |
|---|---|---|---|
| 文件上传断点续传 | ✅ 已实现 | ✅ | - |
| 分块处理 | ✅ 同步处理 | 异步+断点 | 需改造 |
| 向量索引 | ✅ 同步处理 | 异步+增量 | 需改造 |
| 模型训练 | ✅ 手动触发 | 任务队列+断点 | 需改造 |
| 多用户并发 | ⚠️ 基础支持 | 资源调度 | 需新增 |
| 模型市场 | ❌ 无 | 完整市场 | 需新增 |
| GPU 调度 | ❌ 单机 | 资源池 | 需新增 |
文档版本: v1.0 更新日期: 2026-01-23