| from contextlib import asynccontextmanager |
| from fastapi import FastAPI, Query, HTTPException, UploadFile, File, Form, Request |
| from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse |
| import gradio as gr |
| from app import gradio_app |
| import os |
| from database import ( |
| init_database, |
| |
| add_song, get_all_songs, get_song_by_id, delete_song, update_song_file, |
| |
| add_user, get_all_users, get_user_by_id, |
| |
| add_playlist, get_all_playlists, delete_playlist, |
| |
| add_memory, get_all_memories, get_memories_by_user, delete_memory, |
| |
| add_context, get_all_contexts, delete_context, |
| |
| add_play_history, get_play_history, |
| ) |
| from semantic_search import ( |
| |
| add_song_vibe, search_song_vibes, remove_song_vibe, |
| |
| add_memory_vibe, search_memory_vibes, remove_memory_vibe, |
| |
| add_context_vibe, search_context_vibes, remove_context_vibe, |
| |
| add_playlist_journey, search_playlist_journeys, remove_playlist_journey, |
| ) |
| from redis_client import ( |
| init_redis, |
| close_redis, |
| get_cached_song, cache_song, invalidate_song_cache, |
| get_cached_user, cache_user, invalidate_user_cache, |
| get_cached_playlist, cache_playlist, invalidate_playlist_cache, |
| get_cached_list, cache_list, invalidate_list_cache, cache_delete_pattern, |
| store_play_event, get_user_play_history as get_redis_play_history, |
| get_global_play_history as get_redis_global_history, |
| ) |
| from mongo_client import ( |
| init_mongodb, |
| close_mongodb, |
| get_mongo_db, |
| |
| store_play_history_mongo, |
| get_user_play_history_mongo, |
| get_global_play_history_mongo, |
| get_top_songs_mongo, |
| get_song_play_count_mongo, |
| |
| log_user_activity, |
| get_user_activity_mongo, |
| |
| track_analytics_event, |
| get_analytics_summary, |
| |
| log_search, |
| get_popular_searches, |
| ) |
| from minio_client import ( |
| init_minio, |
| get_minio_client, |
| upload_mp3_file, |
| download_mp3_file, |
| stream_mp3_file, |
| delete_mp3_file, |
| get_file_metadata, |
| list_all_mp3_files, |
| check_minio_health, |
| ) |
| import io |
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the backing services.

    Startup calls, in order, ``init_database``, ``init_redis``,
    ``init_mongodb`` and ``init_minio``.  Shutdown calls ``close_redis``
    and then ``close_mongodb``.

    The shutdown calls live in ``finally`` blocks so they still run when the
    application exits with an error, and so a failure while closing Redis
    does not prevent MongoDB from being closed.
    """
    init_database()
    await init_redis()
    await init_mongodb()
    init_minio()
    try:
        yield
    finally:
        try:
            await close_redis()
        finally:
            await close_mongodb()


# Register the lifespan manager through FastAPI's constructor argument
# instead of assigning to ``app.router.lifespan_context`` after the fact.
app = FastAPI(lifespan=lifespan)
|
|
|
|
@app.get("/")
def greet_json():
    """Return a static greeting payload that names the application."""
    greeting = {"Hello": "World!", "app": "Music Memories"}
    return greeting
|
|
|
|
@app.get("/health")
async def health_check():
    """Report the connectivity of Redis, MongoDB and MinIO.

    Each backend is probed independently.  Any exception raised by a probe
    is turned into an ``"error"`` status for that backend, so a single
    failing service cannot make this endpoint itself fail.

    Returns:
        A dict with the keys ``status``, ``redis``, ``mongodb``,
        ``mongodb_details``, ``minio`` and ``database``.  ``status`` is
        always ``"healthy"``: it only signals that the API process answered;
        inspect the per-backend fields for dependency state.
    """
    # Imported here rather than at module level, as in the original code.
    from redis_client import get_redis_client

    redis_status = "disconnected"
    mongo_status = "disconnected"
    mongo_details = {}
    minio_status = "disconnected"

    try:
        redis_client = get_redis_client()
        if redis_client:
            await redis_client.ping()
            redis_status = "connected"
    except Exception:
        redis_status = "error"

    try:
        mongo_db = get_mongo_db()
        if mongo_db is not None:
            await mongo_db.command('ping')
            mongo_status = "connected"
            mongo_details["collections"] = await mongo_db.list_collection_names()
    except Exception as e:
        mongo_status = "error"
        mongo_details["error"] = str(e)

    # Guard the MinIO probe the same way as the other two backends; a raised
    # exception or a non-dict result is reported as "error".
    try:
        minio_health = check_minio_health()
        minio_status = minio_health.get("status", "error")
    except Exception:
        minio_status = "error"

    return {
        "status": "healthy",
        "redis": redis_status,
        "mongodb": mongo_status,
        "mongodb_details": mongo_details,
        "minio": minio_status,
        "database": "sqlite",
    }
|
|
|
|
| |
|
|
@app.get("/songs")
async def list_songs():
    """Return every song, annotated with audio-file information.

    A truthy result from ``get_cached_list("all_songs")`` is returned as-is
    with ``source`` set to ``"cache"``.  Otherwise the songs come from
    ``get_all_songs``; each one gets ``has_audio`` and ``file_size`` fields
    derived from ``get_file_metadata`` before the list is cached and
    returned with ``source`` set to ``"database"``.
    """
    cache_key = "all_songs"

    cached_songs = await get_cached_list(cache_key)
    if cached_songs:
        return {"songs": cached_songs, "source": "cache"}

    all_songs = get_all_songs()
    for entry in all_songs:
        meta = get_file_metadata(entry["id"])
        entry["has_audio"] = meta is not None
        entry["file_size"] = meta.get("size") if meta else None

    await cache_list(cache_key, all_songs)
    return {"songs": all_songs, "source": "database"}
|
|
|
|
@app.get("/songs/{song_id}")
async def get_song(song_id: int):
    """Fetch one song and enrich it with its play count and audio-file info.

    The song cache is consulted first; on a miss the song is loaded with
    ``get_song_by_id`` and written to the cache after the play count and
    file metadata have been looked up.

    Returns:
        The song fields plus ``source`` (``"cache"`` or ``"database"``),
        ``play_count``, ``has_audio`` and ``file_size``.

    Raises:
        HTTPException: 404 when the song is neither cached nor returned by
            ``get_song_by_id``.
    """
    origin = "cache"
    record = await get_cached_song(song_id)
    if not record:
        origin = "database"
        record = get_song_by_id(song_id)
        if not record:
            raise HTTPException(status_code=404, detail="Song not found")

    plays = await get_song_play_count_mongo(song_id)
    audio_meta = get_file_metadata(song_id)

    # Only a record that was just loaded needs to be written to the cache.
    if origin == "database":
        await cache_song(song_id, record)

    return {
        **record,
        "source": origin,
        "play_count": plays,
        "has_audio": audio_meta is not None,
        "file_size": audio_meta.get("size") if audio_meta else None,
    }
|
|
|
|
def _resolve_public_base_url(request: Request) -> str:
    """Pick the base URL used to build client-facing links.

    Sources are tried in this order, the first non-empty one wins:
    the ``PUBLIC_BASE_URL`` environment variable, a Hugging Face Space URL
    derived from ``SPACE_ID``, the request's ``Origin`` header, the
    ``X-Forwarded-Proto``/``X-Forwarded-Host`` header pair, and finally
    ``request.base_url``.  The result never ends with a slash.
    """
    configured = os.getenv("PUBLIC_BASE_URL")
    if configured:
        # Strip a trailing slash so "<base>/songs/..." has no double slash.
        return configured.rstrip("/")

    space_id = os.getenv("SPACE_ID")
    if space_id:
        return f"https://{space_id.replace('/', '-')}.hf.space"

    origin = request.headers.get("origin")
    if origin:
        return origin.rstrip("/")

    forwarded_proto = request.headers.get("x-forwarded-proto")
    forwarded_host = request.headers.get("x-forwarded-host")
    if forwarded_proto and forwarded_host:
        return f"{forwarded_proto}://{forwarded_host}"

    return str(request.base_url).rstrip("/")


@app.post("/songs")
async def create_song(
    request: Request,
    title: str = Form(...),
    artist: str = Form(...),
    album: str = Form(None),
    duration: int = Form(None),
    bpm: int = Form(None),
    energy_level: int = Form(None),
    lyrics: str = Form(None),
    audio_file: UploadFile = File(None),
):
    """Create a song, optionally uploading its MP3 file.

    Args:
        request: Incoming request; only used to derive public URLs.
        title, artist, album, duration, bpm, energy_level: Passed to
            ``add_song``.
        lyrics: When given, passed to ``add_song_vibe`` together with the
            title and artist.  It is not passed to ``add_song``.
        audio_file: Optional upload.  It is ignored when it has no filename.

    Returns:
        ``{"status": "success", "song": ...}``, plus an ``audio`` entry with
        size and URLs when the upload helper reported a stored file.

    Raises:
        HTTPException: 400 when a file is supplied whose name does not end
            in ``.mp3``.  The check runs before the song row is created so a
            rejected upload leaves nothing behind, and it matches the check
            in the ``/songs/{song_id}/upload-audio`` endpoint.
    """
    upload_requested = bool(audio_file and audio_file.filename)
    if upload_requested and not audio_file.filename.lower().endswith(".mp3"):
        raise HTTPException(status_code=400, detail="Only MP3 files are supported")

    song = add_song(title, artist, album, duration, bpm, energy_level)

    minio_info = None
    if upload_requested:
        contents = await audio_file.read()
        minio_info = upload_mp3_file(
            file_data=io.BytesIO(contents),
            file_size=len(contents),
            song_id=song["id"],
            filename=audio_file.filename,
        )
        if minio_info:
            update_song_file(song["id"], minio_info["object_name"], minio_info["size"])

    if lyrics:
        add_song_vibe(song["id"], title, artist, lyrics)

    await invalidate_list_cache("all_songs")
    await track_analytics_event("song_created", {"song_id": song["id"], "title": title})

    response = {"status": "success", "song": song}
    if minio_info:
        # The base URL is only needed when there are audio links to build.
        public_base = _resolve_public_base_url(request)
        response["audio"] = {
            "uploaded": True,
            "size": minio_info["size"],
            "stream_url": f"{public_base}/songs/{song['id']}/stream",
            "download_url": f"{public_base}/songs/{song['id']}/download",
            "minio_presigned_url": minio_info["presigned_url"],
        }
    return response
|
|
|
|
@app.delete("/songs/{song_id}")
async def delete_song_endpoint(song_id: int):
    """Delete a song together with its stored audio, vibe entry and caches.

    Raises:
        HTTPException: 404 when ``delete_song`` reports nothing was deleted.
    """
    was_deleted = delete_song(song_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Song not found")

    # Clean up everything that referenced the song.
    delete_mp3_file(song_id)
    remove_song_vibe(song_id)
    await invalidate_song_cache(song_id)
    await invalidate_list_cache("all_songs")

    event_payload = {"song_id": song_id}
    await track_analytics_event("song_deleted", event_payload)

    return {"status": "success", "deleted_id": song_id}
|
|
|
|
| |
|
|
@app.get("/songs/{song_id}/stream")
async def stream_song(song_id: int):
    """Stream an MP3 file through the FastAPI app domain.

    NOTE: Redirecting to a MinIO presigned URL may point at an internal
    hostname (e.g. 127.0.0.1) in hosted environments like Hugging Face
    Spaces, which browsers cannot reach, so the bytes are served from here.

    Returns:
        A ``StreamingResponse`` with media type ``audio/mpeg`` and an
        ``inline`` ``Content-Disposition`` header.

    Raises:
        HTTPException: 404 when the song does not exist, has no
            ``audio_file_path``, or the download helper returns nothing.
    """
    from urllib.parse import quote

    song = get_song_by_id(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song not found")

    object_name = song.get("audio_file_path")
    if not object_name:
        raise HTTPException(status_code=404, detail="Audio file not found")

    file_data = download_mp3_file(song_id, object_name=object_name)
    if not file_data:
        raise HTTPException(status_code=404, detail="Audio file not found")

    # A missing or empty title falls back to "song" instead of "None".
    filename = f"{song.get('title') or 'song'}_{song_id}.mp3"

    # Song titles are free text.  Send a quoted ASCII-only fallback plus an
    # RFC 5987 percent-encoded ``filename*`` so spaces, quotes, control
    # characters and non-ASCII titles cannot break the header.
    ascii_fallback = "".join(
        ch for ch in filename if ch.isascii() and ch.isprintable() and ch not in '"\\'
    )
    encoded_name = quote(filename, safe="")
    disposition = f"inline; filename=\"{ascii_fallback}\"; filename*=UTF-8''{encoded_name}"

    return StreamingResponse(
        io.BytesIO(file_data),
        media_type="audio/mpeg",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
@app.get("/songs/{song_id}/download")
async def download_song(song_id: int):
    """Serve a song's MP3 file as a download.

    Returns:
        A ``StreamingResponse`` with media type ``audio/mpeg`` and an
        ``attachment`` ``Content-Disposition`` header.

    Raises:
        HTTPException: 404 when the song does not exist, has no
            ``audio_file_path``, or the download helper returns nothing.
    """
    from urllib.parse import quote

    song = get_song_by_id(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song not found")

    object_name = song.get("audio_file_path")
    if not object_name:
        raise HTTPException(status_code=404, detail="Audio file not found")

    file_data = download_mp3_file(song_id, object_name=object_name)
    if not file_data:
        raise HTTPException(status_code=404, detail="Audio file not found")

    # A missing or empty title falls back to "song" instead of "None".
    filename = f"{song.get('title') or 'song'}_{song_id}.mp3"

    # Song titles are free text.  Send a quoted ASCII-only fallback plus an
    # RFC 5987 percent-encoded ``filename*`` so spaces, quotes, control
    # characters and non-ASCII titles cannot break the header.
    ascii_fallback = "".join(
        ch for ch in filename if ch.isascii() and ch.isprintable() and ch not in '"\\'
    )
    encoded_name = quote(filename, safe="")
    disposition = f"attachment; filename=\"{ascii_fallback}\"; filename*=UTF-8''{encoded_name}"

    return StreamingResponse(
        io.BytesIO(file_data),
        media_type="audio/mpeg",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
@app.post("/songs/{song_id}/upload-audio")
async def upload_audio(request: Request, song_id: int, audio_file: UploadFile = File(...)):
    """Upload or replace the MP3 file of an existing song.

    Args:
        request: Incoming request; only used to derive public URLs.
        song_id: Identifier of the song receiving the file.
        audio_file: The uploaded file; its name must end in ``.mp3``.

    Returns:
        ``{"status": "success", "audio": {...}}`` with the stored size and
        the stream, download and presigned URLs.

    Raises:
        HTTPException: 404 when the song does not exist, 400 when the file
            name is missing or does not end in ``.mp3``, 500 when the upload
            helper returns nothing.
    """
    song = get_song_by_id(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song not found")

    # ``filename`` can be None for some multipart bodies; treat that as a
    # non-MP3 upload rather than failing on ``None.lower()``.
    upload_name = audio_file.filename or ""
    if not upload_name.lower().endswith(".mp3"):
        raise HTTPException(status_code=400, detail="Only MP3 files are supported")

    contents = await audio_file.read()
    minio_info = upload_mp3_file(
        file_data=io.BytesIO(contents),
        file_size=len(contents),
        song_id=song_id,
        filename=audio_file.filename,
    )
    if not minio_info:
        raise HTTPException(status_code=500, detail="Failed to upload file to MinIO")

    update_song_file(song_id, minio_info["object_name"], minio_info["size"])

    await invalidate_song_cache(song_id)
    await invalidate_list_cache("all_songs")

    # Base URL for client-facing links.  First non-empty source wins:
    # PUBLIC_BASE_URL, Hugging Face SPACE_ID, Origin header,
    # X-Forwarded-* headers, then the request's own base URL.
    public_base = os.getenv("PUBLIC_BASE_URL")
    if public_base:
        # Strip a trailing slash so "<base>/songs/..." has no double slash.
        public_base = public_base.rstrip("/")
    else:
        hf_space = os.getenv("SPACE_ID")
        origin = request.headers.get("origin")
        forwarded_proto = request.headers.get("x-forwarded-proto")
        forwarded_host = request.headers.get("x-forwarded-host")
        if hf_space:
            public_base = f"https://{hf_space.replace('/', '-')}.hf.space"
        elif origin:
            public_base = origin.rstrip("/")
        elif forwarded_proto and forwarded_host:
            public_base = f"{forwarded_proto}://{forwarded_host}"
        else:
            public_base = str(request.base_url).rstrip("/")

    return {
        "status": "success",
        "audio": {
            "uploaded": True,
            "size": minio_info["size"],
            "stream_url": f"{public_base}/songs/{song_id}/stream",
            "download_url": f"{public_base}/songs/{song_id}/download",
            "minio_presigned_url": minio_info["presigned_url"],
        },
    }
|
|
|
|
| |
|
|
@app.get("/users")
async def list_users():
    """Return all users, preferring the cached ``all_users`` list.

    The ``source`` field of the response is ``"cache"`` or ``"database"``
    depending on where the list came from.
    """
    cache_key = "all_users"

    cached_users = await get_cached_list(cache_key)
    if cached_users:
        return {"users": cached_users, "source": "cache"}

    fresh_users = get_all_users()
    await cache_list(cache_key, fresh_users)
    return {"users": fresh_users, "source": "database"}
|
|
|
|
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return one user, consulting the user cache before ``get_user_by_id``.

    Raises:
        HTTPException: 404 when the user is neither cached nor returned by
            ``get_user_by_id``.
    """
    cached_user = await get_cached_user(user_id)
    if cached_user:
        return {**cached_user, "source": "cache"}

    stored_user = get_user_by_id(user_id)
    if not stored_user:
        raise HTTPException(status_code=404, detail="User not found")

    # Populate the cache for subsequent lookups.
    await cache_user(user_id, stored_user)
    return {**stored_user, "source": "database"}
|
|
|
|
@app.post("/users")
async def create_user(name: str):
    """Create a user, then invalidate the user list cache and log the signup.

    Returns:
        ``{"status": "success", "user": ...}`` with the value returned by
        ``add_user``.
    """
    new_user = add_user(name)
    await invalidate_list_cache("all_users")

    activity_details = {"name": name}
    await log_user_activity(new_user["id"], "user_registered", activity_details)

    signup_event = {"user_id": new_user["id"]}
    await track_analytics_event("user_signup", signup_event)

    return {"status": "success", "user": new_user}
|
|
|
|
| |
|
|
@app.get("/playlists")
async def list_playlists():
    """Return all playlists, preferring the cached ``all_playlists`` list.

    The ``source`` field of the response is ``"cache"`` or ``"database"``
    depending on where the list came from.
    """
    cache_key = "all_playlists"

    cached_playlists = await get_cached_list(cache_key)
    if cached_playlists:
        return {"playlists": cached_playlists, "source": "cache"}

    fresh_playlists = get_all_playlists()
    await cache_list(cache_key, fresh_playlists)
    return {"playlists": fresh_playlists, "source": "database"}
|
|
|
|
@app.post("/playlists")
async def create_playlist(name: str, vibe_code: str = None, mood_description: str = None):
    """Create a playlist and, when a mood description is given, index it.

    Args:
        name: Playlist name.
        vibe_code: Passed to ``add_playlist`` along with the name.
        mood_description: When truthy, passed to ``add_playlist_journey``
            with the new playlist's id and name.

    Returns:
        ``{"status": "success", "playlist": ...}`` with the value returned
        by ``add_playlist``.
    """
    new_playlist = add_playlist(name, vibe_code)

    if mood_description:
        add_playlist_journey(new_playlist["id"], name, mood_description)

    await invalidate_list_cache("all_playlists")

    event_payload = {"playlist_id": new_playlist["id"], "name": name}
    await track_analytics_event("playlist_created", event_payload)

    return {"status": "success", "playlist": new_playlist}
|
|
|
|
@app.delete("/playlists/{playlist_id}")
async def delete_playlist_endpoint(playlist_id: int):
    """Delete a playlist along with its journey entry and cache entries.

    Raises:
        HTTPException: 404 when ``delete_playlist`` reports nothing was
            deleted.
    """
    was_deleted = delete_playlist(playlist_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Playlist not found")

    remove_playlist_journey(playlist_id)
    await invalidate_playlist_cache(playlist_id)
    await invalidate_list_cache("all_playlists")

    event_payload = {"playlist_id": playlist_id}
    await track_analytics_event("playlist_deleted", event_payload)

    return {"status": "success", "deleted_id": playlist_id}
|
|
|
|
| |
|
|
@app.get("/memories")
async def list_memories():
    """Return all memories, preferring the cached ``all_memories`` list.

    The ``source`` field of the response is ``"cache"`` or ``"database"``
    depending on where the list came from.
    """
    cache_key = "all_memories"

    cached_memories = await get_cached_list(cache_key)
    if cached_memories:
        return {"memories": cached_memories, "source": "cache"}

    fresh_memories = get_all_memories()
    await cache_list(cache_key, fresh_memories)
    return {"memories": fresh_memories, "source": "database"}
|
|
|
|
@app.get("/users/{user_id}/memories")
async def list_user_memories(user_id: int):
    """Return the memories of one user, using a per-user cached list.

    Raises:
        HTTPException: 404 when ``get_user_by_id`` finds no such user.
    """
    user_exists = get_user_by_id(user_id)
    if not user_exists:
        raise HTTPException(status_code=404, detail="User not found")

    cache_key = f"user_memories:{user_id}"

    cached_memories = await get_cached_list(cache_key)
    if cached_memories:
        return {"memories": cached_memories, "source": "cache"}

    fresh_memories = get_memories_by_user(user_id)
    await cache_list(cache_key, fresh_memories)
    return {"memories": fresh_memories, "source": "database"}
|
|
|
|
@app.post("/memories")
async def create_memory(user_id: int, description: str, date: str = None, song_id: int = None):
    """Create a memory for an existing user and index its description.

    After the memory is stored, its description is passed to
    ``add_memory_vibe``, the global and per-user memory list caches are
    invalidated, and the action is logged and tracked.

    Raises:
        HTTPException: 404 when ``get_user_by_id`` finds no such user.
    """
    owner = get_user_by_id(user_id)
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    new_memory = add_memory(user_id, description, date, song_id)
    add_memory_vibe(new_memory["id"], user_id, description)

    # Both the global list and this user's list are now stale.
    for stale_key in ("all_memories", f"user_memories:{user_id}"):
        await invalidate_list_cache(stale_key)

    await log_user_activity(user_id, "memory_created", {"memory_id": new_memory["id"]})
    await track_analytics_event(
        "memory_added",
        {"memory_id": new_memory["id"], "user_id": user_id},
    )

    return {"status": "success", "memory": new_memory}
|
|
|
|
@app.delete("/memories/{memory_id}")
async def delete_memory_endpoint(memory_id: int):
    """Delete a memory along with its vibe entry and cached memory lists.

    The owning user is not known here, so every key matching
    ``list:user_memories:*`` is cleared through ``cache_delete_pattern``.

    Raises:
        HTTPException: 404 when ``delete_memory`` reports nothing was
            deleted.
    """
    was_deleted = delete_memory(memory_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Memory not found")

    remove_memory_vibe(memory_id)
    await cache_delete_pattern("list:user_memories:*")
    await invalidate_list_cache("all_memories")

    event_payload = {"memory_id": memory_id}
    await track_analytics_event("memory_deleted", event_payload)

    return {"status": "success", "deleted_id": memory_id}
|
|
|
|
| |
|
|
@app.get("/contexts")
async def list_contexts():
    """Return all contexts, preferring the cached ``all_contexts`` list.

    The ``source`` field of the response is ``"cache"`` or ``"database"``
    depending on where the list came from.
    """
    cache_key = "all_contexts"

    cached_contexts = await get_cached_list(cache_key)
    if cached_contexts:
        return {"contexts": cached_contexts, "source": "cache"}

    fresh_contexts = get_all_contexts()
    await cache_list(cache_key, fresh_contexts)
    return {"contexts": fresh_contexts, "source": "database"}
|
|
|
|
@app.post("/contexts")
async def create_context(weather: str = None, time_of_day: str = None, location_type: str = None):
    """Create a listening context and index it for semantic search.

    The three fields are stored as given by ``add_context``; for
    ``add_context_vibe`` any missing field is replaced by an empty string.

    Returns:
        ``{"status": "success", "context": ...}`` with the value returned by
        ``add_context``.
    """
    new_context = add_context(weather, time_of_day, location_type)

    weather_text = weather or ""
    time_text = time_of_day or ""
    location_text = location_type or ""
    add_context_vibe(new_context["id"], weather_text, time_text, location_text)

    await invalidate_list_cache("all_contexts")

    event_payload = {"context_id": new_context["id"]}
    await track_analytics_event("context_created", event_payload)

    return {"status": "success", "context": new_context}
|
|
|
|
@app.delete("/contexts/{context_id}")
async def delete_context_endpoint(context_id: int):
    """Delete a context along with its vibe entry and the cached list.

    Raises:
        HTTPException: 404 when ``delete_context`` reports nothing was
            deleted.
    """
    was_deleted = delete_context(context_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Context not found")

    remove_context_vibe(context_id)
    await invalidate_list_cache("all_contexts")

    event_payload = {"context_id": context_id}
    await track_analytics_event("context_deleted", event_payload)

    return {"status": "success", "deleted_id": context_id}
|
|
|
|
| |
|
|
@app.get("/history")
async def list_history(user_id: int = None, limit: int = 50):
    """Return play history, trying MongoDB, then Redis, then SQLite.

    Args:
        user_id: When given, only that user's history is requested;
            when omitted, the global history is requested.  The test is
            ``is not None`` so that an id of ``0`` still selects the
            per-user lookups instead of silently falling through to the
            global history.
        limit: Maximum number of entries, forwarded to each lookup.

    Returns:
        ``{"history": ..., "source": ...}`` where ``source`` names the first
        store that produced a non-empty result.  The SQLite result is the
        last resort and is returned even when it is empty.
    """
    per_user = user_id is not None

    if per_user:
        history = await get_user_play_history_mongo(user_id, limit)
    else:
        history = await get_global_play_history_mongo(limit)
    if history:
        return {"history": history, "source": "mongodb"}

    if per_user:
        history = await get_redis_play_history(user_id, limit)
    else:
        history = await get_redis_global_history(limit)
    if history:
        return {"history": history, "source": "redis"}

    # ``user_id`` is None in the global case, which is what the SQLite
    # helper received for global history in the original code.
    history = get_play_history(user_id, limit)
    return {"history": history, "source": "sqlite"}
|
|
|
|
@app.post("/history")
async def create_history(user_id: int, song_id: int, context_id: int = None, duration_seconds: int = None):
    """Record a play event in MongoDB, Redis and SQLite, then track it.

    The song is looked up only to attach its title and artist to the
    MongoDB record; when the lookup returns nothing, both are sent as
    ``None`` and the event is still recorded.

    Returns:
        A dict with ``status``, the SQLite ``history`` row, the
        ``mongodb_id`` taken from the MongoDB result (or ``None``), and
        ``redis_stored`` which is always ``True``.
    """
    song = get_song_by_id(song_id)
    song_title = song.get("title") if song else None
    song_artist = song.get("artist") if song else None

    mongo_result = await store_play_history_mongo(
        user_id=user_id,
        song_id=song_id,
        song_title=song_title,
        song_artist=song_artist,
        context_id=context_id,
        duration_seconds=duration_seconds,
    )

    await store_play_event(user_id, song_id, context_id)

    sqlite_row = add_play_history(user_id, song_id, context_id)

    event_payload = {
        "user_id": user_id,
        "song_id": song_id,
        "context_id": context_id,
    }
    await track_analytics_event("song_played", event_payload)

    mongodb_id = mongo_result.get("id") if mongo_result else None
    return {
        "status": "success",
        "history": sqlite_row,
        "mongodb_id": mongodb_id,
        "redis_stored": True,
    }
|
|
|
|
| |
|
|
@app.get("/analytics/summary")
async def get_analytics(hours: int = Query(24, description="Hours to look back")):
    """Return the result of ``get_analytics_summary`` for the given window.

    Args:
        hours: Look-back window in hours, forwarded unchanged.
    """
    return await get_analytics_summary(hours=hours)
|
|
|
|
@app.get("/analytics/top-songs")
async def get_top_songs(limit: int = Query(10, description="Number of results")):
    """Return the most played songs as reported by ``get_top_songs_mongo``.

    Args:
        limit: Number of results requested, forwarded unchanged.
    """
    ranking = await get_top_songs_mongo(limit)
    response = {"top_songs": ranking}
    return response
|
|
|
|
@app.get("/analytics/popular-searches")
async def get_popular_searches_endpoint(hours: int = Query(24, description="Hours to look back"), limit: int = Query(10)):
    """Return popular search queries from ``get_popular_searches``.

    Args:
        hours: Look-back window in hours, forwarded unchanged.
        limit: Number of results requested, forwarded unchanged.
    """
    popular = await get_popular_searches(hours=hours, limit=limit)
    response = {"popular_searches": popular}
    return response
|
|
|
|
@app.get("/users/{user_id}/activity")
async def get_user_activity(user_id: int, limit: int = Query(50), action_type: str = None):
    """Return a user's activity log from ``get_user_activity_mongo``.

    Args:
        user_id: User whose activity is requested.
        limit: Number of entries requested, forwarded unchanged.
        action_type: Optional filter value, forwarded unchanged.
    """
    entries = await get_user_activity_mongo(user_id, limit, action_type)
    response = {"activity": entries}
    return response
|
|
|
|
@app.get("/storage/files")
async def list_storage_files():
    """Return the files reported by ``list_all_mp3_files`` and their count."""
    stored_files = list_all_mp3_files()
    total = len(stored_files)
    return {"files": stored_files, "count": total}
|
|
|
|
| |
|
|
@app.get("/search/songs")
async def search_songs(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic song search and record the query.

    The query and its result count are passed to ``log_search`` and
    ``track_analytics_event`` before the results are returned.
    """
    matches = search_song_vibes(q, n_results=n)
    match_count = len(matches)

    await log_search(search_type="songs", query=q, results_count=match_count)
    await track_analytics_event(
        "search_performed",
        {"search_type": "songs", "query": q, "results": match_count},
    )

    return {"query": q, "results": matches}
|
|
|
|
@app.get("/search/memories")
async def search_memories(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic memory search and record the query.

    The query and its result count are passed to ``log_search`` and
    ``track_analytics_event`` before the results are returned.
    """
    matches = search_memory_vibes(q, n_results=n)
    match_count = len(matches)

    await log_search(search_type="memories", query=q, results_count=match_count)
    await track_analytics_event(
        "search_performed",
        {"search_type": "memories", "query": q, "results": match_count},
    )

    return {"query": q, "results": matches}
|
|
|
|
@app.get("/search/contexts")
async def search_contexts(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic context search and record the query.

    The query and its result count are passed to ``log_search`` and
    ``track_analytics_event`` before the results are returned.
    """
    matches = search_context_vibes(q, n_results=n)
    match_count = len(matches)

    await log_search(search_type="contexts", query=q, results_count=match_count)
    await track_analytics_event(
        "search_performed",
        {"search_type": "contexts", "query": q, "results": match_count},
    )

    return {"query": q, "results": matches}
|
|
|
|
@app.get("/search/playlists")
async def search_playlists(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic playlist search by mood and record the query.

    The query and its result count are passed to ``log_search`` and
    ``track_analytics_event`` before the results are returned.
    """
    matches = search_playlist_journeys(q, n_results=n)
    match_count = len(matches)

    await log_search(search_type="playlists", query=q, results_count=match_count)
    await track_analytics_event(
        "search_performed",
        {"search_type": "playlists", "query": q, "results": match_count},
    )

    return {"query": q, "results": matches}
|
|
|
|
| |
# Mount the Gradio interface under /ui.  This runs after every route above
# has been registered, and rebinds ``app`` to the value Gradio returns.
app = gr.mount_gradio_app(app, gradio_app, path="/ui")
|
|