|
|
import os |
|
|
from datetime import datetime |
|
|
from pymongo import MongoClient |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
MONGO_USERNAME = os.getenv("MONGO_USERNAME") |
|
|
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD") |
|
|
MONGO_HOST = os.getenv("MONGO_HOST") |
|
|
MONGO_DB = os.getenv("MONGO_DB") |
|
|
COLLECTION_NAME = os.getenv("MONGO_COLLECTION", "analyses") |
|
|
|
|
|
|
|
|
if MONGO_USERNAME and MONGO_PASSWORD: |
|
|
MONGO_URI = f"mongodb://{MONGO_USERNAME}:{MONGO_PASSWORD}@{MONGO_HOST}/{MONGO_DB}" |
|
|
else: |
|
|
MONGO_URI = f"mongodb://{MONGO_HOST}/{MONGO_DB}" |
|
|
|
|
|
|
|
|
_client = None |
|
|
_db = None |
|
|
_collection = None |
|
|
|
|
|
def get_database_connection(): |
|
|
"""Initialize MongoDB connection lazily""" |
|
|
global _client, _db, _collection |
|
|
|
|
|
if _client is None: |
|
|
_client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) |
|
|
_client.admin.command('ping') |
|
|
_db = _client[MONGO_DB] |
|
|
_collection = _db[COLLECTION_NAME] |
|
|
|
|
|
return _client, _db, _collection |
|
|
|
|
|
def insert_analysis_result( |
|
|
video_name: str, |
|
|
offer_details: str, |
|
|
target_audience: str, |
|
|
specific_hook: str, |
|
|
additional_context: str, |
|
|
response: dict |
|
|
): |
|
|
"""Insert a new video analysis result""" |
|
|
try: |
|
|
client, db, collection = get_database_connection() |
|
|
document = { |
|
|
"video_name": video_name, |
|
|
"offer_details": offer_details, |
|
|
"target_audience": target_audience, |
|
|
"specific_hook": specific_hook, |
|
|
"additional_context": additional_context, |
|
|
"response": response, |
|
|
"created_at": datetime.utcnow() |
|
|
} |
|
|
result = collection.insert_one(document) |
|
|
return str(result.inserted_id) |
|
|
except Exception as e: |
|
|
raise e |
|
|
|
|
|
def get_all_results(limit: int = 20): |
|
|
"""Fetch all saved results, sorted by newest""" |
|
|
try: |
|
|
client, db, collection = get_database_connection() |
|
|
return list(collection.find().sort("created_at", -1).limit(limit)) |
|
|
except Exception: |
|
|
return [] |
|
|
|
|
|
def get_result_by_id(doc_id): |
|
|
"""Fetch a specific analysis by _id""" |
|
|
try: |
|
|
from bson import ObjectId |
|
|
client, db, collection = get_database_connection() |
|
|
return collection.find_one({"_id": ObjectId(doc_id)}) |
|
|
except Exception: |
|
|
return None |
|
|
|
|
|
def delete_result_by_id(doc_id): |
|
|
"""Delete a specific analysis by _id""" |
|
|
try: |
|
|
from bson import ObjectId |
|
|
client, db, collection = get_database_connection() |
|
|
result = collection.delete_one({"_id": ObjectId(doc_id)}) |
|
|
return result.deleted_count > 0 |
|
|
except Exception: |
|
|
return False |