# JARVIS / app.py
# NOTE: the Hugging Face Spaces web-page artifacts that preceded the code
# ("Siddu2004-2006's picture / Update app.py / ed8e33c verified") were not
# Python and have been converted to this comment so the file parses.
import os
import uuid
import logging
import io
import json
import time
import re
import requests
import threading
import datetime
import numpy as np
import PyPDF2
import smtplib
from email.message import EmailMessage
from typing import List, Dict, Tuple, Optional, Any
import soundfile as sf
from dotenv import load_dotenv
import gradio as gr
from openai import OpenAI, AsyncOpenAI
import cloudinary
import cloudinary.uploader
import cloudinary.api
import cloudinary.exceptions
from pydantic import BaseModel, validator
import scipy
import pandas as pd
from PIL import Image
import webbrowser
import urllib.parse
from bs4 import BeautifulSoup
import chromadb
from chromadb.config import Settings
import openai
import docx
from docx.shared import Inches
import tempfile
import openpyxl
import xlrd
import csv
import asyncio
import aiohttp
from functools import wraps
import base64
# Setup logging (module-level logger used by every helper below)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file (no-op when absent)
load_dotenv()

# Check if running on Hugging Face Spaces (Spaces injects SPACE_ID)
IS_HF_SPACES = 'SPACE_ID' in os.environ

# Initialize OpenAI clients (sync and async) from OPENAI_API_KEY
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
async_openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Initialize Cloudinary with proper configuration; helpers also re-check the
# env vars at call time, so an empty config here only disables real uploads
try:
    cloudinary.config(
        cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME", ""),
        api_key=os.getenv("CLOUDINARY_API_KEY", ""),
        api_secret=os.getenv("CLOUDINARY_API_SECRET", ""),
        secure=True
    )
    logger.info("Cloudinary configured successfully")
except Exception as e:
    logger.error(f"Cloudinary configuration error: {e}")
    # Create a mock cloudinary config for development
    cloudinary.config(cloud_name="", api_key="", api_secret="", secure=True)

# Initialize ChromaDB (persistent local store); downstream helpers treat a
# None client as "vector search disabled"
try:
    chroma_settings = Settings(
        is_persistent=True,
        persist_directory="./chroma_db",
        anonymized_telemetry=False
    )
    chroma_client = chromadb.Client(chroma_settings)
    logger.info("ChromaDB initialized successfully")
except Exception as e:
    logger.error(f"ChromaDB initialization error: {e}")
    chroma_client = None

# Global in-memory state (not persisted across restarts)
sessions = {}  # interview session_id -> session dict
notes_data = {}  # note_id -> note metadata/content
uploaded_files_cache = {}  # Cache for uploaded files
# Helper functions
def chunk_text(text: str, size: int = 500) -> List[str]:
    """Break *text* into consecutive pieces of at most *size* characters."""
    if not text:
        return []
    starts = range(0, len(text), size)
    return [text[start:start + size] for start in starts]
def embed_text(text: str) -> np.ndarray:
    """Embed *text* with OpenAI's text-embedding-3-small model.

    On any failure a random 1536-dim vector is returned so callers that
    only need *some* embedding keep working (retrieval quality degrades).
    """
    try:
        response = openai_client.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        vector = response.data[0].embedding
        return np.array(vector, dtype=np.float32)
    except Exception as exc:
        logger.error(f"Embedding error: {exc}")
        # 1536 is the dimensionality of text-embedding-3-small
        return np.random.randn(1536).astype(np.float32)
def create_chroma_collection(session_id: str):
    """Return (creating if needed) the ChromaDB collection for *session_id*.

    Returns None when ChromaDB is unavailable or collection creation fails.
    """
    if not chroma_client:
        logger.warning("ChromaDB client not available")
        return None
    try:
        # Hyphens from the UUID are normalised for the collection name
        name = f"session_{session_id.replace('-', '_')}"
        return chroma_client.get_or_create_collection(
            name=name,
            metadata={"hnsw:space": "cosine"},
        )
    except Exception as exc:
        logger.error(f"Error creating ChromaDB collection: {exc}")
        return None
def add_chunks_to_collection(collection, chunks: List[str]):
    """Embed *chunks* and add them to the ChromaDB *collection* in batches.

    Blank/empty chunks are skipped. Each batch is best-effort: a failed
    batch is logged and skipped, and the function still returns True as
    long as the overall loop completed.
    Returns False when the collection is missing, *chunks* is empty, or an
    unexpected error aborts the whole operation.
    """
    if not collection:
        return False
    try:
        if not chunks:
            return False
        documents = []
        metadatas = []
        ids = []
        for i, chunk in enumerate(chunks):
            if chunk and chunk.strip():
                documents.append(chunk)
                metadatas.append({"chunk_id": i, "timestamp": datetime.datetime.now().isoformat()})
                # uuid suffix keeps ids unique across repeated inserts of the same index
                ids.append(f"chunk_{i}_{uuid.uuid4().hex[:8]}")
        if not documents:
            return False
        # Generate embeddings in batches (one OpenAI call per document)
        batch_size = 10
        for i in range(0, len(documents), batch_size):
            batch_docs = documents[i:i+batch_size]
            batch_metadatas = metadatas[i:i+batch_size]
            batch_ids = ids[i:i+batch_size]
            try:
                embeddings = [embed_text(doc).tolist() for doc in batch_docs]
                collection.add(
                    documents=batch_docs,
                    metadatas=batch_metadatas,
                    ids=batch_ids,
                    embeddings=embeddings
                )
            except Exception as e:
                # Best-effort: skip this batch, keep going with the rest
                logger.error(f"Error adding batch to ChromaDB: {e}")
                continue
        return True
    except Exception as e:
        logger.error(f"Error adding chunks to ChromaDB: {e}")
        return False
def retrieve_chunks(query: str, collection, top_k: int = 3) -> str:
    """Return the *top_k* chunks most similar to *query*, newline-joined.

    Returns "" when the collection is missing, empty, or the lookup fails.
    """
    if not collection:
        return ""
    try:
        embedding = embed_text(query).tolist()
        results = collection.query(query_embeddings=[embedding], n_results=top_k)
        docs = results.get('documents')
        if docs and docs[0]:
            return "\n".join(docs[0])
        return ""
    except Exception as exc:
        logger.error(f"Error retrieving chunks from ChromaDB: {exc}")
        return ""
# Cloudinary file operations with better error handling
def upload_to_cloudinary(file_content, filename, folder="interview_docs"):
    """Upload *file_content* (bytes or str) to Cloudinary under *folder*.

    Returns (secure_url, public_id) on success, (None, None) on failure.
    When Cloudinary credentials are missing, returns a consistent mock
    url/public_id pair so development flows keep working offline.
    """
    try:
        # Validate Cloudinary config before attempting a network call
        if not all([os.getenv("CLOUDINARY_CLOUD_NAME"),
                    os.getenv("CLOUDINARY_API_KEY"),
                    os.getenv("CLOUDINARY_API_SECRET")]):
            logger.warning("Cloudinary credentials not configured")
            # Bug fix: the original generated TWO different uuid4 values, so the
            # returned mock URL and public_id never matched each other.
            mock_public_id = f"{folder}/{filename}_{uuid.uuid4().hex[:8]}"
            return f"https://mock.cloudinary.com/{mock_public_id}", mock_public_id
        # Ensure a bytes payload
        if isinstance(file_content, str):
            file_content = file_content.encode('utf-8')
        # Create a unique public ID (extension stripped from the base name)
        unique_id = uuid.uuid4().hex[:8]
        public_id = f"{folder}/{filename.rsplit('.', 1)[0]}_{unique_id}"
        upload_result = cloudinary.uploader.upload(
            file_content,
            resource_type="auto",  # let Cloudinary auto-detect the type
            public_id=public_id,
            folder=folder,
            overwrite=True,
            timeout=30
        )
        logger.info(f"Uploaded {filename} to Cloudinary")
        return upload_result["secure_url"], upload_result["public_id"]
    except cloudinary.exceptions.Error as e:
        logger.error(f"Cloudinary upload error: {e}")
        return None, None
    except Exception as e:
        logger.error(f"General upload error: {e}")
        return None, None
def list_cloudinary_files(folder="interview_docs", max_results=50):
    """List up to *max_results* uploaded resources under *folder*.

    Returns [] when Cloudinary is unconfigured or the API call fails.
    """
    try:
        if not os.getenv("CLOUDINARY_CLOUD_NAME"):
            logger.warning("Cloudinary not configured, returning empty list")
            return []
        listing = cloudinary.api.resources(
            type="upload",
            prefix=folder,
            max_results=max_results,
            timeout=30,
        )
        return listing.get("resources", [])
    except cloudinary.exceptions.Error as exc:
        logger.error(f"Cloudinary API error: {exc}")
        return []
    except Exception as exc:
        logger.error(f"Error listing Cloudinary files: {exc}")
        return []
def get_cloudinary_file_info(public_id):
    """Fetch Cloudinary metadata for *public_id*.

    Returns None when Cloudinary is unconfigured or the lookup fails.
    """
    try:
        if not os.getenv("CLOUDINARY_CLOUD_NAME"):
            return None
        return cloudinary.api.resource(public_id, timeout=30)
    except Exception as exc:
        logger.error(f"Cloudinary get info error: {exc}")
        return None
def download_from_cloudinary(public_id):
    """Download the raw bytes for *public_id* from Cloudinary.

    Returns None when Cloudinary is unconfigured or the download fails.
    """
    try:
        if not os.getenv("CLOUDINARY_CLOUD_NAME"):
            return None
        # Build the delivery URL, then fetch it over HTTP
        url = cloudinary.utils.cloudinary_url(public_id)[0]
        response = requests.get(url, timeout=30)
        return response.content if response.status_code == 200 else None
    except Exception as exc:
        logger.error(f"Cloudinary download error: {exc}")
        return None
# Enhanced text extraction with better error handling
def extract_text_from_cloudinary_file(public_id, filename):
    """Download *public_id* from Cloudinary and extract its text by extension.

    Returns the extracted text, or a human-readable notice/error string
    (prefixed "Mock file:", "Failed", "Unsupported", "Error", or "No text").
    """
    try:
        # Mock entries are produced by upload_to_cloudinary when unconfigured
        if "mock.cloudinary.com" in public_id:
            # Bug fix: report the actual filename (was an "(unknown)" placeholder)
            return f"Mock file: {filename} (Cloudinary not configured)"
        file_content = download_from_cloudinary(public_id)
        if not file_content:
            return f"Failed to download file {filename}"
        text = ""
        file_extension = filename.lower().split('.')[-1] if '.' in filename else ''
        # Extract text based on file type
        if file_extension == 'pdf':
            text = extract_text_from_pdf_bytes(file_content, filename)
        elif file_extension in ['doc', 'docx']:
            text = extract_text_from_docx_bytes(file_content, filename)
        elif file_extension in ['xls', 'xlsx', 'csv']:
            text = extract_text_from_spreadsheet_bytes(file_content, filename, file_extension)
        elif file_extension in ['txt', 'md', 'json', 'py', 'js', 'html', 'css']:
            try:
                text = file_content.decode('utf-8')
            except UnicodeDecodeError:
                try:
                    text = file_content.decode('latin-1')
                except Exception:  # bug fix: was a bare except
                    text = str(file_content)[:1000] + "..."
        else:
            text = f"Unsupported file type: {filename}"
        return text if text else f"No text extracted from {filename}"
    except Exception as e:
        logger.error(f"Error extracting text from {filename}: {e}")
        return f"Error extracting text from {filename}: {str(e)[:200]}"
def extract_text_from_pdf_bytes(pdf_bytes, filename):
    """Extract text from PDF bytes: PyPDF2 first, pdfplumber as fallback.

    Per-page failures are skipped; returns whatever text was recovered or a
    notice string naming *filename* when nothing could be extracted.
    """
    text = ""
    # Method 1: PyPDF2
    try:
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes), strict=False)
        for page in reader.pages:
            try:
                page_text = page.extract_text()
                if page_text and page_text.strip():
                    text += page_text.strip() + "\n\n"
            except Exception:  # bug fix: was a bare except
                continue
        if text.strip():
            return text
    except Exception as e:
        logger.warning(f"PyPDF2 extraction failed: {e}")
    # Method 2: pdfplumber, if installed (optional dependency)
    try:
        import pdfplumber
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            for page in pdf.pages:
                try:
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        text += page_text.strip() + "\n\n"
                except Exception:  # bug fix: was a bare except
                    continue
        if text.strip():
            return text
    except ImportError:
        logger.warning("pdfplumber not installed")
    except Exception as e:
        logger.warning(f"pdfplumber extraction failed: {e}")
    # Bug fix: report the actual filename (was an "(unknown)" placeholder)
    return text if text else f"Could not extract text from PDF: {filename}"
def extract_text_from_docx_bytes(docx_bytes, filename):
    """Extract paragraph text (one per line) from DOCX bytes.

    Returns a notice string naming *filename* when the document is empty
    or parsing fails.
    """
    try:
        doc = docx.Document(io.BytesIO(docx_bytes))
        # Keep only non-blank paragraphs, each followed by a newline
        lines = [p.text.strip() for p in doc.paragraphs if p.text.strip()]
        text = "".join(line + "\n" for line in lines)
        # Bug fix: report the actual filename (was an "(unknown)" placeholder)
        return text if text else f"Empty document: {filename}"
    except Exception as e:
        logger.error(f"DOCX extraction error: {e}")
        return f"Error extracting from DOCX: {filename}"
def extract_text_from_spreadsheet_bytes(file_bytes, filename, file_extension):
    """Extract text from spreadsheet bytes ('xlsx'/'xls' via pandas, 'csv' raw).

    Returns a notice string naming *filename* when the file is empty or
    extraction fails.
    """
    try:
        text = ""
        if file_extension in ['xlsx', 'xls']:
            try:
                excel_stream = io.BytesIO(file_bytes)
                engine = 'openpyxl' if file_extension == 'xlsx' else 'xlrd'
                xls = pd.ExcelFile(excel_stream, engine=engine)
                for sheet_name in xls.sheet_names:
                    try:
                        df = pd.read_excel(xls, sheet_name=sheet_name, header=None)
                        text += f"=== Sheet: {sheet_name} ===\n"
                        text += df.to_string(index=False, header=False) + "\n\n"
                    except Exception:  # bug fix: was a bare except
                        continue
            except Exception as e:
                logger.warning(f"Pandas extraction failed: {e}")
        elif file_extension == 'csv':
            # decode(..., errors='ignore') never raises, so the original
            # latin-1 fallback branch was dead code and has been removed
            text = file_bytes.decode('utf-8', errors='ignore')
        # Bug fix: report the actual filename (was an "(unknown)" placeholder)
        return text if text else f"Empty spreadsheet: {filename}"
    except Exception as e:
        logger.error(f"Spreadsheet extraction error: {e}")
        return f"Error extracting from spreadsheet: {filename}"
# Enhanced Cloudinary CRUD operations for notes
def create_note_docx(title: str, content: str):
    """Write *title*/*content* into a temporary .docx file and return its path.

    The caller owns (and must delete) the temp file. Returns None on failure.
    """
    try:
        document = docx.Document()
        document.add_heading(title, 0)
        # One paragraph per non-blank line of content
        for line in content.split('\n'):
            if line.strip():
                document.add_paragraph(line.strip())
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.docx', mode='wb')
        document.save(tmp.name)
        tmp.close()
        return tmp.name
    except Exception as exc:
        logger.error(f"Error creating DOCX: {exc}")
        return None
def upload_note_to_cloudinary(title: str, content: str):
    """Create a note as a DOCX file and upload it to Cloudinary.

    Also records the note in the module-level `notes_data` registry
    (in-memory only; lost on restart).
    Returns (note_id, url, None) on success or (None, None, error_message).
    """
    try:
        # Create DOCX file
        docx_file = create_note_docx(title, content)
        if not docx_file:
            return None, None, "Failed to create DOCX file"
        # Read the file
        with open(docx_file, 'rb') as f:
            file_content = f.read()
        # Clean up temp file
        os.unlink(docx_file)
        # Generate a unique ID for the note
        note_id = str(uuid.uuid4())
        filename = f"note_{note_id[:8]}_{title.replace(' ', '_')[:50]}.docx"
        # Upload to Cloudinary
        url, public_id = upload_to_cloudinary(file_content, filename, folder="notes")
        if not url or not public_id:
            return None, None, "Failed to upload to Cloudinary"
        # Store note metadata
        notes_data[note_id] = {
            "title": title,
            "content": content,
            "url": url,
            "public_id": public_id,
            "created_at": datetime.datetime.now().isoformat(),
            "filename": filename
        }
        return note_id, url, None
    except Exception as e:
        logger.error(f"Error uploading note to Cloudinary: {e}")
        return None, None, str(e)
def get_note_from_cloudinary(note_id: str):
    """Look up a note by exact id, or by partial id (first match wins).

    Returns (note_dict, None) on success or (None, error_message).
    """
    try:
        if note_id not in notes_data:
            # Accept a substring/prefix of the full id (e.g. the short id
            # shown in note listings)
            for candidate in notes_data:
                if note_id in candidate or candidate.startswith(note_id):
                    note_id = candidate
                    break
        if note_id not in notes_data:
            return None, "Note not found"
        return notes_data[note_id], None
    except Exception as exc:
        logger.error(f"Error getting note: {exc}")
        return None, str(exc)
def update_note_in_cloudinary(note_id: str, title: str = None, content: str = None):
    """Update a note's title and/or content and re-upload its DOCX.

    Falsy values (None, "") mean "leave this field unchanged".
    NOTE(review): the previous Cloudinary asset is not destroyed here, so
    each update leaves the old version behind — confirm if intentional.
    Returns (new_url, None) on success or (None, error_message).
    """
    try:
        note, error = get_note_from_cloudinary(note_id)
        if error:
            return None, error
        # Update title and/or content
        if title:
            note["title"] = title
        if content:
            note["content"] = content
        # Create new DOCX
        docx_file = create_note_docx(note["title"], note["content"])
        if not docx_file:
            return None, "Failed to create updated DOCX file"
        # Read and upload
        with open(docx_file, 'rb') as f:
            file_content = f.read()
        os.unlink(docx_file)
        # Upload new version
        filename = f"note_{note_id[:8]}_{note['title'].replace(' ', '_')[:50]}.docx"
        url, public_id = upload_to_cloudinary(file_content, filename, folder="notes")
        if not url:
            return None, "Failed to update in Cloudinary"
        # Update metadata
        note["url"] = url
        note["public_id"] = public_id
        note["updated_at"] = datetime.datetime.now().isoformat()
        note["filename"] = filename
        notes_data[note_id] = note
        return note["url"], None
    except Exception as e:
        logger.error(f"Error updating note: {e}")
        return None, str(e)
def delete_note_from_cloudinary(note_id: str):
    """Delete a note from Cloudinary (best effort) and from local metadata.

    Returns a status/error message string.
    """
    try:
        note, error = get_note_from_cloudinary(note_id)
        if error:
            return error
        # Try to delete from Cloudinary if configured
        if os.getenv("CLOUDINARY_CLOUD_NAME"):
            try:
                cloudinary.uploader.destroy(note["public_id"], resource_type="raw")
            except Exception as e:  # bug fix: was a silent bare except
                logger.warning(f"Cloudinary delete failed (continuing): {e}")
        # Bug fix: get_note_from_cloudinary accepts partial ids, so resolve
        # the full key before removing — the original `del notes_data[note_id]`
        # silently skipped removal when a partial id was supplied.
        full_id = next((k for k, v in notes_data.items() if v is note), note_id)
        notes_data.pop(full_id, None)
        return "Note deleted successfully"
    except Exception as e:
        logger.error(f"Error deleting note: {e}")
        return str(e)
def list_notes_from_cloudinary():
    """Return a DataFrame summarising all notes (empty frame when none exist)."""
    try:
        if not notes_data:
            return pd.DataFrame(columns=["ID", "Title", "Created", "Updated", "Filename"])
        rows = []
        for note_id, note in notes_data.items():
            title = note["title"]
            rows.append({
                "ID": note_id[:8] + "...",
                "Title": title[:50] + ("..." if len(title) > 50 else ""),
                "Created": note["created_at"][:10],
                "Updated": note.get("updated_at", note["created_at"])[:10],
                "Filename": note.get("filename", "N/A")[:30],
            })
        return pd.DataFrame(rows)
    except Exception as exc:
        logger.error(f"Error listing notes: {exc}")
        return pd.DataFrame(columns=["Error"])
# Improved file upload processing
def process_uploaded_files_to_cloudinary(files):
    """Upload Gradio-provided files to Cloudinary (aborts on first failure).

    Returns (status_message, list_of_uploaded_file_info) on success or
    (error_message, None) on the first problem encountered.
    """
    if not files:
        return "Please upload at least one file.", None
    uploaded_files = []
    for file in files:
        # Bug fix: defined before the try so the except handler cannot raise
        # NameError when the failure happens before file_name is assigned.
        file_name = "unknown_file"
        try:
            if hasattr(file, 'name'):
                # Gradio file object: .name is a path on local disk
                file_name = os.path.basename(file.name)
                with open(file.name, 'rb') as f:
                    file_content = f.read()
            else:
                # Fallback for other file-like objects
                file_name = getattr(file, 'orig_name', 'unknown_file')
                file_content = getattr(file, 'read', lambda: file)()
                if callable(file_content):
                    file_content = file_content()
            # Ensure file_content is bytes
            if isinstance(file_content, str):
                file_content = file_content.encode('utf-8', errors='ignore')
            # Enforce the 10MB per-file limit
            if len(file_content) > 10 * 1024 * 1024:
                return f"File {file_name} is too large (>10MB)", None
            logger.info(f"Uploading {file_name} to Cloudinary...")
            cloudinary_url, public_id = upload_to_cloudinary(file_content, file_name)
            if cloudinary_url and public_id:
                uploaded_files.append({
                    "name": file_name,
                    "url": cloudinary_url,
                    "public_id": public_id,
                    "size": len(file_content)
                })
                logger.info(f"Successfully uploaded {file_name}")
            else:
                return f"Failed to upload {file_name}", None
        except Exception as e:
            logger.error(f"Error processing file {file_name}: {e}")
            return f"Error processing {file_name}: {str(e)[:100]}", None
    return f"βœ… Successfully uploaded {len(uploaded_files)} file(s) to Cloudinary", uploaded_files
def get_cloudinary_files_for_selection():
    """Build (label, public_id) pairs for the interview file picker.

    Returns (options, None) on success or ([], error_message).
    """
    try:
        resources = list_cloudinary_files(folder="interview_docs", max_results=50)
        if not resources:
            return [], "No files found in Cloudinary. Upload files first."
        options = []
        for resource in resources:
            public_id = resource["public_id"]
            display_name = public_id.split("/")[-1]
            fmt = resource['format'] if 'format' in resource else 'file'
            options.append((f"{display_name} ({fmt})", public_id))
        return options, None
    except Exception as exc:
        logger.error(f"Error getting Cloudinary files: {exc}")
        return [], f"Error getting Cloudinary files: {str(exc)[:100]}"
# Interview Operations with better session handling
def start_interview_session(selected_files, mode, num_questions):
    """Create a new interview session from files already stored in Cloudinary.

    Extracts text from each selected file, indexes it in ChromaDB, records
    the session in the module-level `sessions` dict, and generates an
    opening AI greeting.
    Returns (status_message, session_id, initial_greeting); the last two
    are None on failure.
    """
    if not selected_files:
        return "Please select at least one file.", None, None
    session_id = str(uuid.uuid4())
    all_chunks = []
    selected_file_info = []
    # (Removed dead code: a freshly generated uuid4 can never already be a
    # key in `sessions`, so the original "clear previous cache" check did nothing.)
    for public_id in selected_files:
        try:
            filename = public_id.split("/")[-1]
            # Extract text from the file
            text = extract_text_from_cloudinary_file(public_id, filename)
            if text and not text.startswith("Error") and not text.startswith("Failed"):
                chunks = chunk_text(text)
                all_chunks.extend(chunks)
                selected_file_info.append({
                    "name": filename,
                    "public_id": public_id,
                    "text_preview": text[:200] + "..." if len(text) > 200 else text
                })
                # Bug fix: log the actual filename (was an "(unknown)" placeholder)
                logger.info(f"Extracted {len(chunks)} chunks from {filename}")
            else:
                logger.warning(f"No text extracted from {filename}: {text[:100]}")
        except Exception as e:
            logger.error(f"Error processing file {public_id}: {e}")
            continue
    if not all_chunks:
        return "No text could be extracted from selected files. Please try different files.", None, None
    # Index the chunks for retrieval during the conversation
    collection = create_chroma_collection(session_id)
    if collection:
        if not add_chunks_to_collection(collection, all_chunks):
            logger.warning("Failed to add chunks to ChromaDB collection")
    # Store session data
    sessions[session_id] = {
        "mode": mode,
        "questions_left": int(num_questions),
        "history": [],
        "chunks": all_chunks,
        "collection": collection,
        "files": selected_file_info,
        "start_time": datetime.datetime.now().isoformat(),
        "message_count": 0
    }
    # Kick off the conversation with an AI greeting
    initial_greeting = generate_interview_response(session_id, "Hello, let's start the interview.")
    return f"βœ… Interview session started with {len(selected_files)} file(s). Session ID: {session_id[:8]}...", session_id, initial_greeting
def generate_interview_response(session_id, message):
    """Generate the next AI turn for an interview session.

    Mutates the session in place: increments message_count, appends to
    history, and (in 'interviewer' mode) decrements questions_left.
    Returns the AI reply text, or an error message string.
    """
    if session_id not in sessions:
        return "Session not found or expired. Please start a new interview session."
    session = sessions[session_id]
    # Update message count
    session["message_count"] += 1
    # Retrieve context using ChromaDB (empty string when unavailable)
    context = retrieve_chunks(message, session.get("collection"), top_k=3)
    # Get recent history (last 3 exchanges)
    history_lines = []
    for exchange in session.get("history", [])[-3:]:
        if isinstance(exchange, dict):
            history_lines.append(f"User: {exchange.get('user', '')}")
            history_lines.append(f"AI: {exchange.get('ai', '')}")
        elif isinstance(exchange, str):
            history_lines.append(exchange)
    history = "\n".join(history_lines)
    # Create system prompt (context truncated to keep the prompt small)
    system_prompt = f"""You are an AI interview simulator. Mode: {session['mode']}
Context from uploaded files (for reference):
{context[:1000]}
Recent conversation:
{history}
You have {session['questions_left']} questions left in this session.
Instructions:
1. If in 'interviewer' mode, ask insightful, relevant questions based on the context
2. If in 'participant' mode, answer professionally as a candidate
3. Keep responses concise and engaging
4. If questions reach 0, politely end the session"""
    # Generate response using OpenAI
    try:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": message},
            ],
            max_tokens=500,
            temperature=0.7
        )
        full_response = response.choices[0].message.content
        # Update session history
        session["history"].append({
            "user": message,
            "ai": full_response,
            "timestamp": datetime.datetime.now().isoformat()
        })
        # Only interviewer mode consumes the question budget
        if session["mode"] == "interviewer":
            session["questions_left"] -= 1
        # Update session (same dict object; kept for clarity)
        sessions[session_id] = session
        # Check if session should end
        if session["questions_left"] <= 0:
            full_response += "\n\n🎯 **Interview completed!** Thank you for participating."
        return full_response
    except Exception as e:
        logger.error(f"Error generating response: {e}")
        return f"I apologize, but I encountered an error: {str(e)[:100]}"
# Tool functions with better error handling
def transcribe_audio(audio_file):
    """Transcribe *audio_file* via OpenAI Whisper.

    Accepts a file path or a Gradio (sample_rate, ndarray) tuple; tuples are
    written to a temp WAV first.
    Returns the transcript text, or an error message string.
    """
    temp_path = None  # set only when we materialise a temp wav from a tuple
    try:
        if audio_file is None:
            return "No audio file provided"
        # Handle Gradio audio tuple (sample_rate, audio_data)
        if isinstance(audio_file, tuple):
            sample_rate, audio_data = audio_file
            # Convert to mono if stereo
            if len(audio_data.shape) > 1:
                audio_data = audio_data.mean(axis=1)
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
            sf.write(temp_file.name, audio_data, sample_rate)
            temp_path = temp_file.name
            audio_path = temp_path
        else:
            audio_path = audio_file
        # Transcribe
        with open(audio_path, 'rb') as audio:
            transcript = openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio
            )
        return transcript.text
    except Exception as e:
        logger.error(f"Transcription error: {e}")
        return f"Transcription failed: {str(e)[:100]}"
    finally:
        # Bug fix: the temp WAV was leaked when transcription raised, because
        # cleanup only ran on the success path.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
def text_to_speech(text):
    """Synthesise *text* to MP3 with OpenAI TTS (voice "alloy").

    Returns (mp3_path, None) on success or (None, error_message) on failure.
    """
    try:
        if not text or not text.strip():
            return None, "No text provided"
        # Keep requests small: truncate long inputs
        if len(text) > 1000:
            text = text[:1000] + "..."
        response = openai_client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=text,
            speed=1.0
        )
        out = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
        out.write(response.content)
        out.close()
        return out.name, None
    except Exception as exc:
        logger.error(f"TTS error: {exc}")
        return None, f"TTS failed: {str(exc)[:100]}"
# Unified Command Processor with better parsing
def process_unified_command(command, audio_file=None):
    """Route a text or voice command to the right subsystem and return its reply."""
    # Voice input takes precedence: transcribe it into the command string
    if audio_file is not None:
        transcribed_text = transcribe_audio(audio_file)
        if not transcribed_text or transcribed_text.startswith("Transcription failed"):
            return transcribed_text or "Failed to transcribe audio"
        command = transcribed_text
        logger.info(f"Transcribed command: {command}")
    if not command or not command.strip():
        return "Please enter a command or provide audio input"
    command_lower = command.lower().strip()
    # Interview commands live in their own tab
    if command_lower.startswith("interview"):
        return "Please use the Interview tab for interview operations. You can start by uploading files and creating a session."
    # Note CRUD commands
    if command_lower.startswith(("read", "update", "delete", "list")):
        return process_note_command(command)
    # Tool commands (keyword match anywhere in the command)
    tool_keywords = ("search", "google", "summarize", "weather", "news",
                     "email", "url", "analyze", "tts", "speak")
    if any(keyword in command_lower for keyword in tool_keywords):
        return process_tool_command(command)
    # Fall through: treat as a generic chat query
    try:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful AI assistant. Respond concisely and accurately."},
                {"role": "user", "content": command}
            ],
            max_tokens=500,
            temperature=0.7
        )
        return response.choices[0].message.content
    except Exception as exc:
        return f"Error: {str(exc)[:200]}"
def process_tool_command(command):
    """Dispatch a free-form tool command to the matching helper.

    Supported verbs: search/google, summarize, weather, news, email,
    url/analyze, tts/speak. Keywords match anywhere in the command, in
    this priority order. Returns the tool's result or a usage hint.
    """
    command_lower = command.lower().strip()
    if "search" in command_lower or "google" in command_lower:
        # Strip the leading verb; the remainder is the query
        query = re.sub(r'(search|google)\s+', '', command, flags=re.IGNORECASE).strip()
        if query:
            return search_web(query)
        return "Please provide a search query"
    elif "summarize" in command_lower:
        text = re.sub(r'summarize\s+', '', command, flags=re.IGNORECASE).strip()
        if text:
            return summarize_text(text)
        return "Please provide text to summarize"
    elif "weather" in command_lower:
        # Everything after "weather" is the city; None lets get_weather default
        city_match = re.search(r'weather\s+(.+)', command_lower)
        city = city_match.group(1) if city_match else None
        return get_weather(city)
    elif "news" in command_lower:
        return get_news()
    elif "email" in command_lower:
        # Expected shape: email addr@host "Subject" "Message"
        pattern = r'email\s+(\S+@\S+\.\S+)\s+"([^"]+)"\s+"([^"]+)"'
        match = re.search(pattern, command, re.IGNORECASE)
        if match:
            # NOTE(review): passes (subject, message, recipient) in that order —
            # confirm against send_email's signature (defined elsewhere).
            return send_email(match.group(2), match.group(3), match.group(1))
        return "Usage: email recipient@example.com 'Subject' 'Message'"
    elif "url" in command_lower or "analyze" in command_lower:
        url_match = re.search(r'(?:url|analyze)\s+(\S+)', command, re.IGNORECASE)
        if url_match:
            # analyze_url returns (result, extra); only the result is shown
            result, _ = analyze_url(url_match.group(1))
            return result
        return "Please provide a URL to analyze"
    elif "tts" in command_lower or "speak" in command_lower:
        text = re.sub(r'(tts|speak)\s+', '', command, flags=re.IGNORECASE).strip()
        if text:
            audio_file, error = text_to_speech(text)
            if error:
                return error
            return f"Speech generated: {audio_file}"
        return "Please provide text to convert to speech"
    return "Unknown command. Try: search, summarize, weather, news, email, url, or tts"
def process_note_command(command):
    """Dispatch a note command: read/update/delete/list.

    Returns the note text, a status message, or a usage hint.
    """
    command_lower = command.lower().strip()
    if command_lower.startswith("read"):
        match = re.search(r'read\s+(\S+)', command, re.IGNORECASE)
        if not match:
            return "Usage: read <note_id>"
        note, error = get_note_from_cloudinary(match.group(1))
        if error:
            return error
        return f"πŸ“ **{note['title']}**\n\n{note['content']}\n\nCreated: {note['created_at'][:10]}"
    if command_lower.startswith("update"):
        match = re.search(r'update\s+(\S+)\s+(.+)', command, re.IGNORECASE)
        if not match:
            return "Usage: update <note_id> <new_content>"
        _, error = update_note_in_cloudinary(match.group(1), content=match.group(2))
        return error if error else "Note updated successfully"
    if command_lower.startswith("delete"):
        match = re.search(r'delete\s+(\S+)', command, re.IGNORECASE)
        if not match:
            return "Usage: delete <note_id>"
        return delete_note_from_cloudinary(match.group(1))
    if command_lower.startswith("list"):
        df = list_notes_from_cloudinary()
        if isinstance(df, pd.DataFrame) and not df.empty:
            return df.to_string(index=False)
        return "No notes found"
    return "Unknown note command. Use: read, update, delete, list"
# Create Gradio interface with fixed UI
def create_interface():
# Custom CSS for better UI
custom_css = """
.gradio-container {
max-width: 1400px !important;
margin: 0 auto !important;
}
.tab-nav {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
padding: 10px !important;
border-radius: 10px !important;
margin-bottom: 20px !important;
}
.tab-nav button {
background: white !important;
color: #333 !important;
border: 2px solid transparent !important;
margin: 0 5px !important;
border-radius: 8px !important;
padding: 10px 20px !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
.tab-nav button.selected {
background: #4CAF50 !important;
color: white !important;
border-color: #45a049 !important;
box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important;
}
.btn-primary {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
padding: 12px 24px !important;
font-weight: 600 !important;
margin: 5px !important;
transition: all 0.3s ease !important;
}
.btn-primary:hover {
transform: translateY(-2px) !important;
box-shadow: 0 6px 12px rgba(0,0,0,0.2) !important;
}
.btn-secondary {
background: #6c757d !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
padding: 10px 20px !important;
}
.textbox textarea {
border-radius: 10px !important;
border: 2px solid #e0e0e0 !important;
padding: 15px !important;
font-size: 14px !important;
}
.panel {
border-radius: 15px !important;
box-shadow: 0 10px 30px rgba(0,0,0,0.1) !important;
padding: 20px !important;
margin: 10px 0 !important;
background: white !important;
border: 1px solid #e0e0e0 !important;
}
h1, h2, h3 {
color: #333 !important;
margin-bottom: 20px !important;
}
.chatbot {
min-height: 400px !important;
max-height: 500px !important;
overflow-y: auto !important;
border-radius: 10px !important;
border: 2px solid #e0e0e0 !important;
}
.dataframe {
width: 100% !important;
border-collapse: collapse !important;
}
.dataframe th {
background: #667eea !important;
color: white !important;
padding: 12px !important;
}
.dataframe td {
padding: 10px !important;
border-bottom: 1px solid #e0e0e0 !important;
}
"""
with gr.Blocks(title="AI Unified Command Platform", theme=gr.themes.Soft(), css=custom_css) as app:
gr.Markdown("""
# πŸ€– AI Unified Command Platform
### All features accessible through a single command interface
""")
with gr.Tabs() as tabs:
# Unified Command Interface Tab
with gr.TabItem("πŸ’¬ Unified Commands", id="unified"):
with gr.Row():
with gr.Column(scale=3):
with gr.Group():
gr.Markdown("### πŸ’¬ Command Interface")
command_input = gr.Textbox(
label="Enter Command",
placeholder="Type your command here...\nExamples:\n- search latest AI news\n- summarize this text\n- weather New York\n- news\n- tts hello world",
lines=3
)
with gr.Row():
command_btn = gr.Button("πŸš€ Execute Command", variant="primary", size="lg")
clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
gr.Markdown("---")
gr.Markdown("### 🎀 Voice Input")
audio_input = gr.Audio(
label="Speak your command",
type="filepath",
sources=["microphone"]
)
transcribe_btn = gr.Button("πŸŽ™οΈ Transcribe & Execute", variant="primary")
with gr.Group():
gr.Markdown("### πŸ“ Quick Note")
with gr.Row():
quick_note_title = gr.Textbox(label="Title", placeholder="Note title...", scale=2)
quick_note_btn = gr.Button("πŸ’Ύ Save Quick Note", variant="primary", scale=1)
quick_note_content = gr.Textbox(label="Content", placeholder="Note content...", lines=3)
quick_note_status = gr.Textbox(label="Status", interactive=False)
with gr.Column(scale=2):
with gr.Group():
gr.Markdown("### πŸ“Š Response")
command_output = gr.Textbox(
label="Response",
interactive=False,
lines=20,
elem_classes="panel"
)
with gr.Group():
gr.Markdown("### πŸ”Š Speech Output")
tts_output = gr.Audio(label="Generated Speech", interactive=False)
# Event handlers for Unified Commands tab
def execute_command_wrapper(typed_command, recorded_audio):
    """Relay the typed command plus any recorded audio clip to the unified dispatcher."""
    result = process_unified_command(typed_command, recorded_audio)
    return result
# Execute button: run the command, then clear both input widgets.
command_btn.click(
    fn=execute_command_wrapper,
    inputs=[command_input, audio_input],
    outputs=[command_output]
).then(
    # Chained step resets the text box and recorded audio after the command finishes.
    lambda: (gr.update(value=""), gr.update(value=None)),
    outputs=[command_input, audio_input]
)
# Voice path: pass only the recording; the dispatcher handles transcription.
transcribe_btn.click(
    fn=lambda audio: process_unified_command(None, audio),
    inputs=[audio_input],
    outputs=[command_output]
).then(
    lambda: gr.update(value=None),
    outputs=[audio_input]
)
# Clear button resets input, response, audio and quick-note status in one shot.
clear_btn.click(
    lambda: ("", "", None, ""),
    outputs=[command_input, command_output, audio_input, quick_note_status]
)
def save_quick_note(title, content):
    """Persist a short note to Cloudinary and return a one-line status string."""
    has_both = title.strip() and content.strip()
    if not has_both:
        return "Please provide both title and content"
    note_id, url, error = upload_note_to_cloudinary(title, content)
    # Show only the short id prefix in the status line.
    return f"Error: {error}" if error else f"βœ… Note saved! ID: {note_id[:8]}"
# Quick-note save: validates inputs, then uploads through the Cloudinary helper.
quick_note_btn.click(
    fn=save_quick_note,
    inputs=[quick_note_title, quick_note_content],
    outputs=[quick_note_status]
)
# File Management Tab
with gr.TabItem("πŸ“ File Management", id="files"):
    with gr.Row():
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### πŸ“€ Upload Files")
                # NOTE(review): the 10MB limit in the label appears to be
                # enforced by the upload helper, not this widget β€” confirm.
                file_upload = gr.File(
                    file_count="multiple",
                    file_types=[".pdf", ".doc", ".docx", ".txt", ".md", ".csv", ".xlsx", ".xls"],
                    label="Select files to upload (max 10MB each)"
                )
                upload_btn = gr.Button("πŸ“€ Upload to Cloudinary", variant="primary")
                upload_status = gr.Textbox(label="Upload Status", interactive=False)
        with gr.Column(scale=2):
            with gr.Group():
                gr.Markdown("### πŸ“‹ Uploaded Files")
                with gr.Row():
                    refresh_files_btn = gr.Button("πŸ”„ Refresh List", variant="secondary")
                    clear_files_btn = gr.Button("πŸ—‘οΈ Clear All", variant="secondary")
                files_display = gr.Dataframe(
                    label="Files in Cloudinary",
                    headers=["Name", "Type", "Size", "Uploaded"],
                    interactive=False,
                    wrap=True
                )
# File management event handlers
def upload_files_handler(files):
    """Upload the selected files to Cloudinary and build the display table.

    Returns (status_message, dataframe_of_uploaded_files).
    """
    if not files:
        return "Please select files to upload", pd.DataFrame()
    status, uploaded = process_uploaded_files_to_cloudinary(files)
    if not uploaded:
        return status, pd.DataFrame()
    # Remember each upload so other tabs can reference it without re-listing.
    for info in uploaded:
        uploaded_files_cache[info['public_id']] = info
    rows = [
        [
            info['name'][:30],
            info['name'].rsplit('.', 1)[-1] if '.' in info['name'] else 'file',
            f"{info['size'] // 1024} KB",
            "Just now",
        ]
        for info in uploaded
    ]
    return status, pd.DataFrame(rows, columns=["Name", "Type", "Size", "Uploaded"])
def refresh_files_handler():
    """Re-query Cloudinary for stored documents and format them for the table.

    On any failure, logs the error and returns an empty (but well-headed)
    DataFrame so the UI never sees an exception.
    """
    columns = ["Name", "Type", "Size", "Uploaded"]
    try:
        resources = list_cloudinary_files(folder="interview_docs", max_results=50)
        if not resources:
            return pd.DataFrame(columns=columns)
        rows = []
        for item in resources:
            name = item["public_id"].split("/")[-1]
            # Fall back to the filename extension when Cloudinary omits "format".
            ext_guess = name.split('.')[-1] if '.' in name else 'file'
            kind = item.get("format", ext_guess)
            nbytes = item.get("bytes", 0)
            rows.append([
                name[:30],
                kind,
                f"{nbytes // 1024} KB" if nbytes > 0 else "N/A",
                item.get("created_at", "")[:10],
            ])
        return pd.DataFrame(rows, columns=columns)
    except Exception as e:
        logger.error(f"Refresh error: {e}")
        return pd.DataFrame(columns=columns)
upload_btn.click(
    fn=upload_files_handler,
    inputs=[file_upload],
    outputs=[upload_status, files_display]
)
refresh_files_btn.click(
    fn=refresh_files_handler,
    outputs=[files_display]
)
# "Clear All" only empties the on-screen table; Cloudinary files are untouched.
clear_files_btn.click(
    lambda: (pd.DataFrame(columns=["Name", "Type", "Size", "Uploaded"]), "List cleared"),
    outputs=[files_display, upload_status]
)
# Interview Tab
with gr.TabItem("πŸ“‹ Interview Mode", id="interview"):
    with gr.Row():
        # Left column: session configuration (role, question count, context files).
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### 🎯 Interview Setup")
                with gr.Row():
                    mode_select = gr.Radio(
                        choices=["interviewer", "participant"],
                        value="interviewer",
                        label="Role",
                        info="Select your role in the interview"
                    )
                    num_questions = gr.Number(
                        value=5,
                        label="Number of Questions",
                        minimum=1,
                        maximum=20,
                        step=1
                    )
                gr.Markdown("### πŸ“š Select Files")
                with gr.Row():
                    refresh_interview_files_btn = gr.Button("πŸ”„ Refresh Files", variant="secondary", size="sm")
                # Choices start empty; the refresh handler populates them
                # from Cloudinary on demand.
                file_selection = gr.CheckboxGroup(
                    label="Available Files",
                    choices=[],
                    info="Select files to use as interview context"
                )
                start_interview_btn = gr.Button("πŸš€ Start Interview Session", variant="primary")
                interview_status = gr.Textbox(label="Status", interactive=False)
                session_display = gr.Textbox(label="Session ID", interactive=False)
        # Right column: the chat itself plus voice in/out controls.
        with gr.Column(scale=2):
            with gr.Group():
                gr.Markdown("### πŸ’¬ Interview Chat")
                # type="messages": history entries must be
                # {"role": ..., "content": ...} dicts, not (speaker, text) tuples.
                interview_chatbot = gr.Chatbot(
                    label="Conversation",
                    height=400,
                    type="messages",
                    avatar_images=(None, "https://api.dicebear.com/7.x/bottts/svg?seed=AI")
                )
                with gr.Row():
                    interview_msg = gr.Textbox(
                        label="Your message",
                        placeholder="Type your message here...",
                        scale=4
                    )
                    send_msg_btn = gr.Button("πŸ“€ Send", variant="primary", scale=1)
                with gr.Row():
                    interview_audio = gr.Audio(
                        label="🎀 Speak your message",
                        sources=["microphone"],
                        type="filepath"
                    )
                    transcribe_interview_btn = gr.Button("πŸŽ™οΈ Transcribe", variant="secondary")
                    tts_interview_btn = gr.Button("πŸ”Š Speak Response", variant="secondary")
                interview_audio_output = gr.Audio(label="AI Speech", interactive=False)
# Interview event handlers
def refresh_interview_files():
    """Reload the Cloudinary file list that backs the interview checkbox group."""
    file_options, error = get_cloudinary_files_for_selection()
    if error:
        return gr.CheckboxGroup(choices=[], label=f"Error: {error[:100]}")
    labels = [opt[1] for opt in file_options]
    return gr.CheckboxGroup(choices=labels, label=f"Available Files ({len(file_options)} found)")
def start_interview_handler(selected_files, mode, q_count):
    """Start an interview session and seed the chat with the AI greeting.

    Returns (status_message, session_id, chat_history). The chatbot is
    configured with type="messages", so history entries must be
    {"role": ..., "content": ...} dicts.
    """
    if not selected_files:
        return "Please select at least one file", "", None
    status, session_id, greeting = start_interview_session(selected_files, mode, q_count)
    if session_id and greeting:
        # Bug fix: the original seeded history with ("AI", greeting) tuples,
        # which the messages-type Chatbot cannot render.
        chat_history = [{"role": "assistant", "content": greeting}]
        return status, session_id, chat_history
    return status, "", None
def send_interview_message(message, history, session_id):
    """Append the user's message and the AI reply to the chat history.

    History follows the gr.Chatbot "messages" format (role/content dicts),
    matching the chatbot's type="messages" declaration. Returns
    ("", new_history) so the input box is cleared after sending.
    """
    # Robustness: gradio may hand over None before the first exchange.
    history = list(history or [])
    user_turn = {"role": "user", "content": message}
    if not session_id:
        # Bug fix: the original appended ("User", ...)/("AI", ...) tuples,
        # which the messages-type Chatbot cannot render.
        bot_turn = {"role": "assistant", "content": "Please start an interview session first"}
        return "", history + [user_turn, bot_turn]
    response = generate_interview_response(session_id, message)
    return "", history + [user_turn, {"role": "assistant", "content": response}]
refresh_interview_files_btn.click(
    fn=refresh_interview_files,
    outputs=[file_selection]
)
start_interview_btn.click(
    fn=start_interview_handler,
    inputs=[file_selection, mode_select, num_questions],
    outputs=[interview_status, session_display, interview_chatbot]
)
send_msg_btn.click(
    fn=send_interview_message,
    inputs=[interview_msg, interview_chatbot, session_display],
    outputs=[interview_msg, interview_chatbot]
)
# Pressing Enter in the message box mirrors the Send button.
interview_msg.submit(
    fn=send_interview_message,
    inputs=[interview_msg, interview_chatbot, session_display],
    outputs=[interview_msg, interview_chatbot]
)
def transcribe_interview_audio(audio_path, history, session_id):
    """Transcribe a recorded clip and feed it into the interview chat.

    Returns ("", new_history). History uses the gr.Chatbot "messages"
    format (role/content dicts) to match the chatbot's type="messages".
    """
    history = list(history or [])
    if not audio_path:
        return "", history
    transcribed = transcribe_audio(audio_path)
    if transcribed and not transcribed.startswith("Transcription failed"):
        if session_id:
            response = generate_interview_response(session_id, transcribed)
        else:
            response = "Please start a session first"
        # Bug fix: the original appended (speaker, text) tuples, which the
        # messages-type Chatbot cannot render.
        return "", history + [
            {"role": "user", "content": f"[Audio] {transcribed}"},
            {"role": "assistant", "content": response},
        ]
    return "", history
# Voice input feeds the same chat pipeline as typed messages.
transcribe_interview_btn.click(
    fn=transcribe_interview_audio,
    inputs=[interview_audio, interview_chatbot, session_display],
    outputs=[interview_msg, interview_chatbot]
)
def tts_last_response(history):
    """Speak the most recent AI reply from the chat history.

    Returns (audio_filepath_or_None, error_message).

    Bug fix: the original unpacked each history entry as a (role, msg)
    tuple, but the chatbot is configured with type="messages" and
    therefore yields {"role", "content"} dicts. Both shapes are handled
    defensively here.
    """
    if not history:
        return None, "No conversation history"
    last_ai = None
    for entry in reversed(history):
        if isinstance(entry, dict):
            role, msg = entry.get("role"), entry.get("content")
        else:
            role, msg = entry
        if role in ("assistant", "AI"):
            last_ai = msg
            break
    if not last_ai:
        return None, "No AI response found"
    # Cap TTS input length to keep synthesis fast and cheap.
    audio_file, error = text_to_speech(last_ai[:500])
    if error:
        return None, error
    return audio_file, ""
# NOTE(review): the second output is a brand-new hidden Textbox created
# inline, so error messages from tts_last_response are never visible to
# the user β€” consider routing them to a real status component.
tts_interview_btn.click(
    fn=tts_last_response,
    inputs=[interview_chatbot],
    outputs=[interview_audio_output, gr.Textbox(visible=False)]
)
# Notes Tab
with gr.TabItem("πŸ“ Notes Management", id="notes"):
    with gr.Row():
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### ✨ Create New Note")
                new_note_title = gr.Textbox(label="Note Title", placeholder="Enter note title...")
                new_note_content = gr.Textbox(label="Note Content", placeholder="Enter note content...", lines=5)
                create_note_btn = gr.Button("πŸ’Ύ Create Note", variant="primary")
                create_status = gr.Textbox(label="Status", interactive=False)
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### πŸ” Note Operations")
                # Accepts a full or partial note id; resolution happens in
                # the Cloudinary helper functions.
                note_op_id = gr.Textbox(label="Note ID", placeholder="Enter note ID (or partial)...")
                with gr.Row():
                    read_note_btn = gr.Button("πŸ“– Read", variant="secondary")
                    update_note_btn = gr.Button("✏️ Update", variant="secondary")
                    delete_note_btn = gr.Button("πŸ—‘οΈ Delete", variant="secondary", size="sm")
                note_content_display = gr.Textbox(label="Note Content", interactive=False, lines=10)
    with gr.Row():
        list_notes_btn = gr.Button("πŸ“‹ List All Notes", variant="primary")
        clear_notes_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
    notes_list_display = gr.Dataframe(
        label="All Notes",
        headers=["ID", "Title", "Created", "Updated"],
        interactive=False,
        wrap=True
    )
# Notes event handlers
def create_note_handler(title, content):
    """Create a Cloudinary-backed note; returns (status, short_note_id)."""
    if not (title.strip() and content.strip()):
        return "Please provide both title and content", ""
    note_id, url, error = upload_note_to_cloudinary(title, content)
    if error:
        return f"Error: {error}", ""
    # Second return value pre-fills the operations box with the short id.
    return f"βœ… Note created! ID: {note_id}", note_id[:8]
def read_note_handler(note_id):
    """Fetch a note by (possibly partial) id and render it for display.

    Returns (display_text, empty_dataframe) to match the wired outputs.
    """
    if not note_id.strip():
        return "Please provide a note ID", pd.DataFrame()
    note, error = get_note_from_cloudinary(note_id)
    if error:
        return f"Error: {error}", pd.DataFrame()
    parts = [
        f"πŸ“ **{note['title']}**",
        "",
        note['content'],
        "",
        "---",
        f"Created: {note['created_at'][:19]}",
    ]
    rendered = "\n".join(parts)
    if 'updated_at' in note:
        rendered += f"\nUpdated: {note['updated_at'][:19]}"
    return rendered, pd.DataFrame()
def update_note_handler(note_id):
    """Append an update timestamp to the note's content (demo behaviour).

    Returns (status_text, empty_dataframe) to match the wired outputs.
    """
    if not note_id.strip():
        return "Please provide a note ID", pd.DataFrame()
    note, error = get_note_from_cloudinary(note_id)
    if error:
        return f"Error: {error}", pd.DataFrame()
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    new_content = f"{note['content']}\n\n[Updated at {stamp}]"
    _, update_error = update_note_in_cloudinary(note_id, content=new_content)
    if update_error:
        return f"Update error: {update_error}", pd.DataFrame()
    return "βœ… Note updated successfully", pd.DataFrame()
def delete_note_handler(note_id):
    """Delete the identified note; returns (status_text, empty_dataframe)."""
    if not note_id.strip():
        return "Please provide a note ID", pd.DataFrame()
    # The helper returns a human-readable status string.
    return delete_note_from_cloudinary(note_id), pd.DataFrame()
def list_notes_handler():
    """List every stored note; returns (status_message, dataframe)."""
    df = list_notes_from_cloudinary()
    has_rows = isinstance(df, pd.DataFrame) and not df.empty
    if has_rows:
        return "", df
    return "No notes found", pd.DataFrame()
create_note_btn.click(
    fn=create_note_handler,
    inputs=[new_note_title, new_note_content],
    outputs=[create_status, note_op_id]
)
read_note_btn.click(
    fn=read_note_handler,
    inputs=[note_op_id],
    outputs=[note_content_display, notes_list_display]
)
update_note_btn.click(
    fn=update_note_handler,
    inputs=[note_op_id],
    outputs=[note_content_display, notes_list_display]
)
delete_note_btn.click(
    fn=delete_note_handler,
    inputs=[note_op_id],
    outputs=[note_content_display, notes_list_display]
)
list_notes_btn.click(
    fn=list_notes_handler,
    outputs=[note_content_display, notes_list_display]
)
# Clear resets the display, the notes table, the id box and the status line.
clear_notes_btn.click(
    lambda: ("", pd.DataFrame(), "", ""),
    outputs=[note_content_display, notes_list_display, note_op_id, create_status]
)
# Footer: static usage instructions rendered as HTML-in-Markdown.
gr.Markdown("""
---
<div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); border-radius: 10px;">
<h3>πŸš€ AI Unified Command Platform</h3>
<p>Powered by OpenAI, Cloudinary, and ChromaDB</p>
<p><strong>How to use:</strong></p>
<ol style="text-align: left; display: inline-block;">
<li>Upload files using the File Management tab</li>
<li>Start interviews with files from Cloudinary</li>
<li>Type or speak commands in the Unified Commands tab</li>
<li>Manage notes in the Notes tab</li>
</ol>
</div>
""")
# Hand the fully-wired Blocks app back to the caller for launching.
return app
# Launch the application
if __name__ == "__main__":
    # ChromaDB persists here (see chroma_settings at module top); make sure
    # the directory exists before the interface touches it.
    os.makedirs("./chroma_db", exist_ok=True)
    # Create the interface
    app = create_interface()
    # On Hugging Face Spaces the Space itself is the public URL, so no share
    # tunnel is needed; locally, ask Gradio for one.
    launch_config = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": not IS_HF_SPACES,  # simplified from `False if IS_HF_SPACES else True`
    }
    print("πŸš€ Starting AI Unified Command Platform...")
    print("πŸ“ ChromaDB directory: ./chroma_db")  # was an f-string with no placeholders
    print(f"🌐 Cloudinary configured: {bool(os.getenv('CLOUDINARY_CLOUD_NAME'))}")
    print(f"πŸ€– OpenAI configured: {bool(os.getenv('OPENAI_API_KEY'))}")
    app.launch(**launch_config)