MBE API 使用示例
版本: 1.0.0
更新日期: 2026-02-08
本文档提供 MBE API 的完整使用示例,包括用户注册/登录流程、Token 计费使用和常见 API 调用示例。
📚 目录
快速开始
基础设置
import requests
import json
# API 基础地址
BASE_URL = "http://localhost:8000"
API_BASE = f"{BASE_URL}/api/v1"
# 请求头模板
def get_headers(token=None):
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
用户注册/登录流程
1. 用户注册
MBE 支持多种注册方式:
方式 1: 邮箱+密码注册
def register_with_email(email: str, password: str, username: str = None):
"""使用邮箱和密码注册"""
url = f"{API_BASE}/users/register"
data = {
"email": email,
"password": password,
}
if username:
data["username"] = username
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
return {
"success": True,
"user_id": result.get("user_id"),
"access_token": result.get("access_token"),
"refresh_token": result.get("refresh_token"),
"message": result.get("message", "注册成功")
}
else:
error = response.json()
return {
"success": False,
"error": error.get("detail", "注册失败")
}
# 使用示例
result = register_with_email(
email="user@example.com",
password="SecurePassword123!",
username="testuser"
)
if result["success"]:
print(f"注册成功!用户ID: {result['user_id']}")
print(f"访问令牌: {result['access_token'][:30]}...")
access_token = result["access_token"]
else:
print(f"注册失败: {result['error']}")
方式 2: 设备自动注册
def register_with_device(device_id: str):
"""使用设备ID自动注册(适用于小智设备等)"""
url = f"{API_BASE}/users/register"
data = {
"device_id": device_id
}
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
return {
"success": True,
"user_id": result.get("user_id"),
"access_token": result.get("access_token"),
"refresh_token": result.get("refresh_token"),
"message": result.get("message", "设备已注册")
}
else:
error = response.json()
return {
"success": False,
"error": error.get("detail", "注册失败")
}
# 使用示例
result = register_with_device(device_id="device_123456")
if result["success"]:
access_token = result["access_token"]
方式 3: 手机号注册
def register_with_phone(phone: str, password: str):
"""使用手机号注册"""
url = f"{API_BASE}/users/register"
data = {
"phone": phone,
"password": password
}
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
return {
"success": True,
"user_id": result.get("user_id"),
"access_token": result.get("access_token"),
"refresh_token": result.get("refresh_token")
}
else:
return {
"success": False,
"error": response.json().get("detail", "注册失败")
}
2. 用户登录
def login(email: str, password: str):
"""用户登录"""
url = f"{API_BASE}/users/login"
data = {
"email": email,
"password": password
}
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
return {
"success": True,
"access_token": result.get("access_token"),
"refresh_token": result.get("refresh_token"),
"user_id": result.get("user_id"),
"role": result.get("role"),
"email": result.get("email")
}
else:
return {
"success": False,
"error": response.json().get("detail", "登录失败")
}
# 使用示例
result = login(
email="user@example.com",
password="SecurePassword123!"
)
if result["success"]:
access_token = result["access_token"]
refresh_token = result["refresh_token"]
user_id = result["user_id"]
print(f"登录成功!用户ID: {user_id}, 角色: {result['role']}")
else:
print(f"登录失败: {result['error']}")
3. 刷新 Token
def refresh_access_token(refresh_token: str):
"""刷新访问令牌"""
url = f"{API_BASE}/users/refresh"
data = {
"refresh_token": refresh_token
}
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
return {
"success": True,
"access_token": result.get("access_token")
}
else:
return {
"success": False,
"error": response.json().get("detail", "刷新失败")
}
# 使用示例
result = refresh_access_token(refresh_token)
if result["success"]:
access_token = result["access_token"]
print("Token 刷新成功")
4. 获取用户信息
def get_user_info(access_token: str):
"""获取当前用户信息"""
url = f"{API_BASE}/users/me"
headers = get_headers(access_token)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return {
"success": True,
"user": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "获取失败")
}
# 使用示例
result = get_user_info(access_token)
if result["success"]:
user = result["user"]
print(f"用户ID: {user.get('user_id')}")
print(f"邮箱: {user.get('email')}")
print(f"用户名: {user.get('username')}")
5. 更新用户信息
def update_user_info(access_token: str, username: str = None, **kwargs):
"""更新用户信息"""
url = f"{API_BASE}/users/me"
headers = get_headers(access_token)
data = {}
if username:
data["username"] = username
data.update(kwargs)
response = requests.put(url, headers=headers, json=data, timeout=10)
if response.status_code == 200:
return {
"success": True,
"user": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "更新失败")
}
# 使用示例
result = update_user_info(
access_token=access_token,
username="newname",
bio="这是我的个人简介"
)
Token 计费使用
1. 获取 Token 余额
def get_token_balance(access_token: str):
"""获取用户 Token 余额"""
# 方式 1: 通过用户信息获取(推荐)
url = f"{API_BASE}/users/me"
headers = get_headers(access_token)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
user = response.json()
# 余额信息在 user 对象中
return {
"success": True,
"balance": user.get("token_balance", {}),
"quota": user.get("token_quota", {}),
"usage": user.get("token_usage", {})
}
else:
return {
"success": False,
"error": response.json().get("detail", "获取失败")
}
# 使用示例
result = get_token_balance(access_token)
if result["success"]:
balance = result["balance"]
quota = result["quota"]
usage = result["usage"]
print(f"Token 余额: {balance.get('remaining', 0)}")
print(f"本月配额: {quota.get('monthly_tokens', 0)}")
print(f"本月已用: {usage.get('used_this_month', 0)}")
print(f"剩余额度: {quota.get('monthly_tokens', 0) - usage.get('used_this_month', 0)}")
2. 获取使用统计
def get_usage_stats(access_token: str, period: str = "month"):
"""
获取使用统计
Args:
period: 统计周期,可选值: "day", "week", "month"
"""
url = f"{API_BASE}/users/usage"
headers = get_headers(access_token)
params = {"period": period}
response = requests.get(url, headers=headers, params=params, timeout=10)
if response.status_code == 200:
return {
"success": True,
"stats": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "获取失败")
}
# 使用示例
result = get_usage_stats(access_token, period="month")
if result["success"]:
stats = result["stats"]
print(f"今日使用: {stats.get('today_usage', 0)}")
print(f"今日限额: {stats.get('today_limit', 0)}")
print(f"本月使用: {stats.get('month_usage', 0)}")
print(f"本月限额: {stats.get('month_limit', 0)}")
print(f"剩余额度: {stats.get('remaining', 0)}")
3. 检查配额
def check_quota(access_token: str, required_tokens: int = 0):
"""检查是否有足够的 Token 配额"""
url = f"{API_BASE}/users/usage/check"
headers = get_headers(access_token)
params = {"required": required_tokens}
response = requests.get(url, headers=headers, params=params, timeout=10)
if response.status_code == 200:
result = response.json()
return {
"success": True,
"has_quota": result.get("has_quota", False),
"remaining": result.get("remaining", 0),
"message": result.get("message", "")
}
else:
return {
"success": False,
"error": response.json().get("detail", "检查失败")
}
# 使用示例
result = check_quota(access_token, required_tokens=1000)
if result["success"]:
if result["has_quota"]:
print(f"配额充足,剩余: {result['remaining']} tokens")
else:
print(f"配额不足: {result['message']}")
4. 获取订阅信息
def get_subscription_info(access_token: str):
"""获取用户订阅信息"""
url = f"{API_BASE}/users/subscription"
headers = get_headers(access_token)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return {
"success": True,
"subscription": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "获取失败")
}
# 使用示例
result = get_subscription_info(access_token)
if result["success"]:
sub = result["subscription"]
print(f"当前套餐: {sub.get('plan_type', 'free')}")
print(f"套餐名称: {sub.get('plan_name', '免费版')}")
print(f"状态: {sub.get('status', 'unknown')}")
print(f"到期时间: {sub.get('expires_at', 'N/A')}")
5. 获取定价信息
def get_pricing_info(access_token: str):
"""获取可用套餐和定价信息"""
url = f"{API_BASE}/users/pricing"
headers = get_headers(access_token)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return {
"success": True,
"plans": response.json().get("plans", [])
}
else:
return {
"success": False,
"error": response.json().get("detail", "获取失败")
}
# 使用示例
result = get_pricing_info(access_token)
if result["success"]:
plans = result["plans"]
print("可用套餐:")
for plan in plans:
print(f" - {plan.get('plan_name')}: {plan.get('price', 0)}元/月")
print(f" 每月Token: {plan.get('monthly_tokens', 0)}")
print(f" 特性: {', '.join(plan.get('features', []))}")
6. 升级订阅
def upgrade_subscription(access_token: str, plan_type: str):
"""
升级订阅套餐
Args:
plan_type: 套餐类型,如 "personal", "pro", "enterprise"
"""
url = f"{API_BASE}/users/subscription/upgrade"
headers = get_headers(access_token)
data = {
"plan_type": plan_type
}
response = requests.post(url, headers=headers, json=data, timeout=10)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "升级失败")
}
# 使用示例
result = upgrade_subscription(access_token, plan_type="pro")
if result["success"]:
print("订阅升级成功!")
# 可能需要支付
payment_info = result["result"].get("payment", {})
if payment_info:
print(f"请完成支付: {payment_info.get('amount', 0)}元")
7. 创建支付订单
def create_payment_order(access_token: str, amount: float, payment_method: str = "alipay"):
"""
创建支付订单(用于充值或购买套餐)
Args:
amount: 支付金额(元)
payment_method: 支付方式,可选值: "alipay", "wechat", "stripe"
"""
url = f"{API_BASE}/users/payment/create"
headers = get_headers(access_token)
data = {
"amount": amount,
"payment_method": payment_method,
"purpose": "recharge" # 或 "subscription"
}
response = requests.post(url, headers=headers, json=data, timeout=10)
if response.status_code == 200:
return {
"success": True,
"order": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "创建订单失败")
}
# 使用示例
result = create_payment_order(access_token, amount=100.0, payment_method="alipay")
if result["success"]:
order = result["order"]
print(f"订单创建成功!订单ID: {order.get('order_id')}")
print(f"支付链接: {order.get('payment_url', 'N/A')}")
print(f"金额: {order.get('amount', 0)}元")
API 调用示例
1. Surprise Detection API(惊讶度检测)
def call_surprise_api(access_token: str, input_text: str, user_id: str = None, history: list = None):
"""调用惊讶度检测 API"""
url = f"{API_BASE}/surprise"
headers = get_headers(access_token)
data = {
"input": input_text,
"user_id": user_id or "default",
"history": history or [],
"domain": "education" # 可选
}
response = requests.post(url, headers=headers, json=data, timeout=30)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "调用失败")
}
# 使用示例
result = call_surprise_api(
access_token=access_token,
input_text="递归是函数调用自身的技术",
user_id="student_001",
history=["变量", "循环", "函数"]
)
if result["success"]:
data = result["result"]
print(f"惊讶度分数: {data.get('surprise_score', 0):.2f}")
print(f"是否新颖: {data.get('is_novel', False)}")
print(f"学习模式: {data.get('learning_mode', 'unknown')}")
if data.get('suggestion'):
print(f"建议: {data['suggestion']}")
2. Smart Routing API(智能路由)
def call_route_api(access_token: str, input_text: str, task_type: str, user_level: str = "intermediate"):
"""调用智能路由 API"""
url = f"{API_BASE}/route"
headers = get_headers(access_token)
data = {
"input": input_text,
"task_type": task_type,
"user_level": user_level,
"available_experts": None # 可选,指定可用专家列表
}
response = requests.post(url, headers=headers, json=data, timeout=30)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "调用失败")
}
# 使用示例
result = call_route_api(
access_token=access_token,
input_text="解释二分查找算法",
task_type="explanation",
user_level="beginner"
)
if result["success"]:
data = result["result"]
print(f"路由置信度: {data.get('routing_confidence', 0):.2f}")
print("选中的专家:")
for expert in data.get('selected_experts', []):
print(f" - {expert.get('id')}: 权重 {expert.get('weight', 0):.2f}")
if expert.get('reason'):
print(f" 原因: {expert['reason']}")
3. Memory API(记忆管理)
写入记忆
def write_memory(access_token: str, user_id: str, key: str, content: dict, memory_type: str = "long_term"):
"""写入记忆"""
url = f"{API_BASE}/memory/write"
headers = get_headers(access_token)
data = {
"user_id": user_id,
"memory_type": memory_type, # "short_term", "working", "long_term"
"key": key,
"content": content,
"importance": 0.5 # 0.0-1.0
}
response = requests.post(url, headers=headers, json=data, timeout=30)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "写入失败")
}
# 使用示例
result = write_memory(
access_token=access_token,
user_id="student_001",
key="learning_progress",
content={
"completed_topics": ["变量", "循环", "函数"],
"current_level": "中级",
"weak_areas": ["递归", "动态规划"]
},
memory_type="long_term",
importance=0.9
)
读取记忆
def read_memory(access_token: str, user_id: str, query: str = None, key: str = None, top_k: int = 10):
"""读取记忆"""
url = f"{API_BASE}/memory/read"
headers = get_headers(access_token)
data = {
"user_id": user_id,
"query": query, # 语义搜索查询
"key": key, # 精确匹配键
"top_k": top_k
}
response = requests.post(url, headers=headers, json=data, timeout=30)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "读取失败")
}
# 使用示例
result = read_memory(
access_token=access_token,
user_id="student_001",
query="学习进度",
top_k=5
)
if result["success"]:
memories = result["result"].get("memories", [])
print(f"找到 {len(memories)} 条记忆:")
for mem in memories:
print(f" - {mem.get('key')}: {mem.get('content')}")
4. Expert Marketplace API(专家市场)
搜索专家
def search_experts(access_token: str, query: str = None, category: str = None, limit: int = 20):
"""搜索专家"""
url = f"{API_BASE}/experts/search"
headers = get_headers(access_token)
data = {
"query": query,
"category": category,
"min_rating": 4.0, # 可选
"limit": limit
}
response = requests.post(url, headers=headers, json=data, timeout=30)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "搜索失败")
}
# 使用示例
result = search_experts(
access_token=access_token,
query="数学方程求解",
category="education.math",
limit=10
)
if result["success"]:
experts = result["result"].get("experts", [])
print(f"找到 {len(experts)} 个专家:")
for expert in experts:
print(f" - {expert.get('name')} ({expert.get('id')})")
print(f" 评分: {expert.get('metrics', {}).get('rating', 0):.1f}/5.0")
调用专家
def invoke_expert(access_token: str, expert_id: str, input_data: dict, timeout_ms: int = 30000):
"""调用专家"""
url = f"{API_BASE}/experts/{expert_id}/invoke"
headers = get_headers(access_token)
data = {
"input": input_data,
"timeout_ms": timeout_ms
}
response = requests.post(url, headers=headers, json=data, timeout=60)
if response.status_code == 200:
return {
"success": True,
"result": response.json()
}
else:
return {
"success": False,
"error": response.json().get("detail", "调用失败")
}
# 使用示例
result = invoke_expert(
access_token=access_token,
expert_id="edu.math.algebra.solver",
input_data={
"equation": "x^2 - 5x + 6 = 0",
"show_steps": True
}
)
if result["success"]:
data = result["result"]
print(f"调用成功: {data.get('success', False)}")
print(f"结果: {data.get('result', {})}")
print(f"延迟: {data.get('latency_ms', 0)}ms")
print(f"成本: ${data.get('cost', 0):.4f}")
完整示例项目
教育应用示例
"""
完整的教育应用示例
演示用户注册、登录、Token计费和使用API的完整流程
"""
import requests
import json
from typing import Optional, Dict, List
class MBEClient:
"""MBE API 客户端封装"""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url
self.api_base = f"{base_url}/api/v1"
self.access_token: Optional[str] = None
self.user_id: Optional[str] = None
def _get_headers(self) -> Dict[str, str]:
"""获取请求头"""
headers = {"Content-Type": "application/json"}
if self.access_token:
headers["Authorization"] = f"Bearer {self.access_token}"
return headers
# ========== 用户认证 ==========
def register(self, email: str, password: str, username: str = None) -> Dict:
"""用户注册"""
url = f"{self.api_base}/users/register"
data = {"email": email, "password": password}
if username:
data["username"] = username
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
self.access_token = result.get("access_token")
self.user_id = result.get("user_id")
return {"success": True, **result}
return {"success": False, "error": response.json().get("detail")}
def login(self, email: str, password: str) -> Dict:
"""用户登录"""
url = f"{self.api_base}/users/login"
data = {"email": email, "password": password}
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
self.access_token = result.get("access_token")
self.user_id = result.get("user_id")
return {"success": True, **result}
return {"success": False, "error": response.json().get("detail")}
def get_user_info(self) -> Dict:
"""获取用户信息"""
url = f"{self.api_base}/users/me"
response = requests.get(url, headers=self._get_headers(), timeout=10)
if response.status_code == 200:
return {"success": True, "user": response.json()}
return {"success": False, "error": response.json().get("detail")}
# ========== Token 计费 ==========
def get_balance(self) -> Dict:
"""获取 Token 余额"""
result = self.get_user_info()
if result["success"]:
user = result["user"]
return {
"success": True,
"balance": user.get("token_balance", {}),
"quota": user.get("token_quota", {}),
"usage": user.get("token_usage", {})
}
return result
def get_usage_stats(self, period: str = "month") -> Dict:
"""获取使用统计"""
url = f"{self.api_base}/users/usage"
params = {"period": period}
response = requests.get(url, headers=self._get_headers(), params=params, timeout=10)
if response.status_code == 200:
return {"success": True, "stats": response.json()}
return {"success": False, "error": response.json().get("detail")}
def check_quota(self, required: int = 0) -> Dict:
"""检查配额"""
url = f"{self.api_base}/users/usage/check"
params = {"required": required}
response = requests.get(url, headers=self._get_headers(), params=params, timeout=10)
if response.status_code == 200:
return {"success": True, **response.json()}
return {"success": False, "error": response.json().get("detail")}
# ========== API 调用 ==========
def surprise(self, input_text: str, user_id: str = None, history: List[str] = None) -> Dict:
"""惊讶度检测"""
url = f"{self.api_base}/surprise"
data = {
"input": input_text,
"user_id": user_id or self.user_id or "default",
"history": history or []
}
response = requests.post(url, headers=self._get_headers(), json=data, timeout=30)
if response.status_code == 200:
return {"success": True, "result": response.json()}
return {"success": False, "error": response.json().get("detail")}
def route(self, input_text: str, task_type: str, user_level: str = "intermediate") -> Dict:
"""智能路由"""
url = f"{self.api_base}/route"
data = {
"input": input_text,
"task_type": task_type,
"user_level": user_level
}
response = requests.post(url, headers=self._get_headers(), json=data, timeout=30)
if response.status_code == 200:
return {"success": True, "result": response.json()}
return {"success": False, "error": response.json().get("detail")}
# ========== 使用示例 ==========
def main():
"""主函数:演示完整流程"""
client = MBEClient()
# 1. 注册或登录
print("=" * 50)
print("步骤 1: 用户注册/登录")
print("=" * 50)
email = "test@example.com"
password = "Test123456!"
# 尝试登录,如果失败则注册
result = client.login(email, password)
if not result["success"]:
print("登录失败,尝试注册...")
result = client.register(email, password, username="testuser")
if result["success"]:
print(f"✅ 认证成功!用户ID: {client.user_id}")
print(f"Token: {client.access_token[:30]}...")
else:
print(f"❌ 认证失败: {result.get('error')}")
return
# 2. 获取用户信息和余额
print("\n" + "=" * 50)
print("步骤 2: 获取用户信息和 Token 余额")
print("=" * 50)
balance_result = client.get_balance()
if balance_result["success"]:
balance = balance_result["balance"]
quota = balance_result["quota"]
usage = balance_result["usage"]
print(f"Token 余额: {balance.get('remaining', 0)}")
print(f"本月配额: {quota.get('monthly_tokens', 0)}")
print(f"本月已用: {usage.get('used_this_month', 0)}")
# 3. 检查配额
print("\n" + "=" * 50)
print("步骤 3: 检查配额")
print("=" * 50)
quota_result = client.check_quota(required=1000)
if quota_result["success"]:
if quota_result.get("has_quota"):
print(f"✅ 配额充足,剩余: {quota_result.get('remaining', 0)} tokens")
else:
print(f"⚠️ 配额不足: {quota_result.get('message', '')}")
# 4. 调用 Surprise API
print("\n" + "=" * 50)
print("步骤 4: 调用 Surprise API")
print("=" * 50)
surprise_result = client.surprise(
input_text="递归是函数调用自身的技术",
user_id=client.user_id,
history=["变量", "循环", "函数"]
)
if surprise_result["success"]:
data = surprise_result["result"]
print(f"惊讶度分数: {data.get('surprise_score', 0):.2f}")
print(f"是否新颖: {data.get('is_novel', False)}")
print(f"学习模式: {data.get('learning_mode', 'unknown')}")
# 5. 调用 Route API
print("\n" + "=" * 50)
print("步骤 5: 调用 Route API")
print("=" * 50)
route_result = client.route(
input_text="解释二分查找算法",
task_type="explanation",
user_level="beginner"
)
if route_result["success"]:
data = route_result["result"]
print(f"路由置信度: {data.get('routing_confidence', 0):.2f}")
print("选中的专家:")
for expert in data.get('selected_experts', [])[:3]:
print(f" - {expert.get('id')}: 权重 {expert.get('weight', 0):.2f}")
# 6. 获取使用统计
print("\n" + "=" * 50)
print("步骤 6: 获取使用统计")
print("=" * 50)
stats_result = client.get_usage_stats(period="month")
if stats_result["success"]:
stats = stats_result["stats"]
print(f"今日使用: {stats.get('today_usage', 0)}")
print(f"本月使用: {stats.get('month_usage', 0)}")
print(f"剩余额度: {stats.get('remaining', 0)}")
print("\n" + "=" * 50)
print("示例完成!")
print("=" * 50)
if __name__ == "__main__":
main()
错误处理
常见错误码
| HTTP 状态码 | 错误码 | 说明 | 处理建议 |
|---|---|---|---|
| 400 | INVALID_REQUEST |
请求参数错误 | 检查请求参数 |
| 401 | AUTH_FAILED |
认证失败 | 检查 Token 是否有效 |
| 403 | INSUFFICIENT_QUOTA |
配额不足 | 检查 Token 余额或升级套餐 |
| 404 | RESOURCE_NOT_FOUND |
资源不存在 | 检查资源 ID 是否正确 |
| 429 | RATE_LIMITED |
速率限制 | 降低请求频率或等待 |
| 500 | INTERNAL_ERROR |
服务器错误 | 联系技术支持 |
错误处理示例
def safe_api_call(func, *args, **kwargs):
"""安全的 API 调用包装器"""
try:
result = func(*args, **kwargs)
if result.get("success"):
return result
else:
error = result.get("error", "未知错误")
# 根据错误类型处理
if "配额" in error or "quota" in error.lower():
print("⚠️ 配额不足,请升级套餐或充值")
elif "认证" in error or "auth" in error.lower():
print("⚠️ 认证失败,请重新登录")
elif "速率" in error or "rate" in error.lower():
print("⚠️ 请求过于频繁,请稍后重试")
else:
print(f"❌ 错误: {error}")
return result
except requests.exceptions.Timeout:
print("❌ 请求超时,请稍后重试")
return {"success": False, "error": "请求超时"}
except requests.exceptions.ConnectionError:
print("❌ 连接失败,请检查网络")
return {"success": False, "error": "连接失败"}
except Exception as e:
print(f"❌ 未知错误: {e}")
return {"success": False, "error": str(e)}
# 使用示例
result = safe_api_call(client.surprise, "test", user_id="user_123")
最佳实践
1. Token 管理
- ✅ 将 Token 存储在安全的地方(环境变量、密钥管理服务)
- ✅ 定期刷新 Token(使用 refresh_token)
- ❌ 不要在代码中硬编码 Token
- ❌ 不要将 Token 提交到版本控制系统
2. 错误处理
- ✅ 始终检查 API 响应状态
- ✅ 实现重试机制(对于临时错误)
- ✅ 记录错误日志以便调试
3. 性能优化
- ✅ 使用连接池复用 HTTP 连接
- ✅ 批量处理请求(如果 API 支持)
- ✅ 合理设置超时时间
4. 配额管理
- ✅ 在调用 API 前检查配额
- ✅ 监控 Token 使用情况
- ✅ 设置配额预警阈值
更多资源
文档版本: 1.0.0
最后更新: 2026-02-08