import sqlite3
import json
import time
import threading
import uuid
from datetime import datetime, timezone
from hf_api import HuggingFaceAPI
DB_PATH = "cache.db"
def get_db_connection():
    """Open a connection to the cache database with dict-style row access.

    Returns:
        sqlite3.Connection with ``row_factory`` set to ``sqlite3.Row`` so
        rows can be converted to dicts by callers.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    # NOTE(review): callers use `with conn:` (transaction scope only) and never
    # close connections explicitly — reclamation relies on GC. Confirm this is
    # acceptable for the deployment before hardening.
    return connection
def init_jobs_table():
    """Create the jobs table if it does not already exist (idempotent)."""
    ddl = """
        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            type TEXT,
            user_id TEXT,
            status TEXT DEFAULT 'pending',
            progress_current INTEGER DEFAULT 0,
            progress_total INTEGER DEFAULT 0,
            progress_stage TEXT DEFAULT '',
            result TEXT,
            error TEXT,
            created_at REAL,
            updated_at REAL
        )
    """
    with get_db_connection() as conn:
        conn.execute(ddl)
        conn.commit()
def create_job(job_type, user_id):
    """Insert a new pending job row and return its generated id.

    Args:
        job_type: Free-form job category string (e.g. the runner's name).
        user_id: Owner of the job; used later by get_user_job lookups.

    Returns:
        The newly created job's UUID4 string primary key.
    """
    new_id = str(uuid.uuid4())
    timestamp = time.time()
    with get_db_connection() as conn:
        conn.execute(
            """INSERT INTO jobs (id, type, user_id, status, created_at, updated_at)
            VALUES (?, ?, ?, 'pending', ?, ?)""",
            (new_id, job_type, user_id, timestamp, timestamp),
        )
        conn.commit()
    return new_id
def update_job_progress(job_id, stage, current, total):
    """Record progress for a job and flip its status to 'running'.

    Args:
        job_id: Primary key of the job row to update.
        stage: Human-readable label for the current phase.
        current: Units of work completed so far in this stage.
        total: Total units of work expected for this stage.
    """
    now = time.time()
    with get_db_connection() as conn:
        conn.execute(
            """UPDATE jobs SET progress_stage = ?, progress_current = ?,
            progress_total = ?, status = 'running', updated_at = ? WHERE id = ?""",
            (stage, current, total, now, job_id),
        )
        conn.commit()
def complete_job(job_id, result=None, error=None):
    """Mark a job finished, storing its JSON-serialized result or an error.

    Args:
        job_id: Primary key of the job row to update.
        result: JSON-serializable payload for a successful job, or None.
        error: Error message; when truthy the job is marked 'failed',
            otherwise 'completed'.
    """
    status = "failed" if error else "completed"
    # Fix: serialize whenever a result was actually provided. The previous
    # truthiness test (`if result`) silently stored NULL for falsy-but-valid
    # results such as {}, 0, or [].
    serialized = json.dumps(result) if result is not None else None
    with get_db_connection() as conn:
        conn.execute(
            """UPDATE jobs SET status = ?, result = ?, error = ?, updated_at = ? WHERE id = ?""",
            (status, serialized, error, time.time(), job_id),
        )
        conn.commit()
def get_job(job_id):
    """Fetch a single job row as a plain dict, or None when not found."""
    with get_db_connection() as conn:
        row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
    return dict(row) if row else None
def get_user_job(user_id, job_type):
    """Return the newest job of *job_type* belonging to *user_id*, or None."""
    query = """SELECT * FROM jobs WHERE user_id = ? AND type = ?
            ORDER BY created_at DESC LIMIT 1"""
    with get_db_connection() as conn:
        row = conn.execute(query, (user_id, job_type)).fetchone()
    return dict(row) if row else None
def run_initial_load_job(job_id, username, token):
    """Background job: fetch a user's spaces and their discussions, then cache them.

    Any exception is caught and recorded on the job row as a failure rather
    than propagating out of the worker thread.
    """

    def on_progress(stage, done, total):
        # Forward progress reports from the API layer into the jobs table.
        update_job_progress(job_id, stage, done, total)

    try:
        client = HuggingFaceAPI(token)
        update_job_progress(job_id, "spaces", 0, 1)
        spaces = client.fetch_spaces_with_details(username, on_progress)
        update_job_progress(job_id, "discussions", 0, len(spaces))
        discussions_map = client.fetch_all_discussions(spaces, on_progress)
        cache_result(username, spaces, discussions_map)
        total_discussions = sum(len(items) for items in discussions_map.values())
        complete_job(
            job_id,
            {"spaces_count": len(spaces), "discussions_count": total_discussions},
        )
    except Exception as exc:
        complete_job(job_id, error=str(exc))
def run_wake_job(job_id, username, token, space_ids):
    """Background job: wake the given sleeping spaces and record the outcome.

    Any exception is caught and recorded on the job row as a failure rather
    than propagating out of the worker thread.
    """

    def on_progress(stage, done, total):
        update_job_progress(job_id, stage, done, total)

    try:
        client = HuggingFaceAPI(token)
        outcomes = client.wake_spaces(space_ids, on_progress)
        # The cached spaces list is now stale — drop it so the next read refetches.
        clear_user_cache(username)
        succeeded = sum(1 for item in outcomes if item["success"])
        failed = len(outcomes) - succeeded
        complete_job(
            job_id,
            {"results": outcomes, "succeeded": succeeded, "failed": failed},
        )
    except Exception as exc:
        complete_job(job_id, error=str(exc))
def cache_result(username, spaces, discussions_map, ttl=300):
    """Store fetched spaces and per-space discussions in the cache table.

    Args:
        username: HF username; spaces are cached under "spaces:<username>".
        spaces: List of space dicts to cache (JSON-serializable).
        discussions_map: Mapping of space_id -> list of discussions; each
            list is cached under "discussions:<space_id>".
        ttl: Cache lifetime in seconds. Defaults to 300, preserving the
            previous hard-coded expiry.
    """
    # Note: the redundant function-local `import time` was removed — the
    # module already imports time at the top of the file.
    expires_at = time.time() + ttl
    with get_db_connection() as conn:
        conn.execute(
            "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
            (f"spaces:{username}", json.dumps(spaces), expires_at),
        )
        for space_id, discussions in discussions_map.items():
            conn.execute(
                "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
                (f"discussions:{space_id}", json.dumps(discussions), expires_at),
            )
        conn.commit()
def clear_user_cache(username):
    """Delete the cached spaces entry for *username*, if any."""
    cache_key = f"spaces:{username}"
    with get_db_connection() as conn:
        conn.execute("DELETE FROM cache WHERE key = ?", (cache_key,))
        conn.commit()
def start_job_thread(target, *args):
    """Run ``target(*args)`` on a daemon thread and return the started thread.

    The thread is daemonized so a hung job cannot block interpreter exit.
    """
    worker = threading.Thread(target=target, args=args, daemon=True)
    worker.start()
    return worker
# Ensure the jobs table exists as soon as this module is imported
# (import-time side effect: creates cache.db if missing).
init_jobs_table()