Spaces:
Runtime error
Runtime error
import base64
import html
import json
import os
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

import anthropic
import openai
import requests
import spotipy
import streamlit as st
from spotipy.oauth2 import SpotifyOAuth
| # Page configuration | |
# Browser-tab metadata and layout; applied before any other Streamlit call.
_PAGE_CONFIG = {
    "page_title": "Spotify AI Assistant",
    "page_icon": "π΅",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}

# Stylesheet for the gradient header, chat bubbles, track cards and buttons.
_CUSTOM_CSS = """
<style>
.main-header {
    font-size: 2.5rem;
    font-weight: bold;
    background: linear-gradient(90deg, #1DB954, #191414);
    -webkit-background-clip: text;
    -webkit-text-fill-color: transparent;
    margin-bottom: 1rem;
}
.spotify-card {
    background: linear-gradient(135deg, #1DB954, #191414);
    color: white;
    padding: 1.5rem;
    border-radius: 1rem;
    margin: 1rem 0;
}
.chat-message {
    padding: 1rem;
    border-radius: 0.5rem;
    margin: 0.5rem 0;
}
.user-message {
    background-color: #1DB954;
    color: white;
    margin-left: 2rem;
}
.ai-message {
    background-color: #f0f0f0;
    color: black;
    margin-right: 2rem;
}
.track-card {
    background: #f8f9fa;
    padding: 1rem;
    border-radius: 0.5rem;
    margin: 0.5rem 0;
    border-left: 4px solid #1DB954;
}
.stButton>button {
    background-color: #1DB954;
    color: white;
    font-weight: bold;
}
.stButton>button:hover {
    background-color: #1ed760;
}
</style>
"""

st.set_page_config(**_PAGE_CONFIG)
# Raw HTML is required here so the <style> tag reaches the page unescaped.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
| # Initialize session state | |
def init_session_state():
    """Give every session-state key used by the app a default value.

    Keys that already exist are left untouched, so values survive Streamlit
    reruns within the same session.
    """
    # Built inside the function so each session gets its own fresh list.
    defaults = {
        'spotify_token': None,
        'spotify_auth_manager': None,
        'chat_history': [],
        'user_profile': None,
        'ai_provider': "OpenAI",
        'openai_api_key': "",
        'anthropic_api_key': "",
        'openrouter_api_key': "",
        'current_model': "gpt-3.5-turbo",
        'spotify_client_id': "",
        'spotify_client_secret': "",
        'spotify_redirect_uri': "http://localhost:8501",
    }
    for key, value in defaults.items():
        if key in st.session_state:
            continue
        st.session_state[key] = value


init_session_state()
| # Header | |
st.markdown('<h1 class="main-header">π΅ Spotify AI Assistant</h1>', unsafe_allow_html=True)
st.markdown("Built with anycoder β’ [GitHub](https://github.com/anycoder)", unsafe_allow_html=True)

# Sidebar: every widget writes its current value straight back into session
# state so the rest of the script can read settings from one place.
_sidebar = st.sidebar
_sidebar.image("https://storage.googleapis.com/pr-newsroom-wp/1/2023/05/Spotify_Primary_Logo_RGB_Green.png", width=200)
_sidebar.markdown("### π Spotify Configuration")

# Spotify credentials
st.session_state.spotify_client_id = _sidebar.text_input(
    "Spotify Client ID",
    value=st.session_state.spotify_client_id,
    type="password",
    help="Get from Spotify Developer Dashboard",
)
st.session_state.spotify_client_secret = _sidebar.text_input(
    "Spotify Client Secret",
    value=st.session_state.spotify_client_secret,
    type="password",
    help="Get from Spotify Developer Dashboard",
)
st.session_state.spotify_redirect_uri = _sidebar.text_input(
    "Redirect URI",
    value=st.session_state.spotify_redirect_uri,
    help="Must match Spotify Developer Dashboard",
)

_sidebar.markdown("---")
_sidebar.markdown("### π€ AI Configuration")

# AI provider selection
_provider = _sidebar.selectbox(
    "AI Provider",
    ["OpenAI", "Anthropic", "OpenRouter"],
    help="Choose your AI model provider",
)
st.session_state.ai_provider = _provider

# provider -> (key widget label, session-state attribute, help text)
_KEY_FIELDS = {
    "OpenAI": ("OpenAI API Key", "openai_api_key", "Get from platform.openai.com"),
    "Anthropic": ("Anthropic API Key", "anthropic_api_key", "Get from console.anthropic.com"),
    "OpenRouter": ("OpenRouter API Key", "openrouter_api_key", "Get from openrouter.ai"),
}
# provider -> model names offered in a dropdown (OpenRouter uses free text)
_MODEL_CHOICES = {
    "OpenAI": ["gpt-3.5-turbo", "gpt-4", "gpt-4-turbo"],
    "Anthropic": ["claude-3-haiku-20240307", "claude-3-sonnet-20240229", "claude-3-opus-20240229"],
}

# Only the selected provider's key and model widgets are rendered.
_key_label, _key_attr, _key_help = _KEY_FIELDS[_provider]
st.session_state[_key_attr] = _sidebar.text_input(
    _key_label,
    value=st.session_state[_key_attr],
    type="password",
    help=_key_help,
)
if _provider in _MODEL_CHOICES:
    st.session_state.current_model = _sidebar.selectbox("Model", _MODEL_CHOICES[_provider])
else:
    st.session_state.current_model = _sidebar.text_input(
        "Model (OpenRouter Format)",
        value="openai/gpt-3.5-turbo",
    )

_sidebar.markdown("---")
_sidebar.markdown("### π Options")
limit = _sidebar.slider("Data Limit", 5, 50, 20, help="Number of items to fetch from Spotify")
if _sidebar.button("Clear Chat History"):
    st.session_state.chat_history = []
    _sidebar.success("Chat history cleared!")
| # Spotify Authentication | |
def get_spotify_auth_manager():
    """Build a ``SpotifyOAuth`` manager from the credentials in session state.

    Returns:
        A configured ``SpotifyOAuth`` instance, or ``None`` when the client id
        or client secret has not been entered yet.
    """
    # Local import keeps this block self-contained; the module ships with spotipy.
    from spotipy.cache_handler import MemoryCacheHandler

    client_id = st.session_state.spotify_client_id
    client_secret = st.session_state.spotify_client_secret
    if not client_id or not client_secret:
        return None

    # A file cache (".spotify_cache") is shared by every visitor of a hosted
    # app, so one user's token could be served to another. Keep the token in
    # memory instead, seeded from this session's own state.
    session_cache = MemoryCacheHandler(
        token_info=st.session_state.get('spotify_token')
    )
    return SpotifyOAuth(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=st.session_state.spotify_redirect_uri,
        scope="user-read-private user-read-email user-top-read user-read-recently-played playlist-read-private playlist-modify-private playlist-modify-public",
        cache_handler=session_cache,
    )
def authenticate_spotify():
    """Render the manual OAuth flow and exchange the pasted redirect URL for a token.

    The user opens the authorize link, then pastes the URL Spotify redirected
    them to. On success the token info is stored in
    ``st.session_state.spotify_token``.

    Returns:
        ``True`` only on the run in which the token exchange succeeds;
        ``False`` otherwise (missing credentials, button not pressed, empty
        input, or a failed exchange).
    """
    auth_manager = get_spotify_auth_manager()
    if not auth_manager:
        st.error("Please enter Spotify credentials in the sidebar")
        return False
    st.session_state.spotify_auth_manager = auth_manager

    # Generate auth URL
    auth_url = auth_manager.get_authorize_url()
    st.markdown("### π Spotify Authentication")
    st.markdown(f"1. Click [here to authorize]({auth_url})")
    st.markdown("2. Copy the full redirect URL after authorization")
    redirect_url = st.text_input("Paste redirect URL here:", placeholder="http://localhost:8501/?code=...")

    if not st.button("Complete Authentication"):
        return False

    # Pasted URLs often carry stray whitespace or a trailing newline.
    redirect_url = redirect_url.strip()
    if not redirect_url:
        st.warning("Please paste the redirect URL")
        return False

    try:
        code = auth_manager.parse_response_code(redirect_url)
        # check_cache=False forces a real exchange of THIS code; otherwise a
        # token already sitting in the cache would be returned instead.
        token_info = auth_manager.get_access_token(code, check_cache=False)
    except Exception as e:  # UI boundary: report any failure to the user
        st.error(f"Authentication failed: {str(e)}")
        return False

    st.session_state.spotify_token = token_info
    st.success("β Successfully authenticated with Spotify!")
    return True
def get_spotify_client():
    """Return a ``spotipy.Spotify`` client for the session's token.

    If the stored access token has expired it is refreshed first, and the new
    token info is written back to ``st.session_state.spotify_token``.

    Returns:
        A ``spotipy.Spotify`` client, or ``None`` when there is no token, the
        Spotify credentials are missing, or the token cannot be refreshed (in
        which case the stored token is cleared so the user can sign in again).
    """
    token_info = st.session_state.spotify_token
    if not token_info:
        return None
    auth_manager = get_spotify_auth_manager()
    if not auth_manager:
        return None

    # Without a refresh, every API call starts failing once the token expires.
    if auth_manager.is_token_expired(token_info):
        refresh_token = token_info.get('refresh_token')
        if not refresh_token:
            st.session_state.spotify_token = None
            return None
        try:
            token_info = auth_manager.refresh_access_token(refresh_token)
        except (spotipy.SpotifyOauthError, requests.RequestException):
            st.session_state.spotify_token = None
            return None
        st.session_state.spotify_token = token_info

    return spotipy.Spotify(auth=token_info['access_token'])
| # Spotify Data Functions | |
def fetch_user_profile():
    """Fetch the signed-in user's Spotify profile.

    Returns:
        The profile dict from ``current_user()``, or ``None`` when no client
        is available or the request fails (best effort).
    """
    sp = get_spotify_client()
    if not sp:
        return None
    try:
        return sp.current_user()
    except Exception:
        # Deliberately best-effort, but unlike a bare ``except`` this lets
        # SystemExit/KeyboardInterrupt and Streamlit control flow propagate.
        return None
def fetch_top_tracks(limit=20, time_range='medium_term'):
    """Fetch the user's top tracks.

    Args:
        limit: Maximum number of tracks to request.
        time_range: Spotify time-range identifier, passed through unchanged.

    Returns:
        The list under ``'items'`` in the response, or ``[]`` when no client
        is available or the request fails (best effort).
    """
    sp = get_spotify_client()
    if not sp:
        return []
    try:
        results = sp.current_user_top_tracks(limit=limit, time_range=time_range)
        return results['items']
    except Exception:
        # Best-effort: swallow API/shape errors only, never BaseException.
        return []
def fetch_top_artists(limit=20, time_range='medium_term'):
    """Fetch the user's top artists.

    Args:
        limit: Maximum number of artists to request.
        time_range: Spotify time-range identifier, passed through unchanged.

    Returns:
        The list under ``'items'`` in the response, or ``[]`` when no client
        is available or the request fails (best effort).
    """
    sp = get_spotify_client()
    if not sp:
        return []
    try:
        results = sp.current_user_top_artists(limit=limit, time_range=time_range)
        return results['items']
    except Exception:
        # Best-effort: swallow API/shape errors only, never BaseException.
        return []
def fetch_recently_played(limit=20):
    """Fetch the user's recently played items.

    Args:
        limit: Maximum number of play-history items to request.

    Returns:
        The list under ``'items'`` in the response, or ``[]`` when no client
        is available or the request fails (best effort).
    """
    sp = get_spotify_client()
    if not sp:
        return []
    try:
        results = sp.current_user_recently_played(limit=limit)
        return results['items']
    except Exception:
        # Best-effort: swallow API/shape errors only, never BaseException.
        return []
def fetch_user_playlists(limit=20):
    """Fetch the current user's playlists.

    Args:
        limit: Maximum number of playlists to request.

    Returns:
        The list under ``'items'`` in the response, or ``[]`` when no client
        is available or the request fails (best effort).
    """
    sp = get_spotify_client()
    if not sp:
        return []
    try:
        results = sp.current_user_playlists(limit=limit)
        return results['items']
    except Exception:
        # Best-effort: swallow API/shape errors only, never BaseException.
        return []
def search_spotify(query, types=('track', 'artist', 'album'), limit=5):
    """Search the Spotify catalogue.

    Args:
        query: Search string.
        types: Iterable of item-type names; joined with commas for the API.
            The default is an immutable tuple so it cannot be mutated across
            calls (the values are the same as before).
        limit: Maximum results per type.

    Returns:
        The raw search response dict, or ``{}`` when no client is available
        or the request fails (best effort).
    """
    sp = get_spotify_client()
    if not sp:
        return {}
    try:
        return sp.search(q=query, type=','.join(types), limit=limit)
    except Exception:
        # Best-effort: swallow API errors only, never BaseException.
        return {}
def create_playlist(name, description="", public=False):
    """Create a playlist owned by the signed-in user.

    Args:
        name: Playlist title.
        description: Optional playlist description.
        public: Whether the playlist is publicly visible.

    Returns:
        The playlist object returned by ``user_playlist_create``, or ``None``
        when no client is available or creation fails (the failure is shown
        with ``st.error``).
    """
    client = get_spotify_client()
    if client is None or not client:
        return None
    try:
        owner_id = client.current_user()['id']
        return client.user_playlist_create(
            owner_id,
            name,
            public=public,
            description=description,
        )
    except Exception as exc:
        st.error(f"Failed to create playlist: {str(exc)}")
    return None
def add_tracks_to_playlist(playlist_id, track_uris):
    """Add tracks to a playlist, batching to respect the API request limit.

    Args:
        playlist_id: Target playlist id.
        track_uris: Iterable of track URIs/ids to append, in order.

    Returns:
        ``True`` when every batch was accepted (trivially so for an empty
        input, which sends no request); ``False`` when no client is available
        or any request fails.
    """
    sp = get_spotify_client()
    if not sp:
        return False

    # Spotify accepts at most 100 items per add-items request.
    batch_size = 100
    uris = list(track_uris)
    try:
        for start in range(0, len(uris), batch_size):
            sp.playlist_add_items(playlist_id, uris[start:start + batch_size])
        return True
    except Exception:
        # Best-effort: report failure without swallowing BaseException.
        return False
| # AI Chat Functions | |
def get_ai_client():
    """Identify which AI backend is usable for the selected provider.

    Returns:
        ``"openai"``, ``"anthropic"`` or ``"openrouter"`` when the selected
        provider has a non-empty API key in session state, otherwise ``None``.
        Selecting OpenAI also sets ``openai.api_key`` as a side effect.
    """
    # provider label -> (session-state key holding the API key, backend tag)
    backends = {
        "OpenAI": ("openai_api_key", "openai"),
        "Anthropic": ("anthropic_api_key", "anthropic"),
        "OpenRouter": ("openrouter_api_key", "openrouter"),
    }
    entry = backends.get(st.session_state.ai_provider)
    if entry is None:
        return None

    key_name, backend = entry
    api_key = st.session_state[key_name]
    if not api_key:
        return None
    if backend == "openai":
        openai.api_key = api_key
    return backend
def format_spotify_data_for_ai():
    """Summarise the user's Spotify data as plain text for the AI prompt.

    Includes the profile, up to five top tracks, up to five top artists and
    up to three recently played tracks; sections with no data are omitted.
    """
    profile = fetch_user_profile()
    top_tracks = fetch_top_tracks(limit=10)
    top_artists = fetch_top_artists(limit=10)
    recent = fetch_recently_played(limit=5)

    def describe(track):
        # "<title> by <artist>, <artist>"
        performers = ", ".join(a['name'] for a in track['artists'])
        return f"{track['name']} by {performers}"

    pieces = ["Spotify User Data:\n\n"]

    if profile:
        pieces.append(f"User: {profile.get('display_name', 'Unknown')} ")
        pieces.append(f"({profile.get('email', 'No email')})\n")
        pieces.append(f"Country: {profile.get('country', 'Unknown')}\n")
        pieces.append(f"Product: {profile.get('product', 'Unknown')}\n\n")

    if top_tracks:
        pieces.append("Top Tracks:\n")
        pieces.extend(
            f"{rank}. {describe(track)}\n"
            for rank, track in enumerate(top_tracks[:5], 1)
        )
        pieces.append("\n")

    if top_artists:
        pieces.append("Top Artists:\n")
        pieces.extend(
            f"{rank}. {artist['name']}\n"
            for rank, artist in enumerate(top_artists[:5], 1)
        )
        pieces.append("\n")

    if recent:
        pieces.append("Recently Played:\n")
        pieces.extend(
            f"{rank}. {describe(entry['track'])}\n"
            for rank, entry in enumerate(recent[:3], 1)
        )

    return "".join(pieces)
def _build_assistant_prompt(prompt, context):
    """Combine the Spotify context and the user's question into one prompt."""
    return (
        "You are a knowledgeable Spotify AI assistant with access to user data.\n"
        "Use the following Spotify data to provide personalized, accurate responses:\n"
        f"{context}\n"
        f"User Question: {prompt}\n"
        "Provide helpful, specific recommendations and insights based on their actual listening data.\n"
        "If they ask about their music taste, reference their top artists and tracks.\n"
        "If they want recommendations, consider their preferences.\n"
        "Be conversational and friendly."
    )


def _openrouter_reply_text(payload):
    """Return the assistant text from an OpenRouter response body.

    Raises:
        RuntimeError: when the body carries an ``error`` entry, so the caller
            reports the API's own message rather than a bare ``KeyError``.
    """
    error = payload.get("error")
    if error:
        message = error.get("message", error) if isinstance(error, dict) else error
        raise RuntimeError(f"OpenRouter API error: {message}")
    return payload['choices'][0]['message']['content']


def generate_ai_response(prompt, context=""):
    """Ask the configured AI provider to answer ``prompt`` using ``context``.

    Args:
        prompt: The user's question.
        context: Text summary of the user's Spotify data.

    Returns:
        The model's reply text. Never raises: a missing API key or any
        provider failure is returned as a human-readable message instead.
    """
    client_type = get_ai_client()
    if not client_type:
        return "Please configure your AI API keys in the sidebar."

    full_prompt = _build_assistant_prompt(prompt, context)
    model = st.session_state.current_model

    try:
        if client_type == "openai":
            messages = [
                {"role": "system", "content": "You are a helpful Spotify AI assistant."},
                {"role": "user", "content": full_prompt},
            ]
            if hasattr(openai, "OpenAI"):
                # openai>=1.0 removed openai.ChatCompletion; use the client API.
                client = openai.OpenAI(api_key=st.session_state.openai_api_key)
                response = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    max_tokens=500,
                    temperature=0.7,
                )
            else:
                # Legacy SDK (<1.0): key was set globally by get_ai_client().
                response = openai.ChatCompletion.create(
                    model=model,
                    messages=messages,
                    max_tokens=500,
                    temperature=0.7,
                )
            return response.choices[0].message.content

        elif client_type == "anthropic":
            client = anthropic.Anthropic(api_key=st.session_state.anthropic_api_key)
            response = client.messages.create(
                model=model,
                max_tokens=500,
                temperature=0.7,
                messages=[{"role": "user", "content": full_prompt}],
            )
            return response.content[0].text

        elif client_type == "openrouter":
            headers = {
                "Authorization": f"Bearer {st.session_state.openrouter_api_key}",
                "Content-Type": "application/json",
            }
            data = {
                "model": model,
                "messages": [{"role": "user", "content": full_prompt}],
                "max_tokens": 500,
                "temperature": 0.7,
            }
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=60,  # without a timeout a stalled request hangs the app
            )
            try:
                payload = response.json()
            except ValueError:
                payload = {}
            # Prefer the API's own error message; otherwise surface HTTP errors.
            if "error" not in payload:
                response.raise_for_status()
            return _openrouter_reply_text(payload)
    except Exception as e:  # UI boundary: show the failure in the chat
        return f"Error generating response: {str(e)}"
def generate_recommendations_from_tracks(seed_tracks, limit=10):
    """Request track recommendations seeded by the given track ids.

    Args:
        seed_tracks: Sequence of Spotify track ids; only the first five are
            sent because the endpoint accepts at most five seeds.
        limit: Maximum number of recommended tracks to request.

    Returns:
        The list under ``'tracks'`` in the response, or ``[]`` when there are
        no seeds, no client is available, or the request fails (best effort).
    """
    seeds = list(seed_tracks or [])[:5]
    if not seeds:
        # The endpoint requires at least one seed; skip the doomed request.
        return []
    sp = get_spotify_client()
    if not sp:
        return []
    try:
        recommendations = sp.recommendations(seed_tracks=seeds, limit=limit)
        return recommendations['tracks']
    except Exception:
        # Best-effort: swallow API errors only, never BaseException.
        return []
| # Main App Logic | |
# Intro text shown on the sign-in screen.
_WELCOME_TEXT = """
This app combines Spotify data with AI to provide:
- Personalized music recommendations
- Insights into your listening habits
- Natural language queries about your music
- Playlist creation assistance
"""

# Static footer markup (contains no user data, so no escaping is needed).
_FOOTER_HTML = """
<div style="text-align: center; color: #666;">
<p>Built with β€οΈ using Streamlit | Data powered by Spotify API | Intelligence powered by AI</p>
</div>
"""


def _track_card(heading, detail, byline=""):
    """Return "track-card" HTML with every text argument HTML-escaped."""
    heading_html = f"<strong>{html.escape(str(heading))}</strong>"
    if byline:
        heading_html += f" by {html.escape(str(byline))}"
    return (
        '<div class="track-card">\n'
        f"{heading_html}<br>\n"
        f"<small>{html.escape(str(detail))}</small>\n"
        "</div>"
    )


def _chat_bubble(role, content):
    """Return chat-bubble HTML for one history entry, escaping its content."""
    if role == "user":
        css_class, speaker = "user-message", "You"
    else:
        css_class, speaker = "ai-message", "AI"
    return (
        f'<div class="chat-message {css_class}">'
        f"<strong>{speaker}:</strong> {html.escape(str(content))}</div>"
    )


def _artist_names(track):
    """Return a track's artist names as a comma-separated string."""
    return ", ".join(a['name'] for a in track['artists'])


def _render_chat_tab():
    """Show the chat history and handle a newly submitted message."""
    st.markdown("### π¬ Chat with your Spotify AI Assistant")
    with st.container():
        for message in st.session_state.chat_history:
            # Both user input and model output are untrusted text: escape them
            # before embedding in raw HTML.
            st.markdown(
                _chat_bubble(message["role"], message["content"]),
                unsafe_allow_html=True,
            )

    user_input = st.chat_input("Ask about your music, get recommendations, or create playlists...")
    if not user_input:
        return

    st.session_state.chat_history.append({"role": "user", "content": user_input})
    with st.spinner("Thinking..."):
        context = format_spotify_data_for_ai()
        response = generate_ai_response(user_input, context)
    st.session_state.chat_history.append({"role": "assistant", "content": response})

    # Special handling for playlist creation requests
    if "create playlist" in user_input.lower() and "created" in response.lower():
        st.info("π‘ Tip: You can create playlists by asking 'Create a playlist based on my top artists'")
    # Rerun so the new messages are drawn by the history loop above.
    st.rerun()


def _render_insights_tab(profile):
    """Show profile metrics plus the user's top artists and tracks."""
    st.markdown("### π Your Music Insights")
    if profile:
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Username", profile.get('display_name', 'N/A'))
        with col2:
            st.metric("Country", profile.get('country', 'N/A'))
        with col3:
            # 'product' may be present but None; avoid calling .title() on it.
            st.metric("Account Type", (profile.get('product') or 'N/A').title())

    col1, col2 = st.columns(2)
    with col1:
        st.markdown("#### π€ Top Artists")
        for i, artist in enumerate(fetch_top_artists(limit=10), 1):
            genres = ', '.join(artist.get('genres', [])[:3])
            st.markdown(
                _track_card(f"{i}. {artist['name']}", f"Genres: {genres}"),
                unsafe_allow_html=True,
            )
    with col2:
        st.markdown("#### π΅ Top Tracks")
        for i, track in enumerate(fetch_top_tracks(limit=10), 1):
            st.markdown(
                _track_card(f"{i}. {track['name']}", f"by {_artist_names(track)}"),
                unsafe_allow_html=True,
            )


def _render_recommendations_tab():
    """Offer recommendations seeded from the user's top tracks."""
    st.markdown("### π΅ AI-Powered Recommendations")
    top_tracks = fetch_top_tracks(limit=5)
    if not top_tracks:
        st.info("Listen to more music to get recommendations!")
        return

    seed_tracks = [track['id'] for track in top_tracks[:3]]
    st.markdown("#### Based on your top tracks:")
    for track in top_tracks:
        st.markdown(f"- {track['name']} by {_artist_names(track)}")

    if not st.button("Generate Recommendations"):
        return
    with st.spinner("Generating personalized recommendations..."):
        recommendations = generate_recommendations_from_tracks(seed_tracks, limit=10)
    if not recommendations:
        st.warning("Could not generate recommendations. Try again later.")
        return

    st.success("Here are your personalized recommendations:")
    for track in recommendations:
        st.markdown(
            _track_card(
                track['name'],
                f"Album: {track['album']['name']}",
                byline=_artist_names(track),
            ),
            unsafe_allow_html=True,
        )


def _render_data_tab():
    """Show recently played tracks and the user's playlists."""
    st.markdown("### π§ Your Spotify Data")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("#### Recently Played")
        for item in fetch_recently_played(limit=10):
            track = item['track']
            # Only the first 19 characters (through seconds) are parsed.
            played_at = datetime.strptime(item['played_at'][:19], "%Y-%m-%dT%H:%M:%S")
            st.markdown(
                _track_card(
                    track['name'],
                    f"Played at: {played_at.strftime('%Y-%m-%d %H:%M')}",
                    byline=_artist_names(track),
                ),
                unsafe_allow_html=True,
            )
    with col2:
        st.markdown("#### Your Playlists")
        for playlist in fetch_user_playlists(limit=10):
            st.markdown(
                _track_card(playlist['name'], f"{playlist['tracks']['total']} tracks"),
                unsafe_allow_html=True,
            )


def main():
    """Run the app: show the sign-in flow, or the four-tab interface once signed in."""
    # Check if authenticated
    if not st.session_state.spotify_token:
        with st.container():
            st.markdown('<div class="spotify-card">', unsafe_allow_html=True)
            st.markdown("### Welcome to Spotify AI Assistant!")
            st.markdown(_WELCOME_TEXT)
            st.markdown('</div>', unsafe_allow_html=True)
        if authenticate_spotify():
            # Re-execute the script so the authenticated branch renders.
            st.rerun()
        return

    # Main interface after authentication
    sp = get_spotify_client()
    if not sp:
        st.error("Failed to initialize Spotify client")
        return

    # Fetch user data
    profile = fetch_user_profile()
    if profile:
        st.session_state.user_profile = profile

    tab1, tab2, tab3, tab4 = st.tabs(["π¬ Chat", "π Insights", "π΅ Recommendations", "π§ Your Data"])
    with tab1:
        _render_chat_tab()
    with tab2:
        _render_insights_tab(profile)
    with tab3:
        _render_recommendations_tab()
    with tab4:
        _render_data_tab()

    # Footer
    st.markdown("---")
    st.markdown(_FOOTER_HTML, unsafe_allow_html=True)


if __name__ == "__main__":
    main()