"""Entry point for Kashikogi Async Space.""" import json from typing import Optional import gradio as gr import uvicorn from fastapi import FastAPI, HTTPException, Request, status from pydantic import ValidationError from kashikogi_app.core.config import get_settings from kashikogi_app.core.jobs import VideoJobManager from kashikogi_app.core.logging_config import configure_logging from kashikogi_app.gradio_interface import build_demo from kashikogi_app.models.audio import AudioUploadRequest, AudioUploadResponse from kashikogi_app.models.video import ( VideoJobStatusResponse, VideoRenderRequest, VideoRenderResponse, ) from kashikogi_app.services.audio import store_audio from kashikogi_app.services.callbacks import notify_dify_workflow from kashikogi_app.services.video import render_video_job configure_logging() settings = get_settings() job_manager = VideoJobManager( worker_count=settings.video_jobs_max_workers, cleanup_ttl_seconds=settings.video_job_cleanup_seconds, job_handler=render_video_job, callback_handler=notify_dify_workflow, ) demo = build_demo(job_manager) api = FastAPI( title="Kashikogi Async Space API", description="Audio storage and video rendering endpoints.", version="0.1.0", ) def _extract_header_token(request: Request) -> Optional[str]: auth = request.headers.get("Authorization") if not auth: return None if auth.lower().startswith("bearer "): return auth[7:].strip() return auth.strip() def _ensure_authorised(token: Optional[str]) -> None: if not settings.callback_token: return if not token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing callback token") if token != settings.callback_token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid callback token") @api.post("/audio/upload", response_model=AudioUploadResponse) async def upload_audio_endpoint(request: Request) -> AudioUploadResponse: try: body_data = await request.json() except json.JSONDecodeError as exc: # noqa: F841 raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc if not isinstance(body_data, dict): raise HTTPException(status_code=400, detail="JSON object expected") inline_token = body_data.pop("callback_token", None) token = _extract_header_token(request) or inline_token _ensure_authorised(token) try: audio_request = AudioUploadRequest(**body_data) return store_audio(audio_request) except ValidationError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc except HTTPException: raise except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=500, detail=str(exc)) from exc @api.post( "/video/render", response_model=VideoRenderResponse, status_code=status.HTTP_202_ACCEPTED, ) async def render_video_endpoint(request: Request) -> VideoRenderResponse: """ 柔軟なリクエスト処理: - 正しいJSONオブジェクト - JSON文字列(Dify RAW-TEXTモードからエスケープされたもの) の両方を受け付ける """ try: # リクエストボディを文字列として取得 body_bytes = await request.body() body_str = body_bytes.decode("utf-8") # まずJSONとしてパース body_data = json.loads(body_str) # body_dataが文字列の場合、さらにパース(二重エンコード対策) if isinstance(body_data, str): body_data = json.loads(body_data) if not isinstance(body_data, dict): raise HTTPException(status_code=400, detail="JSON object expected") header_token = _extract_header_token(request) inline_token = body_data.get("callback_token") _ensure_authorised(header_token or inline_token) # Pydanticモデルに変換 video_request = VideoRenderRequest(**body_data) video_request.callback_token = None except json.JSONDecodeError as exc: raise HTTPException( status_code=400, detail=f"Invalid JSON: {str(exc)}" ) from exc except Exception as exc: raise HTTPException( status_code=400, detail=f"Invalid request format: {str(exc)}" ) from exc if not video_request.slides: raise HTTPException(status_code=400, detail="slides must not be empty") try: job_id = job_manager.submit(video_request) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=500, detail=str(exc)) from exc return VideoRenderResponse(status="accepted", job_id=job_id, message="Job queued") @api.get("/jobs/{job_id}", response_model=VideoJobStatusResponse) def get_job_status(job_id: str) -> VideoJobStatusResponse: status_payload = job_manager.get(job_id) if not status_payload: raise HTTPException(status_code=404, detail="Job not found") return status_payload @api.get("/health") def health_check() -> dict[str, str]: return {"status": "ok"} app = gr.mount_gradio_app(api, demo, path="/") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)