MBE 专家推荐系统设计

愿景

MBE 的目标是生产可信的、有能力的专家模型,向需要优质专家的用户进行智能推荐

与抖音推荐算法的对比

抖音推荐 MBE 专家推荐 对应实现
视频内容 专家能力 知识库质量评估
用户画像 用户需求画像 HOPE 记忆系统
行为序列 咨询历史 TITANS 长期记忆
点赞/完播 满意度/复购 反馈学习系统
协同过滤 相似用户推荐 用户聚类
探索与利用 新专家曝光 多臂老虎机
实时反馈 即时调整 惊讶度学习

系统架构

┌─────────────────────────────────────────────────────────────────────┐
│                    MBE 专家推荐系统                                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────┐     ┌─────────────────┐     ┌──────────────┐ │
│  │   用户理解层    │     │   专家理解层    │     │  匹配决策层  │ │
│  │                 │     │                 │     │              │ │
│  │ • 意图分析      │     │ • 能力评估      │     │ • 召回       │ │
│  │ • 需求画像      │     │ • 可信度评分    │     │ • 精排       │ │
│  │ • 行为序列      │     │ • 专业领域      │     │ • 重排       │ │
│  │ • 偏好学习      │     │ • 用户评价      │     │ • 多样性    │ │
│  └────────┬────────┘     └────────┬────────┘     └──────┬───────┘ │
│           │                       │                      │         │
│           └───────────────────────┼──────────────────────┘         │
│                                   ▼                                │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                    推荐引擎核心                              │  │
│  │                                                             │  │
│  │   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐      │  │
│  │   │ 协同    │  │ 内容    │  │ 知识    │  │ 实时    │      │  │
│  │   │ 过滤    │  │ 匹配    │  │ 图谱    │  │ 反馈    │      │  │
│  │   │         │  │         │  │         │  │         │      │  │
│  │   │ 相似用户│  │ 意图-   │  │ TITANS  │  │ 惊讶度  │      │  │
│  │   │ 相似专家│  │ 专家    │  │ 关系    │  │ 学习    │      │  │
│  │   └─────────┘  └─────────┘  └─────────┘  └─────────┘      │  │
│  │                                                             │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                   │                                │
│                                   ▼                                │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                    存储与记忆层                              │  │
│  │                                                             │  │
│  │   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐      │  │
│  │   │ TITANS  │  │  HOPE   │  │  Redis  │  │ Vector  │      │  │
│  │   │ 长期    │  │  用户   │  │  实时   │  │  向量   │      │  │
│  │   │ 记忆    │  │  画像   │  │  缓存   │  │  检索   │      │  │
│  │   └─────────┘  └─────────┘  └─────────┘  └─────────┘      │  │
│  │                                                             │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

核心算法

1. 专家可信度评分 (Expert Trust Score)

def calculate_expert_trust_score(expert_id: str) -> float:
    """
    计算专家可信度评分 (0-100)
    
    维度:
    1. 知识库质量 (40%)
    2. 用户满意度 (30%)
    3. 回答准确性 (20%)
    4. 活跃度 (10%)
    """
    # 知识库质量:文档数量、更新频率、来源可信度
    kb_quality = evaluate_knowledge_base(expert_id)
    
    # 用户满意度:评分、复购率、推荐率
    satisfaction = calculate_user_satisfaction(expert_id)
    
    # 回答准确性:Self-Critique 验证通过率
    accuracy = get_self_critique_pass_rate(expert_id)
    
    # 活跃度:更新频率、响应时间
    activity = calculate_activity_score(expert_id)
    
    return (
        kb_quality * 0.4 +
        satisfaction * 0.3 +
        accuracy * 0.2 +
        activity * 0.1
    )

2. 用户-专家匹配分 (User-Expert Match Score)

def calculate_match_score(user_id: str, expert_id: str, query: str) -> float:
    """
    计算用户与专家的匹配分数
    
    融合多路召回:
    1. 意图匹配 (米塞斯行为学)
    2. 历史偏好 (TITANS/HOPE)
    3. 协同过滤 (相似用户喜欢)
    4. 专家质量 (可信度)
    """
    # 1. 意图匹配分
    intent = analyzer.analyze(query, user_id)
    intent_score = get_intent_match(intent, expert_id)
    
    # 2. 历史偏好分 (TITANS)
    history_score = titans.get_user_expert_affinity(user_id, expert_id)
    
    # 3. 协同过滤分
    cf_score = collaborative_filter(user_id, expert_id)
    
    # 4. 专家质量分
    trust_score = get_expert_trust_score(expert_id) / 100
    
    # 加权融合
    return (
        intent_score * 0.4 +      # 当前需求最重要
        history_score * 0.25 +    # 历史偏好次之
        cf_score * 0.15 +         # 相似用户参考
        trust_score * 0.2         # 专家质量保底
    )

3. 协同过滤 (Collaborative Filtering)

class ExpertCollaborativeFilter:
    """
    基于用户行为的协同过滤
    
    两种方式:
    1. User-based: 找相似用户,推荐他们喜欢的专家
    2. Item-based: 找相似专家,推荐用户可能喜欢的
    """
    
    def __init__(self):
        self._user_expert_matrix = {}  # 用户-专家交互矩阵
        self._user_similarity = {}      # 用户相似度缓存
        self._expert_similarity = {}    # 专家相似度缓存
    
    def user_based_recommend(self, user_id: str, top_k: int = 5) -> List[str]:
        """基于相似用户的推荐"""
        # 1. 找到相似用户
        similar_users = self.find_similar_users(user_id, top_n=20)
        
        # 2. 收集相似用户喜欢但当前用户没用过的专家
        candidate_experts = {}
        for sim_user, similarity in similar_users:
            for expert_id, rating in self._user_expert_matrix.get(sim_user, {}).items():
                if expert_id not in self._user_expert_matrix.get(user_id, {}):
                    if expert_id not in candidate_experts:
                        candidate_experts[expert_id] = 0
                    candidate_experts[expert_id] += similarity * rating
        
        # 3. 排序返回
        sorted_experts = sorted(candidate_experts.items(), key=lambda x: x[1], reverse=True)
        return [e[0] for e in sorted_experts[:top_k]]
    
    def item_based_recommend(self, user_id: str, top_k: int = 5) -> List[str]:
        """基于相似专家的推荐"""
        user_history = self._user_expert_matrix.get(user_id, {})
        
        candidate_experts = {}
        for used_expert, rating in user_history.items():
            # 找到相似专家
            similar_experts = self.find_similar_experts(used_expert, top_n=10)
            for sim_expert, similarity in similar_experts:
                if sim_expert not in user_history:
                    if sim_expert not in candidate_experts:
                        candidate_experts[sim_expert] = 0
                    candidate_experts[sim_expert] += similarity * rating
        
        sorted_experts = sorted(candidate_experts.items(), key=lambda x: x[1], reverse=True)
        return [e[0] for e in sorted_experts[:top_k]]

4. 探索与利用 (Exploration vs Exploitation)

class ExpertExplorationStrategy:
    """
    平衡推荐熟悉专家和探索新专家
    
    使用 ε-greedy 或 UCB 算法
    """
    
    def __init__(self, epsilon: float = 0.1):
        self.epsilon = epsilon
        self._expert_stats = {}  # 专家被推荐次数和成功率
    
    def select_expert(self, candidates: List[Tuple[str, float]], 
                     user_id: str) -> str:
        """
        选择要推荐的专家
        
        以 1-ε 概率选择最高分专家(利用)
        以 ε 概率随机选择新专家(探索)
        """
        import random
        
        if random.random() < self.epsilon:
            # 探索:选择一个用户没用过的新专家
            new_experts = self._get_unexplored_experts(user_id, candidates)
            if new_experts:
                return random.choice(new_experts)
        
        # 利用:选择最高分
        return candidates[0][0] if candidates else None
    
    def ucb_select(self, candidates: List[Tuple[str, float]], 
                   total_rounds: int) -> str:
        """
        UCB (Upper Confidence Bound) 选择
        
        平衡:预期收益 + 不确定性奖励
        """
        ucb_scores = []
        for expert_id, base_score in candidates:
            stats = self._expert_stats.get(expert_id, {"n": 0, "success": 0})
            n = stats["n"] + 1
            
            # UCB 公式
            exploitation = base_score
            exploration = math.sqrt(2 * math.log(total_rounds + 1) / n)
            
            ucb_scores.append((expert_id, exploitation + exploration))
        
        ucb_scores.sort(key=lambda x: x[1], reverse=True)
        return ucb_scores[0][0]

5. 实时反馈处理

class RealtimeFeedbackProcessor:
    """
    实时处理用户反馈,调整推荐
    
    类似抖音的即时反馈机制
    """
    
    def __init__(self):
        self._session_feedback = {}  # 会话级反馈
    
    def process_feedback(self, user_id: str, expert_id: str, 
                        feedback_type: str, value: float):
        """
        处理各种反馈信号
        
        feedback_type:
        - "rating": 直接评分
        - "duration": 对话时长(类似完播率)
        - "switch": 切换专家(负向信号)
        - "followup": 继续追问(正向信号)
        - "share": 分享(强正向信号)
        """
        # 1. 更新会话级反馈
        self._update_session_feedback(user_id, expert_id, feedback_type, value)
        
        # 2. 存入 TITANS(长期记忆)
        importance = self._calculate_importance(feedback_type, value)
        titans.store_feedback(user_id, expert_id, feedback_type, value, importance)
        
        # 3. 更新用户画像 (HOPE)
        hope.update_user_preference(user_id, expert_id, feedback_type, value)
        
        # 4. 更新专家统计
        self._update_expert_stats(expert_id, feedback_type, value)
    
    def _calculate_importance(self, feedback_type: str, value: float) -> float:
        """计算反馈重要性(用于惊讶度学习)"""
        importance_map = {
            "rating": 1.0,
            "duration": 0.5,
            "switch": 1.5,     # 切换是负向惊讶,需要特别记住
            "followup": 0.8,
            "share": 2.0       # 分享是强信号
        }
        base = importance_map.get(feedback_type, 1.0)
        
        # 极端值更重要
        if value < 0.3 or value > 0.9:
            base *= 1.3
        
        return base

推荐流程

用户请求
    │
    ▼
┌─────────────────────────────────────────────────────────┐
│ 1. 用户理解                                              │
│    - 解析当前意图(米塞斯行为学)                         │
│    - 加载用户画像(HOPE)                                │
│    - 获取历史偏好(TITANS)                              │
└─────────────────────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────────────────────┐
│ 2. 多路召回 (Recall)                                     │
│    - 意图匹配召回:基于当前需求                           │
│    - 协同过滤召回:基于相似用户                           │
│    - 热门召回:高质量专家                                 │
│    - 探索召回:新专家曝光                                 │
└─────────────────────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────────────────────┐
│ 3. 精排 (Ranking)                                        │
│    - 计算综合匹配分                                       │
│    - 考虑专家可信度                                       │
│    - 融合实时反馈                                         │
└─────────────────────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────────────────────┐
│ 4. 重排 (Re-ranking)                                     │
│    - 多样性打散:避免同类专家扎堆                         │
│    - 新鲜度提升:新专家适当加分                           │
│    - 业务规则:付费专家、认证专家                         │
└─────────────────────────────────────────────────────────┘
    │
    ▼
推荐结果

现有 MBE 能力映射

推荐系统组件 MBE 现有能力 状态
用户画像 HOPE 记忆系统 ✅ 已有
长期记忆 TITANS ✅ 已有
意图理解 米塞斯行为学分析 ✅ 已有
关键词匹配 TITANS 关键词层级 ✅ 已有
反馈学习 惊讶度学习 ✅ 已有
向量检索 MIRAS + Embedding ✅ 已有
协同过滤 collaborative_filter.py ✅ 已完成
专家评分 expert_trust_score.py ✅ 已完成
探索机制 exploration_strategy.py ✅ 已完成
多样性重排 recommendation_engine.py ✅ 已完成
统一专家池 unified_expert_pool.py ✅ 已适配

实现路线图

阶段 1:专家质量体系 ✅ 已完成

  • 专家可信度评分模型 (src/market/expert_trust_score.py)
  • 知识库质量评估 (KnowledgeBaseMetrics)
  • 用户满意度统计 (UserSatisfactionMetrics)
  • 回答准确性指标 (AccuracyMetrics)
  • 活跃度指标 (ActivityMetrics)
  • API 端点 (/api/trust/*)

阶段 2:协同过滤 ✅ 已完成

  • 用户-专家交互矩阵 (collaborative_filter.py)
  • 用户相似度计算 (Pearson Correlation)
  • 专家相似度计算 (Cosine Similarity)
  • User-based 推荐
  • Item-based 推荐
  • 混合推荐策略

阶段 3:推荐引擎 ✅ 已完成

  • 多路召回整合 (意图匹配 + 协同过滤 + 信任分数)
  • 精排模型 (多维度加权融合)
  • 重排逻辑实现 (多样性打散)
  • 冷启动处理

阶段 4:探索与优化 ✅ 已完成

  • ε-greedy 探索算法 (exploration_strategy.py)
  • UCB (Upper Confidence Bound) 算法
  • Thompson Sampling 算法
  • 实时反馈处理 (realtime_feedback.py)
  • 惊讶度学习机制
  • 推荐解释生成 (recommendation_explainer.py)
  • 多样性重排(避免信息茧房)

已实现的核心模块

1. 专家信任评分 (src/market/expert_trust_score.py)

# 评分维度权重
WEIGHTS = {
    "kb": 0.40,           # 知识库质量
    "satisfaction": 0.30,  # 用户满意度
    "accuracy": 0.20,      # 回答准确性
    "activity": 0.10       # 活跃度
}

# 信任等级
TRUST_LEVELS = {
    "excellent": 85,  # 优秀
    "good": 70,       # 良好
    "average": 50,    # 一般
    "poor": 0         # 较差
}

2. 协同过滤 (src/market/collaborative_filter.py)

# 相似度算法
- Pearson Correlation (用户相似度)
- Cosine Similarity (专家相似度)

# 推荐策略
- User-based: 相似用户喜欢什么
- Item-based: 相似专家有哪些
- Hybrid: 混合推荐 (60% User + 40% Item)

3. 推荐引擎 (src/market/recommendation_engine.py)

# 推荐权重
RECOMMENDATION_WEIGHTS = {
    "intent_match": 0.35,      # 意图匹配
    "trust_score": 0.25,       # 专家信任度
    "cf_score": 0.20,          # 协同过滤
    "user_preference": 0.15,   # 用户偏好
    "freshness": 0.05          # 新鲜度
}

4. 探索策略 (src/market/exploration_strategy.py)

# 三种探索算法
class ExpertExplorationStrategy:
    # ε-greedy: 简单有效
    def epsilon_greedy_select(candidates, user_id) -> ExplorationResult
    
    # UCB: 理论最优
    def ucb_select(candidates, user_id) -> ExplorationResult
    # UCB = avg_reward + c * sqrt(ln(N) / n)
    
    # Thompson Sampling: 贝叶斯方法
    def thompson_sampling_select(candidates, user_id) -> ExplorationResult

5. 实时反馈处理 (src/market/realtime_feedback.py)

# 反馈类型和权重(完整版)
FEEDBACK_WEIGHTS = {
    FeedbackType.RATING: 1.0,            # 直接评分
    FeedbackType.LIKE: 0.5,              # 点赞
    FeedbackType.DISLIKE: 1.2,           # 踩(负面反馈更重要)
    FeedbackType.REPORT: 2.0,            # 举报
    FeedbackType.CLICK: 0.2,             # 点击
    FeedbackType.IMPRESSION: 0.1,        # 曝光
    FeedbackType.SESSION_DURATION: 0.6,  # 会话时长
    FeedbackType.CONVERSATION_TURNS: 0.5,# 对话轮次
    FeedbackType.SWITCH_EXPERT: 1.5,     # 切换专家(强负向)
    FeedbackType.FOLLOWUP: 0.8,          # 追问
    FeedbackType.SHARE: 1.5,             # 分享(强正向)
    FeedbackType.SUBSCRIBE: 1.8,         # 订阅
    FeedbackType.UNSUBSCRIBE: 1.5,       # 取消订阅
    FeedbackType.PURCHASE: 2.0           # 购买(极强正向)
}

# 惊讶度学习
surprise = base_weight * (1 + deviation)
if polarity == "negative":
    surprise *= 1.5  # 负面反馈更惊讶
if surprise > 0.5:
    trigger_learning()  # 高惊讶度触发即时学习

6. 推荐解释器 (src/market/recommendation_explainer.py)

# 解释因素
ExplanationType:
    INTENT_MATCH    # 意图匹配
    COLLABORATIVE   # 协同过滤
    TRUST_SCORE     # 信任度
    USER_HISTORY    # 用户历史
    EXPLORATION     # 探索
    FRESHNESS       # 新鲜度
    DIVERSITY       # 多样性

# 生成解释
explanation = explainer.explain_recommendation(
    expert_id, expert_name, score, rank, breakdown, context
)
# 返回: short_explanation, full_explanation, main_reasons

7. API 端点

信任度端点 (/api/trust)

端点 方法 功能
/api/trust/score/{expert_id} GET 获取专家信任分数
/api/trust/leaderboard GET 获取信任度排行榜
/api/trust/batch POST 批量计算信任分数
/api/trust/interaction POST 记录用户交互
/api/trust/profile/{expert_id} GET 获取完整档案
/api/trust/recalculate-all POST 重算所有专家分数

市场推荐端点 (/api/market)

端点 方法 功能
/api/market/ GET 市场首页(包含个性化推荐)
/api/market/recommendations/for-user/{user_id} GET 为用户生成个性化推荐
/api/market/recommendations/for-question GET 基于问题推荐专家
/api/market/recommendations/similar/{expert_id} GET 相似专家推荐

反馈端点 (/api/feedback)

端点 方法 功能
/api/feedback/expert POST 提交专家反馈
/api/feedback/answer POST 提交答案反馈
/api/feedback/switch POST 记录专家切换
/api/feedback/stats GET 获取反馈统计

8. 接口适配与性能优化 (2026-01-27)

统一专家池适配

推荐引擎现已适配 UnifiedExpertPool 接口:

# UnifiedExpertPool 新增方法
def get_all_experts(self) -> List[UnifiedExpert]:
    """获取所有专家(同步方法,用于推荐引擎)"""
    all_experts = []
    for expert in self._platform_experts.values():
        if expert.enabled:
            all_experts.append(expert)
    for expert in self._market_experts.values():
        all_experts.append(expert)
    return all_experts

候选专家获取流程

async def _get_candidate_experts(self) -> List[Dict]:
    """获取候选专家列表 - 多源融合"""
    experts = []
    
    # 1. 从 UnifiedExpertPool 获取
    if self._expert_pool:
        pool_experts = self._expert_pool.get_all_experts()
        for e in pool_experts:
            experts.append({
                "id": e.id,
                "name": e.name,
                "description": e.description,
                "category": e.category or e.domains[0],
                "keywords": e.keywords,
                "price": e.price_per_1k_tokens,
                "rating": e.rating_avg
            })
    
    # 2. 从 ExpertRouter 获取(备选)
    if not experts:
        router = get_expert_router()
        for expert_id, config in router.experts.items():
            experts.append({...})
    
    # 3. 从 MarketStore 补充(市场专家)
    if self._market_store:
        market_experts = await self._market_store.get_published_models()
        experts.extend(market_experts)
    
    return experts

Redis 快速失败机制

避免 Redis 不可用时的重复超时:

class ExpertTrustScorer:
    def __init__(self):
        self._redis = None
        self._redis_unavailable = False  # 快速失败标志
    
    async def _get_redis(self):
        # 已知不可用,直接返回
        if self._redis_unavailable:
            return None
        
        if self._redis is None:
            try:
                redis_host = os.environ.get("REDIS_HOST", "localhost")
                self._redis = aioredis.from_url(
                    f"redis://{redis_host}:6379/0",
                    socket_connect_timeout=1.0,  # 1秒超时
                    socket_timeout=2.0
                )
                await self._redis.ping()
            except Exception as e:
                self._redis_unavailable = True  # 标记不可用
                self._redis = None
        return self._redis

性能优化效果

指标 优化前 优化后 提升
Redis 失败重试 每次 3s 超时 立即跳过 100x
24专家分数计算 ~5分钟 <1秒 300x
候选专家获取 单源 多源融合 更完整

数据埋点需求

# 需要采集的用户行为数据
TRACKING_EVENTS = {
    # 曝光
    "expert_impression": "专家曝光",
    
    # 点击
    "expert_click": "点击进入专家",
    
    # 对话
    "conversation_start": "开始对话",
    "conversation_turn": "对话轮次",
    "conversation_end": "结束对话",
    
    # 反馈
    "rating_submit": "提交评分",
    "expert_switch": "切换专家",
    "expert_share": "分享专家",
    "expert_subscribe": "订阅专家",
    "expert_purchase": "购买专家服务",
    
    # 时长
    "session_duration": "会话时长",
    "response_read_time": "阅读回复时长"
}

预期效果

指标 当前 目标
专家匹配准确率 ~70% 90%+
用户留存率 - 提升 30%
专家复购率 - 提升 50%
新专家曝光率 每用户/周 2-3 新专家
推荐多样性 高(避免信息茧房)

参考

  • 抖音推荐算法:协同过滤 + 内容理解 + 实时反馈
  • YouTube 推荐:深度神经网络 + 多目标优化
  • Netflix 推荐:矩阵分解 + 深度学习
  • 米塞斯行为学:人的行为是有目的的