| import asyncio |
| import json |
| import logging |
| import os |
|
|
| import libsql |
| import requests |
|
|
| from config import Config |
|
|
|
|
logger = logging.getLogger(__name__)


# Hugging Face dataset repository that stores the manifest JSON files
# (used by sync_manifests_from_dataset); overridable via HF_DATASET_NAME.
MANIFEST_DATASET = os.getenv(
    key="HF_DATASET_NAME",
    default="multimedia-cryptography-benchmarks/encrypted-media-benchmarks",
)


# Endpoint used by turso_query(). Unless TURSO_HTTP_URL is set, it is derived
# from the libsql:// database URL by switching the scheme to https://.
# NOTE: the default is evaluated at import time even when the env var is set.
_TURSO_HTTP_URL = os.getenv(
    key="TURSO_HTTP_URL",
    default=Config.TURSO_DATABASE_URL.replace("libsql://", "https://"),
)
# Bearer token for turso_query(); presumably a read-only token (per its name).
# Falls back to Config.TURSO_AUTH_TOKEN, the token get_db() connects with.
_TURSO_RO_TOKEN = os.getenv(key="TURSO_RO_TOKEN", default=Config.TURSO_AUTH_TOKEN)
|
|
|
|
def turso_query(sql: str, params: list | None = None) -> list[dict] | None:
    """Run one SQL statement through Turso's HTTP endpoint (no libsql client).

    Args:
        sql: SQL text; use anonymous ``?`` placeholders for parameters.
        params: Values bound, in order, to the ``?`` placeholders.

    Returns:
        The result rows as ``{column: value}`` dicts (possibly empty), or
        ``None`` if the request failed or the server reported an error.
        Failures are logged, never raised.
    """
    statement: dict = {"q": sql}
    if params:
        # Anonymous ``?`` placeholders are bound from a positional JSON array.
        # An object is interpreted as *named* parameters, so the previous
        # {"1": ..., "2": ...} form never matched bare ``?`` placeholders.
        statement["params"] = list(params)
    try:
        r = requests.post(
            _TURSO_HTTP_URL,
            headers={
                "Authorization": f"Bearer {_TURSO_RO_TOKEN}",
                "Content-Type": "application/json",
            },
            json={"statements": [statement]},
            timeout=10,
        )
        if r.status_code != 200:
            logger.error("Turso HTTP error %d: %s", r.status_code, r.text[:200])
            return None
        first = r.json()[0]
        # A failed statement is reported as {"error": {...}} instead of
        # {"results": {...}}; surface it instead of failing with a KeyError.
        if "error" in first:
            logger.error("Turso HTTP query error: %s", first["error"])
            return None
        results = first["results"]
        cols = results["columns"]
        return [dict(zip(cols, row)) for row in results["rows"]]
    except Exception as e:
        logger.error("Turso HTTP query failed: %s", e)
        return None
|
|
# Shared libsql connection; created lazily by get_db() on first use.
_db = None




def get_db():
    """Return the module-wide libsql connection, opening it on the first call.

    The connection is cached in ``_db`` and reused by every helper in this
    module, so only the first call connects (and logs).
    """
    global _db
    if _db is not None:
        return _db
    _db = libsql.connect(
        Config.TURSO_DATABASE_URL,
        auth_token=Config.TURSO_AUTH_TOKEN,
    )
    logger.info("Database connection established.")
    return _db
|
|
|
|
# Tables owned by this module; every statement is idempotent (IF NOT EXISTS).
_SCHEMA_TABLES = (
    """CREATE TABLE IF NOT EXISTS movies (
        tmdb_id INTEGER PRIMARY KEY,
        title TEXT,
        original_title TEXT,
        overview TEXT,
        release_date TEXT,
        poster_url TEXT,
        backdrop_url TEXT,
        vote_average REAL,
        vote_count INTEGER,
        popularity REAL,
        genres TEXT,
        runtime INTEGER,
        tagline TEXT,
        homepage TEXT,
        imdb_id TEXT,
        budget INTEGER,
        revenue INTEGER,
        status TEXT,
        production_companies TEXT,
        created_at TEXT DEFAULT (datetime('now'))
    )""",
    """CREATE TABLE IF NOT EXISTS series (
        tmdb_id INTEGER PRIMARY KEY,
        name TEXT,
        original_name TEXT,
        overview TEXT,
        first_air_date TEXT,
        poster_url TEXT,
        backdrop_url TEXT,
        vote_average REAL,
        vote_count INTEGER,
        popularity REAL,
        genres TEXT,
        status TEXT,
        homepage TEXT,
        networks TEXT,
        origin_country TEXT,
        languages TEXT,
        in_production INTEGER,
        type TEXT,
        production_companies TEXT,
        created_at TEXT DEFAULT (datetime('now'))
    )""",
    """CREATE TABLE IF NOT EXISTS seasons (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        tmdb_id INTEGER,
        season_number INTEGER,
        name TEXT,
        overview TEXT,
        poster_url TEXT,
        air_date TEXT,
        episode_count INTEGER,
        FOREIGN KEY (tmdb_id) REFERENCES series(tmdb_id)
    )""",
    """CREATE TABLE IF NOT EXISTS episodes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        tmdb_id INTEGER,
        season_number INTEGER,
        episode_number INTEGER,
        title TEXT,
        overview TEXT,
        still_url TEXT,
        air_date TEXT,
        FOREIGN KEY (tmdb_id) REFERENCES series(tmdb_id)
    )""",
    # storage_channel_id / storage_msg_id are intentionally absent: the
    # migrations below drop them, and storage references are kept in the
    # storage_messages table instead.
    """CREATE TABLE IF NOT EXISTS archives (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        random_name TEXT UNIQUE,
        original_filename TEXT,
        file_size INTEGER,
        media_type TEXT CHECK(media_type IN ('movie','series')),
        tmdb_id INTEGER,
        season_number INTEGER,
        episode_number INTEGER,
        status TEXT DEFAULT 'active' CHECK(status IN ('active','reuploading','deleted','banned')),
        reupload_count INTEGER DEFAULT 0,
        last_verified_at TEXT,
        uploaded_at TEXT DEFAULT (datetime('now'))
    )""",
    """CREATE TABLE IF NOT EXISTS storage_messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        random_name TEXT NOT NULL,
        channel_id INTEGER NOT NULL,
        message_id INTEGER NOT NULL,
        uploaded_at TEXT DEFAULT (datetime('now'))
    )""",
)

# Unique indexes that make the INSERT OR REPLACE upserts elsewhere in this
# module replace rather than duplicate rows.
_SCHEMA_INDEXES = (
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_seasons_unique ON seasons(tmdb_id, season_number)",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_episodes_unique ON episodes(tmdb_id, season_number, episode_number)",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_storage_unique ON storage_messages(random_name, channel_id)",
)

# Leftover IA columns from an older archives schema, dropped if present.
_LEGACY_IA_COLUMNS = (
    "account1_identifier", "account1_ia_url", "account1_stream_url",
    "account2_identifier", "account2_ia_url", "account2_stream_url",
    "backup_at", "manifest_json",
)

# Best-effort, ordered ALTERs that bring older databases up to date. Each one
# is expected to fail harmlessly once applied (column exists / already gone).
_SCHEMA_MIGRATIONS = (
    "ALTER TABLE movies ADD COLUMN vote_count INTEGER",
    "ALTER TABLE movies ADD COLUMN popularity REAL",
    "ALTER TABLE movies ADD COLUMN genres TEXT",
    "ALTER TABLE movies ADD COLUMN runtime INTEGER",
    "ALTER TABLE movies ADD COLUMN tagline TEXT",
    "ALTER TABLE movies ADD COLUMN homepage TEXT",
    "ALTER TABLE movies ADD COLUMN imdb_id TEXT",
    "ALTER TABLE movies ADD COLUMN budget INTEGER",
    "ALTER TABLE movies ADD COLUMN revenue INTEGER",
    "ALTER TABLE movies ADD COLUMN status TEXT",
    "ALTER TABLE movies ADD COLUMN production_companies TEXT",
    "ALTER TABLE movies ADD COLUMN original_title TEXT",
    "ALTER TABLE movies ADD COLUMN backdrop_url TEXT",

    "ALTER TABLE series ADD COLUMN vote_count INTEGER",
    "ALTER TABLE series ADD COLUMN popularity REAL",
    "ALTER TABLE series ADD COLUMN genres TEXT",
    "ALTER TABLE series ADD COLUMN homepage TEXT",
    "ALTER TABLE series ADD COLUMN networks TEXT",
    "ALTER TABLE series ADD COLUMN origin_country TEXT",
    "ALTER TABLE series ADD COLUMN languages TEXT",
    "ALTER TABLE series ADD COLUMN in_production INTEGER",
    "ALTER TABLE series ADD COLUMN type TEXT",
    "ALTER TABLE series ADD COLUMN production_companies TEXT",
    "ALTER TABLE series ADD COLUMN original_name TEXT",
    "ALTER TABLE series ADD COLUMN backdrop_url TEXT",

    "ALTER TABLE archives ADD COLUMN file_hash TEXT",
    "ALTER TABLE archives ADD COLUMN quality TEXT",

    # Retired archives columns. There is deliberately no "ADD COLUMN file_id"
    # before its drop: adding and re-dropping it on every startup forced SQLite
    # to rewrite the whole archives table each time.
    "ALTER TABLE archives DROP COLUMN file_id",
    "ALTER TABLE archives DROP COLUMN storage_channel_id",
    "ALTER TABLE archives DROP COLUMN storage_msg_id",

    "ALTER TABLE archives ADD COLUMN hf_base_url TEXT",
    "ALTER TABLE archives ADD COLUMN hf_manifest_url TEXT",

    "ALTER TABLE archives ADD COLUMN encryption_key_hex TEXT",
    "ALTER TABLE archives ADD COLUMN encryption_iv_hex TEXT",
    "ALTER TABLE archives ADD COLUMN manifest_data TEXT",
)


async def _execute_best_effort(cursor, stmt: str) -> bool:
    """Run *stmt* off the event loop; return False (logged at DEBUG) if it fails."""
    try:
        await asyncio.to_thread(cursor.execute, stmt)
    except Exception as e:
        logger.debug("Skipped schema statement %r: %s", stmt, e)
        return False
    return True


async def init_db() -> None:
    """Create tables and indexes, then apply best-effort schema migrations.

    Safe to run on every startup: CREATE statements use IF NOT EXISTS, and
    each ALTER TABLE migration is attempted independently, with failures
    (typically "column already exists" / "no such column") skipped.
    Everything is committed at the end. Errors are logged, not raised.
    """
    try:
        db = get_db()
        cursor = db.cursor()

        # Tables and indexes are required; any failure aborts initialization.
        for stmt in (*_SCHEMA_TABLES, *_SCHEMA_INDEXES):
            await asyncio.to_thread(cursor.execute, stmt)

        # Executed via to_thread like every other statement; this loop used to
        # call cursor.execute synchronously and block the event loop.
        for col in _LEGACY_IA_COLUMNS:
            if await _execute_best_effort(cursor, f"ALTER TABLE archives DROP COLUMN {col}"):
                logger.info("Dropped IA column: %s", col)

        for stmt in _SCHEMA_MIGRATIONS:
            await _execute_best_effort(cursor, stmt)

        await asyncio.to_thread(db.commit)
        logger.info("Database tables initialized successfully.")
    except Exception as e:
        logger.exception("Failed to initialize database: %s", e)
|
|
|
|
async def save_movie(tmdb_data: dict) -> bool:
    """Insert or replace the ``movies`` row for ``tmdb_data["tmdb_id"]``.

    ``tmdb_id`` is required; every other column is read with ``.get`` and
    stored as NULL when absent. Because INSERT OR REPLACE rewrites the whole
    row, the existing ``created_at`` is carried over by a sub-select (falling
    back to now for a new movie).

    Returns:
        True once committed, False on any failure (logged, not raised).
    """
    optional_columns = (
        "title", "original_title", "overview", "release_date",
        "poster_url", "backdrop_url", "vote_average", "vote_count", "popularity",
        "genres", "runtime", "tagline", "homepage", "imdb_id", "budget", "revenue",
        "status", "production_companies",
    )
    try:
        conn = get_db()
        cur = conn.cursor()
        values = [tmdb_data["tmdb_id"]]
        values.extend(tmdb_data.get(column) for column in optional_columns)
        # Second tmdb_id feeds the created_at sub-select.
        values.append(tmdb_data["tmdb_id"])
        await asyncio.to_thread(
            cur.execute,
            """INSERT OR REPLACE INTO movies
            (tmdb_id, title, original_title, overview, release_date,
            poster_url, backdrop_url, vote_average, vote_count, popularity,
            genres, runtime, tagline, homepage, imdb_id, budget, revenue,
            status, production_companies, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
            COALESCE((SELECT created_at FROM movies WHERE tmdb_id = ?), datetime('now')))""",
            values,
        )
        await asyncio.to_thread(conn.commit)
        logger.info("Movie saved: %s (ID: %d)", tmdb_data.get("title"), tmdb_data["tmdb_id"])
        return True
    except Exception as e:
        logger.error("Failed to save movie: %s", e)
        return False
|
|
|
|
async def save_series(tmdb_data: dict) -> bool:
    """Insert or replace the ``series`` row for ``tmdb_data["tmdb_id"]``.

    ``tmdb_id`` is required; the remaining columns come from ``.get`` and
    default to NULL. The previous ``created_at`` survives the INSERT OR
    REPLACE through a sub-select, defaulting to now for a new series.

    Returns:
        True once committed, False on any failure (logged, not raised).
    """
    optional_columns = (
        "name", "original_name", "overview", "first_air_date",
        "poster_url", "backdrop_url", "vote_average", "vote_count", "popularity",
        "genres", "status", "homepage", "networks", "origin_country", "languages",
        "in_production", "type", "production_companies",
    )
    try:
        conn = get_db()
        cur = conn.cursor()
        values = [tmdb_data["tmdb_id"]]
        values.extend(tmdb_data.get(column) for column in optional_columns)
        # Second tmdb_id feeds the created_at sub-select.
        values.append(tmdb_data["tmdb_id"])
        await asyncio.to_thread(
            cur.execute,
            """INSERT OR REPLACE INTO series
            (tmdb_id, name, original_name, overview, first_air_date,
            poster_url, backdrop_url, vote_average, vote_count, popularity,
            genres, status, homepage, networks, origin_country, languages,
            in_production, type, production_companies, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
            COALESCE((SELECT created_at FROM series WHERE tmdb_id = ?), datetime('now')))""",
            values,
        )
        await asyncio.to_thread(conn.commit)
        logger.info("Series saved: %s (ID: %d)", tmdb_data.get("name"), tmdb_data["tmdb_id"])
        return True
    except Exception as e:
        logger.error("Failed to save series: %s", e)
        return False
|
|
|
|
async def save_season(tmdb_id: int, season_data: dict) -> bool:
    """Insert or replace one season of series *tmdb_id*.

    The unique index on ``seasons(tmdb_id, season_number)`` makes a repeated
    save replace the earlier row. Missing keys in *season_data* become NULL.

    Returns:
        True once committed, False on any failure (logged, not raised).
    """
    season_fields = (
        "season_number", "name", "overview", "poster_url", "air_date", "episode_count",
    )
    try:
        conn = get_db()
        cur = conn.cursor()
        values = [tmdb_id, *(season_data.get(field) for field in season_fields)]
        await asyncio.to_thread(
            cur.execute,
            """INSERT OR REPLACE INTO seasons
            (tmdb_id, season_number, name, overview, poster_url, air_date, episode_count)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            values,
        )
        await asyncio.to_thread(conn.commit)
        logger.info(
            "Season saved: %d (tmdb_id: %d)", season_data.get("season_number"), tmdb_id
        )
        return True
    except Exception as e:
        logger.error("Failed to save season: %s", e)
        return False
|
|
|
|
async def save_episode(tmdb_id: int, ep_data: dict) -> bool:
    """Insert or replace one episode of series *tmdb_id*.

    The unique index on ``episodes(tmdb_id, season_number, episode_number)``
    makes a repeated save replace the earlier row. Missing keys become NULL.

    Returns:
        True once committed, False on any failure (logged, not raised).
    """
    episode_fields = (
        "season_number", "episode_number", "title", "overview", "still_url", "air_date",
    )
    try:
        conn = get_db()
        cur = conn.cursor()
        values = [tmdb_id, *(ep_data.get(field) for field in episode_fields)]
        await asyncio.to_thread(
            cur.execute,
            """INSERT OR REPLACE INTO episodes
            (tmdb_id, season_number, episode_number, title, overview, still_url, air_date)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            values,
        )
        await asyncio.to_thread(conn.commit)
        season_no = ep_data.get("season_number", 0)
        episode_no = ep_data.get("episode_number", 0)
        logger.info("Episode saved: S%02dE%02d (tmdb_id: %d)", season_no, episode_no, tmdb_id)
        return True
    except Exception as e:
        logger.error("Failed to save episode: %s", e)
        return False
|
|
|
|
|
|
async def save_archive(archive_data: dict) -> bool:
    """Insert a new ``archives`` row from *archive_data*.

    All fields are read with ``.get`` (missing ones become NULL). This is a
    plain INSERT: ``random_name`` is UNIQUE, so saving an existing name fails
    and returns False.

    Returns:
        True once committed, False on any failure (logged, not raised).
    """
    archive_fields = (
        "random_name", "original_filename", "file_size", "media_type",
        "tmdb_id", "season_number", "episode_number", "file_hash", "quality",
    )
    try:
        conn = get_db()
        cur = conn.cursor()
        values = [archive_data.get(field) for field in archive_fields]
        await asyncio.to_thread(
            cur.execute,
            """INSERT INTO archives
            (random_name, original_filename,
            file_size, media_type, tmdb_id, season_number, episode_number,
            file_hash, quality)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            values,
        )
        await asyncio.to_thread(conn.commit)
        logger.info("Archive saved: %s", archive_data.get("random_name"))
        return True
    except Exception as e:
        logger.error("Failed to save archive: %s", e)
        return False
|
|
|
|
async def check_duplicate_hash(file_hash: str) -> dict | None:
    """Find an *active* archive whose ``file_hash`` equals *file_hash*.

    Returns:
        A dict with ``random_name``, ``original_filename``, ``file_size`` and
        ``uploaded_at`` for the first match; ``None`` when nothing matches or
        the lookup fails (failures are logged).
    """
    result_keys = ("random_name", "original_filename", "file_size", "uploaded_at")
    try:
        cur = get_db().cursor()
        await asyncio.to_thread(
            cur.execute,
            "SELECT random_name, original_filename, file_size, uploaded_at "
            "FROM archives WHERE file_hash = ? AND status = 'active' LIMIT 1",
            [file_hash],
        )
        match = await asyncio.to_thread(cur.fetchone)
        if not match:
            return None
        return dict(zip(result_keys, match))
    except Exception as e:
        logger.error("check_duplicate_hash failed: %s", e)
        return None
|
|
|
|
|
|
|
|
|
|
async def get_archive_by_random_name(random_name: str) -> dict | None:
    """Return the full ``archives`` row for *random_name* as a column→value dict.

    Returns ``None`` if no row matches or the query fails (failures are logged).
    """
    try:
        cur = get_db().cursor()
        await asyncio.to_thread(
            cur.execute, "SELECT * FROM archives WHERE random_name = ?", [random_name]
        )
        record = await asyncio.to_thread(cur.fetchone)
        if not record:
            return None
        column_names = [desc[0] for desc in cur.description]
        return dict(zip(column_names, record))
    except Exception as e:
        logger.error("Failed to get archive: %s", e)
        return None
|
|
|
|
async def get_all_archives() -> list[dict]:
    """Return every ``archives`` row (any status), newest ``uploaded_at`` first.

    Each row is a column→value dict. An empty list is returned on failure
    (failures are logged).
    """
    try:
        cur = get_db().cursor()
        await asyncio.to_thread(cur.execute, "SELECT * FROM archives ORDER BY uploaded_at DESC")
        records = await asyncio.to_thread(cur.fetchall)
        column_names = [desc[0] for desc in cur.description]
        return [dict(zip(column_names, record)) for record in records]
    except Exception as e:
        logger.error("Failed to get all archives: %s", e)
        return []
|
|
|
|
async def get_qualities_by_tmdb(tmdb_id: int) -> list[dict]:
    """List the active archives of *tmdb_id* that carry a quality label.

    Rows are ordered by ``quality`` using SQL text ordering. Each entry has
    ``random_name``, ``quality``, ``file_size``, ``uploaded_at`` and
    ``original_filename``. Returns ``[]`` on failure (logged).
    """
    entry_keys = ("random_name", "quality", "file_size", "uploaded_at", "original_filename")
    try:
        cur = get_db().cursor()
        await asyncio.to_thread(
            cur.execute,
            "SELECT random_name, quality, file_size, uploaded_at, original_filename "
            "FROM archives WHERE tmdb_id = ? AND status = 'active' AND quality IS NOT NULL "
            "ORDER BY quality",
            [tmdb_id],
        )
        records = await asyncio.to_thread(cur.fetchall)
        return [dict(zip(entry_keys, record)) for record in records]
    except Exception as e:
        logger.error("Failed to get qualities: %s", e)
        return []
|
|
|
|
async def update_archive_hf(random_name: str, hf_base_url: str, hf_manifest_url: str = "") -> bool:
    """Store the Hugging Face URLs for *random_name* and stamp ``last_verified_at``.

    Returns True once committed (even if no row matched), False on failure (logged).
    """
    sql = (
        "UPDATE archives SET hf_base_url = ?, hf_manifest_url = ?, "
        "last_verified_at = datetime('now') WHERE random_name = ?"
    )
    try:
        conn = get_db()
        cur = conn.cursor()
        await asyncio.to_thread(cur.execute, sql, [hf_base_url, hf_manifest_url, random_name])
        await asyncio.to_thread(conn.commit)
    except Exception as e:
        logger.error("Failed to update HF URL: %s", e)
        return False
    return True
|
|
|
|
async def update_archive_reupload(random_name: str, file_id: str) -> bool:
    """Mark an archive as active again after a re-upload.

    Sets ``status`` to 'active', increments ``reupload_count`` and stamps
    ``last_verified_at`` for the row with this ``random_name``.

    Args:
        random_name: Archive key (``archives.random_name``).
        file_id: Accepted for backward compatibility but ignored. init_db()'s
            migrations drop ``archives.file_id``, so the old
            ``SET file_id = ...`` clause made this UPDATE fail with
            "no such column" on every call.

    Returns:
        True once committed, False on failure (logged, not raised).
    """
    try:
        db = get_db()
        cursor = db.cursor()
        await asyncio.to_thread(
            cursor.execute,
            """UPDATE archives SET
            status = 'active',
            reupload_count = reupload_count + 1,
            last_verified_at = datetime('now')
            WHERE random_name = ?""",
            [random_name],
        )
        await asyncio.to_thread(db.commit)
        return True
    except Exception as e:
        logger.error("Failed to update reupload: %s", e)
        return False
|
|
|
|
async def update_archive_verified(random_name: str, account1_ok: bool, account2_ok: bool) -> bool:
    """Record a verification result for *random_name*.

    The archive stays 'active' if either flag is truthy and becomes 'deleted'
    only when both are falsy; ``last_verified_at`` is stamped either way.

    Returns True once committed, False on failure (logged).
    """
    try:
        conn = get_db()
        cur = conn.cursor()
        new_status = "active" if (account1_ok or account2_ok) else "deleted"
        await asyncio.to_thread(
            cur.execute,
            "UPDATE archives SET status = ?, last_verified_at = datetime('now') WHERE random_name = ?",
            [new_status, random_name],
        )
        await asyncio.to_thread(conn.commit)
    except Exception as e:
        logger.error("Failed to update verified: %s", e)
        return False
    return True
|
|
|
|
async def get_storage_message(random_name: str) -> tuple[int, int] | None:
    """Return the oldest ``(channel_id, message_id)`` stored for *random_name*.

    Returns ``None`` when there is no row, when either id is falsy, or when
    the query fails (failures are logged).
    """
    try:
        cur = get_db().cursor()
        await asyncio.to_thread(
            cur.execute,
            "SELECT channel_id, message_id FROM storage_messages WHERE random_name = ? ORDER BY id LIMIT 1",
            [random_name],
        )
        first = await asyncio.to_thread(cur.fetchone)
        if not first or not first[0] or not first[1]:
            return None
        channel_id, message_id = first[0], first[1]
        return (channel_id, message_id)
    except Exception as e:
        logger.error("Failed to get storage message: %s", e)
        return None
|
|
|
|
async def get_all_storage_messages(random_name: str) -> list[tuple[int, int]]:
    """Return every ``(channel_id, message_id)`` stored for *random_name*, oldest first.

    Returns ``[]`` on failure (logged).
    """
    try:
        cur = get_db().cursor()
        await asyncio.to_thread(
            cur.execute,
            "SELECT channel_id, message_id FROM storage_messages WHERE random_name = ? ORDER BY id",
            [random_name],
        )
        records = await asyncio.to_thread(cur.fetchall)
        return [(record[0], record[1]) for record in records]
    except Exception as e:
        logger.error("Failed to get all storage messages: %s", e)
        return []
|
|
|
|
async def update_storage_message(random_name: str, channel_id: int, msg_id: int) -> bool:
    """Record (or replace) the storage message of *random_name* in *channel_id*.

    The unique index on ``storage_messages(random_name, channel_id)`` means
    INSERT OR REPLACE keeps at most one message per archive per channel.

    Returns:
        True once committed, False on failure (logged, not raised).
    """
    try:
        db = get_db()
        cursor = db.cursor()
        await asyncio.to_thread(
            cursor.execute,
            "INSERT OR REPLACE INTO storage_messages (random_name, channel_id, message_id) VALUES (?, ?, ?)",
            [random_name, channel_id, msg_id],
        )
        # storage_messages is the only home for these ids. init_db()'s
        # migrations drop archives.storage_channel_id / storage_msg_id, so the
        # former mirror "UPDATE archives SET storage_channel_id = ..." failed
        # with "no such column" and prevented this commit from ever running.
        await asyncio.to_thread(db.commit)
        return True
    except Exception as e:
        logger.error("Failed to update storage message: %s", e)
        return False
|
|
|
|
async def get_storage_messages(random_name: str) -> list[tuple[str, int]]:
    """Return every stored ``(channel_id, message_id)`` for *random_name*, oldest first.

    Unlike get_all_storage_messages(), the channel id is returned as ``str``
    and the message id is coerced to ``int``. Returns ``[]`` on failure (logged).
    """
    try:
        db = get_db()
        cursor = db.cursor()
        await asyncio.to_thread(
            cursor.execute,
            "SELECT channel_id, message_id FROM storage_messages WHERE random_name = ? ORDER BY rowid",
            [random_name],
        )
        # Fetch off the event loop, like the other readers in this module, and
        # read from the cursor itself instead of relying on execute()'s return value.
        rows = await asyncio.to_thread(cursor.fetchall)
        return [(str(r[0]), int(r[1])) for r in rows]
    except Exception as e:
        logger.error("Failed to get storage messages: %s", e)
        return []
|
|
async def save_storage_messages_batch(random_name: str, messages: list[tuple[int, int]]) -> bool:
    """Insert one ``storage_messages`` row per ``(channel_id, msg_id)`` pair.

    All inserts are committed together at the end. This is a plain INSERT, so
    a pair that collides with the unique ``(random_name, channel_id)`` index
    makes the whole call fail and return False.

    Returns:
        True once committed, False on failure (logged, not raised).
    """
    insert_sql = "INSERT INTO storage_messages (random_name, channel_id, message_id) VALUES (?, ?, ?)"
    try:
        conn = get_db()
        cur = conn.cursor()
        for chan, message in messages:
            await asyncio.to_thread(cur.execute, insert_sql, [random_name, chan, message])
        await asyncio.to_thread(conn.commit)
        logger.info("Saved %d storage messages for %s", len(messages), random_name)
    except Exception as e:
        logger.error("Failed to save storage messages batch: %s", e)
        return False
    return True
|
|
|
|
async def save_manifest(random_name: str, manifest_json: str) -> bool:
    """Write *manifest_json* to ``<Config.MANIFEST_DIR>/<random_name>.json``.

    The directory is created if needed. The file is written in a worker
    thread and encoded as UTF-8, which is the standard encoding for JSON.

    Returns:
        True on success, False on failure (logged, not raised).
    """

    def _write(path: str) -> None:
        # ``with`` closes the handle deterministically; the previous
        # open(...).write(...) one-liner left it for the garbage collector.
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(manifest_json)

    try:
        os.makedirs(Config.MANIFEST_DIR, exist_ok=True)
        path = os.path.join(Config.MANIFEST_DIR, f"{random_name}.json")
        await asyncio.to_thread(_write, path)
        logger.info("Manifest saved to filesystem: %s", path)
        return True
    except Exception as e:
        logger.error("Failed to save manifest: %s", e)
        return False
|
|
|
|
async def get_manifest(random_name: str) -> str | None:
    """Read ``<Config.MANIFEST_DIR>/<random_name>.json`` from the local filesystem.

    Returns:
        The file's UTF-8 text, or ``None`` if the file does not exist or cannot
        be read. Read errors other than a missing file are logged as warnings.
    """

    def _read(path: str) -> str:
        # ``with`` guarantees the handle is closed (the old one-liner leaked it
        # until garbage collection); JSON is decoded as UTF-8 explicitly.
        with open(path, "r", encoding="utf-8") as fh:
            return fh.read()

    try:
        path = os.path.join(Config.MANIFEST_DIR, f"{random_name}.json")
        # EAFP instead of exists()-then-open: no race, and a missing file is
        # the normal "not cached locally" case, so it is not logged.
        return await asyncio.to_thread(_read, path)
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.warning("Failed to read manifest from filesystem: %s", e)
    return None
|
|
|
|
async def save_manifest_with_key(
    random_name: str,
    manifest_json: dict,
    key_hex: str,
    iv_hex: str,
) -> bool:
    """Store a manifest and its key/IV on the ``archives`` row for *random_name*.

    *manifest_json* is serialized with ``json.dumps`` into ``manifest_data``;
    *key_hex* and *iv_hex* go into ``encryption_key_hex`` / ``encryption_iv_hex``
    as given, and ``last_verified_at`` is stamped.
    NOTE(review): key material is stored in the same row as the manifest and
    is readable by anyone who can query ``archives`` — confirm that is intended.

    Returns:
        True once committed, False on failure (logged, not raised).
    """
    try:
        conn = get_db()
        cur = conn.cursor()
        serialized = json.dumps(manifest_json)
        await asyncio.to_thread(
            cur.execute,
            """UPDATE archives SET
            manifest_data = ?,
            encryption_key_hex = ?,
            encryption_iv_hex = ?,
            last_verified_at = datetime('now')
            WHERE random_name = ?""",
            [serialized, key_hex, iv_hex, random_name],
        )
        await asyncio.to_thread(conn.commit)
    except Exception as e:
        logger.error("Failed to save manifest with key: %s", e)
        return False
    return True
|
|
|
|
async def get_manifest_from_turso(random_name: str) -> dict | None:
    """Fetch an active archive's manifest plus key material via turso_query().

    Returns:
        The decoded manifest with ``_key_hex`` / ``_iv_hex`` copied from
        ``encryption_key_hex`` / ``encryption_iv_hex``; ``None`` when there is
        no active row, no stored manifest, or the data cannot be decoded.
    """
    rows = await asyncio.to_thread(
        turso_query,
        "SELECT manifest_data, encryption_key_hex, encryption_iv_hex FROM archives WHERE random_name = ? AND status = 'active'",
        [random_name],
    )
    if not rows:
        return None
    row = rows[0]
    raw = row.get("manifest_data")
    if raw is None:
        # The row exists but manifest_data is NULL (nothing stored yet). That is
        # an expected state, not a parse failure, so it is not logged as an error.
        logger.debug("No manifest stored in Turso for %s", random_name)
        return None
    try:
        manifest = json.loads(raw)
        manifest["_key_hex"] = row["encryption_key_hex"]
        manifest["_iv_hex"] = row["encryption_iv_hex"]
        return manifest
    except Exception as e:
        logger.error("Failed to parse manifest from Turso: %s", e)
        return None
|
|
|
|
def _sync_manifests_blocking() -> int:
    """Blocking body of sync_manifests_from_dataset(); returns the number copied."""
    import shutil

    from huggingface_hub import HfApi

    api = HfApi()
    token = os.getenv("HF_TOKEN", Config.HF_TOKEN)
    files = api.list_repo_files(repo_id=MANIFEST_DATASET, repo_type="dataset", token=token)
    os.makedirs(Config.MANIFEST_DIR, exist_ok=True)
    count = 0
    for fname in files:
        if not fname.endswith(".json"):
            continue
        # One bad file must not abort the whole sync.
        try:
            cached = api.hf_hub_download(
                repo_id=MANIFEST_DATASET, repo_type="dataset",
                filename=fname, token=token,
            )
            shutil.copy2(cached, os.path.join(Config.MANIFEST_DIR, fname))
            count += 1
        except Exception as e:
            logger.warning("Failed to download manifest %s: %s", fname, e)
    return count


async def sync_manifests_from_dataset() -> int:
    """Download all ``*.json`` manifests from the HF Dataset into MANIFEST_DIR.

    Intended to run on startup. Listing the repo, downloading and copying are
    all blocking calls, so they run in a worker thread via ``asyncio.to_thread``
    instead of stalling the event loop for the whole sync.

    Returns:
        Number of manifests synced; 0 if the sync as a whole failed (logged).
    """
    try:
        count = await asyncio.to_thread(_sync_manifests_blocking)
    except Exception as e:
        logger.warning("Failed to sync manifests from HF Dataset: %s", e)
        return 0
    logger.info("Synced %d manifests from HF Dataset", count)
    return count
|
|