# Page-extraction header (not part of the program): Spaces status "Sleeping"; file size 12,631 bytes; revision 6609c06.
from mutagen.mp3 import MP3
from mutagen.id3 import ID3, SYLT, USLT, Encoding
import os
import tempfile
import shutil
import subprocess
from typing import List, Dict, Tuple
# --- Helper function to check for ffmpeg ---
def is_ffmpeg_available() -> bool:
    """Return True when an ``ffmpeg`` executable can be found on the PATH.

    The lookup is delegated to ``shutil.which``; nothing is executed.
    """
    return shutil.which("ffmpeg") is not None
class MP3Embedder:
    """Embed synchronized (SYLT) and plain (USLT) lyrics into MP3 files.

    Output files are written under ``self.temp_dir``. That directory is
    removed again when the instance is finalized (see ``__del__``).
    """

    def __init__(self):
        """Create the working directory and record whether ffmpeg is usable."""
        self.temp_dir = "/tmp/audio_sync"
        os.makedirs(self.temp_dir, exist_ok=True)
        self.ffmpeg_available = is_ffmpeg_available()

    def embed_sylt_lyrics(self, audio_path: str, word_timestamps: List[Dict],
                          text: str, output_filename: str) -> Tuple[str, List[str]]:
        """Copy/convert the audio into the work directory and embed lyrics.

        Args:
            audio_path: Path of the source audio file.
            word_timestamps: Dicts read via their ``'word'`` and ``'start'``
                keys (``'start'`` is treated as seconds).
            text: Plain lyrics stored in the USLT frame.
            output_filename: File name joined onto ``self.temp_dir``.

        Returns:
            A tuple ``(output_path, log_messages)``. Failures are reported
            through the log messages rather than raised, so the output path
            is returned even when conversion or embedding did not succeed.
        """
        log_messages: List[str] = []

        def log_and_print(message):
            log_messages.append(message)
            print(f"MP3_EMBEDDER: {message}")

        log_and_print(f"--- MP3Embedder initialized. ffmpeg available: {self.ffmpeg_available} ---")
        log_and_print(f"--- Starting SYLT embedding for: {os.path.basename(audio_path)} ---")
        output_path = os.path.join(self.temp_dir, output_filename)
        try:
            # The directory path is shared by every instance and is deleted by
            # __del__, so it may have disappeared since __init__ ran.
            os.makedirs(self.temp_dir, exist_ok=True)

            # --- Step 1: Ensure the file is in MP3 format ---
            self._prepare_mp3(audio_path, output_path, log_and_print)

            # --- Step 2: Create SYLT data ---
            log_and_print("--- Creating SYLT data from timestamps... ---")
            sylt_data = self._create_sylt_data(word_timestamps)
            if not sylt_data:
                log_and_print("--- WARNING: No SYLT data could be created. Skipping embedding. ---")
                return output_path, log_messages
            log_and_print(f"--- Created {len(sylt_data)} SYLT entries. ---")

            # --- Step 3: Embed data into the MP3 file ---
            try:
                self._write_lyrics_frames(output_path, sylt_data, text, log_and_print)
                log_and_print("--- Successfully embedded SYLT and USLT frames. ---")
            except Exception as e:
                log_and_print(f"--- ERROR: Failed to embed SYLT/USLT: {e} ---")
            return output_path, log_messages
        except Exception as e:
            log_and_print(f"--- ERROR: Unexpected error in embed_sylt_lyrics: {e} ---")
            return output_path, log_messages

    def _prepare_mp3(self, audio_path: str, output_path: str, log) -> None:
        """Place an MP3 version of ``audio_path`` at ``output_path``.

        MP3 inputs are copied. Other inputs are converted with ffmpeg when it
        is available; if conversion is impossible or fails, the original file
        is copied unchanged as a fallback.
        """
        if audio_path.lower().endswith('.mp3'):
            log("--- Audio is already MP3. Copying to temporary location. ---")
            self._copy_unless_same(audio_path, output_path)
            return

        if not self.ffmpeg_available:
            log("--- WARNING: ffmpeg is not available. Cannot convert non-MP3 file. Copying directly. ---")
            self._copy_unless_same(audio_path, output_path)
            return

        log(f"'{os.path.basename(audio_path)}' is not an MP3. Converting with ffmpeg...")
        try:
            subprocess.run(
                # -y: overwrite a leftover output file instead of aborting.
                # -f mp3: force the MP3 container regardless of the extension.
                ['ffmpeg', '-y', '-i', audio_path, '-codec:a', 'libmp3lame',
                 '-q:a', '2', '-f', 'mp3', output_path],
                check=True, capture_output=True, text=True
            )
            log("--- ffmpeg conversion successful. ---")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers the executable disappearing after the PATH check.
            log("--- ERROR: ffmpeg conversion failed. ---")
            log(f"--- ffmpeg stderr: {getattr(e, 'stderr', None) or e} ---")
            log("--- Fallback: Copying original file without conversion. ---")
            self._copy_unless_same(audio_path, output_path)

    @staticmethod
    def _copy_unless_same(src: str, dst: str) -> None:
        """Copy ``src`` to ``dst``; do nothing if both are the same file."""
        try:
            shutil.copy2(src, dst)
        except shutil.SameFileError:
            pass

    def _write_lyrics_frames(self, mp3_path: str, sylt_data: List[tuple],
                             text: str, log) -> None:
        """Replace the SYLT and USLT frames of ``mp3_path`` and save the tags."""
        log("--- Loading MP3 file with mutagen... ---")
        audio_file = MP3(mp3_path, ID3=ID3)
        if audio_file.tags is None:
            log("--- No ID3 tags found. Creating new ones. ---")
            audio_file.add_tags()

        # --- Embed SYLT (Synchronized Lyrics) ---
        log("--- Creating and adding SYLT frame... ---")
        audio_file.tags.delall('SYLT')
        audio_file.tags.add(SYLT(
            encoding=Encoding.UTF8,
            lang='eng',
            format=2,  # ID3v2 time stamp format 2: milliseconds
            type=1,    # ID3v2 content type 1: lyrics
            text=sylt_data
        ))

        # --- Embed USLT (Unsynchronized Lyrics) as a fallback ---
        log("--- Creating and adding USLT frame... ---")
        audio_file.tags.delall('USLT')
        audio_file.tags.add(USLT(
            encoding=Encoding.UTF8,
            lang='eng',
            desc='',
            text=text or ''
        ))
        audio_file.save()

    @staticmethod
    def _scaled_time(seconds, factor: int) -> int:
        """Return ``seconds * factor`` rounded to the nearest non-negative int.

        Rounding (rather than truncating) keeps e.g. 1.001 s at 1001 ms even
        though ``1.001 * 1000`` is slightly below 1001 in floating point.
        Missing, ``None`` or non-numeric values count as 0, and negative
        values are clamped to 0.
        """
        try:
            return max(0, round(float(seconds or 0) * factor))
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _word_text(entry) -> str:
        """Return the stripped ``'word'`` of an entry, or '' if unusable."""
        if not isinstance(entry, dict):
            return ''
        return str(entry.get('word') or '').strip()

    def _build_lines(self, word_timestamps: List[Dict], words_per_line: int) -> List[tuple]:
        """Group entries into lines of at most ``words_per_line`` entries.

        Returns:
            List of ``(line_text, raw_start)`` where ``raw_start`` is the
            ``'start'`` value of the first entry of the line. Lines without
            any text are dropped.
        """
        size = max(1, int(words_per_line))
        entries = [e for e in (word_timestamps or []) if isinstance(e, dict)]
        lines = []
        for offset in range(0, len(entries), size):
            chunk = entries[offset:offset + size]
            line_text = ' '.join(filter(None, map(self._word_text, chunk)))
            if line_text:
                lines.append((line_text, chunk[0].get('start')))
        return lines

    def _create_sylt_data(self, word_timestamps: List[Dict]) -> List[tuple]:
        """Create word-level SYLT data from word timestamps.

        Args:
            word_timestamps: List of word timestamp dictionaries.

        Returns:
            List of tuples ``(text, timestamp_in_milliseconds)``. Entries
            whose word is empty are skipped; a malformed entry no longer
            discards the whole result.
        """
        # Debug print to check incoming data
        print(f"DEBUG: word_timestamps received in _create_sylt_data: {word_timestamps}")
        sylt_data = []
        for entry in word_timestamps or []:
            word = self._word_text(entry)
            if word:
                sylt_data.append((word, self._scaled_time(entry.get('start'), 1000)))
        return sylt_data

    def _create_line_based_sylt_data(self, word_timestamps: List[Dict], max_words_per_line: int = 6) -> List[tuple]:
        """Create line-based SYLT data (alternative to word-level entries).

        Args:
            word_timestamps: List of word timestamp dictionaries.
            max_words_per_line: Maximum entries grouped into one line.

        Returns:
            List of tuples ``(line_text, timestamp_in_milliseconds)``, each
            stamped with the start of the first entry of the line.
        """
        return [
            (line_text, self._scaled_time(start, 1000))
            for line_text, start in self._build_lines(word_timestamps, max_words_per_line)
        ]

    def verify_sylt_embedding(self, mp3_path: str) -> Dict:
        """Report which lyrics frames are present in an MP3 file.

        Args:
            mp3_path: Path to the MP3 file.

        Returns:
            Dict with ``has_sylt``, ``has_uslt``, ``sylt_entries`` (entry
            count of the first SYLT frame) and ``error`` (message string when
            the file could not be read, otherwise ``None``).
        """
        result = {
            'has_sylt': False,
            'has_uslt': False,
            'sylt_entries': 0,
            'error': None
        }
        try:
            audio_file = MP3(mp3_path)
            if audio_file.tags:
                # Check for SYLT
                sylt_frames = audio_file.tags.getall('SYLT')
                if sylt_frames:
                    result['has_sylt'] = True
                    first_text = sylt_frames[0].text
                    result['sylt_entries'] = len(first_text) if first_text else 0
                # Check for USLT (fallback)
                if audio_file.tags.getall('USLT'):
                    result['has_uslt'] = True
            return result
        except Exception as e:
            return {
                'has_sylt': False,
                'has_uslt': False,
                'sylt_entries': 0,
                'error': str(e)
            }

    def extract_sylt_lyrics(self, mp3_path: str) -> List[Dict]:
        """Extract SYLT lyrics from an MP3 file (for debugging).

        Args:
            mp3_path: Path to the MP3 file.

        Returns:
            List of dicts with ``'text'`` and ``'timestamp'`` (the stored
            value divided by 1000); an empty list if reading fails.
        """
        try:
            audio_file = MP3(mp3_path)
            lyrics_data = []
            if audio_file.tags:
                for frame in audio_file.tags.getall('SYLT'):
                    for entry_text, timestamp_ms in frame.text or []:
                        lyrics_data.append({
                            'text': entry_text,
                            'timestamp': timestamp_ms / 1000.0  # Convert to seconds
                        })
            return lyrics_data
        except Exception as e:
            print(f"Error extracting SYLT lyrics: {str(e)}")
            return []

    def _lrc_timestamp(self, seconds) -> str:
        """Format seconds as an LRC ``[mm:ss.xx]`` tag.

        The value is rounded to whole centiseconds first so the seconds field
        can never be printed as ``60.00``.
        """
        minutes, centis = divmod(self._scaled_time(seconds, 100), 6000)
        return f"[{minutes:02d}:{centis // 100:02d}.{centis % 100:02d}]"

    def create_lrc_file(self, word_timestamps: List[Dict], output_path: str,
                        words_per_line: int = 8) -> str:
        """Create an LRC (lyrics) file as an additional export option.

        Args:
            word_timestamps: List of word timestamp dictionaries.
            output_path: Path for the output LRC file.
            words_per_line: Maximum entries grouped into one LRC line.

        Returns:
            Path to the created LRC file.

        Raises:
            Exception: If the file cannot be produced; the original error is
                attached as the cause.
        """
        try:
            lrc_lines = [
                f"{self._lrc_timestamp(start)}{line_text}"
                for line_text, start in self._build_lines(word_timestamps, words_per_line)
            ]
            # Write LRC file
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lrc_lines))
            return output_path
        except Exception as e:
            raise Exception(f"Error creating LRC file: {str(e)}") from e

    def __del__(self):
        """Best-effort removal of the working directory and its contents."""
        temp_dir = getattr(self, 'temp_dir', None)
        try:
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
        except Exception:
            # Finalizers must not raise; cleanup failure is not actionable here.
            pass
|