# p / Client / hunt.py
# q6's picture
# change client to sqlite
# a1a9773
# raw
# history blame
# 5.44 kB
import os
import sqlite3
import requests
import tempfile
import zipfile
from tqdm import tqdm
# Set `local` to a truthy value to talk to a server on this machine
# instead of the hosted one.
local = 0
if local:
    endpoint = "http://127.0.0.1:7860"
else:
    endpoint = "https://q6-p.hf.space"
def read_dotenv_value(path, key):
    """Look up a single key in a dotenv-style file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Whitespace around the key and the value is ignored, and one
    pair of matching surrounding quotes is removed from the value.

    Args:
        path: Path of the file to read.
        key: Name of the variable to look up.

    Returns:
        The value of the first matching entry, or None when the file does
        not exist or holds no entry for ``key``.
    """
    try:
        with open(path, "r", encoding="utf-8") as env_file:
            for line in env_file:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                # Split on the first "=" only: values may contain "=" themselves.
                name, value = line.split("=", 1)
                if name.strip() != key:
                    continue
                value = value.strip()
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
    except FileNotFoundError:
        return None
    return None
def get_phpsessid():
    """Return the PHPSESSID value used to authenticate requests.

    The ``PHPSESSID`` environment variable is consulted first; when it is
    unset or blank, the ``.env`` file in the parent directory of this
    script is read instead.

    Returns:
        The non-empty session id string.

    Raises:
        RuntimeError: If neither source provides a value.
    """
    phpsessid = (os.getenv("PHPSESSID") or "").strip()
    if phpsessid:
        return phpsessid

    env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".env"))
    phpsessid = (read_dotenv_value(env_path, "PHPSESSID") or "").strip()
    if phpsessid:
        return phpsessid

    raise RuntimeError("PHPSESSID is not set in the environment or .env")
# Session id sent along with every metadata request.
phpsessid = get_phpsessid()

# Switch to the directory holding this script so the relative paths below
# ("images/...", "db.sqlite", the *.txt lists) resolve next to it.
_script_path = os.path.abspath(__file__)
os.chdir(os.path.dirname(_script_path))

# Shared download directory; its current file names seed the in-memory cache.
os.makedirs("images/Stash", exist_ok=True)
images_cache = set()
images_cache.update(os.listdir("images/Stash"))

DB_PATH = "db.sqlite"
def open_db(path):
    """Open the SQLite cache database and ensure its table exists.

    Args:
        path: Database file path (anything ``sqlite3.connect`` accepts).

    Returns:
        An open ``sqlite3.Connection`` in which the ``pixif_cache`` table
        (``post_id`` primary key, ``url``) is present.

    Raises:
        sqlite3.Error: If the schema cannot be created; the connection is
            closed before the error propagates.
    """
    conn = sqlite3.connect(path)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS pixif_cache (
                post_id TEXT PRIMARY KEY,
                url TEXT
            )
            """
        )
        conn.commit()
    except sqlite3.Error:
        # Do not leak the handle when the database is unusable.
        conn.close()
        raise
    return conn
def chunked(seq, size):
    """Yield consecutive slices of ``seq`` holding at most ``size`` items.

    Args:
        seq: A sliceable sequence with a length (list, tuple, str, ...).
        size: Maximum number of items per slice; must be at least 1.

    Yields:
        Slices of ``seq`` in order; the last one may be shorter.

    Raises:
        ValueError: If ``size`` is smaller than 1 (raised on first iteration).
    """
    if size < 1:
        raise ValueError("size must be a positive integer")
    for start in range(0, len(seq), size):
        yield seq[start:start + size]
def fetch_cached_urls(conn, post_ids):
    """Look up the cached URL for each post id.

    Args:
        conn: Open sqlite3 connection containing the ``pixif_cache`` table.
        post_ids: Iterable of post id strings; duplicates are allowed.

    Returns:
        A dict with one entry per distinct post id. The value is None when
        the id has no row in the cache, otherwise the stored URL (a stored
        NULL is returned as an empty string).
    """
    post_ids_dict = dict.fromkeys(post_ids)
    # Query each distinct id once; this also accepts non-list iterables.
    unique_ids = list(post_ids_dict)
    if not unique_ids:
        return post_ids_dict

    # Batches of 900 stay below the 999 bound-parameter default of SQLite
    # builds older than 3.32.0 (presumably why this size was chosen).
    for chunk in chunked(unique_ids, 900):
        placeholders = ",".join("?" * len(chunk))
        query = f"SELECT post_id, COALESCE(url, '') FROM pixif_cache WHERE post_id IN ({placeholders})"
        for post_id, url in conn.execute(query, chunk):
            post_ids_dict[post_id] = url
    return post_ids_dict
def upsert_urls(conn, rows):
    """Insert or overwrite cache rows in ``pixif_cache``.

    An existing row with the same ``post_id`` has its ``url`` replaced.
    This function does not commit; the caller controls the transaction.

    Args:
        conn: Open sqlite3 connection containing the ``pixif_cache`` table.
        rows: Sequence of ``(post_id, url)`` pairs. Nothing is executed
            when it is empty.
    """
    if not rows:
        return
    conn.executemany(
        """
        INSERT INTO pixif_cache (post_id, url)
        VALUES (?, ?)
        ON CONFLICT(post_id) DO UPDATE SET url = excluded.url
        """,
        rows,
    )
conn = open_db(DB_PATH)

# Candidate list files in the working directory. Sorted because
# os.listdir() returns names in arbitrary order, which would make the
# numbers shown to the user differ between runs.
valid = sorted(f for f in os.listdir() if f.endswith(".txt"))
for idx, file in enumerate(valid):
    print(f"{idx + 1}: {file}")

# The user enters 1-based numbers and/or inclusive "a-b" ranges separated by
# whitespace; they are converted to 0-based positions in `valid`.
inputs = input("Enter the index of the file: ").split()
indexs = []
for inp in inputs:
    if "-" in inp:
        start_text, _, end_text = inp.partition("-")
        if not (start_text.isdigit() and end_text.isdigit()):
            print(f"Ignoring invalid range: {inp}")
            continue
        start, end = int(start_text), int(end_text)
        indexs.extend(range(start - 1, end))
    elif inp.isdigit():
        indexs.append(int(inp) - 1)

# Drop positions that do not name a listed file. Without this, entering 0
# would silently pick the last file through negative indexing, and numbers
# past the end would raise IndexError in the middle of the run.
indexs = [position for position in indexs if 0 <= position < len(valid)]
def download_zip(to_download, dest_dir):
    """Request a zip archive for the given posts and extract it.

    The mapping is POSTed as JSON to ``{endpoint}/pixif_zip``. The response
    body is streamed to a temporary ``.zip`` file, extracted into
    ``dest_dir``, and the temporary file is removed afterwards, including
    when the download or extraction fails.

    Args:
        to_download: Mapping of post id to URL, sent as the ``"d"`` field.
        dest_dir: Directory the archive is extracted into.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        zipfile.BadZipFile: If the downloaded payload is not a valid zip.
    """
    tmp_path = None
    try:
        # The context manager releases the streamed connection on every path.
        with requests.post(f"{endpoint}/pixif_zip", json={"d": to_download}, stream=True) as response:
            response.raise_for_status()
            total = int(response.headers.get("Content-Length", 0))
            with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
                tmp_path = tmp_file.name
                # The bar is only shown when the server reports a size.
                with tqdm(
                    total=total,
                    unit="B",
                    unit_scale=True,
                    desc="Downloading zip",
                    disable=total <= 0,
                ) as pbar:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            tmp_file.write(chunk)
                            pbar.update(len(chunk))

        # The temporary file is closed (and flushed) before it is reopened here.
        with zipfile.ZipFile(tmp_path, "r") as zf:
            zf.extractall(dest_dir)
    finally:
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
def decode_if_binary(val):
    """Return ``val`` as text when it is binary, otherwise unchanged.

    Args:
        val: Any value; ``bytes``/``bytearray`` (including subclasses) are
            decoded with the default UTF-8 codec.

    Returns:
        The decoded ``str`` for binary input, or ``val`` itself.
    """
    if isinstance(val, (bytes, bytearray)):
        return val.decode()
    return val
# For every selected list file: resolve image URLs (cache first, then the
# remote service), download what is missing into images/Stash, and hard-link
# the images into a per-group directory. The database connection is closed
# even when a group fails.
try:
    for index in indexs:
        group_name = valid[index].rsplit(".", 1)[0]
        os.makedirs(f"images/{group_name}", exist_ok=True)

        with open(valid[index], "r") as f:
            post_ids = f.read().split()

        post_ids_dict = fetch_cached_urls(conn, post_ids)

        # Only ids that are neither in the cache table nor already on disk
        # need to be resolved by the server.
        filtered = [
            post_id
            for post_id in post_ids
            if post_ids_dict[post_id] is None and f"{post_id}.png" not in images_cache
        ]
        print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")

        if filtered:
            response = requests.post(f'{endpoint}/pixif', json={"post_ids": filtered, "phpsessid": phpsessid})
            # Fail before touching the cache: without this check an error
            # payload would be stored as if it were lookup results and every
            # requested id would be recorded as having no URL.
            response.raise_for_status()
            data = response.json()

            rows = list(data.items())
            # Ids the server did not return are stored with an empty URL so
            # they are not requested again on later runs.
            no_exif = set(filtered) - set(data.keys())
            rows.extend((post_id, "") for post_id in no_exif)
            with conn:  # commits on success, rolls back on error
                upsert_urls(conn, rows)

            for post_id, url in data.items():
                post_ids_dict[post_id] = url
            for post_id in no_exif:
                post_ids_dict[post_id] = ""

        to_download = {post_id: decode_if_binary(url) for post_id, url in post_ids_dict.items() if url and f"{post_id}.png" not in images_cache}
        if to_download:
            print(f"Total images to download: {len(to_download)}")
            download_zip(to_download, "images/Stash")
            images_cache.update(os.listdir("images/Stash"))

        print("Linking images to the group directory...")
        for i, post_id in enumerate(post_ids):
            stash_path = f"images/Stash/{post_id}.png"
            # The position prefix keeps the order of the list file.
            dest_path = f"images/{group_name}/{i}_{post_id}.png"
            if os.path.exists(stash_path) and not os.path.exists(dest_path):
                os.link(stash_path, dest_path)

        # Remove the group directory again when nothing was linked into it.
        if not os.listdir(f'images/{group_name}'):
            os.rmdir(f"images/{group_name}")
finally:
    conn.close()