MBE四层生态系统自动化测试计划

📅 创建时间

2026-02-02


🎯 测试目标

全面覆盖MBE四层生态系统的完整角色权限和业务流程,确保:

  • 所有用户角色的权限控制正确
  • 所有业务流程正常运行
  • 层与层之间的交互正确
  • 系统稳定性和性能达标

📊 测试框架概览

┌─────────────────────────────────────────────────────────────────┐
│                    MBE 自动化测试架构                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │  单元测试    │  │  集成测试    │  │  E2E测试    │            │
│  │  (pytest)   │  │  (pytest)   │  │ (Playwright) │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
│         │                │                │                    │
│         └────────────────┼────────────────┘                    │
│                          ▼                                     │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │                    测试辅助层                             │  │
│  │  • 认证Helper  • 数据生成  • Mock服务  • 断言工具         │  │
│  └─────────────────────────────────────────────────────────┘  │
│                          │                                     │
│                          ▼                                     │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │                    CI/CD 集成                             │  │
│  │  • GitHub Actions  • 测试报告  • 覆盖率追踪              │  │
│  └─────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

📁 测试目录结构

tests/
├── unit/                           # 单元测试
│   ├── l1_core/                    # L1 核心引擎测试
│   │   ├── test_moe_router.py
│   │   ├── test_hope_learning.py
│   │   ├── test_titans_memory.py
│   │   └── test_llm_integration.py
│   ├── l2_expert/                  # L2 专家市场测试
│   │   ├── test_expert_creator.py
│   │   ├── test_expert_trainer.py
│   │   └── test_expert_store.py
│   ├── l3_app/                     # L3 应用市场测试
│   │   ├── test_sdk_client.py
│   │   └── test_api_gateway.py
│   └── l4_user/                    # L4 用户层测试
│       ├── test_student_service.py
│       ├── test_teacher_service.py
│       └── test_parent_service.py
│
├── integration/                    # 集成测试
│   ├── test_l1_l2_integration.py   # 核心-专家集成
│   ├── test_l2_l3_integration.py   # 专家-应用集成
│   ├── test_l3_l4_integration.py   # 应用-用户集成
│   └── test_full_stack.py          # 全栈集成
│
├── e2e/                            # E2E测试 (Playwright)
│   ├── l1_core/                    # L1 核心监控
│   │   └── core-dashboard.e2e.spec.ts
│   ├── l2_expert/                  # L2 专家市场
│   │   ├── expert-create.e2e.spec.ts
│   │   ├── expert-market.e2e.spec.ts
│   │   └── expert-admin.e2e.spec.ts
│   ├── l3_app/                     # L3 应用开发者
│   │   ├── developer-portal.e2e.spec.ts
│   │   ├── api-sandbox.e2e.spec.ts
│   │   └── sdk-docs.e2e.spec.ts
│   └── l4_user/                    # L4 最终用户
│       ├── student/
│       │   ├── login.e2e.spec.ts
│       │   ├── dashboard.e2e.spec.ts
│       │   ├── learning.e2e.spec.ts
│       │   └── practice.e2e.spec.ts
│       ├── teacher/
│       │   ├── course-manage.e2e.spec.ts
│       │   ├── analytics.e2e.spec.ts
│       │   └── intervention.e2e.spec.ts
│       └── parent/
│           └── monitor.e2e.spec.ts
│
├── api/                            # API测试 (Playwright API)
│   ├── l1_core.api.spec.ts
│   ├── l2_expert.api.spec.ts
│   ├── l3_app.api.spec.ts
│   └── l4_user.api.spec.ts
│
├── permission/                     # 权限测试
│   ├── test_role_permissions.py
│   ├── test_layer_access.py
│   └── test_cross_layer_auth.py
│
├── workflow/                       # 业务流程测试
│   ├── test_expert_publish_flow.py
│   ├── test_app_develop_flow.py
│   ├── test_student_learn_flow.py
│   └── test_settlement_flow.py
│
├── performance/                    # 性能测试
│   ├── test_api_performance.py
│   ├── test_concurrent_users.py
│   └── test_expert_response.py
│
├── fixtures/                       # 测试数据
│   ├── users.json
│   ├── experts.json
│   ├── courses.json
│   └── conftest.py
│
└── helpers/                        # 辅助工具
    ├── auth.helper.ts
    ├── data.helper.ts
    ├── api.helper.ts
    └── mock.helper.ts

1️⃣ L1 核心引擎层测试

1.1 单元测试 (Python/pytest)

test_moe_router.py

"""MOE路由器单元测试"""
import pytest
from src.moe.router import ExpertRouter, RouteResult

class TestMOERouter:
    """MOE专家路由器测试"""
    
    @pytest.fixture
    def router(self):
        """创建路由器实例"""
        return ExpertRouter()
    
    # === 路由功能测试 ===
    def test_route_basic_query(self, router):
        """测试基本问题路由"""
        result = router.route("什么是Python变量?")
        assert isinstance(result, RouteResult)
        assert result.expert_id is not None
        assert result.confidence > 0
    
    def test_route_topk_experts(self, router):
        """测试TopK专家选择"""
        result = router.route("如何学习编程?", top_k=3)
        assert len(result.candidates) == 3
        assert all(c.confidence > 0 for c in result.candidates)
    
    def test_route_unknown_domain(self, router):
        """测试未知领域路由降级"""
        result = router.route("随机乱码测试xyzabc")
        assert result.fallback_used == True
        assert result.expert_id == "general"
    
    # === 负载均衡测试 ===
    def test_load_balancing(self, router):
        """测试负载均衡"""
        results = [router.route("Python问题") for _ in range(100)]
        expert_counts = {}
        for r in results:
            expert_counts[r.expert_id] = expert_counts.get(r.expert_id, 0) + 1
        # 验证分布相对均匀
        assert max(expert_counts.values()) < 80
    
    # === 错误处理测试 ===
    def test_route_empty_query(self, router):
        """测试空查询处理"""
        with pytest.raises(ValueError):
            router.route("")
    
    def test_route_timeout(self, router, mocker):
        """测试超时处理"""
        mocker.patch.object(router, '_match_expert', side_effect=TimeoutError)
        result = router.route("测试问题")
        assert result.fallback_used == True

test_hope_learning.py

"""HOPE惊讶度学习单元测试"""
import pytest
import numpy as np
from src.hope.surprise_detector import SurpriseDetector
from src.hope.memory_updater import MemoryUpdater

class TestSurpriseDetector:
    """惊讶度检测器测试"""
    
    @pytest.fixture
    def detector(self):
        return SurpriseDetector()
    
    def test_detect_high_surprise(self, detector):
        """测试高惊讶度检测"""
        # 用户首次遇到新概念
        result = detector.detect(
            user_id="test_user",
            content="递归函数的概念",
            user_history=[]
        )
        assert result.surprise_score > 0.7
        assert result.should_elaborate == True
    
    def test_detect_low_surprise(self, detector):
        """测试低惊讶度检测"""
        # 用户已熟悉的概念
        result = detector.detect(
            user_id="test_user",
            content="Python print函数",
            user_history=["print函数", "print语法"]
        )
        assert result.surprise_score < 0.3
        assert result.should_skip == True
    
    def test_detect_medium_surprise(self, detector):
        """测试中等惊讶度"""
        result = detector.detect(
            user_id="test_user",
            content="列表推导式",
            user_history=["列表基础"]
        )
        assert 0.3 <= result.surprise_score <= 0.7
        assert result.normal_pace == True

class TestMemoryUpdater:
    """记忆更新器测试"""
    
    @pytest.fixture
    def updater(self):
        return MemoryUpdater()
    
    def test_update_short_term(self, updater):
        """测试短期记忆更新"""
        updater.update(
            user_id="test_user",
            content="新学习内容",
            memory_type="short_term"
        )
        memory = updater.get_memory("test_user", "short_term")
        assert "新学习内容" in memory.recent_items
    
    def test_memory_consolidation(self, updater):
        """测试记忆巩固(短期→长期)"""
        # 重复学习同一内容
        for _ in range(5):
            updater.update(
                user_id="test_user",
                content="重要概念",
                memory_type="short_term"
            )
        memory = updater.get_memory("test_user", "long_term")
        assert "重要概念" in memory.consolidated_items

test_titans_memory.py

"""TITANS记忆系统单元测试"""
import pytest
from src.titans.miras_matcher import MIRASMatcher
from src.titans.memory_bank import MemoryBank

class TestMIRASMatcher:
    """MIRAS多尺度检索测试"""
    
    @pytest.fixture
    def matcher(self):
        return MIRASMatcher()
    
    def test_multi_scale_retrieval(self, matcher):
        """测试多尺度检索"""
        results = matcher.retrieve(
            query="Python函数参数",
            scales=["word", "sentence", "paragraph"]
        )
        assert len(results) == 3
        assert all(r.relevance_score > 0 for r in results)
    
    def test_context_aware_matching(self, matcher):
        """测试上下文感知匹配"""
        # 带上下文的查询应该更准确
        result_no_context = matcher.retrieve("变量")
        result_with_context = matcher.retrieve(
            "变量",
            context="Python编程学习"
        )
        assert result_with_context[0].relevance_score > result_no_context[0].relevance_score

class TestMemoryBank:
    """记忆库测试"""
    
    @pytest.fixture
    def bank(self):
        return MemoryBank()
    
    def test_store_and_retrieve(self, bank):
        """测试存储和检索"""
        bank.store(
            key="test_key",
            content="测试内容",
            metadata={"type": "concept"}
        )
        result = bank.retrieve("test_key")
        assert result.content == "测试内容"
    
    def test_cache_hit(self, bank):
        """测试缓存命中"""
        bank.store("cached_key", "缓存内容")
        # 第一次访问
        bank.retrieve("cached_key")
        # 第二次应该从缓存获取
        result = bank.retrieve("cached_key")
        assert result.from_cache == True

1.2 核心API测试 (Playwright)

l1_core.api.spec.ts

import { test, expect } from '@playwright/test';

const API_BASE = process.env.API_BASE || 'http://localhost:8000';

test.describe('L1 Core APIs', () => {
  
  test.describe('健康检查', () => {
    test('系统健康检查', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/health`);
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.status).toBe('healthy');
      expect(data.components).toHaveProperty('database');
      expect(data.components).toHaveProperty('redis');
      expect(data.components).toHaveProperty('llm');
    });
    
    test('MOE状态检查', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/core/moe/status`);
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.active_experts).toBeGreaterThan(0);
      expect(data.router_ready).toBe(true);
    });
  });
  
  test.describe('专家路由API', () => {
    test('POST /api/core/route - 基本路由', async ({ request }) => {
      const response = await request.post(`${API_BASE}/api/core/route`, {
        data: {
          query: "什么是Python变量?",
          user_id: "test_user"
        }
      });
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data).toHaveProperty('expert_id');
      expect(data).toHaveProperty('confidence');
      expect(data.confidence).toBeGreaterThan(0);
    });
    
    test('POST /api/core/route - TopK路由', async ({ request }) => {
      const response = await request.post(`${API_BASE}/api/core/route`, {
        data: {
          query: "如何学习编程?",
          user_id: "test_user",
          top_k: 3
        }
      });
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.candidates).toHaveLength(3);
    });
  });
  
  test.describe('HOPE学习API', () => {
    test('POST /api/core/learn - 惊讶度检测', async ({ request }) => {
      const response = await request.post(`${API_BASE}/api/core/learn/surprise`, {
        data: {
          user_id: "test_user",
          content: "递归函数",
          context: "学习Python高级特性"
        }
      });
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data).toHaveProperty('surprise_score');
      expect(data.surprise_score).toBeGreaterThanOrEqual(0);
      expect(data.surprise_score).toBeLessThanOrEqual(1);
    });
    
    test('POST /api/core/learn/update - 记忆更新', async ({ request }) => {
      const response = await request.post(`${API_BASE}/api/core/learn/update`, {
        data: {
          user_id: "test_user",
          learned_content: "Python变量",
          mastery_level: 0.8
        }
      });
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.updated).toBe(true);
    });
  });
  
  test.describe('TITANS记忆API', () => {
    test('POST /api/core/memory/write - 写入记忆', async ({ request }) => {
      const response = await request.post(`${API_BASE}/api/core/memory/write`, {
        data: {
          user_id: "test_user",
          memory_type: "short_term",
          content: "测试记忆内容",
          metadata: { source: "learning" }
        }
      });
      expect(response.ok()).toBeTruthy();
    });
    
    test('GET /api/core/memory/read - 读取记忆', async ({ request }) => {
      const response = await request.get(
        `${API_BASE}/api/core/memory/read?user_id=test_user&memory_type=short_term`
      );
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data).toHaveProperty('memories');
      expect(Array.isArray(data.memories)).toBe(true);
    });
  });
});

2️⃣ L2 专家市场层测试

2.1 单元测试 (Python)

test_expert_creator.py

"""专家创建者单元测试"""
import pytest
from src.expert_marketplace.expert_manager import ExpertManager
from src.expert_marketplace.expert_trainer import ExpertTrainer

class TestExpertCreation:
    """专家创建测试"""
    
    @pytest.fixture
    def manager(self):
        return ExpertManager()
    
    @pytest.fixture
    def trainer(self):
        return ExpertTrainer()
    
    # === 知识库上传测试 ===
    def test_upload_pdf(self, manager, tmp_path):
        """测试PDF上传"""
        pdf_file = tmp_path / "test.pdf"
        pdf_file.write_bytes(b"%PDF-1.4 test content")
        
        result = manager.upload_document(
            expert_id="test_expert",
            file_path=str(pdf_file),
            file_type="pdf"
        )
        assert result.success == True
        assert result.chunks_count > 0
    
    def test_upload_invalid_format(self, manager, tmp_path):
        """测试无效格式拒绝"""
        exe_file = tmp_path / "test.exe"
        exe_file.write_bytes(b"MZ...")
        
        with pytest.raises(ValueError, match="不支持的文件格式"):
            manager.upload_document(
                expert_id="test_expert",
                file_path=str(exe_file),
                file_type="exe"
            )
    
    # === 专家配置测试 ===
    def test_create_expert_config(self, manager):
        """测试专家配置创建"""
        config = manager.create_expert(
            name="Python导师",
            domain="programming",
            keywords=["python", "编程", "代码"],
            description="Python编程教学专家"
        )
        assert config.expert_id is not None
        assert config.status == "draft"
    
    def test_validate_expert_config(self, manager):
        """测试专家配置验证"""
        # 缺少必填字段
        with pytest.raises(ValueError, match="name不能为空"):
            manager.create_expert(
                name="",
                domain="test"
            )
    
    # === 专家训练测试 ===
    def test_train_expert(self, trainer, mocker):
        """测试专家训练"""
        mocker.patch.object(trainer, '_vectorize', return_value=True)
        mocker.patch.object(trainer, '_build_index', return_value=True)
        
        result = trainer.train(
            expert_id="test_expert",
            knowledge_base_id="kb_001"
        )
        assert result.status == "completed"
        assert result.vector_count > 0
    
    def test_train_progress_callback(self, trainer, mocker):
        """测试训练进度回调"""
        progress_updates = []
        
        def on_progress(progress):
            progress_updates.append(progress)
        
        trainer.train(
            expert_id="test_expert",
            knowledge_base_id="kb_001",
            on_progress=on_progress
        )
        
        assert len(progress_updates) > 0
        assert progress_updates[-1].percentage == 100

class TestExpertPublish:
    """专家发布测试"""
    
    @pytest.fixture
    def manager(self):
        return ExpertManager()
    
    def test_submit_for_review(self, manager):
        """测试提交审核"""
        result = manager.submit_for_review(
            expert_id="test_expert",
            developer_id="dev_001"
        )
        assert result.status == "pending_review"
    
    def test_submit_incomplete_expert(self, manager):
        """测试提交不完整专家"""
        with pytest.raises(ValueError, match="专家配置不完整"):
            manager.submit_for_review(
                expert_id="incomplete_expert",
                developer_id="dev_001"
            )

2.2 E2E测试 (Playwright)

expert-create.e2e.spec.ts

import { test, expect } from '@playwright/test';
import { loginAsExpertCreator } from '../../helpers/auth.helper';

test.describe('L2 专家创建流程', () => {
  
  test.beforeEach(async ({ page }) => {
    await loginAsExpertCreator(page);
  });
  
  test('完整专家创建流程', async ({ page }) => {
    // 步骤1: 进入专家创建页面
    await page.goto('/expert/create');
    await expect(page.getByText('创建新专家')).toBeVisible();
    
    // 步骤2: 填写基本信息
    await page.fill('[name="expert_name"]', 'Python编程导师');
    await page.selectOption('[name="domain"]', 'programming');
    await page.fill('[name="description"]', '专业的Python编程教学专家');
    await page.click('button:has-text("下一步")');
    
    // 步骤3: 上传知识库
    await expect(page.getByText('上传知识库')).toBeVisible();
    const fileInput = page.locator('input[type="file"]');
    await fileInput.setInputFiles('./fixtures/test_knowledge.pdf');
    await expect(page.getByText('上传成功')).toBeVisible({ timeout: 30000 });
    await page.click('button:has-text("下一步")');
    
    // 步骤4: 配置关键词
    await page.fill('[name="keywords"]', 'python, 编程, 函数, 类');
    await page.click('button:has-text("下一步")');
    
    // 步骤5: 开始训练
    await page.click('button:has-text("开始训练")');
    await expect(page.getByText('训练中')).toBeVisible();
    
    // 等待训练完成 (最多5分钟)
    await expect(page.getByText('训练完成')).toBeVisible({ timeout: 300000 });
    
    // 步骤6: 测试专家
    await page.click('button:has-text("测试专家")');
    await page.fill('[name="test_question"]', '什么是Python函数?');
    await page.click('button:has-text("提问")');
    await expect(page.locator('.expert-response')).toBeVisible({ timeout: 30000 });
    
    // 步骤7: 提交审核
    await page.click('button:has-text("提交审核")');
    await expect(page.getByText('已提交审核')).toBeVisible();
  });
  
  test('上传无效文件被拒绝', async ({ page }) => {
    await page.goto('/expert/create');
    await page.fill('[name="expert_name"]', '测试专家');
    await page.click('button:has-text("下一步")');
    
    const fileInput = page.locator('input[type="file"]');
    await fileInput.setInputFiles('./fixtures/invalid.exe');
    
    await expect(page.getByText('不支持的文件格式')).toBeVisible();
  });
});

expert-admin.e2e.spec.ts

import { test, expect } from '@playwright/test';
import { loginAsExpertAdmin } from '../../helpers/auth.helper';

test.describe('L2 专家管理员流程', () => {
  
  test.beforeEach(async ({ page }) => {
    await loginAsExpertAdmin(page);
  });
  
  test('审核专家 - 通过', async ({ page }) => {
    await page.goto('/expert/admin');
    
    // 查看待审核列表
    await expect(page.getByText('待审核专家')).toBeVisible();
    
    // 点击第一个待审核专家
    await page.click('.pending-expert-item:first-child');
    
    // 查看专家详情
    await expect(page.getByText('专家详情')).toBeVisible();
    
    // 测试专家效果
    await page.click('button:has-text("测试")');
    await page.fill('[name="test_query"]', '测试问题');
    await page.click('button:has-text("提问")');
    await expect(page.locator('.test-response')).toBeVisible({ timeout: 30000 });
    
    // 通过审核
    await page.click('button:has-text("通过")');
    await page.fill('[name="review_comment"]', '质量合格,通过审核');
    await page.click('button:has-text("确认通过")');
    
    await expect(page.getByText('审核通过')).toBeVisible();
  });
  
  test('审核专家 - 拒绝', async ({ page }) => {
    await page.goto('/expert/admin');
    await page.click('.pending-expert-item:first-child');
    
    // 拒绝审核
    await page.click('button:has-text("拒绝")');
    await page.fill('[name="reject_reason"]', '知识库内容质量不足');
    await page.click('button:has-text("确认拒绝")');
    
    await expect(page.getByText('已拒绝')).toBeVisible();
  });
  
  test('下架问题专家', async ({ page }) => {
    await page.goto('/expert/admin/published');
    
    // 找到问题专家
    await page.click('.expert-item:has-text("问题专家")');
    
    // 下架
    await page.click('button:has-text("下架")');
    await page.fill('[name="offline_reason"]', '错误率过高');
    await page.click('button:has-text("确认下架")');
    
    await expect(page.getByText('已下架')).toBeVisible();
  });
});

3️⃣ L3 应用市场层测试

3.1 API测试 (Playwright)

l3_app.api.spec.ts

import { test, expect } from '@playwright/test';

const API_BASE = process.env.API_BASE || 'http://localhost:8000';

test.describe('L3 Application APIs', () => {
  
  let apiKey: string;
  let accessToken: string;
  
  test.beforeAll(async ({ request }) => {
    // 获取开发者API Key
    const loginResponse = await request.post(`${API_BASE}/api/auth/login`, {
      data: {
        username: 'test_developer',
        password: 'test_password'
      }
    });
    const loginData = await loginResponse.json();
    accessToken = loginData.access_token;
    
    // 获取API Key
    const keyResponse = await request.post(`${API_BASE}/api/developer/keys`, {
      headers: { Authorization: `Bearer ${accessToken}` },
      data: { name: 'test_key' }
    });
    const keyData = await keyResponse.json();
    apiKey = keyData.api_key;
  });
  
  test.describe('开发者认证', () => {
    test('API Key认证成功', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/expert/list`, {
        headers: { 'X-API-Key': apiKey }
      });
      expect(response.ok()).toBeTruthy();
    });
    
    test('无效API Key拒绝', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/expert/list`, {
        headers: { 'X-API-Key': 'invalid_key' }
      });
      expect(response.status()).toBe(401);
    });
    
    test('无认证拒绝', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/expert/list`);
      expect(response.status()).toBe(401);
    });
  });
  
  test.describe('专家API', () => {
    test('GET /api/expert/list - 获取专家列表', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/expert/list`, {
        headers: { 'X-API-Key': apiKey }
      });
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.experts).toBeDefined();
      expect(Array.isArray(data.experts)).toBe(true);
    });
    
    test('POST /api/expert/ask - 向专家提问', async ({ request }) => {
      const response = await request.post(`${API_BASE}/api/expert/ask`, {
        headers: { 'X-API-Key': apiKey },
        data: {
          expert_id: 'python_tutor',
          query: '什么是Python变量?',
          user_id: 'app_user_001'
        }
      });
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.answer).toBeDefined();
      expect(data.confidence).toBeGreaterThan(0);
    });
    
    test('POST /api/expert/ask - 速率限制', async ({ request }) => {
      // 快速发送多个请求
      const requests = Array(100).fill(null).map(() =>
        request.post(`${API_BASE}/api/expert/ask`, {
          headers: { 'X-API-Key': apiKey },
          data: {
            expert_id: 'python_tutor',
            query: '测试问题'
          }
        })
      );
      
      const responses = await Promise.all(requests);
      const rateLimited = responses.filter(r => r.status() === 429);
      expect(rateLimited.length).toBeGreaterThan(0);
    });
  });
  
  test.describe('SDK接口', () => {
    test('GET /api/sdk/info - 获取SDK信息', async ({ request }) => {
      const response = await request.get(`${API_BASE}/api/sdk/info`);
      expect(response.ok()).toBeTruthy();
      
      const data = await response.json();
      expect(data.versions).toBeDefined();
      expect(data.download_urls).toBeDefined();
    });
  });
});

3.2 E2E测试 (Playwright)

developer-portal.e2e.spec.ts

import { test, expect } from '@playwright/test';
import { loginAsDeveloper, registerDeveloper } from '../../helpers/auth.helper';

test.describe('L3 开发者门户', () => {
  
  test('新开发者注册流程', async ({ page }) => {
    await page.goto('/developer/register');
    
    // 填写注册信息
    await page.fill('[name="email"]', `dev_${Date.now()}@test.com`);
    await page.fill('[name="password"]', 'Test123456!');
    await page.fill('[name="company"]', '测试公司');
    await page.fill('[name="purpose"]', '开发教育应用');
    
    // 同意协议
    await page.check('[name="agree_terms"]');
    
    // 提交注册
    await page.click('button:has-text("注册")');
    
    // 验证成功
    await expect(page.getByText('注册成功')).toBeVisible();
  });
  
  test('开发者控制台', async ({ page }) => {
    await loginAsDeveloper(page);
    await page.goto('/developer/console');
    
    // 验证控制台元素
    await expect(page.getByText('开发者控制台')).toBeVisible();
    await expect(page.getByText('API 密钥')).toBeVisible();
    await expect(page.getByText('使用统计')).toBeVisible();
  });
  
  test('创建API密钥', async ({ page }) => {
    await loginAsDeveloper(page);
    await page.goto('/developer/console/keys');
    
    // 点击创建
    await page.click('button:has-text("创建密钥")');
    
    // 填写信息
    await page.fill('[name="key_name"]', '测试密钥');
    await page.selectOption('[name="permissions"]', ['chat', 'expert']);
    
    // 确认创建
    await page.click('button:has-text("确认")');
    
    // 验证密钥显示
    await expect(page.locator('.api-key-value')).toBeVisible();
    await expect(page.getByText('请保存此密钥')).toBeVisible();
  });
  
  test('API沙盒测试', async ({ page }) => {
    await loginAsDeveloper(page);
    await page.goto('/developer/sandbox');
    
    // 选择API
    await page.selectOption('[name="api_endpoint"]', '/api/expert/ask');
    
    // 填写参数
    await page.fill('[name="request_body"]', JSON.stringify({
      expert_id: 'python_tutor',
      query: '什么是Python?'
    }));
    
    // 发送请求
    await page.click('button:has-text("发送请求")');
    
    // 验证响应
    await expect(page.locator('.response-panel')).toBeVisible();
    await expect(page.locator('.response-status:has-text("200")')).toBeVisible();
  });
});

4️⃣ L4 最终用户层测试

4.1 学生角色测试

student/learning.e2e.spec.ts

import { test, expect } from '@playwright/test';
import { loginAsStudent } from '../../../helpers/auth.helper';

test.describe('L4 学生学习流程', () => {
  
  test.beforeEach(async ({ page }) => {
    await loginAsStudent(page);
  });
  
  test('完整学习流程', async ({ page }) => {
    // 1. 进入学习中心
    await page.goto('/student/learn');
    await expect(page.getByText('我的课程')).toBeVisible();
    
    // 2. 选择课程
    await page.click('.course-card:has-text("Python基础")');
    
    // 3. 进入课程学习
    await page.click('button:has-text("继续学习")');
    
    // 4. AI对话学习
    await expect(page.locator('.ai-chat-panel')).toBeVisible();
    
    // 5. 提问
    await page.fill('[name="user_message"]', '什么是Python变量?');
    await page.click('button:has-text("发送")');
    
    // 6. 等待AI回复
    await expect(page.locator('.ai-response')).toBeVisible({ timeout: 30000 });
    
    // 7. 验证惊讶度显示
    await expect(page.locator('.surprise-indicator')).toBeVisible();
    
    // 8. 完成学习,获得XP
    await page.click('button:has-text("完成本节")');
    await expect(page.getByText('+10 XP')).toBeVisible();
  });
  
  test('MBE五步行为分析流程', async ({ page }) => {
    await page.goto('/student/learn/python-basics/lesson-1');
    
    // 步骤1: 不舒适检测
    await page.fill('[name="user_message"]', '我不太理解这个概念');
    await page.click('button:has-text("发送")');
    await expect(page.locator('.discomfort-detected')).toBeVisible({ timeout: 10000 });
    
    // 步骤2: 愿望分析
    await expect(page.locator('.desire-analysis')).toBeVisible();
    
    // 步骤3: 路径生成
    await expect(page.locator('.learning-path-suggestion')).toBeVisible();
    
    // 步骤4: 方案评估(惊讶度)
    await expect(page.locator('.surprise-score')).toBeVisible();
    
    // 步骤5: 行动支持
    await expect(page.locator('.action-support')).toBeVisible();
  });
  
  test('练习题系统', async ({ page }) => {
    await page.goto('/student/practice');
    
    // 选择练习
    await page.click('.practice-card:has-text("Python基础")');
    
    // 回答选择题
    await expect(page.locator('.question')).toBeVisible();
    await page.click('label:has-text("B")'); // 选择答案B
    await page.click('button:has-text("提交答案")');
    
    // 验证反馈
    await expect(page.locator('.feedback')).toBeVisible();
  });
  
  test('查看学习进度', async ({ page }) => {
    await page.goto('/student/dashboard');
    
    // 验证进度显示
    await expect(page.locator('.progress-bar')).toBeVisible();
    await expect(page.locator('.xp-display')).toBeVisible();
    await expect(page.locator('.level-display')).toBeVisible();
    await expect(page.locator('.achievements')).toBeVisible();
  });
});

4.2 教师角色测试

teacher/course-manage.e2e.spec.ts

import { test, expect } from '@playwright/test';
import { loginAsTeacher } from '../../../helpers/auth.helper';

test.describe('L4 教师课程管理', () => {
  
  test.beforeEach(async ({ page }) => {
    await loginAsTeacher(page);
  });
  
  test('创建新课程', async ({ page }) => {
    await page.goto('/teacher/courses');
    
    // 点击创建
    await page.click('button:has-text("创建课程")');
    
    // 填写基本信息
    await page.fill('[name="course_name"]', '测试课程');
    await page.fill('[name="description"]', '这是一个测试课程');
    await page.selectOption('[name="difficulty"]', 'beginner');
    
    // 添加章节
    await page.click('button:has-text("添加章节")');
    await page.fill('[name="chapter_title"]', '第一章');
    
    // 添加小节
    await page.click('button:has-text("添加小节")');
    await page.fill('[name="lesson_title"]', '第一节');
    
    // 使用AI生成内容
    await page.click('button:has-text("AI生成")');
    await page.fill('[name="ai_prompt"]', '生成Python变量讲解');
    await page.click('button:has-text("生成")');
    await expect(page.locator('.generated-content')).toBeVisible({ timeout: 60000 });
    
    // 保存课程
    await page.click('button:has-text("保存")');
    await expect(page.getByText('保存成功')).toBeVisible();
  });
  
  test('学习数据分析', async ({ page }) => {
    await page.goto('/teacher/analytics');
    
    // 验证分析面板
    await expect(page.locator('.class-overview')).toBeVisible();
    await expect(page.locator('.knowledge-heatmap')).toBeVisible();
    await expect(page.locator('.learning-trend')).toBeVisible();
    
    // 查看困难学生
    await page.click('text=需要关注');
    await expect(page.locator('.at-risk-students')).toBeVisible();
  });
  
  test('个性化干预', async ({ page }) => {
    await page.goto('/teacher/interventions');
    
    // 查看AI建议
    await expect(page.locator('.ai-suggestions')).toBeVisible();
    
    // 选择一个建议
    await page.click('.suggestion-item:first-child');
    
    // 采纳建议
    await page.click('button:has-text("采纳建议")');
    await expect(page.getByText('已应用')).toBeVisible();
  });
});

4.3 家长角色测试

parent/monitor.e2e.spec.ts

import { test, expect } from '@playwright/test';
import { loginAsParent } from '../../../helpers/auth.helper';

test.describe('L4 家长监控', () => {
  
  test.beforeEach(async ({ page }) => {
    await loginAsParent(page);
  });
  
  test('查看孩子学习报告', async ({ page }) => {
    await page.goto('/parent/dashboard');
    
    // 验证报告元素
    await expect(page.getByText('学习报告')).toBeVisible();
    await expect(page.locator('.weekly-summary')).toBeVisible();
    await expect(page.locator('.learning-time')).toBeVisible();
    await expect(page.locator('.task-completion')).toBeVisible();
  });
  
  test('设置学习时间限制', async ({ page }) => {
    await page.goto('/parent/settings');
    
    // 设置时间限制
    await page.fill('[name="daily_limit_hours"]', '2');
    await page.click('button:has-text("保存")');
    
    await expect(page.getByText('设置已保存')).toBeVisible();
  });
  
  test('联系教师', async ({ page }) => {
    await page.goto('/parent/dashboard');
    
    // 点击联系教师
    await page.click('button:has-text("联系教师")');
    
    // 填写消息
    await page.fill('[name="message"]', '请问孩子最近学习情况如何?');
    await page.click('button:has-text("发送")');
    
    await expect(page.getByText('消息已发送')).toBeVisible();
  });
});

5️⃣ 权限测试

test_role_permissions.py

"""角色权限测试"""
import pytest
from src.users.models import UserRole, ROLE_PERMISSIONS, ROLE_HIERARCHY

class TestRolePermissions:
    """角色权限矩阵测试"""
    
    # === L1 核心开发者权限 ===
    def test_core_developer_full_access(self):
        """核心开发者应有完全访问权限"""
        permissions = ROLE_PERMISSIONS.get("core_developer", {})
        assert permissions.get("core_access") == True
        assert permissions.get("system_config") == True
        assert permissions.get("moe_management") == True
        assert permissions.get("hope_config") == True
        assert permissions.get("titans_management") == True
    
    # === L2 专家创建者权限 ===
    def test_expert_creator_permissions(self):
        """专家创建者权限"""
        permissions = ROLE_PERMISSIONS.get("expert_creator", {})
        assert permissions.get("create_expert") == True
        assert permissions.get("upload_knowledge") == True
        assert permissions.get("train_expert") == True
        assert permissions.get("publish_expert") == True
        # 不应有管理权限
        assert permissions.get("admin_review") == False
        assert permissions.get("core_access") == False
    
    def test_expert_admin_permissions(self):
        """专家管理员权限"""
        permissions = ROLE_PERMISSIONS.get("expert_admin", {})
        assert permissions.get("review_expert") == True
        assert permissions.get("publish_expert") == True
        assert permissions.get("offline_expert") == True
        # 不应有核心访问权限
        assert permissions.get("core_access") == False
    
    # === L3 应用开发者权限 ===
    def test_app_developer_permissions(self):
        """应用开发者权限"""
        permissions = ROLE_PERMISSIONS.get("app_developer", {})
        assert permissions.get("api_access") == True
        assert permissions.get("sdk_download") == True
        assert permissions.get("create_api_key") == True
        assert permissions.get("use_experts") == True
        # 不应有专家管理权限
        assert permissions.get("create_expert") == False
        assert permissions.get("review_expert") == False
    
    # === L4 最终用户权限 ===
    def test_student_permissions(self):
        """学生权限"""
        permissions = ROLE_PERMISSIONS.get("student", {})
        assert permissions.get("view_courses") == True
        assert permissions.get("learn") == True
        assert permissions.get("practice") == True
        assert permissions.get("view_progress") == True
        # 不应有管理权限
        assert permissions.get("manage_courses") == False
        assert permissions.get("api_access") == False
    
    def test_teacher_permissions(self):
        """教师权限"""
        permissions = ROLE_PERMISSIONS.get("teacher", {})
        assert permissions.get("manage_students") == True
        assert permissions.get("create_courses") == True
        assert permissions.get("view_analytics") == True
        assert permissions.get("send_interventions") == True
        # 不应有系统管理权限
        assert permissions.get("system_config") == False
    
    def test_parent_permissions(self):
        """家长权限"""
        permissions = ROLE_PERMISSIONS.get("parent", {})
        assert permissions.get("view_child_progress") == True
        assert permissions.get("set_time_limits") == True
        assert permissions.get("contact_teacher") == True
        # 不应有学习权限
        assert permissions.get("learn") == False
        assert permissions.get("manage_courses") == False

class TestRoleHierarchy:
    """角色层级测试"""
    
    def test_hierarchy_order(self):
        """测试层级顺序"""
        assert ROLE_HIERARCHY["core_developer"] < ROLE_HIERARCHY["expert_admin"]
        assert ROLE_HIERARCHY["expert_admin"] < ROLE_HIERARCHY["app_developer"]
        assert ROLE_HIERARCHY["app_developer"] < ROLE_HIERARCHY["teacher"]
        assert ROLE_HIERARCHY["teacher"] < ROLE_HIERARCHY["student"]
    
    def test_cannot_manage_higher_role(self):
        """低层级不能管理高层级"""
        from src.users.auth import can_manage_role
        
        # 学生不能管理教师
        assert can_manage_role("student", "teacher") == False
        # 教师不能管理应用开发者
        assert can_manage_role("teacher", "app_developer") == False
        # 核心开发者可以管理所有
        assert can_manage_role("core_developer", "student") == True

test_cross_layer_auth.py

"""跨层权限测试"""
import pytest
import httpx

API_BASE = "http://localhost:8000"

class TestCrossLayerAuth:
    """跨层访问权限测试"""
    
    @pytest.fixture
    def student_token(self):
        """获取学生Token"""
        response = httpx.post(f"{API_BASE}/api/auth/login", json={
            "username": "test_student",
            "password": "test_password"
        })
        return response.json()["access_token"]
    
    @pytest.fixture
    def teacher_token(self):
        """获取教师Token"""
        response = httpx.post(f"{API_BASE}/api/auth/login", json={
            "username": "test_teacher",
            "password": "test_password"
        })
        return response.json()["access_token"]
    
    @pytest.fixture
    def developer_api_key(self):
        """获取开发者API Key"""
        return "mbe_dev_test_key"
    
    # === L4用户访问L1核心API ===
    def test_student_cannot_access_core_api(self, student_token):
        """学生不能访问核心API"""
        response = httpx.get(
            f"{API_BASE}/api/core/moe/status",
            headers={"Authorization": f"Bearer {student_token}"}
        )
        assert response.status_code == 403
    
    # === L4用户访问L2专家管理 ===
    def test_student_cannot_manage_experts(self, student_token):
        """学生不能管理专家"""
        response = httpx.post(
            f"{API_BASE}/api/expert/create",
            headers={"Authorization": f"Bearer {student_token}"},
            json={"name": "test"}
        )
        assert response.status_code == 403
    
    # === L4用户访问L3开发者API ===
    def test_student_cannot_access_developer_api(self, student_token):
        """学生不能访问开发者API"""
        response = httpx.post(
            f"{API_BASE}/api/developer/keys",
            headers={"Authorization": f"Bearer {student_token}"}
        )
        assert response.status_code == 403
    
    # === L3开发者访问L2专家创建 ===
    def test_developer_cannot_create_expert_without_permission(self, developer_api_key):
        """开发者没有专家创建权限时不能创建"""
        response = httpx.post(
            f"{API_BASE}/api/expert/create",
            headers={"X-API-Key": developer_api_key},
            json={"name": "test"}
        )
        # 应该被拒绝(除非有额外权限)
        assert response.status_code in [403, 401]
    
    # === 教师访问学生数据 ===
    def test_teacher_can_view_own_students(self, teacher_token):
        """教师可以查看自己学生的数据"""
        response = httpx.get(
            f"{API_BASE}/api/teacher/students",
            headers={"Authorization": f"Bearer {teacher_token}"}
        )
        assert response.status_code == 200
    
    def test_teacher_cannot_view_other_students(self, teacher_token):
        """教师不能查看其他班级学生"""
        response = httpx.get(
            f"{API_BASE}/api/teacher/students?class_id=other_class",
            headers={"Authorization": f"Bearer {teacher_token}"}
        )
        assert response.status_code == 403

6️⃣ 业务流程测试

test_expert_publish_flow.py

"""专家发布完整流程测试"""
import pytest
import httpx
import time

API_BASE = "http://localhost:8000"

class TestExpertPublishFlow:
    """专家发布完整业务流程"""
    
    @pytest.fixture
    def expert_creator_token(self):
        response = httpx.post(f"{API_BASE}/api/auth/login", json={
            "username": "expert_creator",
            "password": "test_password"
        })
        return response.json()["access_token"]
    
    @pytest.fixture
    def expert_admin_token(self):
        response = httpx.post(f"{API_BASE}/api/auth/login", json={
            "username": "expert_admin",
            "password": "test_password"
        })
        return response.json()["access_token"]
    
    def test_full_expert_publish_flow(self, expert_creator_token, expert_admin_token):
        """完整专家发布流程测试"""
        
        # 步骤1: 创建专家
        create_response = httpx.post(
            f"{API_BASE}/api/expert/create",
            headers={"Authorization": f"Bearer {expert_creator_token}"},
            json={
                "name": "测试专家",
                "domain": "programming",
                "keywords": ["python", "编程"]
            }
        )
        assert create_response.status_code == 200
        expert_id = create_response.json()["expert_id"]
        
        # 步骤2: 上传知识库
        with open("fixtures/test_knowledge.pdf", "rb") as f:
            upload_response = httpx.post(
                f"{API_BASE}/api/expert/{expert_id}/knowledge",
                headers={"Authorization": f"Bearer {expert_creator_token}"},
                files={"file": f}
            )
        assert upload_response.status_code == 200
        
        # 步骤3: 训练专家
        train_response = httpx.post(
            f"{API_BASE}/api/expert/{expert_id}/train",
            headers={"Authorization": f"Bearer {expert_creator_token}"}
        )
        assert train_response.status_code == 200
        task_id = train_response.json()["task_id"]
        
        # 等待训练完成
        for _ in range(60):  # 最多等待5分钟
            status_response = httpx.get(
                f"{API_BASE}/api/tasks/{task_id}",
                headers={"Authorization": f"Bearer {expert_creator_token}"}
            )
            if status_response.json()["status"] == "completed":
                break
            time.sleep(5)
        
        # 步骤4: 提交审核
        submit_response = httpx.post(
            f"{API_BASE}/api/expert/{expert_id}/submit",
            headers={"Authorization": f"Bearer {expert_creator_token}"}
        )
        assert submit_response.status_code == 200
        
        # 步骤5: 管理员审核
        review_response = httpx.post(
            f"{API_BASE}/api/expert/{expert_id}/review",
            headers={"Authorization": f"Bearer {expert_admin_token}"},
            json={
                "action": "approve",
                "comment": "质量合格"
            }
        )
        assert review_response.status_code == 200
        
        # 步骤6: 验证专家已上架
        market_response = httpx.get(
            f"{API_BASE}/api/market/experts",
            params={"expert_id": expert_id}
        )
        assert market_response.status_code == 200
        assert market_response.json()["status"] == "published"

test_student_learn_flow.py

"""学生学习完整流程测试"""
import pytest
import httpx

API_BASE = "http://localhost:8000"

class TestStudentLearnFlow:
    """学生学习完整业务流程"""
    
    @pytest.fixture
    def student_token(self):
        response = httpx.post(f"{API_BASE}/api/auth/login", json={
            "username": "test_student",
            "password": "test_password"
        })
        return response.json()["access_token"]
    
    def test_full_learning_flow(self, student_token):
        """完整学习流程测试"""
        
        # 步骤1: 获取课程列表
        courses_response = httpx.get(
            f"{API_BASE}/api/student/courses",
            headers={"Authorization": f"Bearer {student_token}"}
        )
        assert courses_response.status_code == 200
        courses = courses_response.json()["courses"]
        course_id = courses[0]["id"]
        
        # 步骤2: 开始学习课程
        start_response = httpx.post(
            f"{API_BASE}/api/student/courses/{course_id}/start",
            headers={"Authorization": f"Bearer {student_token}"}
        )
        assert start_response.status_code == 200
        lesson_id = start_response.json()["current_lesson_id"]
        
        # 步骤3: AI对话学习
        chat_response = httpx.post(
            f"{API_BASE}/api/student/learn/chat",
            headers={"Authorization": f"Bearer {student_token}"},
            json={
                "lesson_id": lesson_id,
                "message": "什么是Python变量?"
            }
        )
        assert chat_response.status_code == 200
        assert "answer" in chat_response.json()
        assert "surprise_score" in chat_response.json()
        
        # 步骤4: 完成课时
        complete_response = httpx.post(
            f"{API_BASE}/api/student/lessons/{lesson_id}/complete",
            headers={"Authorization": f"Bearer {student_token}"}
        )
        assert complete_response.status_code == 200
        xp_gained = complete_response.json()["xp_gained"]
        assert xp_gained > 0
        
        # 步骤5: 检查进度更新
        progress_response = httpx.get(
            f"{API_BASE}/api/student/progress",
            headers={"Authorization": f"Bearer {student_token}"}
        )
        assert progress_response.status_code == 200
        progress = progress_response.json()
        assert progress["total_xp"] > 0
    
    def test_mbe_five_step_analysis(self, student_token):
        """MBE五步行为分析测试"""
        
        # 发送表示困惑的消息
        response = httpx.post(
            f"{API_BASE}/api/student/learn/analyze",
            headers={"Authorization": f"Bearer {student_token}"},
            json={
                "message": "我不太理解递归函数"
            }
        )
        assert response.status_code == 200
        analysis = response.json()
        
        # 验证五步分析结果
        assert "discomfort" in analysis  # 步骤1: 不舒适检测
        assert "desire" in analysis       # 步骤2: 愿望分析
        assert "path" in analysis         # 步骤3: 路径生成
        assert "evaluation" in analysis   # 步骤4: 方案评估
        assert "action" in analysis       # 步骤5: 行动支持

7️⃣ 性能测试

test_api_performance.py

"""API性能测试"""
import pytest
import httpx
import asyncio
import time
from statistics import mean, median

API_BASE = "http://localhost:8000"

class TestAPIPerformance:
    """API性能测试"""
    
    @pytest.mark.asyncio
    async def test_expert_route_latency(self):
        """专家路由延迟测试"""
        latencies = []
        
        async with httpx.AsyncClient() as client:
            for _ in range(100):
                start = time.time()
                response = await client.post(
                    f"{API_BASE}/api/core/route",
                    json={"query": "测试问题", "user_id": "perf_test"}
                )
                latency = (time.time() - start) * 1000  # 转换为毫秒
                latencies.append(latency)
        
        avg_latency = mean(latencies)
        p95_latency = sorted(latencies)[95]
        
        print(f"平均延迟: {avg_latency:.2f}ms")
        print(f"P95延迟: {p95_latency:.2f}ms")
        
        assert avg_latency < 300  # 平均延迟<300ms
        assert p95_latency < 500  # P95延迟<500ms
    
    @pytest.mark.asyncio
    async def test_concurrent_users(self):
        """并发用户测试"""
        concurrent_users = 50
        requests_per_user = 10
        
        async def user_session(user_id):
            results = []
            async with httpx.AsyncClient() as client:
                for _ in range(requests_per_user):
                    start = time.time()
                    response = await client.post(
                        f"{API_BASE}/api/expert/ask",
                        json={
                            "expert_id": "python_tutor",
                            "query": "测试问题",
                            "user_id": f"user_{user_id}"
                        }
                    )
                    latency = time.time() - start
                    results.append({
                        "status": response.status_code,
                        "latency": latency
                    })
            return results
        
        tasks = [user_session(i) for i in range(concurrent_users)]
        all_results = await asyncio.gather(*tasks)
        
        # 统计
        all_requests = [r for user_results in all_results for r in user_results]
        success_rate = sum(1 for r in all_requests if r["status"] == 200) / len(all_requests)
        avg_latency = mean(r["latency"] for r in all_requests)
        
        print(f"成功率: {success_rate*100:.2f}%")
        print(f"平均延迟: {avg_latency*1000:.2f}ms")
        
        assert success_rate > 0.95  # 成功率>95%
        assert avg_latency < 1  # 平均延迟<1秒
    
    @pytest.mark.asyncio
    async def test_expert_response_time(self):
        """专家响应时间测试"""
        response_times = []
        
        async with httpx.AsyncClient(timeout=60) as client:
            for _ in range(20):
                start = time.time()
                response = await client.post(
                    f"{API_BASE}/api/expert/ask",
                    json={
                        "expert_id": "python_tutor",
                        "query": "请详细解释Python的面向对象编程",
                        "user_id": "perf_test"
                    }
                )
                response_time = time.time() - start
                response_times.append(response_time)
        
        avg_time = mean(response_times)
        max_time = max(response_times)
        
        print(f"平均响应时间: {avg_time:.2f}秒")
        print(f"最大响应时间: {max_time:.2f}秒")
        
        assert avg_time < 5  # 平均响应时间<5秒
        assert max_time < 10  # 最大响应时间<10秒

8️⃣ CI/CD配置

.github/workflows/test.yml

name: MBE 自动化测试

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  # 单元测试
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install -r requirements-test.txt
      
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          files: ./coverage.xml
  
  # 集成测试
  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      redis:
        image: redis:7
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install -r requirements-test.txt
      
      - name: Run integration tests
        run: pytest tests/integration/ -v
        env:
          DATABASE_URL: postgresql://postgres:test@localhost:5432/test
          REDIS_URL: redis://localhost:6379
  
  # E2E测试
  e2e-tests:
    runs-on: ubuntu-latest
    needs: integration-tests
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
      
      - name: Install dependencies
        run: |
          cd tests
          npm install
      
      - name: Install Playwright browsers
        run: |
          cd tests
          npx playwright install --with-deps
      
      - name: Start services
        run: docker-compose -f docker-compose.test.yml up -d
      
      - name: Wait for services
        run: sleep 30
      
      - name: Run E2E tests
        run: |
          cd tests
          npx playwright test
        env:
          TEST_URL: http://localhost:8000
      
      - name: Upload test report
        uses: actions/upload-artifact@v3
        if: always()
        with:
          name: playwright-report
          path: tests/test-results/
  
  # 性能测试
  performance-tests:
    runs-on: ubuntu-latest
    needs: e2e-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install -r requirements-test.txt
      
      - name: Run performance tests
        run: pytest tests/performance/ -v

📊 测试覆盖率目标

测试类型 覆盖率目标 当前状态
L1 核心引擎单元测试 > 80% 待实现
L2 专家市场单元测试 > 75% 待实现
L3 应用市场单元测试 > 75% 待实现
L4 用户层单元测试 > 70% 部分实现
集成测试 > 60% 部分实现
E2E测试 核心流程100% 部分实现
权限测试 所有角色100% 待实现
业务流程测试 核心流程100% 待实现

🗓️ 实施计划

Week 1: 基础设施 ✅ 完成

  • 设置测试目录结构
  • 配置pytest和Playwright
  • 创建测试辅助工具
  • 配置CI/CD

Week 2: L1核心层测试 ✅ 完成 (163 tests)

  • MOE路由器单元测试
  • HOPE学习单元测试
  • TITANS记忆单元测试
  • 核心API测试

Week 3: L2专家市场测试 ✅ 完成 (121 tests)

  • 专家模型单元测试
  • 专家注册中心测试
  • 专家工作流测试
  • 专家API集成测试

Week 4: L3应用市场测试 ✅ 完成 (124 tests)

  • 应用模型单元测试
  • 订阅系统测试
  • 计费系统测试
  • 应用集成测试

Week 5: L4用户层测试 ✅ 完成 (108 tests)

  • 用户模型测试
  • 认证流程测试
  • 权限系统测试
  • 端到端流程测试

Week 6: 权限和流程测试 ✅ 完成 (89 tests)

  • 跨层权限测试
  • 权限边界测试
  • 四层业务流程测试
  • 异常处理测试

Week 7: 性能测试 ✅ 完成 (60 tests)

  • API性能基准测试
  • 并发负载测试
  • 内存和响应时间监控
  • 四层架构性能测试

Week 8: 优化和文档 ✅ 完成

  • 测试优化工具
  • 更新测试文档
  • 完善CI/CD配置
  • 创建测试报告

📝 文档信息

创建时间: 2026-02-02
文档类型: 测试计划
版本: v1.0
状态: 已完成


🔗 相关文档