Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, send_from_directory, jsonify, request | |
| import os | |
| import mutagen | |
| from mutagen.flac import FLAC | |
| from mutagen.mp3 import MP3 | |
| from mutagen.wave import WAVE | |
| from functools import lru_cache | |
| import base64 | |
| from pathlib import Path | |
app = Flask(__name__)

# Configure music folder using Path for better path handling
app.config['MUSIC_FOLDER'] = Path(__file__).parent / 'music'

# Global cache for metadata (keyed by file path string)
METADATA_CACHE = {}
# Cached result of scanning the music folder; None means "not scanned yet"
STRUCTURE_CACHE = None
CACHE_VERSION = 1  # Increment this when changing cache logic

# In-memory playlist store: playlist name -> list of songs.
# The playlist views read and mutate this dict; without this definition they
# fail with NameError on first use. Being a plain dict, its contents do not
# survive a process restart.
playlists = {}
def get_audio_metadata(file_path):
    """Read (and cache) display metadata for a FLAC, MP3 or WAV file.

    Results are stored in ``METADATA_CACHE`` keyed by the path string, so a
    file is only parsed once until the cache is cleared.

    Args:
        file_path: Path of the audio file (``str`` or path-like).

    Returns:
        A dict with ``title``, ``artist``, ``album``, ``duration``,
        ``sample_rate``, ``channels``, ``artwork`` (data URI or ``None``),
        ``lyrics`` and ``synchronized_lyrics``; ``None`` when the extension
        is not one of ``.flac`` / ``.mp3`` / ``.wav``. If the file cannot be
        parsed, a placeholder dict with the same keys is returned (and not
        cached) so callers can still list the file.

    NOTE(review): ``clear_cache`` calls ``get_audio_metadata.cache_clear()``,
    which suggests an ``lru_cache`` decorator that is not visible in the
    reviewed listing - confirm.
    """
    file_path = str(file_path)
    try:
        # Check if metadata is in cache
        if file_path in METADATA_CACHE:
            return METADATA_CACHE[file_path]

        # Compare case-insensitively: the folder scanner lowercases suffixes,
        # so "SONG.MP3" must be recognised here as well.
        lowered = file_path.lower()
        if lowered.endswith('.flac'):
            metadata = _read_flac_metadata(file_path)
        elif lowered.endswith('.mp3'):
            metadata = _read_mp3_metadata(file_path)
        elif lowered.endswith('.wav'):
            metadata = _read_wav_metadata(file_path)
        else:
            return None

        METADATA_CACHE[file_path] = metadata
        return metadata
    except Exception as e:
        # Best effort: an unreadable file is still listed with placeholders.
        print(f"Error reading metadata for {file_path}: {str(e)}")
        return {
            'title': Path(file_path).name,
            'artist': 'Unknown',
            'album': 'Unknown',
            'duration': 0,
            'sample_rate': 0,
            'channels': 0,
            'artwork': None,
            # Same keys as the success path so consumers see one shape.
            'lyrics': '',
            'synchronized_lyrics': []
        }


def _artwork_data_uri(picture):
    """Encode an embedded picture object (``.data``, ``.mime``) as a data URI."""
    mime = getattr(picture, 'mime', None)
    # Fall back to JPEG when the tag carries no usable image MIME type.
    if not isinstance(mime, str) or not mime.startswith('image/'):
        mime = 'image/jpeg'
    encoded = base64.b64encode(picture.data).decode('utf-8')
    return f"data:{mime};base64,{encoded}"


def _first_text(tags, key, default):
    """Return the first text value of ID3 frame ``key`` as str, else ``default``."""
    if tags is None:
        return default
    frame = tags.get(key)
    if frame is None:
        return default
    try:
        return str(frame[0])
    except (IndexError, TypeError):
        return default


def _read_flac_metadata(file_path):
    """Build the metadata dict for a FLAC file."""
    audio = FLAC(file_path)
    metadata = {
        'title': str(audio.get('title', [Path(file_path).name])[0]),
        'artist': str(audio.get('artist', ['Unknown'])[0]),
        'album': str(audio.get('album', ['Unknown'])[0]),
        'duration': int(audio.info.length),
        'sample_rate': audio.info.sample_rate,
        'channels': audio.info.channels,
        'artwork': None,
        'lyrics': str(audio.get('lyrics', [''])[0]),
        'synchronized_lyrics': get_synchronized_lyrics(audio)
    }
    if audio.pictures:
        try:
            metadata['artwork'] = _artwork_data_uri(audio.pictures[0])
        except Exception as e:
            # Artwork is optional; keep the rest of the metadata.
            print(f"Error reading artwork for {file_path}: {str(e)}")
    return metadata


def _read_mp3_metadata(file_path):
    """Build the metadata dict for an MP3 file (ID3 tags may be absent)."""
    audio = MP3(file_path)
    # ``tags`` always exists as an attribute but is None for untagged files,
    # so test the value rather than using hasattr().
    tags = audio.tags
    metadata = {
        'title': _first_text(tags, 'TIT2', Path(file_path).name),
        'artist': _first_text(tags, 'TPE1', 'Unknown'),
        'album': _first_text(tags, 'TALB', 'Unknown'),
        'duration': int(audio.info.length),
        'sample_rate': audio.info.sample_rate,
        'channels': audio.info.channels,
        'artwork': None,
        'lyrics': get_mp3_lyrics(audio),
        'synchronized_lyrics': get_mp3_synchronized_lyrics(audio)
    }
    if tags is not None:
        try:
            apic_keys = [k for k in tags.keys() if k.startswith('APIC:')]
            if apic_keys:
                metadata['artwork'] = _artwork_data_uri(tags[apic_keys[0]])
        except Exception as e:
            # Artwork is optional; keep the rest of the metadata.
            print(f"Error reading artwork for {file_path}: {str(e)}")
    return metadata


def _read_wav_metadata(file_path):
    """Build the metadata dict for a WAV file (stream info only, no tags read)."""
    audio = WAVE(file_path)
    return {
        'title': Path(file_path).name,
        'artist': 'Unknown',
        'album': 'Unknown',
        'duration': int(audio.info.length),
        'sample_rate': audio.info.sample_rate,
        'channels': audio.info.channels,
        'artwork': None,
        'lyrics': '',
        'synchronized_lyrics': []
    }
def get_folder_structure(path):
    """Return the audio files and sub-folders under ``path`` as a nested list.

    Each file entry is ``{'type': 'file', 'name', 'path', 'metadata'}`` and
    each folder entry is ``{'type': 'folder', 'name', 'path', 'contents'}``,
    where ``path`` is relative to ``app.config['MUSIC_FOLDER']`` and
    ``contents`` is the result of scanning that folder recursively.

    Only a scan of the music folder itself is stored in ``STRUCTURE_CACHE``.
    Caching every call in the single global (as before) made recursive calls
    overwrite it, so every sub-folder after the first received the first
    sub-folder's contents.

    Args:
        path: Directory to scan (``str`` or ``Path``).

    Returns:
        The list described above; ``[]`` if the directory does not exist or
        scanning it fails.
    """
    global STRUCTURE_CACHE
    try:
        folder = Path(path)
        music_root = Path(app.config['MUSIC_FOLDER'])
        is_music_root = folder == music_root

        if is_music_root and STRUCTURE_CACHE is not None:
            return STRUCTURE_CACHE
        if not folder.exists():
            return []

        structure = []
        for item in folder.iterdir():
            if item.is_file():
                if item.suffix.lower() not in ('.mp3', '.wav', '.flac'):
                    continue
                metadata = get_audio_metadata(str(item))
                if metadata:
                    structure.append({
                        'type': 'file',
                        'name': item.name,
                        # Forward slashes on every OS: the value is used as a URL path.
                        'path': item.relative_to(music_root).as_posix(),
                        'metadata': metadata
                    })
            elif item.is_dir():
                structure.append({
                    'type': 'folder',
                    'name': item.name,
                    'path': item.relative_to(music_root).as_posix(),
                    # Sub-folders are never the music root, so this bypasses the cache.
                    'contents': get_folder_structure(item)
                })

        if is_music_root:
            STRUCTURE_CACHE = structure
        return structure
    except Exception as e:
        print(f"Error scanning directory {path}: {str(e)}")
        return []
def get_music_structure():
    """Return the music library tree as a JSON response.

    Creates the configured music folder when it is missing. Responds with an
    empty JSON list when the folder was just created or contains no entries;
    otherwise responds with the result of ``get_folder_structure``.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    library_root = app.config['MUSIC_FOLDER']

    if not library_root.exists():
        library_root.mkdir(parents=True, exist_ok=True)
        return jsonify([])

    # Peek at the first directory entry to find out whether the folder is empty.
    first_entry = next(library_root.iterdir(), None)
    if first_entry is None:
        return jsonify([])

    return jsonify(get_folder_structure(library_root))
# Add cache invalidation endpoint for development
def clear_cache():
    """Clear all caches - use in development only.

    Resets ``STRUCTURE_CACHE``, empties ``METADATA_CACHE`` and, when
    ``get_audio_metadata`` is wrapped by ``functools.lru_cache``, clears that
    cache too.

    Returns:
        A JSON response ``{"message": "Cache cleared"}``.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    global STRUCTURE_CACHE
    STRUCTURE_CACHE = None
    # Mutated in place, so no ``global`` declaration is needed for this one.
    METADATA_CACHE.clear()

    # ``cache_clear`` only exists on lru_cache-wrapped functions; calling it
    # unconditionally raises AttributeError when the wrapper is absent.
    reset_lru_cache = getattr(get_audio_metadata, 'cache_clear', None)
    if callable(reset_lru_cache):
        reset_lru_cache()

    return jsonify({"message": "Cache cleared"})
def serve_music(filename):
    """Send one audio file from the configured music folder.

    Uses ``app.config['MUSIC_FOLDER']`` - the same directory the library
    scanner walks - instead of a hard-coded ``'music'`` directory, so changing
    the configuration cannot make listing and serving disagree.

    Args:
        filename: Path of the requested file relative to the music folder.

    Returns:
        The response produced by ``send_from_directory``.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    return send_from_directory(app.config['MUSIC_FOLDER'], filename)
def index():
    """Render and return the ``index.html`` template.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    page = render_template('index.html')
    return page
def default_cover():
    """Send ``default-cover.png`` from the ``static`` directory.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    cover_directory = 'static'
    cover_filename = 'default-cover.png'
    return send_from_directory(cover_directory, cover_filename)
def manage_playlists():
    """List, create/replace or delete playlists in the module-level ``playlists`` dict.

    * ``GET``    - respond with all playlists as JSON.
    * ``POST``   - JSON body ``{"name": ..., "songs": [...]}``; stores the
      songs under that name (replacing any existing playlist). 400 when the
      name is missing or ``songs`` is not a list.
    * ``DELETE`` - query parameter ``name``; 404 when no such playlist exists.

    Any other method gets a 405 instead of falling through and returning
    ``None``.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm the route and its allowed methods.
    """
    if request.method == 'GET':
        return jsonify(playlists)

    if request.method == 'POST':
        # silent=True yields None instead of raising on a missing/invalid JSON
        # body; treat anything that is not an object as "no fields supplied".
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        playlist_name = data.get('name')
        songs = data.get('songs', [])
        if not playlist_name:
            return jsonify({'error': 'No playlist name provided'}), 400
        if not isinstance(songs, list):
            # add_to_playlist / remove_from_playlist rely on list semantics.
            return jsonify({'error': 'songs must be a list'}), 400
        playlists[playlist_name] = songs
        return jsonify({'message': f'Playlist {playlist_name} created/updated'})

    if request.method == 'DELETE':
        playlist_name = request.args.get('name')
        if playlist_name in playlists:
            del playlists[playlist_name]
            return jsonify({'message': f'Playlist {playlist_name} deleted'})
        return jsonify({'error': 'Playlist not found'}), 404

    return jsonify({'error': 'Method not allowed'}), 405
def add_to_playlist(name):
    """Append songs to playlist ``name``, creating the playlist if needed.

    Expects a JSON body ``{"songs": [...]}``. A missing or non-JSON body is
    treated as an empty song list rather than crashing on ``None``.

    Args:
        name: Playlist name.

    Returns:
        JSON list of the playlist's songs after the update, or a 400 error
        when ``songs`` is not a list (extending with a string would add it
        character by character).

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    data = request.get_json(silent=True)
    songs = data.get('songs', []) if isinstance(data, dict) else []
    if not isinstance(songs, list):
        return jsonify({'error': 'songs must be a list'}), 400

    playlists.setdefault(name, []).extend(songs)
    return jsonify(playlists[name])
def remove_from_playlist(name):
    """Remove songs from playlist ``name`` by position.

    Expects a JSON body ``{"indices": [...]}``. Out-of-range and non-integer
    entries are ignored, and duplicates are removed once only (previously a
    repeated index popped a second, unrelated song).

    Args:
        name: Playlist name.

    Returns:
        JSON list of the remaining songs; 404 when the playlist does not
        exist; 400 when ``indices`` is not a list.

    NOTE(review): no route decorator is visible above this function in the
    reviewed listing - confirm it is registered as a view.
    """
    if name not in playlists:
        return jsonify({'error': 'Playlist not found'}), 404

    data = request.get_json(silent=True)
    indices = data.get('indices', []) if isinstance(data, dict) else []
    if not isinstance(indices, list):
        return jsonify({'error': 'indices must be a list'}), 400

    songs = playlists[name]
    valid_indices = {
        index for index in indices
        if isinstance(index, int) and 0 <= index < len(songs)
    }
    # Remove songs in reverse order to avoid index shifting
    for index in sorted(valid_indices, reverse=True):
        songs.pop(index)
    return jsonify(songs)
def get_synchronized_lyrics(audio):
    """Extract synchronized lyrics from FLAC metadata.

    Reads the first value of the ``LYRICS`` tag and parses it as LRC text
    with ``parse_lrc_format``.

    Args:
        audio: Object exposing a ``tags`` mapping (may be ``None``).

    Returns:
        List of ``{'time': seconds, 'text': str}`` dicts, or ``[]`` when the
        file has no tags, no ``LYRICS`` tag, or parsing fails.
    """
    tags = getattr(audio, 'tags', None)
    if tags is None:
        # Untagged file: nothing to parse and not an error worth logging.
        return []
    try:
        if 'LYRICS' in tags:
            # Try to parse synchronized lyrics in LRC format
            return parse_lrc_format(tags['LYRICS'][0])
    except Exception as e:
        print(f"Error parsing synchronized lyrics: {str(e)}")
    return []
def get_mp3_lyrics(audio):
    """Extract plain lyrics from MP3 metadata.

    Looks for the exact keys ``USLT::``, ``LYRICS`` and ``LYRICS:`` first,
    then for any key starting with ``USLT:`` (prefix matching, as the APIC
    and SYLT lookups in this file do), because unsynchronised-lyrics keys
    normally carry a description/language suffix such as ``USLT::eng``.

    Args:
        audio: Object exposing a ``tags`` mapping (may be ``None``).

    Returns:
        ``str()`` of the first matching tag value, or ``''`` when there are
        no tags, no lyrics tag, or reading fails.
    """
    tags = getattr(audio, 'tags', None)
    if tags is None:
        # ``tags`` is None for untagged files; hasattr() could not detect that.
        return ''
    try:
        # Try different common lyrics tag formats
        for tag in ('USLT::', 'LYRICS', 'LYRICS:'):
            if tag in tags:
                return str(tags[tag])
        for key in tags.keys():
            if key.startswith('USLT:'):
                return str(tags[key])
    except Exception as e:
        print(f"Error extracting MP3 lyrics: {str(e)}")
    return ''
def get_mp3_synchronized_lyrics(audio):
    """Extract synchronized lyrics from MP3 metadata.

    Picks the first tag whose key starts with ``SYLT:`` and converts it with
    ``parse_sylt_format``.

    Args:
        audio: Object exposing a ``tags`` mapping (may be ``None``).

    Returns:
        List of ``{'time': seconds, 'text': str}`` dicts, or ``[]`` when there
        are no tags, no SYLT frame, or extraction fails.
    """
    tags = getattr(audio, 'tags', None)
    if tags is None:
        # ``tags`` is None for untagged files; iterating it would raise.
        return []
    try:
        # Look for SYLT (Synchronized Lyrics) frames
        sylt_keys = [key for key in tags if key.startswith('SYLT:')]
        if sylt_keys:
            return parse_sylt_format(tags[sylt_keys[0]])
    except Exception as e:
        print(f"Error extracting MP3 synchronized lyrics: {str(e)}")
    return []
def parse_lrc_format(lrc_text):
    """Parse LRC format synchronized lyrics.

    Each lyric line looks like ``[mm:ss.xx]text``. A line may start with
    several timestamps (``[00:10.00][01:10.00]Chorus``), in which case the
    text is emitted once per timestamp. Lines that do not start with a valid
    timestamp (blank lines, ``[ar:Artist]`` style headers, plain text) are
    skipped. Leading whitespace and ``\\r\\n`` line endings are tolerated.

    Args:
        lrc_text: The LRC document as a string.

    Returns:
        List of ``{'time': seconds (float), 'text': str}`` dicts sorted by
        time.
    """
    lyrics = []
    for raw_line in lrc_text.splitlines():
        remainder = raw_line.strip()
        timestamps = []
        # Consume every leading "[...]" group that is a valid timestamp.
        while remainder.startswith('['):
            stamp, closing, rest = remainder[1:].partition(']')
            if not closing:
                break  # unterminated bracket: not a timestamp
            seconds = _lrc_timestamp_to_seconds(stamp)
            if seconds is None:
                break  # e.g. "[ar:Artist]" or "[Chorus]" - treat as text
            timestamps.append(seconds)
            remainder = rest.lstrip()

        text = remainder.strip()
        for seconds in timestamps:
            lyrics.append({'time': seconds, 'text': text})
    return sorted(lyrics, key=lambda entry: entry['time'])


def _lrc_timestamp_to_seconds(stamp):
    """Convert an ``mm:ss`` or ``mm:ss.xx`` stamp to float seconds; None if invalid."""
    clock, _, fraction = stamp.strip().partition('.')
    minutes, colon, seconds = clock.partition(':')
    if not colon or not (minutes.isdigit() and seconds.isdigit()):
        return None
    if fraction and not fraction.isdigit():
        return None
    try:
        return int(minutes) * 60 + int(seconds) + float(f'0.{fraction or 0}')
    except ValueError:
        # isdigit() accepts a few characters int()/float() reject.
        return None
def parse_sylt_format(sylt):
    """Parse SYLT format synchronized lyrics.

    ``sylt.text`` is iterated as pairs. mutagen documents the pairs as
    ``(text, time)``; unpacking them as ``(time, text)`` tried to divide the
    text by 1000 and always failed. Pairs given as ``(time, text)`` are still
    accepted: whichever element is numeric is taken as the time.

    NOTE(review): times are assumed to be milliseconds. SYLT frames also
    carry a ``format`` field that can select a different unit - confirm
    whether that needs handling.

    Args:
        sylt: Object with a ``text`` attribute holding the pairs.

    Returns:
        List of ``{'time': seconds (float), 'text': str}`` dicts sorted by
        time.
    """
    lyrics = []
    for first, second in sylt.text:
        if isinstance(first, (int, float)) and not isinstance(second, (int, float)):
            time_ms, text = first, second
        else:
            text, time_ms = first, second
        lyrics.append({
            'time': time_ms / 1000.0,  # Convert milliseconds to seconds
            'text': text
        })
    return sorted(lyrics, key=lambda entry: entry['time'])
if __name__ == '__main__':
    # Create music directory if it doesn't exist. exist_ok avoids the
    # check-then-create race of a separate exists() test.
    Path(app.config['MUSIC_FOLDER']).mkdir(parents=True, exist_ok=True)

    # Port comes from the PORT environment variable (default 7860); the
    # server binds to all interfaces.
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)