| import os | |
| import lmdb | |
| import requests | |
| import tempfile | |
| import zipfile | |
| from tqdm import tqdm | |
# Toggle: any truthy value points the script at the loopback server instead of
# the remote one.
local = 0

if local:
    endpoint = "http://127.0.0.1:7860"
else:
    endpoint = "https://q6-p.hf.space"
def read_dotenv_value(path, key):
    """Return the value assigned to *key* in a dotenv-style file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Whitespace around the key and the value is ignored, an optional
    leading ``export `` is accepted, and one pair of matching single or
    double quotes around the value is removed.

    Args:
        path: Path of the file to read.
        key: Variable name to look up.

    Returns:
        The value of the first matching assignment, or ``None`` when the
        file does not exist or contains no assignment for *key*.
    """
    try:
        with open(path, "r") as env_file:
            for line in env_file:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                # Split on the first "=" only so values may contain "=".
                name, _, value = line.partition("=")
                name = name.strip()
                if name.startswith("export "):
                    name = name[len("export "):].strip()
                if name != key:
                    continue
                value = value.strip()
                # Drop one pair of matching surrounding quotes, if present.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
    except FileNotFoundError:
        return None
    return None
def get_phpsessid():
    """Return the PHPSESSID to use for requests.

    The ``PHPSESSID`` environment variable is consulted first; if it is unset
    or empty, the ``.env`` file one directory above this script is read.

    Raises:
        RuntimeError: If neither source yields a non-empty value.
    """
    from_environment = os.getenv("PHPSESSID")
    if from_environment:
        return from_environment

    script_dir = os.path.dirname(__file__)
    dotenv_path = os.path.abspath(os.path.join(script_dir, "..", ".env"))
    from_dotenv = read_dotenv_value(dotenv_path, "PHPSESSID")
    if not from_dotenv:
        raise RuntimeError("PHPSESSID is not set in the environment or .env")
    return from_dotenv
phpsessid = get_phpsessid()

# Work from this script's directory so the relative paths below ("images",
# "db", the *.txt group files) resolve next to the script.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
os.makedirs("images/Stash", exist_ok=True)
# File names already present in the shared download directory.
images_cache = set(os.listdir("images/Stash"))
# NOTE(review): map_size caps the database at 2 MiB; LMDB raises MapFullError
# once that is exceeded -- confirm this is large enough for the cache.
db = lmdb.open("db", subdir=True, map_size=1048576 * 2)

# Every .txt file in the working directory is a selectable group.
valid = [f for f in os.listdir() if f.endswith(".txt")]
for idx, file in enumerate(valid):
    print(f"{idx + 1}: {file}")


def _parse_selection(tokens, count):
    """Convert 1-based selection tokens into 0-based list indices.

    Each token is either a single number (``"3"``) or an inclusive range
    (``"2-5"``). Tokens that are malformed or that fall outside ``1..count``
    are reported and skipped, so a typo can never select the wrong file
    (``"0"`` would otherwise become index -1, i.e. the last entry) or crash
    the script later with an IndexError.

    Args:
        tokens: Whitespace-separated pieces of the user's input.
        count: Number of selectable entries.

    Returns:
        List of 0-based indices in the order they were requested.
    """
    selected = []
    for token in tokens:
        if "-" in token:
            first, _, last = token.partition("-")
            # isdecimal() only accepts characters that int() can parse.
            if not (first.isdecimal() and last.isdecimal()):
                print(f"Ignoring invalid range: {token}")
                continue
            start, end = int(first), int(last)
        elif token.isdecimal():
            start = end = int(token)
        else:
            # Anything that is neither a number nor a range is ignored.
            continue
        if start < 1 or end > count or start > end:
            print(f"Ignoring out-of-range selection: {token}")
            continue
        selected.extend(range(start - 1, end))
    return selected


inputs = input("Enter the index of the file: ").split()
indexs = _parse_selection(inputs, len(valid))
def download_zip(to_download, dest_dir):
    """Request a zip archive from the server and extract it into *dest_dir*.

    *to_download* is POSTed as the ``"d"`` field of a JSON body to
    ``{endpoint}/pixif_zip``. The response is streamed into a temporary file
    in 1 MiB chunks, with a progress bar when the server reports a
    ``Content-Length``, then extracted. The temporary file is removed even
    when the download or the extraction fails.

    Args:
        to_download: JSON-serialisable payload describing what to fetch.
        dest_dir: Directory the archive contents are extracted into.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        zipfile.BadZipFile: If the downloaded payload is not a valid archive.
    """
    tmp_path = None
    try:
        # The context manager releases the streamed connection on every path.
        with requests.post(f"{endpoint}/pixif_zip", json={"d": to_download}, stream=True) as response:
            response.raise_for_status()
            total = int(response.headers.get("Content-Length", 0))
            with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
                tmp_path = tmp_file.name
                # Without a known size the bar is disabled, so nothing is shown.
                with tqdm(total=total, unit="B", unit_scale=True, desc="Downloading zip", disable=total <= 0) as pbar:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            tmp_file.write(chunk)
                            pbar.update(len(chunk))
        # The temp file is closed at this point, so it can be reopened by path.
        with zipfile.ZipFile(tmp_path, "r") as zf:
            zf.extractall(dest_dir)
    finally:
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
def decode_if_binary(val):
    """Return *val* decoded to ``str`` if it is binary, otherwise unchanged.

    ``bytes`` (including subclasses) and ``bytearray`` values are decoded
    with the default UTF-8 codec; every other value, including ``None`` and
    ``str``, is returned as is.
    """
    if isinstance(val, (bytes, bytearray)):
        return val.decode()
    return val
# Process every selected group file: resolve its post ids to URLs (using the
# LMDB cache where possible), download missing images into images/Stash, then
# hard-link them into the group's own directory.
try:
    for index in indexs:
        source_file = valid[index]
        group_name = source_file.rsplit(".", 1)[0]
        group_dir = f"images/{group_name}"
        os.makedirs(group_dir, exist_ok=True)
        with open(source_file, "r") as f:
            post_ids = f.read().split()

        # Look up cached results in a short read-only transaction so no write
        # lock is held while waiting on the network below.
        with db.begin() as txn:
            post_ids_dict = {post_id: txn.get(post_id.encode()) for post_id in post_ids}

        # Only ids with no cache entry and no stashed image need a lookup.
        filtered = [
            post_id
            for post_id in post_ids
            if post_ids_dict[post_id] is None and f"{post_id}.png" not in images_cache
        ]
        print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")
        if filtered:
            response = requests.post(f'{endpoint}/pixif', json={"post_ids": filtered, "phpsessid": phpsessid})
            # Fail before parsing so an error payload is never cached as URLs.
            response.raise_for_status()
            data = response.json()
            with db.begin(write=True) as txn:
                for post_id, url in data.items():
                    txn.put(post_id.encode(), url.encode())
                    post_ids_dict[post_id] = url
                # Ids missing from the response are cached as empty values so
                # they are not requested again on the next run.
                for post_id in set(filtered) - set(data):
                    txn.put(post_id.encode(), b'')

        # Cached values come back from LMDB as bytes; fresh ones are str.
        to_download = {
            post_id: decode_if_binary(url)
            for post_id, url in post_ids_dict.items()
            if url and f"{post_id}.png" not in images_cache
        }
        if to_download:
            print(f"Total images to download: {len(to_download)}")
            download_zip(to_download, "images/Stash")
            images_cache.update(os.listdir("images/Stash"))

        print("Linking images to the group directory...")
        for i, post_id in enumerate(post_ids):
            stash_path = f"images/Stash/{post_id}.png"
            # The position prefix keeps the order of the ids in the group file.
            dest_path = f"{group_dir}/{i}_{post_id}.png"
            if os.path.exists(stash_path) and not os.path.exists(dest_path):
                os.link(stash_path, dest_path)

        # Do not leave an empty directory behind for groups with no images.
        if not os.listdir(group_dir):
            os.rmdir(group_dir)
finally:
    # Close the environment even when a group fails part-way through.
    db.close()