AdGenesis-App / database /operations.py
userIdc2024's picture
Update database/operations.py
3189c1a verified
from __future__ import annotations
import os, logging
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
from bson import ObjectId
from pymongo import ASCENDING, DESCENDING
from database.connections import (
get_video_collection,
get_script_collection,
get_results_collection,
get_image_collection,
)
log = logging.getLogger(__name__)
# ------------------- Index Setup -------------------
def _ensure_results_indexes(col):
    """Create the query-supporting indexes on a results-style collection.

    Requests a descending ``created_at`` index plus three compound indexes
    that pair ``type``, ``category`` and ``created_by`` (ascending) with
    ``created_at`` (descending).

    Index creation is best-effort: any failure is logged and swallowed so
    the caller is never interrupted. As before, the first failure stops the
    remaining indexes from being attempted.

    Args:
        col: Collection object exposing ``create_index``.
    """
    index_specs = (
        [("created_at", DESCENDING)],
        [("type", ASCENDING), ("created_at", DESCENDING)],
        [("category", ASCENDING), ("created_at", DESCENDING)],
        [("created_by", ASCENDING), ("created_at", DESCENDING)],
    )
    try:
        for spec in index_specs:
            col.create_index(spec)
    except Exception:
        # Deliberately non-fatal, but leave a trace instead of hiding it.
        log.warning("Could not ensure results indexes", exc_info=True)
def _ensure_video_indexes(col):
    """Create the query-supporting indexes on a video-style collection.

    Requests a descending ``created_at`` index plus two compound indexes
    that pair ``category`` and ``created_by`` (ascending) with
    ``created_at`` (descending).

    Index creation is best-effort: any failure is logged and swallowed so
    the caller is never interrupted. As before, the first failure stops the
    remaining indexes from being attempted.

    Args:
        col: Collection object exposing ``create_index``.
    """
    index_specs = (
        [("created_at", DESCENDING)],
        [("category", ASCENDING), ("created_at", DESCENDING)],
        [("created_by", ASCENDING), ("created_at", DESCENDING)],
    )
    try:
        for spec in index_specs:
            col.create_index(spec)
    except Exception:
        # Deliberately non-fatal, but leave a trace instead of hiding it.
        log.warning("Could not ensure video indexes", exc_info=True)
# Index bootstrap: these statements execute at module level, i.e. when the
# module is imported. For each collection, index creation is skipped when
# the getter returns None.

# Results collection: full results index set (includes the "type" index).
_rc = get_results_collection()
if _rc is not None:
    _ensure_results_indexes(_rc)
# Video collection: video index set.
_vc = get_video_collection()
if _vc is not None:
    _ensure_video_indexes(_vc)
# Script collection reuses the video index set (no "type" index is created).
_sg = get_script_collection()
if _sg is not None:
    _ensure_video_indexes(_sg)
# Image collection reuses the results index set (includes the "type" index).
_ia = get_image_collection()
if _ia is not None:
    _ensure_results_indexes(_ia)
# ------------------- Image Jobs -------------------
def start_job(
    col,
    *,
    type: str,
    created_by: str,
    category: str,
    inputs: Dict[str, Any],
    settings: Dict[str, Any],
    user_prompt: Optional[str] = None,
) -> str:
    """Insert a new ``in_progress`` job document and return its id.

    Args:
        col: Collection object exposing ``insert_one``.
        type: Job type; ``"variation"`` yields source ``"variation"``,
            anything else yields source ``"text"``.
        created_by: Identifier stored under ``created_by``.
        category: Job category; falsy values are stored as ``"general"``.
        inputs: Job inputs; falsy values are stored as ``{}``. A
            ``file_name`` entry is also copied to the top level.
        settings: Job settings; falsy values are stored as ``{}``.
        user_prompt: Optional prompt stored under ``prompt``.

    Returns:
        ``str()`` of the ``inserted_id`` reported by ``insert_one``.
    """
    started_at = datetime.utcnow()
    job_inputs = inputs or {}
    is_variation = type == "variation"

    job_doc: Dict[str, Any] = {
        "type": type,
        "source": "variation" if is_variation else "text",
        "category": category or "general",
        "prompt": user_prompt,
        "status": "in_progress",
        "urls": [],
        "inputs": job_inputs,
        "settings": settings or {},
        "created_by": created_by,
        "created_at": started_at,
    }

    # Surface the file name at the top level when the inputs carry one.
    if "file_name" in job_inputs:
        job_doc["file_name"] = job_inputs["file_name"]

    return str(col.insert_one(job_doc).inserted_id)
def finish_job(
    col,
    job_id: str,
    *,
    status: str = "completed",
    outputs_urls: Optional[List[str]] = None,
    provider_update: Optional[Dict[str, Any]] = None,
) -> None:
    """Apply a ``$set`` update to the job document identified by ``job_id``.

    Args:
        col: Collection object exposing ``update_one``.
        job_id: String form of the document's ``_id``; converted with
            ``ObjectId``.
        status: Value written to ``status``.
        outputs_urls: When not ``None``, written to ``urls`` with duplicates
            removed and first-seen order kept.
        provider_update: Optional mapping; each entry is written to the
            dotted path ``provider.<key>``.

    Any exception from the id conversion or the update is logged and
    suppressed.
    """
    changes: Dict[str, Any] = {"status": status}

    if outputs_urls is not None:
        # dict.fromkeys de-duplicates while preserving insertion order.
        changes["urls"] = list(dict.fromkeys(outputs_urls or []))

    changes.update(
        {f"provider.{key}": value for key, value in (provider_update or {}).items()}
    )

    try:
        col.update_one({"_id": ObjectId(job_id)}, {"$set": changes})
    except Exception as e:
        log.error(f"finish_job update failed: {e}")
# ------------------- Video Analyses -------------------
def insert_video_analysis(
    *,
    video_name: str,
    response: Dict[str, Any],
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    analyzer_model: Optional[str] = None,
    video_meta: Optional[Dict[str, Any]] = None,
    thumbnail: str = "",
) -> Optional[str]:
    """Store one video-analysis document and return its id.

    Args:
        video_name: Stored as ``video.name`` (entries of ``video_meta`` are
            merged afterwards and may override it).
        response: Analysis payload stored under ``results`` (``{}`` if falsy).
        category: Stored category; falsy values become ``"general"``.
        created_by: Stored under ``created_by``.
        analyzer_model: Model name; when falsy, falls back to the
            ``VIDEO_ANALYZER_MODEL`` environment variable, then to
            ``"gemini-2.0-flash"``.
        video_meta: Extra fields merged into the ``video`` sub-document.
        thumbnail: Stored under ``thumbnail`` (``""`` if falsy).

    Returns:
        The inserted id as a string, or ``None`` when the video collection
        is unavailable.
    """
    videos = get_video_collection()
    if videos is None:
        return None

    video_info: Dict[str, Any] = {"name": video_name}
    video_info.update(video_meta or {})

    model_name = analyzer_model or os.getenv("VIDEO_ANALYZER_MODEL", "gemini-2.0-flash")

    record: Dict[str, Any] = {
        "category": category or "general",
        "video": video_info,
        "analyzer_model": model_name,
        "results": response or {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),
        "thumbnail": thumbnail or "",
    }
    return str(videos.insert_one(record).inserted_id)
def list_video_categories(created_by: Optional[str] = None) -> List[str]:
    """Return the sorted distinct ``category`` values of video analyses.

    Args:
        created_by: When truthy, only documents with this ``created_by``
            value are considered.

    Returns:
        Sorted list of categories, excluding ``None``, ``""`` and ``[]``.
        An empty list is returned when the collection is unavailable or the
        lookup fails; failures are logged rather than silently dropped.
    """
    col = get_video_collection()
    if col is None:
        return []

    query: Dict[str, Any] = {}
    if created_by:
        query["created_by"] = created_by

    try:
        values = col.distinct("category", query)
        return sorted({v for v in values if v not in (None, "", [])})
    except Exception:
        # Still best-effort for callers, but keep the failure visible.
        log.exception("list_video_categories failed")
        return []
def find_video_analyses(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 200,
    created_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Fetch video-analysis documents, newest ``created_at`` first.

    Args:
        category: Optional exact ``category`` filter.
        start_date: Optional inclusive lower bound on ``created_at``.
        end_date: Optional exclusive upper bound on ``created_at``.
        limit: Maximum number of documents; values below 1 are raised to 1.
        created_by: Optional exact ``created_by`` filter.

    Returns:
        The matching documents with ``_id`` converted to ``str``; an empty
        list when the video collection is unavailable.
    """
    videos = get_video_collection()
    if videos is None:
        return []

    created_range: Dict[str, Any] = {}
    if start_date:
        created_range["$gte"] = start_date
    if end_date:
        created_range["$lt"] = end_date

    filters: Dict[str, Any] = {}
    if category:
        filters["category"] = category
    if created_by:
        filters["created_by"] = created_by
    if created_range:
        filters["created_at"] = created_range

    cursor = videos.find(filters).sort("created_at", DESCENDING)
    cursor = cursor.limit(max(1, int(limit)))

    documents: List[Dict[str, Any]] = list(cursor)
    for document in documents:
        # Replace the raw id with its string form, in place.
        document["_id"] = str(document.get("_id"))
    return documents
def insert_image_analysis(
    *,
    image_name: str,
    response: Dict[str, Any],
    category: Optional[str] = None,
    created_by: Optional[str] = None,
    analyzer_model: Optional[str] = None,
    image_meta: Optional[Dict[str, Any]] = None,
    thumbnail: str = "",
) -> Optional[str]:
    """Store one image-analysis document and return its id.

    The document is tagged with type ``"image_analysis"`` and source
    ``"image_analyzer"``.

    Args:
        image_name: Stored as ``image.name`` (entries of ``image_meta`` are
            merged afterwards and may override it).
        response: Analysis payload stored under ``results`` (``{}`` if falsy).
        category: Stored category; falsy values become ``"general"``.
        created_by: Stored under ``created_by``.
        analyzer_model: Model name; falsy values become ``"gpt-4o"``.
        image_meta: Extra fields merged into the ``image`` sub-document.
        thumbnail: Stored under ``thumbnail`` (``""`` if falsy).

    Returns:
        The inserted id as a string.

    Raises:
        ValueError: If the image collection is unavailable.
    """
    images = get_image_collection()
    if images is None:
        raise ValueError("Image collection not available")

    image_info: Dict[str, Any] = {"name": image_name}
    image_info.update(image_meta or {})

    record: Dict[str, Any] = {
        "type": "image_analysis",
        "source": "image_analyzer",
        "category": category or "general",
        "image": image_info,
        "analyzer_model": analyzer_model or "gpt-4o",
        "results": response or {},
        "created_by": created_by,
        "created_at": datetime.utcnow(),
        "thumbnail": thumbnail or "",
    }
    return str(images.insert_one(record).inserted_id)
def list_image_categories(created_by: Optional[str] = None) -> List[str]:
    """Return the sorted distinct ``category`` values of image analyses.

    Only documents whose ``type`` is ``"image_analysis"`` are considered.

    Args:
        created_by: When truthy, only documents with this ``created_by``
            value are considered.

    Returns:
        Sorted list of categories, excluding ``None``, ``""`` and ``[]``.
        An empty list is returned when the collection is unavailable or the
        lookup fails; failures are logged rather than silently dropped.
    """
    col = get_image_collection()
    if col is None:
        return []

    query: Dict[str, Any] = {"type": "image_analysis"}
    if created_by:
        query["created_by"] = created_by

    try:
        values = col.distinct("category", query)
        return sorted({v for v in values if v not in (None, "", [])})
    except Exception:
        # Still best-effort for callers, but keep the failure visible.
        log.exception("list_image_categories failed")
        return []
def find_image_analyses(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 200,
    created_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Fetch image-analysis documents, newest ``created_at`` first.

    Only documents whose ``type`` is ``"image_analysis"`` are matched.

    Args:
        category: Optional exact ``category`` filter.
        start_date: Optional inclusive lower bound on ``created_at``.
        end_date: Optional exclusive upper bound on ``created_at``.
        limit: Maximum number of documents; values below 1 are raised to 1.
        created_by: Optional exact ``created_by`` filter.

    Returns:
        The matching documents with ``_id`` converted to ``str``; an empty
        list when the image collection is unavailable.
    """
    images = get_image_collection()
    if images is None:
        return []

    created_range: Dict[str, Any] = {}
    if start_date:
        created_range["$gte"] = start_date
    if end_date:
        created_range["$lt"] = end_date

    filters: Dict[str, Any] = {"type": "image_analysis"}
    if category:
        filters["category"] = category
    if created_by:
        filters["created_by"] = created_by
    if created_range:
        filters["created_at"] = created_range

    cursor = images.find(filters).sort("created_at", DESCENDING)
    cursor = cursor.limit(max(1, int(limit)))

    documents: List[Dict[str, Any]] = list(cursor)
    for document in documents:
        # Replace the raw id with its string form, in place.
        document["_id"] = str(document.get("_id"))
    return documents
# ------------------- Generation Jobs -------------------
def find_generation_jobs(
    *,
    category: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 0,
    page_size: int = 20,
    created_by: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Return one page of text-generation jobs plus the total match count.

    Matches documents with type ``"generation"`` and source ``"text"``,
    sorted by ``created_at`` descending.

    Args:
        category: Optional exact ``category`` filter.
        start_date: Optional inclusive lower bound on ``created_at``.
        end_date: Optional exclusive upper bound on ``created_at``.
        page: Zero-based page index; negative values are treated as 0.
        page_size: Documents per page; values below 1 are raised to 1.
        created_by: Optional exact ``created_by`` filter.

    Returns:
        ``(documents, total)`` where each document's ``_id`` is a string and
        ``total`` counts every match regardless of paging. ``([], 0)`` when
        the results collection is unavailable.
    """
    col = get_results_collection()
    if col is None:
        return [], 0

    # Guard the paging arguments: a limit of 0 means "no limit" to the
    # cursor and a negative skip is rejected, so clamp both to sane values.
    page = max(0, int(page))
    page_size = max(1, int(page_size))

    q: Dict[str, Any] = {"type": "generation", "source": "text"}
    if category:
        q["category"] = category
    if created_by:
        q["created_by"] = created_by
    if start_date or end_date:
        rng: Dict[str, Any] = {}
        if start_date:
            rng["$gte"] = start_date
        if end_date:
            rng["$lt"] = end_date
        q["created_at"] = rng

    total = col.count_documents(q)
    cur = (
        col.find(q)
        .sort("created_at", DESCENDING)
        .skip(page * page_size)
        .limit(page_size)
    )

    out: List[Dict[str, Any]] = []
    for d in cur:
        d["_id"] = str(d.get("_id"))
        out.append(d)
    return out, int(total)
# ------------------- Script Results -------------------
def insert_script_result(
    *,
    video_name: str,
    offer_details: str,
    target_audience: str,
    specific_hook: str,
    additional_context: str,
    response: List[Dict[str, Any]],
    thumbnail: str = "",
    created_by: Optional[str] = None,
    num_scripts: Optional[int] = None,
    category: Optional[str] = None,
) -> None:
    """Store one script-generation result document.

    The document is tagged with type ``"script"`` and source
    ``"script_generator"``; a falsy ``category`` is stored as ``"general"``.
    The remaining arguments are stored verbatim under keys of the same name.

    Raises:
        ValueError: If the script collection is unavailable, or if the
            insert fails (the original error is attached as the cause).
    """
    col = get_script_collection()
    if col is None:
        raise ValueError("Script collection not available.")

    doc: Dict[str, Any] = {
        "type": "script",
        "source": "script_generator",
        "video_name": video_name,
        "category": category or "general",
        "offer_details": offer_details,
        "target_audience": target_audience,
        "specific_hook": specific_hook,
        "additional_context": additional_context,
        "response": response,
        "thumbnail": thumbnail,
        "created_by": created_by,
        "num_scripts": num_scripts,
        "created_at": datetime.utcnow(),
    }

    try:
        col.insert_one(doc)
    except Exception as e:
        # Chain explicitly so the driver error is kept as the direct cause.
        raise ValueError(f"Failed to insert script result: {e}") from e
def find_script_results(
    *,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 0,
    page_size: int = 20,
    created_by: Optional[str] = None,
    video_name_query: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Return one page of script results plus the total match count.

    Matches documents with type ``"script"`` and source
    ``"script_generator"``, sorted by ``created_at`` descending.

    Args:
        start_date: Optional inclusive lower bound on ``created_at``.
        end_date: Optional exclusive upper bound on ``created_at``.
        page: Zero-based page index; negative values are treated as 0.
        page_size: Documents per page; values below 1 are raised to 1.
        created_by: Optional exact ``created_by`` filter.
        video_name_query: Optional case-insensitive regular expression
            matched against ``video_name``.

    Returns:
        ``(documents, total)`` where each document's ``_id`` is a string and
        ``total`` counts every match regardless of paging. ``([], 0)`` when
        the script collection is unavailable.
    """
    col = get_script_collection()
    if col is None:
        return [], 0

    # Guard the paging arguments: a limit of 0 means "no limit" to the
    # cursor and a negative skip is rejected, so clamp both to sane values.
    page = max(0, int(page))
    page_size = max(1, int(page_size))

    q: Dict[str, Any] = {"type": "script", "source": "script_generator"}
    if start_date or end_date:
        rng: Dict[str, Any] = {}
        if start_date:
            rng["$gte"] = start_date
        if end_date:
            rng["$lt"] = end_date
        q["created_at"] = rng
    if created_by:
        q["created_by"] = created_by
    if video_name_query:
        # NOTE(review): the text is used as a raw regex pattern. If it comes
        # straight from user input, consider escaping it first - confirm
        # whether any caller relies on regex syntax before changing this.
        q["video_name"] = {"$regex": video_name_query, "$options": "i"}

    total = col.count_documents(q)
    cur = (
        col.find(q)
        .sort("created_at", DESCENDING)
        .skip(page * page_size)
        .limit(page_size)
    )

    out: List[Dict[str, Any]] = []
    for d in cur:
        d["_id"] = str(d.get("_id"))
        out.append(d)
    return out, int(total)