# NOTE: the three lines above this file's code in the original capture
# ("Spaces: Sleeping Sleeping") were Hugging Face Spaces UI status text
# picked up by the page scrape — they are not part of app.py.
# app.py
import os
import pathlib
import secrets
import signal
import sqlite3
import subprocess
import threading
import time
from typing import Any, Dict, List

from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, hf_hub_download
| # Config from env | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO") # e.g. "username/sqlite-backup" | |
| ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", "change-me") | |
| DB_FILE = "/tmp/app.db" | |
| BACKUP_INTERVAL_SECONDS = int(os.environ.get("BACKUP_INTERVAL_SECONDS", 3 * 3600)) # default 3 hours | |
| PING_INTERVAL_SECONDS = int(os.environ.get("PING_INTERVAL_SECONDS", 15 * 60)) # 15 minutes default | |
| templates = Jinja2Templates(directory="templates") | |
| app = FastAPI(docs_url=None, redoc_url=None) # disable public docs by default | |
| # Ensure static dir exists if used | |
| if os.path.isdir("static"): | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # Database helpers | |
| def get_db_conn(): | |
| conn = sqlite3.connect(DB_FILE, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def init_db(): | |
| conn = get_db_conn() | |
| cur = conn.cursor() | |
| cur.execute(""" | |
| CREATE TABLE IF NOT EXISTS sites ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| url TEXT NOT NULL UNIQUE, | |
| label TEXT, | |
| active INTEGER NOT NULL DEFAULT 1, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| )""") | |
| cur.execute(""" | |
| CREATE TABLE IF NOT EXISTS pings ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| site_id INTEGER NOT NULL, | |
| timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| status_code INTEGER, | |
| latency_ms INTEGER, | |
| error TEXT, | |
| FOREIGN KEY(site_id) REFERENCES sites(id) ON DELETE CASCADE | |
| )""") | |
| conn.commit() | |
| conn.close() | |
| # Restore DB from Hugging Face dataset repo if exists | |
| def restore_db_from_hub(): | |
| if not HF_TOKEN or not HF_DATASET_REPO: | |
| print("HF_TOKEN or HF_DATASET_REPO not set; skipping restore.") | |
| return | |
| print("Attempting to restore DB from Hugging Face Hub:", HF_DATASET_REPO) | |
| try: | |
| path = hf_hub_download(repo_id=HF_DATASET_REPO, filename="app.db", repo_type="dataset", token=HF_TOKEN) | |
| # copy into /tmp/app.db (hf_hub_download returns a path in the local cache) | |
| import shutil | |
| shutil.copy(path, DB_FILE) | |
| print("Restored DB from hub to", DB_FILE) | |
| except Exception as e: | |
| print("No remote DB found or download failed:", e) | |
| # Backup uploader | |
| def upload_db_to_hub(): | |
| if not HF_TOKEN or not HF_DATASET_REPO: | |
| print("HF_TOKEN or HF_DATASET_REPO not set; skipping upload.") | |
| return | |
| print("Uploading DB to Hugging Face Hub:", HF_DATASET_REPO) | |
| api = HfApi(token=HF_TOKEN) | |
| try: | |
| api.upload_file( | |
| path_or_fileobj=DB_FILE, | |
| path_in_repo="app.db", | |
| repo_id=HF_DATASET_REPO, | |
| repo_type="dataset", | |
| token=HF_TOKEN, | |
| repo_type_arg="dataset" | |
| ) | |
| print("Upload successful") | |
| except Exception as e: | |
| print("Failed to upload DB:", e) | |
| def backup_loop(): | |
| # Periodic backing up thread | |
| while True: | |
| try: | |
| if os.path.exists(DB_FILE): | |
| upload_db_to_hub() | |
| except Exception as e: | |
| print("Backup loop error:", e) | |
| time.sleep(BACKUP_INTERVAL_SECONDS) | |
| # Supervisor that spawns worker subprocess per site | |
| class Supervisor: | |
| def __init__(self): | |
| self.procs = {} # site_id -> subprocess.Popen | |
| self.lock = threading.Lock() | |
| self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True) | |
| self.running = True | |
| self.monitor_thread.start() | |
| def spawn_worker(self, site_id: int): | |
| with self.lock: | |
| if site_id in self.procs and self.procs[site_id].poll() is None: | |
| # already running | |
| return | |
| cmd = ["python", "worker.py", str(site_id), DB_FILE, str(PING_INTERVAL_SECONDS)] | |
| print("Spawning worker:", cmd) | |
| proc = subprocess.Popen(cmd) | |
| self.procs[site_id] = proc | |
| def stop_worker(self, site_id: int): | |
| with self.lock: | |
| p = self.procs.get(site_id) | |
| if p and p.poll() is None: | |
| p.terminate() | |
| try: | |
| p.wait(timeout=5) | |
| except Exception: | |
| p.kill() | |
| self.procs.pop(site_id, None) | |
| def monitor_loop(self): | |
| while self.running: | |
| try: | |
| conn = get_db_conn() | |
| cur = conn.cursor() | |
| cur.execute("SELECT id FROM sites WHERE active = 1") | |
| rows = cur.fetchall() | |
| active_ids = {r["id"] for r in rows} | |
| # spawn missing workers | |
| for sid in active_ids: | |
| if sid not in self.procs or self.procs[sid].poll() is not None: | |
| self.spawn_worker(sid) | |
| # stop workers for removed sites | |
| for sid in list(self.procs.keys()): | |
| if sid not in active_ids: | |
| self.stop_worker(sid) | |
| conn.close() | |
| except Exception as e: | |
| print("Supervisor monitor loop error:", e) | |
| time.sleep(10) | |
| def shutdown(self): | |
| self.running = False | |
| with self.lock: | |
| for sid, p in list(self.procs.items()): | |
| try: | |
| p.terminate() | |
| except Exception: | |
| pass | |
| supervisor = None | |
| def startup_event(): | |
| # restore DB from hub if possible | |
| restore_db_from_hub() | |
| # ensure DB tables exist | |
| init_db() | |
| # start backup thread | |
| t = threading.Thread(target=backup_loop, daemon=True) | |
| t.start() | |
| # start supervisor | |
| global supervisor | |
| supervisor = Supervisor() | |
| print("Startup complete.") | |
| def shutdown_event(): | |
| global supervisor | |
| if supervisor: | |
| supervisor.shutdown() | |
| # do a final backup attempt | |
| try: | |
| if os.path.exists(DB_FILE): | |
| upload_db_to_hub() | |
| except Exception as e: | |
| print("Final backup failed:", e) | |
| print("Shutdown complete.") | |
| # --- API & UI routes --- | |
| def index(request: Request): | |
| conn = get_db_conn() | |
| cur = conn.cursor() | |
| cur.execute("SELECT * FROM sites ORDER BY created_at DESC") | |
| sites = cur.fetchall() | |
| # latest ping for each site | |
| data = [] | |
| for s in sites: | |
| cur.execute("SELECT * FROM pings WHERE site_id = ? ORDER BY timestamp DESC LIMIT 1", (s["id"],)) | |
| last = cur.fetchone() | |
| data.append({"site": s, "last": last}) | |
| conn.close() | |
| return templates.TemplateResponse("index.html", {"request": request, "sites": data}) | |
| def add_site(url: str = Form(...), label: str = Form(None)): | |
| conn = get_db_conn() | |
| cur = conn.cursor() | |
| try: | |
| cur.execute("INSERT OR IGNORE INTO sites (url, label) VALUES (?, ?)", (url.strip(), label)) | |
| conn.commit() | |
| # spawn immediately | |
| cur.execute("SELECT id FROM sites WHERE url = ?", (url.strip(),)) | |
| row = cur.fetchone() | |
| if row: | |
| sid = row["id"] | |
| supervisor.spawn_worker(sid) | |
| except Exception as e: | |
| conn.close() | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| conn.close() | |
| return Response(status_code=303, headers={"Location": "/"}) | |
| def remove_site(site_id: int = Form(...)): | |
| conn = get_db_conn() | |
| cur = conn.cursor() | |
| cur.execute("UPDATE sites SET active = 0 WHERE id = ?", (site_id,)) | |
| conn.commit() | |
| conn.close() | |
| # supervisor will stop it | |
| return Response(status_code=303, headers={"Location": "/"}) | |
| # Admin endpoint to download DB (token protected) | |
| def admin_download_db(token: str): | |
| if token != ADMIN_TOKEN: | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| if not os.path.exists(DB_FILE): | |
| raise HTTPException(status_code=404, detail="DB not found") | |
| # stream DB file | |
| def iterfile(): | |
| with open(DB_FILE, "rb") as f: | |
| while True: | |
| chunk = f.read(4096) | |
| if not chunk: | |
| break | |
| yield chunk | |
| return Response(iterfile(), media_type="application/octet-stream") | |
| # Small API for status (optional) | |
| def api_sites(): | |
| conn = get_db_conn() | |
| cur = conn.cursor() | |
| cur.execute("SELECT * FROM sites WHERE active = 1") | |
| sites = [dict(r) for r in cur.fetchall()] | |
| for s in sites: | |
| cur.execute("SELECT * FROM pings WHERE site_id = ? ORDER BY timestamp DESC LIMIT 1", (s["id"],)) | |
| last = cur.fetchone() | |
| s["last_ping"] = dict(last) if last else None | |
| conn.close() | |
| return {"sites": sites} | |