"""Scrape note metadata from the grail.moe API into a local sqlite database."""

from __future__ import annotations

import argparse
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests

DB_NAME = "grail_data.db"
REQUEST_TIMEOUT_SECONDS = 10
REQUEST_PACING_SECONDS = 0.5
MAX_CONSECUTIVE_FAILURES = 1000

failure_counter = 0
counter_lock = threading.Lock()


def setup_db() -> None:
    """Create the sqlite schema if it does not already exist."""
    conn = sqlite3.connect(DB_NAME)
    curr = conn.cursor()
    curr.execute("PRAGMA foreign_keys = ON;")
    curr.executescript("""
        CREATE TABLE IF NOT EXISTS category (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS subject (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            category_id INTEGER,
            FOREIGN KEY (category_id) REFERENCES category(id)
        );

        CREATE TABLE IF NOT EXISTS account (
            id INTEGER PRIMARY KEY,
            username TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS filedata (
            id INTEGER PRIMARY KEY,
            year INTEGER,
            document_name TEXT,
            file_name TEXT,
            uploaded_on TEXT,
            subject_id INTEGER,
            account_id INTEGER,
            category_id INTEGER,
            extension TEXT,
            approved BOOLEAN,
            FOREIGN KEY (subject_id) REFERENCES subject(id),
            FOREIGN KEY (account_id) REFERENCES account(id),
            FOREIGN KEY (category_id) REFERENCES category(id)
        );

        CREATE TABLE IF NOT EXISTS successlog (
            id INTEGER PRIMARY KEY,
            success BOOLEAN DEFAULT 0,
            downloaded BOOLEAN DEFAULT 0
        );
    """)
    conn.commit()
    conn.close()
    print("Database schema initialized.")


def _should_stop() -> bool:
    with counter_lock:
        return failure_counter >= MAX_CONSECUTIVE_FAILURES


def _register_failure() -> None:
    global failure_counter
    with counter_lock:
        failure_counter += 1
        if failure_counter >= MAX_CONSECUTIVE_FAILURES:
            print("MAX consecutive failures reached. Stopping new submissions.")


def _reset_failure_counter() -> None:
    global failure_counter
    with counter_lock:
        failure_counter = 0


def get_meta(number: int) -> int:
    """Fetch metadata for one note id and persist it. Returns 1 on success, 0 otherwise."""
    result = 0
    if _should_stop():
        return result

    # sqlite connections cannot be shared across threads, so each task opens its own.
    thread_conn = sqlite3.connect(DB_NAME)
    thread_curr = thread_conn.cursor()
    thread_curr.execute("PRAGMA foreign_keys = ON;")

    thread_curr.execute(
        "SELECT success FROM successlog WHERE id = ? AND success = 1",
        (number,),
    )
    if thread_curr.fetchone():
        print(f"Skipping {number}, already successfully processed.")
        thread_conn.close()
        return 0

    url = f"https://api.grail.moe/note/{number}"
    try:
        time.sleep(REQUEST_PACING_SECONDS)  # crude pacing so the API is not hammered
        response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
        if response.status_code == 200:
            data = response.json()
            thread_curr.execute(
                "INSERT OR IGNORE INTO category (id, name) VALUES (?, ?)",
                (data["doc_category"]["id"], data["doc_category"]["name"]),
            )
            thread_curr.execute(
                "INSERT OR IGNORE INTO subject (id, name, category_id) VALUES (?, ?, ?)",
                (
                    data["doc_subject"]["id"],
                    data["doc_subject"]["name"],
                    data["doc_category"]["id"],
                ),
            )
            thread_curr.execute(
                "INSERT OR IGNORE INTO account (id, username) VALUES (?, ?)",
                (data["account"]["user_id"], data["account"]["username"]),
            )
            is_approved = 1 if data["approved"] else 0
            thread_curr.execute(
                """
                INSERT OR REPLACE INTO filedata
                    (id, year, document_name, file_name, uploaded_on,
                     subject_id, category_id, account_id, extension, approved)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    data["id"],
                    data["year"],
                    data["document_name"],
                    data["file_name"],
                    data["uploaded_on"],
                    data["doc_subject"]["id"],
                    # Same category id as the inserts above; the original read
                    # data["category"], a key used nowhere else in the payload.
                    data["doc_category"]["id"],
                    data["account"]["user_id"],
                    data["extension"],
                    is_approved,
                ),
            )
            # Mark success only after every insert has gone through, so a
            # KeyError mid-way cannot commit an id as successfully processed.
            thread_curr.execute(
                "INSERT OR REPLACE INTO successlog (id, success) VALUES (?, ?)",
                (number, 1),
            )
            print(f"Saved: {data['id']}")
            _reset_failure_counter()
            result = 1
        else:
            thread_curr.execute(
                "INSERT OR REPLACE INTO successlog (id, success) VALUES (?, ?)",
                (number, 0),
            )
            print(f"Failed/Not Found: {number} (Status: {response.status_code})")
            _register_failure()
    except requests.RequestException as exc:
        print(f"Request error for {number}: {exc}")
        _register_failure()
    except (KeyError, ValueError, sqlite3.DatabaseError) as exc:
        print(f"Data/database error for {number}: {exc}")
        _register_failure()
    finally:
        thread_conn.commit()
        thread_conn.close()
    return result


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Extract metadata from the grail API into sqlite.")
    # parser.add_argument("--max-fail", type=int, default=1000)
    parser.add_argument("--start-id", type=int, default=10000)
    parser.add_argument("--end-id", type=int, default=12000)
    parser.add_argument("--workers", type=int, default=10)
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    setup_db()
    print("Starting parallel scraping...")
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        for item_id in range(args.start_id, args.end_id):
            if _should_stop():
                print("Stopping submission of new tasks.")
                break
            executor.submit(get_meta, item_id)
    print("Finished.")


if __name__ == "__main__":
    main()