# Hosting-page header captured with this file (not part of the module):
# Spaces status "Sleeping"; file size: 7,542 Bytes; id 4b62d23.
"""
MongoDB service for telemetry event store.
Implements append-only event logging for analytics.
"""
import os
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from pymongo import MongoClient, ASCENDING, IndexModel
from pymongo.collection import Collection
from pymongo.errors import PyMongoError
# Module-level logger named after this module; every diagnostic below goes through it.
logger = logging.getLogger(__name__)
class MongoDBService:
    """Singleton MongoDB connection for telemetry event storage.

    Every ``MongoDBService()`` call returns the same object. ``__init__`` runs
    on each of those calls and (re)attempts the connection while no client is
    held. When ``MONGO_URI`` is unset or the connection attempt fails, the
    service stays disabled: ``log_event`` returns ``False`` and the query
    methods return empty lists.
    """

    _instance = None
    # Annotations are quoted so they are not evaluated when the class body runs.
    _client: "Optional[MongoClient]" = None
    _db = None
    _events_collection: "Optional[Collection]" = None

    # Supported event types
    EVENT_TYPES = {
        "SESSION_METADATA",
        "DASHBOARD_VIEW",
        "ANALYSIS_REQUEST",
        "TASK_QUEUED",
        "TASK_COMPLETED",
        "RATE_LIMIT_HIT",
    }

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super(MongoDBService, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect to MongoDB unless a client is already held."""
        if self._client is None:
            self._connect()

    def _reset_handles(self):
        """Forget the client, database and collection handles together."""
        self._client = None
        self._db = None
        self._events_collection = None

    def _connect(self):
        """Establish the MongoDB connection and set up the events collection.

        Reads the connection string from the ``MONGO_URI`` environment
        variable. Never raises: on any failure the error is logged, the
        half-built client is closed, and the service is left disabled.
        """
        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            logger.warning("MONGO_URI not set. MongoDB telemetry disabled.")
            return

        client = None
        try:
            client = MongoClient(
                mongo_uri,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
            )
            # Test connection
            client.admin.command('ping')
            # Database comes from the URI's default database.
            database = client.get_default_database()

            # Publish the handles only after the connection is validated.
            self._client = client
            self._db = database
            self._events_collection = database.events

            self._create_indexes()
            logger.info("MongoDB connection established successfully")
        except Exception as e:
            logger.error("Failed to connect to MongoDB: %s", e)
            # Leave the service uniformly disabled and release the client
            # instead of abandoning it unclosed.
            self._reset_handles()
            if client is not None:
                try:
                    client.close()
                except Exception:
                    logger.debug(
                        "Ignoring error while closing failed MongoDB client",
                        exc_info=True,
                    )

    def _create_indexes(self):
        """Create the indexes used on the events collection.

        Failures are logged and swallowed, so a problem creating indexes does
        not by itself disable the service.
        """
        if self._events_collection is None:
            return
        try:
            indexes = [
                IndexModel([("created_at", ASCENDING)]),
                IndexModel([("device_id", ASCENDING)]),
                IndexModel([("user_id", ASCENDING)]),
                IndexModel([("event_type", ASCENDING)]),
                IndexModel([("created_at", ASCENDING), ("event_type", ASCENDING)]),
                IndexModel([("device_id", ASCENDING), ("event_type", ASCENDING)]),
            ]
            self._events_collection.create_indexes(indexes)
            logger.info("MongoDB indexes created successfully")
        except Exception as e:
            logger.error("Failed to create indexes: %s", e)

    def log_event(
        self,
        event_type: str,
        device_id: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Log an event to MongoDB (append-only).

        Args:
            event_type: Type of event (must be in EVENT_TYPES)
            device_id: Device identifier
            user_id: Optional user identifier
            metadata: Optional additional metadata (e.g., IP, location for
                SESSION_METADATA); stored only when non-empty

        Returns:
            True if the document was inserted; False when MongoDB is
            unavailable, the event type is unknown, or the insert raised a
            PyMongoError.
        """
        collection = self._events_collection
        if collection is None:
            logger.debug("MongoDB not available, skipping event log")
            return False
        if event_type not in self.EVENT_TYPES:
            logger.warning("Invalid event_type: %s", event_type)
            return False

        event = {
            "user_id": user_id,
            "device_id": device_id,
            "event_type": event_type,
            "created_at": datetime.now(timezone.utc),
        }
        # Add metadata if provided (for SESSION_METADATA events)
        if metadata:
            event["metadata"] = metadata

        # NOTE(review): only PyMongoError is handled here; confirm whether
        # metadata that cannot be encoded should also be swallowed.
        try:
            collection.insert_one(event)
        except PyMongoError as e:
            logger.error("Failed to log event: %s", e)
            return False

        logger.debug("Logged event: %s for device: %s", event_type, device_id)
        return True

    def get_events(
        self,
        event_type: Optional[str] = None,
        device_id: Optional[str] = None,
        user_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        limit: int = 1000
    ) -> list:
        """
        Query events from MongoDB (for admin analytics).

        Args:
            event_type: Filter by event type
            device_id: Filter by device
            user_id: Filter by user
            start_date: Keep events with created_at >= start_date
            end_date: Keep events with created_at <= end_date
            limit: Maximum number of events to return; passed straight to the
                cursor (NOTE(review): the driver treats 0 as "no limit" —
                confirm callers never pass it expecting zero rows)

        Returns:
            Matching events, newest first, with "_id" converted to str.
            An empty list when MongoDB is unavailable or the query fails.
        """
        collection = self._events_collection
        if collection is None:
            return []

        # Falsy filter values are treated as "no filter".
        query: Dict[str, Any] = {}
        if event_type:
            query["event_type"] = event_type
        if device_id:
            query["device_id"] = device_id
        if user_id:
            query["user_id"] = user_id

        created_range = {}
        if start_date:
            created_range["$gte"] = start_date
        if end_date:
            created_range["$lte"] = end_date
        if created_range:
            query["created_at"] = created_range

        try:
            events = list(
                collection
                .find(query)
                .sort("created_at", -1)
                .limit(limit)
            )
        except PyMongoError as e:
            logger.error("Failed to query events: %s", e)
            return []

        # Convert ObjectId to string for JSON serialization
        for event in events:
            event["_id"] = str(event["_id"])
        return events

    def aggregate_events(self, pipeline: list) -> list:
        """
        Run aggregation pipeline on events (for admin analytics).

        Args:
            pipeline: MongoDB aggregation pipeline

        Returns:
            Aggregation results as a list; an empty list when MongoDB is
            unavailable or the aggregation raises a PyMongoError.
        """
        collection = self._events_collection
        if collection is None:
            return []
        try:
            return list(collection.aggregate(pipeline))
        except PyMongoError as e:
            logger.error("Aggregation failed: %s", e)
            return []

    def close(self):
        """Close the MongoDB connection and drop every cached handle.

        The database and collection handles are cleared along with the
        client, so later calls take the "MongoDB not available" path instead
        of operating on a collection whose client has been closed.
        """
        client = self._client
        self._reset_handles()
        if client is not None:
            client.close()
            logger.info("MongoDB connection closed")
# Singleton instance: module-level cache filled on the first get_mongodb_service() call.
_mongodb_service: Optional[MongoDBService] = None
def get_mongodb_service() -> MongoDBService:
    """Return the module-wide MongoDBService, building it on first access."""
    global _mongodb_service
    if _mongodb_service is not None:
        return _mongodb_service
    _mongodb_service = MongoDBService()
    return _mongodb_service