Spaces:
Sleeping
Sleeping
| from pymongo import MongoClient | |
| from pymongo.errors import ServerSelectionTimeoutError, AutoReconnect | |
| import logging | |
| from datetime import datetime | |
| from app.config import Config | |
| from typing import List, Dict, Optional | |
| class MongoDBHandler: | |
| def __init__(self): | |
| self.logger = logging.getLogger(__name__) | |
| self._ensure_mongodb_running() | |
| try: | |
| self.client = MongoClient( | |
| Config.MONGO_URI, | |
| serverSelectionTimeoutMS=5000 | |
| ) | |
| # Test connection immediately | |
| self.client.server_info() | |
| self.db = self.client[Config.DATABASE_NAME] | |
| self.collection = self.db[Config.HISTORY_COLLECTION] | |
| self._create_indexes() | |
| self.logger.info("MongoDB connection established successfully") | |
| except (ServerSelectionTimeoutError, AutoReconnect) as e: | |
| self.logger.error(f"MongoDB connection failed: {e}") | |
| self._diagnose_connection_issue() | |
| raise | |
| def _ensure_mongodb_running(self): | |
| """Ensure MongoDB service is running""" | |
| try: | |
| import subprocess | |
| result = subprocess.run( | |
| ['net', 'start', 'MongoDB'], | |
| capture_output=True, | |
| text=True | |
| ) | |
| if "already running" not in result.stderr.lower(): | |
| self.logger.info("Started MongoDB service") | |
| except Exception as e: | |
| self.logger.error(f"Failed to start MongoDB: {e}") | |
| def _diagnose_connection_issue(self): | |
| """Diagnose common MongoDB connection issues""" | |
| import os | |
| issues = [] | |
| # Check data directory | |
| if not os.path.exists("C:\\data\\db"): | |
| issues.append("Data directory missing") | |
| # Check log directory | |
| if not os.path.exists("C:\\data\\log"): | |
| issues.append("Log directory missing") | |
| # Check service status | |
| try: | |
| import subprocess | |
| result = subprocess.run( | |
| ['sc', 'query', 'MongoDB'], | |
| capture_output=True, | |
| text=True | |
| ) | |
| if "RUNNING" not in result.stdout: | |
| issues.append("MongoDB service not running") | |
| except Exception: | |
| issues.append("Could not check service status") | |
| if issues: | |
| self.logger.error("MongoDB Issues Found:") | |
| for issue in issues: | |
| self.logger.error(f" - {issue}") | |
| def _create_indexes(self): | |
| """Create indexes for better query performance""" | |
| self.collection.create_index([("session_id", 1)]) | |
| self.collection.create_index([("timestamp", -1)]) | |
| self.collection.create_index([("session_id", 1), ("timestamp", -1)]) | |
| def save_conversation(self, session_id: str, query: str, response: str, metadata: dict = None) -> str: | |
| """Save a conversation with automatic timestamp""" | |
| conversation = { | |
| "session_id": session_id, | |
| "user_query": query, | |
| "bot_response": response, | |
| "timestamp": datetime.utcnow(), | |
| "metadata": metadata or {} | |
| } | |
| result = self.collection.insert_one(conversation) | |
| return str(result.inserted_id) | |
| def verify_storage(self) -> bool: | |
| """Verify storage is working by inserting and retrieving a test document""" | |
| try: | |
| # Insert test document | |
| test_id = self.save_conversation( | |
| session_id="test", | |
| query="test_query", | |
| response="test_response", | |
| metadata={"test": True} | |
| ) | |
| # Verify retrieval | |
| test_doc = self.collection.find_one({"_id": test_id}) | |
| # Cleanup test document | |
| self.collection.delete_one({"_id": test_id}) | |
| return test_doc is not None | |
| except Exception as e: | |
| self.logger.error(f"Storage verification failed: {e}") | |
| return False | |
| def get_conversation_history(self, session_id: str, limit: int = 10) -> List[Dict]: | |
| """Retrieve conversation history for a session""" | |
| try: | |
| cursor = self.collection.find( | |
| {"session_id": session_id}, | |
| {"_id": 0} # Exclude _id field | |
| ).sort("timestamp", -1).limit(limit) | |
| return list(cursor) | |
| except Exception as e: | |
| self.logger.error(f"Error retrieving conversation history: {e}") | |
| return [] | |
| def clear_session_history(self, session_id: str) -> int: | |
| """Clear all conversations for a session""" | |
| try: | |
| result = self.collection.delete_many({"session_id": session_id}) | |
| return result.deleted_count | |
| except Exception as e: | |
| self.logger.error(f"Error clearing session history: {e}") | |
| return 0 |