pool / mongo_client.py
Ubuntu
mongo fixes
e136d4a
Raw
History Blame Contribute Delete
11 kB
"""MongoDB client module for play history, activity logs, and analytics."""
import os
from datetime import datetime
from typing import Optional, List, Dict, Any
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo import ASCENDING, DESCENDING
# MongoDB configuration from environment variables
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "music_memories")
# Global MongoDB client and database instances
_mongo_client: Optional[AsyncIOMotorClient] = None
_mongo_db: Optional[AsyncIOMotorDatabase] = None
def get_mongo_client() -> Optional[AsyncIOMotorClient]:
"""Get the MongoDB client instance."""
return _mongo_client
def get_mongo_db() -> Optional[AsyncIOMotorDatabase]:
"""Get the MongoDB database instance."""
return _mongo_db
async def init_mongodb() -> None:
"""Initialize MongoDB connection."""
global _mongo_client, _mongo_db
try:
# Use serverSelectionTimeoutMS to fail fast if MongoDB is not available
_mongo_client = AsyncIOMotorClient(
MONGO_URI,
serverSelectionTimeoutMS=3000,
connectTimeoutMS=5000,
socketTimeoutMS=5000
)
_mongo_db = _mongo_client[MONGO_DB_NAME]
# Test connection with ping
await _mongo_client.admin.command('ping')
print(f"✓ Connected to MongoDB at {MONGO_URI}")
# Create indexes for play_history collection
await _mongo_db.play_history.create_index([("user_id", ASCENDING), ("played_at", DESCENDING)])
await _mongo_db.play_history.create_index([("song_id", ASCENDING)])
await _mongo_db.play_history.create_index([("played_at", DESCENDING)])
# Create indexes for activity_logs collection
await _mongo_db.activity_logs.create_index([("user_id", ASCENDING), ("timestamp", DESCENDING)])
await _mongo_db.activity_logs.create_index([("action_type", ASCENDING)])
await _mongo_db.activity_logs.create_index([("timestamp", DESCENDING)])
# Create indexes for analytics collection
await _mongo_db.analytics.create_index([("event_type", ASCENDING), ("timestamp", DESCENDING)])
await _mongo_db.analytics.create_index([("timestamp", DESCENDING)])
# Create indexes for search_history collection
await _mongo_db.search_history.create_index([("query", ASCENDING)])
await _mongo_db.search_history.create_index([("timestamp", DESCENDING)])
print("✓ MongoDB indexes created")
except Exception as e:
print(f"⚠ MongoDB connection failed: {e}")
print("⚠ Running without MongoDB (using Redis/SQLite fallback)")
_mongo_client = None
_mongo_db = None
async def close_mongodb() -> None:
"""Close MongoDB connection."""
global _mongo_client, _mongo_db
if _mongo_client:
_mongo_client.close()
_mongo_client = None
_mongo_db = None
# ============== Play History (MongoDB) ==============
async def store_play_history_mongo(
user_id: int,
song_id: int,
song_title: str = None,
song_artist: str = None,
context_id: int = None,
duration_seconds: int = None,
completed: bool = True
) -> Dict[str, Any]:
"""Store a play history event in MongoDB."""
if _mongo_db is None:
return None
document = {
"user_id": user_id,
"song_id": song_id,
"song_title": song_title,
"song_artist": song_artist,
"context_id": context_id,
"duration_seconds": duration_seconds,
"completed": completed,
"played_at": datetime.utcnow(),
"created_at": datetime.utcnow()
}
result = await _mongo_db.play_history.insert_one(document)
return {
"id": str(result.inserted_id),
**document
}
async def get_user_play_history_mongo(
user_id: int,
limit: int = 50
) -> List[Dict[str, Any]]:
"""Get play history for a user from MongoDB."""
if _mongo_db is None:
return []
cursor = _mongo_db.play_history.find(
{"user_id": user_id}
).sort("played_at", DESCENDING).limit(limit)
results = []
async for doc in cursor:
results.append({
"id": str(doc["_id"]),
"user_id": doc["user_id"],
"song_id": doc["song_id"],
"song_title": doc.get("song_title"),
"song_artist": doc.get("song_artist"),
"context_id": doc.get("context_id"),
"duration_seconds": doc.get("duration_seconds"),
"completed": doc.get("completed", True),
"played_at": doc["played_at"].isoformat() if doc.get("played_at") else None
})
return results
async def get_global_play_history_mongo(
limit: int = 50
) -> List[Dict[str, Any]]:
"""Get global play history from MongoDB."""
if _mongo_db is None:
return []
cursor = _mongo_db.play_history.find({}).sort("played_at", DESCENDING).limit(limit)
results = []
async for doc in cursor:
results.append({
"id": str(doc["_id"]),
"user_id": doc["user_id"],
"song_id": doc["song_id"],
"song_title": doc.get("song_title"),
"song_artist": doc.get("song_artist"),
"context_id": doc.get("context_id"),
"duration_seconds": doc.get("duration_seconds"),
"completed": doc.get("completed", True),
"played_at": doc["played_at"].isoformat() if doc.get("played_at") else None
})
return results
async def get_song_play_count_mongo(song_id: int) -> int:
"""Get total play count for a song."""
if _mongo_db is None:
return 0
count = await _mongo_db.play_history.count_documents({"song_id": song_id})
return count
async def get_top_songs_mongo(limit: int = 10) -> List[Dict[str, Any]]:
"""Get top played songs from MongoDB analytics."""
if _mongo_db is None:
return []
pipeline = [
{"$group": {
"_id": "$song_id",
"play_count": {"$sum": 1},
"song_title": {"$first": "$song_title"},
"song_artist": {"$first": "$song_artist"}
}},
{"$sort": {"play_count": -1}},
{"$limit": limit}
]
results = []
async for doc in _mongo_db.play_history.aggregate(pipeline):
results.append({
"song_id": doc["_id"],
"song_title": doc.get("song_title"),
"song_artist": doc.get("song_artist"),
"play_count": doc["play_count"]
})
return results
# ============== Activity Logs (MongoDB) ==============
async def log_user_activity(
user_id: int,
action_type: str,
details: Dict[str, Any] = None
) -> Dict[str, Any]:
"""Log a user activity event."""
if _mongo_db is None:
return None
document = {
"user_id": user_id,
"action_type": action_type,
"details": details or {},
"timestamp": datetime.utcnow()
}
result = await _mongo_db.activity_logs.insert_one(document)
return {
"id": str(result.inserted_id),
**document
}
async def get_user_activity_mongo(
user_id: int,
limit: int = 50,
action_type: str = None
) -> List[Dict[str, Any]]:
"""Get activity logs for a user."""
if _mongo_db is None:
return []
query = {"user_id": user_id}
if action_type:
query["action_type"] = action_type
cursor = _mongo_db.activity_logs.find(query).sort("timestamp", DESCENDING).limit(limit)
results = []
async for doc in cursor:
results.append({
"id": str(doc["_id"]),
"user_id": doc["user_id"],
"action_type": doc["action_type"],
"details": doc.get("details", {}),
"timestamp": doc["timestamp"].isoformat() if doc.get("timestamp") else None
})
return results
# ============== Analytics Events (MongoDB) ==============
async def track_analytics_event(
event_type: str,
properties: Dict[str, Any] = None
) -> Dict[str, Any]:
"""Track an analytics event."""
if _mongo_db is None:
return None
document = {
"event_type": event_type,
"properties": properties or {},
"timestamp": datetime.utcnow()
}
result = await _mongo_db.analytics.insert_one(document)
return {
"id": str(result.inserted_id),
**document
}
async def get_analytics_summary(
event_type: str = None,
hours: int = 24
) -> Dict[str, Any]:
"""Get analytics summary for the last N hours."""
if _mongo_db is None:
return {}
from datetime import timedelta
since = datetime.utcnow() - timedelta(hours=hours)
query = {"timestamp": {"$gte": since}}
if event_type:
query["event_type"] = event_type
total_events = await _mongo_db.analytics.count_documents(query)
# Get event type breakdown
pipeline = [
{"$match": query},
{"$group": {
"_id": "$event_type",
"count": {"$sum": 1}
}},
{"$sort": {"count": -1}}
]
breakdown = []
async for doc in _mongo_db.analytics.aggregate(pipeline):
breakdown.append({
"event_type": doc["_id"],
"count": doc["count"]
})
return {
"total_events": total_events,
"period_hours": hours,
"breakdown": breakdown
}
# ============== User Search History (MongoDB) ==============
async def log_search(
user_id: int = None,
search_type: str = None,
query: str = None,
results_count: int = 0
) -> Dict[str, Any]:
"""Log a search event."""
if _mongo_db is None:
return None
document = {
"user_id": user_id,
"search_type": search_type,
"query": query,
"results_count": results_count,
"timestamp": datetime.utcnow()
}
result = await _mongo_db.search_history.insert_one(document)
return {
"id": str(result.inserted_id),
**document
}
async def get_popular_searches(
hours: int = 24,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Get popular search queries."""
if _mongo_db is None:
return []
from datetime import timedelta
since = datetime.utcnow() - timedelta(hours=hours)
pipeline = [
{"$match": {"timestamp": {"$gte": since}}},
{"$group": {
"_id": "$query",
"count": {"$sum": 1},
"search_type": {"$first": "$search_type"}
}},
{"$sort": {"count": -1}},
{"$limit": limit}
]
results = []
async for doc in _mongo_db.search_history.aggregate(pipeline):
results.append({
"query": doc["_id"],
"search_type": doc.get("search_type"),
"count": doc["count"]
})
return results