MBE 开发最佳实践

本文档总结了创建高质量 AI 专家的最佳实践,帮助您优化专家性能和用户体验。


目录

  1. 知识库优化
  2. 专家配置
  3. API 调用优化
  4. 安全最佳实践
  5. 性能优化
  6. 监控与运维

知识库优化

1. 文档质量要求

✅ 推荐的文档类型

类型 适用场景 效果
专业教材 系统性知识 ⭐⭐⭐⭐⭐
官方文档 产品/技术说明 ⭐⭐⭐⭐⭐
FAQ 文档 常见问题 ⭐⭐⭐⭐
研究报告 行业分析 ⭐⭐⭐⭐
新闻文章 时效性内容 ⭐⭐⭐

❌ 避免的文档类型

  • 扫描质量差的 PDF(OCR 准确率低)
  • 图片为主的文档
  • 格式混乱的文件
  • 过于简短的内容(<100字/页)

2. 文档预处理建议

# 上传前检查文档质量
def check_document_quality(file_path):
    """检查文档质量"""
    issues = []
    
    # 检查文件大小
    import os
    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if size_mb > 50:
        issues.append(f"文件过大 ({size_mb:.1f}MB),建议拆分")
    
    # 检查 PDF 页数
    if file_path.endswith('.pdf'):
        import PyPDF2
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            pages = len(reader.pages)
            if pages > 500:
                issues.append(f"页数过多 ({pages}页),建议拆分")
    
    return issues

3. 知识库组织策略

按主题分库(推荐)

法律咨询系统/
├── kb_contract_law     # 合同法知识库
├── kb_labor_law        # 劳动法知识库
├── kb_civil_law        # 民法知识库
└── kb_faq              # 常见问题知识库

按来源分库

技术文档系统/
├── kb_official_docs    # 官方文档
├── kb_community        # 社区文档
└── kb_internal         # 内部文档

4. 内容更新策略

# 定期更新知识库
async def update_knowledge_base(client, kb_id, new_docs):
    """增量更新知识库"""
    for doc in new_docs:
        # 检查是否已存在
        existing = await client.search_knowledge_base(
            kb_id=kb_id,
            query=doc['title'],
            top_k=1
        )
        
        # 如果相似度过高,跳过
        if existing['results'] and existing['results'][0]['score'] > 0.95:
            print(f"跳过重复文档: {doc['title']}")
            continue
        
        # 上传新文档
        await client.upload_file(kb_id, doc['path'])
        print(f"已更新: {doc['title']}")

专家配置

1. 名称和描述

✅ 好的命名

名称: 法律顾问专家
描述: 专注于合同法、劳动法领域,提供专业法律咨询服务,可解答合同纠纷、劳动争议等问题。

❌ 不好的命名

名称: AI助手
描述: 回答问题的机器人

2. 关键词配置

# 关键词配置策略
keywords_config = {
    # 核心关键词(高权重)
    "core": ["法律咨询", "合同纠纷", "劳动争议"],
    
    # 相关关键词(中权重)
    "related": ["违约金", "赔偿", "仲裁", "诉讼"],
    
    # 问法关键词(帮助匹配)
    "questions": ["怎么处理", "如何解决", "什么是", "需要什么"]
}

# 合并关键词
final_keywords = (
    keywords_config["core"] + 
    keywords_config["related"] + 
    [f"{k}{q}" for k in keywords_config["core"][:3] 
                for q in ["怎么办", "如何", "是什么"]]
)

3. 系统提示词优化

system_prompt = """
你是一位专业的法律顾问,具有以下特点:

1. 专业领域:合同法、劳动法、民法
2. 服务对象:普通用户和企业客户
3. 回答风格:专业、准确、易懂

回答要求:
- 首先明确用户的问题类型
- 提供专业的法律依据
- 给出具体的建议和步骤
- 必要时提醒用户寻求专业律师帮助

注意事项:
- 不要给出绝对性的法律判断
- 复杂案件建议咨询专业律师
- 保护用户隐私,不要索取敏感信息
"""

4. 优先级设置

优先级 适用场景 说明
9-10 核心专家 高频使用,最先匹配
7-8 常用专家 日常咨询
5-6 一般专家 默认级别
3-4 补充专家 特定场景
1-2 备用专家 其他都不匹配时

API 调用优化

1. 连接池配置

import httpx

# 推荐的连接池配置
client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30
    ),
    timeout=httpx.Timeout(
        connect=5.0,
        read=30.0,
        write=10.0,
        pool=10.0
    )
)

2. 重试机制

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def call_mbe_api(client, endpoint, data):
    """带重试的 API 调用"""
    response = await client.post(endpoint, json=data)
    response.raise_for_status()
    return response.json()

3. 批量处理

async def batch_import_urls(client, kb_id, urls, batch_size=5):
    """批量导入 URL,控制并发"""
    results = []
    
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        tasks = [
            client.import_url(kb_id, url)
            for url in batch
        ]
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)
        results.extend(batch_results)
        
        # 批次间休息,避免限流
        await asyncio.sleep(1)
    
    return results

4. 缓存策略

from functools import lru_cache
import hashlib
import json

class CachedMBEClient:
    def __init__(self, client, cache_ttl=300):
        self.client = client
        self.cache = {}
        self.cache_ttl = cache_ttl
    
    def _cache_key(self, method, *args, **kwargs):
        data = json.dumps([method, args, kwargs], sort_keys=True)
        return hashlib.md5(data.encode()).hexdigest()
    
    async def search(self, kb_id, query, top_k=5):
        """带缓存的搜索"""
        key = self._cache_key('search', kb_id, query, top_k)
        
        # 检查缓存
        if key in self.cache:
            cached, timestamp = self.cache[key]
            if time.time() - timestamp < self.cache_ttl:
                return cached
        
        # 调用 API
        result = await self.client.search(kb_id, query, top_k)
        self.cache[key] = (result, time.time())
        return result

安全最佳实践

1. API Key 管理

# ✅ 正确做法:使用环境变量
import os
API_KEY = os.getenv("MBE_API_KEY")

# ❌ 错误做法:硬编码
API_KEY = "mbe_sk_xxxxxxxxxxxx"  # 危险!

2. 服务端代理

// 后端代理 API 调用,不在前端暴露 API Key
app.post('/api/chat', async (req, res) => {
    const response = await axios.post(
        `${MBE_BASE_URL}/mcp/analyze`,
        { text: req.body.message },
        { headers: { 'Authorization': `Bearer ${process.env.MBE_API_KEY}` } }
    );
    res.json(response.data);
});

3. IP 白名单

# 创建 API Key 时设置 IP 白名单
response = requests.post(
    f"{BASE_URL}/developer/api/keys",
    headers={"Authorization": f"Bearer {API_KEY}"},
    json={
        "name": "production-key",
        "allowed_ips": ["192.168.1.0/24", "10.0.0.1"],
        "rate_limit": 100,
        "daily_quota": 10000
    }
)

4. 输入验证

def validate_user_input(text):
    """验证用户输入"""
    # 长度限制
    if len(text) > 10000:
        raise ValueError("输入过长")
    
    # 敏感词过滤
    sensitive_words = ["密码", "银行卡号", "身份证"]
    for word in sensitive_words:
        if word in text:
            return text.replace(word, "***")
    
    return text

性能优化

1. 知识库大小控制

指标 推荐值 最大值
单个知识库 chunk 数 < 5000 10000
单个文档大小 < 10MB 100MB
知识库总数 按需创建 不限

2. 响应时间优化

# 1. 预热知识库
async def warmup_knowledge_bases(client, kb_ids):
    """预热知识库,加载到内存"""
    for kb_id in kb_ids:
        await client.search(kb_id, "测试查询", top_k=1)
    print("知识库预热完成")

# 2. 异步并行调用
async def parallel_search(client, query, kb_ids):
    """并行搜索多个知识库"""
    tasks = [
        client.search(kb_id, query, top_k=3)
        for kb_id in kb_ids
    ]
    results = await asyncio.gather(*tasks)
    
    # 合并结果
    all_results = []
    for r in results:
        all_results.extend(r.get('results', []))
    
    # 按分数排序
    all_results.sort(key=lambda x: x['score'], reverse=True)
    return all_results[:5]

3. 流式响应

import httpx

async def stream_chat(client, text):
    """流式对话,实时返回"""
    async with client.stream(
        "POST",
        f"{BASE_URL}/mcp/analyze/stream",
        json={"text": text}
    ) as response:
        async for chunk in response.aiter_text():
            yield chunk

监控与运维

1. 健康检查

async def health_check(client):
    """健康检查"""
    checks = {
        "api": False,
        "database": False,
        "redis": False
    }
    
    try:
        response = await client.get(f"{BASE_URL}/api/health/detailed")
        data = response.json()
        checks["api"] = data.get("status") == "healthy"
        checks["database"] = data.get("database", {}).get("status") == "ok"
        checks["redis"] = data.get("redis", {}).get("status") == "ok"
    except Exception as e:
        print(f"健康检查失败: {e}")
    
    return checks

2. 使用统计

async def get_usage_stats(client, days=7):
    """获取使用统计"""
    response = await client.get(
        f"{BASE_URL}/developer/api/revenue-stats?days={days}"
    )
    stats = response.json()
    
    print(f"最近 {days} 天统计:")
    print(f"  总调用: {stats['usage_stats']['total_calls']}")
    print(f"  总 Token: {stats['usage_stats']['total_tokens']}")
    print(f"  总成本: ¥{stats['usage_stats']['total_cost']:.2f}")
    
    return stats

3. 告警设置

async def check_and_alert(client, thresholds):
    """检查指标并告警"""
    stats = await get_usage_stats(client, days=1)
    
    alerts = []
    
    # 检查调用量
    if stats['usage_stats']['total_calls'] > thresholds['max_daily_calls']:
        alerts.append(f"调用量超限: {stats['usage_stats']['total_calls']}")
    
    # 检查错误率
    error_rate = stats.get('error_rate', 0)
    if error_rate > thresholds['max_error_rate']:
        alerts.append(f"错误率过高: {error_rate:.2%}")
    
    # 发送告警
    if alerts:
        await send_alert(alerts)
    
    return alerts

4. 日志记录

import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mbe_client.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('mbe')

class LoggingMBEClient:
    def __init__(self, client):
        self.client = client
    
    async def chat(self, text, **kwargs):
        start = datetime.now()
        try:
            result = await self.client.chat(text, **kwargs)
            duration = (datetime.now() - start).total_seconds()
            logger.info(f"Chat success | duration={duration:.2f}s | text_len={len(text)}")
            return result
        except Exception as e:
            logger.error(f"Chat error | text={text[:50]}... | error={e}")
            raise

文档版本: v1.0 更新时间: 2026-01-24