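Inter-Terminal Chat: Design Document
Overview
This document describes the design of inter-terminal chat for the MBE system. It covers three features:
- Channel chat rooms: multiple terminals subscribe to the same channel for group chat
- Device-to-device message forwarding: P2P message delivery, including forwarding across protocols
- Meeting mode: real-time multi-party collaboration, with support for AI participants
Architecture
Overall Architecture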
┌────────────────────────────────────────────────────────────────────┐
│                            Client layer                            │
│   ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐   │
│   │  Xiaozhi   │  │    Web     │  │    IoT     │  │ Mobile H5  │   │
│   │   device   │  │  browser   │  │   device   │  │            │   │
│   └─────┬──────┘  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘   │
└─────────┼───────────────┼───────────────┼───────────────┼──────────┘
          │ MCP           │ WebSocket     │ MQTT          │ WebSocket
          ▼               ▼               ▼               ▼
┌────────────────────────────────────────────────────────────────────┐
│                       Protocol adapter layer                       │
│   ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐   │
│   │ MCPAdapter │  │ WebSocket  │  │MQTTAdapter │  │HTTPAdapter │   │
│   │            │  │  Adapter   │  │            │  │            │   │
│   └─────┬──────┘  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘   │
└─────────┼───────────────┼───────────────┼───────────────┼──────────┘
          │               │               │               │
          ▼               ▼               ▼               ▼
┌────────────────────────────────────────────────────────────────────┐
│                      Chat service layer (new)                      │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                         ChatService                          │  │
│  │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  │  │
│  │  │ ChannelManager │  │ MessageRouter  │  │ MeetingManager │  │  │
│  │  │   (channels)   │  │   (routing)    │  │   (meetings)   │  │  │
│  │  └────────────────┘  └────────────────┘  └────────────────┘  │  │
│  └──────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
                │                 │                 │
                ▼                 ▼                 ▼
┌────────────────────────────────────────────────────────────────────┐
│                           Storage layer                            │
│        ┌──────────────┐  ┌──────────────┐  ┌──────────────┐        │
│        │    Redis     │  │  PostgreSQL  │  │ File storage │        │
│        │ (live state) │  │ (persistence)│  │ (attachments)│        │
│        └──────────────┘  └──────────────┘  └──────────────┘        │
└────────────────────────────────────────────────────────────────────┘
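1. Channel Chat Rooms
1.1 Functional Requirements
| Feature | Description |
| --- | --- |
| Create channel | Users can create public or private channels |
| Join/leave channel | Terminals can subscribe to or unsubscribe from a channel |
| Message broadcast | A message is delivered to every member of the channel |
| Message history | Past messages can be queried |
| Member management | View and manage channel members |
| AI participation | An AI can join a channel as a member and take part in the conversation |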
1.2 Data Model
# src/chat/models.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Dict, Optional, Any


class ChannelType(str, Enum):
    PUBLIC = "public"    # Public channel, anyone can join
    PRIVATE = "private"  # Private channel, invitation required
    MEETING = "meeting"  # Meeting channel, created temporarily


class MessageType(str, Enum):
    TEXT = "text"        # Text message
    VOICE = "voice"      # Voice message
    IMAGE = "image"      # Image message
    FILE = "file"        # File message
    SYSTEM = "system"    # System message
    AI_RESPONSE = "ai"   # AI reply


class MemberRole(str, Enum):
    OWNER = "owner"        # Channel creator
    ADMIN = "admin"        # Administrator
    MEMBER = "member"      # Regular member
    AI_AGENT = "ai_agent"  # AI agent


@dataclass
class Channel:
    """Channel data model."""
    channel_id: str
    name: str
    description: str
    channel_type: ChannelType
    owner_id: str  # user_id of the creator
    created_at: datetime
    updated_at: datetime
    settings: Dict[str, Any] = field(default_factory=dict)
    # settings may contain:
    # - max_members: int (maximum number of members)
    # - ai_enabled: bool (whether AI is enabled)
    # - ai_expert_id: str (ID of the bound expert)
    # - persist_history: bool (whether history is persisted)
    # - history_days: int (history retention in days)


@dataclass
class ChannelMember:
    """Channel member."""
    channel_id: str
    user_id: str
    device_id: str
    role: MemberRole
    nickname: str
    joined_at: datetime
    last_active_at: datetime
    is_online: bool
    connection_ids: List[str] = field(default_factory=list)


@dataclass
class ChannelMessage:
    """Channel message."""
    message_id: str
    channel_id: str
    sender_id: str  # user_id or "ai:{expert_id}"
    sender_device_id: str
    sender_nickname: str
    message_type: MessageType
    content: str
    metadata: Dict[str, Any]  # Extra data (quotes, attachments, etc.)
    created_at: datetime
    edited_at: Optional[datetime] = None
    is_deleted: bool = False
    reply_to: Optional[str] = None  # ID of the message being replied to
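As a rough sketch of how these models might be put on the wire, the example below serializes a trimmed copy of `ChannelMessage` to JSON. The helper name `message_to_json` is hypothetical and not part of the design; the sketch only relies on the fact that `str`-based enums serialize as their value and that `datetime` needs an explicit encoder.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional


class MessageType(str, Enum):
    TEXT = "text"
    AI_RESPONSE = "ai"


@dataclass
class ChannelMessage:
    # Trimmed copy of the model above, just enough for the example
    message_id: str
    channel_id: str
    sender_id: str
    message_type: MessageType
    content: str
    metadata: Dict[str, Any]
    created_at: datetime
    reply_to: Optional[str] = None


def message_to_json(message: ChannelMessage) -> str:
    """Hypothetical helper: turn a message dataclass into a JSON string."""

    def encode(value: Any) -> str:
        # json cannot encode datetime on its own; use ISO 8601
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Cannot serialize {type(value).__name__}")

    # str-based enums are emitted as their string value by json.dumps
    return json.dumps(asdict(message), default=encode)


msg = ChannelMessage(
    message_id="msg_1",
    channel_id="ch_1",
    sender_id="user_1",
    message_type=MessageType.TEXT,
    content="hello",
    metadata={},
    created_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
payload = message_to_json(msg)
```

Keeping the enums `str`-based is what lets the same values be stored in the `VARCHAR` columns and sent to clients without a separate mapping step.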
1.3 Database Schema
-- migrations/add_chat_tables.sql
-- Channels
CREATE TABLE IF NOT EXISTS chat_channels (
channel_id VARCHAR(64) PRIMARY KEY,
name VARCHAR(128) NOT NULL,
description TEXT,
channel_type VARCHAR(32) NOT NULL DEFAULT 'public',
owner_id VARCHAR(64) NOT NULL,
settings JSONB DEFAULT '{}',
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_channels_owner ON chat_channels(owner_id);
CREATE INDEX IF NOT EXISTS idx_channels_type ON chat_channels(channel_type);
-- Channel members
CREATE TABLE IF NOT EXISTS chat_channel_members (
id SERIAL PRIMARY KEY,
channel_id VARCHAR(64) NOT NULL REFERENCES chat_channels(channel_id) ON DELETE CASCADE,
user_id VARCHAR(64) NOT NULL,
device_id VARCHAR(64),
role VARCHAR(32) NOT NULL DEFAULT 'member',
nickname VARCHAR(64),
joined_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
last_active_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(channel_id, user_id)
);
CREATE INDEX IF NOT EXISTS idx_members_channel ON chat_channel_members(channel_id);
CREATE INDEX IF NOT EXISTS idx_members_user ON chat_channel_members(user_id);
-- Channel messages
CREATE TABLE IF NOT EXISTS chat_messages (
message_id VARCHAR(64) PRIMARY KEY,
channel_id VARCHAR(64) NOT NULL REFERENCES chat_channels(channel_id) ON DELETE CASCADE,
sender_id VARCHAR(64) NOT NULL,
sender_device_id VARCHAR(64),
sender_nickname VARCHAR(64),
message_type VARCHAR(32) NOT NULL DEFAULT 'text',
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
reply_to VARCHAR(64),
is_deleted BOOLEAN DEFAULT false,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
edited_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX IF NOT EXISTS idx_messages_channel ON chat_messages(channel_id);
CREATE INDEX IF NOT EXISTS idx_messages_sender ON chat_messages(sender_id);
CREATE INDEX IF NOT EXISTS idx_messages_created ON chat_messages(channel_id, created_at DESC);
-- Message read receipts
CREATE TABLE IF NOT EXISTS chat_message_reads (
id SERIAL PRIMARY KEY,
message_id VARCHAR(64) NOT NULL REFERENCES chat_messages(message_id) ON DELETE CASCADE,
user_id VARCHAR(64) NOT NULL,
read_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(message_id, user_id)
);
CREATE INDEX IF NOT EXISTS idx_reads_user ON chat_message_reads(user_id);
1.4 Channel Manager Implementation
# src/chat/channel_manager.py
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Set

from loguru import logger

from src.storage.redis import get_redis
from src.storage.db import get_db
from src.chat.models import (
    Channel, ChannelMember, ChannelMessage,
    ChannelType, MessageType, MemberRole
)


class ChannelManager:
    """Channel manager: owns the channel lifecycle and membership."""

    _instance = None

    def __new__(cls):
        # Process-wide singleton; __init__ is guarded by _initialized
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        # In-memory caches
        self._channels: Dict[str, Channel] = {}
        # channel_id -> {user_id -> member}
        self._channel_members: Dict[str, Dict[str, ChannelMember]] = {}
        # user_id -> set of channel_ids
        self._user_channels: Dict[str, Set[str]] = {}
        # Redis key prefix
        self._redis_prefix = "mbe:chat"
        self._initialized = True
        logger.info("ChannelManager initialized")
    # ========== Channel management ==========

    async def create_channel(
        self,
        name: str,
        owner_id: str,
        channel_type: ChannelType = ChannelType.PUBLIC,
        description: str = "",
        settings: Optional[Dict] = None
    ) -> Channel:
        """Create a new channel."""
        channel_id = f"ch_{uuid.uuid4().hex[:12]}"
        now = datetime.utcnow()
        channel = Channel(
            channel_id=channel_id,
            name=name,
            description=description,
            channel_type=channel_type,
            owner_id=owner_id,
            created_at=now,
            updated_at=now,
            settings=settings or {
                "max_members": 100,
                "ai_enabled": True,
                "persist_history": True,
                "history_days": 30
            }
        )
        # Persist to the database
        async with get_db() as db:
            await db.execute("""
                INSERT INTO chat_channels (channel_id, name, description, channel_type, owner_id, settings)
                VALUES ($1, $2, $3, $4, $5, $6)
            """, channel_id, name, description, channel_type.value, owner_id, channel.settings)
        # Cache
        self._channels[channel_id] = channel
        self._channel_members[channel_id] = {}
        # The creator joins automatically
        await self.join_channel(channel_id, owner_id, role=MemberRole.OWNER)
        logger.info(f"Channel created: {channel_id} ({name}) by {owner_id}")
        return channel
    async def get_channel(self, channel_id: str) -> Optional[Channel]:
        """Return the channel, or None if it does not exist or is inactive."""
        if channel_id in self._channels:
            return self._channels[channel_id]
        # Load from the database
        async with get_db() as db:
            row = await db.fetchrow(
                "SELECT * FROM chat_channels WHERE channel_id = $1 AND is_active = true",
                channel_id
            )
        if row:
            channel = Channel(
                channel_id=row['channel_id'],
                name=row['name'],
                description=row['description'] or "",
                channel_type=ChannelType(row['channel_type']),
                owner_id=row['owner_id'],
                created_at=row['created_at'],
                updated_at=row['updated_at'],
                settings=row['settings'] or {}
            )
            self._channels[channel_id] = channel
            return channel
        return None
    async def delete_channel(self, channel_id: str, user_id: str) -> bool:
        """Soft-delete a channel (owner only)."""
        channel = await self.get_channel(channel_id)
        if not channel or channel.owner_id != user_id:
            return False
        async with get_db() as db:
            await db.execute(
                "UPDATE chat_channels SET is_active = false WHERE channel_id = $1",
                channel_id
            )
        # Clear caches
        self._channels.pop(channel_id, None)
        members = self._channel_members.pop(channel_id, {})
        # Use a distinct loop variable so the user_id parameter is not shadowed
        for member_id in members:
            if member_id in self._user_channels:
                self._user_channels[member_id].discard(channel_id)
        logger.info(f"Channel deleted: {channel_id}")
        return True
    # ========== Member management ==========

    async def join_channel(
        self,
        channel_id: str,
        user_id: str,
        device_id: Optional[str] = None,
        nickname: Optional[str] = None,
        role: MemberRole = MemberRole.MEMBER
    ) -> Optional[ChannelMember]:
        """Join a channel. Returns None if the channel is missing or full."""
        channel = await self.get_channel(channel_id)
        if not channel:
            return None
        # Populate the member cache before checking membership and capacity;
        # otherwise a channel whose members are not loaded yet would look empty.
        if channel_id not in self._channel_members:
            await self.get_channel_members(channel_id)
        members = self._channel_members[channel_id]
        # Already a member
        if user_id in members:
            return members[user_id]
        # Enforce the member limit
        max_members = channel.settings.get("max_members", 100)
        if len(members) >= max_members:
            logger.warning(f"Channel {channel_id} is full")
            return None
        now = datetime.utcnow()
        member = ChannelMember(
            channel_id=channel_id,
            user_id=user_id,
            device_id=device_id or "",
            role=role,
            nickname=nickname or user_id[:8],
            joined_at=now,
            last_active_at=now,
            is_online=True,
            connection_ids=[]
        )
        # Persist to the database
        async with get_db() as db:
            await db.execute("""
                INSERT INTO chat_channel_members (channel_id, user_id, device_id, role, nickname)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (channel_id, user_id) DO UPDATE SET
                    device_id = EXCLUDED.device_id,
                    last_active_at = CURRENT_TIMESTAMP
            """, channel_id, user_id, device_id, role.value, member.nickname)
        # Cache
        members[user_id] = member
        self._user_channels.setdefault(user_id, set()).add(channel_id)
        logger.info(f"User {user_id} joined channel {channel_id}")
        return member
    async def leave_channel(self, channel_id: str, user_id: str) -> bool:
        """Leave a channel."""
        async with get_db() as db:
            await db.execute(
                "DELETE FROM chat_channel_members WHERE channel_id = $1 AND user_id = $2",
                channel_id, user_id
            )
        # Clear caches
        if channel_id in self._channel_members:
            self._channel_members[channel_id].pop(user_id, None)
        if user_id in self._user_channels:
            self._user_channels[user_id].discard(channel_id)
        logger.info(f"User {user_id} left channel {channel_id}")
        return True
    async def get_channel_members(self, channel_id: str) -> List[ChannelMember]:
        """Return the members of a channel."""
        if channel_id in self._channel_members:
            return list(self._channel_members[channel_id].values())
        # Load from the database
        async with get_db() as db:
            rows = await db.fetch(
                "SELECT * FROM chat_channel_members WHERE channel_id = $1",
                channel_id
            )
        members = []
        for row in rows:
            member = ChannelMember(
                channel_id=row['channel_id'],
                user_id=row['user_id'],
                device_id=row['device_id'] or "",
                role=MemberRole(row['role']),
                nickname=row['nickname'] or row['user_id'][:8],
                joined_at=row['joined_at'],
                last_active_at=row['last_active_at'],
                is_online=False,
                connection_ids=[]
            )
            members.append(member)
        # Cache
        self._channel_members[channel_id] = {m.user_id: m for m in members}
        return members
    async def get_user_channels(self, user_id: str) -> List[Channel]:
        """Return every channel the user has joined."""
        # Always read from the database. _user_channels is also written by
        # join_channel, so an existing entry may hold only the channels joined
        # in this process and cannot be trusted as the complete list.
        async with get_db() as db:
            rows = await db.fetch("""
                SELECT c.* FROM chat_channels c
                JOIN chat_channel_members m ON c.channel_id = m.channel_id
                WHERE m.user_id = $1 AND c.is_active = true
            """, user_id)
        channels = []
        channel_ids = set()
        for row in rows:
            channel = Channel(
                channel_id=row['channel_id'],
                name=row['name'],
                description=row['description'] or "",
                channel_type=ChannelType(row['channel_type']),
                owner_id=row['owner_id'],
                created_at=row['created_at'],
                updated_at=row['updated_at'],
                settings=row['settings'] or {}
            )
            channels.append(channel)
            channel_ids.add(channel.channel_id)
        # Refresh the cache with the complete set
        self._user_channels[user_id] = channel_ids
        return channels
    # ========== Online state ==========

    async def update_member_online(
        self,
        channel_id: str,
        user_id: str,
        connection_id: str,
        is_online: bool
    ):
        """Update a member's online state for one connection."""
        if channel_id not in self._channel_members:
            return
        if user_id not in self._channel_members[channel_id]:
            return
        member = self._channel_members[channel_id][user_id]
        if is_online:
            if connection_id not in member.connection_ids:
                member.connection_ids.append(connection_id)
            member.is_online = True
        else:
            if connection_id in member.connection_ids:
                member.connection_ids.remove(connection_id)
            # The member stays online while any connection remains
            member.is_online = len(member.connection_ids) > 0
        member.last_active_at = datetime.utcnow()
        # Mirror the state in Redis
        redis = await get_redis()
        if member.is_online:
            await redis.sadd(f"{self._redis_prefix}:online:{channel_id}", user_id)
        else:
            await redis.srem(f"{self._redis_prefix}:online:{channel_id}", user_id)

    async def get_online_members(self, channel_id: str) -> List[str]:
        """Return the IDs of the members currently online."""
        redis = await get_redis()
        members = await redis.smembers(f"{self._redis_prefix}:online:{channel_id}")
        return list(members) if members else []


def get_channel_manager() -> ChannelManager:
    """Return the ChannelManager singleton."""
    return ChannelManager()
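The online-state rule in `update_member_online` (a user stays online until their last connection closes) can be illustrated in isolation. The sketch below is an in-memory stand-in, not the manager itself; `PresenceTracker` and its method names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Set


class PresenceTracker:
    """Illustrative model of per-user connection bookkeeping."""

    def __init__(self) -> None:
        # user_id -> set of open connection IDs
        self._connections: Dict[str, Set[str]] = defaultdict(set)

    def connect(self, user_id: str, connection_id: str) -> None:
        self._connections[user_id].add(connection_id)

    def disconnect(self, user_id: str, connection_id: str) -> bool:
        """Drop one connection; return True if the user is still online."""
        connections = self._connections.get(user_id)
        if connections is None:
            return False
        connections.discard(connection_id)
        if not connections:
            # Last connection closed: the user goes offline
            del self._connections[user_id]
            return False
        return True

    def is_online(self, user_id: str) -> bool:
        return user_id in self._connections


tracker = PresenceTracker()
tracker.connect("alice", "ws-1")
tracker.connect("alice", "mqtt-1")
still_online = tracker.disconnect("alice", "ws-1")
```

Tracking connection IDs rather than a single boolean is what keeps a user online when, for example, a browser tab closes while an IoT device for the same user is still connected.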
1.5 API Endpoints
# src/api/chat.py
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel

from src.users.auth import get_current_user
from src.chat.channel_manager import get_channel_manager
from src.chat.message_service import get_message_service
from src.chat.models import ChannelType

router = APIRouter(prefix="/api/chat", tags=["Chat"])

# ========== Request/response models ==========


class CreateChannelRequest(BaseModel):
    name: str
    description: str = ""
    channel_type: ChannelType = ChannelType.PUBLIC
    ai_enabled: bool = True
    ai_expert_id: Optional[str] = None


class ChannelResponse(BaseModel):
    channel_id: str
    name: str
    description: str
    channel_type: str
    owner_id: str
    member_count: int
    ai_enabled: bool


class SendMessageRequest(BaseModel):
    content: str
    message_type: str = "text"
    reply_to: Optional[str] = None
    metadata: dict = {}

# ========== Channel management API ==========
@router.post("/channels", response_model=ChannelResponse)
async def create_channel(
    request: CreateChannelRequest,
    user = Depends(get_current_user)
):
    """Create a channel."""
    manager = get_channel_manager()
    settings = {
        "ai_enabled": request.ai_enabled,
        "ai_expert_id": request.ai_expert_id,
        "max_members": 100,
        "persist_history": True
    }
    channel = await manager.create_channel(
        name=request.name,
        owner_id=user.user_id,
        channel_type=request.channel_type,
        description=request.description,
        settings=settings
    )
    return ChannelResponse(
        channel_id=channel.channel_id,
        name=channel.name,
        description=channel.description,
        channel_type=channel.channel_type.value,
        owner_id=channel.owner_id,
        member_count=1,  # only the creator so far
        ai_enabled=settings["ai_enabled"]
    )


@router.get("/channels", response_model=List[ChannelResponse])
async def list_my_channels(user = Depends(get_current_user)):
    """List the channels the current user has joined."""
    manager = get_channel_manager()
    channels = await manager.get_user_channels(user.user_id)
    result = []
    for ch in channels:
        members = await manager.get_channel_members(ch.channel_id)
        result.append(ChannelResponse(
            channel_id=ch.channel_id,
            name=ch.name,
            description=ch.description,
            channel_type=ch.channel_type.value,
            owner_id=ch.owner_id,
            member_count=len(members),
            ai_enabled=ch.settings.get("ai_enabled", False)
        ))
    return result
@router.post("/channels/{channel_id}/join")
async def join_channel(
    channel_id: str,
    nickname: Optional[str] = None,
    user = Depends(get_current_user)
):
    """Join a channel."""
    manager = get_channel_manager()
    member = await manager.join_channel(
        channel_id=channel_id,
        user_id=user.user_id,
        nickname=nickname
    )
    if not member:
        raise HTTPException(status_code=404, detail="Channel not found or full")
    return {"status": "joined", "channel_id": channel_id}


@router.post("/channels/{channel_id}/leave")
async def leave_channel(
    channel_id: str,
    user = Depends(get_current_user)
):
    """Leave a channel."""
    manager = get_channel_manager()
    await manager.leave_channel(channel_id, user.user_id)
    return {"status": "left", "channel_id": channel_id}
# ========== Message API ==========

@router.post("/channels/{channel_id}/messages")
async def send_message(
    channel_id: str,
    request: SendMessageRequest,
    user = Depends(get_current_user)
):
    """Send a message to a channel."""
    service = get_message_service()
    message = await service.send_to_channel(
        channel_id=channel_id,
        sender_id=user.user_id,
        content=request.content,
        message_type=request.message_type,
        reply_to=request.reply_to,
        metadata=request.metadata
    )
    if not message:
        raise HTTPException(status_code=403, detail="Not a channel member")
    return {"message_id": message.message_id, "created_at": message.created_at}


@router.get("/channels/{channel_id}/messages")
async def get_messages(
    channel_id: str,
    limit: int = Query(50, le=100),
    before: Optional[str] = None,
    user = Depends(get_current_user)
):
    """Return the message history of a channel."""
    # Only members may read history; otherwise any authenticated user
    # could read a private channel.
    manager = get_channel_manager()
    members = await manager.get_channel_members(channel_id)
    if not any(m.user_id == user.user_id for m in members):
        raise HTTPException(status_code=403, detail="Not a channel member")
    service = get_message_service()
    messages = await service.get_channel_history(
        channel_id=channel_id,
        limit=limit,
        before_message_id=before
    )
    return {"messages": messages}
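The `before` parameter suggests cursor-style paging over `chat_messages`. The document does not show `get_channel_history`, so the following is only a sketch of one way the query could be built, assuming keyset pagination on `(created_at, message_id)` and `$n` placeholders as used elsewhere in this document. The function name `build_history_query` is hypothetical.

```python
from typing import List, Optional, Tuple


def build_history_query(
    channel_id: str,
    limit: int = 50,
    before_message_id: Optional[str] = None,
) -> Tuple[str, List[object]]:
    """Build a newest-first history query with an optional cursor."""
    params: List[object] = [channel_id]
    where = ["channel_id = $1", "is_deleted = false"]
    if before_message_id is not None:
        params.append(before_message_id)
        # Row comparison against the cursor message keeps ordering stable
        # even when several messages share the same created_at.
        where.append(
            "(created_at, message_id) < "
            "(SELECT created_at, message_id FROM chat_messages "
            f"WHERE message_id = ${len(params)})"
        )
    params.append(limit)
    sql = (
        "SELECT * FROM chat_messages "
        f"WHERE {' AND '.join(where)} "
        "ORDER BY created_at DESC, message_id DESC "
        f"LIMIT ${len(params)}"
    )
    return sql, params


first_page_sql, first_page_params = build_history_query("ch_1", limit=20)
next_page_sql, next_page_params = build_history_query("ch_1", 20, "msg_9")
```

Compared with `OFFSET`, a cursor keeps page boundaries stable while new messages arrive, and the `(channel_id, created_at DESC)` index from section 1.3 can serve the scan.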
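2. Device-to-Device Message Forwarding
2.1 Functional Requirements
| Feature | Description |
| --- | --- |
| P2P messages | Device A sends a message directly to device B |
| Cross-protocol forwarding | A WebSocket device can send messages to an MQTT device |
| Offline messages | Messages are held while the target device is offline |
| Delivery confirmation | Optional delivered/read receipts |
| Message routing | Routing by device_id or user_id |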
2.2 Message Router Design
# src/chat/message_router.py
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any

from loguru import logger

from src.storage.redis import get_redis
from src.terminals.manager import get_terminal_manager
from src.api.websocket import manager as ws_manager
from src.chat.models import MessageType


class MessageRouter:
    """Message router: forwards messages between devices."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        # Offline message queue
        self._offline_queue: Dict[str, List[Dict]] = {}
        # Message callbacks (extension hook)
        self._message_handlers: Dict[str, Callable] = {}
        # Configuration
        self._offline_ttl = timedelta(hours=24)  # how long offline messages are kept
        self._max_offline_messages = 100         # max offline messages per device
        self._redis_prefix = "mbe:router"
        self._initialized = True
        logger.info("MessageRouter initialized")
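    # ========== Sending ==========

    async def send_to_device(
        self,
        target_device_id: str,
        message: Dict,
        sender_device_id: Optional[str] = None,
        require_ack: bool = False
    ) -> Dict:
        """Send a message to a specific device."""
        message_id = message.get("message_id") or f"msg_{uuid.uuid4().hex[:12]}"
        # Build the full message
        full_message = {
            "message_id": message_id,
            "type": "direct_message",
            "from_device": sender_device_id,
            "to_device": target_device_id,
            "content": message.get("content"),
            "message_type": message.get("message_type", "text"),
            "metadata": message.get("metadata", {}),
            "timestamp": datetime.utcnow().isoformat(),
            "require_ack": require_ack
        }
        # Record the sender so send_ack() can route the receipt back.
        # Assumes get_redis() returns a redis-py style async client.
        if require_ack and sender_device_id:
            redis = await get_redis()
            await redis.set(
                f"{self._redis_prefix}:pending:{message_id}",
                json.dumps({"from_device": sender_device_id, "to_device": target_device_id}),
                ex=int(self._offline_ttl.total_seconds())
            )
        # Try to deliver
        delivered = await self._deliver_message(target_device_id, full_message)
        if not delivered:
            # Store as an offline message
            await self._store_offline_message(target_device_id, full_message)
            return {
                "message_id": message_id,
                "status": "queued",
                "reason": "device_offline"
            }
        return {
            "message_id": message_id,
            "status": "delivered"
        }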
    async def send_to_user(
        self,
        target_user_id: str,
        message: Dict,
        sender_id: Optional[str] = None,
        exclude_device: Optional[str] = None
    ) -> Dict:
        """Send a message to every device of a user."""
        from src.terminals.adapters.base import AdapterResponse

        message_id = message.get("message_id") or f"msg_{uuid.uuid4().hex[:12]}"
        delivered_count = 0
        # 1. WebSocket connections
        if target_user_id in ws_manager.user_connections:
            for conn_id in ws_manager.user_connections[target_user_id]:
                if exclude_device:
                    metadata = ws_manager.connection_metadata.get(conn_id, {})
                    if metadata.get("device_id") == exclude_device:
                        continue
                try:
                    await ws_manager.send_personal(conn_id, {
                        "type": "direct_message",
                        "message_id": message_id,
                        "from": sender_id,
                        "content": message.get("content"),
                        "message_type": message.get("message_type", "text"),
                        "metadata": message.get("metadata", {}),
                        "timestamp": datetime.utcnow().isoformat()
                    })
                    delivered_count += 1
                except Exception as e:
                    logger.error(f"Failed to send to connection {conn_id}: {e}")
        # 2. Terminal manager connections
        terminal_manager = get_terminal_manager()
        connections = terminal_manager.get_connections_by_user(target_user_id)
        for conn in connections:
            if exclude_device and conn.device_id == exclude_device:
                continue
            try:
                response = AdapterResponse(
                    success=True,
                    message_id=message_id,
                    content=message.get("content"),
                    metadata={
                        "type": "direct_message",
                        "from": sender_id,
                        **message.get("metadata", {})
                    }
                )
                await terminal_manager.send_to_device(conn.device_id, response)
                delivered_count += 1
            except Exception as e:
                logger.error(f"Failed to send to device {conn.device_id}: {e}")
        return {
            "message_id": message_id,
            "status": "delivered" if delivered_count > 0 else "no_active_connections",
            "delivered_to": delivered_count
        }
    async def broadcast_to_channel(
        self,
        channel_id: str,
        message: Dict,
        exclude_user: Optional[str] = None
    ) -> int:
        """Broadcast a message to every member of a channel.

        Returns the number of connections the message was delivered to.
        """
        # Imported here to avoid a circular import with channel_manager
        from src.chat.channel_manager import get_channel_manager
        manager = get_channel_manager()
        members = await manager.get_channel_members(channel_id)
        delivered_count = 0
        for member in members:
            if exclude_user and member.user_id == exclude_user:
                continue
            result = await self.send_to_user(
                target_user_id=member.user_id,
                message=message,
                sender_id=message.get("sender_id")
            )
            if result["status"] == "delivered":
                delivered_count += result.get("delivered_to", 1)
        return delivered_count
    # ========== Internal helpers ==========

    async def _deliver_message(self, device_id: str, message: Dict) -> bool:
        """Try to deliver a message to a device; return True on success."""
        from src.terminals.adapters.base import AdapterResponse

        # 1. Try WebSocket
        for conn_id, metadata in ws_manager.connection_metadata.items():
            if metadata.get("device_id") == device_id:
                try:
                    await ws_manager.send_personal(conn_id, message)
                    return True
                except Exception as e:
                    # Log and fall through to the next transport instead of
                    # silently swallowing every exception
                    logger.warning(f"WebSocket delivery failed for {conn_id}: {e}")
        # 2. Try the terminal manager
        terminal_manager = get_terminal_manager()
        connections = terminal_manager.get_all_connections()
        for conn in connections:
            if conn.device_id == device_id and conn.is_connected:
                try:
                    response = AdapterResponse(
                        success=True,
                        message_id=message["message_id"],
                        content=message.get("content"),
                        metadata=message
                    )
                    success = await terminal_manager.send_to_device(device_id, response)
                    if success:
                        return True
                except Exception as e:
                    logger.error(f"Terminal delivery failed: {e}")
        return False
    async def _store_offline_message(self, device_id: str, message: Dict):
        """Store a message for a device that is offline."""
        redis = await get_redis()
        key = f"{self._redis_prefix}:offline:{device_id}"
        # Push onto the head of the list (newest first)
        await redis.lpush(key, json.dumps(message))
        # Cap the list length; the oldest entries fall off the tail
        await redis.ltrim(key, 0, self._max_offline_messages - 1)
        # Refresh the expiry
        await redis.expire(key, int(self._offline_ttl.total_seconds()))
        logger.debug(f"Stored offline message for {device_id}: {message['message_id']}")

    async def get_offline_messages(self, device_id: str) -> List[Dict]:
        """Fetch and clear a device's offline messages, oldest first."""
        redis = await get_redis()
        key = f"{self._redis_prefix}:offline:{device_id}"
        # Note: LRANGE followed by DELETE is not atomic, so a message pushed
        # between the two calls would be lost.
        messages = await redis.lrange(key, 0, -1)
        if messages:
            # Clear the queue
            await redis.delete(key)
            # The list is newest-first; reverse it to restore send order
            return [json.loads(m) for m in reversed(messages)]
        return []
    async def send_ack(self, message_id: str, device_id: str, ack_type: str = "delivered"):
        """Send a delivery/read receipt back to the original sender."""
        redis = await get_redis()
        # Look up the pending record of the original message
        key = f"{self._redis_prefix}:pending:{message_id}"
        message_info = await redis.get(key)
        if not message_info:
            return
        info = json.loads(message_info)
        sender_device = info.get("from_device")
        if sender_device:
            ack = {
                "type": "message_ack",
                "message_id": message_id,
                "ack_type": ack_type,
                "ack_by": device_id,
                "timestamp": datetime.utcnow().isoformat()
            }
            # Deliver the receipt as-is. Going through send_to_device() would
            # rebuild the payload as a "direct_message" and drop the ack fields.
            if not await self._deliver_message(sender_device, ack):
                await self._store_offline_message(sender_device, ack)
        # Clear the pending record
        await redis.delete(key)


def get_message_router() -> MessageRouter:
    """Return the MessageRouter singleton."""
    return MessageRouter()
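The offline queue relies on the `LPUSH` + `LTRIM 0 N-1` pattern: new messages go to the head, the oldest fall off the tail, and the reader reverses the list to get send order. The sketch below models that behaviour in memory with a bounded `deque`. It is an illustration, not the Redis code, and `OfflineQueue` is a hypothetical name.

```python
from collections import deque
from typing import Deque, Dict, List


class OfflineQueue:
    """In-memory model of a capped, newest-first offline queue."""

    def __init__(self, max_messages: int = 100) -> None:
        self._max = max_messages
        self._queues: Dict[str, Deque[dict]] = {}

    def store(self, device_id: str, message: dict) -> None:
        # appendleft mirrors LPUSH; with maxlen set, a full deque discards
        # from the opposite end, which is what LTRIM 0..max-1 achieves.
        queue = self._queues.setdefault(device_id, deque(maxlen=self._max))
        queue.appendleft(message)

    def drain(self, device_id: str) -> List[dict]:
        """Return pending messages oldest-first and clear the queue."""
        queue = self._queues.pop(device_id, None)
        return list(reversed(queue)) if queue else []


queue = OfflineQueue(max_messages=3)
for n in range(1, 6):
    queue.store("dev_b", {"message_id": f"msg_{n}"})
pending = [m["message_id"] for m in queue.drain("dev_b")]
```

With a cap of 3 and five messages stored, only the three newest survive, and they come back in the order they were sent.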
2.3 WebSocket Message Handling Extension
# src/chat/websocket_handlers.py
# Add to the existing WebSocket message handlers
from datetime import datetime
from typing import Dict

from src.api.websocket import manager as ws_manager
from src.chat.channel_manager import get_channel_manager
from src.chat.message_router import get_message_router


class ChatMessageHandler:
    """Chat message handler."""

    def __init__(self):
        self._router = get_message_router()
        self._channel_manager = get_channel_manager()

    async def handle_direct_message(self, connection_id: str, message: Dict) -> Dict:
        """Handle a direct message (device-to-device forwarding)."""
        target = message.get("target")
        content = message.get("content")
        target_type = message.get("target_type", "device")  # "device" or "user"
        if not target or not content:
            return {"type": "error", "message": "Missing target or content"}
        # Look up the sender
        sender_info = ws_manager.connection_metadata.get(connection_id, {})
        sender_device = sender_info.get("device_id")
        sender_user = sender_info.get("user_id")
        if target_type == "device":
            result = await self._router.send_to_device(
                target_device_id=target,
                message={
                    "content": content,
                    "message_type": message.get("message_type", "text"),
                    "metadata": message.get("metadata", {})
                },
                sender_device_id=sender_device,
                require_ack=message.get("require_ack", False)
            )
        else:  # user
            result = await self._router.send_to_user(
                target_user_id=target,
                message={
                    "content": content,
                    "message_type": message.get("message_type", "text"),
                    "metadata": message.get("metadata", {})
                },
                sender_id=sender_user
            )
        return {
            "type": "send_result",
            "message_id": result["message_id"],
            "status": result["status"]
        }
    async def handle_channel_message(self, connection_id: str, message: Dict) -> Dict:
        """Handle a channel message."""
        channel_id = message.get("channel_id")
        content = message.get("content")
        if not channel_id or not content:
            return {"type": "error", "message": "Missing channel_id or content"}
        sender_info = ws_manager.connection_metadata.get(connection_id, {})
        sender_user = sender_info.get("user_id")
        sender_device = sender_info.get("device_id")
        # Check membership and fetch the sender's nickname in one pass
        members = await self._channel_manager.get_channel_members(channel_id)
        member = next((m for m in members if m.user_id == sender_user), None)
        if member is None:
            return {"type": "error", "message": "Not a channel member"}
        # Broadcast to the channel
        msg = {
            "type": "channel_message",
            "channel_id": channel_id,
            "sender_id": sender_user,
            "sender_device": sender_device,
            "sender_nickname": member.nickname,
            "content": content,
            "message_type": message.get("message_type", "text"),
            "metadata": message.get("metadata", {}),
            "timestamp": datetime.utcnow().isoformat()
        }
        delivered = await self._router.broadcast_to_channel(
            channel_id=channel_id,
            message=msg,
            exclude_user=sender_user  # do not echo back to the sender
        )
        # Persist to message history
        from src.chat.message_service import get_message_service
        service = get_message_service()
        await service.save_channel_message(channel_id, msg)
        return {
            "type": "send_result",
            "channel_id": channel_id,
            "delivered_to": delivered
        }
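The document does not show how incoming WebSocket frames reach `handle_direct_message` and `handle_channel_message`. One plausible wiring, sketched below under the assumption that each inbound frame carries a `type` field, is a small dispatch table. `ChatDispatcher` and the inbound type names used here are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]


class ChatDispatcher:
    """Route an inbound frame to the handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, message_type: str, handler: Handler) -> None:
        self._handlers[message_type] = handler

    async def dispatch(self, connection_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
        message_type = message.get("type")
        handler = self._handlers.get(message_type or "")
        if handler is None:
            # Same error shape the handlers above return
            return {"type": "error", "message": f"Unsupported message type: {message_type}"}
        return await handler(connection_id, message)


async def fake_channel_handler(connection_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for ChatMessageHandler.handle_channel_message
    return {"type": "send_result", "channel_id": message["channel_id"], "delivered_to": 0}


dispatcher = ChatDispatcher()
dispatcher.register("channel_message", fake_channel_handler)
ok = asyncio.run(dispatcher.dispatch("conn_1", {"type": "channel_message", "channel_id": "ch_1"}))
unknown = asyncio.run(dispatcher.dispatch("conn_1", {"type": "ping"}))
```

In a real integration the registered callables would be the bound methods of a `ChatMessageHandler` instance, and unknown types would fall through to whatever the existing WebSocket handler already does.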
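3. Meeting Mode
3.1 Functional Requirements
| Feature | Description |
| --- | --- |
| Create meeting | Create a temporary meeting room |
| Invite participants | Invite devices or users to join the meeting |
| Real-time voice/text | Voice and text can be mixed in one meeting |
| AI assistant | An AI can take part in the meeting and offer suggestions |
| Meeting record | Meeting content is recorded automatically |
| Meeting summary | An AI summary is generated after the meeting |
| Screen sharing | (Future) screen sharing support |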
3.2 Meeting Manager Design
# src/chat/meeting_manager.py
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Set

from loguru import logger

from src.storage.redis import get_redis
from src.chat.message_router import get_message_router
from src.chat.channel_manager import get_channel_manager
from src.chat.models import ChannelType, MemberRole


class MeetingStatus(str, Enum):
    SCHEDULED = "scheduled"  # Scheduled
    ACTIVE = "active"        # In progress
    PAUSED = "paused"        # Paused
    ENDED = "ended"          # Ended


class ParticipantRole(str, Enum):
    HOST = "host"                  # Host
    CO_HOST = "co_host"            # Co-host
    PARTICIPANT = "participant"    # Participant
    AI_ASSISTANT = "ai_assistant"  # AI assistant


@dataclass
class MeetingParticipant:
    """Meeting participant."""
    user_id: str
    device_id: str
    nickname: str
    role: ParticipantRole
    joined_at: datetime
    is_active: bool = True
    is_muted: bool = False
    connection_ids: List[str] = field(default_factory=list)


@dataclass
class Meeting:
    """Meeting data model."""
    meeting_id: str
    title: str
    description: str
    host_id: str  # user_id of the host
    status: MeetingStatus
    channel_id: str  # ID of the associated channel
    created_at: datetime
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    scheduled_start: Optional[datetime] = None
    scheduled_duration: int = 60  # minutes
    settings: Dict = field(default_factory=dict)
    # settings may contain:
    # - ai_enabled: bool
    # - ai_expert_id: str
    # - auto_record: bool
    # - max_participants: int
    # - require_approval: bool (joining requires approval)


@dataclass
class MeetingMessage:
    """Meeting message."""
    message_id: str
    meeting_id: str
    sender_id: str
    sender_nickname: str
    message_type: str  # text, voice, system, ai
    content: str
    timestamp: datetime
    metadata: Dict = field(default_factory=dict)
class MeetingManager:
    """Meeting manager."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        # Cache of active meetings
        self._meetings: Dict[str, Meeting] = {}
        # meeting_id -> {user_id -> participant}
        self._participants: Dict[str, Dict[str, MeetingParticipant]] = {}
        # meeting_id -> messages
        self._meeting_messages: Dict[str, List[MeetingMessage]] = {}
        # Collaborators
        self._channel_manager = get_channel_manager()
        self._message_router = get_message_router()
        self._redis_prefix = "mbe:meeting"
        self._initialized = True
        logger.info("MeetingManager initialized")
    # ========== Meeting management ==========

    async def create_meeting(
        self,
        title: str,
        host_id: str,
        description: str = "",
        scheduled_start: Optional[datetime] = None,
        duration_minutes: int = 60,
        ai_enabled: bool = True,
        ai_expert_id: Optional[str] = None
    ) -> Meeting:
        """Create a meeting. Without scheduled_start it starts immediately."""
        meeting_id = f"meet_{uuid.uuid4().hex[:12]}"
        now = datetime.utcnow()
        # Create the associated channel
        channel = await self._channel_manager.create_channel(
            name=f"Meeting: {title}",
            owner_id=host_id,
            channel_type=ChannelType.MEETING,
            description=description,
            settings={
                "meeting_id": meeting_id,
                "ai_enabled": ai_enabled,
                "ai_expert_id": ai_expert_id
            }
        )
        meeting = Meeting(
            meeting_id=meeting_id,
            title=title,
            description=description,
            host_id=host_id,
            status=MeetingStatus.SCHEDULED if scheduled_start else MeetingStatus.ACTIVE,
            channel_id=channel.channel_id,
            created_at=now,
            started_at=now if not scheduled_start else None,
            scheduled_start=scheduled_start,
            scheduled_duration=duration_minutes,
            settings={
                "ai_enabled": ai_enabled,
                "ai_expert_id": ai_expert_id,
                "auto_record": True,
                "max_participants": 50,
                "require_approval": False
            }
        )
        # Cache
        self._meetings[meeting_id] = meeting
        self._participants[meeting_id] = {}
        self._meeting_messages[meeting_id] = []
        # The host joins automatically
        await self.join_meeting(meeting_id, host_id, role=ParticipantRole.HOST)
        # Add the AI assistant if AI is enabled
        if ai_enabled:
            await self._add_ai_assistant(meeting_id, ai_expert_id)
        # Store in Redis
        await self._save_meeting_to_redis(meeting)
        logger.info(f"Meeting created: {meeting_id} ({title}) by {host_id}")
        return meeting
    async def get_meeting(self, meeting_id: str) -> Optional[Meeting]:
        """Return the meeting, or None if it is unknown."""
        if meeting_id in self._meetings:
            return self._meetings[meeting_id]
        # Load from Redis
        return await self._load_meeting_from_redis(meeting_id)

    async def start_meeting(self, meeting_id: str, user_id: str) -> bool:
        """Start a meeting (host or co-host only)."""
        meeting = await self.get_meeting(meeting_id)
        if not meeting:
            return False
        # Permission check
        if meeting.host_id != user_id:
            participant = self._participants.get(meeting_id, {}).get(user_id)
            if not participant or participant.role not in [ParticipantRole.HOST, ParticipantRole.CO_HOST]:
                return False
        meeting.status = MeetingStatus.ACTIVE
        meeting.started_at = datetime.utcnow()
        await self._save_meeting_to_redis(meeting)
        # Broadcast the start of the meeting
        await self._broadcast_meeting_event(meeting_id, {
            "type": "meeting_started",
            "meeting_id": meeting_id,
            "started_at": meeting.started_at.isoformat()
        })
        return True
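    async def end_meeting(self, meeting_id: str, user_id: str) -> Optional[Dict]:
        """End a meeting. Only the host may do so."""
        meeting = await self.get_meeting(meeting_id)
        if not meeting:
            return None
        # Permission check
        if meeting.host_id != user_id:
            return None
        meeting.status = MeetingStatus.ENDED
        meeting.ended_at = datetime.utcnow()
        # A scheduled meeting that was never started has started_at == None;
        # treat its duration as zero instead of raising a TypeError
        started_at = meeting.started_at or meeting.ended_at
        duration_minutes = (meeting.ended_at - started_at).total_seconds() / 60
        # Broadcast the end of the meeting
        await self._broadcast_meeting_event(meeting_id, {
            "type": "meeting_ended",
            "meeting_id": meeting_id,
            "ended_at": meeting.ended_at.isoformat()
        })
        # Generate the meeting summary before the messages are discarded
        summary = await self._generate_meeting_summary(meeting_id)
        # Release resources
        await self._cleanup_meeting(meeting_id)
        return {
            "meeting_id": meeting_id,
            "duration_minutes": duration_minutes,
            "summary": summary
        }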
    # ========== Participant management ==========
    async def join_meeting(
        self,
        meeting_id: str,
        user_id: str,
        device_id: Optional[str] = None,
        nickname: Optional[str] = None,
        role: ParticipantRole = ParticipantRole.PARTICIPANT
    ) -> Optional[MeetingParticipant]:
        """Join a meeting. Returns None if the meeting is missing, ended, or full."""
        meeting = await self.get_meeting(meeting_id)
        if not meeting or meeting.status == MeetingStatus.ENDED:
            return None
        # Enforce the participant limit; a user who rejoins reuses the existing seat
        max_participants = meeting.settings.get("max_participants", 50)
        participants = self._participants.setdefault(meeting_id, {})
        if user_id not in participants and len(participants) >= max_participants:
            logger.warning(f"Meeting {meeting_id} is full")
            return None
        now = datetime.utcnow()
        participant = MeetingParticipant(
            user_id=user_id,
            device_id=device_id or "",
            nickname=nickname or user_id[:8],
            role=role,
            joined_at=now,
            is_active=True
        )
        participants[user_id] = participant
        # Join the backing channel, using the same resolved nickname
        await self._channel_manager.join_channel(
            channel_id=meeting.channel_id,
            user_id=user_id,
            device_id=device_id,
            nickname=participant.nickname,
            role=MemberRole.MEMBER
        )
        # Tell everyone else that a participant joined
        await self._broadcast_meeting_event(meeting_id, {
            "type": "participant_joined",
            "user_id": user_id,
            "nickname": participant.nickname,
            "role": role.value
        }, exclude_user=user_id)
        # Send the current participant list to the newcomer
        participants_list = [
            {
                "user_id": p.user_id,
                "nickname": p.nickname,
                "role": p.role.value,
                "is_active": p.is_active
            }
            for p in participants.values()
        ]
        await self._message_router.send_to_user(
            target_user_id=user_id,
            message={
                "type": "meeting_info",
                "meeting_id": meeting_id,
                "title": meeting.title,
                "host_id": meeting.host_id,
                "participants": participants_list,
                "ai_enabled": meeting.settings.get("ai_enabled", False)
            }
        )
        logger.info(f"User {user_id} joined meeting {meeting_id}")
        return participant
    async def leave_meeting(self, meeting_id: str, user_id: str) -> bool:
        """Leave a meeting. Returns False if the user was not a participant."""
        if meeting_id not in self._participants:
            return False
        if user_id not in self._participants[meeting_id]:
            return False
        participant = self._participants[meeting_id].pop(user_id)
        # Leave the backing channel
        meeting = self._meetings.get(meeting_id)
        if meeting:
            await self._channel_manager.leave_channel(meeting.channel_id, user_id)
        # Broadcast the departure
        await self._broadcast_meeting_event(meeting_id, {
            "type": "participant_left",
            "user_id": user_id,
            "nickname": participant.nickname
        })
        logger.info(f"User {user_id} left meeting {meeting_id}")
        return True

    async def get_participants(self, meeting_id: str) -> List[MeetingParticipant]:
        """Return the list of participants in a meeting."""
        return list(self._participants.get(meeting_id, {}).values())
    # ========== Message handling ==========
    async def send_message(
        self,
        meeting_id: str,
        sender_id: str,
        content: str,
        message_type: str = "text",
        metadata: Optional[Dict] = None
    ) -> Optional[MeetingMessage]:
        """Send a message to an active meeting. Returns None if it is rejected."""
        meeting = await self.get_meeting(meeting_id)
        if not meeting or meeting.status != MeetingStatus.ACTIVE:
            return None
        # Only participants may send messages
        if sender_id not in self._participants.get(meeting_id, {}):
            return None
        participant = self._participants[meeting_id][sender_id]
        message = MeetingMessage(
            message_id=f"mmsg_{uuid.uuid4().hex[:12]}",
            meeting_id=meeting_id,
            sender_id=sender_id,
            sender_nickname=participant.nickname,
            message_type=message_type,
            content=content,
            timestamp=datetime.utcnow(),
            metadata=metadata or {}
        )
        # Store the message in memory
        if meeting_id not in self._meeting_messages:
            self._meeting_messages[meeting_id] = []
        self._meeting_messages[meeting_id].append(message)
        # Broadcast to everyone except the sender
        await self._broadcast_meeting_event(meeting_id, {
            "type": "meeting_message",
            "message_id": message.message_id,
            "sender_id": sender_id,
            "sender_nickname": participant.nickname,
            "message_type": message_type,
            "content": content,
            "timestamp": message.timestamp.isoformat(),
            "metadata": metadata or {}
        }, exclude_user=sender_id)
        # When AI is enabled, check whether the AI should respond
        if meeting.settings.get("ai_enabled"):
            await self._check_ai_response(meeting_id, message)
        return message
    # ========== AI assistant ==========
    async def _add_ai_assistant(self, meeting_id: str, expert_id: Optional[str] = None):
        """Add the AI assistant to the meeting as a participant."""
        ai_user_id = f"ai:{expert_id or 'default'}"
        participant = MeetingParticipant(
            user_id=ai_user_id,
            device_id="ai_assistant",
            nickname="AI Assistant",
            role=ParticipantRole.AI_ASSISTANT,
            joined_at=datetime.utcnow()
        )
        if meeting_id not in self._participants:
            self._participants[meeting_id] = {}
        self._participants[meeting_id][ai_user_id] = participant
        logger.info(f"AI assistant added to meeting {meeting_id}")

    async def _check_ai_response(self, meeting_id: str, message: MeetingMessage):
        """Decide whether the AI should respond to a message."""
        # Respond when the AI is mentioned or the message ends with a question mark
        content = message.content.strip().lower()
        should_respond = (
            "@ai" in content or
            "ai assistant" in content or
            # Chinese keyword, matched with or without a space ("ai助手" / "ai 助手")
            "ai助手" in content.replace(" ", "") or
            # Half-width and full-width question marks
            content.endswith(("?", "?"))
        )
        if should_respond:
            await self._generate_ai_response(meeting_id, message)
    async def _generate_ai_response(self, meeting_id: str, trigger_message: MeetingMessage):
        """Generate an AI response and broadcast it to the meeting."""
        meeting = self._meetings.get(meeting_id)
        if not meeting:
            return
        expert_id = meeting.settings.get("ai_expert_id")
        # Use the last 10 messages as context
        recent_messages = self._meeting_messages.get(meeting_id, [])[-10:]
        context = "\n".join([
            f"{m.sender_nickname}: {m.content}"
            for m in recent_messages
        ])
        # Ask the MBE engine for a response
        try:
            from src.core.engine import get_engine
            engine = get_engine()
            result = await engine.analyze_behavior(
                query=f"Meeting discussion:\n{context}\n\nBased on the discussion, offer useful insights or suggestions.",
                device_id="ai_assistant",
                fast_mode=True,
                expert_id=expert_id
            )
            if result and result.get("response"):
                ai_response = result["response"]
                # Record the AI response
                ai_message = MeetingMessage(
                    message_id=f"mmsg_{uuid.uuid4().hex[:12]}",
                    meeting_id=meeting_id,
                    sender_id=f"ai:{expert_id or 'default'}",
                    sender_nickname="AI Assistant",
                    message_type="ai",
                    content=ai_response,
                    timestamp=datetime.utcnow(),
                    metadata={"expert_id": expert_id, "triggered_by": trigger_message.message_id}
                )
                # setdefault avoids a KeyError if the meeting was cleaned up while awaiting the engine
                self._meeting_messages.setdefault(meeting_id, []).append(ai_message)
                # Broadcast the AI response
                await self._broadcast_meeting_event(meeting_id, {
                    "type": "meeting_message",
                    "message_id": ai_message.message_id,
                    "sender_id": ai_message.sender_id,
                    "sender_nickname": ai_message.sender_nickname,
                    "message_type": "ai",
                    "content": ai_response,
                    "timestamp": ai_message.timestamp.isoformat(),
                    "metadata": ai_message.metadata
                })
        except Exception as e:
            logger.error(f"AI response generation failed: {e}")
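    async def _generate_meeting_summary(self, meeting_id: str) -> str:
        """Generate a summary of the meeting from its text and voice messages."""
        messages = self._meeting_messages.get(meeting_id, [])
        if not messages:
            return "The meeting has no message history."
        # Build the transcript
        content = "\n".join([
            f"[{m.timestamp.strftime('%H:%M')}] {m.sender_nickname}: {m.content}"
            for m in messages
            if m.message_type in ["text", "voice"]
        ])
        prompt = (
            "Write a concise summary of the meeting below, covering:\n"
            "1. Main topics discussed\n"
            "2. Key decisions and conclusions\n"
            "3. Action items (if any)\n\n"
            f"Meeting content:\n{content}"
        )
        try:
            from src.core.engine import get_engine
            engine = get_engine()
            result = await engine.analyze_behavior(
                query=prompt,
                device_id="ai_assistant",
                fast_mode=True
            )
            # Guard against an empty result or an empty response
            if result and result.get("response"):
                return result["response"]
            return "Unable to generate a meeting summary."
        except Exception as e:
            logger.error(f"Meeting summary generation failed: {e}")
            return f"The meeting contained {len(messages)} messages."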
    # ========== Internal helpers ==========
    async def _broadcast_meeting_event(
        self,
        meeting_id: str,
        event: Dict,
        exclude_user: Optional[str] = None
    ):
        """Broadcast a meeting event to every human participant."""
        # Iterate over a snapshot: participants may join or leave while we await
        participants = list(self._participants.get(meeting_id, {}).items())
        for user_id, participant in participants:
            if exclude_user and user_id == exclude_user:
                continue
            # Skip the AI assistant
            if participant.role == ParticipantRole.AI_ASSISTANT:
                continue
            await self._message_router.send_to_user(
                target_user_id=user_id,
                message=event,
                sender_id="system"
            )
    async def _save_meeting_to_redis(self, meeting: Meeting):
        """Save the meeting to Redis."""
        redis = await get_redis()
        key = f"{self._redis_prefix}:{meeting.meeting_id}"
        data = {
            "meeting_id": meeting.meeting_id,
            "title": meeting.title,
            "description": meeting.description,
            "host_id": meeting.host_id,
            "status": meeting.status.value,
            "channel_id": meeting.channel_id,
            "created_at": meeting.created_at.isoformat(),
            "started_at": meeting.started_at.isoformat() if meeting.started_at else None,
            # Persist the schedule too, so a reloaded meeting keeps it
            "scheduled_start": meeting.scheduled_start.isoformat() if meeting.scheduled_start else None,
            "scheduled_duration": meeting.scheduled_duration,
            "settings": meeting.settings
        }
        await redis.set(key, json.dumps(data), ex=86400)  # Expires after 24 hours

    async def _load_meeting_from_redis(self, meeting_id: str) -> Optional[Meeting]:
        """Load the meeting from Redis and cache it in memory."""
        redis = await get_redis()
        key = f"{self._redis_prefix}:{meeting_id}"
        data = await redis.get(key)
        if not data:
            return None
        info = json.loads(data)
        meeting = Meeting(
            meeting_id=info["meeting_id"],
            title=info["title"],
            description=info["description"],
            host_id=info["host_id"],
            status=MeetingStatus(info["status"]),
            channel_id=info["channel_id"],
            created_at=datetime.fromisoformat(info["created_at"]),
            started_at=datetime.fromisoformat(info["started_at"]) if info.get("started_at") else None,
            scheduled_start=datetime.fromisoformat(info["scheduled_start"]) if info.get("scheduled_start") else None,
            # 60 mirrors the API default for duration_minutes (records saved without this field)
            scheduled_duration=info.get("scheduled_duration", 60),
            settings=info.get("settings", {})
        )
        self._meetings[meeting_id] = meeting
        return meeting
    async def _cleanup_meeting(self, meeting_id: str):
        """Release the resources held by a meeting."""
        # Save the meeting record to the database (optional)
        # ...
        # Clear the in-memory caches
        self._meetings.pop(meeting_id, None)
        self._participants.pop(meeting_id, None)
        self._meeting_messages.pop(meeting_id, None)
        # Remove the Redis entry
        redis = await get_redis()
        await redis.delete(f"{self._redis_prefix}:{meeting_id}")
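# Module-level cache so that every caller shares one MeetingManager;
# calling MeetingManager() on each request would discard the in-memory state
_meeting_manager: Optional[MeetingManager] = None


def get_meeting_manager() -> MeetingManager:
    """Return the shared MeetingManager instance."""
    global _meeting_manager
    if _meeting_manager is None:
        _meeting_manager = MeetingManager()
    return _meeting_manager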
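The AI trigger rule in `_check_ai_response` can be read as a pure function, which makes it easy to test in isolation. The sketch below is illustrative only: `should_ai_respond` is a hypothetical helper name, and the keyword list is an assumption based on the checks shown in the manager.

```python
def should_ai_respond(content: str) -> bool:
    """Return True when a message mentions the AI or ends with a question mark."""
    text = content.strip().lower()
    if not text:
        return False
    # Remove spaces so that "ai 助手" and "ai助手" both match the Chinese keyword
    compact = text.replace(" ", "")
    mentions_ai = "@ai" in text or "ai assistant" in text or "ai助手" in compact
    # str.endswith accepts a tuple: half-width and full-width question marks
    asks_question = text.endswith(("?", "?"))
    return mentions_ai or asks_question


if __name__ == "__main__":
    for sample in ["@AI what do you think", "Is this ready?", "I agree with this plan"]:
        print(sample, "->", should_ai_respond(sample))
```

Keeping the rule free of I/O means the heuristic can be tuned (for example, to ignore rhetorical questions) without touching the meeting state.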
3.3 Meeting API Endpoints
# src/api/meeting.py
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from src.users.auth import get_current_user
from src.chat.meeting_manager import get_meeting_manager

router = APIRouter(prefix="/api/meetings", tags=["Meetings"])

# ========== Request models ==========
class CreateMeetingRequest(BaseModel):
    title: str
    description: str = ""
    scheduled_start: Optional[datetime] = None
    duration_minutes: int = 60
    ai_enabled: bool = True
    ai_expert_id: Optional[str] = None

class SendMeetingMessageRequest(BaseModel):
    content: str
    message_type: str = "text"

# ========== API endpoints ==========
@router.post("/")
async def create_meeting(
    request: CreateMeetingRequest,
    user = Depends(get_current_user)
):
    """Create a meeting."""
    manager = get_meeting_manager()
    meeting = await manager.create_meeting(
        title=request.title,
        host_id=user.user_id,
        description=request.description,
        scheduled_start=request.scheduled_start,
        duration_minutes=request.duration_minutes,
        ai_enabled=request.ai_enabled,
        ai_expert_id=request.ai_expert_id
    )
    return {
        "meeting_id": meeting.meeting_id,
        "title": meeting.title,
        "channel_id": meeting.channel_id,
        "status": meeting.status.value,
        "join_url": f"/api/meetings/{meeting.meeting_id}/join"
    }

@router.post("/{meeting_id}/join")
async def join_meeting(
    meeting_id: str,
    nickname: Optional[str] = None,
    user = Depends(get_current_user)
):
    """Join a meeting."""
    manager = get_meeting_manager()
    participant = await manager.join_meeting(
        meeting_id=meeting_id,
        user_id=user.user_id,
        nickname=nickname
    )
    # The manager returns None for a missing, ended, or full meeting
    if not participant:
        raise HTTPException(status_code=404, detail="Meeting not found, ended, or full")
    return {
        "status": "joined",
        "meeting_id": meeting_id,
        "role": participant.role.value
    }
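
@router.post("/{meeting_id}/leave")
async def leave_meeting(
    meeting_id: str,
    user = Depends(get_current_user)
):
    """Leave a meeting."""
    manager = get_meeting_manager()
    left = await manager.leave_meeting(meeting_id, user.user_id)
    # Report the failure instead of answering "left" for a user who never joined
    if not left:
        raise HTTPException(status_code=404, detail="Not a participant of this meeting")
    return {"status": "left"}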

@router.post("/{meeting_id}/start")
async def start_meeting(
    meeting_id: str,
    user = Depends(get_current_user)
):
    """Start a meeting."""
    manager = get_meeting_manager()
    success = await manager.start_meeting(meeting_id, user.user_id)
    if not success:
        raise HTTPException(status_code=403, detail="Cannot start meeting")
    return {"status": "started"}

@router.post("/{meeting_id}/end")
async def end_meeting(
    meeting_id: str,
    user = Depends(get_current_user)
):
    """End a meeting."""
    manager = get_meeting_manager()
    result = await manager.end_meeting(meeting_id, user.user_id)
    if not result:
        raise HTTPException(status_code=403, detail="Cannot end meeting")
    return result

@router.post("/{meeting_id}/messages")
async def send_meeting_message(
    meeting_id: str,
    request: SendMeetingMessageRequest,
    user = Depends(get_current_user)
):
    """Send a message to a meeting."""
    manager = get_meeting_manager()
    message = await manager.send_message(
        meeting_id=meeting_id,
        sender_id=user.user_id,
        content=request.content,
        message_type=request.message_type
    )
    if not message:
        raise HTTPException(status_code=403, detail="Cannot send message")
    return {
        "message_id": message.message_id,
        "timestamp": message.timestamp.isoformat()
    }

@router.get("/{meeting_id}/participants")
async def get_participants(
    meeting_id: str,
    user = Depends(get_current_user)
):
    """List the participants of a meeting."""
    manager = get_meeting_manager()
    participants = await manager.get_participants(meeting_id)
    return {
        "participants": [
            {
                "user_id": p.user_id,
                "nickname": p.nickname,
                "role": p.role.value,
                "is_active": p.is_active,
                "joined_at": p.joined_at.isoformat()
            }
            for p in participants
        ]
    }
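The join endpoint reports a missing, ended, and full meeting with the same status code, because the manager returns `None` in all three cases. One possible refinement, sketched below under stated assumptions, is to classify the outcome first and map it to a distinct status. `JoinOutcome`, `classify_join`, and `status_for` are hypothetical names that do not exist in the design above, and the specific status codes are a suggestion only.

```python
from enum import Enum
from http import HTTPStatus


class JoinOutcome(Enum):
    """Hypothetical result type for a join attempt."""
    JOINED = "joined"
    NOT_FOUND = "not_found"
    ENDED = "ended"
    FULL = "full"


# Assumed mapping; the design itself only uses 404 for every failure
_STATUS_BY_OUTCOME = {
    JoinOutcome.JOINED: HTTPStatus.OK,
    JoinOutcome.NOT_FOUND: HTTPStatus.NOT_FOUND,
    JoinOutcome.ENDED: HTTPStatus.GONE,
    JoinOutcome.FULL: HTTPStatus.CONFLICT,
}


def classify_join(exists: bool, ended: bool, current: int, limit: int) -> JoinOutcome:
    """Apply the same checks as join_meeting, in the same order."""
    if not exists:
        return JoinOutcome.NOT_FOUND
    if ended:
        return JoinOutcome.ENDED
    if current >= limit:
        return JoinOutcome.FULL
    return JoinOutcome.JOINED


def status_for(outcome: JoinOutcome) -> int:
    """Return the numeric HTTP status for an outcome."""
    return int(_STATUS_BY_OUTCOME[outcome])
```

With a result type like this, the endpoint could raise `HTTPException(status_code=status_for(outcome))` and the client could tell "try again later" (full) apart from "this link is dead" (ended or missing).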
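4. WebSocket Message Type Extensions
4.1 New Message Types
# Message types sent by the client
CLIENT_MESSAGE_TYPES = {
    # Channels
    "channel_subscribe": "Subscribe to a channel",
    "channel_unsubscribe": "Unsubscribe from a channel",
    "channel_message": "Send a channel message",
    # Direct messages
    "direct_message": "Send a direct (P2P) message",
    "message_ack": "Acknowledge a message",
    # Meetings
    "meeting_join": "Join a meeting",
    "meeting_leave": "Leave a meeting",
    "meeting_message": "Send a meeting message",
    # General
    "typing": "Typing indicator",
    "presence": "Presence update"
}

# Message types sent by the server
SERVER_MESSAGE_TYPES = {
    # Channels
    "channel_joined": "Joined a channel",
    "channel_left": "Left a channel",
    "channel_message": "Channel message",
    "member_joined": "Member joined",
    "member_left": "Member left",
    # Direct messages
    "direct_message": "Direct message",
    "message_ack": "Message acknowledgement receipt",
    "offline_messages": "Offline message delivery",
    # Meetings
    "meeting_info": "Meeting information",
    "meeting_started": "Meeting started",
    "meeting_ended": "Meeting ended",
    "meeting_message": "Meeting message",
    "participant_joined": "Participant joined",
    "participant_left": "Participant left",
    # General
    "typing": "The other party is typing",
    "presence": "Presence change",
    "error": "Error message"
}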
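A small registry keyed by the `type` field is one way to route the client message types above to handlers. The sketch below is not the document's WebSocket handler: `handles`, `dispatch`, and the two example handlers are hypothetical, and a real handler would call the chat services rather than return a canned reply.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]

# Registry from client message type to its handler
_HANDLERS: Dict[str, Handler] = {}


def handles(message_type: str) -> Callable[[Handler], Handler]:
    """Decorator that registers a coroutine as the handler for one message type."""
    def register(func: Handler) -> Handler:
        _HANDLERS[message_type] = func
        return func
    return register


@handles("meeting_join")
async def on_meeting_join(user_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder reply; a real handler would call the meeting manager
    return {"type": "meeting_info", "meeting_id": payload["meeting_id"]}


@handles("typing")
async def on_typing(user_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    return {"type": "typing", "user_id": user_id}


async def dispatch(user_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Route a decoded client message, answering with an 'error' message if unknown."""
    message_type = payload.get("type")
    handler = _HANDLERS.get(message_type) if isinstance(message_type, str) else None
    if handler is None:
        return {"type": "error", "detail": f"unsupported message type: {message_type!r}"}
    return await handler(user_id, payload)


if __name__ == "__main__":
    print(asyncio.run(dispatch("u1", {"type": "meeting_join", "meeting_id": "meet_def456"})))
```

Unknown types produce the server-side `error` message rather than an exception, so one malformed frame does not close the connection.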
4.2 Message Format Examples
// Subscribe to a channel
{
  "type": "channel_subscribe",
  "channel_id": "ch_abc123"
}
// Send a channel message
{
  "type": "channel_message",
  "channel_id": "ch_abc123",
  "content": "Hello everyone!",
  "message_type": "text"
}
// Send a direct message
{
  "type": "direct_message",
  "target": "device_xyz789",
  "target_type": "device",
  "content": "Hi, do you have a moment?",
  "require_ack": true
}
// Join a meeting
{
  "type": "meeting_join",
  "meeting_id": "meet_def456",
  "nickname": "Zhang San"
}
// Send a meeting message
{
  "type": "meeting_message",
  "meeting_id": "meet_def456",
  "content": "I agree with this proposal",
  "message_type": "text"
}
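Before routing, the server will likely want to reject frames that are not valid JSON or that lack the fields shown in the examples. The following is a minimal sketch; `REQUIRED_FIELDS` and `parse_client_message` are hypothetical names, and the required-field lists are inferred from the examples rather than taken from a specification. Note that the `//` lines in the examples are annotations and are not valid JSON, so they must not be sent on the wire.

```python
import json
from typing import Any, Dict, Tuple

# Required fields per client message type, inferred from the examples (assumption)
REQUIRED_FIELDS: Dict[str, Tuple[str, ...]] = {
    "channel_subscribe": ("channel_id",),
    "channel_message": ("channel_id", "content"),
    "direct_message": ("target", "target_type", "content"),
    "meeting_join": ("meeting_id",),
    "meeting_message": ("meeting_id", "content"),
}


def parse_client_message(raw: str) -> Dict[str, Any]:
    """Decode a frame and check its type and required fields; raise ValueError if invalid."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid JSON: {exc.msg}") from exc
    if not isinstance(payload, dict):
        raise ValueError("message must be a JSON object")
    message_type = payload.get("type")
    if not isinstance(message_type, str) or message_type not in REQUIRED_FIELDS:
        raise ValueError(f"unsupported message type: {message_type!r}")
    missing = [name for name in REQUIRED_FIELDS[message_type] if name not in payload]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    return payload


if __name__ == "__main__":
    print(parse_client_message('{"type": "meeting_join", "meeting_id": "meet_def456"}'))
```

Optional fields such as `nickname`, `message_type`, and `require_ack` pass through untouched, so handlers can apply their own defaults.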
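5. Implementation Plan
Phase 1: Infrastructure (1-2 days)
- Create the database migration scripts
- Implement ChannelManager
- Implement MessageRouter
- Extend WebSocket message handling
Phase 2: Channel chat (2-3 days)
- Implement the channel CRUD API
- Implement channel message broadcast
- Implement message history storage and queries
- Integrate AI participation
Phase 3: Device-to-device forwarding (1-2 days)
- Implement P2P message routing
- Implement the offline message queue
- Implement message acknowledgement
- Test cross-protocol forwarding
Phase 4: Meeting mode (2-3 days)
- Implement MeetingManager
- Implement the meeting API
- Integrate the AI assistant
- Implement meeting summary generation
Phase 5: Testing and optimization (1-2 days)
- Write test scripts
- Optimize performance
- Complete the documentation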
6. Configuration File
# config/chat.yaml
chat:
  # Channel settings
  channel:
    default_max_members: 100
    default_history_days: 30
    max_channels_per_user: 10
  # Message settings
  message:
    max_content_length: 4096
    offline_ttl_hours: 24
    max_offline_per_device: 100
  # Meeting settings
  meeting:
    default_duration_minutes: 60
    max_participants: 50
    auto_end_after_minutes: 180
    ai_response_delay_ms: 500
  # Redis settings
  redis:
    prefix: "mbe:chat"
    message_ttl_seconds: 86400
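One way to consume the meeting section is a typed settings object whose defaults mirror the file, so a missing key falls back to the documented value and a misspelled key fails loudly. This is a sketch under assumptions: `MeetingConfig` and `meeting_config_from` are hypothetical names, and the input is assumed to be the already-parsed YAML mapping (parsing itself is left to whatever YAML loader the project uses).

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass(frozen=True)
class MeetingConfig:
    """Typed view of chat.meeting; defaults mirror config/chat.yaml."""
    default_duration_minutes: int = 60
    max_participants: int = 50
    auto_end_after_minutes: int = 180
    ai_response_delay_ms: int = 500


def meeting_config_from(raw: Mapping[str, Any]) -> MeetingConfig:
    """Build a MeetingConfig from the parsed config mapping, rejecting unknown keys."""
    # "or {}" tolerates a section that is present but empty (parsed as None)
    section = (raw.get("chat") or {}).get("meeting") or {}
    known = {f.name for f in fields(MeetingConfig)}
    unknown = set(section) - known
    if unknown:
        raise ValueError(f"unknown meeting settings: {sorted(unknown)}")
    return MeetingConfig(**section)


if __name__ == "__main__":
    print(meeting_config_from({"chat": {"meeting": {"max_participants": 10}}}))
```

A value read this way could replace the `max_participants` of 50 that the manager currently writes into each meeting's settings.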
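7. Security Considerations
- Message encryption: optional end-to-end encryption for sensitive messages
- Permission checks: every operation must verify the user's permissions
- Rate limiting: prevent message flooding
- Content filtering: optional sensitive-word filtering
- Audit logging: record key operations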
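The rate-limiting item can be illustrated with a per-sender sliding window. The sketch below is an in-memory illustration only: `SlidingWindowRateLimiter` is a hypothetical class, the limits are arbitrary, and a multi-process deployment would more likely keep the counters in Redis.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Allow at most max_messages per sender within any window_seconds span."""

    def __init__(
        self,
        max_messages: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max = max_messages
        self._window = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._sent: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, sender_id: str) -> bool:
        """Record and allow the message, or return False if the sender is over the limit."""
        now = self._clock()
        sent = self._sent[sender_id]
        # Drop timestamps that have fallen out of the window
        while sent and now - sent[0] >= self._window:
            sent.popleft()
        if len(sent) >= self._max:
            return False
        sent.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowRateLimiter(max_messages=5, window_seconds=10.0)
    print([limiter.allow("user_1") for _ in range(6)])
```

A check like `limiter.allow(sender_id)` would sit at the top of `send_message`, before the message is stored or broadcast.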
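8. Monitoring Metrics
# Recommended metrics
METRICS = {
    "chat_active_channels": "Number of active channels",
    "chat_total_members": "Total number of members",
    "chat_messages_per_minute": "Messages per minute",
    "chat_active_meetings": "Number of active meetings",
    "chat_offline_queue_size": "Offline message queue size",
    "chat_message_delivery_latency": "Message delivery latency",
    "chat_ai_response_latency": "AI response latency"
}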
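As a rough illustration of how these metrics might be collected, the sketch below keeps gauges and latency samples in memory and exposes a snapshot. `ChatMetrics` is a hypothetical class, not part of the design; a production system would more plausibly export the same names through its existing monitoring client.

```python
from collections import defaultdict
from typing import Dict, List


class ChatMetrics:
    """In-memory recorder for gauges and latency samples (illustrative only)."""

    def __init__(self) -> None:
        self._gauges: Dict[str, float] = {}
        self._latencies: Dict[str, List[float]] = defaultdict(list)

    def set_gauge(self, name: str, value: float) -> None:
        """Record the current value of a gauge such as chat_active_meetings."""
        self._gauges[name] = value

    def observe_latency(self, name: str, seconds: float) -> None:
        """Add one latency sample, e.g. for chat_message_delivery_latency."""
        self._latencies[name].append(seconds)

    def snapshot(self) -> Dict[str, float]:
        """Return gauges plus the average and maximum of each latency series."""
        out = dict(self._gauges)
        for name, samples in self._latencies.items():
            out[f"{name}_avg"] = sum(samples) / len(samples)
            out[f"{name}_max"] = max(samples)
        return out


if __name__ == "__main__":
    metrics = ChatMetrics()
    metrics.set_gauge("chat_active_meetings", 3)
    metrics.observe_latency("chat_ai_response_latency", 1.2)
    print(metrics.snapshot())
```

The latency lists grow without bound here; a longer-running process would cap them or use histogram buckets instead.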