fromozu's picture
Upload hf_backend/app.py with huggingface_hub
7c4dd4a verified
from __future__ import annotations

import asyncio
import hmac
import logging
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile

from hf_backend.config import load_config
from hf_backend.fetcher import FetchError
from hf_backend.hub_store import HubStore
from hf_backend.service import QueueProcessor
def _require_shared_token(request: Request, shared_token: str) -> None:
    """Reject the request unless it carries the expected bearer token.

    The ``Authorization`` header must equal ``Bearer <shared_token>`` exactly.

    Args:
        request: Incoming request whose headers are inspected.
        shared_token: Secret the caller is expected to present.

    Raises:
        HTTPException: 401 ``unauthorized`` when the header is missing or
            does not match.
    """
    auth_header = request.headers.get("Authorization", "")
    expected = f"Bearer {shared_token}"
    # Compare in constant time so response timing does not reveal how much of
    # the secret a guess got right. Both sides are encoded to bytes because
    # compare_digest raises TypeError for non-ASCII str arguments.
    matches = hmac.compare_digest(
        auth_header.encode("utf-8"),
        expected.encode("utf-8"),
    )
    if not matches:
        raise HTTPException(status_code=401, detail="unauthorized")
def create_app(*, config=None, store=None, processor=None) -> FastAPI:
    """Build the FastAPI application and register its routes.

    Args:
        config: Runtime configuration; ``load_config()`` is called when this
            is omitted (or falsy).
        store: Store used for the manifest and dataset repo. A ``HubStore``
            is created only when neither ``store`` nor ``processor`` is given.
        processor: Job processor. When given it is used as-is and the startup
            initialization of the store is skipped.

    Returns:
        The application with ``/health`` and the ``/api/*`` routes attached.
    """
    runtime_config = config or load_config()
    runtime_store = store
    if runtime_store is None and processor is None:
        runtime_store = HubStore(runtime_config)
    runtime_processor = processor or QueueProcessor(runtime_config, runtime_store)

    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        """Initialize the dataset repo and manifest before serving."""
        if processor is None:
            # Startup initialization only runs for the processor built here;
            # an injected processor skips it entirely.
            assert runtime_store is not None
            runtime_store.ensure_dataset_repo()
            manifest = runtime_store.load_manifest()
            runtime_store.save_manifest(manifest, message="Initialize manifest")
        yield

    app = FastAPI(title="ebook-hf-executor", lifespan=lifespan)

    @app.get("/health")
    def health() -> dict[str, Any]:
        """Report liveness, the busy flag and the number of pending jobs."""
        manifest = runtime_processor.current_manifest()
        pending_jobs = sum(
            1 for job in manifest["jobs"] if job.get("status") == "pending"
        )
        return {
            "ok": True,
            "dataset_repo_id": getattr(runtime_config, "dataset_repo_id", ""),
            "busy": runtime_processor.is_running(),
            "pending_jobs": pending_jobs,
        }

    @app.post("/api/upload")
    async def upload_job(
        request: Request,
        file: UploadFile = File(...),
        language: str | None = Form(None),
        model: str | None = Form(None),
    ) -> dict[str, Any]:
        """Queue an uploaded file as a job.

        Raises:
            HTTPException: 401 for a bad token, 400 for a missing file name
                or an empty file.
        """
        _require_shared_token(request, runtime_config.shared_token)
        if not file.filename:
            raise HTTPException(status_code=400, detail="missing file name")
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail="empty file")
        # Run the synchronous submit call in a worker thread, the same way
        # fetch_book does, so it cannot stall the event loop.
        job = await asyncio.to_thread(
            runtime_processor.submit_upload,
            filename=file.filename,
            content=content,
            language=language,
            model=model,
            origin="upload",
        )
        return {"ok": True, "job": job}

    @app.post("/api/fetch-book")
    async def fetch_book(request: Request) -> dict[str, Any]:
        """Queue a fetch job for the ``query`` given in the JSON body.

        Raises:
            HTTPException: 401 for a bad token, 400 for an unparsable JSON
                body or a missing query, 404 when the fetch fails.
        """
        _require_shared_token(request, runtime_config.shared_token)
        payload: Any = {}
        # Media types are case-insensitive, so normalize before matching.
        content_type = request.headers.get("content-type", "").lower()
        if content_type.startswith("application/json"):
            try:
                payload = await request.json()
            except ValueError as exc:
                # Covers JSONDecodeError and undecodable bytes: a client
                # error, not a server fault.
                raise HTTPException(
                    status_code=400, detail="invalid JSON body"
                ) from exc
        if not isinstance(payload, dict):
            payload = {}
        query = str(payload.get("query") or "").strip()
        if not query:
            raise HTTPException(status_code=400, detail="missing query")
        try:
            job = await asyncio.to_thread(runtime_processor.submit_fetch, query=query)
        except FetchError as exc:
            raise HTTPException(status_code=404, detail=str(exc)) from exc
        started = runtime_processor.trigger_background_run()
        return {"ok": True, "started": started, "job": job}

    @app.post("/api/run-once")
    def run_once(request: Request) -> dict[str, Any]:
        """Trigger a background run and report whether one was started."""
        _require_shared_token(request, runtime_config.shared_token)
        started = runtime_processor.trigger_background_run()
        return {"ok": True, "started": started}

    @app.post("/api/download-weekly")
    def download_weekly(request: Request) -> dict[str, Any]:
        """Queue the weekly download job."""
        _require_shared_token(request, runtime_config.shared_token)
        job = runtime_processor.submit_weekly_download()
        return {"ok": True, "job": job}

    return app
# Module-level ASGI entry point. If the real app cannot be built, a minimal
# fallback app is exposed instead so the import succeeds and /health still
# answers.
try:
    app = create_app()
except Exception:
    # Record the traceback: without it the only visible symptom is the
    # generic "not configured" health payload below.
    logging.getLogger(__name__).exception(
        "create_app() failed; serving the unconfigured fallback app"
    )
    app = FastAPI(title="ebook-hf-executor")

    @app.get("/health")
    def health_unconfigured() -> dict[str, Any]:
        """Report that the service could not be configured."""
        return {
            "ok": False,
            "error": "service is not configured yet",
            "pending_jobs": 0,
        }