File size: 5,300 Bytes
b028028
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import sqlite3
import json
import time
import threading
import uuid
from datetime import datetime, timezone
from hf_api import HuggingFaceAPI


DB_PATH = "cache.db"


def get_db_connection():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_jobs_table():
    with get_db_connection() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                type TEXT,
                user_id TEXT,
                status TEXT DEFAULT 'pending',
                progress_current INTEGER DEFAULT 0,
                progress_total INTEGER DEFAULT 0,
                progress_stage TEXT DEFAULT '',
                result TEXT,
                error TEXT,
                created_at REAL,
                updated_at REAL
            )
        """)
        conn.commit()


def create_job(job_type, user_id):
    job_id = str(uuid.uuid4())
    now = time.time()
    with get_db_connection() as conn:
        conn.execute(
            """INSERT INTO jobs (id, type, user_id, status, created_at, updated_at)
               VALUES (?, ?, ?, 'pending', ?, ?)""",
            (job_id, job_type, user_id, now, now),
        )
        conn.commit()
    return job_id


def update_job_progress(job_id, stage, current, total):
    with get_db_connection() as conn:
        conn.execute(
            """UPDATE jobs SET progress_stage = ?, progress_current = ?,
               progress_total = ?, status = 'running', updated_at = ? WHERE id = ?""",
            (stage, current, total, time.time(), job_id),
        )
        conn.commit()


def complete_job(job_id, result=None, error=None):
    status = "failed" if error else "completed"
    with get_db_connection() as conn:
        conn.execute(
            """UPDATE jobs SET status = ?, result = ?, error = ?, updated_at = ? WHERE id = ?""",
            (status, json.dumps(result) if result else None, error, time.time(), job_id),
        )
        conn.commit()


def get_job(job_id):
    with get_db_connection() as conn:
        row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
        if row:
            return dict(row)
    return None


def get_user_job(user_id, job_type):
    """Get the most recent job of a type for a user."""
    with get_db_connection() as conn:
        row = conn.execute(
            """SELECT * FROM jobs WHERE user_id = ? AND type = ?
               ORDER BY created_at DESC LIMIT 1""",
            (user_id, job_type),
        ).fetchone()
        if row:
            return dict(row)
    return None


def run_initial_load_job(job_id, username, token):
    """Background job to load spaces and discussions."""
    def progress_callback(stage, current, total):
        update_job_progress(job_id, stage, current, total)

    try:
        api = HuggingFaceAPI(token)

        # Fetch spaces with details
        update_job_progress(job_id, "spaces", 0, 1)
        spaces = api.fetch_spaces_with_details(username, progress_callback)

        # Fetch discussions for all spaces
        update_job_progress(job_id, "discussions", 0, len(spaces))
        discussions_map = api.fetch_all_discussions(spaces, progress_callback)

        # Store in cache
        cache_result(username, spaces, discussions_map)

        complete_job(job_id, {"spaces_count": len(spaces), "discussions_count": sum(len(d) for d in discussions_map.values())})
    except Exception as e:
        complete_job(job_id, error=str(e))


def run_wake_job(job_id, username, token, space_ids):
    """Background job to wake sleeping spaces."""
    def progress_callback(stage, current, total):
        update_job_progress(job_id, stage, current, total)

    try:
        api = HuggingFaceAPI(token)
        results = api.wake_spaces(space_ids, progress_callback)

        # Clear spaces cache
        clear_user_cache(username)

        succeeded = sum(1 for r in results if r["success"])
        failed = sum(1 for r in results if not r["success"])
        complete_job(job_id, {"results": results, "succeeded": succeeded, "failed": failed})
    except Exception as e:
        complete_job(job_id, error=str(e))


def cache_result(username, spaces, discussions_map):
    """Store fetched data in cache."""
    import time
    expires_at = time.time() + 300
    with get_db_connection() as conn:
        conn.execute(
            "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
            (f"spaces:{username}", json.dumps(spaces), expires_at),
        )
        for space_id, discussions in discussions_map.items():
            conn.execute(
                "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)",
                (f"discussions:{space_id}", json.dumps(discussions), expires_at),
            )
        conn.commit()


def clear_user_cache(username):
    """Clear user's spaces cache."""
    with get_db_connection() as conn:
        conn.execute("DELETE FROM cache WHERE key = ?", (f"spaces:{username}",))
        conn.commit()


def start_job_thread(target, *args):
    """Start a background thread for a job."""
    thread = threading.Thread(target=target, args=args, daemon=True)
    thread.start()
    return thread


# Initialize jobs table
init_jobs_table()