import os import json import traceback import logging import gradio as gr import numpy as np import librosa import torch import asyncio import edge_tts import re import shutil import time from datetime import datetime from fairseq import checkpoint_utils from fairseq.data.dictionary import Dictionary from lib.infer_pack.models import ( SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono, SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono, ) from vc_infer_pipeline import VC from config import Config # ============================= # LOAD ENVIRONMENT VARIABLES # ============================= from dotenv import load_dotenv load_dotenv() HF_TOKEN = os.getenv("HF_TOKEN") if HF_TOKEN: print("š Hugging Face token detected") os.environ["HUGGINGFACE_TOKEN"] = HF_TOKEN else: print("ā ļø No HF_TOKEN found") # ============================= # AUTO-DOWNLOAD DARI HUGGING FACE - UNTUK BLUE ARCHIVE # ============================= def download_required_weights(): """Fungsi untuk download model Blue Archive dari Hugging Face""" print("=" * 50) print("š BLUE ARCHIVE VOICE CONVERSION v2.0") print("=" * 50) target_dir = "weights" # Cek jika model sudah ada blue_archive_dir = os.path.join(target_dir, "Blue-Archive") if os.path.exists(blue_archive_dir): print(f"š Checking existing models in: {blue_archive_dir}") model_files = [] for root, dirs, files in os.walk(blue_archive_dir): for file in files: if file.endswith(".pth"): model_files.append(os.path.join(root, file)) if len(model_files) >= 1: # Minimal ada 1 model print(f"ā Models already exist: {len(model_files)} .pth files found") return True else: print(f"ā ļø Incomplete models: {len(model_files)} .pth files found") try: from huggingface_hub import snapshot_download repo_id = "Plana-Archive/Premium-Model" print(f"š„ Downloading from: {repo_id}") print("š Looking for: Blue Archive - RCV/weights") # Download dengan pattern yang spesifik untuk Blue Archive downloaded_path = snapshot_download( repo_id=repo_id, allow_patterns=[ "Blue Archive - RCV/weights/**", ], local_dir=".", local_dir_use_symlinks=False, token=HF_TOKEN, max_workers=2 ) print("ā Download completed") # Pindahkan file source_dir = "Blue Archive - RCV/weights" if os.path.exists(source_dir): os.makedirs(target_dir, exist_ok=True) # Pindahkan semua konten for item in os.listdir(source_dir): s = os.path.join(source_dir, item) d = os.path.join(target_dir, item) if os.path.isdir(s): if os.path.exists(d): shutil.rmtree(d) shutil.move(s, d) else: shutil.move(s, d) print(f"š Moved models to: {target_dir}") # Buat folder_info.json jika tidak ada folder_info_path = os.path.join(target_dir, "folder_info.json") if not os.path.exists(folder_info_path): folder_info = { "Blue-Archive": { "title": "Blue Archive - RCV Collection", "folder_path": "Blue-Archive", "description": "Official RVC Weights for Blue Archive characters by Plana-Archive", "enable": True } } with open(folder_info_path, "w", encoding="utf-8") as f: json.dump(folder_info, f, indent=2, ensure_ascii=False) print(f"š Created folder_info.json") # Buat model_info.json yang sesuai dengan file yang sebenarnya create_model_info_from_files(target_dir) return True else: print("ā Source directory not found after download!") return False except Exception as e: print(f"ā ļø Download failed: {str(e)}") traceback.print_exc() print("\nš Manual setup:") print("1. Create folder: weights/") print("2. Download from: https://huggingface.co/Plana-Archive/Anime-RCV/tree/main/Blue Archive - RCV/weights") print("3. Put Blue-Archive folder in weights/") return False def create_model_info_from_files(base_path): """Buat model_info.json berdasarkan file yang sebenarnya ada untuk Blue Archive""" blue_archive_dir = os.path.join(base_path, "Blue-Archive") if not os.path.exists(blue_archive_dir): return model_info_path = os.path.join(blue_archive_dir, "model_info.json") # Scan semua karakter dari subfolder model_info = {} # Cari semua folder karakter for char_folder in os.listdir(blue_archive_dir): char_path = os.path.join(blue_archive_dir, char_folder) if not os.path.isdir(char_path): continue # Cari file dalam folder karakter pth_files = [f for f in os.listdir(char_path) if f.endswith('.pth')] index_files = [f for f in os.listdir(char_path) if f.endswith('.index')] image_files = [f for f in os.listdir(char_path) if f.lower().endswith(('.png', '.jpg', '.jpeg'))] if not pth_files: continue # Format nama karakter untuk judul # Contoh: "AjitaniHifumi" -> "Ajitani Hifumi" char_name_formatted = re.sub(r"([a-z])([A-Z])", r"\1 \2", char_folder) model_info[char_folder] = { "enable": True, "model_path": pth_files[0], "title": f"Blue Archive - {char_name_formatted}", "cover": image_files[0] if image_files else "cover.png", "feature_retrieval_library": index_files[0] if index_files else "", "author": "Plana-Archive" } with open(model_info_path, "w", encoding="utf-8") as f: json.dump(model_info, f, indent=2, ensure_ascii=False) print(f"ā Created model_info.json with {len(model_info)} characters") return model_info # Jalankan download download_required_weights() # Inisialisasi konfigurasi config = Config() logging.getLogger("numba").setLevel(logging.WARNING) logging.getLogger("fairseq").setLevel(logging.WARNING) # Cache untuk model model_cache = {} hubert_loaded = False hubert_model = None spaces = True if spaces: audio_mode = ["Upload audio", "TTS Audio"] else: audio_mode = ["Input path", "Upload audio", "TTS Audio"] f0method_mode = ["pm", "harvest"] if os.path.isfile("rmvpe.pt"): f0method_mode.insert(2, "rmvpe") def clean_title(title): title = re.sub(r'^Blue Archive\s*-\s*', '', title, flags=re.IGNORECASE) return re.sub(r'\s*-\s*\d+\s*epochs', '', title, flags=re.IGNORECASE) # OPTIMASI: Audio processing yang lebih cepat def _load_audio_input(vc_audio_mode, vc_input, vc_upload, tts_text, spaces_limit=20): temp_file = None try: if vc_audio_mode == "Input path" and vc_input: # Gunakan librosa untuk loading audio, sr = librosa.load(vc_input, sr=16000, mono=True) return audio.astype(np.float32), 16000, None elif vc_audio_mode == "Upload audio": if vc_upload is None: raise ValueError("Mohon upload file audio terlebih dahulu!") sampling_rate, audio = vc_upload # Konversi ke float32 if audio.dtype != np.float32: audio = audio.astype(np.float32) / np.iinfo(audio.dtype).max if len(audio.shape) > 1: audio = np.mean(audio, axis=0) if sampling_rate != 16000: audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000, res_type='kaiser_fast') return audio.astype(np.float32), 16000, None elif vc_audio_mode == "TTS Audio": if not tts_text or tts_text.strip() == "": raise ValueError("Mohon masukkan teks untuk TTS!") temp_file = "tts_temp.wav" # Async TTS dengan timeout async def tts_task(): return await edge_tts.Communicate(tts_text, "ja-JP-NanamiNeural").save(temp_file) # Jalankan dengan timeout try: asyncio.run(asyncio.wait_for(tts_task(), timeout=10)) except asyncio.TimeoutError: raise ValueError("TTS timeout! Silakan coba lagi.") audio, sr = librosa.load(temp_file, sr=16000, mono=True) return audio.astype(np.float32), 16000, temp_file except Exception as e: if temp_file and os.path.exists(temp_file): os.remove(temp_file) raise e raise ValueError("Invalid audio mode or missing input.") def adjust_audio_speed(audio, speed): if speed == 1.0: return audio # Gunakan metode yang lebih cepat untuk time stretching return librosa.effects.time_stretch(audio.astype(np.float32), rate=speed) # OPTIMASI: Fungsi preprocessing audio yang lebih efisien def preprocess_audio(audio): # Normalize audio if np.max(np.abs(audio)) > 1.0: audio = audio / np.max(np.abs(audio)) * 0.9 return audio.astype(np.float32) # OPTIMASI: Pipeline inferensi yang lebih cepat def create_vc_fn(model_key, tgt_sr, net_g, vc, if_f0, version, file_index): def vc_fn( vc_audio_mode, vc_input, vc_upload, tts_text, f0_up_key, f0_method, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, speed, ): temp_audio_file = None try: # Clear GPU cache sebelum memulai if torch.cuda.is_available(): torch.cuda.empty_cache() # Preload model ke GPU net_g.to(config.device) yield "Status: š Memproses audio...", None # Load audio dengan optimasi audio, sr, temp_audio_file = _load_audio_input(vc_audio_mode, vc_input, vc_upload, tts_text) # Preprocess audio audio = preprocess_audio(audio) # Konversi ke tensor dengan optimasi memory audio_tensor = torch.FloatTensor(audio).to(config.device) times = [0, 0, 0] # OPTIMASI: Gunakan batch processing untuk audio yang panjang max_chunk_size = 16000 * 30 # 30 detik per chunk if len(audio) > max_chunk_size: chunks = [] for i in range(0, len(audio), max_chunk_size): chunk = audio[i:i + max_chunk_size] chunk_tensor = torch.FloatTensor(chunk).to(config.device) chunk_opt = vc.pipeline( hubert_model, net_g, 0, chunk_tensor, "chunk" if vc_input else "temp", times, int(f0_up_key), f0_method, file_index, index_rate, if_f0, filter_radius, tgt_sr, resample_sr, rms_mix_rate, version, protect, f0_file=None, ) chunks.append(chunk_opt) audio_opt = np.concatenate(chunks) else: # Processing single chunk audio_opt = vc.pipeline( hubert_model, net_g, 0, audio_tensor, vc_input if vc_input else "temp", times, int(f0_up_key), f0_method, file_index, index_rate, if_f0, filter_radius, tgt_sr, resample_sr, rms_mix_rate, version, protect, f0_file=None, ) # Pastikan audio_opt dalam format float32 audio_opt = audio_opt.astype(np.float32) # Apply speed adjustment if speed != 1.0: audio_opt = adjust_audio_speed(audio_opt, speed) # Normalize output dan pastikan float32 if np.max(np.abs(audio_opt)) > 0: audio_opt = (audio_opt / np.max(np.abs(audio_opt)) * 0.9).astype(np.float32) # Return format yang sesuai untuk gradio.Audio yield "Status: ā Selesai!", (tgt_sr, audio_opt) except Exception as e: yield f"ā Error: {str(e)}\n\n{traceback.format_exc()}", None finally: # Cleanup if temp_audio_file and os.path.exists(temp_audio_file): os.remove(temp_audio_file) # Kosongkan GPU cache if torch.cuda.is_available(): torch.cuda.empty_cache() # Return model ke CPU untuk hemat memory (kecuali untuk cache) if model_key not in model_cache: net_g.to('cpu') return vc_fn def load_model(): categories = [] base_path = "weights" if not os.path.exists(base_path): print(f"ā Folder '{base_path}' not found!") return categories # Baca folder_info.json atau buat default folder_info_path = f"{base_path}/folder_info.json" if not os.path.isfile(folder_info_path): print(f"š Creating default folder_info.json...") folder_info = { "Blue-Archive": { "title": "Blue Archive - RCV Collection", "folder_path": "Blue-Archive", "description": "Official RVC Weights for Blue Archive characters by Plana-Archive", "enable": True } } with open(folder_info_path, "w", encoding="utf-8") as f: json.dump(folder_info, f, indent=2, ensure_ascii=False) with open(folder_info_path, "r", encoding="utf-8") as f: folder_info = json.load(f) for category_name, category_info in folder_info.items(): if not category_info.get('enable', True): continue category_title, category_folder, description = ( category_info['title'], category_info['folder_path'], category_info['description'] ) models = [] model_info_path = f"{base_path}/{category_folder}/model_info.json" # Jika model_info.json tidak ada, buat dari file yang ada if not os.path.exists(model_info_path): print(f" ā ļø model_info.json not found, creating from files...") model_info = create_model_info_from_files(base_path) if not model_info: continue if os.path.exists(model_info_path): with open(model_info_path, "r", encoding="utf-8") as f: models_info = json.load(f) for character_name, info in models_info.items(): if not info.get('enable', True): continue model_title, model_name, model_author = ( info['title'], info['model_path'], info.get("author") ) # Buat key unik untuk cache cache_key = f"{category_folder}_{character_name}" # Gunakan cache jika tersedia if cache_key in model_cache: tgt_sr, net_g, vc, if_f0, version, model_index = model_cache[cache_key] else: model_cover = f"{base_path}/{category_folder}/{character_name}/{info['cover']}" model_index = f"{base_path}/{category_folder}/{character_name}/{info['feature_retrieval_library']}" # Load model weights model_path = f"{base_path}/{category_folder}/{character_name}/{model_name}" cpt = torch.load(model_path, map_location="cpu") tgt_sr = cpt["config"][-1] cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] if_f0, version = cpt.get("f0", 1), cpt.get("version", "v1") # Inisialisasi model if version == "v1": if if_f0 == 1: net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) else: net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) else: if if_f0 == 1: net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half) else: net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) # Load weights if hasattr(net_g, "enc_q"): del net_g.enc_q net_g.load_state_dict(cpt["weight"], strict=False) net_g.eval().to('cpu') # Simpan di CPU dulu # Buat VC instance vc = VC(tgt_sr, config) # Cache model model_cache[cache_key] = (tgt_sr, net_g, vc, if_f0, version, model_index) models.append(( character_name, model_title, model_author, f"{base_path}/{category_folder}/{character_name}/{info['cover']}", version, create_vc_fn(cache_key, tgt_sr, net_g, vc, if_f0, version, model_index) )) categories.append([category_title, category_folder, description, models]) return categories def load_hubert(): global hubert_model, hubert_loaded if hubert_loaded: return torch.serialization.add_safe_globals([Dictionary]) models, _, _ = checkpoint_utils.load_model_ensemble_and_task( ["hubert_base.pt"], suffix="", ) hubert_model = models[0].to(config.device) hubert_model = hubert_model.half() if config.is_half else hubert_model.float() hubert_model.eval() hubert_loaded = True def change_audio_mode(vc_audio_mode): is_input_path = vc_audio_mode == "Input path" is_upload = vc_audio_mode == "Upload audio" is_tts = vc_audio_mode == "TTS Audio" return ( gr.Textbox.update(visible=is_input_path), gr.Checkbox.update(visible=is_upload), gr.Audio.update(visible=is_upload), gr.Textbox.update(visible=is_tts, lines=4 if is_tts else 2) ) def use_microphone(microphone): return gr.Audio.update(source="microphone" if microphone else "upload") # CSS tetap sama css = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&family=Quicksand:wght@400;600;700&display=swap'); body, .gradio-container { background-color: #ffffff !important; font-family: 'Inter', sans-serif !important; } footer { display: none !important; } .arona-loading-container { display: flex; align-items: center; justify-content: center; gap: 15px; margin-top: 15px; padding: 10px; } .loading-text-blue { font-family: 'Quicksand', sans-serif; font-size: 20px; font-weight: 700; color: #00b0ff; letter-spacing: 1px; } .loading-gif-small { width: 100px; height: auto; border-radius: 8px; } .header-img-container { text-align: center; padding: 10px 0; background: #ffffff !important; } .header-img { width: 100%; max-width: 500px; border-radius: 15px; margin: 0 auto; display: block; } .status-card { background: #ffffff; border: 1px solid #e1f0ff; border-radius: 14px; padding: 15px 10px; margin: 0 auto 15px auto; max-width: 400px; display: flex; flex-direction: column; align-items: center; } .status-online-box { display: flex; align-items: center; gap: 8px; margin-bottom: 12px; } .status-details-container { display: flex; width: 100%; justify-content: center; align-items: center; border-top: 1px solid #f0f7ff; padding-top: 10px; } .status-detail-item { flex: 1; display: flex; flex-direction: column; align-items: center; text-align: center; } .status-detail-item:first-child { border-right: 1px solid #e1f0ff; } .status-text-main { font-size: 13px !important; font-weight: 600; color: #546e7a; } .status-text-sub { font-size: 11px !important; color: #90a4ae; } .dot-online { height: 8px; width: 8px; background-color: #2ecc71; border-radius: 50%; display: inline-block; animation: blink-green 1.5s infinite; } @keyframes blink-green { 0% { opacity: 1; } 50% { opacity: 0.4; } 100% { opacity: 1; } } .gr-form .gr-block label span, .gr-box label span, .gr-panel label span { background: linear-gradient(135deg, #4fc3f7 0%, #00b0ff 100%) !important; color: white !important; padding: 4px 12px !important; border-radius: 8px !important; font-weight: 600 !important; box-shadow: 0 0 15px rgba(79, 195, 247, 0.4) !important; } input[type="range"] { accent-color: #00b0ff !important; } .char-scroll-box { display: grid !important; grid-template-columns: repeat(2, 1fr) !important; gap: 12px !important; max-height: 280px; overflow-y: auto; padding: 15px; background: #ffffff; border: 2px solid #eef5ff; border-radius: 14px; } .char-card { background: white; padding: 12px; border-radius: 12px; cursor: pointer; border: 1px solid #e1f5fe; border-left: 5px solid #4fc3f7; transition: all 0.2s ease; display: flex; flex-direction: column; height: 65px; } .char-name-jp { font-weight: 700; font-size: 11px !important; color: #455a64; } .char-name-en { font-size: 8.5px !important; color: #90a4ae; text-transform: uppercase; } .speed-section { margin-top: 20px; padding: 18px; border-radius: 20px; background: linear-gradient(135deg, #f0f7ff 0%, #ffffff 100%); border: 2px solid #e1f0ff; } .speed-title { font-family: 'Quicksand', sans-serif; font-weight: 700; color: #4ea8de; text-align: center; margin-bottom: 12px; font-size: 14px; } .generate-btn { font-family: 'Quicksand', sans-serif; font-weight: 700 !important; background: linear-gradient(135deg, #64b5f6 0%, #2196f3 100%) !important; color: white !important; border-radius: 12px !important; } .footer-text { text-align: center; padding: 20px; border-top: 1px solid #f0f4f8; color: #b0bec5; font-size: 11px; } .speed-notes-box { font-family: 'Arial'; border: 1px solid #ffd8b2; border-radius: 8px; padding: 12px; background: #fff7ed; border-left: 4px solid #fb923c; margin-top: 10px; } .speed-notes-title { color: #c2410c; font-size: 12px; margin: 0 0 5px 0; font-weight: bold; } .speed-notes-content { color: #9a3412; font-size: 11px; margin: 0; } .video-demo-container { text-align: center; padding: 20px; background: #ffffff; border-radius: 20px; border: 2px solid #e1f0ff; margin: 20px auto; max-width: 800px; } .video-demo-title { font-family: 'Quicksand', sans-serif; font-weight: 700; color: #4fc3f7; font-size: 18px; margin-bottom: 15px; } .video-demo-player { width: 100%; border-radius: 15px; box-shadow: 0 10px 30px rgba(0, 176, 255, 0.2); } """ if __name__ == '__main__': # Preload hubert model load_hubert() # Load models dengan cache categories = load_model() total_models = sum(len(models) for _, _, _, models in categories) # Optimasi Gradio dengan queue yang lebih efisien with gr.Blocks(css=css, theme=gr.themes.Soft()) as app: gr.HTML('
Pitch: Mengatur nada suara (naik/turun)
Algoritma: Metode ekstraksi nada (RMVPE paling akurat)
Retrieval: Kemiripan karakter suara (0-1)
Filter: Smoothing untuk mengurangi noise
Volume: Stabilitas volume output
Protect: Proteksi suara agar tetap natural
Pitch: +12 (Ubah untuk Character Cewek)
Pitch: (0) (Ubah untuk Character Cowok "Senseii")
Algoritma: RMVPE (Akurasi tinggi)
Retrieval: 0.75 (Keseimbangan)
Filter: 7 (Noise reduction optimal)
Volume: 0.76 (Stabil)
Protect: 0.33 (Natural)
