MBE Python SDK 使用指南

版本: 0.1.0
更新日期: 2026-02-08

本文档提供 MBE Python SDK 的完整使用指南,包括安装、API 参考、常见场景示例和最佳实践。


📚 目录

  1. 快速开始
  2. 安装与配置
  3. 核心 API 参考
  4. 常见场景示例
  5. 最佳实践
  6. 错误处理
  7. 性能优化
  8. 常见问题

快速开始

最简单的示例

from mbe_sdk import MBEClient

# 初始化客户端
client = MBEClient(api_key="your-api-key")

# 计算惊讶度分数
result = client.surprise(
    input="递归是函数调用自身的技术",
    user_id="student_001"
)

print(f"惊讶度分数: {result.surprise_score}")
print(f"学习模式: {result.learning_mode}")

# 清理资源
client.close()

异步版本

import asyncio
from mbe_sdk import AsyncMBEClient

async def main():
    async with AsyncMBEClient(api_key="your-api-key") as client:
        result = await client.surprise(
            input="机器学习是人工智能的一个子集",
            user_id="user_123"
        )
        print(f"惊讶度分数: {result.surprise_score}")

asyncio.run(main())

安装与配置

安装

pip install mbe-sdk

配置

方式 1: 直接传入参数

from mbe_sdk import MBEClient

client = MBEClient(
    api_key="your-api-key",
    base_url="https://api.mbe.ai",  # 可选,默认为 https://api.mbe.ai
    timeout=30.0,                   # 可选,默认 30 秒
    max_retries=3                    # 可选,默认 3 次重试
)

方式 2: 使用环境变量

# 设置环境变量
export MBE_API_KEY="your-api-key"
export MBE_BASE_URL="https://api.mbe.ai"  # 可选
from mbe_sdk import MBEClient

# 自动从环境变量读取配置
client = MBEClient()

方式 3: 使用配置文件

import os
from dotenv import load_dotenv
from mbe_sdk import MBEClient

# 加载 .env 文件
load_dotenv()

client = MBEClient(
    api_key=os.getenv("MBE_API_KEY"),
    base_url=os.getenv("MBE_BASE_URL", "https://api.mbe.ai")
)

核心 API 参考

1. Surprise Detection(惊讶度检测)

分析内容的新颖性和学习准备度。

方法签名

def surprise(
    self,
    input: str,
    user_id: str = "default",
    history: Optional[List[str]] = None,
    domain: Optional[str] = None,
) -> SurpriseResult

参数说明

  • input (str, 必需): 要分析的内容文本
  • user_id (str, 可选): 用户标识符,用于上下文追踪,默认为 "default"
  • history (List[str], 可选): 用户之前的知识/主题列表
  • domain (str, 可选): 领域上下文,如 "education"、"customer_service" 等

返回值

SurpriseResult 对象,包含以下属性:

  • surprise_score (float): 惊讶度分数,范围 0.0-1.0
  • is_novel (bool): 内容是否新颖
  • learning_mode (str): 推荐的学习模式
    • fast_adaptation: 快速适应(高惊讶度)
    • pattern_recognition: 模式识别(中等惊讶度)
    • stable_memory: 稳定记忆(低惊讶度)
  • suggestion (str, 可选): 学习建议
  • related_topics (List[str]): 相关主题列表

使用示例

# 基础用法
result = client.surprise(
    input="递归是函数调用自身的技术",
    user_id="student_001"
)

# 带历史记录
result = client.surprise(
    input="动态规划算法",
    user_id="student_001",
    history=["递归", "分治算法", "贪心算法"],
    domain="education"
)

# 检查结果
if result.is_novel:
    print(f"这是新内容,惊讶度: {result.surprise_score}")
    print(f"建议学习模式: {result.learning_mode}")
    if result.suggestion:
        print(f"学习建议: {result.suggestion}")

2. Smart Routing(智能路由)

根据上下文选择最合适的专家。

方法签名

def route(
    self,
    input: str,
    task_type: str,
    user_level: str = "intermediate",
    surprise_score: Optional[float] = None,
    available_experts: Optional[List[str]] = None,
) -> RouteResult

参数说明

  • input (str, 必需): 输入内容
  • task_type (str, 必需): 任务类型,如 "explanation"、"practice"、"assessment" 等
  • user_level (str, 可选): 用户水平,可选值: "beginner"、"intermediate"、"advanced",默认为 "intermediate"
  • surprise_score (float, 可选): 预先计算的惊讶度分数
  • available_experts (List[str], 可选): 可用的专家 ID 列表

返回值

RouteResult 对象,包含以下属性:

  • selected_experts (List[SelectedExpert]): 选中的专家列表
  • routing_confidence (float): 路由置信度,范围 0.0-1.0
  • task_type (str): 任务类型
  • user_level (str, 可选): 用户水平

SelectedExpert 对象包含:

  • id (str): 专家 ID
  • weight (float): 选择权重,范围 0.0-1.0
  • reason (str, 可选): 选择原因

使用示例

# 基础路由
route_result = client.route(
    input="解释二分查找算法",
    task_type="explanation",
    user_level="intermediate"
)

# 带惊讶度分数
surprise_result = client.surprise(input="动态规划", user_id="user_123")
route_result = client.route(
    input="解释动态规划",
    task_type="explanation",
    user_level="beginner",
    surprise_score=surprise_result.surprise_score
)

# 从可用专家中选择
route_result = client.route(
    input="练习递归问题",
    task_type="practice",
    available_experts=[
        "expert_detailed_explanation",
        "expert_example_illustration",
        "expert_practice_questions"
    ]
)

# 处理结果
print(f"路由置信度: {route_result.routing_confidence}")
for expert in route_result.selected_experts:
    print(f"专家: {expert.id}, 权重: {expert.weight}")
    if expert.reason:
        print(f"  原因: {expert.reason}")

3. Memory Management(记忆管理)

存储和检索用户上下文信息。

3.1 写入记忆

def write(
    self,
    user_id: str,
    memory_type: str,
    key: str,
    content: Any,
    importance: float = 0.5,
) -> MemoryWriteResult

参数说明

  • user_id (str, 必需): 用户标识符
  • memory_type (str, 必需): 记忆类型
    • short_term: 短期记忆(会话级别)
    • working: 工作记忆(当前任务)
    • long_term: 长期记忆(持久化)
  • key (str, 必需): 记忆键,用于唯一标识
  • content (Any, 必需): 记忆内容,可以是任意 JSON 可序列化的对象
  • importance (float, 可选): 重要性分数,范围 0.0-1.0,默认为 0.5

返回值

MemoryWriteResult 对象,包含:

  • success (bool): 是否成功
  • memory_key (str): 写入的记忆键
  • message (str, 可选): 状态消息

3.2 读取记忆

def read(
    self,
    user_id: str,
    key: Optional[str] = None,
    query: Optional[str] = None,
    memory_types: Optional[List[str]] = None,
    top_k: int = 10,
) -> MemoryReadResult

参数说明

  • user_id (str, 必需): 用户标识符
  • key (str, 可选): 精确匹配的记忆键
  • query (str, 可选): 语义搜索查询
  • memory_types (List[str], 可选): 要搜索的记忆类型列表
  • top_k (int, 可选): 返回的最大数量,默认为 10

返回值

MemoryReadResult 对象,包含:

  • memories (List[MemoryItem]): 检索到的记忆列表
  • total (int): 匹配的总数

3.3 删除记忆

def delete(self, user_id: str, key: str) -> bool

参数说明

  • user_id (str, 必需): 用户标识符
  • key (str, 必需): 要删除的记忆键

返回值

bool: 是否成功删除

使用示例

# 写入长期记忆
write_result = client.memory.write(
    user_id="student_001",
    memory_type="long_term",
    key="learning_progress",
    content={
        "completed_topics": ["变量", "循环", "函数"],
        "current_level": "中级",
        "weak_areas": ["递归", "动态规划"],
        "last_updated": "2026-02-08"
    },
    importance=0.9
)

# 写入工作记忆
client.memory.write(
    user_id="student_001",
    memory_type="working",
    key="current_session",
    content={
        "session_start": "2026-02-08T10:00:00",
        "topics_covered": ["递归基础", "递归应用"],
        "questions_asked": 5
    },
    importance=0.7
)

# 精确读取
read_result = client.memory.read(
    user_id="student_001",
    key="learning_progress"
)

# 语义搜索
read_result = client.memory.read(
    user_id="student_001",
    query="学习进度和薄弱环节",
    memory_types=["long_term"],
    top_k=5
)

# 遍历结果
for memory in read_result.memories:
    print(f"键: {memory.key}")
    print(f"内容: {memory.content}")
    print(f"重要性: {memory.importance}")
    if memory.relevance:
        print(f"相关性: {memory.relevance}")

# 删除记忆
success = client.memory.delete(
    user_id="student_001",
    key="temporary_note"
)

4. Expert Marketplace(专家市场)

搜索和调用 AI 专家。

4.1 搜索专家

def search(
    self,
    query: Optional[str] = None,
    category: Optional[str] = None,
    tags: Optional[List[str]] = None,
    capabilities: Optional[List[str]] = None,
    min_rating: Optional[float] = None,
    max_latency_ms: Optional[int] = None,
    limit: int = 20,
) -> ExpertSearchResult

参数说明

  • query (str, 可选): 搜索查询文本
  • category (str, 可选): 专家类别,如 "education.math"
  • tags (List[str], 可选): 标签列表
  • capabilities (List[str], 可选): 能力列表
  • min_rating (float, 可选): 最低评分,范围 0.0-5.0
  • max_latency_ms (int, 可选): 最大延迟(毫秒)
  • limit (int, 可选): 返回的最大数量,默认为 20

返回值

ExpertSearchResult 对象,包含:

  • experts (List[Expert]): 专家列表
  • total (int): 匹配的总数

4.2 获取专家详情

def get(self, expert_id: str) -> Expert

参数说明

  • expert_id (str, 必需): 专家 ID

返回值

Expert 对象,包含:

  • id (str): 专家 ID
  • name (str): 显示名称
  • description (str): 描述
  • category (str): 类别
  • tags (List[str]): 标签列表
  • capabilities (List[str]): 能力列表
  • input_schema (Dict): 输入 JSON Schema
  • output_schema (Dict): 输出 JSON Schema
  • metrics (ExpertMetrics): 指标信息
  • pricing (ExpertPricing): 定价信息
  • is_active (bool): 是否激活

4.3 调用专家

def invoke(
    self,
    expert_id: str,
    input: Dict[str, Any],
    timeout_ms: int = 30000,
    retry: int = 0,
) -> InvocationResult

参数说明

  • expert_id (str, 必需): 专家 ID
  • input (Dict[str, Any], 必需): 输入数据,格式需符合专家的 input_schema
  • timeout_ms (int, 可选): 超时时间(毫秒),默认为 30000
  • retry (int, 可选): 重试次数,默认为 0

返回值

InvocationResult 对象,包含:

  • expert_id (str): 专家 ID
  • success (bool): 是否成功
  • result (Any): 调用结果
  • error (str, 可选): 错误消息(如果失败)
  • latency_ms (float): 延迟(毫秒)
  • cost (float): 调用成本

使用示例

# 搜索数学专家
search_result = client.experts.search(
    query="方程求解",
    category="education.math",
    min_rating=4.0,
    limit=10
)

print(f"找到 {search_result.total} 个专家")
for expert in search_result.experts:
    print(f"- {expert.name} ({expert.id})")
    print(f"  评分: {expert.metrics.rating:.1f}/5.0")
    print(f"  调用次数: {expert.metrics.call_count}")
    print(f"  平均延迟: {expert.metrics.avg_latency_ms}ms")

# 获取专家详情
expert = client.experts.get("edu.math.algebra.solver")
print(f"专家: {expert.name}")
print(f"描述: {expert.description}")
print(f"输入格式: {expert.input_schema}")
print(f"输出格式: {expert.output_schema}")

# 调用专家
invoke_result = client.experts.invoke(
    expert_id="edu.math.algebra.solver",
    input={
        "equation": "x^2 - 5x + 6 = 0",
        "show_steps": True,
        "format": "latex"
    },
    timeout_ms=30000
)

if invoke_result.success:
    print(f"结果: {invoke_result.result}")
    print(f"延迟: {invoke_result.latency_ms}ms")
    print(f"成本: ${invoke_result.cost:.4f}")
else:
    print(f"错误: {invoke_result.error}")

5. Learning Analysis(学习分析)

综合分析用户的学习状态。

方法签名

def analyze_learning(
    self,
    user_id: str,
    current_topic: str,
    recent_performance: Optional[List[Dict[str, Any]]] = None,
) -> LearningAnalysis

参数说明

  • user_id (str, 必需): 用户标识符
  • current_topic (str, 必需): 当前学习主题
  • recent_performance (List[Dict], 可选): 最近表现数据,格式为 [{"topic": str, "score": float}, ...]

返回值

LearningAnalysis 对象,包含:

  • user_id (str): 用户 ID
  • current_topic (str): 当前主题
  • surprise_score (float): 惊讶度分数
  • is_new_topic (bool): 是否为新主题
  • learning_mode (str): 推荐的学习模式
  • recent_avg_score (float): 最近平均分数
  • performance_trend (str): 表现趋势("improving"、"declining"、"stable"、"unknown")
  • suggestions (List[str]): 学习建议列表
  • recommended_approach (List[SelectedExpert]): 推荐的专家列表

使用示例

analysis = client.analyze_learning(
    user_id="student_001",
    current_topic="递归",
    recent_performance=[
        {"topic": "循环", "score": 0.85},
        {"topic": "函数", "score": 0.90},
        {"topic": "数组", "score": 0.75},
    ]
)

print(f"当前主题: {analysis.current_topic}")
print(f"惊讶度分数: {analysis.surprise_score}")
print(f"是否新主题: {analysis.is_new_topic}")
print(f"学习模式: {analysis.learning_mode}")
print(f"最近平均分数: {analysis.recent_avg_score:.2f}")
print(f"表现趋势: {analysis.performance_trend}")

print("\n学习建议:")
for suggestion in analysis.suggestions:
    print(f"- {suggestion}")

print("\n推荐专家:")
for expert in analysis.recommended_approach:
    print(f"- {expert.id} (权重: {expert.weight})")

常见场景示例

场景 1: 教育应用 - 个性化学习路径

from mbe_sdk import MBEClient

def personalize_learning_path(client: MBEClient, user_id: str, topic: str):
    """为学习者创建个性化学习路径"""
    
    # 1. 检测内容新颖度
    surprise_result = client.surprise(
        input=topic,
        user_id=user_id,
        history=get_user_history(user_id),  # 假设的函数
        domain="education"
    )
    
    # 2. 根据惊讶度选择学习模式
    if surprise_result.surprise_score > 0.7:
        # 高惊讶度:需要详细解释
        task_type = "explanation"
        user_level = "beginner"
    elif surprise_result.surprise_score > 0.4:
        # 中等惊讶度:需要示例和实践
        task_type = "practice"
        user_level = "intermediate"
    else:
        # 低惊讶度:可以挑战高级问题
        task_type = "assessment"
        user_level = "advanced"
    
    # 3. 智能路由选择专家
    route_result = client.route(
        input=topic,
        task_type=task_type,
        user_level=user_level,
        surprise_score=surprise_result.surprise_score
    )
    
    # 4. 记录学习进度
    client.memory.write(
        user_id=user_id,
        memory_type="long_term",
        key=f"topic_{topic}",
        content={
            "topic": topic,
            "surprise_score": surprise_result.surprise_score,
            "learning_mode": surprise_result.learning_mode,
            "selected_experts": [e.id for e in route_result.selected_experts],
            "timestamp": "2026-02-08T10:00:00"
        },
        importance=0.8
    )
    
    return {
        "surprise_score": surprise_result.surprise_score,
        "learning_mode": surprise_result.learning_mode,
        "recommended_experts": route_result.selected_experts,
        "suggestion": surprise_result.suggestion
    }

# 使用示例
client = MBEClient(api_key="your-api-key")
path = personalize_learning_path(client, "student_001", "动态规划")
print(path)

场景 2: 客户服务 - 智能问答系统

from mbe_sdk import MBEClient, MBERateLimitError
import time

def smart_customer_service(client: MBEClient, user_id: str, question: str):
    """智能客户服务系统"""
    
    # 1. 分析问题新颖度
    surprise_result = client.surprise(
        input=question,
        user_id=user_id,
        domain="customer_service"
    )
    
    # 2. 从记忆库检索相关历史
    memories = client.memory.read(
        user_id=user_id,
        query=question,
        memory_types=["long_term", "working"],
        top_k=3
    )
    
    # 3. 搜索相关专家
    search_result = client.experts.search(
        query=question,
        category="customer_service",
        min_rating=4.0,
        limit=5
    )
    
    # 4. 选择最佳专家并调用
    if search_result.experts:
        best_expert = search_result.experts[0]
        
        # 构建上下文
        context = {
            "question": question,
            "user_history": [m.content for m in memories.memories],
            "surprise_score": surprise_result.surprise_score
        }
        
        # 调用专家
        try:
            invoke_result = client.experts.invoke(
                expert_id=best_expert.id,
                input=context,
                timeout_ms=30000
            )
            
            if invoke_result.success:
                # 保存对话记录
                client.memory.write(
                    user_id=user_id,
                    memory_type="working",
                    key=f"conversation_{int(time.time())}",
                    content={
                        "question": question,
                        "answer": invoke_result.result,
                        "expert_id": best_expert.id
                    },
                    importance=0.6
                )
                
                return invoke_result.result
        except MBERateLimitError as e:
            print(f"速率限制,请稍后重试: {e.retry_after}秒")
            return None
    
    return "抱歉,暂时无法回答您的问题。"

# 使用示例
client = MBEClient(api_key="your-api-key")
answer = smart_customer_service(client, "customer_123", "如何退款?")
print(answer)

场景 3: 批量处理 - 异步批量分析

import asyncio
from mbe_sdk import AsyncMBEClient
from typing import List, Dict

async def batch_analyze_topics(
    client: AsyncMBEClient,
    user_id: str,
    topics: List[str],
    batch_size: int = 5
) -> List[Dict]:
    """批量分析多个主题"""
    
    results = []
    
    # 分批处理
    for i in range(0, len(topics), batch_size):
        batch = topics[i:i + batch_size]
        
        # 并行处理当前批次
        tasks = [
            client.surprise(input=topic, user_id=user_id, domain="education")
            for topic in batch
        ]
        
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 处理结果
        for topic, result in zip(batch, batch_results):
            if isinstance(result, Exception):
                print(f"处理 {topic} 时出错: {result}")
                continue
            
            results.append({
                "topic": topic,
                "surprise_score": result.surprise_score,
                "is_novel": result.is_novel,
                "learning_mode": result.learning_mode
            })
        
        # 速率限制:批次之间等待
        if i + batch_size < len(topics):
            await asyncio.sleep(0.5)
    
    return results

# 使用示例
async def main():
    async with AsyncMBEClient(api_key="your-api-key") as client:
        topics = [
            "递归",
            "动态规划",
            "贪心算法",
            "分治算法",
            "回溯算法",
            "图算法",
            "排序算法",
            "搜索算法"
        ]
        
        results = await batch_analyze_topics(client, "student_001", topics)
        
        # 统计
        avg_surprise = sum(r["surprise_score"] for r in results) / len(results)
        novel_count = sum(1 for r in results if r["is_novel"])
        
        print(f"处理了 {len(results)} 个主题")
        print(f"平均惊讶度: {avg_surprise:.2f}")
        print(f"新颖主题数: {novel_count}")

asyncio.run(main())

场景 4: 学习追踪 - 完整学习分析

from mbe_sdk import MBEClient
from datetime import datetime

class LearningTracker:
    """学习追踪器"""
    
    def __init__(self, client: MBEClient, user_id: str):
        self.client = client
        self.user_id = user_id
    
    def track_learning_session(self, topic: str, performance_data: List[Dict]):
        """追踪学习会话"""
        
        # 1. 综合分析
        analysis = self.client.analyze_learning(
            user_id=self.user_id,
            current_topic=topic,
            recent_performance=performance_data
        )
        
        # 2. 更新学习进度
        self.client.memory.write(
            user_id=self.user_id,
            memory_type="long_term",
            key=f"progress_{topic}",
            content={
                "topic": topic,
                "surprise_score": analysis.surprise_score,
                "is_new_topic": analysis.is_new_topic,
                "learning_mode": analysis.learning_mode,
                "recent_avg_score": analysis.recent_avg_score,
                "performance_trend": analysis.performance_trend,
                "last_updated": datetime.now().isoformat()
            },
            importance=0.9
        )
        
        # 3. 保存建议
        if analysis.suggestions:
            self.client.memory.write(
                user_id=self.user_id,
                memory_type="working",
                key=f"suggestions_{topic}",
                content={
                    "suggestions": analysis.suggestions,
                    "recommended_experts": [
                        {"id": e.id, "weight": e.weight}
                        for e in analysis.recommended_approach
                    ]
                },
                importance=0.7
            )
        
        return analysis
    
    def get_learning_summary(self) -> Dict:
        """获取学习摘要"""
        
        # 读取所有学习进度
        memories = self.client.memory.read(
            user_id=self.user_id,
            query="学习进度",
            memory_types=["long_term"],
            top_k=20
        )
        
        topics = []
        for memory in memories.memories:
            if isinstance(memory.content, dict) and "topic" in memory.content:
                topics.append(memory.content)
        
        # 计算统计信息
        if topics:
            avg_surprise = sum(t.get("surprise_score", 0) for t in topics) / len(topics)
            avg_score = sum(t.get("recent_avg_score", 0) for t in topics) / len(topics)
            
            return {
                "total_topics": len(topics),
                "avg_surprise_score": avg_surprise,
                "avg_performance_score": avg_score,
                "topics": topics
            }
        
        return {"total_topics": 0, "topics": []}

# 使用示例
client = MBEClient(api_key="your-api-key")
tracker = LearningTracker(client, "student_001")

# 追踪学习会话
analysis = tracker.track_learning_session(
    topic="递归",
    performance_data=[
        {"topic": "循环", "score": 0.85},
        {"topic": "函数", "score": 0.90},
    ]
)

# 获取摘要
summary = tracker.get_learning_summary()
print(f"学习了 {summary['total_topics']} 个主题")
print(f"平均惊讶度: {summary['avg_surprise_score']:.2f}")

最佳实践

1. 客户端管理

✅ 推荐:使用上下文管理器

# 同步客户端
with MBEClient(api_key="your-api-key") as client:
    result = client.surprise(input="test", user_id="user_123")
    # 自动清理资源

# 异步客户端
async with AsyncMBEClient(api_key="your-api-key") as client:
    result = await client.surprise(input="test", user_id="user_123")
    # 自动清理资源

❌ 不推荐:忘记关闭客户端

# 不好的做法
client = MBEClient(api_key="your-api-key")
result = client.surprise(input="test", user_id="user_123")
# 忘记调用 client.close()

✅ 推荐:单例模式(长期运行的应用)

class MBEClientManager:
    _instance = None
    _client = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def get_client(self):
        if self._client is None:
            self._client = MBEClient(api_key="your-api-key")
        return self._client
    
    def close(self):
        if self._client:
            self._client.close()
            self._client = None

# 使用
manager = MBEClientManager()
client = manager.get_client()

2. 错误处理

✅ 推荐:完整的错误处理

from mbe_sdk import MBEClient
from mbe_sdk.exceptions import (
    MBEAuthenticationError,
    MBERateLimitError,
    MBETimeoutError,
    MBEAPIError,
)

client = MBEClient(api_key="your-api-key")

try:
    result = client.surprise(input="test", user_id="user_123")
except MBEAuthenticationError:
    print("认证失败,请检查 API Key")
except MBERateLimitError as e:
    print(f"速率限制,请在 {e.retry_after} 秒后重试")
except MBETimeoutError:
    print("请求超时,请稍后重试")
except MBEAPIError as e:
    print(f"API 错误: {e.message} (状态码: {e.status_code})")
except Exception as e:
    print(f"未知错误: {e}")
finally:
    client.close()

✅ 推荐:重试机制

import time
from mbe_sdk import MBEClient, MBERateLimitError, MBETimeoutError

def surprise_with_retry(client: MBEClient, input: str, user_id: str, max_retries: int = 3):
    """带重试的惊讶度检测"""
    
    for attempt in range(max_retries):
        try:
            return client.surprise(input=input, user_id=user_id)
        except MBERateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = e.retry_after or (2 ** attempt)
                print(f"速率限制,等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
                continue
            raise
        except MBETimeoutError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"超时,等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
                continue
            raise
    
    raise Exception("重试次数用尽")

3. 性能优化

✅ 推荐:使用异步客户端进行批量操作

import asyncio
from mbe_sdk import AsyncMBEClient

async def process_batch(client: AsyncMBEClient, items: List[str], user_id: str):
    """批量处理,充分利用异步性能"""
    
    # 并行处理所有项目
    tasks = [
        client.surprise(input=item, user_id=user_id)
        for item in items
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

✅ 推荐:合理设置超时时间

# 根据操作类型设置不同的超时时间
client = MBEClient(
    api_key="your-api-key",
    timeout=60.0  # 复杂操作需要更长时间
)

# 对于专家调用,使用专门的超时参数
result = client.experts.invoke(
    expert_id="expert_id",
    input={"data": "..."},
    timeout_ms=60000  # 60秒
)

✅ 推荐:缓存频繁访问的数据

from functools import lru_cache
from mbe_sdk import MBEClient

class CachedMBEClient:
    def __init__(self, client: MBEClient):
        self.client = client
        self._expert_cache = {}
    
    def get_expert_cached(self, expert_id: str):
        """带缓存的专家获取"""
        if expert_id not in self._expert_cache:
            self._expert_cache[expert_id] = self.client.experts.get(expert_id)
        return self._expert_cache[expert_id]

4. 数据管理

✅ 推荐:使用有意义的记忆键

# 好的做法:使用有意义的键名
client.memory.write(
    user_id="user_123",
    memory_type="long_term",
    key="learning_progress_python_basics",  # 清晰、有意义的键名
    content={...}
)

# 不好的做法:使用无意义的键名
client.memory.write(
    user_id="user_123",
    memory_type="long_term",
    key="data_001",  # 不清晰
    content={...}
)

✅ 推荐:合理设置重要性分数

# 关键信息:高重要性
client.memory.write(
    user_id="user_123",
    memory_type="long_term",
    key="user_preferences",
    content={"language": "zh-CN", "theme": "dark"},
    importance=0.9  # 高重要性
)

# 临时信息:低重要性
client.memory.write(
    user_id="user_123",
    memory_type="working",
    key="current_session_temp",
    content={"temp_data": "..."},
    importance=0.3  # 低重要性
)

✅ 推荐:定期清理过期记忆

def cleanup_old_memories(client: MBEClient, user_id: str, days: int = 30):
    """清理过期的工作记忆"""
    
    memories = client.memory.read(
        user_id=user_id,
        memory_types=["working"],
        top_k=100
    )
    
    cutoff_date = datetime.now() - timedelta(days=days)
    
    for memory in memories.memories:
        if memory.created_at and memory.created_at < cutoff_date:
            if memory.importance < 0.5:  # 只删除低重要性的
                client.memory.delete(user_id=user_id, key=memory.key)

5. 安全性

✅ 推荐:使用环境变量存储 API Key

import os
from mbe_sdk import MBEClient

# 从环境变量读取
api_key = os.getenv("MBE_API_KEY")
if not api_key:
    raise ValueError("MBE_API_KEY 环境变量未设置")

client = MBEClient(api_key=api_key)

❌ 不推荐:硬编码 API Key

# 不好的做法:硬编码 API Key
client = MBEClient(api_key="sk-1234567890abcdef")  # 不要这样做!

✅ 推荐:验证用户输入

def safe_surprise(client: MBEClient, input: str, user_id: str):
    """安全的惊讶度检测,带输入验证"""
    
    # 验证输入
    if not input or len(input.strip()) == 0:
        raise ValueError("输入不能为空")
    
    if len(input) > 10000:  # 设置最大长度
        raise ValueError("输入过长")
    
    if not user_id or len(user_id) > 100:
        raise ValueError("用户 ID 无效")
    
    return client.surprise(input=input.strip(), user_id=user_id)

错误处理

异常类型

MBE SDK 定义了以下异常类型:

  1. MBEError: 所有异常的基类
  2. MBEAPIError: API 错误(包含状态码和错误码)
  3. MBEAuthenticationError: 认证失败(401)
  4. MBERateLimitError: 速率限制(429)
  5. MBETimeoutError: 请求超时
  6. MBEValidationError: 参数验证错误
  7. MBEConnectionError: 连接错误
  8. MBEExpertNotFoundError: 专家未找到(404)

错误处理示例

from mbe_sdk import MBEClient
from mbe_sdk.exceptions import *

client = MBEClient(api_key="your-api-key")

try:
    result = client.surprise(input="test", user_id="user_123")
except MBEAuthenticationError as e:
    # 处理认证错误
    print(f"认证失败: {e.message}")
    # 检查 API Key 是否正确
except MBERateLimitError as e:
    # 处理速率限制
    print(f"速率限制: {e.message}")
    if e.retry_after:
        print(f"请在 {e.retry_after} 秒后重试")
except MBETimeoutError as e:
    # 处理超时
    print(f"请求超时: {e.message}")
    # 可以增加超时时间或重试
except MBEValidationError as e:
    # 处理验证错误
    print(f"参数验证失败: {e.message}")
    if e.field:
        print(f"字段: {e.field}")
except MBEAPIError as e:
    # 处理其他 API 错误
    print(f"API 错误: {e.message}")
    print(f"状态码: {e.status_code}")
    print(f"错误码: {e.error_code}")
except MBEConnectionError as e:
    # 处理连接错误
    print(f"连接错误: {e.message}")
    # 检查网络连接或服务状态
except Exception as e:
    # 处理其他未知错误
    print(f"未知错误: {e}")
finally:
    client.close()

性能优化

1. 使用异步客户端

对于需要处理大量请求的场景,使用异步客户端可以显著提升性能:

import asyncio
from mbe_sdk import AsyncMBEClient

async def process_many_requests():
    async with AsyncMBEClient(api_key="your-api-key") as client:
        # 并行处理多个请求
        tasks = [
            client.surprise(input=f"Topic {i}", user_id="user_123")
            for i in range(100)
        ]
        results = await asyncio.gather(*tasks)
        return results

2. 批量操作

对于记忆操作,尽量批量处理:

# 不好的做法:逐个写入
for item in items:
    client.memory.write(user_id="user_123", ...)

# 更好的做法:批量写入(如果 API 支持)
# 或者使用异步并行写入
async def batch_write_memories(client, user_id, items):
    tasks = [
        client.memory.write(user_id=user_id, ...)
        for item in items
    ]
    await asyncio.gather(*tasks)

3. 连接池复用

对于长期运行的应用,复用客户端连接:

# 创建全局客户端(单例模式)
_global_client = None

def get_client():
    global _global_client
    if _global_client is None:
        _global_client = MBEClient(api_key="your-api-key")
    return _global_client

4. 合理设置超时

根据操作类型设置合适的超时时间:

# 快速操作:短超时
client = MBEClient(api_key="your-api-key", timeout=10.0)

# 复杂操作:长超时
client = MBEClient(api_key="your-api-key", timeout=60.0)

常见问题

Q1: 如何获取 API Key?

A: API Key 需要在 MBE 平台注册账号后,在控制台中生成。请访问 MBE 控制台 获取。

Q2: 如何处理速率限制?

A: 当遇到速率限制时,MBERateLimitError 异常会包含 retry_after 属性,指示需要等待的时间:

try:
    result = client.surprise(input="test", user_id="user_123")
except MBERateLimitError as e:
    if e.retry_after:
        time.sleep(e.retry_after)
        # 重试请求

Q3: 记忆数据会永久保存吗?

A: 这取决于记忆类型:

  • short_term: 短期记忆,会话结束后可能被清理
  • working: 工作记忆,可能在一定时间后过期
  • long_term: 长期记忆,会持久保存

Q4: 如何查看专家的输入/输出格式?

A: 使用 client.experts.get(expert_id) 获取专家详情,其中包含 input_schemaoutput_schema

expert = client.experts.get("expert_id")
print("输入格式:", expert.input_schema)
print("输出格式:", expert.output_schema)

Q5: 异步客户端和同步客户端有什么区别?

A:

  • 同步客户端 (MBEClient): 适合简单的脚本和同步代码
  • 异步客户端 (AsyncMBEClient): 适合需要高并发处理的场景,可以并行处理多个请求

Q6: 如何调试 API 调用?

A: 可以启用详细日志:

import logging

logging.basicConfig(level=logging.DEBUG)
# SDK 会自动记录请求和响应

Q7: 支持哪些 Python 版本?

A: MBE SDK 支持 Python 3.8 及以上版本。

Q8: 如何处理网络错误?

A: 使用 MBEConnectionError 捕获连接错误,并实现重试机制:

try:
    result = client.surprise(input="test", user_id="user_123")
except MBEConnectionError:
    # 检查网络连接或服务状态
    print("无法连接到 MBE 服务")

更多资源


文档版本: 1.0.0
最后更新: 2026-02-08