diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..1ee8eade80c0f70bb874d8a009516bc0d2b7fdc5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv/ +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..e750ca91c414888f5913b27d5a4d8e4112d7ec58 --- /dev/null +++ b/README.md @@ -0,0 +1,389 @@ +# Voice RAG Bot - AI Customer Support System + +**Status**: โœ… **FULLY FUNCTIONAL** | Latest Update: May 30, 2026 + +## ๐Ÿ“‹ Quick Overview + +Voice RAG Bot is an intelligent AI customer support system that: +- ๐ŸŽค **Accepts voice input** via microphone or audio file upload +- ๐Ÿง  **Processes with LLM** (Groq) for intent detection and response generation +- ๐Ÿ“š **Retrieves relevant context** from knowledge base and customer history using vector search +- ๐Ÿ˜Š **Analyzes sentiment** to provide empathetic, sentiment-aware responses +- ๐Ÿ”Š **Generates speech output** via text-to-speech +- ๐Ÿ“Š **Orchestrates 9-node workflow** using LangGraph + +**Tech Stack**: Faster Whisper (STT) โ†’ LangGraph (9 nodes) โ†’ Groq LLM โ†’ Qdrant (Vector DB) โ†’ gTTS (TTS) + +--- + +## ๐Ÿš€ Quick Start (3 Steps) + +### Step 1: Prerequisites +- Docker Desktop running (for Qdrant) +- Python 3.11+ +- Git (optional) + +### Step 2: Start Qdrant (Vector Database) +```bash +docker run -p 6333:6333 qdrant/qdrant:latest +``` +Leave this running in background. โœ… System will auto-create collections. + +### Step 3: Start Voice RAG Bot +```bash +cd d:\Voice RAG Bot\voice-rag-bot + +# Activate virtual environment +.\venv\Scripts\Activate.ps1 + +# Run startup script (starts backend + Streamlit) +.\START_SYSTEM.ps1 +``` + +**Or start services manually:** + +Terminal 1 (Backend): +```bash +.\venv\Scripts\Activate.ps1 +python backend/main.py +# Runs on http://localhost:8000 +``` + +Terminal 2 (Frontend): +```bash +.\venv\Scripts\Activate.ps1 +streamlit run frontend/streamlit_app.py +# Opens http://localhost:8501 +``` + +--- + +## ๐Ÿ“– Usage Guide + +### Via Streamlit Frontend (Recommended) + +1. **Open Browser**: http://localhost:8501 +2. **Enter Customer ID**: Unique identifier for customer (enables history tracking) +3. **Choose Input Method**: + - **Option A**: Click ๐ŸŽค **Record** โ†’ Speak your message โ†’ **Process Audio** + - **Option B**: Upload audio file (MP3/WAV) + - **Option C**: Type message directly in text area +4. **View Results** (automatically displayed): + - ๐Ÿ“ Generated Response + - ๐ŸŽฏ Detected Intent (+ confidence) + - ๐Ÿ˜Š Sentiment Analysis (+ confidence) + - ๐Ÿท๏ธ Extracted Entities + - ๐Ÿ“š Knowledge Base context (if relevant) + - ๐Ÿ“œ Customer History (if relevant) + - ๐Ÿ”Š Audio playback of response + +### Via REST API (For Integration) + +**Process Audio:** +```bash +curl -X POST "http://localhost:8000/process-audio?customer_id=CUST_001" \ + -F "file=@voice_message.wav" +``` + +**Process Text:** +```bash +curl -X POST "http://localhost:8000/process-text" \ + -d "user_input=I want to return my laptop&customer_id=CUST_001" +``` + +**Health Check:** +```bash +curl http://localhost:8000/health +``` + +--- + +## ๐Ÿ“Š System Architecture + +``` +Input Layer + โ”œโ”€ ๐ŸŽค Audio Input (Streamlit st.audio_input) + โ””โ”€ ๐Ÿ“ Text Input (Streamlit text area) + โ†“ +Speech-to-Text + โ””โ”€ Faster Whisper (base model, CPU inference) + โ†“ +Orchestration Layer (LangGraph - 9 Nodes) + 1. sentiment_analysis (DistilBERT) + 2. entity_extraction (BERT-base-NER) + 3. intent_detection (Groq LLM) + 4. retrieval_router (Qdrant search) + 5. context_builder (Format prompt) + 6. response_generation (Groq LLM) + 7. validation (Hallucination checks) + 8. memory_persistence (Qdrant upsert) + 9. tts_generation (gTTS) + โ†“ +Output Layer + โ”œโ”€ ๐Ÿ“ Text Response + โ”œโ”€ ๐Ÿ˜Š Sentiment-aware Tone + โ”œโ”€ ๐Ÿ”Š Audio File (MP3) + โ””โ”€ ๐ŸŽฏ Intent Classification +``` + +--- + +## ๐Ÿ”ง Configuration + +**Environment Variables** (`.env`): +``` +GROQ_API_KEY=your_groq_api_key_here +QDRANT_URL=http://localhost:6333 +BACKEND_URL=http://localhost:8000 +VECTOR_DIMENSION=1024 +EMBEDDING_MODEL=BAAI/bge-m3 +GROQ_MODEL=openai/gpt-oss-20b +KB_COLLECTION_NAME=knowledge_base +HISTORY_COLLECTION_NAME=customer_history +WHISPER_MODEL=base +``` + +--- + +## ๐Ÿ“ Sample Data + +Load sample data (4 KB documents + 4 customer history records): +```bash +.\venv\Scripts\Activate.ps1 +python data/load_sample_data.py +``` + +**Included Data:** +- KB Documents: Return Policy, Shipping Info, Warranty Info, Account Management +- Customer History: 4 interactions (complaints, refunds, inquiries) + +--- + +## ๐Ÿงช Testing + +### Quick Verification +```bash +# Test complete pipeline (end-to-end) +.\venv\Scripts\Activate.ps1 +python tests/test_full_integration.py +``` + +**Expected Output**: โœ… FULL INTEGRATION TEST PASSED + +### Component Status +- โœ… All 9 nodes connected and working +- โœ… FastAPI endpoints operational +- โœ… Qdrant vector search functional +- โœ… LLM integration responding +- โœ… Audio processing working +- โœ… Sample data loadable + +--- + +## ๐ŸŽฏ Intent Types Supported + +| Intent | Example | Response | +|--------|---------|----------| +| `refund_request` | "I want to return this" | Empathetic, processing info | +| `order_status` | "Where's my order?" | Tracking info | +| `product_inquiry` | "Tell me about...?" | Product details | +| `billing_issue` | "My charge was wrong" | Empathetic, billing process | +| `warranty_claim` | "Product broke" | Warranty eligibility info | +| `account_management` | "Change my password" | Account instructions | +| `general_support` | "How do I...?" | General assistance | +| `complaint` | "This is unacceptable" | Empathetic, resolution steps | +| `other` | Misc questions | General help | + +--- + +## ๐Ÿ“Š Response Quality Factors + +1. **Sentiment Detection**: POSITIVE/NEGATIVE/NEUTRAL classification +2. **Confidence Scores**: 0-1 for both intent and sentiment +3. **Context Retrieval**: Up to 3 KB documents + customer history +4. **Tone Matching**: Empathetic for negative, professional for neutral, friendly for positive +5. **Hallucination Prevention**: Validation layer checks for accuracy + +--- + +## ๐Ÿ› Troubleshooting + +### Issue: "Backend Not Connected" +**Solution**: Ensure FastAPI backend is running +```bash +python backend/main.py +``` + +### Issue: "Qdrant Connection Error" +**Solution**: Start Qdrant Docker container +```bash +docker run -p 6333:6333 qdrant/qdrant:latest +``` + +### Issue: "Groq API Error" +**Solution**: Check GROQ_API_KEY in `.env` file +```bash +# Verify key is set +echo $env:GROQ_API_KEY +``` + +### Issue: "Audio Processing Timeout" +**Solution**: Processing may take 30-60 seconds for audio +- First run downloads models (Whisper, BGE-M3, DistilBERT) +- Subsequent runs are faster +- Ensure sufficient disk space (~5GB) + +### Issue: "Module Not Found" +**Solution**: Reinstall dependencies +```bash +.\venv\Scripts\Activate.ps1 +pip install -r requirements.txt +``` + +--- + +## ๐Ÿ“ Project Structure + +``` +d:\Voice RAG Bot\voice-rag-bot\ +โ”œโ”€โ”€ backend/ +โ”‚ โ”œโ”€โ”€ main.py FastAPI server +โ”‚ โ””โ”€โ”€ config.py Configuration +โ”œโ”€โ”€ frontend/ +โ”‚ โ””โ”€โ”€ streamlit_app.py Web UI +โ”œโ”€โ”€ orchestration/ +โ”‚ โ”œโ”€โ”€ langgraph_workflow.py 9-node workflow +โ”‚ โ”œโ”€โ”€ state.py State management +โ”‚ โ””โ”€โ”€ nodes/ Individual nodes +โ”‚ โ”œโ”€โ”€ sentiment_analysis.py +โ”‚ โ”œโ”€โ”€ entity_extraction.py +โ”‚ โ”œโ”€โ”€ intent_detection.py +โ”‚ โ”œโ”€โ”€ retrieval_router.py +โ”‚ โ”œโ”€โ”€ context_builder.py +โ”‚ โ”œโ”€โ”€ response_generation.py +โ”‚ โ”œโ”€โ”€ validation.py +โ”‚ โ”œโ”€โ”€ memory_persistence.py +โ”‚ โ””โ”€โ”€ tts_generation.py +โ”œโ”€โ”€ rag/ +โ”‚ โ”œโ”€โ”€ qdrant_manager.py Vector DB client +โ”‚ โ””โ”€โ”€ embedding_manager.py BGE-M3 embeddings +โ”œโ”€โ”€ data/ +โ”‚ โ”œโ”€โ”€ load_sample_data.py Sample data loader +โ”‚ โ””โ”€โ”€ audio_output/ Generated audio files +โ”œโ”€โ”€ tests/ +โ”‚ โ””โ”€โ”€ test_full_integration.py End-to-end test +โ”œโ”€โ”€ .env Configuration +โ”œโ”€โ”€ requirements.txt Dependencies +โ”œโ”€โ”€ START_SYSTEM.ps1 Quick start script +โ””โ”€โ”€ venv/ Python environment +``` + +--- + +## ๐Ÿ”„ Workflow Execution (Behind the Scenes) + +1. **sentiment_analysis**: Input โ†’ DistilBERT โ†’ POSITIVE/NEGATIVE/NEUTRAL +2. **entity_extraction**: Input โ†’ BERT-NER โ†’ Extract names, locations, etc. +3. **intent_detection**: Input โ†’ Groq LLM โ†’ 9-intent classification +4. **retrieval_router**: Intent โ†’ Qdrant search โ†’ 3 KB docs + customer history +5. **context_builder**: Format contexts โ†’ Unified prompt +6. **response_generation**: Prompt โ†’ Groq LLM โ†’ Response text +7. **validation**: Check hallucinations โ†’ Retry if needed +8. **memory_persistence**: Embed response โ†’ Upsert to Qdrant +9. **tts_generation**: Response text โ†’ gTTS โ†’ MP3 audio file + +--- + +## ๐Ÿ“Š Performance Metrics (Approximate) + +| Component | Time | Notes | +|-----------|------|-------| +| STT (Audio โ†’ Text) | 5-15s | Depends on audio length | +| Sentiment Analysis | 0.5s | DistilBERT inference | +| Entity Extraction | 0.5s | BERT-NER inference | +| Intent Detection | 1-2s | Groq API call | +| KB Search | 0.2s | Qdrant vector search | +| Response Generation | 2-5s | Groq streaming | +| Validation | 0.5s | Local checks | +| TTS Generation | 2-5s | gTTS processing | +| **Total End-to-End** | **12-35s** | First run slower (model loading) | + +--- + +## ๐Ÿ’ก Tips & Tricks + +### Faster Processing +- Use text input instead of audio (skips STT) +- System caches models after first run +- Keep audio messages under 30 seconds + +### Better Responses +- Use clear, grammatically correct input +- Provide context ("purchased last week" vs "bought before") +- Specify what you need (return, refund, replacement) + +### Debugging +- Check `backend/main.py` logs for errors +- View Qdrant collections: http://localhost:6333/api/swagger/index.html +- Monitor Streamlit server in terminal for issues + +--- + +## ๐Ÿš€ Next Steps + +1. **Load Sample Data**: `python data/load_sample_data.py` +2. **Test with Demo Scenarios**: Use Streamlit to test various intents +3. **Customize KB Documents**: Add your own documents to Qdrant +4. **Fine-tune Prompts**: Edit prompts in `prompts/` directory +5. **Production Deployment**: Add authentication, rate limiting, monitoring + +--- + +## ๐Ÿ“ž Support & References + +**Documentation Files:** +- `data/DATA_REQUIREMENTS.md` - Data schema documentation +- `.env` - Environment configuration + +**API Endpoints:** +- `POST /process-audio` - Audio input endpoint +- `POST /process-text` - Text input endpoint +- `GET /health` - Health check + +**Backend Logs:** +- Location: Console output when running `python backend/main.py` +- Check for errors, model loading, API calls + +--- + +## ๐Ÿ“ License & Attribution + +**Components**: +- **Groq LLM**: Free tier, gpt-oss-20b model +- **Faster Whisper**: OpenAI (MIT License) +- **LangGraph**: LangChain (Open Source) +- **Qdrant**: Open source vector database +- **BGE-M3**: BAAI embeddings model +- **DistilBERT**: Hugging Face transformers +- **gTTS**: Google Text-to-Speech + +--- + +## โœ… Verification Checklist + +Before considering system "ready for production": + +- [ ] Backend running on http://localhost:8000 +- [ ] Qdrant running on http://localhost:6333 +- [ ] Streamlit frontend accessible at http://localhost:8501 +- [ ] Sample data loaded (`python data/load_sample_data.py`) +- [ ] Integration test passing (`python tests/test_full_integration.py`) +- [ ] Audio input working (record or upload) +- [ ] All 9 nodes executing (check logs) +- [ ] Response generation working +- [ ] Audio playback working +- [ ] History tracking working (multiple messages same customer) + +--- + +**Built with โค๏ธ | Last Updated: May 30, 2026** diff --git a/START_SYSTEM.ps1 b/START_SYSTEM.ps1 new file mode 100644 index 0000000000000000000000000000000000000000..5f8fbbae29a3b0e603274c3f2b195ed97c1110fc --- /dev/null +++ b/START_SYSTEM.ps1 @@ -0,0 +1,64 @@ +# Voice RAG Bot - System Startup Script +# Starts FastAPI backend and Streamlit frontend + +Write-Host "==================================" +Write-Host "Voice RAG Bot - System Startup" +Write-Host "==================================" +Write-Host "" + +# Check if venv exists +if (-not (Test-Path "venv\Scripts\Activate.ps1")) { + Write-Host "ERROR: Virtual environment not found!" + Write-Host "Please run: python -m venv venv" + exit 1 +} + +# Activate venv +Write-Host "[1/3] Activating virtual environment..." +& .\venv\Scripts\Activate.ps1 + +# Check if Qdrant is running +Write-Host "[2/3] Checking Qdrant connection..." +try { + $response = Invoke-WebRequest -Uri "http://localhost:6333/health" -UseBasicParsing -TimeoutSec 2 + if ($response.StatusCode -eq 200) { + Write-Host "โœ… Qdrant is running on localhost:6333" + } +} catch { + Write-Host "โš ๏ธ WARNING: Cannot connect to Qdrant on localhost:6333" + Write-Host " Make sure Docker is running and Qdrant container is active" + Write-Host " Run: docker run -p 6333:6333 qdrant/qdrant:latest" +} + +Write-Host "" +Write-Host "[3/3] Starting services..." +Write-Host "" + +# Start backend in a separate process +Write-Host "Starting FastAPI backend on http://localhost:8000" +$backendProcess = Start-Process -NoNewWindow -FilePath "python" -ArgumentList "backend/main.py" -PassThru +Start-Sleep -Seconds 3 + +# Start Streamlit +Write-Host "Starting Streamlit frontend on http://localhost:8501" +Write-Host "" +Write-Host "==================================" +Write-Host "Services started successfully!" +Write-Host "==================================" +Write-Host "" +Write-Host "Frontend URL: http://localhost:8501" +Write-Host "Backend API: http://localhost:8000" +Write-Host "" +Write-Host "Backend PID: $($backendProcess.Id)" +Write-Host "" +Write-Host "To stop the backend, run: Stop-Process -Id $($backendProcess.Id)" +Write-Host "" + +# Start Streamlit +python -m streamlit run frontend/streamlit_app.py + +# Cleanup +Write-Host "" +Write-Host "Stopping backend..." +Stop-Process -Id $backendProcess.Id -Force +Write-Host "Shutdown complete." diff --git a/backend/__init__.py b/backend/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..b95b7c4f96be983ddee8e15d0a5c32fa8faf1893 --- /dev/null +++ b/backend/__init__.py @@ -0,0 +1 @@ +"""Voice RAG Bot Backend Package""" diff --git a/backend/__pycache__/__init__.cpython-311.pyc b/backend/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..48c658c98b0b664cd74c44c21b003b17fa89afa4 Binary files /dev/null and b/backend/__pycache__/__init__.cpython-311.pyc differ diff --git a/backend/__pycache__/config.cpython-311.pyc b/backend/__pycache__/config.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..51ee3dd14acfb7de27c2820355589eeec1bca44c Binary files /dev/null and b/backend/__pycache__/config.cpython-311.pyc differ diff --git a/backend/__pycache__/main.cpython-311.pyc b/backend/__pycache__/main.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8245e5d370dc19a2d682f7c500894bded471698f Binary files /dev/null and b/backend/__pycache__/main.cpython-311.pyc differ diff --git a/backend/__pycache__/voice_bot_controller.cpython-311.pyc b/backend/__pycache__/voice_bot_controller.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bb3efdb539b8897ff4dca0a8db80fb0c01168461 Binary files /dev/null and b/backend/__pycache__/voice_bot_controller.cpython-311.pyc differ diff --git a/backend/config.py b/backend/config.py new file mode 100644 index 0000000000000000000000000000000000000000..a9793836ff1cddc90dfddb7e3e766d69a59e9ea7 --- /dev/null +++ b/backend/config.py @@ -0,0 +1,68 @@ +""" +Central Configuration Management using Pydantic Settings +Loads environment variables from .env file +""" + +from pydantic_settings import BaseSettings +from typing import Optional +from pathlib import Path + + +class Settings(BaseSettings): + """Application configuration loaded from environment variables""" + + # Groq LLM Configuration + groq_api_key: str + groq_model: str = "llama-3.3-70b-versatile" + groq_temperature: float = 0.7 + groq_max_tokens: int = 1024 + + # Qdrant Vector Database Configuration + qdrant_url: str = "http://localhost:6333" + qdrant_api_key: Optional[str] = None # Optional for local Docker setup + + # Embedding Model Configuration + embedding_model: str = "BAAI/bge-m3" + embedding_batch_size: int = 32 + + # Collection Names + kb_collection_name: str = "knowledge_base" + history_collection_name: str = "customer_history" + + # Vector Dimensions (BGE-M3 uses 1024 dimensions) + vector_dimension: int = 1024 + + # Model Configuration for NLP Tasks + sentiment_model: str = "distilbert-base-uncased-finetuned-sst-2-english" + + # Application Configuration + app_name: str = "Voice RAG Bot" + app_version: str = "1.0.0" + debug_mode: bool = False + + # Conversation Memory + max_conversation_history: int = 10 + summary_interval: int = 5 # Generate summary every 5 turns + + # Audio Configuration + sample_rate: int = 16000 # 16kHz for Whisper + audio_format: str = "wav" + + class Config: + """Pydantic config for reading from .env file""" + env_file = str(Path(__file__).parent.parent / ".env") + case_sensitive = False + extra = "ignore" # Ignore unknown fields from .env + + def __repr__(self) -> str: + """String representation (hides API keys)""" + return ( + f"Settings(" + f"groq_model={self.groq_model}, " + f"qdrant_url={self.qdrant_url}, " + f"embedding_model={self.embedding_model})" + ) + + +# Global settings instance +settings = Settings() diff --git a/backend/main.py b/backend/main.py new file mode 100644 index 0000000000000000000000000000000000000000..572547b808154fd6ffc0644e45e9dc9b47c60bba --- /dev/null +++ b/backend/main.py @@ -0,0 +1,241 @@ +""" +FastAPI Backend for Voice RAG Bot +Handles audio input, STT conversion, workflow orchestration, and response generation +""" + +import logging +import asyncio +import sys +from pathlib import Path +from typing import Optional +from io import BytesIO + +# Add project root to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from fastapi import FastAPI, UploadFile, File, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +import uvicorn + +# Import configuration +from backend.config import settings + +# Import workflow +from orchestration.langgraph_workflow import run_workflow +from orchestration.latency_tracker import get_tracker, reset_tracker + +# Import STT (Faster Whisper) +from faster_whisper import WhisperModel + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# ============================================================================ +# MODELS +# ============================================================================ + +class ProcessAudioResponse(BaseModel): + """Response model for audio processing""" + response_text: str + audio_path: Optional[str] + intent: dict + sentiment: dict + entities: Optional[dict] + kb_context: str + history_context: str + + +class HealthResponse(BaseModel): + """Health check response""" + status: str + llm_model: str + qdrant_url: str + whisper_model: str + + +# ============================================================================ +# FASTAPI APP INITIALIZATION +# ============================================================================ + +app = FastAPI( + title="Voice RAG Bot Backend", + description="AI-powered customer service bot with RAG and voice interface", + version="1.0.0" +) + +# Add CORS middleware for frontend communication +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# ============================================================================ +# GLOBAL STATE +# ============================================================================ + +whisper_model = WhisperModel("base", device="cpu", compute_type="int8") + +def extract_audio_content(audio_bytes: bytes) -> str: + try: + audio_file = BytesIO(audio_bytes) + segments, _ = whisper_model.transcribe(audio_file, language="en") + transcribed_text = " ".join([segment.text for segment in segments]) + + if not transcribed_text.strip(): + return "No speech detected" + + tracker = get_tracker() + tracker.start("whisper_stt") + tracker.end("whisper_stt") + return transcribed_text + + except Exception as e: + logger.error(f"STT Error: {str(e)}") + raise HTTPException(status_code=400, detail=f"STT failed: {str(e)}") + + +async def run_workflow_async(user_input: str, customer_id: str) -> dict: + try: + return await run_workflow(user_input, customer_id) + except Exception as e: + logger.error(f"Workflow Error: {str(e)}") + raise HTTPException(status_code=500, detail=f"Workflow failed: {str(e)}") + + +@app.get("/health", response_model=HealthResponse) +async def health_check(): + return { + "status": "healthy", + "llm_model": settings.groq_model, + "qdrant_url": settings.qdrant_url, + "whisper_model": "base" + } + + +@app.post("/process-audio", response_model=ProcessAudioResponse) +async def process_audio( + file: UploadFile = File(...), + customer_id: str = "DEFAULT_CUSTOMER" +): + try: + reset_tracker() + tracker = get_tracker() + tracker.start_total() + + audio_bytes = await file.read() + user_input = extract_audio_content(audio_bytes) + final_state = await run_workflow_async(user_input, customer_id) + + response = ProcessAudioResponse( + response_text=final_state.get("response", ""), + audio_path=final_state.get("final_audio_path"), + intent=final_state.get("intent", {}), + sentiment=final_state.get("sentiment", {}), + entities=final_state.get("entities"), + kb_context=final_state.get("kb_context", ""), + history_context=final_state.get("history_context", "") + ) + + return response + + except HTTPException: + raise + except Exception as e: + logger.error(f"Unexpected error: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") + + +@app.post("/process-text") +async def process_text( + user_input: str, + customer_id: str = "DEFAULT_CUSTOMER" +): + try: + final_state = await run_workflow_async(user_input, customer_id) + + return ProcessAudioResponse( + response_text=final_state.get("response", ""), + audio_path=final_state.get("final_audio_path"), + intent=final_state.get("intent", {}), + sentiment=final_state.get("sentiment", {}), + entities=final_state.get("entities"), + kb_context=final_state.get("kb_context", ""), + history_context=final_state.get("history_context", "") + ) + except Exception as e: + logger.error(f"Error: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") + + +@app.get("/") +async def root(): + return { + "name": "Voice RAG Bot Backend", + "version": "1.0.0", + "endpoints": { + "health": "GET /health", + "process_audio": "POST /process-audio (requires audio file)", + "process_text": "POST /process-text (requires text input)", + "voice_bot_start": "POST /voice-bot/start", + "voice_bot_message": "POST /voice-bot/message", + "voice_bot_end": "POST /voice-bot/end", + "docs": "GET /docs (Swagger UI)" + } + } + + +from backend.voice_bot_controller import get_voice_bot_controller + +@app.post("/voice-bot/start") +async def voice_bot_start(customer_id: str = "CUST_DEFAULT"): + try: + controller = get_voice_bot_controller() + return await controller.start_session(customer_id) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/voice-bot/message") +async def voice_bot_message(user_message: str): + try: + controller = get_voice_bot_controller() + return await controller.process_user_message(user_message) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/voice-bot/end") +async def voice_bot_end(): + try: + controller = get_voice_bot_controller() + return await controller.end_session() + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/voice-bot/history") +async def voice_bot_history(): + try: + controller = get_voice_bot_controller() + return {"history": controller.get_session_history()} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.on_event("startup") +async def startup_event(): + logger.info(f"Backend started - Config: {settings.groq_model}") + +@app.on_event("shutdown") +async def shutdown_event(): + logger.info("Backend shutdown") + +if __name__ == "__main__": + logger.info("Starting FastAPI server...") + uvicorn.run( + app, + host="0.0.0.0", + port=8000, + log_level="info" + ) diff --git a/backend/voice_bot_controller.py b/backend/voice_bot_controller.py new file mode 100644 index 0000000000000000000000000000000000000000..457b879633bd4481d85aea705d1dd7cbbaf53d3c --- /dev/null +++ b/backend/voice_bot_controller.py @@ -0,0 +1,134 @@ +"""Voice Bot Controller - Session management for conversations""" + +from typing import Dict, Any +from datetime import datetime +import asyncio +from rag.session_manager import get_session_manager +from rag.cache_manager import get_cache_manager +from rag.tts_generator import get_tts_generator +from orchestration.langgraph_workflow import run_workflow + + +class VoiceBotController: + def __init__(self): + self.session_mgr = get_session_manager() + self.cache_mgr = get_cache_manager() + self.tts_gen = get_tts_generator() + self.current_session = None + self.customer_id = None + self.conversation_history = [] + + async def start_session(self, customer_id: str) -> Dict[str, Any]: + self.customer_id = customer_id + self.current_session = self.session_mgr.create_session(customer_id) + self.conversation_history = [] + + greeting = "Hello! How can I help you today?" + audio_path = self.tts_gen.generate_greeting(customer_id) + + return { + "session_id": self.current_session, + "greeting": greeting, + "audio_path": audio_path, + "status": "listening" + } + + async def process_user_message(self, user_message: str) -> Dict[str, Any]: + if not self.current_session: + return {"error": "No active session"} + + self.session_mgr.add_message(self.current_session, "user", user_message) + + cached_response = self.cache_mgr.get(self.customer_id, user_message) + if cached_response: + response_text = cached_response.get("response_text", "") + intent = cached_response.get("intent", {}).get("intent", "") + sentiment = cached_response.get("sentiment", {}).get("label", "") + else: + try: + result = await run_workflow(user_message, self.customer_id) + response_text = result.get("response", "") + intent = result.get("intent", {}).get("intent", "") + sentiment = result.get("sentiment", {}).get("label", "") + self.cache_mgr.set(self.customer_id, user_message, result) + except Exception as e: + response_text = f"Error processing request: {str(e)}" + intent = "error" + sentiment = "NEGATIVE" + + self.session_mgr.add_message(self.current_session, "assistant", response_text, intent=intent, sentiment=sentiment) + + follow_up = self._generate_follow_up(intent, sentiment) + should_continue = self._should_continue(intent, sentiment) + audio_path = self.tts_gen.generate_audio(response_text, self.customer_id, self.current_session) + + return { + "response": response_text, + "intent": intent, + "sentiment": sentiment, + "follow_up": follow_up, + "audio_path": audio_path, + "status": "listening" if should_continue else "done", + "session_id": self.current_session + } + + def _generate_follow_up(self, intent: str, sentiment: str) -> str: + """Generate context-aware follow-up question""" + follow_ups = { + "refund_request": "Would you like assistance with starting a return?", + "product_inquiry": "Do you need more details about this product?", + "billing_issue": "Can I help you further with your billing concern?", + "warranty_claim": "Would you like to proceed with the warranty claim?", + "order_status": "Is there anything else about your order?", + "complaint": "How can I make this right for you?", + "general_support": "Is there anything else I can help you with?" + } + + # Choose follow-up based on intent + if intent in follow_ups: + return follow_ups[intent] + + # Default follow-ups based on sentiment + if sentiment == "NEGATIVE": + return "I apologize for the inconvenience. Is there anything else I can help resolve?" + elif sentiment == "POSITIVE": + return "Great! Is there anything else I can help you with today?" + else: + return "Is there anything else I can help you with?" + + def _should_continue(self, intent: str, sentiment: str) -> bool: + """Determine if conversation should continue""" + # Continue unless user explicitly ends or issue resolved + end_indicators = ["goodbye", "thanks", "that's it", "no thanks"] + + # For now, always continue unless error + return intent != "error" + + async def end_session(self) -> Dict[str, Any]: + if self.current_session: + self.session_mgr.close_session(self.current_session) + history = self.session_mgr.get_session_history(self.current_session) + return { + "session_id": self.current_session, + "status": "closed", + "message_count": len(history), + "farewell": "Thank you for contacting us. Goodbye!" + } + return {"error": "No active session"} + + def get_session_history(self) -> list: + if not self.current_session: + return [] + return self.session_mgr.get_session_history(self.current_session) + + +# Global controller instance +_voice_bot_controller = None + + +def get_voice_bot_controller() -> VoiceBotController: + """Get or create global voice bot controller""" + global _voice_bot_controller + if _voice_bot_controller is None: + _voice_bot_controller = VoiceBotController() + return _voice_bot_controller diff --git a/data/latency_results.json b/data/latency_results.json new file mode 100644 index 0000000000000000000000000000000000000000..64b1fa620ba2584c58a9ee8b0eebfc43a043b514 --- /dev/null +++ b/data/latency_results.json @@ -0,0 +1,142 @@ +[ + { + "timestamp": "2026-06-02T20:29:02.891174", + "total_time_ms": 38587.39, + "modules": { + "sentiment_analysis": 0.0, + "entity_extraction": 1627.82, + "intent_detection": 881.28, + "retrieval_router": 10918.91, + "context_builder": 0.0, + "response_generation": 1045.21, + "validation": 1.73, + "memory_persistence": 743.59, + "tts_generation": 23313.93, + "workflow_orchestration": 38587.39 + }, + "breakdown_percent": { + "sentiment_analysis": 0.0, + "entity_extraction": 2.1, + "intent_detection": 1.1, + "retrieval_router": 14.2, + "context_builder": 0.0, + "response_generation": 1.4, + "validation": 0.0, + "memory_persistence": 1.0, + "tts_generation": 30.2, + "workflow_orchestration": 50.0 + } + }, + { + "timestamp": "2026-06-02T20:32:27.270235", + "total_time_ms": 18292.14, + "modules": { + "sentiment_analysis": 0.0, + "entity_extraction": 1851.01, + "intent_detection": 879.92, + "retrieval_router": 11678.87, + "context_builder": 0.0, + "response_generation": 935.09, + "validation": 0.0, + "memory_persistence": 518.29, + "tts_generation": 2400.3, + "workflow_orchestration": 18292.14 + }, + "breakdown_percent": { + "sentiment_analysis": 0.0, + "entity_extraction": 5.1, + "intent_detection": 2.4, + "retrieval_router": 31.9, + "context_builder": 0.0, + "response_generation": 2.6, + "validation": 0.0, + "memory_persistence": 1.4, + "tts_generation": 6.6, + "workflow_orchestration": 50.0 + } + }, + { + "timestamp": "2026-06-02T20:33:09.830661", + "total_time_ms": 6769.27, + "modules": { + "sentiment_analysis": 0.0, + "entity_extraction": 489.84, + "intent_detection": 670.14, + "retrieval_router": 2088.21, + "context_builder": 0.0, + "response_generation": 850.0, + "validation": 0.0, + "memory_persistence": 602.15, + "tts_generation": 2051.77, + "workflow_orchestration": 6769.27 + }, + "breakdown_percent": { + "sentiment_analysis": 0.0, + "entity_extraction": 3.6, + "intent_detection": 5.0, + "retrieval_router": 15.4, + "context_builder": 0.0, + "response_generation": 6.3, + "validation": 0.0, + "memory_persistence": 4.5, + "tts_generation": 15.2, + "workflow_orchestration": 50.1 + } + }, + { + "timestamp": "2026-06-02T20:48:50.209913", + "total_time_ms": 7611.41, + "modules": { + "sentiment_analysis": 0.0, + "entity_extraction": 521.71, + "intent_detection": 869.67, + "retrieval_router": 1815.81, + "context_builder": 0.0, + "response_generation": 881.13, + "validation": 0.62, + "memory_persistence": 569.26, + "tts_generation": 2904.53, + "workflow_orchestration": 7611.41 + }, + "breakdown_percent": { + "sentiment_analysis": 0.0, + "entity_extraction": 3.4, + "intent_detection": 5.7, + "retrieval_router": 12.0, + "context_builder": 0.0, + "response_generation": 5.8, + "validation": 0.0, + "memory_persistence": 3.8, + "tts_generation": 19.1, + "workflow_orchestration": 50.2 + } + }, + { + "timestamp": "2026-06-02T20:50:03.048163", + "total_time_ms": 3904.49, + "modules": { + "sentiment_analysis": 0.0, + "entity_extraction": 451.61, + "intent_detection": 712.09, + "retrieval_router": 296.21, + "context_builder": 0.0, + "response_generation": 682.71, + "validation": 0.0, + "memory_persistence": 456.91, + "tts_generation": 1295.47, + "workflow_orchestration": 3904.49 + }, + "breakdown_percent": { + "sentiment_analysis": 0.0, + "entity_extraction": 5.8, + "intent_detection": 9.1, + "retrieval_router": 3.8, + "context_builder": 0.0, + "response_generation": 8.8, + "validation": 0.0, + "memory_persistence": 5.9, + "tts_generation": 16.6, + "workflow_orchestration": 50.1 + } + } +] \ No newline at end of file diff --git a/data/load_sample_data.py b/data/load_sample_data.py new file mode 100644 index 0000000000000000000000000000000000000000..a26b5eaf8a5a4755a30373f0de3df5fcefe61790 --- /dev/null +++ b/data/load_sample_data.py @@ -0,0 +1,201 @@ +""" +Step 13: Load Sample Data into Qdrant +Creates sample documents for knowledge base and customer history +""" +import sys +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +import asyncio +import json +import logging +from datetime import datetime + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +print("\n" + "="*80) +print("๐Ÿ“š LOADING SAMPLE DATA INTO QDRANT") +print("="*80) + +# ============================================================================ +# SAMPLE DATA +# ============================================================================ + +# Knowledge Base Documents (Company Policies, FAQs) +KB_DOCUMENTS = [ + { + "id": "kb_001", + "title": "Return Policy", + "content": """ + Return Policy: Customers can return unopened products within 30 days of purchase + for a full refund. Items must be in original condition with all packaging and accessories. + Refunds are processed within 5-7 business days. Shipping costs are non-refundable unless + the return is due to our error. For defective items, we offer replacements immediately. + """ + }, + { + "id": "kb_002", + "title": "Shipping Information", + "content": """ + Shipping Options: We offer standard shipping (5-7 days), express shipping (2-3 days), + and overnight shipping. Standard shipping is free for orders over $50. Tracking + information is provided via email. All orders are insured. We ship to most countries + worldwide. International orders may have customs delays. + """ + }, + { + "id": "kb_003", + "title": "Product Warranty", + "content": """ + Warranty Coverage: All electronics come with a 1-year manufacturer's warranty covering + defects in materials and workmanship. Warranty does not cover physical damage, water damage, + or normal wear. Warranty service is available through our support team or authorized + service centers. Extended warranty options are available for 2 or 3 years. + """ + }, + { + "id": "kb_004", + "title": "Account Management", + "content": """ + Account Features: Create an account to track orders, save preferences, and manage + payment methods. Password requirements: minimum 8 characters with upper/lowercase, + numbers, and symbols. Two-factor authentication available for security. Account + information can be updated anytime in settings. Contact support to delete account. + """ + } +] + +# Customer History Records (Previous Interactions) +CUSTOMER_HISTORY = [ + { + "customer_id": "CUST_001", + "interaction_type": "complaint", + "text": "Customer complained about slow shipping on previous order. Resolution: expedited reshipment provided." + }, + { + "customer_id": "CUST_001", + "interaction_type": "purchase", + "text": "Purchased laptop model XPS-15 on 2025-11-20. Status: delivered. Customer satisfied." + }, + { + "customer_id": "CUST_002", + "interaction_type": "inquiry", + "text": "Asked about warranty coverage for defective phone. Explained 1-year coverage policy. Customer satisfied." + }, + { + "customer_id": "CUST_002", + "interaction_type": "refund_request", + "text": "Requested refund for unopened tablet within 30-day window. Refund approved and processed." + } +] + +# ============================================================================ +# LOAD DATA +# ============================================================================ + +async def load_sample_data(): + """Load sample data into Qdrant""" + + try: + from rag.qdrant_manager import qdrant_manager + from rag.embedding_manager import embedding_manager + + print("\n[1] Initializing managers...") + print(f" โœ… Qdrant Manager: {qdrant_manager}") + print(f" โœ… Embedding Manager: {embedding_manager}") + + # ========== LOAD KNOWLEDGE BASE ========== + print("\n[2] Loading Knowledge Base documents...") + print(f" Documents to load: {len(KB_DOCUMENTS)}") + + for doc in KB_DOCUMENTS: + try: + # Create document object for Qdrant + text = f"Title: {doc['title']}\n\n{doc['content']}" + logger.info(f"Adding KB doc: {doc['id']}") + + # Add to knowledge base using qdrant_manager + qdrant_manager.add_to_kb( + documents=[{ + "id": doc['id'], + "text": text, + "title": doc['title'] + }] + ) + print(f" โœ… {doc['title']} (ID: {doc['id']})") + + except Exception as e: + print(f" โŒ Error loading {doc['id']}: {str(e)}") + + print(" โœ… Knowledge Base loaded") + + # ========== LOAD CUSTOMER HISTORY ========== + print("\n[3] Loading Customer History...") + print(f" Records to load: {len(CUSTOMER_HISTORY)}") + + for record in CUSTOMER_HISTORY: + try: + logger.info(f"Adding history for {record['customer_id']}") + + # Add to customer history using qdrant_manager + qdrant_manager.add_to_history( + customer_id=record['customer_id'], + text=record['text'], + interaction_type=record['interaction_type'] + ) + print(f" โœ… {record['customer_id']}: {record['interaction_type']} ({len(record['text'])} chars)") + + except Exception as e: + print(f" โŒ Error loading history for {record['customer_id']}: {str(e)}") + + print(" โœ… Customer History loaded") + + # ========== VERIFY DATA ========== + print("\n[4] Verifying loaded data...") + + try: + kb_info = qdrant_manager.get_collection_info("knowledge_base") + print(f" โœ… Knowledge Base:") + print(f" - Name: knowledge_base") + print(f" - Vector size: {kb_info.get('vector_size', 'N/A')}") + print(f" - Points count: {kb_info.get('points_count', 'N/A')}") + + hist_info = qdrant_manager.get_collection_info("customer_history") + print(f" โœ… Customer History:") + print(f" - Name: customer_history") + print(f" - Vector size: {hist_info.get('vector_size', 'N/A')}") + print(f" - Points count: {hist_info.get('points_count', 'N/A')}") + except Exception as e: + print(f" โš ๏ธ Could not verify: {str(e)}") + + print("\n" + "="*80) + print("โœ… SAMPLE DATA LOADED SUCCESSFULLY") + print("="*80) + print("\n๐Ÿ“Š Summary:") + print(f" โ€ข Knowledge Base: {len(KB_DOCUMENTS)} documents loaded") + print(f" โ€ข Customer History: {len(CUSTOMER_HISTORY)} records loaded") + print("\n๐ŸŽฏ Data is now available for:") + print(" โ€ข KB Search (retrieval_router node)") + print(" โ€ข Customer History Context (conditional retrieval)") + print(" โ€ข Personalized responses based on customer history") + print("="*80 + "\n") + + return True + + except Exception as e: + print(f"\nโŒ ERROR: {str(e)}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + print("\n๐Ÿš€ Starting sample data loader...\n") + + success = asyncio.run(load_sample_data()) + + sys.exit(0 if success else 1) diff --git a/data/sessions.db b/data/sessions.db new file mode 100644 index 0000000000000000000000000000000000000000..b0e9c279f70a6ddf2d0328c985bf19972941ddac Binary files /dev/null and b/data/sessions.db differ diff --git a/data/test_latency.json b/data/test_latency.json new file mode 100644 index 0000000000000000000000000000000000000000..dcd09748e3cb7c24c0d13f7bfbbc935a5a02d173 --- /dev/null +++ b/data/test_latency.json @@ -0,0 +1,22 @@ +[ + { + "timestamp": "2026-06-02T20:03:43.290341", + "total_time_ms": 51.15, + "modules": { + "test_module": 51.15 + }, + "breakdown_percent": { + "test_module": 100.0 + } + }, + { + "timestamp": "2026-06-02T20:04:07.232259", + "total_time_ms": 50.66, + "modules": { + "test_module": 50.66 + }, + "breakdown_percent": { + "test_module": 100.0 + } + } +] \ No newline at end of file diff --git a/frontend/__pycache__/streamlit_app.cpython-311.pyc b/frontend/__pycache__/streamlit_app.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..02b6cdd0589c74df131ecbb47058c1b4033ea9e1 Binary files /dev/null and b/frontend/__pycache__/streamlit_app.cpython-311.pyc differ diff --git a/frontend/streamlit_app.py b/frontend/streamlit_app.py new file mode 100644 index 0000000000000000000000000000000000000000..311d2bb447c3c726ceb0c2463dfe7dcc4e1db201 --- /dev/null +++ b/frontend/streamlit_app.py @@ -0,0 +1,739 @@ +""" +Streamlit Frontend - Voice RAG Bot +Interactive UI for audio input, processing, and response playback +""" + +import streamlit as st +import requests +import json +import os +import time +import base64 +from pathlib import Path +from datetime import datetime +from typing import Optional, Dict, Any + +# Page configuration +st.set_page_config( + page_title="Voice RAG Bot", + page_icon="๐Ÿค–", + layout="wide", + initial_sidebar_state="expanded" +) + +# Styling +st.markdown(""" + +""", unsafe_allow_html=True) + +# ============================================================================ +# CONFIGURATION +# ============================================================================ +BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000") +DATA_DIR = Path("data/audio_output") +DATA_DIR.mkdir(parents=True, exist_ok=True) + +# Session state initialization +if "customer_id" not in st.session_state: + st.session_state.customer_id = "CUST_001" +if "processing" not in st.session_state: + st.session_state.processing = False +if "last_response" not in st.session_state: + st.session_state.last_response = None +if "history" not in st.session_state: + st.session_state.history = [] +if "voice_bot_mode" not in st.session_state: + st.session_state.voice_bot_mode = False +if "voice_bot_session" not in st.session_state: + st.session_state.voice_bot_session = None +if "voice_bot_active" not in st.session_state: + st.session_state.voice_bot_active = False +if "voice_bot_messages" not in st.session_state: + st.session_state.voice_bot_messages = [] +if "pending_audio" not in st.session_state: + st.session_state.pending_audio = None +if "processing_audio" not in st.session_state: + st.session_state.processing_audio = False +if "last_processed_audio_id" not in st.session_state: + st.session_state.last_processed_audio_id = None + +# ============================================================================ +# UTILITY FUNCTIONS +# ============================================================================ + +def check_backend_health() -> bool: + """Check if FastAPI backend is running""" + try: + response = requests.get(f"{BACKEND_URL}/health", timeout=5) + return response.status_code == 200 + except requests.exceptions.ConnectionError: + return False + except requests.exceptions.Timeout: + return False + except Exception as e: + return False + + +def process_audio_file(audio_bytes: bytes, customer_id: str) -> Optional[Dict[str, Any]]: + """Send audio to backend for processing""" + try: + from io import BytesIO + + # Send audio bytes directly as file to backend + with st.spinner("Processing audio... (may take 30-60 seconds)"): + # Create file-like object from bytes + audio_file = BytesIO(audio_bytes) + audio_file.name = f"audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav" + + files = {"file": (audio_file.name, audio_file, "audio/wav")} + response = requests.post( + f"{BACKEND_URL}/process-audio", + files=files, + params={"customer_id": customer_id}, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result + else: + st.error(f"Backend error: {response.status_code}") + st.error(response.text) + return None + + except requests.exceptions.Timeout: + st.error("Request timeout. Processing took too long.") + return None + except Exception as e: + st.error(f"Error processing audio: {str(e)}") + import traceback + st.error(traceback.format_exc()) + return None + + +def process_text_input(user_input: str, customer_id: str) -> Optional[Dict[str, Any]]: + """Send text to backend for processing""" + try: + with st.spinner("Processing text... (may take 20-30 seconds)"): + response = requests.post( + f"{BACKEND_URL}/process-text", + params={ + "user_input": user_input, + "customer_id": customer_id + }, + timeout=120 + ) + + if response.status_code == 200: + return response.json() + else: + st.error(f"Backend error: {response.status_code}") + st.error(response.text) + return None + + except requests.exceptions.Timeout: + st.error("Request timeout. Processing took too long.") + return None + except Exception as e: + st.error(f"Error processing text: {str(e)}") + return None + + + + +def voice_bot_start(customer_id: str) -> Optional[Dict[str, Any]]: + """Start voice bot session""" + try: + response = requests.post( + f"{BACKEND_URL}/voice-bot/start", + params={"customer_id": customer_id}, + timeout=60 + ) + + if response.status_code == 200: + return response.json() + else: + st.error(f"Error starting voice bot: {response.status_code}") + return None + except Exception as e: + st.error(f"Error starting voice bot: {str(e)}") + return None + + +def voice_bot_process_message(user_message: str) -> Optional[Dict[str, Any]]: + """Process message in voice bot session""" + try: + response = requests.post( + f"{BACKEND_URL}/voice-bot/message", + params={"user_message": user_message}, + timeout=120 + ) + + if response.status_code == 200: + return response.json() + else: + st.error(f"Backend error {response.status_code}: {response.text}") + return None + except Exception as e: + st.error(f"Backend connection error: {str(e)}") + return None + + +def voice_bot_end() -> Optional[Dict[str, Any]]: + """End voice bot session""" + try: + response = requests.post( + f"{BACKEND_URL}/voice-bot/end", + timeout=10 + ) + + if response.status_code == 200: + return response.json() + else: + return None + except Exception as e: + return None + + +def display_response_results(response: Dict[str, Any]): + """Display formatted response from backend""" + + # Display latency metrics first if available + latency_metrics = response.get("latency_metrics") + if latency_metrics: + st.markdown("### โฑ๏ธ Performance Metrics") + + total_time = latency_metrics.get("total_time_ms", 0) + modules = latency_metrics.get("modules", {}) + breakdown = latency_metrics.get("breakdown_percent", {}) + + # Display total time prominently + col1, col2, col3 = st.columns(3) + with col1: + st.metric("Total Processing Time", f"{total_time:.0f} ms", f"{total_time/1000:.2f}s") + with col2: + fastest = min(modules.items(), key=lambda x: x[1]) if modules else ("N/A", 0) + st.metric("Fastest Module", fastest[0].replace("_", " ").title(), f"{fastest[1]:.0f} ms") + with col3: + slowest = max(modules.items(), key=lambda x: x[1]) if modules else ("N/A", 0) + st.metric("Slowest Module", slowest[0].replace("_", " ").title(), f"{slowest[1]:.0f} ms") + + # Module breakdown with progress bars + with st.expander("๐Ÿ“Š Detailed Module Breakdown", expanded=True): + st.markdown("#### Time per Module") + + # Sort modules by time + sorted_modules = sorted(modules.items(), key=lambda x: x[1], reverse=True) + + for module_name, time_ms in sorted_modules: + percent = breakdown.get(module_name, 0) + display_name = module_name.replace("_", " ").title() + + col1, col2, col3 = st.columns([3, 1, 1]) + with col1: + st.write(f"**{display_name}**") + with col2: + st.write(f"{time_ms:.2f} ms") + with col3: + st.write(f"{percent:.1f}%") + + # Progress bar + st.progress(percent / 100) + + st.markdown("---") + + # Create tabs for different result sections + tabs = st.tabs([ + "๐Ÿ“ Response", + "๐ŸŽฏ Intent", + "๐Ÿ˜Š Sentiment", + "๐Ÿท๏ธ Entities", + "๐Ÿ“š Knowledge Base", + "๐Ÿ“œ History", + "๐Ÿ”Š Audio" + ]) + + # Tab 1: Main Response + with tabs[0]: + st.markdown("### Generated Response") + st.info(response.get("response_text", "No response generated")) + + # Save to history + st.session_state.history.append({ + "timestamp": datetime.now().isoformat(), + "customer_id": st.session_state.customer_id, + "response": response.get("response_text", ""), + "intent": response.get("intent", {}).get("intent", ""), + "sentiment": response.get("sentiment", {}).get("label", "") + }) + + # Tab 2: Intent Detection + with tabs[1]: + intent_data = response.get("intent", {}) + col1, col2 = st.columns(2) + + with col1: + st.metric("Detected Intent", intent_data.get("intent", "N/A")) + with col2: + confidence = intent_data.get("confidence", 0) + st.metric("Confidence", f"{confidence:.1%}") + + # Intent explanation + intent_types = { + "refund_request": "Customer wants to return/refund a product", + "order_status": "Customer inquiring about order tracking", + "product_inquiry": "Customer asking product details", + "billing_issue": "Customer has billing/payment problems", + "warranty_claim": "Customer filing warranty claim", + "account_management": "Account settings/updates", + "general_support": "General support request", + "complaint": "Customer complaint", + "other": "Other inquiry" + } + + intent = intent_data.get("intent", "") + if intent in intent_types: + st.write(f"**Category**: {intent_types[intent]}") + + # Tab 3: Sentiment Analysis + with tabs[2]: + sentiment_data = response.get("sentiment", {}) + label = sentiment_data.get("label", "NEUTRAL") + score = sentiment_data.get("score", 0) + + # Color-coded sentiment display + if label == "POSITIVE": + color = "๐ŸŸข" + tone = "Positive" + elif label == "NEGATIVE": + color = "๐Ÿ”ด" + tone = "Negative" + else: + color = "๐ŸŸก" + tone = "Neutral" + + col1, col2 = st.columns(2) + with col1: + st.metric("Sentiment", f"{color} {tone}") + with col2: + st.metric("Confidence", f"{score:.1%}") + + st.write(f"**Interpretation**: Response was generated with {tone.lower()}-{tone.lower()} tone") + + # Tab 4: Entities + with tabs[3]: + entities = response.get("entities", {}) + if entities: + for entity_type, values in entities.items(): + if values: + st.write(f"**{entity_type.upper()}**") + for entity in values: + st.write(f" โ€ข {entity}") + else: + st.info("No entities extracted from input") + + # Tab 5: Knowledge Base Context + with tabs[4]: + kb_context = response.get("kb_context", "") + if kb_context and isinstance(kb_context, str) and kb_context.strip() != "No relevant policies found.": + st.write("**Retrieved Documents:**") + st.write(kb_context) + else: + st.info("No KB documents retrieved") + + # Tab 6: Customer History + with tabs[5]: + history_context = response.get("history_context", "") + if history_context and isinstance(history_context, str) and history_context.strip() != "No customer history available.": + st.write("**Customer History:**") + st.write(history_context) + else: + st.info("No customer history found") + + # Tab 7: Audio Output + with tabs[6]: + audio_path = response.get("audio_path", "") + if audio_path and audio_path.strip(): + try: + # Normalize path + audio_file_path = Path(audio_path.replace("\\", "/")) + if not audio_file_path.is_absolute(): + project_root = Path(__file__).parent.parent + audio_file_path = project_root / audio_file_path + + if audio_file_path.exists(): + st.write(f"**Audio file**: {audio_path}") + with open(audio_file_path, "rb") as audio_file: + st.audio(audio_file, format="audio/mp3") + else: + st.warning(f"Audio file not found: {audio_file_path}") + except Exception as e: + st.error(f"Could not load audio file: {str(e)}") + else: + st.warning("No audio file generated") + + +# ============================================================================ +# MAIN UI LAYOUT +# ============================================================================ + +# Header +st.title("๐Ÿค– Voice RAG Bot") +st.markdown("AI Customer Support with Voice Recognition and Retrieval-Augmented Generation") + +# Sidebar +with st.sidebar: + st.header("โš™๏ธ Configuration") + + # Backend status with refresh + col1, col2 = st.columns([3, 1]) + with col1: + st.write("**Backend Status**") + with col2: + if st.button("๐Ÿ”„", help="Refresh status", key="refresh_health"): + st.rerun() + + backend_healthy = check_backend_health() + if backend_healthy: + st.success("โœ… Backend Connected") + st.caption(f"URL: {BACKEND_URL}") + else: + st.error("โŒ Backend Not Connected") + st.error(f"Cannot reach {BACKEND_URL}") + st.info("**To fix:**") + st.code("python -m uvicorn backend.main:app --reload --port 8000", language="bash") + st.info("**Or use startup script:**") + st.code(".\\START_SYSTEM.ps1", language="bash") + + # Customer ID input + st.subheader("Customer Information") + customer_id = st.text_input( + "Customer ID", + value=st.session_state.customer_id, + help="Unique identifier for customer (used for history)" + ) + st.session_state.customer_id = customer_id + + st.divider() + + # Model information + st.subheader("System Components") + st.write("**LLM**: Groq (gpt-oss-20b)") + st.write("**STT**: Faster Whisper (base)") + st.write("**Vector DB**: Qdrant") + st.write("**Embeddings**: BGE-M3 (1024-dim)") + st.write("**Sentiment**: DistilBERT") + st.write("**NER**: BERT-base-NER") + +# Main content +st.divider() + +# Voice Bot Mode Toggle +col1, col2, col3 = st.columns([1, 3, 1]) +with col1: + voice_bot_enabled = st.toggle("๐Ÿค– Voice Bot Mode", value=st.session_state.voice_bot_mode, key="voice_bot_toggle") + st.session_state.voice_bot_mode = voice_bot_enabled + +if voice_bot_enabled: + # Voice Bot Interface + st.markdown("### ๐ŸŽ™๏ธ Voice Bot Assistant") + + if not st.session_state.voice_bot_active: + # Start button + col1, col2, col3 = st.columns([1, 2, 1]) + with col2: + if st.button("๐ŸŽ™๏ธ Start Conversation", use_container_width=True, key="start_voice_bot"): + with st.spinner("Starting voice bot..."): + result = voice_bot_start(st.session_state.customer_id) + if result: + st.session_state.voice_bot_session = result.get("session_id") + st.session_state.voice_bot_active = True + greeting_audio = result.get("audio_path", "") + st.session_state.voice_bot_messages = [ + { + "role": "assistant", + "content": result.get("greeting"), + "audio_path": greeting_audio + } + ] + st.rerun() + + else: + # Conversation display + st.markdown("#### Conversation") + + # Display conversation history + for msg in st.session_state.voice_bot_messages: + if msg["role"] == "assistant": + with st.chat_message("assistant", avatar="๐Ÿค–"): + st.write(msg["content"]) + # Play audio if available + audio_path = msg.get("audio_path", "") + if audio_path and audio_path.strip(): + try: + # Normalize path and check in project root + audio_file_path = Path(audio_path.replace("\\", "/")) + if not audio_file_path.is_absolute(): + project_root = Path(__file__).parent.parent + audio_file_path = project_root / audio_file_path + + if audio_file_path.exists(): + with open(audio_file_path, "rb") as audio_file: + audio_bytes = audio_file.read() + audio_b64 = base64.b64encode(audio_bytes).decode() + st.markdown(f""" + + """, unsafe_allow_html=True) + else: + st.caption(f"โš ๏ธ Audio file not found: {audio_file_path}") + except Exception as e: + st.caption(f"โš ๏ธ Error loading audio: {str(e)}") + else: + with st.chat_message("user", avatar="๐Ÿ‘ค"): + st.write(msg["content"]) + + # Voice conversation section + st.markdown("---") + st.markdown("#### ๐ŸŽค Record your message:") + + # Voice input - Store audio in session state + audio_bytes = st.audio_input( + "Record your message", + label_visibility="collapsed", + key="voice_bot_audio_input" + ) + + # If new audio recorded, store it with unique ID + if audio_bytes: + audio_id = id(audio_bytes) + if audio_id != st.session_state.last_processed_audio_id: + st.session_state.pending_audio = audio_bytes + st.session_state.last_processed_audio_id = audio_id + st.session_state.processing_audio = True + + # Process pending audio (happens on next render after audio is saved) + if st.session_state.pending_audio and st.session_state.processing_audio: + # Immediately mark as processing to prevent duplicate processing + st.session_state.processing_audio = False + + st.info("๐ŸŽค Processing audio...") + try: + from io import BytesIO + from faster_whisper import WhisperModel + + # Convert UploadedFile to bytes if needed + audio_data = st.session_state.pending_audio + if hasattr(audio_data, 'read'): + audio_data = audio_data.read() + + st.info("Loading Whisper model...") + @st.cache_resource + def load_whisper(): + return WhisperModel("base", device="cpu", compute_type="int8") + + whisper = load_whisper() + st.success("โœ… Whisper model loaded") + + st.info("Transcribing audio...") + audio_file = BytesIO(audio_data) + segments, info = whisper.transcribe(audio_file, language="en") + transcribed_text = " ".join([segment.text for segment in segments]) + + if transcribed_text.strip(): + st.success(f"โœ… Transcribed: {transcribed_text}") + + # Add user message + st.session_state.voice_bot_messages.append({ + "role": "user", + "content": f"๐ŸŽค {transcribed_text}" + }) + + st.info("๐Ÿค– Sending to bot...") + result = voice_bot_process_message(transcribed_text) + + if result: + response = result.get("response", "") + audio_path = result.get("audio_path", "") + + if response: + st.success("โœ… Bot responded") + # Add ONLY ONE bot response + st.session_state.voice_bot_messages.append({ + "role": "assistant", + "content": response, + "audio_path": audio_path + }) + + # Clear pending audio immediately + st.session_state.pending_audio = None + st.session_state.processing_audio = False + else: + st.error("โŒ Bot response is empty") + st.session_state.pending_audio = None + st.session_state.processing_audio = False + else: + st.error("โŒ Backend returned None") + st.session_state.pending_audio = None + st.session_state.processing_audio = False + else: + st.warning("โš ๏ธ No speech detected in audio") + st.session_state.pending_audio = None + st.session_state.processing_audio = False + + except Exception as e: + st.error(f"โŒ Error: {str(e)}") + st.session_state.pending_audio = None + st.session_state.processing_audio = False + import traceback + st.write(traceback.format_exc()) + + # End conversation button + st.markdown("---") + if st.button("๐Ÿ›‘ End Conversation", use_container_width=True, key="end_voice_bot"): + with st.spinner("Ending session..."): + result = voice_bot_end() + st.session_state.voice_bot_active = False + st.session_state.voice_bot_messages = [] + st.success("โœ… Session ended. Thank you!") + st.rerun() + +else: + # Regular Input Tabs + st.markdown("### ๐Ÿ’ฌ Manual Input Mode") + + # Tabs for input methods + input_tab1, input_tab2 = st.tabs(["๐ŸŽค Audio Input", "๐Ÿ“ Text Input"]) + + with input_tab1: + st.subheader("Upload or Record Audio") + + col1, col2 = st.columns(2) + + with col1: + st.write("**Option 1: Record Audio**") + audio_data = st.audio_input( + "Record your message", + label_visibility="collapsed", + key="audio_input" + ) + + if audio_data: + st.success("Audio recorded successfully!") + if st.button("๐Ÿ”„ Process Audio", key="process_audio_btn"): + response = process_audio_file(audio_data.getvalue(), st.session_state.customer_id) + if response: + st.session_state.last_response = response + st.success("โœ… Processing complete!") + st.rerun() + + with col2: + st.write("**Option 2: Upload Audio File**") + uploaded_file = st.file_uploader( + "Upload an MP3 or WAV file", + type=["mp3", "wav"], + label_visibility="collapsed" + ) + + if uploaded_file: + st.success(f"File uploaded: {uploaded_file.name}") + if st.button("๐Ÿ”„ Process Uploaded Audio", key="process_uploaded_btn"): + response = process_audio_file(uploaded_file.getvalue(), st.session_state.customer_id) + if response: + st.session_state.last_response = response + st.success("โœ… Processing complete!") + st.rerun() + + with input_tab2: + st.subheader("Enter Text Directly") + + # Text area for input + user_input = st.text_area( + "Enter your message", + placeholder="E.g., 'I want to return my defective laptop purchased last week'", + height=100, + label_visibility="collapsed" + ) + + if user_input: + col1, col2, col3 = st.columns([1, 1, 2]) + + with col1: + if st.button("๐Ÿš€ Process Text", use_container_width=True): + response = process_text_input(user_input, st.session_state.customer_id) + if response: + st.session_state.last_response = response + st.success("โœ… Processing complete!") + st.rerun() + + with col2: + if st.button("๐Ÿ”„ Clear", use_container_width=True): + st.rerun() + + with col3: + st.caption("โ„น๏ธ Processing may take 20-30 seconds") + +# Display last response if available +st.divider() + +if st.session_state.last_response: + st.subheader("๐Ÿ“Š Latest Results") + display_response_results(st.session_state.last_response) + +# Conversation history +st.divider() + +with st.expander("๐Ÿ“œ Conversation History"): + if st.session_state.history: + for i, record in enumerate(st.session_state.history, 1): + with st.container(border=True): + col1, col2, col3, col4 = st.columns(4) + with col1: + st.caption(f"Time: {record['timestamp'][:16]}") + with col2: + st.caption(f"Customer: {record['customer_id']}") + with col3: + st.caption(f"Intent: {record['intent']}") + with col4: + st.caption(f"Sentiment: {record['sentiment']}") + st.write(record['response'][:150] + "..." if len(record['response']) > 150 else record['response']) + else: + st.info("No conversation history yet") + +# Footer +st.divider() +st.markdown(""" +--- +**Voice RAG Bot** | Powered by Groq LLM, Qdrant Vector DB, and LangGraph Orchestration +For technical support, refer to the backend logs at `backend/main.py` +""") diff --git a/orchestration/__init__.py b/orchestration/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..309ffdb25f2235b4419c145c1b917ff348b5bde1 --- /dev/null +++ b/orchestration/__init__.py @@ -0,0 +1 @@ +"""Voice RAG Bot Orchestration Package""" diff --git a/orchestration/__pycache__/__init__.cpython-311.pyc b/orchestration/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a38ec287616d289c937d4286d65ac82ae8f15b12 Binary files /dev/null and b/orchestration/__pycache__/__init__.cpython-311.pyc differ diff --git a/orchestration/__pycache__/langgraph_workflow.cpython-311.pyc b/orchestration/__pycache__/langgraph_workflow.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8d8383c734899ff9022a38e49fd0e7ce05df1d3c Binary files /dev/null and b/orchestration/__pycache__/langgraph_workflow.cpython-311.pyc differ diff --git a/orchestration/__pycache__/latency_tracker.cpython-311.pyc b/orchestration/__pycache__/latency_tracker.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5f93889700b4d6f25c68b41e432f87cdf09e73f8 Binary files /dev/null and b/orchestration/__pycache__/latency_tracker.cpython-311.pyc differ diff --git a/orchestration/__pycache__/state.cpython-311.pyc b/orchestration/__pycache__/state.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5e183290f2b717cb77f782cc37a8f27506c61bb4 Binary files /dev/null and b/orchestration/__pycache__/state.cpython-311.pyc differ diff --git a/orchestration/langgraph_workflow.py b/orchestration/langgraph_workflow.py new file mode 100644 index 0000000000000000000000000000000000000000..c149e451cfc0480ee69dcd25f506658e57135097 --- /dev/null +++ b/orchestration/langgraph_workflow.py @@ -0,0 +1,119 @@ +"""LangGraph Workflow - 9-node orchestration pipeline""" + +from langgraph.graph import StateGraph, END, START +from orchestration.state import ConversationState +from typing import Any, Dict +import logging +from orchestration.latency_tracker import get_tracker, reset_tracker + +# Import all nodes +from orchestration.nodes.sentiment_hybrid import sentiment_analysis_hybrid as sentiment_analysis_node +from orchestration.nodes.entity_extraction import entity_extraction_node +from orchestration.nodes.intent_detection import intent_detection_node +from orchestration.nodes.retrieval_router import retrieval_router_node +from orchestration.nodes.context_builder import context_builder_node +from orchestration.nodes.response_generation import response_generation_node +from orchestration.nodes.validation import validation_node +from orchestration.nodes.memory_persistence import memory_persistence_node +from orchestration.nodes.tts_generation import tts_generation_node + +logger = logging.getLogger(__name__) + +def build_workflow() -> StateGraph: + workflow = StateGraph(ConversationState) + + workflow.add_node("sentiment_analysis", sentiment_analysis_node) + workflow.add_node("entity_extraction", entity_extraction_node) + workflow.add_node("intent_detection", intent_detection_node) + workflow.add_node("retrieval_router", retrieval_router_node) + workflow.add_node("context_builder", context_builder_node) + workflow.add_node("response_generation", response_generation_node) + workflow.add_node("validation", validation_node) + workflow.add_node("memory_persistence", memory_persistence_node) + workflow.add_node("tts_generation", tts_generation_node) + + workflow.add_edge(START, "sentiment_analysis") + workflow.add_edge(START, "entity_extraction") + workflow.add_edge("sentiment_analysis", "intent_detection") + workflow.add_edge("entity_extraction", "intent_detection") + workflow.add_edge("intent_detection", "retrieval_router") + workflow.add_edge("retrieval_router", "context_builder") + workflow.add_edge("context_builder", "response_generation") + workflow.add_edge("response_generation", "validation") + + def should_regenerate(state: ConversationState) -> str: + return "memory_persistence" if state.get("validation_passed", False) else "response_generation" + + workflow.add_conditional_edges("validation", should_regenerate, {"memory_persistence": "memory_persistence", "response_generation": "response_generation"}) + workflow.add_edge("memory_persistence", "tts_generation") + workflow.add_edge("tts_generation", END) + + return workflow + + +# Compile the workflow +workflow = build_workflow() +compiled_workflow = workflow.compile() + + +async def run_workflow(user_input: str, customer_id: str) -> Dict[str, Any]: + """ + Execute the complete workflow + + Args: + user_input: Text from STT (user's speech converted to text) + customer_id: Unique customer identifier + + Returns: + Complete state with response, audio path, and metadata + """ + + try: + # Reset and start tracking + reset_tracker() + tracker = get_tracker() + tracker.start_total() + tracker.start("workflow_orchestration") + + # Initialize state + initial_state: ConversationState = { + "user_input": user_input, + "customer_id": customer_id, + "intent": {"intent": "unknown", "confidence": 0.0}, + "sentiment": {"label": "NEUTRAL", "score": 0.5}, + "entities": None, + "conversation_summary": "", + "kb_context": "", + "history_context": "", + "response": "", + "validation_passed": False, + "final_audio_path": None + } + + # Run workflow + final_state = await compiled_workflow.ainvoke(initial_state) + + tracker.end("workflow_orchestration") + + # Save and print results + latency_results = tracker.save_to_file() + tracker.print_summary() + + # Convert to regular dict and add latency info + result_dict = dict(final_state) + result_dict["latency_metrics"] = latency_results + + logger.info(f"Total workflow time: {latency_results['total_time_ms']} ms") + + return result_dict + + except Exception as e: + logger.error(f"Workflow execution failed: {str(e)}") + logger.error(f"Error type: {type(e).__name__}") + import traceback + logger.error(traceback.format_exc()) + raise + + +def get_workflow_graph(): + return compiled_workflow.get_graph().draw_mermaid() diff --git a/orchestration/latency_tracker.py b/orchestration/latency_tracker.py new file mode 100644 index 0000000000000000000000000000000000000000..5aaf8797e6f90314f2e66564fd1700cc8b4bf7b3 --- /dev/null +++ b/orchestration/latency_tracker.py @@ -0,0 +1,123 @@ +""" +Latency Tracker - Track execution time for each module +""" + +import time +import json +from pathlib import Path +from typing import Dict, Any +from datetime import datetime + + +class LatencyTracker: + """Track latency for each processing module""" + + def __init__(self): + self.timings: Dict[str, float] = {} + self.start_times: Dict[str, float] = {} + self.total_start = None + + def start_total(self): + """Start tracking total execution time""" + self.total_start = time.time() + + def start(self, module_name: str): + """Start timing a module""" + self.start_times[module_name] = time.time() + + def end(self, module_name: str): + """End timing a module""" + if module_name in self.start_times: + elapsed = time.time() - self.start_times[module_name] + self.timings[module_name] = round(elapsed * 1000, 2) # Convert to ms + del self.start_times[module_name] + + def get_results(self) -> Dict[str, Any]: + """Get all timing results""" + total_time = round((time.time() - self.total_start) * 1000, 2) if self.total_start else 0 + + return { + "timestamp": datetime.now().isoformat(), + "total_time_ms": total_time, + "modules": self.timings, + "breakdown_percent": self._calculate_percentages() + } + + def _calculate_percentages(self) -> Dict[str, float]: + """Calculate percentage of total time for each module""" + total = sum(self.timings.values()) + if total == 0: + return {} + return { + module: round((time_ms / total) * 100, 1) + for module, time_ms in self.timings.items() + } + + def save_to_file(self, filepath: str = "data/latency_results.json"): + """Save results to JSON file""" + results = self.get_results() + path = Path(filepath) + path.parent.mkdir(parents=True, exist_ok=True) + + # Append to existing results + existing = [] + if path.exists(): + try: + with open(path, 'r') as f: + loaded = json.load(f) + # Handle both list and dict formats for backward compatibility + if isinstance(loaded, list): + existing = loaded + elif isinstance(loaded, dict): + # If it's a dict, start fresh with a list + existing = [] + else: + existing = [] + except: + existing = [] + + existing.append(results) + + # Keep only last 100 results + if len(existing) > 100: + existing = existing[-100:] + + with open(path, 'w') as f: + json.dump(existing, f, indent=2) + + return results + + def print_summary(self): + """Print formatted summary""" + results = self.get_results() + print("\n" + "="*60) + print("LATENCY TRACKING RESULTS") + print("="*60) + print(f"Total Time: {results['total_time_ms']} ms") + print("\nModule Breakdown:") + print("-"*60) + + for module, time_ms in results['modules'].items(): + percent = results['breakdown_percent'].get(module, 0) + bar = "#" * int(percent / 2) # Visual bar + print(f"{module:25} {time_ms:8.2f} ms {percent:5.1f}% {bar}") + + print("="*60 + "\n") + + +# Global tracker instance +_tracker = None + + +def get_tracker() -> LatencyTracker: + """Get or create global tracker instance""" + global _tracker + if _tracker is None: + _tracker = LatencyTracker() + return _tracker + + +def reset_tracker(): + """Reset the global tracker""" + global _tracker + _tracker = LatencyTracker() diff --git a/orchestration/nodes/__init__.py b/orchestration/nodes/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..39862e3feee516099c3468cf9799474ff96180ec --- /dev/null +++ b/orchestration/nodes/__init__.py @@ -0,0 +1 @@ +"""Voice RAG Bot Orchestration Nodes""" diff --git a/orchestration/nodes/__pycache__/__init__.cpython-311.pyc b/orchestration/nodes/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bbc4044b30cf7685fceaacd86467b82c42d2aa10 Binary files /dev/null and b/orchestration/nodes/__pycache__/__init__.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/context_builder.cpython-311.pyc b/orchestration/nodes/__pycache__/context_builder.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..46136e78b30d9f550c296e40768d870140c4e7cd Binary files /dev/null and b/orchestration/nodes/__pycache__/context_builder.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/entity_extraction.cpython-311.pyc b/orchestration/nodes/__pycache__/entity_extraction.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f8518faff572580b60955e37052f64ea12e3bed7 Binary files /dev/null and b/orchestration/nodes/__pycache__/entity_extraction.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/intent_detection.cpython-311.pyc b/orchestration/nodes/__pycache__/intent_detection.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7339cc3c6c61e544ae1381dd57ef403a70e0a647 Binary files /dev/null and b/orchestration/nodes/__pycache__/intent_detection.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/memory_persistence.cpython-311.pyc b/orchestration/nodes/__pycache__/memory_persistence.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0b7fcf1d48a8fade8c0a0df937555a70d4683ebc Binary files /dev/null and b/orchestration/nodes/__pycache__/memory_persistence.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/response_generation.cpython-311.pyc b/orchestration/nodes/__pycache__/response_generation.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..03a07bbba20c06937902a4b1818a73adde135bd6 Binary files /dev/null and b/orchestration/nodes/__pycache__/response_generation.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/retrieval_router.cpython-311.pyc b/orchestration/nodes/__pycache__/retrieval_router.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..da8548823867b1b0882740a64c28090dfe0761e1 Binary files /dev/null and b/orchestration/nodes/__pycache__/retrieval_router.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/sentiment_analysis.cpython-311.pyc b/orchestration/nodes/__pycache__/sentiment_analysis.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1a54c63ea9c44d98b63ff554b37413acb2c28989 Binary files /dev/null and b/orchestration/nodes/__pycache__/sentiment_analysis.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/sentiment_hybrid.cpython-311.pyc b/orchestration/nodes/__pycache__/sentiment_hybrid.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b0a54284d04b2de01e3282d5cde9d917adeda7ba Binary files /dev/null and b/orchestration/nodes/__pycache__/sentiment_hybrid.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/tts_generation.cpython-311.pyc b/orchestration/nodes/__pycache__/tts_generation.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..967a0aad382f58ac083586bfdea24b912390ab0d Binary files /dev/null and b/orchestration/nodes/__pycache__/tts_generation.cpython-311.pyc differ diff --git a/orchestration/nodes/__pycache__/validation.cpython-311.pyc b/orchestration/nodes/__pycache__/validation.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..13a6dea9aa88bf457b515b5fb1f31e9c0e009ba2 Binary files /dev/null and b/orchestration/nodes/__pycache__/validation.cpython-311.pyc differ diff --git a/orchestration/nodes/context_builder.py b/orchestration/nodes/context_builder.py new file mode 100644 index 0000000000000000000000000000000000000000..0a4cd2b2499dd2ac755199d217836517a66f6134 --- /dev/null +++ b/orchestration/nodes/context_builder.py @@ -0,0 +1,60 @@ +"""Context formatter for LLM prompts""" + +from orchestration.state import ConversationState +from typing import Dict, Any +from orchestration.latency_tracker import get_tracker + + +def context_builder_node(state: ConversationState) -> Dict[str, Any]: + """ + Build complete context string from all available information + for LLM to generate response + + Combines: + - User input + - Intent & sentiment + - KB context + - Customer history + - Conversation summary + + Returns: + State update (no new fields added, just confirmation) + """ + tracker = get_tracker() + tracker.start("context_builder") + + # Extract components + sentiment_label = state['sentiment']['label'] + sentiment_score = state['sentiment']['score'] + intent = state['intent']['intent'] + kb_context = state['kb_context'] + history_context = state['history_context'] + conversation_summary = state['conversation_summary'] + entities = state.get('entities', {}) + + # Build prompt context (this will be used by response_generation node) + # We just validate all components exist, they'll be used by next node + + # Prepare formatted context for logging/debugging + formatted_context = f""" +=== UNIFIED CONTEXT === +User Intent: {intent} +User Sentiment: {sentiment_label} (confidence: {sentiment_score:.2f}) + +KB Context: +{kb_context} + +Customer History: +{history_context} + +Conversation Summary: +{conversation_summary} + +Entities Detected: +{entities} +""" + + tracker.end("context_builder") + # Return minimal state update - context is already in state + # Next node (response_generation) will use these state fields directly + return {} diff --git a/orchestration/nodes/entity_extraction.py b/orchestration/nodes/entity_extraction.py new file mode 100644 index 0000000000000000000000000000000000000000..1c29e22903e4deeef96f4bd1a2561686c6e204bc --- /dev/null +++ b/orchestration/nodes/entity_extraction.py @@ -0,0 +1,58 @@ +"""Named entity extraction using BERT-NER""" + +from transformers import pipeline +from orchestration.state import ConversationState +from typing import Dict, Any +from orchestration.latency_tracker import get_tracker + + +# Global model cache +_ner_model = None + + +def get_ner_model(): + """Load NER model once and cache""" + global _ner_model + if _ner_model is None: + _ner_model = pipeline( + "token-classification", + model="dslim/bert-base-NER", + aggregation_strategy="simple" + ) + return _ner_model + + +def entity_extraction_node(state: ConversationState) -> Dict[str, Any]: + """ + Extract named entities from user input + Uses token classification model to identify entity types + + Returns: + state update with entities field + """ + tracker = get_tracker() + tracker.start("entity_extraction") + + try: + ner_pipeline = get_ner_model() + + # Extract entities + entities_raw = ner_pipeline(state['user_input']) + + # Format entities as dict with types + entities_dict = {} + for entity in entities_raw: + entity_type = entity['entity_group'] + if entity_type not in entities_dict: + entities_dict[entity_type] = [] + entities_dict[entity_type].append(entity['word']) + + # Return formatted entities + tracker.end("entity_extraction") + if entities_dict: + return {"entities": entities_dict} + else: + return {"entities": None} + except Exception as e: + tracker.end("entity_extraction") + return {"entities": None} diff --git a/orchestration/nodes/intent_detection.py b/orchestration/nodes/intent_detection.py new file mode 100644 index 0000000000000000000000000000000000000000..9bef583af7e0bff3a621be3de6d8aa3290a1a214 --- /dev/null +++ b/orchestration/nodes/intent_detection.py @@ -0,0 +1,61 @@ +"""Intent classification using Groq LLM""" + +from langchain_groq import ChatGroq +from langchain_core.prompts import PromptTemplate +from orchestration.state import ConversationState +from typing import Dict, Any +import json +from backend.config import settings +from orchestration.latency_tracker import get_tracker + + +def intent_detection_node(state: ConversationState) -> Dict[str, Any]: + """ + Detect user intent using Groq LLM + + Returns: + state update with intent field: + {"intent": {"intent": "...", "confidence": float}} + """ + tracker = get_tracker() + tracker.start("intent_detection") + + # Initialize Groq LLM + llm = ChatGroq( + model=settings.groq_model, + temperature=0.3, # Low temp for consistent intent detection + groq_api_key=settings.groq_api_key + ) + + # Prompt template for intent detection + intent_prompt = PromptTemplate( + input_variables=["user_input"], + template="""Analyze the user's input and determine their intent. Respond ONLY with JSON. + +User Input: {user_input} + +Possible intents: complaint, refund_request, inquiry, account_issue, escalation, billing, product_question, order_status, other + +Response format: +{{ + "intent": "", + "confidence": <0.0-1.0> +}}""" + ) + + # Generate intent using chain pattern + chain = intent_prompt | llm + response = chain.invoke({"user_input": state['user_input']}) + + try: + # Parse JSON response from LLM + intent_data = json.loads(response.content.strip()) + except json.JSONDecodeError: + # Fallback if JSON parsing fails + intent_data = { + "intent": "other", + "confidence": 0.5 + } + + tracker.end("intent_detection") + return {"intent": intent_data} diff --git a/orchestration/nodes/memory_persistence.py b/orchestration/nodes/memory_persistence.py new file mode 100644 index 0000000000000000000000000000000000000000..d55403e56a56c56dd07fc9256d2b10e1bc0a16c1 --- /dev/null +++ b/orchestration/nodes/memory_persistence.py @@ -0,0 +1,45 @@ +"""Conversation storage to Qdrant customer history""" + +from orchestration.state import ConversationState +from rag.qdrant_manager import qdrant_manager +from typing import Dict, Any +from orchestration.latency_tracker import get_tracker + + +def memory_persistence_node(state: ConversationState) -> Dict[str, Any]: + """ + Store conversation turn to customer history collection in Qdrant + Enables historical context retrieval for repeat customers + + Stores: + - Customer ID (for filtering) + - User input + response + - Intent classification + - Sentiment + - Timestamp + + Returns: + state update (minimal, side-effect is primary) + """ + tracker = get_tracker() + tracker.start("memory_persistence") + + # Combine user input and response for storage + conversation_text = f"User: {state['user_input']}\nAssistant: {state['response']}" + + # Determine interaction type from intent for categorization + intent = state['intent']['intent'] + interaction_type = intent + + # Store to Qdrant customer history + qdrant_manager.add_to_history( + customer_id=state['customer_id'], + text=conversation_text, + interaction_type=interaction_type + ) + + # Update conversation summary in memory (every 5 turns) + # For now, just store the current exchange + + tracker.end("memory_persistence") + return {} diff --git a/orchestration/nodes/response_generation.py b/orchestration/nodes/response_generation.py new file mode 100644 index 0000000000000000000000000000000000000000..6f9748e37c0bfbf00f8eb4d3bccdcf5a47bd816f --- /dev/null +++ b/orchestration/nodes/response_generation.py @@ -0,0 +1,93 @@ +"""Response generation using Groq LLM""" + +from langchain_groq import ChatGroq +from langchain_core.prompts import PromptTemplate +from orchestration.state import ConversationState +from typing import Dict, Any +import logging +from backend.config import settings +from orchestration.latency_tracker import get_tracker + +logger = logging.getLogger(__name__) + + +def response_generation_node(state: ConversationState) -> Dict[str, Any]: + """ + Generate final response using Groq LLM + Incorporates KB context, customer history, intent, and sentiment + + Returns: + state update with response field + """ + tracker = get_tracker() + tracker.start("response_generation") + + try: + logger.info("Response Generation: Initializing Groq LLM...") + + # Initialize Groq LLM + llm = ChatGroq( + model=settings.groq_model, + temperature=settings.groq_temperature, + max_tokens=settings.groq_max_tokens, + groq_api_key=settings.groq_api_key + ) + + # Determine tone based on sentiment + sentiment_label = state['sentiment']['label'] + tone_instruction = { + "POSITIVE": "Use a friendly, upbeat tone.", + "NEGATIVE": "Use an empathetic, understanding tone. Acknowledge frustration.", + "NEUTRAL": "Use a professional, helpful tone." + }.get(sentiment_label, "Use a professional tone.") + + # Build response prompt + response_prompt = PromptTemplate( + input_variables=[ + "user_input", + "intent", + "kb_context", + "history_context", + "tone_instruction" + ], + template="""You are a helpful customer service AI assistant. + +User Intent: {intent} +{tone_instruction} + +Knowledge Base Context: +{kb_context} + +Customer History: +{history_context} + +User Message: {user_input} + +Provide a helpful, accurate response based on the context above. Keep response concise (2-3 sentences). +If you don't have relevant information, say so clearly.""" + ) + + # Generate response using chain pattern + logger.info("๐Ÿค– Response Generation: Invoking LLM chain...") + chain = response_prompt | llm + response = chain.invoke({ + "user_input": state['user_input'], + "intent": state['intent']['intent'], + "kb_context": state['kb_context'], + "history_context": state['history_context'], + "tone_instruction": tone_instruction + }) + + response_text = response.content.strip() + logger.info(f"โœ… Response generated: '{response_text[:80]}...'") + + tracker.end("response_generation") + return {"response": response_text} + + except Exception as e: + logger.error(f"โŒ Response generation failed: {type(e).__name__}: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + tracker.end("response_generation") + # Return fallback response + return {"response": "I apologize, but I encountered an error processing your request. Please try again."} diff --git a/orchestration/nodes/retrieval_router.py b/orchestration/nodes/retrieval_router.py new file mode 100644 index 0000000000000000000000000000000000000000..66631e30af2aefea36e9b97ddb8fbecd97499274 --- /dev/null +++ b/orchestration/nodes/retrieval_router.py @@ -0,0 +1,51 @@ +"""Dual RAG routing - knowledge base + customer history""" + +from orchestration.state import ConversationState +from rag.qdrant_manager import qdrant_manager +from typing import Dict, Any +from orchestration.latency_tracker import get_tracker + + +def retrieval_router_node(state: ConversationState) -> Dict[str, Any]: + """ + Dual RAG retrieval strategy: + 1. Always search knowledge base for relevant policies/docs + 2. For specific intents (complaint, refund, escalation), also search customer history + + Returns: + state update with kb_context and history_context + """ + tracker = get_tracker() + tracker.start("retrieval_router") + + user_input = state['user_input'] + customer_id = state['customer_id'] + intent = state['intent']['intent'] # Intent classification result + + # Always retrieve from knowledge base + kb_results = qdrant_manager.search_kb(user_input, limit=3) + kb_context = "\n".join([ + f"- [{r['source']}] {r['text']} (relevance: {r['score']:.2f})" + for r in kb_results + ]) + + # Conditionally retrieve from customer history + history_context = "" + history_intents = ["complaint", "refund_request", "escalation", "billing_inquiry", "billing", "negative_sentiment"] + + if intent in history_intents or state['sentiment']['label'] == "NEGATIVE": + history_results = qdrant_manager.search_history( + user_input, + customer_id, + limit=3 + ) + history_context = "\n".join([ + f"- [{r['interaction_type']}] {r['text']} (relevance: {r['score']:.2f})" + for r in history_results + ]) + + tracker.end("retrieval_router") + return { + "kb_context": kb_context if kb_results else "No relevant policies found.", + "history_context": history_context if history_context else "No customer history available." + } diff --git a/orchestration/nodes/sentiment_analysis.py b/orchestration/nodes/sentiment_analysis.py new file mode 100644 index 0000000000000000000000000000000000000000..057f488a04688c69bf20acbc5baa0b11230e396c --- /dev/null +++ b/orchestration/nodes/sentiment_analysis.py @@ -0,0 +1,49 @@ +""" +Sentiment Analysis Node - Emotion detection using DistilBERT +Analyzes user input sentiment for tone-aware response generation +""" + +from transformers import pipeline +from orchestration.state import ConversationState +from typing import Dict, Any + + +# Global model cache +_sentiment_model = None + + +def get_sentiment_model(): + """Load model once and cache""" + global _sentiment_model + if _sentiment_model is None: + _sentiment_model = pipeline( + "sentiment-analysis", + model="distilbert-base-uncased-finetuned-sst-2-english" + ) + return _sentiment_model + + +def sentiment_analysis_node(state: ConversationState) -> Dict[str, Any]: + """ + Analyze sentiment of user input using DistilBERT + + Returns: + state update with sentiment field populated: + {"sentiment": {"label": "POSITIVE|NEGATIVE|NEUTRAL", "score": float}} + """ + try: + # Use cached model + sentiment_pipeline = get_sentiment_model() + + # Analyze ONLY user input + result = sentiment_pipeline(state['user_input'])[0] + + sentiment = { + "label": result['label'].upper(), # POSITIVE, NEGATIVE, or NEUTRAL + "score": result['score'] + } + + return {"sentiment": sentiment} + except Exception as e: + # Default to neutral on error + return {"sentiment": {"label": "NEUTRAL", "score": 0.5}} diff --git a/orchestration/nodes/sentiment_hybrid.py b/orchestration/nodes/sentiment_hybrid.py new file mode 100644 index 0000000000000000000000000000000000000000..fa941bb646e785f44c838958e848eb6ea3958ac0 --- /dev/null +++ b/orchestration/nodes/sentiment_hybrid.py @@ -0,0 +1,133 @@ +"""Hybrid sentiment classifier - keyword-based + model fallback""" + +from orchestration.state import ConversationState +from typing import Dict, Any +from transformers import pipeline +from orchestration.latency_tracker import get_tracker + + +# Global model cache +_sentiment_model = None + + +def get_sentiment_model(): + """Load model once and cache""" + global _sentiment_model + if _sentiment_model is None: + _sentiment_model = pipeline( + "sentiment-analysis", + model="distilbert-base-uncased-finetuned-sst-2-english" + ) + return _sentiment_model + + +def sentiment_analysis_hybrid(state: ConversationState) -> Dict[str, Any]: + """ + Hybrid sentiment classification: + 1. Check FAQ keywords โ†’ NEUTRAL + 2. Check explicit sentiment words โ†’ POSITIVE/NEGATIVE + 3. Fall back to DistilBERT model + + Returns: + {"sentiment": {"label": "POSITIVE|NEGATIVE|NEUTRAL", "score": 0.95}} + """ + tracker = get_tracker() + tracker.start("sentiment_analysis") + + user_input = state['user_input'].lower() + + # Step 1: FAQ keywords โ†’ Always NEUTRAL (domain-specific) + faq_keywords = [ + "policy", "return", "warranty", "shipping", "account", + "details", "information", "how", "what", "when", "where", + "can i", "do you", "tell me", "help", "need", "about" + ] + + if any(kw in user_input for kw in faq_keywords): + # Still check for strong sentiment words within FAQ + strong_negative = ["frustrated", "angry", "hate", "terrible", "broken", "worst", "useless"] + strong_positive = ["thank", "love", "great", "excellent", "amazing", "perfect"] + + if any(word in user_input for word in strong_negative): + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": "NEGATIVE", + "score": 0.95, + "reason": "Complaint with sentiment" + } + } + elif any(word in user_input for word in strong_positive): + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": "POSITIVE", + "score": 0.95, + "reason": "Praise with sentiment" + } + } + else: + # Pure FAQ question = NEUTRAL + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": "NEUTRAL", + "score": 0.99, + "reason": "FAQ inquiry" + } + } + + # Step 2: Explicit strong sentiment words + strong_negative = [ + "frustrated", "angry", "hate", "terrible", "broken", + "worst", "useless", "disaster", "awful", "horrible", + "unacceptable", "disgusted", "disappointed" + ] + strong_positive = [ + "thank", "love", "great", "excellent", "amazing", + "perfect", "wonderful", "fantastic", "awesome", "impressed" + ] + + if any(word in user_input for word in strong_negative): + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": "NEGATIVE", + "score": 0.95, + "reason": "Strong negative sentiment" + } + } + + if any(word in user_input for word in strong_positive): + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": "POSITIVE", + "score": 0.95, + "reason": "Strong positive sentiment" + } + } + + # Step 3: Fall back to DistilBERT model + try: + sentiment_pipeline = get_sentiment_model() + result = sentiment_pipeline(state['user_input'])[0] + + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": result['label'].upper(), + "score": result['score'], + "reason": "Model inference" + } + } + except Exception as e: + # Default to neutral on error + tracker.end("sentiment_analysis") + return { + "sentiment": { + "label": "NEUTRAL", + "score": 0.5, + "reason": "Error - defaulting to neutral" + } + } diff --git a/orchestration/nodes/tts_generation.py b/orchestration/nodes/tts_generation.py new file mode 100644 index 0000000000000000000000000000000000000000..7d114a9ee87dd5cdc0ed9b26f6fdbc9fc2178b5d --- /dev/null +++ b/orchestration/nodes/tts_generation.py @@ -0,0 +1,72 @@ +"""Text-to-speech generation using gTTS""" + +from gtts import gTTS +from orchestration.state import ConversationState +from typing import Dict, Any +import os +import logging +from pathlib import Path +from orchestration.latency_tracker import get_tracker + +logger = logging.getLogger(__name__) + + +def tts_generation_node(state: ConversationState) -> Dict[str, Any]: + """ + Convert response text to speech using gTTS + Saves audio file and returns path + + Returns: + state update with final_audio_path field + """ + tracker = get_tracker() + tracker.start("tts_generation") + + response_text = state.get('response', '') + + # Validate response text exists + if not response_text or len(response_text.strip()) == 0: + logger.warning("โš ๏ธ TTS: No response text to convert to speech") + tracker.end("tts_generation") + return {"final_audio_path": None} + + # Create output directory if doesn't exist + audio_dir = Path("data/audio_output") + try: + audio_dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + logger.error(f"โŒ TTS: Failed to create audio directory: {e}") + tracker.end("tts_generation") + return {"final_audio_path": None} + + # Generate unique filename + customer_id = state.get('customer_id', 'UNKNOWN') + import datetime + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:19] + audio_filename = f"bot_response_{customer_id}_{timestamp}.mp3" + audio_path = audio_dir / audio_filename + + try: + logger.info(f"๐Ÿ“ข TTS: Generating audio for: '{response_text[:50]}...'") + + # Generate TTS + tts = gTTS(text=response_text, lang='en', slow=False) + tts.save(str(audio_path)) + + # Verify file was created + if audio_path.exists(): + file_size = audio_path.stat().st_size + logger.info(f"โœ… TTS: Audio generated successfully ({file_size} bytes) -> {audio_path}") + final_audio_path = str(audio_path) + else: + logger.error(f"โŒ TTS: File created but not found at {audio_path}") + final_audio_path = None + + except Exception as e: + logger.error(f"โŒ TTS generation failed: {type(e).__name__}: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + final_audio_path = None + + tracker.end("tts_generation") + return {"final_audio_path": final_audio_path} diff --git a/orchestration/nodes/validation.py b/orchestration/nodes/validation.py new file mode 100644 index 0000000000000000000000000000000000000000..b1d082e8eb9e45d9472534b02d06e9a026200e4b --- /dev/null +++ b/orchestration/nodes/validation.py @@ -0,0 +1,61 @@ +"""Response quality validation""" + +from orchestration.state import ConversationState +from typing import Dict, Any +import logging +from orchestration.latency_tracker import get_tracker + +logger = logging.getLogger(__name__) + + +def validation_node(state: ConversationState) -> Dict[str, Any]: + """ + Validate generated response against quality criteria: + 1. Length checks (not too short, not too long) + 2. Tone-sentiment consistency + 3. Basic sanity checks + + Returns: + state update with validation_passed boolean + """ + tracker = get_tracker() + tracker.start("validation") + + response = state.get('response', '') + sentiment = state.get('sentiment', {}).get('label', 'NEUTRAL') + + # Check 1: Response length (between 50-500 characters) + response_length = len(response) + length_valid = 50 <= response_length <= 500 + + # Check 2: Tone-sentiment consistency + tone_checks = { + "NEGATIVE": { + "forbidden_words": ["happy", "excellent", "amazing"], + "required_sentiment": ["understand", "apologize", "help"] + }, + "POSITIVE": { + "forbidden_words": ["sorry", "problem", "issue"], + "required_sentiment": ["great", "happy", "enjoy"] + } + } + + response_lower = response.lower() + tone_valid = True + + if sentiment in tone_checks: + checks = tone_checks[sentiment] + # Check forbidden words aren't present + forbidden_present = any(word in response_lower for word in checks.get("forbidden_words", [])) + tone_valid = not forbidden_present + + # Check 3: Response not empty + content_valid = len(response.strip()) > 0 + + # Overall validation + validation_passed = length_valid and content_valid and tone_valid + + logger.info(f"โœ“ Validation: length={response_length} ({length_valid}), content={content_valid}, tone={tone_valid} -> {'PASS' if validation_passed else 'FAIL'}") + + tracker.end("validation") + return {"validation_passed": validation_passed} diff --git a/orchestration/state.py b/orchestration/state.py new file mode 100644 index 0000000000000000000000000000000000000000..675690ee2cd895c4ac91a21e2865d8f86a7e80bd --- /dev/null +++ b/orchestration/state.py @@ -0,0 +1,67 @@ +""" +LangGraph State Definition - Central State Management for Voice RAG Bot Workflow +Defines all data flowing through the orchestration pipeline +""" + +from typing import TypedDict, List, Optional, Dict, Any + + +class ConversationState(TypedDict): + """ + Complete state passed through LangGraph nodes + + Fields: + - user_input: Original text from voice input (after STT) + - customer_id: Unique customer identifier for history tracking + - intent: Intent detection result with confidence score + - sentiment: Sentiment analysis result with label and confidence + - entities: Extracted entities from user input (optional) + - conversation_summary: LLM-generated summary of conversation + - kb_context: Retrieved context from knowledge base + - history_context: Retrieved context from customer history (persistent memory) + - response: Final LLM-generated response text + - validation_passed: Boolean flag for response validation + - final_audio_path: Path to generated TTS audio file + """ + + # Input & Context + user_input: str + customer_id: str + + # NLP Analysis Results + intent: Dict[str, Any] # {"intent": "...", "confidence": float} + sentiment: Dict[str, Any] # {"label": "POSITIVE|NEGATIVE|NEUTRAL", "score": float} + entities: Optional[Dict[str, Any]] # {"entity_type": [...], ...} + + # Memory Management + conversation_summary: str # LLM-generated summary + + # RAG Contexts + kb_context: str # Knowledge base retrieval results + history_context: str # Customer history retrieval results + + # Response Generation + response: str # Final LLM-generated response + + # Validation & Output + validation_passed: bool + final_audio_path: Optional[str] + + +class ConversationStateOptional(TypedDict, total=False): + """ + Optional version of ConversationState for partial updates + Allows nodes to update only the fields they produce + """ + + user_input: str + customer_id: str + intent: Dict[str, Any] + sentiment: Dict[str, Any] + entities: Optional[Dict[str, Any]] + conversation_summary: str + kb_context: str + history_context: str + response: str + validation_passed: bool + final_audio_path: Optional[str] diff --git a/rag/__init__.py b/rag/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ab83d1e74e7cf2cce07663ae74802150dd24aedc --- /dev/null +++ b/rag/__init__.py @@ -0,0 +1 @@ +"""Voice RAG Bot RAG (Retrieval-Augmented Generation) Package""" diff --git a/rag/__pycache__/__init__.cpython-311.pyc b/rag/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..fc5cf302c27e99222eeafbb69cd86ae3274847ea Binary files /dev/null and b/rag/__pycache__/__init__.cpython-311.pyc differ diff --git a/rag/__pycache__/cache_manager.cpython-311.pyc b/rag/__pycache__/cache_manager.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1de00c8f87ff954233e39b33e6d7ce5f87f8f87d Binary files /dev/null and b/rag/__pycache__/cache_manager.cpython-311.pyc differ diff --git a/rag/__pycache__/embedding_manager.cpython-311.pyc b/rag/__pycache__/embedding_manager.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..330d12280fb38645a75aada338b4e6d7a4c62bf6 Binary files /dev/null and b/rag/__pycache__/embedding_manager.cpython-311.pyc differ diff --git a/rag/__pycache__/qdrant_manager.cpython-311.pyc b/rag/__pycache__/qdrant_manager.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b54bec49294f04ae55d8ef34c5f2290c732e8946 Binary files /dev/null and b/rag/__pycache__/qdrant_manager.cpython-311.pyc differ diff --git a/rag/__pycache__/session_manager.cpython-311.pyc b/rag/__pycache__/session_manager.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8e58026a69ec2945a93707e5fc6a1ebd70e0f364 Binary files /dev/null and b/rag/__pycache__/session_manager.cpython-311.pyc differ diff --git a/rag/__pycache__/tts_generator.cpython-311.pyc b/rag/__pycache__/tts_generator.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..97b26daae44afd3aef9e2efaf4520d348f7d07f3 Binary files /dev/null and b/rag/__pycache__/tts_generator.cpython-311.pyc differ diff --git a/rag/cache_manager.py b/rag/cache_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..4a445b2928d0564130464d79810696bd7afb2b0c --- /dev/null +++ b/rag/cache_manager.py @@ -0,0 +1,134 @@ +""" +Redis Cache Manager - Response caching for faster repeated queries +Reduces processing time for identical or similar queries +""" + +import redis +import json +import hashlib +from typing import Optional, Dict, Any +from datetime import timedelta +import os + + +class CacheManager: + """Manages response caching using Redis""" + + def __init__(self, host: str = "localhost", port: int = 6379, ttl_minutes: int = 30): + """ + Initialize Redis connection + + Args: + host: Redis server host + port: Redis server port + ttl_minutes: Time-to-live for cached responses + """ + try: + self.client = redis.Redis( + host=host, + port=port, + db=0, + decode_responses=True, + socket_connect_timeout=5 + ) + # Test connection + self.client.ping() + self.available = True + print("[OK] Redis cache connected") + except Exception as e: + self.available = False + print(f"[WARN] Redis cache unavailable: {e}") + self.client = None + + self.ttl = timedelta(minutes=ttl_minutes) + + def _generate_key(self, customer_id: str, user_input: str) -> str: + """Generate cache key from customer_id and user_input""" + combined = f"{customer_id}:{user_input.lower().strip()}" + # Use hash to keep key reasonable length + return f"response:{hashlib.md5(combined.encode()).hexdigest()}" + + def get(self, customer_id: str, user_input: str) -> Optional[Dict[str, Any]]: + """ + Retrieve cached response + + Returns: + Cached response dict or None if not found/expired + """ + if not self.available: + return None + + try: + key = self._generate_key(customer_id, user_input) + cached = self.client.get(key) + + if cached: + print(f"[CACHE HIT] {customer_id}: {user_input[:30]}...") + return json.loads(cached) + + return None + except Exception as e: + print(f"[CACHE] Get error: {e}") + return None + + def set(self, customer_id: str, user_input: str, response: Dict[str, Any]) -> bool: + """ + Cache a response + + Returns: + True if cached successfully, False otherwise + """ + if not self.available: + return False + + try: + key = self._generate_key(customer_id, user_input) + self.client.setex( + key, + self.ttl, + json.dumps(response) + ) + print(f"[CACHE SET] {customer_id}: {user_input[:30]}...") + return True + except Exception as e: + print(f"[CACHE] Set error: {e}") + return False + + def clear(self, customer_id: Optional[str] = None) -> bool: + """ + Clear cache for customer or all + + Returns: + True if cleared successfully + """ + if not self.available: + return False + + try: + if customer_id: + # Clear only this customer's cache + pattern = f"response:{hashlib.md5(f'{customer_id}:'.encode()).hexdigest()}*" + # Simple: just clear all response keys for this customer + keys = self.client.keys(f"response:*") + for key in keys: + self.client.delete(key) + else: + # Clear all response cache + self.client.delete(*self.client.keys("response:*")) + + return True + except Exception as e: + print(f"[CACHE] Clear error: {e}") + return False + + +# Global cache instance +cache_manager = None + + +def get_cache_manager() -> CacheManager: + """Get or create global cache manager""" + global cache_manager + if cache_manager is None: + cache_manager = CacheManager() + return cache_manager diff --git a/rag/embedding_manager.py b/rag/embedding_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..ddc58430eea9ee1fd8ff1897991ee089db3fc600 --- /dev/null +++ b/rag/embedding_manager.py @@ -0,0 +1,65 @@ +""" +Embedding Manager - Singleton for BGE-M3 embeddings +Handles text โ†’ vector conversion for RAG queries and persistence +""" + +from sentence_transformers import SentenceTransformer +from typing import List +import numpy as np +from backend.config import settings + + +class EmbeddingManager: + """Singleton for managing embeddings with BGE-M3""" + + _instance = None + _model = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if not self._initialized: + self.model = SentenceTransformer(settings.embedding_model) + self._initialized = True + + def embed(self, text: str) -> List[float]: + """ + Convert single text to embedding vector + + Args: + text: Input text to embed + + Returns: + List of floats (1024-dimensional for BGE-M3) + """ + embedding = self.model.encode(text, convert_to_tensor=False) + return embedding.tolist() + + def embed_batch(self, texts: List[str]) -> List[List[float]]: + """ + Convert multiple texts to embeddings (efficient batch processing) + + Args: + texts: List of texts to embed + + Returns: + List of embedding vectors + """ + embeddings = self.model.encode( + texts, + batch_size=settings.embedding_batch_size, + convert_to_tensor=False + ) + return embeddings.tolist() + + def get_dimension(self) -> int: + """Get embedding dimension""" + return settings.vector_dimension + + +# Global singleton instance +embedding_manager = EmbeddingManager() diff --git a/rag/qdrant_manager.py b/rag/qdrant_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..47c85a31cc9ead49ac4cae99baebd872a2491e8f --- /dev/null +++ b/rag/qdrant_manager.py @@ -0,0 +1,195 @@ +""" +Qdrant Manager - Vector database client for RAG persistence and retrieval +Manages collections, embeddings, and search operations +""" + +from qdrant_client import QdrantClient +from qdrant_client.models import Distance, VectorParams, PointStruct +from typing import List, Dict, Any, Optional +from uuid import uuid4 +from backend.config import settings + + +class QdrantManager: + """Singleton for managing Qdrant vector database""" + + _instance = None + _client = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if not self._initialized: + # Initialize Qdrant client + self.client = QdrantClient( + url=settings.qdrant_url, + api_key=settings.qdrant_api_key + ) + self._initialized = True + self._ensure_collections() + + def _ensure_collections(self): + """Create collections if they don't exist""" + collections = [ + settings.kb_collection_name, + settings.history_collection_name + ] + + for collection_name in collections: + try: + # Try to get collection info + self.client.get_collection(collection_name) + except Exception: + # Create if doesn't exist + self.client.create_collection( + collection_name=collection_name, + vectors_config=VectorParams( + size=settings.vector_dimension, + distance=Distance.COSINE + ) + ) + print(f"โœ… Created collection: {collection_name}") + + def add_to_kb(self, documents: List[Dict[str, Any]]) -> None: + """ + Add documents to knowledge base collection + + Args: + documents: List of dicts with 'id', 'text', 'embedding' keys + """ + from rag.embedding_manager import embedding_manager + + points = [] + for doc in documents: + embedding = embedding_manager.embed(doc['text']) + point = PointStruct( + id=int(uuid4().int % (10**8)), + vector=embedding, + payload={ + "text": doc['text'], + "source": doc.get('source', 'unknown'), + "document_id": doc.get('document_id', str(uuid4())) + } + ) + points.append(point) + + self.client.upsert( + collection_name=settings.kb_collection_name, + points=points + ) + print(f"โœ… Added {len(documents)} documents to KB collection") + + def search_kb(self, query: str, limit: int = 3) -> List[Dict[str, Any]]: + """ + Search knowledge base + + Args: + query: Search query text + limit: Number of results to return + + Returns: + List of similar documents + """ + from rag.embedding_manager import embedding_manager + + query_embedding = embedding_manager.embed(query) + + results = self.client.query_points( + collection_name=settings.kb_collection_name, + query=query_embedding, + limit=limit + ).points + + return [ + { + "text": r.payload['text'], + "source": r.payload.get('source', 'unknown'), + "score": r.score + } + for r in results + ] + + def search_history(self, query: str, customer_id: str, limit: int = 3) -> List[Dict[str, Any]]: + """ + Search customer history with customer_id filter + + Args: + query: Search query text + customer_id: Filter by customer + limit: Number of results + + Returns: + List of similar history records for customer + """ + from rag.embedding_manager import embedding_manager + + query_embedding = embedding_manager.embed(query) + + results = self.client.query_points( + collection_name=settings.history_collection_name, + query=query_embedding, + limit=limit, + query_filter={ + "must": [ + { + "key": "customer_id", + "match": {"value": customer_id} + } + ] + } + ).points + + return [ + { + "text": r.payload['text'], + "customer_id": r.payload.get('customer_id'), + "interaction_type": r.payload.get('interaction_type'), + "score": r.score + } + for r in results + ] + + def add_to_history(self, customer_id: str, text: str, interaction_type: str) -> None: + """ + Add conversation to customer history + + Args: + customer_id: Customer identifier + text: Conversation text + interaction_type: Type of interaction (e.g., 'complaint', 'refund_request') + """ + from rag.embedding_manager import embedding_manager + + embedding = embedding_manager.embed(text) + point = PointStruct( + id=int(uuid4().int % (10**8)), + vector=embedding, + payload={ + "customer_id": customer_id, + "text": text, + "interaction_type": interaction_type, + "timestamp": str(__import__('datetime').datetime.now()) + } + ) + + self.client.upsert( + collection_name=settings.history_collection_name, + points=[point] + ) + + def get_collection_info(self, collection_name: str) -> Dict[str, Any]: + """Get collection statistics""" + info = self.client.get_collection(collection_name) + return { + "name": collection_name, + "points_count": info.points_count, + "vectors_count": info.vectors_count + } + + +# Global singleton instance +qdrant_manager = QdrantManager() diff --git a/rag/session_manager.py b/rag/session_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..36195fefa9e451c174f8c173a896a02e8f6828cf --- /dev/null +++ b/rag/session_manager.py @@ -0,0 +1,189 @@ +""" +Conversation Session Manager - Track conversation history and context +Uses SQLite for lightweight persistence +""" + +import sqlite3 +import json +from datetime import datetime +from typing import Optional, List, Dict, Any +from pathlib import Path +import uuid + + +class SessionManager: + """Manages conversation sessions and history""" + + def __init__(self, db_path: str = "data/sessions.db"): + """Initialize SQLite database for sessions""" + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + # Create tables if not exist + self._init_db() + + def _init_db(self): + """Initialize database schema""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Sessions table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + customer_id TEXT NOT NULL, + start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP, + status TEXT DEFAULT 'active', + message_count INTEGER DEFAULT 0, + context TEXT + ) + """) + + # Messages table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + intent TEXT, + sentiment TEXT, + kb_context TEXT, + FOREIGN KEY (session_id) REFERENCES sessions(session_id) + ) + """) + + conn.commit() + + def create_session(self, customer_id: str) -> str: + """ + Create new conversation session + + Returns: + session_id + """ + session_id = str(uuid.uuid4()) + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO sessions (session_id, customer_id, status) + VALUES (?, ?, ?) + """, (session_id, customer_id, 'active')) + conn.commit() + + print(f"[SESSION] Created: {session_id[:8]}... for {customer_id}") + return session_id + + def add_message( + self, + session_id: str, + role: str, + content: str, + intent: Optional[str] = None, + sentiment: Optional[str] = None, + kb_context: Optional[str] = None + ) -> bool: + """ + Add message to session + + Args: + session_id: Session ID + role: "user" or "assistant" + content: Message content + intent: Detected intent (optional) + sentiment: Sentiment label (optional) + kb_context: Retrieved KB context (optional) + """ + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Insert message + cursor.execute(""" + INSERT INTO messages + (session_id, role, content, intent, sentiment, kb_context) + VALUES (?, ?, ?, ?, ?, ?) + """, (session_id, role, content, intent, sentiment, kb_context)) + + # Update message count + cursor.execute(""" + UPDATE sessions + SET message_count = message_count + 1 + WHERE session_id = ? + """, (session_id,)) + + conn.commit() + + return True + except Exception as e: + print(f"[SESSION] Add message error: {e}") + return False + + def get_session_history(self, session_id: str) -> List[Dict[str, Any]]: + """Retrieve all messages in a session""" + try: + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute(""" + SELECT * FROM messages + WHERE session_id = ? + ORDER BY timestamp ASC + """, (session_id,)) + + rows = cursor.fetchall() + return [dict(row) for row in rows] + except Exception as e: + print(f"[SESSION] Get history error: {e}") + return [] + + def close_session(self, session_id: str) -> bool: + """Close a conversation session""" + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute(""" + UPDATE sessions + SET status = 'closed', end_time = CURRENT_TIMESTAMP + WHERE session_id = ? + """, (session_id,)) + conn.commit() + + print(f"[SESSION] Closed: {session_id[:8]}...") + return True + except Exception as e: + print(f"[SESSION] Close error: {e}") + return False + + def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get session metadata""" + try: + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute(""" + SELECT * FROM sessions WHERE session_id = ? + """, (session_id,)) + + row = cursor.fetchone() + return dict(row) if row else None + except Exception as e: + print(f"[SESSION] Get info error: {e}") + return None + + +# Global session manager instance +session_manager = None + + +def get_session_manager() -> SessionManager: + """Get or create global session manager""" + global session_manager + if session_manager is None: + session_manager = SessionManager() + return session_manager diff --git a/rag/tts_generator.py b/rag/tts_generator.py new file mode 100644 index 0000000000000000000000000000000000000000..fe298b1f97fbd97d589ca9ce9dad245ff2afd082 --- /dev/null +++ b/rag/tts_generator.py @@ -0,0 +1,93 @@ +""" +Text-to-Speech Generator - Converts bot responses to audio +Uses gTTS with professional customer support voice +""" + +from gtts import gTTS +from pathlib import Path +from datetime import datetime +import json + + +class TTSGenerator: + """Generates audio files for bot responses""" + + def __init__(self, output_dir: str = "data/audio_output"): + """Initialize TTS generator + + Args: + output_dir: Directory to save audio files + """ + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def generate_audio(self, text: str, customer_id: str, session_id: str) -> str: + """ + Generate audio file from text + + Args: + text: Response text to convert to speech + customer_id: Customer ID for file organization + session_id: Session ID for context + + Returns: + Path to generated audio file + """ + try: + # Create filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"bot_response_{customer_id}_{session_id[:8]}_{timestamp}.mp3" + filepath = self.output_dir / filename + + # Generate speech with gTTS + # Using slower speech rate (slow=True) for clarity + # Language: English (US) for professional tone + tts = gTTS( + text=text, + lang='en', + slow=True, # Slower speech for clarity + tld='com' # Top-level domain for better quality + ) + + # Save to file + tts.save(str(filepath)) + + print(f"[TTS] Generated audio: {filename} ({len(text)} chars)") + return str(filepath) + + except Exception as e: + print(f"[TTS ERROR] Failed to generate audio: {str(e)}") + return "" + + def generate_greeting(self, customer_id: str) -> str: + """Generate audio for greeting message""" + greeting_text = "Hello! How can I help you today?" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"greeting_{customer_id}_{timestamp}.mp3" + filepath = self.output_dir / filename + + try: + tts = gTTS( + text=greeting_text, + lang='en', + slow=True, + tld='com' + ) + tts.save(str(filepath)) + print(f"[TTS] Generated greeting: {filename}") + return str(filepath) + except Exception as e: + print(f"[TTS ERROR] Failed to generate greeting: {str(e)}") + return "" + + +# Global TTS generator instance +_tts_generator = None + + +def get_tts_generator() -> TTSGenerator: + """Get or create global TTS generator""" + global _tts_generator + if _tts_generator is None: + _tts_generator = TTSGenerator() + return _tts_generator diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..ceb9a03a51e069e46c66e95a1cdace86bf2443ea --- /dev/null +++ b/requirements.txt @@ -0,0 +1,35 @@ +# Core Framework +langchain>=0.1.0 +langgraph>=0.0.20 +langchain-groq>=0.1.0 + +# STT +faster-whisper>=0.9.0 + +# NLP Models +sentence-transformers>=2.2.0 +transformers>=4.30.0 +torch>=2.0.0 + +# Vector DB +qdrant-client>=1.7.0 + +# Backend +fastapi>=0.100.0 +uvicorn>=0.23.0 +python-multipart>=0.0.5 + +# Frontend +streamlit>=1.27.0 + +# Utilities +python-dotenv>=1.0.0 +pydantic>=2.0.0 +pydantic-settings>=2.0.0 +requests>=2.31.0 +aiofiles>=23.0.0 +numpy>=1.23.0 +redis>=4.5.0 + +# TTS (Text-to-Speech) +gtts>=2.3.0 diff --git a/run_voice_bot_demo.py b/run_voice_bot_demo.py new file mode 100644 index 0000000000000000000000000000000000000000..0f6bf2a760e84c72f38778f58802653d26abe5f6 --- /dev/null +++ b/run_voice_bot_demo.py @@ -0,0 +1,131 @@ +""" +Automated Voice Bot Demo - Minimal test without UI overhead +Runs multi-turn conversation automatically +""" +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent)) + +import asyncio +import requests +import json +from typing import Dict, Any + +BASE_URL = "http://localhost:8000" + +class VoiceBotDemo: + """Automated voice bot conversation""" + + def __init__(self, customer_id: str = "CUST_001"): + self.customer_id = customer_id + self.session_id = None + self.turn = 0 + + def start_session(self) -> bool: + """Start voice bot session""" + try: + resp = requests.post( + f"{BASE_URL}/voice-bot/start", + json={"customer_id": self.customer_id}, + timeout=60 + ) + if resp.status_code == 200: + data = resp.json() + self.session_id = data.get("session_id") + greeting = data.get("greeting", "") + audio = data.get("audio_path", "") + print(f"โœ… Session started: {self.session_id}") + print(f" Bot: {greeting}") + if audio: + print(f" ๐Ÿ”Š Audio: {audio}") + return True + else: + print(f"โŒ Start failed: {resp.status_code}") + return False + except Exception as e: + print(f"โŒ Start error: {str(e)}") + return False + + def send_message(self, user_message: str) -> bool: + """Send message to bot""" + if not self.session_id: + print("โŒ No active session") + return False + + try: + self.turn += 1 + print(f"\n--- Turn {self.turn} ---") + print(f"You: {user_message}") + + resp = requests.post( + f"{BASE_URL}/voice-bot/message", + params={"user_message": user_message}, + timeout=120 + ) + + if resp.status_code == 200: + data = resp.json() + bot_response = data.get("response", "") + audio = data.get("audio_path", "") + sentiment = data.get("sentiment", "") + intent = data.get("intent", "") + + print(f"Bot: {bot_response}") + print(f"Intent: {intent} | Sentiment: {sentiment}") + if audio: + print(f"๐Ÿ”Š Audio: {audio}") + + status = data.get("status", "") + return status == "listening" + else: + print(f"โŒ Message failed: {resp.status_code}") + return False + except Exception as e: + print(f"โŒ Error: {str(e)}") + return False + + def end_session(self): + """End session""" + try: + resp = requests.post( + f"{BASE_URL}/voice-bot/end", + timeout=30 + ) + if resp.status_code == 200: + data = resp.json() + farewell = data.get("farewell", "") + print(f"\nโœ… Session ended") + print(f" {farewell}") + except Exception as e: + print(f"โš ๏ธ End error: {str(e)}") + +async def main(): + """Run demo conversation""" + print("\n" + "="*60) + print("๐Ÿค– VOICE BOT AUTOMATED DEMO") + print("="*60 + "\n") + + bot = VoiceBotDemo("CUST_001") + + # Start session + if not bot.start_session(): + return + + # Multi-turn conversation + messages = [ + "What is your return policy?", + "Can I return my laptop if it's defective?" + ] + + for msg in messages: + if not bot.send_message(msg): + break + + # End + bot.end_session() + print("\n" + "="*60) + print("โœ… DEMO COMPLETE") + print("="*60 + "\n") + +if __name__ == "__main__": + asyncio.run(main())