Spaces:
Sleeping
Sleeping
| """ | |
| MongoDB service for telemetry event store. | |
| Implements append-only event logging for analytics. | |
| """ | |
| import os | |
| import logging | |
| from datetime import datetime, timezone | |
| from typing import Optional, Dict, Any | |
| from pymongo import MongoClient, ASCENDING, IndexModel | |
| from pymongo.collection import Collection | |
| from pymongo.errors import PyMongoError | |
| logger = logging.getLogger(__name__) | |
| class MongoDBService: | |
| """Singleton MongoDB connection for telemetry event storage.""" | |
| _instance = None | |
| _client: Optional[MongoClient] = None | |
| _db = None | |
| _events_collection: Optional[Collection] = None | |
| # Supported event types | |
| EVENT_TYPES = { | |
| "SESSION_METADATA", | |
| "DASHBOARD_VIEW", | |
| "ANALYSIS_REQUEST", | |
| "TASK_QUEUED", | |
| "TASK_COMPLETED", | |
| "RATE_LIMIT_HIT" | |
| } | |
| def __new__(cls): | |
| """Ensure singleton pattern.""" | |
| if cls._instance is None: | |
| cls._instance = super(MongoDBService, cls).__new__(cls) | |
| return cls._instance | |
| def __init__(self): | |
| """Initialize MongoDB connection if not already done.""" | |
| if self._client is None: | |
| self._connect() | |
| def _connect(self): | |
| """Establish MongoDB connection and set up collections.""" | |
| try: | |
| mongo_uri = os.getenv("MONGO_URI") | |
| if not mongo_uri: | |
| logger.warning("MONGO_URI not set. MongoDB telemetry disabled.") | |
| return | |
| self._client = MongoClient( | |
| mongo_uri, | |
| serverSelectionTimeoutMS=5000, | |
| connectTimeoutMS=5000 | |
| ) | |
| # Test connection | |
| self._client.admin.command('ping') | |
| # Get database (extract from URI or use default) | |
| self._db = self._client.get_default_database() | |
| self._events_collection = self._db.events | |
| # Create indexes | |
| self._create_indexes() | |
| logger.info("MongoDB connection established successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to connect to MongoDB: {str(e)}") | |
| self._client = None | |
| def _create_indexes(self): | |
| """Create indexes for events collection.""" | |
| if self._events_collection is None: | |
| return | |
| try: | |
| indexes = [ | |
| IndexModel([("created_at", ASCENDING)]), | |
| IndexModel([("device_id", ASCENDING)]), | |
| IndexModel([("user_id", ASCENDING)]), | |
| IndexModel([("event_type", ASCENDING)]), | |
| IndexModel([("created_at", ASCENDING), ("event_type", ASCENDING)]), | |
| IndexModel([("device_id", ASCENDING), ("event_type", ASCENDING)]) | |
| ] | |
| self._events_collection.create_indexes(indexes) | |
| logger.info("MongoDB indexes created successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to create indexes: {str(e)}") | |
| def log_event( | |
| self, | |
| event_type: str, | |
| device_id: str, | |
| user_id: Optional[str] = None, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> bool: | |
| """ | |
| Log an event to MongoDB (append-only). | |
| Args: | |
| event_type: Type of event (must be in EVENT_TYPES) | |
| device_id: Device identifier | |
| user_id: Optional user identifier | |
| metadata: Optional additional metadata (e.g., IP, location for SESSION_METADATA) | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| if self._events_collection is None: | |
| logger.debug("MongoDB not available, skipping event log") | |
| return False | |
| if event_type not in self.EVENT_TYPES: | |
| logger.warning(f"Invalid event_type: {event_type}") | |
| return False | |
| try: | |
| event = { | |
| "user_id": user_id, | |
| "device_id": device_id, | |
| "event_type": event_type, | |
| "created_at": datetime.now(timezone.utc) | |
| } | |
| # Add metadata if provided (for SESSION_METADATA events) | |
| if metadata: | |
| event["metadata"] = metadata | |
| self._events_collection.insert_one(event) | |
| logger.debug(f"Logged event: {event_type} for device: {device_id}") | |
| return True | |
| except PyMongoError as e: | |
| logger.error(f"Failed to log event: {str(e)}") | |
| return False | |
| def get_events( | |
| self, | |
| event_type: Optional[str] = None, | |
| device_id: Optional[str] = None, | |
| user_id: Optional[str] = None, | |
| start_date: Optional[datetime] = None, | |
| end_date: Optional[datetime] = None, | |
| limit: int = 1000 | |
| ) -> list: | |
| """ | |
| Query events from MongoDB (for admin analytics). | |
| Args: | |
| event_type: Filter by event type | |
| device_id: Filter by device | |
| user_id: Filter by user | |
| start_date: Filter by start date | |
| end_date: Filter by end date | |
| limit: Maximum number of events to return | |
| Returns: | |
| List of events | |
| """ | |
| if self._events_collection is None: | |
| return [] | |
| try: | |
| query = {} | |
| if event_type: | |
| query["event_type"] = event_type | |
| if device_id: | |
| query["device_id"] = device_id | |
| if user_id: | |
| query["user_id"] = user_id | |
| if start_date or end_date: | |
| query["created_at"] = {} | |
| if start_date: | |
| query["created_at"]["$gte"] = start_date | |
| if end_date: | |
| query["created_at"]["$lte"] = end_date | |
| events = list( | |
| self._events_collection | |
| .find(query) | |
| .sort("created_at", -1) | |
| .limit(limit) | |
| ) | |
| # Convert ObjectId to string for JSON serialization | |
| for event in events: | |
| event["_id"] = str(event["_id"]) | |
| return events | |
| except PyMongoError as e: | |
| logger.error(f"Failed to query events: {str(e)}") | |
| return [] | |
| def aggregate_events(self, pipeline: list) -> list: | |
| """ | |
| Run aggregation pipeline on events (for admin analytics). | |
| Args: | |
| pipeline: MongoDB aggregation pipeline | |
| Returns: | |
| Aggregation results | |
| """ | |
| if self._events_collection is None: | |
| return [] | |
| try: | |
| results = list(self._events_collection.aggregate(pipeline)) | |
| return results | |
| except PyMongoError as e: | |
| logger.error(f"Aggregation failed: {str(e)}") | |
| return [] | |
| def close(self): | |
| """Close MongoDB connection.""" | |
| if self._client: | |
| self._client.close() | |
| self._client = None | |
| logger.info("MongoDB connection closed") | |
| # Singleton instance | |
| _mongodb_service = None | |
| def get_mongodb_service() -> MongoDBService: | |
| """Get MongoDB service singleton.""" | |
| global _mongodb_service | |
| if _mongodb_service is None: | |
| _mongodb_service = MongoDBService() | |
| return _mongodb_service | |