开发者权限规范
🎯 设计目标
- 身份认证 - 确保开发者身份可信
- 数据隔离 - 开发者只能访问自己创建的专家
- 隐私保护 - 保护终端用户的隐私数据
- 操作审计 - 记录所有敏感操作
🏗️ 权限架构
┌─────────────────────────────────────────────────────────────────┐
│ MBE 平台 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 平台管理员 │ │ 开发者 │ │ 终端用户 │ │
│ │ (Admin) │ │ (Developer) │ │ (EndUser) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 系统级权限 │ │ 专家级权限 │ │ 使用级权限 │ │
│ │ - 全系统配置 │ │ - 创建专家 │ │ - 与专家对话 │ │
│ │ - 所有数据 │ │ - 查看自己专家 │ │ - 提交反馈 │ │
│ │ - 代码修改 │ │ - 获取建议 │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
👤 开发者身份管理
1. 开发者注册
@dataclass
class Developer:
"""开发者账户"""
id: str # 唯一标识 (UUID)
username: str # 用户名
email: str # 邮箱
api_key: str # API 密钥
api_secret: str # API 密钥(哈希存储)
status: str # active / suspended / deleted
tier: str # free / pro / enterprise
created_at: datetime
last_login: datetime
# 配额
max_experts: int # 最大专家数
max_requests_per_day: int # 每日请求限制
max_storage_mb: int # 存储限制
2. 认证方式
方式A: API Key 认证(推荐)
# 请求头携带 API Key
Authorization: Bearer {api_key}
# 或者使用自定义头
X-Developer-Key: {api_key}
X-Developer-Secret: {api_secret}
方式B: OAuth 2.0 认证
1. 开发者在 MBE 平台注册应用
2. 获取 client_id 和 client_secret
3. 使用 OAuth 2.0 流程获取 access_token
4. 使用 access_token 访问 API
方式C: JWT Token 认证
# 开发者登录后获取 JWT Token
POST /api/developer/login
{
"email": "developer@example.com",
"password": "..."
}
# 响应
{
"access_token": "eyJhbG...",
"refresh_token": "...",
"expires_in": 3600
}
# 后续请求携带 Token
Authorization: Bearer eyJhbG...
🔐 数据隔离机制
1. 专家所有权
@dataclass
class ExpertOwnership:
"""专家所有权记录"""
expert_id: str # 专家ID
developer_id: str # 创建者ID
created_at: datetime # 创建时间
is_public: bool # 是否公开(可被其他用户使用)
shared_with: List[str] # 共享给哪些开发者
2. 数据访问控制
class DataAccessControl:
"""数据访问控制"""
async def can_access_expert(
self,
developer_id: str,
expert_id: str
) -> bool:
"""检查开发者是否有权访问专家"""
ownership = await self.get_ownership(expert_id)
if not ownership:
return False
# 创建者有完全访问权
if ownership.developer_id == developer_id:
return True
# 被共享的开发者有访问权
if developer_id in ownership.shared_with:
return True
return False
async def can_access_data(
self,
developer_id: str,
expert_id: str,
data_type: str
) -> bool:
"""检查开发者是否有权访问特定数据"""
if not await self.can_access_expert(developer_id, expert_id):
return False
# 检查数据类型权限
allowed_data = {
"metrics": True, # 允许:聚合统计
"issues": True, # 允许:问题列表
"suggestions": True, # 允许:优化建议
"raw_logs": False, # 禁止:原始对话日志
"user_ids": False, # 禁止:用户ID
"user_messages": False # 禁止:用户消息原文
}
return allowed_data.get(data_type, False)
🛡️ 隐私保护
1. 数据脱敏规则
class DataAnonymizer:
"""数据脱敏器"""
def anonymize_user_id(self, user_id: str) -> str:
"""用户ID脱敏"""
# 使用哈希 + 截断
return f"user_{hashlib.sha256(user_id.encode()).hexdigest()[:8]}"
def anonymize_message(self, message: str) -> str:
"""消息内容脱敏"""
# 移除敏感信息
patterns = [
(r'\b\d{11}\b', '[手机号]'), # 手机号
(r'\b\d{18}\b', '[身份证]'), # 身份证
(r'[\w.-]+@[\w.-]+\.\w+', '[邮箱]'), # 邮箱
(r'\b\d{16,19}\b', '[银行卡]'), # 银行卡
]
result = message
for pattern, replacement in patterns:
result = re.sub(pattern, replacement, result)
return result
def anonymize_metrics(self, metrics: dict) -> dict:
"""指标数据脱敏"""
# 只保留聚合数据,移除个体数据
return {
"total_conversations": metrics.get("total_conversations"),
"unique_users": metrics.get("unique_users"),
"avg_response_time": metrics.get("avg_response_time"),
# 不返回具体用户列表
# "user_list": REMOVED
}
2. 开发者可访问的数据
| 数据类型 |
可访问 |
脱敏方式 |
| 聚合统计 |
✅ |
无需脱敏 |
| 响应时间 |
✅ |
无需脱敏 |
| 切换率 |
✅ |
无需脱敏 |
| 命中率 |
✅ |
无需脱敏 |
| 问题类型 |
✅ |
无需脱敏 |
| 用户ID |
❌ |
哈希处理 |
| 对话内容 |
⚠️ |
重度脱敏 |
| 个人信息 |
❌ |
完全移除 |
3. 数据保留策略
# 数据保留期限
data_retention:
# 聚合统计 - 永久保留
aggregated_metrics: permanent
# 问题检测记录 - 90天
issue_records: 90_days
# 优化建议 - 30天(应用后删除)
suggestions: 30_days
# 原始日志 - 开发者不可访问
raw_logs: developer_inaccessible
📋 权限矩阵
开发者权限
| 操作 |
Free |
Pro |
Enterprise |
| 创建专家 |
3个 |
20个 |
无限 |
| 查看指标 |
✅ |
✅ |
✅ |
| 问题检测 |
基础 |
全部 |
全部 |
| 优化建议 |
基础 |
全部 |
全部+定制 |
| 自动应用建议 |
❌ |
✅ |
✅ |
| API 调用限制 |
100/天 |
10000/天 |
无限 |
| Webhook 通知 |
❌ |
✅ |
✅ |
| 专属支持 |
❌ |
❌ |
✅ |
API 端点权限
| 端点 |
认证要求 |
数据范围 |
GET /experts |
API Key |
仅自己的专家 |
GET /experts/{id}/metrics |
API Key |
脱敏后的指标 |
GET /experts/{id}/issues |
API Key |
问题列表 |
GET /experts/{id}/suggestions |
API Key |
建议列表 |
POST /experts/{id}/apply |
API Key |
仅可自动应用的 |
GET /dashboard |
API Key |
汇总数据 |
🔧 实现方案
1. 数据库表设计
-- 开发者表
CREATE TABLE developers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
api_key VARCHAR(64) UNIQUE NOT NULL,
api_secret_hash VARCHAR(255) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
tier VARCHAR(20) DEFAULT 'free',
created_at TIMESTAMP DEFAULT NOW(),
last_login TIMESTAMP,
-- 配额
max_experts INT DEFAULT 3,
max_requests_per_day INT DEFAULT 100,
max_storage_mb INT DEFAULT 100
);
-- 专家所有权表
CREATE TABLE expert_ownership (
expert_id VARCHAR(100) PRIMARY KEY,
developer_id UUID REFERENCES developers(id),
created_at TIMESTAMP DEFAULT NOW(),
is_public BOOLEAN DEFAULT FALSE,
shared_with UUID[] DEFAULT '{}'
);
-- API 调用日志
CREATE TABLE api_access_log (
id SERIAL PRIMARY KEY,
developer_id UUID REFERENCES developers(id),
endpoint VARCHAR(255),
method VARCHAR(10),
expert_id VARCHAR(100),
timestamp TIMESTAMP DEFAULT NOW(),
ip_address INET,
user_agent TEXT,
response_code INT
);
-- 创建索引
CREATE INDEX idx_ownership_developer ON expert_ownership(developer_id);
CREATE INDEX idx_access_log_developer ON api_access_log(developer_id, timestamp);
2. 认证中间件
from fastapi import Depends, HTTPException, Header
from typing import Optional
class DeveloperAuth:
"""开发者认证"""
async def get_current_developer(
self,
authorization: Optional[str] = Header(None),
x_developer_key: Optional[str] = Header(None)
) -> Developer:
"""获取当前开发者"""
# 方式1: Bearer Token
if authorization and authorization.startswith("Bearer "):
token = authorization[7:]
developer = await self.verify_token(token)
if developer:
return developer
# 方式2: API Key
if x_developer_key:
developer = await self.verify_api_key(x_developer_key)
if developer:
return developer
raise HTTPException(
status_code=401,
detail="未认证或认证已过期"
)
async def verify_api_key(self, api_key: str) -> Optional[Developer]:
"""验证 API Key"""
# 从数据库查询
developer = await db.developers.find_one({"api_key": api_key})
if not developer:
return None
if developer.status != "active":
raise HTTPException(status_code=403, detail="账户已被禁用")
# 更新最后访问时间
await db.developers.update_one(
{"id": developer.id},
{"$set": {"last_login": datetime.now()}}
)
return developer
async def check_rate_limit(self, developer: Developer) -> bool:
"""检查速率限制"""
today = datetime.now().date()
# 统计今日调用次数
count = await db.api_access_log.count_documents({
"developer_id": developer.id,
"timestamp": {"$gte": datetime.combine(today, time.min)}
})
if count >= developer.max_requests_per_day:
raise HTTPException(
status_code=429,
detail=f"已达到每日请求限制 ({developer.max_requests_per_day})"
)
return True
3. 权限检查装饰器
from functools import wraps
def require_expert_access(func):
"""要求专家访问权限的装饰器"""
@wraps(func)
async def wrapper(*args, developer: Developer, expert_id: str, **kwargs):
# 检查专家所有权
ownership = await db.expert_ownership.find_one({"expert_id": expert_id})
if not ownership:
raise HTTPException(status_code=404, detail="专家不存在")
if ownership.developer_id != developer.id:
if developer.id not in ownership.shared_with:
raise HTTPException(status_code=403, detail="无权访问此专家")
return await func(*args, developer=developer, expert_id=expert_id, **kwargs)
return wrapper
def require_tier(minimum_tier: str):
"""要求特定服务等级的装饰器"""
tier_levels = {"free": 0, "pro": 1, "enterprise": 2}
def decorator(func):
@wraps(func)
async def wrapper(*args, developer: Developer, **kwargs):
if tier_levels.get(developer.tier, 0) < tier_levels.get(minimum_tier, 0):
raise HTTPException(
status_code=403,
detail=f"此功能需要 {minimum_tier} 或更高等级"
)
return await func(*args, developer=developer, **kwargs)
return wrapper
return decorator
4. 使用示例
from fastapi import APIRouter, Depends
router = APIRouter()
auth = DeveloperAuth()
@router.get("/experts/{expert_id}/metrics")
@require_expert_access
async def get_expert_metrics(
expert_id: str,
developer: Developer = Depends(auth.get_current_developer)
):
"""获取专家指标(需要认证 + 专家访问权限)"""
# 检查速率限制
await auth.check_rate_limit(developer)
# 记录访问日志
await log_api_access(developer.id, f"/experts/{expert_id}/metrics", "GET")
# 获取并脱敏数据
metrics = await get_raw_metrics(expert_id)
anonymized = DataAnonymizer().anonymize_metrics(metrics)
return {"success": True, "metrics": anonymized}
@router.post("/experts/{expert_id}/apply-suggestions")
@require_expert_access
@require_tier("pro") # 需要 Pro 等级
async def apply_suggestions(
expert_id: str,
request: ApplySuggestionRequest,
developer: Developer = Depends(auth.get_current_developer)
):
"""应用优化建议(需要 Pro 等级)"""
# ...
📊 审计日志
记录的操作
class AuditAction(str, Enum):
"""审计操作类型"""
LOGIN = "login"
LOGOUT = "logout"
VIEW_METRICS = "view_metrics"
VIEW_ISSUES = "view_issues"
VIEW_SUGGESTIONS = "view_suggestions"
APPLY_SUGGESTION = "apply_suggestion"
CREATE_EXPERT = "create_expert"
DELETE_EXPERT = "delete_expert"
SHARE_EXPERT = "share_expert"
EXPORT_DATA = "export_data"
async def log_audit(
developer_id: str,
action: AuditAction,
resource_id: str = None,
details: dict = None,
ip_address: str = None
):
"""记录审计日志"""
await db.audit_log.insert_one({
"developer_id": developer_id,
"action": action.value,
"resource_id": resource_id,
"details": details,
"ip_address": ip_address,
"timestamp": datetime.now()
})
审计日志查询
@router.get("/audit-log")
@require_tier("enterprise")
async def get_audit_log(
developer: Developer = Depends(auth.get_current_developer),
start_date: datetime = None,
end_date: datetime = None,
action: str = None
):
"""查询审计日志(仅 Enterprise)"""
query = {"developer_id": developer.id}
if start_date:
query["timestamp"] = {"$gte": start_date}
if end_date:
query.setdefault("timestamp", {})["$lte"] = end_date
if action:
query["action"] = action
logs = await db.audit_log.find(query).sort("timestamp", -1).limit(100)
return {"success": True, "logs": logs}
🚀 实施步骤
阶段1: 基础认证(1周)
- ✅ 创建 developers 表
- ✅ 实现 API Key 认证
- ✅ 实现专家所有权检查
- ✅ 基础速率限制
阶段2: 数据隔离(1周)
- 📝 实现数据脱敏器
- 📝 实现数据访问控制
- 📝 添加审计日志
阶段3: 高级功能(1周)
- 📝 实现 OAuth 2.0
- 📝 实现服务等级限制
- 📝 实现 Webhook 通知
- 📝 开发者管理后台
✅ 总结
核心原则
- 最小权限 - 开发者只能访问必要的数据
- 数据隔离 - 严格的专家所有权检查
- 隐私优先 - 用户数据必须脱敏
- 可审计 - 所有敏感操作记录日志
安全保障
| 威胁 |
防护措施 |
| 未授权访问 |
API Key + 所有权检查 |
| 数据泄露 |
数据脱敏 + 访问控制 |
| 滥用 API |
速率限制 + 配额 |
| 操作追溯 |
审计日志 |
是否现在开始实现基础认证功能?