Spaces:
Runtime error
Runtime error
| # master_god_tier_api.py | |
| import os | |
| import shutil | |
| import json | |
| import hashlib | |
| import secrets | |
| import datetime | |
| import torch | |
| import librosa | |
| import soundfile as sf | |
| import noisereduce as nr | |
| from pydub import AudioSegment | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| from jose import jwt, JWTError | |
| from typing import Dict, Optional | |
| import uvicorn | |
| import numpy as np | |
| from pathlib import Path | |
| from final_multi import main as clone_voice_pipeline # Your main function | |
| # ----------------------------- | |
| # CONFIGURATION | |
| # ----------------------------- | |
| BASE_DIR = Path(__file__).parent.resolve() | |
| UPLOAD_DIR = BASE_DIR / "uploads" | |
| OUTPUT_DIR = BASE_DIR / "outputs" | |
| PREPARED_DIR = BASE_DIR / "prepared" | |
| USER_DB_FILE = BASE_DIR / "users.json" | |
| SESSION_FILE = BASE_DIR / "sessions.json" | |
| SECRET_KEY = "supersecretjwtkey" # Change in production | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 1 day | |
| PACKAGE_PRICE_PKR = 7000 | |
| PACKAGE_DURATION_DAYS = 30 | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| os.makedirs(PREPARED_DIR, exist_ok=True) | |
| # ----------------------------- | |
| # APP INIT | |
| # ----------------------------- | |
| app = FastAPI( | |
| title="Master God Tier Voice Cloner API", | |
| description="Production-ready API for ultra-realistic voice cloning with multi-user & subscription support", | |
| version="2.0.0" | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") | |
| # ----------------------------- | |
| # UTILS | |
| # ----------------------------- | |
| def load_users(): | |
| if USER_DB_FILE.exists(): | |
| with open(USER_DB_FILE, "r") as f: | |
| return json.load(f) | |
| return {} | |
| def save_users(users): | |
| with open(USER_DB_FILE, "w") as f: | |
| json.dump(users, f, indent=2) | |
| def hash_password(password: str): | |
| return hashlib.sha256(password.encode()).hexdigest() | |
| def create_access_token(data: dict, expires_minutes: int = ACCESS_TOKEN_EXPIRE_MINUTES): | |
| to_encode = data.copy() | |
| expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=expires_minutes) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| def verify_password(plain_password, hashed_password): | |
| return hash_password(plain_password) == hashed_password | |
| def get_current_user(token: str = Depends(oauth2_scheme)): | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username: str = payload.get("sub") | |
| if username is None: | |
| raise HTTPException(status_code=401, detail="Invalid authentication credentials") | |
| users = load_users() | |
| if username not in users: | |
| raise HTTPException(status_code=401, detail="User not found") | |
| return users[username] | |
| except JWTError: | |
| raise HTTPException(status_code=401, detail="Invalid authentication credentials") | |
| def check_subscription(user: dict): | |
| sub_end = datetime.datetime.fromisoformat(user.get("subscription_end", "1970-01-01")) | |
| if sub_end < datetime.datetime.utcnow(): | |
| raise HTTPException(status_code=402, detail="Subscription expired") | |
| def preprocess_audio(input_file: Path): | |
| """Convert audio to 16kHz mono wav, noise reduction""" | |
| filename = input_file.stem | |
| wav_path = PREPARED_DIR / f"{filename}.wav" | |
| ext = input_file.suffix.lower().replace(".", "") | |
| if ext not in ["wav", "mp3", "m4a", "aac"]: | |
| raise HTTPException(status_code=400, detail="Unsupported audio format") | |
| audio = AudioSegment.from_file(input_file, format=ext) | |
| audio = audio.set_frame_rate(16000).set_channels(1) | |
| audio.export(wav_path, format="wav") | |
| y, sr = librosa.load(wav_path, sr=16000) | |
| y_reduced = nr.reduce_noise(y=y, sr=sr) | |
| sf.write(wav_path, y_reduced, sr) | |
| return wav_path | |
| def save_upload_file(upload_file: UploadFile, destination: Path): | |
| with open(destination, "wb") as buffer: | |
| shutil.copyfileobj(upload_file.file, buffer) | |
| return destination | |
| # ----------------------------- | |
| # USER ENDPOINTS | |
| # ----------------------------- | |
| async def register(username: str = Form(...), password: str = Form(...)): | |
| users = load_users() | |
| if username in users: | |
| raise HTTPException(status_code=400, detail="Username already exists") | |
| users[username] = { | |
| "username": username, | |
| "password": hash_password(password), | |
| "role": "user", | |
| "subscription_start": None, | |
| "subscription_end": None, | |
| "voice_samples": [], | |
| "analytics": [] | |
| } | |
| save_users(users) | |
| return {"success": True, "message": "User registered"} | |
| async def login(form_data: OAuth2PasswordRequestForm = Depends()): | |
| users = load_users() | |
| user = users.get(form_data.username) | |
| if not user or not verify_password(form_data.password, user["password"]): | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| token = create_access_token({"sub": user["username"]}) | |
| return {"access_token": token, "token_type": "bearer"} | |
| async def read_me(current_user: dict = Depends(get_current_user)): | |
| return current_user | |
| # ----------------------------- | |
| # PAYMENT SIMULATION | |
| # ----------------------------- | |
| async def subscribe(current_user: dict = Depends(get_current_user)): | |
| # Simulate payment | |
| now = datetime.datetime.utcnow() | |
| sub_end = now + datetime.timedelta(days=PACKAGE_DURATION_DAYS) | |
| users = load_users() | |
| users[current_user["username"]]["subscription_start"] = now.isoformat() | |
| users[current_user["username"]]["subscription_end"] = sub_end.isoformat() | |
| save_users(users) | |
| return {"success": True, "message": f"Subscribed! Expires: {sub_end.isoformat()}"} | |
| # ----------------------------- | |
| # VOICE CLONING ENDPOINT | |
| # ----------------------------- | |
| async def clone_voice( | |
| audio: UploadFile = File(...), | |
| text_file: UploadFile = File(...), | |
| gender: str = Form(...), | |
| language: str = Form(...), | |
| inference_mode: str = Form("natural"), | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| check_subscription(current_user) | |
| # Save uploaded files | |
| audio_path = UPLOAD_DIR / audio.filename | |
| text_path = UPLOAD_DIR / text_file.filename | |
| save_upload_file(audio, audio_path) | |
| save_upload_file(text_file, text_path) | |
| # Check if user has previously uploaded same sample | |
| sample_key = f"{audio.filename}_{current_user['username']}" | |
| if sample_key in current_user.get("voice_samples", []): | |
| processed_audio = PREPARED_DIR / audio_path.name | |
| else: | |
| processed_audio = preprocess_audio(audio_path) | |
| users = load_users() | |
| users[current_user["username"]]["voice_samples"].append(sample_key) | |
| save_users(users) | |
| # Read text content | |
| text_file.file.seek(0) | |
| text_content = text_file.file.read().decode("utf-8") | |
| # Output path | |
| timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S") | |
| output_filename = f"{current_user['username']}_{timestamp}.wav" | |
| output_path = OUTPUT_DIR / output_filename | |
| # Run pipeline | |
| try: | |
| clone_voice_pipeline( | |
| audio=str(processed_audio), | |
| text=text_content, | |
| gender=gender, | |
| language=language, | |
| output_path=str(output_path), | |
| inference_mode=inference_mode | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}") | |
| # Save analytics | |
| users = load_users() | |
| analytics_entry = { | |
| "timestamp": datetime.datetime.utcnow().isoformat(), | |
| "audio": str(audio.filename), | |
| "text_file": str(text_file.filename), | |
| "gender": gender, | |
| "language": language, | |
| "output_file": str(output_path) | |
| } | |
| users[current_user["username"]]["analytics"].append(analytics_entry) | |
| save_users(users) | |
| return FileResponse(output_path, media_type="audio/wav", filename=output_filename) | |
| # ----------------------------- | |
| # ADMIN ENDPOINTS | |
| # ----------------------------- | |
| async def list_users(current_user: dict = Depends(get_current_user)): | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| users = load_users() | |
| return users | |
| async def admin_analytics(current_user: dict = Depends(get_current_user)): | |
| if current_user["role"] != "admin": | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| users = load_users() | |
| analytics = {u: users[u]["analytics"] for u in users} | |
| return analytics | |
| # ----------------------------- | |
| # HEALTH CHECK | |
| # ----------------------------- | |
| async def status(): | |
| return {"status": "online", "message": "Master God Tier Voice Cloner API is running!"} | |
| # ----------------------------- | |
| # CACHE MANAGEMENT | |
| # ----------------------------- | |
| async def clear_cache(): | |
| shutil.rmtree(UPLOAD_DIR, ignore_errors=True) | |
| shutil.rmtree(OUTPUT_DIR, ignore_errors=True) | |
| shutil.rmtree(PREPARED_DIR, ignore_errors=True) | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| os.makedirs(PREPARED_DIR, exist_ok=True) | |
| return {"status": "success", "message": "Uploads, outputs, and prepared audio cleared"} | |
| # ----------------------------- | |
| # RUN API | |
| # ----------------------------- | |
| if __name__ == "__main__": | |
| uvicorn.run("master_god_tier_api:app", host="0.0.0.0", port=8000, reload=True) | |