开发者权限规范

🎯 设计目标

  1. 身份认证 - 确保开发者身份可信
  2. 数据隔离 - 开发者只能访问自己创建的专家
  3. 隐私保护 - 保护终端用户的隐私数据
  4. 操作审计 - 记录所有敏感操作

🏗️ 权限架构

┌─────────────────────────────────────────────────────────────────┐
│                          MBE 平台                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐      │
│  │   平台管理员   │    │    开发者     │    │   终端用户    │      │
│  │   (Admin)    │    │  (Developer) │    │   (EndUser)  │      │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘      │
│         │                   │                   │               │
│         ▼                   ▼                   ▼               │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐      │
│  │ 系统级权限    │    │ 专家级权限    │    │ 使用级权限    │      │
│  │ - 全系统配置  │    │ - 创建专家    │    │ - 与专家对话  │      │
│  │ - 所有数据    │    │ - 查看自己专家 │    │ - 提交反馈    │      │
│  │ - 代码修改    │    │ - 获取建议    │    │              │      │
│  └──────────────┘    └──────────────┘    └──────────────┘      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

👤 开发者身份管理

1. 开发者注册

@dataclass
class Developer:
    """开发者账户"""
    id: str                    # 唯一标识 (UUID)
    username: str              # 用户名
    email: str                 # 邮箱
    api_key: str               # API 密钥
    api_secret: str            # API 密钥(哈希存储)
    status: str                # active / suspended / deleted
    tier: str                  # free / pro / enterprise
    created_at: datetime
    last_login: datetime
    
    # 配额
    max_experts: int           # 最大专家数
    max_requests_per_day: int  # 每日请求限制
    max_storage_mb: int        # 存储限制

2. 认证方式

方式A: API Key 认证(推荐)

# 请求头携带 API Key
Authorization: Bearer {api_key}

# 或者使用自定义头
X-Developer-Key: {api_key}
X-Developer-Secret: {api_secret}

方式B: OAuth 2.0 认证

1. 开发者在 MBE 平台注册应用
2. 获取 client_id 和 client_secret
3. 使用 OAuth 2.0 流程获取 access_token
4. 使用 access_token 访问 API

方式C: JWT Token 认证

# 开发者登录后获取 JWT Token
POST /api/developer/login
{
    "email": "developer@example.com",
    "password": "..."
}

# 响应
{
    "access_token": "eyJhbG...",
    "refresh_token": "...",
    "expires_in": 3600
}

# 后续请求携带 Token
Authorization: Bearer eyJhbG...

🔐 数据隔离机制

1. 专家所有权

@dataclass
class ExpertOwnership:
    """专家所有权记录"""
    expert_id: str         # 专家ID
    developer_id: str      # 创建者ID
    created_at: datetime   # 创建时间
    is_public: bool        # 是否公开(可被其他用户使用)
    shared_with: List[str] # 共享给哪些开发者

2. 数据访问控制

class DataAccessControl:
    """数据访问控制"""
    
    async def can_access_expert(
        self, 
        developer_id: str, 
        expert_id: str
    ) -> bool:
        """检查开发者是否有权访问专家"""
        ownership = await self.get_ownership(expert_id)
        
        if not ownership:
            return False
        
        # 创建者有完全访问权
        if ownership.developer_id == developer_id:
            return True
        
        # 被共享的开发者有访问权
        if developer_id in ownership.shared_with:
            return True
        
        return False
    
    async def can_access_data(
        self,
        developer_id: str,
        expert_id: str,
        data_type: str
    ) -> bool:
        """检查开发者是否有权访问特定数据"""
        if not await self.can_access_expert(developer_id, expert_id):
            return False
        
        # 检查数据类型权限
        allowed_data = {
            "metrics": True,      # 允许:聚合统计
            "issues": True,       # 允许:问题列表
            "suggestions": True,  # 允许:优化建议
            "raw_logs": False,    # 禁止:原始对话日志
            "user_ids": False,    # 禁止:用户ID
            "user_messages": False # 禁止:用户消息原文
        }
        
        return allowed_data.get(data_type, False)

🛡️ 隐私保护

1. 数据脱敏规则

class DataAnonymizer:
    """数据脱敏器"""
    
    def anonymize_user_id(self, user_id: str) -> str:
        """用户ID脱敏"""
        # 使用哈希 + 截断
        return f"user_{hashlib.sha256(user_id.encode()).hexdigest()[:8]}"
    
    def anonymize_message(self, message: str) -> str:
        """消息内容脱敏"""
        # 移除敏感信息
        patterns = [
            (r'\b\d{11}\b', '[手机号]'),           # 手机号
            (r'\b\d{18}\b', '[身份证]'),           # 身份证
            (r'[\w.-]+@[\w.-]+\.\w+', '[邮箱]'),   # 邮箱
            (r'\b\d{16,19}\b', '[银行卡]'),        # 银行卡
        ]
        
        result = message
        for pattern, replacement in patterns:
            result = re.sub(pattern, replacement, result)
        
        return result
    
    def anonymize_metrics(self, metrics: dict) -> dict:
        """指标数据脱敏"""
        # 只保留聚合数据,移除个体数据
        return {
            "total_conversations": metrics.get("total_conversations"),
            "unique_users": metrics.get("unique_users"),
            "avg_response_time": metrics.get("avg_response_time"),
            # 不返回具体用户列表
            # "user_list": REMOVED
        }

2. 开发者可访问的数据

数据类型 可访问 脱敏方式
聚合统计 无需脱敏
响应时间 无需脱敏
切换率 无需脱敏
命中率 无需脱敏
问题类型 无需脱敏
用户ID 哈希处理
对话内容 ⚠️ 重度脱敏
个人信息 完全移除

3. 数据保留策略

# 数据保留期限
data_retention:
  # 聚合统计 - 永久保留
  aggregated_metrics: permanent
  
  # 问题检测记录 - 90天
  issue_records: 90_days
  
  # 优化建议 - 30天(应用后删除)
  suggestions: 30_days
  
  # 原始日志 - 开发者不可访问
  raw_logs: developer_inaccessible

📋 权限矩阵

开发者权限

操作 Free Pro Enterprise
创建专家 3个 20个 无限
查看指标
问题检测 基础 全部 全部
优化建议 基础 全部 全部+定制
自动应用建议
API 调用限制 100/天 10000/天 无限
Webhook 通知
专属支持

API 端点权限

端点 认证要求 数据范围
GET /experts API Key 仅自己的专家
GET /experts/{id}/metrics API Key 脱敏后的指标
GET /experts/{id}/issues API Key 问题列表
GET /experts/{id}/suggestions API Key 建议列表
POST /experts/{id}/apply API Key 仅可自动应用的
GET /dashboard API Key 汇总数据

🔧 实现方案

1. 数据库表设计

-- 开发者表
CREATE TABLE developers (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    api_key VARCHAR(64) UNIQUE NOT NULL,
    api_secret_hash VARCHAR(255) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    tier VARCHAR(20) DEFAULT 'free',
    created_at TIMESTAMP DEFAULT NOW(),
    last_login TIMESTAMP,
    
    -- 配额
    max_experts INT DEFAULT 3,
    max_requests_per_day INT DEFAULT 100,
    max_storage_mb INT DEFAULT 100
);

-- 专家所有权表
CREATE TABLE expert_ownership (
    expert_id VARCHAR(100) PRIMARY KEY,
    developer_id UUID REFERENCES developers(id),
    created_at TIMESTAMP DEFAULT NOW(),
    is_public BOOLEAN DEFAULT FALSE,
    shared_with UUID[] DEFAULT '{}'
);

-- API 调用日志
CREATE TABLE api_access_log (
    id SERIAL PRIMARY KEY,
    developer_id UUID REFERENCES developers(id),
    endpoint VARCHAR(255),
    method VARCHAR(10),
    expert_id VARCHAR(100),
    timestamp TIMESTAMP DEFAULT NOW(),
    ip_address INET,
    user_agent TEXT,
    response_code INT
);

-- 创建索引
CREATE INDEX idx_ownership_developer ON expert_ownership(developer_id);
CREATE INDEX idx_access_log_developer ON api_access_log(developer_id, timestamp);

2. 认证中间件

from fastapi import Depends, HTTPException, Header
from typing import Optional

class DeveloperAuth:
    """开发者认证"""
    
    async def get_current_developer(
        self,
        authorization: Optional[str] = Header(None),
        x_developer_key: Optional[str] = Header(None)
    ) -> Developer:
        """获取当前开发者"""
        
        # 方式1: Bearer Token
        if authorization and authorization.startswith("Bearer "):
            token = authorization[7:]
            developer = await self.verify_token(token)
            if developer:
                return developer
        
        # 方式2: API Key
        if x_developer_key:
            developer = await self.verify_api_key(x_developer_key)
            if developer:
                return developer
        
        raise HTTPException(
            status_code=401,
            detail="未认证或认证已过期"
        )
    
    async def verify_api_key(self, api_key: str) -> Optional[Developer]:
        """验证 API Key"""
        # 从数据库查询
        developer = await db.developers.find_one({"api_key": api_key})
        
        if not developer:
            return None
        
        if developer.status != "active":
            raise HTTPException(status_code=403, detail="账户已被禁用")
        
        # 更新最后访问时间
        await db.developers.update_one(
            {"id": developer.id},
            {"$set": {"last_login": datetime.now()}}
        )
        
        return developer
    
    async def check_rate_limit(self, developer: Developer) -> bool:
        """检查速率限制"""
        today = datetime.now().date()
        
        # 统计今日调用次数
        count = await db.api_access_log.count_documents({
            "developer_id": developer.id,
            "timestamp": {"$gte": datetime.combine(today, time.min)}
        })
        
        if count >= developer.max_requests_per_day:
            raise HTTPException(
                status_code=429,
                detail=f"已达到每日请求限制 ({developer.max_requests_per_day})"
            )
        
        return True

3. 权限检查装饰器

from functools import wraps

def require_expert_access(func):
    """要求专家访问权限的装饰器"""
    @wraps(func)
    async def wrapper(*args, developer: Developer, expert_id: str, **kwargs):
        # 检查专家所有权
        ownership = await db.expert_ownership.find_one({"expert_id": expert_id})
        
        if not ownership:
            raise HTTPException(status_code=404, detail="专家不存在")
        
        if ownership.developer_id != developer.id:
            if developer.id not in ownership.shared_with:
                raise HTTPException(status_code=403, detail="无权访问此专家")
        
        return await func(*args, developer=developer, expert_id=expert_id, **kwargs)
    
    return wrapper


def require_tier(minimum_tier: str):
    """要求特定服务等级的装饰器"""
    tier_levels = {"free": 0, "pro": 1, "enterprise": 2}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, developer: Developer, **kwargs):
            if tier_levels.get(developer.tier, 0) < tier_levels.get(minimum_tier, 0):
                raise HTTPException(
                    status_code=403,
                    detail=f"此功能需要 {minimum_tier} 或更高等级"
                )
            return await func(*args, developer=developer, **kwargs)
        return wrapper
    return decorator

4. 使用示例

from fastapi import APIRouter, Depends

router = APIRouter()
auth = DeveloperAuth()

@router.get("/experts/{expert_id}/metrics")
@require_expert_access
async def get_expert_metrics(
    expert_id: str,
    developer: Developer = Depends(auth.get_current_developer)
):
    """获取专家指标(需要认证 + 专家访问权限)"""
    
    # 检查速率限制
    await auth.check_rate_limit(developer)
    
    # 记录访问日志
    await log_api_access(developer.id, f"/experts/{expert_id}/metrics", "GET")
    
    # 获取并脱敏数据
    metrics = await get_raw_metrics(expert_id)
    anonymized = DataAnonymizer().anonymize_metrics(metrics)
    
    return {"success": True, "metrics": anonymized}


@router.post("/experts/{expert_id}/apply-suggestions")
@require_expert_access
@require_tier("pro")  # 需要 Pro 等级
async def apply_suggestions(
    expert_id: str,
    request: ApplySuggestionRequest,
    developer: Developer = Depends(auth.get_current_developer)
):
    """应用优化建议(需要 Pro 等级)"""
    # ...

📊 审计日志

记录的操作

class AuditAction(str, Enum):
    """审计操作类型"""
    LOGIN = "login"
    LOGOUT = "logout"
    VIEW_METRICS = "view_metrics"
    VIEW_ISSUES = "view_issues"
    VIEW_SUGGESTIONS = "view_suggestions"
    APPLY_SUGGESTION = "apply_suggestion"
    CREATE_EXPERT = "create_expert"
    DELETE_EXPERT = "delete_expert"
    SHARE_EXPERT = "share_expert"
    EXPORT_DATA = "export_data"


async def log_audit(
    developer_id: str,
    action: AuditAction,
    resource_id: str = None,
    details: dict = None,
    ip_address: str = None
):
    """记录审计日志"""
    await db.audit_log.insert_one({
        "developer_id": developer_id,
        "action": action.value,
        "resource_id": resource_id,
        "details": details,
        "ip_address": ip_address,
        "timestamp": datetime.now()
    })

审计日志查询

@router.get("/audit-log")
@require_tier("enterprise")
async def get_audit_log(
    developer: Developer = Depends(auth.get_current_developer),
    start_date: datetime = None,
    end_date: datetime = None,
    action: str = None
):
    """查询审计日志(仅 Enterprise)"""
    query = {"developer_id": developer.id}
    
    if start_date:
        query["timestamp"] = {"$gte": start_date}
    if end_date:
        query.setdefault("timestamp", {})["$lte"] = end_date
    if action:
        query["action"] = action
    
    logs = await db.audit_log.find(query).sort("timestamp", -1).limit(100)
    return {"success": True, "logs": logs}

🚀 实施步骤

阶段1: 基础认证(1周)

  1. ✅ 创建 developers 表
  2. ✅ 实现 API Key 认证
  3. ✅ 实现专家所有权检查
  4. ✅ 基础速率限制

阶段2: 数据隔离(1周)

  1. 📝 实现数据脱敏器
  2. 📝 实现数据访问控制
  3. 📝 添加审计日志

阶段3: 高级功能(1周)

  1. 📝 实现 OAuth 2.0
  2. 📝 实现服务等级限制
  3. 📝 实现 Webhook 通知
  4. 📝 开发者管理后台

✅ 总结

核心原则

  1. 最小权限 - 开发者只能访问必要的数据
  2. 数据隔离 - 严格的专家所有权检查
  3. 隐私优先 - 用户数据必须脱敏
  4. 可审计 - 所有敏感操作记录日志

安全保障

威胁 防护措施
未授权访问 API Key + 所有权检查
数据泄露 数据脱敏 + 访问控制
滥用 API 速率限制 + 配额
操作追溯 审计日志

是否现在开始实现基础认证功能?