File size: 6,146 Bytes
d4a4da7 026f283 d4a4da7 026f283 d4a4da7 026f283 d4a4da7 026f283 d4a4da7 026f283 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
"""Extensive generation (researcher → creative director → designer → copywriter)."""
import asyncio
import uuid
from typing import Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends
from api.schemas import (
ExtensiveGenerateRequest,
ExtensiveJobResponse,
BatchResponse,
InventOnlyRequest,
InventOnlyResponse,
InventedEssentialSchema,
)
from services.generator import ad_generator
from services.auth_dependency import get_current_user
# Router that the endpoint decorators in this module register their routes on.
router = APIRouter(tags=["extensive"])
# Module-level, in-memory registry of extensive-generation jobs, keyed by job_id
# (a uuid4 string). Each record holds at least "status" ("running",
# "completed" or "failed"), "result", "error" and the owning "username".
# NOTE(review): no code visible in this module removes entries — confirm
# whether cleanup happens elsewhere.
_extensive_jobs: Dict[str, Dict[str, Any]] = {}
async def _run_extensive_job_async(
    job_id: str,
    username: str,
    effective_niche: str,
    target_audience: Optional[str],
    offer: Optional[str],
    num_images: int,
    image_model: Optional[str],
    num_strategies: int,
    use_creative_inventor: bool = True,
    trend_context: Optional[str] = None,
):
    """Execute one extensive-generation job and store its outcome.

    Awaits ``ad_generator.generate_ad_extensive`` with the given settings and
    writes the outcome into ``_extensive_jobs[job_id]``:

    * success -> status ``"completed"`` and ``result`` set to a
      ``BatchResponse`` wrapping the generated ads;
    * any ``Exception`` -> logged with traceback on the ``"api"`` logger,
      status ``"failed"`` and ``error`` set to the exception text.

    The coroutine never re-raises an ``Exception``; callers learn about
    failures only through the job record.
    """
    import logging

    log = logging.getLogger("api")
    generation_kwargs = dict(
        niche=effective_niche,
        target_audience=target_audience,
        offer=offer,
        num_images=num_images,
        image_model=image_model,
        num_strategies=num_strategies,
        username=username,
        use_creative_inventor=use_creative_inventor,
        trend_context=trend_context,
    )
    try:
        ads = await ad_generator.generate_ad_extensive(**generation_kwargs)
        # Look the record up only after the await, as the job finishes.
        record = _extensive_jobs[job_id]
        record["status"] = "completed"
        record["result"] = BatchResponse(count=len(ads), ads=ads)
    except Exception as exc:
        log.exception("Extensive job %s failed", job_id)
        record = _extensive_jobs[job_id]
        record["status"] = "failed"
        record["error"] = str(exc)
@router.post("/extensive/generate", status_code=202)
async def generate_extensive(
    request: ExtensiveGenerateRequest,
    username: str = Depends(get_current_user),
):
    """
    Start extensive ad generation. Returns 202 with job_id.
    Poll GET /extensive/status/{job_id} then GET /extensive/result/{job_id}.

    The job is registered in ``_extensive_jobs`` with status "running" and the
    generation coroutine is scheduled as a background task on the current
    event loop; this handler returns without waiting for it.

    Raises:
        HTTPException: 400 when niche is "others" and custom_niche is missing
            or blank.
    """
    if request.niche == "others":
        custom_niche = (request.custom_niche or "").strip()
        if not custom_niche:
            raise HTTPException(status_code=400, detail="custom_niche is required when niche is 'others'")
        effective_niche = custom_niche
    else:
        effective_niche = request.niche

    job_id = str(uuid.uuid4())
    job = {
        "status": "running",
        "result": None,
        "error": None,
        "username": username,
    }
    _extensive_jobs[job_id] = job

    task = asyncio.create_task(
        _run_extensive_job_async(
            job_id,
            username,
            effective_niche,
            request.target_audience,
            request.offer,
            request.num_images,
            request.image_model,
            request.num_strategies,
            # getattr with defaults: tolerate request objects lacking these fields.
            getattr(request, "use_creative_inventor", True),
            getattr(request, "trend_context", None),
        )
    )
    # The event loop only keeps weak references to tasks, so an unreferenced
    # task may be garbage-collected before it finishes. Keep a strong
    # reference in the job record and drop it once the task is done.
    job["task"] = task
    task.add_done_callback(lambda _finished: job.pop("task", None))

    return ExtensiveJobResponse(job_id=job_id)
@router.get("/extensive/status/{job_id}")
async def extensive_job_status(
    job_id: str,
    username: str = Depends(get_current_user),
):
    """Report the current state of an extensive generation job.

    Responds 404 both for unknown job ids and for jobs that belong to a
    different user. The ``error`` field is populated only when the job's
    status is "failed"; otherwise it is ``None``.
    """
    record = _extensive_jobs.get(job_id)
    if record is None or record["username"] != username:
        raise HTTPException(status_code=404, detail="Job not found")

    state = record["status"]
    failure = record.get("error") if state == "failed" else None
    return {"job_id": job_id, "status": state, "error": failure}
@router.get("/extensive/result/{job_id}", response_model=BatchResponse)
async def extensive_job_result(
    job_id: str,
    username: str = Depends(get_current_user),
):
    """Get result of a completed extensive generation job. 425 if still running.

    Raises:
        HTTPException: 404 when the job id is unknown or the job belongs to
            another user; 425 while the job is still running; 500 when the
            job failed (detail carries the recorded error text, or
            "Generation failed" when none was recorded).
    """
    job = _extensive_jobs.get(job_id)
    if job is None or job["username"] != username:
        raise HTTPException(status_code=404, detail="Job not found")

    status = job["status"]
    if status == "running":
        raise HTTPException(status_code=425, detail="Generation still in progress")
    if status == "failed":
        # The "error" key is always present in a job record, so a .get()
        # default would never apply; use `or` so a None/empty error text
        # (e.g. an exception with no message) still yields a useful detail.
        raise HTTPException(status_code=500, detail=job.get("error") or "Generation failed")
    return job["result"]
@router.post("/extensive/invent", response_model=InventOnlyResponse)
async def invent_only(
    request: InventOnlyRequest,
    username: str = Depends(get_current_user),
):
    """
    Invent ad angles, concepts, visuals, and psychological triggers without generating ads.

    Runs ``creative_inventor_service.invent`` in a worker thread, converts each
    invented essential to ``InventedEssentialSchema``, and, when
    ``request.export_as_text`` is set and at least one essential came back,
    also runs ``creative_inventor_service.invent_and_export`` in a worker
    thread to fill ``export_text``.
    """
    from services.creative_inventor import creative_inventor_service

    def _as_schema(item):
        # hooks / visual_styles / target_audience may be absent or falsy on
        # the invented object; fall back to empty values.
        return InventedEssentialSchema(
            psychology_trigger=item.psychology_trigger,
            angles=item.angles,
            concepts=item.concepts,
            visual_directions=item.visual_directions,
            hooks=getattr(item, "hooks", None) or [],
            visual_styles=getattr(item, "visual_styles", None) or [],
            target_audience=getattr(item, "target_audience", None) or "",
        )

    niche_label = request.niche.replace("_", " ").title()
    effective_offer = request.offer
    if not effective_offer:
        effective_offer = f"Get the best {niche_label} solution"

    invented = await asyncio.to_thread(
        creative_inventor_service.invent,
        niche=niche_label,
        offer=effective_offer,
        n=request.n,
        target_audience_hint=request.target_audience,
        trend_context=request.trend_context,
    )
    payload = [_as_schema(item) for item in invented]

    text_export = None
    if request.export_as_text and invented:
        text_export = await asyncio.to_thread(
            creative_inventor_service.invent_and_export,
            niche=niche_label,
            offer=effective_offer,
            essentials=invented,
            target_audience_hint=request.target_audience,
        )

    return InventOnlyResponse(essentials=payload, export_text=text_export)
|