Files
2026-06-02 16:26:10 +08:00

186 lines
5.6 KiB
Python

import base64
import binascii
import datetime
import decimal
import json
import re
import zlib
from typing import Union
import numpy as np
from pandas._libs.tslibs.nattype import NaTType
from paste.db.baseadapter import BaseAdapter
from paste.db.basemodel import LOCAL_DATETIME_FORMAT, LOCAL_DATE_FORMAT, LOCAL_TIME_FORMAT
class JsonDumpsEncoder(json.JSONEncoder):
"""
JSON 转字符串时对一些特殊类型进行转码(编码)方法。
"""
def default(self, obj):
if isinstance(obj, NaTType):
return ''
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, (np.floating, decimal.Decimal)):
return float(obj)
elif isinstance(obj, bytes):
return obj.decode(encoding='utf-8', errors='ignore')
elif isinstance(obj, datetime.datetime):
return obj.strftime(LOCAL_DATETIME_FORMAT)
elif isinstance(obj, datetime.date):
return obj.strftime(LOCAL_DATE_FORMAT)
elif isinstance(obj, datetime.time):
return obj.strftime(LOCAL_TIME_FORMAT)
elif isinstance(obj, BaseAdapter):
return obj.to_dict()
return super().default(obj)
class BaseX:
"""
Base 编码解码方法,主要用于解码,针对编码数据自动检测编码类型。
能根据编码方式自动选择解码方法,同时在解码后尝试执行标准 Zip 解压。
"""
Type_Base16 = 'b16'
Type_Base32 = 'b32'
Type_Base64 = 'b64'
Type_Base85 = 'b85'
Decoders = {
Type_Base16: base64.b16decode,
Type_Base32: base64.b32decode,
Type_Base64: base64.b64decode,
Type_Base85: base64.b85decode,
}
"""
解码器。
"""
@classmethod
def base_x_detect(cls, data: Union[bytes, str]):
"""
检测采用的内置 Base 编码种类。返回数据与以下编码方式对应::
1、b16: Base16
2、b32: Base32
3、b64: Base64
4、b85: Base85
:param data: Base 编码数据,允许为字节流或字符串
:return: 编码名称,全小写
"""
if isinstance(data, bytes):
try:
data = data.decode()
except (UnicodeDecodeError, Exception):
return
try:
_reg = re.compile("^[0-9A-F=]+$")
if _reg.match(data) is not None:
return cls.Type_Base16
except (re.error, Exception):
pass
try:
_reg = re.compile("^[A-Z2-7=]+$")
if _reg.match(data) is not None:
return cls.Type_Base32
except (re.error, Exception):
pass
try:
_reg = re.compile("^[A-Za-z0-9+/=]+$")
if _reg.match(data) is not None:
return cls.Type_Base64
except (re.error, Exception):
pass
try:
_reg = re.compile("^[A-Za-z0-9!#$%&()*+-;<=>?@^_`{|}~']+$")
if _reg.match(data) is not None:
return cls.Type_Base85
except (re.error, Exception):
pass
@classmethod
def base_x_decode(cls, data: Union[bytes, str], base_type: str = None):
"""
自动检测编码种类后,解码 Base 编码数据。参数 base_type 与以下编码方式对应::
1、b16: Base16
2、b32: Base32
3、b64: Base64
4、b85: Base85
若在解码过程中发生异常,则返回原始数据。
:param data: Base 编码数据,允许为字节流或字符串
:param base_type: Base 编码类型,如为 b64 则代表须用 Base64 解码
:return: 解码后数据
"""
_res_data = b''
if isinstance(data, bytes):
try:
_tmp_data = data.decode()
_res_data = _tmp_data
except (UnicodeDecodeError, Exception):
return data
else:
_res_data = data
if base_type not in cls.Decoders:
# 检测 Base 编码种类
base_type = cls.base_x_detect(_res_data)
if base_type in cls.Decoders:
try:
# 尝试 BaseX 解码
_decoder = cls.Decoders.get(base_type)
_tmp_data = _decoder(_res_data)
_res_data = _tmp_data
except (binascii.Error, UnicodeDecodeError):
return data
return _res_data
@classmethod
def auto_decode_unzip(cls, data: Union[bytes, str], base_type: str = None):
"""
对参数尝试执行自动 BaseX 解码 和 Zip 解压::
1、若能 BaseX 解码,则执行解码,否则保持原始数据不变。
2、若能 Zip 解压,则执行解压,否则保持上一层数据不变。
若各种解码方法都无法顺利解码,或在解码过程中发生异常,则返回原始数据。
参数 base_type 与以下编码方式对应::
1、b16: Base16
2、b32: Base32
3、b64: Base64
4、b85: Base85
:param data: Base64 数据,允许为字节流或字符串
:param base_type: Base 编码类型,如为 b64 则代表须用 Base64 解码
:return: 解码后数据
"""
# 尝试 BaseX 解码
_res_data = cls.base_x_decode(data, base_type)
try:
# 尝试 Zip 解压
_tmp_data = zlib.decompress(_res_data)
_res_data = _tmp_data
except (zlib.error, TypeError):
return _res_data
return _res_data