"""MongoDB persistence helpers for image jobs, video/image analyses,
generation jobs and script results.

Importing this module fetches each collection from ``database.connections``
and (best-effort) creates the indexes used by the queries below.
"""
from __future__ import annotations

import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from bson import ObjectId
from pymongo import ASCENDING, DESCENDING

from database.connections import (
    get_image_collection,
    get_results_collection,
    get_script_collection,
    get_video_collection,
)

log = logging.getLogger(__name__)

# ------------------- Index Setup -------------------

# Every spec ends in created_at DESCENDING, matching the newest-first sort that
# all find_* queries in this module use.
_RESULTS_INDEXES: List[List[Tuple[str, Any]]] = [
    [("created_at", DESCENDING)],
    [("type", ASCENDING), ("created_at", DESCENDING)],
    [("category", ASCENDING), ("created_at", DESCENDING)],
    [("created_by", ASCENDING), ("created_at", DESCENDING)],
]

_VIDEO_INDEXES: List[List[Tuple[str, Any]]] = [
    [("created_at", DESCENDING)],
    [("category", ASCENDING), ("created_at", DESCENDING)],
    [("created_by", ASCENDING), ("created_at", DESCENDING)],
]


def _create_indexes(col, specs: List[List[Tuple[str, Any]]]) -> None:
    """Best-effort creation of *specs* on *col*; failures are logged, not raised.

    A single try wraps all specs so that an unreachable server costs one
    failure (and one timeout) rather than one per index.
    """
    try:
        for spec in specs:
            col.create_index(spec)
    except Exception as e:  # best-effort: missing indexes must not break import
        log.warning(
            "Index creation failed on %s: %s", getattr(col, "name", col), e
        )


def _ensure_results_indexes(col) -> None:
    """Ensure the type/category/created_by + created_at indexes on *col*."""
    _create_indexes(col, _RESULTS_INDEXES)


def _ensure_video_indexes(col) -> None:
    """Ensure the category/created_by + created_at indexes on *col*."""
    _create_indexes(col, _VIDEO_INDEXES)


_rc = get_results_collection()
if _rc is not None:
    _ensure_results_indexes(_rc)

_vc = get_video_collection()
if _vc is not None:
    _ensure_video_indexes(_vc)

_sg = get_script_collection()
if _sg is not None:
    _ensure_video_indexes(_sg)

_ia = get_image_collection()
if _ia is not None:
    _ensure_results_indexes(_ia)


# ------------------- Shared query helpers -------------------


def _build_query(
    base: Optional[Dict[str, Any]] = None,
    *,
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Return a copy of *base* extended with the optional equality/date filters.

    Falsy filters are skipped. The date window is half-open on ``created_at``:
    ``start_date`` is inclusive (``$gte``) and ``end_date`` exclusive (``$lt``).
    """
    q: Dict[str, Any] = dict(base or {})
    if category:
        q["category"] = category
    if created_by:
        q["created_by"] = created_by
    rng: Dict[str, Any] = {}
    if start_date:
        rng["$gte"] = start_date
    if end_date:
        rng["$lt"] = end_date
    if rng:
        q["created_at"] = rng
    return q


def _with_str_ids(cursor) -> List[Dict[str, Any]]:
    """Materialize *cursor*, converting each document's ``_id`` to ``str`` in place."""
    out: List[Dict[str, Any]] = []
    for d in cursor:
        d["_id"] = str(d.get("_id"))
        out.append(d)
    return out


def _find_page(
    col, query: Dict[str, Any], page: int, page_size: int
) -> Tuple[List[Dict[str, Any]], int]:
    """Return one newest-first page of *query* matches plus the total match count.

    *page* is clamped to >= 0 because pymongo rejects a negative ``skip``.
    *page_size* is clamped to >= 1 because ``limit(0)`` means "no limit" and
    would return every matching document.
    """
    page = max(0, int(page))
    page_size = max(1, int(page_size))
    total = col.count_documents(query)
    cur = (
        col.find(query)
        .sort("created_at", DESCENDING)
        .skip(page * page_size)
        .limit(page_size)
    )
    return _with_str_ids(cur), int(total)


def _distinct_categories(col, query: Dict[str, Any]) -> List[str]:
    """Return sorted, de-duplicated, non-empty ``category`` values matching *query*.

    Best-effort: any failure is logged and yields ``[]``.
    """
    try:
        vals = col.distinct("category", query)
        return sorted({v for v in vals if v not in (None, "", [])})
    except Exception as e:
        log.warning("Listing categories failed: %s", e)
        return []


# ------------------- Image Jobs -------------------


def start_job(
    col,
    *,
    type: str,  # shadows the builtin, but the name is part of the keyword API
    created_by: str,
    category: str,
    inputs: Dict[str, Any],
    settings: Dict[str, Any],
    user_prompt: Optional[str] = None,
) -> str:
    """Insert a new ``in_progress`` job document into *col* and return its id.

    Args:
        col: Target collection; the caller decides which one.
        type: Job type. ``"variation"`` sets ``source`` to ``"variation"``;
            any other value sets ``source`` to ``"text"``.
        created_by: Owner stored on the document.
        category: Category; falsy values are stored as ``"general"``.
        inputs: Job inputs (``None``/empty stored as ``{}``). When it contains
            ``"file_name"``, that value is also copied to the top level.
        settings: Job settings (``None``/empty stored as ``{}``).
        user_prompt: Optional prompt stored under ``prompt``.

    Returns:
        The inserted document's ``_id`` as a string.
    """
    inputs = inputs or {}
    doc: Dict[str, Any] = {
        "type": type,
        "source": "variation" if type == "variation" else "text",
        "category": category or "general",
        "prompt": user_prompt,
        "status": "in_progress",
        "urls": [],
        "inputs": inputs,
        "settings": settings or {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),  # naive UTC timestamp
    }
    if "file_name" in inputs:
        doc["file_name"] = inputs["file_name"]
    res = col.insert_one(doc)
    return str(res.inserted_id)


def finish_job(
    col,
    job_id: str,
    *,
    status: str = "completed",
    outputs_urls: Optional[List[str]] = None,
    provider_update: Optional[Dict[str, Any]] = None,
) -> None:
    """Set the final *status* (and optional outputs) on job *job_id* in *col*.

    Args:
        col: Collection holding the job.
        job_id: Hex string of the job's ObjectId.
        status: New ``status`` value.
        outputs_urls: When not ``None``, replaces ``urls`` (duplicates removed,
            first-seen order kept).
        provider_update: Each key ``k`` is written to ``provider.k``, so
            existing sibling keys under ``provider`` are left untouched.

    Best-effort: any failure (including an invalid *job_id*) is logged, not raised.
    """
    set_fields: Dict[str, Any] = {"status": status}
    if outputs_urls is not None:
        set_fields["urls"] = list(dict.fromkeys(outputs_urls))
    if provider_update:
        set_fields.update({f"provider.{k}": v for k, v in provider_update.items()})
    try:
        col.update_one({"_id": ObjectId(job_id)}, {"$set": set_fields})
    except Exception as e:
        log.error("finish_job update failed: %s", e)


# ------------------- Video Analyses -------------------


def insert_video_analysis(
    *,
    video_name: str,
    response: Dict[str, Any],
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    analyzer_model: Optional[str] = None,
    video_meta: Optional[Dict[str, Any]] = None,
    thumbnail: str = "",
) -> Optional[str]:
    """Store a video analysis and return its id, or ``None`` if no collection.

    ``video_meta`` is merged into the ``video`` sub-document after ``name``
    (so a ``name`` key in it overrides *video_name*). When *analyzer_model* is
    falsy, ``$VIDEO_ANALYZER_MODEL`` is used, defaulting to ``"gemini-2.0-flash"``.
    """
    col = get_video_collection()
    if col is None:
        return None
    doc: Dict[str, Any] = {
        "category": category or "general",
        "video": {"name": video_name, **(video_meta or {})},
        "analyzer_model": analyzer_model
        or os.getenv("VIDEO_ANALYZER_MODEL", "gemini-2.0-flash"),
        "results": response or {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),
        "thumbnail": thumbnail or "",
    }
    res = col.insert_one(doc)
    return str(res.inserted_id)


def list_video_categories(created_by: Optional[str] = None) -> List[str]:
    """Return sorted distinct non-empty video-analysis categories.

    Optionally restricted to *created_by*. Returns ``[]`` when the collection
    is unavailable or the query fails.
    """
    col = get_video_collection()
    if col is None:
        return []
    q: Dict[str, Any] = {}
    if created_by:
        q["created_by"] = created_by
    return _distinct_categories(col, q)


def find_video_analyses(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 200,
    created_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Return up to *limit* (at least 1) video analyses, newest first.

    Falsy filters are ignored; the date window is ``[start_date, end_date)``
    on ``created_at``. ``_id`` is returned as ``str``. Returns ``[]`` when the
    collection is unavailable.
    """
    col = get_video_collection()
    if col is None:
        return []
    q = _build_query(
        category=category,
        created_by=created_by,
        start_date=start_date,
        end_date=end_date,
    )
    cur = col.find(q).sort("created_at", DESCENDING).limit(max(1, int(limit)))
    return _with_str_ids(cur)


# ------------------- Image Analyses -------------------


def insert_image_analysis(
    *,
    image_name: str,
    response: Dict[str, Any],
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    analyzer_model: Optional[str] = None,
    image_meta: Optional[Dict[str, Any]] = None,
    thumbnail: str = "",
) -> Optional[str]:
    """Store an image analysis (``type="image_analysis"``) and return its id.

    ``image_meta`` is merged into the ``image`` sub-document after ``name``.
    When *analyzer_model* is falsy, ``"gpt-4o"`` is stored.

    Raises:
        ValueError: If the image collection is unavailable (unlike
            ``insert_video_analysis``, which returns ``None`` in that case).
    """
    col = get_image_collection()
    if col is None:
        raise ValueError("Image collection not available")
    doc: Dict[str, Any] = {
        "type": "image_analysis",
        "source": "image_analyzer",
        "category": category or "general",
        "image": {"name": image_name, **(image_meta or {})},
        "analyzer_model": analyzer_model or "gpt-4o",
        "results": response or {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),
        "thumbnail": thumbnail or "",
    }
    res = col.insert_one(doc)
    return str(res.inserted_id)


def list_image_categories(created_by: Optional[str] = None) -> List[str]:
    """Return sorted distinct non-empty categories of ``image_analysis`` documents.

    Optionally restricted to *created_by*. Returns ``[]`` when the collection
    is unavailable or the query fails.
    """
    col = get_image_collection()
    if col is None:
        return []
    q: Dict[str, Any] = {"type": "image_analysis"}
    if created_by:
        q["created_by"] = created_by
    return _distinct_categories(col, q)


def find_image_analyses(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 200,
    created_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Return up to *limit* (at least 1) image analyses, newest first.

    Only ``type="image_analysis"`` documents are considered. Falsy filters are
    ignored; the date window is ``[start_date, end_date)`` on ``created_at``.
    ``_id`` is returned as ``str``. Returns ``[]`` when the collection is
    unavailable.
    """
    col = get_image_collection()
    if col is None:
        return []
    q = _build_query(
        {"type": "image_analysis"},
        category=category,
        created_by=created_by,
        start_date=start_date,
        end_date=end_date,
    )
    cur = col.find(q).sort("created_at", DESCENDING).limit(max(1, int(limit)))
    return _with_str_ids(cur)


# ------------------- Generation Jobs -------------------


def find_generation_jobs(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 0,
    page_size: int = 20,
    created_by: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Return one page of text-generation jobs (newest first) and the total count.

    Matches ``type="generation"`` and ``source="text"``. *page* is zero-based
    and clamped to >= 0; *page_size* is clamped to >= 1. Returns ``([], 0)``
    when the collection is unavailable.
    """
    col = get_results_collection()
    if col is None:
        return [], 0
    q = _build_query(
        {"type": "generation", "source": "text"},
        category=category,
        created_by=created_by,
        start_date=start_date,
        end_date=end_date,
    )
    return _find_page(col, q, page, page_size)


# ------------------- Script Results -------------------


def insert_script_result(
    *,
    video_name: str,
    offer_details: str,
    target_audience: str,
    specific_hook: str,
    additional_context: str,
    response: List[Dict[str, Any]],
    thumbnail: str = "",
    created_by: Optional[str] = None,
    num_scripts: Optional[int] = None,
    category: Optional[str] = None,
) -> None:
    """Store a script-generator result (``type="script"``).

    Raises:
        ValueError: If the script collection is unavailable or the insert
            fails (the original exception is chained as ``__cause__``).
    """
    col = get_script_collection()
    if col is None:
        raise ValueError("Script collection not available.")
    doc: Dict[str, Any] = {
        "type": "script",
        "source": "script_generator",
        "video_name": video_name,
        "category": category or "general",
        "offer_details": offer_details,
        "target_audience": target_audience,
        "specific_hook": specific_hook,
        "additional_context": additional_context,
        "response": response,
        "thumbnail": thumbnail,
        "created_by": created_by,
        "num_scripts": num_scripts,
        "created_at": datetime.utcnow(),
    }
    try:
        col.insert_one(doc)
    except Exception as e:
        raise ValueError(f"Failed to insert script result: {e}") from e


def find_script_results(
    *,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 0,
    page_size: int = 20,
    created_by: Optional[str] = None,
    video_name_query: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Return one page of script results (newest first) and the total count.

    *video_name_query* is matched as a literal, case-insensitive substring of
    ``video_name``. It is regex-escaped so user input cannot inject patterns
    or cause a server error on characters such as ``(``. *page* is zero-based
    and clamped to >= 0; *page_size* is clamped to >= 1. Returns ``([], 0)``
    when the collection is unavailable.
    """
    col = get_script_collection()
    if col is None:
        return [], 0
    q = _build_query(
        {"type": "script", "source": "script_generator"},
        created_by=created_by,
        start_date=start_date,
        end_date=end_date,
    )
    if video_name_query:
        q["video_name"] = {"$regex": re.escape(video_name_query), "$options": "i"}
    return _find_page(col, q, page, page_size)