# ABSA/src/utils/mongodb_service.py
# parthnuwal7's picture
# Adding Mongo+Redis concept
# 4b62d23
"""
MongoDB service for telemetry event store.
Implements append-only event logging for analytics.
"""
import os
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from pymongo import MongoClient, ASCENDING, IndexModel
from pymongo.collection import Collection
from pymongo.errors import PyMongoError
logger = logging.getLogger(__name__)


class MongoDBService:
    """Singleton MongoDB connection for telemetry event storage.

    Every ``MongoDBService()`` call returns the same object.  The connection
    is configured from the ``MONGO_URI`` environment variable; when it is
    missing or the server cannot be reached, the service stays disabled:
    ``log_event`` returns ``False`` and the query helpers return ``[]``.

    Because ``__init__`` runs on every ``MongoDBService()`` call, a new
    connection attempt is made whenever no client is currently held (for
    example after ``close()`` or after a failed attempt).
    """

    _instance = None
    # Annotations are quoted so they are not evaluated when the class body
    # executes; they are for readers and type checkers only.
    _client: "Optional[MongoClient]" = None
    _db = None
    _events_collection: "Optional[Collection]" = None

    # Supported event types; log_event() rejects anything else.
    EVENT_TYPES = {
        "SESSION_METADATA",
        "DASHBOARD_VIEW",
        "ANALYSIS_REQUEST",
        "TASK_QUEUED",
        "TASK_COMPLETED",
        "RATE_LIMIT_HIT",
    }

    def __new__(cls):
        """Create the single shared instance on first use and reuse it after."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect to MongoDB unless a client is already held."""
        if self._client is None:
            self._connect()

    def _discard_connection(self, client=None):
        """Forget all connection state, closing *client* if one was opened.

        Errors raised while closing are logged and ignored: this runs on
        failure paths where the original error has already been reported.
        """
        if client is not None:
            try:
                client.close()
            except Exception as e:
                logger.debug("Ignoring error while closing MongoDB client: %s", e)
        self._client = None
        self._db = None
        self._events_collection = None

    def _connect(self):
        """Establish the MongoDB connection and set up the events collection.

        Never raises.  On any failure the error is logged, the partially
        opened client is closed, and the service is left disabled.
        """
        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            logger.warning("MONGO_URI not set. MongoDB telemetry disabled.")
            return

        client = None
        try:
            client = MongoClient(
                mongo_uri,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
            )
            # Test connection
            client.admin.command('ping')
            # The database name comes from the URI.  No fallback name is
            # passed, so (per the PyMongo docs) a URI without a database is
            # expected to raise here and land in the handler below.
            db = client.get_default_database()

            # Publish the state only once the connection is known to work.
            self._client = client
            self._db = db
            self._events_collection = db.events

            self._create_indexes()
            logger.info("MongoDB connection established successfully")
        except Exception as e:
            logger.error("Failed to connect to MongoDB: %s", e)
            # Close the half-open client instead of just dropping the
            # reference, and make sure no stale db/collection handle survives.
            self._discard_connection(client)

    def _create_indexes(self):
        """Create indexes for the events collection.

        Does nothing when no collection is available.  Failures are logged
        and swallowed so that an indexing problem does not disable telemetry.
        """
        if self._events_collection is None:
            return
        try:
            indexes = [
                IndexModel([("created_at", ASCENDING)]),
                IndexModel([("device_id", ASCENDING)]),
                IndexModel([("user_id", ASCENDING)]),
                IndexModel([("event_type", ASCENDING)]),
                IndexModel([("created_at", ASCENDING), ("event_type", ASCENDING)]),
                IndexModel([("device_id", ASCENDING), ("event_type", ASCENDING)]),
            ]
            self._events_collection.create_indexes(indexes)
            logger.info("MongoDB indexes created successfully")
        except Exception as e:
            logger.error("Failed to create indexes: %s", e)

    def log_event(
        self,
        event_type: str,
        device_id: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Log an event to MongoDB (append-only).

        Args:
            event_type: Type of event (must be in EVENT_TYPES)
            device_id: Device identifier
            user_id: Optional user identifier
            metadata: Optional additional metadata (e.g., IP, location for
                SESSION_METADATA); stored under "metadata" only when non-empty

        Returns:
            True if the event was inserted; False if MongoDB is unavailable,
            the event type is not supported, or the insert raised a
            PyMongoError.
        """
        if self._events_collection is None:
            logger.debug("MongoDB not available, skipping event log")
            return False
        if event_type not in self.EVENT_TYPES:
            logger.warning("Invalid event_type: %s", event_type)
            return False

        event = {
            "user_id": user_id,
            "device_id": device_id,
            "event_type": event_type,
            # Timezone-aware UTC timestamp taken at insert time.
            "created_at": datetime.now(timezone.utc),
        }
        if metadata:
            event["metadata"] = metadata

        # NOTE(review): only PyMongoError is handled; presumably an error from
        # encoding non-BSON-serialisable metadata would propagate -- confirm.
        try:
            self._events_collection.insert_one(event)
        except PyMongoError as e:
            logger.error("Failed to log event: %s", e)
            return False
        logger.debug("Logged event: %s for device: %s", event_type, device_id)
        return True

    def get_events(
        self,
        event_type: Optional[str] = None,
        device_id: Optional[str] = None,
        user_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        limit: int = 1000
    ) -> list:
        """
        Query events from MongoDB (for admin analytics).

        Filters that are None (or otherwise falsy) are ignored; the rest are
        combined in a single query.

        Args:
            event_type: Filter by event type
            device_id: Filter by device
            user_id: Filter by user
            start_date: Inclusive lower bound on created_at
            end_date: Inclusive upper bound on created_at
            limit: Maximum number of events to return

        Returns:
            List of event documents sorted by created_at, newest first, with
            "_id" converted to a string.  Empty list when MongoDB is
            unavailable or the query raises a PyMongoError.
        """
        if self._events_collection is None:
            return []

        query = {}
        if event_type:
            query["event_type"] = event_type
        if device_id:
            query["device_id"] = device_id
        if user_id:
            query["user_id"] = user_id

        created_range = {}
        if start_date:
            created_range["$gte"] = start_date
        if end_date:
            created_range["$lte"] = end_date
        if created_range:
            query["created_at"] = created_range

        try:
            events = list(
                self._events_collection
                .find(query)
                .sort("created_at", -1)
                .limit(limit)
            )
        except PyMongoError as e:
            logger.error("Failed to query events: %s", e)
            return []

        # Convert ObjectId to string for JSON serialization
        for event in events:
            event["_id"] = str(event["_id"])
        return events

    def aggregate_events(self, pipeline: list) -> list:
        """
        Run aggregation pipeline on events (for admin analytics).

        Args:
            pipeline: MongoDB aggregation pipeline, passed through unchanged

        Returns:
            Aggregation results as a list; empty list when MongoDB is
            unavailable or the aggregation raises a PyMongoError.
        """
        if self._events_collection is None:
            return []
        try:
            return list(self._events_collection.aggregate(pipeline))
        except PyMongoError as e:
            logger.error("Aggregation failed: %s", e)
            return []

    def close(self):
        """Close the MongoDB connection and disable the service.

        The database and collection handles are cleared together with the
        client, so later log/query calls take the "not available" path
        instead of operating on a closed client.  Calling this with no open
        connection is a no-op.
        """
        client = self._client
        if client is None:
            return
        try:
            client.close()
        finally:
            # Clear everything even if close() raised, so no handle to the
            # dead client is left behind.
            self._client = None
            self._db = None
            self._events_collection = None
        logger.info("MongoDB connection closed")
# Module-wide handle to the service; stays None until first requested.
_mongodb_service = None


def get_mongodb_service() -> MongoDBService:
    """Return the shared MongoDBService, building it on the first call."""
    global _mongodb_service
    service = _mongodb_service
    if service is None:
        # First request: construct the service and remember it for later calls.
        service = MongoDBService()
        _mongodb_service = service
    return service