Naumanellahi's picture
Upload 7 files
32854ed verified
# master_god_tier_api.py
import os
import shutil
import json
import hashlib
import secrets
import datetime
import torch
import librosa
import soundfile as sf
import noisereduce as nr
from pydub import AudioSegment
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from typing import Dict, Optional
import uvicorn
import numpy as np
from pathlib import Path
from final_multi import main as clone_voice_pipeline # Your main function
# -----------------------------
# CONFIGURATION
# -----------------------------
BASE_DIR = Path(__file__).parent.resolve()
UPLOAD_DIR = BASE_DIR / "uploads"
OUTPUT_DIR = BASE_DIR / "outputs"
PREPARED_DIR = BASE_DIR / "prepared"
USER_DB_FILE = BASE_DIR / "users.json"
SESSION_FILE = BASE_DIR / "sessions.json"
SECRET_KEY = "supersecretjwtkey" # Change in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 1 day
PACKAGE_PRICE_PKR = 7000
PACKAGE_DURATION_DAYS = 30
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(PREPARED_DIR, exist_ok=True)
# -----------------------------
# APP INIT
# -----------------------------
app = FastAPI(
title="Master God Tier Voice Cloner API",
description="Production-ready API for ultra-realistic voice cloning with multi-user & subscription support",
version="2.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# -----------------------------
# UTILS
# -----------------------------
def load_users():
if USER_DB_FILE.exists():
with open(USER_DB_FILE, "r") as f:
return json.load(f)
return {}
def save_users(users):
with open(USER_DB_FILE, "w") as f:
json.dump(users, f, indent=2)
def hash_password(password: str):
return hashlib.sha256(password.encode()).hexdigest()
def create_access_token(data: dict, expires_minutes: int = ACCESS_TOKEN_EXPIRE_MINUTES):
to_encode = data.copy()
expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=expires_minutes)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_password(plain_password, hashed_password):
return hash_password(plain_password) == hashed_password
def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
users = load_users()
if username not in users:
raise HTTPException(status_code=401, detail="User not found")
return users[username]
except JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
def check_subscription(user: dict):
sub_end = datetime.datetime.fromisoformat(user.get("subscription_end", "1970-01-01"))
if sub_end < datetime.datetime.utcnow():
raise HTTPException(status_code=402, detail="Subscription expired")
def preprocess_audio(input_file: Path):
"""Convert audio to 16kHz mono wav, noise reduction"""
filename = input_file.stem
wav_path = PREPARED_DIR / f"{filename}.wav"
ext = input_file.suffix.lower().replace(".", "")
if ext not in ["wav", "mp3", "m4a", "aac"]:
raise HTTPException(status_code=400, detail="Unsupported audio format")
audio = AudioSegment.from_file(input_file, format=ext)
audio = audio.set_frame_rate(16000).set_channels(1)
audio.export(wav_path, format="wav")
y, sr = librosa.load(wav_path, sr=16000)
y_reduced = nr.reduce_noise(y=y, sr=sr)
sf.write(wav_path, y_reduced, sr)
return wav_path
def save_upload_file(upload_file: UploadFile, destination: Path):
with open(destination, "wb") as buffer:
shutil.copyfileobj(upload_file.file, buffer)
return destination
# -----------------------------
# USER ENDPOINTS
# -----------------------------
@app.post("/register")
async def register(username: str = Form(...), password: str = Form(...)):
users = load_users()
if username in users:
raise HTTPException(status_code=400, detail="Username already exists")
users[username] = {
"username": username,
"password": hash_password(password),
"role": "user",
"subscription_start": None,
"subscription_end": None,
"voice_samples": [],
"analytics": []
}
save_users(users)
return {"success": True, "message": "User registered"}
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
users = load_users()
user = users.get(form_data.username)
if not user or not verify_password(form_data.password, user["password"]):
raise HTTPException(status_code=401, detail="Invalid credentials")
token = create_access_token({"sub": user["username"]})
return {"access_token": token, "token_type": "bearer"}
@app.get("/me")
async def read_me(current_user: dict = Depends(get_current_user)):
return current_user
# -----------------------------
# PAYMENT SIMULATION
# -----------------------------
@app.post("/subscribe")
async def subscribe(current_user: dict = Depends(get_current_user)):
# Simulate payment
now = datetime.datetime.utcnow()
sub_end = now + datetime.timedelta(days=PACKAGE_DURATION_DAYS)
users = load_users()
users[current_user["username"]]["subscription_start"] = now.isoformat()
users[current_user["username"]]["subscription_end"] = sub_end.isoformat()
save_users(users)
return {"success": True, "message": f"Subscribed! Expires: {sub_end.isoformat()}"}
# -----------------------------
# VOICE CLONING ENDPOINT
# -----------------------------
@app.post("/clone_voice/")
async def clone_voice(
audio: UploadFile = File(...),
text_file: UploadFile = File(...),
gender: str = Form(...),
language: str = Form(...),
inference_mode: str = Form("natural"),
current_user: dict = Depends(get_current_user)
):
check_subscription(current_user)
# Save uploaded files
audio_path = UPLOAD_DIR / audio.filename
text_path = UPLOAD_DIR / text_file.filename
save_upload_file(audio, audio_path)
save_upload_file(text_file, text_path)
# Check if user has previously uploaded same sample
sample_key = f"{audio.filename}_{current_user['username']}"
if sample_key in current_user.get("voice_samples", []):
processed_audio = PREPARED_DIR / audio_path.name
else:
processed_audio = preprocess_audio(audio_path)
users = load_users()
users[current_user["username"]]["voice_samples"].append(sample_key)
save_users(users)
# Read text content
text_file.file.seek(0)
text_content = text_file.file.read().decode("utf-8")
# Output path
timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
output_filename = f"{current_user['username']}_{timestamp}.wav"
output_path = OUTPUT_DIR / output_filename
# Run pipeline
try:
clone_voice_pipeline(
audio=str(processed_audio),
text=text_content,
gender=gender,
language=language,
output_path=str(output_path),
inference_mode=inference_mode
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}")
# Save analytics
users = load_users()
analytics_entry = {
"timestamp": datetime.datetime.utcnow().isoformat(),
"audio": str(audio.filename),
"text_file": str(text_file.filename),
"gender": gender,
"language": language,
"output_file": str(output_path)
}
users[current_user["username"]]["analytics"].append(analytics_entry)
save_users(users)
return FileResponse(output_path, media_type="audio/wav", filename=output_filename)
# -----------------------------
# ADMIN ENDPOINTS
# -----------------------------
@app.get("/admin/users")
async def list_users(current_user: dict = Depends(get_current_user)):
if current_user["role"] != "admin":
raise HTTPException(status_code=403, detail="Not authorized")
users = load_users()
return users
@app.get("/admin/analytics")
async def admin_analytics(current_user: dict = Depends(get_current_user)):
if current_user["role"] != "admin":
raise HTTPException(status_code=403, detail="Not authorized")
users = load_users()
analytics = {u: users[u]["analytics"] for u in users}
return analytics
# -----------------------------
# HEALTH CHECK
# -----------------------------
@app.get("/status")
async def status():
return {"status": "online", "message": "Master God Tier Voice Cloner API is running!"}
# -----------------------------
# CACHE MANAGEMENT
# -----------------------------
@app.post("/clear_cache")
async def clear_cache():
shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
shutil.rmtree(PREPARED_DIR, ignore_errors=True)
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(PREPARED_DIR, exist_ok=True)
return {"status": "success", "message": "Uploads, outputs, and prepared audio cleared"}
# -----------------------------
# RUN API
# -----------------------------
if __name__ == "__main__":
uvicorn.run("master_god_tier_api:app", host="0.0.0.0", port=8000, reload=True)