BeatDebate / src /api /backend.py
SulmanK's picture
Remove obsolete phase completion summaries and demo test scripts - Deleted `PHASE1_COMPLETION_SUMMARY.md`, `PHASE2_COMPLETION_SUMMARY.md`, `PHASE3_COMPLETION_SUMMARY.md`, and associated demo test scripts to streamline the codebase and eliminate unused documentation. This cleanup supports ongoing refactoring efforts and enhances overall project maintainability.
d5eabda
Raw
History Blame Contribute Delete
16 kB
"""
FastAPI Backend for BeatDebate Music Recommendation System
This module provides REST API endpoints for the 4-agent music recommendation
system, exposing the Enhanced Recommendation Service functionality via HTTP endpoints.
"""
import time
import uuid
from typing import Dict, Optional
from contextlib import asynccontextmanager
import os
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
# Updated imports to use new recommendation service
from ..services.recommendation_service import (
RecommendationService,
RecommendationRequest as ServiceRequest,
get_recommendation_service
)
from ..services.api_service import get_api_service, close_api_service
from ..services.cache_manager import get_cache_manager
# SmartContextManager functionality moved to SessionManagerService
from ..models.agent_models import SystemConfig, AgentConfig
from ..models.metadata_models import UnifiedTrackMetadata
from .logging_middleware import LoggingMiddleware, PerformanceLoggingMiddleware
# Setup logger - will be initialized after logging setup
logger = None
# Global service instances
recommendation_service: Optional[RecommendationService] = None
api_service = None
cache_manager = None
# context_manager removed - functionality moved to session_manager
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager for startup and shutdown."""
global recommendation_service, api_service, cache_manager, context_manager, logger
# Initialize logging
from ..utils.logging_config import setup_logging, get_logger
setup_logging(log_level=os.getenv("LOG_LEVEL", "INFO"))
logger = get_logger(__name__)
# Startup
logger.info("Initializing BeatDebate enhanced recommendation service...")
try:
# Get API keys from environment variables with fallbacks
gemini_api_key = os.getenv("GEMINI_API_KEY", "demo_gemini_key")
lastfm_api_key = os.getenv("LASTFM_API_KEY", "demo_lastfm_key")
spotify_client_id = os.getenv("SPOTIFY_CLIENT_ID", "demo_spotify_id")
spotify_client_secret = os.getenv(
"SPOTIFY_CLIENT_SECRET",
"demo_spotify_secret"
)
# Create system configuration
system_config = SystemConfig(
gemini_api_key=gemini_api_key,
lastfm_api_key=lastfm_api_key,
spotify_client_id=spotify_client_id,
spotify_client_secret=spotify_client_secret,
agent_configs={
"planner": AgentConfig(
agent_name="PlannerAgent",
agent_type="planner"
),
"genre_mood": AgentConfig(
agent_name="GenreMoodAgent",
agent_type="advocate"
),
"discovery": AgentConfig(
agent_name="DiscoveryAgent",
agent_type="advocate"
),
"judge": AgentConfig(
agent_name="JudgeAgent",
agent_type="judge"
)
}
)
# Initialize shared services
cache_manager = get_cache_manager()
api_service = get_api_service(cache_manager=cache_manager)
# Initialize enhanced recommendation service
recommendation_service = get_recommendation_service(
system_config=system_config,
api_service=api_service,
cache_manager=cache_manager
)
# Initialize agents within the service
await recommendation_service.initialize_agents()
logger.info(
"BeatDebate enhanced recommendation service initialized successfully"
)
except Exception as e:
logger.error(f"Failed to initialize recommendation service: {e}")
# For demo purposes, continue without the service
logger.warning("Continuing without recommendation service for demo")
recommendation_service = None
yield
# Shutdown
logger.info("Shutting down BeatDebate recommendation service...")
if recommendation_service:
await recommendation_service.close()
if api_service:
await close_api_service()
recommendation_service = None
api_service = None
cache_manager = None
# context_manager removed - functionality moved to session_manager
# Create FastAPI app
app = FastAPI(
title="BeatDebate API",
description="4-Agent Music Recommendation System with Strategic Planning",
version="1.0.0",
lifespan=lifespan
)
# Add middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add logging middleware
app.add_middleware(LoggingMiddleware, exclude_paths=["/health", "/docs", "/openapi.json"])
app.add_middleware(PerformanceLoggingMiddleware, slow_request_threshold=5.0)
# Request/Response Models
class RecommendationRequest(BaseModel):
"""Request model for music recommendations."""
query: str = Field(..., description="User's music preference query")
session_id: Optional[str] = Field(
None,
description="Session identifier for conversation history"
)
max_recommendations: int = Field(
3,
ge=1,
le=10,
description="Maximum number of recommendations"
)
include_previews: bool = Field(
True,
description="Whether to include audio previews"
)
chat_context: Optional[Dict] = Field(
None,
description="Previous chat context for continuity"
)
class HealthResponse(BaseModel):
"""Health check response model."""
status: str
timestamp: float
version: str
components: Dict[str, str]
class PlanningResponse(BaseModel):
"""Response model for planning strategy."""
strategy: Dict # Using Dict instead of PlanningStrategy for now
execution_time: float
session_id: str
# API Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint."""
return HealthResponse(
status="healthy",
timestamp=time.time(),
version="1.0.0",
components={
"recommendation_service": (
"active" if recommendation_service else "inactive"
),
"lastfm_client": "configured",
"spotify_client": "configured"
}
)
def transform_unified_to_ui_format(unified_track: UnifiedTrackMetadata) -> Dict:
"""Transform UnifiedTrackMetadata to UI-expected format."""
return {
"title": unified_track.name,
"artist": unified_track.artist,
"album": unified_track.album,
"confidence": unified_track.recommendation_score or 0.0,
"explanation": unified_track.recommendation_reason or "",
"source": unified_track.agent_source or "unknown",
"genres": unified_track.genres,
"moods": unified_track.tags, # tags are used as moods
"preview_url": unified_track.preview_url,
"spotify_url": unified_track.external_urls.get("spotify") if unified_track.external_urls else None,
"quality_score": unified_track.quality_score,
"novelty_score": unified_track.underground_score, # underground_score is used as novelty
# Additional metadata
"popularity": unified_track.popularity,
"listeners": unified_track.listeners,
"playcount": unified_track.playcount
}
@app.post("/recommendations")
async def get_recommendations(request: RecommendationRequest):
"""Get music recommendations using the enhanced 4-agent system."""
if not recommendation_service:
raise HTTPException(
status_code=503,
detail="Recommendation service not available"
)
start_time = time.time()
logger.info(f"Recommendation request: {request.query}")
try:
# Transform to service request format
service_request = ServiceRequest(
query=request.query,
session_id=request.session_id,
max_recommendations=request.max_recommendations,
include_audio_features=request.include_previews,
context=request.chat_context # 🔧 FIX: Map chat_context to context field
)
# Get recommendations using enhanced service
service_response = await recommendation_service.get_recommendations(
service_request
)
processing_time = time.time() - start_time
# Transform recommendations to UI format
recommendations = [
transform_unified_to_ui_format(track)
for track in service_response.recommendations
]
# Format agent reasoning for UI
formatted_reasoning = []
for reasoning in service_response.reasoning:
if isinstance(reasoning, str):
formatted_reasoning.append(reasoning)
else:
formatted_reasoning.append(str(reasoning))
response_data = {
"recommendations": recommendations,
"strategy_used": service_response.strategy_used,
"reasoning": formatted_reasoning,
"session_id": service_response.session_id,
"processing_time": processing_time,
"metadata": service_response.metadata
}
logger.info(
f"✅ Recommendations generated: {len(recommendations)} tracks, "
f"{processing_time:.1f}s"
)
return JSONResponse(content=response_data)
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Recommendation error: {str(e)}")
# Return structured error response
return JSONResponse(
content={
"error": "Recommendation generation failed",
"detail": str(e),
"session_id": request.session_id,
"processing_time": processing_time,
"recommendations": [],
"strategy_used": {},
"reasoning": [f"Error: {str(e)}"],
"metadata": {"error": True}
},
status_code=500
)
@app.post("/planning")
async def get_planning_strategy(request: RecommendationRequest):
"""
Get the planning strategy without executing full recommendations.
This endpoint is useful for demonstrating the PlannerAgent's strategic
thinking process in the UI.
"""
if not recommendation_service:
# Return demo planning response
demo_strategy = {
"task_analysis": {
"primary_goal": "Demo music discovery",
"complexity_level": "medium",
"context_factors": ["demo", "testing"]
},
"coordination_strategy": {
"genre_mood_agent": {"focus": "Demo genre search"},
"discovery_agent": {"focus": "Demo discovery search"}
},
"evaluation_framework": {
"primary_weights": {"quality": 0.5, "novelty": 0.5}
}
}
return PlanningResponse(
strategy=demo_strategy,
execution_time=0.5,
session_id=request.session_id or "demo_session"
)
start_time = time.time()
try:
logger.info(f"Generating planning strategy for: {request.query}")
# Get planning strategy from PlannerAgent
strategy = await recommendation_service.get_planning_strategy(
query=request.query,
session_id=request.session_id
)
execution_time = time.time() - start_time
return PlanningResponse(
strategy=(
strategy.dict() if hasattr(strategy, 'dict') else strategy
),
execution_time=execution_time,
session_id=request.session_id or str(uuid.uuid4())
)
except Exception as e:
logger.error(f"Planning strategy generation failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Planning failed: {str(e)}"
)
@app.get("/sessions/{session_id}/history")
async def get_session_history(session_id: str):
"""Get conversation history for a session."""
if not recommendation_service:
raise HTTPException(
status_code=503,
detail="Recommendation service not available"
)
try:
# This would integrate with session management
# For now, return placeholder
return {
"session_id": session_id,
"history": [],
"message": "Session history feature coming soon"
}
except Exception as e:
logger.error(f"Failed to get session history: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to get session history: {str(e)}"
)
@app.post("/feedback")
async def submit_feedback(
session_id: str,
recommendation_id: str,
feedback: str, # "thumbs_up" or "thumbs_down"
background_tasks: BackgroundTasks
):
"""Submit user feedback for recommendations."""
if feedback not in ["thumbs_up", "thumbs_down"]:
raise HTTPException(status_code=400, detail="Invalid feedback value")
# Add background task to process feedback
background_tasks.add_task(
process_feedback,
session_id=session_id,
recommendation_id=recommendation_id,
feedback=feedback
)
return {"message": "Feedback submitted successfully"}
async def process_feedback(
session_id: str,
recommendation_id: str,
feedback: str
):
"""Background task to process user feedback."""
logger.info(
f"Processing feedback: {feedback} for recommendation "
f"{recommendation_id} in session {session_id}"
)
# This would integrate with learning/improvement systems
# For now, just log the feedback
@app.get("/sessions/{session_id}/context")
async def get_session_context(session_id: str):
"""Get smart context status for a session."""
if not recommendation_service:
raise HTTPException(
status_code=503,
detail="Recommendation service not available"
)
try:
context_summary = await recommendation_service.smart_context_manager.get_session_context(session_id)
return {
"session_id": session_id,
"context_summary": context_summary,
"timestamp": time.time()
}
except Exception as e:
logger.error(f"Failed to get context summary: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to get context summary: {str(e)}"
)
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
"""Custom HTTP exception handler."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"timestamp": time.time(),
"path": str(request.url)
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
"""General exception handler for unexpected errors."""
logger.error(f"Unexpected error: {exc}")
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"timestamp": time.time(),
"path": str(request.url)
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)