# 67rp/scraper/code/_1extractor.py
# Author: Jovanseetk — "Prepare Hugging Face Spaces deploy" (commit 554d9f2)
from __future__ import annotations
import argparse
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import requests
# SQLite database file shared by every worker thread (each opens its own connection).
DB_NAME = "grail_data.db"
# Per-request HTTP timeout, in seconds.
REQUEST_TIMEOUT_SECONDS = 10
# Sleep before each request — crude client-side rate limiting of the API.
REQUEST_PACING_SECONDS = 0.5
# Stop submitting new work once this many failures accumulate without a success.
MAX_CONSECUTIVE_FAILURES = 1000
# Shared failure count; always read/written under counter_lock.
failure_counter = 0
counter_lock = threading.Lock()
def setup_db() -> None:
conn = sqlite3.connect(DB_NAME)
curr = conn.cursor()
curr.execute("PRAGMA foreign_keys = ON;")
curr.executescript("""
CREATE TABLE IF NOT EXISTS category (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS subject (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
category_id INTEGER,
FOREIGN KEY (category_id) REFERENCES category(id)
);
CREATE TABLE IF NOT EXISTS account (
id INTEGER PRIMARY KEY,
username TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS filedata (
id INTEGER PRIMARY KEY,
year INTEGER,
document_name TEXT,
file_name TEXT,
uploaded_on TEXT,
subject_id INTEGER,
account_id INTEGER,
category_id INTEGER,
extension TEXT,
approved BOOLEAN,
FOREIGN KEY (subject_id) REFERENCES subject(id),
FOREIGN KEY (account_id) REFERENCES account(id),
FOREIGN KEY (category_id) REFERENCES category(id)
);
CREATE TABLE IF NOT EXISTS successlog(
id INTEGER PRIMARY KEY,
success BOOLEAN DEFAULT 0,
downloaded BOOLEAN DEFAULT 0
)
""")
conn.commit()
conn.close()
print("Database schema initialized.")
def _should_stop() -> bool:
    """Report whether the shared failure budget has been exhausted."""
    with counter_lock:
        exhausted = failure_counter >= MAX_CONSECUTIVE_FAILURES
    return exhausted
def _register_failure() -> None:
    """Bump the shared failure counter; announce when the cap is reached."""
    global failure_counter
    with counter_lock:
        failure_counter += 1
        limit_hit = failure_counter >= MAX_CONSECUTIVE_FAILURES
        if limit_hit:
            print("MAX consecutive failures reached. Stopping new submissions.")
def _reset_failure_counter() -> None:
    """Zero the consecutive-failure count (called after any success)."""
    global failure_counter
    counter_lock.acquire()
    try:
        failure_counter = 0
    finally:
        counter_lock.release()
def get_meta(number: int) -> int:
    """Fetch note metadata for *number* from the grail API and persist it.

    Args:
        number: Note id to fetch.

    Returns:
        1 when the record was fetched and stored; 0 otherwise (already
        processed, stop flag raised, or an HTTP/parse/database failure).
    """
    if _should_stop():
        return 0
    # sqlite connections are not shareable across threads; each call opens
    # its own connection and closes it in the finally below.
    conn = sqlite3.connect(DB_NAME)
    curr = conn.cursor()
    curr.execute("PRAGMA foreign_keys = ON;")
    try:
        curr.execute(
            "SELECT success FROM successlog WHERE id = ? AND success = 1",
            (number,),
        )
        if curr.fetchone():
            print(f"Skipping {number}, already successfully processed.")
            return 0
        url = f"https://api.grail.moe/note/{number}"
        try:
            time.sleep(REQUEST_PACING_SECONDS)  # crude client-side rate limit
            response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
            if response.status_code != 200:
                curr.execute(
                    "INSERT OR REPLACE INTO successlog (id, success) VALUES (?, ?)",
                    (number, 0),
                )
                conn.commit()
                print(f"Failed/Not Found: {number} (Status: {response.status_code})")
                _register_failure()
                return 0
            data = response.json()
            curr.execute(
                "INSERT OR IGNORE INTO category (id, name) VALUES (?, ?)",
                (data["doc_category"]["id"], data["doc_category"]["name"]),
            )
            curr.execute(
                "INSERT OR IGNORE INTO subject (id, name, category_id) VALUES (?, ?, ?)",
                (
                    data["doc_subject"]["id"],
                    data["doc_subject"]["name"],
                    data["doc_category"]["id"],
                ),
            )
            curr.execute(
                "INSERT OR IGNORE INTO account (id, username) VALUES (?, ?)",
                (data["account"]["user_id"], data["account"]["username"]),
            )
            is_approved = 1 if data["approved"] else 0
            curr.execute(
                """
                INSERT OR REPLACE INTO filedata (id, year, document_name, file_name, uploaded_on, subject_id, category_id, account_id, extension, approved)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    data["id"],
                    data["year"],
                    data["document_name"],
                    data["file_name"],
                    data["uploaded_on"],
                    data["doc_subject"]["id"],
                    # NOTE(review): other tables use data["doc_category"]["id"]
                    # for the category id; confirm data["category"] holds the
                    # same id in the API payload.
                    data["category"],
                    data["account"]["user_id"],
                    data["extension"],
                    is_approved,
                ),
            )
            # Mark success only AFTER every insert above succeeded. The old
            # code wrote success=1 first and committed in finally, so a
            # KeyError mid-insert left the record falsely marked processed
            # and it was never retried.
            curr.execute(
                "INSERT OR REPLACE INTO successlog (id, success) VALUES (?, ?)",
                (number, 1),
            )
            conn.commit()
            print(f"Saved: {data['id']}")
            _reset_failure_counter()
            return 1
        except requests.RequestException as exc:
            conn.rollback()
            print(f"Request error for {number}: {exc}")
            _register_failure()
        except (KeyError, ValueError, sqlite3.DatabaseError) as exc:
            # Discard any partial inserts so the record stays retryable.
            conn.rollback()
            print(f"Data/database error for {number}: {exc}")
            _register_failure()
        return 0
    finally:
        conn.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Extract metadata from grail API into sqlite.")
# parser.add_argument('--max-fail',type = int, default=1000)
parser.add_argument("--start-id", type=int, default=10000)
parser.add_argument("--end-id", type=int, default=12000)
parser.add_argument("--workers", type=int, default=10)
return parser.parse_args()
def main() -> None:
    """Initialize the database, then fan note ids out to a thread pool."""
    options = parse_args()
    setup_db()
    print("Starting parallel scraping...")
    with ThreadPoolExecutor(max_workers=options.workers) as pool:
        for note_id in range(options.start_id, options.end_id):
            # Stop queueing new ids once the failure budget is spent;
            # already-submitted tasks drain on pool shutdown.
            if _should_stop():
                print("Stopping submission of new tasks.")
                break
            pool.submit(get_meta, note_id)
    print("Finished.")


if __name__ == "__main__":
    main()