"""MongoDB client module for play history, activity logs, and analytics.""" import os from datetime import datetime from typing import Optional, List, Dict, Any from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from pymongo import ASCENDING, DESCENDING # MongoDB configuration from environment variables MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017") MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "music_memories") # Global MongoDB client and database instances _mongo_client: Optional[AsyncIOMotorClient] = None _mongo_db: Optional[AsyncIOMotorDatabase] = None def get_mongo_client() -> Optional[AsyncIOMotorClient]: """Get the MongoDB client instance.""" return _mongo_client def get_mongo_db() -> Optional[AsyncIOMotorDatabase]: """Get the MongoDB database instance.""" return _mongo_db async def init_mongodb() -> None: """Initialize MongoDB connection.""" global _mongo_client, _mongo_db try: # Use serverSelectionTimeoutMS to fail fast if MongoDB is not available _mongo_client = AsyncIOMotorClient( MONGO_URI, serverSelectionTimeoutMS=3000, connectTimeoutMS=5000, socketTimeoutMS=5000 ) _mongo_db = _mongo_client[MONGO_DB_NAME] # Test connection with ping await _mongo_client.admin.command('ping') print(f"✓ Connected to MongoDB at {MONGO_URI}") # Create indexes for play_history collection await _mongo_db.play_history.create_index([("user_id", ASCENDING), ("played_at", DESCENDING)]) await _mongo_db.play_history.create_index([("song_id", ASCENDING)]) await _mongo_db.play_history.create_index([("played_at", DESCENDING)]) # Create indexes for activity_logs collection await _mongo_db.activity_logs.create_index([("user_id", ASCENDING), ("timestamp", DESCENDING)]) await _mongo_db.activity_logs.create_index([("action_type", ASCENDING)]) await _mongo_db.activity_logs.create_index([("timestamp", DESCENDING)]) # Create indexes for analytics collection await _mongo_db.analytics.create_index([("event_type", ASCENDING), ("timestamp", DESCENDING)]) await _mongo_db.analytics.create_index([("timestamp", DESCENDING)]) # Create indexes for search_history collection await _mongo_db.search_history.create_index([("query", ASCENDING)]) await _mongo_db.search_history.create_index([("timestamp", DESCENDING)]) print("✓ MongoDB indexes created") except Exception as e: print(f"⚠ MongoDB connection failed: {e}") print("⚠ Running without MongoDB (using Redis/SQLite fallback)") _mongo_client = None _mongo_db = None async def close_mongodb() -> None: """Close MongoDB connection.""" global _mongo_client, _mongo_db if _mongo_client: _mongo_client.close() _mongo_client = None _mongo_db = None # ============== Play History (MongoDB) ============== async def store_play_history_mongo( user_id: int, song_id: int, song_title: str = None, song_artist: str = None, context_id: int = None, duration_seconds: int = None, completed: bool = True ) -> Dict[str, Any]: """Store a play history event in MongoDB.""" if _mongo_db is None: return None document = { "user_id": user_id, "song_id": song_id, "song_title": song_title, "song_artist": song_artist, "context_id": context_id, "duration_seconds": duration_seconds, "completed": completed, "played_at": datetime.utcnow(), "created_at": datetime.utcnow() } result = await _mongo_db.play_history.insert_one(document) return { "id": str(result.inserted_id), **document } async def get_user_play_history_mongo( user_id: int, limit: int = 50 ) -> List[Dict[str, Any]]: """Get play history for a user from MongoDB.""" if _mongo_db is None: return [] cursor = _mongo_db.play_history.find( {"user_id": user_id} ).sort("played_at", DESCENDING).limit(limit) results = [] async for doc in cursor: results.append({ "id": str(doc["_id"]), "user_id": doc["user_id"], "song_id": doc["song_id"], "song_title": doc.get("song_title"), "song_artist": doc.get("song_artist"), "context_id": doc.get("context_id"), "duration_seconds": doc.get("duration_seconds"), "completed": doc.get("completed", True), "played_at": doc["played_at"].isoformat() if doc.get("played_at") else None }) return results async def get_global_play_history_mongo( limit: int = 50 ) -> List[Dict[str, Any]]: """Get global play history from MongoDB.""" if _mongo_db is None: return [] cursor = _mongo_db.play_history.find({}).sort("played_at", DESCENDING).limit(limit) results = [] async for doc in cursor: results.append({ "id": str(doc["_id"]), "user_id": doc["user_id"], "song_id": doc["song_id"], "song_title": doc.get("song_title"), "song_artist": doc.get("song_artist"), "context_id": doc.get("context_id"), "duration_seconds": doc.get("duration_seconds"), "completed": doc.get("completed", True), "played_at": doc["played_at"].isoformat() if doc.get("played_at") else None }) return results async def get_song_play_count_mongo(song_id: int) -> int: """Get total play count for a song.""" if _mongo_db is None: return 0 count = await _mongo_db.play_history.count_documents({"song_id": song_id}) return count async def get_top_songs_mongo(limit: int = 10) -> List[Dict[str, Any]]: """Get top played songs from MongoDB analytics.""" if _mongo_db is None: return [] pipeline = [ {"$group": { "_id": "$song_id", "play_count": {"$sum": 1}, "song_title": {"$first": "$song_title"}, "song_artist": {"$first": "$song_artist"} }}, {"$sort": {"play_count": -1}}, {"$limit": limit} ] results = [] async for doc in _mongo_db.play_history.aggregate(pipeline): results.append({ "song_id": doc["_id"], "song_title": doc.get("song_title"), "song_artist": doc.get("song_artist"), "play_count": doc["play_count"] }) return results # ============== Activity Logs (MongoDB) ============== async def log_user_activity( user_id: int, action_type: str, details: Dict[str, Any] = None ) -> Dict[str, Any]: """Log a user activity event.""" if _mongo_db is None: return None document = { "user_id": user_id, "action_type": action_type, "details": details or {}, "timestamp": datetime.utcnow() } result = await _mongo_db.activity_logs.insert_one(document) return { "id": str(result.inserted_id), **document } async def get_user_activity_mongo( user_id: int, limit: int = 50, action_type: str = None ) -> List[Dict[str, Any]]: """Get activity logs for a user.""" if _mongo_db is None: return [] query = {"user_id": user_id} if action_type: query["action_type"] = action_type cursor = _mongo_db.activity_logs.find(query).sort("timestamp", DESCENDING).limit(limit) results = [] async for doc in cursor: results.append({ "id": str(doc["_id"]), "user_id": doc["user_id"], "action_type": doc["action_type"], "details": doc.get("details", {}), "timestamp": doc["timestamp"].isoformat() if doc.get("timestamp") else None }) return results # ============== Analytics Events (MongoDB) ============== async def track_analytics_event( event_type: str, properties: Dict[str, Any] = None ) -> Dict[str, Any]: """Track an analytics event.""" if _mongo_db is None: return None document = { "event_type": event_type, "properties": properties or {}, "timestamp": datetime.utcnow() } result = await _mongo_db.analytics.insert_one(document) return { "id": str(result.inserted_id), **document } async def get_analytics_summary( event_type: str = None, hours: int = 24 ) -> Dict[str, Any]: """Get analytics summary for the last N hours.""" if _mongo_db is None: return {} from datetime import timedelta since = datetime.utcnow() - timedelta(hours=hours) query = {"timestamp": {"$gte": since}} if event_type: query["event_type"] = event_type total_events = await _mongo_db.analytics.count_documents(query) # Get event type breakdown pipeline = [ {"$match": query}, {"$group": { "_id": "$event_type", "count": {"$sum": 1} }}, {"$sort": {"count": -1}} ] breakdown = [] async for doc in _mongo_db.analytics.aggregate(pipeline): breakdown.append({ "event_type": doc["_id"], "count": doc["count"] }) return { "total_events": total_events, "period_hours": hours, "breakdown": breakdown } # ============== User Search History (MongoDB) ============== async def log_search( user_id: int = None, search_type: str = None, query: str = None, results_count: int = 0 ) -> Dict[str, Any]: """Log a search event.""" if _mongo_db is None: return None document = { "user_id": user_id, "search_type": search_type, "query": query, "results_count": results_count, "timestamp": datetime.utcnow() } result = await _mongo_db.search_history.insert_one(document) return { "id": str(result.inserted_id), **document } async def get_popular_searches( hours: int = 24, limit: int = 10 ) -> List[Dict[str, Any]]: """Get popular search queries.""" if _mongo_db is None: return [] from datetime import timedelta since = datetime.utcnow() - timedelta(hours=hours) pipeline = [ {"$match": {"timestamp": {"$gte": since}}}, {"$group": { "_id": "$query", "count": {"$sum": 1}, "search_type": {"$first": "$search_type"} }}, {"$sort": {"count": -1}}, {"$limit": limit} ] results = [] async for doc in _mongo_db.search_history.aggregate(pipeline): results.append({ "query": doc["_id"], "search_type": doc.get("search_type"), "count": doc["count"] }) return results