"""
MongoDB service for telemetry event store.

Implements append-only event logging for analytics.
"""
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from pymongo import ASCENDING, DESCENDING, IndexModel, MongoClient
from pymongo.collection import Collection
from pymongo.errors import PyMongoError

logger = logging.getLogger(__name__)


class MongoDBService:
    """Singleton MongoDB connection for telemetry event storage.

    If ``MONGO_URI`` is unset or the connection attempt fails, the service is
    left disconnected rather than raising. In that state ``log_event``
    returns ``False`` and the query methods return ``[]``.
    """

    _instance = None
    _client: Optional[MongoClient] = None
    _db = None
    _events_collection: Optional[Collection] = None

    # Supported event types; log_event rejects anything not listed here.
    EVENT_TYPES = {
        "SESSION_METADATA",
        "DASHBOARD_VIEW",
        "ANALYSIS_REQUEST",
        "TASK_QUEUED",
        "TASK_COMPLETED",
        "RATE_LIMIT_HIT",
    }

    def __new__(cls):
        """Ensure singleton pattern: every call returns the same instance."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize MongoDB connection if not already done.

        Python calls ``__init__`` on every ``MongoDBService()`` call, even
        though ``__new__`` returns the shared instance. While disconnected,
        each such call therefore retries the connection.
        """
        if self._client is None:
            self._connect()

    def _reset_handles(self):
        """Drop all connection handles so every public method sees 'disconnected'."""
        self._client = None
        self._db = None
        self._events_collection = None

    def _connect(self):
        """Establish the MongoDB connection and set up the events collection.

        The connection string is read from the ``MONGO_URI`` environment
        variable. On any failure the partially created client is closed and
        all handles are reset to ``None``. Errors are logged, not raised.
        """
        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            logger.warning("MONGO_URI not set. MongoDB telemetry disabled.")
            return

        client = None
        try:
            client = MongoClient(
                mongo_uri,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
            )
            # MongoClient connects lazily. The ping forces a round trip, so a
            # bad URI or unreachable server is detected here rather than on
            # the first write.
            client.admin.command("ping")

            # The database must be named in the URI path (mongodb://host/<db>).
            # get_default_database() is called without a fallback, so it
            # raises if the URI does not name one.
            db = client.get_default_database()
        except Exception as e:
            logger.error("Failed to connect to MongoDB: %s", e)
            if client is not None:
                # Release the half-initialized client's resources instead of
                # just dropping the reference.
                client.close()
            self._reset_handles()
            return

        self._client = client
        self._db = db
        self._events_collection = db.events

        # Index creation failures are logged inside the helper and do not
        # invalidate the connection.
        self._create_indexes()
        logger.info("MongoDB connection established successfully")

    def _create_indexes(self):
        """Create the query indexes on the events collection (best effort).

        Errors are logged and swallowed so that missing indexes never disable
        event logging.
        """
        if self._events_collection is None:
            return

        try:
            indexes = [
                IndexModel([("created_at", ASCENDING)]),
                IndexModel([("device_id", ASCENDING)]),
                IndexModel([("user_id", ASCENDING)]),
                IndexModel([("event_type", ASCENDING)]),
                IndexModel([("created_at", ASCENDING), ("event_type", ASCENDING)]),
                IndexModel([("device_id", ASCENDING), ("event_type", ASCENDING)]),
            ]
            self._events_collection.create_indexes(indexes)
            logger.info("MongoDB indexes created successfully")
        except Exception as e:
            logger.error("Failed to create indexes: %s", e)

    def log_event(
        self,
        event_type: str,
        device_id: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Log an event to MongoDB (append-only).

        Args:
            event_type: Type of event (must be in EVENT_TYPES).
            device_id: Device identifier.
            user_id: Optional user identifier.
            metadata: Optional additional metadata (e.g., IP, location for
                SESSION_METADATA). Stored under the ``metadata`` key only
                when non-empty.

        Returns:
            True if the document was inserted. False if the service is
            disconnected, the event type is unknown, or the insert failed.
        """
        if self._events_collection is None:
            logger.debug("MongoDB not available, skipping event log")
            return False

        if event_type not in self.EVENT_TYPES:
            logger.warning("Invalid event_type: %s", event_type)
            return False

        event = {
            "user_id": user_id,
            "device_id": device_id,
            "event_type": event_type,
            # Timezone-aware UTC timestamp; used for range queries and sorting.
            "created_at": datetime.now(timezone.utc),
        }
        if metadata:
            event["metadata"] = metadata

        try:
            self._events_collection.insert_one(event)
        except PyMongoError as e:
            logger.error("Failed to log event: %s", e)
            return False

        logger.debug("Logged event: %s for device: %s", event_type, device_id)
        return True

    def get_events(
        self,
        event_type: Optional[str] = None,
        device_id: Optional[str] = None,
        user_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        limit: int = 1000,
    ) -> list:
        """
        Query events from MongoDB (for admin analytics), newest first.

        Args:
            event_type: Filter by event type (ignored when falsy).
            device_id: Filter by device (ignored when falsy).
            user_id: Filter by user (ignored when falsy).
            start_date: Inclusive lower bound on ``created_at``.
            end_date: Inclusive upper bound on ``created_at``.
            limit: Maximum number of events to return. A value <= 0 returns
                no events. PyMongo would treat ``limit(0)`` as "no limit" and
                return the whole collection.

        Returns:
            List of event documents with ``_id`` converted to ``str`` for
            JSON serialization. Returns ``[]`` if the service is disconnected
            or the query fails.
        """
        if self._events_collection is None or limit <= 0:
            return []

        query: Dict[str, Any] = {}
        if event_type:
            query["event_type"] = event_type
        if device_id:
            query["device_id"] = device_id
        if user_id:
            query["user_id"] = user_id

        created_at: Dict[str, datetime] = {}
        if start_date is not None:
            created_at["$gte"] = start_date
        if end_date is not None:
            created_at["$lte"] = end_date
        if created_at:
            query["created_at"] = created_at

        try:
            events = list(
                self._events_collection.find(query)
                .sort("created_at", DESCENDING)
                .limit(limit)
            )
        except PyMongoError as e:
            logger.error("Failed to query events: %s", e)
            return []

        # ObjectId is not JSON serializable; expose it as a plain string.
        for event in events:
            event["_id"] = str(event["_id"])
        return events

    def aggregate_events(self, pipeline: list) -> list:
        """
        Run an aggregation pipeline on events (for admin analytics).

        Args:
            pipeline: MongoDB aggregation pipeline (list of stage documents).

        Returns:
            Aggregation results as returned by the driver (ObjectIds are not
            converted). Returns ``[]`` if the service is disconnected or the
            aggregation fails.
        """
        if self._events_collection is None:
            return []

        try:
            return list(self._events_collection.aggregate(pipeline))
        except PyMongoError as e:
            logger.error("Aggregation failed: %s", e)
            return []

    def close(self):
        """Close the MongoDB connection and mark the service as disconnected.

        All handles are cleared, not only the client. Later calls to
        log_event and the query methods therefore take their "not available"
        path instead of using a closed client.
        """
        if self._client is not None:
            self._client.close()
            logger.info("MongoDB connection closed")
        self._reset_handles()


# Singleton instance
_mongodb_service = None


def get_mongodb_service() -> MongoDBService:
    """Get the MongoDB service singleton, creating it on first use."""
    global _mongodb_service
    if _mongodb_service is None:
        _mongodb_service = MongoDBService()
    return _mongodb_service