# app.py — uptime-monitor web app.
#
# FastAPI UI + JSON API over a SQLite DB in /tmp. A Supervisor spawns one
# `worker.py` subprocess per active site to do the actual pinging; a
# background thread periodically uploads the DB to a Hugging Face dataset
# repo, and the DB is restored from that repo on startup.

import hmac
import os
import pathlib
import shutil
import signal
import sqlite3
import subprocess
import threading
import time
from typing import Any, Dict, List

from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, hf_hub_download

# --- Config from env ---
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO")  # e.g. "username/sqlite-backup"
ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", "change-me")
DB_FILE = "/tmp/app.db"
BACKUP_INTERVAL_SECONDS = int(os.environ.get("BACKUP_INTERVAL_SECONDS", 3 * 3600))  # default 3 hours
PING_INTERVAL_SECONDS = int(os.environ.get("PING_INTERVAL_SECONDS", 15 * 60))  # default 15 minutes

templates = Jinja2Templates(directory="templates")
app = FastAPI(docs_url=None, redoc_url=None)  # disable public docs by default

# Mount the static dir only if it exists so the app still boots without one.
if os.path.isdir("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")


# --- Database helpers ---

def get_db_conn():
    """Open a new connection to the app DB with name-based row access.

    Foreign-key enforcement is enabled per-connection (SQLite defaults it to
    OFF); without this the `ON DELETE CASCADE` declared on `pings` would be
    a no-op.
    """
    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    return conn


def init_db():
    """Create the `sites` and `pings` tables if they do not exist yet."""
    conn = get_db_conn()
    cur = conn.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS sites (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT NOT NULL UNIQUE,
        label TEXT,
        active INTEGER NOT NULL DEFAULT 1,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )""")
    cur.execute("""
    CREATE TABLE IF NOT EXISTS pings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        site_id INTEGER NOT NULL,
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        status_code INTEGER,
        latency_ms INTEGER,
        error TEXT,
        FOREIGN KEY(site_id) REFERENCES sites(id) ON DELETE CASCADE
    )""")
    conn.commit()
    conn.close()


def restore_db_from_hub():
    """Best-effort restore of the DB file from the HF dataset repo.

    `hf_hub_download` returns a path inside the local HF cache, so the file
    is copied into DB_FILE. Any failure (no repo, no file, network) is
    logged and ignored — the app then starts with a fresh DB.
    """
    if not HF_TOKEN or not HF_DATASET_REPO:
        print("HF_TOKEN or HF_DATASET_REPO not set; skipping restore.")
        return
    print("Attempting to restore DB from Hugging Face Hub:", HF_DATASET_REPO)
    try:
        path = hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename="app.db",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        shutil.copy(path, DB_FILE)
        print("Restored DB from hub to", DB_FILE)
    except Exception as e:
        print("No remote DB found or download failed:", e)


def upload_db_to_hub():
    """Best-effort upload of the DB file to the HF dataset repo."""
    if not HF_TOKEN or not HF_DATASET_REPO:
        print("HF_TOKEN or HF_DATASET_REPO not set; skipping upload.")
        return
    print("Uploading DB to Hugging Face Hub:", HF_DATASET_REPO)
    api = HfApi(token=HF_TOKEN)
    try:
        # BUG FIX: the original passed an invalid `repo_type_arg` keyword,
        # which made every call raise TypeError (silently swallowed below),
        # so backups never actually reached the hub. The token is already
        # bound to the HfApi instance, so it is not repeated here.
        api.upload_file(
            path_or_fileobj=DB_FILE,
            path_in_repo="app.db",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
        )
        print("Upload successful")
    except Exception as e:
        print("Failed to upload DB:", e)


def backup_loop():
    """Daemon-thread loop: upload the DB every BACKUP_INTERVAL_SECONDS."""
    while True:
        try:
            if os.path.exists(DB_FILE):
                upload_db_to_hub()
        except Exception as e:
            print("Backup loop error:", e)
        time.sleep(BACKUP_INTERVAL_SECONDS)


class Supervisor:
    """Keeps one `worker.py` subprocess alive per active site.

    A daemon monitor thread polls the `sites` table every 10 seconds,
    (re)spawning workers for active sites whose process is missing or dead,
    and terminating workers for sites that are no longer active.
    """

    def __init__(self):
        self.procs = {}  # site_id -> subprocess.Popen
        self.lock = threading.Lock()
        self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
        self.running = True
        self.monitor_thread.start()

    def spawn_worker(self, site_id: int):
        """Start a worker subprocess for `site_id` unless one is running."""
        with self.lock:
            if site_id in self.procs and self.procs[site_id].poll() is None:
                return  # already running
            cmd = ["python", "worker.py", str(site_id), DB_FILE, str(PING_INTERVAL_SECONDS)]
            print("Spawning worker:", cmd)
            self.procs[site_id] = subprocess.Popen(cmd)

    def stop_worker(self, site_id: int):
        """Terminate (then kill, after 5s) the worker for `site_id`."""
        with self.lock:
            p = self.procs.get(site_id)
            if p and p.poll() is None:
                p.terminate()
                try:
                    p.wait(timeout=5)
                except Exception:
                    p.kill()
            self.procs.pop(site_id, None)

    def monitor_loop(self):
        """Reconcile running workers against the set of active sites."""
        while self.running:
            try:
                conn = get_db_conn()
                cur = conn.cursor()
                cur.execute("SELECT id FROM sites WHERE active = 1")
                active_ids = {r["id"] for r in cur.fetchall()}
                conn.close()
                # Snapshot the proc table under the lock; the original read
                # it unlocked while other threads could mutate it.
                with self.lock:
                    known = dict(self.procs)
                # Spawn missing/dead workers for active sites.
                for sid in active_ids:
                    p = known.get(sid)
                    if p is None or p.poll() is not None:
                        self.spawn_worker(sid)
                # Stop workers whose site was deactivated/removed.
                for sid in known:
                    if sid not in active_ids:
                        self.stop_worker(sid)
            except Exception as e:
                print("Supervisor monitor loop error:", e)
            time.sleep(10)

    def shutdown(self):
        """Stop the monitor loop and terminate all workers (best effort)."""
        self.running = False
        with self.lock:
            for sid, p in list(self.procs.items()):
                try:
                    p.terminate()
                except Exception:
                    pass


supervisor = None


@app.on_event("startup")
def startup_event():
    """Restore DB, ensure schema, start backup thread and supervisor."""
    restore_db_from_hub()
    init_db()
    threading.Thread(target=backup_loop, daemon=True).start()
    global supervisor
    supervisor = Supervisor()
    print("Startup complete.")


@app.on_event("shutdown")
def shutdown_event():
    """Stop workers and attempt one final DB backup."""
    global supervisor
    if supervisor:
        supervisor.shutdown()
    try:
        if os.path.exists(DB_FILE):
            upload_db_to_hub()
    except Exception as e:
        print("Final backup failed:", e)
    print("Shutdown complete.")


# --- API & UI routes ---

@app.get("/", include_in_schema=False)
def index(request: Request):
    """Dashboard: every site with its most recent ping (if any)."""
    conn = get_db_conn()
    cur = conn.cursor()
    cur.execute("SELECT * FROM sites ORDER BY created_at DESC")
    sites = cur.fetchall()
    data = []
    for s in sites:
        cur.execute(
            "SELECT * FROM pings WHERE site_id = ? ORDER BY timestamp DESC LIMIT 1",
            (s["id"],),
        )
        data.append({"site": s, "last": cur.fetchone()})
    conn.close()
    return templates.TemplateResponse("index.html", {"request": request, "sites": data})


@app.post("/add", include_in_schema=False)
def add_site(url: str = Form(...), label: str = Form(None)):
    """Register a site (idempotent on URL) and spawn its worker immediately."""
    conn = get_db_conn()
    cur = conn.cursor()
    try:
        cur.execute(
            "INSERT OR IGNORE INTO sites (url, label) VALUES (?, ?)",
            (url.strip(), label),
        )
        conn.commit()
        cur.execute("SELECT id FROM sites WHERE url = ?", (url.strip(),))
        row = cur.fetchone()
        # Guard: `supervisor` is None until startup completes; the original
        # would raise AttributeError if /add raced app startup. The monitor
        # loop picks the site up anyway.
        if row and supervisor:
            supervisor.spawn_worker(row["id"])
    except Exception as e:
        conn.close()
        raise HTTPException(status_code=400, detail=str(e))
    conn.close()
    return Response(status_code=303, headers={"Location": "/"})


@app.post("/remove", include_in_schema=False)
def remove_site(site_id: int = Form(...)):
    """Deactivate a site; the supervisor stops its worker on its next pass."""
    conn = get_db_conn()
    cur = conn.cursor()
    cur.execute("UPDATE sites SET active = 0 WHERE id = ?", (site_id,))
    conn.commit()
    conn.close()
    return Response(status_code=303, headers={"Location": "/"})


@app.get("/admin/download_db", include_in_schema=False)
def admin_download_db(token: str):
    """Download the raw SQLite file. Guarded by ADMIN_TOKEN (query param)."""
    # Constant-time compare: `token` is untrusted input and a plain `!=`
    # leaks the secret through timing.
    if not hmac.compare_digest(token, ADMIN_TOKEN):
        raise HTTPException(status_code=401, detail="Unauthorized")
    if not os.path.exists(DB_FILE):
        raise HTTPException(status_code=404, detail="DB not found")
    # BUG FIX: fastapi.Response cannot take a generator body (the original
    # passed one, which fails at response time); FileResponse streams the
    # file correctly.
    return FileResponse(DB_FILE, media_type="application/octet-stream", filename="app.db")


@app.get("/api/sites")
def api_sites():
    """JSON status: all active sites, each with its latest ping (or None)."""
    conn = get_db_conn()
    cur = conn.cursor()
    cur.execute("SELECT * FROM sites WHERE active = 1")
    sites = [dict(r) for r in cur.fetchall()]
    for s in sites:
        cur.execute(
            "SELECT * FROM pings WHERE site_id = ? ORDER BY timestamp DESC LIMIT 1",
            (s["id"],),
        )
        last = cur.fetchone()
        s["last_ping"] = dict(last) if last else None
    conn.close()
    return {"sites": sites}