Squashed 'paste-framework/' content from commit 34e8684

git-subtree-dir: paste-framework
git-subtree-split: 34e8684c4bc3cebbe177509f42ab4ef5b5425a7a
This commit is contained in:
zwf
2026-06-02 19:09:22 +08:00
commit 4729698049
107 changed files with 21484 additions and 0 deletions
+293
View File
@@ -0,0 +1,293 @@
import base64
import datetime
import io
import os
import re
import unicodedata
from typing import Optional, IO, Union
import cv2
import numpy as np
from PIL import Image
file_types = {
'jpeg': (b'\xFF\xD8\xFF', b'\xff\xd8\xff'),
'png': (b'\x89PNG',),
'gif': (b'GIF8',),
'bmp': (b'BM',),
'tiff': (b'II*\x00', b'MM\x00*'),
'webp': (b'RIFF\x00\x00\x00\x00WEBP',),
'ico': (b'\x00\x00\x01\x00',),
'psd': (b'8BPS',),
'svg': (b'<svg', b'<?xml'),
'mp3': (b'\x49\x44\x33', b'\xFF\xFB\x50'),
'm4a': (b'\x00\x00\x00\x20\x66\x74\x79\x70\x4D\x34\x41\x20\x00\x00\x00\x00',),
'wav': (b'RIFF\x00\x00\x00\x00WAVE',),
'flac': (b'fLaC',),
'aac': (b'\xFF\xF1', b'\xFF\xF9'),
'aiff': (b'FORM\x00\x00\x00\x00AIFF',),
'au': (b'.snd',),
'mov': (b'\x00\x00\x0F', b'\x00\x00\x77', b'\x6D\x6F\x6F\x76', b'\x6d\x64\x61\x74'),
'mp4': (b'\x00\x00\x00\x14', b'\x00\x00\x00\x18', b'\x00\x00\x00\x1C', b'\x00\x00\x00\x20'),
'mpg': (b'\x00\x00\x01\xB3', b'\x00\x00\x01\xBA'),
'avi': (b'RIFF\x00\x00\x00\x00AVI',),
'mkv': (b'\x1A\x45\xDF\xA3',),
'wmv': (b'\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C',),
'flv': (b'FLV\x01',),
'3gp': (b'\x00\x00\x00\x18\x66\x74\x79\x70\x33\x67\x70',),
'pdf': (b'\x25PDF',),
('doc', 'ppt', 'xls'): (b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1',),
('docx', 'pptx', 'xlsx'): (b'PK\x03\x04\n\x00\x00\x00\x00\x00\x87N',),
'rtf': (b'{\\rtf',),
'txt': (b'\xEF\xBB\xBF', b'\xFE\xFF', b'\xFF\xFE'),
'zip': (b'PK\x03\x04',),
'rar': (b'Rar!\x1A\x07\x00',),
'tar': (b'\x75\x73\x74\x61\x72\x00\x00\x00',),
'gz': (b'\x1F\x8B',),
'7z': (b'7z\xBC\xAF\x27\x1C',),
}
"""
可检测的文件格式配置。
"""
def inspect_type(file_data: Optional[bytes] = None, file_io: Optional[IO] = None):
"""
检测文件类型。
:param file_io: 文件输入输出对象
:param file_data: 文件数据内容
:return: 检测结果
"""
assert file_data is not None or file_io is not None, '至少传入文件内容或文件输入输出对象之一.'
if file_data is None and file_io is not None:
# 若未传入数据内容,仅传入文件对象,则从文件对象读取前100字节数据
file_data = file_io.read(1024*2)
if len(file_data) > 1024*2:
file_data = file_data[:1024*2]
file_type: Optional[Union[str, tuple[str]]] = ''
for _key, _val in file_types.items():
for _bs in _val:
if file_data.startswith(_bs):
file_type = _key
break
if file_type:
break
if isinstance(file_type, tuple):
if file_type[0] == 'doc':
# 使用读取到的全部数据(≤4KB)进行启发式判断
file_type = _heuristic_office_type(file_data)
elif file_type[0] == 'docx':
# 使用读取到的全部数据(≤4KB)进行启发式判断
file_type = _heuristic_office_x_type(file_data)
return file_type
def _heuristic_office_type(data: bytes) -> str:
"""
仅基于前 4KB 数据,启发式判断是 .doc、.xls 还是 .ppt
依据:各格式在 OLE 结构中的典型字符串偏移位置
"""
# 关键词及其对应类型
patterns = [
(b'W\x00o\x00r\x00d\x00D\x00o\x00c\x00u\x00m\x00e\x00n\x00t', 'doc'),
(b'W\x00o\x00r\x00d', 'doc'),
(b'WordDocument', 'doc'),
(b'Word', 'doc'),
(b'W\x00o\x00r\x00k\x00b\x00o\x00o\x00k', 'xls'),
(b'B\x00o\x00o\x00k', 'xls'),
(b'Workbook', 'xls'),
(b'Book', 'xls'),
(b'P\x00o\x00w\x00e\x00r\x00P\x00o\x00i\x00n\x00t', 'ppt'),
(b'PowerPoint', 'ppt'),
]
# 一次性遍历:在 data 中查找任一关键词
# 由于模式短,且数据小(≤4KB),用简单循环即可
for keyword, file_type in patterns:
if keyword in data:
return file_type
# 未匹配时,保守返回 ""
return ""
def _heuristic_office_x_type(data: bytes) -> str:
"""
仅用 `in` 判断 .docx/.xlsx/.pptx,精准匹配 Open XML 标准 MIME 类型
不解压、不解析、不猜,就看有没有那三个关键字符串
"""
# 关键词及其对应类型
patterns = [
(b'word/PK', 'docx'),
(b'xl/PK', 'xlsx'),
(b'ppt/PK', 'pptx'),
]
# 一次性遍历:在 data 中查找任一关键词
# 由于模式短,且数据小(≤4KB),用简单循环即可
for keyword, file_type in patterns:
if keyword in data:
return file_type
# 未匹配时,保守返回 ""
return ""
def get_file_info(file_path):
"""
取得文件信息,包括:文件大小、创建时间。
:param file_path: 文件绝对路径
:return: 大小,创建时间
"""
_ctime = datetime.datetime.fromtimestamp(os.path.getctime(file_path))
_ctime = _ctime.strftime('%Y-%m-%d %H:%M:%S')
_f_size = os.path.getsize(file_path)
# 将字节转换为 KB
_size_kb = _f_size / 1024
if _size_kb < 1024:
return f"{_size_kb:.2f} KB", _ctime
# 将 KB 转换为 MB
_size_mb = _size_kb / 1024
if _size_mb < 1024:
return f"{_size_mb:.2f} MB", _ctime
# 将 MB 转换为 GB
_size_gb = _size_mb / 1024
return f"{_size_gb:.2f} GB", _ctime
def read_to_buffer(file) -> bytes:
"""
以二进制只读方式从文件载入数据到字节流。
"""
assert os.path.isfile(file), 'File not found: %s' % file
with open(file, 'rb') as f:
buf = f.read(os.path.getsize(file))
f.close()
return buf
def sanitize_filename(filename: str) -> str:
"""
统一严格过滤文件名中的非法字符(跨 Windows/Linux/macOS 安全)。
规则:
1. 过滤所有系统禁止的字符(包括控制字符 \x00-\x1f
2. 处理 Windows 保留名称(如 CON、NUL 等)
3. 替换空格和 # 为下划线
4. 禁止以空格或点开头/结尾
5. 限制文件名长度(255 字符)
:param filename: 文件名
:return: 替换非法字符为 _ 的安全文件名
"""
# 1. Unicode 规范化(防止混淆攻击)
filename = unicodedata.normalize("NFKC", filename)
# 2. 替换所有非法字符为下划线(包括空格和 #)
# 包括:\ / : * ? " < > | \x00-\x20(控制字符和空格)#
safe_name = re.sub(r'[\\/:*?"<>|\x00-\x20#]', '_', filename)
# 3. 处理 Windows 保留名称(如 CON.txt -> _CON.txt
win_reserved = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4",
"LPT1", "LPT2", "LPT3", "CLOCK$"
]
if safe_name.upper().split(".")[0] in win_reserved:
safe_name = f"_{safe_name}"
# 4. 移除首尾空格和点(避免隐形问题)
safe_name = safe_name.strip(". ")
# 5. 确保文件名非空(如果输入全是非法字符)
if not safe_name:
safe_name = "unnamed_file"
# 6. 限制长度(Windows 最大 255 字符)
return safe_name[:255]
def check_and_create_dir(file_path, mode=0o777, exist_ok=False):
"""
根据传入的文件路径检查目录是否存在,若不存在,则创建。
:param file_path: 文件路径
:param mode: 目录权限,默认最高权限
:param exist_ok: 目录存在时,是否抛出异常,默认不抛出
"""
# 获取文件所在目录
_directory = os.path.dirname(file_path)
if not os.path.exists(_directory):
# 如果目录不存在,则创建目录
os.makedirs(_directory, mode, exist_ok)
def load_image_from_base64(base64_str):
"""
将 Base64 字符串转换为 face_recognition 可用的 numpy 数组。
:param base64_str: 经过 Base64 编码的图像数据
:return: 图像数据(numpy数组)
"""
try:
# 移除 Base64 头部(如果存在)
if "," in base64_str:
base64_str = base64_str.split(",")[1]
# 解码为二进制
image_data = base64.b64decode(base64_str)
# 验证图像完整性
Image.open(io.BytesIO(image_data)).verify()
# 转换为 RGB numpy 数组
image = Image.open(io.BytesIO(image_data))
if image.mode != "RGB":
image = image.convert("RGB")
return np.array(image)
except Exception as e:
raise ValueError(f"无效的 Base64 图像数据: {e}")
def load_png_from_base64(base64_str):
"""
从Base64字符串读取PNG图像并保留Alpha通道。
:param base64_str: 经过 Base64 编码的 PNG 图像数据
:return: 图像数据(numpy数组),包含BGRA四个通道
"""
try:
# 1. 解码Base64字符串
img_data = base64.b64decode(base64_str)
# 2. 将字节数据转换为numpy数组
np_array = np.frombuffer(img_data, np.uint8)
# 3. 使用IMREAD_UNCHANGED标志解码图像以保留Alpha通道
img = cv2.imdecode(np_array, cv2.IMREAD_UNCHANGED)
# 4. 检查是否成功读取
if img is None:
raise ValueError("无法解码图像数据")
# 5. 检查是否有Alpha通道
if img.shape[2] != 4:
print("警告: 图像没有Alpha通道,将添加全不透明Alpha通道")
# 将BGR转换为BGRA,添加全不透明Alpha通道
img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
return img
except Exception as e:
print(f"读取Base64图像时出错: {str(e)}")
return None