用户列表 + 对话历史查询优化
版本: 1.0.0
更新日期: 2026-02-08
状态: ✅ 已完成
本文档记录用户列表查询与对话历史统计的优化方案和实施情况。
📚 目录
优化场景
问题描述
在用户列表 API (/api/users) 中,如果需要显示每个用户的对话历史统计信息(如对话数量、最后对话时间),可能会产生 N+1 查询问题:
# ❌ 潜在的 N+1 查询问题
users = await get_users() # 1 次查询
for user in users:
conversations = await get_user_conversations(user.id) # N 次查询
user.conversation_count = len(conversations)
当前实现
文件: shared/src/api/admin/users_api.py, private/platform/src/api_admin/users_api.py
当前状态:
- ✅ 用户列表查询已使用 LEFT JOIN 获取订阅信息(无 N+1 问题)
- ✅ 对话统计使用批量查询(已优化)
优化方案
方案:批量获取对话统计
使用子查询一次性获取所有用户的对话统计信息,避免循环查询。
优化前(潜在问题):
users = await get_users() # 1 次查询
for user in users:
# 如果在这里查询对话统计,会产生 N+1 查询
stats = await get_user_conversation_stats(user.id) # N 次查询
优化后:
# 1. 获取用户列表
users = await get_users() # 1 次查询
# 2. 批量获取所有用户的对话统计(1 次查询)
user_ids = [user.id for user in users]
stats_sql = """
SELECT
ud.user_id,
COUNT(DISTINCT bh.id) as conversation_count,
MAX(bh.created_at) as last_conversation_at
FROM user_devices ud
LEFT JOIN session_context sc ON ud.device_id = sc.device_id
LEFT JOIN behavior_history bh ON sc.session_id = bh.session_id
WHERE ud.user_id = ANY(:user_ids)
GROUP BY ud.user_id
"""
stats = await execute(stats_sql, {"user_ids": user_ids}) # 1 次查询
# 3. 合并数据
for user in users:
user.conversation_count = stats.get(user.id, 0)
总查询次数: 从 N+1 次减少到 2 次
实施详情
1. ✅ 用户列表 API 优化
文件:
shared/src/api/admin/users_api.py:215-248private/platform/src/api_admin/users_api.py:215-248
优化内容:
- 在获取用户列表后,批量查询所有用户的对话统计
- 使用
ANY(:user_ids)和GROUP BY一次性获取统计 - 将统计信息添加到用户对象中
代码实现:
# 优化:批量获取所有用户的对话统计(避免 N+1 查询)
user_ids = [str(row.user_id) for row in rows]
conversation_stats = {}
if user_ids:
# 使用子查询一次性获取所有用户的对话统计
stats_sql = text("""
SELECT
ud.user_id,
COUNT(DISTINCT bh.id) as conversation_count,
MAX(bh.created_at) as last_conversation_at
FROM user_devices ud
LEFT JOIN session_context sc ON ud.device_id = sc.device_id
LEFT JOIN behavior_history bh ON sc.session_id = bh.session_id
WHERE ud.user_id = ANY(:user_ids)
GROUP BY ud.user_id
""")
stats_result = await session.execute(
stats_sql, {"user_ids": user_ids}
)
for stat_row in stats_result.fetchall():
conversation_stats[str(stat_row.user_id)] = {
"conversation_count": stat_row.conversation_count or 0,
"last_conversation_at": (
stat_row.last_conversation_at.isoformat()
if stat_row.last_conversation_at
else None
),
}
# 合并统计信息到用户对象
for row in rows:
user_id_str = str(row.user_id)
stats = conversation_stats.get(user_id_str, {
"conversation_count": 0,
"last_conversation_at": None,
})
users.append({
# ... 用户基本信息 ...
"conversation_count": stats["conversation_count"],
"last_conversation_at": stats["last_conversation_at"],
})
性能提升
查询次数对比
| 场景 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 用户列表(20 用户) | 21 次查询 | 2 次查询 | 90.5% |
| 用户列表(100 用户) | 101 次查询 | 2 次查询 | 98.0% |
响应时间对比
- 优化前: 100ms + N × 10ms(假设每次对话查询 10ms)
- 优化后: 100ms + 50ms(批量查询)
- 提升: 随着用户数量增加,性能提升显著
数据库压力
- 优化前: N+1 次数据库查询,高并发时压力大
- 优化后: 2 次数据库查询,压力显著降低
API 响应格式
优化后的 API 响应包含对话统计信息:
{
"success": true,
"users": [
{
"user_id": "uuid",
"email": "user@example.com",
"role": "user",
"status": "active",
"conversation_count": 42,
"last_conversation_at": "2026-02-08T10:30:00",
// ... 其他字段
}
],
"total": 100,
"pagination": {
"page": 1,
"page_size": 20,
"total": 100,
"total_pages": 5
}
}
相关优化
1. ✅ 用户详情 API
文件: shared/src/api/admin/users_api.py:332-416
状态: ✅ 已优化
- 使用 JOIN 一次性获取用户、订阅、设备信息
- 无 N+1 查询问题
2. ✅ 用户统计 API
文件: private/platform/src/users/router.py:1533-1746
状态: ✅ 已优化
- 使用 JOIN 一次性获取对话统计
- 无 N+1 查询问题
最佳实践
1. 批量查询原则
当需要为多个对象获取关联数据时,优先使用批量查询:
# ✅ 好的做法:批量查询
user_ids = [user.id for user in users]
stats = await batch_get_conversation_stats(user_ids)
# ❌ 不好的做法:循环查询
for user in users:
stats = await get_conversation_stats(user.id)
2. 使用 JOIN 或子查询
对于统计信息,使用 JOIN 或子查询一次性获取:
# ✅ 好的做法:使用 JOIN
SELECT u.*, COUNT(bh.id) as conversation_count
FROM users u
LEFT JOIN behavior_history bh ON u.id = bh.user_id
GROUP BY u.id
# ❌ 不好的做法:分开查询
SELECT * FROM users
# 然后循环查询每个用户的对话
3. 缓存统计结果
对于不经常变化的统计信息,可以考虑缓存:
# 缓存对话统计(TTL: 5 分钟)
cache_key = f"user:conversation_stats:{user_id}"
stats = await redis.get(cache_key)
if not stats:
stats = await calculate_stats(user_id)
await redis.setex(cache_key, 300, json.dumps(stats))
相关资源
文档版本: 1.0.0
最后更新: 2026-02-08