MBE 专家推荐系统设计
愿景
MBE 的目标是生产可信的、有能力的专家模型,向需要优质专家的用户进行智能推荐。
与抖音推荐算法的对比
| 抖音推荐 |
MBE 专家推荐 |
对应实现 |
| 视频内容 |
专家能力 |
知识库质量评估 |
| 用户画像 |
用户需求画像 |
HOPE 记忆系统 |
| 行为序列 |
咨询历史 |
TITANS 长期记忆 |
| 点赞/完播 |
满意度/复购 |
反馈学习系统 |
| 协同过滤 |
相似用户推荐 |
用户聚类 |
| 探索与利用 |
新专家曝光 |
多臂老虎机 |
| 实时反馈 |
即时调整 |
惊讶度学习 |
系统架构
┌─────────────────────────────────────────────────────────────────────┐
│ MBE 专家推荐系统 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ 用户理解层 │ │ 专家理解层 │ │ 匹配决策层 │ │
│ │ │ │ │ │ │ │
│ │ • 意图分析 │ │ • 能力评估 │ │ • 召回 │ │
│ │ • 需求画像 │ │ • 可信度评分 │ │ • 精排 │ │
│ │ • 行为序列 │ │ • 专业领域 │ │ • 重排 │ │
│ │ • 偏好学习 │ │ • 用户评价 │ │ • 多样性 │ │
│ └────────┬────────┘ └────────┬────────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────────┼──────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 推荐引擎核心 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 协同 │ │ 内容 │ │ 知识 │ │ 实时 │ │ │
│ │ │ 过滤 │ │ 匹配 │ │ 图谱 │ │ 反馈 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ 相似用户│ │ 意图- │ │ TITANS │ │ 惊讶度 │ │ │
│ │ │ 相似专家│ │ 专家 │ │ 关系 │ │ 学习 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 存储与记忆层 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ TITANS │ │ HOPE │ │ Redis │ │ Vector │ │ │
│ │ │ 长期 │ │ 用户 │ │ 实时 │ │ 向量 │ │ │
│ │ │ 记忆 │ │ 画像 │ │ 缓存 │ │ 检索 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
核心算法
1. 专家可信度评分 (Expert Trust Score)
def calculate_expert_trust_score(expert_id: str) -> float:
"""
计算专家可信度评分 (0-100)
维度:
1. 知识库质量 (40%)
2. 用户满意度 (30%)
3. 回答准确性 (20%)
4. 活跃度 (10%)
"""
# 知识库质量:文档数量、更新频率、来源可信度
kb_quality = evaluate_knowledge_base(expert_id)
# 用户满意度:评分、复购率、推荐率
satisfaction = calculate_user_satisfaction(expert_id)
# 回答准确性:Self-Critique 验证通过率
accuracy = get_self_critique_pass_rate(expert_id)
# 活跃度:更新频率、响应时间
activity = calculate_activity_score(expert_id)
return (
kb_quality * 0.4 +
satisfaction * 0.3 +
accuracy * 0.2 +
activity * 0.1
)
2. 用户-专家匹配分 (User-Expert Match Score)
def calculate_match_score(user_id: str, expert_id: str, query: str) -> float:
"""
计算用户与专家的匹配分数
融合多路召回:
1. 意图匹配 (米塞斯行为学)
2. 历史偏好 (TITANS/HOPE)
3. 协同过滤 (相似用户喜欢)
4. 专家质量 (可信度)
"""
# 1. 意图匹配分
intent = analyzer.analyze(query, user_id)
intent_score = get_intent_match(intent, expert_id)
# 2. 历史偏好分 (TITANS)
history_score = titans.get_user_expert_affinity(user_id, expert_id)
# 3. 协同过滤分
cf_score = collaborative_filter(user_id, expert_id)
# 4. 专家质量分
trust_score = get_expert_trust_score(expert_id) / 100
# 加权融合
return (
intent_score * 0.4 + # 当前需求最重要
history_score * 0.25 + # 历史偏好次之
cf_score * 0.15 + # 相似用户参考
trust_score * 0.2 # 专家质量保底
)
3. 协同过滤 (Collaborative Filtering)
class ExpertCollaborativeFilter:
"""
基于用户行为的协同过滤
两种方式:
1. User-based: 找相似用户,推荐他们喜欢的专家
2. Item-based: 找相似专家,推荐用户可能喜欢的
"""
def __init__(self):
self._user_expert_matrix = {} # 用户-专家交互矩阵
self._user_similarity = {} # 用户相似度缓存
self._expert_similarity = {} # 专家相似度缓存
def user_based_recommend(self, user_id: str, top_k: int = 5) -> List[str]:
"""基于相似用户的推荐"""
# 1. 找到相似用户
similar_users = self.find_similar_users(user_id, top_n=20)
# 2. 收集相似用户喜欢但当前用户没用过的专家
candidate_experts = {}
for sim_user, similarity in similar_users:
for expert_id, rating in self._user_expert_matrix.get(sim_user, {}).items():
if expert_id not in self._user_expert_matrix.get(user_id, {}):
if expert_id not in candidate_experts:
candidate_experts[expert_id] = 0
candidate_experts[expert_id] += similarity * rating
# 3. 排序返回
sorted_experts = sorted(candidate_experts.items(), key=lambda x: x[1], reverse=True)
return [e[0] for e in sorted_experts[:top_k]]
def item_based_recommend(self, user_id: str, top_k: int = 5) -> List[str]:
"""基于相似专家的推荐"""
user_history = self._user_expert_matrix.get(user_id, {})
candidate_experts = {}
for used_expert, rating in user_history.items():
# 找到相似专家
similar_experts = self.find_similar_experts(used_expert, top_n=10)
for sim_expert, similarity in similar_experts:
if sim_expert not in user_history:
if sim_expert not in candidate_experts:
candidate_experts[sim_expert] = 0
candidate_experts[sim_expert] += similarity * rating
sorted_experts = sorted(candidate_experts.items(), key=lambda x: x[1], reverse=True)
return [e[0] for e in sorted_experts[:top_k]]
4. 探索与利用 (Exploration vs Exploitation)
class ExpertExplorationStrategy:
"""
平衡推荐熟悉专家和探索新专家
使用 ε-greedy 或 UCB 算法
"""
def __init__(self, epsilon: float = 0.1):
self.epsilon = epsilon
self._expert_stats = {} # 专家被推荐次数和成功率
def select_expert(self, candidates: List[Tuple[str, float]],
user_id: str) -> str:
"""
选择要推荐的专家
以 1-ε 概率选择最高分专家(利用)
以 ε 概率随机选择新专家(探索)
"""
import random
if random.random() < self.epsilon:
# 探索:选择一个用户没用过的新专家
new_experts = self._get_unexplored_experts(user_id, candidates)
if new_experts:
return random.choice(new_experts)
# 利用:选择最高分
return candidates[0][0] if candidates else None
def ucb_select(self, candidates: List[Tuple[str, float]],
total_rounds: int) -> str:
"""
UCB (Upper Confidence Bound) 选择
平衡:预期收益 + 不确定性奖励
"""
ucb_scores = []
for expert_id, base_score in candidates:
stats = self._expert_stats.get(expert_id, {"n": 0, "success": 0})
n = stats["n"] + 1
# UCB 公式
exploitation = base_score
exploration = math.sqrt(2 * math.log(total_rounds + 1) / n)
ucb_scores.append((expert_id, exploitation + exploration))
ucb_scores.sort(key=lambda x: x[1], reverse=True)
return ucb_scores[0][0]
5. 实时反馈处理
class RealtimeFeedbackProcessor:
"""
实时处理用户反馈,调整推荐
类似抖音的即时反馈机制
"""
def __init__(self):
self._session_feedback = {} # 会话级反馈
def process_feedback(self, user_id: str, expert_id: str,
feedback_type: str, value: float):
"""
处理各种反馈信号
feedback_type:
- "rating": 直接评分
- "duration": 对话时长(类似完播率)
- "switch": 切换专家(负向信号)
- "followup": 继续追问(正向信号)
- "share": 分享(强正向信号)
"""
# 1. 更新会话级反馈
self._update_session_feedback(user_id, expert_id, feedback_type, value)
# 2. 存入 TITANS(长期记忆)
importance = self._calculate_importance(feedback_type, value)
titans.store_feedback(user_id, expert_id, feedback_type, value, importance)
# 3. 更新用户画像 (HOPE)
hope.update_user_preference(user_id, expert_id, feedback_type, value)
# 4. 更新专家统计
self._update_expert_stats(expert_id, feedback_type, value)
def _calculate_importance(self, feedback_type: str, value: float) -> float:
"""计算反馈重要性(用于惊讶度学习)"""
importance_map = {
"rating": 1.0,
"duration": 0.5,
"switch": 1.5, # 切换是负向惊讶,需要特别记住
"followup": 0.8,
"share": 2.0 # 分享是强信号
}
base = importance_map.get(feedback_type, 1.0)
# 极端值更重要
if value < 0.3 or value > 0.9:
base *= 1.3
return base
推荐流程
用户请求
│
▼
┌─────────────────────────────────────────────────────────┐
│ 1. 用户理解 │
│ - 解析当前意图(米塞斯行为学) │
│ - 加载用户画像(HOPE) │
│ - 获取历史偏好(TITANS) │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 2. 多路召回 (Recall) │
│ - 意图匹配召回:基于当前需求 │
│ - 协同过滤召回:基于相似用户 │
│ - 热门召回:高质量专家 │
│ - 探索召回:新专家曝光 │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 3. 精排 (Ranking) │
│ - 计算综合匹配分 │
│ - 考虑专家可信度 │
│ - 融合实时反馈 │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 4. 重排 (Re-ranking) │
│ - 多样性打散:避免同类专家扎堆 │
│ - 新鲜度提升:新专家适当加分 │
│ - 业务规则:付费专家、认证专家 │
└─────────────────────────────────────────────────────────┘
│
▼
推荐结果
现有 MBE 能力映射
| 推荐系统组件 |
MBE 现有能力 |
状态 |
| 用户画像 |
HOPE 记忆系统 |
✅ 已有 |
| 长期记忆 |
TITANS |
✅ 已有 |
| 意图理解 |
米塞斯行为学分析 |
✅ 已有 |
| 关键词匹配 |
TITANS 关键词层级 |
✅ 已有 |
| 反馈学习 |
惊讶度学习 |
✅ 已有 |
| 向量检索 |
MIRAS + Embedding |
✅ 已有 |
| 协同过滤 |
collaborative_filter.py |
✅ 已完成 |
| 专家评分 |
expert_trust_score.py |
✅ 已完成 |
| 探索机制 |
exploration_strategy.py |
✅ 已完成 |
| 多样性重排 |
recommendation_engine.py |
✅ 已完成 |
| 统一专家池 |
unified_expert_pool.py |
✅ 已适配 |
实现路线图
阶段 1:专家质量体系 ✅ 已完成
阶段 2:协同过滤 ✅ 已完成
阶段 3:推荐引擎 ✅ 已完成
阶段 4:探索与优化 ✅ 已完成
已实现的核心模块
1. 专家信任评分 (src/market/expert_trust_score.py)
# 评分维度权重
WEIGHTS = {
"kb": 0.40, # 知识库质量
"satisfaction": 0.30, # 用户满意度
"accuracy": 0.20, # 回答准确性
"activity": 0.10 # 活跃度
}
# 信任等级
TRUST_LEVELS = {
"excellent": 85, # 优秀
"good": 70, # 良好
"average": 50, # 一般
"poor": 0 # 较差
}
2. 协同过滤 (src/market/collaborative_filter.py)
# 相似度算法
- Pearson Correlation (用户相似度)
- Cosine Similarity (专家相似度)
# 推荐策略
- User-based: 相似用户喜欢什么
- Item-based: 相似专家有哪些
- Hybrid: 混合推荐 (60% User + 40% Item)
3. 推荐引擎 (src/market/recommendation_engine.py)
# 推荐权重
RECOMMENDATION_WEIGHTS = {
"intent_match": 0.35, # 意图匹配
"trust_score": 0.25, # 专家信任度
"cf_score": 0.20, # 协同过滤
"user_preference": 0.15, # 用户偏好
"freshness": 0.05 # 新鲜度
}
4. 探索策略 (src/market/exploration_strategy.py)
# 三种探索算法
class ExpertExplorationStrategy:
# ε-greedy: 简单有效
def epsilon_greedy_select(candidates, user_id) -> ExplorationResult
# UCB: 理论最优
def ucb_select(candidates, user_id) -> ExplorationResult
# UCB = avg_reward + c * sqrt(ln(N) / n)
# Thompson Sampling: 贝叶斯方法
def thompson_sampling_select(candidates, user_id) -> ExplorationResult
5. 实时反馈处理 (src/market/realtime_feedback.py)
# 反馈类型和权重(完整版)
FEEDBACK_WEIGHTS = {
FeedbackType.RATING: 1.0, # 直接评分
FeedbackType.LIKE: 0.5, # 点赞
FeedbackType.DISLIKE: 1.2, # 踩(负面反馈更重要)
FeedbackType.REPORT: 2.0, # 举报
FeedbackType.CLICK: 0.2, # 点击
FeedbackType.IMPRESSION: 0.1, # 曝光
FeedbackType.SESSION_DURATION: 0.6, # 会话时长
FeedbackType.CONVERSATION_TURNS: 0.5,# 对话轮次
FeedbackType.SWITCH_EXPERT: 1.5, # 切换专家(强负向)
FeedbackType.FOLLOWUP: 0.8, # 追问
FeedbackType.SHARE: 1.5, # 分享(强正向)
FeedbackType.SUBSCRIBE: 1.8, # 订阅
FeedbackType.UNSUBSCRIBE: 1.5, # 取消订阅
FeedbackType.PURCHASE: 2.0 # 购买(极强正向)
}
# 惊讶度学习
surprise = base_weight * (1 + deviation)
if polarity == "negative":
surprise *= 1.5 # 负面反馈更惊讶
if surprise > 0.5:
trigger_learning() # 高惊讶度触发即时学习
6. 推荐解释器 (src/market/recommendation_explainer.py)
# 解释因素
ExplanationType:
INTENT_MATCH # 意图匹配
COLLABORATIVE # 协同过滤
TRUST_SCORE # 信任度
USER_HISTORY # 用户历史
EXPLORATION # 探索
FRESHNESS # 新鲜度
DIVERSITY # 多样性
# 生成解释
explanation = explainer.explain_recommendation(
expert_id, expert_name, score, rank, breakdown, context
)
# 返回: short_explanation, full_explanation, main_reasons
7. API 端点
信任度端点 (/api/trust)
| 端点 |
方法 |
功能 |
/api/trust/score/{expert_id} |
GET |
获取专家信任分数 |
/api/trust/leaderboard |
GET |
获取信任度排行榜 |
/api/trust/batch |
POST |
批量计算信任分数 |
/api/trust/interaction |
POST |
记录用户交互 |
/api/trust/profile/{expert_id} |
GET |
获取完整档案 |
/api/trust/recalculate-all |
POST |
重算所有专家分数 |
市场推荐端点 (/api/market)
| 端点 |
方法 |
功能 |
/api/market/ |
GET |
市场首页(包含个性化推荐) |
/api/market/recommendations/for-user/{user_id} |
GET |
为用户生成个性化推荐 |
/api/market/recommendations/for-question |
GET |
基于问题推荐专家 |
/api/market/recommendations/similar/{expert_id} |
GET |
相似专家推荐 |
反馈端点 (/api/feedback)
| 端点 |
方法 |
功能 |
/api/feedback/expert |
POST |
提交专家反馈 |
/api/feedback/answer |
POST |
提交答案反馈 |
/api/feedback/switch |
POST |
记录专家切换 |
/api/feedback/stats |
GET |
获取反馈统计 |
8. 接口适配与性能优化 (2026-01-27)
统一专家池适配
推荐引擎现已适配 UnifiedExpertPool 接口:
# UnifiedExpertPool 新增方法
def get_all_experts(self) -> List[UnifiedExpert]:
"""获取所有专家(同步方法,用于推荐引擎)"""
all_experts = []
for expert in self._platform_experts.values():
if expert.enabled:
all_experts.append(expert)
for expert in self._market_experts.values():
all_experts.append(expert)
return all_experts
候选专家获取流程
async def _get_candidate_experts(self) -> List[Dict]:
"""获取候选专家列表 - 多源融合"""
experts = []
# 1. 从 UnifiedExpertPool 获取
if self._expert_pool:
pool_experts = self._expert_pool.get_all_experts()
for e in pool_experts:
experts.append({
"id": e.id,
"name": e.name,
"description": e.description,
"category": e.category or e.domains[0],
"keywords": e.keywords,
"price": e.price_per_1k_tokens,
"rating": e.rating_avg
})
# 2. 从 ExpertRouter 获取(备选)
if not experts:
router = get_expert_router()
for expert_id, config in router.experts.items():
experts.append({...})
# 3. 从 MarketStore 补充(市场专家)
if self._market_store:
market_experts = await self._market_store.get_published_models()
experts.extend(market_experts)
return experts
Redis 快速失败机制
避免 Redis 不可用时的重复超时:
class ExpertTrustScorer:
def __init__(self):
self._redis = None
self._redis_unavailable = False # 快速失败标志
async def _get_redis(self):
# 已知不可用,直接返回
if self._redis_unavailable:
return None
if self._redis is None:
try:
redis_host = os.environ.get("REDIS_HOST", "localhost")
self._redis = aioredis.from_url(
f"redis://{redis_host}:6379/0",
socket_connect_timeout=1.0, # 1秒超时
socket_timeout=2.0
)
await self._redis.ping()
except Exception as e:
self._redis_unavailable = True # 标记不可用
self._redis = None
return self._redis
性能优化效果
| 指标 |
优化前 |
优化后 |
提升 |
| Redis 失败重试 |
每次 3s 超时 |
立即跳过 |
100x |
| 24专家分数计算 |
~5分钟 |
<1秒 |
300x |
| 候选专家获取 |
单源 |
多源融合 |
更完整 |
数据埋点需求
# 需要采集的用户行为数据
TRACKING_EVENTS = {
# 曝光
"expert_impression": "专家曝光",
# 点击
"expert_click": "点击进入专家",
# 对话
"conversation_start": "开始对话",
"conversation_turn": "对话轮次",
"conversation_end": "结束对话",
# 反馈
"rating_submit": "提交评分",
"expert_switch": "切换专家",
"expert_share": "分享专家",
"expert_subscribe": "订阅专家",
"expert_purchase": "购买专家服务",
# 时长
"session_duration": "会话时长",
"response_read_time": "阅读回复时长"
}
预期效果
| 指标 |
当前 |
目标 |
| 专家匹配准确率 |
~70% |
90%+ |
| 用户留存率 |
- |
提升 30% |
| 专家复购率 |
- |
提升 50% |
| 新专家曝光率 |
低 |
每用户/周 2-3 新专家 |
| 推荐多样性 |
低 |
高(避免信息茧房) |
参考
- 抖音推荐算法:协同过滤 + 内容理解 + 实时反馈
- YouTube 推荐:深度神经网络 + 多目标优化
- Netflix 推荐:矩阵分解 + 深度学习
- 米塞斯行为学:人的行为是有目的的