Spaces:
Sleeping
Sleeping
File size: 4,961 Bytes
e272f4f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError, AutoReconnect
import logging
from datetime import datetime
from app.config import Config
from typing import List, Dict, Optional
class MongoDBHandler:
def __init__(self):
self.logger = logging.getLogger(__name__)
self._ensure_mongodb_running()
try:
self.client = MongoClient(
Config.MONGO_URI,
serverSelectionTimeoutMS=5000
)
# Test connection immediately
self.client.server_info()
self.db = self.client[Config.DATABASE_NAME]
self.collection = self.db[Config.HISTORY_COLLECTION]
self._create_indexes()
self.logger.info("MongoDB connection established successfully")
except (ServerSelectionTimeoutError, AutoReconnect) as e:
self.logger.error(f"MongoDB connection failed: {e}")
self._diagnose_connection_issue()
raise
def _ensure_mongodb_running(self):
"""Ensure MongoDB service is running"""
try:
import subprocess
result = subprocess.run(
['net', 'start', 'MongoDB'],
capture_output=True,
text=True
)
if "already running" not in result.stderr.lower():
self.logger.info("Started MongoDB service")
except Exception as e:
self.logger.error(f"Failed to start MongoDB: {e}")
def _diagnose_connection_issue(self):
"""Diagnose common MongoDB connection issues"""
import os
issues = []
# Check data directory
if not os.path.exists("C:\\data\\db"):
issues.append("Data directory missing")
# Check log directory
if not os.path.exists("C:\\data\\log"):
issues.append("Log directory missing")
# Check service status
try:
import subprocess
result = subprocess.run(
['sc', 'query', 'MongoDB'],
capture_output=True,
text=True
)
if "RUNNING" not in result.stdout:
issues.append("MongoDB service not running")
except Exception:
issues.append("Could not check service status")
if issues:
self.logger.error("MongoDB Issues Found:")
for issue in issues:
self.logger.error(f" - {issue}")
def _create_indexes(self):
"""Create indexes for better query performance"""
self.collection.create_index([("session_id", 1)])
self.collection.create_index([("timestamp", -1)])
self.collection.create_index([("session_id", 1), ("timestamp", -1)])
def save_conversation(self, session_id: str, query: str, response: str, metadata: dict = None) -> str:
"""Save a conversation with automatic timestamp"""
conversation = {
"session_id": session_id,
"user_query": query,
"bot_response": response,
"timestamp": datetime.utcnow(),
"metadata": metadata or {}
}
result = self.collection.insert_one(conversation)
return str(result.inserted_id)
def verify_storage(self) -> bool:
"""Verify storage is working by inserting and retrieving a test document"""
try:
# Insert test document
test_id = self.save_conversation(
session_id="test",
query="test_query",
response="test_response",
metadata={"test": True}
)
# Verify retrieval
test_doc = self.collection.find_one({"_id": test_id})
# Cleanup test document
self.collection.delete_one({"_id": test_id})
return test_doc is not None
except Exception as e:
self.logger.error(f"Storage verification failed: {e}")
return False
def get_conversation_history(self, session_id: str, limit: int = 10) -> List[Dict]:
"""Retrieve conversation history for a session"""
try:
cursor = self.collection.find(
{"session_id": session_id},
{"_id": 0} # Exclude _id field
).sort("timestamp", -1).limit(limit)
return list(cursor)
except Exception as e:
self.logger.error(f"Error retrieving conversation history: {e}")
return []
def clear_session_history(self, session_id: str) -> int:
"""Clear all conversations for a session"""
try:
result = self.collection.delete_many({"session_id": session_id})
return result.deleted_count
except Exception as e:
self.logger.error(f"Error clearing session history: {e}")
return 0 |