"""Scan Pixiv posts for PNG pages carrying 'stealth_pngcomp' metadata.

Reads post IDs from user-selected .txt files, fetches each post's pages via
the Pixiv ajax API, and reports the first page whose original PNG hides
gzip-compressed JSON in the least-significant bits of its alpha channel.
Requires a valid PHPSESSID cookie (environment variable or ../.env).
"""

import concurrent.futures
import gzip
import io
import json
import os
import threading
from typing import Union

import numpy as np
import requests
from PIL import Image
from requests.adapters import HTTPAdapter
from tqdm import tqdm

# NOTE(review): IMG_BASE is unused by the functions below; kept in case other
# tooling imports it from this module.
IMG_BASE = "https://i.pximg.net/img-original/img/"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0"
DEFAULT_WORKERS = 8
REQUEST_TIMEOUT = 45

# Per-thread storage so each worker thread reuses its own requests.Session.
thread_local = threading.local()


def read_dotenv_value(path, key):
    """Return the value for *key* from a dotenv-style file, or None.

    Blank lines, comments, and lines without '=' are skipped.  Whitespace
    around the key/value and one layer of matching quotes are tolerated,
    matching what common dotenv writers emit.
    """
    try:
        with open(path, "r") as env_file:
            for line in env_file:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                k, v = line.split("=", 1)
                if k.strip() == key:
                    v = v.strip()
                    # Accept KEY="value" / KEY='value' forms.
                    if len(v) >= 2 and v[0] == v[-1] and v[0] in "\"'":
                        v = v[1:-1]
                    return v
    except FileNotFoundError:
        return None
    return None


def get_phpsessid():
    """Return the Pixiv PHPSESSID cookie value.

    Checks the PHPSESSID environment variable first, then a ``.env`` file
    one directory above this script.

    Raises:
        RuntimeError: if no PHPSESSID can be found in either place.
    """
    phpsessid = os.getenv("PHPSESSID")
    if phpsessid:
        return phpsessid
    env_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", ".env")
    )
    phpsessid = read_dotenv_value(env_path, "PHPSESSID")
    if phpsessid:
        return phpsessid
    raise RuntimeError("PHPSESSID is not set in the environment or .env")


def byteize(alpha):
    """Pack the least-significant bits of a 2-D alpha array into bytes.

    The array is transposed then flattened, so bits are read down each
    column; trailing bits that do not fill a whole byte are dropped.
    Returns an (N, 1) uint8 array of packed bytes.
    """
    alpha = alpha.T.reshape((-1,))
    # Keep only a multiple of 8 bits so packbits yields whole bytes.
    alpha = alpha[:(alpha.shape[0] // 8) * 8]
    alpha = np.bitwise_and(alpha, 1)
    alpha = alpha.reshape((-1, 8))
    return np.packbits(alpha, axis=1)


class LSBExtractor:
    """Sequential reader over the byte stream hidden in an alpha channel."""

    def __init__(self, alpha):
        # Packed LSB bytes and a cursor into them.
        self.data = byteize(alpha)
        self.pos = 0

    def get_one_byte(self):
        """Return the next byte and advance the cursor."""
        byte = self.data[self.pos]
        self.pos += 1
        return byte

    def get_next_n_bytes(self, n):
        """Return up to *n* bytes as a bytearray (short if data runs out)."""
        n_bytes = self.data[self.pos:self.pos + n]
        self.pos += n
        return bytearray(n_bytes)

    def read_32bit_integer(self):
        """Read a big-endian uint32, or None if fewer than 4 bytes remain."""
        bytes_list = self.get_next_n_bytes(4)
        if len(bytes_list) == 4:
            return int.from_bytes(bytes_list, byteorder="big")
        return None


def extract_image_metadata(image: Union[Image.Image, np.ndarray]) -> dict:
    """Decode 'stealth_pngcomp' JSON metadata hidden in an alpha channel.

    Accepts a PIL image with an alpha band, or a numpy array that is either
    the H×W alpha plane itself or H×W×C with alpha as the last channel.
    A string "Comment" field in the payload is parsed as nested JSON.

    Raises:
        AssertionError: if the image has no 2-D alpha channel or the magic
            header is absent (type kept for caller compatibility; raised
            explicitly so the checks survive ``python -O``).
        ValueError: if the payload length field is truncated.
    """
    if isinstance(image, Image.Image):
        if "A" not in image.getbands():
            raise AssertionError("image format")
        alpha = np.array(image.getchannel("A"))
    else:
        alpha = image[..., -1] if image.ndim == 3 else image
    if alpha.ndim != 2:
        raise AssertionError("image format")
    reader = LSBExtractor(alpha)
    magic = "stealth_pngcomp"
    read_magic = reader.get_next_n_bytes(len(magic)).decode("utf-8")
    if magic != read_magic:
        raise AssertionError("magic number")
    bit_len = reader.read_32bit_integer()
    if bit_len is None:
        # Original code raised an accidental TypeError on `None // 8` here;
        # fail with a clear message instead.
        raise ValueError("truncated payload length")
    json_data = reader.get_next_n_bytes(bit_len // 8)
    json_data = json.loads(gzip.decompress(json_data).decode("utf-8"))
    if "Comment" in json_data and isinstance(json_data["Comment"], str):
        json_data["Comment"] = json.loads(json_data["Comment"])
    return json_data


def iter_text_files():
    """Prompt the user to pick .txt files from the CWD; return their names.

    Accepts space-separated 1-based indices and ranges ("1 3 5-7").
    Malformed tokens and out-of-range indices are ignored; reversed ranges
    ("7-5") are normalized.  Returns [] when no .txt files exist.
    """
    valid = [f for f in os.listdir() if f.endswith(".txt")]
    if not valid:
        print("No .txt files found.")
        return []
    for idx, file in enumerate(valid):
        print(f"{idx + 1}: {file}")
    inputs = input("Enter the index of the file: ").split()
    indexes = []
    for inp in inputs:
        if "-" in inp:
            try:
                start, end = map(int, inp.split("-"))
            except ValueError:
                # Ignore malformed ranges like "3-a" or "1-2-3" instead of
                # crashing (previously an uncaught ValueError).
                continue
            if start > end:
                start, end = end, start
            indexes.extend(range(start - 1, end))
        elif inp.isdigit():
            indexes.append(int(inp) - 1)
    indexes = [idx for idx in sorted(set(indexes)) if 0 <= idx < len(valid)]
    return [valid[idx] for idx in indexes]


def fetch_post_pages(session, post_id):
    """Return the list of page dicts for a Pixiv post via the ajax API.

    Raises requests.HTTPError on a non-2xx response; returns [] when the
    response body is missing.
    """
    url = f"https://www.pixiv.net/ajax/illust/{post_id}/pages"
    response = session.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    data = response.json()
    return data.get("body") or []


def has_stealth_png(session, image_url, post_id):
    """Return True if the image at *image_url* contains stealth metadata.

    Propagates HTTP errors and the AssertionError/ValueError raised by
    extract_image_metadata when no metadata is present.
    """
    # Pixiv's image CDN requires the artwork page as Referer.
    headers = {"Referer": f"https://www.pixiv.net/artworks/{post_id}"}
    response = session.get(image_url, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    image = Image.open(io.BytesIO(response.content))
    extract_image_metadata(image)
    return True


def find_stealth_page(post_id, phpsessid):
    """Return the 1-based page number of the first stealth PNG, or None.

    Best-effort: network errors and metadata-extraction failures for a
    page are swallowed so one bad page/post does not abort the scan.
    """
    session = get_thread_session(phpsessid)
    try:
        pages = fetch_post_pages(session, post_id)
    except Exception:
        return None
    for idx, page in enumerate(pages):
        original = page.get("urls", {}).get("original")
        # Only original-quality PNGs can carry LSB metadata.
        if not original or not original.lower().endswith(".png"):
            continue
        try:
            if has_stealth_png(session, original, post_id):
                return idx + 1
        except Exception:
            continue
    return None


def build_session(phpsessid):
    """Create a requests.Session authenticated with the PHPSESSID cookie."""
    session = requests.Session()
    session.headers.update(
        {"User-Agent": USER_AGENT, "Referer": "https://www.pixiv.net/"}
    )
    session.cookies.update({"PHPSESSID": phpsessid})
    # Size the connection pool for the worker thread count.
    adapter = HTTPAdapter(
        pool_connections=DEFAULT_WORKERS * 2, pool_maxsize=DEFAULT_WORKERS * 2
    )
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


def get_thread_session(phpsessid):
    """Return this thread's session, creating it on first use."""
    session = getattr(thread_local, "session", None)
    if session is None:
        session = build_session(phpsessid)
        thread_local.session = session
    return session


def main() -> int:
    """Interactively scan post-ID lists for stealth PNGs; return exit code."""
    # Operate relative to the script's directory so .txt lists are found.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    try:
        phpsessid = get_phpsessid()
    except Exception as exc:
        print(f"Failed to load PHPSESSID: {exc}")
        return 1
    files = iter_text_files()
    if not files:
        return 0
    try:
        workers = max(1, int(os.getenv("PIXIF_WORKERS", DEFAULT_WORKERS)))
    except ValueError:
        # Non-numeric PIXIF_WORKERS previously crashed; fall back instead.
        workers = DEFAULT_WORKERS
    for filename in files:
        with open(filename, "r") as handle:
            post_ids = handle.read().split()
        if not post_ids:
            continue
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(find_stealth_page, post_id, phpsessid): post_id
                for post_id in post_ids
            }
            bar = tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                # Bug fix: desc previously showed the literal "(unknown)".
                desc=f"Scanning {filename}",
                unit="post",
            )
            for future in bar:
                post_id = futures[future]
                try:
                    page = future.result()
                except Exception:
                    page = None
                if page is not None:
                    tqdm.write(f"{post_id} page {page}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())