首次提交

This commit is contained in:
zwf
2026-06-02 16:26:10 +08:00
commit 291e6fcaae
79 changed files with 11283 additions and 0 deletions
View File
+185
View File
@@ -0,0 +1,185 @@
import base64
import binascii
import datetime
import decimal
import json
import re
import zlib
from typing import Union
import numpy as np
from pandas._libs.tslibs.nattype import NaTType
from paste.db.baseadapter import BaseAdapter
from paste.db.basemodel import LOCAL_DATETIME_FORMAT, LOCAL_DATE_FORMAT, LOCAL_TIME_FORMAT
class JsonDumpsEncoder(json.JSONEncoder):
"""
JSON 转字符串时对一些特殊类型进行转码(编码)方法。
"""
def default(self, obj):
if isinstance(obj, NaTType):
return ''
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, (np.floating, decimal.Decimal)):
return float(obj)
elif isinstance(obj, bytes):
return obj.decode(encoding='utf-8', errors='ignore')
elif isinstance(obj, datetime.datetime):
return obj.strftime(LOCAL_DATETIME_FORMAT)
elif isinstance(obj, datetime.date):
return obj.strftime(LOCAL_DATE_FORMAT)
elif isinstance(obj, datetime.time):
return obj.strftime(LOCAL_TIME_FORMAT)
elif isinstance(obj, BaseAdapter):
return obj.to_dict()
return super().default(obj)
class BaseX:
"""
Base 编码解码方法,主要用于解码,针对编码数据自动检测编码类型。
能根据编码方式自动选择解码方法,同时在解码后尝试执行标准 Zip 解压。
"""
Type_Base16 = 'b16'
Type_Base32 = 'b32'
Type_Base64 = 'b64'
Type_Base85 = 'b85'
Decoders = {
Type_Base16: base64.b16decode,
Type_Base32: base64.b32decode,
Type_Base64: base64.b64decode,
Type_Base85: base64.b85decode,
}
"""
解码器。
"""
@classmethod
def base_x_detect(cls, data: Union[bytes, str]):
"""
检测采用的内置 Base 编码种类。返回数据与以下编码方式对应::
1、b16: Base16
2、b32: Base32
3、b64: Base64
4、b85: Base85
:param data: Base 编码数据,允许为字节流或字符串
:return: 编码名称,全小写
"""
if isinstance(data, bytes):
try:
data = data.decode()
except (UnicodeDecodeError, Exception):
return
try:
_reg = re.compile("^[0-9A-F=]+$")
if _reg.match(data) is not None:
return cls.Type_Base16
except (re.error, Exception):
pass
try:
_reg = re.compile("^[A-Z2-7=]+$")
if _reg.match(data) is not None:
return cls.Type_Base32
except (re.error, Exception):
pass
try:
_reg = re.compile("^[A-Za-z0-9+/=]+$")
if _reg.match(data) is not None:
return cls.Type_Base64
except (re.error, Exception):
pass
try:
_reg = re.compile("^[A-Za-z0-9!#$%&()*+-;<=>?@^_`{|}~']+$")
if _reg.match(data) is not None:
return cls.Type_Base85
except (re.error, Exception):
pass
@classmethod
def base_x_decode(cls, data: Union[bytes, str], base_type: str = None):
"""
自动检测编码种类后,解码 Base 编码数据。参数 base_type 与以下编码方式对应::
1、b16: Base16
2、b32: Base32
3、b64: Base64
4、b85: Base85
若在解码过程中发生异常,则返回原始数据。
:param data: Base 编码数据,允许为字节流或字符串
:param base_type: Base 编码类型,如为 b64 则代表须用 Base64 解码
:return: 解码后数据
"""
_res_data = b''
if isinstance(data, bytes):
try:
_tmp_data = data.decode()
_res_data = _tmp_data
except (UnicodeDecodeError, Exception):
return data
else:
_res_data = data
if base_type not in cls.Decoders:
# 检测 Base 编码种类
base_type = cls.base_x_detect(_res_data)
if base_type in cls.Decoders:
try:
# 尝试 BaseX 解码
_decoder = cls.Decoders.get(base_type)
_tmp_data = _decoder(_res_data)
_res_data = _tmp_data
except (binascii.Error, UnicodeDecodeError):
return data
return _res_data
@classmethod
def auto_decode_unzip(cls, data: Union[bytes, str], base_type: str = None):
"""
对参数尝试执行自动 BaseX 解码 和 Zip 解压::
1、若能 BaseX 解码,则执行解码,否则保持原始数据不变。
2、若能 Zip 解压,则执行解压,否则保持上一层数据不变。
若各种解码方法都无法顺利解码,或在解码过程中发生异常,则返回原始数据。
参数 base_type 与以下编码方式对应::
1、b16: Base16
2、b32: Base32
3、b64: Base64
4、b85: Base85
:param data: Base64 数据,允许为字节流或字符串
:param base_type: Base 编码类型,如为 b64 则代表须用 Base64 解码
:return: 解码后数据
"""
# 尝试 BaseX 解码
_res_data = cls.base_x_decode(data, base_type)
try:
# 尝试 Zip 解压
_tmp_data = zlib.decompress(_res_data)
_res_data = _tmp_data
except (zlib.error, TypeError):
return _res_data
return _res_data
+125
View File
@@ -0,0 +1,125 @@
"""
基础分页程序,处理分页计算,后续应当扩展其功能。
"""
class Pagination:
"""
分页程序。
"""
def __init__(self, row_count: int):
"""
初始化分页。
:param row_count: 总记录行数
"""
self._offset = 0
"""
偏移量。
"""
self._pages = -1
"""
总页数。
"""
self._page_number = 1
"""
当前页码。
"""
self.row_count = row_count
"""
数据行数。
"""
self.page_size = 20
"""
每页显示的数据量。默认 20 行每页。
"""
@property
def page_count(self):
"""
取得页数。该属性必须在调用 :meth:`.pages` 方法后调用,例如::
>>> self.pages()
>>> self.page_count
:return: 页数
"""
return self._pages
@property
def page_number(self):
"""
取得当前页码。该属性必须在调用 :meth:`.number` 方法后调用, 例如::
>>> self.number(3)
>>> self.page_number
:return: 页码
"""
return self._page_number
@property
def offset_size(self):
"""
取得偏移量。
"""
return self._offset
def pages(self, page_size: int = 20):
"""
计算页数。
:param page_size: 每页行数,必须处于 [1, 1000] 区间中。若不在此区间,则强制转换到此区间。默认每页 20 条。
:return: 计算取得的页数。
"""
page_size = 1 if page_size < 1 else page_size
page_size = 1000 if page_size > 1000 else page_size
self.page_size = page_size
if self.row_count == 0:
self._pages = 1
else:
_v1 = self.row_count / page_size
_v2 = self.row_count // page_size
self._pages = _v2 if _v1 == _v2 else _v2 + 1
return self._pages
def number(self, page_number: int):
"""
检查页码范围。
:param page_number: 页码
:return: 正确页码
"""
_pages = self.pages(self.page_size)
self._page_number = 1 if page_number < 1 else page_number
self._page_number = _pages if self._page_number > _pages else self._page_number
return self._page_number
def offset(self, page_number: int):
"""
偏移量。
:param page_number: 页码
:return: 偏移量
"""
self._offset = self.page_size * (page_number - 1)
return self._offset
def paging(self, page_number: int = 1, page_size: int = 20):
"""
分页计算,支持链式调用。
:params page_number 页码
:params page_size 每页显示的数量
:return self
"""
self.pages(page_size=page_size)
self.offset(self.number(page_number))
return self
+63
View File
@@ -0,0 +1,63 @@
import mimetypes
import os
import urllib.parse
import urllib.request
import weasyprint as wp
from paste.core.logging import echo_log
class Html2Pdf:
"""
将 HTML 内容转换为 PDF 文件。
"""
@classmethod
def custom_url_fetcher(cls, url, timeout=30, **kwargs):
"""
自定义 URL 加载器,增加超时时间
"""
# 处理 file:// URLs
if url.startswith('file://'):
parsed = urllib.parse.urlparse(url)
path = urllib.request.url2pathname(parsed.path)
if not os.path.exists(path):
raise ValueError(f"File not found: {path}")
_mime_type, _ = mimetypes.guess_type(path)
if not _mime_type:
_mime_type = 'application/octet-stream'
return {
'mime_type': _mime_type,
'encoding': 'binary',
'filename': os.path.basename(path),
'file_obj': open(path, 'rb'),
}
# 增加超时时间(默认是 30 秒)
return wp.default_url_fetcher(url, timeout=timeout, **kwargs)
@classmethod
def write_pdf(cls, content, output_pdf=None, base_url=""):
"""
将 HTML 转换为 PDF。
:param content: HTML 字符串
:param output_pdf: 输出的 PDF 文件路径,默认为空
:param base_url: 跨域默认地址
"""
try:
# HTML 转换为 PDF
_html = wp.HTML(string=content, url_fetcher=cls.custom_url_fetcher, base_url=base_url)
_bytes = _html.write_pdf(output_pdf)
if output_pdf:
echo_log(f"PDF 已成功生成在: {output_pdf}.")
else:
echo_log(f"PDF 已成功生成.")
return _bytes
except Exception as e:
echo_log(f"转换失败: {e}")
raise e
+126
View File
@@ -0,0 +1,126 @@
"""
雪花 ID 生成程序。
"""
import time
import logging
# 64位ID的划分
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12
# 最大取值计算
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5-1 0b11111
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
# 移位偏移计算
WORKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
# 序号循环掩码
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
# Twitter元年时间戳
TW_EPOCH = 1288834974657
ID_WORKER = None
class InvalidSystemClock(Exception):
"""
时钟回拨异常
"""
pass
class IdWorker(object):
"""
用于生成 Snow ID。
"""
@classmethod
def get_id_worker(cls, datacenter_id=1, worker_id=1, sequence=0):
"""
创建 Snow ID 对象。
:param datacenter_id: 数据中心(机器区域)ID
:param worker_id: 机器ID
:param sequence: 起始序号
"""
global ID_WORKER
if ID_WORKER is None:
ID_WORKER = IdWorker(datacenter_id, worker_id, sequence)
return ID_WORKER
def __init__(self, datacenter_id, worker_id, sequence=0):
"""
初始化。
:param datacenter_id: 数据中心(机器区域)ID
:param worker_id: 机器ID
:param sequence: 起始序号
"""
# sanity check
if worker_id > MAX_WORKER_ID or worker_id < 0:
raise ValueError('worker_id值越界')
if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0:
raise ValueError('datacenter_id值越界')
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.sequence = sequence
self.last_timestamp = -1 # 上次计算的时间戳
@staticmethod
def _gen_timestamp():
"""
生成整数时间戳。
:return:int timestamp
"""
return int(time.time() * 1000)
def get_id(self):
"""
获取新ID。
:return: 新的 Snow ID
"""
timestamp = self._gen_timestamp()
# 时钟回拨
if timestamp < self.last_timestamp:
logging.error(f"时钟正在向后倒转。拒绝请求直至 {self.last_timestamp}.")
raise InvalidSystemClock
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & SEQUENCE_MASK
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
new_id = ((timestamp - TW_EPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \
(self.worker_id << WORKER_ID_SHIFT) | self.sequence
return new_id
def _til_next_millis(self, last_timestamp):
"""
等到下一毫秒。
"""
timestamp = self._gen_timestamp()
while timestamp <= last_timestamp:
timestamp = self._gen_timestamp()
return timestamp
if __name__ == '__main__':
worker = IdWorker(1, 1, 0)
print(worker.get_id())
+704
View File
@@ -0,0 +1,704 @@
import re
from typing import Optional
import svgwrite
from svgwrite.container import Group
from svgwrite.path import Path
from svgwrite.shapes import Rect
from svgwrite.text import Text
from paste.util import ustr
class TextRect(Group):
"""
可显示文本的矩形。
"""
def __init__(self, text, insert, text_extra: dict = None, rect_extra: dict = None, **extra):
# 父类初始化
super().__init__(**extra)
self.text = text
"""
要显示的文本内容。
"""
self.extra = extra
"""
组合扩展信息。
"""
self.rectExtra = rect_extra if rect_extra is not None else {}
"""
外框扩展信息。
"""
self.textExtra = text_extra if text_extra is not None else {}
"""
文本扩展信息。
"""
self.rectInsert = insert
"""
整体位置参数,即外框的位置参数。
"""
# 初始化文本尺寸
_fs = self.font_size
self.textInsert = self.text_pos
"""
文本位置参数。
"""
# 文本初始化
self.textElement = Text(self.text, insert=self.textInsert, **self.textExtra)
# 矩形初始化
self.rectElement = Rect(insert=self.rectInsert, size=self.rect_size, **self.rectExtra)
# 加入元素
self.add(self.rectElement)
self.add(self.textElement)
@property
def font_size(self):
"""
从样式中识别字体大小,单位用像素,缺省 14px。
:return: 字体大小
"""
_font_size = self.textExtra.get('font-size', self.extra.get('font-size', f"{14}px"))
self.textExtra['font-size'] = _font_size
_size = re.sub(r'\D', '', _font_size.strip())
return int(_size)
@property
def text_width(self):
"""
文本宽度(近似)。
"""
total = len(self.text)
q_count = ustr.str_q_count(self.text)
return q_count * self.font_size + (total - q_count) * self.font_size * 0.5
@property
def text_height(self):
"""
文本高度(近似)。
"""
return self.font_size * 1.2
@property
def rect_width(self):
"""
外框宽度。
"""
return self.text_width + self.font_size * 1.5
@property
def rect_height(self):
"""
外框高度。
"""
return self.text_height * 1.9
@property
def rect_size(self):
"""
外框尺寸。
"""
return self.rect_width, self.rect_height
@property
def text_pos(self):
"""
文本位置。
"""
return \
self.rectInsert[0] + (self.rect_width - self.text_width) * 0.5, \
self.rectInsert[1] + self.text_height * 1.25
def reposition(self, position: tuple):
"""
重新定位。
:param position: 位置坐标
"""
self.rectInsert = position
self.rectElement.attribs['x'] = self.rectInsert[0]
self.rectElement.attribs['y'] = self.rectInsert[1]
self.textInsert = self.text_pos
self.textElement.attribs['x'] = self.textInsert[0]
self.textElement.attribs['y'] = self.textInsert[1]
def point_bottom(self):
"""
底部点。
"""
return self.rectInsert[0] + self.rect_width / 2, self.rectInsert[1] + self.rect_size[1]
def point_top(self):
"""
顶部点。
"""
return self.rectInsert[0] + self.rect_width / 2, self.rectInsert[1]
def point_left(self):
"""
左侧点。
"""
return self.rectInsert[0], self.rectInsert[1] + self.rect_height / 2
def point_right(self):
"""
右侧点。
"""
return self.rectInsert[0] + self.rect_width, self.rectInsert[1] + self.rect_height / 2
@classmethod
def horizontal_path(cls, start: tuple, end: tuple, **extra):
"""
生成水平方向连接线。
:param start: 起点坐标
:param end: 终点坐标
:param extra: 扩展参数
:return: 路径对象
"""
_p_control = [
(start[0] + end[0]) * 0.5,
start[1]
]
_p_center = [
(start[0] + end[0]) * 0.5,
(start[1] + end[1]) * 0.5
]
_path = Path(**extra)
_path.push(['M', start])
_path.push(['Q', _p_control + _p_center])
_path.push(['T', end])
return _path
@classmethod
def vertical_path(cls, start: tuple, end: tuple, **extra):
"""
生成垂直方向连接线。
:param start: 起点坐标
:param end: 终点坐标
:param extra: 扩展参数
:return: 路径对象
"""
_p_control = [
start[0],
(start[1] + end[1]) * 0.5
]
_p_center = [
(start[0] + end[0]) * 0.5,
(start[1] + end[1]) * 0.5
]
_path = Path(**extra)
_path.push(['M', start])
_path.push(['Q', _p_control + _p_center])
_path.push(['T', end])
return _path
def choose_point(self, sibling: 'TextRect'):
"""
选择与目标文本框的连线点。
返回起点(tuple)在自生文本框上,终点(tuple)在目标文本框上。
:param sibling: 目标文本框
:return: 起点、终点、是否水平连线
"""
_start = self.point_bottom()
_end = sibling.point_top()
_is_horizontal = True
if self.point_bottom()[1] > sibling.point_top()[1]:
if self.point_right()[0] < sibling.point_left()[0]:
_start = self.point_right()
_end = sibling.point_left()
_is_horizontal = False
elif self.point_left()[0] > sibling.point_right()[0]:
_start = self.point_left()
_end = sibling.point_right()
_is_horizontal = False
else:
_start = self.point_top()
_end = sibling.point_bottom()
_is_horizontal = True
elif self.point_top()[1] < sibling.point_bottom()[1]:
if self.point_right()[0] < sibling.point_left()[0]:
_start = self.point_right()
_end = sibling.point_left()
_is_horizontal = False
elif self.point_left()[0] > sibling.point_right()[0]:
_start = self.point_left()
_end = sibling.point_right()
_is_horizontal = False
else:
_start = self.point_bottom()
_end = sibling.point_top()
_is_horizontal = True
else:
if self.point_right()[0] < sibling.point_left()[0]:
_start = self.point_right()
_end = sibling.point_left()
_is_horizontal = False
elif self.point_left()[0] > sibling.point_right()[0]:
_start = self.point_left()
_end = sibling.point_right()
_is_horizontal = False
else:
_start = self.point_bottom()
_end = sibling.point_top()
_is_horizontal = True
return _start, _end, _is_horizontal
def connect(self, sibling: 'TextRect', **extra):
"""
取得连接路径。
:param sibling: 目标文本框
:param extra: 连线扩展参数
:return: 连接路径
"""
_start, _end, _is_horizontal = self.choose_point(sibling)
if _is_horizontal:
return self.vertical_path(_start, _end, **extra)
else:
return self.horizontal_path(_start, _end, **extra)
class RelationGraph:
"""
SVG 关系图。
根据 title 名称和 row_list 列表数据输出 svg 格式的关系图谱。
"""
def __init__(self, filename: str = 'noname.svg'):
self.filename = filename
self.width = 800
"""
画布宽度。
"""
self.height = 600
"""
画布高度。
"""
self.vhSpace = 170
"""
内容垂直浮动空间。
"""
self.lrSpace = 100
"""
内容左右留白空间。
"""
self.titleTextExtra = {
'font-size': '16px', 'fill': 'rgb(255, 255, 255)'
}
"""
标题文本样式。
"""
self.titleRectExtra = {
'rx': 10, 'ry': 10, 'fill': 'rgb(233, 72, 41)', 'fill-opacity': 1, 'stroke': 'rgb(233, 72, 41)'
}
"""
标题外框样式
"""
self.textExtra = {
'font-size': '14px', 'fill': 'rgb(255, 255, 255)'
}
"""
普通文本样式。
"""
self.rectExtra = {
'rx': 10, 'ry': 10, 'fill': 'rgb(65, 130, 164)', 'fill-opacity': 1, 'stroke': 'rgb(65, 130, 164)'
}
"""
普通文本外框样式。
"""
self.pathExtra = {
'fill': 'none', 'stroke': 'rgb(65, 130, 164)'
}
"""
连线样式。
"""
self.drawing = svgwrite.Drawing(filename=self.filename)
"""
主绘图对象。
"""
self.attribs = self.drawing.attribs
"""
图像样式。
"""
self.save = self.drawing.save
"""
保存文件方法。
"""
self.attribs.update({
'width': self.width, 'height': self.height
})
self.titleTr: Optional[TextRect] = None
"""
标题文本框对象。
"""
def draw(self, title: str, row_list: list[dict]):
"""
绘制图形。
:param row_list: 数据对象列表,必须包含 unit_name, unit_uscid, enterprise_id 三个字段
:param title: 标题文本
:return: 自身对象
"""
# 重定设图像参数
self.attribs.update(self.attribs)
# 创建标题文本框
self.titleTr = TextRect(
text=title, insert=(0, 0), text_extra=self.titleTextExtra, rect_extra=self.titleRectExtra, **{
'debug': False
}
)
self.titleTr.reposition((
(self.width - self.titleTr.rect_width) * 0.5, (self.height - self.titleTr.rect_height) * 0.5 - 20
))
self.drawing.add(self.titleTr)
_tr_list: list[TextRect] = []
for _i, _row in enumerate(row_list):
# 遍历数据,初始创建所有的文本框,得到文本框尺寸信息
# 同时保留所有需要输出的数据
_text = f"{_row['short_name']} ({_row['count']})"
_tr = TextRect(
text=_text, insert=(0, 0), rect_extra=self.rectExtra, **self.textExtra, **{
'debug': False,
'data-name': _row['unit_name'],
'data-uscid': _row['unit_uscid'],
'data-enterprise-id': _row['enterprise_id'],
}
)
_tr_list.append(_tr)
_harf = int(len(_tr_list) / 2) if int(len(_tr_list) % 2) == 0 else int(len(_tr_list) / 2) + 1
_top_list = []
_lft_list = _tr_list[:_harf]
_rit_list = _tr_list[_harf:]
_btm_list = []
if len(_tr_list) >= 12:
_top_list = _lft_list[:2]
_lft_list = _lft_list[2:]
if len(_tr_list) >= 14:
_btm_list = _rit_list[-2:]
_rit_list = _rit_list[:-2]
# 遍历所有顶部文本框,重新定位
for _i, _tr in enumerate(_top_list):
if _i == 0:
_position = (
self.titleTr.point_top()[0] - _tr.rect_width - 15,
self.titleTr.point_top()[1] - self.vhSpace - _tr.rect_height - 15
)
else:
_position = (
self.titleTr.point_top()[0] + 15,
self.titleTr.point_top()[1] - self.vhSpace - _tr.rect_height - 15
)
_tr.reposition(_position)
# 遍历所有底部文本框,重新定位
for _i, _tr in enumerate(_btm_list):
if _i == 0:
_position = (
self.titleTr.point_bottom()[0] - _tr.rect_width - 15,
self.titleTr.point_bottom()[1] + self.vhSpace + _tr.rect_height + 15
)
else:
_position = (
self.titleTr.point_bottom()[0] + 15,
self.titleTr.point_bottom()[1] + self.vhSpace + _tr.rect_height + 15
)
_tr.reposition(_position)
_top = self.titleTr.point_top()[1] - self.vhSpace
# 遍历所有左则文本框,重新定位
for _tr in _lft_list:
_w = _tr.rect_width
_h = _tr.rect_height
_space = self.titleTr.point_bottom()[1] - self.titleTr.point_top()[1] + self.vhSpace * 2 + _h
_margin = 0
if len(_lft_list) > 1:
_margin = (_space - len(_lft_list) * _h) / (len(_lft_list) - 1)
_left = self.titleTr.point_left()[0] - _w - self.lrSpace
_position = (_left, _top)
_tr.reposition(_position)
if _tr.point_left()[0] < 20:
_left = 20
_position = (_left, _top)
_tr.reposition(_position)
_top += _h + _margin
_top = self.titleTr.point_top()[1] - self.vhSpace
# 遍历所有右侧文本框,重新定位
for _tr in _rit_list:
_w = _tr.rect_width
_h = _tr.rect_height
_space = self.titleTr.point_bottom()[1] - self.titleTr.point_top()[1] + self.vhSpace * 2 + _h
_margin = 0
if len(_rit_list) > 1:
_margin = (_space - len(_rit_list) * _h) / (len(_rit_list) - 1)
_left = self.titleTr.point_right()[0] + self.lrSpace
_position = (_left, _top)
_tr.reposition(_position)
if _tr.point_right()[0] > self.width - 20:
_left = self.width - _tr.rect_width - 20
_position = (_left, _top)
_tr.reposition(_position)
_top += _h + _margin
for _tr in _tr_list:
self.drawing.add(self.titleTr.connect(_tr, **self.pathExtra))
for _tr in _tr_list:
self.drawing.add(_tr)
class EnterpriseGraph:
"""
SVG 企业汇总信息图。
根据 title 名称和 row_list 列表数据输出 svg 格式的关系图谱。
"""
def __init__(self, filename: str = 'noname.svg'):
self.filename = filename
self.width = 800
"""
画布宽度。
"""
self.height = 300
"""
画布高度。
"""
self.vhSpace = 50
"""
内容垂直浮动空间。
"""
self.lrSpace = 100
"""
内容左右留白空间。
"""
self.titleTextExtra = {
'font-size': '16px', 'fill': 'rgb(255, 255, 255)'
}
"""
标题文本样式。
"""
self.titleRectExtra = {
'rx': 10, 'ry': 10, 'fill': 'rgb(233, 72, 41)', 'fill-opacity': 1, 'stroke': 'rgb(233, 72, 41)'
}
"""
标题外框样式
"""
self.textExtra = {
'font-size': '14px', 'fill': 'rgb(255, 255, 255)'
}
"""
普通文本样式。
"""
self.rectExtra = {
'rx': 10, 'ry': 10, 'fill': 'rgb(65, 130, 164)', 'fill-opacity': 1, 'stroke': 'rgb(65, 130, 164)'
}
"""
普通文本外框样式。
"""
self.pathExtra = {
'fill': 'none', 'stroke': 'rgb(65, 130, 164)'
}
"""
连线样式。
"""
self.drawing = svgwrite.Drawing(filename=self.filename)
"""
主绘图对象。
"""
self.attribs = self.drawing.attribs
"""
图像样式。
"""
self.save = self.drawing.save
"""
保存文件方法。
"""
self.attribs.update({
'width': self.width, 'height': self.height
})
self.titleTr: Optional[TextRect] = None
"""
标题文本框对象。
"""
def draw(self, title: str, data_item: dict):
"""
绘制图形。
:param data_item: 数据项字典,中文名称:数据值
:param title: 标题文本
:return: 自身对象
"""
# 重定设图像参数
self.attribs.update(self.attribs)
# 创建标题文本框
self.titleTr = TextRect(
text=title, insert=(0, 0), text_extra=self.titleTextExtra, rect_extra=self.titleRectExtra, **{
'debug': False
}
)
self.titleTr.reposition((
(self.width - self.titleTr.rect_width) * 0.5, (self.height - self.titleTr.rect_height) * 0.5 - 20
))
self.drawing.add(self.titleTr)
_tr_list: list[TextRect] = []
for _key, _val in data_item.items():
# 遍历数据,初始创建所有的文本框,得到文本框尺寸信息
# 同时保留所有需要输出的数据
_text = f"{_key}{_val}"
_tr = TextRect(
text=_text, insert=(0, 0), rect_extra=self.rectExtra, **self.textExtra, **{
'debug': False,
}
)
_tr_list.append(_tr)
_harf = int(len(_tr_list) / 2) if int(len(_tr_list) % 2) == 0 else int(len(_tr_list) / 2) + 1
_top_list = []
_lft_list = _tr_list[:_harf]
_rit_list = _tr_list[_harf:]
_btm_list = []
if len(_tr_list) >= 12:
_top_list = _lft_list[:2]
_lft_list = _lft_list[2:]
if len(_tr_list) >= 14:
_btm_list = _rit_list[-2:]
_rit_list = _rit_list[:-2]
# 遍历所有顶部文本框,重新定位
for _key, _tr in enumerate(_top_list):
if _key == 0:
_position = (
self.titleTr.point_top()[0] - _tr.rect_width - 15,
self.titleTr.point_top()[1] - self.vhSpace - _tr.rect_height - 15
)
else:
_position = (
self.titleTr.point_top()[0] + 15,
self.titleTr.point_top()[1] - self.vhSpace - _tr.rect_height - 15
)
_tr.reposition(_position)
# 遍历所有底部文本框,重新定位
for _key, _tr in enumerate(_btm_list):
if _key == 0:
_position = (
self.titleTr.point_bottom()[0] - _tr.rect_width - 15,
self.titleTr.point_bottom()[1] + self.vhSpace + _tr.rect_height + 15
)
else:
_position = (
self.titleTr.point_bottom()[0] + 15,
self.titleTr.point_bottom()[1] + self.vhSpace + _tr.rect_height + 15
)
_tr.reposition(_position)
_top = self.titleTr.point_top()[1] - self.vhSpace
# 遍历所有左则文本框,重新定位
for _tr in _lft_list:
_w = _tr.rect_width
_h = _tr.rect_height
_space = self.titleTr.point_bottom()[1] - self.titleTr.point_top()[1] + self.vhSpace * 2 + _h
_margin = 0
if len(_lft_list) > 1:
_margin = (_space - len(_lft_list) * _h) / (len(_lft_list) - 1)
_left = self.titleTr.point_left()[0] - _w - self.lrSpace
_position = (_left, _top)
_tr.reposition(_position)
if _tr.point_left()[0] < 20:
_left = 20
_position = (_left, _top)
_tr.reposition(_position)
_top += _h + _margin
_top = self.titleTr.point_top()[1] - self.vhSpace
# 遍历所有右侧文本框,重新定位
for _tr in _rit_list:
_w = _tr.rect_width
_h = _tr.rect_height
_space = self.titleTr.point_bottom()[1] - self.titleTr.point_top()[1] + self.vhSpace * 2 + _h
_margin = 0
if len(_rit_list) > 1:
_margin = (_space - len(_rit_list) * _h) / (len(_rit_list) - 1)
_left = self.titleTr.point_right()[0] + self.lrSpace
_position = (_left, _top)
_tr.reposition(_position)
if _tr.point_right()[0] > self.width - 20:
_left = self.width - _tr.rect_width - 20
_position = (_left, _top)
_tr.reposition(_position)
_top += _h + _margin
for _tr in _tr_list:
self.drawing.add(self.titleTr.connect(_tr, **self.pathExtra))
for _tr in _tr_list:
self.drawing.add(_tr)
+164
View File
@@ -0,0 +1,164 @@
import os
from typing import Optional
from paste.core import config
class TailRead:
"""
文件逆向读取器。
主要针对读取日志文件。当遇到大日志文件时,需要从后向前读取,这样读取的速度更快。
当日志文件动态增加时,再正向读取,此时仅读取差异内容,实现小数据量交互。
"""
@classmethod
def logReader(cls, log_fn: str = None):
"""
取得配置文件设置的日志文件读取器。
:param log_fn: 日志文件名
:return: 默认日志文件读取器
"""
if log_fn is None:
log_fn = config.get_config('logger.filename')
return TailRead(fn=log_fn)
def __init__(self, fn: str):
self.file_name = fn
"""
要读取的文件名。
"""
self.file_io = open(self.file_name, 'rb')
"""
文件 IO 对象。
"""
self.current_position: Optional[int] = None
"""
当前读取点位置。
"""
_f_size = self.size()
if _f_size > 1:
# 移动到文件末尾
self.file_io.seek(_f_size - 1)
else:
self.file_io.seek(0)
def size(self):
"""
取得文件大小。
:return: 文件大小
"""
return os.path.getsize(self.file_name)
def readTail(self, lines: int = 100):
"""
从文件中,逆向读取 lines 行。读取结束后,将读取定位移动到所有读取到的字节的最后。
:param lines: 读取的行数
:return: 读取到的数据,读取完成点
"""
_buffer: bytes = b''
_c_pos = self.file_io.tell()
# 从当前位置读取一位,判断是否是回车
# 若是,则增加一行,确保读取足够的行数
_byte = self.file_io.read(1)
if _byte == b'\n':
lines += 1
# 重新回到原始位置
self.file_io.seek(_c_pos)
_r_pos = _c_pos
while lines > 0:
# 读取一个字节
_byte = self.file_io.read(1)
if _byte == b'':
# 无数据,退出
break
if _byte == b'\n':
# 减少行数
lines -= 1
# 逆向前移
_r_pos -= 1
if _r_pos <= 0:
# 超出第一位时,退出
break
self.file_io.seek(_r_pos)
# 加入缓存
_buffer = _byte + _buffer
# 扣除首字节回车符号
if _buffer[0:1] == b'\n':
_buffer = _buffer[1:]
# 第一位已经读取,因此正向移动一位
self.current_position = _c_pos + 1
self.file_io.seek(self.current_position)
return _buffer, self.current_position
def readLines(self, lines: int = 100, crt_pos: int = None):
"""
读取文件数据,默认读取 100 行。当不传入 crt_pos 时逆向读取,传入时正向读取。
具有动态方向,确保第一次是最大量读取,以后每次都是增量读取,减少传递的数据量。
:param lines: 要读取的行数
:param crt_pos: 当前读取位置
:return:
"""
_buffer: bytes = b''
if crt_pos is None:
# 参数 cur_pos 为 None 时,逆向读取
self.file_io.seek(self.size()-1)
_buffer, crt_pos = self.readTail(lines)
else:
# 参数 cur_pos 有值时,正向读取
self.file_io.seek(crt_pos)
while lines:
_bytes = self.file_io.readline()
if _bytes == b'':
# 无数据,退出
break
else:
# 减少行数
lines -= 1
# 加入缓存
_buffer += _bytes
crt_pos = self.file_io.tell()
self.current_position = crt_pos
return _buffer, self.current_position
def read(self, lines: int = 200, crt_pos: int = None):
"""
读取文件数据。注意::
1、首次读取时 crt_pos 应为 None 此时逆向读取,返回读取到的数据流和读取点位置。
2、当有 crt_pos 参数时,先检查文件是否发生了变化,若文件变大,则正向读取增量部分,若文件变小则置空。
3、若有 crt_pos 且文件没有发生变化,则返回空字节流,读取位置不变。
:param lines: 要读取的最大行数,默认 200 行
:param crt_pos: 当前读取位置,为 None 时逆向读取,否则正向读取
:return: 读取到的字节流
"""
_buffer = b''
if crt_pos is None:
# 参数 cur_pos 为 None 时,逆向读取
_buffer, crt_pos = self.readLines(lines, crt_pos=crt_pos)
else:
# 参数 cur_pos 有值时
# 检查文件是否发生了变化
_log_size = self.size()
if _log_size > crt_pos + 1:
# 内容增加,继续正向读取
_buffer, crt_pos = self.readLines(lines, crt_pos=crt_pos)
elif _log_size < crt_pos:
# 内容减少,置空读取位置
# 置空后,再次调用本函数,执行逆向读取
crt_pos = None
self.current_position = crt_pos
return _buffer, self.current_position
+40
View File
@@ -0,0 +1,40 @@
from typing import Any, Optional, Dict
def get_with_default(dict_obj: dict, key: Any, default: Optional[Any] = None):
"""
从字典中取得对应的值,若为 None,则返回默认值。
注意,字典自带 get 方法是当 key 存在,则返回对应的值,无论是否为 None;
而该方法是无论 key 是否存在,只要值为 None 均返回默认值。
:param dict_obj: 字典对象
:param key: 键
:param default: 默认值
"""
_val = dict_obj.get(key, default)
if _val is None:
_val = default
return _val
def get_by_path(dict_obj: Dict[str, Any], path: str, default: Optional[Any] = None):
"""
按路径取得字典中的数据。要求路径指向的也必须是字典,除最后一项。
:param dict_obj: 字典对象
:param path: 字典中的 key 路径,以"."号分隔
:param default: 默认值
:return:
"""
_dict: Optional[Dict[str, Any]] = dict_obj
_keys = path.split(".")
if len(_keys) > 1:
# 遍历到倒数第二项
for _key in _keys[:-1]:
_dict = _dict.get(_key, None)
if not isinstance(_dict, dict):
return default
# 返回最后一项内容
return _dict.get(_keys[-1], default)
+293
View File
@@ -0,0 +1,293 @@
import base64
import datetime
import io
import os
import re
import unicodedata
from typing import Optional, IO, Union
import cv2
import numpy as np
from PIL import Image
file_types = {
'jpeg': (b'\xFF\xD8\xFF', b'\xff\xd8\xff'),
'png': (b'\x89PNG',),
'gif': (b'GIF8',),
'bmp': (b'BM',),
'tiff': (b'II*\x00', b'MM\x00*'),
'webp': (b'RIFF\x00\x00\x00\x00WEBP',),
'ico': (b'\x00\x00\x01\x00',),
'psd': (b'8BPS',),
'svg': (b'<svg', b'<?xml'),
'mp3': (b'\x49\x44\x33', b'\xFF\xFB\x50'),
'm4a': (b'\x00\x00\x00\x20\x66\x74\x79\x70\x4D\x34\x41\x20\x00\x00\x00\x00',),
'wav': (b'RIFF\x00\x00\x00\x00WAVE',),
'flac': (b'fLaC',),
'aac': (b'\xFF\xF1', b'\xFF\xF9'),
'aiff': (b'FORM\x00\x00\x00\x00AIFF',),
'au': (b'.snd',),
'mov': (b'\x00\x00\x0F', b'\x00\x00\x77', b'\x6D\x6F\x6F\x76', b'\x6d\x64\x61\x74'),
'mp4': (b'\x00\x00\x00\x14', b'\x00\x00\x00\x18', b'\x00\x00\x00\x1C', b'\x00\x00\x00\x20'),
'mpg': (b'\x00\x00\x01\xB3', b'\x00\x00\x01\xBA'),
'avi': (b'RIFF\x00\x00\x00\x00AVI',),
'mkv': (b'\x1A\x45\xDF\xA3',),
'wmv': (b'\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C',),
'flv': (b'FLV\x01',),
'3gp': (b'\x00\x00\x00\x18\x66\x74\x79\x70\x33\x67\x70',),
'pdf': (b'\x25PDF',),
('doc', 'ppt', 'xls'): (b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1',),
('docx', 'pptx', 'xlsx'): (b'PK\x03\x04\n\x00\x00\x00\x00\x00\x87N',),
'rtf': (b'{\\rtf',),
'txt': (b'\xEF\xBB\xBF', b'\xFE\xFF', b'\xFF\xFE'),
'zip': (b'PK\x03\x04',),
'rar': (b'Rar!\x1A\x07\x00',),
'tar': (b'\x75\x73\x74\x61\x72\x00\x00\x00',),
'gz': (b'\x1F\x8B',),
'7z': (b'7z\xBC\xAF\x27\x1C',),
}
"""
可检测的文件格式配置。
"""
def inspect_type(file_data: Optional[bytes] = None, file_io: Optional[IO] = None):
"""
检测文件类型。
:param file_io: 文件输入输出对象
:param file_data: 文件数据内容
:return: 检测结果
"""
assert file_data is not None or file_io is not None, '至少传入文件内容或文件输入输出对象之一.'
if file_data is None and file_io is not None:
# 若未传入数据内容,仅传入文件对象,则从文件对象读取前100字节数据
file_data = file_io.read(1024*2)
if len(file_data) > 1024*2:
file_data = file_data[:1024*2]
file_type: Optional[Union[str, tuple[str]]] = ''
for _key, _val in file_types.items():
for _bs in _val:
if file_data.startswith(_bs):
file_type = _key
break
if file_type:
break
if isinstance(file_type, tuple):
if file_type[0] == 'doc':
# 使用读取到的全部数据(≤4KB)进行启发式判断
file_type = _heuristic_office_type(file_data)
elif file_type[0] == 'docx':
# 使用读取到的全部数据(≤4KB)进行启发式判断
file_type = _heuristic_office_x_type(file_data)
return file_type
def _heuristic_office_type(data: bytes) -> str:
"""
仅基于前 4KB 数据,启发式判断是 .doc、.xls 还是 .ppt
依据:各格式在 OLE 结构中的典型字符串偏移位置
"""
# 关键词及其对应类型
patterns = [
(b'W\x00o\x00r\x00d\x00D\x00o\x00c\x00u\x00m\x00e\x00n\x00t', 'doc'),
(b'W\x00o\x00r\x00d', 'doc'),
(b'WordDocument', 'doc'),
(b'Word', 'doc'),
(b'W\x00o\x00r\x00k\x00b\x00o\x00o\x00k', 'xls'),
(b'B\x00o\x00o\x00k', 'xls'),
(b'Workbook', 'xls'),
(b'Book', 'xls'),
(b'P\x00o\x00w\x00e\x00r\x00P\x00o\x00i\x00n\x00t', 'ppt'),
(b'PowerPoint', 'ppt'),
]
# 一次性遍历:在 data 中查找任一关键词
# 由于模式短,且数据小(≤4KB),用简单循环即可
for keyword, file_type in patterns:
if keyword in data:
return file_type
# 未匹配时,保守返回 ""
return ""
def _heuristic_office_x_type(data: bytes) -> str:
"""
仅用 `in` 判断 .docx/.xlsx/.pptx,精准匹配 Open XML 标准 MIME 类型
不解压、不解析、不猜,就看有没有那三个关键字符串
"""
# 关键词及其对应类型
patterns = [
(b'word/PK', 'docx'),
(b'xl/PK', 'xlsx'),
(b'ppt/PK', 'pptx'),
]
# 一次性遍历:在 data 中查找任一关键词
# 由于模式短,且数据小(≤4KB),用简单循环即可
for keyword, file_type in patterns:
if keyword in data:
return file_type
# 未匹配时,保守返回 ""
return ""
def get_file_info(file_path):
"""
取得文件信息,包括:文件大小、创建时间。
:param file_path: 文件绝对路径
:return: 大小,创建时间
"""
_ctime = datetime.datetime.fromtimestamp(os.path.getctime(file_path))
_ctime = _ctime.strftime('%Y-%m-%d %H:%M:%S')
_f_size = os.path.getsize(file_path)
# 将字节转换为 KB
_size_kb = _f_size / 1024
if _size_kb < 1024:
return f"{_size_kb:.2f} KB", _ctime
# 将 KB 转换为 MB
_size_mb = _size_kb / 1024
if _size_mb < 1024:
return f"{_size_mb:.2f} MB", _ctime
# 将 MB 转换为 GB
_size_gb = _size_mb / 1024
return f"{_size_gb:.2f} GB", _ctime
def read_to_buffer(file) -> bytes:
"""
以二进制只读方式从文件载入数据到字节流。
"""
assert os.path.isfile(file), 'File not found: %s' % file
with open(file, 'rb') as f:
buf = f.read(os.path.getsize(file))
f.close()
return buf
def sanitize_filename(filename: str) -> str:
"""
统一严格过滤文件名中的非法字符(跨 Windows/Linux/macOS 安全)。
规则:
1. 过滤所有系统禁止的字符(包括控制字符 \x00-\x1f
2. 处理 Windows 保留名称(如 CON、NUL 等)
3. 替换空格和 # 为下划线
4. 禁止以空格或点开头/结尾
5. 限制文件名长度(255 字符)
:param filename: 文件名
:return: 替换非法字符为 _ 的安全文件名
"""
# 1. Unicode 规范化(防止混淆攻击)
filename = unicodedata.normalize("NFKC", filename)
# 2. 替换所有非法字符为下划线(包括空格和 #)
# 包括:\ / : * ? " < > | \x00-\x20(控制字符和空格)#
safe_name = re.sub(r'[\\/:*?"<>|\x00-\x20#]', '_', filename)
# 3. 处理 Windows 保留名称(如 CON.txt -> _CON.txt
win_reserved = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4",
"LPT1", "LPT2", "LPT3", "CLOCK$"
]
if safe_name.upper().split(".")[0] in win_reserved:
safe_name = f"_{safe_name}"
# 4. 移除首尾空格和点(避免隐形问题)
safe_name = safe_name.strip(". ")
# 5. 确保文件名非空(如果输入全是非法字符)
if not safe_name:
safe_name = "unnamed_file"
# 6. 限制长度(Windows 最大 255 字符)
return safe_name[:255]
def check_and_create_dir(file_path, mode=0o777, exist_ok=False):
"""
根据传入的文件路径检查目录是否存在,若不存在,则创建。
:param file_path: 文件路径
:param mode: 目录权限,默认最高权限
:param exist_ok: 目录存在时,是否抛出异常,默认不抛出
"""
# 获取文件所在目录
_directory = os.path.dirname(file_path)
if not os.path.exists(_directory):
# 如果目录不存在,则创建目录
os.makedirs(_directory, mode, exist_ok)
def load_image_from_base64(base64_str):
"""
将 Base64 字符串转换为 face_recognition 可用的 numpy 数组。
:param base64_str: 经过 Base64 编码的图像数据
:return: 图像数据(numpy数组)
"""
try:
# 移除 Base64 头部(如果存在)
if "," in base64_str:
base64_str = base64_str.split(",")[1]
# 解码为二进制
image_data = base64.b64decode(base64_str)
# 验证图像完整性
Image.open(io.BytesIO(image_data)).verify()
# 转换为 RGB numpy 数组
image = Image.open(io.BytesIO(image_data))
if image.mode != "RGB":
image = image.convert("RGB")
return np.array(image)
except Exception as e:
raise ValueError(f"无效的 Base64 图像数据: {e}")
def load_png_from_base64(base64_str):
"""
从Base64字符串读取PNG图像并保留Alpha通道。
:param base64_str: 经过 Base64 编码的 PNG 图像数据
:return: 图像数据(numpy数组),包含BGRA四个通道
"""
try:
# 1. 解码Base64字符串
img_data = base64.b64decode(base64_str)
# 2. 将字节数据转换为numpy数组
np_array = np.frombuffer(img_data, np.uint8)
# 3. 使用IMREAD_UNCHANGED标志解码图像以保留Alpha通道
img = cv2.imdecode(np_array, cv2.IMREAD_UNCHANGED)
# 4. 检查是否成功读取
if img is None:
raise ValueError("无法解码图像数据")
# 5. 检查是否有Alpha通道
if img.shape[2] != 4:
print("警告: 图像没有Alpha通道,将添加全不透明Alpha通道")
# 将BGR转换为BGRA,添加全不透明Alpha通道
img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
return img
except Exception as e:
print(f"读取Base64图像时出错: {str(e)}")
return None
+58
View File
@@ -0,0 +1,58 @@
from matplotlib import pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
def get_fonts():
"""
取得系统字体,并与要采用的字体合并后,取得可用字体。
"""
# 系统所有可用字体
os_fonts = {f.name for f in fontManager.ttflist}
# 自定义字体,优先级按顺序排列
custom_fonts = (
'PingFang SC', 'Hiragino Sans GB', 'Heiti SC', 'SimSong', 'SimHei',
'WenQuanYi Micro Hei', 'WenQuanYi Zen Hei', 'Source Han Sans SC',
'Noto Sans CJK', 'Noto Sans CJK SC', 'DejaVu Sans'
)
# 可用字体
available_font = set(custom_fonts) & os_fonts
# 字典排序
available_font = sorted(
available_font, key=lambda x: custom_fonts.index(x) if x in custom_fonts else len(custom_fonts)
)
return available_font
def get_font_metrics(font_name='Microsoft YaHei', font_size=11, dpi=72):
"""
使用 matplotlib 获取字体度量信息。
:param font_name: 字体名称
:param font_size: 字号
:param dpi: 显示像素,像素没英寸
:return: (英文字符宽度_cm, 中文字符宽度_cm)
"""
# 创建高分辨率图形
fig = plt.figure(figsize=(10, 2), dpi=dpi)
ax = fig.add_subplot(111)
ax.axis('off')
# 设置字体
font = FontProperties(family=font_name, size=font_size)
# 测试英文字符
text_en = ax.text(0.1, 0.5, 'aaaaa', fontproperties=font)
fig.canvas.draw()
en_width_px = text_en.get_window_extent().width / 5 # 5个字符的平均宽度
# 测试中文字符
text_cn = ax.text(0.1, 0.5, '中中中中中', fontproperties=font)
fig.canvas.draw()
cn_width_px = text_cn.get_window_extent().width / 5 # 5个字符的平均宽度
plt.close(fig)
# 转换为厘米
px_per_cm = dpi / 2.54
# 增加100%宽度
return en_width_px / px_per_cm * 2, cn_width_px / px_per_cm * 2
+214
View File
@@ -0,0 +1,214 @@
"""
基本公共函数。
"""
import base64
import os
import re
from typing import Union
from urllib.parse import urlparse
import requests
from paste.db import basemodel
def fetch_image(img_url: str) -> tuple[requests.Response, str]:
"""
获取外部图像。
:param img_url: 图像 URL
:return: (响应对象,内容类型)
:raises ValueError: URL 格式无效
:raises requests.exceptions.RequestException: 请求失败
"""
# 验证 URL 格式
parsed_url = urlparse(img_url)
if not all([parsed_url.scheme, parsed_url.netloc]):
raise ValueError("Invalid URL")
# 设置请求头,模拟浏览器请求
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/91.0.4472.124 Safari/537.36'
}
# 获取外部图像
response = requests.get(img_url, headers=headers, stream=True, timeout=10)
response.raise_for_status()
# 获取内容类型,如果没有则默认为 image/jpeg
content_type = response.headers.get('Content-Type', 'image/jpeg')
return response, content_type
def save_image_to_dir(image_data: bytes, image_type: str, output_dir: str) -> str:
"""
将图像数据保存到指定目录,返回相对路径。
:param image_data: 图像二进制数据
:param image_type: 图像扩展名(如 'jpg', 'png'
:param output_dir: 输出目录(相对于项目根目录,如 'static/upload/article/images'
:return: 保存后的相对路径(以 / 开头)
"""
# 生成唯一文件名
filename = f"{basemodel.BaseModel.newId()}.{image_type}"
full_path = os.path.abspath(os.path.join(os.curdir, output_dir, filename))
# 确保目录存在
os.makedirs(os.path.dirname(full_path), exist_ok=True)
# 保存图像
with open(full_path, 'wb') as f:
f.write(image_data)
# 返回相对路径(以 / 开头)
rel_path = os.path.join(output_dir, filename).replace('\\', '/')
if not rel_path.startswith('/'):
rel_path = '/' + rel_path
return rel_path
def download_and_save_image(url: str, output_dir: str) -> Union[str, None]:
"""
从外部 URL 下载图像并保存到指定目录。
:param url: 外部图像的完整 URL
:param output_dir: 输出目录
:return: 保存成功时返回相对路径,失败时返回 None
"""
try:
res_img, res_content_type = fetch_image(url)
# 提取扩展名
image_type = res_content_type.split('/')[1].split(';')[0].strip() if '/' in res_content_type else 'jpg'
# 验证扩展名安全性
allowed_extensions = {'jpg', 'jpeg', 'png', 'gif', 'webp', 'svg', 'bmp'}
if image_type not in allowed_extensions:
image_type = 'jpg'
# 收集图像数据
image_data = b''.join(res_img.iter_content(1024))
# 保存到本地
new_src = save_image_to_dir(image_data, image_type, output_dir)
return new_src
except Exception:
return None
def decode_base64_image(header: str, data: str, output_dir: str) -> str:
"""
解码 base64 格式的图像数据并保存到指定目录。
:param header: base64 数据头
:param data: base64 编码的图像数据
:param output_dir: 输出目录
:return: 保存后的相对路径
"""
# 从 header 中获取图像类型
image_type = header.split(';')[0].split('/')[1]
# 验证扩展名安全性
allowed_extensions = {'jpg', 'jpeg', 'png', 'gif', 'webp', 'svg', 'bmp'}
if image_type not in allowed_extensions:
image_type = 'jpg'
# 解码并保存
image_data = base64.b64decode(data)
return save_image_to_dir(image_data, image_type, output_dir)
def extract_image_paths(html_content: str) -> list[dict]:
"""
从 HTML 内容中提取所有图像的 src 信息。
该方法用于识别文章中引用的所有图像资源,返回详细的图像信息列表。
:param str html_content: HTML 内容
:return: 图像信息列表,每个元素包含 src 值和类型
:rtype: list[dict]
返回结构::
[
{
'original': 'https://external.com/img.jpg', # 原始 src 值
'src': '/static/upload/article/images/abc.jpg', # 标准化后的本地路径(external/base64 为 None
'type': 'external', # local: 本地路径,domain: 本地域名,external: 外部域名,base64: base64 数据
'url': 'https://external.com/img.jpg' # 完整 URL(仅 external 类型有值)
}
]
注意::
- local/domain 类型:src 为标准化本地路径
- external 类型:src 为 Noneurl 为原始外部 URL
- base64 类型:src 为 Noneurl 为 None
"""
# 允许的本地域名列表
allowed_domains = {
'haiten.cn', 'www.haiten.cn', 'usasu.cn', 'www.usasu.cn', 'pathx.cn', 'www.pathx.cn',
'127.0.0.1', '100.64.0.18', 'localhost'
}
# 改进的正则表达式:
# - 允许 src 是第一个属性
# - 支持单引号和双引号
# - 确保引号成对匹配
# - 支持跨行匹配
img_pattern = re.compile(
r'<img[^>]*?\s+src\s*=\s*(["\'])([^"\']+?)\1[^>]*?>?',
re.IGNORECASE | re.DOTALL
)
images = []
for match in img_pattern.finditer(html_content):
original_src = match.group(2) # 捕获组 2 是 src 的值
image_info = {
'original': original_src,
'src': None,
'type': None,
'url': None
}
# 判断图像类型
if original_src.startswith('data:image'):
# base64 数据
image_info['type'] = 'base64'
elif original_src.startswith(('http://', 'https://')):
parsed_url = urlparse(original_src)
domain = parsed_url.netloc.split(':')[0]
if domain in allowed_domains:
# 本地域名 - 转换为相对路径
new_src = parsed_url.path
if parsed_url.query:
new_src += f"?{parsed_url.query}"
if parsed_url.fragment:
new_src += f"#{parsed_url.fragment}"
# 确保路径以 / 开头
if not new_src.startswith('/'):
new_src = '/' + new_src
image_info['src'] = new_src
image_info['type'] = 'domain'
else:
# 外部域名
image_info['type'] = 'external'
image_info['url'] = original_src
else:
# 本地相对路径
# 确保路径以 / 开头
if not original_src.startswith('/'):
original_src = '/' + original_src
image_info['src'] = original_src
image_info['type'] = 'local'
images.append(image_info)
return images
+218
View File
@@ -0,0 +1,218 @@
import datetime
import gzip
import io
import re
from typing import List
from urllib.parse import quote
def str_q_count(ustring):
"""
汉字加全角字符数量。
:param ustring: 待扫描文本
:return: 全角字符数量
"""
count = 0
for uchar in ustring:
inside_code = ord(uchar)
if '\u4e00' <= uchar <= '\u9fff' or 65281 <= inside_code <= 65374:
count += 1
return count
def str_q2b(ustring):
"""
全角转半角。
:param ustring: 待转换文本
:return: 转换后的文本
"""
r_str = ""
for uchar in ustring:
inside_code = ord(uchar)
if inside_code == 12288:
# 全角空格直接转换
inside_code = 32
elif 65281 <= inside_code <= 65374:
# 全角字符(除空格)根据关系转化
inside_code -= 65248
r_str += chr(inside_code)
return r_str
def str_b2q(ustring):
"""
半角转全角。
:param ustring: 待转换文本
:return: 转换后的文本
"""
r_str = ""
for uchar in ustring:
inside_code = ord(uchar)
if inside_code == 32:
# 半角空格直接转化
inside_code = 12288
elif 32 <= inside_code <= 126:
# 半角字符(除空格)根据关系转化
inside_code += 65248
r_str += chr(inside_code)
return r_str
def str_gzip(data: str):
"""
创建gzip压缩数据。
:param data: 待压缩的数据
"""
buffer = io.BytesIO()
with gzip.GzipFile(fileobj=buffer, mode='w') as f:
f.write(data.encode('utf-8'))
_compressed_data = buffer.getvalue()
return _compressed_data
def is_contains_chinese(text, length: int = None):
"""
检查字符串中是否包含中文字符。
:param text: 要检查的字符串
:param length: 可选参数,要求中文字符的最小数量
:return: 如果包含中文字符返回True,否则返回False
"""
chinese_chars = [char for char in text if '\u4e00' <= char <= '\u9fff']
if not chinese_chars:
# 如果没有中文字符
return False
if length is not None:
# 如果指定了length参数
return len(chinese_chars) >= length
return True # 默认情况,只要包含中文就返回True
def is_valid_id_number(id_str):
"""
检查字符串是否符合中国居民身份证号码格式。
支持15位和18位身份证号码,包括校验位验证
:param id_str: 要检查的字符串
:return: 如果符合格式返回True,否则返回False
"""
# 正则表达式匹配
pattern = r'^[1-9]\d{5}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx]$'
if not re.match(pattern, id_str):
return False
# 如果是15位身份证,直接返回True(15位不包含校验位)
if len(id_str) == 15:
return True
# 18位身份证校验位验证
# 权重系数
weight = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
# 校验码对应值
validate = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
# 计算校验位
sum_val = 0
for i in range(17):
sum_val += int(id_str[i]) * weight[i]
mod_val = sum_val % 11
if validate[mod_val].upper() != id_str[17].upper():
return False
return True
def is_valid_phone_number(phone_str):
"""
验证是否是中国大陆合法的手机号码。
:param phone_str: 要检查的字符串
:return: 如果是合法手机号返回True,否则返回False
"""
# 2023年中国大陆手机号正则表达式
pattern = r'^1(3[0-9]|4[5-9]|5[0-35-9]|6[2567]|7[0-8]|8[0-9]|9[0-35-9])\d{8}$'
return bool(re.fullmatch(pattern, phone_str))
def is_valid_postcode(postcode):
"""
验证中国邮政编码是否合法
:param postcode: 要验证的邮编字符串或数字
:return: 如果合法返回True,否则返回False
"""
# 转换为字符串处理
postcode_str = str(postcode)
# 中国邮政编码规则:
# 1. 6位数字
# 2. 第一位不能是0
pattern = r'^[1-9]\d{5}$'
return bool(re.fullmatch(pattern, postcode_str))
def encode_path_to_url(local_path: str) -> str:
"""
将本地文件路径转换为URL编码的相对路径
参数:
local_path: 本地路径(如 "C:\\data\\报告.pdf""/var/www/文件.txt")
返回:
URL编码的相对路径(如 "data/%E6%8A%A5%E5%91%8A.pdf")
处理逻辑:
1. 统一路径分隔符为/
2. 移除Windows盘符
3. 分段编码每个路径部分
4. 保留路径中的/分隔符
"""
# 统一路径分隔符为POSIX格式
normalized_path = local_path.replace('\\', '/')
# 移除Windows盘符(如 C:/)
normalized_path = re.sub(r'^[A-Za-z]:/', '', normalized_path)
# 移除开头多余的/
normalized_path = normalized_path.lstrip('/')
# 分段处理每个路径部分
encoded_parts = []
for part in normalized_path.split('/'):
if part:
# 对每个路径段进行URL编码(保留. _ - 不编码)
encoded_part = quote(part, safe='.-_')
encoded_parts.append(encoded_part)
# 拼接编码后的路径
return '/'.join(encoded_parts)
def to_datetime(dt_str: str, fmt_list: List[str]):
"""
字符串转时间日期对象。
:param dt_str: 需要转日期格式的字符串
:param fmt_list: 用于转换的日期格式列表,注意将最有可能的放在前面
"""
_date = None
for _fmt in fmt_list:
if _date is None:
try:
_date = datetime.datetime.strptime(dt_str, _fmt)
except (ValueError, Exception):
pass
else:
return _date
return _date
+154
View File
@@ -0,0 +1,154 @@
from typing import Union, List, Optional, Dict, Any
import pandas as pd
from paste.util import ufont
def cm_to_excel_units(cm):
"""
厘米转Excel列宽单位。
:param cm: 厘米单位
"""
return cm / 2.54 * 7 # 1英寸=2.54厘米, 1Excel单位=1/7英寸
def auto_width_cm(series: pd.Series, font_name='Microsoft YaHei', font_size=11, min_cm=1.5, max_cm=20):
"""
自动列宽计算方法(区分中英文)。
:param series: pandas Series (数据列)
:param font_name: 字体名称
:param font_size: 字号
:param min_cm: 最小列宽(厘米)
:param max_cm: 最大列宽(厘米)
:return: 建议的列宽(厘米)
"""
# 获取精确字体度量
en_width_cm, cn_width_cm = ufont.get_font_metrics(font_name, font_size)
def calculate_text_width(text):
"""计算文本总宽度"""
cn_count = 0
en_count = 0
for char in str(text):
if '\u4e00' <= char <= '\u9fff':
cn_count += 1
else:
en_count += 1
_total_width = (cn_count * cn_width_cm) + (en_count * en_width_cm)
return _total_width
# 计算列标题宽度
title_width = calculate_text_width(series.name)
# 计算数据内容最大宽度
content_width = series.astype(str).apply(calculate_text_width).max()
# 取最大值并增加边距,15%额外边距
total_width = max(title_width, content_width) * 1.2
return max(min(total_width, max_cm), min_cm)
def auto_column_width(df, worksheet):
"""
根据内容自动设置列的宽度。
:param df: pandas DataFrame
:param worksheet: 工作表
"""
_font_name_set = ufont.get_fonts()
for col_num, col_name in enumerate(df.columns):
# 计算列宽
width_cm = auto_width_cm(df[col_name], font_name=_font_name_set[0], font_size=11)
# 设置列宽
worksheet.set_column(col_num, col_num, cm_to_excel_units(width_cm))
def apply_header_style(df, worksheet, workbook, **kwargs):
"""
应用表头样式。
:param df: 原始 DataFrame
:param worksheet: xlsxwriter worksheet对象
:param workbook: xlsxwriter workbook对象
:param kwargs: 样式参数
:return: 无
"""
_style_sheet = {
'font_size': 12,
'bg_color': '#F2F2F2',
'border': 1,
'bold': True,
}
_header_style = workbook.add_format({**_style_sheet, **kwargs})
for col_num, value in enumerate(df.columns.values):
worksheet.write(0, col_num, value, _header_style)
def apply_data_style(df, worksheet, workbook, **kwargs):
"""
应用数据单元格样式。
:param df: 原始 DataFrame
:param worksheet: xlsxwriter worksheet对象
:param workbook: xlsxwriter workbook对象
:param kwargs: 样式参数
:return: 无
"""
_style_sheet = {
'font_size': 12,
'border': 1,
}
_cell_style = workbook.add_format({**_style_sheet, **kwargs})
for row in range(1, len(df) + 1):
for col in range(0, len(df.columns)):
worksheet.write(row, col, df.iloc[row - 1, col], _cell_style)
def insert_text_to_column(
worksheet,
workbook,
column: int,
start_row: int,
texts: Union[str, List[str]],
text_format: Optional[Dict[str, Any]] = None
) -> None:
"""
向Excel表格的指定列插入文本。
:param worksheet: xlsxwriter worksheet
:param workbook: xlsxwriter workbook
:param column: 列号
:param start_row: 开始插入的行号(1-based)
:param texts: 要插入的文本(字符串或字符串列表)
:param text_format: 格式字典,None则使用默认格式
:return: None
"""
# 设置默认格式
default_format = {
'font_size': 12,
}
# 合并用户自定义格式
fmt = workbook.add_format({**default_format, **(text_format or {})})
# 转换列号为数字索引(1-based)
if isinstance(column, str):
col_idx = ord(column.upper()) - ord('A') + 1
else:
col_idx = column
# 确保texts是列表形式
if isinstance(texts, str):
texts = [texts]
# 处理每行数据
for i, text in enumerate(texts):
row_num = start_row + i
worksheet.write(row_num, col_idx, text, fmt)