| import json |
| import os |
|
|
| from dotenv import load_dotenv |
| from loguru import logger |
|
|
| from xhs_utils.cookie_util import trans_cookies |
|
|
|
|
| def _mask(s: str) -> str: |
| if not s: |
| return "" |
| if len(s) <= 8: |
| return "*" * len(s) |
| return s[:4] + "*" * (len(s) - 8) + s[-4:] |
|
|
|
|
| class SessionManager: |
| def __init__(self, cookies_file: str | None = None): |
| base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) |
| self.base_dir = base_dir |
| self.cookies_file = cookies_file or os.path.join(base_dir, "datas", "cookies.json") |
| self.env_file = os.path.join(base_dir, ".env") |
|
|
| def validate_cookie(self, cookies_str: str | None): |
| if not cookies_str or not cookies_str.strip(): |
| return False, "cookies_empty" |
| try: |
| cookies = trans_cookies(cookies_str) |
| except Exception: |
| return False, "cookies_parse_failed" |
| if "a1" not in cookies or not cookies["a1"]: |
| return False, "missing_a1" |
| return True, "ok" |
|
|
| def load_from_env(self) -> str | None: |
| load_dotenv() |
| cookies_str = os.getenv("COOKIES") |
| if cookies_str is None: |
| return None |
| return cookies_str.strip() |
|
|
| def load_from_file(self) -> str | None: |
| if not os.path.exists(self.cookies_file): |
| return None |
| with open(self.cookies_file, "r", encoding="utf-8") as f: |
| data = json.load(f) |
| if not isinstance(data, dict): |
| return None |
| val = data.get("COOKIES") |
| if not val: |
| return None |
| return str(val).strip() |
|
|
| def save_to_file(self, cookies_str: str): |
| os.makedirs(os.path.dirname(self.cookies_file), exist_ok=True) |
| with open(self.cookies_file, "w", encoding="utf-8") as f: |
| json.dump({"COOKIES": cookies_str}, f, ensure_ascii=False, separators=(",", ":")) |
|
|
| def save_to_env_file(self, cookies_str: str, env_file: str | None = None): |
| env_file = os.path.abspath(env_file or self.env_file) |
| os.makedirs(os.path.dirname(env_file), exist_ok=True) |
| if os.path.exists(env_file): |
| with open(env_file, "r", encoding="utf-8") as f: |
| lines = f.read().splitlines() |
| else: |
| lines = [] |
|
|
| safe = cookies_str.replace("\\", "\\\\").replace("\"", "\\\"") |
| new_line = f"COOKIES=\"{safe}\"" |
|
|
| replaced = False |
| out = [] |
| for line in lines: |
| if line.strip().startswith("COOKIES="): |
| out.append(new_line) |
| replaced = True |
| else: |
| out.append(line) |
| if not replaced: |
| out.append(new_line) |
| with open(env_file, "w", encoding="utf-8") as f: |
| f.write("\n".join(out).rstrip("\n") + "\n") |
|
|
| def load_cookies(self, mode: str = "both") -> str | None: |
| candidates = [] |
| if mode in ("manual", "both", "auto"): |
| candidates.append(("env", self.load_from_env())) |
| candidates.append(("file", self.load_from_file())) |
| for source, cookies_str in candidates: |
| ok, reason = self.validate_cookie(cookies_str) |
| if ok: |
| logger.info(f"cookies_loaded source={source} a1={_mask(trans_cookies(cookies_str).get('a1', ''))}") |
| return cookies_str |
| if cookies_str: |
| logger.warning(f"cookies_invalid source={source} reason={reason}") |
| return None |
|
|
| def refresh_cookies_if_needed(self, cookies_str: str | None, mode: str = "both"): |
| ok, _ = self.validate_cookie(cookies_str) |
| if ok: |
| return cookies_str |
| if mode in ("auto", "both"): |
| raise ValueError("cookies_invalid_need_login") |
| return None |
|
|