import asyncio
import os
import random
import shutil
import time
import uuid
import zipfile
from pathlib import Path

import librosa
import numpy as np
import requests
import soundfile as sf
from fastapi import BackgroundTasks, FastAPI, File, Form, Request, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from gradio_client import Client, handle_file

app = FastAPI()

# --- Worker list ---
# More workers (e.g. up to 50) can be appended here later.
WORKER_URLS = [
    "https://ezmary-rvc-zeroworker1.hf.space",
    "https://ezmary-rvc-zeroworker2.hf.space",
    "https://ezmary-rvc-zeroworker3.hf.space",
    "https://rvisi-rvskareqar4.hf.space",
    "https://rvisi-rvskareqar5.hf.space",
    "https://rvisi-rvskareqar6.hf.space",
    "https://rvisi-rvskareqar7.hf.space",
    "https://rvisi-rvskareqar8.hf.space",
    "https://rvisi-rvskareqar9.hf.space",
    "https://rvisi-rvskareqar10.hf.space",
    "https://rvisi-rvskareqar11.hf.space",
    "https://rvisiasa-rvc-zeroworker12.hf.space",
    "https://rvisiasa-rvc-zeroworker13.hf.space",
    "https://rvisiasa-rvc-zeroworker14.hf.space",
    "https://rvisiasa-rvc-zeroworker15.hf.space",
    "https://rvisiasa-rvc-zeroworker16.hf.space",
    "https://rvisiasa-rvc-zeroworker17.hf.space",
    "https://rvisiasa-rvc-zeroworker18.hf.space",
    "https://rvisiasa-rvc-zeroworker19.hf.space",
    "https://rvis84651-rvc-zeroworker20.hf.space",
    "https://rvis84651-rvc-zeroworker21.hf.space",
    "https://rvis84651-rvc-zeroworker22.hf.space",
    "https://rvis84651-rvc-zeroworker23.hf.space",
    "https://rvis84651-rvc-zeroworker24.hf.space",
    "https://rvis84651-rvc-zeroworker25.hf.space",
    "https://rvis84651-rvc-zeroworker26.hf.space",
    "https://rvis84651-rvc-zeroworker27.hf.space",
]

# Working directories, all located next to this source file.
BASE_DIR = Path(__file__).parent.absolute()
TEMP_DIR = BASE_DIR / "temp"
RESULTS_DIR = BASE_DIR / "results"
DOWNLOADS_DIR = BASE_DIR / "downloads"

# Create the working directories up front; existing ones are left untouched.
for _directory in (TEMP_DIR, RESULTS_DIR, DOWNLOADS_DIR):
    _directory.mkdir(exist_ok=True)

# Template lookup uses a path relative to the process working directory.
templates = Jinja2Templates(directory="templates")
# In-memory job registry: job_id -> {"status", "log", "filename"}.
JOBS = {}


# --- Atomic worker manager ---
class AtomicWorkerManager:
    """Hand out worker URLs in round-robin order.

    The current index is read and advanced while holding an ``asyncio.Lock``,
    so concurrent callers of :meth:`get_next_worker` each receive the next
    slot in the rotation.
    """

    def __init__(self, urls):
        """Store the worker list and start the rotation at index 0.

        Raises:
            ValueError: if ``urls`` is empty (there would be nothing to
                hand out, and the modulo in the rotation would fail).
        """
        if not urls:
            raise ValueError("AtomicWorkerManager requires at least one worker URL.")
        self.urls = urls
        self.total = len(urls)
        self.current_index = 0
        # Guards the read-and-advance of current_index.
        self.lock = asyncio.Lock()

    async def get_next_worker(self):
        """Return the next worker URL and advance the rotation (wrapping around)."""
        async with self.lock:
            worker = self.urls[self.current_index]
            self.current_index = (self.current_index + 1) % self.total
            return worker


# Shared worker-manager instance.
worker_manager = AtomicWorkerManager(WORKER_URLS)


# --- Download helper ---
def download_file(url, dest_folder, timeout=(10, 120)):
    """Download ``url`` into ``dest_folder`` and return the saved path.

    Hugging Face ``/blob/`` and ``/tree/`` links are rewritten to ``/resolve/``
    and given a ``download=true`` query parameter. The file name is taken from
    the last URL path segment; when it does not end in ``.pth``, ``.index`` or
    ``.zip`` a random ``model_NNNN`` name is generated instead.

    Args:
        url: Source URL (surrounding whitespace is stripped).
        dest_folder: Target directory; created if it does not exist.
        timeout: ``(connect, read)`` timeout in seconds passed to ``requests``,
            so a stalled server cannot block the caller forever.

    Returns:
        ``pathlib.Path`` of the downloaded file.

    Raises:
        Exception: on HTTP errors, an HTML response, or a file smaller than
            10 KiB. Any partially written file is removed before re-raising.
    """
    dest_path = None
    try:
        dest_folder = Path(dest_folder)
        dest_folder.mkdir(parents=True, exist_ok=True)

        url = url.strip()
        if "huggingface.co" in url:
            url = url.replace("/blob/", "/resolve/").replace("/tree/", "/resolve/")
            # Also matches "&download=true", so the parameter is never appended twice.
            if "download=true" not in url:
                url += "&download=true" if "?" in url else "?download=true"

        filename = url.split('/')[-1].split('?')[0]
        if not filename.endswith(('.pth', '.index', '.zip')):
            if "zip" in url.lower():
                filename = f"model_{random.randint(1000,9999)}.zip"
            else:
                filename = f"model_{random.randint(1000,9999)}.pth"

        dest_path = dest_folder / filename
        if dest_path.exists():
            try:
                os.remove(dest_path)
            except OSError:
                pass

        print(f"Downloading: {url}")
        headers = {'User-Agent': 'Mozilla/5.0'}
        with requests.get(url, stream=True, headers=headers,
                          allow_redirects=True, timeout=timeout) as r:
            r.raise_for_status()
            # An HTML body means we got a web page instead of the model file.
            if 'text/html' in r.headers.get('Content-Type', '').lower():
                raise Exception("لینک HTML است.")
            with open(dest_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Anything under 10 KiB is treated as an invalid download.
        if dest_path.stat().st_size < 1024 * 10:
            os.remove(dest_path)
            raise Exception("فایل نامعتبر است.")
        return dest_path
    except Exception:
        # Do not leave a partial file behind.
        if dest_path is not None and dest_path.exists():
            try:
                os.remove(dest_path)
            except OSError:
                pass
        raise


# --- Smart splitting ---
def smart_split_audio(audio_path, job_id):
    """Split long audio into chunks cut at the quietest point of each window.

    Audio of 30 seconds or less is returned as-is. Otherwise each chunk ends
    at the minimum-RMS frame found between 6 and 30 seconds after the chunk
    start, and is written to ``TEMP_DIR`` as ``{job_id}_chunk_{i}.wav``.

    Args:
        audio_path: Path of the input audio file.
        job_id: Job identifier used to name the chunk files.

    Returns:
        List of chunk file paths (strings). On any error the chunks written so
        far are removed and ``[str(audio_path)]`` is returned, so the caller
        processes the file as one piece.
    """
    print(f"Analyzing audio: {audio_path}")
    chunks_info = []
    try:
        y, sr = librosa.load(str(audio_path), sr=None)
        duration = librosa.get_duration(y=y, sr=sr)

        if duration <= 30:
            return [str(audio_path)]

        min_segment = 6
        max_segment = 30
        hop_length = 512
        # Integer sample counts so they are always valid slice bounds.
        min_samples = int(min_segment * sr)
        max_samples = int(max_segment * sr)
        total_samples = len(y)
        current_sample = 0
        chunk_idx = 0

        while current_sample < total_samples:
            start_search = current_sample + min_samples
            end_search = current_sample + max_samples

            if end_search >= total_samples:
                # The remainder fits in one chunk.
                split_point = total_samples
            else:
                # Cap the window so the audio left after this cut is at least
                # min_segment long; otherwise the last chunk could be only a
                # few samples.
                end_search = min(end_search, total_samples - min_samples)
                search_region = y[start_search:end_search]
                rms = librosa.feature.rms(
                    y=search_region, frame_length=2048, hop_length=hop_length
                )[0]
                split_point = start_search + int(np.argmin(rms)) * hop_length

            chunk_path = TEMP_DIR / f"{job_id}_chunk_{chunk_idx}.wav"
            sf.write(str(chunk_path), y[current_sample:split_point], sr)
            chunks_info.append(str(chunk_path))

            current_sample = split_point
            chunk_idx += 1

        print(f"Split into {len(chunks_info)} chunks.")
        return chunks_info
    except Exception as e:
        print(f"Split Error: {e}")
        # Remove partial output so a failed split does not leak temp files.
        for written in chunks_info:
            try:
                os.remove(written)
            except OSError:
                pass
        return [str(audio_path)]


# --- Single-chunk processor (with retries) ---
def process_single_chunk_sync(chunk_path, model_path, index_path, params,
                              assigned_worker_url, all_worker_urls, max_attempts=30):
    """Convert one audio chunk on a remote worker, retrying on other workers.

    The first attempt uses ``assigned_worker_url``. After a failure the
    function sleeps two seconds and retries on a randomly chosen worker other
    than the one that just failed.

    Args:
        chunk_path: Path of the audio chunk to upload.
        model_path: Path of the model file to upload.
        index_path: Path of the index file, or ``None``.
        params: Extra keyword arguments forwarded to the worker's ``/run`` API.
        assigned_worker_url: Worker used for the first attempt.
        all_worker_urls: Pool of workers used for retries.
        max_attempts: Maximum number of attempts; ``None`` retries forever.

    Returns:
        Local path of the output file returned by the worker.

    Raises:
        RuntimeError: when no attempt produced an output file larger than
            1000 bytes within ``max_attempts`` attempts.
    """
    attempt = 0
    current_worker = assigned_worker_url
    last_error = None

    while max_attempts is None or attempt < max_attempts:
        attempt += 1
        try:
            client = Client(current_worker)
            result = client.predict(
                audio_files=[handle_file(chunk_path)],
                file_m=handle_file(model_path),
                file_index=handle_file(index_path) if index_path else None,
                **params,
                type_output="wav",
                steps=1,
                api_name="/run"
            )

            if isinstance(result, (list, tuple)) and result:
                output_file = result[0]
            elif isinstance(result, str):
                output_file = result
            else:
                output_file = None

            if (output_file and os.path.exists(output_file)
                    and os.path.getsize(output_file) > 1000):
                return output_file

            last_error = "invalid output file"
            print(f"Worker {current_worker} returned invalid file. Switching worker...")
        except Exception as e:
            last_error = e
            print(f"Error on {current_worker}: {e}. Switching worker...")

        if max_attempts is not None and attempt >= max_attempts:
            break

        time.sleep(2)
        # Prefer a different worker so we do not get stuck on a broken one.
        candidates = [u for u in all_worker_urls if u != current_worker]
        current_worker = random.choice(
            candidates or list(all_worker_urls) or [current_worker]
        )

    raise RuntimeError(f"پردازش قطعه پس از {attempt} تلاش ناموفق بود: {last_error}")


def _extract_model_archive(zip_path, extract_dir):
    """Extract ``zip_path`` and return the first ``.pth`` and ``.index`` files found."""
    extract_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as z:
        z.extractall(extract_dir)

    model_path = None
    index_path = None
    for root, _, files in os.walk(extract_dir):
        for file in files:
            fp = Path(root) / file
            if file.endswith(".pth") and not model_path:
                model_path = fp
            if file.endswith(".index") and not index_path:
                index_path = fp
    return model_path, index_path


def _merge_audio_files(segment_paths, dest_path):
    """Concatenate the audio files in order and write the result to ``dest_path``."""
    segments = []
    sr = None
    for segment_path in segment_paths:
        data, sample_rate = sf.read(segment_path)
        if sr is None:
            # The first segment's sample rate is used for the merged file.
            sr = sample_rate
        segments.append(data)
    sf.write(str(dest_path), np.concatenate(segments), sr if sr else 44100)


# --- Main task ---
async def process_rvc_task(job_id, audio_path_str, model_url, index_url_input, pitch, algo, index_inf, res_filter, env_ratio, protect, denoise, reverb):
    """Run one conversion job end to end and record progress in ``JOBS``.

    Steps: download the model (and optional index), split the input audio,
    convert every chunk on a worker in parallel, merge the results into
    ``RESULTS_DIR/final_{job_id}.wav``. Blocking work (download, extraction,
    splitting, worker calls, merging) runs in threads so the event loop stays
    responsive.

    On success ``JOBS[job_id]`` gets status ``"completed"`` and the result
    file name; on any exception it gets status ``"failed"`` with the error
    text as log. Temporary files and directories are removed in all cases.
    """
    JOBS[job_id]["status"] = "processing"
    JOBS[job_id]["log"] = "شروع عملیات..."

    audio_path = Path(audio_path_str)
    files_to_clean = [audio_path]
    dirs_to_clean = []

    try:
        if not audio_path.exists():
            raise FileNotFoundError("فایل ورودی یافت نشد.")

        # 1. Download the model into a per-job directory, so concurrent jobs
        #    that use the same URL cannot overwrite or delete each other's file.
        JOBS[job_id]["log"] = "در حال دانلود مدل..."
        job_dir = DOWNLOADS_DIR / job_id
        dirs_to_clean.append(job_dir)
        downloaded_path = await asyncio.to_thread(download_file, model_url, job_dir)
        files_to_clean.append(downloaded_path)

        final_model_path = None
        final_index_path = None

        # 2. Resolve model/index files (a zip archive may contain both).
        if downloaded_path.suffix == ".zip":
            try:
                final_model_path, final_index_path = await asyncio.to_thread(
                    _extract_model_archive, downloaded_path, job_dir / "extracted"
                )
            except Exception as e:
                # Best effort: an unreadable archive ends in the "model not
                # found" failure below, but the real cause is logged.
                print(f"Archive extraction failed: {e}")
        else:
            final_model_path = downloaded_path

        # 3. Optional separate index download; a failure here is not fatal.
        if index_url_input and len(index_url_input) > 5:
            try:
                idx = await asyncio.to_thread(download_file, index_url_input, job_dir)
                final_index_path = idx
                files_to_clean.append(idx)
            except Exception as e:
                print(f"Index download failed, continuing without it: {e}")

        if not final_model_path:
            raise Exception("مدل یافت نشد.")

        # 4. Split the input audio.
        JOBS[job_id]["log"] = "برش و توزیع..."
        chunk_paths = await asyncio.to_thread(smart_split_audio, audio_path, job_id)
        for cp in chunk_paths:
            if cp != str(audio_path):
                files_to_clean.append(Path(cp))

        # 5. Dispatch chunks to workers (round-robin first choice per chunk).
        count = len(chunk_paths)
        JOBS[job_id]["log"] = f"پردازش {count} قطعه (صبور باشید ☘️)..."

        rvc_params = {
            "pitch_alg": algo,
            "pitch_lvl": float(pitch),
            "index_inf": float(index_inf),
            "r_m_f": int(res_filter),
            "e_r": float(env_ratio),
            "c_b_p": float(protect),
            "active_noise_reduce": denoise,
            "audio_effects": reverb
        }

        tasks = []
        for chunk in chunk_paths:
            assigned_worker = await worker_manager.get_next_worker()
            tasks.append(asyncio.to_thread(
                process_single_chunk_sync,
                chunk,
                str(final_model_path),
                str(final_index_path) if final_index_path else None,
                rvc_params,
                assigned_worker,  # first-choice worker
                WORKER_URLS       # full pool for retries
            ))

        # Wait for every chunk, including failures: cleanup must not delete
        # the model/chunk files while other threads are still uploading them.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        processed_chunks = []
        first_failure = None
        for outcome in results:
            if isinstance(outcome, BaseException):
                if first_failure is None:
                    first_failure = outcome
            else:
                processed_chunks.append(outcome)
                # Registered before merging so outputs are removed even if
                # the merge below fails.
                files_to_clean.append(Path(outcome))
        if first_failure is not None:
            raise first_failure

        # 6. Merge the converted chunks in their original order.
        JOBS[job_id]["log"] = "ترکیب نهایی..."
        final_filename = f"final_{job_id}.wav"
        final_path = RESULTS_DIR / final_filename
        await asyncio.to_thread(_merge_audio_files, processed_chunks, final_path)

        JOBS[job_id]["status"] = "completed"
        JOBS[job_id]["filename"] = final_filename
        JOBS[job_id]["log"] = "تکمیل شد."

    except Exception as e:
        print(f"JOB ERROR: {e}")
        JOBS[job_id]["status"] = "failed"
        JOBS[job_id]["log"] = str(e)
    finally:
        for p in files_to_clean:
            if p and p.exists():
                try:
                    os.remove(p)
                except OSError:
                    pass
        for d in dirs_to_clean:
            if d and d.exists():
                try:
                    shutil.rmtree(d)
                except OSError:
                    pass


@app.get("/")
def home(request: Request):
    """Render the ``index.html`` template."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/upload")
async def create_job(background_tasks: BackgroundTasks, audio_file: UploadFile = File(...), model_url: str = Form(...), index_url: str = Form(None), pitch: int = Form(0), algo: str = Form("rmvpe+"), index_inf: float = Form(0.75), res_filter: int = Form(3), env_ratio: float = Form(0.25), protect: float = Form(0.33), denoise: bool = Form(False), reverb: bool = Form(False)):
    """Store the uploaded audio, register a queued job and schedule its processing.

    Returns ``{"job_id", "status": "queued"}``, or a 400 JSON error when
    ``model_url`` is missing or shorter than five characters.
    """
    # Validate before touching the disk so a rejected request leaves no file behind.
    if not model_url or len(model_url.strip()) < 5:
        return JSONResponse({"error": "لینک مدل الزامی است"}, status_code=400)

    job_id = str(uuid.uuid4())
    filename = audio_file.filename or ""
    raw_ext = filename.split('.')[-1] if '.' in filename else ""
    # The extension comes from the client: keep only alphanumerics so it can
    # never inject path separators into the temp file name.
    ext = "".join(ch for ch in raw_ext if ch.isalnum())[:10] or "wav"
    audio_path = TEMP_DIR / f"{job_id}_input.{ext}"

    with open(audio_path, "wb") as f:
        # Copy in a thread; a large upload would otherwise block the event loop.
        await asyncio.to_thread(shutil.copyfileobj, audio_file.file, f)

    JOBS[job_id] = {"status": "queued", "log": "در صف...", "filename": None}

    background_tasks.add_task(
        process_rvc_task,
        job_id,
        str(audio_path),
        model_url.strip(),
        index_url.strip() if index_url else None,
        pitch, algo, index_inf, res_filter, env_ratio, protect, denoise, reverb
    )
    return {"job_id": job_id, "status": "queued"}


@app.get("/status/{job_id}")
def check_status(job_id: str):
    """Return the job record, or ``{"status": "not_found"}`` for unknown ids."""
    return JOBS.get(job_id, {"status": "not_found"})


@app.get("/download/{filename}")
def download_result(filename: str):
    """Serve a finished result from ``RESULTS_DIR`` as a WAV download.

    Responds with 404 and ``{"error": "File not found"}`` when the name
    contains path components or does not refer to an existing regular file.
    """
    # Only a bare file name is accepted; anything with directory parts is rejected.
    if Path(filename).name == filename:
        path = RESULTS_DIR / filename
        if path.is_file():
            return FileResponse(path, filename=filename, media_type="audio/wav")
    return JSONResponse({"error": "File not found"}, status_code=404)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)