MBE 模型市场平台设计

一、平台愿景

将 MBE 打造成 AI 专家模型市场

  • 开发者上传知识 → 训练专家模型 → 发布到市场
  • 用户按需调用 → 按 Token 计费 → 开发者获得分成

二、核心功能架构

┌─────────────────────────────────────────────────────────────────────────┐
│                        MBE 模型市场平台                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐     │
│  │  开发者门户      │    │  模型市场        │    │  用户终端        │     │
│  │  Developer      │    │  Model Market   │    │  Consumer       │     │
│  │  Portal         │    │                 │    │  Portal         │     │
│  ├─────────────────┤    ├─────────────────┤    ├─────────────────┤     │
│  │ • 文件上传       │    │ • 模型浏览       │    │ • 专家调用       │     │
│  │ • 知识库管理     │    │ • 评分/评论      │    │ • 对话历史       │     │
│  │ • 模型训练       │    │ • 分类检索       │    │ • Token 余额    │     │
│  │ • 质量测试       │    │ • 热门排行       │    │ • 使用统计       │     │
│  │ • 发布上架       │    │ • 价格对比       │    │                 │     │
│  │ • 收益统计       │    │                 │    │                 │     │
│  └─────────────────┘    └─────────────────┘    └─────────────────┘     │
│           │                     │                     │                │
│           ▼                     ▼                     ▼                │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                        核心处理层                                │   │
│  ├─────────────────────────────────────────────────────────────────┤   │
│  │                                                                 │   │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐    │   │
│  │  │ 文件处理   │  │ 向量索引   │  │ 模型训练   │  │ 推理服务   │    │   │
│  │  │ Pipeline  │  │ Service   │  │ Service   │  │ Service   │    │   │
│  │  ├───────────┤  ├───────────┤  ├───────────┤  ├───────────┤    │   │
│  │  │断点续传    │  │增量索引    │  │检查点恢复  │  │负载均衡    │    │   │
│  │  │分块队列    │  │并行构建    │  │分布式训练  │  │模型缓存    │    │   │
│  │  │格式转换    │  │热更新      │  │自动调参    │  │GPU 调度   │    │   │
│  │  └───────────┘  └───────────┘  └───────────┘  └───────────┘    │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│           │                     │                     │                │
│           ▼                     ▼                     ▼                │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                        资源调度层                                │   │
│  ├─────────────────────────────────────────────────────────────────┤   │
│  │  任务队列 (Redis) │ 分布式锁 │ 资源池管理 │ 优先级调度           │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

三、断点续传设计

3.1 文件上传断点续传

# 已实现的分块上传 API
POST /admin/knowledge/upload-chunk/init/{kb_id}     # 初始化分块上传
POST /admin/knowledge/upload-chunk/{upload_id}/{chunk_index}  # 上传分块
POST /admin/knowledge/upload-chunk/complete/{upload_id}       # 完成上传
GET  /admin/knowledge/upload-chunk/status/{upload_id}         # 查询状态(断点续传用)

断点续传流程

1. 客户端调用 init → 获取 upload_id
2. 分块上传(每块 5MB)
3. 网络中断 → 重连后调用 status 查询已上传的块
4. 从断点继续上传剩余块
5. 调用 complete 完成合并

3.2 续分块设计(新增)

class ChunkingJob:
    """分块任务状态"""
    job_id: str
    kb_id: str
    source_file: str
    total_chars: int
    processed_chars: int
    chunks_created: int
    status: str  # pending, processing, paused, completed, error
    checkpoint: dict  # 断点信息
    
# API 设计
POST /admin/knowledge/{kb_id}/chunking/start    # 开始分块
GET  /admin/knowledge/{kb_id}/chunking/status   # 查询进度
POST /admin/knowledge/{kb_id}/chunking/pause    # 暂停
POST /admin/knowledge/{kb_id}/chunking/resume   # 从断点继续

实现要点

  • 每处理 100 个 chunk 保存一次 checkpoint
  • checkpoint 包含:文件偏移量、已创建 chunk 数、上下文状态
  • 暂停/恢复时从 checkpoint 加载状态继续

3.3 续向量索引设计(新增)

class IndexingJob:
    """索引任务状态"""
    job_id: str
    kb_id: str
    total_chunks: int
    indexed_chunks: int
    status: str
    checkpoint: dict
    
# API 设计
POST /admin/knowledge/{kb_id}/indexing/start    # 开始索引
GET  /admin/knowledge/{kb_id}/indexing/status   # 查询进度
POST /admin/knowledge/{kb_id}/indexing/pause    # 暂停
POST /admin/knowledge/{kb_id}/indexing/resume   # 继续

实现要点

  • 向量生成后批量写入 FAISS 索引(每 1000 条)
  • 定期保存索引快照
  • 支持增量索引(新增文件不重建全量)

3.4 续训练设计(新增)

class TrainingJob:
    """训练任务状态"""
    job_id: str
    kb_id: str
    model_type: str  # expert, titans, moe
    current_epoch: int
    total_epochs: int
    current_step: int
    total_steps: int
    best_loss: float
    status: str
    checkpoint_path: str
    
# API 设计
POST /admin/training/start                      # 开始训练
GET  /admin/training/{job_id}/status            # 查询进度
POST /admin/training/{job_id}/pause             # 暂停(保存检查点)
POST /admin/training/{job_id}/resume            # 从检查点恢复
POST /admin/training/{job_id}/cancel            # 取消

实现要点

  • 每个 epoch 结束保存 checkpoint
  • checkpoint 包含:模型权重、优化器状态、学习率调度器状态
  • 支持跨机器恢复训练

四、多用户资源调度设计

4.1 任务队列架构

┌─────────────────────────────────────────────────────────────┐
│                     任务调度系统                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────┐                                        │
│  │  任务提交入口    │                                        │
│  │  (API Gateway)  │                                        │
│  └────────┬────────┘                                        │
│           │                                                 │
│           ▼                                                 │
│  ┌─────────────────────────────────────────┐               │
│  │          Redis 任务队列                  │               │
│  ├─────────────────────────────────────────┤               │
│  │  high_priority   │ 付费用户/紧急任务     │               │
│  │  normal_priority │ 普通用户任务          │               │
│  │  low_priority    │ 后台/批量任务         │               │
│  │  training_queue  │ 训练任务(独立)       │               │
│  └────────┬────────┴────────────────────────┘               │
│           │                                                 │
│           ▼                                                 │
│  ┌─────────────────────────────────────────┐               │
│  │          Worker Pool (Celery)           │               │
│  ├─────────────────────────────────────────┤               │
│  │                                         │               │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐   │               │
│  │  │Worker 1 │ │Worker 2 │ │Worker N │   │               │
│  │  │CPU 分块 │ │CPU 分块 │ │CPU 分块 │   │               │
│  │  └─────────┘ └─────────┘ └─────────┘   │               │
│  │                                         │               │
│  │  ┌─────────┐ ┌─────────┐               │               │
│  │  │GPU Wkr 1│ │GPU Wkr 2│  (训练/推理)  │               │
│  │  └─────────┘ └─────────┘               │               │
│  │                                         │               │
│  └─────────────────────────────────────────┘               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

4.2 资源配额管理

class UserResourceQuota:
    """用户资源配额"""
    user_id: str
    plan: str  # free, basic, pro, enterprise
    
    # 并发限制
    max_concurrent_uploads: int      # 同时上传数
    max_concurrent_indexing: int     # 同时索引数
    max_concurrent_training: int     # 同时训练数
    max_concurrent_queries: int      # 同时查询数
    
    # 资源上限
    max_storage_gb: int              # 存储上限
    max_knowledge_bases: int         # 知识库数量
    max_models: int                  # 模型数量
    
    # 计算资源
    cpu_hours_monthly: int           # CPU 小时/月
    gpu_hours_monthly: int           # GPU 小时/月
    
# 套餐配置
PLAN_QUOTAS = {
    "free": UserResourceQuota(
        max_concurrent_uploads=1,
        max_concurrent_indexing=1,
        max_concurrent_training=0,  # 免费用户不能训练
        max_concurrent_queries=5,
        max_storage_gb=1,
        max_knowledge_bases=3,
        max_models=0,
        cpu_hours_monthly=10,
        gpu_hours_monthly=0
    ),
    "basic": UserResourceQuota(
        max_concurrent_uploads=3,
        max_concurrent_indexing=2,
        max_concurrent_training=1,
        max_concurrent_queries=20,
        max_storage_gb=10,
        max_knowledge_bases=10,
        max_models=5,
        cpu_hours_monthly=100,
        gpu_hours_monthly=10
    ),
    "pro": UserResourceQuota(
        max_concurrent_uploads=10,
        max_concurrent_indexing=5,
        max_concurrent_training=3,
        max_concurrent_queries=100,
        max_storage_gb=100,
        max_knowledge_bases=50,
        max_models=20,
        cpu_hours_monthly=500,
        gpu_hours_monthly=50
    ),
    "enterprise": UserResourceQuota(
        # 无限制或自定义
        ...
    )
}

4.3 任务优先级调度

class TaskScheduler:
    """任务调度器"""
    
    def calculate_priority(self, task: Task, user: User) -> int:
        """计算任务优先级"""
        base_priority = TASK_TYPE_PRIORITY[task.type]  # 任务类型基础优先级
        
        # 用户等级加成
        if user.plan == "enterprise":
            base_priority += 100
        elif user.plan == "pro":
            base_priority += 50
        elif user.plan == "basic":
            base_priority += 20
        
        # 等待时间加成(防止饥饿)
        wait_seconds = (now - task.created_at).total_seconds()
        wait_bonus = min(wait_seconds // 60, 50)  # 每分钟+1,最高+50
        
        # 紧急标记
        if task.urgent:
            base_priority += 200
        
        return base_priority + wait_bonus
    
    async def dispatch_task(self, task: Task):
        """分发任务到 Worker"""
        # 1. 检查用户配额
        if not await self.check_quota(task.user_id, task.type):
            raise QuotaExceededException()
        
        # 2. 计算优先级
        priority = self.calculate_priority(task, user)
        
        # 3. 选择队列
        queue = self.select_queue(task)
        
        # 4. 提交任务
        await queue.submit(task, priority=priority)
        
        # 5. 占用配额
        await self.acquire_quota(task.user_id, task.type)

4.4 GPU 资源池管理

class GPUResourcePool:
    """GPU 资源池"""
    
    def __init__(self):
        self.gpus = self._discover_gpus()
        self.allocations = {}  # job_id -> GPU assignment
        self.lock = asyncio.Lock()
    
    async def allocate(self, job_id: str, gpu_memory_gb: int) -> GPUAllocation:
        """分配 GPU 资源"""
        async with self.lock:
            for gpu in self.gpus:
                available = gpu.total_memory - gpu.used_memory
                if available >= gpu_memory_gb:
                    allocation = GPUAllocation(
                        job_id=job_id,
                        gpu_id=gpu.id,
                        memory_gb=gpu_memory_gb
                    )
                    gpu.used_memory += gpu_memory_gb
                    self.allocations[job_id] = allocation
                    return allocation
            
            # 无可用 GPU,加入等待队列
            return await self.wait_for_gpu(job_id, gpu_memory_gb)
    
    async def release(self, job_id: str):
        """释放 GPU 资源"""
        async with self.lock:
            if job_id in self.allocations:
                alloc = self.allocations.pop(job_id)
                gpu = self.gpus[alloc.gpu_id]
                gpu.used_memory -= alloc.memory_gb
                
                # 通知等待中的任务
                await self.notify_waiting_jobs()

五、模型发布与市场

5.1 模型发布流程

┌─────────────────────────────────────────────────────────────┐
│                     模型发布流程                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 开发者完成训练                                          │
│     └─ 模型状态: trained                                    │
│                ↓                                            │
│  2. 质量测试 (自动)                                         │
│     ├─ 准确率测试                                           │
│     ├─ 响应时间测试                                         │
│     ├─ 安全性检查                                           │
│     └─ 模型状态: tested (通过) / rejected (未通过)          │
│                ↓                                            │
│  3. 填写上架信息                                            │
│     ├─ 模型名称、描述                                       │
│     ├─ 定价策略 (Token 单价)                                │
│     ├─ 领域分类                                             │
│     ├─ 使用示例                                             │
│     └─ 模型状态: pending_review                             │
│                ↓                                            │
│  4. 平台审核 (人工)                                         │
│     ├─ 内容合规检查                                         │
│     ├─ 功能验证                                             │
│     └─ 模型状态: approved / rejected                        │
│                ↓                                            │
│  5. 上架市场                                                │
│     └─ 模型状态: published                                  │
│                ↓                                            │
│  6. 用户调用 → 计费 → 开发者分成                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

5.2 模型市场数据模型

class PublishedModel(Base):
    """已发布模型"""
    __tablename__ = "published_models"
    
    id = Column(String(36), primary_key=True)
    developer_id = Column(String(36), ForeignKey("users.id"))
    kb_id = Column(String(36))  # 关联知识库
    
    # 基本信息
    name = Column(String(100))
    description = Column(Text)
    category = Column(String(50))  # 法律/健康/金融/教育/...
    tags = Column(JSON)
    icon_url = Column(String(500))
    
    # 定价
    price_per_1k_tokens = Column(Float)  # 每 1000 Token 价格
    min_tokens_per_call = Column(Integer, default=100)
    
    # 质量指标
    quality_score = Column(Float)
    accuracy_score = Column(Float)
    response_time_p95 = Column(Float)  # 95分位响应时间
    
    # 统计
    total_calls = Column(Integer, default=0)
    total_tokens = Column(BigInteger, default=0)
    total_revenue = Column(Float, default=0)
    rating_avg = Column(Float, default=0)
    rating_count = Column(Integer, default=0)
    
    # 状态
    status = Column(String(20))  # draft/testing/pending/published/suspended
    published_at = Column(DateTime)
    
    # 版本
    version = Column(String(20))
    model_checkpoint = Column(String(500))  # 模型文件路径


class ModelReview(Base):
    """模型评价"""
    __tablename__ = "model_reviews"
    
    id = Column(String(36), primary_key=True)
    model_id = Column(String(36), ForeignKey("published_models.id"))
    user_id = Column(String(36), ForeignKey("users.id"))
    rating = Column(Integer)  # 1-5
    comment = Column(Text)
    created_at = Column(DateTime)


class ModelUsageLog(Base):
    """模型使用日志"""
    __tablename__ = "model_usage_logs"
    
    id = Column(String(36), primary_key=True)
    model_id = Column(String(36))
    user_id = Column(String(36))
    tokens_used = Column(Integer)
    cost = Column(Float)
    response_time_ms = Column(Integer)
    created_at = Column(DateTime)

5.3 收益分成机制

class RevenueSharing:
    """收益分成"""
    
    # 分成比例
    PLATFORM_RATE = 0.20   # 平台 20%
    DEVELOPER_RATE = 0.80  # 开发者 80%
    
    async def process_call(self, model_id: str, user_id: str, tokens: int):
        """处理一次调用的计费"""
        model = await get_model(model_id)
        
        # 计算费用
        cost = (tokens / 1000) * model.price_per_1k_tokens
        
        # 扣除用户余额
        await deduct_user_balance(user_id, cost)
        
        # 分成
        platform_share = cost * self.PLATFORM_RATE
        developer_share = cost * self.DEVELOPER_RATE
        
        # 记录
        await record_revenue(model_id, model.developer_id, cost, developer_share)
        
        # 更新统计
        model.total_calls += 1
        model.total_tokens += tokens
        model.total_revenue += developer_share

六、实施路线图

Phase 1: 基础设施 ✅ 已完成

任务 优先级 状态 实现文件
任务队列 (Celery + Redis) P0 src/tasks/celery_app.py
断点续传完善 P0 src/tasks/*_tasks.py
资源配额系统 P0 src/tasks/quota_manager.py
任务状态 API P0 src/api/tasks.py, src/api/quota.py

Phase 2: 训练系统 ✅ 已完成

任务 优先级 状态 实现文件
训练任务管理 P0 src/training/config.py
检查点管理 P0 src/training/checkpoint_manager.py
训练监控 P1 src/training/monitor.py
自动质量评估 P1 src/training/evaluator.py
训练API P0 src/api/training_v2.py

Phase 3: 模型市场 ✅ 已完成

任务 优先级 状态 实现文件
模型发布流程 P0 src/market/publisher.py
模型数据模型 P0 src/market/models.py
计费系统 P0 src/market/billing.py
评价系统 P1 src/market/store.py
市场 API P0 src/api/market.py

Phase 4: 扩展能力 ✅ 已完成

任务 优先级 状态 实现文件
GPU 集群支持 P1 src/cluster/gpu_pool.py
分布式训练 P2 src/cluster/distributed.py
A/B 测试 P2 src/cluster/ab_testing.py
API 网关 P2 src/cluster/gateway.py
集群 API P1 src/api/cluster.py

七、技术选型

组件 推荐方案 备选
任务队列 Celery + Redis RQ, Dramatiq
GPU 调度 NVIDIA Triton Ray Serve
对象存储 MinIO / S3 阿里云 OSS
向量数据库 FAISS + PostgreSQL Milvus, Qdrant
监控 Prometheus + Grafana DataDog
日志 ELK Stack Loki

八、当前系统差距分析

功能 当前状态 目标 差距
文件上传断点续传 ✅ 已实现 -
分块处理 ✅ 同步处理 异步+断点 需改造
向量索引 ✅ 同步处理 异步+增量 需改造
模型训练 ✅ 手动触发 任务队列+断点 需改造
多用户并发 ⚠️ 基础支持 资源调度 需新增
模型市场 ❌ 无 完整市场 需新增
GPU 调度 ❌ 单机 资源池 需新增

文档版本: v1.0 更新日期: 2026-01-23