# UTAU-WebUI / voice_data_converter.py
# crlotwhite's picture
# Add UTAU WebUI project with LFS support for voice files
# 1056960
import gzip
import json
import logging
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import h5py
import numpy as np
import soundfile as sf

from utau_engine import VoicebankManager, OtoEntry
logger = logging.getLogger(__name__)
class VoiceDataCompressor:
    """Pack a UTAU voicebank (oto entries + WAV audio) into one HDF5 file.

    Layout written by :meth:`convert_voicebank_to_hdf5`:

    * ``metadata/oto_data`` - gzip-compressed JSON of the oto entries,
      stored as a uint8 dataset.
    * ``metadata`` attributes - voicebank name, entry/file counts and
      size statistics.
    * ``audio_data/<name>/audio`` - mono float32 samples; the group carries
      ``sample_rate``, ``duration`` and ``original_filename`` attributes.
    """

    def __init__(self, output_path: str = "voice_data.h5"):
        """Remember the target file and the HDF5 compression settings.

        Args:
            output_path: Path of the HDF5 file to create.
        """
        self.output_path = Path(output_path)
        self.compression = 'gzip'  # HDF5 compression filter
        self.compression_opts = 6  # gzip compression level (0-9)

    @staticmethod
    def _compression_ratio(original_size: int, compressed_size: int) -> float:
        """Return the space saved, in percent; 0.0 when there is no input."""
        if original_size <= 0:
            return 0.0
        return (1 - compressed_size / original_size) * 100

    def convert_voicebank_to_hdf5(self, voicebank_path: Union[str, Path]) -> bool:
        """Convert the voicebank at ``voicebank_path`` into ``self.output_path``.

        WAV files that cannot be read or stored are logged and skipped.

        Args:
            voicebank_path: Directory of the voicebank to convert.

        Returns:
            True on success; False if the conversion failed or no WAV file
            could be converted. Exceptions are logged, never propagated.
        """
        try:
            voicebank_path = Path(voicebank_path)
            logger.info(f"๋ณด์ด์Šค๋ฑ…ํฌ ๋ณ€ํ™˜ ์‹œ์ž‘: {voicebank_path}")

            # Load the existing voicebank through VoicebankManager.
            vb_manager = VoicebankManager(voicebank_path)

            # The target directory may not exist yet (e.g. the default
            # "voice/" location); h5py does not create it.
            self.output_path.parent.mkdir(parents=True, exist_ok=True)

            with h5py.File(self.output_path, 'w') as h5file:
                meta_group = h5file.create_group('metadata')

                # Collect the oto entries as plain dicts so they can be
                # serialized to JSON.
                oto_data = {
                    alias: {
                        'filename': entry.filename,
                        'alias': entry.alias,
                        'offset': entry.offset,
                        'consonant': entry.consonant,
                        'cutoff': entry.cutoff,
                        'preutterance': entry.preutterance,
                        'overlap': entry.overlap,
                    }
                    for alias, entry in vb_manager.oto_entries.items()
                }

                # Store the JSON gzip-compressed, as raw bytes.
                oto_json = json.dumps(oto_data, ensure_ascii=False, indent=2)
                oto_compressed = gzip.compress(oto_json.encode('utf-8'))
                meta_group.create_dataset(
                    'oto_data',
                    data=np.frombuffer(oto_compressed, dtype=np.uint8),
                )

                # General voicebank information.
                meta_group.attrs['voicebank_name'] = voicebank_path.name
                meta_group.attrs['total_entries'] = len(vb_manager.oto_entries)
                meta_group.attrs['total_wav_files'] = len(vb_manager.wav_files)

                audio_group = h5file.create_group('audio_data')

                # NOTE(review): redundant if wav_files is a plain dict
                # (keys are unique); kept as a cheap guard.
                processed_files = set()
                total_original_size = 0

                for filename, wav_path in vb_manager.wav_files.items():
                    if filename in processed_files:
                        continue
                    try:
                        audio_data, sample_rate = sf.read(wav_path)

                        # Down-mix multi-channel audio to mono.
                        if audio_data.ndim > 1:
                            audio_data = np.mean(audio_data, axis=1)

                        # One group per file, named without '.wav'.
                        file_group = audio_group.create_group(filename.replace('.wav', ''))
                        file_group.create_dataset(
                            'audio',
                            data=audio_data.astype(np.float32),
                            compression=self.compression,
                            compression_opts=self.compression_opts,
                            shuffle=True,      # byte-shuffle improves the compression ratio
                            fletcher32=True,   # add a checksum
                        )

                        file_group.attrs['sample_rate'] = sample_rate
                        file_group.attrs['duration'] = len(audio_data) / sample_rate
                        file_group.attrs['original_filename'] = filename

                        # Count the original size only once the file has
                        # actually been stored, so the statistics match
                        # the archive contents.
                        total_original_size += wav_path.stat().st_size
                        processed_files.add(filename)
                        logger.info(f"๋ณ€ํ™˜ ์™„๋ฃŒ: {filename} ({len(audio_data)} samples)")
                    except Exception as e:
                        logger.error(f"ํŒŒ์ผ ์ฒ˜๋ฆฌ ์‹คํŒจ {wav_path}: {e}")
                        continue

                if total_original_size == 0:
                    # Previously this surfaced as a swallowed
                    # ZeroDivisionError; report the real cause instead.
                    logger.error(f"No WAV files could be converted from {voicebank_path}")
                    return False

                # Flush first: while the file is open, buffered data may not
                # be on disk yet and stat() would under-report the size. The
                # attributes written below add a little more, so the stored
                # figure is a close approximation.
                h5file.flush()
                compressed_size = self.output_path.stat().st_size
                compression_ratio = self._compression_ratio(
                    total_original_size, compressed_size
                )

                meta_group.attrs['original_size_bytes'] = total_original_size
                meta_group.attrs['compressed_size_bytes'] = compressed_size
                meta_group.attrs['compression_ratio_percent'] = compression_ratio

                logger.info("๋ณ€ํ™˜ ์™„๋ฃŒ!")
                logger.info(f"์›๋ณธ ํฌ๊ธฐ: {total_original_size / (1024*1024):.1f} MB")
                logger.info(f"์••์ถ• ํฌ๊ธฐ: {compressed_size / (1024*1024):.1f} MB")
                logger.info(f"์••์ถ•์œจ: {compression_ratio:.1f}%")
                return True
        except Exception as e:
            logger.error(f"HDF5 ๋ณ€ํ™˜ ์‹คํŒจ: {e}")
            return False
class CompressedVoicebankManager:
    """Read access to a voicebank stored in the compressed HDF5 format.

    Oto entries are loaded when the object is constructed. Audio is read
    from the HDF5 file on demand and kept in a small in-memory LRU cache.
    """

    def __init__(self, hdf5_path: Union[str, Path]):
        """Open the compressed voicebank at ``hdf5_path`` and load its metadata.

        Raises:
            FileNotFoundError: If ``hdf5_path`` does not exist.
            Exception: Anything :meth:`load_metadata` re-raises.
        """
        self.hdf5_path = Path(hdf5_path)
        self.oto_entries: Dict[str, OtoEntry] = {}
        # Maps group name (filename without '.wav') -> (samples, sample_rate).
        self._audio_cache: Dict[str, Tuple[np.ndarray, int]] = {}
        self.cache_size_limit = 50  # maximum number of cached audio files

        if not self.hdf5_path.exists():
            raise FileNotFoundError(f"์••์ถ•๋œ ๋ณด์ด์Šค๋ฑ…ํฌ๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค: {hdf5_path}")

        self.load_metadata()

    def load_metadata(self):
        """Load the oto entries from the HDF5 file into ``self.oto_entries``.

        Raises:
            Exception: Any error while reading or decoding the metadata is
                logged and re-raised.
        """
        try:
            with h5py.File(self.hdf5_path, 'r') as h5file:
                # The oto data is gzip-compressed JSON stored as raw bytes.
                oto_compressed = h5file['metadata']['oto_data'][:]
                oto_json = gzip.decompress(oto_compressed.tobytes()).decode('utf-8')
                oto_data = json.loads(oto_json)

                # Rebuild OtoEntry objects from the stored dicts.
                for alias, data in oto_data.items():
                    self.oto_entries[alias] = OtoEntry(
                        filename=data['filename'],
                        alias=data['alias'],
                        offset=data['offset'],
                        consonant=data['consonant'],
                        cutoff=data['cutoff'],
                        preutterance=data['preutterance'],
                        overlap=data['overlap'],
                    )

                # Log a short summary of what was loaded.
                meta = h5file['metadata']
                logger.info(f"์••์ถ•๋œ ๋ณด์ด์Šค๋ฑ…ํฌ ๋กœ๋“œ: {meta.attrs['voicebank_name']}")
                logger.info(f"์ด {meta.attrs['total_entries']}๊ฐœ ์—”ํŠธ๋ฆฌ")
                logger.info(f"์••์ถ•์œจ: {meta.attrs['compression_ratio_percent']:.1f}%")
        except Exception as e:
            logger.error(f"๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ๋กœ๋“œ ์‹คํŒจ: {e}")
            raise

    def get_audio_data(self, filename: str) -> Optional[Tuple[np.ndarray, int]]:
        """Return ``(samples, sample_rate)`` for ``filename``, using the cache.

        Args:
            filename: WAV file name; ``'.wav'`` is stripped to form the
                HDF5 group name.

        Returns:
            The audio tuple, or None if the file is not in the archive or
            reading fails (the error is logged).
        """
        base_filename = filename.replace('.wav', '')

        # Cache hit: re-insert the entry so it becomes the most recently
        # used one (dicts keep insertion order). Without this the cache
        # would evict in FIFO order.
        cached = self._audio_cache.pop(base_filename, None)
        if cached is not None:
            self._audio_cache[base_filename] = cached
            return cached

        try:
            with h5py.File(self.hdf5_path, 'r') as h5file:
                if base_filename not in h5file['audio_data']:
                    return None
                file_group = h5file['audio_data'][base_filename]
                audio_data = file_group['audio'][:]
                sample_rate = file_group.attrs['sample_rate']

            result = (audio_data, int(sample_rate))

            # A non-positive limit disables caching instead of failing on
            # eviction from an empty cache.
            if self.cache_size_limit > 0:
                # Evict least recently used entries (front of the dict)
                # until there is room for the new one.
                while len(self._audio_cache) >= self.cache_size_limit:
                    del self._audio_cache[next(iter(self._audio_cache))]
                self._audio_cache[base_filename] = result

            return result
        except Exception as e:
            logger.error(f"์˜ค๋””์˜ค ๋ฐ์ดํ„ฐ ๋กœ๋“œ ์‹คํŒจ {filename}: {e}")
            return None

    def get_sample_for_phoneme(self, phoneme: str) -> Optional["OtoEntry"]:
        """Find the oto entry to use for ``phoneme``.

        An exact alias match wins. Otherwise entries whose ``clean_alias``
        equals ``phoneme`` are considered, preferring non-breath entries.

        Returns:
            The matching entry, or None if there is none.
        """
        # Try an exact alias match first.
        if phoneme in self.oto_entries:
            return self.oto_entries[phoneme]

        # Fall back to entries whose cleaned alias matches.
        candidates = [
            entry for entry in self.oto_entries.values()
            if entry.clean_alias == phoneme
        ]
        if not candidates:
            return None

        # Prefer an entry that is not a breath sample.
        for entry in candidates:
            if not entry.is_breath:
                return entry
        return candidates[0]

    def list_available_phonemes(self) -> List[str]:
        """Return the distinct ``clean_alias`` values (in no particular order)."""
        return list({entry.clean_alias for entry in self.oto_entries.values()})

    def get_compression_info(self) -> Dict[str, Any]:
        """Return the compression statistics stored in the HDF5 metadata.

        Numeric values are converted from numpy scalars to plain Python
        numbers so the result can be serialized or displayed directly.

        Returns:
            A dict with name, entry count, sizes in MB, compression ratio
            and file path; an empty dict if reading fails (error is logged).
        """
        try:
            with h5py.File(self.hdf5_path, 'r') as h5file:
                meta = h5file['metadata']
                return {
                    'voicebank_name': meta.attrs['voicebank_name'],
                    'total_entries': int(meta.attrs['total_entries']),
                    'original_size_mb': float(meta.attrs['original_size_bytes'] / (1024*1024)),
                    'compressed_size_mb': float(meta.attrs['compressed_size_bytes'] / (1024*1024)),
                    'compression_ratio': float(meta.attrs['compression_ratio_percent']),
                    'file_path': str(self.hdf5_path),
                }
        except Exception as e:
            logger.error(f"์••์ถ• ์ •๋ณด ๋กœ๋“œ ์‹คํŒจ: {e}")
            return {}
def convert_voicebank_to_compressed_format(
    voicebank_path: Union[str, Path],
    output_path: Optional[Union[str, Path]] = None,
) -> bool:
    """Convert a voicebank directory into the compressed HDF5 format.

    Args:
        voicebank_path: Directory of the voicebank to convert.
        output_path: Target HDF5 file. When None, the file is written to
            ``voice/<name>_compressed.h5`` where ``<name>`` is the voicebank
            directory name with spaces replaced by underscores.

    Returns:
        True if the conversion succeeded, False otherwise.
    """
    if output_path is None:
        voicebank_name = Path(voicebank_path).name.replace(' ', '_')
        output_path = f"voice/{voicebank_name}_compressed.h5"

    # The target directory (notably the default "voice/") may not exist
    # relative to the current working directory. Report failure through
    # the return value so this function still never raises.
    try:
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create output directory for {output_path}: {e}")
        return False

    converter = VoiceDataCompressor(output_path)
    return converter.convert_voicebank_to_hdf5(voicebank_path)
if __name__ == "__main__":
    # Manual smoke test: convert a sample voicebank, then reopen the
    # compressed result and print a short summary.
    if not convert_voicebank_to_compressed_format("voice/hanseol CVC"):
        print("โŒ ๋ณด์ด์Šค๋ฑ…ํฌ ์••์ถ• ์‹คํŒจ!")
    else:
        print("โœ… ๋ณด์ด์Šค๋ฑ…ํฌ ์••์ถ• ๋ณ€ํ™˜ ์™„๋ฃŒ!")

        # Exercise the compressed voicebank that was just written.
        compressed_vb = CompressedVoicebankManager("voice/hanseol_CVC_compressed.h5")
        print(f"๐Ÿ“Š ์••์ถ• ์ •๋ณด: {compressed_vb.get_compression_info()}")
        print(f"๐ŸŽค ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์Œ์†Œ: {len(compressed_vb.list_available_phonemes())}๊ฐœ")