pool / main.py
gcharanteja
minio fix for the streaming
1d51b72
Raw
History Blame Contribute Delete
26.5 kB
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, HTTPException, UploadFile, File, Form, Request
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
import gradio as gr
from app import gradio_app
import os
from database import (
init_database,
# Songs
add_song, get_all_songs, get_song_by_id, delete_song, update_song_file,
# Users
add_user, get_all_users, get_user_by_id,
# Playlists
add_playlist, get_all_playlists, delete_playlist,
# Memories
add_memory, get_all_memories, get_memories_by_user, delete_memory,
# Contexts
add_context, get_all_contexts, delete_context,
# Play History
add_play_history, get_play_history,
)
from semantic_search import (
# Song vibes
add_song_vibe, search_song_vibes, remove_song_vibe,
# Memory vibes
add_memory_vibe, search_memory_vibes, remove_memory_vibe,
# Context vibes
add_context_vibe, search_context_vibes, remove_context_vibe,
# Playlist journeys
add_playlist_journey, search_playlist_journeys, remove_playlist_journey,
)
from redis_client import (
init_redis,
close_redis,
get_cached_song, cache_song, invalidate_song_cache,
get_cached_user, cache_user, invalidate_user_cache,
get_cached_playlist, cache_playlist, invalidate_playlist_cache,
get_cached_list, cache_list, invalidate_list_cache, cache_delete_pattern,
store_play_event, get_user_play_history as get_redis_play_history,
get_global_play_history as get_redis_global_history,
)
from mongo_client import (
init_mongodb,
close_mongodb,
get_mongo_db,
# Play history
store_play_history_mongo,
get_user_play_history_mongo,
get_global_play_history_mongo,
get_top_songs_mongo,
get_song_play_count_mongo,
# Activity logs
log_user_activity,
get_user_activity_mongo,
# Analytics
track_analytics_event,
get_analytics_summary,
# Search history
log_search,
get_popular_searches,
)
from minio_client import (
init_minio,
get_minio_client,
upload_mp3_file,
download_mp3_file,
stream_mp3_file,
delete_mp3_file,
get_file_metadata,
list_all_mp3_files,
check_minio_health,
)
import io
app = FastAPI()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise backing services on startup and release them on shutdown.

    Startup runs, in order: ``init_database``, ``init_redis``,
    ``init_mongodb`` and ``init_minio``. Shutdown closes the Redis and
    MongoDB connections.
    """
    # Startup
    init_database()
    await init_redis()
    await init_mongodb()
    init_minio()
    try:
        yield
    finally:
        # Shutdown: run the teardown even when the serving phase ends with an
        # exception or cancellation, and make sure a failure while closing
        # Redis does not prevent MongoDB from being closed.
        try:
            await close_redis()
        finally:
            await close_mongodb()


# Attach the lifespan handler to the already-constructed application.
app.router.lifespan_context = lifespan
@app.get("/")
def greet_json():
    """Return a static greeting payload that names the application."""
    greeting = {"Hello": "World!", "app": "Music Memories"}
    return greeting
@app.get("/health")
async def health_check():
    """Report the connection status of Redis, MongoDB and MinIO.

    Each dependency is probed independently and a failing probe is reported
    as ``"error"`` in the payload instead of failing the whole request.

    Returns:
        A dict with the keys ``status``, ``redis``, ``mongodb``,
        ``mongodb_details``, ``minio`` and ``database``.
    """
    from redis_client import get_redis_client

    # Redis: "disconnected" when no client is available, "error" on failure.
    redis_status = "disconnected"
    try:
        redis_client = get_redis_client()
        if redis_client:
            await redis_client.ping()
            redis_status = "connected"
    except Exception:
        redis_status = "error"

    # MongoDB: also collect the collection names when the ping succeeds.
    mongo_status = "disconnected"
    mongo_details = {}
    try:
        mongo_db = get_mongo_db()
        if mongo_db is not None:
            await mongo_db.command('ping')
            mongo_status = "connected"
            # Get collection stats
            collections = await mongo_db.list_collection_names()
            mongo_details["collections"] = collections
    except Exception as e:
        mongo_status = "error"
        mongo_details["error"] = str(e)

    # MinIO: guarded like the other probes so that an exception (or a
    # non-dict result) is reported as "error" rather than a 500 response.
    try:
        minio_health = check_minio_health()
        minio_status = minio_health.get("status", "error")
    except Exception:
        minio_status = "error"

    # NOTE(review): the top-level "status" is always "healthy", whatever the
    # individual probes report; kept as-is for existing consumers.
    return {
        "status": "healthy",
        "redis": redis_status,
        "mongodb": mongo_status,
        "mongodb_details": mongo_details,
        "minio": minio_status,
        "database": "sqlite"
    }
# ============== SONGS + MP3 FILES ==============
@app.get("/songs")
async def list_songs():
    """Return every song, annotated with MinIO file information.

    A truthy cached list is returned as-is with ``source`` set to
    ``"cache"``. Otherwise the songs are read from the database, each one is
    given ``has_audio`` and ``file_size`` fields, and the annotated list is
    cached before being returned.
    """
    cache_key = "all_songs"

    cached_songs = await get_cached_list(cache_key)
    if cached_songs:
        return {"songs": cached_songs, "source": "cache"}

    db_songs = get_all_songs()
    for entry in db_songs:
        meta = get_file_metadata(entry["id"])
        # has_audio reflects whether metadata exists at all; the size is only
        # read from non-empty metadata.
        entry["has_audio"] = meta is not None
        if meta:
            entry["file_size"] = meta.get("size")
        else:
            entry["file_size"] = None

    await cache_list(cache_key, db_songs)
    return {"songs": db_songs, "source": "database"}
@app.get("/songs/{song_id}")
async def get_song(song_id: int):
    """Return one song with its play count and MinIO file information.

    The cache is consulted first and the database is the fallback. A song
    loaded from the database is written to the cache once the play count and
    file metadata have been fetched.

    Raises:
        HTTPException: 404 when the song is neither cached nor in the
            database.
    """
    record = await get_cached_song(song_id)
    served_from_cache = bool(record)

    if not served_from_cache:
        record = get_song_by_id(song_id)
        if not record:
            raise HTTPException(status_code=404, detail="Song not found")

    plays = await get_song_play_count_mongo(song_id)
    meta = get_file_metadata(song_id)

    if not served_from_cache:
        await cache_song(song_id, record)

    size = meta.get("size") if meta else None
    return {
        **record,
        "source": "cache" if served_from_cache else "database",
        "play_count": plays,
        "has_audio": meta is not None,
        "file_size": size,
    }
def _resolve_public_base_url(request: Request) -> str:
    """Return the externally reachable base URL used to build audio links.

    Sources are tried in priority order:
      1. the ``PUBLIC_BASE_URL`` environment variable (used verbatim),
      2. the ``SPACE_ID`` environment variable, mapped to an ``hf.space`` host,
      3. the request's ``Origin`` header,
      4. the ``X-Forwarded-Proto`` / ``X-Forwarded-Host`` header pair,
      5. the request's own base URL.
    """
    env_base = os.getenv("PUBLIC_BASE_URL")
    if env_base:
        return env_base

    # SPACE_ID looks like "username/space-name".
    space_id = os.getenv("SPACE_ID")
    if space_id:
        return f"https://{space_id.replace('/', '-')}.hf.space"

    # Usually present in browser requests.
    origin = request.headers.get("origin")
    if origin:
        return origin.rstrip("/")

    # Set by reverse proxies in front of the app.
    forwarded_proto = request.headers.get("x-forwarded-proto")
    forwarded_host = request.headers.get("x-forwarded-host")
    if forwarded_proto and forwarded_host:
        return f"{forwarded_proto}://{forwarded_host}"

    return str(request.base_url).rstrip("/")


@app.post("/songs")
async def create_song(
    request: Request,
    title: str = Form(...),
    artist: str = Form(...),
    album: str = Form(None),
    duration: int = Form(None),
    bpm: int = Form(None),
    energy_level: int = Form(None),
    lyrics: str = Form(None),
    audio_file: UploadFile = File(None),
):
    """Add a new song with an optional MP3 file upload to MinIO.

    The song row is created first. When an audio file with a filename is
    supplied it is uploaded to MinIO and, on success, the song row is updated
    with the stored object name and size. Lyrics, when given, are added to
    the semantic search index.

    Returns:
        ``{"status": "success", "song": ...}`` plus an ``"audio"`` entry with
        stream/download URLs when the upload produced MinIO info.
    """
    song = add_song(title, artist, album, duration, bpm, energy_level)

    # Handle MP3 file upload to MinIO
    # NOTE(review): unlike the upload-audio endpoint, the file extension is
    # not validated here — confirm whether that is intended.
    minio_info = None
    if audio_file and audio_file.filename:
        contents = await audio_file.read()
        minio_info = upload_mp3_file(
            file_data=io.BytesIO(contents),
            file_size=len(contents),
            song_id=song["id"],
            filename=audio_file.filename,
        )
        if minio_info:
            # Update song with file reference
            update_song_file(song["id"], minio_info["object_name"], minio_info["size"])

    # Add to semantic search index if lyrics provided
    if lyrics:
        add_song_vibe(song["id"], title, artist, lyrics)

    # Invalidate songs list cache
    await invalidate_list_cache("all_songs")

    # Log activity to MongoDB
    await track_analytics_event("song_created", {"song_id": song["id"], "title": title})

    response = {"status": "success", "song": song}
    if minio_info:
        # The public base URL is only needed when audio links are returned.
        public_base = _resolve_public_base_url(request)
        response["audio"] = {
            "uploaded": True,
            "size": minio_info["size"],
            "stream_url": f"{public_base}/songs/{song['id']}/stream",
            "download_url": f"{public_base}/songs/{song['id']}/download",
            "minio_presigned_url": minio_info["presigned_url"],
        }
    return response
@app.delete("/songs/{song_id}")
async def delete_song_endpoint(song_id: int):
    """Delete a song, its MinIO audio object and its search-index entry.

    Raises:
        HTTPException: 404 when ``delete_song`` reports nothing was deleted.
    """
    was_deleted = delete_song(song_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Song not found")

    # Remove the stored audio and the semantic-search entry.
    delete_mp3_file(song_id)
    remove_song_vibe(song_id)

    # Drop both the per-song cache entry and the cached song list.
    await invalidate_song_cache(song_id)
    await invalidate_list_cache("all_songs")

    # Record the deletion in MongoDB analytics.
    event_payload = {"song_id": song_id}
    await track_analytics_event("song_deleted", event_payload)

    return {"status": "success", "deleted_id": song_id}
# ============== MP3 FILE STREAMING ENDPOINTS ==============
def _content_disposition(disposition: str, filename: str) -> str:
    """Build a ``Content-Disposition`` value that is valid for any filename.

    The plain ``filename`` parameter is quoted and restricted to a
    conservative ASCII subset (other characters become ``_``), while
    ``filename*`` carries the exact name percent-encoded as UTF-8. This keeps
    the header well-formed for titles containing spaces, quotes, semicolons
    or non-Latin-1 characters.
    """
    from urllib.parse import quote

    ascii_fallback = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "._- ") else "_"
        for ch in filename
    )
    return (
        f'{disposition}; filename="{ascii_fallback}"; '
        f"filename*=UTF-8''{quote(filename, safe='')}"
    )


def _audio_response(song_id: int, disposition: str) -> StreamingResponse:
    """Load a song's MP3 bytes and wrap them in an ``audio/mpeg`` response.

    Args:
        song_id: ID of the song whose audio is requested.
        disposition: ``"inline"`` or ``"attachment"``.

    Raises:
        HTTPException: 404 when the song does not exist, has no stored audio
            file path, or the file download yields no data.
    """
    # Get song info to find the audio file path
    song = get_song_by_id(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song not found")

    # Get the stored audio file path from the database
    object_name = song.get("audio_file_path")
    if not object_name:
        raise HTTPException(status_code=404, detail="Audio file not found")

    # NOTE(review): the whole file is held in memory before it is sent.
    file_data = download_mp3_file(song_id, object_name=object_name)
    if not file_data:
        raise HTTPException(status_code=404, detail="Audio file not found")

    filename = f"{song.get('title', 'song')}_{song_id}.mp3"
    return StreamingResponse(
        io.BytesIO(file_data),
        media_type="audio/mpeg",
        headers={"Content-Disposition": _content_disposition(disposition, filename)},
    )


@app.get("/songs/{song_id}/stream")
async def stream_song(song_id: int):
    """Stream an MP3 file through the FastAPI app domain.

    NOTE: Redirecting to a MinIO presigned URL may point at an internal
    hostname (e.g. 127.0.0.1) in hosted environments like Hugging Face
    Spaces, which browsers cannot reach.

    Raises:
        HTTPException: 404 when the song or its audio file is missing.
    """
    return _audio_response(song_id, "inline")


@app.get("/songs/{song_id}/download")
async def download_song(song_id: int):
    """Download an MP3 file from MinIO as an attachment.

    Raises:
        HTTPException: 404 when the song or its audio file is missing.
    """
    return _audio_response(song_id, "attachment")
@app.post("/songs/{song_id}/upload-audio")
async def upload_audio(request: Request, song_id: int, audio_file: UploadFile = File(...)):
    """Upload or replace the MP3 file of an existing song.

    Raises:
        HTTPException: 404 when the song does not exist; 400 when the upload
            has no filename or the filename does not end in ``.mp3``; 500
            when the MinIO upload returns no info.
    """
    # Verify song exists
    song = get_song_by_id(song_id)
    if not song:
        raise HTTPException(status_code=404, detail="Song not found")

    # Validate file type. An upload without a filename is rejected with the
    # same 400 instead of failing on ``None.lower()``.
    upload_name = audio_file.filename or ""
    if not upload_name.lower().endswith('.mp3'):
        raise HTTPException(status_code=400, detail="Only MP3 files are supported")

    # Read and upload file
    contents = await audio_file.read()
    minio_info = upload_mp3_file(
        file_data=io.BytesIO(contents),
        file_size=len(contents),
        song_id=song_id,
        filename=audio_file.filename,
    )
    if not minio_info:
        raise HTTPException(status_code=500, detail="Failed to upload file to MinIO")

    # Update song with file reference
    update_song_file(song_id, minio_info["object_name"], minio_info["size"])

    # Invalidate cache
    await invalidate_song_cache(song_id)
    await invalidate_list_cache("all_songs")

    # Determine the public base URL for the hosted environment
    # Priority: 1) Env var, 2) HF Space detection, 3) Origin header,
    # 4) Forwarded headers, 5) Fallback to the request base URL
    public_base = os.getenv("PUBLIC_BASE_URL")
    if not public_base:
        hf_space = os.getenv("SPACE_ID")
        origin = request.headers.get("origin")
        forwarded_proto = request.headers.get("x-forwarded-proto")
        forwarded_host = request.headers.get("x-forwarded-host")
        if hf_space:
            public_base = f"https://{hf_space.replace('/', '-')}.hf.space"
        elif origin:
            public_base = origin.rstrip("/")
        elif forwarded_proto and forwarded_host:
            public_base = f"{forwarded_proto}://{forwarded_host}"
        else:
            public_base = str(request.base_url).rstrip("/")

    return {
        "status": "success",
        "audio": {
            "uploaded": True,
            "size": minio_info["size"],
            "stream_url": f"{public_base}/songs/{song_id}/stream",
            "download_url": f"{public_base}/songs/{song_id}/download",
            "minio_presigned_url": minio_info["presigned_url"],
        },
    }
# ============== USERS ==============
@app.get("/users")
async def list_users():
    """Return all users, preferring a truthy cached list over the database."""
    cache_key = "all_users"

    cached_users = await get_cached_list(cache_key)
    if cached_users:
        return {"users": cached_users, "source": "cache"}

    # Cache miss: read from the database and populate the cache.
    db_users = get_all_users()
    await cache_list(cache_key, db_users)
    return {"users": db_users, "source": "database"}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return one user, read from the cache or, failing that, the database.

    Raises:
        HTTPException: 404 when the user is in neither the cache nor the
            database.
    """
    cached_user = await get_cached_user(user_id)
    if cached_user:
        return {**cached_user, "source": "cache"}

    db_user = get_user_by_id(user_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    # Populate the cache for subsequent lookups.
    await cache_user(user_id, db_user)
    return {**db_user, "source": "database"}
@app.post("/users")
async def create_user(name: str):
    """Create a user, invalidate the cached user list and log the signup."""
    new_user = add_user(name)
    new_user_id = new_user["id"]

    await invalidate_list_cache("all_users")

    # Activity log and analytics event in MongoDB.
    await log_user_activity(new_user_id, "user_registered", {"name": name})
    await track_analytics_event("user_signup", {"user_id": new_user_id})

    return {"status": "success", "user": new_user}
# ============== PLAYLISTS ==============
@app.get("/playlists")
async def list_playlists():
    """Return all playlists, preferring a truthy cached list over the database."""
    cache_key = "all_playlists"

    cached_playlists = await get_cached_list(cache_key)
    if cached_playlists:
        return {"playlists": cached_playlists, "source": "cache"}

    # Cache miss: read from the database and populate the cache.
    db_playlists = get_all_playlists()
    await cache_list(cache_key, db_playlists)
    return {"playlists": db_playlists, "source": "database"}
@app.post("/playlists")
async def create_playlist(name: str, vibe_code: str = None, mood_description: str = None):
    """Create a playlist and, when a mood description is given, index it.

    The cached playlist list is invalidated and a ``playlist_created``
    analytics event is recorded.
    """
    new_playlist = add_playlist(name, vibe_code)
    new_playlist_id = new_playlist["id"]

    # Only playlists with a mood description enter the journey index.
    if mood_description:
        add_playlist_journey(new_playlist_id, name, mood_description)

    await invalidate_list_cache("all_playlists")

    event_payload = {"playlist_id": new_playlist_id, "name": name}
    await track_analytics_event("playlist_created", event_payload)

    return {"status": "success", "playlist": new_playlist}
@app.delete("/playlists/{playlist_id}")
async def delete_playlist_endpoint(playlist_id: int):
    """Delete a playlist together with its journey-index entry and caches.

    Raises:
        HTTPException: 404 when ``delete_playlist`` reports nothing was
            deleted.
    """
    was_deleted = delete_playlist(playlist_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Playlist not found")

    remove_playlist_journey(playlist_id)

    # Drop both the per-playlist cache entry and the cached list.
    await invalidate_playlist_cache(playlist_id)
    await invalidate_list_cache("all_playlists")

    event_payload = {"playlist_id": playlist_id}
    await track_analytics_event("playlist_deleted", event_payload)

    return {"status": "success", "deleted_id": playlist_id}
# ============== MEMORIES ==============
@app.get("/memories")
async def list_memories():
    """Return all memories, preferring a truthy cached list over the database."""
    cache_key = "all_memories"

    cached_memories = await get_cached_list(cache_key)
    if cached_memories:
        return {"memories": cached_memories, "source": "cache"}

    # Cache miss: read from the database and populate the cache.
    db_memories = get_all_memories()
    await cache_list(cache_key, db_memories)
    return {"memories": db_memories, "source": "database"}
@app.get("/users/{user_id}/memories")
async def list_user_memories(user_id: int):
    """Return the memories of one user, using a per-user cached list.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    user_exists = get_user_by_id(user_id)
    if not user_exists:
        raise HTTPException(status_code=404, detail="User not found")

    cache_key = f"user_memories:{user_id}"

    cached_memories = await get_cached_list(cache_key)
    if cached_memories:
        return {"memories": cached_memories, "source": "cache"}

    # Cache miss: read from the database and populate the cache.
    db_memories = get_memories_by_user(user_id)
    await cache_list(cache_key, db_memories)
    return {"memories": db_memories, "source": "database"}
@app.post("/memories")
async def create_memory(user_id: int, description: str, date: str = None, song_id: int = None):
    """Create a memory for an existing user and index it for semantic search.

    Both the global and the per-user cached memory lists are invalidated, and
    the creation is recorded in the activity log and in analytics.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    owner = get_user_by_id(user_id)
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    new_memory = add_memory(user_id, description, date, song_id)
    new_memory_id = new_memory["id"]

    # Semantic search index.
    add_memory_vibe(new_memory_id, user_id, description)

    # Cached lists that may now be stale.
    await invalidate_list_cache("all_memories")
    await invalidate_list_cache(f"user_memories:{user_id}")

    # MongoDB activity log and analytics.
    await log_user_activity(user_id, "memory_created", {"memory_id": new_memory_id})
    await track_analytics_event("memory_added", {"memory_id": new_memory_id, "user_id": user_id})

    return {"status": "success", "memory": new_memory}
@app.delete("/memories/{memory_id}")
async def delete_memory_endpoint(memory_id: int):
    """Delete a memory, its search-index entry and the affected caches.

    Raises:
        HTTPException: 404 when ``delete_memory`` reports nothing was deleted.
    """
    was_deleted = delete_memory(memory_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Memory not found")

    remove_memory_vibe(memory_id)

    # The owning user is not looked up here, so every per-user memory list is
    # cleared by pattern, followed by the global list.
    await cache_delete_pattern("list:user_memories:*")
    await invalidate_list_cache("all_memories")

    event_payload = {"memory_id": memory_id}
    await track_analytics_event("memory_deleted", event_payload)

    return {"status": "success", "deleted_id": memory_id}
# ============== CONTEXTS ==============
@app.get("/contexts")
async def list_contexts():
    """Return all contexts, preferring a truthy cached list over the database."""
    cache_key = "all_contexts"

    cached_contexts = await get_cached_list(cache_key)
    if cached_contexts:
        return {"contexts": cached_contexts, "source": "cache"}

    # Cache miss: read from the database and populate the cache.
    db_contexts = get_all_contexts()
    await cache_list(cache_key, db_contexts)
    return {"contexts": db_contexts, "source": "database"}
@app.post("/contexts")
async def create_context(weather: str = None, time_of_day: str = None, location_type: str = None):
    """Create a context and add it to the semantic search index.

    Falsy fields are passed to the index as empty strings. The cached context
    list is invalidated and a ``context_created`` analytics event is recorded.
    """
    new_context = add_context(weather, time_of_day, location_type)
    new_context_id = new_context["id"]

    # The index receives empty strings in place of falsy values.
    add_context_vibe(
        new_context_id,
        weather or "",
        time_of_day or "",
        location_type or "",
    )

    await invalidate_list_cache("all_contexts")

    await track_analytics_event("context_created", {"context_id": new_context_id})

    return {"status": "success", "context": new_context}
@app.delete("/contexts/{context_id}")
async def delete_context_endpoint(context_id: int):
    """Delete a context, its search-index entry and the cached context list.

    Raises:
        HTTPException: 404 when ``delete_context`` reports nothing was
            deleted.
    """
    was_deleted = delete_context(context_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Context not found")

    remove_context_vibe(context_id)
    await invalidate_list_cache("all_contexts")

    event_payload = {"context_id": context_id}
    await track_analytics_event("context_deleted", event_payload)

    return {"status": "success", "deleted_id": context_id}
# ============== PLAY HISTORY (MongoDB Primary) ==============
@app.get("/history")
async def list_history(user_id: int = None, limit: int = 50):
    """Return play history, trying MongoDB, then Redis, then SQLite.

    With a truthy ``user_id`` the per-user history is returned; otherwise the
    global history is. The first store that yields a truthy result wins and
    is named in ``source``. SQLite is the last resort and its result is
    returned even when empty.
    """
    if user_id:
        # Per-user history.
        mongo_rows = await get_user_play_history_mongo(user_id, limit)
        if mongo_rows:
            return {"history": mongo_rows, "source": "mongodb"}

        redis_rows = await get_redis_play_history(user_id, limit)
        if redis_rows:
            return {"history": redis_rows, "source": "redis"}

        sqlite_rows = get_play_history(user_id, limit)
        return {"history": sqlite_rows, "source": "sqlite"}

    # Global history.
    mongo_rows = await get_global_play_history_mongo(limit)
    if mongo_rows:
        return {"history": mongo_rows, "source": "mongodb"}

    redis_rows = await get_redis_global_history(limit)
    if redis_rows:
        return {"history": redis_rows, "source": "redis"}

    sqlite_rows = get_play_history(None, limit)
    return {"history": sqlite_rows, "source": "sqlite"}
@app.post("/history")
async def create_history(user_id: int, song_id: int, context_id: int = None, duration_seconds: int = None):
    """Record a play event in MongoDB, Redis and SQLite.

    The song is looked up so that its title and artist can be stored with the
    MongoDB entry; both are ``None`` when the song is not found.

    Returns:
        A dict holding the SQLite history row, the MongoDB result's ``id``
        (or ``None``) and a ``redis_stored`` flag that is always ``True``.
    """
    song = get_song_by_id(song_id)
    song_title = song.get("title") if song else None
    song_artist = song.get("artist") if song else None

    # MongoDB: primary store for analytics.
    mongo_result = await store_play_history_mongo(
        user_id=user_id,
        song_id=song_id,
        song_title=song_title,
        song_artist=song_artist,
        context_id=context_id,
        duration_seconds=duration_seconds
    )

    # Redis: fast cache.
    await store_play_event(user_id, song_id, context_id)

    # SQLite: backup.
    sqlite_history = add_play_history(user_id, song_id, context_id)

    event_payload = {
        "user_id": user_id,
        "song_id": song_id,
        "context_id": context_id
    }
    await track_analytics_event("song_played", event_payload)

    mongodb_id = mongo_result.get("id") if mongo_result else None
    return {
        "status": "success",
        "history": sqlite_history,
        "mongodb_id": mongodb_id,
        "redis_stored": True
    }
# ============== ANALYTICS ENDPOINTS (MongoDB) ==============
@app.get("/analytics/summary")
async def get_analytics(hours: int = Query(24, description="Hours to look back")):
    """Return the analytics summary for the last ``hours`` hours."""
    return await get_analytics_summary(hours=hours)
@app.get("/analytics/top-songs")
async def get_top_songs(limit: int = Query(10, description="Number of results")):
    """Return the top played songs reported by MongoDB."""
    ranking = await get_top_songs_mongo(limit)
    payload = {"top_songs": ranking}
    return payload
@app.get("/analytics/popular-searches")
async def get_popular_searches_endpoint(hours: int = Query(24, description="Hours to look back"), limit: int = Query(10)):
    """Return popular search queries for the requested window and limit."""
    popular = await get_popular_searches(hours=hours, limit=limit)
    payload = {"popular_searches": popular}
    return payload
@app.get("/users/{user_id}/activity")
async def get_user_activity(user_id: int, limit: int = Query(50), action_type: str = None):
    """Return a user's activity log, optionally filtered by action type."""
    entries = await get_user_activity_mongo(user_id, limit, action_type)
    payload = {"activity": entries}
    return payload
@app.get("/storage/files")
async def list_storage_files():
    """List the MP3 files reported by MinIO storage, with their count."""
    mp3_files = list_all_mp3_files()
    file_count = len(mp3_files)
    return {"files": mp3_files, "count": file_count}
# ============== SEMANTIC SEARCH ==============
@app.get("/search/songs")
async def search_songs(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic search over song vibes and log the query."""
    matches = search_song_vibes(q, n_results=n)
    match_count = len(matches)

    # Record the search in the MongoDB search log and in analytics.
    await log_search(search_type="songs", query=q, results_count=match_count)
    event_payload = {"search_type": "songs", "query": q, "results": match_count}
    await track_analytics_event("search_performed", event_payload)

    return {"query": q, "results": matches}
@app.get("/search/memories")
async def search_memories(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic search over memory vibes and log the query."""
    matches = search_memory_vibes(q, n_results=n)
    match_count = len(matches)

    # Record the search in the MongoDB search log and in analytics.
    await log_search(search_type="memories", query=q, results_count=match_count)
    event_payload = {"search_type": "memories", "query": q, "results": match_count}
    await track_analytics_event("search_performed", event_payload)

    return {"query": q, "results": matches}
@app.get("/search/contexts")
async def search_contexts(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic search over context vibes and log the query."""
    matches = search_context_vibes(q, n_results=n)
    match_count = len(matches)

    # Record the search in the MongoDB search log and in analytics.
    await log_search(search_type="contexts", query=q, results_count=match_count)
    event_payload = {"search_type": "contexts", "query": q, "results": match_count}
    await track_analytics_event("search_performed", event_payload)

    return {"query": q, "results": matches}
@app.get("/search/playlists")
async def search_playlists(q: str = Query(..., description="Search query"), n: int = Query(5, description="Number of results")):
    """Run a semantic search over playlist journeys and log the query."""
    matches = search_playlist_journeys(q, n_results=n)
    match_count = len(matches)

    # Record the search in the MongoDB search log and in analytics.
    await log_search(search_type="playlists", query=q, results_count=match_count)
    event_payload = {"search_type": "playlists", "query": q, "results": match_count}
    await track_analytics_event("search_performed", event_payload)

    return {"query": q, "results": matches}
# Mount the Gradio app at /ui. The module-level `app` is rebound to the value
# returned by `gr.mount_gradio_app`, so this runs after every route above has
# been registered on the original FastAPI instance.
app = gr.mount_gradio_app(app, gradio_app, path="/ui")