# master_god_tier_api.py import os import shutil import json import hashlib import secrets import datetime import torch import librosa import soundfile as sf import noisereduce as nr from pydub import AudioSegment from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends from fastapi.responses import FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import jwt, JWTError from typing import Dict, Optional import uvicorn import numpy as np from pathlib import Path from final_multi import main as clone_voice_pipeline # Your main function # ----------------------------- # CONFIGURATION # ----------------------------- BASE_DIR = Path(__file__).parent.resolve() UPLOAD_DIR = BASE_DIR / "uploads" OUTPUT_DIR = BASE_DIR / "outputs" PREPARED_DIR = BASE_DIR / "prepared" USER_DB_FILE = BASE_DIR / "users.json" SESSION_FILE = BASE_DIR / "sessions.json" SECRET_KEY = "supersecretjwtkey" # Change in production ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 1 day PACKAGE_PRICE_PKR = 7000 PACKAGE_DURATION_DAYS = 30 os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(PREPARED_DIR, exist_ok=True) # ----------------------------- # APP INIT # ----------------------------- app = FastAPI( title="Master God Tier Voice Cloner API", description="Production-ready API for ultra-realistic voice cloning with multi-user & subscription support", version="2.0.0" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # ----------------------------- # UTILS # ----------------------------- def load_users(): if USER_DB_FILE.exists(): with open(USER_DB_FILE, "r") as f: return json.load(f) return {} def save_users(users): with open(USER_DB_FILE, "w") as f: json.dump(users, f, indent=2) def hash_password(password: str): return hashlib.sha256(password.encode()).hexdigest() def create_access_token(data: dict, expires_minutes: int = ACCESS_TOKEN_EXPIRE_MINUTES): to_encode = data.copy() expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=expires_minutes) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def verify_password(plain_password, hashed_password): return hash_password(plain_password) == hashed_password def get_current_user(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid authentication credentials") users = load_users() if username not in users: raise HTTPException(status_code=401, detail="User not found") return users[username] except JWTError: raise HTTPException(status_code=401, detail="Invalid authentication credentials") def check_subscription(user: dict): sub_end = datetime.datetime.fromisoformat(user.get("subscription_end", "1970-01-01")) if sub_end < datetime.datetime.utcnow(): raise HTTPException(status_code=402, detail="Subscription expired") def preprocess_audio(input_file: Path): """Convert audio to 16kHz mono wav, noise reduction""" filename = input_file.stem wav_path = PREPARED_DIR / f"{filename}.wav" ext = input_file.suffix.lower().replace(".", "") if ext not in ["wav", "mp3", "m4a", "aac"]: raise HTTPException(status_code=400, detail="Unsupported audio format") audio = AudioSegment.from_file(input_file, format=ext) audio = audio.set_frame_rate(16000).set_channels(1) audio.export(wav_path, format="wav") y, sr = librosa.load(wav_path, sr=16000) y_reduced = nr.reduce_noise(y=y, sr=sr) sf.write(wav_path, y_reduced, sr) return wav_path def save_upload_file(upload_file: UploadFile, destination: Path): with open(destination, "wb") as buffer: shutil.copyfileobj(upload_file.file, buffer) return destination # ----------------------------- # USER ENDPOINTS # ----------------------------- @app.post("/register") async def register(username: str = Form(...), password: str = Form(...)): users = load_users() if username in users: raise HTTPException(status_code=400, detail="Username already exists") users[username] = { "username": username, "password": hash_password(password), "role": "user", "subscription_start": None, "subscription_end": None, "voice_samples": [], "analytics": [] } save_users(users) return {"success": True, "message": "User registered"} @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): users = load_users() user = users.get(form_data.username) if not user or not verify_password(form_data.password, user["password"]): raise HTTPException(status_code=401, detail="Invalid credentials") token = create_access_token({"sub": user["username"]}) return {"access_token": token, "token_type": "bearer"} @app.get("/me") async def read_me(current_user: dict = Depends(get_current_user)): return current_user # ----------------------------- # PAYMENT SIMULATION # ----------------------------- @app.post("/subscribe") async def subscribe(current_user: dict = Depends(get_current_user)): # Simulate payment now = datetime.datetime.utcnow() sub_end = now + datetime.timedelta(days=PACKAGE_DURATION_DAYS) users = load_users() users[current_user["username"]]["subscription_start"] = now.isoformat() users[current_user["username"]]["subscription_end"] = sub_end.isoformat() save_users(users) return {"success": True, "message": f"Subscribed! Expires: {sub_end.isoformat()}"} # ----------------------------- # VOICE CLONING ENDPOINT # ----------------------------- @app.post("/clone_voice/") async def clone_voice( audio: UploadFile = File(...), text_file: UploadFile = File(...), gender: str = Form(...), language: str = Form(...), inference_mode: str = Form("natural"), current_user: dict = Depends(get_current_user) ): check_subscription(current_user) # Save uploaded files audio_path = UPLOAD_DIR / audio.filename text_path = UPLOAD_DIR / text_file.filename save_upload_file(audio, audio_path) save_upload_file(text_file, text_path) # Check if user has previously uploaded same sample sample_key = f"{audio.filename}_{current_user['username']}" if sample_key in current_user.get("voice_samples", []): processed_audio = PREPARED_DIR / audio_path.name else: processed_audio = preprocess_audio(audio_path) users = load_users() users[current_user["username"]]["voice_samples"].append(sample_key) save_users(users) # Read text content text_file.file.seek(0) text_content = text_file.file.read().decode("utf-8") # Output path timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S") output_filename = f"{current_user['username']}_{timestamp}.wav" output_path = OUTPUT_DIR / output_filename # Run pipeline try: clone_voice_pipeline( audio=str(processed_audio), text=text_content, gender=gender, language=language, output_path=str(output_path), inference_mode=inference_mode ) except Exception as e: raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}") # Save analytics users = load_users() analytics_entry = { "timestamp": datetime.datetime.utcnow().isoformat(), "audio": str(audio.filename), "text_file": str(text_file.filename), "gender": gender, "language": language, "output_file": str(output_path) } users[current_user["username"]]["analytics"].append(analytics_entry) save_users(users) return FileResponse(output_path, media_type="audio/wav", filename=output_filename) # ----------------------------- # ADMIN ENDPOINTS # ----------------------------- @app.get("/admin/users") async def list_users(current_user: dict = Depends(get_current_user)): if current_user["role"] != "admin": raise HTTPException(status_code=403, detail="Not authorized") users = load_users() return users @app.get("/admin/analytics") async def admin_analytics(current_user: dict = Depends(get_current_user)): if current_user["role"] != "admin": raise HTTPException(status_code=403, detail="Not authorized") users = load_users() analytics = {u: users[u]["analytics"] for u in users} return analytics # ----------------------------- # HEALTH CHECK # ----------------------------- @app.get("/status") async def status(): return {"status": "online", "message": "Master God Tier Voice Cloner API is running!"} # ----------------------------- # CACHE MANAGEMENT # ----------------------------- @app.post("/clear_cache") async def clear_cache(): shutil.rmtree(UPLOAD_DIR, ignore_errors=True) shutil.rmtree(OUTPUT_DIR, ignore_errors=True) shutil.rmtree(PREPARED_DIR, ignore_errors=True) os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(PREPARED_DIR, exist_ok=True) return {"status": "success", "message": "Uploads, outputs, and prepared audio cleared"} # ----------------------------- # RUN API # ----------------------------- if __name__ == "__main__": uvicorn.run("master_god_tier_api:app", host="0.0.0.0", port=8000, reload=True)