MBE API 使用示例

本文档提供 MBE API 的常见使用场景和代码示例。

目录

  1. 认证
  2. 聊天对话
  3. 知识库管理
  4. 专家市场
  5. 错误处理

认证

Python 示例

import requests

BASE_URL = "https://mbe.hi-maker.com"

# 1. 用户注册
def register_user(username: str, email: str, password: str):
    """注册新用户"""
    response = requests.post(
        f"{BASE_URL}/api/v1/users/register",
        json={
            "username": username,
            "email": email,
            "password": password
        }
    )
    return response.json()

# 2. 用户登录
def login(username: str, password: str):
    """用户登录,获取Token"""
    response = requests.post(
        f"{BASE_URL}/api/v1/users/login",
        json={
            "username": username,
            "password": password
        }
    )
    data = response.json()
    return data["access_token"]

# 3. 使用Token访问API
def get_user_info(token: str):
    """获取当前用户信息"""
    response = requests.get(
        f"{BASE_URL}/api/v1/users/me",
        headers={
            "Authorization": f"Bearer {token}"
        }
    )
    return response.json()

# 使用示例
token = login("user@example.com", "password123")
user_info = get_user_info(token)
print(user_info)

JavaScript 示例

const BASE_URL = 'https://mbe.hi-maker.com';

// 1. 用户登录
async function login(username, password) {
    const response = await fetch(`${BASE_URL}/api/v1/users/login`, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify({
            username,
            password
        })
    });
    const data = await response.json();
    return data.access_token;
}

// 2. 使用Token访问API
async function getUserInfo(token) {
    const response = await fetch(`${BASE_URL}/api/v1/users/me`, {
        headers: {
            'Authorization': `Bearer ${token}`
        }
    });
    return await response.json();
}

// 使用示例
const token = await login('user@example.com', 'password123');
const userInfo = await getUserInfo(token);
console.log(userInfo);

聊天对话

Python 示例

import requests

BASE_URL = "https://mbe.hi-maker.com"
API_KEY = "your_api_key"

def chat(query: str, expert_id: str = None, context: list = None):
    """发送聊天消息"""
    response = requests.post(
        f"{BASE_URL}/api/chat/message",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "expert_id": expert_id,  # 可选,不指定则自动路由
            "context": context or [],
            "stream": False
        }
    )
    return response.json()

# 使用示例
response = chat(
    query="什么是合同违约?",
    expert_id="civil_lawyer"  # 指定法律专家
)

print(f"回答: {response['answer']}")
print(f"专家: {response['expert_name']}")
print(f"置信度: {response['confidence']}")

流式响应示例

import requests
import json

def chat_stream(query: str):
    """流式聊天响应"""
    response = requests.post(
        f"{BASE_URL}/api/chat/message",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "stream": True
        },
        stream=True
    )
    
    for line in response.iter_lines():
        if line:
            data = json.loads(line)
            if data.get("type") == "chunk":
                print(data["content"], end="", flush=True)
            elif data.get("type") == "done":
                print(f"\n\n专家: {data['expert_name']}")
                break

# 使用示例
chat_stream("请介绍一下MBE系统")

知识库管理

Python 示例

import requests

BASE_URL = "https://mbe.hi-maker.com"
API_KEY = "your_api_key"
headers = {"Authorization": f"Bearer {API_KEY}"}

# 1. 创建知识库
def create_knowledge_base(name: str, description: str):
    """创建新知识库"""
    response = requests.post(
        f"{BASE_URL}/admin/knowledge/create",
        headers=headers,
        json={
            "name": name,
            "description": description
        }
    )
    return response.json()

# 2. 上传文档
def upload_document(kb_id: str, file_path: str):
    """上传文档到知识库"""
    with open(file_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(
            f"{BASE_URL}/admin/knowledge/{kb_id}/upload",
            headers=headers,
            files=files
        )
    return response.json()

# 3. 搜索知识库
def search_knowledge_base(kb_id: str, query: str, top_k: int = 5):
    """搜索知识库"""
    response = requests.post(
        f"{BASE_URL}/admin/knowledge/{kb_id}/search",
        headers=headers,
        json={
            "query": query,
            "top_k": top_k
        }
    )
    return response.json()

# 使用示例
kb = create_knowledge_base("法律知识库", "包含法律相关文档")
kb_id = kb["data"]["kb_id"]

upload_document(kb_id, "contract_law.pdf")

results = search_knowledge_base(kb_id, "合同违约")
for result in results["data"]["results"]:
    print(f"相关度: {result['score']}")
    print(f"内容: {result['content'][:100]}...")

专家市场

Python 示例

import requests

BASE_URL = "https://mbe.hi-maker.com"
API_KEY = "your_api_key"
headers = {"Authorization": f"Bearer {API_KEY}"}

# 1. 浏览专家
def browse_experts(category: str = None, min_rating: float = None):
    """浏览专家列表"""
    params = {}
    if category:
        params["category"] = category
    if min_rating:
        params["min_rating"] = min_rating
    
    response = requests.get(
        f"{BASE_URL}/api/market/experts",
        headers=headers,
        params=params
    )
    return response.json()

# 2. 调用专家
def invoke_expert(expert_id: str, query: str):
    """调用专家回答问题"""
    response = requests.post(
        f"{BASE_URL}/api/market/experts/{expert_id}/invoke",
        headers=headers,
        json={
            "query": query,
            "user_id": "user_123"
        }
    )
    return response.json()

# 使用示例
experts = browse_experts(category="法律", min_rating=4.5)
for expert in experts["data"]["experts"]:
    print(f"{expert['name']}: {expert['rating']}")

response = invoke_expert("civil_lawyer", "什么是合同违约?")
print(response["data"]["answer"])

错误处理

Python 示例

import requests
from requests.exceptions import RequestException

BASE_URL = "https://mbe.hi-maker.com"
API_KEY = "your_api_key"

def handle_api_request(method: str, endpoint: str, **kwargs):
    """统一的API请求处理,包含错误处理"""
    url = f"{BASE_URL}{endpoint}"
    headers = kwargs.pop("headers", {})
    headers["Authorization"] = f"Bearer {API_KEY}"
    
    try:
        response = requests.request(method, url, headers=headers, **kwargs)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if response.status_code == 401:
            print("认证失败,请检查API Key")
        elif response.status_code == 429:
            print("请求频率超限,请稍后重试")
        elif response.status_code == 400:
            error_data = response.json()
            print(f"请求参数错误: {error_data.get('error', {}).get('message')}")
        else:
            print(f"HTTP错误 {response.status_code}: {e}")
        raise
    except RequestException as e:
        print(f"网络错误: {e}")
        raise

# 使用示例
try:
    result = handle_api_request("GET", "/api/v1/users/me")
    print(result)
except Exception as e:
    print(f"请求失败: {e}")

重试机制示例

import requests
import time
from typing import Optional

def api_request_with_retry(
    method: str,
    endpoint: str,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    **kwargs
) -> Optional[dict]:
    """带重试机制的API请求"""
    url = f"{BASE_URL}{endpoint}"
    headers = kwargs.pop("headers", {})
    headers["Authorization"] = f"Bearer {API_KEY}"
    
    for attempt in range(max_retries):
        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            # 4xx错误不重试
            if 400 <= response.status_code < 500:
                raise
            # 5xx错误重试
            if attempt < max_retries - 1:
                wait_time = retry_delay * (2 ** attempt)  # 指数退避
                print(f"请求失败,{wait_time}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                time.sleep(wait_time)
            else:
                raise
        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                wait_time = retry_delay * (2 ** attempt)
                print(f"网络错误,{wait_time}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                time.sleep(wait_time)
            else:
                raise
    
    return None

# 使用示例
result = api_request_with_retry("GET", "/api/v1/users/me")

完整示例:聊天机器人

import requests
from typing import List, Dict

class MBEChatBot:
    """MBE聊天机器人客户端"""
    
    def __init__(self, api_key: str, base_url: str = "https://mbe.hi-maker.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.conversation_history: List[Dict] = []
    
    def chat(self, query: str, expert_id: str = None) -> Dict:
        """发送消息并获取回答"""
        # 构建上下文
        context = [
            {"role": msg["role"], "content": msg["content"]}
            for msg in self.conversation_history[-5:]  # 只保留最近5条
        ]
        
        # 发送请求
        response = requests.post(
            f"{self.base_url}/api/chat/message",
            headers=self.headers,
            json={
                "query": query,
                "expert_id": expert_id,
                "context": context,
                "stream": False
            }
        )
        response.raise_for_status()
        result = response.json()
        
        # 保存到历史记录
        self.conversation_history.append({
            "role": "user",
            "content": query
        })
        self.conversation_history.append({
            "role": "assistant",
            "content": result["answer"]
        })
        
        return result
    
    def clear_history(self):
        """清空对话历史"""
        self.conversation_history = []

# 使用示例
bot = MBEChatBot(api_key="your_api_key")

# 第一轮对话
response = bot.chat("什么是合同违约?")
print(f"回答: {response['answer']}")
print(f"专家: {response['expert_name']}")

# 第二轮对话(带上下文)
response = bot.chat("那应该如何处理?")
print(f"回答: {response['answer']}")

更多资源