"""Extensive generation (researcher → creative director → designer → copywriter).""" import asyncio import uuid from typing import Dict, Any, Optional from fastapi import APIRouter, HTTPException, Depends from api.schemas import ( ExtensiveGenerateRequest, ExtensiveJobResponse, BatchResponse, InventOnlyRequest, InventOnlyResponse, InventedEssentialSchema, ) from services.generator import ad_generator from services.auth_dependency import get_current_user router = APIRouter(tags=["extensive"]) _extensive_jobs: Dict[str, Dict[str, Any]] = {} async def _run_extensive_job_async( job_id: str, username: str, effective_niche: str, target_audience: Optional[str], offer: Optional[str], num_images: int, image_model: Optional[str], num_strategies: int, use_creative_inventor: bool = True, trend_context: Optional[str] = None, ): """Run extensive generation on the main event loop.""" import logging api_logger = logging.getLogger("api") try: results = await ad_generator.generate_ad_extensive( niche=effective_niche, target_audience=target_audience, offer=offer, num_images=num_images, image_model=image_model, num_strategies=num_strategies, username=username, use_creative_inventor=use_creative_inventor, trend_context=trend_context, ) _extensive_jobs[job_id]["status"] = "completed" _extensive_jobs[job_id]["result"] = BatchResponse(count=len(results), ads=results) except Exception as e: api_logger.exception("Extensive job %s failed", job_id) _extensive_jobs[job_id]["status"] = "failed" _extensive_jobs[job_id]["error"] = str(e) @router.post("/extensive/generate", status_code=202) async def generate_extensive( request: ExtensiveGenerateRequest, username: str = Depends(get_current_user), ): """ Start extensive ad generation. Returns 202 with job_id. Poll GET /extensive/status/{job_id} then GET /extensive/result/{job_id}. """ if request.niche == "others": if not request.custom_niche or not request.custom_niche.strip(): raise HTTPException(status_code=400, detail="custom_niche is required when niche is 'others'") effective_niche = request.custom_niche.strip() else: effective_niche = request.niche job_id = str(uuid.uuid4()) _extensive_jobs[job_id] = { "status": "running", "result": None, "error": None, "username": username, } asyncio.create_task( _run_extensive_job_async( job_id, username, effective_niche, request.target_audience, request.offer, request.num_images, request.image_model, request.num_strategies, getattr(request, "use_creative_inventor", True), getattr(request, "trend_context", None), ) ) return ExtensiveJobResponse(job_id=job_id) @router.get("/extensive/status/{job_id}") async def extensive_job_status( job_id: str, username: str = Depends(get_current_user), ): """Get status of an extensive generation job.""" if job_id not in _extensive_jobs: raise HTTPException(status_code=404, detail="Job not found") job = _extensive_jobs[job_id] if job["username"] != username: raise HTTPException(status_code=404, detail="Job not found") return { "job_id": job_id, "status": job["status"], "error": job.get("error") if job["status"] == "failed" else None, } @router.get("/extensive/result/{job_id}", response_model=BatchResponse) async def extensive_job_result( job_id: str, username: str = Depends(get_current_user), ): """Get result of a completed extensive generation job. 425 if still running.""" if job_id not in _extensive_jobs: raise HTTPException(status_code=404, detail="Job not found") job = _extensive_jobs[job_id] if job["username"] != username: raise HTTPException(status_code=404, detail="Job not found") if job["status"] == "running": raise HTTPException(status_code=425, detail="Generation still in progress") if job["status"] == "failed": raise HTTPException(status_code=500, detail=job.get("error", "Generation failed")) return job["result"] @router.post("/extensive/invent", response_model=InventOnlyResponse) async def invent_only( request: InventOnlyRequest, username: str = Depends(get_current_user), ): """ Invent new ad angles, concepts, visuals, and psychological triggers only (no ad generation). Returns structured essentials for review, export, or later use in generation. """ from services.creative_inventor import creative_inventor_service niche_display = request.niche.replace("_", " ").title() offer = request.offer or f"Get the best {niche_display} solution" essentials = await asyncio.to_thread( creative_inventor_service.invent, niche=niche_display, offer=offer, n=request.n, target_audience_hint=request.target_audience, trend_context=request.trend_context, ) schema_list = [ InventedEssentialSchema( psychology_trigger=e.psychology_trigger, angles=e.angles, concepts=e.concepts, visual_directions=e.visual_directions, hooks=getattr(e, "hooks", []) or [], visual_styles=getattr(e, "visual_styles", []) or [], target_audience=getattr(e, "target_audience", "") or "", ) for e in essentials ] export_text = None if request.export_as_text and essentials: export_text = await asyncio.to_thread( creative_inventor_service.invent_and_export, niche=niche_display, offer=offer, essentials=essentials, target_audience_hint=request.target_audience, ) return InventOnlyResponse(essentials=schema_list, export_text=export_text)