MBE SDK 使用示例
本文档提供 MBE API 的代码示例,帮助开发者快速集成。
目录
Python 示例
安装依赖
pip install requests httpx
MBE 客户端类
"""
MBE Python SDK 示例
"""
import requests
from typing import Optional, Dict, Any, List
class MBEClient:
"""MBE API 客户端"""
def __init__(self, api_key: str, base_url: str = "http://localhost:8000"):
self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
})
def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
"""发送请求"""
url = f"{self.base_url}{path}"
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
# ==================== 知识库管理 ====================
def create_knowledge_base(
self,
name: str,
description: str = "",
language: str = "auto"
) -> Dict[str, Any]:
"""创建知识库"""
return self._request("POST", "/admin/knowledge/create", json={
"name": name,
"description": description,
"language": language
})
def upload_file(self, kb_id: str, file_path: str) -> Dict[str, Any]:
"""上传文件到知识库"""
with open(file_path, "rb") as f:
files = {"file": (file_path.split("/")[-1], f)}
# 临时移除 Content-Type header
headers = {"Authorization": f"Bearer {self.api_key}"}
response = requests.post(
f"{self.base_url}/admin/knowledge/upload/{kb_id}",
files=files,
headers=headers
)
response.raise_for_status()
return response.json()
def import_url(
self,
kb_id: str,
url: str,
chunk_size: int = 1000
) -> Dict[str, Any]:
"""从 URL 导入内容"""
return self._request("POST", f"/admin/knowledge/import-url/{kb_id}", json={
"url": url,
"chunk_size": chunk_size
})
def import_urls(self, kb_id: str, urls: List[str]) -> Dict[str, Any]:
"""批量导入 URL"""
return self._request("POST", f"/admin/knowledge/import-urls/{kb_id}", json=urls)
def list_knowledge_bases(self) -> Dict[str, Any]:
"""获取知识库列表"""
return self._request("GET", "/admin/knowledge/")
def get_knowledge_base(self, kb_id: str) -> Dict[str, Any]:
"""获取知识库详情"""
return self._request("GET", f"/admin/knowledge/{kb_id}")
def search_knowledge_base(
self,
kb_id: str,
query: str,
top_k: int = 5
) -> Dict[str, Any]:
"""搜索知识库"""
return self._request("POST", "/admin/knowledge/search", json={
"kb_id": kb_id,
"query": query,
"top_k": top_k
})
def delete_knowledge_base(self, kb_id: str) -> Dict[str, Any]:
"""删除知识库"""
return self._request("DELETE", f"/admin/knowledge/{kb_id}")
# ==================== 专家管理 ====================
def validate_expert(self, kb_id: str) -> Dict[str, Any]:
"""验证知识库是否可发布"""
return self._request("POST", "/admin/knowledge/expert/validate", json={
"kb_id": kb_id
})
def auto_optimize_expert(self, kb_id: str) -> Dict[str, Any]:
"""一键优化专家信息"""
return self._request("POST", f"/admin/knowledge/expert/auto-optimize?kb_id={kb_id}")
def publish_expert(
self,
kb_id: str,
name: str,
description: str,
keywords: List[str],
domains: List[str] = None,
priority: int = 6,
greeting: str = ""
) -> Dict[str, Any]:
"""发布专家"""
return self._request("POST", "/admin/knowledge/expert/publish", json={
"kb_id": kb_id,
"name": name,
"description": description,
"keywords": keywords,
"domains": domains or [],
"priority": priority,
"greeting": greeting
})
def list_experts(self) -> Dict[str, Any]:
"""获取专家列表"""
return self._request("GET", "/admin/knowledge/expert/list")
def get_expert_status(self, expert_id: str) -> Dict[str, Any]:
"""获取专家状态"""
return self._request("GET", f"/admin/knowledge/expert/{expert_id}/status")
def delete_expert(self, expert_id: str) -> Dict[str, Any]:
"""删除专家"""
return self._request("DELETE", f"/admin/knowledge/expert/{expert_id}")
# ==================== 对话 ====================
def chat(
self,
text: str,
device_id: str = None,
session_id: str = None
) -> Dict[str, Any]:
"""发送对话"""
data = {"text": text}
if device_id:
data["device_id"] = device_id
if session_id:
data["session_id"] = session_id
return self._request("POST", "/mcp/analyze", json=data)
def get_history(
self,
device_id: str,
limit: int = 20,
offset: int = 0
) -> Dict[str, Any]:
"""获取对话历史"""
return self._request("GET", f"/api/v1/history/{device_id}?limit={limit}&offset={offset}")
# 使用示例
if __name__ == "__main__":
# 初始化客户端
client = MBEClient(
api_key="mbe_sk_your_api_key_here",
base_url="http://localhost:8000"
)
# 1. 创建知识库
print("创建知识库...")
kb = client.create_knowledge_base(
name="法律咨询知识库",
description="包含常见法律问题解答"
)
kb_id = kb["kb_id"]
print(f"知识库已创建: {kb_id}")
# 2. 上传文件
print("\n上传文件...")
result = client.upload_file(kb_id, "legal_guide.pdf")
print(f"上传结果: {result['stats']}")
# 3. 导入网页
print("\n导入网页...")
result = client.import_url(kb_id, "https://example.com/legal-faq")
print(f"导入结果: {result['extracted']}")
# 4. 验证并发布专家
print("\n验证专家...")
validation = client.validate_expert(kb_id)
if validation["can_publish"]:
# 一键优化
optimized = client.auto_optimize_expert(kb_id)
opt_info = optimized["optimized"]
# 发布
print("\n发布专家...")
expert = client.publish_expert(
kb_id=kb_id,
name=opt_info["name"],
description=opt_info["description"],
keywords=opt_info["keywords"],
domains=opt_info["domains"]
)
print(f"专家已发布: {expert['expert']['id']}")
# 5. 对话测试
print("\n对话测试...")
response = client.chat("合同违约怎么处理?")
print(f"回答: {response['response'][:200]}...")
异步版本
"""
MBE Python SDK - 异步版本
"""
import httpx
from typing import Optional, Dict, Any, List
class AsyncMBEClient:
"""MBE 异步 API 客户端"""
def __init__(self, api_key: str, base_url: str = "http://localhost:8000"):
self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.client = httpx.AsyncClient(
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
timeout=30.0
)
async def close(self):
"""关闭客户端"""
await self.client.aclose()
async def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
"""发送请求"""
url = f"{self.base_url}{path}"
response = await self.client.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
async def create_knowledge_base(self, name: str, description: str = "") -> Dict[str, Any]:
"""创建知识库"""
return await self._request("POST", "/admin/knowledge/create", json={
"name": name,
"description": description
})
async def chat(self, text: str, device_id: str = None) -> Dict[str, Any]:
"""发送对话"""
return await self._request("POST", "/mcp/analyze", json={
"text": text,
"device_id": device_id
})
async def search(self, kb_id: str, query: str, top_k: int = 5) -> Dict[str, Any]:
"""搜索知识库"""
return await self._request("POST", "/admin/knowledge/search", json={
"kb_id": kb_id,
"query": query,
"top_k": top_k
})
# 异步使用示例
async def main():
client = AsyncMBEClient(
api_key="mbe_sk_your_api_key",
base_url="http://localhost:8000"
)
try:
# 创建知识库
kb = await client.create_knowledge_base("测试知识库", "测试用")
print(f"创建成功: {kb}")
# 对话
response = await client.chat("你好")
print(f"回答: {response}")
finally:
await client.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
JavaScript 示例
Node.js 客户端
/**
* MBE JavaScript SDK
*/
const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');
class MBEClient {
constructor(apiKey, baseUrl = 'http://localhost:8000') {
this.apiKey = apiKey;
this.baseUrl = baseUrl.replace(/\/$/, '');
this.client = axios.create({
baseURL: this.baseUrl,
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json'
},
timeout: 30000
});
}
// ==================== 知识库管理 ====================
async createKnowledgeBase(name, description = '', language = 'auto') {
const response = await this.client.post('/admin/knowledge/create', {
name, description, language
});
return response.data;
}
async uploadFile(kbId, filePath) {
const form = new FormData();
form.append('file', fs.createReadStream(filePath));
const response = await this.client.post(
`/admin/knowledge/upload/${kbId}`,
form,
{
headers: {
...form.getHeaders(),
'Authorization': `Bearer ${this.apiKey}`
}
}
);
return response.data;
}
async importUrl(kbId, url, chunkSize = 1000) {
const response = await this.client.post(
`/admin/knowledge/import-url/${kbId}`,
{ url, chunk_size: chunkSize }
);
return response.data;
}
async listKnowledgeBases() {
const response = await this.client.get('/admin/knowledge/');
return response.data;
}
async searchKnowledgeBase(kbId, query, topK = 5) {
const response = await this.client.post('/admin/knowledge/search', {
kb_id: kbId,
query,
top_k: topK
});
return response.data;
}
// ==================== 专家管理 ====================
async publishExpert(kbId, name, description, keywords, options = {}) {
const response = await this.client.post('/admin/knowledge/expert/publish', {
kb_id: kbId,
name,
description,
keywords,
domains: options.domains || [],
priority: options.priority || 6,
greeting: options.greeting || ''
});
return response.data;
}
async listExperts() {
const response = await this.client.get('/admin/knowledge/expert/list');
return response.data;
}
async autoOptimizeExpert(kbId) {
const response = await this.client.post(
`/admin/knowledge/expert/auto-optimize?kb_id=${kbId}`
);
return response.data;
}
// ==================== 对话 ====================
async chat(text, deviceId = null, sessionId = null) {
const data = { text };
if (deviceId) data.device_id = deviceId;
if (sessionId) data.session_id = sessionId;
const response = await this.client.post('/mcp/analyze', data);
return response.data;
}
async getHistory(deviceId, limit = 20, offset = 0) {
const response = await this.client.get(
`/api/v1/history/${deviceId}?limit=${limit}&offset=${offset}`
);
return response.data;
}
}
// 使用示例
async function main() {
const client = new MBEClient(
'mbe_sk_your_api_key',
'http://localhost:8000'
);
try {
// 1. 创建知识库
console.log('创建知识库...');
const kb = await client.createKnowledgeBase(
'法律咨询知识库',
'包含常见法律问题解答'
);
console.log(`知识库已创建: ${kb.kb_id}`);
// 2. 上传文件
console.log('\n上传文件...');
const uploadResult = await client.uploadFile(kb.kb_id, './legal_guide.pdf');
console.log(`上传结果: ${JSON.stringify(uploadResult.stats)}`);
// 3. 导入网页
console.log('\n导入网页...');
const importResult = await client.importUrl(
kb.kb_id,
'https://example.com/legal-faq'
);
console.log(`导入结果: ${JSON.stringify(importResult.extracted)}`);
// 4. 一键优化并发布专家
console.log('\n优化并发布专家...');
const optimized = await client.autoOptimizeExpert(kb.kb_id);
const expert = await client.publishExpert(
kb.kb_id,
optimized.optimized.name,
optimized.optimized.description,
optimized.optimized.keywords
);
console.log(`专家已发布: ${expert.expert.id}`);
// 5. 对话测试
console.log('\n对话测试...');
const response = await client.chat('合同违约怎么处理?');
console.log(`回答: ${response.response.substring(0, 200)}...`);
} catch (error) {
console.error('错误:', error.response?.data || error.message);
}
}
main();
浏览器端示例
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>MBE 对话示例</title>
</head>
<body>
<div id="chat-container">
<div id="messages"></div>
<input type="text" id="input" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>
</div>
<script>
const API_KEY = 'mbe_sk_your_api_key';
const BASE_URL = 'http://localhost:8000';
async function sendMessage() {
const input = document.getElementById('input');
const text = input.value.trim();
if (!text) return;
// 显示用户消息
appendMessage('user', text);
input.value = '';
try {
const response = await fetch(`${BASE_URL}/mcp/analyze`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${API_KEY}`
},
body: JSON.stringify({ text })
});
const data = await response.json();
appendMessage('assistant', data.response);
} catch (error) {
appendMessage('error', `错误: ${error.message}`);
}
}
function appendMessage(role, content) {
const messages = document.getElementById('messages');
const div = document.createElement('div');
div.className = `message ${role}`;
div.textContent = content;
messages.appendChild(div);
messages.scrollTop = messages.scrollHeight;
}
// 回车发送
document.getElementById('input').addEventListener('keypress', (e) => {
if (e.key === 'Enter') sendMessage();
});
</script>
</body>
</html>
cURL 示例
创建知识库
curl -X POST "http://localhost:8000/admin/knowledge/create" \
-H "Authorization: Bearer mbe_sk_your_api_key" \
-H "Content-Type: application/json" \
-d '{
"name": "法律咨询知识库",
"description": "包含常见法律问题解答"
}'
上传文件
curl -X POST "http://localhost:8000/admin/knowledge/upload/kb_abc123" \
-H "Authorization: Bearer mbe_sk_your_api_key" \
-F "file=@legal_guide.pdf"
导入 URL
curl -X POST "http://localhost:8000/admin/knowledge/import-url/kb_abc123" \
-H "Authorization: Bearer mbe_sk_your_api_key" \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/legal-faq",
"chunk_size": 1000
}'
发布专家
curl -X POST "http://localhost:8000/admin/knowledge/expert/publish" \
-H "Authorization: Bearer mbe_sk_your_api_key" \
-H "Content-Type: application/json" \
-d '{
"kb_id": "kb_abc123",
"name": "法律顾问专家",
"description": "专注于民事法律咨询",
"keywords": ["法律", "合同", "诉讼"]
}'
对话
curl -X POST "http://localhost:8000/mcp/analyze" \
-H "Authorization: Bearer mbe_sk_your_api_key" \
-H "Content-Type: application/json" \
-d '{
"text": "合同违约怎么处理?"
}'
完整项目示例
法律咨询机器人
"""
完整示例:法律咨询机器人
"""
import os
from mbe_client import MBEClient # 使用上面的客户端类
def create_legal_expert():
"""创建法律专家"""
client = MBEClient(
api_key=os.getenv("MBE_API_KEY"),
base_url=os.getenv("MBE_BASE_URL", "http://localhost:8000")
)
# 1. 创建知识库
kb = client.create_knowledge_base(
name="法律咨询知识库",
description="包含合同法、劳动法、民法等常见法律问题"
)
kb_id = kb["kb_id"]
print(f"✅ 知识库创建成功: {kb_id}")
# 2. 上传法律文档
legal_docs = [
"docs/contract_law.pdf",
"docs/labor_law.pdf",
"docs/civil_law.pdf"
]
for doc in legal_docs:
if os.path.exists(doc):
result = client.upload_file(kb_id, doc)
print(f"✅ 上传成功: {doc} ({result['stats']['chunk_count']} chunks)")
# 3. 导入法律网站内容
legal_urls = [
"https://www.court.gov.cn/zixun/",
"https://www.moj.gov.cn/pub/sfbgw/"
]
for url in legal_urls:
try:
result = client.import_url(kb_id, url)
print(f"✅ 导入成功: {url} ({result['extracted']['chunks_added']} chunks)")
except Exception as e:
print(f"⚠️ 导入失败: {url} - {e}")
# 4. 验证并优化
validation = client.validate_expert(kb_id)
if not validation["can_publish"]:
print(f"❌ 无法发布: {validation['errors']}")
return None
optimized = client.auto_optimize_expert(kb_id)
opt = optimized["optimized"]
print(f"✅ 优化建议: {opt['name']}")
# 5. 发布专家
expert = client.publish_expert(
kb_id=kb_id,
name=opt["name"],
description=opt["description"],
keywords=opt["keywords"],
domains=opt["domains"],
priority=8,
greeting="您好!我是法律顾问专家,可以为您解答合同、劳动、民事等法律问题。请问有什么可以帮助您的?"
)
print(f"✅ 专家发布成功: {expert['expert']['id']}")
return expert["expert"]["id"]
def chat_with_legal_expert():
"""与法律专家对话"""
client = MBEClient(
api_key=os.getenv("MBE_API_KEY"),
base_url=os.getenv("MBE_BASE_URL", "http://localhost:8000")
)
print("\n=== 法律咨询机器人 ===")
print("输入 'quit' 退出\n")
session_id = None
while True:
user_input = input("您: ").strip()
if user_input.lower() == 'quit':
break
if not user_input:
continue
response = client.chat(user_input, session_id=session_id)
session_id = response.get("session_id")
print(f"\n专家: {response['response']}\n")
if __name__ == "__main__":
# 首次运行创建专家
# expert_id = create_legal_expert()
# 对话测试
chat_with_legal_expert()
文档版本: v1.0 更新时间: 2026-01-24