MBE API 使用示例

版本: 1.0.0
更新日期: 2026-02-08

本文档提供 MBE API 的完整使用示例,包括用户注册/登录流程、Token 计费使用和常见 API 调用示例。


📚 目录

  1. 快速开始
  2. 用户注册/登录流程
  3. Token 计费使用
  4. API 调用示例
  5. 完整示例项目

快速开始

基础设置

import requests
import json

# API 基础地址
BASE_URL = "http://localhost:8000"
API_BASE = f"{BASE_URL}/api/v1"

# 请求头模板
def get_headers(token=None):
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return headers

用户注册/登录流程

1. 用户注册

MBE 支持多种注册方式:

方式 1: 邮箱+密码注册

def register_with_email(email: str, password: str, username: str = None):
    """使用邮箱和密码注册"""
    
    url = f"{API_BASE}/users/register"
    data = {
        "email": email,
        "password": password,
    }
    
    if username:
        data["username"] = username
    
    response = requests.post(url, json=data, timeout=10)
    
    if response.status_code == 200:
        result = response.json()
        return {
            "success": True,
            "user_id": result.get("user_id"),
            "access_token": result.get("access_token"),
            "refresh_token": result.get("refresh_token"),
            "message": result.get("message", "注册成功")
        }
    else:
        error = response.json()
        return {
            "success": False,
            "error": error.get("detail", "注册失败")
        }

# 使用示例
result = register_with_email(
    email="user@example.com",
    password="SecurePassword123!",
    username="testuser"
)

if result["success"]:
    print(f"注册成功!用户ID: {result['user_id']}")
    print(f"访问令牌: {result['access_token'][:30]}...")
    access_token = result["access_token"]
else:
    print(f"注册失败: {result['error']}")

方式 2: 设备自动注册

def register_with_device(device_id: str):
    """使用设备ID自动注册(适用于小智设备等)"""
    
    url = f"{API_BASE}/users/register"
    data = {
        "device_id": device_id
    }
    
    response = requests.post(url, json=data, timeout=10)
    
    if response.status_code == 200:
        result = response.json()
        return {
            "success": True,
            "user_id": result.get("user_id"),
            "access_token": result.get("access_token"),
            "refresh_token": result.get("refresh_token"),
            "message": result.get("message", "设备已注册")
        }
    else:
        error = response.json()
        return {
            "success": False,
            "error": error.get("detail", "注册失败")
        }

# 使用示例
result = register_with_device(device_id="device_123456")
if result["success"]:
    access_token = result["access_token"]

方式 3: 手机号注册

def register_with_phone(phone: str, password: str):
    """使用手机号注册"""
    
    url = f"{API_BASE}/users/register"
    data = {
        "phone": phone,
        "password": password
    }
    
    response = requests.post(url, json=data, timeout=10)
    
    if response.status_code == 200:
        result = response.json()
        return {
            "success": True,
            "user_id": result.get("user_id"),
            "access_token": result.get("access_token"),
            "refresh_token": result.get("refresh_token")
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "注册失败")
        }

2. 用户登录

def login(email: str, password: str):
    """用户登录"""
    
    url = f"{API_BASE}/users/login"
    data = {
        "email": email,
        "password": password
    }
    
    response = requests.post(url, json=data, timeout=10)
    
    if response.status_code == 200:
        result = response.json()
        return {
            "success": True,
            "access_token": result.get("access_token"),
            "refresh_token": result.get("refresh_token"),
            "user_id": result.get("user_id"),
            "role": result.get("role"),
            "email": result.get("email")
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "登录失败")
        }

# 使用示例
result = login(
    email="user@example.com",
    password="SecurePassword123!"
)

if result["success"]:
    access_token = result["access_token"]
    refresh_token = result["refresh_token"]
    user_id = result["user_id"]
    print(f"登录成功!用户ID: {user_id}, 角色: {result['role']}")
else:
    print(f"登录失败: {result['error']}")

3. 刷新 Token

def refresh_access_token(refresh_token: str):
    """刷新访问令牌"""
    
    url = f"{API_BASE}/users/refresh"
    data = {
        "refresh_token": refresh_token
    }
    
    response = requests.post(url, json=data, timeout=10)
    
    if response.status_code == 200:
        result = response.json()
        return {
            "success": True,
            "access_token": result.get("access_token")
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "刷新失败")
        }

# 使用示例
result = refresh_access_token(refresh_token)
if result["success"]:
    access_token = result["access_token"]
    print("Token 刷新成功")

4. 获取用户信息

def get_user_info(access_token: str):
    """获取当前用户信息"""
    
    url = f"{API_BASE}/users/me"
    headers = get_headers(access_token)
    
    response = requests.get(url, headers=headers, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "user": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "获取失败")
        }

# 使用示例
result = get_user_info(access_token)
if result["success"]:
    user = result["user"]
    print(f"用户ID: {user.get('user_id')}")
    print(f"邮箱: {user.get('email')}")
    print(f"用户名: {user.get('username')}")

5. 更新用户信息

def update_user_info(access_token: str, username: str = None, **kwargs):
    """更新用户信息"""
    
    url = f"{API_BASE}/users/me"
    headers = get_headers(access_token)
    
    data = {}
    if username:
        data["username"] = username
    data.update(kwargs)
    
    response = requests.put(url, headers=headers, json=data, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "user": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "更新失败")
        }

# 使用示例
result = update_user_info(
    access_token=access_token,
    username="newname",
    bio="这是我的个人简介"
)

Token 计费使用

1. 获取 Token 余额

def get_token_balance(access_token: str):
    """获取用户 Token 余额"""
    
    # 方式 1: 通过用户信息获取(推荐)
    url = f"{API_BASE}/users/me"
    headers = get_headers(access_token)
    
    response = requests.get(url, headers=headers, timeout=10)
    
    if response.status_code == 200:
        user = response.json()
        # 余额信息在 user 对象中
        return {
            "success": True,
            "balance": user.get("token_balance", {}),
            "quota": user.get("token_quota", {}),
            "usage": user.get("token_usage", {})
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "获取失败")
        }

# 使用示例
result = get_token_balance(access_token)
if result["success"]:
    balance = result["balance"]
    quota = result["quota"]
    usage = result["usage"]
    
    print(f"Token 余额: {balance.get('remaining', 0)}")
    print(f"本月配额: {quota.get('monthly_tokens', 0)}")
    print(f"本月已用: {usage.get('used_this_month', 0)}")
    print(f"剩余额度: {quota.get('monthly_tokens', 0) - usage.get('used_this_month', 0)}")

2. 获取使用统计

def get_usage_stats(access_token: str, period: str = "month"):
    """
    获取使用统计
    
    Args:
        period: 统计周期,可选值: "day", "week", "month"
    """
    
    url = f"{API_BASE}/users/usage"
    headers = get_headers(access_token)
    params = {"period": period}
    
    response = requests.get(url, headers=headers, params=params, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "stats": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "获取失败")
        }

# 使用示例
result = get_usage_stats(access_token, period="month")
if result["success"]:
    stats = result["stats"]
    print(f"今日使用: {stats.get('today_usage', 0)}")
    print(f"今日限额: {stats.get('today_limit', 0)}")
    print(f"本月使用: {stats.get('month_usage', 0)}")
    print(f"本月限额: {stats.get('month_limit', 0)}")
    print(f"剩余额度: {stats.get('remaining', 0)}")

3. 检查配额

def check_quota(access_token: str, required_tokens: int = 0):
    """检查是否有足够的 Token 配额"""
    
    url = f"{API_BASE}/users/usage/check"
    headers = get_headers(access_token)
    params = {"required": required_tokens}
    
    response = requests.get(url, headers=headers, params=params, timeout=10)
    
    if response.status_code == 200:
        result = response.json()
        return {
            "success": True,
            "has_quota": result.get("has_quota", False),
            "remaining": result.get("remaining", 0),
            "message": result.get("message", "")
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "检查失败")
        }

# 使用示例
result = check_quota(access_token, required_tokens=1000)
if result["success"]:
    if result["has_quota"]:
        print(f"配额充足,剩余: {result['remaining']} tokens")
    else:
        print(f"配额不足: {result['message']}")

4. 获取订阅信息

def get_subscription_info(access_token: str):
    """获取用户订阅信息"""
    
    url = f"{API_BASE}/users/subscription"
    headers = get_headers(access_token)
    
    response = requests.get(url, headers=headers, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "subscription": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "获取失败")
        }

# 使用示例
result = get_subscription_info(access_token)
if result["success"]:
    sub = result["subscription"]
    print(f"当前套餐: {sub.get('plan_type', 'free')}")
    print(f"套餐名称: {sub.get('plan_name', '免费版')}")
    print(f"状态: {sub.get('status', 'unknown')}")
    print(f"到期时间: {sub.get('expires_at', 'N/A')}")

5. 获取定价信息

def get_pricing_info(access_token: str):
    """获取可用套餐和定价信息"""
    
    url = f"{API_BASE}/users/pricing"
    headers = get_headers(access_token)
    
    response = requests.get(url, headers=headers, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "plans": response.json().get("plans", [])
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "获取失败")
        }

# 使用示例
result = get_pricing_info(access_token)
if result["success"]:
    plans = result["plans"]
    print("可用套餐:")
    for plan in plans:
        print(f"  - {plan.get('plan_name')}: {plan.get('price', 0)}元/月")
        print(f"    每月Token: {plan.get('monthly_tokens', 0)}")
        print(f"    特性: {', '.join(plan.get('features', []))}")

6. 升级订阅

def upgrade_subscription(access_token: str, plan_type: str):
    """
    升级订阅套餐
    
    Args:
        plan_type: 套餐类型,如 "personal", "pro", "enterprise"
    """
    
    url = f"{API_BASE}/users/subscription/upgrade"
    headers = get_headers(access_token)
    data = {
        "plan_type": plan_type
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "升级失败")
        }

# 使用示例
result = upgrade_subscription(access_token, plan_type="pro")
if result["success"]:
    print("订阅升级成功!")
    # 可能需要支付
    payment_info = result["result"].get("payment", {})
    if payment_info:
        print(f"请完成支付: {payment_info.get('amount', 0)}元")

7. 创建支付订单

def create_payment_order(access_token: str, amount: float, payment_method: str = "alipay"):
    """
    创建支付订单(用于充值或购买套餐)
    
    Args:
        amount: 支付金额(元)
        payment_method: 支付方式,可选值: "alipay", "wechat", "stripe"
    """
    
    url = f"{API_BASE}/users/payment/create"
    headers = get_headers(access_token)
    data = {
        "amount": amount,
        "payment_method": payment_method,
        "purpose": "recharge"  # 或 "subscription"
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=10)
    
    if response.status_code == 200:
        return {
            "success": True,
            "order": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "创建订单失败")
        }

# 使用示例
result = create_payment_order(access_token, amount=100.0, payment_method="alipay")
if result["success"]:
    order = result["order"]
    print(f"订单创建成功!订单ID: {order.get('order_id')}")
    print(f"支付链接: {order.get('payment_url', 'N/A')}")
    print(f"金额: {order.get('amount', 0)}元")

API 调用示例

1. Surprise Detection API(惊讶度检测)

def call_surprise_api(access_token: str, input_text: str, user_id: str = None, history: list = None):
    """调用惊讶度检测 API"""
    
    url = f"{API_BASE}/surprise"
    headers = get_headers(access_token)
    data = {
        "input": input_text,
        "user_id": user_id or "default",
        "history": history or [],
        "domain": "education"  # 可选
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=30)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "调用失败")
        }

# 使用示例
result = call_surprise_api(
    access_token=access_token,
    input_text="递归是函数调用自身的技术",
    user_id="student_001",
    history=["变量", "循环", "函数"]
)

if result["success"]:
    data = result["result"]
    print(f"惊讶度分数: {data.get('surprise_score', 0):.2f}")
    print(f"是否新颖: {data.get('is_novel', False)}")
    print(f"学习模式: {data.get('learning_mode', 'unknown')}")
    if data.get('suggestion'):
        print(f"建议: {data['suggestion']}")

2. Smart Routing API(智能路由)

def call_route_api(access_token: str, input_text: str, task_type: str, user_level: str = "intermediate"):
    """调用智能路由 API"""
    
    url = f"{API_BASE}/route"
    headers = get_headers(access_token)
    data = {
        "input": input_text,
        "task_type": task_type,
        "user_level": user_level,
        "available_experts": None  # 可选,指定可用专家列表
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=30)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "调用失败")
        }

# 使用示例
result = call_route_api(
    access_token=access_token,
    input_text="解释二分查找算法",
    task_type="explanation",
    user_level="beginner"
)

if result["success"]:
    data = result["result"]
    print(f"路由置信度: {data.get('routing_confidence', 0):.2f}")
    print("选中的专家:")
    for expert in data.get('selected_experts', []):
        print(f"  - {expert.get('id')}: 权重 {expert.get('weight', 0):.2f}")
        if expert.get('reason'):
            print(f"    原因: {expert['reason']}")

3. Memory API(记忆管理)

写入记忆

def write_memory(access_token: str, user_id: str, key: str, content: dict, memory_type: str = "long_term"):
    """写入记忆"""
    
    url = f"{API_BASE}/memory/write"
    headers = get_headers(access_token)
    data = {
        "user_id": user_id,
        "memory_type": memory_type,  # "short_term", "working", "long_term"
        "key": key,
        "content": content,
        "importance": 0.5  # 0.0-1.0
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=30)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "写入失败")
        }

# 使用示例
result = write_memory(
    access_token=access_token,
    user_id="student_001",
    key="learning_progress",
    content={
        "completed_topics": ["变量", "循环", "函数"],
        "current_level": "中级",
        "weak_areas": ["递归", "动态规划"]
    },
    memory_type="long_term",
    importance=0.9
)

读取记忆

def read_memory(access_token: str, user_id: str, query: str = None, key: str = None, top_k: int = 10):
    """读取记忆"""
    
    url = f"{API_BASE}/memory/read"
    headers = get_headers(access_token)
    data = {
        "user_id": user_id,
        "query": query,  # 语义搜索查询
        "key": key,  # 精确匹配键
        "top_k": top_k
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=30)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "读取失败")
        }

# 使用示例
result = read_memory(
    access_token=access_token,
    user_id="student_001",
    query="学习进度",
    top_k=5
)

if result["success"]:
    memories = result["result"].get("memories", [])
    print(f"找到 {len(memories)} 条记忆:")
    for mem in memories:
        print(f"  - {mem.get('key')}: {mem.get('content')}")

4. Expert Marketplace API(专家市场)

搜索专家

def search_experts(access_token: str, query: str = None, category: str = None, limit: int = 20):
    """搜索专家"""
    
    url = f"{API_BASE}/experts/search"
    headers = get_headers(access_token)
    data = {
        "query": query,
        "category": category,
        "min_rating": 4.0,  # 可选
        "limit": limit
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=30)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "搜索失败")
        }

# 使用示例
result = search_experts(
    access_token=access_token,
    query="数学方程求解",
    category="education.math",
    limit=10
)

if result["success"]:
    experts = result["result"].get("experts", [])
    print(f"找到 {len(experts)} 个专家:")
    for expert in experts:
        print(f"  - {expert.get('name')} ({expert.get('id')})")
        print(f"    评分: {expert.get('metrics', {}).get('rating', 0):.1f}/5.0")

调用专家

def invoke_expert(access_token: str, expert_id: str, input_data: dict, timeout_ms: int = 30000):
    """调用专家"""
    
    url = f"{API_BASE}/experts/{expert_id}/invoke"
    headers = get_headers(access_token)
    data = {
        "input": input_data,
        "timeout_ms": timeout_ms
    }
    
    response = requests.post(url, headers=headers, json=data, timeout=60)
    
    if response.status_code == 200:
        return {
            "success": True,
            "result": response.json()
        }
    else:
        return {
            "success": False,
            "error": response.json().get("detail", "调用失败")
        }

# 使用示例
result = invoke_expert(
    access_token=access_token,
    expert_id="edu.math.algebra.solver",
    input_data={
        "equation": "x^2 - 5x + 6 = 0",
        "show_steps": True
    }
)

if result["success"]:
    data = result["result"]
    print(f"调用成功: {data.get('success', False)}")
    print(f"结果: {data.get('result', {})}")
    print(f"延迟: {data.get('latency_ms', 0)}ms")
    print(f"成本: ${data.get('cost', 0):.4f}")

完整示例项目

教育应用示例

"""
完整的教育应用示例
演示用户注册、登录、Token计费和使用API的完整流程
"""

import requests
import json
from typing import Optional, Dict, List

class MBEClient:
    """MBE API 客户端封装"""
    
    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url
        self.api_base = f"{base_url}/api/v1"
        self.access_token: Optional[str] = None
        self.user_id: Optional[str] = None
    
    def _get_headers(self) -> Dict[str, str]:
        """获取请求头"""
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers
    
    # ========== 用户认证 ==========
    
    def register(self, email: str, password: str, username: str = None) -> Dict:
        """用户注册"""
        url = f"{self.api_base}/users/register"
        data = {"email": email, "password": password}
        if username:
            data["username"] = username
        
        response = requests.post(url, json=data, timeout=10)
        if response.status_code == 200:
            result = response.json()
            self.access_token = result.get("access_token")
            self.user_id = result.get("user_id")
            return {"success": True, **result}
        return {"success": False, "error": response.json().get("detail")}
    
    def login(self, email: str, password: str) -> Dict:
        """用户登录"""
        url = f"{self.api_base}/users/login"
        data = {"email": email, "password": password}
        
        response = requests.post(url, json=data, timeout=10)
        if response.status_code == 200:
            result = response.json()
            self.access_token = result.get("access_token")
            self.user_id = result.get("user_id")
            return {"success": True, **result}
        return {"success": False, "error": response.json().get("detail")}
    
    def get_user_info(self) -> Dict:
        """获取用户信息"""
        url = f"{self.api_base}/users/me"
        response = requests.get(url, headers=self._get_headers(), timeout=10)
        if response.status_code == 200:
            return {"success": True, "user": response.json()}
        return {"success": False, "error": response.json().get("detail")}
    
    # ========== Token 计费 ==========
    
    def get_balance(self) -> Dict:
        """获取 Token 余额"""
        result = self.get_user_info()
        if result["success"]:
            user = result["user"]
            return {
                "success": True,
                "balance": user.get("token_balance", {}),
                "quota": user.get("token_quota", {}),
                "usage": user.get("token_usage", {})
            }
        return result
    
    def get_usage_stats(self, period: str = "month") -> Dict:
        """获取使用统计"""
        url = f"{self.api_base}/users/usage"
        params = {"period": period}
        response = requests.get(url, headers=self._get_headers(), params=params, timeout=10)
        if response.status_code == 200:
            return {"success": True, "stats": response.json()}
        return {"success": False, "error": response.json().get("detail")}
    
    def check_quota(self, required: int = 0) -> Dict:
        """检查配额"""
        url = f"{self.api_base}/users/usage/check"
        params = {"required": required}
        response = requests.get(url, headers=self._get_headers(), params=params, timeout=10)
        if response.status_code == 200:
            return {"success": True, **response.json()}
        return {"success": False, "error": response.json().get("detail")}
    
    # ========== API 调用 ==========
    
    def surprise(self, input_text: str, user_id: str = None, history: List[str] = None) -> Dict:
        """惊讶度检测"""
        url = f"{self.api_base}/surprise"
        data = {
            "input": input_text,
            "user_id": user_id or self.user_id or "default",
            "history": history or []
        }
        response = requests.post(url, headers=self._get_headers(), json=data, timeout=30)
        if response.status_code == 200:
            return {"success": True, "result": response.json()}
        return {"success": False, "error": response.json().get("detail")}
    
    def route(self, input_text: str, task_type: str, user_level: str = "intermediate") -> Dict:
        """智能路由"""
        url = f"{self.api_base}/route"
        data = {
            "input": input_text,
            "task_type": task_type,
            "user_level": user_level
        }
        response = requests.post(url, headers=self._get_headers(), json=data, timeout=30)
        if response.status_code == 200:
            return {"success": True, "result": response.json()}
        return {"success": False, "error": response.json().get("detail")}


# ========== 使用示例 ==========

def main():
    """主函数:演示完整流程"""
    
    client = MBEClient()
    
    # 1. 注册或登录
    print("=" * 50)
    print("步骤 1: 用户注册/登录")
    print("=" * 50)
    
    email = "test@example.com"
    password = "Test123456!"
    
    # 尝试登录,如果失败则注册
    result = client.login(email, password)
    if not result["success"]:
        print("登录失败,尝试注册...")
        result = client.register(email, password, username="testuser")
    
    if result["success"]:
        print(f"✅ 认证成功!用户ID: {client.user_id}")
        print(f"Token: {client.access_token[:30]}...")
    else:
        print(f"❌ 认证失败: {result.get('error')}")
        return
    
    # 2. 获取用户信息和余额
    print("\n" + "=" * 50)
    print("步骤 2: 获取用户信息和 Token 余额")
    print("=" * 50)
    
    balance_result = client.get_balance()
    if balance_result["success"]:
        balance = balance_result["balance"]
        quota = balance_result["quota"]
        usage = balance_result["usage"]
        
        print(f"Token 余额: {balance.get('remaining', 0)}")
        print(f"本月配额: {quota.get('monthly_tokens', 0)}")
        print(f"本月已用: {usage.get('used_this_month', 0)}")
    
    # 3. 检查配额
    print("\n" + "=" * 50)
    print("步骤 3: 检查配额")
    print("=" * 50)
    
    quota_result = client.check_quota(required=1000)
    if quota_result["success"]:
        if quota_result.get("has_quota"):
            print(f"✅ 配额充足,剩余: {quota_result.get('remaining', 0)} tokens")
        else:
            print(f"⚠️ 配额不足: {quota_result.get('message', '')}")
    
    # 4. 调用 Surprise API
    print("\n" + "=" * 50)
    print("步骤 4: 调用 Surprise API")
    print("=" * 50)
    
    surprise_result = client.surprise(
        input_text="递归是函数调用自身的技术",
        user_id=client.user_id,
        history=["变量", "循环", "函数"]
    )
    
    if surprise_result["success"]:
        data = surprise_result["result"]
        print(f"惊讶度分数: {data.get('surprise_score', 0):.2f}")
        print(f"是否新颖: {data.get('is_novel', False)}")
        print(f"学习模式: {data.get('learning_mode', 'unknown')}")
    
    # 5. 调用 Route API
    print("\n" + "=" * 50)
    print("步骤 5: 调用 Route API")
    print("=" * 50)
    
    route_result = client.route(
        input_text="解释二分查找算法",
        task_type="explanation",
        user_level="beginner"
    )
    
    if route_result["success"]:
        data = route_result["result"]
        print(f"路由置信度: {data.get('routing_confidence', 0):.2f}")
        print("选中的专家:")
        for expert in data.get('selected_experts', [])[:3]:
            print(f"  - {expert.get('id')}: 权重 {expert.get('weight', 0):.2f}")
    
    # 6. 获取使用统计
    print("\n" + "=" * 50)
    print("步骤 6: 获取使用统计")
    print("=" * 50)
    
    stats_result = client.get_usage_stats(period="month")
    if stats_result["success"]:
        stats = stats_result["stats"]
        print(f"今日使用: {stats.get('today_usage', 0)}")
        print(f"本月使用: {stats.get('month_usage', 0)}")
        print(f"剩余额度: {stats.get('remaining', 0)}")
    
    print("\n" + "=" * 50)
    print("示例完成!")
    print("=" * 50)


if __name__ == "__main__":
    main()

错误处理

常见错误码

HTTP 状态码 错误码 说明 处理建议
400 INVALID_REQUEST 请求参数错误 检查请求参数
401 AUTH_FAILED 认证失败 检查 Token 是否有效
403 INSUFFICIENT_QUOTA 配额不足 检查 Token 余额或升级套餐
404 RESOURCE_NOT_FOUND 资源不存在 检查资源 ID 是否正确
429 RATE_LIMITED 速率限制 降低请求频率或等待
500 INTERNAL_ERROR 服务器错误 联系技术支持

错误处理示例

def safe_api_call(func, *args, **kwargs):
    """安全的 API 调用包装器"""
    try:
        result = func(*args, **kwargs)
        if result.get("success"):
            return result
        else:
            error = result.get("error", "未知错误")
            # 根据错误类型处理
            if "配额" in error or "quota" in error.lower():
                print("⚠️ 配额不足,请升级套餐或充值")
            elif "认证" in error or "auth" in error.lower():
                print("⚠️ 认证失败,请重新登录")
            elif "速率" in error or "rate" in error.lower():
                print("⚠️ 请求过于频繁,请稍后重试")
            else:
                print(f"❌ 错误: {error}")
            return result
    except requests.exceptions.Timeout:
        print("❌ 请求超时,请稍后重试")
        return {"success": False, "error": "请求超时"}
    except requests.exceptions.ConnectionError:
        print("❌ 连接失败,请检查网络")
        return {"success": False, "error": "连接失败"}
    except Exception as e:
        print(f"❌ 未知错误: {e}")
        return {"success": False, "error": str(e)}

# 使用示例
result = safe_api_call(client.surprise, "test", user_id="user_123")

最佳实践

1. Token 管理

  • ✅ 将 Token 存储在安全的地方(环境变量、密钥管理服务)
  • ✅ 定期刷新 Token(使用 refresh_token)
  • ❌ 不要在代码中硬编码 Token
  • ❌ 不要将 Token 提交到版本控制系统

2. 错误处理

  • ✅ 始终检查 API 响应状态
  • ✅ 实现重试机制(对于临时错误)
  • ✅ 记录错误日志以便调试

3. 性能优化

  • ✅ 使用连接池复用 HTTP 连接
  • ✅ 批量处理请求(如果 API 支持)
  • ✅ 合理设置超时时间

4. 配额管理

  • ✅ 在调用 API 前检查配额
  • ✅ 监控 Token 使用情况
  • ✅ 设置配额预警阈值

更多资源


文档版本: 1.0.0
最后更新: 2026-02-08