Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Header, Form | |
| import os | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from auth import verify_api_key | |
| from audio_processor import process_audio | |
| from real_detector import analyze_audio_real as analyze_audio | |
| from murf_generator import generate_audio_with_murf | |
| from pydantic import BaseModel | |
| from typing import Optional | |
# ASGI application object for this module.
app = FastAPI()

# Expose the "static" folder under the /static URL prefix. The folder is
# resolved against the process working directory at import time, so the
# server has to be launched from the directory that contains "static".
app.mount(
    "/static",
    StaticFiles(directory=os.path.join(os.getcwd(), "static")),
    name="static",
)
class AnalysisResponse(BaseModel):
    # Schema mirroring the dict returned by the detection handler:
    # a status string, optional headline verdict fields, and the raw
    # analysis / metadata dicts.
    # NOTE(review): no usage of this model (e.g. as a response_model) is
    # visible in the reviewed text - confirm where it is wired in.

    # Outcome marker; the handler in this file always sends "success".
    status: str
    # Headline label copied from the analysis result when present.
    classification: Optional[str] = None
    # Score copied from the analysis result when present.
    confidence: Optional[float] = None
    # Full result object produced by the detector.
    analysis: dict
    # Description of the analysed audio (filename, size, format, ...).
    metadata: dict
| import base64 | |
| import uuid | |
class DetectRequest(BaseModel):
    # JSON body accepted by the detection handler. Every field is optional
    # so that both the hackathon tester payload and the internal tools
    # payload validate against the same model.

    # Support for the Hackathon Tester format
    # Not read by the handler visible in this file.
    language: Optional[str] = None
    # Used as the file extension when Base64 audio is written to disk.
    audio_format: Optional[str] = None
    # Base64 audio payload; checked first.
    audio_base64_format: Optional[str] = None  # Guessing key based on UI label
    # Base64 audio payload; used when audio_base64_format is empty.
    audio_base64: Optional[str] = None  # Common alternative
    # Support for our internal tools
    # Not read by the handler visible in this file.
    message: Optional[str] = None
    # Location of the audio to analyse when no Base64 payload is given.
    audio_url: Optional[str] = None
async def detect_voice(
    request: Optional[DetectRequest] = None,
    file: UploadFile = File(None),
    message: str = Form(None),
    audio_url: str = Form(None),
    x_api_key: str = Depends(verify_api_key)
):
    """Analyze one audio clip given as Base64 JSON, a URL, or an uploaded file.

    Input sources are tried in this order; the first one present is used:

    1. ``request.audio_base64_format`` or ``request.audio_base64``: decoded
       and written to ``<cwd>/uploads``.
    2. ``request.audio_url``: passed to ``process_audio``.
    3. Form data (``file`` and/or ``audio_url``): passed to ``process_audio``.

    Args:
        request: Optional JSON body.
        file: Optional uploaded file. Its name must end in ``.mp3``, ``.wav``
            or ``.m4a`` (compared case-insensitively).
        message: Accepted for interface compatibility; not read here.
        audio_url: Optional form field pointing at the audio to analyse.
        x_api_key: Value produced by the ``verify_api_key`` dependency;
            declared so the dependency runs, not read in the body.

    Returns:
        A dict with ``status``, ``classification``, ``confidence``,
        ``analysis`` and ``metadata`` keys. Every input path returns the
        same shape.

    Raises:
        HTTPException: 400 when the Base64 payload cannot be decoded or is
            empty, when ``audio_format`` is not a plain extension, when the
            uploaded file has an unsupported extension, or when no audio
            source was supplied at all.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text - confirm it is registered on ``app``.
    """
    upload_dir = os.path.join(os.getcwd(), "uploads")
    os.makedirs(upload_dir, exist_ok=True)

    # A JSON body takes precedence over form fields.
    if request:
        b64_data = request.audio_base64_format or request.audio_base64
        if b64_data:
            metadata = _store_base64_audio(b64_data, request.audio_format, upload_dir)
            return _detection_response(metadata)

        if request.audio_url:
            metadata = await process_audio(audio_url=request.audio_url)
            return _detection_response(metadata)

    # Form data: validate the upload's extension before doing any work.
    if file:
        # filename may be missing on a malformed multipart part.
        upload_name = (file.filename or "").lower()
        if not upload_name.endswith(('.mp3', '.wav', '.m4a')):
            raise HTTPException(status_code=400, detail="Invalid file format. Only audio files allowed.")

    if not file and not audio_url:
        raise HTTPException(status_code=400, detail="Either file, audio_url, or base64 audio is required.")

    metadata = await process_audio(file, audio_url)
    return _detection_response(metadata)


def _detection_response(metadata):
    """Run ``analyze_audio`` on *metadata* and wrap the result for the client."""
    # The result is read with .get(), so it is expected to be dict-like.
    result = analyze_audio(metadata)
    return {
        "status": "success",
        "classification": result.get("classification"),
        "confidence": result.get("confidence"),
        "analysis": result,
        "metadata": metadata
    }


def _store_base64_audio(b64_data: str, audio_format: Optional[str], upload_dir: str) -> dict:
    """Decode *b64_data*, save it under *upload_dir*, and return its metadata."""
    # Drop a data-URI header such as "data:audio/mp3;base64," if present.
    if "," in b64_data:
        b64_data = b64_data.split(",", 1)[1]

    # Only decoding is guarded: storage or detector failures must not be
    # reported to the client as a malformed payload.
    try:
        decoded_data = base64.b64decode(b64_data)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise HTTPException(status_code=400, detail=f"Invalid Base64 audio: {str(exc)}") from exc
    if not decoded_data:
        raise HTTPException(status_code=400, detail="Invalid Base64 audio: decoded payload is empty")

    # audio_format comes straight from the request and ends up in a file
    # name, so it must be a bare extension: no separators, no "..".
    extension = (audio_format or "mp3").lstrip(".")
    if not (extension.isascii() and extension.isalnum()):
        raise HTTPException(
            status_code=400,
            detail="Invalid audio_format. Expected a plain extension such as 'mp3'.",
        )

    filename = f"upload_{uuid.uuid4()}.{extension}"
    file_location = os.path.join(upload_dir, filename)
    # NOTE(review): the stored file is never removed here - confirm whether
    # the detector still needs it after analysis or whether it can be deleted.
    with open(file_location, "wb") as f:
        f.write(decoded_data)

    return {
        "filename": filename,
        "size_bytes": os.path.getsize(file_location),
        "format": extension,
        # Duration is not measured on this path.
        "duration_seconds": 0.0
    }
class GenerateRequest(BaseModel):
    # JSON body for the speech generation handler; all three fields are
    # forwarded to generate_audio_with_murf.

    # Text to be spoken.
    text: str
    # Caller-supplied credential, passed through to the generator.
    murf_api_key: str
    # Voice identifier; defaults to "en-US-marcus" when omitted.
    voice_id: Optional[str] = "en-US-marcus"
def generate_speech(request: GenerateRequest):
    """Generate speech for ``request.text`` and return where to fetch it.

    The text, API key and voice id from *request* are forwarded, in that
    order, to ``generate_audio_with_murf``; whatever it returns is reported
    back under the ``audio_url`` key.

    Returns:
        ``{"status": "success", "audio_url": <generator result>}``.
    """
    generated_url = generate_audio_with_murf(
        request.text,
        request.murf_api_key,
        request.voice_id,
    )
    payload = {"status": "success"}
    payload["audio_url"] = generated_url
    return payload
| import os | |
def read_root():
    """Serve ``static/tester.html`` from the current working directory.

    Returns:
        A ``FileResponse`` for the page when it exists. Otherwise a
        diagnostic dict with ``error``, ``path`` and ``cwd`` keys describing
        where the page was looked for.
    """
    working_dir = os.getcwd()
    tester_page = os.path.join(working_dir, 'static', 'tester.html')

    if os.path.exists(tester_page):
        return FileResponse(tester_page)

    # Page is missing: report the lookup location instead of failing.
    return {"error": "File not found", "path": tester_page, "cwd": working_dir}