Files
2026-06-02 17:46:38 +08:00

216 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
公共对接模块。
"""
import asyncio
import random
import re
from http.cookies import SimpleCookie
from typing import Optional, Dict, Any
from tornado.httpclient import HTTPRequest, HTTPResponse
from paste.web import requests
# Pool of common desktop (PC) browser User-Agent strings.
USER_AGENTS = [
    # Chrome on Windows 10
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    # Firefox on Windows 10
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
    # Edge on Windows 11 (Chromium-based)
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.2210.91',
    # Safari on macOS
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    # Chrome on macOS
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    # Firefox on macOS
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.2; rv:121.0) Gecko/20100101 Firefox/121.0',
    # Chrome on Linux
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    # Opera on Windows (Chromium-based)
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 OPR/106.0.0.0',
]

# Default timeout duration (presumably seconds -- confirm against the callers).
DEFAULT_TIMEOUT = 120.0

# Maximum number of concurrent requests.
CONCURRENCY_COUNT = 5

# Maximum number of retries for a request.
MAX_RETRY_COUNT = 5
# Ordered (required markers, version pattern) rules for browser detection.
# The first rule whose markers are ALL present in the User-Agent is used.
# Edge ("Edg/") and Opera ("OPR/") are Chromium-based, so their User-Agent
# strings also contain a "Chrome/" token; their rules must come before the
# generic Chrome rule, otherwise they can never be reached.
_BROWSER_VERSION_RULES = (
    (("Edg/",), re.compile(r"Edg/(\d+\.\d+\.\d+\.\d+)")),
    (("OPR/",), re.compile(r"OPR/(\d+\.\d+\.\d+\.\d+)")),
    (("Chrome/",), re.compile(r"Chrome/(\d+\.\d+\.\d+\.\d+)")),
    (("Firefox/",), re.compile(r"Firefox/(\d+\.\d+)")),
    # Safari: Chrome-family agents also contain "Safari/", but only Safari
    # itself carries a "Version/" token.
    (("Version/", "Safari/"), re.compile(r"Version/(\d+\.\d+)")),
)


def get_random_user_agent() -> tuple[str, str, str]:
    """
    Pick a random entry from ``USER_AGENTS`` and describe it.

    The browser version is taken from the most specific browser token in the
    string (Edge, Opera, Chrome, Firefox, then Safari), so Chromium-based
    Edge and Opera report their own version rather than the embedded
    ``Chrome/`` version.

    Returns:
        tuple: ``(user_agent, browser_version, os_name)``.
        ``browser_version`` is ``"Unknown"`` when no known browser token
        matches; ``os_name`` is one of ``"Mac"``, ``"Windows"``, ``"Linux"``
        or ``"Unknown"``.
    """
    ua: str = random.choice(USER_AGENTS)

    # Extract the browser version using the first applicable rule.
    browser_version: str = "Unknown"
    for markers, pattern in _BROWSER_VERSION_RULES:
        if all(marker in ua for marker in markers):
            match = pattern.search(ua)
            if match:
                browser_version = match.group(1)
            # Like an if/elif chain: only the first applicable rule is tried.
            break

    # Extract the operating system name; the probe order matters and mirrors
    # the precedence Mac > Windows > Linux.
    os_name: str = next(
        (name for name in ("Mac", "Windows", "Linux") if name in ua),
        "Unknown",
    )
    return ua, browser_version, os_name
def get_cookies(response: HTTPResponse) -> Dict[str, str]:
    """
    Collect the cookies set by a response.

    Every ``Set-Cookie`` header of the response is loaded into one
    ``SimpleCookie`` jar; a cookie name that appears more than once keeps
    the value loaded last.

    :param response: the HTTP response object
    :return: mapping of cookie name to cookie value
    """
    jar = SimpleCookie()
    set_cookie_headers = response.headers.get_list('Set-Cookie')
    for header_value in set_cookie_headers:
        jar.load(header_value)

    # Flatten the Morsel objects down to their plain string values.
    extracted: Dict[str, str] = {}
    for name, morsel in jar.items():
        extracted[name] = morsel.value
    return extracted
def get_cookie_value(cookies_string: Optional[str], cookie_name: str) -> Optional[str]:
    """
    Extract the value of a named cookie from a cookies string.

    :param cookies_string: text in the form ``"key1=value1; key2=value2; ..."``;
        ``None`` or an empty string is treated as "no cookies"
    :param cookie_name: name of the cookie to look up (compared exactly)
    :return: the value of the first matching cookie, or ``None`` when the
        cookie is absent
    """
    # A missing header typically arrives as None; report "not found" instead
    # of failing with AttributeError on ``None.split``.
    if not cookies_string:
        return None

    for pair in cookies_string.split(';'):
        # Split on the first '=' only, so values that themselves contain '='
        # (e.g. base64 padding) are kept intact.
        key, separator, value = pair.strip().partition('=')
        # Segments without '=' are not key/value pairs and are skipped.
        if separator and key == cookie_name:
            return value

    return None
def new_http_request(
        url: str,
        body: Optional[Dict[str, Any]] = None,
        method: str = 'POST',
        timeout: Optional[float] = None,
        follow_redirects: bool = True,
        use_form: bool = False,
        extra_headers: Optional[Dict[str, str]] = None,
        **kwargs
) -> HTTPRequest:
    """
    Create a new HTTPRequest object.

    This is a thin wrapper: all arguments are forwarded unchanged to
    ``requests.build_http_request``, which implements the behaviour
    described below.

    GET and POST are supported:
    - GET: parameters are passed through the URL query string
    - POST: parameters are passed as a JSON body or as a form
      (controlled by ``use_form``)

    :param url: full URL of the request
    :param body: request payload (dict); query parameters for GET,
        JSON or form data for POST
    :param method: HTTP method, only 'GET' and 'POST' are supported
    :param timeout: request timeout in seconds
    :param follow_redirects: whether to follow redirects
    :param use_form: if True, POST uses application/x-www-form-urlencoded;
        otherwise JSON is used
    :param extra_headers: optional extra request headers, e.g. Cookie or
        Authorization
    :param kwargs: additional arguments accepted by
        tornado.httpclient.HTTPRequest
    :return: a tornado.httpclient.HTTPRequest object
    :raises ValueError: when ``method`` is not valid
    """
    # Forward positionally, in the exact order the builder is called with.
    forwarded = (
        url,
        body,
        method,
        timeout,
        follow_redirects,
        use_form,
        extra_headers,
    )
    return requests.build_http_request(*forwarded, **kwargs)
async def scrape_cookies(url: str, timeout: Optional[float] = 10,
                         extra_headers: Optional[Dict[str, str]] = None,
                         **kwargs) -> Dict[str, str]:
    """
    Send a GET request to ``url`` to obtain the cookies issued by the server
    (e.g. JSESSIONID).

    The response body is ignored; only the ``Set-Cookie`` response headers
    are read, and they are returned as a dict ``{'name': 'value', ...}``.

    :param url: URL to fetch the cookies from
    :param timeout: timeout passed to the request
    :param extra_headers: extra request headers
    :param kwargs: forwarded to ``new_http_request``
    :return: the cookies that were read
    :raises Exception: when the response callback never ran, so no cookies
        could be read
    """
    request = new_http_request(
        url=url,
        method='GET',
        timeout=timeout,
        follow_redirects=True,
        extra_headers=extra_headers,
        **kwargs
    )

    # One-slot holder filled by the callback; it stays empty if the callback
    # is never invoked.
    captured: list = []

    def after_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]):
        # Keep only the cookies of the most recent response.
        captured[:] = [get_cookies(response)]

    request_queue = asyncio.Queue()
    await request_queue.put(request)
    await requests.async_concurrency(
        request_queue,
        con_count=1,
        retry=MAX_RETRY_COUNT,
        after_request=after_request,
    )

    if captured:
        return captured[0]
    # No usable response was received (message: "Failed to read Cookies.").
    raise Exception("未能读取到 Cookies.")