import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional

from pymongo import MongoClient
from pymongo.errors import AutoReconnect, ServerSelectionTimeoutError

from app.config import Config


class MongoDBHandler:
    """Store and query chatbot conversation history in MongoDB.

    On construction the handler makes a best-effort attempt to start the
    Windows ``MongoDB`` service, connects to ``Config.MONGO_URI``, verifies
    the connection, and creates the indexes used by the history queries.
    """

    def __init__(self):
        """Connect to MongoDB and prepare the history collection.

        Raises:
            ServerSelectionTimeoutError, AutoReconnect: if the server cannot
                be reached (5 s server-selection timeout). Diagnostics are
                logged and the client is closed before the error is re-raised.
        """
        self.logger = logging.getLogger(__name__)
        self._ensure_mongodb_running()

        client = None
        try:
            client = MongoClient(
                Config.MONGO_URI,
                serverSelectionTimeoutMS=5000
            )
            # Force a round trip now so an unreachable server is reported here
            # instead of on the first real query.
            client.server_info()
            self.client = client
            self.db = self.client[Config.DATABASE_NAME]
            self.collection = self.db[Config.HISTORY_COLLECTION]
            self._create_indexes()
            self.logger.info("MongoDB connection established successfully")
        except (ServerSelectionTimeoutError, AutoReconnect) as e:
            self.logger.error(f"MongoDB connection failed: {e}")
            self._diagnose_connection_issue()
            if client is not None:
                # Construction is failing, so nobody else can close this
                # client; release its resources before propagating.
                client.close()
            raise

    def _ensure_mongodb_running(self):
        """Best-effort attempt to start the Windows ``MongoDB`` service.

        Runs ``net start MongoDB``. Nothing is raised: launch errors are
        logged as errors, and a non-zero exit status is logged as a warning
        unless the output indicates the service is already running.
        """
        import subprocess

        try:
            result = subprocess.run(
                ['net', 'start', 'MongoDB'],
                capture_output=True,
                text=True
            )
        except Exception as e:
            # e.g. the `net` command does not exist on this platform.
            self.logger.error(f"Failed to start MongoDB: {e}")
            return

        if result.returncode == 0:
            self.logger.info("Started MongoDB service")
            return

        detail = (result.stderr or result.stdout or "").strip()
        # `net start` reports an already-started service as a failure; match
        # "already" loosely since the exact wording is OS/locale dependent.
        if "already" in detail.lower():
            self.logger.debug("MongoDB service already running")
        else:
            self.logger.warning(f"Could not start MongoDB service: {detail}")

    def _diagnose_connection_issue(self):
        """Log likely causes of a failed connection on a local Windows install.

        Checks for the ``C:\\data\\db`` and ``C:\\data\\log`` directories and
        queries the ``MongoDB`` service status via ``sc query``. Findings are
        logged at error level; nothing is raised.
        """
        import os
        import subprocess

        issues = []

        # Check data directory
        if not os.path.exists("C:\\data\\db"):
            issues.append("Data directory missing")

        # Check log directory
        if not os.path.exists("C:\\data\\log"):
            issues.append("Log directory missing")

        # Check service status
        try:
            result = subprocess.run(
                ['sc', 'query', 'MongoDB'],
                capture_output=True,
                text=True
            )
            if "RUNNING" not in result.stdout:
                issues.append("MongoDB service not running")
        except Exception:
            issues.append("Could not check service status")

        if issues:
            self.logger.error("MongoDB Issues Found:")
            for issue in issues:
                self.logger.error(f"  - {issue}")

    def _create_indexes(self):
        """Create indexes backing the session and recency queries."""
        self.collection.create_index([("session_id", 1)])
        self.collection.create_index([("timestamp", -1)])
        # Serves get_conversation_history: filter by session, newest first.
        self.collection.create_index([("session_id", 1), ("timestamp", -1)])

    @staticmethod
    def _build_conversation(session_id: str, query: str, response: str,
                            metadata: Optional[dict] = None) -> dict:
        """Return a new conversation document stamped with the current UTC time."""
        return {
            "session_id": session_id,
            "user_query": query,
            "bot_response": response,
            # Timezone-aware UTC; BSON stores the same instant utcnow() did.
            "timestamp": datetime.now(timezone.utc),
            "metadata": metadata or {},
        }

    def save_conversation(self, session_id: str, query: str, response: str,
                          metadata: Optional[dict] = None) -> str:
        """Insert one query/response exchange for a session.

        Args:
            session_id: Conversation session the exchange belongs to.
            query: The user's message.
            response: The bot's reply.
            metadata: Optional extra fields; stored as ``{}`` when omitted.

        Returns:
            The inserted document's ``_id`` converted to ``str``.
        """
        conversation = self._build_conversation(session_id, query, response, metadata)
        result = self.collection.insert_one(conversation)
        return str(result.inserted_id)

    def verify_storage(self) -> bool:
        """Check that a document can be written, read back, and removed.

        Returns:
            True if the probe document was found after insertion; False if it
            was not, or if any database error occurred (the error is logged).
        """
        try:
            probe = self._build_conversation(
                session_id="test",
                query="test_query",
                response="test_response",
                metadata={"test": True}
            )
            # Keep the real ObjectId. save_conversation() returns str(_id),
            # and a string never equals an ObjectId _id, so querying with it
            # always misses and leaves the probe document behind.
            probe_id = self.collection.insert_one(probe).inserted_id
            try:
                return self.collection.find_one({"_id": probe_id}) is not None
            finally:
                # Remove the probe even if the read-back failed.
                self.collection.delete_one({"_id": probe_id})
        except Exception as e:
            self.logger.error(f"Storage verification failed: {e}")
            return False

    def get_conversation_history(self, session_id: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` conversations for a session, newest first.

        The ``_id`` field is excluded from the returned documents. On a
        database error the error is logged and an empty list is returned.
        """
        try:
            cursor = self.collection.find(
                {"session_id": session_id},
                {"_id": 0}  # Exclude _id field
            ).sort("timestamp", -1).limit(limit)
            return list(cursor)
        except Exception as e:
            self.logger.error(f"Error retrieving conversation history: {e}")
            return []

    def clear_session_history(self, session_id: str) -> int:
        """Delete every conversation for a session.

        Returns:
            The number of documents deleted, or 0 if a database error
            occurred (the error is logged).
        """
        try:
            result = self.collection.delete_many({"session_id": session_id})
            return result.deleted_count
        except Exception as e:
            self.logger.error(f"Error clearing session history: {e}")
            return 0