"""FastAPI application factory and module-level ``app`` for ebook-hf-executor."""

from __future__ import annotations

import asyncio
import hmac
import logging
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile

from hf_backend.config import load_config
from hf_backend.fetcher import FetchError
from hf_backend.hub_store import HubStore
from hf_backend.service import QueueProcessor

logger = logging.getLogger(__name__)


def _require_shared_token(request: Request, shared_token: str) -> None:
    """Reject the request unless it carries the expected bearer token.

    Args:
        request: Incoming request whose ``Authorization`` header is checked.
        shared_token: Secret the header must present as ``Bearer <token>``.

    Raises:
        HTTPException: 401 when the header is missing or wrong, or when no
            shared token is configured at all.
    """
    # Fail closed: without this guard an unset token would make the literal
    # header "Bearer None" (or "Bearer ") a valid credential.
    if not shared_token:
        raise HTTPException(status_code=401, detail="unauthorized")

    auth_header = request.headers.get("Authorization", "")
    expected = f"Bearer {shared_token}"
    # Constant-time comparison; bytes are used because compare_digest only
    # accepts ASCII-only ``str`` arguments.
    if not hmac.compare_digest(auth_header.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="unauthorized")


def create_app(*, config=None, store=None, processor=None) -> FastAPI:
    """Build the FastAPI application.

    Args:
        config: Runtime configuration; ``load_config()`` is used when falsy.
        store: Store handed to the processor. A ``HubStore`` is created only
            when neither ``store`` nor ``processor`` is supplied.
        processor: Ready-made processor. When given, no ``QueueProcessor`` is
            constructed and the startup store initialization is skipped.

    Returns:
        The configured application with the health and job endpoints.
    """
    runtime_config = config or load_config()
    runtime_store = store
    if runtime_store is None and processor is None:
        runtime_store = HubStore(runtime_config)
    # Use an identity check so this agrees with the ``processor is None``
    # tests around it, even for a processor object that happens to be falsy.
    runtime_processor = (
        processor
        if processor is not None
        else QueueProcessor(runtime_config, runtime_store)
    )

    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        """Initialize the store on startup unless a processor was injected."""
        if processor is not None:
            yield
            return
        # Invariant: with no injected processor, the store was either passed
        # in or created above.
        assert runtime_store is not None
        runtime_store.ensure_dataset_repo()
        manifest = runtime_store.load_manifest()
        runtime_store.save_manifest(manifest, message="Initialize manifest")
        yield

    app = FastAPI(title="ebook-hf-executor", lifespan=lifespan)

    @app.get("/health")
    def health() -> dict[str, Any]:
        """Report processor state and the number of pending jobs."""
        manifest = runtime_processor.current_manifest()
        return {
            "ok": True,
            "dataset_repo_id": getattr(runtime_config, "dataset_repo_id", ""),
            "busy": runtime_processor.is_running(),
            "pending_jobs": sum(
                1 for job in manifest["jobs"] if job.get("status") == "pending"
            ),
        }

    @app.post("/api/upload")
    async def upload_job(
        request: Request,
        file: UploadFile = File(...),
        language: str | None = Form(None),
        model: str | None = Form(None),
    ) -> dict[str, Any]:
        """Submit an uploaded file as a job.

        Raises:
            HTTPException: 401 on a bad token, 400 when the upload has no
                file name or no content.
        """
        _require_shared_token(request, runtime_config.shared_token)
        if not file.filename:
            raise HTTPException(status_code=400, detail="missing file name")
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail="empty file")
        # Run in a worker thread, as fetch_book does for submit_fetch, so the
        # submission does not run on the event loop.
        job = await asyncio.to_thread(
            runtime_processor.submit_upload,
            filename=file.filename,
            content=content,
            language=language,
            model=model,
            origin="upload",
        )
        return {"ok": True, "job": job}

    @app.post("/api/fetch-book")
    async def fetch_book(request: Request) -> dict[str, Any]:
        """Submit a fetch job for the ``query`` field of a JSON body.

        Raises:
            HTTPException: 401 on a bad token, 400 when the body is not valid
                JSON or carries no query, 404 when the fetch fails with
                ``FetchError``.
        """
        _require_shared_token(request, runtime_config.shared_token)

        payload: Any = {}
        if request.headers.get("content-type", "").startswith("application/json"):
            try:
                payload = await request.json()
            except ValueError as exc:
                # json.JSONDecodeError and UnicodeDecodeError both derive from
                # ValueError; a malformed body is a client error, not a 500.
                raise HTTPException(status_code=400, detail="invalid JSON body") from exc
        if not isinstance(payload, dict):
            payload = {}

        query = str(payload.get("query") or "").strip()
        if not query:
            raise HTTPException(status_code=400, detail="missing query")

        try:
            job = await asyncio.to_thread(runtime_processor.submit_fetch, query=query)
        except FetchError as exc:
            raise HTTPException(status_code=404, detail=str(exc)) from exc

        started = runtime_processor.trigger_background_run()
        return {"ok": True, "started": started, "job": job}

    @app.post("/api/run-once")
    def run_once(request: Request) -> dict[str, Any]:
        """Trigger a background run and report whether one was started."""
        _require_shared_token(request, runtime_config.shared_token)
        started = runtime_processor.trigger_background_run()
        return {"ok": True, "started": started}

    @app.post("/api/download-weekly")
    def download_weekly(request: Request) -> dict[str, Any]:
        """Submit a weekly-download job."""
        _require_shared_token(request, runtime_config.shared_token)
        job = runtime_processor.submit_weekly_download()
        return {"ok": True, "job": job}

    return app


try:
    app = create_app()
except Exception:
    # Keep the module importable so /health can still answer, but record why
    # the real application could not be built instead of hiding the failure.
    logger.exception("create_app() failed; serving the unconfigured fallback app")
    app = FastAPI(title="ebook-hf-executor")

    @app.get("/health")
    def health_unconfigured() -> dict[str, Any]:
        """Health response used when the service could not be configured."""
        return {
            "ok": False,
            "error": "service is not configured yet",
            "pending_jobs": 0,
        }