Spaces:
Running
Running
| import os | |
| import subprocess | |
| import librosa | |
| import soundfile as sf | |
| import logging | |
| import json | |
| import numpy as np | |
| # Ocultar ventanas de consola de subprocesos en Windows | |
| CREATION_FLAGS = 0x08000000 if os.name == 'nt' else 0 | |
| logger = logging.getLogger(__name__) | |
| def pitch_shift_audio(input_path: str, output_path: str, semitones: float): | |
| """ | |
| Desplaza el tono (pitch shift) de un archivo de audio sin cambiar su duración. | |
| """ | |
| try: | |
| logger.info(f"Aplicando pitch shift de {semitones} semitonos a {input_path}") | |
| # Cargar el audio con su tasa de muestreo original para conservar la fidelidad | |
| y, sr = librosa.load(input_path, sr=None) | |
| # Realizar el desplazamiento tonal | |
| y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitones, res_type='soxr_hq') | |
| # Guardar en formato FLAC para mantener compatibilidad y bajo peso | |
| sf.write(output_path, y_shifted, sr, format='FLAC') | |
| logger.info(f"Guardado exitosamente en {output_path}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error en pitch_shift_audio para {input_path} con {semitones} semitonos: {e}", exc_info=True) | |
| return False | |
| def extract_pitch_trajectory(vocals_path: str, output_json_path: str): | |
| """ | |
| Extrae la trayectoria de afinación (YIN) de la voz principal a intervalos de 100ms | |
| y la guarda como un archivo JSON de notas MIDI (floats para afinación precisa). | |
| """ | |
| try: | |
| logger.info(f"Extrayendo trayectoria de afinación (YIN) de: {vocals_path}") | |
| # Carga rápida a 22050Hz para optimizar el análisis de pitch | |
| y, sr = librosa.load(vocals_path, sr=22050) | |
| # Calcular hop_length para muestreo de ~100ms (10 frames por segundo) | |
| hop_length = int(sr * 0.1) | |
| # Algoritmo YIN para detección monofónica de frecuencia fundamental | |
| f0 = librosa.yin( | |
| y, | |
| fmin=librosa.note_to_hz('C2'), # ~65 Hz (Bajo) | |
| fmax=librosa.note_to_hz('C6'), # ~1046 Hz (Soprano) | |
| sr=sr, | |
| hop_length=hop_length | |
| ) | |
| # Calcular la energía cuadrática media (RMS) para discriminar silencios | |
| rms = librosa.feature.rms(y=y, hop_length=hop_length)[0] | |
| energy_threshold = 0.015 | |
| midi_notes = [] | |
| for idx, freq in enumerate(f0): | |
| # Si el frame es silencioso o el algoritmo falló, marcar como 0 (Silencio) | |
| if rms[idx] < energy_threshold or np.isnan(freq) or freq < 65.0: | |
| midi_notes.append(0) | |
| else: | |
| # Convertir Hz a nota MIDI con precisión decimal | |
| midi_note = float(librosa.hz_to_midi(freq)) | |
| midi_notes.append(round(midi_note, 2)) | |
| # Guardar en archivo JSON | |
| with open(output_json_path, 'w') as f: | |
| json.dump(midi_notes, f) | |
| logger.info(f"Trayectoria de afinación guardada con éxito en: {output_json_path}") | |
| return midi_notes | |
| except Exception as e: | |
| logger.error(f"Error extrayendo pitch de {vocals_path}: {e}", exc_info=True) | |
| return None | |
| def generar_armonias(vocals_path: str, output_dir: str): | |
| """ | |
| Genera las 6 voces corales estándar + sus trayectorias de pitch JSON para afinador en tiempo real. | |
| Optimizado cargando la voz una sola vez a 32kHz para reducir el procesamiento y lecturas de disco. | |
| """ | |
| if not os.path.exists(vocals_path): | |
| return {"success": False, "error": f"No se encuentra el archivo de voz principal: {vocals_path}"} | |
| voces = { | |
| "2da_voz": 4.0, # Tercera mayor (+4 semitonos) | |
| "2da_voz_baja": -4.0, # Tercera mayor abajo (-4 semitonos) | |
| "3ra_voz": 7.0, # Quinta perfecta (+7 semitonos) | |
| "3ra_voz_baja": -7.0, # Quinta perfecta abajo (-7 semitonos) | |
| "oct_alta": 12.0, # Octava arriba (+12 semitonos) | |
| "oct_baja": -12.0 # Octava abajo (-12 semitonos) | |
| } | |
| resultados = {} | |
| # Comprobar si ya existen todas las voces procesadas (en m4a o flac) antes de cargar el audio original | |
| necesita_procesar = False | |
| for nombre_voz in voces.keys(): | |
| out_file_m4a = os.path.join(output_dir, f"{nombre_voz}.m4a") | |
| out_file_flac = os.path.join(output_dir, f"{nombre_voz}.flac") | |
| if os.path.exists(out_file_m4a): | |
| resultados[nombre_voz] = out_file_m4a | |
| elif os.path.exists(out_file_flac): | |
| resultados[nombre_voz] = out_file_flac | |
| else: | |
| necesita_procesar = True | |
| # Cargar y procesar solo si es necesario | |
| if necesita_procesar: | |
| try: | |
| logger.info(f"Cargando voz principal a 32kHz para armonización: {vocals_path}") | |
| y, sr = librosa.load(vocals_path, sr=32000) | |
| for nombre_voz, semitonos in voces.items(): | |
| out_file = os.path.join(output_dir, f"{nombre_voz}.m4a") | |
| if nombre_voz in resultados: | |
| continue | |
| try: | |
| logger.info(f"Aplicando pitch shift de {semitonos} semitonos a {nombre_voz}") | |
| y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitonos, res_type='soxr_hq') | |
| # Guardar temporalmente en WAV | |
| temp_wav = os.path.join(output_dir, f"{nombre_voz}_temp.wav") | |
| sf.write(temp_wav, y_shifted, sr, format='WAV') | |
| # Transcodificar a M4A con ffmpeg | |
| transcode_ok = False | |
| try: | |
| cmd = ["ffmpeg", "-y", "-i", temp_wav, "-c:a", "aac", "-b:a", "192k", out_file] | |
| subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, creationflags=CREATION_FLAGS) | |
| transcode_ok = True | |
| except Exception as trans_err: | |
| logger.error(f"Error al transcodificar {nombre_voz} a M4A: {trans_err}") | |
| # Eliminar temp wav | |
| if os.path.exists(temp_wav): | |
| try: | |
| os.remove(temp_wav) | |
| except Exception: | |
| pass | |
| if transcode_ok and os.path.exists(out_file): | |
| resultados[nombre_voz] = out_file | |
| logger.info(f"Guardado exitosamente en M4A: {out_file}") | |
| else: | |
| # Fallback a escribir FLAC si ffmpeg falla | |
| fallback_flac = os.path.join(output_dir, f"{nombre_voz}.flac") | |
| sf.write(fallback_flac, y_shifted, sr, format='FLAC') | |
| resultados[nombre_voz] = fallback_flac | |
| logger.info(f"Guardado exitosamente (Fallback FLAC): {fallback_flac}") | |
| except Exception as e: | |
| logger.error(f"Error generando {nombre_voz}: {e}", exc_info=True) | |
| except Exception as e: | |
| logger.error(f"Error al cargar la voz principal para armonizar: {e}", exc_info=True) | |
| return {"success": False, "error": f"Error al cargar la voz: {str(e)}"} | |
| # 2. Extraer trayectoria de pitch base de la voz principal | |
| base_pitch_path = os.path.join(output_dir, "vocals_pitch.json") | |
| base_trajectory = None | |
| if os.path.exists(base_pitch_path): | |
| try: | |
| with open(base_pitch_path, 'r') as f: | |
| base_trajectory = json.load(f) | |
| except Exception: | |
| pass | |
| if base_trajectory is None: | |
| base_trajectory = extract_pitch_trajectory(vocals_path, base_pitch_path) | |
| # 3. Generar matemáticamente los JSON de afinación para las armonías (Optimización CPU 5x) | |
| if base_trajectory: | |
| for nombre_voz, semitonos in voces.items(): | |
| voice_pitch_path = os.path.join(output_dir, f"{nombre_voz}_pitch.json") | |
| if os.path.exists(voice_pitch_path): | |
| continue | |
| # Desplazar la nota MIDI sumando/restando semitonos (manteniendo 0 en silencios) | |
| shifted_trajectory = [ | |
| round(note + semitonos, 2) if note > 0 else 0 | |
| for note in base_trajectory | |
| ] | |
| try: | |
| with open(voice_pitch_path, 'w') as f: | |
| json.dump(shifted_trajectory, f) | |
| except Exception as e: | |
| logger.error(f"Error guardando pitch JSON para {nombre_voz}: {e}") | |
| return { | |
| "success": len(resultados) == len(voces) and base_trajectory is not None, | |
| "voces": resultados | |
| } | |