| """MongoDB client module for play history, activity logs, and analytics.""" |
|
|
| import os |
| from datetime import datetime |
| from typing import Optional, List, Dict, Any |
| from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase |
| from pymongo import ASCENDING, DESCENDING |
|
|
|
|
| |
| MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017") |
| MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "music_memories") |
|
|
| |
| _mongo_client: Optional[AsyncIOMotorClient] = None |
| _mongo_db: Optional[AsyncIOMotorDatabase] = None |
|
|
|
|
| def get_mongo_client() -> Optional[AsyncIOMotorClient]: |
| """Get the MongoDB client instance.""" |
| return _mongo_client |
|
|
|
|
| def get_mongo_db() -> Optional[AsyncIOMotorDatabase]: |
| """Get the MongoDB database instance.""" |
| return _mongo_db |
|
|
|
|
| async def init_mongodb() -> None: |
| """Initialize MongoDB connection.""" |
| global _mongo_client, _mongo_db |
| try: |
| |
| _mongo_client = AsyncIOMotorClient( |
| MONGO_URI, |
| serverSelectionTimeoutMS=3000, |
| connectTimeoutMS=5000, |
| socketTimeoutMS=5000 |
| ) |
| _mongo_db = _mongo_client[MONGO_DB_NAME] |
|
|
| |
| await _mongo_client.admin.command('ping') |
| print(f"✓ Connected to MongoDB at {MONGO_URI}") |
|
|
| |
| await _mongo_db.play_history.create_index([("user_id", ASCENDING), ("played_at", DESCENDING)]) |
| await _mongo_db.play_history.create_index([("song_id", ASCENDING)]) |
| await _mongo_db.play_history.create_index([("played_at", DESCENDING)]) |
|
|
| |
| await _mongo_db.activity_logs.create_index([("user_id", ASCENDING), ("timestamp", DESCENDING)]) |
| await _mongo_db.activity_logs.create_index([("action_type", ASCENDING)]) |
| await _mongo_db.activity_logs.create_index([("timestamp", DESCENDING)]) |
|
|
| |
| await _mongo_db.analytics.create_index([("event_type", ASCENDING), ("timestamp", DESCENDING)]) |
| await _mongo_db.analytics.create_index([("timestamp", DESCENDING)]) |
|
|
| |
| await _mongo_db.search_history.create_index([("query", ASCENDING)]) |
| await _mongo_db.search_history.create_index([("timestamp", DESCENDING)]) |
|
|
| print("✓ MongoDB indexes created") |
| except Exception as e: |
| print(f"⚠ MongoDB connection failed: {e}") |
| print("⚠ Running without MongoDB (using Redis/SQLite fallback)") |
| _mongo_client = None |
| _mongo_db = None |
|
|
|
|
| async def close_mongodb() -> None: |
| """Close MongoDB connection.""" |
| global _mongo_client, _mongo_db |
| if _mongo_client: |
| _mongo_client.close() |
| _mongo_client = None |
| _mongo_db = None |
|
|
|
|
| |
|
|
| async def store_play_history_mongo( |
| user_id: int, |
| song_id: int, |
| song_title: str = None, |
| song_artist: str = None, |
| context_id: int = None, |
| duration_seconds: int = None, |
| completed: bool = True |
| ) -> Dict[str, Any]: |
| """Store a play history event in MongoDB.""" |
| if _mongo_db is None: |
| return None |
| |
| document = { |
| "user_id": user_id, |
| "song_id": song_id, |
| "song_title": song_title, |
| "song_artist": song_artist, |
| "context_id": context_id, |
| "duration_seconds": duration_seconds, |
| "completed": completed, |
| "played_at": datetime.utcnow(), |
| "created_at": datetime.utcnow() |
| } |
| |
| result = await _mongo_db.play_history.insert_one(document) |
| return { |
| "id": str(result.inserted_id), |
| **document |
| } |
|
|
|
|
| async def get_user_play_history_mongo( |
| user_id: int, |
| limit: int = 50 |
| ) -> List[Dict[str, Any]]: |
| """Get play history for a user from MongoDB.""" |
| if _mongo_db is None: |
| return [] |
| |
| cursor = _mongo_db.play_history.find( |
| {"user_id": user_id} |
| ).sort("played_at", DESCENDING).limit(limit) |
| |
| results = [] |
| async for doc in cursor: |
| results.append({ |
| "id": str(doc["_id"]), |
| "user_id": doc["user_id"], |
| "song_id": doc["song_id"], |
| "song_title": doc.get("song_title"), |
| "song_artist": doc.get("song_artist"), |
| "context_id": doc.get("context_id"), |
| "duration_seconds": doc.get("duration_seconds"), |
| "completed": doc.get("completed", True), |
| "played_at": doc["played_at"].isoformat() if doc.get("played_at") else None |
| }) |
| |
| return results |
|
|
|
|
| async def get_global_play_history_mongo( |
| limit: int = 50 |
| ) -> List[Dict[str, Any]]: |
| """Get global play history from MongoDB.""" |
| if _mongo_db is None: |
| return [] |
| |
| cursor = _mongo_db.play_history.find({}).sort("played_at", DESCENDING).limit(limit) |
| |
| results = [] |
| async for doc in cursor: |
| results.append({ |
| "id": str(doc["_id"]), |
| "user_id": doc["user_id"], |
| "song_id": doc["song_id"], |
| "song_title": doc.get("song_title"), |
| "song_artist": doc.get("song_artist"), |
| "context_id": doc.get("context_id"), |
| "duration_seconds": doc.get("duration_seconds"), |
| "completed": doc.get("completed", True), |
| "played_at": doc["played_at"].isoformat() if doc.get("played_at") else None |
| }) |
| |
| return results |
|
|
|
|
| async def get_song_play_count_mongo(song_id: int) -> int: |
| """Get total play count for a song.""" |
| if _mongo_db is None: |
| return 0 |
| |
| count = await _mongo_db.play_history.count_documents({"song_id": song_id}) |
| return count |
|
|
|
|
| async def get_top_songs_mongo(limit: int = 10) -> List[Dict[str, Any]]: |
| """Get top played songs from MongoDB analytics.""" |
| if _mongo_db is None: |
| return [] |
| |
| pipeline = [ |
| {"$group": { |
| "_id": "$song_id", |
| "play_count": {"$sum": 1}, |
| "song_title": {"$first": "$song_title"}, |
| "song_artist": {"$first": "$song_artist"} |
| }}, |
| {"$sort": {"play_count": -1}}, |
| {"$limit": limit} |
| ] |
| |
| results = [] |
| async for doc in _mongo_db.play_history.aggregate(pipeline): |
| results.append({ |
| "song_id": doc["_id"], |
| "song_title": doc.get("song_title"), |
| "song_artist": doc.get("song_artist"), |
| "play_count": doc["play_count"] |
| }) |
| |
| return results |
|
|
|
|
| |
|
|
| async def log_user_activity( |
| user_id: int, |
| action_type: str, |
| details: Dict[str, Any] = None |
| ) -> Dict[str, Any]: |
| """Log a user activity event.""" |
| if _mongo_db is None: |
| return None |
| |
| document = { |
| "user_id": user_id, |
| "action_type": action_type, |
| "details": details or {}, |
| "timestamp": datetime.utcnow() |
| } |
| |
| result = await _mongo_db.activity_logs.insert_one(document) |
| return { |
| "id": str(result.inserted_id), |
| **document |
| } |
|
|
|
|
| async def get_user_activity_mongo( |
| user_id: int, |
| limit: int = 50, |
| action_type: str = None |
| ) -> List[Dict[str, Any]]: |
| """Get activity logs for a user.""" |
| if _mongo_db is None: |
| return [] |
| |
| query = {"user_id": user_id} |
| if action_type: |
| query["action_type"] = action_type |
| |
| cursor = _mongo_db.activity_logs.find(query).sort("timestamp", DESCENDING).limit(limit) |
| |
| results = [] |
| async for doc in cursor: |
| results.append({ |
| "id": str(doc["_id"]), |
| "user_id": doc["user_id"], |
| "action_type": doc["action_type"], |
| "details": doc.get("details", {}), |
| "timestamp": doc["timestamp"].isoformat() if doc.get("timestamp") else None |
| }) |
| |
| return results |
|
|
|
|
| |
|
|
| async def track_analytics_event( |
| event_type: str, |
| properties: Dict[str, Any] = None |
| ) -> Dict[str, Any]: |
| """Track an analytics event.""" |
| if _mongo_db is None: |
| return None |
| |
| document = { |
| "event_type": event_type, |
| "properties": properties or {}, |
| "timestamp": datetime.utcnow() |
| } |
| |
| result = await _mongo_db.analytics.insert_one(document) |
| return { |
| "id": str(result.inserted_id), |
| **document |
| } |
|
|
|
|
| async def get_analytics_summary( |
| event_type: str = None, |
| hours: int = 24 |
| ) -> Dict[str, Any]: |
| """Get analytics summary for the last N hours.""" |
| if _mongo_db is None: |
| return {} |
| |
| from datetime import timedelta |
| since = datetime.utcnow() - timedelta(hours=hours) |
| |
| query = {"timestamp": {"$gte": since}} |
| if event_type: |
| query["event_type"] = event_type |
| |
| total_events = await _mongo_db.analytics.count_documents(query) |
| |
| |
| pipeline = [ |
| {"$match": query}, |
| {"$group": { |
| "_id": "$event_type", |
| "count": {"$sum": 1} |
| }}, |
| {"$sort": {"count": -1}} |
| ] |
| |
| breakdown = [] |
| async for doc in _mongo_db.analytics.aggregate(pipeline): |
| breakdown.append({ |
| "event_type": doc["_id"], |
| "count": doc["count"] |
| }) |
| |
| return { |
| "total_events": total_events, |
| "period_hours": hours, |
| "breakdown": breakdown |
| } |
|
|
|
|
| |
|
|
| async def log_search( |
| user_id: int = None, |
| search_type: str = None, |
| query: str = None, |
| results_count: int = 0 |
| ) -> Dict[str, Any]: |
| """Log a search event.""" |
| if _mongo_db is None: |
| return None |
| |
| document = { |
| "user_id": user_id, |
| "search_type": search_type, |
| "query": query, |
| "results_count": results_count, |
| "timestamp": datetime.utcnow() |
| } |
| |
| result = await _mongo_db.search_history.insert_one(document) |
| return { |
| "id": str(result.inserted_id), |
| **document |
| } |
|
|
|
|
| async def get_popular_searches( |
| hours: int = 24, |
| limit: int = 10 |
| ) -> List[Dict[str, Any]]: |
| """Get popular search queries.""" |
| if _mongo_db is None: |
| return [] |
| |
| from datetime import timedelta |
| since = datetime.utcnow() - timedelta(hours=hours) |
| |
| pipeline = [ |
| {"$match": {"timestamp": {"$gte": since}}}, |
| {"$group": { |
| "_id": "$query", |
| "count": {"$sum": 1}, |
| "search_type": {"$first": "$search_type"} |
| }}, |
| {"$sort": {"count": -1}}, |
| {"$limit": limit} |
| ] |
| |
| results = [] |
| async for doc in _mongo_db.search_history.aggregate(pipeline): |
| results.append({ |
| "query": doc["_id"], |
| "search_type": doc.get("search_type"), |
| "count": doc["count"] |
| }) |
| |
| return results |
|
|