"""HTTP client helpers for a remote ``/search`` and ``/users`` service.

The session id and the service endpoint are resolved from process
environment variables first and from a ``.env`` file in ``ROOT_DIR`` second.
"""

import os
from typing import Any, Dict, List, Optional

import requests

# Two directories above the directory that contains this file.
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
# Used by get_endpoint() when PIXIF_ENDPOINT is not configured anywhere.
DEFAULT_ENDPOINT = "https://q6-p.hf.space"
# Passed as ``timeout`` to every requests call (seconds).
REQUEST_TIMEOUT = 300


def read_dotenv_value(path: str, key: str) -> Optional[str]:
    """Return the value assigned to *key* in the dotenv file at *path*.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    An optional leading ``export`` keyword, whitespace around the key and the
    value, and one pair of matching surrounding quotes are ignored, so
    ``KEY=v``, ``KEY = v``, ``export KEY=v`` and ``KEY="v"`` all yield ``"v"``.
    The first matching assignment in the file wins.

    Args:
        path: Location of the dotenv file.
        key: Variable name to look up.

    Returns:
        The value as a string (possibly empty), or ``None`` when the file
        does not exist or contains no assignment for *key*.

    Raises:
        OSError: For failures other than a missing file (for example a
            permission error); only ``FileNotFoundError`` is swallowed.
    """
    try:
        with open(path, "r", encoding="utf-8") as env_file:
            for raw_line in env_file:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                name, value = line.split("=", 1)
                name = name.strip()
                # Accept shell-style "export KEY=value" assignments.
                if name.startswith("export "):
                    name = name[len("export "):].strip()
                if name != key:
                    continue
                value = value.strip()
                # Drop one pair of matching quotes so the quote characters
                # do not end up inside the returned value.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
    except FileNotFoundError:
        return None
    return None


def get_phpsessid() -> str:
    """Return the configured ``PHPSESSID`` value.

    The ``PHPSESSID`` environment variable takes precedence; otherwise the
    ``.env`` file in ``ROOT_DIR`` is consulted. Empty values count as unset.

    Raises:
        RuntimeError: If neither source provides a non-empty value.
    """
    phpsessid = os.getenv("PHPSESSID")
    if phpsessid:
        return phpsessid
    env_path = os.path.join(ROOT_DIR, ".env")
    phpsessid = read_dotenv_value(env_path, "PHPSESSID")
    if phpsessid:
        return phpsessid
    raise RuntimeError("PHPSESSID is not set in the environment or .env")


def get_endpoint() -> str:
    """Return the service base URL without trailing slashes.

    The ``PIXIF_ENDPOINT`` environment variable takes precedence, then the
    ``.env`` file in ``ROOT_DIR``; ``DEFAULT_ENDPOINT`` is the fallback.
    """
    endpoint = os.getenv("PIXIF_ENDPOINT")
    if endpoint:
        return endpoint.rstrip("/")
    env_path = os.path.join(ROOT_DIR, ".env")
    endpoint = read_dotenv_value(env_path, "PIXIF_ENDPOINT")
    if endpoint:
        return endpoint.rstrip("/")
    return DEFAULT_ENDPOINT


def _post_json(endpoint: str, path: str, payload: Dict[str, Any]) -> Any:
    """POST *payload* as JSON to ``{endpoint}{path}`` and return the decoded body.

    Raises:
        requests.HTTPError: If the response status indicates failure.
        RuntimeError: If the decoded body is a dict with a truthy ``"error"``
            entry; the message is that entry converted to ``str``.
    """
    response = requests.post(
        f"{endpoint}{path}", json=payload, timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()
    data = response.json()
    if isinstance(data, dict) and data.get("error"):
        raise RuntimeError(str(data.get("error")))
    return data


def fetch_search(
    url: str,
    pages: int,
    ai_only: bool,
    real_only: bool,
    phpsessid: str,
    endpoint: str,
) -> Dict[str, Any]:
    """POST a search request to ``{endpoint}/search`` and return the JSON body.

    All arguments except *endpoint* are forwarded unchanged in the JSON
    payload under keys of the same name.

    Raises:
        requests.HTTPError: If the response status indicates failure.
        RuntimeError: If the service reports an ``"error"`` in its response.
    """
    payload = {
        "url": url,
        "pages": pages,
        "ai_only": ai_only,
        "real_only": real_only,
        "phpsessid": phpsessid,
    }
    return _post_json(endpoint, "/search", payload)


def fetch_users(
    user_ids: List[int], phpsessid: str, endpoint: str
) -> List[Dict[str, Any]]:
    """POST *user_ids* to ``{endpoint}/users`` and return the resulting list.

    The service may answer with a bare list or with a dict that wraps the
    list under a ``"users"`` key; both shapes are accepted.

    Raises:
        requests.HTTPError: If the response status indicates failure.
        RuntimeError: If the service reports an ``"error"``, or if the
            (unwrapped) response is not a list.
    """
    payload = {"user_ids": user_ids, "phpsessid": phpsessid}
    data = _post_json(endpoint, "/users", payload)
    if isinstance(data, dict) and "users" in data:
        data = data["users"]
    if not isinstance(data, list):
        raise RuntimeError("Unexpected response from users endpoint.")
    return data