ConvoBot / src /databases /mongo_handler.py
ashish-ninehertz
changes
e272f4f
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError, AutoReconnect
import logging
from datetime import datetime
from app.config import Config
from typing import List, Dict, Optional
class MongoDBHandler:
    """Persist and query chatbot conversation history in MongoDB.

    Connection settings (``MONGO_URI``, ``DATABASE_NAME``,
    ``HISTORY_COLLECTION``) are read from ``Config``.
    """

    def __init__(self):
        """Connect to MongoDB, verify the connection, and create indexes.

        Raises:
            ServerSelectionTimeoutError, AutoReconnect: re-raised after
                logging and running diagnostics when the server cannot be
                reached.
        """
        self.logger = logging.getLogger(__name__)
        self._ensure_mongodb_running()
        self.client = None
        try:
            self.client = MongoClient(
                Config.MONGO_URI,
                serverSelectionTimeoutMS=5000,
            )
            # Force a round trip now so an unreachable server fails here
            # rather than on the first real query.
            self.client.server_info()
            self.db = self.client[Config.DATABASE_NAME]
            self.collection = self.db[Config.HISTORY_COLLECTION]
            self._create_indexes()
            self.logger.info("MongoDB connection established successfully")
        except (ServerSelectionTimeoutError, AutoReconnect) as e:
            self.logger.error(f"MongoDB connection failed: {e}")
            self._diagnose_connection_issue()
            # Release the half-initialised client before propagating.
            if self.client is not None:
                self.client.close()
            raise

    def _ensure_mongodb_running(self):
        """Best-effort attempt to start the local ``MongoDB`` Windows service.

        Never raises: failures are only logged, and the subsequent
        connection attempt decides whether the database is usable.
        """
        import os
        import subprocess

        # ``net start`` is a Windows command; there is nothing to do elsewhere.
        if os.name != "nt":
            self.logger.debug(
                "Skipping MongoDB service start: not running on Windows"
            )
            return

        try:
            result = subprocess.run(
                ['net', 'start', 'MongoDB'],
                capture_output=True,
                text=True,
            )
        except Exception as e:
            self.logger.error(f"Failed to start MongoDB: {e}")
            return

        if result.returncode == 0:
            self.logger.info("Started MongoDB service")
            return

        # A non-zero exit is benign when the service was already up.
        output = f"{result.stdout}\n{result.stderr}".lower()
        if "already running" in output or "already been started" in output:
            self.logger.debug("MongoDB service already running")
        else:
            self.logger.warning(
                "Could not start MongoDB service (exit code %s): %s",
                result.returncode,
                (result.stderr or result.stdout).strip(),
            )

    def _diagnose_connection_issue(self):
        """Log likely causes of a failed connection.

        Checks the hard-coded ``C:\\data\\db`` and ``C:\\data\\log``
        directories and queries the ``MongoDB`` service via ``sc query``.
        """
        import os
        import subprocess

        issues = []

        # Check data directory
        if not os.path.exists("C:\\data\\db"):
            issues.append("Data directory missing")

        # Check log directory
        if not os.path.exists("C:\\data\\log"):
            issues.append("Log directory missing")

        # Check service status
        try:
            result = subprocess.run(
                ['sc', 'query', 'MongoDB'],
                capture_output=True,
                text=True,
            )
            if "RUNNING" not in result.stdout:
                issues.append("MongoDB service not running")
        except Exception:
            issues.append("Could not check service status")

        if issues:
            self.logger.error("MongoDB Issues Found:")
            for issue in issues:
                self.logger.error(f" - {issue}")

    def _create_indexes(self):
        """Create indexes on ``session_id`` and ``timestamp`` for lookups."""
        self.collection.create_index([("session_id", 1)])
        self.collection.create_index([("timestamp", -1)])
        self.collection.create_index([("session_id", 1), ("timestamp", -1)])

    @staticmethod
    def _build_conversation(
        session_id: str,
        query: str,
        response: str,
        metadata: Optional[dict] = None,
    ) -> dict:
        """Build the document stored for one query/response exchange."""
        return {
            "session_id": session_id,
            "user_query": query,
            "bot_response": response,
            "timestamp": datetime.utcnow(),  # naive datetime holding UTC
            "metadata": metadata or {},
        }

    def save_conversation(
        self,
        session_id: str,
        query: str,
        response: str,
        metadata: Optional[dict] = None,
    ) -> str:
        """Save a conversation with automatic timestamp.

        Args:
            session_id: Identifier grouping the exchanges of one session.
            query: The user's message.
            response: The bot's reply.
            metadata: Optional extra fields; stored as ``{}`` when omitted.

        Returns:
            The inserted document's ``_id`` converted to ``str``.
        """
        conversation = self._build_conversation(
            session_id, query, response, metadata
        )
        result = self.collection.insert_one(conversation)
        return str(result.inserted_id)

    def verify_storage(self) -> bool:
        """Verify storage works by inserting and reading back a test document.

        The test document is always removed afterwards.

        Returns:
            True if the inserted document could be read back, False if it
            could not or if any step raised (the error is logged).
        """
        try:
            test_doc = self._build_conversation(
                session_id="test",
                query="test_query",
                response="test_response",
                metadata={"test": True},
            )
            # Keep the raw inserted id: the stringified id returned by
            # save_conversation() does not match the stored ``_id``.
            test_id = self.collection.insert_one(test_doc).inserted_id
            try:
                found = self.collection.find_one({"_id": test_id})
            finally:
                # Clean up even if the lookup itself failed.
                self.collection.delete_one({"_id": test_id})
            return found is not None
        except Exception as e:
            self.logger.error(f"Storage verification failed: {e}")
            return False

    def get_conversation_history(
        self, session_id: str, limit: int = 10
    ) -> List[Dict]:
        """Retrieve conversation history for a session, newest first.

        Args:
            session_id: Session whose exchanges are requested.
            limit: Maximum number of documents to return.

        Returns:
            Matching documents without their ``_id`` field, or an empty
            list if the query raised (the error is logged).
        """
        try:
            cursor = self.collection.find(
                {"session_id": session_id},
                {"_id": 0},  # Exclude _id field
            ).sort("timestamp", -1).limit(limit)
            return list(cursor)
        except Exception as e:
            self.logger.error(f"Error retrieving conversation history: {e}")
            return []

    def clear_session_history(self, session_id: str) -> int:
        """Clear all conversations for a session.

        Returns:
            The number of documents deleted, or 0 if the delete raised
            (the error is logged).
        """
        try:
            result = self.collection.delete_many({"session_id": session_id})
            return result.deleted_count
        except Exception as e:
            self.logger.error(f"Error clearing session history: {e}")
            return 0