File size: 2,696 Bytes
734acc3 1f7922f 734acc3 aa44385 734acc3 aa44385 1f7922f aa44385 734acc3 aa44385 4ae889a 734acc3 aa44385 4ae889a aa44385 734acc3 aa44385 4ae889a aa44385 734acc3 aa44385 4ae889a aa44385 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
import logging
import os
from datetime import datetime, timezone
from urllib.parse import quote_plus

from dotenv import load_dotenv
from pymongo import MongoClient
# Load environment variables (python-dotenv: reads a local .env file, if one
# exists) so that the os.getenv() lookups that follow can see them.
load_dotenv()
# MongoDB configuration, read from the environment once at import time.
MONGO_USERNAME = os.getenv("MONGO_USERNAME")
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD")
MONGO_HOST = os.getenv("MONGO_HOST")
MONGO_DB = os.getenv("MONGO_DB")
COLLECTION_NAME = os.getenv("MONGO_COLLECTION", "analyses")


def _build_mongo_uri(username, password, host, database):
    """Return the ``mongodb://`` connection URI for *host* and *database*.

    Credentials are included only when both *username* and *password* are
    non-empty. They are percent-escaped with ``quote_plus`` because characters
    such as ``@``, ``:`` and ``/`` are URI delimiters and would otherwise
    corrupt the connection string.

    Args:
        username: Login name, or ``None``/empty for an unauthenticated URI.
        password: Password, or ``None``/empty for an unauthenticated URI.
        host: Host (optionally ``host:port``) placed after the credentials.
        database: Database name used as the URI path.

    Returns:
        The assembled URI string.
    """
    if username and password:
        credentials = f"{quote_plus(username)}:{quote_plus(password)}@"
    else:
        credentials = ""
    return f"mongodb://{credentials}{host}/{database}"


# NOTE(review): MONGO_HOST and MONGO_DB have no defaults, so an unset variable
# ends up as the literal text "None" in the URI -- confirm whether a default
# or an explicit startup error is wanted here.
MONGO_URI = _build_mongo_uri(MONGO_USERNAME, MONGO_PASSWORD, MONGO_HOST, MONGO_DB)
# Connection state shared by the helpers in this module. All three names stay
# None until get_database_connection() fills them in on first use.
_client = _db = _collection = None
def get_database_connection():
    """Return the shared ``(client, db, collection)`` triple, connecting lazily.

    The first successful call creates the ``MongoClient``, verifies the server
    is reachable with a ``ping`` command, and caches the client, database and
    collection in module globals. Later calls return the cached objects.

    The globals are only assigned after the ping and the handle lookups have
    succeeded. If anything fails, the half-built client is closed and nothing
    is cached, so the next call retries instead of handing back a client with
    ``None`` database/collection handles.

    Returns:
        A ``(client, db, collection)`` tuple.

    Raises:
        Exception: Whatever the driver raises when the server cannot be
            reached within the 5 second selection timeout, or when the
            configured database/collection names are rejected.
    """
    global _client, _db, _collection
    if _client is None:
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        try:
            # Force a round trip now; MongoClient itself connects lazily.
            client.admin.command('ping')
            database = client[MONGO_DB]
            collection = database[COLLECTION_NAME]
        except Exception:
            client.close()
            raise
        # Publish all three together so callers never see partial state.
        _client, _db, _collection = client, database, collection
    return _client, _db, _collection
def insert_analysis_result(
    video_name: str,
    offer_details: str,
    target_audience: str,
    specific_hook: str,
    additional_context: str,
    response: dict
) -> str:
    """Insert a new video analysis result and return its id.

    Args:
        video_name: Name of the analysed video.
        offer_details: Offer description stored with the analysis.
        target_audience: Target audience stored with the analysis.
        specific_hook: Hook text stored with the analysis.
        additional_context: Extra context stored with the analysis.
        response: Analysis payload stored under the ``response`` key.

    Returns:
        The inserted document's ``_id`` converted to ``str``.

    Raises:
        Exception: Any connection or write error is propagated unchanged to
            the caller.
    """
    _, _, collection = get_database_connection()
    document = {
        "video_name": video_name,
        "offer_details": offer_details,
        "target_audience": target_audience,
        "specific_hook": specific_hook,
        "additional_context": additional_context,
        "response": response,
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # returns a naive value that is easy to misread as local time.
        "created_at": datetime.now(timezone.utc),
    }
    result = collection.insert_one(document)
    return str(result.inserted_id)
def get_all_results(limit: int = 20):
    """Fetch saved results, newest first.

    Args:
        limit: Value passed to the cursor's ``limit()``; defaults to 20.

    Returns:
        A list of documents sorted by ``created_at`` descending. This is a
        best-effort read: on any failure the error is logged and an empty
        list is returned instead of raising.
    """
    try:
        _, _, collection = get_database_connection()
        cursor = collection.find().sort("created_at", -1).limit(limit)
        return list(cursor)
    except Exception:
        # Keep the best-effort contract, but leave a trace so an outage is
        # distinguishable from "no results yet".
        logging.getLogger(__name__).exception("Failed to fetch analysis results")
        return []
def get_result_by_id(doc_id):
    """Fetch a specific analysis by ``_id``.

    Args:
        doc_id: The document id, typically the string form of an ObjectId.

    Returns:
        The matching document, or ``None`` when the id is empty/malformed,
        no document matches, or the lookup fails. Failures are logged rather
        than raised.
    """
    try:
        from bson import ObjectId

        # An empty or malformed id can never match a document, so skip the
        # database round trip for it.
        if not ObjectId.is_valid(doc_id):
            return None
        _, _, collection = get_database_connection()
        return collection.find_one({"_id": ObjectId(doc_id)})
    except Exception:
        logging.getLogger(__name__).exception(
            "Failed to fetch analysis result %r", doc_id
        )
        return None
def delete_result_by_id(doc_id):
    """Delete a specific analysis by ``_id``.

    Args:
        doc_id: The document id, typically the string form of an ObjectId.

    Returns:
        ``True`` when a document was deleted. ``False`` when the id is
        empty/malformed, nothing matched, or the delete fails. Failures are
        logged rather than raised.
    """
    try:
        from bson import ObjectId

        # An empty or malformed id can never match a document, so skip the
        # database round trip for it.
        if not ObjectId.is_valid(doc_id):
            return False
        _, _, collection = get_database_connection()
        outcome = collection.delete_one({"_id": ObjectId(doc_id)})
        return outcome.deleted_count > 0
    except Exception:
        logging.getLogger(__name__).exception(
            "Failed to delete analysis result %r", doc_id
        )
        return False