# Session pool for cookie / storage-state credentials configured through
# environment variables.
#
# `Session` holds one credential set plus its health flags; `SessionPool`
# builds sessions from the environment and hands them out round-robin.
from __future__ import annotations

import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from threading import RLock
from typing import Iterable


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _split_list_env(value: str) -> list[str]:
    """Split a list-valued environment string into stripped, non-empty items.

    Exactly one separator is used, chosen by precedence: newlines if the
    value contains any, otherwise ``||`` if present, otherwise commas.
    """
    s = str(value or "")
    if "\n" in s:
        parts = s.splitlines()
    elif "||" in s:
        parts = s.split("||")
    else:
        parts = s.split(",")
    return [item.strip() for item in parts if item.strip()]


def _env_first(*names: str) -> str | None:
    """Return the stripped value of the first variable in *names* that is
    set to something other than empty/whitespace, or ``None`` if none is."""
    for name in names:
        raw = os.getenv(name)
        if not raw:  # unset or empty string
            continue
        stripped = raw.strip()
        if stripped:
            return stripped
    return None


def _env_list(*names: str) -> tuple[str, ...]:
    """Concatenate the list items of every non-empty variable in *names*.

    Each value is split with ``_split_list_env``; order follows *names*.
    """
    out: list[str] = []
    for name in names:
        value = os.getenv(name)
        if not value:
            continue
        out.extend(_split_list_env(value))
    return tuple(out)


def _env_path_list(*names: str) -> tuple[Path, ...]:
    """Collect filesystem paths from every non-empty variable in *names*.

    Values are split with ``_split_list_env`` so path lists accept the same
    separators (newline, ``||``, comma) as the other list variables.
    ``~`` is expanded. Relative paths are resolved against the directory one
    level above the directory containing this file; absolute paths are kept
    as given (not resolved).
    """
    items: list[str] = []
    for name in names:
        value = os.getenv(name)
        if not value:
            continue
        items.extend(_split_list_env(value))

    base_dir = Path(__file__).resolve().parents[1]
    paths: list[Path] = []
    for item in items:
        path = Path(item).expanduser()
        if not path.is_absolute():
            path = (base_dir / path).resolve()
        paths.append(path)
    return tuple(paths)


def _validate_storage_state_json(path: Path) -> tuple[bool, str | None]:
    """Check that *path* is an existing, readable file containing valid JSON.

    Returns ``(True, None)`` on success, otherwise ``(False, reason)`` where
    *reason* is a ``key=value`` style string naming the failed check. Only
    JSON well-formedness is verified, not the structure of the document.
    """
    if not path.exists():
        return False, f"storage_state_not_found={path}"
    if not path.is_file():
        return False, f"storage_state_not_file={path}"
    if not os.access(path, os.R_OK):
        return False, f"storage_state_not_readable={path}"
    try:
        with path.open("r", encoding="utf-8") as f:
            json.load(f)
    except Exception as e:
        # Deliberately broad: this is a health probe, so any read/decode/parse
        # failure is reported as a reason instead of propagating.
        return False, f"storage_state_invalid_json={path} err={e}"
    return True, None


@dataclass
class Session:
    """One credential set (cookie string and/or storage-state file) together
    with its health flags and last-error bookkeeping."""

    id: str
    account_id: str | None = None
    cookies: str | None = None
    storage_state_path: Path | None = None
    # Health flags; written by check_light() and by SessionPool.report_*().
    cookie_ok: bool = True
    storage_state_ok: bool = True
    last_checked_at: datetime | None = None
    # Last failure recorded through SessionPool.report_failure().
    last_error_kind: str | None = None
    last_error_at: datetime | None = None

    def has_cookie(self) -> bool:
        """Return True if a non-blank cookie string is configured."""
        return bool(self.cookies and str(self.cookies).strip())

    def has_storage_state(self) -> bool:
        """Return True if a storage-state path is configured (the file itself
        is not inspected here)."""
        return self.storage_state_path is not None

    def check_cookie_light(self) -> tuple[bool, str | None]:
        """Return ``(True, None)`` if a cookie is present, else
        ``(False, "cookie_empty")``. The cookie content is not validated."""
        if not self.has_cookie():
            return False, "cookie_empty"
        return True, None

    def check_storage_state_light(self) -> tuple[bool, str | None]:
        """Validate the storage-state file; ``(False, "storage_state_missing")``
        when no path is configured."""
        if self.storage_state_path is None:
            return False, "storage_state_missing"
        return _validate_storage_state_json(self.storage_state_path)

    def check_light(self) -> dict:
        """Re-run the light checks, update the health flags and return a report.

        A credential kind that is not configured is reported as ok. Note that
        both flags are recomputed from scratch, so any value previously set by
        ``SessionPool.report_failure`` / ``report_success`` is overwritten.
        """
        now = _utc_now()
        cookie_reason = None
        state_reason = None

        if self.has_cookie():
            ok, cookie_reason = self.check_cookie_light()
            self.cookie_ok = bool(ok)
        else:
            self.cookie_ok = True

        if self.has_storage_state():
            ok, state_reason = self.check_storage_state_light()
            self.storage_state_ok = bool(ok)
        else:
            self.storage_state_ok = True

        self.last_checked_at = now
        return {
            "id": self.id,
            "account_id": self.account_id,
            "has_cookie": bool(self.has_cookie()),
            "has_storage_state": bool(self.has_storage_state()),
            "cookie_ok": bool(self.cookie_ok),
            "storage_state_ok": bool(self.storage_state_ok),
            "cookie_reason": cookie_reason,
            "storage_state_reason": state_reason,
            "checked_at": now.isoformat(),
        }

    def cookie_usable(self) -> bool:
        """Return True if a cookie is configured and currently flagged ok."""
        return self.has_cookie() and bool(self.cookie_ok)

    def storage_state_usable(self) -> bool:
        """Return True if a storage state is configured and flagged ok."""
        return self.has_storage_state() and bool(self.storage_state_ok)


class SessionPool:
    """Lock-guarded collection of sessions with round-robin acquisition.

    Sessions are keyed by ``Session.id``; sessions with a falsy id are
    dropped and a later duplicate id replaces an earlier one.
    """

    def __init__(self, sessions: Iterable[Session]):
        self._lock = RLock()
        self._sessions: dict[str, Session] = {s.id: s for s in sessions if s.id}
        # Independent round-robin counters for the two credential kinds.
        self._cookie_rr = 0
        self._state_rr = 0

    @classmethod
    def from_env(cls) -> "SessionPool":
        """Build a pool from environment variables.

        Cookies come from the list variables ``COOKIES_LIST``,
        ``XHS_COOKIES_LIST`` and ``SERVICE_COOKIES_LIST`` (concatenated); when
        those yield nothing, the first non-empty of ``COOKIES`` /
        ``XHS_COOKIES`` is used as a single cookie. Storage-state paths come
        from ``MEDIACRAWLER_STORAGE_STATE_PATHS``,
        ``MEDIACRAWLER_STORAGE_STATE_PATH``, ``SERVICE_STORAGE_STATE_PATHS``
        and ``SERVICE_STORAGE_STATE_PATH``.

        If no list-style cookie variable is set and there is at most one
        cookie and at most one storage path, a single session with id
        ``"default"`` carries both. Otherwise each cookie becomes
        ``cookie-N`` and each path becomes ``state-N`` (1-based).
        """
        cookie_list = _env_list("COOKIES_LIST", "XHS_COOKIES_LIST", "SERVICE_COOKIES_LIST")
        cookie_single = _env_first("COOKIES", "XHS_COOKIES")
        storage_paths = _env_path_list(
            "MEDIACRAWLER_STORAGE_STATE_PATHS",
            "MEDIACRAWLER_STORAGE_STATE_PATH",
            "SERVICE_STORAGE_STATE_PATHS",
            "SERVICE_STORAGE_STATE_PATH",
        )
        if not cookie_list and cookie_single:
            cookie_list = (cookie_single,)

        # Setting any list-style cookie variable selects multi mode even when
        # it contains a single entry.
        multi_cookie = len(cookie_list) > 1 or bool(
            _env_first("COOKIES_LIST", "XHS_COOKIES_LIST", "SERVICE_COOKIES_LIST")
        )
        multi_state = len(storage_paths) > 1

        sessions: list[Session] = []
        if not multi_cookie and not multi_state:
            cookies = cookie_list[0] if cookie_list else None
            state = storage_paths[0] if storage_paths else None
            sessions.append(
                Session(id="default", account_id="default", cookies=cookies, storage_state_path=state)
            )
            return cls(sessions)

        for idx, cookies in enumerate(cookie_list):
            sid = f"cookie-{idx + 1}"
            sessions.append(Session(id=sid, account_id=sid, cookies=cookies))
        for idx, path in enumerate(storage_paths):
            sid = f"state-{idx + 1}"
            sessions.append(Session(id=sid, account_id=sid, storage_state_path=path))
        return cls(sessions)

    def list_sessions(self) -> tuple[Session, ...]:
        """Return all sessions as a tuple (the Session objects are shared,
        not copied)."""
        with self._lock:
            return tuple(self._sessions.values())

    def get(self, session_id: str) -> Session | None:
        """Return the session with the given id, or ``None``."""
        with self._lock:
            return self._sessions.get(str(session_id))

    def check_all_light(self) -> dict:
        """Run ``Session.check_light`` on every session and return the
        reports sorted by id, plus the current timestamp."""
        with self._lock:
            reports = [s.check_light() for s in self._sessions.values()]
        reports.sort(key=lambda x: str(x.get("id") or ""))
        return {"now": _utc_now().isoformat(), "sessions": reports}

    def acquire_cookie_session(self) -> Session | None:
        """Return the next usable cookie session in round-robin order (by id),
        or ``None`` when no session has a usable cookie."""
        with self._lock:
            candidates = [s for s in self._sessions.values() if s.has_cookie()]
            if not candidates:
                return None
            # NOTE(review): check_light() recomputes cookie_ok, and the light
            # cookie check passes whenever a cookie is present, so a flag
            # cleared by report_failure() is reset here and does not exclude
            # the session. Confirm whether that is intended.
            for s in candidates:
                s.check_light()
            healthy = [s for s in candidates if s.cookie_usable()]
            if not healthy:
                return None
            healthy.sort(key=lambda s: str(s.id))
            idx = self._cookie_rr % len(healthy)
            self._cookie_rr += 1
            return healthy[idx]

    def acquire_storage_state_session(self) -> Session | None:
        """Return the next usable storage-state session in round-robin order
        (by id), or ``None`` when none has a valid storage-state file."""
        with self._lock:
            candidates = [s for s in self._sessions.values() if s.has_storage_state()]
            if not candidates:
                return None
            # NOTE(review): check_light() re-validates the file and overwrites
            # storage_state_ok, so a flag cleared by report_failure() does not
            # survive to this selection. Confirm whether that is intended.
            for s in candidates:
                s.check_light()
            healthy = [s for s in candidates if s.storage_state_usable()]
            if not healthy:
                return None
            healthy.sort(key=lambda s: str(s.id))
            idx = self._state_rr % len(healthy)
            self._state_rr += 1
            return healthy[idx]

    def report_success(self, session_id: str) -> None:
        """Clear the recorded error of a session and flag its configured
        credentials as ok. Unknown ids are ignored."""
        now = _utc_now()
        with self._lock:
            s = self._sessions.get(str(session_id))
            if s is None:
                return
            s.last_error_kind = None
            s.last_error_at = None
            s.last_checked_at = now
            if s.has_cookie():
                s.cookie_ok = True
            if s.has_storage_state():
                s.storage_state_ok = True

    def report_failure(self, session_id: str, *, error_kind: str | None) -> None:
        """Record a failure on a session. Unknown ids are ignored.

        *error_kind* is stripped; blank becomes ``None``. Kinds ``"auth"``,
        ``"risk"`` and ``"captcha"`` flag every configured credential as not
        ok; ``"rate"`` flags only the cookie. Any other kind is recorded
        without touching the health flags.
        """
        now = _utc_now()
        kind = (error_kind or "").strip() or None
        with self._lock:
            s = self._sessions.get(str(session_id))
            if s is None:
                return
            s.last_error_kind = kind
            s.last_error_at = now
            s.last_checked_at = now
            if kind in ("auth", "risk", "captcha"):
                if s.has_cookie():
                    s.cookie_ok = False
                if s.has_storage_state():
                    s.storage_state_ok = False
                return
            if kind in ("rate",):
                if s.has_cookie():
                    s.cookie_ok = False
                return

    def snapshot(self) -> dict:
        """Return the current state of every session (sorted by id) without
        running any checks. Cookie values are not included."""
        now = _utc_now()
        with self._lock:
            sessions: list[dict] = [
                {
                    "id": s.id,
                    "account_id": s.account_id,
                    "has_cookie": bool(s.has_cookie()),
                    "has_storage_state": bool(s.has_storage_state()),
                    "cookie_ok": bool(s.cookie_ok),
                    "storage_state_ok": bool(s.storage_state_ok),
                    "storage_state_path": str(s.storage_state_path) if s.storage_state_path else None,
                    "last_checked_at": s.last_checked_at.isoformat() if s.last_checked_at else None,
                    "last_error_kind": s.last_error_kind,
                    "last_error_at": s.last_error_at.isoformat() if s.last_error_at else None,
                }
                for s in self._sessions.values()
            ]
        sessions.sort(key=lambda x: str(x.get("id") or ""))
        return {"now": now.isoformat(), "sessions": sessions}