| from __future__ import annotations |
|
|
| import json |
| import os |
| from dataclasses import dataclass |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from threading import RLock |
| from typing import Iterable |
|
|
|
|
| def _utc_now() -> datetime: |
| return datetime.now(timezone.utc) |
|
|
|
|
| def _split_list_env(value: str) -> list[str]: |
| s = str(value or "") |
| if "\n" in s: |
| return [item.strip() for item in s.splitlines() if item.strip()] |
| if "||" in s: |
| return [item.strip() for item in s.split("||") if item.strip()] |
| return [item.strip() for item in s.split(",") if item.strip()] |
|
|
|
|
| def _env_first(*names: str) -> str | None: |
| for name in names: |
| v = os.getenv(name) |
| if v is None or v == "": |
| continue |
| s = str(v).strip() |
| if s: |
| return s |
| return None |
|
|
|
|
| def _env_list(*names: str) -> tuple[str, ...]: |
| out: list[str] = [] |
| for name in names: |
| value = os.getenv(name) |
| if value is None or value == "": |
| continue |
| out.extend(_split_list_env(value)) |
| return tuple(out) |
|
|
|
|
| def _env_path_list(*names: str) -> tuple[Path, ...]: |
| items: list[str] = [] |
| for name in names: |
| value = os.getenv(name) |
| if value is None or value == "": |
| continue |
| if "," in value: |
| items.extend([part.strip() for part in value.split(",") if part.strip()]) |
| else: |
| s = str(value).strip() |
| if s: |
| items.append(s) |
| base_dir = Path(__file__).resolve().parents[1] |
| paths: list[Path] = [] |
| for item in items: |
| path = Path(item).expanduser() |
| if not path.is_absolute(): |
| path = (base_dir / path).resolve() |
| paths.append(path) |
| return tuple(paths) |
|
|
|
|
| def _validate_storage_state_json(path: Path) -> tuple[bool, str | None]: |
| if not path.exists(): |
| return False, f"storage_state_not_found={path}" |
| if not path.is_file(): |
| return False, f"storage_state_not_file={path}" |
| if not os.access(path, os.R_OK): |
| return False, f"storage_state_not_readable={path}" |
| try: |
| with path.open("r", encoding="utf-8") as f: |
| json.load(f) |
| except Exception as e: |
| return False, f"storage_state_invalid_json={path} err={e}" |
| return True, None |
|
|
|
|
@dataclass
class Session:
    """One set of credentials (cookie string and/or storage-state file) plus health flags.

    The ``*_ok`` flags default to ``True`` and are rewritten by
    ``check_light``; the ``last_*`` fields are bookkeeping timestamps and
    error labels.
    """

    # Identifier of the session.
    id: str
    # Optional account label associated with the session.
    account_id: str | None = None
    # Raw cookie string, if this session carries one.
    cookies: str | None = None
    # Location of a storage-state JSON file, if this session carries one.
    storage_state_path: Path | None = None
    # Health flag for the cookie credential.
    cookie_ok: bool = True
    # Health flag for the storage-state credential.
    storage_state_ok: bool = True
    # When the session was last checked.
    last_checked_at: datetime | None = None
    # Label of the most recent recorded error, if any.
    last_error_kind: str | None = None
    # When the most recent error was recorded, if any.
    last_error_at: datetime | None = None

    def has_cookie(self) -> bool:
        """Return ``True`` when ``cookies`` holds something other than whitespace."""
        raw = self.cookies
        if not raw:
            return False
        return bool(str(raw).strip())

    def has_storage_state(self) -> bool:
        """Return ``True`` when a storage-state path is configured (the file is not touched)."""
        return self.storage_state_path is not None

    def check_cookie_light(self) -> tuple[bool, str | None]:
        """Return ``(True, None)`` if a cookie is present, else ``(False, "cookie_empty")``."""
        if self.has_cookie():
            return True, None
        return False, "cookie_empty"

    def check_storage_state_light(self) -> tuple[bool, str | None]:
        """Validate the configured storage-state file.

        Returns ``(False, "storage_state_missing")`` when no path is
        configured; otherwise the result of
        ``_validate_storage_state_json`` for that path.
        """
        location = self.storage_state_path
        if location is not None:
            return _validate_storage_state_json(location)
        return False, "storage_state_missing"

    def check_light(self) -> dict:
        """Run the light checks, update the health flags, and return a report.

        A credential that is not configured is recorded as OK. Both
        ``cookie_ok`` and ``storage_state_ok`` are reassigned on every call,
        so whatever value they held before is replaced by the outcome of
        these checks. ``last_checked_at`` is set to the time the call
        started.

        Returns:
            A dict with the session id and account id, presence and health
            booleans for each credential, the failure reason for each check
            (``None`` when it passed or was skipped), and the check time as
            an ISO-8601 string.
        """
        stamp = _utc_now()
        cookie_present = self.has_cookie()
        state_present = self.has_storage_state()

        cookie_reason = None
        cookie_passed = True
        if cookie_present:
            cookie_passed, cookie_reason = self.check_cookie_light()
        self.cookie_ok = bool(cookie_passed)

        state_reason = None
        state_passed = True
        if state_present:
            state_passed, state_reason = self.check_storage_state_light()
        self.storage_state_ok = bool(state_passed)

        self.last_checked_at = stamp
        return {
            "id": self.id,
            "account_id": self.account_id,
            "has_cookie": bool(cookie_present),
            "has_storage_state": bool(state_present),
            "cookie_ok": bool(self.cookie_ok),
            "storage_state_ok": bool(self.storage_state_ok),
            "cookie_reason": cookie_reason,
            "storage_state_reason": state_reason,
            "checked_at": stamp.isoformat(),
        }

    def cookie_usable(self) -> bool:
        """Return ``True`` when a cookie is present and its health flag is set."""
        if not self.has_cookie():
            return False
        return bool(self.cookie_ok)

    def storage_state_usable(self) -> bool:
        """Return ``True`` when a storage-state path is configured and its health flag is set."""
        if not self.has_storage_state():
            return False
        return bool(self.storage_state_ok)
|
|
|
|
class SessionPool:
    """Registry of ``Session`` objects keyed by id, with round-robin hand-out.

    All access to the registry and to the round-robin counters goes through
    a single re-entrant lock.

    NOTE(review): both ``acquire_*`` methods call ``check_light()`` on every
    candidate, and ``Session.check_light`` reassigns ``cookie_ok`` and
    ``storage_state_ok`` from its own checks. A flag cleared by
    ``report_failure`` is therefore recomputed on the next acquire and set
    back to ``True`` whenever the light check passes. Confirm whether
    reported failures are meant to keep a session out of rotation.
    """

    def __init__(self, sessions: Iterable[Session]):
        """Index ``sessions`` by id.

        Sessions with a falsy id are ignored; when two sessions share an id
        the later one replaces the earlier one.
        """
        self._lock = RLock()
        registry: dict[str, Session] = {}
        for session in sessions:
            if session.id:
                registry[session.id] = session
        self._sessions = registry
        self._cookie_rr = 0
        self._state_rr = 0

    @classmethod
    def from_env(cls) -> "SessionPool":
        """Build a pool from environment variables.

        Cookies come from ``COOKIES_LIST`` / ``XHS_COOKIES_LIST`` /
        ``SERVICE_COOKIES_LIST``; if those yield nothing, the first non-blank
        of ``COOKIES`` / ``XHS_COOKIES`` is used as the only cookie.
        Storage-state paths come from the ``MEDIACRAWLER_STORAGE_STATE_PATH(S)``
        and ``SERVICE_STORAGE_STATE_PATH(S)`` variables.

        When no list variable is set and there is at most one cookie and at
        most one path, a single session with id ``"default"`` carries both.
        Otherwise each cookie becomes its own ``cookie-N`` session and each
        path its own ``state-N`` session (``N`` starting at 1).
        """
        list_vars = ("COOKIES_LIST", "XHS_COOKIES_LIST", "SERVICE_COOKIES_LIST")
        cookie_values = _env_list(*list_vars)
        single_cookie = _env_first("COOKIES", "XHS_COOKIES")
        state_paths = _env_path_list(
            "MEDIACRAWLER_STORAGE_STATE_PATHS",
            "MEDIACRAWLER_STORAGE_STATE_PATH",
            "SERVICE_STORAGE_STATE_PATHS",
            "SERVICE_STORAGE_STATE_PATH",
        )

        if single_cookie and not cookie_values:
            cookie_values = (single_cookie,)

        # A non-blank list variable selects multi mode even with one cookie.
        multi_cookie = len(cookie_values) > 1 or bool(_env_first(*list_vars))
        multi_state = len(state_paths) > 1

        if not (multi_cookie or multi_state):
            default_session = Session(
                id="default",
                account_id="default",
                cookies=cookie_values[0] if cookie_values else None,
                storage_state_path=state_paths[0] if state_paths else None,
            )
            return cls([default_session])

        built: list[Session] = []
        for number, cookie in enumerate(cookie_values, start=1):
            label = f"cookie-{number}"
            built.append(Session(id=label, account_id=label, cookies=cookie))
        for number, state_path in enumerate(state_paths, start=1):
            label = f"state-{number}"
            built.append(Session(id=label, account_id=label, storage_state_path=state_path))
        return cls(built)

    def list_sessions(self) -> tuple[Session, ...]:
        """Return every registered session as a tuple."""
        with self._lock:
            return (*self._sessions.values(),)

    def get(self, session_id: str) -> Session | None:
        """Return the session registered under ``str(session_id)``, or ``None``."""
        with self._lock:
            key = str(session_id)
            return self._sessions.get(key)

    def check_all_light(self) -> dict:
        """Run ``check_light`` on every session and return the reports sorted by id.

        The result has the current UTC time under ``"now"`` and the list of
        per-session reports under ``"sessions"``.
        """
        with self._lock:
            ordered = sorted(
                (session.check_light() for session in self._sessions.values()),
                key=lambda report: str(report.get("id") or ""),
            )
            return {"now": _utc_now().isoformat(), "sessions": ordered}

    def acquire_cookie_session(self) -> Session | None:
        """Hand out the next usable cookie session, or ``None`` if there is none.

        Every session that has a cookie is re-checked with ``check_light``
        first. The usable ones are ordered by id and picked in rotation using
        a counter that advances on each successful hand-out.
        """
        with self._lock:
            with_cookie = [s for s in self._sessions.values() if s.has_cookie()]
            for session in with_cookie:
                session.check_light()
            usable = sorted(
                (s for s in with_cookie if s.cookie_usable()),
                key=lambda s: str(s.id),
            )
            if not usable:
                return None
            chosen = usable[self._cookie_rr % len(usable)]
            self._cookie_rr += 1
            return chosen

    def acquire_storage_state_session(self) -> Session | None:
        """Hand out the next usable storage-state session, or ``None`` if there is none.

        Every session that has a storage-state path is re-checked with
        ``check_light`` first. The usable ones are ordered by id and picked
        in rotation using a counter that advances on each successful
        hand-out.
        """
        with self._lock:
            with_state = [s for s in self._sessions.values() if s.has_storage_state()]
            for session in with_state:
                session.check_light()
            usable = sorted(
                (s for s in with_state if s.storage_state_usable()),
                key=lambda s: str(s.id),
            )
            if not usable:
                return None
            chosen = usable[self._state_rr % len(usable)]
            self._state_rr += 1
            return chosen

    def report_success(self, session_id: str) -> None:
        """Record a successful use of a session; unknown ids are ignored.

        Clears the recorded error, stamps ``last_checked_at``, and sets the
        health flag of each credential the session carries.
        """
        stamp = _utc_now()
        with self._lock:
            session = self._sessions.get(str(session_id))
            if session is not None:
                session.last_error_kind = None
                session.last_error_at = None
                session.last_checked_at = stamp
                if session.has_cookie():
                    session.cookie_ok = True
                if session.has_storage_state():
                    session.storage_state_ok = True

    def report_failure(self, session_id: str, *, error_kind: str | None) -> None:
        """Record a failed use of a session; unknown ids are ignored.

        ``error_kind`` is stripped, and a blank value is stored as ``None``.
        The kinds ``"auth"``, ``"risk"`` and ``"captcha"`` clear the health
        flag of every credential the session carries; ``"rate"`` clears only
        the cookie flag; any other kind leaves the flags untouched.
        """
        stamp = _utc_now()
        kind = (error_kind or "").strip() or None
        with self._lock:
            session = self._sessions.get(str(session_id))
            if session is None:
                return
            session.last_error_kind = kind
            session.last_error_at = stamp
            session.last_checked_at = stamp

            affects_all = kind in ("auth", "risk", "captcha")
            affects_cookie = affects_all or kind == "rate"
            if affects_cookie and session.has_cookie():
                session.cookie_ok = False
            if affects_all and session.has_storage_state():
                session.storage_state_ok = False

    def snapshot(self) -> dict:
        """Return the state of every session, sorted by id, without running any checks.

        The result has the current UTC time under ``"now"`` and one dict per
        session under ``"sessions"``; timestamps are ISO-8601 strings or
        ``None``.
        """
        stamp = _utc_now()
        with self._lock:
            rows = [
                {
                    "id": s.id,
                    "account_id": s.account_id,
                    "has_cookie": bool(s.has_cookie()),
                    "has_storage_state": bool(s.has_storage_state()),
                    "cookie_ok": bool(s.cookie_ok),
                    "storage_state_ok": bool(s.storage_state_ok),
                    "storage_state_path": str(s.storage_state_path) if s.storage_state_path else None,
                    "last_checked_at": s.last_checked_at.isoformat() if s.last_checked_at else None,
                    "last_error_kind": s.last_error_kind,
                    "last_error_at": s.last_error_at.isoformat() if s.last_error_at else None,
                }
                for s in self._sessions.values()
            ]
            rows = sorted(rows, key=lambda row: str(row.get("id") or ""))
            return {"now": stamp.isoformat(), "sessions": rows}
|
|
|
|