Spaces:
Sleeping
Sleeping
| import os | |
| from datetime import datetime | |
| from pymongo import MongoClient | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # MongoDB Configuration | |
| MONGO_USERNAME = os.getenv("MONGO_USERNAME") | |
| MONGO_PASSWORD = os.getenv("MONGO_PASSWORD") | |
| MONGO_HOST = os.getenv("MONGO_HOST") | |
| MONGO_DB = os.getenv("MONGO_DB") | |
| COLLECTION_NAME = os.getenv("MONGO_COLLECTION", "analyses") | |
| # Build MongoDB URI | |
| if MONGO_USERNAME and MONGO_PASSWORD: | |
| MONGO_URI = f"mongodb://{MONGO_USERNAME}:{MONGO_PASSWORD}@{MONGO_HOST}/{MONGO_DB}" | |
| else: | |
| MONGO_URI = f"mongodb://{MONGO_HOST}/{MONGO_DB}" | |
| # Global variables for lazy initialization | |
| _client = None | |
| _db = None | |
| _collection = None | |
| def get_database_connection(): | |
| """Initialize MongoDB connection lazily""" | |
| global _client, _db, _collection | |
| if _client is None: | |
| _client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) | |
| _client.admin.command('ping') | |
| _db = _client[MONGO_DB] | |
| _collection = _db[COLLECTION_NAME] | |
| return _client, _db, _collection | |
| def insert_analysis_result( | |
| video_name: str, | |
| offer_details: str, | |
| target_audience: str, | |
| specific_hook: str, | |
| additional_context: str, | |
| response: dict | |
| ): | |
| """Insert a new video analysis result""" | |
| try: | |
| client, db, collection = get_database_connection() | |
| document = { | |
| "video_name": video_name, | |
| "offer_details": offer_details, | |
| "target_audience": target_audience, | |
| "specific_hook": specific_hook, | |
| "additional_context": additional_context, | |
| "response": response, | |
| "created_at": datetime.utcnow() | |
| } | |
| result = collection.insert_one(document) | |
| return str(result.inserted_id) | |
| except Exception as e: | |
| raise e | |
| def get_all_results(limit: int = 20): | |
| """Fetch all saved results, sorted by newest""" | |
| try: | |
| client, db, collection = get_database_connection() | |
| return list(collection.find().sort("created_at", -1).limit(limit)) | |
| except Exception: | |
| return [] | |
| def get_result_by_id(doc_id): | |
| """Fetch a specific analysis by _id""" | |
| try: | |
| from bson import ObjectId | |
| client, db, collection = get_database_connection() | |
| return collection.find_one({"_id": ObjectId(doc_id)}) | |
| except Exception: | |
| return None | |
| def delete_result_by_id(doc_id): | |
| """Delete a specific analysis by _id""" | |
| try: | |
| from bson import ObjectId | |
| client, db, collection = get_database_connection() | |
| result = collection.delete_one({"_id": ObjectId(doc_id)}) | |
| return result.deleted_count > 0 | |
| except Exception: | |
| return False |