Files
paste-framework/paste/util/ufile.py
T
2026-06-02 16:26:10 +08:00

293 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import datetime
import io
import os
import re
import unicodedata
from typing import Optional, IO, Union
import cv2
import numpy as np
from PIL import Image
file_types = {
'jpeg': (b'\xFF\xD8\xFF', b'\xff\xd8\xff'),
'png': (b'\x89PNG',),
'gif': (b'GIF8',),
'bmp': (b'BM',),
'tiff': (b'II*\x00', b'MM\x00*'),
'webp': (b'RIFF\x00\x00\x00\x00WEBP',),
'ico': (b'\x00\x00\x01\x00',),
'psd': (b'8BPS',),
'svg': (b'<svg', b'<?xml'),
'mp3': (b'\x49\x44\x33', b'\xFF\xFB\x50'),
'm4a': (b'\x00\x00\x00\x20\x66\x74\x79\x70\x4D\x34\x41\x20\x00\x00\x00\x00',),
'wav': (b'RIFF\x00\x00\x00\x00WAVE',),
'flac': (b'fLaC',),
'aac': (b'\xFF\xF1', b'\xFF\xF9'),
'aiff': (b'FORM\x00\x00\x00\x00AIFF',),
'au': (b'.snd',),
'mov': (b'\x00\x00\x0F', b'\x00\x00\x77', b'\x6D\x6F\x6F\x76', b'\x6d\x64\x61\x74'),
'mp4': (b'\x00\x00\x00\x14', b'\x00\x00\x00\x18', b'\x00\x00\x00\x1C', b'\x00\x00\x00\x20'),
'mpg': (b'\x00\x00\x01\xB3', b'\x00\x00\x01\xBA'),
'avi': (b'RIFF\x00\x00\x00\x00AVI',),
'mkv': (b'\x1A\x45\xDF\xA3',),
'wmv': (b'\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C',),
'flv': (b'FLV\x01',),
'3gp': (b'\x00\x00\x00\x18\x66\x74\x79\x70\x33\x67\x70',),
'pdf': (b'\x25PDF',),
('doc', 'ppt', 'xls'): (b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1',),
('docx', 'pptx', 'xlsx'): (b'PK\x03\x04\n\x00\x00\x00\x00\x00\x87N',),
'rtf': (b'{\\rtf',),
'txt': (b'\xEF\xBB\xBF', b'\xFE\xFF', b'\xFF\xFE'),
'zip': (b'PK\x03\x04',),
'rar': (b'Rar!\x1A\x07\x00',),
'tar': (b'\x75\x73\x74\x61\x72\x00\x00\x00',),
'gz': (b'\x1F\x8B',),
'7z': (b'7z\xBC\xAF\x27\x1C',),
}
"""
可检测的文件格式配置。
"""
def inspect_type(file_data: Optional[bytes] = None, file_io: Optional[IO] = None):
"""
检测文件类型。
:param file_io: 文件输入输出对象
:param file_data: 文件数据内容
:return: 检测结果
"""
assert file_data is not None or file_io is not None, '至少传入文件内容或文件输入输出对象之一.'
if file_data is None and file_io is not None:
# 若未传入数据内容,仅传入文件对象,则从文件对象读取前100字节数据
file_data = file_io.read(1024*2)
if len(file_data) > 1024*2:
file_data = file_data[:1024*2]
file_type: Optional[Union[str, tuple[str]]] = ''
for _key, _val in file_types.items():
for _bs in _val:
if file_data.startswith(_bs):
file_type = _key
break
if file_type:
break
if isinstance(file_type, tuple):
if file_type[0] == 'doc':
# 使用读取到的全部数据(≤4KB)进行启发式判断
file_type = _heuristic_office_type(file_data)
elif file_type[0] == 'docx':
# 使用读取到的全部数据(≤4KB)进行启发式判断
file_type = _heuristic_office_x_type(file_data)
return file_type
def _heuristic_office_type(data: bytes) -> str:
"""
仅基于前 4KB 数据,启发式判断是 .doc、.xls 还是 .ppt
依据:各格式在 OLE 结构中的典型字符串偏移位置
"""
# 关键词及其对应类型
patterns = [
(b'W\x00o\x00r\x00d\x00D\x00o\x00c\x00u\x00m\x00e\x00n\x00t', 'doc'),
(b'W\x00o\x00r\x00d', 'doc'),
(b'WordDocument', 'doc'),
(b'Word', 'doc'),
(b'W\x00o\x00r\x00k\x00b\x00o\x00o\x00k', 'xls'),
(b'B\x00o\x00o\x00k', 'xls'),
(b'Workbook', 'xls'),
(b'Book', 'xls'),
(b'P\x00o\x00w\x00e\x00r\x00P\x00o\x00i\x00n\x00t', 'ppt'),
(b'PowerPoint', 'ppt'),
]
# 一次性遍历:在 data 中查找任一关键词
# 由于模式短,且数据小(≤4KB),用简单循环即可
for keyword, file_type in patterns:
if keyword in data:
return file_type
# 未匹配时,保守返回 ""
return ""
def _heuristic_office_x_type(data: bytes) -> str:
"""
仅用 `in` 判断 .docx/.xlsx/.pptx,精准匹配 Open XML 标准 MIME 类型
不解压、不解析、不猜,就看有没有那三个关键字符串
"""
# 关键词及其对应类型
patterns = [
(b'word/PK', 'docx'),
(b'xl/PK', 'xlsx'),
(b'ppt/PK', 'pptx'),
]
# 一次性遍历:在 data 中查找任一关键词
# 由于模式短,且数据小(≤4KB),用简单循环即可
for keyword, file_type in patterns:
if keyword in data:
return file_type
# 未匹配时,保守返回 ""
return ""
def get_file_info(file_path):
"""
取得文件信息,包括:文件大小、创建时间。
:param file_path: 文件绝对路径
:return: 大小,创建时间
"""
_ctime = datetime.datetime.fromtimestamp(os.path.getctime(file_path))
_ctime = _ctime.strftime('%Y-%m-%d %H:%M:%S')
_f_size = os.path.getsize(file_path)
# 将字节转换为 KB
_size_kb = _f_size / 1024
if _size_kb < 1024:
return f"{_size_kb:.2f} KB", _ctime
# 将 KB 转换为 MB
_size_mb = _size_kb / 1024
if _size_mb < 1024:
return f"{_size_mb:.2f} MB", _ctime
# 将 MB 转换为 GB
_size_gb = _size_mb / 1024
return f"{_size_gb:.2f} GB", _ctime
def read_to_buffer(file) -> bytes:
"""
以二进制只读方式从文件载入数据到字节流。
"""
assert os.path.isfile(file), 'File not found: %s' % file
with open(file, 'rb') as f:
buf = f.read(os.path.getsize(file))
f.close()
return buf
def sanitize_filename(filename: str) -> str:
"""
统一严格过滤文件名中的非法字符(跨 Windows/Linux/macOS 安全)。
规则:
1. 过滤所有系统禁止的字符(包括控制字符 \x00-\x1f
2. 处理 Windows 保留名称(如 CON、NUL 等)
3. 替换空格和 # 为下划线
4. 禁止以空格或点开头/结尾
5. 限制文件名长度(255 字符)
:param filename: 文件名
:return: 替换非法字符为 _ 的安全文件名
"""
# 1. Unicode 规范化(防止混淆攻击)
filename = unicodedata.normalize("NFKC", filename)
# 2. 替换所有非法字符为下划线(包括空格和 #)
# 包括:\ / : * ? " < > | \x00-\x20(控制字符和空格)#
safe_name = re.sub(r'[\\/:*?"<>|\x00-\x20#]', '_', filename)
# 3. 处理 Windows 保留名称(如 CON.txt -> _CON.txt
win_reserved = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4",
"LPT1", "LPT2", "LPT3", "CLOCK$"
]
if safe_name.upper().split(".")[0] in win_reserved:
safe_name = f"_{safe_name}"
# 4. 移除首尾空格和点(避免隐形问题)
safe_name = safe_name.strip(". ")
# 5. 确保文件名非空(如果输入全是非法字符)
if not safe_name:
safe_name = "unnamed_file"
# 6. 限制长度(Windows 最大 255 字符)
return safe_name[:255]
def check_and_create_dir(file_path, mode=0o777, exist_ok=False):
"""
根据传入的文件路径检查目录是否存在,若不存在,则创建。
:param file_path: 文件路径
:param mode: 目录权限,默认最高权限
:param exist_ok: 目录存在时,是否抛出异常,默认不抛出
"""
# 获取文件所在目录
_directory = os.path.dirname(file_path)
if not os.path.exists(_directory):
# 如果目录不存在,则创建目录
os.makedirs(_directory, mode, exist_ok)
def load_image_from_base64(base64_str):
"""
将 Base64 字符串转换为 face_recognition 可用的 numpy 数组。
:param base64_str: 经过 Base64 编码的图像数据
:return: 图像数据(numpy数组)
"""
try:
# 移除 Base64 头部(如果存在)
if "," in base64_str:
base64_str = base64_str.split(",")[1]
# 解码为二进制
image_data = base64.b64decode(base64_str)
# 验证图像完整性
Image.open(io.BytesIO(image_data)).verify()
# 转换为 RGB numpy 数组
image = Image.open(io.BytesIO(image_data))
if image.mode != "RGB":
image = image.convert("RGB")
return np.array(image)
except Exception as e:
raise ValueError(f"无效的 Base64 图像数据: {e}")
def load_png_from_base64(base64_str):
"""
从Base64字符串读取PNG图像并保留Alpha通道。
:param base64_str: 经过 Base64 编码的 PNG 图像数据
:return: 图像数据(numpy数组),包含BGRA四个通道
"""
try:
# 1. 解码Base64字符串
img_data = base64.b64decode(base64_str)
# 2. 将字节数据转换为numpy数组
np_array = np.frombuffer(img_data, np.uint8)
# 3. 使用IMREAD_UNCHANGED标志解码图像以保留Alpha通道
img = cv2.imdecode(np_array, cv2.IMREAD_UNCHANGED)
# 4. 检查是否成功读取
if img is None:
raise ValueError("无法解码图像数据")
# 5. 检查是否有Alpha通道
if img.shape[2] != 4:
print("警告: 图像没有Alpha通道,将添加全不透明Alpha通道")
# 将BGR转换为BGRA,添加全不透明Alpha通道
img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
return img
except Exception as e:
print(f"读取Base64图像时出错: {str(e)}")
return None