Spaces:
Sleeping
Sleeping
import logging
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import soundfile as sf

from straycat import Resampler
from voice_data_converter import CompressedVoicebankManager, OtoEntry
| logger = logging.getLogger(__name__) | |
| class CompressedUTAUEngine: | |
| """์์ถ๋ HDF5 ๋ณด์ด์ค๋ฑ ํฌ๋ฅผ ์ฌ์ฉํ๋ UTAU ํธํ ์์ฑ ํฉ์ฑ ์์ง""" | |
| def __init__(self, compressed_voicebank_path: Union[str, Path]): | |
| self.voicebank = CompressedVoicebankManager(compressed_voicebank_path) | |
| self.default_phoneme = "ใ" # ๊ธฐ๋ณธ ์์ | |
| logger.info(f"์์ถ๋ UTAU ์์ง ์ด๊ธฐํ ์๋ฃ") | |
| def synthesize_sequence(self, | |
| notes: List[Dict], | |
| lyrics: List[str], | |
| tempo: int = 120, | |
| volume: int = 100) -> Tuple[Optional[str], str]: | |
| """๋ ธํธ ์ํ์ค์ ๊ฐ์ฌ๋ก ์์ฑ ํฉ์ฑ""" | |
| if len(notes) != len(lyrics): | |
| return None, "๋ ธํธ์ ๊ฐ์ฌ์ ๊ฐ์๊ฐ ์ผ์นํ์ง ์์ต๋๋ค." | |
| if not notes: | |
| return None, "ํฉ์ฑํ ๋ ธํธ๊ฐ ์์ต๋๋ค." | |
| try: | |
| # ์ ์ฒด ์ํ์ค ๊ธธ์ด ๊ณ์ฐ | |
| max_end_time = max(note.get('endSeconds', | |
| note.get('startSeconds', 0) + note.get('durationSeconds', 0.5)) | |
| for note in notes) | |
| sample_rate = 44100 | |
| total_samples = int(max_end_time * sample_rate) + sample_rate | |
| final_audio = np.zeros(total_samples) | |
| # ๊ฐ ๋ ธํธ ํฉ์ฑ | |
| for i, (note, lyric) in enumerate(zip(notes, lyrics)): | |
| try: | |
| # ์์ ๋ณํ | |
| phoneme = self._lyric_to_phoneme(lyric) | |
| # oto ์ํธ๋ฆฌ ์ฐพ๊ธฐ | |
| oto_entry = self.voicebank.get_sample_for_phoneme(phoneme) | |
| if not oto_entry: | |
| logger.warning(f"์์ '{phoneme}'์ ํด๋นํ๋ ์ํ์ ์ฐพ์ ์ ์์") | |
| continue | |
| # ์ค๋์ค ๋ฐ์ดํฐ ๋ก๋ (์์ถ๋ ๋ฐ์ดํฐ์์) | |
| audio_result = self.voicebank.get_audio_data(oto_entry.filename) | |
| if not audio_result: | |
| logger.warning(f"์ค๋์ค ํ์ผ ๋ก๋ ์คํจ: {oto_entry.filename}") | |
| continue | |
| source_audio, source_sample_rate = audio_result | |
| # ๋ ธํธ ํฉ์ฑ | |
| synth_audio = self._synthesize_note( | |
| note, oto_entry, source_audio, source_sample_rate, tempo, volume | |
| ) | |
| if synth_audio is not None: | |
| # ์๊ฐ ์์น ๊ณ์ฐ ๋ฐ ์ค๋์ค ๋ฐฐ์น | |
| start_sample = int(note.get('startSeconds', 0) * sample_rate) | |
| end_sample = start_sample + len(synth_audio) | |
| if end_sample <= len(final_audio): | |
| final_audio[start_sample:end_sample] += synth_audio * (note.get('velocity', 100) / 100) | |
| else: | |
| # ๋ฒํผ ํ์ฅ | |
| new_size = end_sample + sample_rate | |
| new_final_audio = np.zeros(new_size) | |
| new_final_audio[:len(final_audio)] = final_audio | |
| new_final_audio[start_sample:end_sample] += synth_audio * (note.get('velocity', 100) / 100) | |
| final_audio = new_final_audio | |
| logger.info(f"๋ ธํธ {i+1} ํฉ์ฑ ์๋ฃ: {phoneme}") | |
| except Exception as e: | |
| logger.error(f"๋ ธํธ {i+1} ํฉ์ฑ ์คํจ: {e}") | |
| continue | |
| # ์ต์ข ์ค๋์ค ์ ๊ทํ | |
| if np.max(np.abs(final_audio)) > 0: | |
| final_audio = final_audio / np.max(np.abs(final_audio)) * 0.85 | |
| # ์์ ํ์ผ ์ ์ฅ | |
| output_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False) | |
| sf.write(output_file.name, final_audio, sample_rate) | |
| output_file.close() | |
| duration_sec = len(final_audio) / sample_rate | |
| return output_file.name, f"โ ์์ถ๋ ๋ณด์ด์ค๋ฑ ํฌ๋ก ํฉ์ฑ ์๋ฃ: {len(notes)}๊ฐ ๋ ธํธ, {duration_sec:.1f}์ด" | |
| except Exception as e: | |
| logger.error(f"์ํ์ค ํฉ์ฑ ์คํจ: {e}") | |
| return None, f"โ ํฉ์ฑ ์คํจ: {str(e)}" | |
| def _lyric_to_phoneme(self, lyric: str) -> str: | |
| """๊ฐ์ฌ๋ฅผ ์์๋ก ๋ณํ (๊ธฐ์กด ๋ก์ง๊ณผ ๋์ผ)""" | |
| lyric = lyric.strip() | |
| if not lyric: | |
| return self.default_phoneme | |
| # ํ๊ธ โ ์ผ๋ณธ์ด ์์ ๋ณํ (๊ฐ๋จํ ๋งคํ) | |
| hangul_to_japanese = { | |
| '๊ฐ': 'ka', '๋': 'na', '๋ค': 'da', '๋ผ': 'ra', '๋ง': 'ma', | |
| '๋ฐ': 'ba', '์ฌ': 'sa', '์': 'a', '์': 'za', '์ฐจ': 'cha', | |
| '์นด': 'ka', 'ํ': 'ta', 'ํ': 'pa', 'ํ': 'ha', | |
| '๊ฑฐ': 'ke', '๋': 'ne', '๋': 'de', '๋ฌ': 're', '๋จธ': 'me', | |
| '๋ฒ': 'be', '์': 'se', '์ด': 'e', '์ ': 'ze', '์ฒ': 'che', | |
| '์ปค': 'ke', 'ํฐ': 'te', 'ํผ': 'pe', 'ํ': 'he', | |
| '๊ณ ': 'ko', '๋ ธ': 'no', '๋': 'do', '๋ก': 'ro', '๋ชจ': 'mo', | |
| '๋ณด': 'bo', '์': 'so', '์ค': 'o', '์กฐ': 'zo', '์ด': 'cho', | |
| '์ฝ': 'ko', 'ํ ': 'to', 'ํฌ': 'po', 'ํธ': 'ho', | |
| '๊ตฌ': 'ku', '๋': 'nu', '๋': 'du', '๋ฃจ': 'ru', '๋ฌด': 'mu', | |
| '๋ถ': 'bu', '์': 'su', '์ฐ': 'u', '์ฃผ': 'zu', '์ถ': 'chu', | |
| '์ฟ ': 'ku', 'ํฌ': 'tu', 'ํธ': 'pu', 'ํ': 'hu', | |
| '๊ธฐ': 'ki', '๋': 'ni', '๋': 'di', '๋ฆฌ': 'ri', '๋ฏธ': 'mi', | |
| '๋น': 'bi', '์': 'si', '์ด': 'i', '์ง': 'zi', '์น': 'chi', | |
| 'ํค': 'ki', 'ํฐ': 'ti', 'ํผ': 'pi', 'ํ': 'hi', | |
| '๋': 'do', '๋ ': 're', '๋ฏธ': 'mi', 'ํ': 'pa', '์': 'so', '๋ผ': 'ra', '์': 'si' | |
| } | |
| if lyric in hangul_to_japanese: | |
| return hangul_to_japanese[lyric] | |
| return lyric if lyric in self.voicebank.oto_entries else self.default_phoneme | |
| def _synthesize_note(self, | |
| note: Dict, | |
| oto_entry: OtoEntry, | |
| source_audio: np.ndarray, | |
| source_sample_rate: int, | |
| tempo: int, | |
| volume: int) -> Optional[np.ndarray]: | |
| """๊ฐ๋ณ ๋ ธํธ ํฉ์ฑ (์์ถ๋ ์ค๋์ค ๋ฐ์ดํฐ ์ฌ์ฉ)""" | |
| try: | |
| # ์์ ํ์ผ์ ์๋ณธ ์ค๋์ค ์ ์ฅ | |
| temp_input = tempfile.NamedTemporaryFile(suffix='.wav', delete=False) | |
| sf.write(temp_input.name, source_audio, source_sample_rate) | |
| temp_input.close() | |
| # ์ถ๋ ฅ ํ์ผ | |
| temp_output = tempfile.NamedTemporaryFile(suffix='.wav', delete=False) | |
| temp_output.close() | |
| # ๋ ธํธ ์ ๋ณด ์ถ์ถ | |
| pitch = note['pitch'] | |
| duration_ms = note.get('durationSeconds', 0.5) * 1000 | |
| velocity = note.get('velocity', 100) | |
| # MIDI ๋ ธํธ๋ฅผ ์๊ณ๋ช ์ผ๋ก ๋ณํ | |
| note_name = self._midi_to_note_name(pitch) | |
| # straycat Resampler๋ก ํฉ์ฑ | |
| resampler = Resampler( | |
| in_file=temp_input.name, | |
| out_file=temp_output.name, | |
| pitch=note_name, | |
| velocity=velocity, | |
| length=max(duration_ms, 200), # ์ต์ 200ms | |
| volume=volume, | |
| offset=oto_entry.offset, | |
| consonant=oto_entry.consonant, | |
| cutoff=oto_entry.cutoff, | |
| modulation=10, | |
| tempo=f'!{tempo}' | |
| ) | |
| # ํฉ์ฑ๋ ์ค๋์ค ๋ก๋ | |
| if Path(temp_output.name).exists(): | |
| synth_audio, _ = sf.read(temp_output.name) | |
| # ์ ๋ฆฌ | |
| Path(temp_input.name).unlink(missing_ok=True) | |
| Path(temp_output.name).unlink(missing_ok=True) | |
| return synth_audio | |
| else: | |
| logger.error(f"ํฉ์ฑ๋ ํ์ผ์ด ์์ฑ๋์ง ์์: {temp_output.name}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"๋ ธํธ ํฉ์ฑ ์คํจ: {e}") | |
| return None | |
| def _midi_to_note_name(self, midi_note: int) -> str: | |
| """MIDI ๋ ธํธ ๋ฒํธ๋ฅผ ์๊ณ๋ช ์ผ๋ก ๋ณํ""" | |
| notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] | |
| octave = (midi_note // 12) - 1 | |
| note = notes[midi_note % 12] | |
| return f"{note}{octave}" | |
| def get_available_phonemes(self) -> List[str]: | |
| """์ฌ์ฉ ๊ฐ๋ฅํ ์์ ๋ชฉ๋ก ๋ฐํ""" | |
| return self.voicebank.list_available_phonemes() | |
| def get_compression_info(self) -> Dict[str, any]: | |
| """์์ถ ์ ๋ณด ๋ฐํ""" | |
| return self.voicebank.get_compression_info() |