""" 公共对接模块。 """ import asyncio import random import re from http.cookies import SimpleCookie from typing import Optional, Dict, Any from tornado.httpclient import HTTPRequest, HTTPResponse from paste.web import requests USER_AGENTS = [ # Chrome on Windows 10 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Firefox on Windows 10 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0', # Edge on Windows 11 (Chromium-based) 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.2210.91', # Safari on macOS 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15', # Chrome on macOS 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Firefox on macOS 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.2; rv:121.0) Gecko/20100101 Firefox/121.0', # Chrome on Linux 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Opera on Windows (Chromium-based) 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 OPR/106.0.0.0', ] """ 常用 PC 端浏览器 User-Agent 列表。 """ DEFAULT_TIMEOUT = 120.0 """ 默认超时时长。 """ CONCURRENCY_COUNT = 5 """ 最大并发次数。 """ MAX_RETRY_COUNT = 5 """ 最大重试次数。 """ def get_random_user_agent() -> tuple[str, str, str]: """ 从 user_agents 列表中随机返回一个 User-Agent 字符串及其浏览器版本和操作系统名称。 Returns: tuple: (user_agent: str, browser_version: str, os_name: str) """ ua: str = random.choice(USER_AGENTS) # 提取浏览器版本 browser_version: str = "Unknown" if "Chrome/" in ua: match = re.search(r"Chrome/(\d+\.\d+\.\d+\.\d+)", ua) if match: browser_version = match.group(1) elif "Firefox/" in ua: match = re.search(r"Firefox/(\d+\.\d+)", ua) if match: browser_version = match.group(1) elif "Version/" in ua and "Safari/" in ua: # Safari match = re.search(r"Version/(\d+\.\d+)", ua) if match: browser_version = match.group(1) elif "Edg/" in ua: match = re.search(r"Edg/(\d+\.\d+\.\d+\.\d+)", ua) if match: browser_version = match.group(1) elif "OPR/" in ua: match = re.search(r"OPR/(\d+\.\d+\.\d+\.\d+)", ua) if match: browser_version = match.group(1) # 提取操作系统名称 os_name: str = "Unknown" if "Mac" in ua: os_name = "Mac" elif "Windows" in ua: os_name = "Windows" elif "Linux" in ua: os_name = "Linux" return ua, browser_version, os_name def get_cookies(response: HTTPResponse) -> Dict[str, str]: """ 从响应对象读取 Cookies。 :param response: 请求响应对象 :return: 提取到的 Cookies """ cookies = SimpleCookie() for set_cookie in response.headers.get_list('Set-Cookie'): cookies.load(set_cookie) return {k: v.value for k, v in cookies.items()} def get_cookie_value(cookies_string, cookie_name): """ 从 cookies 字符串中按名称提取对应的值 参数: cookies_string: str, 格式为 "key1=value1; key2=value2; ..." cookie_name: str, 要查找的 cookie 名称 返回: str 或 None: 如果找到返回对应的值,否则返回 None """ # 按分号分割 cookie 字符串 cookies_list = cookies_string.split(';') for cookie in cookies_list: # 去除前后空格,然后按等号分割键值对 parts = cookie.strip().split('=', 1) if len(parts) == 2: key, value = parts if key == cookie_name: return value # 如果没有找到,返回 None return None def new_http_request( url: str, body: Optional[Dict[str, Any]] = None, method: str = 'POST', timeout: Optional[float] = None, follow_redirects: bool = True, use_form: bool = False, extra_headers: Optional[Dict[str, str]] = None, **kwargs ) -> HTTPRequest: """ 新建 HTTPRequest 对象。 支持 GET 和 POST 方法: - GET: 参数通过 URL 查询字符串传递 - POST: 参数通过 JSON body 或 form 表单传递(由 use_form 控制) :param url: 请求的完整 URL :param body: 请求体(字典),GET 时为查询参数,POST 时为 JSON 或 form 数据 :param method: HTTP 方法,仅支持 'GET' 或 'POST' :param timeout: 请求超时时间(秒) :param follow_redirects: 是否跟随重定向 :param use_form: 如果为 True,POST 时使用 application/x-www-form-urlencoded 格式;否则使用 JSON :param extra_headers: 可选的额外请求头,用于传入 Cookie、Authorization 等 :param kwargs: 其他参数,符合 tornado.httpclient.HTTPRequest 参数要求 :return: tornado.httpclient.HTTPRequest 对象 :raises ValueError: 当 method 不合法时抛出 """ return requests.build_http_request( url, body, method, timeout, follow_redirects, use_form, extra_headers, **kwargs ) async def scrape_cookies(url: str, timeout: Optional[float] = 10, extra_headers: Optional[Dict[str, str]] = None, **kwargs) -> Dict[str, str]: """ 发送 GET 请求到 url 以获取服务端下发的 Cookies(如 JSESSIONID)。 不关心响应体,只提取响应头中的 Set-Cookie。 返回解析后的 Cookie 字典 { 'name': 'value', ... } :param url: 获取 Cookies 的路径 :param timeout: 超时时间 :param extra_headers: 扩展头 :return: Cookies 读取到的 Cookies """ cookies: Optional[Dict[str, str]] = None request = new_http_request( url=url, method='GET', timeout=timeout, follow_redirects=True, extra_headers=extra_headers, **kwargs ) def after_request(response: HTTPResponse, retry_queue: asyncio.Queue[HTTPRequest]): nonlocal cookies cookies = get_cookies(response) request_queue = asyncio.Queue() await request_queue.put(request) await requests.async_concurrency( request_queue, con_count=1, retry=MAX_RETRY_COUNT, after_request=after_request ) if cookies is None: # 无正确响应,抛出异常 raise Exception(f"未能读取到 Cookies.") return cookies