Spaces:
Running
Running
| from __future__ import annotations | |
| import os, logging | |
| from typing import Optional, List, Dict, Any, Tuple | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from pymongo import ASCENDING, DESCENDING | |
| from database.connections import ( | |
| get_video_collection, | |
| get_script_collection, | |
| get_results_collection, | |
| get_image_collection, | |
| ) | |
| log = logging.getLogger(__name__) | |
| # ------------------- Index Setup ------------------- | |
def _ensure_results_indexes(col):
    """Create the query indexes for a results-style collection.

    Requests a descending ``created_at`` index plus compound indexes that
    pair ``type``, ``category`` and ``created_by`` (ascending) with
    ``created_at`` (descending).

    Index creation is best-effort: a failure is logged and swallowed so the
    caller is never interrupted. The first failure skips the remaining
    indexes.

    Args:
        col: Collection-like object exposing ``create_index``.
    """
    index_specs = (
        [("created_at", DESCENDING)],
        [("type", ASCENDING), ("created_at", DESCENDING)],
        [("category", ASCENDING), ("created_at", DESCENDING)],
        [("created_by", ASCENDING), ("created_at", DESCENDING)],
    )
    try:
        for keys in index_specs:
            col.create_index(keys)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent: leave a trace so a
        # missing index can be diagnosed.
        log.warning("results index setup failed: %s", exc)
def _ensure_video_indexes(col):
    """Create the query indexes for a video-style collection.

    Requests a descending ``created_at`` index plus compound indexes that
    pair ``category`` and ``created_by`` (ascending) with ``created_at``
    (descending).

    Index creation is best-effort: a failure is logged and swallowed so the
    caller is never interrupted. The first failure skips the remaining
    indexes.

    Args:
        col: Collection-like object exposing ``create_index``.
    """
    index_specs = (
        [("created_at", DESCENDING)],
        [("category", ASCENDING), ("created_at", DESCENDING)],
        [("created_by", ASCENDING), ("created_at", DESCENDING)],
    )
    try:
        for keys in index_specs:
            col.create_index(keys)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent: leave a trace so a
        # missing index can be diagnosed.
        log.warning("video index setup failed: %s", exc)
# Index bootstrap: runs once when this module is imported. Each getter may
# return None, in which case index setup for that collection is skipped.

# Results collection -> results-style indexes.
_rc = get_results_collection()
if _rc is not None:
    _ensure_results_indexes(_rc)

# Video collection -> video-style indexes.
_vc = get_video_collection()
if _vc is not None:
    _ensure_video_indexes(_vc)

# Script collection reuses the video-style index set (no "type" index).
_sg = get_script_collection()
if _sg is not None:
    _ensure_video_indexes(_sg)

# Image collection reuses the results-style index set (includes "type").
_ia = get_image_collection()
if _ia is not None:
    _ensure_results_indexes(_ia)
| # ------------------- Image Jobs ------------------- | |
def start_job(
    col,
    *,
    type: str,
    created_by: str,
    category: str,
    inputs: Dict[str, Any],
    settings: Dict[str, Any],
    user_prompt: Optional[str] = None,
) -> str:
    """Insert a new ``in_progress`` job document and return its id.

    Args:
        col: Collection-like object exposing ``insert_one``.
        type: Job type; ``"variation"`` sets the source to ``"variation"``,
            any other value sets it to ``"text"``.
        created_by: Identifier stored in the ``created_by`` field.
        category: Category label; a falsy value is stored as ``"general"``.
        inputs: Job inputs; a falsy value is stored as ``{}``. If it has a
            ``"file_name"`` key, that value is copied to the top level.
        settings: Job settings; a falsy value is stored as ``{}``.
        user_prompt: Optional prompt stored in the ``prompt`` field.

    Returns:
        The inserted document id converted with ``str``.
    """
    created_at = datetime.utcnow()
    job_inputs = inputs or {}

    if type == "variation":
        source = "variation"
    else:
        source = "text"

    record: Dict[str, Any] = dict(
        [
            ("type", type),
            ("source", source),
            ("category", category or "general"),
            ("prompt", user_prompt),
            ("status", "in_progress"),
            ("urls", []),
            ("inputs", job_inputs),
            ("settings", settings or {}),
            ("created_by", created_by),
            ("created_at", created_at),
        ]
    )
    # Surface the file name at the top level when the caller supplied one.
    if "file_name" in job_inputs:
        record["file_name"] = job_inputs["file_name"]

    inserted = col.insert_one(record)
    return str(inserted.inserted_id)
def finish_job(
    col,
    job_id: str,
    *,
    status: str = "completed",
    outputs_urls: Optional[List[str]] = None,
    provider_update: Optional[Dict[str, Any]] = None,
) -> None:
    """Update a job document's status, output URLs and provider fields.

    The update is best-effort: a failure (including a ``job_id`` that cannot
    be converted to an ``ObjectId``) is logged with its traceback and not
    re-raised. An acknowledged update that matches no document logs a
    warning.

    Args:
        col: Collection-like object exposing ``update_one``.
        job_id: String form of the job's ``_id``.
        status: Value written to the ``status`` field.
        outputs_urls: When not ``None``, written to ``urls`` with duplicates
            removed and first-seen order kept.
        provider_update: Optional mapping; each key ``k`` is written to the
            dotted field ``provider.k``.
    """
    set_fields: Dict[str, Any] = {"status": status}
    if outputs_urls is not None:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        set_fields["urls"] = list(dict.fromkeys(outputs_urls))
    if provider_update:
        for k, v in provider_update.items():
            set_fields[f"provider.{k}"] = v

    try:
        result = col.update_one({"_id": ObjectId(job_id)}, {"$set": set_fields})
    except Exception as e:
        # Same message as before, now with the traceback attached.
        log.exception("finish_job update failed: %s", e)
        return

    # matched_count is only readable on acknowledged writes.
    if getattr(result, "acknowledged", False) and result.matched_count == 0:
        log.warning("finish_job: no job matched id %s", job_id)
| # ------------------- Video Analyses ------------------- | |
def insert_video_analysis(
    *,
    video_name: str,
    response: Dict[str, Any],
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    analyzer_model: Optional[str] = None,
    video_meta: Optional[Dict[str, Any]] = None,
    thumbnail: str = "",
) -> Optional[str]:
    """Store one video analysis document and return its id.

    Args:
        video_name: Stored as ``video.name`` (a ``"name"`` key in
            ``video_meta`` overrides it).
        response: Analysis payload stored under ``results``; falsy -> ``{}``.
        category: Category label; falsy -> ``"general"``.
        created_by: Identifier stored in ``created_by``.
        analyzer_model: Model name; when falsy, the ``VIDEO_ANALYZER_MODEL``
            environment variable is used, defaulting to ``"gemini-2.0-flash"``.
        video_meta: Extra key/values merged into the ``video`` sub-document.
        thumbnail: Thumbnail value; falsy -> ``""``.

    Returns:
        The inserted id as a string, or ``None`` when the video collection
        is unavailable.
    """
    collection = get_video_collection()
    if collection is None:
        return None

    model_name = analyzer_model
    if not model_name:
        model_name = os.getenv("VIDEO_ANALYZER_MODEL", "gemini-2.0-flash")

    extra_meta = video_meta or {}
    record: Dict[str, Any] = {
        "category": category if category else "general",
        "video": {"name": video_name, **extra_meta},
        "analyzer_model": model_name,
        "results": response if response else {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),
        "thumbnail": thumbnail if thumbnail else "",
    }
    inserted = collection.insert_one(record)
    return str(inserted.inserted_id)
def list_video_categories(created_by: Optional[str] = None) -> List[str]:
    """Return the sorted distinct categories in the video collection.

    Args:
        created_by: When truthy, only documents with this ``created_by``
            value are considered.

    Returns:
        Sorted category values, excluding ``None``, ``""`` and ``[]``.
        Returns ``[]`` when the collection is unavailable or the lookup
        fails; a failure is logged rather than raised.
    """
    col = get_video_collection()
    if col is None:
        return []
    try:
        q: Dict[str, Any] = {}
        if created_by:
            q["created_by"] = created_by
        vals = col.distinct("category", q)
        return sorted({v for v in vals if v not in (None, "", [])})
    except Exception as exc:
        # Still best-effort, but record why the list came back empty.
        log.exception("list_video_categories failed: %s", exc)
        return []
def find_video_analyses(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 200,
    created_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Fetch video analysis documents, newest first.

    Args:
        category: When truthy, match this ``category`` exactly.
        start_date: When truthy, inclusive lower bound on ``created_at``.
        end_date: When truthy, exclusive upper bound on ``created_at``.
        limit: Maximum number of documents; values below 1 are raised to 1.
        created_by: When truthy, match this ``created_by`` exactly.

    Returns:
        Matching documents with ``_id`` converted to ``str``; ``[]`` when
        the video collection is unavailable.
    """
    collection = get_video_collection()
    if collection is None:
        return []

    query: Dict[str, Any] = {}
    for field, wanted in (("category", category), ("created_by", created_by)):
        if wanted:
            query[field] = wanted

    # Half-open window: $gte start, $lt end; omitted when neither is given.
    window: Dict[str, Any] = {
        op: bound
        for op, bound in (("$gte", start_date), ("$lt", end_date))
        if bound
    }
    if window:
        query["created_at"] = window

    max_docs = max(1, int(limit))
    cursor = collection.find(query).sort("created_at", DESCENDING).limit(max_docs)

    documents: List[Dict[str, Any]] = list(cursor)
    for doc in documents:
        doc["_id"] = str(doc.get("_id"))
    return documents
def insert_image_analysis(
    *,
    image_name: str,
    response: Dict[str, Any],
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    analyzer_model: Optional[str] = None,
    image_meta: Optional[Dict[str, Any]] = None,
    thumbnail: str = "",
) -> Optional[str]:
    """Store one image analysis document and return its id.

    The document is tagged ``type="image_analysis"`` and
    ``source="image_analyzer"``.

    Args:
        image_name: Stored as ``image.name`` (a ``"name"`` key in
            ``image_meta`` overrides it).
        response: Analysis payload stored under ``results``; falsy -> ``{}``.
        category: Category label; falsy -> ``"general"``.
        created_by: Identifier stored in ``created_by``.
        analyzer_model: Model name; falsy -> ``"gpt-4o"``.
        image_meta: Extra key/values merged into the ``image`` sub-document.
        thumbnail: Thumbnail value; falsy -> ``""``.

    Returns:
        The inserted id as a string.

    Raises:
        ValueError: If the image collection is unavailable.
    """
    collection = get_image_collection()
    if collection is None:
        raise ValueError("Image collection not available")

    extra_meta = image_meta or {}
    record: Dict[str, Any] = {
        "type": "image_analysis",
        "source": "image_analyzer",
        "category": category if category else "general",
        "image": {"name": image_name, **extra_meta},
        "analyzer_model": analyzer_model if analyzer_model else "gpt-4o",
        "results": response if response else {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),
        "thumbnail": thumbnail if thumbnail else "",
    }
    inserted = collection.insert_one(record)
    return str(inserted.inserted_id)
def list_image_categories(created_by: Optional[str] = None) -> List[str]:
    """Return the sorted distinct categories of image analysis documents.

    Only documents with ``type == "image_analysis"`` are considered.

    Args:
        created_by: When truthy, only documents with this ``created_by``
            value are considered.

    Returns:
        Sorted category values, excluding ``None``, ``""`` and ``[]``.
        Returns ``[]`` when the collection is unavailable or the lookup
        fails; a failure is logged rather than raised.
    """
    col = get_image_collection()
    if col is None:
        return []
    try:
        q: Dict[str, Any] = {"type": "image_analysis"}
        if created_by:
            q["created_by"] = created_by
        vals = col.distinct("category", q)
        return sorted({v for v in vals if v not in (None, "", [])})
    except Exception as exc:
        # Still best-effort, but record why the list came back empty.
        log.exception("list_image_categories failed: %s", exc)
        return []
def find_image_analyses(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 200,
    created_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Fetch image analysis documents, newest first.

    Only documents with ``type == "image_analysis"`` are matched.

    Args:
        category: When truthy, match this ``category`` exactly.
        start_date: When truthy, inclusive lower bound on ``created_at``.
        end_date: When truthy, exclusive upper bound on ``created_at``.
        limit: Maximum number of documents; values below 1 are raised to 1.
        created_by: When truthy, match this ``created_by`` exactly.

    Returns:
        Matching documents with ``_id`` converted to ``str``; ``[]`` when
        the image collection is unavailable.
    """
    collection = get_image_collection()
    if collection is None:
        return []

    query: Dict[str, Any] = {"type": "image_analysis"}
    for field, wanted in (("category", category), ("created_by", created_by)):
        if wanted:
            query[field] = wanted

    # Half-open window: $gte start, $lt end; omitted when neither is given.
    window: Dict[str, Any] = {
        op: bound
        for op, bound in (("$gte", start_date), ("$lt", end_date))
        if bound
    }
    if window:
        query["created_at"] = window

    max_docs = max(1, int(limit))
    cursor = collection.find(query).sort("created_at", DESCENDING).limit(max_docs)

    documents: List[Dict[str, Any]] = list(cursor)
    for doc in documents:
        doc["_id"] = str(doc.get("_id"))
    return documents
| # ------------------- Generation Jobs ------------------- | |
def find_generation_jobs(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 0,
    page_size: int = 20,
    created_by: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Fetch one page of text generation jobs, newest first.

    Only documents with ``type == "generation"`` and ``source == "text"``
    are matched.

    Args:
        category: When truthy, match this ``category`` exactly.
        start_date: When truthy, inclusive lower bound on ``created_at``.
        end_date: When truthy, exclusive upper bound on ``created_at``.
        page: Zero-based page index; negative values are treated as 0.
        page_size: Documents per page; values below 1 are raised to 1.
        created_by: When truthy, match this ``created_by`` exactly.

    Returns:
        ``(documents, total)``: the page's documents with ``_id`` converted
        to ``str``, and the total number of documents matching the filter.
        Returns ``([], 0)`` when the results collection is unavailable.
    """
    col = get_results_collection()
    if col is None:
        return [], 0

    # Clamp pagination: limit(0) would mean "no limit" and a negative skip
    # is rejected by the driver.
    page = max(0, int(page))
    page_size = max(1, int(page_size))

    q: Dict[str, Any] = {"type": "generation", "source": "text"}
    if category:
        q["category"] = category
    if created_by:
        q["created_by"] = created_by
    if start_date or end_date:
        rng: Dict[str, Any] = {}
        if start_date:
            rng["$gte"] = start_date
        if end_date:
            rng["$lt"] = end_date
        q["created_at"] = rng

    total = col.count_documents(q)
    cur = (
        col.find(q)
        .sort("created_at", DESCENDING)
        .skip(page * page_size)
        .limit(page_size)
    )
    out: List[Dict[str, Any]] = []
    for d in cur:
        d["_id"] = str(d.get("_id"))
        out.append(d)
    return out, int(total)
| # ------------------- Script Results ------------------- | |
def insert_script_result(
    *,
    video_name: str,
    offer_details: str,
    target_audience: str,
    specific_hook: str,
    additional_context: str,
    response: List[Dict[str, Any]],
    thumbnail: str = "",
    created_by: Optional[str] = None,
    num_scripts: Optional[int] = None,
    category: Optional[str] = None,
) -> None:
    """Store one script-generation result document.

    The document is tagged ``type="script"`` and
    ``source="script_generator"`` and stamped with the current
    ``datetime.utcnow()`` value.

    Args:
        video_name: Stored in ``video_name``.
        offer_details: Stored in ``offer_details``.
        target_audience: Stored in ``target_audience``.
        specific_hook: Stored in ``specific_hook``.
        additional_context: Stored in ``additional_context``.
        response: Generated scripts, stored in ``response``.
        thumbnail: Stored in ``thumbnail``.
        created_by: Stored in ``created_by``.
        num_scripts: Stored in ``num_scripts``.
        category: Category label; falsy -> ``"general"``.

    Raises:
        ValueError: If the script collection is unavailable, or if the
            insert fails (the underlying error is chained as the cause).
    """
    col = get_script_collection()
    if col is None:
        raise ValueError("Script collection not available.")
    doc: Dict[str, Any] = {
        "type": "script",
        "source": "script_generator",
        "video_name": video_name,
        "category": category or "general",
        "offer_details": offer_details,
        "target_audience": target_audience,
        "specific_hook": specific_hook,
        "additional_context": additional_context,
        "response": response,
        "thumbnail": thumbnail,
        "created_by": created_by,
        "num_scripts": num_scripts,
        "created_at": datetime.utcnow(),
    }
    try:
        col.insert_one(doc)
    except Exception as e:
        # Chain explicitly so the driver error is kept as __cause__.
        raise ValueError(f"Failed to insert script result: {e}") from e
def find_script_results(
    *,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 0,
    page_size: int = 20,
    created_by: Optional[str] = None,
    video_name_query: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Fetch one page of script-generation results, newest first.

    Only documents with ``type == "script"`` and
    ``source == "script_generator"`` are matched.

    Args:
        start_date: When truthy, inclusive lower bound on ``created_at``.
        end_date: When truthy, exclusive upper bound on ``created_at``.
        page: Zero-based page index; negative values are treated as 0.
        page_size: Documents per page; values below 1 are raised to 1.
        created_by: When truthy, match this ``created_by`` exactly.
        video_name_query: When truthy, case-insensitive ``$regex`` applied
            to ``video_name``.

    Returns:
        ``(documents, total)``: the page's documents with ``_id`` converted
        to ``str``, and the total number of documents matching the filter.
        Returns ``([], 0)`` when the script collection is unavailable.
    """
    col = get_script_collection()
    if col is None:
        return [], 0

    # Clamp pagination: limit(0) would mean "no limit" and a negative skip
    # is rejected by the driver.
    page = max(0, int(page))
    page_size = max(1, int(page_size))

    q: Dict[str, Any] = {"type": "script", "source": "script_generator"}
    if start_date or end_date:
        rng: Dict[str, Any] = {}
        if start_date:
            rng["$gte"] = start_date
        if end_date:
            rng["$lt"] = end_date
        q["created_at"] = rng
    if created_by:
        q["created_by"] = created_by
    if video_name_query:
        # NOTE(review): the value is passed to $regex unescaped, so regex
        # metacharacters in it are treated as pattern syntax. Confirm the
        # callers intend that; otherwise escape it before querying.
        q["video_name"] = {"$regex": video_name_query, "$options": "i"}

    total = col.count_documents(q)
    cur = (
        col.find(q)
        .sort("created_at", DESCENDING)
        .skip(page * page_size)
        .limit(page_size)
    )
    out: List[Dict[str, Any]] = []
    for d in cur:
        d["_id"] = str(d.get("_id"))
        out.append(d)
    return out, int(total)