from contextlib import asynccontextmanager from fastapi import FastAPI, Query, HTTPException, UploadFile, File, Form, Request from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse import gradio as gr from app import gradio_app import os from database import ( init_database, # Songs add_song, get_all_songs, get_song_by_id, delete_song, update_song_file, # Users add_user, get_all_users, get_user_by_id, # Playlists add_playlist, get_all_playlists, delete_playlist, # Memories add_memory, get_all_memories, get_memories_by_user, delete_memory, # Contexts add_context, get_all_contexts, delete_context, # Play History add_play_history, get_play_history, ) from semantic_search import ( # Song vibes add_song_vibe, search_song_vibes, remove_song_vibe, # Memory vibes add_memory_vibe, search_memory_vibes, remove_memory_vibe, # Context vibes add_context_vibe, search_context_vibes, remove_context_vibe, # Playlist journeys add_playlist_journey, search_playlist_journeys, remove_playlist_journey, ) from redis_client import ( init_redis, close_redis, get_cached_song, cache_song, invalidate_song_cache, get_cached_user, cache_user, invalidate_user_cache, get_cached_playlist, cache_playlist, invalidate_playlist_cache, get_cached_list, cache_list, invalidate_list_cache, cache_delete_pattern, store_play_event, get_user_play_history as get_redis_play_history, get_global_play_history as get_redis_global_history, ) from mongo_client import ( init_mongodb, close_mongodb, get_mongo_db, # Play history store_play_history_mongo, get_user_play_history_mongo, get_global_play_history_mongo, get_top_songs_mongo, get_song_play_count_mongo, # Activity logs log_user_activity, get_user_activity_mongo, # Analytics track_analytics_event, get_analytics_summary, # Search history log_search, get_popular_searches, ) from minio_client import ( init_minio, get_minio_client, upload_mp3_file, download_mp3_file, stream_mp3_file, delete_mp3_file, get_file_metadata, list_all_mp3_files, check_minio_health, ) import io app = FastAPI() @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager for Redis, MongoDB, MinIO and database initialization.""" # Startup init_database() await init_redis() await init_mongodb() init_minio() yield # Shutdown await close_redis() await close_mongodb() app.router.lifespan_context = lifespan @app.get("/") def greet_json(): return {"Hello": "World!", "app": "Music Memories"} @app.get("/health") async def health_check(): """Health check endpoint with Redis, MongoDB and MinIO status.""" from redis_client import get_redis_client redis_status = "disconnected" mongo_status = "disconnected" mongo_details = {} minio_status = "disconnected" try: redis_client = get_redis_client() if redis_client: await redis_client.ping() redis_status = "connected" except Exception: redis_status = "error" try: mongo_db = get_mongo_db() if mongo_db is not None: await mongo_db.command('ping') mongo_status = "connected" # Get collection stats collections = await mongo_db.list_collection_names() mongo_details["collections"] = collections except Exception as e: mongo_status = "error" mongo_details["error"] = str(e) # MinIO health check minio_health = check_minio_health() minio_status = minio_health.get("status", "error") return { "status": "healthy", "redis": redis_status, "mongodb": mongo_status, "mongodb_details": mongo_details, "minio": minio_status, "database": "sqlite" } # ============== SONGS + MP3 FILES ============== @app.get("/songs") async def list_songs(): """Get all songs with MinIO file info.""" # Try cache first cached = await get_cached_list("all_songs") if cached: return {"songs": cached, "source": "cache"} # Fetch from database songs = get_all_songs() # Add MinIO file info to each song for song in songs: file_meta = get_file_metadata(song["id"]) song["has_audio"] = file_meta is not None song["file_size"] = file_meta.get("size") if file_meta else None await cache_list("all_songs", songs) return {"songs": songs, "source": "database"} @app.get("/songs/{song_id}") async def get_song(song_id: int): """Get a song by ID with play count and MinIO file info.""" # Try cache first song = await get_cached_song(song_id) if song: play_count = await get_song_play_count_mongo(song_id) file_meta = get_file_metadata(song_id) return { **song, "source": "cache", "play_count": play_count, "has_audio": file_meta is not None, "file_size": file_meta.get("size") if file_meta else None, } # Fetch from database song = get_song_by_id(song_id) if not song: raise HTTPException(status_code=404, detail="Song not found") # Add play count from MongoDB play_count = await get_song_play_count_mongo(song_id) # Add MinIO file info file_meta = get_file_metadata(song_id) await cache_song(song_id, song) return { **song, "source": "database", "play_count": play_count, "has_audio": file_meta is not None, "file_size": file_meta.get("size") if file_meta else None, } @app.post("/songs") async def create_song( request: Request, title: str = Form(...), artist: str = Form(...), album: str = Form(None), duration: int = Form(None), bpm: int = Form(None), energy_level: int = Form(None), lyrics: str = Form(None), audio_file: UploadFile = File(None), ): """Add a new song with optional MP3 file upload to MinIO.""" song = add_song(title, artist, album, duration, bpm, energy_level) # Handle MP3 file upload to MinIO minio_info = None if audio_file and audio_file.filename: contents = await audio_file.read() file_stream = io.BytesIO(contents) minio_info = upload_mp3_file( file_data=file_stream, file_size=len(contents), song_id=song["id"], filename=audio_file.filename, ) if minio_info: # Update song with file reference update_song_file(song["id"], minio_info["object_name"], minio_info["size"]) # Add to semantic search index if lyrics provided if lyrics: add_song_vibe(song["id"], title, artist, lyrics) # Invalidate songs list cache await invalidate_list_cache("all_songs") # Log activity to MongoDB await track_analytics_event("song_created", {"song_id": song["id"], "title": title}) # Determine the public base URL for the hosted environment # Priority: 1) Env var, 2) HF Space detection, 3) Origin header, 4) Forwarded headers, 5) Fallback public_base = os.getenv("PUBLIC_BASE_URL") if not public_base: # Check if running on Hugging Face Spaces hf_space = os.getenv("SPACE_ID") if hf_space: # Extract space name from SPACE_ID (e.g., "username/space-name") public_base = f"https://{hf_space.replace('/', '-')}.hf.space" else: # Try Origin header (usually present in browser requests) origin = request.headers.get("origin") if origin: public_base = origin.rstrip("/") else: # Try to derive from request headers (works with proxies like HF Spaces) forwarded_proto = request.headers.get("x-forwarded-proto") forwarded_host = request.headers.get("x-forwarded-host") if forwarded_proto and forwarded_host: public_base = f"{forwarded_proto}://{forwarded_host}" else: # Fallback to request base URL public_base = str(request.base_url).rstrip("/") response = {"status": "success", "song": song} if minio_info: public_stream_url = f"{public_base}/songs/{song['id']}/stream" public_download_url = f"{public_base}/songs/{song['id']}/download" response["audio"] = { "uploaded": True, "size": minio_info["size"], "stream_url": public_stream_url, "download_url": public_download_url, "minio_presigned_url": minio_info["presigned_url"], } return response @app.delete("/songs/{song_id}") async def delete_song_endpoint(song_id: int): """Delete a song and its MP3 file from MinIO.""" if not delete_song(song_id): raise HTTPException(status_code=404, detail="Song not found") # Delete MP3 file from MinIO delete_mp3_file(song_id) remove_song_vibe(song_id) await invalidate_song_cache(song_id) await invalidate_list_cache("all_songs") # Log activity to MongoDB await track_analytics_event("song_deleted", {"song_id": song_id}) return {"status": "success", "deleted_id": song_id} # ============== MP3 FILE STREAMING ENDPOINTS ============== @app.get("/songs/{song_id}/stream") async def stream_song(song_id: int): """Stream an MP3 file through the FastAPI app domain. NOTE: Redirecting to a MinIO presigned URL may point at an internal hostname (e.g. 127.0.0.1) in hosted environments like Hugging Face Spaces, which browsers cannot reach. """ # Get song info to find the audio file path song = get_song_by_id(song_id) if not song: raise HTTPException(status_code=404, detail="Song not found") # Get the stored audio file path from the database object_name = song.get("audio_file_path") if not object_name: raise HTTPException(status_code=404, detail="Audio file not found") file_data = download_mp3_file(song_id, object_name=object_name) if not file_data: raise HTTPException(status_code=404, detail="Audio file not found") filename = f"{song.get('title', 'song')}_{song_id}.mp3" if song else f"song_{song_id}.mp3" return StreamingResponse( io.BytesIO(file_data), media_type="audio/mpeg", headers={"Content-Disposition": f"inline; filename={filename}"}, ) @app.get("/songs/{song_id}/download") async def download_song(song_id: int): """Download an MP3 file from MinIO.""" # Get song info to find the audio file path song = get_song_by_id(song_id) if not song: raise HTTPException(status_code=404, detail="Song not found") # Get the stored audio file path from the database object_name = song.get("audio_file_path") if not object_name: raise HTTPException(status_code=404, detail="Audio file not found") file_data = download_mp3_file(song_id, object_name=object_name) if not file_data: raise HTTPException(status_code=404, detail="Audio file not found") filename = f"{song.get('title', 'song')}_{song_id}.mp3" if song else f"song_{song_id}.mp3" return StreamingResponse( io.BytesIO(file_data), media_type="audio/mpeg", headers={"Content-Disposition": f"attachment; filename={filename}"}, ) @app.post("/songs/{song_id}/upload-audio") async def upload_audio(request: Request, song_id: int, audio_file: UploadFile = File(...)): """Upload/update MP3 file for an existing song.""" # Verify song exists song = get_song_by_id(song_id) if not song: raise HTTPException(status_code=404, detail="Song not found") # Validate file type if not audio_file.filename.lower().endswith('.mp3'): raise HTTPException(status_code=400, detail="Only MP3 files are supported") # Read and upload file contents = await audio_file.read() file_stream = io.BytesIO(contents) minio_info = upload_mp3_file( file_data=file_stream, file_size=len(contents), song_id=song_id, filename=audio_file.filename, ) if not minio_info: raise HTTPException(status_code=500, detail="Failed to upload file to MinIO") # Update song with file reference update_song_file(song_id, minio_info["object_name"], minio_info["size"]) # Invalidate cache await invalidate_song_cache(song_id) await invalidate_list_cache("all_songs") # Determine the public base URL for the hosted environment # Priority: 1) Env var, 2) HF Space detection, 3) Origin header, 4) Forwarded headers, 5) Fallback public_base = os.getenv("PUBLIC_BASE_URL") if not public_base: hf_space = os.getenv("SPACE_ID") if hf_space: public_base = f"https://{hf_space.replace('/', '-')}.hf.space" else: origin = request.headers.get("origin") if origin: public_base = origin.rstrip("/") else: forwarded_proto = request.headers.get("x-forwarded-proto") forwarded_host = request.headers.get("x-forwarded-host") if forwarded_proto and forwarded_host: public_base = f"{forwarded_proto}://{forwarded_host}" else: public_base = str(request.base_url).rstrip("/") return { "status": "success", "audio": { "uploaded": True, "size": minio_info["size"], "stream_url": f"{public_base}/songs/{song_id}/stream", "download_url": f"{public_base}/songs/{song_id}/download", "minio_presigned_url": minio_info["presigned_url"], }, } # ============== USERS ============== @app.get("/users") async def list_users(): """Get all users.""" # Try cache first cached = await get_cached_list("all_users") if cached: return {"users": cached, "source": "cache"} # Fetch from database users = get_all_users() await cache_list("all_users", users) return {"users": users, "source": "database"} @app.get("/users/{user_id}") async def get_user(user_id: int): """Get a user by ID.""" # Try cache first user = await get_cached_user(user_id) if user: return {**user, "source": "cache"} # Fetch from database user = get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") await cache_user(user_id, user) return {**user, "source": "database"} @app.post("/users") async def create_user(name: str): """Add a new user.""" user = add_user(name) await invalidate_list_cache("all_users") # Log activity to MongoDB await log_user_activity(user["id"], "user_registered", {"name": name}) await track_analytics_event("user_signup", {"user_id": user["id"]}) return {"status": "success", "user": user} # ============== PLAYLISTS ============== @app.get("/playlists") async def list_playlists(): """Get all playlists.""" # Try cache first cached = await get_cached_list("all_playlists") if cached: return {"playlists": cached, "source": "cache"} # Fetch from database playlists = get_all_playlists() await cache_list("all_playlists", playlists) return {"playlists": playlists, "source": "database"} @app.post("/playlists") async def create_playlist(name: str, vibe_code: str = None, mood_description: str = None): """Add a new playlist.""" playlist = add_playlist(name, vibe_code) # Add to playlist journeys for mood search if mood_description: add_playlist_journey(playlist["id"], name, mood_description) await invalidate_list_cache("all_playlists") # Log activity to MongoDB await track_analytics_event("playlist_created", {"playlist_id": playlist["id"], "name": name}) return {"status": "success", "playlist": playlist} @app.delete("/playlists/{playlist_id}") async def delete_playlist_endpoint(playlist_id: int): """Delete a playlist.""" if not delete_playlist(playlist_id): raise HTTPException(status_code=404, detail="Playlist not found") remove_playlist_journey(playlist_id) await invalidate_playlist_cache(playlist_id) await invalidate_list_cache("all_playlists") # Log activity to MongoDB await track_analytics_event("playlist_deleted", {"playlist_id": playlist_id}) return {"status": "success", "deleted_id": playlist_id} # ============== MEMORIES ============== @app.get("/memories") async def list_memories(): """Get all memories.""" # Try cache first cached = await get_cached_list("all_memories") if cached: return {"memories": cached, "source": "cache"} # Fetch from database memories = get_all_memories() await cache_list("all_memories", memories) return {"memories": memories, "source": "database"} @app.get("/users/{user_id}/memories") async def list_user_memories(user_id: int): """Get memories for a specific user.""" if not get_user_by_id(user_id): raise HTTPException(status_code=404, detail="User not found") # Try cache first cached = await get_cached_list(f"user_memories:{user_id}") if cached: return {"memories": cached, "source": "cache"} # Fetch from database memories = get_memories_by_user(user_id) await cache_list(f"user_memories:{user_id}", memories) return {"memories": memories, "source": "database"} @app.post("/memories") async def create_memory(user_id: int, description: str, date: str = None, song_id: int = None): """Add a new memory.""" if not get_user_by_id(user_id): raise HTTPException(status_code=404, detail="User not found") memory = add_memory(user_id, description, date, song_id) # Add to semantic search index add_memory_vibe(memory["id"], user_id, description) await invalidate_list_cache("all_memories") await invalidate_list_cache(f"user_memories:{user_id}") # Log activity to MongoDB await log_user_activity(user_id, "memory_created", {"memory_id": memory["id"]}) await track_analytics_event("memory_added", {"memory_id": memory["id"], "user_id": user_id}) return {"status": "success", "memory": memory} @app.delete("/memories/{memory_id}") async def delete_memory_endpoint(memory_id: int): """Delete a memory.""" if not delete_memory(memory_id): raise HTTPException(status_code=404, detail="Memory not found") remove_memory_vibe(memory_id) await cache_delete_pattern("list:user_memories:*") await invalidate_list_cache("all_memories") # Log activity to MongoDB await track_analytics_event("memory_deleted", {"memory_id": memory_id}) return {"status": "success", "deleted_id": memory_id} # ============== CONTEXTS ============== @app.get("/contexts") async def list_contexts(): """Get all contexts.""" # Try cache first cached = await get_cached_list("all_contexts") if cached: return {"contexts": cached, "source": "cache"} # Fetch from database contexts = get_all_contexts() await cache_list("all_contexts", contexts) return {"contexts": contexts, "source": "database"} @app.post("/contexts") async def create_context(weather: str = None, time_of_day: str = None, location_type: str = None): """Add a new context.""" context = add_context(weather, time_of_day, location_type) # Add to semantic search index add_context_vibe(context["id"], weather or "", time_of_day or "", location_type or "") await invalidate_list_cache("all_contexts") # Log activity to MongoDB await track_analytics_event("context_created", {"context_id": context["id"]}) return {"status": "success", "context": context} @app.delete("/contexts/{context_id}") async def delete_context_endpoint(context_id: int): """Delete a context.""" if not delete_context(context_id): raise HTTPException(status_code=404, detail="Context not found") remove_context_vibe(context_id) await invalidate_list_cache("all_contexts") # Log activity to MongoDB await track_analytics_event("context_deleted", {"context_id": context_id}) return {"status": "success", "deleted_id": context_id} # ============== PLAY HISTORY (MongoDB Primary) ============== @app.get("/history") async def list_history(user_id: int = None, limit: int = 50): """Get play history from MongoDB (primary storage).""" if user_id: # Try MongoDB first (primary storage) history = await get_user_play_history_mongo(user_id, limit) if not history: # Fallback to Redis history = await get_redis_play_history(user_id, limit) if not history: # Fallback to SQLite history = get_play_history(user_id, limit) return {"history": history, "source": "sqlite"} return {"history": history, "source": "redis"} return {"history": history, "source": "mongodb"} else: # Try MongoDB first (primary storage) history = await get_global_play_history_mongo(limit) if not history: # Fallback to Redis history = await get_redis_global_history(limit) if not history: # Fallback to SQLite history = get_play_history(None, limit) return {"history": history, "source": "sqlite"} return {"history": history, "source": "redis"} return {"history": history, "source": "mongodb"} @app.post("/history") async def create_history(user_id: int, song_id: int, context_id: int = None, duration_seconds: int = None): """Add a play history entry to MongoDB (primary), Redis (cache), and SQLite (backup).""" # Get song info for MongoDB storage song = get_song_by_id(song_id) # Store in MongoDB (primary storage for analytics) mongo_result = await store_play_history_mongo( user_id=user_id, song_id=song_id, song_title=song.get("title") if song else None, song_artist=song.get("artist") if song else None, context_id=context_id, duration_seconds=duration_seconds ) # Store in Redis (fast cache) await store_play_event(user_id, song_id, context_id) # Store in SQLite (backup) sqlite_history = add_play_history(user_id, song_id, context_id) # Log analytics event await track_analytics_event("song_played", { "user_id": user_id, "song_id": song_id, "context_id": context_id }) return { "status": "success", "history": sqlite_history, "mongodb_id": mongo_result.get("id") if mongo_result else None, "redis_stored": True } # ============== ANALYTICS ENDPOINTS (MongoDB) ============== @app.get("/analytics/summary") async def get_analytics(hours: int = Query(24, description="Hours to look back")): """Get analytics summary from MongoDB.""" summary = await get_analytics_summary(hours=hours) return summary @app.get("/analytics/top-songs") async def get_top_songs(limit: int = Query(10, description="Number of results")): """Get top played songs from MongoDB.""" top_songs = await get_top_songs_mongo(limit) return {"top_songs": top_songs} @app.get("/analytics/popular-searches") async def get_popular_searches_endpoint(hours: int = Query(24, description="Hours to look back"), limit: int = Query(10)): """Get popular search queries from MongoDB.""" searches = await get_popular_searches(hours=hours, limit=limit) return {"popular_searches": searches} @app.get("/users/{user_id}/activity") async def get_user_activity(user_id: int, limit: int = Query(50), action_type: str = None): """Get user activity log from MongoDB.""" activity = await get_user_activity_mongo(user_id, limit, action_type) return {"activity": activity} @app.get("/storage/files") async def list_storage_files(): """List all MP3 files in MinIO storage.""" files = list_all_mp3_files() return {"files": files, "count": len(files)} # ============== SEMANTIC SEARCH ============== @app.get("/search/songs") async def search_songs(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")): """Semantic search for songs by vibe/lyrics.""" results = search_song_vibes(q, n_results=n) # Log search to MongoDB await log_search(search_type="songs", query=q, results_count=len(results)) await track_analytics_event("search_performed", {"search_type": "songs", "query": q, "results": len(results)}) return {"query": q, "results": results} @app.get("/search/memories") async def search_memories(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")): """Semantic search for memories.""" results = search_memory_vibes(q, n_results=n) # Log search to MongoDB await log_search(search_type="memories", query=q, results_count=len(results)) await track_analytics_event("search_performed", {"search_type": "memories", "query": q, "results": len(results)}) return {"query": q, "results": results} @app.get("/search/contexts") async def search_contexts(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")): """Semantic search for contexts.""" results = search_context_vibes(q, n_results=n) # Log search to MongoDB await log_search(search_type="contexts", query=q, results_count=len(results)) await track_analytics_event("search_performed", {"search_type": "contexts", "query": q, "results": len(results)}) return {"query": q, "results": results} @app.get("/search/playlists") async def search_playlists(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")): """Semantic search for playlists by mood.""" results = search_playlist_journeys(q, n_results=n) # Log search to MongoDB await log_search(search_type="playlists", query=q, results_count=len(results)) await track_analytics_event("search_performed", {"search_type": "playlists", "query": q, "results": len(results)}) return {"query": q, "results": results} # Mount the Gradio app at /ui app = gr.mount_gradio_app(app, gradio_app, path="/ui")