Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header, Form, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from datetime import datetime | |
| import os | |
| from dotenv import load_dotenv | |
| from typing import Optional, List | |
| import uuid | |
| import tempfile | |
| from transformers import pipeline | |
| from sqlalchemy.orm import Session | |
| import asyncio | |
| from app.settings import ( | |
| ADMIN_USERNAME, ADMIN_PASSWORD, WHISPER_MODEL, | |
| SUMMARIZER_MODEL, SENTIMENT_MODEL | |
| ) | |
| from app.models import Customer as CustomerModel, CallRecord as CallRecordModel | |
| from app.schemas import Customer, CustomerCreate, CallRecord | |
| from app.auth import verify_api_key, get_current_admin, create_access_token, router as auth_router | |
| from app.database import get_db | |
| from sqlalchemy.orm import sessionmaker | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| # Load environment variables | |
| load_dotenv() | |
| # Initialize FastAPI app | |
| app = FastAPI( | |
| title="vBot - Voice Call Analysis API", | |
| description=""" | |
| vBot is an AI-powered call analysis tool for Asterisk-based PBX systems. | |
| ## Features | |
| * Voice call transcription using Whisper | |
| * Call summary generation | |
| * Sentiment analysis | |
| * Multi-tenant support with isolated customer databases | |
| ## Authentication | |
| * Admin access requires username/password | |
| * Customer access requires API key | |
| ## API Endpoints | |
| ### Call Processing | |
| * POST /api/v1/process-call | |
| - Process a voice call recording | |
| - Requires: WAV file, caller number, called number | |
| - Returns: Transcription, summary, sentiment analysis | |
| ### Call Management | |
| * GET /api/v1/calls | |
| - List all calls for a customer | |
| - Optional filters: start_date, end_date | |
| - Returns: List of call records | |
| * GET /api/v1/calls/{call_id} | |
| - Get specific call details | |
| - Returns: Detailed call record | |
| * GET /api/v1/calls/search | |
| - Search calls with custom query | |
| - Returns: Matching call records | |
| ### Admin Operations | |
| * POST /api/v1/customers/ | |
| - Create new customer | |
| - Requires: name, company_name, email | |
| - Returns: Customer details with API key | |
| * GET /api/v1/customers | |
| - List all customers | |
| - Returns: List of customer records | |
| * GET /api/v1/customers/{customer_id} | |
| - Get customer details | |
| - Returns: Customer record | |
| * DELETE /api/v1/customers/{customer_id} | |
| - Delete customer | |
| - Returns: Success message | |
| ### System | |
| * GET /api/v1/health | |
| - Health check endpoint | |
| - Returns: System status | |
| ## Request/Response Formats | |
| ### Process Call | |
| ```json | |
| POST /api/v1/process-call | |
| Headers: { | |
| "X-API-Key": "your-api-key" | |
| } | |
| Body: { | |
| "file": <wav_file>, | |
| "caller_number": "string", | |
| "called_number": "string" | |
| } | |
| Response: { | |
| "id": "uuid", | |
| "transcription": "string", | |
| "summary": "string", | |
| "sentiment": "string", | |
| "timestamp": "datetime" | |
| } | |
| ``` | |
| ### Create Customer | |
| ```json | |
| POST /api/v1/customers/ | |
| Headers: { | |
| "Authorization": "Bearer <admin_token>" | |
| } | |
| Body: { | |
| "name": "string", | |
| "company_name": "string", | |
| "email": "string" | |
| } | |
| Response: { | |
| "id": "integer", | |
| "name": "string", | |
| "company_name": "string", | |
| "email": "string", | |
| "api_key": "string", | |
| "is_active": "boolean", | |
| "created_at": "datetime", | |
| "updated_at": "datetime" | |
| } | |
| """, | |
| version="1.0.0" | |
| ) | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Mount static files | |
| app.mount("/static", StaticFiles(directory="app/static"), name="static") | |
| # Initialize Hugging Face models | |
| transcriber = pipeline("automatic-speech-recognition", model=WHISPER_MODEL) | |
| summarizer = pipeline("summarization", model=SUMMARIZER_MODEL) | |
| sentiment_analyzer = pipeline("sentiment-analysis", model=SENTIMENT_MODEL) | |
| # Include auth router | |
| app.include_router(auth_router, prefix="/api/v1", tags=["auth"]) | |
| async def root(): | |
| """Root endpoint providing API information""" | |
| return FileResponse("app/static/admin.html") | |
| async def get_customer_by_api_key(api_key: str = Header(...), db: Session = Depends(get_db)): | |
| """Get customer by API key""" | |
| customer = db.query(CustomerModel).filter( | |
| CustomerModel.api_key == api_key, | |
| CustomerModel.is_active == True | |
| ).first() | |
| if not customer: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Invalid or inactive API key" | |
| ) | |
| return customer | |
| async def process_call( | |
| file: UploadFile = File(...), | |
| caller_number: str = Form(...), | |
| called_number: str = Form(...), | |
| customer: CustomerModel = Depends(verify_api_key), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Process a voice call recording file and return transcription, summary, and sentiment analysis. | |
| The customer can then store this data in their own database. | |
| """ | |
| try: | |
| # Generate unique ID for this processing request | |
| process_id = str(uuid.uuid4()) | |
| # Save the uploaded file temporarily | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file: | |
| content = await file.read() | |
| temp_file.write(content) | |
| temp_file_path = temp_file.name | |
| try: | |
| # Process audio using Hugging Face models | |
| transcription = transcriber(temp_file_path)["text"] | |
| summary = summarizer(transcription, max_length=130, min_length=30, do_sample=False)[0]["summary_text"] | |
| sentiment = sentiment_analyzer(transcription)[0]["label"] | |
| # Return results | |
| return { | |
| "id": process_id, | |
| "caller_number": caller_number, | |
| "called_number": called_number, | |
| "transcription": transcription, | |
| "summary": summary, | |
| "sentiment": sentiment, | |
| "timestamp": datetime.utcnow().isoformat() + "Z" | |
| } | |
| finally: | |
| # Clean up temporary file | |
| os.unlink(temp_file_path) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def list_calls( | |
| customer_id: int, | |
| db: Session = Depends(get_db), | |
| current_admin: str = Depends(get_current_admin) | |
| ): | |
| """ | |
| List all calls for a specific customer. | |
| """ | |
| customer = db.query(Customer).filter(Customer.id == customer_id).first() | |
| if not customer: | |
| raise HTTPException(status_code=404, detail="Customer not found") | |
| customer_engine = customer.get_db_engine() | |
| if not customer_engine: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Customer database not configured" | |
| ) | |
| # Create a session for the customer's database | |
| CustomerSession = sessionmaker(autocommit=False, autoflush=False, bind=customer_engine) | |
| customer_db = CustomerSession() | |
| try: | |
| calls = customer_db.query(CallRecord).filter( | |
| CallRecord.customer_id == customer_id | |
| ).all() | |
| return calls | |
| finally: | |
| customer_db.close() | |
| async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): | |
| """Admin login endpoint""" | |
| if form_data.username != ADMIN_USERNAME or form_data.password != ADMIN_PASSWORD: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Incorrect username or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| access_token = create_access_token(data={"sub": form_data.username}) | |
| return {"access_token": access_token, "token_type": "bearer"} | |
| async def create_customer( | |
| customer: CustomerCreate, | |
| db: Session = Depends(get_db), | |
| current_admin: str = Depends(get_current_admin) | |
| ): | |
| """Create new customer""" | |
| try: | |
| # Check if email already exists | |
| existing_customer = db.query(CustomerModel).filter(CustomerModel.email == customer.email).first() | |
| if existing_customer: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Email already registered" | |
| ) | |
| # Create new customer | |
| db_customer = CustomerModel( | |
| name=customer.name, | |
| company_name=customer.company_name, | |
| email=customer.email, | |
| api_key=str(uuid.uuid4()), | |
| is_active=True, | |
| created_at=datetime.utcnow(), | |
| updated_at=datetime.utcnow() | |
| ) | |
| db.add(db_customer) | |
| db.commit() | |
| db.refresh(db_customer) | |
| return Customer( | |
| id=db_customer.id, | |
| name=db_customer.name, | |
| company_name=db_customer.company_name, | |
| email=db_customer.email, | |
| api_key=db_customer.api_key, | |
| is_active=db_customer.is_active, | |
| created_at=db_customer.created_at, | |
| updated_at=db_customer.updated_at | |
| ) | |
| except HTTPException as he: | |
| raise he | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to create customer: {str(e)}" | |
| ) | |
| async def get_calls( | |
| start_date: Optional[str] = None, | |
| end_date: Optional[str] = None, | |
| customer: Customer = Depends(verify_api_key) | |
| ): | |
| """Get customer's call records""" | |
| return customer.get_call_records(start_date, end_date) | |
| async def get_call( | |
| call_id: str, | |
| customer: Customer = Depends(verify_api_key) | |
| ): | |
| """Get specific call details""" | |
| return customer.get_call_details(call_id) | |
| async def search_calls( | |
| query: dict, | |
| customer: Customer = Depends(verify_api_key) | |
| ): | |
| """Search calls""" | |
| return customer.search_calls(query) | |
| async def health_check(db: Session = Depends(get_db)): | |
| """Health check endpoint to verify database connection""" | |
| try: | |
| # Test database connection | |
| db.execute("SELECT 1") | |
| return { | |
| "status": "healthy", | |
| "database": "connected" | |
| } | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| detail=f"Database connection error: {str(e)}" | |
| ) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| host = os.getenv("HOST", "0.0.0.0") | |
| port = int(os.getenv("PORT", 7860)) | |
| uvicorn.run("main:app", host=host, port=port, reload=True) |