XHS / xhs_utils /session_manager.py
Trae Bot
Upload Spider_XHS project
c481f8a
import json
import os
from dotenv import load_dotenv
from loguru import logger
from xhs_utils.cookie_util import trans_cookies
def _mask(s: str) -> str:
    """Return *s* with its middle replaced by asterisks, for safe logging.

    Falsy input yields an empty string. Values of eight characters or fewer
    are masked entirely; longer values keep their first four and last four
    characters and have everything in between replaced, so the result always
    has the same length as the input.
    """
    if not s:
        return ""
    size = len(s)
    if size > 8:
        return f"{s[:4]}{'*' * (size - 8)}{s[-4:]}"
    # Too short to reveal any part without exposing most of the value.
    return "*" * size
class SessionManager:
def __init__(self, cookies_file: str | None = None):
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
self.base_dir = base_dir
self.cookies_file = cookies_file or os.path.join(base_dir, "datas", "cookies.json")
self.env_file = os.path.join(base_dir, ".env")
def validate_cookie(self, cookies_str: str | None):
if not cookies_str or not cookies_str.strip():
return False, "cookies_empty"
try:
cookies = trans_cookies(cookies_str)
except Exception:
return False, "cookies_parse_failed"
if "a1" not in cookies or not cookies["a1"]:
return False, "missing_a1"
return True, "ok"
def load_from_env(self) -> str | None:
load_dotenv()
cookies_str = os.getenv("COOKIES")
if cookies_str is None:
return None
return cookies_str.strip()
def load_from_file(self) -> str | None:
if not os.path.exists(self.cookies_file):
return None
with open(self.cookies_file, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
return None
val = data.get("COOKIES")
if not val:
return None
return str(val).strip()
def save_to_file(self, cookies_str: str):
os.makedirs(os.path.dirname(self.cookies_file), exist_ok=True)
with open(self.cookies_file, "w", encoding="utf-8") as f:
json.dump({"COOKIES": cookies_str}, f, ensure_ascii=False, separators=(",", ":"))
def save_to_env_file(self, cookies_str: str, env_file: str | None = None):
env_file = os.path.abspath(env_file or self.env_file)
os.makedirs(os.path.dirname(env_file), exist_ok=True)
if os.path.exists(env_file):
with open(env_file, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
else:
lines = []
safe = cookies_str.replace("\\", "\\\\").replace("\"", "\\\"")
new_line = f"COOKIES=\"{safe}\""
replaced = False
out = []
for line in lines:
if line.strip().startswith("COOKIES="):
out.append(new_line)
replaced = True
else:
out.append(line)
if not replaced:
out.append(new_line)
with open(env_file, "w", encoding="utf-8") as f:
f.write("\n".join(out).rstrip("\n") + "\n")
def load_cookies(self, mode: str = "both") -> str | None:
candidates = []
if mode in ("manual", "both", "auto"):
candidates.append(("env", self.load_from_env()))
candidates.append(("file", self.load_from_file()))
for source, cookies_str in candidates:
ok, reason = self.validate_cookie(cookies_str)
if ok:
logger.info(f"cookies_loaded source={source} a1={_mask(trans_cookies(cookies_str).get('a1', ''))}")
return cookies_str
if cookies_str:
logger.warning(f"cookies_invalid source={source} reason={reason}")
return None
def refresh_cookies_if_needed(self, cookies_str: str | None, mode: str = "both"):
ok, _ = self.validate_cookie(cookies_str)
if ok:
return cookies_str
if mode in ("auto", "both"):
raise ValueError("cookies_invalid_need_login")
return None