"""Download Pixiv post images listed in .txt files and hard-link them into
per-group directories, caching resolved image URLs in a local SQLite DB."""

import json
import os
import shutil
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

import requests
from tqdm import tqdm

# Toggle flags: LOCAL switches to a local resolver endpoint; DRY_RUN only
# prints resolved URLs without caching or downloading anything.
LOCAL = False
DRY_RUN = False

ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
SCRIPTS_DIR = os.path.join(ROOT_DIR, "Scripts")
IMAGES_DIR = os.path.join(ROOT_DIR, "images")
STASH_DIR = os.path.join(IMAGES_DIR, "Stash")
DB_PATH = os.path.join(ROOT_DIR, "db.sqlite")
ENDPOINT = "http://127.0.0.1:7860" if LOCAL else "https://q6-p.hf.space"
IMG_BASE = "https://i.pximg.net/img-original/img/"
REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Referer": "https://www.pixiv.net/",
}
MAX_WORKERS = min(16, os.cpu_count() or 8)
REQUEST_TIMEOUT = (120, 120)  # (connect, read) seconds for image downloads
STREAM_REQUEST_TIMEOUT = (120, 30)  # shorter read timeout for the event stream
STREAM_IDLE_TIMEOUT_SECONDS = 45
STREAM_MAX_READ_TIMEOUTS = 3
STREAM_MAX_RETRIES = 3
STREAM_RETRY_DELAY_SECONDS = 2

stop_event = threading.Event()


def read_dotenv_value(path: str, key: str) -> Optional[str]:
    """Return the value for ``key`` from a .env file, or None if absent."""
    try:
        with open(path, "r") as env_file:
            for line in env_file:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                k, v = line.split("=", 1)
                if k == key:
                    return v
    except FileNotFoundError:
        return None
    return None


def get_phpsessid() -> str:
    """Read the Pixiv session cookie from the environment or the .env file."""
    phpsessid = os.getenv("PHPSESSID")
    if phpsessid:
        return phpsessid
    env_path = os.path.join(ROOT_DIR, ".env")
    phpsessid = read_dotenv_value(env_path, "PHPSESSID")
    if phpsessid:
        return phpsessid
    raise RuntimeError("PHPSESSID is not set in the environment or .env")


phpsessid = get_phpsessid()

os.makedirs(STASH_DIR, exist_ok=True)
# Filenames already present in the stash; finished downloads are skipped.
images_cache = set(os.listdir(STASH_DIR))


def open_db(path: str) -> sqlite3.Connection:
    """Open the SQLite URL cache, creating the table on first use."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS pixif_cache (
            post_id TEXT PRIMARY KEY,
            url TEXT
        )
        """
    )
    conn.commit()
    return conn


def chunked(seq: Sequence[str], size: int) -> Iterator[List[str]]:
    for i in range(0, len(seq), size):
        yield seq[i:i + size]


def fetch_cached_state(
    conn: sqlite3.Connection,
    post_ids: Sequence[str],
) -> Dict[str, Optional[str]]:
    """Map each post_id to its cached URL ('' = known-missing, None = unseen)."""
    post_ids_dict: Dict[str, Optional[str]] = {post_id: None for post_id in post_ids}
    if not post_ids:
        return post_ids_dict
    # Chunk the IN clause to stay under SQLite's bound-variable limit (999).
    for chunk in chunked(post_ids, 900):
        placeholders = ",".join("?" for _ in chunk)
        query = f"SELECT post_id, COALESCE(url, '') FROM pixif_cache WHERE post_id IN ({placeholders})"
        for post_id, url in conn.execute(query, chunk):
            post_ids_dict[post_id] = url
    return post_ids_dict


def upsert_urls(conn: sqlite3.Connection, rows: Sequence[Tuple[str, str]]) -> None:
    if not rows:
        return
    conn.executemany(
        """
        INSERT INTO pixif_cache (post_id, url) VALUES (?, ?)
        ON CONFLICT(post_id) DO UPDATE SET url = excluded.url
        """,
        rows,
    )


def iter_api_hunt_results(
    post_ids: Sequence[str],
    phpsessid: str,
    desc: str,
    stop_event: threading.Event = stop_event,
) -> Iterator[Tuple[str, str]]:
    """Stream (post_id, url) events from the resolver endpoint.

    Each response line is a JSON object; an empty URL marks a post the
    resolver could not find an image for.
    """
    post_ids_list = list(post_ids)
    if not post_ids_list:
        return
    payload = {"post_ids": post_ids_list, "phpsessid": phpsessid}
    try:
        with requests.post(
            f"{ENDPOINT}/pixif_stream",
            json=payload,
            stream=True,
            timeout=STREAM_REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            with tqdm(total=len(post_ids_list), unit="post", desc=desc) as pbar:
                last_progress = time.monotonic()
                consecutive_timeouts = 0
                while not stop_event.is_set() and pbar.n < pbar.total:
                    if time.monotonic() - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
                        break
                    try:
                        for line in response.iter_lines(decode_unicode=True):
                            if stop_event.is_set():
                                break
                            now = time.monotonic()
                            if now - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
                                break
                            if not line:
                                continue  # keep-alive blank line
                            try:
                                event = json.loads(line)
                            except json.JSONDecodeError:
                                continue
                            post_id = event.get("post_id")
                            if post_id is None:
                                continue
                            post_id = str(post_id)
                            url = event.get("url") or ""
                            if DRY_RUN and url:
                                tqdm.write(f"{post_id} -> {decode_if_binary(url)}")
                            pbar.update(1)
                            last_progress = now
                            consecutive_timeouts = 0
                            yield post_id, url
                        # The stream ended, went idle, or was cancelled; either
                        # way this response is done.
                        break
                    except requests.exceptions.ReadTimeout:
                        consecutive_timeouts += 1
                        if consecutive_timeouts >= STREAM_MAX_READ_TIMEOUTS:
                            break
                        continue
    except KeyboardInterrupt:
        stop_event.set()
        raise


conn = open_db(DB_PATH)

# Let the user pick one or more post-ID list files from the repository root.
valid = [f for f in os.listdir(ROOT_DIR) if f.endswith(".txt")]
for idx, file_name in enumerate(valid):
    print(f"{idx + 1}: {file_name}")
inputs = input("Enter the file number(s) (e.g. 1 3 5-7): ").split()
indexes = []
for inp in inputs:
    if "-" in inp:
        start, end = map(int, inp.split("-"))
        indexes.extend(range(start - 1, end))
    elif inp.isdigit():
        indexes.append(int(inp) - 1)
# Drop out-of-range selections instead of crashing later.
indexes = [i for i in indexes if 0 <= i < len(valid)]


def build_image_url(url: str) -> str:
    if url.startswith("http"):
        return url
    return IMG_BASE + url


def download_one_image(
    post_id: str,
    url: str,
    dest_dir: str,
    phpsessid: str,
    stop_event: threading.Event = stop_event,
) -> Tuple[str, str, Optional[str]]:
    """Download one image to ``dest_dir`` and report (post_id, status, error)."""
    dest_path = os.path.join(dest_dir, f"{post_id}.png")
    if os.path.exists(dest_path):
        return post_id, "exists", None
    if stop_event.is_set():
        return post_id, "cancelled", None
    # Write to a .part file first so an interrupted download never leaves a
    # truncated image behind.
    tmp_path = dest_path + ".part"
    full_url = build_image_url(url)
    try:
        with requests.get(
            full_url,
            headers=REQUEST_HEADERS,
            cookies={"PHPSESSID": phpsessid},
            stream=True,
            timeout=REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            with open(tmp_path, "wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if stop_event.is_set():
                        raise KeyboardInterrupt
                    if chunk:
                        handle.write(chunk)
        os.replace(tmp_path, dest_path)
        return post_id, "ok", None
    except KeyboardInterrupt:
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return post_id, "cancelled", None
    except Exception as exc:
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return post_id, "error", str(exc)


def link_group_image(post_id: str, group_dir: str, post_indexes: Dict[str, int]) -> None:
    """Hard-link a stashed image into the group directory, prefixed with its
    position in the input file so the original ordering is preserved."""
    index = post_indexes.get(post_id)
    if index is None:
        return
    stash_path = os.path.join(STASH_DIR, f"{post_id}.png")
    dest_path = os.path.join(group_dir, f"{index}_{post_id}.png")
    if os.path.exists(stash_path) and not os.path.exists(dest_path):
        os.link(stash_path, dest_path)


def handle_download_result(future, group_dir: str, post_indexes: Dict[str, int], pbar: tqdm) -> None:
    """Record one finished download and link it into the group directory."""
    post_id, status, detail = future.result()
    if status == "error":
        tqdm.write(f"Failed {post_id}: {detail}")
    elif status in {"ok", "exists"}:
        images_cache.add(f"{post_id}.png")
        link_group_image(post_id, group_dir, post_indexes)
    pbar.update(1)


def drain_downloads(
    futures,
    group_dir: str,
    post_indexes: Dict[str, int],
    pbar: tqdm,
) -> None:
    """Reap any finished download futures without blocking."""
    for future in list(futures):
        if future.done():
            futures.remove(future)
            handle_download_result(future, group_dir, post_indexes, pbar)


def finish_downloads(
    futures,
    group_dir: str,
    post_indexes: Dict[str, int],
    pbar: tqdm,
    stop_event: threading.Event = stop_event,
) -> None:
    """Block until every remaining download future has been handled."""
    for future in as_completed(list(futures)):
        if stop_event.is_set():
            break
        futures.remove(future)
        handle_download_result(future, group_dir, post_indexes, pbar)


def decode_if_binary(val: Union[str, bytes]) -> str:
    """Normalize values that may have been cached as bytes."""
    if isinstance(val, bytes):
        return val.decode()
    return val


def scan_with_retries(
    post_ids: Sequence[str],
    phpsessid: str,
    conn: sqlite3.Connection,
    post_ids_dict: Dict[str, Optional[str]],
    executor: ThreadPoolExecutor,
    futures,
    queued_downloads,
    group_dir: str,
    post_indexes: Dict[str, int],
    download_pbar: tqdm,
    desc: str,
    stop_event: threading.Event = stop_event,
) -> None:
    """Resolve URLs for ``post_ids``, retrying posts the stream never answered,
    and queue downloads as results arrive."""
    if not post_ids:
        return
    remaining = list(post_ids)
    attempts = 0
    while remaining and attempts < STREAM_MAX_RETRIES and not stop_event.is_set():
        received = set()
        for post_id, url in iter_api_hunt_results(remaining, phpsessid, desc, stop_event):
            received.add(post_id)
            post_ids_dict[post_id] = url
            if not DRY_RUN:
                with conn:  # commit each result immediately
                    upsert_urls(conn, [(post_id, url)])
            if url and f"{post_id}.png" not in images_cache and post_id not in queued_downloads:
                queued_downloads.add(post_id)
                future = executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event)
                futures.append(future)
                download_pbar.total += 1
                download_pbar.refresh()
            drain_downloads(futures, group_dir, post_indexes, download_pbar)
        remaining = [post_id for post_id in remaining if post_id not in received]
        if not remaining:
            break
        attempts += 1
        if STREAM_RETRY_DELAY_SECONDS:
            time.sleep(STREAM_RETRY_DELAY_SECONDS)


try:
    for index in indexes:
        group_name = valid[index].rsplit(".", 1)[0]
        group_dir = os.path.join(IMAGES_DIR, group_name)
        os.makedirs(group_dir, exist_ok=True)
        with open(os.path.join(ROOT_DIR, valid[index]), "r") as f:
            post_ids = f.read().split()
        post_indexes = {post_id: i for i, post_id in enumerate(post_ids)}
        post_ids_dict = fetch_cached_state(conn, post_ids)
        if DRY_RUN:
            filtered = list(post_ids)
        else:
            # Only hunt for posts with no cached URL and no stashed image.
            filtered = [
                post_id
                for post_id in post_ids
                if post_ids_dict[post_id] is None and f"{post_id}.png" not in images_cache
            ]
        print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")
        if DRY_RUN:
            if filtered:
                print("Dry run outputs (post_id -> page):")
                for _ in iter_api_hunt_results(filtered, phpsessid, "API hunt", stop_event):
                    pass
            continue
        # Posts whose URL is already cached but whose image is not stashed yet.
        cached_downloads = {
            post_id: decode_if_binary(url)
            for post_id, url in post_ids_dict.items()
            if url and f"{post_id}.png" not in images_cache
        }
        futures = []
        queued_downloads = set()
        executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
        interrupted = False
        try:
            with tqdm(total=0, unit="image", desc="Downloading images") as download_pbar:
                for post_id, url in cached_downloads.items():
                    queued_downloads.add(post_id)
                    futures.append(executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event))
                    download_pbar.total += 1
                    download_pbar.refresh()
                if filtered:
                    scan_with_retries(
                        filtered,
                        phpsessid,
                        conn,
                        post_ids_dict,
                        executor,
                        futures,
                        queued_downloads,
                        group_dir,
                        post_indexes,
                        download_pbar,
                        "API hunt",
                        stop_event,
                    )
                finish_downloads(futures, group_dir, post_indexes, download_pbar, stop_event)
        except KeyboardInterrupt:
            interrupted = True
            stop_event.set()
            for future in futures:
                future.cancel()
            executor.shutdown(wait=False, cancel_futures=True)
            raise
        finally:
            if not interrupted:
                executor.shutdown(wait=True)
        print("Linking images to the group directory...")
        images_cache.update(os.listdir(STASH_DIR))
        for post_id in post_ids:
            link_group_image(post_id, group_dir, post_indexes)
        if len(os.listdir(group_dir)) == 0:
            # Nothing was downloaded for this group; remove the empty directory.
            shutil.rmtree(group_dir)
        else:
            # Hard-link the helper scripts next to the images; the "!" prefix
            # sorts them to the top of the directory listing.
            novelai_source = os.path.join(SCRIPTS_DIR, "novelai.py")
            novelai_dest = os.path.join(group_dir, "!novelai.py")
            if not os.path.exists(novelai_dest) and os.path.exists(novelai_source):
                os.link(novelai_source, novelai_dest)
                print(f"Linked novelai.py to images/{group_name}/!novelai.py")
            comfy_source = os.path.join(SCRIPTS_DIR, "comfy.py")
            comfy_dest = os.path.join(group_dir, "!comfy.py")
            if not os.path.exists(comfy_dest) and os.path.exists(comfy_source):
                os.link(comfy_source, comfy_dest)
                print(f"Linked comfy.py to images/{group_name}/!comfy.py")
except KeyboardInterrupt:
    stop_event.set()
    raise
finally:
    conn.close()