Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import logging | |
| import io | |
| import json | |
| import time | |
| import re | |
| import requests | |
| import threading | |
| import datetime | |
| import numpy as np | |
| import PyPDF2 | |
| import smtplib | |
| from email.message import EmailMessage | |
| from typing import List, Dict, Tuple, Optional, Any | |
| import soundfile as sf | |
| from dotenv import load_dotenv | |
| import gradio as gr | |
| from openai import OpenAI, AsyncOpenAI | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
| import cloudinary.exceptions | |
| from pydantic import BaseModel, validator | |
| import scipy | |
| import pandas as pd | |
| from PIL import Image | |
| import webbrowser | |
| import urllib.parse | |
| from bs4 import BeautifulSoup | |
| import chromadb | |
| from chromadb.config import Settings | |
| import openai | |
| import docx | |
| from docx.shared import Inches | |
| import tempfile | |
| import openpyxl | |
| import xlrd | |
| import csv | |
| import asyncio | |
| import aiohttp | |
| from functools import wraps | |
| import base64 | |
# Logging: INFO-level root configuration plus a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Merge variables from a local .env file into the process environment.
load_dotenv()

# The app treats itself as running on Hugging Face Spaces when SPACE_ID is set.
IS_HF_SPACES = 'SPACE_ID' in os.environ

# Blocking and asyncio OpenAI clients, both keyed by OPENAI_API_KEY.
openai_client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
)
async_openai_client = AsyncOpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
)
# Configure the Cloudinary SDK from the environment. Missing variables fall
# back to empty strings, so the helpers below re-check the environment before
# talking to the service.
try:
    cloudinary.config(
        secure=True,
        cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME", ""),
        api_key=os.getenv("CLOUDINARY_API_KEY", ""),
        api_secret=os.getenv("CLOUDINARY_API_SECRET", ""),
    )
    logger.info("Cloudinary configured successfully")
except Exception as e:
    logger.error(f"Cloudinary configuration error: {e}")
    # Fall back to an empty (mock) configuration for development.
    cloudinary.config(secure=True, cloud_name="", api_key="", api_secret="")
# Open a persistent ChromaDB client under ./chroma_db. When initialisation
# fails, chroma_client is None and the collection helpers become no-ops.
try:
    chroma_settings = Settings(
        anonymized_telemetry=False,
        is_persistent=True,
        persist_directory="./chroma_db",
    )
    chroma_client = chromadb.Client(chroma_settings)
    logger.info("ChromaDB initialized successfully")
except Exception as e:
    logger.error(f"ChromaDB initialization error: {e}")
    chroma_client = None
# In-memory, process-local state (lost on restart).
# sessions: interview session data keyed by session ID.
sessions: Dict[str, Dict[str, Any]] = {}
# notes_data: note metadata keyed by note ID.
notes_data: Dict[str, Dict[str, Any]] = {}
# uploaded_files_cache: cache for uploaded files.
uploaded_files_cache: Dict[Any, Any] = {}
| # Helper functions | |
def chunk_text(text: str, size: int = 500) -> List[str]:
    """Split text into consecutive chunks of at most ``size`` characters.

    Args:
        text: The text to split. Empty or ``None`` input yields ``[]``.
        size: Maximum number of characters per chunk; must be positive.

    Returns:
        The chunks in order; only the last one may be shorter than ``size``.

    Raises:
        ValueError: If ``text`` is non-empty and ``size`` is not positive.
    """
    if not text:
        return []
    # A negative step would make range() empty and silently drop all the
    # text, and a zero step raises an opaque error, so reject both up front.
    if size <= 0:
        raise ValueError(f"size must be a positive integer, got {size}")
    return [text[start:start + size] for start in range(0, len(text), size)]
def embed_text(text: str) -> np.ndarray:
    """Return the embedding of ``text`` as a float32 NumPy vector.

    The embedding is requested from the OpenAI client using the
    "text-embedding-3-small" model. Any failure is logged and a random
    1536-element vector is returned instead of raising.
    """
    # NOTE(review): callers cannot tell a real embedding from the random
    # fallback, so a failed request indexes/searches with noise. Consider
    # raising here once all callers are confirmed to handle it.
    try:
        response = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        vector = response.data[0].embedding
        return np.array(vector, dtype=np.float32)
    except Exception as e:
        logger.error(f"Embedding error: {e}")
        fallback = np.random.randn(1536)
        return fallback.astype(np.float32)  # Fallback
def create_chroma_collection(session_id: str):
    """Return the ChromaDB collection for a session, creating it if needed.

    The collection is named ``session_<id>`` with dashes in the ID replaced
    by underscores, and is configured for cosine distance. Returns ``None``
    when no ChromaDB client is available or the call fails.
    """
    if not chroma_client:
        logger.warning("ChromaDB client not available")
        return None
    try:
        safe_id = session_id.replace('-', '_')
        return chroma_client.get_or_create_collection(
            name=f"session_{safe_id}",
            metadata={"hnsw:space": "cosine"},
        )
    except Exception as e:
        logger.error(f"Error creating ChromaDB collection: {e}")
        return None
def add_chunks_to_collection(collection, chunks: List[str]):
    """Embed text chunks and add them to a ChromaDB collection.

    Blank chunks are skipped. The remaining chunks are embedded and added in
    batches of 10; a failing batch is logged and skipped so one bad batch
    does not abort the rest.

    Args:
        collection: Target ChromaDB collection (falsy means "unavailable").
        chunks: Text chunks to index.

    Returns:
        True if at least one batch was stored, otherwise False (no
        collection, nothing to add, or every batch failed).
    """
    if not collection:
        return False
    try:
        if not chunks:
            return False
        documents = []
        metadatas = []
        ids = []
        for index, chunk in enumerate(chunks):
            if chunk and chunk.strip():
                documents.append(chunk)
                metadatas.append({
                    "chunk_id": index,
                    "timestamp": datetime.datetime.now().isoformat(),
                })
                # Random suffix keeps IDs unique across repeated calls.
                ids.append(f"chunk_{index}_{uuid.uuid4().hex[:8]}")
        if not documents:
            return False

        batch_size = 10
        stored_any = False
        for start in range(0, len(documents), batch_size):
            end = start + batch_size
            batch_docs = documents[start:end]
            try:
                embeddings = [embed_text(doc).tolist() for doc in batch_docs]
                collection.add(
                    documents=batch_docs,
                    metadatas=metadatas[start:end],
                    ids=ids[start:end],
                    embeddings=embeddings,
                )
                stored_any = True
            except Exception as e:
                logger.error(f"Error adding batch to ChromaDB: {e}")
                continue
        # Previously this reported success even when every batch failed.
        return stored_any
    except Exception as e:
        logger.error(f"Error adding chunks to ChromaDB: {e}")
        return False
def retrieve_chunks(query: str, collection, top_k: int = 3) -> str:
    """Return the ``top_k`` stored chunks most similar to ``query``.

    The query is embedded and used to search the collection; matching
    documents are joined with newlines. An empty string is returned when the
    collection is unavailable, nothing matches, or the lookup fails.
    """
    if not collection:
        return ""
    try:
        hits = collection.query(
            query_embeddings=[embed_text(query).tolist()],
            n_results=top_k,
        )
        # Results are nested per query; only one query was sent.
        documents = hits.get('documents')
        if documents and documents[0]:
            return "\n".join(documents[0])
        return ""
    except Exception as e:
        logger.error(f"Error retrieving chunks from ChromaDB: {e}")
        return ""
| # Cloudinary file operations with better error handling | |
def upload_to_cloudinary(file_content, filename, folder="interview_docs"):
    """Upload file content to Cloudinary.

    Args:
        file_content: Bytes to upload; a ``str`` is UTF-8 encoded first.
        filename: Original file name; its stem becomes part of the public ID.
        folder: Cloudinary folder to store the asset in.

    Returns:
        ``(secure_url, public_id)`` on success, or ``(None, None)`` on
        failure. When Cloudinary credentials are not configured, a mock URL
        on ``mock.cloudinary.com`` and a matching mock public ID are returned
        so the app can run in development.
    """
    try:
        # One suffix per call so every identifier returned for this upload
        # refers to the same asset.
        unique_id = uuid.uuid4().hex[:8]

        credentials = (
            os.getenv("CLOUDINARY_CLOUD_NAME"),
            os.getenv("CLOUDINARY_API_KEY"),
            os.getenv("CLOUDINARY_API_SECRET"),
        )
        if not all(credentials):
            logger.warning("Cloudinary credentials not configured")
            # Return a mock URL for development
            mock_public_id = f"{folder}/{filename}_{unique_id}"
            return f"https://mock.cloudinary.com/{mock_public_id}", mock_public_id

        if isinstance(file_content, str):
            file_content = file_content.encode('utf-8')

        # The folder is passed separately: Cloudinary prefixes the public ID
        # with ``folder``, so embedding it here as well would double the path.
        stem = filename.rsplit('.', 1)[0]
        upload_result = cloudinary.uploader.upload(
            file_content,
            resource_type="auto",  # Let Cloudinary auto-detect
            public_id=f"{stem}_{unique_id}",
            folder=folder,
            overwrite=True,
            timeout=30,
        )
        logger.info(f"Uploaded {filename} to Cloudinary")
        return upload_result["secure_url"], upload_result["public_id"]
    except cloudinary.exceptions.Error as e:
        logger.error(f"Cloudinary upload error: {e}")
        return None, None
    except Exception as e:
        logger.error(f"General upload error: {e}")
        return None, None
def list_cloudinary_files(folder="interview_docs", max_results=50):
    """Return the uploaded Cloudinary resources whose public ID starts with ``folder``.

    At most ``max_results`` resources are requested. An empty list is
    returned when Cloudinary is not configured or the request fails.
    """
    try:
        if os.getenv("CLOUDINARY_CLOUD_NAME"):
            listing = cloudinary.api.resources(
                type="upload",
                prefix=folder,
                max_results=max_results,
                timeout=30,
            )
            return listing.get("resources", [])
        logger.warning("Cloudinary not configured, returning empty list")
        return []
    except cloudinary.exceptions.Error as e:
        logger.error(f"Cloudinary API error: {e}")
        return []
    except Exception as e:
        logger.error(f"Error listing Cloudinary files: {e}")
        return []
def get_cloudinary_file_info(public_id):
    """Fetch the Cloudinary resource record for ``public_id``.

    Returns ``None`` when Cloudinary is not configured or the lookup fails.
    """
    try:
        configured = bool(os.getenv("CLOUDINARY_CLOUD_NAME"))
        if configured:
            return cloudinary.api.resource(public_id, timeout=30)
        return None
    except Exception as e:
        logger.error(f"Cloudinary get info error: {e}")
        return None
def download_from_cloudinary(public_id):
    """Download the bytes of a Cloudinary asset.

    Uploads in this file use ``resource_type="auto"``, so an asset may be
    stored as an image, raw file or video. The delivery URL depends on that
    type, so each is tried in turn, starting with the SDK default ("image").

    Args:
        public_id: Public ID of the asset.

    Returns:
        The response body as bytes, or ``None`` when Cloudinary is not
        configured, no delivery URL answers with HTTP 200, or an error occurs.
    """
    try:
        if not os.getenv("CLOUDINARY_CLOUD_NAME"):
            return None
        for resource_type in ("image", "raw", "video"):
            url = cloudinary.utils.cloudinary_url(
                public_id, resource_type=resource_type
            )[0]
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                return response.content
        return None
    except Exception as e:
        logger.error(f"Cloudinary download error: {e}")
        return None
| # Enhanced text extraction with better error handling | |
def extract_text_from_cloudinary_file(public_id, filename):
    """Download a Cloudinary asset and extract its text.

    The extractor is chosen from the extension of ``filename``: PDF, Word,
    spreadsheet/CSV, or plain-text formats (decoded as UTF-8, then latin-1).

    Args:
        public_id: Public ID of the asset to download.
        filename: Name used to pick the extractor and to build messages.

    Returns:
        The extracted text, or a human-readable message describing why no
        text is available (mock file, download failure, unsupported type,
        empty result, or an error). This function does not raise.
    """
    try:
        if "mock.cloudinary.com" in public_id:
            return f"Mock file: {filename} (Cloudinary not configured)"

        file_content = download_from_cloudinary(public_id)
        if not file_content:
            return f"Failed to download file {filename}"

        file_extension = filename.lower().split('.')[-1] if '.' in filename else ''
        if file_extension == 'pdf':
            text = extract_text_from_pdf_bytes(file_content, filename)
        elif file_extension in ('doc', 'docx'):
            text = extract_text_from_docx_bytes(file_content, filename)
        elif file_extension in ('xls', 'xlsx', 'csv'):
            text = extract_text_from_spreadsheet_bytes(file_content, filename, file_extension)
        elif file_extension in ('txt', 'md', 'json', 'py', 'js', 'html', 'css'):
            try:
                text = file_content.decode('utf-8')
            except UnicodeDecodeError:
                try:
                    text = file_content.decode('latin-1')
                except Exception:
                    # Last resort: a truncated repr of the raw content.
                    # Catching Exception (not a bare except) lets
                    # KeyboardInterrupt/SystemExit propagate.
                    text = str(file_content)[:1000] + "..."
        else:
            text = f"Unsupported file type: {filename}"
        return text if text else f"No text extracted from {filename}"
    except Exception as e:
        logger.error(f"Error extracting text from {filename}: {e}")
        return f"Error extracting text from {filename}: {str(e)[:200]}"
def extract_text_from_pdf_bytes(pdf_bytes, filename):
    """Extract text from PDF bytes, trying PyPDF2 and then pdfplumber.

    Each extractor's output is collected separately, so a partial PyPDF2
    result is never concatenated with pdfplumber's output (which would
    duplicate pages).

    Args:
        pdf_bytes: Raw PDF content.
        filename: Name used in the failure message.

    Returns:
        Page texts separated by blank lines. If neither extractor completes,
        whatever partial text was recovered is returned, or
        ``"Could not extract text from PDF: <filename>"`` if there is none.
    """
    # Method 1: PyPDF2
    pypdf_parts = []
    try:
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes), strict=False)
        for page in reader.pages:
            try:
                page_text = page.extract_text()
            except Exception:
                # Skip pages PyPDF2 cannot parse; keep the rest.
                continue
            if page_text and page_text.strip():
                pypdf_parts.append(page_text.strip() + "\n\n")
        if pypdf_parts:
            return "".join(pypdf_parts)
    except Exception as e:
        logger.warning(f"PyPDF2 extraction failed: {e}")

    # Method 2: pdfplumber, if it is installed
    plumber_parts = []
    try:
        import pdfplumber
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            for page in pdf.pages:
                try:
                    page_text = page.extract_text()
                except Exception:
                    continue
                if page_text and page_text.strip():
                    plumber_parts.append(page_text.strip() + "\n\n")
        if plumber_parts:
            return "".join(plumber_parts)
    except ImportError:
        logger.warning("pdfplumber not installed")
    except Exception as e:
        logger.warning(f"pdfplumber extraction failed: {e}")

    # Both methods failed to finish: fall back to any partial text.
    partial_text = "".join(pypdf_parts) or "".join(plumber_parts)
    return partial_text if partial_text else f"Could not extract text from PDF: {filename}"
def extract_text_from_docx_bytes(docx_bytes, filename):
    """Return the non-blank paragraphs of a DOCX file, one per line.

    Returns ``"Empty document: <filename>"`` when there is no paragraph
    text, and an error message when the bytes cannot be parsed.
    """
    try:
        document = docx.Document(io.BytesIO(docx_bytes))
        lines = [
            para.text.strip() + "\n"
            for para in document.paragraphs
            if para.text.strip()
        ]
        body = "".join(lines)
        if body:
            return body
        return f"Empty document: {filename}"
    except Exception as e:
        logger.error(f"DOCX extraction error: {e}")
        return f"Error extracting from DOCX: {filename}"
def extract_text_from_spreadsheet_bytes(file_bytes, filename, file_extension):
    """Extract text from spreadsheet bytes (xlsx, xls or csv).

    Excel workbooks are rendered sheet by sheet, each under a
    ``=== Sheet: <name> ===`` header; sheets that cannot be read are logged
    and skipped. CSV content is decoded as UTF-8, falling back to latin-1
    when the bytes are not valid UTF-8.

    Args:
        file_bytes: Raw file content.
        filename: Name used in messages.
        file_extension: Lower-case extension without the dot.

    Returns:
        The extracted text, ``"Empty spreadsheet: <filename>"`` when nothing
        was extracted, or an error message on unexpected failure.
    """
    try:
        text = ""
        if file_extension in ('xlsx', 'xls'):
            engine = 'openpyxl' if file_extension == 'xlsx' else 'xlrd'
            sections = []
            try:
                # The context manager closes the workbook once it is read.
                with pd.ExcelFile(io.BytesIO(file_bytes), engine=engine) as workbook:
                    for sheet_name in workbook.sheet_names:
                        try:
                            df = pd.read_excel(workbook, sheet_name=sheet_name, header=None)
                            sections.append(
                                f"=== Sheet: {sheet_name} ===\n"
                                + df.to_string(index=False, header=False)
                                + "\n\n"
                            )
                        except Exception as e:
                            logger.warning(f"Skipping sheet {sheet_name}: {e}")
                            continue
            except Exception as e:
                logger.warning(f"Pandas extraction failed: {e}")
            text = "".join(sections)
        elif file_extension == 'csv':
            try:
                # Strict decode so that non-UTF-8 files reach the latin-1
                # fallback instead of silently losing bytes.
                text = file_bytes.decode('utf-8')
            except UnicodeDecodeError:
                text = file_bytes.decode('latin-1', errors='ignore')
        return text if text else f"Empty spreadsheet: {filename}"
    except Exception as e:
        logger.error(f"Spreadsheet extraction error: {e}")
        return f"Error extracting from spreadsheet: {filename}"
| # Enhanced Cloudinary CRUD operations for notes | |
def create_note_docx(title: str, content: str):
    """Write a note to a temporary DOCX file.

    The document gets ``title`` as a level-0 heading followed by one
    paragraph per non-blank line of ``content``.

    Args:
        title: Heading text.
        content: Note body; split on newlines.

    Returns:
        Path of the created ``.docx`` file (the caller is responsible for
        deleting it), or ``None`` if the file could not be created.
    """
    temp_path = None
    try:
        doc = docx.Document()
        doc.add_heading(title, 0)
        for line in content.split('\n'):
            stripped = line.strip()
            if stripped:
                doc.add_paragraph(stripped)

        # Reserve a path and close the handle before saving: writing to a
        # path that is still open fails on some platforms.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.docx', mode='wb') as temp_file:
            temp_path = temp_file.name
        doc.save(temp_path)
        return temp_path
    except Exception as e:
        logger.error(f"Error creating DOCX: {e}")
        # Do not leave a half-written temp file behind.
        if temp_path and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")
        return None
def upload_note_to_cloudinary(title: str, content: str):
    """Create a note as a DOCX file, upload it, and record its metadata.

    Args:
        title: Note title (also used, with spaces replaced and truncated to
            50 characters, in the uploaded file name).
        content: Note body.

    Returns:
        ``(note_id, url, None)`` on success, or ``(None, None, message)``
        describing the failure.
    """
    try:
        docx_file = create_note_docx(title, content)
        if not docx_file:
            return None, None, "Failed to create DOCX file"

        # Always remove the temp file, even if reading it fails.
        try:
            with open(docx_file, 'rb') as f:
                file_content = f.read()
        finally:
            os.unlink(docx_file)

        note_id = str(uuid.uuid4())
        filename = f"note_{note_id[:8]}_{title.replace(' ', '_')[:50]}.docx"

        url, public_id = upload_to_cloudinary(file_content, filename, folder="notes")
        if not url or not public_id:
            return None, None, "Failed to upload to Cloudinary"

        # Metadata lives only in this process's memory.
        notes_data[note_id] = {
            "title": title,
            "content": content,
            "url": url,
            "public_id": public_id,
            "created_at": datetime.datetime.now().isoformat(),
            "filename": filename,
        }
        return note_id, url, None
    except Exception as e:
        logger.error(f"Error uploading note to Cloudinary: {e}")
        return None, None, str(e)
def get_note_from_cloudinary(note_id: str):
    """Look up a note's metadata in the in-memory ``notes_data`` store.

    ``note_id`` may be a full ID or any fragment of one; for a fragment the
    first stored ID containing it is used.

    Returns:
        ``(note, None)`` when found, otherwise ``(None, message)``.
    """
    try:
        resolved_id = note_id
        if resolved_id not in notes_data:
            # Fall back to the first stored ID that contains the fragment.
            resolved_id = next(
                (stored_id for stored_id in notes_data if note_id in stored_id),
                note_id,
            )
        if resolved_id not in notes_data:
            return None, "Note not found"
        return notes_data[resolved_id], None
    except Exception as e:
        logger.error(f"Error getting note: {e}")
        return None, str(e)
def update_note_in_cloudinary(note_id: str, title: str = None, content: str = None):
    """Re-create a note's DOCX with new title/content and upload it.

    Args:
        note_id: Full or partial note ID.
        title: New title; the current one is kept when falsy.
        content: New content; the current one is kept when falsy.

    Returns:
        ``(url, None)`` on success, or ``(None, message)`` on failure. The
        stored note is only modified once the upload has succeeded.
    """
    try:
        note, error = get_note_from_cloudinary(note_id)
        if error:
            return None, error

        # The lookup accepts partial IDs; resolve the real key so the update
        # is written back to the existing entry instead of creating a
        # duplicate under the partial ID.
        full_id = next(
            (key for key, value in notes_data.items() if value is note),
            note_id,
        )

        new_title = title if title else note["title"]
        new_content = content if content else note["content"]

        docx_file = create_note_docx(new_title, new_content)
        if not docx_file:
            return None, "Failed to create updated DOCX file"

        # Always remove the temp file, even if reading it fails.
        try:
            with open(docx_file, 'rb') as f:
                file_content = f.read()
        finally:
            os.unlink(docx_file)

        filename = f"note_{full_id[:8]}_{new_title.replace(' ', '_')[:50]}.docx"
        url, public_id = upload_to_cloudinary(file_content, filename, folder="notes")
        if not url:
            return None, "Failed to update in Cloudinary"

        # Commit the changes only after a successful upload.
        note["title"] = new_title
        note["content"] = new_content
        note["url"] = url
        note["public_id"] = public_id
        note["updated_at"] = datetime.datetime.now().isoformat()
        note["filename"] = filename
        notes_data[full_id] = note
        return note["url"], None
    except Exception as e:
        logger.error(f"Error updating note: {e}")
        return None, str(e)
def delete_note_from_cloudinary(note_id: str):
    """Delete a note from Cloudinary (best effort) and from local storage.

    Args:
        note_id: Full or partial note ID.

    Returns:
        ``"Note deleted successfully"`` or an error message.
    """
    try:
        note, error = get_note_from_cloudinary(note_id)
        if error:
            return error

        # The lookup accepts partial IDs; resolve the real key so the entry
        # is actually removed from notes_data below.
        full_id = next(
            (key for key, value in notes_data.items() if value is note),
            note_id,
        )

        if os.getenv("CLOUDINARY_CLOUD_NAME"):
            try:
                cloudinary.uploader.destroy(note["public_id"], resource_type="raw")
            except Exception as e:
                # Remote deletion is best effort; still drop the local entry.
                logger.warning(f"Cloudinary delete failed for note {full_id}: {e}")

        notes_data.pop(full_id, None)
        return "Note deleted successfully"
    except Exception as e:
        logger.error(f"Error deleting note: {e}")
        return str(e)
def list_notes_from_cloudinary():
    """Return every note in ``notes_data`` as a display-ready DataFrame.

    Columns are ID (first 8 characters), Title (truncated to 50 characters),
    Created and Updated (date part only) and Filename (first 30 characters).
    An empty frame with those columns is returned when there are no notes,
    and a frame with a single "Error" column on failure.
    """
    try:
        if not notes_data:
            return pd.DataFrame(columns=["ID", "Title", "Created", "Updated", "Filename"])

        rows = [
            {
                "ID": note_id[:8] + "...",
                "Title": note["title"][:50] + ("..." if len(note["title"]) > 50 else ""),
                "Created": note["created_at"][:10],
                # Notes that were never updated show their creation date.
                "Updated": note.get("updated_at", note["created_at"])[:10],
                "Filename": note.get("filename", "N/A")[:30],
            }
            for note_id, note in notes_data.items()
        ]
        return pd.DataFrame(rows)
    except Exception as e:
        logger.error(f"Error listing notes: {e}")
        return pd.DataFrame(columns=["Error"])
| # Improved file upload processing | |
def process_uploaded_files_to_cloudinary(files):
    """Upload files to Cloudinary and return their information.

    Each item may be a path (``str`` / ``os.PathLike``) to an existing file,
    an object with a ``name`` attribute pointing at a file on disk, or any
    other object, whose ``read()`` result (or the object itself) is used as
    the content. Files larger than 10MB are rejected.

    Args:
        files: Iterable of uploaded file objects or paths.

    Returns:
        ``(message, uploaded_files)`` where ``uploaded_files`` is a list of
        dicts with ``name``, ``url``, ``public_id`` and ``size``. Processing
        stops at the first failure, in which case ``uploaded_files`` is
        ``None`` and the message describes the problem.
    """
    if not files:
        return "Please upload at least one file.", None

    uploaded_files = []
    for file in files:
        # Bound before the try so the error handler can always name the file.
        file_name = 'unknown_file'
        try:
            if isinstance(file, (str, os.PathLike)) and os.path.isfile(file):
                # A plain path: read the file it points to rather than
                # uploading the path text itself.
                path = os.fspath(file)
                file_name = os.path.basename(path)
                with open(path, 'rb') as f:
                    file_content = f.read()
            elif hasattr(file, 'name'):
                # Object exposing the on-disk path as ``.name``.
                file_name = os.path.basename(file.name)
                with open(file.name, 'rb') as f:
                    file_content = f.read()
            else:
                # Handle other file types
                file_name = getattr(file, 'orig_name', 'unknown_file')
                file_content = getattr(file, 'read', lambda: file)()
                if callable(file_content):
                    file_content = file_content()

            # Ensure file_content is bytes
            if isinstance(file_content, str):
                file_content = file_content.encode('utf-8', errors='ignore')

            # Check file size (10MB limit)
            if len(file_content) > 10 * 1024 * 1024:
                return f"File {file_name} is too large (>10MB)", None

            logger.info(f"Uploading {file_name} to Cloudinary...")
            cloudinary_url, public_id = upload_to_cloudinary(file_content, file_name)
            if not (cloudinary_url and public_id):
                return f"Failed to upload {file_name}", None

            uploaded_files.append({
                "name": file_name,
                "url": cloudinary_url,
                "public_id": public_id,
                "size": len(file_content),
            })
            logger.info(f"Successfully uploaded {file_name}")
        except Exception as e:
            logger.error(f"Error processing file {file_name}: {e}")
            return f"Error processing {file_name}: {str(e)[:100]}", None
    return f"β Successfully uploaded {len(uploaded_files)} file(s) to Cloudinary", uploaded_files
def get_cloudinary_files_for_selection():
    """Build ``(label, public_id)`` choices for the interview file picker.

    Lists up to 50 resources from the "interview_docs" folder. The label is
    the last path segment of the public ID followed by the resource format
    (or "file" when no format is reported).

    Returns:
        ``(choices, None)`` on success, or ``([], message)`` when there are
        no files or the listing fails.
    """
    try:
        resources = list_cloudinary_files(folder="interview_docs", max_results=50)
        if not resources:
            return [], "No files found in Cloudinary. Upload files first."

        file_options = []
        for resource in resources:
            short_name = resource["public_id"].split("/")[-1]
            kind = resource.get("format", "file")
            file_options.append((f"{short_name} ({kind})", resource["public_id"]))
        return file_options, None
    except Exception as e:
        logger.error(f"Error getting Cloudinary files: {e}")
        return [], f"Error getting Cloudinary files: {str(e)[:100]}"
| # Interview Operations with better session handling | |
def start_interview_session(selected_files, mode, num_questions):
    """Start an interview session from files stored in Cloudinary.

    Text is extracted from each selected file, chunked, and indexed in a
    per-session ChromaDB collection (when available). The session is stored
    in ``sessions`` and an initial greeting is generated.

    Args:
        selected_files: Cloudinary public IDs of the files to use.
        mode: Interview mode, stored on the session and used in the prompt.
        num_questions: Number of questions for the session; must be
            convertible to ``int``.

    Returns:
        ``(status_message, session_id, initial_greeting)``; on failure the
        last two items are ``None``.
    """
    if not selected_files:
        return "Please select at least one file.", None, None

    # Validate before the expensive extraction/embedding work, so a bad
    # value cannot crash the call after that work is already done.
    try:
        questions_left = int(num_questions)
    except (TypeError, ValueError):
        return "Please provide a valid number of questions.", None, None

    # Prefixes of the placeholder messages the extractors in this file
    # return instead of document text; these must not be indexed.
    failure_prefixes = (
        "Error",
        "Failed",
        "Unsupported file type",
        "No text extracted",
        "Could not extract text",
        "Empty document",
        "Empty spreadsheet",
        "Mock file",
    )

    session_id = str(uuid.uuid4())
    all_chunks = []
    selected_file_info = []
    for public_id in selected_files:
        try:
            filename = public_id.split("/")[-1]
            text = extract_text_from_cloudinary_file(public_id, filename)
            if text and not text.startswith(failure_prefixes):
                chunks = chunk_text(text)
                all_chunks.extend(chunks)
                selected_file_info.append({
                    "name": filename,
                    "public_id": public_id,
                    "text_preview": text[:200] + "..." if len(text) > 200 else text,
                })
                logger.info(f"Extracted {len(chunks)} chunks from {filename}")
            else:
                logger.warning(f"No text extracted from {filename}: {text[:100]}")
        except Exception as e:
            logger.error(f"Error processing file {public_id}: {e}")
            continue

    if not all_chunks:
        return "No text could be extracted from selected files. Please try different files.", None, None

    # Index the chunks; the session still works (without retrieval context)
    # if ChromaDB is unavailable.
    collection = create_chroma_collection(session_id)
    if collection:
        if not add_chunks_to_collection(collection, all_chunks):
            logger.warning("Failed to add chunks to ChromaDB collection")

    sessions[session_id] = {
        "mode": mode,
        "questions_left": questions_left,
        "history": [],
        "chunks": all_chunks,
        "collection": collection,
        "files": selected_file_info,
        "start_time": datetime.datetime.now().isoformat(),
        "message_count": 0,
    }

    initial_greeting = generate_interview_response(session_id, "Hello, let's start the interview.")
    return f"β Interview session started with {len(selected_files)} file(s). Session ID: {session_id[:8]}...", session_id, initial_greeting
def generate_interview_response(session_id, message):
    """Produce the AI's next message for an interview session.

    Retrieves up to three relevant chunks for ``message``, builds a system
    prompt from the session mode, that context (first 1000 characters), the
    last three exchanges and the remaining question count, and asks the chat
    model for a reply. The exchange is appended to the session history and,
    in 'interviewer' mode, the remaining question count is decremented.

    Returns:
        The reply text (with a completion notice appended once no questions
        remain), or a message describing why no reply could be produced.
    """
    if session_id not in sessions:
        return "Session not found or expired. Please start a new interview session."

    session = sessions[session_id]
    session["message_count"] += 1

    context = retrieve_chunks(message, session.get("collection"), top_k=3)

    # Flatten the last three exchanges into "User:/AI:" lines; plain-string
    # history entries are passed through unchanged.
    history_lines = []
    recent = session.get("history", [])[-3:]
    for exchange in recent:
        if isinstance(exchange, dict):
            history_lines.extend([
                f"User: {exchange.get('user', '')}",
                f"AI: {exchange.get('ai', '')}",
            ])
        elif isinstance(exchange, str):
            history_lines.append(exchange)
    history = "\n".join(history_lines)

    system_prompt = f"""You are an AI interview simulator. Mode: {session['mode']}
Context from uploaded files (for reference):
{context[:1000]}
Recent conversation:
{history}
You have {session['questions_left']} questions left in this session.
Instructions:
1. If in 'interviewer' mode, ask insightful, relevant questions based on the context
2. If in 'participant' mode, answer professionally as a candidate
3. Keep responses concise and engaging
4. If questions reach 0, politely end the session"""

    try:
        chat_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": message},
        ]
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=chat_messages,
            max_tokens=500,
            temperature=0.7
        )
        full_response = response.choices[0].message.content

        session["history"].append({
            "user": message,
            "ai": full_response,
            "timestamp": datetime.datetime.now().isoformat()
        })
        # Only interviewer-mode turns consume a question.
        if session["mode"] == "interviewer":
            session["questions_left"] -= 1
        sessions[session_id] = session

        if session["questions_left"] <= 0:
            full_response += "\n\nπ― **Interview completed!** Thank you for participating."
        return full_response
    except Exception as e:
        logger.error(f"Error generating response: {e}")
        return f"I apologize, but I encountered an error: {str(e)[:100]}"
| # Tool functions with better error handling | |
def transcribe_audio(audio_file):
    """Transcribe audio using OpenAI Whisper.

    Args:
        audio_file: Either a path to an audio file, or a
            ``(sample_rate, audio_data)`` tuple, in which case the samples
            are written to a temporary WAV file first (multi-channel data is
            averaged down to mono).

    Returns:
        The transcript text, ``"No audio file provided"`` for ``None``, or a
        message starting with ``"Transcription failed"`` on error.
    """
    temp_path = None
    try:
        if audio_file is None:
            return "No audio file provided"

        if isinstance(audio_file, tuple):
            sample_rate, audio_data = audio_file
            # Convert to mono if stereo
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
            # Reserve a path and close the handle before writing to it.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
                temp_path = temp_file.name
            sf.write(temp_path, audio_data, sample_rate)
            audio_path = temp_path
        else:
            audio_path = audio_file

        with open(audio_path, 'rb') as audio:
            transcript = openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio
            )
        return transcript.text
    except Exception as e:
        logger.error(f"Transcription error: {e}")
        return f"Transcription failed: {str(e)[:100]}"
    finally:
        # Remove the temp WAV whether or not transcription succeeded.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")
def text_to_speech(text):
    """Synthesize speech for ``text`` with OpenAI TTS.

    Input longer than 1000 characters is cut to 1000 and suffixed with
    "...". The audio is written to a temporary ``.mp3`` file that the caller
    owns.

    Returns:
        ``(path, None)`` on success, or ``(None, message)`` when no text was
        given or synthesis failed.
    """
    try:
        if not text or not text.strip():
            return None, "No text provided"

        spoken = text[:1000] + "..." if len(text) > 1000 else text
        response = openai_client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=spoken,
            speed=1.0
        )

        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as out_file:
            out_file.write(response.content)
            out_path = out_file.name
        return out_path, None
    except Exception as e:
        logger.error(f"TTS error: {e}")
        return None, f"TTS failed: {str(e)[:100]}"
| # Unified Command Processor with better parsing | |
def process_unified_command(command, audio_file=None):
    """Route a typed or spoken command to the matching handler.

    When ``audio_file`` is given it is transcribed and the transcript
    replaces ``command``. Commands starting with "interview" get a pointer
    to the Interview tab; those starting with read/update/delete/list go to
    the note handler; those containing a tool keyword go to the tool
    handler; anything else is answered by the chat model.

    Returns:
        The handler's response text, or an error/usage message.
    """
    if audio_file is not None:
        transcribed_text = transcribe_audio(audio_file)
        if not transcribed_text or transcribed_text.startswith("Transcription failed"):
            return transcribed_text or "Failed to transcribe audio"
        command = transcribed_text
        logger.info(f"Transcribed command: {command}")

    if not command or not command.strip():
        return "Please enter a command or provide audio input"

    command_lower = command.lower().strip()

    if command_lower.startswith("interview"):
        return "Please use the Interview tab for interview operations. You can start by uploading files and creating a session."

    if command_lower.startswith(("read", "update", "delete", "list")):
        return process_note_command(command)

    # Tool keywords are matched anywhere in the command, not only at the start.
    tool_keywords = ("search", "google", "summarize", "weather", "news",
                     "email", "url", "analyze", "tts", "speak")
    if any(keyword in command_lower for keyword in tool_keywords):
        return process_tool_command(command)

    # Default: treat as general AI query
    try:
        chat_messages = [
            {"role": "system", "content": "You are a helpful AI assistant. Respond concisely and accurately."},
            {"role": "user", "content": command}
        ]
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=chat_messages,
            max_tokens=500,
            temperature=0.7
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)[:200]}"
def process_tool_command(command):
    """Dispatch a tool command (search, summarize, weather, news, email, url, tts).

    The first matching keyword found anywhere in the command selects the
    tool. For search, summarize and tts the keyword is stripped (as a whole
    word only, so e.g. "research" is left intact) and the remainder is the
    argument; a bare keyword yields a prompt for the missing argument.

    Email commands have the form ``email <address> "Subject" "Message"``;
    single or double quotes are accepted.

    Returns:
        The tool's result text, or a usage/error message.
    """
    command_lower = command.lower().strip()

    if "search" in command_lower or "google" in command_lower:
        query = re.sub(r'\b(?:search|google)\b\s*', '', command, flags=re.IGNORECASE).strip()
        if query:
            return search_web(query)
        return "Please provide a search query"

    elif "summarize" in command_lower:
        text = re.sub(r'\bsummarize\b\s*', '', command, flags=re.IGNORECASE).strip()
        if text:
            return summarize_text(text)
        return "Please provide text to summarize"

    elif "weather" in command_lower:
        city_match = re.search(r'weather\s+(.+)', command_lower)
        city = city_match.group(1) if city_match else None
        return get_weather(city)

    elif "news" in command_lower:
        return get_news()

    elif "email" in command_lower:
        # Each quoted field must close with the quote character that opened
        # it, so the documented single-quote form works as well as double.
        pattern = r'email\s+(\S+@\S+\.\S+)\s+(["\'])(.+?)\2\s+(["\'])(.+?)\4'
        match = re.search(pattern, command, re.IGNORECASE)
        if match:
            recipient, subject, body = match.group(1), match.group(3), match.group(5)
            return send_email(subject, body, recipient)
        return "Usage: email recipient@example.com 'Subject' 'Message'"

    elif "url" in command_lower or "analyze" in command_lower:
        url_match = re.search(r'(?:url|analyze)\s+(\S+)', command, re.IGNORECASE)
        if url_match:
            result, _ = analyze_url(url_match.group(1))
            return result
        return "Please provide a URL to analyze"

    elif "tts" in command_lower or "speak" in command_lower:
        text = re.sub(r'\b(?:tts|speak)\b\s*', '', command, flags=re.IGNORECASE).strip()
        if text:
            audio_file, error = text_to_speech(text)
            if error:
                return error
            return f"Speech generated: {audio_file}"
        return "Please provide text to convert to speech"

    return "Unknown command. Try: search, summarize, weather, news, email, url, or tts"
def process_note_command(command):
    """Handle the note commands ``read``, ``update``, ``delete`` and ``list``.

    The verb is taken from the start of the command; ``read`` and ``delete``
    expect a note ID, ``update`` expects a note ID followed by the new
    content.

    Returns:
        The formatted note, a status message, the notes table as text, or a
        usage message when arguments are missing.
    """
    command_lower = command.lower().strip()

    if command_lower.startswith("read"):
        match = re.search(r'read\s+(\S+)', command, re.IGNORECASE)
        if not match:
            return "Usage: read <note_id>"
        note, error = get_note_from_cloudinary(match.group(1))
        if error:
            return error
        return f"π **{note['title']}**\n\n{note['content']}\n\nCreated: {note['created_at'][:10]}"

    if command_lower.startswith("update"):
        match = re.search(r'update\s+(\S+)\s+(.+)', command, re.IGNORECASE)
        if not match:
            return "Usage: update <note_id> <new_content>"
        target_id, new_content = match.group(1), match.group(2)
        _, error = update_note_in_cloudinary(target_id, content=new_content)
        return error if error else "Note updated successfully"

    if command_lower.startswith("delete"):
        match = re.search(r'delete\s+(\S+)', command, re.IGNORECASE)
        if not match:
            return "Usage: delete <note_id>"
        return delete_note_from_cloudinary(match.group(1))

    if command_lower.startswith("list"):
        table = list_notes_from_cloudinary()
        if isinstance(table, pd.DataFrame) and not table.empty:
            return table.to_string(index=False)
        return "No notes found"

    return "Unknown note command. Use: read, update, delete, list"
| # Create Gradio interface with fixed UI | |
def create_interface():
    """Build the Gradio Blocks application and wire up all event handlers.

    The UI has four tabs followed by a footer:

    * Unified Commands - free-text / voice commands and a quick-note form
    * File Management  - upload files and list uploaded files
    * Interview Mode   - file-grounded interview chat with audio helpers
    * Notes Management - create / read / update / delete / list notes

    The interview chatbot is created with ``type="messages"``, so every
    handler that reads or writes its history uses
    ``{"role": ..., "content": ...}`` dictionaries.

    Returns:
        The constructed ``gr.Blocks`` app. It is not launched here.
    """
    # Custom CSS for better UI
    custom_css = """
    .gradio-container {
        max-width: 1400px !important;
        margin: 0 auto !important;
    }
    .tab-nav {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
        padding: 10px !important;
        border-radius: 10px !important;
        margin-bottom: 20px !important;
    }
    .tab-nav button {
        background: white !important;
        color: #333 !important;
        border: 2px solid transparent !important;
        margin: 0 5px !important;
        border-radius: 8px !important;
        padding: 10px 20px !important;
        font-weight: 600 !important;
        transition: all 0.3s ease !important;
    }
    .tab-nav button.selected {
        background: #4CAF50 !important;
        color: white !important;
        border-color: #45a049 !important;
        box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important;
    }
    .btn-primary {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
        color: white !important;
        border: none !important;
        border-radius: 8px !important;
        padding: 12px 24px !important;
        font-weight: 600 !important;
        margin: 5px !important;
        transition: all 0.3s ease !important;
    }
    .btn-primary:hover {
        transform: translateY(-2px) !important;
        box-shadow: 0 6px 12px rgba(0,0,0,0.2) !important;
    }
    .btn-secondary {
        background: #6c757d !important;
        color: white !important;
        border: none !important;
        border-radius: 8px !important;
        padding: 10px 20px !important;
    }
    .textbox textarea {
        border-radius: 10px !important;
        border: 2px solid #e0e0e0 !important;
        padding: 15px !important;
        font-size: 14px !important;
    }
    .panel {
        border-radius: 15px !important;
        box-shadow: 0 10px 30px rgba(0,0,0,0.1) !important;
        padding: 20px !important;
        margin: 10px 0 !important;
        background: white !important;
        border: 1px solid #e0e0e0 !important;
    }
    h1, h2, h3 {
        color: #333 !important;
        margin-bottom: 20px !important;
    }
    .chatbot {
        min-height: 400px !important;
        max-height: 500px !important;
        overflow-y: auto !important;
        border-radius: 10px !important;
        border: 2px solid #e0e0e0 !important;
    }
    .dataframe {
        width: 100% !important;
        border-collapse: collapse !important;
    }
    .dataframe th {
        background: #667eea !important;
        color: white !important;
        padding: 12px !important;
    }
    .dataframe td {
        padding: 10px !important;
        border-bottom: 1px solid #e0e0e0 !important;
    }
    """

    # Column layout shared by every frame shown in the "Files in Cloudinary" table.
    file_columns = ["Name", "Type", "Size", "Uploaded"]

    def empty_files_frame():
        """Return an empty frame that still carries the file-table headers."""
        return pd.DataFrame(columns=file_columns)

    def user_message(text):
        """Build a user turn in the Chatbot ``messages`` format."""
        return {"role": "user", "content": text}

    def assistant_message(text):
        """Build an assistant turn in the Chatbot ``messages`` format."""
        return {"role": "assistant", "content": text}

    with gr.Blocks(title="AI Unified Command Platform", theme=gr.themes.Soft(), css=custom_css) as app:
        gr.Markdown("""
        # π€ AI Unified Command Platform
        ### All features accessible through a single command interface
        """)

        with gr.Tabs():
            # Unified Command Interface Tab
            with gr.TabItem("π¬ Unified Commands", id="unified"):
                with gr.Row():
                    with gr.Column(scale=3):
                        with gr.Group():
                            gr.Markdown("### π¬ Command Interface")
                            command_input = gr.Textbox(
                                label="Enter Command",
                                placeholder="Type your command here...\nExamples:\n- search latest AI news\n- summarize this text\n- weather New York\n- news\n- tts hello world",
                                lines=3
                            )
                            with gr.Row():
                                command_btn = gr.Button("π Execute Command", variant="primary", size="lg")
                                clear_btn = gr.Button("ποΈ Clear", variant="secondary")
                            gr.Markdown("---")
                            gr.Markdown("### π€ Voice Input")
                            audio_input = gr.Audio(
                                label="Speak your command",
                                type="filepath",
                                sources=["microphone"]
                            )
                            transcribe_btn = gr.Button("ποΈ Transcribe & Execute", variant="primary")
                        with gr.Group():
                            gr.Markdown("### π Quick Note")
                            with gr.Row():
                                quick_note_title = gr.Textbox(label="Title", placeholder="Note title...", scale=2)
                                quick_note_btn = gr.Button("πΎ Save Quick Note", variant="primary", scale=1)
                            quick_note_content = gr.Textbox(label="Content", placeholder="Note content...", lines=3)
                            quick_note_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column(scale=2):
                        with gr.Group():
                            gr.Markdown("### π Response")
                            command_output = gr.Textbox(
                                label="Response",
                                interactive=False,
                                lines=20,
                                elem_classes="panel"
                            )
                        with gr.Group():
                            gr.Markdown("### π Speech Output")
                            # NOTE(review): this player is displayed but no handler
                            # in this function writes to it.
                            tts_output = gr.Audio(label="Generated Speech", interactive=False)

                # Event handlers for Unified Commands tab
                def execute_command_wrapper(command, audio_path):
                    """Forward the typed command and recorded audio to the dispatcher."""
                    return process_unified_command(command, audio_path)

                # After a command runs, both inputs are reset for the next one.
                command_btn.click(
                    fn=execute_command_wrapper,
                    inputs=[command_input, audio_input],
                    outputs=[command_output]
                ).then(
                    lambda: (gr.update(value=""), gr.update(value=None)),
                    outputs=[command_input, audio_input]
                )
                transcribe_btn.click(
                    fn=lambda audio: process_unified_command(None, audio),
                    inputs=[audio_input],
                    outputs=[command_output]
                ).then(
                    lambda: gr.update(value=None),
                    outputs=[audio_input]
                )
                clear_btn.click(
                    lambda: ("", "", None, ""),
                    outputs=[command_input, command_output, audio_input, quick_note_status]
                )

                def save_quick_note(title, content):
                    """Store a quick note and report a shortened note ID."""
                    if not title.strip() or not content.strip():
                        return "Please provide both title and content"
                    note_id, _url, error = upload_note_to_cloudinary(title, content)
                    if error:
                        return f"Error: {error}"
                    return f"β Note saved! ID: {note_id[:8]}"

                quick_note_btn.click(
                    fn=save_quick_note,
                    inputs=[quick_note_title, quick_note_content],
                    outputs=[quick_note_status]
                )

            # File Management Tab
            with gr.TabItem("π File Management", id="files"):
                with gr.Row():
                    with gr.Column(scale=1):
                        with gr.Group():
                            gr.Markdown("### π€ Upload Files")
                            file_upload = gr.File(
                                file_count="multiple",
                                file_types=[".pdf", ".doc", ".docx", ".txt", ".md", ".csv", ".xlsx", ".xls"],
                                label="Select files to upload (max 10MB each)"
                            )
                            upload_btn = gr.Button("π€ Upload to Cloudinary", variant="primary")
                            upload_status = gr.Textbox(label="Upload Status", interactive=False)
                    with gr.Column(scale=2):
                        with gr.Group():
                            gr.Markdown("### π Uploaded Files")
                            with gr.Row():
                                refresh_files_btn = gr.Button("π Refresh List", variant="secondary")
                                clear_files_btn = gr.Button("ποΈ Clear All", variant="secondary")
                            files_display = gr.Dataframe(
                                label="Files in Cloudinary",
                                headers=file_columns,
                                interactive=False,
                                wrap=True
                            )

                # File management event handlers
                def upload_files_handler(files):
                    """Upload the selected files and show the ones that succeeded."""
                    if not files:
                        return "Please select files to upload", empty_files_frame()
                    status, uploaded = process_uploaded_files_to_cloudinary(files)
                    if not uploaded:
                        return status, empty_files_frame()
                    rows = []
                    for file_info in uploaded:
                        # Update the module-level cache of uploaded files.
                        uploaded_files_cache[file_info['public_id']] = file_info
                        name = file_info['name']
                        rows.append([
                            name[:30],
                            name.split('.')[-1] if '.' in name else 'file',
                            f"{file_info['size'] // 1024} KB",
                            "Just now"
                        ])
                    return status, pd.DataFrame(rows, columns=file_columns)

                def refresh_files_handler():
                    """List files stored under ``interview_docs`` for the table."""
                    try:
                        resources = list_cloudinary_files(folder="interview_docs", max_results=50)
                        if not resources:
                            return empty_files_frame()
                        rows = []
                        for resource in resources:
                            filename = resource["public_id"].split("/")[-1]
                            filetype = resource.get("format", filename.split('.')[-1] if '.' in filename else 'file')
                            size = resource.get("bytes", 0)
                            rows.append([
                                filename[:30],
                                filetype,
                                f"{size // 1024} KB" if size > 0 else "N/A",
                                resource.get("created_at", "")[:10]
                            ])
                        return pd.DataFrame(rows, columns=file_columns)
                    except Exception as e:
                        # UI boundary: log and fall back to an empty table
                        # rather than surfacing a traceback to the user.
                        logger.error(f"Refresh error: {e}")
                        return empty_files_frame()

                upload_btn.click(
                    fn=upload_files_handler,
                    inputs=[file_upload],
                    outputs=[upload_status, files_display]
                )
                refresh_files_btn.click(
                    fn=refresh_files_handler,
                    outputs=[files_display]
                )
                # Only the on-screen table is cleared; no storage call is made here.
                clear_files_btn.click(
                    lambda: (empty_files_frame(), "List cleared"),
                    outputs=[files_display, upload_status]
                )

            # Interview Tab
            with gr.TabItem("π Interview Mode", id="interview"):
                with gr.Row():
                    with gr.Column(scale=1):
                        with gr.Group():
                            gr.Markdown("### π― Interview Setup")
                            with gr.Row():
                                mode_select = gr.Radio(
                                    choices=["interviewer", "participant"],
                                    value="interviewer",
                                    label="Role",
                                    info="Select your role in the interview"
                                )
                                # precision=0 makes Gradio hand the handler an int
                                # question count instead of a float.
                                num_questions = gr.Number(
                                    value=5,
                                    label="Number of Questions",
                                    minimum=1,
                                    maximum=20,
                                    step=1,
                                    precision=0
                                )
                            gr.Markdown("### π Select Files")
                            with gr.Row():
                                refresh_interview_files_btn = gr.Button("π Refresh Files", variant="secondary", size="sm")
                            file_selection = gr.CheckboxGroup(
                                label="Available Files",
                                choices=[],
                                info="Select files to use as interview context"
                            )
                            start_interview_btn = gr.Button("π Start Interview Session", variant="primary")
                            interview_status = gr.Textbox(label="Status", interactive=False)
                            session_display = gr.Textbox(label="Session ID", interactive=False)
                    with gr.Column(scale=2):
                        with gr.Group():
                            gr.Markdown("### π¬ Interview Chat")
                            interview_chatbot = gr.Chatbot(
                                label="Conversation",
                                height=400,
                                type="messages",
                                avatar_images=(None, "https://api.dicebear.com/7.x/bottts/svg?seed=AI")
                            )
                            with gr.Row():
                                interview_msg = gr.Textbox(
                                    label="Your message",
                                    placeholder="Type your message here...",
                                    scale=4
                                )
                                send_msg_btn = gr.Button("π€ Send", variant="primary", scale=1)
                            with gr.Row():
                                interview_audio = gr.Audio(
                                    label="π€ Speak your message",
                                    sources=["microphone"],
                                    type="filepath"
                                )
                                transcribe_interview_btn = gr.Button("ποΈ Transcribe", variant="secondary")
                                tts_interview_btn = gr.Button("π Speak Response", variant="secondary")
                            interview_audio_output = gr.Audio(label="AI Speech", interactive=False)

                # Interview event handlers
                def refresh_interview_files():
                    """Reload the selectable files into the checkbox group."""
                    file_options, error = get_cloudinary_files_for_selection()
                    if error:
                        return gr.CheckboxGroup(choices=[], label=f"Error: {error[:100]}")
                    return gr.CheckboxGroup(
                        choices=[opt[1] for opt in file_options],
                        label=f"Available Files ({len(file_options)} found)"
                    )

                def start_interview_handler(selected_files, mode, q_count):
                    """Start a session and seed the chat with the greeting."""
                    if not selected_files:
                        return "Please select at least one file", "", []
                    status, session_id, greeting = start_interview_session(selected_files, mode, q_count)
                    if session_id and greeting:
                        return status, session_id, [assistant_message(greeting)]
                    return status, "", []

                def send_interview_message(message, history, session_id):
                    """Append the user's turn and the reply; clear the input box."""
                    history = list(history or [])
                    # Ignore empty submissions rather than forwarding them.
                    if not message or not message.strip():
                        return "", history
                    if session_id:
                        reply = generate_interview_response(session_id, message)
                    else:
                        reply = "Please start an interview session first"
                    return "", history + [user_message(message), assistant_message(reply)]

                refresh_interview_files_btn.click(
                    fn=refresh_interview_files,
                    outputs=[file_selection]
                )
                start_interview_btn.click(
                    fn=start_interview_handler,
                    inputs=[file_selection, mode_select, num_questions],
                    outputs=[interview_status, session_display, interview_chatbot]
                )
                send_msg_btn.click(
                    fn=send_interview_message,
                    inputs=[interview_msg, interview_chatbot, session_display],
                    outputs=[interview_msg, interview_chatbot]
                )
                interview_msg.submit(
                    fn=send_interview_message,
                    inputs=[interview_msg, interview_chatbot, session_display],
                    outputs=[interview_msg, interview_chatbot]
                )

                def transcribe_interview_audio(audio_path, history, session_id):
                    """Transcribe recorded audio and send it as a chat turn."""
                    history = list(history or [])
                    if not audio_path:
                        return "", history
                    transcribed = transcribe_audio(audio_path)
                    if not transcribed or transcribed.startswith("Transcription failed"):
                        return "", history
                    if session_id:
                        reply = generate_interview_response(session_id, transcribed)
                    else:
                        reply = "Please start a session first"
                    return "", history + [
                        user_message(f"[Audio] {transcribed}"),
                        assistant_message(reply)
                    ]

                transcribe_interview_btn.click(
                    fn=transcribe_interview_audio,
                    inputs=[interview_audio, interview_chatbot, session_display],
                    outputs=[interview_msg, interview_chatbot]
                )

                def tts_last_response(history):
                    """Synthesize speech for the most recent assistant message.

                    Returns the audio value plus a status update; the status is
                    left unchanged when synthesis succeeds.
                    """
                    if not history:
                        return None, "No conversation history"
                    last_ai = None
                    for entry in reversed(history):
                        if not isinstance(entry, dict) or entry.get("role") != "assistant":
                            continue
                        # Only plain-text content can be spoken.
                        if isinstance(entry.get("content"), str) and entry["content"]:
                            last_ai = entry["content"]
                            break
                    if not last_ai:
                        return None, "No AI response found"
                    audio_file, error = text_to_speech(last_ai[:500])  # Limit length
                    if error:
                        return None, error
                    return audio_file, gr.update()

                # Errors are routed to the visible status box so the user sees them.
                tts_interview_btn.click(
                    fn=tts_last_response,
                    inputs=[interview_chatbot],
                    outputs=[interview_audio_output, interview_status]
                )

            # Notes Tab
            with gr.TabItem("π Notes Management", id="notes"):
                with gr.Row():
                    with gr.Column(scale=1):
                        with gr.Group():
                            gr.Markdown("### β¨ Create New Note")
                            new_note_title = gr.Textbox(label="Note Title", placeholder="Enter note title...")
                            new_note_content = gr.Textbox(label="Note Content", placeholder="Enter note content...", lines=5)
                            create_note_btn = gr.Button("πΎ Create Note", variant="primary")
                            create_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column(scale=1):
                        with gr.Group():
                            gr.Markdown("### π Note Operations")
                            note_op_id = gr.Textbox(label="Note ID", placeholder="Enter note ID (or partial)...")
                            with gr.Row():
                                read_note_btn = gr.Button("π Read", variant="secondary")
                                update_note_btn = gr.Button("βοΈ Update", variant="secondary")
                                delete_note_btn = gr.Button("ποΈ Delete", variant="secondary", size="sm")
                            note_content_display = gr.Textbox(label="Note Content", interactive=False, lines=10)
                with gr.Row():
                    list_notes_btn = gr.Button("π List All Notes", variant="primary")
                    clear_notes_btn = gr.Button("ποΈ Clear", variant="secondary")
                notes_list_display = gr.Dataframe(
                    label="All Notes",
                    headers=["ID", "Title", "Created", "Updated"],
                    interactive=False,
                    wrap=True
                )

                # Notes event handlers
                def create_note_handler(title, content):
                    """Create a note; pre-fill the operations ID box with its short ID."""
                    if not title.strip() or not content.strip():
                        return "Please provide both title and content", ""
                    note_id, _url, error = upload_note_to_cloudinary(title, content)
                    if error:
                        return f"Error: {error}", ""
                    return f"β Note created! ID: {note_id}", note_id[:8]

                def read_note_handler(note_id):
                    """Fetch a note and render title, body and timestamps."""
                    if not note_id.strip():
                        return "Please provide a note ID", pd.DataFrame()
                    note, error = get_note_from_cloudinary(note_id)
                    if error:
                        return f"Error: {error}", pd.DataFrame()
                    content = f"π **{note['title']}**\n\n{note['content']}\n\n---\nCreated: {note['created_at'][:19]}"
                    if 'updated_at' in note:
                        content += f"\nUpdated: {note['updated_at'][:19]}"
                    return content, pd.DataFrame()

                def update_note_handler(note_id):
                    """Append an update timestamp to the note's content."""
                    if not note_id.strip():
                        return "Please provide a note ID", pd.DataFrame()
                    # For demo, just add an update timestamp
                    note, error = get_note_from_cloudinary(note_id)
                    if error:
                        return f"Error: {error}", pd.DataFrame()
                    new_content = f"{note['content']}\n\n[Updated at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]"
                    _, update_error = update_note_in_cloudinary(note_id, content=new_content)
                    if update_error:
                        return f"Update error: {update_error}", pd.DataFrame()
                    return "β Note updated successfully", pd.DataFrame()

                def delete_note_handler(note_id):
                    """Delete a note and show the helper's result message."""
                    if not note_id.strip():
                        return "Please provide a note ID", pd.DataFrame()
                    return delete_note_from_cloudinary(note_id), pd.DataFrame()

                def list_notes_handler():
                    """Load all notes into the table, or report that none exist."""
                    df = list_notes_from_cloudinary()
                    if isinstance(df, pd.DataFrame) and not df.empty:
                        return "", df
                    return "No notes found", pd.DataFrame()

                create_note_btn.click(
                    fn=create_note_handler,
                    inputs=[new_note_title, new_note_content],
                    outputs=[create_status, note_op_id]
                )
                read_note_btn.click(
                    fn=read_note_handler,
                    inputs=[note_op_id],
                    outputs=[note_content_display, notes_list_display]
                )
                update_note_btn.click(
                    fn=update_note_handler,
                    inputs=[note_op_id],
                    outputs=[note_content_display, notes_list_display]
                )
                delete_note_btn.click(
                    fn=delete_note_handler,
                    inputs=[note_op_id],
                    outputs=[note_content_display, notes_list_display]
                )
                list_notes_btn.click(
                    fn=list_notes_handler,
                    outputs=[note_content_display, notes_list_display]
                )
                clear_notes_btn.click(
                    lambda: ("", pd.DataFrame(), "", ""),
                    outputs=[note_content_display, notes_list_display, note_op_id, create_status]
                )

        # Footer
        gr.Markdown("""
        ---
        <div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); border-radius: 10px;">
        <h3>π AI Unified Command Platform</h3>
        <p>Powered by OpenAI, Cloudinary, and ChromaDB</p>
        <p><strong>How to use:</strong></p>
        <ol style="text-align: left; display: inline-block;">
        <li>Upload files using the File Management tab</li>
        <li>Start interviews with files from Cloudinary</li>
        <li>Type or speak commands in the Unified Commands tab</li>
        <li>Manage notes in the Notes tab</li>
        </ol>
        </div>
        """)
    return app
| # Launch the application | |
if __name__ == "__main__":
    # Make sure the ChromaDB directory exists before anything else runs.
    os.makedirs("./chroma_db", exist_ok=True)

    # Build the UI first; it is launched only after the banner is printed.
    app = create_interface()

    # Startup banner: which external services have credentials configured.
    banner_lines = (
        "π Starting AI Unified Command Platform...",
        "π ChromaDB directory: ./chroma_db",
        f"π Cloudinary configured: {bool(os.getenv('CLOUDINARY_CLOUD_NAME'))}",
        f"π€ OpenAI configured: {bool(os.getenv('OPENAI_API_KEY'))}",
    )
    for banner_line in banner_lines:
        print(banner_line)

    # Listen on all interfaces at port 7860; a share link is requested only
    # when the app is not running on Hugging Face Spaces.
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=not IS_HF_SPACES,
    )