# XHS/service/session_pool.py
# Trae Bot
# Upload Spider_XHS project
# c481f8a
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from threading import RLock
from typing import Iterable
def _utc_now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    current = datetime.now(tz=timezone.utc)
    return current
def _split_list_env(value: str) -> list[str]:
    """Split a list-valued setting into its non-empty, stripped items.

    Exactly one delimiter is applied, chosen by priority:

    1. line breaks, when the text contains ``"\\n"``;
    2. ``"||"``, when the text contains it;
    3. ``","`` otherwise.

    A falsy ``value`` (``None``, ``""``) yields an empty list.
    """
    text = str(value or "")
    if "\n" in text:
        pieces = text.splitlines()
    elif "||" in text:
        pieces = text.split("||")
    else:
        pieces = text.split(",")
    # Blank fragments (e.g. from trailing delimiters) are dropped.
    return [piece.strip() for piece in pieces if piece.strip()]
def _env_first(*names: str) -> str | None:
for name in names:
v = os.getenv(name)
if v is None or v == "":
continue
s = str(v).strip()
if s:
return s
return None
def _env_list(*names: str) -> tuple[str, ...]:
    """Collect list items from every environment variable in ``names``.

    Each set, non-empty variable is split with ``_split_list_env`` and the
    resulting items are concatenated in the order the names were given.
    Unset or empty variables contribute nothing.
    """
    collected: list[str] = []
    for env_name in names:
        raw = os.getenv(env_name)
        if raw:
            collected += _split_list_env(raw)
    return tuple(collected)
def _env_path_list(*names: str) -> tuple[Path, ...]:
    """Read filesystem paths from the environment variables in ``names``.

    Every set, non-empty variable is treated as a comma-separated list;
    entries are stripped and blank ones are discarded.  Each entry has
    ``~`` expanded.  Relative entries are anchored at the parent of the
    directory containing this file and resolved; absolute entries are kept
    as given.  Order follows ``names`` and then position in each value.
    """
    raw_values = (os.getenv(env_name) for env_name in names)
    # A value without any comma splits into a single piece, so one code
    # path covers both the single-path and the multi-path form.
    entries = [
        piece.strip()
        for raw in raw_values
        if raw
        for piece in raw.split(",")
        if piece.strip()
    ]

    anchor = Path(__file__).resolve().parents[1]
    resolved: list[Path] = []
    for entry in entries:
        candidate = Path(entry).expanduser()
        if candidate.is_absolute():
            resolved.append(candidate)
        else:
            resolved.append((anchor / candidate).resolve())
    return tuple(resolved)
def _validate_storage_state_json(path: Path) -> tuple[bool, str | None]:
if not path.exists():
return False, f"storage_state_not_found={path}"
if not path.is_file():
return False, f"storage_state_not_file={path}"
if not os.access(path, os.R_OK):
return False, f"storage_state_not_readable={path}"
try:
with path.open("r", encoding="utf-8") as f:
json.load(f)
except Exception as e:
return False, f"storage_state_invalid_json={path} err={e}"
return True, None
@dataclass
class Session:
id: str
account_id: str | None = None
cookies: str | None = None
storage_state_path: Path | None = None
cookie_ok: bool = True
storage_state_ok: bool = True
last_checked_at: datetime | None = None
last_error_kind: str | None = None
last_error_at: datetime | None = None
def has_cookie(self) -> bool:
return bool(self.cookies and str(self.cookies).strip())
def has_storage_state(self) -> bool:
return self.storage_state_path is not None
def check_cookie_light(self) -> tuple[bool, str | None]:
if not self.has_cookie():
return False, "cookie_empty"
return True, None
def check_storage_state_light(self) -> tuple[bool, str | None]:
if self.storage_state_path is None:
return False, "storage_state_missing"
return _validate_storage_state_json(self.storage_state_path)
def check_light(self) -> dict:
now = _utc_now()
cookie_reason = None
state_reason = None
if self.has_cookie():
ok, cookie_reason = self.check_cookie_light()
self.cookie_ok = bool(ok)
else:
self.cookie_ok = True
if self.has_storage_state():
ok, state_reason = self.check_storage_state_light()
self.storage_state_ok = bool(ok)
else:
self.storage_state_ok = True
self.last_checked_at = now
return {
"id": self.id,
"account_id": self.account_id,
"has_cookie": bool(self.has_cookie()),
"has_storage_state": bool(self.has_storage_state()),
"cookie_ok": bool(self.cookie_ok),
"storage_state_ok": bool(self.storage_state_ok),
"cookie_reason": cookie_reason,
"storage_state_reason": state_reason,
"checked_at": now.isoformat(),
}
def cookie_usable(self) -> bool:
return self.has_cookie() and bool(self.cookie_ok)
def storage_state_usable(self) -> bool:
return self.has_storage_state() and bool(self.storage_state_ok)
class SessionPool:
    """Lock-guarded collection of sessions with round-robin selection.

    Sessions are keyed by ``id``.  Cookie sessions and storage-state
    sessions are rotated independently, each with its own counter.

    Selection and reported failures
    -------------------------------
    ``acquire_*`` refreshes every candidate with ``Session.check_light()``,
    which reassigns ``cookie_ok`` / ``storage_state_ok``.  Flags lowered by
    ``report_failure`` therefore do not survive until selection.  To make
    reported failures count, selection also looks at ``last_error_kind``
    (which the light check leaves alone): sessions whose latest reported
    failure is a blocking kind are skipped while at least one unflagged
    usable session exists.  When every usable session is flagged, rotation
    falls back to all of them, so a session is still returned.
    ``report_success`` clears the flag.
    """

    # Failure kinds that make each credential type suspect.  These mirror
    # the flags that ``report_failure`` lowers.
    _COOKIE_BLOCKING_KINDS = frozenset({"auth", "risk", "captcha", "rate"})
    _STATE_BLOCKING_KINDS = frozenset({"auth", "risk", "captcha"})

    def __init__(self, sessions: Iterable[Session]):
        """Index ``sessions`` by id; sessions with a falsy id are dropped."""
        self._lock = RLock()
        self._sessions: dict[str, Session] = {s.id: s for s in sessions if s.id}
        self._cookie_rr = 0
        self._state_rr = 0

    @classmethod
    def from_env(cls) -> "SessionPool":
        """Build a pool from environment variables.

        Cookies come from the list variables ``COOKIES_LIST``,
        ``XHS_COOKIES_LIST`` and ``SERVICE_COOKIES_LIST``; if those yield
        nothing, the single-value variables ``COOKIES`` / ``XHS_COOKIES``
        are used.  Storage-state paths come from the
        ``MEDIACRAWLER_STORAGE_STATE_PATH(S)`` and
        ``SERVICE_STORAGE_STATE_PATH(S)`` variables.

        With at most one cookie, at most one path and no list variable in
        use, a single session named ``"default"`` carries both.  Otherwise
        each cookie becomes ``cookie-N`` and each path becomes ``state-N``
        (numbered from 1).
        """
        list_names = ("COOKIES_LIST", "XHS_COOKIES_LIST", "SERVICE_COOKIES_LIST")
        cookie_list = _env_list(*list_names)
        cookie_single = _env_first("COOKIES", "XHS_COOKIES")
        storage_paths = _env_path_list(
            "MEDIACRAWLER_STORAGE_STATE_PATHS",
            "MEDIACRAWLER_STORAGE_STATE_PATH",
            "SERVICE_STORAGE_STATE_PATHS",
            "SERVICE_STORAGE_STATE_PATH",
        )
        if not cookie_list and cookie_single:
            cookie_list = (cookie_single,)

        # Using a list variable at all selects the multi-session layout,
        # even when it ends up holding a single cookie.
        multi_cookie = len(cookie_list) > 1 or bool(_env_first(*list_names))
        multi_state = len(storage_paths) > 1

        if not multi_cookie and not multi_state:
            cookies = cookie_list[0] if cookie_list else None
            state = storage_paths[0] if storage_paths else None
            return cls(
                [Session(id="default", account_id="default", cookies=cookies, storage_state_path=state)]
            )

        sessions: list[Session] = []
        for number, cookies in enumerate(cookie_list, start=1):
            sid = f"cookie-{number}"
            sessions.append(Session(id=sid, account_id=sid, cookies=cookies))
        for number, path in enumerate(storage_paths, start=1):
            sid = f"state-{number}"
            sessions.append(Session(id=sid, account_id=sid, storage_state_path=path))
        return cls(sessions)

    def list_sessions(self) -> tuple[Session, ...]:
        """Return all sessions as a tuple."""
        with self._lock:
            return tuple(self._sessions.values())

    def get(self, session_id: str) -> Session | None:
        """Return the session with ``session_id``, or ``None`` if unknown."""
        with self._lock:
            return self._sessions.get(str(session_id))

    def check_all_light(self) -> dict:
        """Run ``check_light`` on every session; return reports sorted by id."""
        with self._lock:
            reports = sorted(
                (s.check_light() for s in self._sessions.values()),
                key=lambda report: str(report.get("id") or ""),
            )
            return {"now": _utc_now().isoformat(), "sessions": reports}

    @staticmethod
    def _rotation(usable: list[Session], blocking_kinds: frozenset[str]) -> list[Session]:
        """Order ``usable`` by id and prefer sessions without a blocking failure.

        Returns the unflagged sessions when there are any, otherwise every
        usable session (so callers are never starved by flags alone).
        """
        ordered = sorted(usable, key=lambda s: str(s.id))
        preferred = [s for s in ordered if s.last_error_kind not in blocking_kinds]
        return preferred or ordered

    def acquire_cookie_session(self) -> Session | None:
        """Pick the next cookie session in round-robin order.

        Returns ``None`` when no session has a cookie or none passes the
        light check.  Sessions whose last reported failure was ``auth``,
        ``risk``, ``captcha`` or ``rate`` are skipped while an unflagged
        alternative exists.
        """
        with self._lock:
            candidates = [s for s in self._sessions.values() if s.has_cookie()]
            for s in candidates:
                s.check_light()
            usable = [s for s in candidates if s.cookie_usable()]
            rotation = self._rotation(usable, self._COOKIE_BLOCKING_KINDS)
            if not rotation:
                return None
            chosen = rotation[self._cookie_rr % len(rotation)]
            self._cookie_rr += 1
            return chosen

    def acquire_storage_state_session(self) -> Session | None:
        """Pick the next storage-state session in round-robin order.

        Returns ``None`` when no session has a storage state or none passes
        the light check.  Sessions whose last reported failure was
        ``auth``, ``risk`` or ``captcha`` are skipped while an unflagged
        alternative exists.
        """
        with self._lock:
            candidates = [s for s in self._sessions.values() if s.has_storage_state()]
            for s in candidates:
                s.check_light()
            usable = [s for s in candidates if s.storage_state_usable()]
            rotation = self._rotation(usable, self._STATE_BLOCKING_KINDS)
            if not rotation:
                return None
            chosen = rotation[self._state_rr % len(rotation)]
            self._state_rr += 1
            return chosen

    def report_success(self, session_id: str) -> None:
        """Clear the recorded error for a session and mark its credentials OK.

        Unknown ids are ignored.
        """
        now = _utc_now()
        with self._lock:
            s = self._sessions.get(str(session_id))
            if s is None:
                return
            s.last_error_kind = None
            s.last_error_at = None
            s.last_checked_at = now
            if s.has_cookie():
                s.cookie_ok = True
            if s.has_storage_state():
                s.storage_state_ok = True

    def report_failure(self, session_id: str, *, error_kind: str | None) -> None:
        """Record a failure against a session.

        ``error_kind`` is stripped; blank becomes ``None``.  ``auth``,
        ``risk`` and ``captcha`` lower both credential flags; ``rate``
        lowers only the cookie flag; any other kind is recorded without
        touching the flags.  Unknown ids are ignored.
        """
        now = _utc_now()
        kind = (error_kind or "").strip() or None
        with self._lock:
            s = self._sessions.get(str(session_id))
            if s is None:
                return
            s.last_error_kind = kind
            s.last_error_at = now
            s.last_checked_at = now
            if kind in self._COOKIE_BLOCKING_KINDS and s.has_cookie():
                s.cookie_ok = False
            if kind in self._STATE_BLOCKING_KINDS and s.has_storage_state():
                s.storage_state_ok = False

    def snapshot(self) -> dict:
        """Return the stored state of every session, sorted by id.

        No checks are run; the values are whatever was last recorded.
        """
        now = _utc_now()
        with self._lock:
            sessions: list[dict] = [
                {
                    "id": s.id,
                    "account_id": s.account_id,
                    "has_cookie": bool(s.has_cookie()),
                    "has_storage_state": bool(s.has_storage_state()),
                    "cookie_ok": bool(s.cookie_ok),
                    "storage_state_ok": bool(s.storage_state_ok),
                    "storage_state_path": str(s.storage_state_path) if s.storage_state_path else None,
                    "last_checked_at": s.last_checked_at.isoformat() if s.last_checked_at else None,
                    "last_error_kind": s.last_error_kind,
                    "last_error_at": s.last_error_at.isoformat() if s.last_error_at else None,
                }
                for s in self._sessions.values()
            ]
            sessions.sort(key=lambda entry: str(entry.get("id") or ""))
            return {"now": now.isoformat(), "sessions": sessions}