import base64 import datetime import io import os import re import unicodedata from typing import Optional, IO, Union import cv2 import numpy as np from PIL import Image file_types = { 'jpeg': (b'\xFF\xD8\xFF', b'\xff\xd8\xff'), 'png': (b'\x89PNG',), 'gif': (b'GIF8',), 'bmp': (b'BM',), 'tiff': (b'II*\x00', b'MM\x00*'), 'webp': (b'RIFF\x00\x00\x00\x00WEBP',), 'ico': (b'\x00\x00\x01\x00',), 'psd': (b'8BPS',), 'svg': (b' 1024*2: file_data = file_data[:1024*2] file_type: Optional[Union[str, tuple[str]]] = '' for _key, _val in file_types.items(): for _bs in _val: if file_data.startswith(_bs): file_type = _key break if file_type: break if isinstance(file_type, tuple): if file_type[0] == 'doc': # 使用读取到的全部数据(≤4KB)进行启发式判断 file_type = _heuristic_office_type(file_data) elif file_type[0] == 'docx': # 使用读取到的全部数据(≤4KB)进行启发式判断 file_type = _heuristic_office_x_type(file_data) return file_type def _heuristic_office_type(data: bytes) -> str: """ 仅基于前 4KB 数据,启发式判断是 .doc、.xls 还是 .ppt 依据:各格式在 OLE 结构中的典型字符串偏移位置 """ # 关键词及其对应类型 patterns = [ (b'W\x00o\x00r\x00d\x00D\x00o\x00c\x00u\x00m\x00e\x00n\x00t', 'doc'), (b'W\x00o\x00r\x00d', 'doc'), (b'WordDocument', 'doc'), (b'Word', 'doc'), (b'W\x00o\x00r\x00k\x00b\x00o\x00o\x00k', 'xls'), (b'B\x00o\x00o\x00k', 'xls'), (b'Workbook', 'xls'), (b'Book', 'xls'), (b'P\x00o\x00w\x00e\x00r\x00P\x00o\x00i\x00n\x00t', 'ppt'), (b'PowerPoint', 'ppt'), ] # 一次性遍历:在 data 中查找任一关键词 # 由于模式短,且数据小(≤4KB),用简单循环即可 for keyword, file_type in patterns: if keyword in data: return file_type # 未匹配时,保守返回 "" return "" def _heuristic_office_x_type(data: bytes) -> str: """ 仅用 `in` 判断 .docx/.xlsx/.pptx,精准匹配 Open XML 标准 MIME 类型 不解压、不解析、不猜,就看有没有那三个关键字符串 """ # 关键词及其对应类型 patterns = [ (b'word/PK', 'docx'), (b'xl/PK', 'xlsx'), (b'ppt/PK', 'pptx'), ] # 一次性遍历:在 data 中查找任一关键词 # 由于模式短,且数据小(≤4KB),用简单循环即可 for keyword, file_type in patterns: if keyword in data: return file_type # 未匹配时,保守返回 "" return "" def get_file_info(file_path): """ 取得文件信息,包括:文件大小、创建时间。 :param file_path: 文件绝对路径 :return: 大小,创建时间 """ _ctime = datetime.datetime.fromtimestamp(os.path.getctime(file_path)) _ctime = _ctime.strftime('%Y-%m-%d %H:%M:%S') _f_size = os.path.getsize(file_path) # 将字节转换为 KB _size_kb = _f_size / 1024 if _size_kb < 1024: return f"{_size_kb:.2f} KB", _ctime # 将 KB 转换为 MB _size_mb = _size_kb / 1024 if _size_mb < 1024: return f"{_size_mb:.2f} MB", _ctime # 将 MB 转换为 GB _size_gb = _size_mb / 1024 return f"{_size_gb:.2f} GB", _ctime def read_to_buffer(file) -> bytes: """ 以二进制只读方式从文件载入数据到字节流。 """ assert os.path.isfile(file), 'File not found: %s' % file with open(file, 'rb') as f: buf = f.read(os.path.getsize(file)) f.close() return buf def sanitize_filename(filename: str) -> str: """ 统一严格过滤文件名中的非法字符(跨 Windows/Linux/macOS 安全)。 规则: 1. 过滤所有系统禁止的字符(包括控制字符 \x00-\x1f) 2. 处理 Windows 保留名称(如 CON、NUL 等) 3. 替换空格和 # 为下划线 4. 禁止以空格或点开头/结尾 5. 限制文件名长度(255 字符) :param filename: 文件名 :return: 替换非法字符为 _ 的安全文件名 """ # 1. Unicode 规范化(防止混淆攻击) filename = unicodedata.normalize("NFKC", filename) # 2. 替换所有非法字符为下划线(包括空格和 #) # 包括:\ / : * ? " < > | \x00-\x20(控制字符和空格)# safe_name = re.sub(r'[\\/:*?"<>|\x00-\x20#]', '_', filename) # 3. 处理 Windows 保留名称(如 CON.txt -> _CON.txt) win_reserved = [ "CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "LPT1", "LPT2", "LPT3", "CLOCK$" ] if safe_name.upper().split(".")[0] in win_reserved: safe_name = f"_{safe_name}" # 4. 移除首尾空格和点(避免隐形问题) safe_name = safe_name.strip(". ") # 5. 确保文件名非空(如果输入全是非法字符) if not safe_name: safe_name = "unnamed_file" # 6. 限制长度(Windows 最大 255 字符) return safe_name[:255] def check_and_create_dir(file_path, mode=0o777, exist_ok=False): """ 根据传入的文件路径检查目录是否存在,若不存在,则创建。 :param file_path: 文件路径 :param mode: 目录权限,默认最高权限 :param exist_ok: 目录存在时,是否抛出异常,默认不抛出 """ # 获取文件所在目录 _directory = os.path.dirname(file_path) if not os.path.exists(_directory): # 如果目录不存在,则创建目录 os.makedirs(_directory, mode, exist_ok) def load_image_from_base64(base64_str): """ 将 Base64 字符串转换为 face_recognition 可用的 numpy 数组。 :param base64_str: 经过 Base64 编码的图像数据 :return: 图像数据(numpy数组) """ try: # 移除 Base64 头部(如果存在) if "," in base64_str: base64_str = base64_str.split(",")[1] # 解码为二进制 image_data = base64.b64decode(base64_str) # 验证图像完整性 Image.open(io.BytesIO(image_data)).verify() # 转换为 RGB numpy 数组 image = Image.open(io.BytesIO(image_data)) if image.mode != "RGB": image = image.convert("RGB") return np.array(image) except Exception as e: raise ValueError(f"无效的 Base64 图像数据: {e}") def load_png_from_base64(base64_str): """ 从Base64字符串读取PNG图像并保留Alpha通道。 :param base64_str: 经过 Base64 编码的 PNG 图像数据 :return: 图像数据(numpy数组),包含BGRA四个通道 """ try: # 1. 解码Base64字符串 img_data = base64.b64decode(base64_str) # 2. 将字节数据转换为numpy数组 np_array = np.frombuffer(img_data, np.uint8) # 3. 使用IMREAD_UNCHANGED标志解码图像以保留Alpha通道 img = cv2.imdecode(np_array, cv2.IMREAD_UNCHANGED) # 4. 检查是否成功读取 if img is None: raise ValueError("无法解码图像数据") # 5. 检查是否有Alpha通道 if img.shape[2] != 4: print("警告: 图像没有Alpha通道,将添加全不透明Alpha通道") # 将BGR转换为BGRA,添加全不透明Alpha通道 img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA) return img except Exception as e: print(f"读取Base64图像时出错: {str(e)}") return None