# LIFETIMECHECK / app.py
# (Hugging Face Space file; uploaded by "triflix", commit ff2fd7f — page
# chrome converted to comments so the module parses as Python.)
# app.py
# Standard library
import os
import pathlib
import secrets
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from typing import Any, Dict, List

# Third-party
from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, hf_hub_download
# Config from env
HF_TOKEN = os.environ.get("HF_TOKEN")  # Hub token used for DB backup/restore
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO") # e.g. "username/sqlite-backup"
ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", "change-me")  # guards /admin/download_db
DB_FILE = "/tmp/app.db"  # live SQLite file (ephemeral storage; persisted via the Hub)
BACKUP_INTERVAL_SECONDS = int(os.environ.get("BACKUP_INTERVAL_SECONDS", 3 * 3600)) # default 3 hours
PING_INTERVAL_SECONDS = int(os.environ.get("PING_INTERVAL_SECONDS", 15 * 60)) # 15 minutes default
templates = Jinja2Templates(directory="templates")
app = FastAPI(docs_url=None, redoc_url=None) # disable public docs by default
# Ensure static dir exists if used
if os.path.isdir("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")
# Database helpers
def get_db_conn():
    """Open a connection to DB_FILE with dict-style row access.

    `check_same_thread=False` because connections are used from the
    supervisor/backup threads as well as request handlers.
    """
    connection = sqlite3.connect(DB_FILE, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the `sites` and `pings` tables if they do not already exist.

    Fix over the original: the connection is now closed in a ``finally``
    block, so a failing DDL statement no longer leaks it.
    """
    conn = get_db_conn()
    try:
        cur = conn.cursor()
        cur.execute("""
        CREATE TABLE IF NOT EXISTS sites (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT NOT NULL UNIQUE,
        label TEXT,
        active INTEGER NOT NULL DEFAULT 1,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        # NOTE(review): ON DELETE CASCADE only fires when a connection has
        # PRAGMA foreign_keys=ON; sqlite3 leaves it off by default — confirm
        # whether cascading deletes are actually relied upon.
        cur.execute("""
        CREATE TABLE IF NOT EXISTS pings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        site_id INTEGER NOT NULL,
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        status_code INTEGER,
        latency_ms INTEGER,
        error TEXT,
        FOREIGN KEY(site_id) REFERENCES sites(id) ON DELETE CASCADE
        )""")
        conn.commit()
    finally:
        conn.close()
# Restore DB from Hugging Face dataset repo if exists
def restore_db_from_hub():
    """Copy app.db from the configured HF dataset repo into DB_FILE.

    Best-effort: missing config or a failed download only prints a message.
    """
    if not (HF_TOKEN and HF_DATASET_REPO):
        print("HF_TOKEN or HF_DATASET_REPO not set; skipping restore.")
        return
    print("Attempting to restore DB from Hugging Face Hub:", HF_DATASET_REPO)
    try:
        cached = hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename="app.db",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        # hf_hub_download returns a path inside the local HF cache;
        # copy that file into the live DB location.
        import shutil
        shutil.copy(cached, DB_FILE)
        print("Restored DB from hub to", DB_FILE)
    except Exception as e:
        print("No remote DB found or download failed:", e)
# Backup uploader
def upload_db_to_hub():
    """Upload the local SQLite file to the configured HF dataset repo.

    Best-effort: missing config or an upload error only prints a message.

    Fix over the original: the call passed a stray ``repo_type_arg="dataset"``
    keyword, which is not a parameter of ``HfApi.upload_file`` — every backup
    raised TypeError (silently swallowed by the except), so no backup ever
    reached the Hub. ``repo_type`` alone is the correct argument.
    """
    if not HF_TOKEN or not HF_DATASET_REPO:
        print("HF_TOKEN or HF_DATASET_REPO not set; skipping upload.")
        return
    print("Uploading DB to Hugging Face Hub:", HF_DATASET_REPO)
    api = HfApi(token=HF_TOKEN)
    try:
        api.upload_file(
            path_or_fileobj=DB_FILE,
            path_in_repo="app.db",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        print("Upload successful")
    except Exception as e:
        print("Failed to upload DB:", e)
def backup_loop():
    """Daemon-thread loop: push the DB to the Hub every BACKUP_INTERVAL_SECONDS.

    Never exits; any error in a single iteration is printed and the loop
    sleeps until the next cycle.
    """
    while True:
        try:
            db_present = os.path.exists(DB_FILE)
            if db_present:
                upload_db_to_hub()
        except Exception as exc:
            print("Backup loop error:", exc)
        finally:
            time.sleep(BACKUP_INTERVAL_SECONDS)
# Supervisor that spawns worker subprocess per site
class Supervisor:
    """Keeps one worker subprocess alive per active site.

    A daemon monitor thread polls the ``sites`` table every 10 seconds,
    spawning workers for newly-active sites and terminating workers whose
    sites were deactivated.

    Fixes over the original:
      * workers are launched with ``sys.executable`` instead of a bare
        ``"python"``, which may not exist on PATH in slim containers;
      * ``monitor_loop`` snapshots ``self.procs`` under the lock instead of
        reading it while other threads mutate it;
      * the DB connection in ``monitor_loop`` is closed in a ``finally``.
    """

    def __init__(self):
        self.procs: Dict[int, subprocess.Popen] = {}  # site_id -> worker process
        self.lock = threading.Lock()  # guards self.procs
        self.running = True  # set before the thread starts so the loop runs
        self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
        self.monitor_thread.start()

    def spawn_worker(self, site_id: int):
        """Start a worker for ``site_id`` unless one is already running."""
        with self.lock:
            existing = self.procs.get(site_id)
            if existing is not None and existing.poll() is None:
                return  # already running
            # sys.executable guarantees the same interpreter as this app.
            cmd = [sys.executable or "python", "worker.py", str(site_id),
                   DB_FILE, str(PING_INTERVAL_SECONDS)]
            print("Spawning worker:", cmd)
            self.procs[site_id] = subprocess.Popen(cmd)

    def stop_worker(self, site_id: int):
        """Terminate (then kill after 5s) the worker for ``site_id``, if any."""
        with self.lock:
            proc = self.procs.get(site_id)
            if proc is not None and proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except Exception:
                    proc.kill()  # did not exit in time; force it
            self.procs.pop(site_id, None)

    def monitor_loop(self):
        """Reconcile running workers with the set of active sites, forever."""
        while self.running:
            try:
                conn = get_db_conn()
                try:
                    cur = conn.cursor()
                    cur.execute("SELECT id FROM sites WHERE active = 1")
                    active_ids = {row["id"] for row in cur.fetchall()}
                finally:
                    conn.close()  # close even if the query raises
                with self.lock:
                    known = dict(self.procs)  # consistent snapshot
                # spawn workers for active sites with no live process
                for sid in active_ids:
                    proc = known.get(sid)
                    if proc is None or proc.poll() is not None:
                        self.spawn_worker(sid)
                # stop workers whose sites are no longer active
                for sid in known:
                    if sid not in active_ids:
                        self.stop_worker(sid)
            except Exception as e:
                print("Supervisor monitor loop error:", e)
            time.sleep(10)

    def shutdown(self):
        """Stop the monitor loop and terminate all workers (best-effort)."""
        self.running = False
        with self.lock:
            for proc in list(self.procs.values()):
                try:
                    proc.terminate()
                except Exception:
                    pass
supervisor = None  # module-global Supervisor; created in startup_event()
@app.on_event("startup")
def startup_event():
    """Restore the DB from the Hub, ensure the schema, then launch the
    periodic backup thread and the worker supervisor."""
    global supervisor
    restore_db_from_hub()  # pull the latest backup, if configured
    init_db()              # create tables when starting from scratch
    backup_thread = threading.Thread(target=backup_loop, daemon=True)
    backup_thread.start()
    supervisor = Supervisor()
    print("Startup complete.")
@app.on_event("shutdown")
def shutdown_event():
    """Stop the supervisor, then attempt one final DB backup."""
    global supervisor
    if supervisor is not None:
        supervisor.shutdown()
    try:
        db_present = os.path.exists(DB_FILE)
        if db_present:
            upload_db_to_hub()
    except Exception as exc:
        print("Final backup failed:", exc)
    print("Shutdown complete.")
# --- API & UI routes ---
@app.get("/", include_in_schema=False)
def index(request: Request):
    """Render the dashboard: every site plus its most recent ping.

    Fix over the original: the connection is closed in a ``finally`` block,
    so a failing query no longer leaks it.
    """
    conn = get_db_conn()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM sites ORDER BY created_at DESC")
        sites = cur.fetchall()
        data = []
        for s in sites:
            # latest ping per site (N+1 queries; fine at this scale)
            cur.execute(
                "SELECT * FROM pings WHERE site_id = ? ORDER BY timestamp DESC LIMIT 1",
                (s["id"],),
            )
            data.append({"site": s, "last": cur.fetchone()})
    finally:
        conn.close()
    return templates.TemplateResponse("index.html", {"request": request, "sites": data})
@app.post("/add", include_in_schema=False)
def add_site(url: str = Form(...), label: str = Form(None)):
    """Add (or re-activate) a monitored site and spawn its worker immediately.

    Returns a 303 redirect back to the dashboard; DB errors become 400.

    Fixes over the original:
      * re-adding a previously removed URL now resumes monitoring — before,
        the INSERT was ignored and the row stayed ``active = 0`` while a
        worker was spawned anyway (then killed by the supervisor);
      * ``supervisor`` is guarded against being None (requests before startup);
      * the connection is closed in a single ``finally``.
    """
    target = url.strip()
    conn = get_db_conn()
    try:
        cur = conn.cursor()
        cur.execute("INSERT OR IGNORE INTO sites (url, label) VALUES (?, ?)", (target, label))
        cur.execute("UPDATE sites SET active = 1 WHERE url = ?", (target,))
        conn.commit()
        # spawn immediately rather than waiting for the monitor loop
        cur.execute("SELECT id FROM sites WHERE url = ?", (target,))
        row = cur.fetchone()
        if row and supervisor is not None:
            supervisor.spawn_worker(row["id"])
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        conn.close()
    return Response(status_code=303, headers={"Location": "/"})
@app.post("/remove", include_in_schema=False)
def remove_site(site_id: int = Form(...)):
    """Deactivate a site; the supervisor's monitor loop stops its worker."""
    conn = get_db_conn()
    conn.execute("UPDATE sites SET active = 0 WHERE id = ?", (site_id,))
    conn.commit()
    conn.close()
    return Response(status_code=303, headers={"Location": "/"})
# Admin endpoint to download DB (token protected)
@app.get("/admin/download_db", include_in_schema=False)
def admin_download_db(token: str):
    """Stream the raw SQLite file to a caller presenting the admin token.

    Fixes over the original:
      * ``Response(iterfile())`` handed a generator to plain ``Response``,
        which cannot render one — the endpoint always errored; a
        ``StreamingResponse`` is required for generator bodies;
      * the token check uses ``secrets.compare_digest`` so the comparison
        does not leak the token through timing.
    """
    if not secrets.compare_digest(token, ADMIN_TOKEN):
        raise HTTPException(status_code=401, detail="Unauthorized")
    if not os.path.exists(DB_FILE):
        raise HTTPException(status_code=404, detail="DB not found")
    def iterfile():
        # 4 KiB chunks keep memory flat for large DB files
        with open(DB_FILE, "rb") as f:
            while True:
                chunk = f.read(4096)
                if not chunk:
                    break
                yield chunk
    return StreamingResponse(iterfile(), media_type="application/octet-stream")
# Small API for status (optional)
@app.get("/api/sites")
def api_sites():
    """JSON status endpoint: all active sites, each with its latest ping."""
    conn = get_db_conn()
    cur = conn.cursor()
    cur.execute("SELECT * FROM sites WHERE active = 1")
    sites = [dict(row) for row in cur.fetchall()]
    for site in sites:
        cur.execute(
            "SELECT * FROM pings WHERE site_id = ? ORDER BY timestamp DESC LIMIT 1",
            (site["id"],),
        )
        latest = cur.fetchone()
        site["last_ping"] = dict(latest) if latest else None
    conn.close()
    return {"sites": sites}