spaces-dashboard / jobs.py
mrfakename's picture
jobs
b028028
import sqlite3
import json
import time
import threading
import uuid
from datetime import datetime, timezone
from hf_api import HuggingFaceAPI
# Path of the SQLite database file opened by get_db_connection(). The same
# file holds the ``jobs`` table and the ``cache`` table used further below.
DB_PATH = "cache.db"
def get_db_connection(db_path=None):
    """Open a new SQLite connection whose rows support access by column name.

    Args:
        db_path: Optional database path. When omitted (or ``None``) the
            module-level ``DB_PATH`` is used, which preserves the original
            zero-argument behaviour.

    Returns:
        A fresh ``sqlite3.Connection`` with ``row_factory`` set to
        ``sqlite3.Row``. The caller owns the connection and is responsible
        for closing it.
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    # sqlite3.Row lets callers do both row["col"] and dict(row).
    conn.row_factory = sqlite3.Row
    return conn
def init_jobs_table():
    """Create the ``jobs`` table if it does not already exist.

    Each row tracks one background job: its type and owning user, a status
    (defaulting to ``'pending'``), progress counters and stage label, a
    result, an error, and creation/update timestamps stored as REAL values.
    """
    conn = get_db_connection()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does NOT close the connection, hence the explicit ``finally``.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    type TEXT,
                    user_id TEXT,
                    status TEXT DEFAULT 'pending',
                    progress_current INTEGER DEFAULT 0,
                    progress_total INTEGER DEFAULT 0,
                    progress_stage TEXT DEFAULT '',
                    result TEXT,
                    error TEXT,
                    created_at REAL,
                    updated_at REAL
                )
            """)
    finally:
        conn.close()
def create_job(job_type, user_id):
    """Insert a new job row in the ``'pending'`` state and return its id.

    Args:
        job_type: Value stored in the ``type`` column.
        user_id: Value stored in the ``user_id`` column.

    Returns:
        The generated job id, a random UUID4 rendered as a string.
    """
    job_id = str(uuid.uuid4())
    # A single timestamp is used for both columns so they start out equal.
    now = time.time()
    conn = get_db_connection()
    try:
        # ``with conn`` commits/rolls back the transaction; closing the
        # connection is done separately in ``finally``.
        with conn:
            conn.execute(
                """INSERT INTO jobs (id, type, user_id, status, created_at, updated_at)
                VALUES (?, ?, ?, 'pending', ?, ?)""",
                (job_id, job_type, user_id, now, now),
            )
    finally:
        conn.close()
    return job_id
def update_job_progress(job_id, stage, current, total):
    """Record progress for a job and mark it as ``'running'``.

    Args:
        job_id: Id of the job row to update.
        stage: Label stored in ``progress_stage``.
        current: Value stored in ``progress_current``.
        total: Value stored in ``progress_total``.

    Note:
        The status is set to ``'running'`` unconditionally, whatever the
        job's previous status was. ``updated_at`` is refreshed as well.
    """
    conn = get_db_connection()
    try:
        # ``with conn`` only manages the transaction; the connection itself
        # is closed in ``finally`` so it is not leaked on every update.
        with conn:
            conn.execute(
                """UPDATE jobs SET progress_stage = ?, progress_current = ?,
                progress_total = ?, status = 'running', updated_at = ? WHERE id = ?""",
                (stage, current, total, time.time(), job_id),
            )
    finally:
        conn.close()
def complete_job(job_id, result=None, error=None):
    """Mark a job as finished, storing its result or its error.

    Args:
        job_id: Id of the job row to update.
        result: JSON-serialisable result, stored as JSON text. ``None`` is
            stored as NULL; falsy-but-present values such as ``{}``, ``[]``
            or ``0`` are serialised rather than dropped.
        error: Error message. Any value other than ``None`` (including an
            empty string) marks the job as ``'failed'``; otherwise the job
            is marked ``'completed'``.
    """
    # Compare against None rather than relying on truthiness: an exception
    # whose message is empty must still count as a failure, and an empty
    # result must still be persisted.
    status = "failed" if error is not None else "completed"
    payload = json.dumps(result) if result is not None else None

    conn = get_db_connection()
    try:
        # ``with conn`` commits/rolls back; ``finally`` closes the connection.
        with conn:
            conn.execute(
                """UPDATE jobs SET status = ?, result = ?, error = ?, updated_at = ? WHERE id = ?""",
                (status, payload, error, time.time(), job_id),
            )
    finally:
        conn.close()
def get_job(job_id):
    """Fetch a single job by id.

    Args:
        job_id: Id of the job to look up.

    Returns:
        The job row as a plain ``dict`` keyed by column name, or ``None``
        when no job has that id.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT * FROM jobs WHERE id = ?", (job_id,)
        ).fetchone()
    finally:
        # Read-only query: nothing to commit, but the connection must still
        # be closed explicitly (a ``with conn`` block would not do that).
        conn.close()
    return dict(row) if row is not None else None
def get_user_job(user_id, job_type):
    """Get the most recent job of a type for a user.

    Args:
        user_id: Value matched against the ``user_id`` column.
        job_type: Value matched against the ``type`` column.

    Returns:
        The matching row with the greatest ``created_at`` as a plain
        ``dict``, or ``None`` when the user has no job of that type.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            """SELECT * FROM jobs WHERE user_id = ? AND type = ?
            ORDER BY created_at DESC LIMIT 1""",
            (user_id, job_type),
        ).fetchone()
    finally:
        # Read-only query: nothing to commit, but the connection must still
        # be closed explicitly (a ``with conn`` block would not do that).
        conn.close()
    return dict(row) if row is not None else None
def run_initial_load_job(job_id, username, token):
    """Background job to load spaces and discussions.

    Fetches the user's spaces, then the discussions for those spaces,
    writes both to the cache and finally marks the job as completed with
    the two counts. Any exception is recorded on the job row instead of
    propagating out of the worker.

    Args:
        job_id: Id of the job row that receives progress and the outcome.
        username: User whose spaces are fetched; also used for the cache key.
        token: Credential passed to ``HuggingFaceAPI``.
    """

    def progress_callback(stage, current, total):
        # Forward progress reported by the API client to the job row.
        update_job_progress(job_id, stage, current, total)

    try:
        api = HuggingFaceAPI(token)

        # Fetch spaces with details
        update_job_progress(job_id, "spaces", 0, 1)
        spaces = api.fetch_spaces_with_details(username, progress_callback)

        # Fetch discussions for all spaces
        update_job_progress(job_id, "discussions", 0, len(spaces))
        discussions_map = api.fetch_all_discussions(spaces, progress_callback)

        # Store in cache
        cache_result(username, spaces, discussions_map)

        discussions_count = sum(len(d) for d in discussions_map.values())
        complete_job(
            job_id,
            {"spaces_count": len(spaces), "discussions_count": discussions_count},
        )
    except Exception as e:
        # Outermost boundary of the worker: record the failure on the job.
        # Some exceptions stringify to "" (e.g. ``KeyError()``); fall back to
        # the class name so the stored error is never empty, which could
        # otherwise be mistaken for "no error".
        complete_job(job_id, error=str(e) or type(e).__name__)
def run_wake_job(job_id, username, token, space_ids):
    """Background job to wake sleeping spaces.

    Asks the API client to wake the given spaces, clears the user's cached
    spaces list, and marks the job as completed with the per-space results
    plus success/failure counts. Any exception is recorded on the job row
    instead of propagating out of the worker.

    Args:
        job_id: Id of the job row that receives progress and the outcome.
        username: User whose spaces cache entry is cleared afterwards.
        token: Credential passed to ``HuggingFaceAPI``.
        space_ids: Identifiers handed to ``HuggingFaceAPI.wake_spaces``.
    """

    def progress_callback(stage, current, total):
        # Forward progress reported by the API client to the job row.
        update_job_progress(job_id, stage, current, total)

    try:
        api = HuggingFaceAPI(token)
        results = api.wake_spaces(space_ids, progress_callback)

        # Clear spaces cache
        clear_user_cache(username)

        # Each result is indexed by "success" to decide which bucket it
        # falls in.
        succeeded = sum(1 for r in results if r["success"])
        failed = sum(1 for r in results if not r["success"])
        complete_job(
            job_id,
            {"results": results, "succeeded": succeeded, "failed": failed},
        )
    except Exception as e:
        # Outermost boundary of the worker: record the failure on the job.
        # Some exceptions stringify to "" (e.g. ``KeyError()``); fall back to
        # the class name so the stored error is never empty, which could
        # otherwise be mistaken for "no error".
        complete_job(job_id, error=str(e) or type(e).__name__)
def cache_result(username, spaces, discussions_map, ttl_seconds=300):
    """Store fetched data in cache.

    Writes one ``spaces:<username>`` entry holding the JSON-encoded spaces
    and one ``discussions:<space_id>`` entry per item of ``discussions_map``.
    Existing entries with the same key are replaced. All entries share the
    same expiry timestamp.

    Args:
        username: User the spaces list belongs to.
        spaces: JSON-serialisable spaces payload.
        discussions_map: Mapping of space id to its JSON-serialisable
            discussions payload.
        ttl_seconds: Lifetime of the entries in seconds, counted from now.
            Defaults to 300, the previously hard-coded value.
    """
    expires_at = time.time() + ttl_seconds

    # Serialise everything up front so a non-serialisable payload fails
    # before any row is written.
    rows = [(f"spaces:{username}", json.dumps(spaces), expires_at)]
    rows.extend(
        (f"discussions:{space_id}", json.dumps(discussions), expires_at)
        for space_id, discussions in discussions_map.items()
    )

    conn = get_db_connection()
    try:
        # ``with conn`` commits all rows together (or rolls back on error);
        # the connection itself is closed in ``finally``.
        with conn:
            conn.executemany(
                "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
                rows,
            )
    finally:
        conn.close()
def clear_user_cache(username):
    """Clear user's spaces cache.

    Deletes only the ``spaces:<username>`` entry from the ``cache`` table;
    ``discussions:<space_id>`` entries are left untouched.

    Args:
        username: User whose cached spaces list is removed.
    """
    conn = get_db_connection()
    try:
        # ``with conn`` commits/rolls back; ``finally`` closes the connection.
        with conn:
            conn.execute(
                "DELETE FROM cache WHERE key = ?", (f"spaces:{username}",)
            )
    finally:
        conn.close()
def start_job_thread(target, *args):
    """Run ``target(*args)`` on a new daemon thread and return that thread.

    The thread is already started when it is returned; being a daemon, it
    does not keep the interpreter alive on exit.
    """
    worker = threading.Thread(target=target, args=args)
    worker.daemon = True
    worker.start()
    return worker
# Initialize jobs table. This runs at module level, i.e. whenever this module
# is imported, so the table exists before any job helper is called.
init_jobs_table()