sushilideaclan01's picture
add new things in the extensive flow
026f283
"""Extensive generation (researcher → creative director → designer → copywriter)."""
import asyncio
import uuid
from typing import Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends
from api.schemas import (
ExtensiveGenerateRequest,
ExtensiveJobResponse,
BatchResponse,
InventOnlyRequest,
InventOnlyResponse,
InventedEssentialSchema,
)
from services.generator import ad_generator
from services.auth_dependency import get_current_user
router = APIRouter(tags=["extensive"])
_extensive_jobs: Dict[str, Dict[str, Any]] = {}
async def _run_extensive_job_async(
job_id: str,
username: str,
effective_niche: str,
target_audience: Optional[str],
offer: Optional[str],
num_images: int,
image_model: Optional[str],
num_strategies: int,
use_creative_inventor: bool = True,
trend_context: Optional[str] = None,
):
"""Run extensive generation on the main event loop."""
import logging
api_logger = logging.getLogger("api")
try:
results = await ad_generator.generate_ad_extensive(
niche=effective_niche,
target_audience=target_audience,
offer=offer,
num_images=num_images,
image_model=image_model,
num_strategies=num_strategies,
username=username,
use_creative_inventor=use_creative_inventor,
trend_context=trend_context,
)
_extensive_jobs[job_id]["status"] = "completed"
_extensive_jobs[job_id]["result"] = BatchResponse(count=len(results), ads=results)
except Exception as e:
api_logger.exception("Extensive job %s failed", job_id)
_extensive_jobs[job_id]["status"] = "failed"
_extensive_jobs[job_id]["error"] = str(e)
@router.post("/extensive/generate", status_code=202)
async def generate_extensive(
request: ExtensiveGenerateRequest,
username: str = Depends(get_current_user),
):
"""
Start extensive ad generation. Returns 202 with job_id.
Poll GET /extensive/status/{job_id} then GET /extensive/result/{job_id}.
"""
if request.niche == "others":
if not request.custom_niche or not request.custom_niche.strip():
raise HTTPException(status_code=400, detail="custom_niche is required when niche is 'others'")
effective_niche = request.custom_niche.strip()
else:
effective_niche = request.niche
job_id = str(uuid.uuid4())
_extensive_jobs[job_id] = {
"status": "running",
"result": None,
"error": None,
"username": username,
}
asyncio.create_task(
_run_extensive_job_async(
job_id,
username,
effective_niche,
request.target_audience,
request.offer,
request.num_images,
request.image_model,
request.num_strategies,
getattr(request, "use_creative_inventor", True),
getattr(request, "trend_context", None),
)
)
return ExtensiveJobResponse(job_id=job_id)
@router.get("/extensive/status/{job_id}")
async def extensive_job_status(
job_id: str,
username: str = Depends(get_current_user),
):
"""Get status of an extensive generation job."""
if job_id not in _extensive_jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = _extensive_jobs[job_id]
if job["username"] != username:
raise HTTPException(status_code=404, detail="Job not found")
return {
"job_id": job_id,
"status": job["status"],
"error": job.get("error") if job["status"] == "failed" else None,
}
@router.get("/extensive/result/{job_id}", response_model=BatchResponse)
async def extensive_job_result(
job_id: str,
username: str = Depends(get_current_user),
):
"""Get result of a completed extensive generation job. 425 if still running."""
if job_id not in _extensive_jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = _extensive_jobs[job_id]
if job["username"] != username:
raise HTTPException(status_code=404, detail="Job not found")
if job["status"] == "running":
raise HTTPException(status_code=425, detail="Generation still in progress")
if job["status"] == "failed":
raise HTTPException(status_code=500, detail=job.get("error", "Generation failed"))
return job["result"]
@router.post("/extensive/invent", response_model=InventOnlyResponse)
async def invent_only(
request: InventOnlyRequest,
username: str = Depends(get_current_user),
):
"""
Invent new ad angles, concepts, visuals, and psychological triggers only (no ad generation).
Returns structured essentials for review, export, or later use in generation.
"""
from services.creative_inventor import creative_inventor_service
niche_display = request.niche.replace("_", " ").title()
offer = request.offer or f"Get the best {niche_display} solution"
essentials = await asyncio.to_thread(
creative_inventor_service.invent,
niche=niche_display,
offer=offer,
n=request.n,
target_audience_hint=request.target_audience,
trend_context=request.trend_context,
)
schema_list = [
InventedEssentialSchema(
psychology_trigger=e.psychology_trigger,
angles=e.angles,
concepts=e.concepts,
visual_directions=e.visual_directions,
hooks=getattr(e, "hooks", []) or [],
visual_styles=getattr(e, "visual_styles", []) or [],
target_audience=getattr(e, "target_audience", "") or "",
)
for e in essentials
]
export_text = None
if request.export_as_text and essentials:
export_text = await asyncio.to_thread(
creative_inventor_service.invent_and_export,
niche=niche_display,
offer=offer,
essentials=essentials,
target_audience_hint=request.target_audience,
)
return InventOnlyResponse(essentials=schema_list, export_text=export_text)