Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import logging | |
| import time | |
| from dataclasses import asdict | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from fastapi import FastAPI, Header, HTTPException, Query, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, PlainTextResponse | |
| from PIL import Image | |
| import uvicorn | |
| from .api.schemas import ( | |
| AdminSettings, | |
| CancelResponse, | |
| DashboardStats, | |
| ExportRequest, | |
| ExportResponse, | |
| GenerateRequest, | |
| GenerateResponse, | |
| HealthResponse, | |
| HistoryItem, | |
| JobInfoResponse, | |
| MetricsResponse, | |
| ModelInfo, | |
| PresetPayload, | |
| PresetResponse, | |
| RetryResponse, | |
| ) | |
| from .core.config import ( | |
| ADMIN_TOKEN, | |
| CONTENT_PROFILE, | |
| CORS_ORIGINS, | |
| DEFAULT_BACKEND_HOST, | |
| DEFAULT_BACKEND_PORT, | |
| OUTPUT_DIR, | |
| OUTPUT_RETENTION_DAYS, | |
| REQUEST_MAX_BYTES, | |
| ) | |
| from .core.logging import setup_logging | |
| from .core.observability import MetricsStore | |
| from .core.policy import ContentPolicy, PolicyAuditStore | |
| from .core.security import ApiSecurity, Principal | |
| from .jobs.manager import JobManager | |
| from .providers.factory import ProviderRegistry | |
| from .storage.history import PromptHistoryStore | |
| from .storage.maintenance import cleanup_outputs | |
| from .storage.presets import PresetStore | |
| from .storage.settings import SettingsStore | |
# Configure logging before anything below is constructed so it can log.
setup_logging()
LOGGER = logging.getLogger(__name__)

# Process-wide singletons shared by the request handlers in this module.
provider_registry = ProviderRegistry()
history_store = PromptHistoryStore()
job_manager = JobManager(provider_registry, history_store)
policy_audit = PolicyAuditStore()
api_security = ApiSecurity()
preset_store = PresetStore()
settings_store = SettingsStore()
metrics = MetricsStore()

# Retention sweep performed once at import time; logged only when the
# cleanup reports that something was removed.
cleanup_removed = cleanup_outputs(OUTPUT_RETENTION_DAYS)
if cleanup_removed:
    LOGGER.info("Startup cleanup removed %s old output day folder(s)", cleanup_removed)

app = FastAPI(title="ImageForge Backend", version="0.3.0")

# CORS_ORIGINS is either the literal "*" or a comma-separated list of origins.
# Blank entries (trailing comma, doubled comma, empty setting) are dropped so
# that an empty string is never registered as an allowed origin.
if CORS_ORIGINS == "*":
    allow_origins = ["*"]
else:
    allow_origins = [
        origin.strip() for origin in CORS_ORIGINS.split(",") if origin.strip()
    ]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def request_limits_middleware(request: Request, call_next):
    """Enforce the request size limit, record timing, and add security headers.

    Requests whose ``Content-Length`` header exceeds ``REQUEST_MAX_BYTES`` are
    answered with 413; a ``Content-Length`` that is not an integer is answered
    with 400. Every other request is passed to ``call_next``; its duration is
    recorded in ``metrics`` and a fixed set of security headers is attached to
    the response.

    Rejections are *returned* as responses rather than raised.
    NOTE(review): this assumes the function is registered as HTTP middleware
    (the registration is not visible here). An ``HTTPException`` raised from
    middleware is not seen by FastAPI's exception handlers and would surface
    as a 500 instead of the intended status.
    """
    # Imported locally so this block does not depend on a file-level import.
    from fastapi.responses import JSONResponse

    declared = request.headers.get("content-length")
    if declared:
        try:
            declared_bytes = int(declared)
        except ValueError:
            # A malformed header previously crashed with an unhandled ValueError.
            return JSONResponse(
                status_code=400,
                content={"detail": "Invalid Content-Length header"},
            )
        if declared_bytes > REQUEST_MAX_BYTES:
            return JSONResponse(
                status_code=413,
                content={"detail": "Request too large"},
            )

    start = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - start) * 1000.0

    metrics.incr("http_requests_total")
    metrics.observe_ms("http_request", elapsed_ms)

    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Referrer-Policy"] = "no-referrer"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response
def _principal(
    http_request: Request,
    x_imageforge_api_key: str | None,
    minimum_role: str,
) -> Principal:
    """Authenticate the caller and require at least ``minimum_role``.

    Before authenticating, ``api_security.limit`` is refreshed from the
    ``rate_limit_per_minute`` runtime setting (falling back to the current
    limit when the setting is absent). The client id passed to
    ``api_security.authenticate`` is the peer host, or ``"unknown"`` when the
    request carries no client information.

    Returns the authenticated principal; any error raised by
    ``api_security`` propagates to the caller.
    """
    runtime = settings_store.get()
    api_security.limit = int(runtime.get("rate_limit_per_minute", api_security.limit))

    peer = http_request.client
    client_id = peer.host if peer else "unknown"

    principal = api_security.authenticate(
        api_key=x_imageforge_api_key,
        client_id=client_id,
    )
    api_security.require_role(principal, minimum_role)
    return principal
def health() -> HealthResponse:
    """Return an ``"ok"`` status stamped with the current UTC time."""
    now = datetime.now(timezone.utc)
    return HealthResponse(status="ok", timestamp=now)
def ready() -> HealthResponse:
    """Report readiness by proving that ``OUTPUT_DIR`` is writable.

    The directory is created if needed, then a small probe file is written
    and removed again.

    Raises:
        HTTPException: 503 when any step of the probe fails.
    """
    try:
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        marker = OUTPUT_DIR / ".ready_probe"
        marker.write_text("ok", encoding="utf-8")
        marker.unlink(missing_ok=True)
    except Exception as exc:  # noqa: BLE001
        detail = f"Output directory not writable: {exc}"
        raise HTTPException(status_code=503, detail=detail) from exc

    now = datetime.now(timezone.utc)
    return HealthResponse(status="ready", timestamp=now)
def metrics_json(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> MetricsResponse:
    """Return the current metrics snapshot; requires the ``viewer`` role."""
    _principal(http_request, x_imageforge_api_key, "viewer")
    snapshot = metrics.snapshot()
    return MetricsResponse(metrics=snapshot)
def metrics_prom(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> PlainTextResponse:
    """Return ``metrics.to_prometheus()`` as plain text; requires ``viewer``."""
    _principal(http_request, x_imageforge_api_key, "viewer")
    exposition = metrics.to_prometheus()
    return PlainTextResponse(exposition)
def models(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> list[ModelInfo]:
    """List every registered provider with its availability.

    Requires the ``viewer`` role. The ``zimageturbo`` provider is reported as
    available only when the ``adult_enabled`` runtime setting is truthy *and*
    the provider itself reports availability; all other providers use their
    own ``is_available()`` result.
    """
    _principal(http_request, x_imageforge_api_key, "viewer")
    adult_enabled = bool(settings_store.get().get("adult_enabled", False))

    catalog: list[ModelInfo] = []
    for provider in provider_registry.list():
        if provider.id == "zimageturbo":
            available = adult_enabled and provider.is_available()
        else:
            available = provider.is_available()
        catalog.append(
            ModelInfo(
                id=provider.id,
                name=provider.name,
                description=provider.description,
                available=available,
            )
        )
    return catalog
def generate(
    request: GenerateRequest,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
    x_imageforge_admin_token: str | None = Header(default=None, alias="X-ImageForge-Admin-Token"),
) -> GenerateResponse:
    """Validate a generation request against policy and submit it as a job.

    Requires the ``operator`` role. The request is rejected with 400 when the
    model is unknown to the provider registry, when ``zimageturbo`` is
    requested while ``adult_enabled`` is off, or when the content policy does
    not allow the prompt. Every policy decision is written to the audit store
    before the allow/deny outcome is acted upon.

    An admin override is applied only when the request asks for it, an
    ``ADMIN_TOKEN`` is configured, and the ``X-ImageForge-Admin-Token`` header
    matches it.

    Returns:
        GenerateResponse carrying the id of the submitted job.

    Raises:
        HTTPException: 400 for an unknown model, a disabled provider, or a
            policy block.
    """
    # Imported locally so this block does not depend on a file-level import.
    import hmac

    _principal(http_request, x_imageforge_api_key, "operator")

    try:
        provider_registry.get(request.model)
    except KeyError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    runtime = settings_store.get()
    if request.model == "zimageturbo" and not bool(runtime.get("adult_enabled", False)):
        raise HTTPException(status_code=400, detail="Adult provider is disabled in admin settings")

    profile = runtime.get("content_profile", CONTENT_PROFILE)
    policy = ContentPolicy(profile)

    # Constant-time comparison: a plain `==` on the secret can leak how much
    # of the token matched through response timing. Both sides are encoded to
    # bytes because compare_digest rejects non-ASCII str arguments.
    # Assumes ADMIN_TOKEN is a str (it was compared directly to the header).
    token_matches = bool(
        ADMIN_TOKEN
        and x_imageforge_admin_token is not None
        and hmac.compare_digest(
            x_imageforge_admin_token.encode("utf-8"),
            ADMIN_TOKEN.encode("utf-8"),
        )
    )
    admin_override_applied = bool(request.admin_override and token_matches)

    decision = policy.evaluate(
        f"{request.prompt}\n{request.negative_prompt}",
        admin_override=admin_override_applied,
    )

    # Audit first, so blocked requests are recorded as well as allowed ones.
    policy_audit.write(
        prompt=request.prompt,
        negative_prompt=request.negative_prompt,
        profile=profile,
        decision=decision,
        client_ip=http_request.client.host if http_request.client else "unknown",
        model=request.model,
        admin_override_requested=request.admin_override,
        admin_override_applied=admin_override_applied,
    )

    if not decision.allowed:
        raise HTTPException(status_code=400, detail=f"Blocked by policy: {decision.reason}")

    metrics.incr("jobs_submitted_total")
    job_id = job_manager.submit(request)
    return GenerateResponse(job_id=job_id)
def job_status(
    job_id: str,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> JobInfoResponse:
    """Return the state of one job; requires the ``viewer`` role.

    Raises:
        HTTPException: 404 when the job manager has no job with this id.
    """
    _principal(http_request, x_imageforge_api_key, "viewer")
    state = job_manager.get(job_id)
    if state is not None:
        return _to_job_response(state)
    raise HTTPException(status_code=404, detail="Job not found")
def list_jobs(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> list[JobInfoResponse]:
    """Return every job known to the job manager; requires ``viewer``."""
    _principal(http_request, x_imageforge_api_key, "viewer")
    responses: list[JobInfoResponse] = []
    for state in job_manager.list():
        responses.append(_to_job_response(state))
    return responses
def get_image(path: str = Query(...)) -> FileResponse:
    """Serve a file located under ``OUTPUT_DIR``.

    The containment check runs *before* any existence check. A path outside
    the output directory is always answered with 403, whether or not it
    exists, so the endpoint cannot be used to probe which files are present
    elsewhere on the filesystem.

    NOTE(review): unlike the other handlers in this module, this one performs
    no ``_principal`` check — confirm that unauthenticated access is intended.

    Raises:
        HTTPException: 403 when the resolved path is outside ``OUTPUT_DIR``;
            404 when the path cannot be resolved or is not a regular file.
    """
    output_root = OUTPUT_DIR.resolve()
    try:
        # Non-strict resolve: a missing file must not short-circuit the
        # containment check below.
        source = Path(path).resolve()
    except (OSError, RuntimeError, ValueError) as exc:
        # e.g. symlink loops or an embedded NUL byte in the query value.
        raise HTTPException(status_code=404, detail="Image not found") from exc

    if source != output_root and output_root not in source.parents:
        raise HTTPException(status_code=403, detail="Forbidden source path")
    if not source.is_file():
        raise HTTPException(status_code=404, detail="Image not found")
    return FileResponse(source)
def retry_job(
    job_id: str,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> RetryResponse:
    """Ask the job manager to retry a job; requires the ``operator`` role.

    Returns:
        RetryResponse pairing the original job id with the new one.

    Raises:
        HTTPException: 404 when the job manager returns no new job id.
    """
    _principal(http_request, x_imageforge_api_key, "operator")
    replacement_id = job_manager.retry(job_id)
    if replacement_id:
        metrics.incr("jobs_retry_total")
        return RetryResponse(old_job_id=job_id, new_job_id=replacement_id)
    raise HTTPException(status_code=404, detail="Job not found")
def _to_job_response(state) -> JobInfoResponse:  # noqa: ANN001
    """Copy the public fields of a job state object into a ``JobInfoResponse``."""
    field_names = (
        "job_id",
        "status",
        "progress",
        "message",
        "created_at",
        "updated_at",
        "image_paths",
        "output_dir",
        "error",
    )
    # Each response field is read from the attribute of the same name.
    payload = {name: getattr(state, name) for name in field_names}
    return JobInfoResponse(**payload)
def cancel(
    job_id: str,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> CancelResponse:
    """Cancel a job via the job manager; requires the ``operator`` role.

    Raises:
        HTTPException: 404 when the job manager reports the cancel as
            unsuccessful.
    """
    _principal(http_request, x_imageforge_api_key, "operator")
    cancelled = job_manager.cancel(job_id)
    if cancelled:
        metrics.incr("jobs_cancel_total")
        return CancelResponse(success=True)
    raise HTTPException(status_code=404, detail="Job not found")
def history(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> list[HistoryItem]:
    """Return the stored prompt history; requires the ``viewer`` role.

    Each stored item is converted to a ``HistoryItem``; its ``timestamp`` is
    parsed with ``datetime.fromisoformat``. Items that cannot be converted
    are skipped (best effort) and a warning is logged, so that data problems
    in the history store do not go unnoticed.
    """
    _principal(http_request, x_imageforge_api_key, "viewer")

    parsed: list[HistoryItem] = []
    for item in history_store.list():
        try:
            entry = HistoryItem(
                prompt=item.get("prompt", ""),
                negative_prompt=item.get("negative_prompt", ""),
                timestamp=datetime.fromisoformat(item.get("timestamp", "")),
            )
        except Exception:  # noqa: BLE001
            # Deliberately broad: one bad record must not fail the listing.
            # The prompt text itself is not logged, only the failure.
            LOGGER.warning("Skipping malformed history entry", exc_info=True)
            continue
        parsed.append(entry)
    return parsed
def dashboard_stats(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> DashboardStats:
    """Return ``job_manager.stats()`` as ``DashboardStats``; requires ``viewer``."""
    _principal(http_request, x_imageforge_api_key, "viewer")
    stats = job_manager.stats()
    return DashboardStats(**stats)
def list_presets(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> list[PresetResponse]:
    """Return all stored presets; requires the ``viewer`` role."""
    _principal(http_request, x_imageforge_api_key, "viewer")
    responses: list[PresetResponse] = []
    for preset in preset_store.list():
        # Presets are dataclass instances; their fields map onto the response.
        responses.append(PresetResponse(**asdict(preset)))
    return responses
def upsert_preset(
    payload: PresetPayload,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> PresetResponse:
    """Upsert a preset from the payload; requires the ``operator`` role.

    Returns the preset as returned by the store after the upsert.
    """
    _principal(http_request, x_imageforge_api_key, "operator")
    fields = payload.model_dump()
    stored = preset_store.upsert(fields)
    return PresetResponse(**asdict(stored))
def delete_preset(
    name: str,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> CancelResponse:
    """Delete a preset by name; requires the ``admin`` role.

    Raises:
        HTTPException: 404 when the store reports that nothing was deleted.
    """
    _principal(http_request, x_imageforge_api_key, "admin")
    deleted = preset_store.delete(name)
    if deleted:
        return CancelResponse(success=True)
    raise HTTPException(status_code=404, detail="Preset not found")
def export_image(
    payload: ExportRequest,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> ExportResponse:
    """Re-encode an image from ``OUTPUT_DIR`` into the requested format.

    Requires the ``operator`` role. The source is converted to RGB, optionally
    shrunk to fit ``max_width``/``max_height`` (``thumbnail`` keeps the aspect
    ratio), and written next to the source as ``<stem>_export.<format>``.
    ``quality`` is passed to the encoder for ``jpg`` and ``webp`` only.

    Returns:
        ExportResponse with the absolute path of the written file.

    Raises:
        HTTPException: 403 when the source is not inside ``OUTPUT_DIR``;
            404 when it does not exist or is not a regular file.
    """
    _principal(http_request, x_imageforge_api_key, "operator")

    source = Path(payload.source_path).resolve()
    output_root = OUTPUT_DIR.resolve()
    if output_root not in source.parents:
        raise HTTPException(status_code=403, detail="Forbidden source path")
    if not source.exists() or not source.is_file():
        raise HTTPException(status_code=404, detail="Source image not found")

    # Pillow registers its JPEG writer under the name "JPEG"; upper-casing
    # the "jpg" extension yields "JPG", which save() does not recognise.
    pil_format = "JPEG" if payload.format == "jpg" else payload.format.upper()

    # Close the source file handle as soon as the RGB copy exists.
    with Image.open(source) as opened:
        img = opened.convert("RGB")

    if payload.max_width or payload.max_height:
        # A missing bound falls back to the image's own size on that axis.
        max_w = payload.max_width or img.width
        max_h = payload.max_height or img.height
        img.thumbnail((max_w, max_h))

    out_path = source.with_name(f"{source.stem}_export.{payload.format}")
    save_kwargs: dict[str, int] = {}
    if payload.format in {"jpg", "webp"}:
        save_kwargs["quality"] = payload.quality
    img.save(out_path, format=pil_format, **save_kwargs)
    return ExportResponse(output_path=str(out_path.resolve()))
def get_admin_settings(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> AdminSettings:
    """Return the current runtime settings; requires the ``admin`` role.

    Each read increments a counter named after the caller's role.
    """
    caller = _principal(http_request, x_imageforge_api_key, "admin")
    snapshot = settings_store.get()
    metrics.incr(f"admin_settings_read_by_{caller.role}")
    return AdminSettings(**snapshot)
def put_admin_settings(
    payload: AdminSettings,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> AdminSettings:
    """Update the runtime settings; requires the ``admin`` role.

    The caller's ``client_id`` is passed to the store as the acting party.
    Returns the settings as reported by the store after the update.
    """
    caller = _principal(http_request, x_imageforge_api_key, "admin")
    changes = payload.model_dump()
    updated = settings_store.update(changes, actor=caller.client_id)
    return AdminSettings(**updated)
def cleanup_endpoint(
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
) -> DashboardStats:
    """Run an output cleanup and return fresh job stats; requires ``admin``.

    The retention period comes from the ``output_retention_days`` runtime
    setting, falling back to ``OUTPUT_RETENTION_DAYS`` when it is not set.
    """
    _principal(http_request, x_imageforge_api_key, "admin")
    configured = settings_store.get().get("output_retention_days", OUTPUT_RETENTION_DAYS)
    retention_days = int(configured)
    cleanup_outputs(retention_days)
    stats = job_manager.stats()
    return DashboardStats(**stats)
def output_file(
    relative_path: str,
    http_request: Request,
    x_imageforge_api_key: str | None = Header(default=None, alias="X-ImageForge-Api-Key"),
):
    """Serve a file under ``OUTPUT_DIR``; requires the ``viewer`` role.

    ``relative_path`` is joined onto the current working directory and
    resolved before the containment check.

    Raises:
        HTTPException: 403 when the resolved path is outside ``OUTPUT_DIR``;
            404 when it does not exist or is not a regular file.
    """
    _principal(http_request, x_imageforge_api_key, "viewer")

    candidate = (Path.cwd() / relative_path).resolve()
    root = OUTPUT_DIR.resolve()

    inside_root = root in candidate.parents or candidate == root
    if not inside_root:
        raise HTTPException(status_code=403, detail="Forbidden")

    if not (candidate.exists() and candidate.is_file()):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(candidate)
def run() -> None:
    """Start uvicorn serving ``backend.app.main:app`` on the default host/port."""
    server_options = {
        "host": DEFAULT_BACKEND_HOST,
        "port": DEFAULT_BACKEND_PORT,
        "reload": False,
        "log_level": "info",
    }
    uvicorn.run("backend.app.main:app", **server_options)
# Script entry point: log the start-up and launch the server when this
# module is executed directly rather than imported.
if __name__ == "__main__":
    LOGGER.info("Starting ImageForge backend")
    run()