Spaces:
Running
Running
| import h5py | |
| import numpy as np | |
| import soundfile as sf | |
| from pathlib import Path | |
| import logging | |
| import json | |
| import gzip | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import shutil | |
| from utau_engine import VoicebankManager, OtoEntry | |
| logger = logging.getLogger(__name__) | |
class VoiceDataCompressor:
    """Pack a voicebank (oto metadata plus WAV audio) into a single HDF5 file.

    Layout written by :meth:`convert_voicebank_to_hdf5`:

    * ``metadata/oto_data`` -- gzip-compressed UTF-8 JSON of the oto entries,
      stored as a ``uint8`` dataset.
    * ``metadata`` attributes -- voicebank name, entry/file counts and size
      statistics.
    * ``audio_data/<name>/audio`` -- mono ``float32`` samples for each WAV
      file, with ``sample_rate``, ``duration`` and ``original_filename``
      attributes on the enclosing group.

    NOTE(review): the log message literals in this class look mis-encoded
    (probably Korean text decoded with the wrong codec). They are kept
    unchanged here; confirm against the original file.
    """

    def __init__(self, output_path: str = "voice_data.h5"):
        """Remember the target file and the HDF5 compression settings.

        Args:
            output_path: Path of the HDF5 file that will be created.
        """
        self.output_path = Path(output_path)
        self.compression = 'gzip'  # HDF5 filter applied to each audio dataset
        self.compression_opts = 6  # gzip level (0-9)

    @staticmethod
    def _compression_ratio_percent(original_size: int, compressed_size: int) -> float:
        """Return the space saved in percent, or 0.0 when nothing was read."""
        if original_size <= 0:
            # No WAV file was converted; avoid dividing by zero.
            return 0.0
        return (1 - compressed_size / original_size) * 100

    def _write_oto_metadata(self, meta_group, vb_manager, voicebank_name: str) -> None:
        """Store the oto entries and the voicebank summary under ``metadata``."""
        oto_data = {
            alias: {
                'filename': entry.filename,
                'alias': entry.alias,
                'offset': entry.offset,
                'consonant': entry.consonant,
                'cutoff': entry.cutoff,
                'preutterance': entry.preutterance,
                'overlap': entry.overlap,
            }
            for alias, entry in vb_manager.oto_entries.items()
        }

        # The JSON text is gzip-compressed and stored as raw bytes.
        oto_json = json.dumps(oto_data, ensure_ascii=False, indent=2)
        oto_compressed = gzip.compress(oto_json.encode('utf-8'))
        meta_group.create_dataset(
            'oto_data', data=np.frombuffer(oto_compressed, dtype=np.uint8)
        )

        meta_group.attrs['voicebank_name'] = voicebank_name
        meta_group.attrs['total_entries'] = len(vb_manager.oto_entries)
        meta_group.attrs['total_wav_files'] = len(vb_manager.wav_files)

    def _store_wav(self, audio_group, filename: str, wav_path) -> int:
        """Store one WAV file as a compressed dataset; return its size in bytes."""
        audio_data, sample_rate = sf.read(wav_path)

        # Multi-channel audio is averaged down to a single channel.
        if len(audio_data.shape) > 1:
            audio_data = np.mean(audio_data, axis=1)

        # The group name must stay in sync with the key derivation used by
        # CompressedVoicebankManager.get_audio_data.
        file_group = audio_group.create_group(filename.replace('.wav', ''))
        file_group.create_dataset(
            'audio',
            data=audio_data.astype(np.float32),
            compression=self.compression,
            compression_opts=self.compression_opts,
            shuffle=True,     # byte-shuffle filter to help the compressor
            fletcher32=True,  # per-chunk checksum
        )

        file_group.attrs['sample_rate'] = sample_rate
        file_group.attrs['duration'] = len(audio_data) / sample_rate
        file_group.attrs['original_filename'] = filename

        logger.info(f"๋ณํ ์๋ฃ: {filename} ({len(audio_data)} samples)")
        return Path(wav_path).stat().st_size

    def convert_voicebank_to_hdf5(self, voicebank_path: Union[str, Path]) -> bool:
        """Convert the voicebank at ``voicebank_path`` into ``self.output_path``.

        WAV files that fail to load or store are logged and skipped; they do
        not abort the conversion and are not counted in the original size.

        Args:
            voicebank_path: Directory handed to ``VoicebankManager``.

        Returns:
            True when the HDF5 file was written, False when any error outside
            the per-file handling occurred (the error is logged).
        """
        try:
            voicebank_path = Path(voicebank_path)
            logger.info(f"๋ณด์ด์ค๋ฑ ํฌ ๋ณํ ์์: {voicebank_path}")

            # Load the existing voicebank through VoicebankManager.
            vb_manager = VoicebankManager(voicebank_path)

            # h5py does not create missing parent directories.
            self.output_path.parent.mkdir(parents=True, exist_ok=True)

            with h5py.File(self.output_path, 'w') as h5file:
                meta_group = h5file.create_group('metadata')
                self._write_oto_metadata(meta_group, vb_manager, voicebank_path.name)

                audio_group = h5file.create_group('audio_data')
                total_original_size = 0
                for filename, wav_path in vb_manager.wav_files.items():
                    try:
                        total_original_size += self._store_wav(
                            audio_group, filename, wav_path
                        )
                    except Exception as e:
                        logger.error(f"ํ์ผ ์ฒ๋ฆฌ ์คํจ {wav_path}: {e}")
                        continue

                # Flush buffered data so the on-disk size reflects the audio
                # written so far; the size must be read while the file is
                # still open because it is stored inside the file itself.
                h5file.flush()
                compressed_size = self.output_path.stat().st_size
                compression_ratio = self._compression_ratio_percent(
                    total_original_size, compressed_size
                )

                meta_group.attrs['original_size_bytes'] = total_original_size
                meta_group.attrs['compressed_size_bytes'] = compressed_size
                meta_group.attrs['compression_ratio_percent'] = compression_ratio

                logger.info("๋ณํ ์๋ฃ!")
                logger.info(f"์๋ณธ ํฌ๊ธฐ: {total_original_size / (1024*1024):.1f} MB")
                logger.info(f"์์ถ ํฌ๊ธฐ: {compressed_size / (1024*1024):.1f} MB")
                logger.info(f"์์ถ์จ: {compression_ratio:.1f}%")

            return True

        except Exception as e:
            logger.error(f"HDF5 ๋ณํ ์คํจ: {e}")
            return False
class CompressedVoicebankManager:
    """Read access to a voicebank stored in a single HDF5 file.

    The file is expected to contain a ``metadata`` group (gzip-compressed
    JSON oto data plus summary attributes) and an ``audio_data`` group with
    one sub-group per WAV file. Oto entries are loaded when the manager is
    constructed; audio is read on demand and kept in a small in-memory
    least-recently-used cache.

    NOTE(review): the log/error message literals in this class look
    mis-encoded (probably Korean text decoded with the wrong codec). They are
    kept unchanged here; confirm against the original file.
    """

    def __init__(self, hdf5_path: Union[str, Path]):
        """Open the voicebank file and load its oto metadata.

        Args:
            hdf5_path: Path of the HDF5 voicebank file.

        Raises:
            FileNotFoundError: If ``hdf5_path`` does not exist.
            Exception: Whatever :meth:`load_metadata` re-raises.
        """
        self.hdf5_path = Path(hdf5_path)
        self.oto_entries: Dict[str, "OtoEntry"] = {}
        self._audio_cache: Dict[str, Tuple[np.ndarray, int]] = {}
        self.cache_size_limit = 50  # maximum number of audio files kept in memory

        if not self.hdf5_path.exists():
            raise FileNotFoundError(f"์์ถ๋ ๋ณด์ด์ค๋ฑ ํฌ๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค: {hdf5_path}")

        self.load_metadata()

    def load_metadata(self):
        """Load the oto entries from the HDF5 file into ``self.oto_entries``.

        The entries are rebuilt from scratch on every call, so a reload never
        keeps aliases that are no longer in the file.

        Raises:
            Exception: Any error while reading or decoding is logged and
                re-raised.
        """
        try:
            with h5py.File(self.hdf5_path, 'r') as h5file:
                meta = h5file['metadata']

                # The oto data is gzip-compressed JSON stored as raw bytes.
                oto_compressed = meta['oto_data'][:]
                oto_json = gzip.decompress(oto_compressed.tobytes()).decode('utf-8')
                oto_data = json.loads(oto_json)

                self.oto_entries = {
                    alias: OtoEntry(
                        filename=data['filename'],
                        alias=data['alias'],
                        offset=data['offset'],
                        consonant=data['consonant'],
                        cutoff=data['cutoff'],
                        preutterance=data['preutterance'],
                        overlap=data['overlap'],
                    )
                    for alias, data in oto_data.items()
                }

                logger.info(f"์์ถ๋ ๋ณด์ด์ค๋ฑ ํฌ ๋ก๋: {meta.attrs['voicebank_name']}")
                logger.info(f"์ด {meta.attrs['total_entries']}๊ฐ ์ํธ๋ฆฌ")
                # The ratio is only logged, so a file without the size
                # statistics should still load.
                ratio = meta.attrs.get('compression_ratio_percent')
                if ratio is not None:
                    logger.info(f"์์ถ์จ: {ratio:.1f}%")

        except Exception as e:
            logger.error(f"๋ฉํ๋ฐ์ดํฐ ๋ก๋ ์คํจ: {e}")
            raise

    def get_audio_data(self, filename: str) -> Optional[Tuple[np.ndarray, int]]:
        """Return ``(samples, sample_rate)`` for ``filename``, using the cache.

        The returned tuple is the cached object itself, so callers should not
        modify the sample array in place.

        Args:
            filename: WAV file name; every ``.wav`` occurrence is stripped to
                form the group key under ``audio_data``.

        Returns:
            The audio tuple, or None when the file is not in the voicebank or
            reading fails (the error is logged).
        """
        base_filename = filename.replace('.wav', '')

        cached = self._audio_cache.get(base_filename)
        if cached is not None:
            # Re-insert so the entry becomes the most recently used one;
            # dicts preserve insertion order, the first key is the oldest.
            self._audio_cache[base_filename] = self._audio_cache.pop(base_filename)
            return cached

        try:
            with h5py.File(self.hdf5_path, 'r') as h5file:
                if base_filename not in h5file['audio_data']:
                    return None

                file_group = h5file['audio_data'][base_filename]
                audio_data = file_group['audio'][:]
                sample_rate = file_group.attrs['sample_rate']

            result = (audio_data, int(sample_rate))

            # A non-positive limit disables caching instead of failing.
            if self.cache_size_limit > 0:
                while len(self._audio_cache) >= self.cache_size_limit:
                    # Evict the least recently used entry.
                    del self._audio_cache[next(iter(self._audio_cache))]
                self._audio_cache[base_filename] = result

            return result

        except Exception as e:
            logger.error(f"์ค๋์ค ๋ฐ์ดํฐ ๋ก๋ ์คํจ {filename}: {e}")
            return None

    def get_sample_for_phoneme(self, phoneme: str) -> Optional["OtoEntry"]:
        """Find the oto entry to use for ``phoneme``.

        An exact alias match wins. Otherwise entries whose ``clean_alias``
        equals ``phoneme`` are considered, preferring the first one that is
        not flagged ``is_breath``.

        Returns:
            The matching entry, or None when nothing matches.
        """
        if phoneme in self.oto_entries:
            return self.oto_entries[phoneme]

        candidates = [
            entry for entry in self.oto_entries.values()
            if entry.clean_alias == phoneme
        ]
        if not candidates:
            return None

        # Prefer a sample that is not a breath sound.
        for entry in candidates:
            if not entry.is_breath:
                return entry
        return candidates[0]

    def list_available_phonemes(self) -> List[str]:
        """Return the distinct ``clean_alias`` values, in no particular order."""
        return list({entry.clean_alias for entry in self.oto_entries.values()})

    def get_compression_info(self) -> Dict[str, object]:
        """Return the size statistics stored in the file's metadata.

        Returns:
            A dict with ``voicebank_name``, ``total_entries``,
            ``original_size_mb``, ``compressed_size_mb``, ``compression_ratio``
            and ``file_path``; an empty dict when reading fails (the error is
            logged).
        """
        mib = 1024 * 1024
        try:
            with h5py.File(self.hdf5_path, 'r') as h5file:
                meta = h5file['metadata']
                return {
                    'voicebank_name': meta.attrs['voicebank_name'],
                    'total_entries': meta.attrs['total_entries'],
                    'original_size_mb': meta.attrs['original_size_bytes'] / mib,
                    'compressed_size_mb': meta.attrs['compressed_size_bytes'] / mib,
                    'compression_ratio': meta.attrs['compression_ratio_percent'],
                    'file_path': str(self.hdf5_path),
                }
        except Exception as e:
            logger.error(f"์์ถ ์ ๋ณด ๋ก๋ ์คํจ: {e}")
            return {}
def convert_voicebank_to_compressed_format(voicebank_path: str, output_path: str = None) -> bool:
    """Convert a voicebank directory into a compressed HDF5 file.

    Args:
        voicebank_path: Directory of the voicebank to convert.
        output_path: Target HDF5 file. When None, the file is named
            ``voice/<name>_compressed.h5`` where ``<name>`` is the last path
            component of ``voicebank_path`` with spaces replaced by ``_``.

    Returns:
        The result of ``VoiceDataCompressor.convert_voicebank_to_hdf5``.
    """
    target = output_path
    if target is None:
        safe_name = Path(voicebank_path).name.replace(' ', '_')
        target = f"voice/{safe_name}_compressed.h5"

    return VoiceDataCompressor(target).convert_voicebank_to_hdf5(voicebank_path)
if __name__ == "__main__":
    # Manual smoke test: convert a sample voicebank, then reopen the result.
    converted = convert_voicebank_to_compressed_format("voice/hanseol CVC")

    if not converted:
        print("โ ๋ณด์ด์ค๋ฑ ํฌ ์์ถ ์คํจ!")
    else:
        print("โ ๋ณด์ด์ค๋ฑ ํฌ ์์ถ ๋ณํ ์๋ฃ!")

        # Exercise the reader against the file that was just written.
        manager = CompressedVoicebankManager("voice/hanseol_CVC_compressed.h5")
        info = manager.get_compression_info()
        print(f"๐ ์์ถ ์ ๋ณด: {info}")
        phoneme_count = len(manager.list_available_phonemes())
        print(f"๐ค ์ฌ์ฉ ๊ฐ๋ฅํ ์์: {phoneme_count}๊ฐ")