Merge commit '47296980495f8bbfc9493e93de85dd62de6fa6b9' as 'paste-framework'

This commit is contained in:
zwf
2026-06-02 19:09:22 +08:00
107 changed files with 21484 additions and 0 deletions
+55
View File
@@ -0,0 +1,55 @@
# 测试指南
## 测试目录结构
```
tests/
├── conftest.py # 全局 fixtures 和配置
├── unit/ # 单元测试(无外部依赖,可离线运行)
│ ├── test_paste.py # 基础导入和版本
│ ├── test_snow_id.py # 雪花ID生成器
│ ├── test_jwt.py # JWT 令牌(mock
│ ├── test_configure.py # 配置管理(mock
│ ├── test_pagination.py # 分页逻辑
│ ├── test_ustr.py # 字符串工具
│ └── test_udict.py # 字典工具
├── integration/ # 集成测试(需要真实服务)
│ └── test_db.py # 数据库连接测试
└── README.md # 本文件
```
## 运行方式
### 运行所有单元测试(推荐日常使用)
```bash
cd <项目根目录>
pytest tests/unit/ -v
```
### 运行所有测试(含集成测试)
```bash
pytest tests/ -v
```
### 仅运行集成测试
```bash
pytest tests/integration/ -v
```
### 生成覆盖率报告
```bash
pytest tests/unit/ --cov=paste --cov-report=html
open htmlcov/index.html
```
## 编写规范
1. **单元测试**放在 `tests/unit/`,无外部依赖
2. **集成测试**放在 `tests/integration/`,加 `@pytest.mark.integration`
3. 每个测试类以 `Test` 开头,测试方法以 `test_` 开头
4. 使用断言而非 print 验证结果
5. 不依赖外部配置文件或数据库连接
View File
+68
View File
@@ -0,0 +1,68 @@
"""
pytest 全局配置和 fixtures。
单元测试使用 mock,集成测试需要真实服务。
"""
import json
import tempfile
from pathlib import Path
from typing import Dict, Any
import pytest
@pytest.fixture(scope="session")
def mock_config_dict() -> Dict[str, Any]:
"""
模拟配置数据,用于不依赖真实 config.json 的测试。
"""
return {
"db_engine": {
"engine": "sqlite+pysqlite:///:memory:",
"async_engine": "sqlite+aiosqlite:///:memory:",
"engine_option": {"echo": False},
},
"redis": {
"connection": {
"url": "redis://localhost:6379/15",
},
},
"rbac": {
"table": {
"rule": "rbac_rule",
"user": "rbac_user",
"item": "rbac_item",
"assignment": "rbac_assignment",
"item_child": "rbac_item_child",
},
"user_class": "paste.rbac.rbac_user.RbacUser",
},
"logger": {
"default": {
"basic": {
"level": 40,
},
},
},
"tornado": {
"demo": {
"port": 9000,
},
},
}
@pytest.fixture
def temp_config_file(mock_config_dict):
"""
创建临时配置文件,测试后自动清理。
"""
with tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False, encoding="utf-8"
) as f:
json.dump(mock_config_dict, f, ensure_ascii=False)
temp_path = Path(f.name)
yield temp_path
# 清理
temp_path.unlink(missing_ok=True)
@@ -0,0 +1,39 @@
"""
数据库连接集成测试。
需要真实数据库连接,默认被跳过。
运行方式:pytest tests/integration/ -v
"""
import pytest
from paste.db.basetable import BaseTable
from paste.db.baseadapter import BaseAdapter
@pytest.mark.integration
class TestDbConnection:
"""数据库连接基础测试"""
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_ping_database(self):
"""测试数据库连通性"""
result = BaseAdapter.ping()
assert result is True
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_tables_in_db(self):
"""测试获取表列表"""
tables = await BaseTable.tables_in_db()
assert isinstance(tables, list)
# 验证返回的表名称都是字符串
for table in tables:
assert isinstance(table, str)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_is_table_exist(self):
"""测试表存在性判断"""
# 测试存在的表(information_schema.tables 必然存在)
exists = await BaseTable.is_table_exist('information_schema')
assert isinstance(exists, bool)
@@ -0,0 +1,28 @@
"""
数据库集成测试。
需要真实数据库连接,默认跳过。
通过 `--run-integration` 参数运行。
"""
import pytest
from paste.db.basetable import BaseTable
@pytest.mark.integration
class TestDatabaseIntegration:
"""数据库集成测试"""
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_tables_in_db(self):
"""测试获取数据库表列表"""
tables = await BaseTable.tables_in_db()
assert isinstance(tables, list)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_is_table_exist(self):
"""测试表存在性判断"""
exists = await BaseTable.is_table_exist('hat_article')
assert isinstance(exists, bool)
@@ -0,0 +1,113 @@
"""
RBAC 模型集成测试。
需要真实数据库连接,默认被跳过。
运行方式:pytest tests/integration/ -v
"""
import pandas as pd
import pytest
from paste.rbac.rbac_user import RbacUser, RbacItem, RbacPermission, RbacAssignment
@pytest.mark.integration
class TestRbacUser:
"""RBAC 用户模型集成测试"""
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_user_query_generates_sql(self):
"""验证用户查询能生成 SQL"""
query = RbacUser().gen_query()
sql = RbacUser.raw_sql(query)
assert sql is not None
assert 'SELECT' in str(sql)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_user_query_all(self):
"""测试查询所有用户"""
query = RbacUser().gen_query()
users = await RbacUser.query_all(query)
assert isinstance(users, list)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_user_query_as_dataframe(self):
"""测试用户查询返回 DataFrame"""
query = RbacUser().gen_query()
df = await RbacUser.query_as_df(query)
assert isinstance(df, pd.DataFrame)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_find_by_username(self):
"""测试根据用户名查询"""
user = await RbacUser.find_by_username('test')
# 如果用户不存在,返回 None
if user is not None:
assert isinstance(user, RbacUser)
assert user.username == 'test'
@pytest.mark.integration
class TestRbacItem:
"""RBAC 授权项模型集成测试"""
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_item_query_all(self):
"""测试查询所有授权项"""
query = RbacItem().gen_query()
items = await RbacItem.query_all(query)
assert isinstance(items, list)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_item_query_as_dataframe(self):
"""测试授权项查询返回 DataFrame"""
query = RbacItem().gen_query()
df = await RbacItem.query_as_df(query)
assert isinstance(df, pd.DataFrame)
@pytest.mark.integration
class TestRbacPermission:
"""RBAC 权限模型集成测试"""
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_permission_query_all(self):
"""测试查询所有权限"""
query = RbacPermission().gen_query()
permissions = await RbacPermission.query_all(query)
assert isinstance(permissions, list)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_permission_query_as_dataframe(self):
"""测试权限查询返回 DataFrame"""
query = RbacPermission().gen_query()
df = await RbacPermission.query_as_df(query)
assert isinstance(df, pd.DataFrame)
@pytest.mark.integration
class TestRbacAssignment:
"""RBAC 分配关系集成测试"""
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_assignment_query_all(self):
"""测试查询所有分配关系"""
query = RbacAssignment().gen_query()
assignments = await RbacAssignment.query_all(query)
assert isinstance(assignments, list)
@pytest.mark.skip(reason="需要真实数据库连接")
@pytest.mark.asyncio
async def test_assignment_query_as_dataframe(self):
"""测试分配关系查询返回 DataFrame"""
query = RbacAssignment().gen_query()
df = await RbacAssignment.query_as_df(query)
assert isinstance(df, pd.DataFrame)
@@ -0,0 +1,45 @@
"""
Redis 集成测试。
需要真实 Redis 服务,默认被跳过。
运行方式:pytest tests/integration/ -v
"""
import pytest
from paste.db.redis import Redis
@pytest.mark.integration
class TestRedisConnection:
"""Redis 集成测试"""
@pytest.mark.skip(reason="需要真实 Redis 服务")
@pytest.mark.asyncio
async def test_redis_ping(self):
"""测试 Redis 连通性"""
result = await Redis.ping()
assert result is True
@pytest.mark.skip(reason="需要真实 Redis 服务")
@pytest.mark.asyncio
async def test_redis_get_set(self):
"""测试 Redis 基本读写"""
from paste.db.redis import Redis
async with await Redis.get_redis() as r:
# 写入测试
await r.set("test_key", "test_value")
# 读取验证
value = await r.get("test_key")
assert value == b"test_value"
# 清理
await r.delete("test_key")
@pytest.mark.skip(reason="需要真实 Redis 服务")
@pytest.mark.asyncio
async def test_redis_get_keys(self):
"""测试获取所有 keys"""
from paste.db.redis import Redis
keys = await Redis.keys()
assert isinstance(keys, list)
+60
View File
@@ -0,0 +1,60 @@
"""
测试配置读取功能。
使用 mock 配置文件,不依赖真实 config.json。
"""
from unittest.mock import patch
import pytest
from paste.core import config
class TestConfiguration:
"""配置管理测试"""
def test_load_config_with_mock_file(self, temp_config_file):
"""使用临时配置文件测试加载"""
with patch.object(config, 'load_config', wraps=config.load_config):
# 模拟配置文件路径
with patch('paste.core.config.os.path.abspath') as mock_abspath:
mock_abspath.return_value = str(temp_config_file)
cfg = config.load_config()
assert isinstance(cfg, dict)
assert 'db_engine' in cfg
def test_get_config_by_path_existing(self, mock_config_dict):
"""测试读取存在的配置项"""
with patch('paste.core.config.GLOBAL_CONFIG', mock_config_dict):
result = config.get_config_by_path('db_engine.engine')
assert result == "sqlite+pysqlite:///:memory:"
def test_get_config_by_path_nested(self, mock_config_dict):
"""测试读取深层嵌套配置"""
with patch('paste.core.config.GLOBAL_CONFIG', mock_config_dict):
result = config.get_config_by_path('redis.connection.url')
assert result == "redis://localhost:6379/15"
def test_get_config_by_path_with_default(self, mock_config_dict):
"""测试带默认值的配置读取"""
with patch('paste.core.config.GLOBAL_CONFIG', mock_config_dict):
result = config.get_config_by_path('nonexistent.key', default='fallback')
assert result == 'fallback'
def test_get_config_by_path_missing_without_default(self, mock_config_dict):
"""测试缺失配置项且无默认值时抛出异常"""
with patch('paste.core.config.GLOBAL_CONFIG', mock_config_dict):
with pytest.raises(AssertionError):
config.get_config_by_path('nonexistent.key')
def test_get_config_shortcut(self, mock_config_dict):
"""测试 get_config 快捷方法"""
with patch('paste.core.config.GLOBAL_CONFIG', mock_config_dict):
result = config.get_config('tornado.demo.port')
assert result == 9000
def test_get_config_default_none(self, mock_config_dict):
"""测试 get_config 默认值 None 的情况"""
with patch('paste.core.config.GLOBAL_CONFIG', mock_config_dict):
with pytest.raises(AssertionError):
config.get_config('completely.nonexistent')
+75
View File
@@ -0,0 +1,75 @@
"""
测试 JWT 令牌编解码功能。
使用 mock 配置,不依赖真实密钥文件。
"""
import time
import pytest
from paste.security.token import encode_token, decode_token
class TestJwtToken:
"""JWT 令牌测试"""
def test_encode_decode_basic(self):
"""基础编解码测试"""
payload = {
'user_id': 123,
'username': 'test_user',
'role': 'admin',
}
token = encode_token(**payload)
assert token is not None
assert isinstance(token, str)
assert len(token) > 0
decoded = decode_token(token)
assert decoded is not None
assert decoded.get('params', {}).get('user_id') == 123
def test_token_contains_expected_fields(self):
"""验证 token 包含必要字段"""
payload = {'user_id': 456, 'username': 'demo'}
token = encode_token(**payload)
decoded = decode_token(token)
# 标准 JWT 字段
assert 'iss' in decoded, "Token should have issuer"
assert 'iat' in decoded, "Token should have issued-at time"
assert 'exp' in decoded, "Token should have expiration time"
# 自定义字段
params = decoded.get('params', {})
assert params.get('user_id') == 456
assert params.get('username') == 'demo'
def test_token_expiration(self):
"""验证 token 过期机制"""
payload = {
'user_id': 789,
'username': 'expired_user',
'exp': int(time.time()) - 3600, # 1小时前过期
}
token = encode_token(**payload)
with pytest.raises(Exception):
decode_token(token)
def test_token_tampering(self):
"""验证 token 防篡改"""
payload = {'user_id': 999, 'username': 'hacker'}
token = encode_token(**payload)
# 篡改 token
tampered_token = token[:-5] + 'XXXXX'
with pytest.raises(Exception):
decode_token(tampered_token)
def test_empty_payload(self):
"""空 payload 处理"""
token = encode_token()
decoded = decode_token(token)
assert decoded is not None
@@ -0,0 +1,119 @@
"""
测试分页逻辑。
无外部依赖,可离线运行。
"""
from paste.util.pagination import Pagination
class TestPagination:
"""分页功能测试"""
def test_pages_calculation_exact(self):
"""精确整除的分页计算"""
p = Pagination(row_count=100)
pages = p.pages(page_size=20)
assert pages == 5
def test_pages_calculation_remainder(self):
"""有余数的分页计算"""
p = Pagination(row_count=101)
pages = p.pages(page_size=20)
assert pages == 6
def test_pages_calculation_zero_rows(self):
"""零行数据的处理"""
p = Pagination(row_count=0)
pages = p.pages(page_size=20)
assert pages == 1
def test_page_number_valid(self):
"""页码有效性检测"""
p = Pagination(row_count=50)
p.pages(page_size=20)
assert p.number(1) == 1
assert p.number(3) == 3 # 超出范围应返回最大页
def test_page_number_negative(self):
"""负页码处理"""
p = Pagination(row_count=50)
p.pages(page_size=20)
assert p.number(-1) == 1
def test_offset_calculation(self):
"""偏移量计算"""
p = Pagination(row_count=100)
p.pages(page_size=20)
assert p.offset(1) == 0
assert p.offset(2) == 20
assert p.offset(3) == 40
def test_paging_chain(self):
"""链式调用分页"""
p = Pagination(row_count=123).paging(page_number=2, page_size=20)
assert p.page_count == 7
assert p.page_number == 2
assert p.offset_size == 20
def test_page_size_bounds(self):
"""页大小边界: 最小1,最大1000"""
p = Pagination(row_count=2000)
assert p.pages(page_size=0) > 0
assert p.pages(page_size=2000) <= 1000
def test_large_dataset(self):
"""大数据集分页"""
p = Pagination(row_count=1000000)
pages = p.pages(page_size=100)
assert pages == 10000
def test_single_row(self):
"""单行数据"""
p = Pagination(row_count=1)
assert p.pages(page_size=10) == 1
p.paging(page_number=1, page_size=10)
assert p.page_number == 1
assert p.offset_size == 0
def test_page_number_upper_bound(self):
"""页码上限处理"""
p = Pagination(row_count=30).paging(page_number=100, page_size=10)
assert p.page_number == 3 # 最大只能到3页
# ============================================================
# 以下是从 test_db.py 迁移过来的分页测试
# 原函数 test_pagination() 改为标准的 pytest 测试
# ============================================================
class TestPaginationFromDb:
"""从 test_db.py 迁移的分页测试"""
def test_pagination_basic(self):
"""基础分页计算"""
from paste.util.pagination import Pagination
p = Pagination(row_count=123).paging(page_number=2, page_size=20)
assert p.page_count == 7
assert p.page_number == 2
assert p.offset_size == 20
def test_pagination_first_page(self):
"""第一页"""
from paste.util.pagination import Pagination
p = Pagination(row_count=50).paging(page_number=1, page_size=10)
assert p.page_number == 1
assert p.offset_size == 0
def test_pagination_last_page(self):
"""最后一页"""
from paste.util.pagination import Pagination
p = Pagination(row_count=55).paging(page_number=6, page_size=10)
assert p.page_number == 6
assert p.offset_size == 50
def test_pagination_out_of_range(self):
"""超出范围时自动修正"""
from paste.util.pagination import Pagination
p = Pagination(row_count=30).paging(page_number=100, page_size=10)
assert p.page_number == 3 # 只有3页,自动修正
+26
View File
@@ -0,0 +1,26 @@
"""
测试 paste 包的基本导入和版本信息。
无外部依赖,可离线运行。
"""
import paste
class TestPasteImport:
"""测试 paste 包基础功能"""
def test_paste_imports(self):
"""确保 paste 包能正确导入"""
assert paste is not None
def test_paste_version(self):
"""检查 paste 包是否有 __version__"""
assert hasattr(paste, "__version__"), "paste package should have __version__"
assert isinstance(paste.__version__, str), "__version__ should be a string"
def test_paste_version_value(self):
"""验证版本号格式符合语义化版本规范"""
import re
version = paste.__version__
assert re.match(r'^\d+\.\d+\.\d+', version), \
f"Version {version} should follow semver format"
@@ -0,0 +1,46 @@
"""
测试雪花 ID 生成器。
无外部依赖,可离线运行。
"""
from paste.util.snow_id import IdWorker
class TestSnowflakeId:
"""雪花 ID 生成器测试"""
def test_snow_id_generates_string(self):
"""测试 Snowflake ID 是否生成字符串"""
id_worker = IdWorker.get_id_worker()
sid = f'{id_worker.get_id()}'
assert isinstance(sid, str), "雪花 ID 必须是字符串"
assert len(sid) > 0, "雪花 ID 必须包含内容"
def test_snow_id_is_unique(self):
"""测试生成的 ID 是否唯一(简单验证)"""
id_worker = IdWorker.get_id_worker()
ids = [id_worker.get_id() for _ in range(50)]
assert len(set(ids)) == len(ids), "All generated IDs should be unique"
def test_snow_id_monotonic_increase(self):
"""测试雪花 ID 单调递增"""
id_worker = IdWorker.get_id_worker()
ids = [id_worker.get_id() for _ in range(100)]
for i in range(1, len(ids)):
assert ids[i] > ids[i - 1], \
f"ID at position {i} should be greater than previous"
def test_snow_id_worker_isolation(self):
"""测试不同 worker 生成的 ID 不冲突"""
worker1 = IdWorker.get_id_worker(datacenter_id=1, worker_id=1)
worker2 = IdWorker.get_id_worker(datacenter_id=2, worker_id=2)
ids = [worker1.get_id() for _ in range(50)] + \
[worker2.get_id() for _ in range(50)]
assert len(set(ids)) == len(ids), \
"IDs from different workers should be unique"
def test_snow_id_high_throughput(self):
"""测试短时间高并发生成"""
id_worker = IdWorker.get_id_worker()
ids = [id_worker.get_id() for _ in range(1000)]
assert len(set(ids)) == 1000
+45
View File
@@ -0,0 +1,45 @@
"""
测试字典工具。
无外部依赖,可离线运行。
"""
from paste.util import udict
class TestUdict:
"""字典工具测试"""
def test_get_by_path_simple(self):
"""简单路径读取"""
data = {"a": 1, "b": 2}
assert udict.get_by_path(data, "a") == 1
def test_get_by_path_nested(self):
"""嵌套路径读取"""
data = {"a": {"b": {"c": 123}}}
assert udict.get_by_path(data, "a.b.c") == 123
def test_get_by_path_missing(self):
"""缺失路径处理"""
data = {"a": 1}
assert udict.get_by_path(data, "b.c.d", "default") == "default"
def test_get_by_path_none_default(self):
"""缺失路径无默认值"""
data = {"a": 1}
assert udict.get_by_path(data, "b") is None
def test_get_with_default_existing(self):
"""存在的键读取"""
data = {"key": "value"}
assert udict.get_with_default(data, "key", "fallback") == "value"
def test_get_with_default_missing(self):
"""缺失键使用默认值"""
data = {"key": "value"}
assert udict.get_with_default(data, "missing", "fallback") == "fallback"
def test_get_with_default_none_value(self):
"""值为 None 时使用默认值"""
data = {"key": None}
assert udict.get_with_default(data, "key", "fallback") == "fallback"
+44
View File
@@ -0,0 +1,44 @@
"""
测试字符串工具。
无外部依赖,可离线运行。
"""
import datetime
from paste.util import ustr
class TestUstr:
"""字符串工具测试"""
def test_str_q_count_all_cn(self):
"""全中文统计"""
assert ustr.str_q_count("中国汉字") == 4
def test_str_q_count_mixed(self):
"""中英文混合统计"""
count = ustr.str_q_count("Hello中国")
assert count == 2 # 只有中文字符算
def test_str_q_count_empty(self):
"""空字符串统计"""
assert ustr.str_q_count("") == 0
def test_str_q_count_no_cn(self):
"""纯英文统计"""
assert ustr.str_q_count("HelloWorld") == 0
def test_to_datetime_standard(self):
"""标准格式解析"""
result = ustr.to_datetime("2024-01-15 10:30:00", ["%Y-%m-%d %H:%M:%S"])
assert result is not None
assert isinstance(result, datetime.datetime)
def test_to_datetime_invalid(self):
"""无效格式解析"""
result = ustr.to_datetime("not-a-date", ["%Y-%m-%d"])
assert result is None
def test_to_datetime_empty(self):
"""空字符串解析"""
result = ustr.to_datetime("", ["%Y-%m-%d"])
assert result is None