import asyncio
import os
import tempfile
import uuid
from datetime import datetime
from typing import Optional, List

from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header, Form, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.responses import FileResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from transformers import pipeline

from app.settings import (
    ADMIN_USERNAME,
    ADMIN_PASSWORD,
    WHISPER_MODEL,
    SUMMARIZER_MODEL,
    SENTIMENT_MODEL
)
from app.models import Customer as CustomerModel, CallRecord as CallRecordModel
from app.schemas import Customer, CustomerCreate, CallRecord
from app.auth import verify_api_key, get_current_admin, create_access_token, router as auth_router
from app.database import get_db

# Load environment variables
# NOTE(review): app.settings is imported above, before load_dotenv() runs. If
# that module reads os.environ at import time, values that exist only in .env
# would not be visible to it -- TODO confirm against app/settings.py.
load_dotenv()

# Initialize FastAPI app.
# The description is Markdown rendered on the generated OpenAPI docs page; it
# is kept flush-left so Markdown renderers do not treat it as an indented code
# block, and every ``` fence is explicitly closed.
app = FastAPI(
    title="vBot - Voice Call Analysis API",
    description="""
vBot is an AI-powered call analysis tool for Asterisk-based PBX systems.

## Features
* Voice call transcription using Whisper
* Call summary generation
* Sentiment analysis
* Multi-tenant support with isolated customer databases

## Authentication
* Admin access requires username/password
* Customer access requires API key

## API Endpoints

### Call Processing
* POST /api/v1/process-call
  - Process a voice call recording
  - Requires: WAV file, caller number, called number
  - Returns: Transcription, summary, sentiment analysis

### Call Management
* GET /api/v1/calls
  - List all calls for a customer
  - Optional filters: start_date, end_date
  - Returns: List of call records
* GET /api/v1/calls/{call_id}
  - Get specific call details
  - Returns: Detailed call record
* GET /api/v1/calls/search
  - Search calls with custom query
  - Returns: Matching call records

### Admin Operations
* POST /api/v1/customers/
  - Create new customer
  - Requires: name, company_name, email
  - Returns: Customer details with API key
* GET /api/v1/customers
  - List all customers
  - Returns: List of customer records
* GET /api/v1/customers/{customer_id}
  - Get customer details
  - Returns: Customer record
* DELETE /api/v1/customers/{customer_id}
  - Delete customer
  - Returns: Success message

### System
* GET /api/v1/health
  - Health check endpoint
  - Returns: System status

## Request/Response Formats

### Process Call
```json
POST /api/v1/process-call
Headers: {
    "X-API-Key": "your-api-key"
}
Body (multipart/form-data): {
    "file": "<WAV audio file>",
    "caller_number": "string",
    "called_number": "string"
}
Response: {
    "id": "uuid",
    "caller_number": "string",
    "called_number": "string",
    "transcription": "string",
    "summary": "string",
    "sentiment": "string",
    "timestamp": "datetime"
}
```

### Create Customer
```json
POST /api/v1/customers/
Headers: {
    "Authorization": "Bearer <admin-access-token>"
}
Body: {
    "name": "string",
    "company_name": "string",
    "email": "string"
}
Response: {
    "id": "integer",
    "name": "string",
    "company_name": "string",
    "email": "string",
    "api_key": "string",
    "is_active": "boolean",
    "created_at": "datetime",
    "updated_at": "datetime"
}
```
""",
    version="1.0.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    # NOTE(review): wildcard origins together with allow_credentials=True is
    # very permissive. Left unchanged here; tighten allow_origins if
    # cookie-based authentication is ever introduced.
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files
app.mount("/static", StaticFiles(directory="app/static"), name="static")

# Initialize Hugging Face models (loaded once, at import time)
transcriber = pipeline("automatic-speech-recognition", model=WHISPER_MODEL)
summarizer = pipeline("summarization", model=SUMMARIZER_MODEL)
sentiment_analyzer = pipeline("sentiment-analysis", model=SENTIMENT_MODEL)

# Include auth router
app.include_router(auth_router, prefix="/api/v1", tags=["auth"])


@app.get("/")
async def root():
    """Serve the static admin page (app/static/admin.html)."""
    return FileResponse("app/static/admin.html")


async def get_customer_by_api_key(api_key: str = Header(...), db: Session = Depends(get_db)):
    """Look up the active customer that owns ``api_key``.

    Raises:
        HTTPException: 401 when no active customer has this API key.
    """
    # NOTE(review): with no alias, FastAPI reads this from the "api-key"
    # header, while the API description advertises "X-API-Key". The routes in
    # this file depend on verify_api_key instead -- confirm whether this
    # helper is still used before changing the header name.
    customer = db.query(CustomerModel).filter(
        CustomerModel.api_key == api_key,
        CustomerModel.is_active.is_(True)
    ).first()
    if not customer:
        raise HTTPException(
            status_code=401,
            detail="Invalid or inactive API key"
        )
    return customer


@app.post("/api/v1/process-call")
async def process_call(
    file: UploadFile = File(...),
    caller_number: str = Form(...),
    called_number: str = Form(...),
    customer: CustomerModel = Depends(verify_api_key),
    db: Session = Depends(get_db)
):
    """
    Process a voice call recording file and return transcription, summary,
    and sentiment analysis. Nothing is persisted here; the customer can
    store the returned data in their own database.

    Returns:
        dict with id, caller_number, called_number, transcription, summary,
        sentiment and a UTC ISO-8601 timestamp suffixed with "Z".

    Raises:
        HTTPException: 400 when the upload is empty; 500 (with the error
            text as detail) when any processing step fails.
    """
    # NOTE(review): the three model calls below are synchronous and run on
    # the event loop, so other requests wait while a call is processed.
    temp_file_path = None
    try:
        # Generate unique ID for this processing request
        process_id = str(uuid.uuid4())

        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")

        # Save the uploaded file temporarily. The path is recorded before
        # writing so the finally-clause removes the file even if the write fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(content)

        # Process audio using Hugging Face models
        transcription = transcriber(temp_file_path)["text"]
        # truncation=True keeps long transcripts within the text models'
        # maximum input length instead of failing the whole request.
        summary = summarizer(
            transcription,
            max_length=130,
            min_length=30,
            do_sample=False,
            truncation=True,
        )[0]["summary_text"]
        sentiment = sentiment_analyzer(transcription, truncation=True)[0]["label"]

        # Return results
        return {
            "id": process_id,
            "caller_number": caller_number,
            "called_number": called_number,
            "transcription": transcription,
            "summary": summary,
            "sentiment": sentiment,
            "timestamp": datetime.utcnow().isoformat() + "Z"
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 above) must not become 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Clean up temporary file
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.unlink(temp_file_path)


def _constant_time_equals(supplied: str, expected) -> bool:
    """Compare two strings without leaking the mismatch position via timing.

    A non-string ``expected`` (e.g. an unset setting) never matches.
    """
    import secrets

    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    # Compare bytes: compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


@app.post("/api/v1/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Admin login endpoint.

    Returns a bearer access token when the submitted form credentials match
    ADMIN_USERNAME / ADMIN_PASSWORD.

    Raises:
        HTTPException: 401 when either credential is wrong.
    """
    # Evaluate both comparisons before combining them so a wrong username
    # does not short-circuit the password check.
    username_ok = _constant_time_equals(form_data.username, ADMIN_USERNAME)
    password_ok = _constant_time_equals(form_data.password, ADMIN_PASSWORD)
    if not (username_ok and password_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(data={"sub": form_data.username})
    return {"access_token": access_token, "token_type": "bearer"}


@app.post("/api/v1/customers/", response_model=Customer)
async def create_customer(
    customer: CustomerCreate,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Create new customer.

    Generates a fresh API key for the customer and returns the stored record.

    Raises:
        HTTPException: 400 when the email is already registered; 500 when
            the insert fails for any other reason.
    """
    try:
        # Check if email already exists
        existing_customer = db.query(CustomerModel).filter(
            CustomerModel.email == customer.email
        ).first()
        if existing_customer:
            raise HTTPException(
                status_code=400,
                detail="Email already registered"
            )

        # Create new customer; one timestamp so created_at == updated_at
        now = datetime.utcnow()
        db_customer = CustomerModel(
            name=customer.name,
            company_name=customer.company_name,
            email=customer.email,
            api_key=str(uuid.uuid4()),
            is_active=True,
            created_at=now,
            updated_at=now
        )

        db.add(db_customer)
        db.commit()
        db.refresh(db_customer)

        return Customer(
            id=db_customer.id,
            name=db_customer.name,
            company_name=db_customer.company_name,
            email=db_customer.email,
            api_key=db_customer.api_key,
            is_active=db_customer.is_active,
            created_at=db_customer.created_at,
            updated_at=db_customer.updated_at
        )
    except HTTPException:
        raise
    except Exception as e:
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so the session is left in a clean state.
        db.rollback()
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create customer: {str(e)}"
        ) from e


# --- Call routes -----------------------------------------------------------
# Registration order matters: Starlette dispatches to the first route whose
# path matches. The fixed paths (/calls, /calls/search) are registered before
# the parameterised ones, and the admin route only matches integer ids so
# that other call ids fall through to get_call.

@app.get("/api/v1/calls", response_model=List[CallRecord])
async def get_calls(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    customer: CustomerModel = Depends(verify_api_key)
):
    """Get customer's call records, optionally filtered by start/end date."""
    return customer.get_call_records(start_date, end_date)


@app.get("/api/v1/calls/search")
async def search_calls(
    query: dict,
    customer: CustomerModel = Depends(verify_api_key)
):
    """Search calls"""
    # NOTE(review): a dict parameter is read from the request body, which is
    # unusual for GET; kept as-is to avoid changing the endpoint's contract.
    return customer.search_calls(query)


@app.get("/api/v1/calls/{customer_id:int}")
async def list_calls(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """
    List all calls for a specific customer (admin only).

    Raises:
        HTTPException: 404 when the customer does not exist; 400 when the
            customer has no database engine configured.
    """
    # Query the ORM models, not the Pydantic schemas of the same name.
    customer = db.query(CustomerModel).filter(CustomerModel.id == customer_id).first()
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")

    customer_engine = customer.get_db_engine()
    if not customer_engine:
        raise HTTPException(
            status_code=400,
            detail="Customer database not configured"
        )

    # Create a session for the customer's database
    CustomerSession = sessionmaker(autocommit=False, autoflush=False, bind=customer_engine)
    customer_db = CustomerSession()

    try:
        calls = customer_db.query(CallRecordModel).filter(
            CallRecordModel.customer_id == customer_id
        ).all()
        return calls
    finally:
        customer_db.close()


@app.get("/api/v1/calls/{call_id}", response_model=CallRecord)
async def get_call(
    call_id: str,
    customer: CustomerModel = Depends(verify_api_key)
):
    """Get specific call details"""
    return customer.get_call_details(call_id)


@app.get("/api/v1/health")
async def health_check(db: Session = Depends(get_db)):
    """Health check endpoint to verify database connection.

    Raises:
        HTTPException: 503 when the test query fails.
    """
    from sqlalchemy import text

    try:
        # Test database connection. SQLAlchemy 2.x requires textual SQL to
        # be wrapped in text(); a bare string is rejected.
        db.execute(text("SELECT 1"))
        return {
            "status": "healthy",
            "database": "connected"
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Database connection error: {str(e)}"
        ) from e


if __name__ == "__main__":
    import uvicorn

    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", 7860))
    uvicorn.run("main:app", host=host, port=port, reload=True)