from __future__ import annotations import json import threading import time import uuid from pathlib import Path from typing import Any import os from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse, HTMLResponse from pydantic import BaseModel from app import _api_quick_check, _run_agentic_pipeline from hydradeck.clients.grok_client import GrokClient class RunRequest(BaseModel): topic: str model: str = "grok-3-mini" base_url: str = "https://api.example.com" api_key: str = "" request_budget: float = 30.0 use_mock: bool = False language: str = "en" model_scope: str = "" model_structure: str = "" model_planner: str = "" model_section: str = "" model_paper: str = "" model_slides: str = "" JOBS: dict[str, dict[str, Any]] = {} LOCK = threading.Lock() STATE_PATH = Path("/tmp/hydradeck_state.json") HISTORY_LIMIT = 40 app = FastAPI(title="HydraDeck") def _load_state() -> None: if not STATE_PATH.exists(): return try: data = json.loads(STATE_PATH.read_text(encoding="utf-8")) except Exception: return jobs = data.get("jobs") if isinstance(jobs, dict): with LOCK: JOBS.update({str(k): v for k, v in jobs.items() if isinstance(v, dict)}) def _save_state() -> None: with LOCK: payload = {"jobs": JOBS} STATE_PATH.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8") def _prune_history() -> None: with LOCK: items = sorted( JOBS.items(), key=lambda kv: float(kv[1].get("updated_at", 0.0)), reverse=True, ) keep = dict(items[:HISTORY_LIMIT]) JOBS.clear() JOBS.update(keep) _load_state() def _new_job(req: RunRequest) -> dict[str, Any]: now = time.time() return { "id": str(uuid.uuid4()), "status": "queued", "created_at": now, "updated_at": now, "progress": 0, "status_text": "Queued", "progress_log": "", "scope": "", "sections": "", "paper": "", "slides": "", "pdf_paths": "", "paper_pdf": "", "slides_pdf": "", "error": "", "events": [], "params": req.model_dump(), } def _update_job(job_id: str, updates: dict[str, Any]) -> None: with LOCK: job = JOBS.get(job_id) if not job: return job.update(updates) job["updated_at"] = time.time() _prune_history() _save_state() def _append_event(job_id: str, event: dict[str, Any]) -> None: with LOCK: job = JOBS.get(job_id) if not job: return events = job.get("events") if isinstance(events, list): events.append(event) _save_state() def _run_job(job_id: str, req: RunRequest) -> None: _update_job(job_id, {"status": "running", "status_text": "Running"}) def on_stage(payload: dict[str, Any]) -> None: _update_job( job_id, { "status": "running", "status_text": str(payload.get("status", "Running")), "progress": int(str(payload.get("progress", "0"))), "progress_log": str(payload.get("progress_log", "")), "scope": str(payload.get("scope", "")), "sections": str(payload.get("sections", "")), "paper": str(payload.get("paper", "")), "slides": str(payload.get("slides", "")), "pdf_paths": str(payload.get("pdf_paths", "")), "paper_pdf": str(payload.get("paper_pdf", "")), "slides_pdf": str(payload.get("slides_pdf", "")), }, ) _append_event( job_id, { "ts": time.time(), "stage": str(payload.get("stage", "")), "detail": str(payload.get("detail", "")), "progress": int(str(payload.get("progress", "0"))), }, ) try: ( status, progress_log, scope, sections, paper, slides, pdf_paths, paper_pdf, slides_pdf, ) = _run_agentic_pipeline( topic=req.topic, model=req.model, base_url=req.base_url, api_key=req.api_key, request_budget=req.request_budget, use_mock=req.use_mock, progress=None, stage_callback=on_stage, language=req.language, stage_models={ "scope": req.model_scope, "structure": req.model_structure, "planner": req.model_planner, "section": req.model_section, "paper": req.model_paper, "slides": req.model_slides, }, ) _update_job( job_id, { "status": "done", "status_text": status, "progress": 100, "progress_log": progress_log, "scope": scope, "sections": sections, "paper": paper, "slides": slides, "pdf_paths": pdf_paths, "paper_pdf": paper_pdf, "slides_pdf": slides_pdf, }, ) except Exception as exc: _update_job( job_id, { "status": "error", "status_text": "Failed", "error": str(exc), }, ) @app.get("/", response_class=HTMLResponse) def index() -> str: return """ HydraDeck
HydraDeck
Advanced model routing
Per-agent model overrides (optional)
Idle
0%

Scope

Sections

paper.tex

slides.tex

Progress

Events

""" @app.post("/api/quick-check") def api_quick_check(req: RunRequest) -> dict[str, str]: result = _api_quick_check(req.base_url, req.api_key, req.model, req.request_budget) return {"result": result} @app.post("/api/jobs") def create_job(req: RunRequest) -> dict[str, str]: if not req.topic.strip(): raise HTTPException(status_code=400, detail="topic is required") job = _new_job(req) with LOCK: JOBS[job["id"]] = job _prune_history() _save_state() t = threading.Thread(target=_run_job, args=(job["id"], req), daemon=True) t.start() return {"id": job["id"]} @app.get("/api/history") def get_history() -> dict[str, Any]: with LOCK: items = sorted( JOBS.values(), key=lambda j: float(j.get("updated_at", 0.0)), reverse=True, ) rows = [ { "id": j.get("id"), "status": j.get("status"), "progress": j.get("progress"), "topic": (j.get("params") or {}).get("topic", ""), "updated_at": j.get("updated_at"), } for j in items[:HISTORY_LIMIT] ] return {"items": rows} @app.get("/api/models") def get_models(base_url: str, api_key: str = "") -> dict[str, Any]: try: cli = GrokClient(base_url=base_url, api_key=api_key, model="grok-3-mini", timeout_s=20.0, max_retries=1) models = cli.list_models(timeout_s=20.0) return {"models": models} except Exception as exc: return {"models": [], "error": str(exc)} @app.get("/api/jobs/{job_id}") def get_job(job_id: str) -> dict[str, Any]: with LOCK: job = JOBS.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") return dict(job) @app.get("/api/jobs/{job_id}/artifact/{kind}") def get_artifact(job_id: str, kind: str): with LOCK: job = JOBS.get(job_id) if not job: raise HTTPException(status_code=404, detail="job not found") if kind == "paper": path = str(job.get("paper_pdf", "")) filename = "paper.pdf" elif kind == "slides": path = str(job.get("slides_pdf", "")) filename = "slides.pdf" else: raise HTTPException(status_code=400, detail="kind must be paper|slides") p = Path(path) if not path or not p.exists(): raise HTTPException(status_code=404, detail="artifact not ready") return FileResponse(str(p), media_type="application/pdf", filename=filename) if __name__ == "__main__": import uvicorn _load_state() port = int(os.getenv("PORT", "7861")) uvicorn.run(app, host="0.0.0.0", port=port)