Spaces:
Sleeping
Sleeping
| from mutagen.mp3 import MP3 | |
| from mutagen.id3 import ID3, SYLT, USLT, Encoding | |
| import os | |
| import tempfile | |
| import shutil | |
| import subprocess | |
| from typing import List, Dict, Tuple | |
| # --- Helper function to check for ffmpeg --- | |
| def is_ffmpeg_available(): | |
| """Check if ffmpeg is installed and accessible in the system's PATH.""" | |
| return shutil.which("ffmpeg") is not None | |
| class MP3Embedder: | |
| """Handles embedding SYLT synchronized lyrics into MP3 files with robust error handling.""" | |
| def __init__(self): | |
| """Initialize the MP3 embedder.""" | |
| self.temp_dir = "/tmp/audio_sync" | |
| os.makedirs(self.temp_dir, exist_ok=True) | |
| self.ffmpeg_available = is_ffmpeg_available() | |
| def embed_sylt_lyrics(self, audio_path: str, word_timestamps: List[Dict], | |
| text: str, output_filename: str) -> Tuple[str, List[str]]: | |
| """ | |
| Embeds SYLT synchronized lyrics into an MP3 file and returns logs. | |
| Returns: | |
| A tuple containing: | |
| - The path to the output MP3 file. | |
| - A list of log messages detailing the process. | |
| """ | |
| log_messages = [] | |
| def log_and_print(message): | |
| log_messages.append(message) | |
| print(f"MP3_EMBEDDER: {message}") | |
| log_and_print(f"--- MP3Embedder initialized. ffmpeg available: {self.ffmpeg_available} ---") | |
| log_and_print(f"--- Starting SYLT embedding for: {os.path.basename(audio_path)} ---") | |
| output_path = os.path.join(self.temp_dir, output_filename) | |
| try: | |
| # --- Step 1: Ensure the file is in MP3 format --- | |
| if not audio_path.lower().endswith('.mp3'): | |
| if self.ffmpeg_available: | |
| log_and_print(f"'{os.path.basename(audio_path)}' is not an MP3. Converting with ffmpeg...") | |
| try: | |
| subprocess.run( | |
| ['ffmpeg', '-i', audio_path, '-codec:a', 'libmp3lame', '-q:a', '2', output_path], | |
| check=True, capture_output=True, text=True | |
| ) | |
| log_and_print("--- ffmpeg conversion successful. ---") | |
| except subprocess.CalledProcessError as e: | |
| log_and_print("--- ERROR: ffmpeg conversion failed. ---") | |
| log_and_print(f"--- ffmpeg stderr: {e.stderr} ---") | |
| log_and_print("--- Fallback: Copying original file without conversion. ---") | |
| shutil.copy2(audio_path, output_path) | |
| else: | |
| log_and_print("--- WARNING: ffmpeg is not available. Cannot convert non-MP3 file. Copying directly. ---") | |
| shutil.copy2(audio_path, output_path) | |
| else: | |
| log_and_print("--- Audio is already MP3. Copying to temporary location. ---") | |
| shutil.copy2(audio_path, output_path) | |
| # --- Step 2: Create SYLT data --- | |
| log_and_print("--- Creating SYLT data from timestamps... ---") | |
| sylt_data = self._create_sylt_data(word_timestamps) | |
| if not sylt_data: | |
| log_and_print("--- WARNING: No SYLT data could be created. Skipping embedding. ---") | |
| return output_path, log_messages | |
| log_and_print(f"--- Created {len(sylt_data)} SYLT entries. ---") | |
| # --- Step 3: Embed data into the MP3 file --- | |
| try: | |
| log_and_print("--- Loading MP3 file with mutagen... ---") | |
| audio_file = MP3(output_path, ID3=ID3) | |
| if audio_file.tags is None: | |
| log_and_print("--- No ID3 tags found. Creating new ones. ---") | |
| audio_file.add_tags() | |
| # --- Embed SYLT (Synchronized Lyrics) --- | |
| log_and_print("--- Creating and adding SYLT frame... ---") | |
| sylt_frame = SYLT( | |
| encoding=Encoding.UTF8, | |
| lang='eng', | |
| format=2, | |
| type=1, | |
| text=sylt_data | |
| ) | |
| audio_file.tags.delall('SYLT') | |
| audio_file.tags.add(sylt_frame) | |
| # --- Embed USLT (Unsynchronized Lyrics) as a fallback --- | |
| log_and_print("--- Creating and adding USLT frame... ---") | |
| uslt_frame = USLT( | |
| encoding=Encoding.UTF8, | |
| lang='eng', | |
| desc='', | |
| text=text | |
| ) | |
| audio_file.tags.delall('USLT') | |
| audio_file.tags.add(uslt_frame) | |
| audio_file.save() | |
| log_and_print("--- Successfully embedded SYLT and USLT frames. ---") | |
| except Exception as e: | |
| log_and_print(f"--- ERROR: Failed to embed SYLT/USLT: {e} ---") | |
| return output_path, log_messages | |
| except Exception as e: | |
| log_and_print(f"--- ERROR: Unexpected error in embed_sylt_lyrics: {e} ---") | |
| return output_path, log_messages | |
| def _create_sylt_data(self, word_timestamps: List[Dict]) -> List[tuple]: | |
| """ | |
| Create SYLT data format from word timestamps | |
| Args: | |
| word_timestamps: List of word timestamp dictionaries | |
| Returns: | |
| List of tuples (text, timestamp_in_milliseconds) | |
| """ | |
| # Debug print to check incoming data | |
| print(f"DEBUG: word_timestamps received in _create_sylt_data: {word_timestamps}") | |
| try: | |
| sylt_data = [] | |
| for word_data in word_timestamps: | |
| word = word_data.get('word', '').strip() | |
| start_time = word_data.get('start', 0) | |
| if word: | |
| # Convert seconds to milliseconds | |
| timestamp_ms = int(start_time * 1000) | |
| sylt_data.append((word, timestamp_ms)) | |
| return sylt_data | |
| except Exception as e: | |
| print(f"Error creating SYLT data: {str(e)}") | |
| return [] | |
| def _create_line_based_sylt_data(self, word_timestamps: List[Dict], max_words_per_line: int = 6) -> List[tuple]: | |
| """ | |
| Create line-based SYLT data (alternative approach) | |
| Args: | |
| word_timestamps: List of word timestamp dictionaries | |
| max_words_per_line: Maximum words per line | |
| Returns: | |
| List of tuples (line_text, timestamp_in_milliseconds) | |
| """ | |
| try: | |
| sylt_data = [] | |
| current_line = [] | |
| for word_data in word_timestamps: | |
| current_line.append(word_data) | |
| # Check if we should end this line | |
| if len(current_line) >= max_words_per_line: | |
| if current_line: | |
| line_text = ' '.join([w.get('word', '') for w in current_line]).strip() | |
| start_time = current_line[0].get('start', 0) | |
| timestamp_ms = int(start_time * 1000) | |
| if line_text: | |
| sylt_data.append((line_text, timestamp_ms)) | |
| current_line = [] | |
| # Add remaining words as final line | |
| if current_line: | |
| line_text = ' '.join([w.get('word', '') for w in current_line]).strip() | |
| start_time = current_line[0].get('start', 0) | |
| timestamp_ms = int(start_time * 1000) | |
| if line_text: | |
| sylt_data.append((line_text, timestamp_ms)) | |
| return sylt_data | |
| except Exception as e: | |
| print(f"Error creating line-based SYLT data: {str(e)}") | |
| return [] | |
| def verify_sylt_embedding(self, mp3_path: str) -> Dict: | |
| """ | |
| Verify that SYLT lyrics are properly embedded | |
| Args: | |
| mp3_path: Path to the MP3 file | |
| Returns: | |
| Dictionary with verification results | |
| """ | |
| try: | |
| audio_file = MP3(mp3_path) | |
| result = { | |
| 'has_sylt': False, | |
| 'has_uslt': False, | |
| 'sylt_entries': 0, | |
| 'error': None | |
| } | |
| if audio_file.tags: | |
| # Check for SYLT | |
| sylt_frames = audio_file.tags.getall('SYLT') | |
| if sylt_frames: | |
| result['has_sylt'] = True | |
| result['sylt_entries'] = len(sylt_frames[0].text) if sylt_frames[0].text else 0 | |
| # Check for USLT (fallback) | |
| uslt_frames = audio_file.tags.getall('USLT') | |
| if uslt_frames: | |
| result['has_uslt'] = True | |
| return result | |
| except Exception as e: | |
| return { | |
| 'has_sylt': False, | |
| 'has_uslt': False, | |
| 'sylt_entries': 0, | |
| 'error': str(e) | |
| } | |
| def extract_sylt_lyrics(self, mp3_path: str) -> List[Dict]: | |
| """ | |
| Extract SYLT lyrics from an MP3 file (for debugging) | |
| Args: | |
| mp3_path: Path to the MP3 file | |
| Returns: | |
| List of dictionaries with text and timestamp | |
| """ | |
| try: | |
| audio_file = MP3(mp3_path) | |
| lyrics_data = [] | |
| if audio_file.tags: | |
| sylt_frames = audio_file.tags.getall('SYLT') | |
| for frame in sylt_frames: | |
| if frame.text: | |
| for text, timestamp_ms in frame.text: | |
| lyrics_data.append({ | |
| 'text': text, | |
| 'timestamp': timestamp_ms / 1000.0 # Convert to seconds | |
| }) | |
| return lyrics_data | |
| except Exception as e: | |
| print(f"Error extracting SYLT lyrics: {str(e)}") | |
| return [] | |
| def create_lrc_file(self, word_timestamps: List[Dict], output_path: str) -> str: | |
| """ | |
| Create an LRC (lyrics) file as an additional export option | |
| Args: | |
| word_timestamps: List of word timestamp dictionaries | |
| output_path: Path for the output LRC file | |
| Returns: | |
| Path to the created LRC file | |
| """ | |
| try: | |
| lrc_lines = [] | |
| # Group words into lines | |
| current_line = [] | |
| for word_data in word_timestamps: | |
| current_line.append(word_data) | |
| if len(current_line) >= 8: # 8 words per line | |
| if current_line: | |
| line_text = ' '.join([w.get('word', '') for w in current_line]) | |
| start_time = current_line[0].get('start', 0) | |
| # Format timestamp as [mm:ss.xx] | |
| minutes = int(start_time // 60) | |
| seconds = start_time % 60 | |
| timestamp_str = f"[{minutes:02d}:{seconds:05.2f}]" | |
| lrc_lines.append(f"{timestamp_str}{line_text}") | |
| current_line = [] | |
| # Add remaining words | |
| if current_line: | |
| line_text = ' '.join([w.get('word', '') for w in current_line]) | |
| start_time = current_line[0].get('start', 0) | |
| minutes = int(start_time // 60) | |
| seconds = start_time % 60 | |
| timestamp_str = f"[{minutes:02d}:{seconds:05.2f}]" | |
| lrc_lines.append(f"{timestamp_str}{line_text}") | |
| # Write LRC file | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write('\n'.join(lrc_lines)) | |
| return output_path | |
| except Exception as e: | |
| raise Exception(f"Error creating LRC file: {str(e)}") | |
| def __del__(self): | |
| """Clean up temporary files""" | |
| import shutil | |
| if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir): | |
| try: | |
| shutil.rmtree(self.temp_dir) | |
| except: | |
| pass | |