MBE 开发最佳实践
本文档总结了创建高质量 AI 专家的最佳实践,帮助您优化专家性能和用户体验。
目录
- 知识库优化
- 专家配置
- API 调用优化
- 安全最佳实践
- 性能优化
- 监控与运维
知识库优化
1. 文档质量要求
✅ 推荐的文档类型
| 类型 |
适用场景 |
效果 |
| 专业教材 |
系统性知识 |
⭐⭐⭐⭐⭐ |
| 官方文档 |
产品/技术说明 |
⭐⭐⭐⭐⭐ |
| FAQ 文档 |
常见问题 |
⭐⭐⭐⭐ |
| 研究报告 |
行业分析 |
⭐⭐⭐⭐ |
| 新闻文章 |
时效性内容 |
⭐⭐⭐ |
❌ 避免的文档类型
- 扫描质量差的 PDF(OCR 准确率低)
- 图片为主的文档
- 格式混乱的文件
- 过于简短的内容(<100字/页)
2. 文档预处理建议
# 上传前检查文档质量
def check_document_quality(file_path):
"""检查文档质量"""
issues = []
# 检查文件大小
import os
size_mb = os.path.getsize(file_path) / (1024 * 1024)
if size_mb > 50:
issues.append(f"文件过大 ({size_mb:.1f}MB),建议拆分")
# 检查 PDF 页数
if file_path.endswith('.pdf'):
import PyPDF2
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
pages = len(reader.pages)
if pages > 500:
issues.append(f"页数过多 ({pages}页),建议拆分")
return issues
3. 知识库组织策略
按主题分库(推荐)
法律咨询系统/
├── kb_contract_law # 合同法知识库
├── kb_labor_law # 劳动法知识库
├── kb_civil_law # 民法知识库
└── kb_faq # 常见问题知识库
按来源分库
技术文档系统/
├── kb_official_docs # 官方文档
├── kb_community # 社区文档
└── kb_internal # 内部文档
4. 内容更新策略
# 定期更新知识库
async def update_knowledge_base(client, kb_id, new_docs):
"""增量更新知识库"""
for doc in new_docs:
# 检查是否已存在
existing = await client.search_knowledge_base(
kb_id=kb_id,
query=doc['title'],
top_k=1
)
# 如果相似度过高,跳过
if existing['results'] and existing['results'][0]['score'] > 0.95:
print(f"跳过重复文档: {doc['title']}")
continue
# 上传新文档
await client.upload_file(kb_id, doc['path'])
print(f"已更新: {doc['title']}")
专家配置
1. 名称和描述
✅ 好的命名
名称: 法律顾问专家
描述: 专注于合同法、劳动法领域,提供专业法律咨询服务,可解答合同纠纷、劳动争议等问题。
❌ 不好的命名
名称: AI助手
描述: 回答问题的机器人
2. 关键词配置
# 关键词配置策略
keywords_config = {
# 核心关键词(高权重)
"core": ["法律咨询", "合同纠纷", "劳动争议"],
# 相关关键词(中权重)
"related": ["违约金", "赔偿", "仲裁", "诉讼"],
# 问法关键词(帮助匹配)
"questions": ["怎么处理", "如何解决", "什么是", "需要什么"]
}
# 合并关键词
final_keywords = (
keywords_config["core"] +
keywords_config["related"] +
[f"{k}{q}" for k in keywords_config["core"][:3]
for q in ["怎么办", "如何", "是什么"]]
)
3. 系统提示词优化
system_prompt = """
你是一位专业的法律顾问,具有以下特点:
1. 专业领域:合同法、劳动法、民法
2. 服务对象:普通用户和企业客户
3. 回答风格:专业、准确、易懂
回答要求:
- 首先明确用户的问题类型
- 提供专业的法律依据
- 给出具体的建议和步骤
- 必要时提醒用户寻求专业律师帮助
注意事项:
- 不要给出绝对性的法律判断
- 复杂案件建议咨询专业律师
- 保护用户隐私,不要索取敏感信息
"""
4. 优先级设置
| 优先级 |
适用场景 |
说明 |
| 9-10 |
核心专家 |
高频使用,最先匹配 |
| 7-8 |
常用专家 |
日常咨询 |
| 5-6 |
一般专家 |
默认级别 |
| 3-4 |
补充专家 |
特定场景 |
| 1-2 |
备用专家 |
其他都不匹配时 |
API 调用优化
1. 连接池配置
import httpx
# 推荐的连接池配置
client = httpx.AsyncClient(
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30
),
timeout=httpx.Timeout(
connect=5.0,
read=30.0,
write=10.0,
pool=10.0
)
)
2. 重试机制
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def call_mbe_api(client, endpoint, data):
"""带重试的 API 调用"""
response = await client.post(endpoint, json=data)
response.raise_for_status()
return response.json()
3. 批量处理
async def batch_import_urls(client, kb_id, urls, batch_size=5):
"""批量导入 URL,控制并发"""
results = []
for i in range(0, len(urls), batch_size):
batch = urls[i:i + batch_size]
tasks = [
client.import_url(kb_id, url)
for url in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
results.extend(batch_results)
# 批次间休息,避免限流
await asyncio.sleep(1)
return results
4. 缓存策略
from functools import lru_cache
import hashlib
import json
class CachedMBEClient:
def __init__(self, client, cache_ttl=300):
self.client = client
self.cache = {}
self.cache_ttl = cache_ttl
def _cache_key(self, method, *args, **kwargs):
data = json.dumps([method, args, kwargs], sort_keys=True)
return hashlib.md5(data.encode()).hexdigest()
async def search(self, kb_id, query, top_k=5):
"""带缓存的搜索"""
key = self._cache_key('search', kb_id, query, top_k)
# 检查缓存
if key in self.cache:
cached, timestamp = self.cache[key]
if time.time() - timestamp < self.cache_ttl:
return cached
# 调用 API
result = await self.client.search(kb_id, query, top_k)
self.cache[key] = (result, time.time())
return result
安全最佳实践
1. API Key 管理
# ✅ 正确做法:使用环境变量
import os
API_KEY = os.getenv("MBE_API_KEY")
# ❌ 错误做法:硬编码
API_KEY = "mbe_sk_xxxxxxxxxxxx" # 危险!
2. 服务端代理
// 后端代理 API 调用,不在前端暴露 API Key
app.post('/api/chat', async (req, res) => {
const response = await axios.post(
`${MBE_BASE_URL}/mcp/analyze`,
{ text: req.body.message },
{ headers: { 'Authorization': `Bearer ${process.env.MBE_API_KEY}` } }
);
res.json(response.data);
});
3. IP 白名单
# 创建 API Key 时设置 IP 白名单
response = requests.post(
f"{BASE_URL}/developer/api/keys",
headers={"Authorization": f"Bearer {API_KEY}"},
json={
"name": "production-key",
"allowed_ips": ["192.168.1.0/24", "10.0.0.1"],
"rate_limit": 100,
"daily_quota": 10000
}
)
4. 输入验证
def validate_user_input(text):
"""验证用户输入"""
# 长度限制
if len(text) > 10000:
raise ValueError("输入过长")
# 敏感词过滤
sensitive_words = ["密码", "银行卡号", "身份证"]
for word in sensitive_words:
if word in text:
return text.replace(word, "***")
return text
性能优化
1. 知识库大小控制
| 指标 |
推荐值 |
最大值 |
| 单个知识库 chunk 数 |
< 5000 |
10000 |
| 单个文档大小 |
< 10MB |
100MB |
| 知识库总数 |
按需创建 |
不限 |
2. 响应时间优化
# 1. 预热知识库
async def warmup_knowledge_bases(client, kb_ids):
"""预热知识库,加载到内存"""
for kb_id in kb_ids:
await client.search(kb_id, "测试查询", top_k=1)
print("知识库预热完成")
# 2. 异步并行调用
async def parallel_search(client, query, kb_ids):
"""并行搜索多个知识库"""
tasks = [
client.search(kb_id, query, top_k=3)
for kb_id in kb_ids
]
results = await asyncio.gather(*tasks)
# 合并结果
all_results = []
for r in results:
all_results.extend(r.get('results', []))
# 按分数排序
all_results.sort(key=lambda x: x['score'], reverse=True)
return all_results[:5]
3. 流式响应
import httpx
async def stream_chat(client, text):
"""流式对话,实时返回"""
async with client.stream(
"POST",
f"{BASE_URL}/mcp/analyze/stream",
json={"text": text}
) as response:
async for chunk in response.aiter_text():
yield chunk
监控与运维
1. 健康检查
async def health_check(client):
"""健康检查"""
checks = {
"api": False,
"database": False,
"redis": False
}
try:
response = await client.get(f"{BASE_URL}/api/health/detailed")
data = response.json()
checks["api"] = data.get("status") == "healthy"
checks["database"] = data.get("database", {}).get("status") == "ok"
checks["redis"] = data.get("redis", {}).get("status") == "ok"
except Exception as e:
print(f"健康检查失败: {e}")
return checks
2. 使用统计
async def get_usage_stats(client, days=7):
"""获取使用统计"""
response = await client.get(
f"{BASE_URL}/developer/api/revenue-stats?days={days}"
)
stats = response.json()
print(f"最近 {days} 天统计:")
print(f" 总调用: {stats['usage_stats']['total_calls']}")
print(f" 总 Token: {stats['usage_stats']['total_tokens']}")
print(f" 总成本: ¥{stats['usage_stats']['total_cost']:.2f}")
return stats
3. 告警设置
async def check_and_alert(client, thresholds):
"""检查指标并告警"""
stats = await get_usage_stats(client, days=1)
alerts = []
# 检查调用量
if stats['usage_stats']['total_calls'] > thresholds['max_daily_calls']:
alerts.append(f"调用量超限: {stats['usage_stats']['total_calls']}")
# 检查错误率
error_rate = stats.get('error_rate', 0)
if error_rate > thresholds['max_error_rate']:
alerts.append(f"错误率过高: {error_rate:.2%}")
# 发送告警
if alerts:
await send_alert(alerts)
return alerts
4. 日志记录
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mbe_client.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('mbe')
class LoggingMBEClient:
def __init__(self, client):
self.client = client
async def chat(self, text, **kwargs):
start = datetime.now()
try:
result = await self.client.chat(text, **kwargs)
duration = (datetime.now() - start).total_seconds()
logger.info(f"Chat success | duration={duration:.2f}s | text_len={len(text)}")
return result
except Exception as e:
logger.error(f"Chat error | text={text[:50]}... | error={e}")
raise
文档版本: v1.0
更新时间: 2026-01-24