| """ |
| User Query Interface — Phase 3, Story S3.1 |
| Handles user reel requests, validates inputs, and dispatches to orchestrator. |
| """ |
|
|
| import logging |
| from typing import Optional, Dict, Any |
| from uuid import uuid4, UUID |
|
|
| from ..models.schemas import ( |
| ReelRequest, |
| ReelQueryRequest, |
| ReelQueryResponse, |
| RequestStatus, |
| DurationTarget, |
| Platform, |
| Tone, |
| AspectRatio, |
| ) |
|
|
# Module-scoped logger; named after the importing module so log records can
# be filtered per package.
logger = logging.getLogger(name=__name__)
|
|
|
|
class QueryInterface:
    """
    User-facing query interface for the Reel Creator Platform.

    Responsibilities:
    1. Accept and validate user queries with structured controls
    2. Package them into ReelRequest objects
    3. Expose status update / status lookup hooks for UI polling
    4. Bundle request context for the LLM orchestrator

    Architecture:
    User Input -> Validation -> ReelRequest Creation ->
    Status Tracking -> Orchestrator Packaging -> Response

    NOTE: status persistence (`update_status`) and status lookup
    (`get_request_status`) are placeholders in this class: nothing is read
    from or written to `self.db` yet.
    """

    # Bounds (in characters, after stripping whitespace) for the query text.
    MIN_QUERY_LENGTH = 5
    MAX_QUERY_LENGTH = 2000

    # Aspect ratio used when the caller does not specify one.
    PLATFORM_DEFAULTS = {
        Platform.INSTAGRAM_REELS: AspectRatio.NINE_SIXTEEN,
        Platform.TIKTOK: AspectRatio.NINE_SIXTEEN,
        Platform.YOUTUBE_SHORTS: AspectRatio.NINE_SIXTEEN,
        Platform.LINKEDIN: AspectRatio.ONE_ONE,
        Platform.TWITTER: AspectRatio.ONE_ONE,
        Platform.FACEBOOK: AspectRatio.FOUR_FIVE,
        Platform.CUSTOM: AspectRatio.NINE_SIXTEEN,
    }

    # Preset duration targets in milliseconds. CUSTOM has no preset; its value
    # comes from additional_constraints["custom_duration_ms"] when supplied.
    DURATION_MS = {
        DurationTarget.TEN_SECONDS: 10000,
        DurationTarget.TWENTY_SECONDS: 20000,
        DurationTarget.THIRTY_SECONDS: 30000,
        DurationTarget.SIXTY_SECONDS: 60000,
        DurationTarget.CUSTOM: None,
    }

    # Keywords whose presence in the query text (substring match) hints at a
    # tone. Used only for an informational log line, never to reject a query.
    TONE_KEYWORDS = {
        Tone.SPORTY: ["sport", "fast", "speed", "racing", "aggressive", "dynamic"],
        Tone.ELEGANT: ["elegant", "luxury", "premium", "sophisticated", "refined"],
        Tone.TECHNICAL: ["tech", "technology", "engineering", "specs", "performance", "innovation"],
        Tone.LUXURY: ["luxury", "premium", "exclusive", "handcrafted", "bespoke"],
        Tone.ADVENTURE: ["adventure", "off-road", "explore", "journey", "wild"],
        Tone.MINIMAL: ["minimal", "clean", "simple", "essence", "pure"],
        Tone.DYNAMIC: ["dynamic", "motion", "movement", "action", "energy"],
        Tone.SERENE: ["serene", "calm", "peaceful", "tranquil", "quiet"],
    }

    # Per-platform generation rules. This table is the single source of truth
    # for the maximum duration enforced by validate_query().
    PLATFORM_CONSTRAINTS = {
        Platform.INSTAGRAM_REELS: {
            "max_duration_seconds": 90,
            "recommended_duration_seconds": [15, 30, 60],
            "aspect_ratio": "9:16",
            "min_resolution": "720x1280",
            "caption_style": "short_punchy",
            "safe_zone_top": 0.15,
            "safe_zone_bottom": 0.20,
        },
        Platform.TIKTOK: {
            "max_duration_seconds": 180,
            "recommended_duration_seconds": [15, 30, 60],
            "aspect_ratio": "9:16",
            "min_resolution": "720x1280",
            "caption_style": "trendy_hashtags",
            "safe_zone_top": 0.10,
            "safe_zone_bottom": 0.25,
        },
        Platform.YOUTUBE_SHORTS: {
            "max_duration_seconds": 60,
            "recommended_duration_seconds": [15, 30, 58],
            "aspect_ratio": "9:16",
            "min_resolution": "720x1280",
            "caption_style": "descriptive",
            "safe_zone_top": 0.10,
            "safe_zone_bottom": 0.20,
        },
        Platform.LINKEDIN: {
            "max_duration_seconds": 600,
            "recommended_duration_seconds": [30, 60, 120],
            "aspect_ratio": "1:1",
            "min_resolution": "1080x1080",
            "caption_style": "professional",
            "safe_zone_top": 0.10,
            "safe_zone_bottom": 0.15,
        },
        Platform.TWITTER: {
            "max_duration_seconds": 140,
            "recommended_duration_seconds": [30, 60, 120],
            "aspect_ratio": "1:1",
            "min_resolution": "720x720",
            "caption_style": "concise",
            "safe_zone_top": 0.10,
            "safe_zone_bottom": 0.15,
        },
        Platform.FACEBOOK: {
            "max_duration_seconds": 240,
            "recommended_duration_seconds": [30, 60, 120],
            "aspect_ratio": "4:5",
            "min_resolution": "1080x1350",
            "caption_style": "engaging",
            "safe_zone_top": 0.10,
            "safe_zone_bottom": 0.18,
        },
        Platform.CUSTOM: {
            "max_duration_seconds": 300,
            "recommended_duration_seconds": [15, 30, 60],
            "aspect_ratio": "9:16",
            "min_resolution": "720x1280",
            "caption_style": "flexible",
            "safe_zone_top": 0.10,
            "safe_zone_bottom": 0.20,
        },
    }

    def __init__(self, db_connection=None):
        """Store the optional database handle; None disables status updates."""
        self.db = db_connection

    def validate_query(
        self,
        request: ReelQueryRequest
    ) -> tuple[bool, Optional[str]]:
        """
        Validate a user reel query.

        Checks:
        - Query text length (MIN_QUERY_LENGTH..MAX_QUERY_LENGTH chars, stripped)
        - Tone consistency with query text (logged only, never a failure)
        - Custom duration is a positive integer number of milliseconds
        - Effective duration does not exceed the platform maximum

        Brand config existence is NOT checked here.

        Returns:
            (True, None) when the query is acceptable, otherwise
            (False, <human-readable reason>).
        """
        query_text = request.user_query.strip()
        if len(query_text) < self.MIN_QUERY_LENGTH:
            return False, f"Query must contain at least {self.MIN_QUERY_LENGTH} characters"
        if len(query_text) > self.MAX_QUERY_LENGTH:
            return False, f"Query must contain at most {self.MAX_QUERY_LENGTH} characters"

        query_lower = request.user_query.lower()
        detected_tones = [
            tone
            for tone, keywords in self.TONE_KEYWORDS.items()
            if any(kw in query_lower for kw in keywords)
        ]

        # A mismatch is only informational: the user's explicit tone wins.
        if detected_tones and request.tone not in detected_tones:
            logger.info(
                "Query tone mismatch: requested=%s, detected=%s",
                request.tone.value,
                [t.value for t in detected_tones],
            )

        try:
            duration_ms = self._resolve_duration_ms(request)
        except ValueError as exc:
            return False, str(exc)

        # Presets and custom durations are both held to the platform limit.
        if duration_ms is not None:
            max_seconds = self._get_platform_constraints(request.platform)[
                "max_duration_seconds"
            ]
            if duration_ms > max_seconds * 1000:
                return False, f"{request.platform.value} max duration is {max_seconds}s"

        return True, None

    def _resolve_duration_ms(self, request: ReelQueryRequest) -> Optional[int]:
        """
        Return the effective duration in milliseconds for a request.

        Uses the DURATION_MS preset, or for DurationTarget.CUSTOM the
        "custom_duration_ms" entry of additional_constraints. A missing or
        falsy custom value is treated as "not specified" and yields the preset
        (None for CUSTOM).

        Raises:
            ValueError: the custom duration is not convertible to a positive
                integer.
        """
        preset_ms = self.DURATION_MS.get(request.duration_target)
        if request.duration_target != DurationTarget.CUSTOM:
            return preset_ms
        if not request.additional_constraints:
            return preset_ms

        custom_duration = request.additional_constraints.get("custom_duration_ms")
        if not custom_duration:
            return preset_ms

        error = "custom_duration_ms must be a positive integer number of milliseconds"
        try:
            custom_ms = int(custom_duration)
        except (TypeError, ValueError, OverflowError) as exc:
            raise ValueError(error) from exc
        if custom_ms <= 0:
            raise ValueError(error)
        return custom_ms

    def create_reel_request(
        self,
        request: ReelQueryRequest,
        brand_config_id: Optional[UUID] = None,
    ) -> ReelRequest:
        """
        Create a validated ReelRequest from user input.

        Auto-computes:
        - aspect_ratio from platform defaults (if not specified)
        - duration_ms from duration_target (or the custom duration)
        - status = PENDING

        Raises:
            ValueError: validate_query() rejected the request.
        """
        is_valid, error = self.validate_query(request)
        if not is_valid:
            raise ValueError(f"Invalid query: {error}")

        aspect_ratio = request.aspect_ratio or self.PLATFORM_DEFAULTS.get(
            request.platform, AspectRatio.NINE_SIXTEEN
        )

        # Cannot raise here: validate_query() already vetted the duration.
        duration_ms = self._resolve_duration_ms(request)

        reel_request = ReelRequest(
            id=uuid4(),
            user_query=request.user_query.strip(),
            duration_target=request.duration_target,
            duration_ms=duration_ms,
            platform=request.platform,
            tone=request.tone,
            aspect_ratio=aspect_ratio,
            brand_config_id=brand_config_id,
            additional_constraints=request.additional_constraints,
            status=RequestStatus.PENDING,
        )

        logger.info(
            "Created reel request %s: %s... | %s | %s | %s",
            reel_request.id,
            request.user_query[:60],
            request.duration_target.value,
            request.platform.value,
            request.tone.value,
        )

        return reel_request

    def update_status(
        self,
        request_id: UUID,
        status: RequestStatus,
        error_message: Optional[str] = None,
    ) -> None:
        """
        Record a status change for a reel request.

        Without a database connection the update is skipped with a warning.
        """
        if self.db is None:
            logger.warning("No database connection, status update skipped")
            return

        # TODO: persistence is not implemented yet; nothing is written to
        # self.db and error_message is currently unused.
        logger.info("Updated request %s status: %s", request_id, status.value)

    def get_request_status(
        self,
        request_id: UUID
    ) -> ReelQueryResponse:
        """
        Get the status of a reel request for UI polling.

        NOTE: placeholder implementation. It does not look the request up and
        always reports PENDING.
        """
        return ReelQueryResponse(
            request_id=str(request_id),
            status=RequestStatus.PENDING,
            message="Request is being processed",
        )

    def package_for_orchestrator(
        self,
        reel_request: ReelRequest,
        brand_config: Optional[Dict[str, Any]] = None,
        asset_summary: Optional[Dict[str, Any]] = None,
        brochure_summary: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Package all context needed by the LLM orchestrator.

        Returns a dict with:
        - request: the ReelRequest serialized via model_dump()
        - brand_config: brand guidelines ({} when omitted)
        - asset_summary: summary of available assets ({} when omitted)
        - brochure_summary: summary of brochure content ({} when omitted)
        - platform_constraints: platform-specific rules
        - timestamp: str(created_at) of the request, or None when unset
        """
        platform_constraints = self._get_platform_constraints(reel_request.platform)

        return {
            "request": reel_request.model_dump(),
            "brand_config": brand_config or {},
            "asset_summary": asset_summary or {},
            "brochure_summary": brochure_summary or {},
            "platform_constraints": platform_constraints,
            "timestamp": str(reel_request.created_at) if reel_request.created_at else None,
        }

    def _get_platform_constraints(
        self,
        platform: Platform
    ) -> Dict[str, Any]:
        """
        Get platform-specific constraints for reel generation.

        Unknown platforms fall back to the Platform.CUSTOM entry. A fresh copy
        is returned so callers may mutate it without touching the class table.
        """
        table = self.PLATFORM_CONSTRAINTS
        selected = table.get(platform, table[Platform.CUSTOM])
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in selected.items()
        }
|
|
|
|
| |
| |
| |
|
|
| from fastapi import APIRouter, HTTPException, BackgroundTasks |
| from pydantic import BaseModel |
|
|
# All reel endpoints are mounted under /api/v1/reels.
router = APIRouter(
    prefix="/api/v1/reels",
    tags=["reels"],
)

# Process-local store of created requests, keyed by the request id as a string.
_request_store: Dict[str, ReelRequest] = dict()
|
|
|
|
class CreateReelRequest(BaseModel):
    """Request body accepted by the reel create and regenerate endpoints.

    Enum-like fields arrive as plain strings; the endpoint handlers convert
    them to DurationTarget / Platform / Tone / AspectRatio.
    """

    # Free-text description of the reel the user wants (required).
    user_query: str

    # String values of the corresponding enums, with defaults.
    duration_target: str = "20s"
    platform: str = "instagram_reels"
    tone: str = "sporty"

    # Optional overrides; None means "not supplied".
    aspect_ratio: Optional[str] = None
    brand_config_id: Optional[str] = None
    additional_constraints: Optional[Dict[str, Any]] = None
|
|
|
|
@router.post("/create", response_model=ReelQueryResponse)
async def create_reel(
    request: CreateReelRequest,
    background_tasks: BackgroundTasks,
):
    """
    Create a new reel generation request.

    Converts the string fields of the body to their enums, validates the
    query through QueryInterface, and stores the resulting ReelRequest in the
    in-memory request store with status PLANNING.

    NOTE: orchestrator dispatch is not wired up in this handler yet;
    `background_tasks` is accepted but currently unused.

    Raises:
        HTTPException(400): invalid enum value, malformed brand_config_id,
            or a query rejected by validation.
        HTTPException(500): any other unexpected error.
    """
    try:
        query = ReelQueryRequest(
            user_query=request.user_query,
            duration_target=DurationTarget(request.duration_target),
            platform=Platform(request.platform),
            tone=Tone(request.tone),
            aspect_ratio=AspectRatio(request.aspect_ratio) if request.aspect_ratio else None,
            brand_config_id=request.brand_config_id,
            additional_constraints=request.additional_constraints,
        )

        brand_uuid = UUID(request.brand_config_id) if request.brand_config_id else None

        interface = QueryInterface()
        reel_request = interface.create_reel_request(query, brand_config_id=brand_uuid)

        # Store the request already marked PLANNING so that polling the status
        # endpoint agrees with the response returned below. update_status()
        # alone is a no-op when the interface has no database connection.
        planning_request = reel_request.model_copy(
            update={"status": RequestStatus.PLANNING}
        )
        _request_store[str(planning_request.id)] = planning_request
        interface.update_status(planning_request.id, RequestStatus.PLANNING)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Error creating reel request")
        raise HTTPException(status_code=500, detail="Internal server error") from exc

    return ReelQueryResponse(
        request_id=str(planning_request.id),
        status=RequestStatus.PLANNING,
        message="Reel request accepted and planning has begun",
        estimated_completion_seconds=30,
    )
|
|
|
|
@router.get("/status/{request_id}", response_model=ReelQueryResponse)
async def get_reel_status(request_id: str):
    """Report the stored status of a reel request, or 404 if it is unknown."""
    stored = _request_store.get(request_id)
    if stored:
        current_status = stored.status
        return ReelQueryResponse(
            request_id=request_id,
            status=current_status,
            message=f"Current status: {current_status.value}",
        )

    # Unknown id: nothing was ever stored under this key.
    raise HTTPException(status_code=404, detail="Request not found")
|
|
|
|
@router.post("/regenerate/{request_id}", response_model=ReelQueryResponse)
async def regenerate_reel(
    request_id: str,
    request: CreateReelRequest,
):
    """
    Regenerate a reel with modified parameters.

    Looks up the earlier request, builds a new query from the body (falling
    back to the earlier request for empty fields), and stores the result as a
    NEW request with its own id and status PLANNING. The earlier request is
    left untouched and its brand_config_id is reused.

    An omitted aspect_ratio is left unset so it is re-derived from the
    (possibly changed) platform. Omitted additional_constraints are carried
    over from the earlier request.

    Raises:
        HTTPException(404): `request_id` is not in the request store.
        HTTPException(400): invalid enum value or a query rejected by
            validation.
        HTTPException(500): any other unexpected error.
    """
    old_request = _request_store.get(request_id)
    if not old_request:
        raise HTTPException(status_code=404, detail="Request not found")

    try:
        constraints = (
            request.additional_constraints
            if request.additional_constraints is not None
            else old_request.additional_constraints
        )

        new_query = ReelQueryRequest(
            user_query=request.user_query or old_request.user_query,
            duration_target=DurationTarget(request.duration_target) if request.duration_target else old_request.duration_target,
            platform=Platform(request.platform) if request.platform else old_request.platform,
            tone=Tone(request.tone) if request.tone else old_request.tone,
            aspect_ratio=AspectRatio(request.aspect_ratio) if request.aspect_ratio else None,
            additional_constraints=constraints,
        )

        interface = QueryInterface()
        new_reel_request = interface.create_reel_request(
            new_query,
            brand_config_id=old_request.brand_config_id,
        )

        # Store the request already marked PLANNING so that polling the status
        # endpoint agrees with the response returned below.
        planning_request = new_reel_request.model_copy(
            update={"status": RequestStatus.PLANNING}
        )
        _request_store[str(planning_request.id)] = planning_request
        interface.update_status(planning_request.id, RequestStatus.PLANNING)
    except ValueError as exc:
        # Bad enum strings and validation failures are client errors.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Error regenerating reel request")
        raise HTTPException(status_code=500, detail="Internal server error") from exc

    return ReelQueryResponse(
        request_id=str(planning_request.id),
        status=RequestStatus.PLANNING,
        message="Regeneration request accepted",
        estimated_completion_seconds=30,
    )
|
|