import sqlite3 import json import time import threading import uuid from datetime import datetime, timezone from hf_api import HuggingFaceAPI DB_PATH = "cache.db" def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_jobs_table(): with get_db_connection() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, type TEXT, user_id TEXT, status TEXT DEFAULT 'pending', progress_current INTEGER DEFAULT 0, progress_total INTEGER DEFAULT 0, progress_stage TEXT DEFAULT '', result TEXT, error TEXT, created_at REAL, updated_at REAL ) """) conn.commit() def create_job(job_type, user_id): job_id = str(uuid.uuid4()) now = time.time() with get_db_connection() as conn: conn.execute( """INSERT INTO jobs (id, type, user_id, status, created_at, updated_at) VALUES (?, ?, ?, 'pending', ?, ?)""", (job_id, job_type, user_id, now, now), ) conn.commit() return job_id def update_job_progress(job_id, stage, current, total): with get_db_connection() as conn: conn.execute( """UPDATE jobs SET progress_stage = ?, progress_current = ?, progress_total = ?, status = 'running', updated_at = ? WHERE id = ?""", (stage, current, total, time.time(), job_id), ) conn.commit() def complete_job(job_id, result=None, error=None): status = "failed" if error else "completed" with get_db_connection() as conn: conn.execute( """UPDATE jobs SET status = ?, result = ?, error = ?, updated_at = ? WHERE id = ?""", (status, json.dumps(result) if result else None, error, time.time(), job_id), ) conn.commit() def get_job(job_id): with get_db_connection() as conn: row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone() if row: return dict(row) return None def get_user_job(user_id, job_type): """Get the most recent job of a type for a user.""" with get_db_connection() as conn: row = conn.execute( """SELECT * FROM jobs WHERE user_id = ? AND type = ? ORDER BY created_at DESC LIMIT 1""", (user_id, job_type), ).fetchone() if row: return dict(row) return None def run_initial_load_job(job_id, username, token): """Background job to load spaces and discussions.""" def progress_callback(stage, current, total): update_job_progress(job_id, stage, current, total) try: api = HuggingFaceAPI(token) # Fetch spaces with details update_job_progress(job_id, "spaces", 0, 1) spaces = api.fetch_spaces_with_details(username, progress_callback) # Fetch discussions for all spaces update_job_progress(job_id, "discussions", 0, len(spaces)) discussions_map = api.fetch_all_discussions(spaces, progress_callback) # Store in cache cache_result(username, spaces, discussions_map) complete_job(job_id, {"spaces_count": len(spaces), "discussions_count": sum(len(d) for d in discussions_map.values())}) except Exception as e: complete_job(job_id, error=str(e)) def run_wake_job(job_id, username, token, space_ids): """Background job to wake sleeping spaces.""" def progress_callback(stage, current, total): update_job_progress(job_id, stage, current, total) try: api = HuggingFaceAPI(token) results = api.wake_spaces(space_ids, progress_callback) # Clear spaces cache clear_user_cache(username) succeeded = sum(1 for r in results if r["success"]) failed = sum(1 for r in results if not r["success"]) complete_job(job_id, {"results": results, "succeeded": succeeded, "failed": failed}) except Exception as e: complete_job(job_id, error=str(e)) def cache_result(username, spaces, discussions_map): """Store fetched data in cache.""" import time expires_at = time.time() + 300 with get_db_connection() as conn: conn.execute( "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)", (f"spaces:{username}", json.dumps(spaces), expires_at), ) for space_id, discussions in discussions_map.items(): conn.execute( "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)", (f"discussions:{space_id}", json.dumps(discussions), expires_at), ) conn.commit() def clear_user_cache(username): """Clear user's spaces cache.""" with get_db_connection() as conn: conn.execute("DELETE FROM cache WHERE key = ?", (f"spaces:{username}",)) conn.commit() def start_job_thread(target, *args): """Start a background thread for a job.""" thread = threading.Thread(target=target, args=args, daemon=True) thread.start() return thread # Initialize jobs table init_jobs_table()