voiceCal-ai-v2 / app /api /main.py
pgits's picture
UPDATE: Switch landing page to use VoiceCal LinkedIn diagram
415943d
"""Main FastAPI application for ChatCal.ai."""
from fastapi import FastAPI, HTTPException, Depends, Request, status, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.exception_handlers import request_validation_exception_handler
from fastapi.exceptions import RequestValidationError
from datetime import datetime
import uuid
import json
import logging
from typing import Dict, Any, Optional
from app.config import settings
from app.api.models import (
ChatRequest, ChatResponse, StreamChatResponse, SessionCreate, SessionResponse,
ConversationHistory, HealthResponse, ErrorResponse, AuthRequest, AuthResponse
)
from app.api.chat_widget import router as chat_widget_router
from app.api.simple_chat import router as simple_chat_router
from app.core.session_factory import session_manager
from app.calendar.auth import CalendarAuth
from app.core.exceptions import (
ChatCalException, AuthenticationError, CalendarError,
LLMError, ValidationError, RateLimitError
)
from app.core.llm_anthropic import anthropic_llm
# Create FastAPI app
app = FastAPI(
title="ChatCal.ai",
description="AI-powered calendar assistant for booking appointments",
version="0.1.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Log testing mode status on startup
if settings.testing_mode:
logger.info("🧪 TESTING MODE ENABLED - Peter's email will be treated as regular user email")
else:
logger.info("📧 Production mode - Peter's email will receive special formatting")
# Calendar auth instance
calendar_auth = CalendarAuth()
# Include routers
app.include_router(chat_widget_router)
app.include_router(simple_chat_router)
# Mount static files
import os
static_path = "/app/static"
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
# Global exception handlers
@app.exception_handler(ChatCalException)
async def chatcal_exception_handler(request: Request, exc: ChatCalException):
"""Handle custom ChatCal exceptions."""
logger.error(f"ChatCal exception: {exc.message}", extra={"details": exc.details})
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={
"error": exc.__class__.__name__,
"message": exc.message,
"details": exc.details,
"timestamp": datetime.utcnow().isoformat()
}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle request validation errors."""
logger.warning(f"Validation error: {exc}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error": "ValidationError",
"message": "Invalid request data",
"details": {"validation_errors": exc.errors()},
"timestamp": datetime.utcnow().isoformat()
}
)
@app.exception_handler(500)
async def internal_server_error_handler(request: Request, exc: Exception):
"""Handle internal server errors."""
logger.error(f"Internal server error: {exc}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "InternalServerError",
"message": "An unexpected error occurred. Please try again later.",
"details": {},
"timestamp": datetime.utcnow().isoformat()
}
)
@app.get("/", response_class=HTMLResponse)
async def root():
"""Root endpoint with basic information."""
return """
<!DOCTYPE html>
<html>
<head>
<title>VoiceCal.ai</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }
.container { background: white; padding: 40px; border-radius: 10px; max-width: 800px; margin: 0 auto; }
h1 { color: #2c3e50; text-align: center; }
.feature { background: #e8f5e9; padding: 15px; margin: 15px 0; border-radius: 8px; border-left: 4px solid #4caf50; }
.api-link { background: #e3f2fd; padding: 15px; margin: 15px 0; border-radius: 8px; text-align: center; }
a { color: #1976d2; text-decoration: none; }
a:hover { text-decoration: underline; }
</style>
</head>
<body>
<div class="container">
<h1>📅 Schedule Time with Peter Michael Gits</h1>
<p style="text-align: center; font-size: 18px; color: #666;">
Book consultations, meetings, and advisory sessions with Peter
</p>
<div style="text-align: center; margin: 30px 0;">
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; margin-bottom: 20px; max-width: 400px; margin-left: auto; margin-right: auto;">
<h4 style="margin-top: 0; color: #2c3e50;">📧 Enter Your Email Address</h4>
<p style="margin-bottom: 15px; color: #666; font-size: 14px;">
This helps us get your contact details right for booking confirmation
</p>
<input
type="email"
id="userEmail"
placeholder="your.email@example.com"
style="width: 100%; padding: 12px; border: 2px solid #ddd; border-radius: 6px; font-size: 16px; box-sizing: border-box;"
autocomplete="email"
>
<div id="emailError" style="color: #e74c3c; font-size: 12px; margin-top: 5px; display: none;">
Please enter a valid email address
</div>
</div>
<a
href="#"
id="bookingButton"
onclick="proceedToBooking()"
style="background: #cccccc; color: #888; padding: 15px 40px; font-size: 20px; border-radius: 8px; text-decoration: none; display: inline-block; cursor: not-allowed; transition: all 0.3s ease;"
>
Book an Appointment Now
</a>
</div>
<div class="feature">
<h3>💼 Professional Consultations</h3>
<p>Schedule one-on-one business consultations and advisory sessions with Peter Michael Gits</p>
</div>
<div class="feature">
<h3>🤖 AI-Powered Scheduling</h3>
<p>Our intelligent assistant helps you find the perfect time that works for both you and Peter</p>
</div>
<div class="feature">
<h3>📧 Instant Confirmation</h3>
<p>Receive immediate confirmation and calendar invitations for your scheduled meetings</p>
</div>
<div class="api-link">
<h3>🛠️ API Documentation</h3>
<p>
<a href="/docs">Interactive API Docs (Swagger)</a> |
<a href="/redoc">Alternative Docs (ReDoc)</a> |
<a href="/linkedin-diagram">📊 LinkedIn Diagram</a>
</p>
</div>
<div style="text-align: center; margin-top: 30px; color: #888;">
<p>Version 0.1.0 | Built with FastAPI & LlamaIndex</p>
</div>
</div>
<script>
// Email validation functionality
const emailInput = document.getElementById('userEmail');
const bookingButton = document.getElementById('bookingButton');
const emailError = document.getElementById('emailError');
// Email validation regex
const emailRegex = /^[^ @]+@[^ @]+\.[^ @]+$/;
function validateEmail() {
const email = emailInput.value.trim();
const isValid = emailRegex.test(email);
if (isValid) {
// Valid email - enable button
bookingButton.style.background = '#4caf50';
bookingButton.style.color = 'white';
bookingButton.style.cursor = 'pointer';
emailError.style.display = 'none';
emailInput.style.borderColor = '#4caf50';
} else if (email.length > 0) {
// Invalid email with text - show error
bookingButton.style.background = '#cccccc';
bookingButton.style.color = '#888';
bookingButton.style.cursor = 'not-allowed';
emailError.style.display = 'block';
emailInput.style.borderColor = '#e74c3c';
} else {
// Empty email - reset to neutral
bookingButton.style.background = '#cccccc';
bookingButton.style.color = '#888';
bookingButton.style.cursor = 'not-allowed';
emailError.style.display = 'none';
emailInput.style.borderColor = '#ddd';
}
return isValid;
}
function proceedToBooking() {
if (validateEmail()) {
const email = emailInput.value.trim();
// Pass email to chat widget as URL parameter
window.location.href = `/chat-widget?email=${encodeURIComponent(email)}`;
} else {
// Focus on email input if not valid
emailInput.focus();
}
}
// Add real-time validation
emailInput.addEventListener('input', validateEmail);
emailInput.addEventListener('blur', validateEmail);
// Handle Enter key press
emailInput.addEventListener('keypress', function(event) {
if (event.key === 'Enter') {
proceedToBooking();
}
});
// Ensure form resets on page load (no state persistence)
window.addEventListener('load', function() {
emailInput.value = '';
validateEmail();
});
</script>
</body>
</html>
"""
@app.get("/linkedin-diagram", response_class=HTMLResponse)
async def linkedin_diagram():
"""Serve the VoiceCal LinkedIn diagram HTML file."""
try:
with open("VOICECAL_LINKEDIN_DIAGRAM.html", "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
raise HTTPException(status_code=404, detail="LinkedIn diagram not found")
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint."""
services = {}
# Check session backend connection
try:
if hasattr(session_manager, 'redis_client'):
# Redis session manager
session_manager.redis_client.ping()
else:
# JWT session manager - always healthy if initialized
pass
services["session_backend"] = "healthy"
except Exception as e:
logger.error(f"Session backend health check failed: {e}")
services["session_backend"] = "unhealthy"
# Check Groq LLM (via anthropic_llm interface)
try:
from app.core.llm_anthropic import anthropic_llm
if anthropic_llm.test_connection():
services["llm"] = "healthy"
else:
services["llm"] = "unhealthy"
except Exception as e:
logger.error(f"LLM health check failed: {e}")
services["llm"] = "not_configured"
# Add testing mode status
services["testing_mode"] = "enabled" if settings.testing_mode else "disabled"
# Check Calendar service
try:
if calendar_auth.client_id and calendar_auth.client_secret:
services["calendar"] = "ready"
else:
services["calendar"] = "not_configured"
except Exception as e:
logger.error(f"Calendar health check failed: {e}")
services["calendar"] = "unhealthy"
overall_status = "healthy" if all(status in ["healthy", "ready"] for status in services.values()) else "degraded"
return HealthResponse(
status=overall_status,
version="0.1.0",
timestamp=datetime.utcnow(),
services=services
)
@app.post("/sessions", response_model=SessionResponse)
async def create_session(request: SessionCreate):
"""Create a new chat session."""
try:
# Debug logging for session creation
print(f"📧 Session creation request received with user_data: {request.user_data}")
if request.user_data and 'email' in request.user_data:
print(f"📧 Email found in request: {request.user_data['email']}")
else:
print(f"❌ No email found in session creation request!")
session_id = session_manager.create_session(request.user_data)
if not session_id:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create session"
)
session_data = session_manager.get_session(session_id)
if not session_data:
raise HTTPException(status_code=500, detail="Failed to create session")
# Debug logging for stored session data
print(f"📧 Session {session_id} created successfully")
if 'user_data' in session_data and session_data['user_data']:
print(f"📧 Stored user_data: {session_data['user_data']}")
if 'email' in session_data['user_data']:
print(f"✅ Email successfully stored in session: {session_data['user_data']['email']}")
else:
print(f"❌ Email NOT found in stored session data!")
else:
print(f"❌ No user_data found in stored session!")
return SessionResponse(
session_id=session_id,
created_at=datetime.fromisoformat(session_data["created_at"]),
last_activity=datetime.fromisoformat(session_data["last_activity"]),
is_active=True
)
except Exception as e:
logger.error(f"Session creation failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create session. Please try again."
)
@app.get("/sessions/{session_id}", response_model=SessionResponse)
async def get_session(session_id: str):
"""Get session information."""
session_data = session_manager.get_session(session_id)
if not session_data:
raise HTTPException(status_code=404, detail="Session not found")
return SessionResponse(
session_id=session_id,
created_at=datetime.fromisoformat(session_data["created_at"]),
last_activity=datetime.fromisoformat(session_data["last_activity"]),
is_active=True
)
@app.get("/api/session-info")
async def get_current_session_info(request: Request):
"""Get current session information for the frontend."""
try:
# Extract session ID from cookies
session_id = request.cookies.get("session_id")
if not session_id:
return {"error": "No session found"}
session_data = session_manager.get_session(session_id)
if not session_data:
return {"error": "Session not found"}
return {
"session_id": session_id,
"user_data": session_data.get("user_data", {}),
"created_at": session_data.get("created_at"),
"last_activity": session_data.get("last_activity")
}
except Exception as e:
print(f"Error getting session info: {e}")
return {"error": str(e)}
@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
"""Delete a session."""
success = session_manager.delete_session(session_id)
if not success:
raise HTTPException(status_code=404, detail="Session not found")
return {"message": "Session deleted successfully"}
@app.get("/sessions/{session_id}/history", response_model=ConversationHistory)
async def get_conversation_history(session_id: str):
"""Get conversation history for a session."""
session_data = session_manager.get_session(session_id)
if not session_data:
raise HTTPException(status_code=404, detail="Session not found")
history = session_manager.get_conversation_history(session_id)
return ConversationHistory(
session_id=session_id,
messages=history.get("messages", []) if history else [],
created_at=datetime.fromisoformat(session_data["created_at"])
)
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""Chat with the AI assistant."""
from app.core.logging_config import log_chat_request, log_chat_response, debug_print
# Enhanced logging for chat requests
session_id = request.session_id or "new_session"
log_chat_request(session_id, request.message)
debug_print(f"CHAT START - Session: {session_id[:8]}...", {"message_length": len(request.message)})
try:
# Create session if not provided
if not request.session_id:
debug_print("Creating new session")
request.session_id = session_manager.create_session()
debug_print(f"New session created: {request.session_id}")
# Get or create conversation
debug_print("Getting conversation manager")
conversation = session_manager.get_or_create_conversation(request.session_id)
if not conversation:
debug_print("ERROR: Conversation not found", {"session_id": request.session_id})
raise HTTPException(status_code=404, detail="Session not found")
debug_print("Conversation found, getting response from agent")
# Get response from agent
response = conversation.get_response(request.message)
debug_print("Agent response received", {"response_length": len(response)})
# Store conversation history
debug_print("Storing conversation history")
session_manager.store_conversation_history(request.session_id)
# Log successful response
log_chat_response(request.session_id, response)
debug_print("CHAT SUCCESS - Response ready")
return ChatResponse(
response=response,
session_id=request.session_id,
timestamp=datetime.utcnow(),
tools_used=None # Will implement tool tracking later
)
except Exception as e:
import traceback
error_traceback = traceback.format_exc()
logger.error(f"Chat error: {str(e)}")
logger.error(f"Traceback: {error_traceback}")
raise HTTPException(status_code=500, detail=f"Chat error: {str(e) if str(e) else 'Unknown error'} | Type: {type(e).__name__}")
@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
"""Stream chat response from the AI assistant."""
try:
# Create session if not provided
if not request.session_id:
request.session_id = session_manager.create_session()
# Get or create conversation
conversation = session_manager.get_or_create_conversation(request.session_id)
if not conversation:
raise HTTPException(status_code=404, detail="Session not found")
async def generate_stream():
"""Generate streaming response."""
try:
for token in conversation.get_streaming_response(request.message):
chunk = StreamChatResponse(
token=token,
session_id=request.session_id,
is_complete=False
)
yield f"data: {chunk.model_dump_json()}\n\n"
# Send completion signal
final_chunk = StreamChatResponse(
token="",
session_id=request.session_id,
is_complete=True
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
# Store conversation history
session_manager.store_conversation_history(request.session_id)
except Exception as e:
error_chunk = {
"error": str(e),
"session_id": request.session_id,
"is_complete": True
}
yield f"data: {json.dumps(error_chunk)}\n\n"
return StreamingResponse(
generate_stream(),
media_type="text/plain",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Stream chat error: {str(e)}")
@app.post("/tts/synthesize")
async def tts_synthesize(request_data: Dict[str, Any], request: Request):
"""TTS synthesis proxy to avoid CORS issues."""
try:
from groq import Groq
import os
import tempfile
text = request_data.get("text", "")
# Valid Groq PlayAI voices
valid_voices = [
'Aaliyah-PlayAI', 'Adelaide-PlayAI', 'Angelo-PlayAI', 'Arista-PlayAI',
'Atlas-PlayAI', 'Basil-PlayAI', 'Briggs-PlayAI', 'Calum-PlayAI',
'Celeste-PlayAI', 'Cheyenne-PlayAI', 'Chip-PlayAI', 'Cillian-PlayAI',
'Deedee-PlayAI', 'Eleanor-PlayAI', 'Fritz-PlayAI', 'Gail-PlayAI',
'Indigo-PlayAI', 'Jennifer-PlayAI', 'Judy-PlayAI', 'Mamaw-PlayAI',
'Mason-PlayAI', 'Mikail-PlayAI', 'Mitch-PlayAI', 'Nia-PlayAI',
'Quinn-PlayAI', 'Ruby-PlayAI', 'Thunder-PlayAI'
]
requested_voice = request_data.get("voice", "Cheyenne-PlayAI")
voice = requested_voice if requested_voice in valid_voices else "Cheyenne-PlayAI"
if not text.strip():
raise HTTPException(status_code=400, detail="Text is required")
logger.info(f"🎵 TTS synthesis request: {text[:50]}...")
if requested_voice != voice:
logger.warning(f"⚠️ Voice '{requested_voice}' not valid, using '{voice}' instead")
# Create Groq TTS client
logger.info("🔌 Creating Groq TTS client connection")
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
# Time the TTS generation for performance monitoring
import time
start_time = time.time()
# Generate speech using Groq TTS
response = client.audio.speech.create(
model="playai-tts",
voice=voice,
input=text,
response_format="wav"
)
# Create temporary file for audio
temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
temp_file_path = temp_file.name
temp_file.close()
# Write Groq response to temporary file
response.write_to_file(temp_file_path)
synthesis_time = time.time() - start_time
logger.info(f"⏱️ TTS synthesis took {synthesis_time:.2f} seconds")
logger.info(f"🎵 TTS synthesis successful using Groq {voice}")
# Read the generated audio file
import uuid
if os.path.exists(temp_file_path):
logger.info(f"🎵 Reading audio file: {temp_file_path} ({os.path.getsize(temp_file_path)} bytes)")
# Store the file in memory temporarily with a unique ID
file_id = str(uuid.uuid4())
with open(temp_file_path, 'rb') as f:
audio_content = f.read()
# Store in a simple in-memory cache
if not hasattr(app.state, 'audio_cache'):
app.state.audio_cache = {}
app.state.audio_cache[file_id] = audio_content
# Clean up old entries (keep last 10)
if len(app.state.audio_cache) > 10:
oldest_keys = list(app.state.audio_cache.keys())[:-10]
for old_key in oldest_keys:
del app.state.audio_cache[old_key]
# Clean up the temporary file
try:
os.unlink(temp_file_path)
except:
pass
# Create URL for our audio endpoint
if "hf.space" in str(request.url.netloc):
base_url = f"https://{request.url.netloc}"
else:
base_url = f"{request.url.scheme}://{request.url.netloc}"
audio_url = f"{base_url}/tts/audio/{file_id}"
return {
"success": True,
"audio_url": audio_url,
"text": text,
"file_id": file_id
}
else:
raise HTTPException(status_code=500, detail=f"TTS audio file not found: {temp_file_path}")
except Exception as e:
# Enhanced error logging for Groq API issues
error_msg = str(e)
if "voice must be one of" in error_msg:
logger.error(f"Invalid voice parameter: {requested_voice}. Available voices: {valid_voices}")
raise HTTPException(status_code=400, detail=f"Invalid voice '{requested_voice}'. Using default 'Fritz-PlayAI'")
elif "Error code: 400" in error_msg:
logger.error(f"Groq API validation error: {error_msg}")
raise HTTPException(status_code=400, detail="TTS request validation failed")
elif "Error code: 401" in error_msg:
logger.error("Groq API authentication error - check GROQ_API_KEY")
raise HTTPException(status_code=500, detail="TTS service authentication failed")
else:
logger.error(f"TTS synthesis error: {e}")
raise HTTPException(status_code=500, detail=f"TTS synthesis failed: {str(e)}")
@app.get("/tts/audio/{file_id}")
async def get_tts_audio(file_id: str):
"""Serve TTS audio files from in-memory cache."""
try:
# Check if file exists in cache
if not hasattr(app.state, 'audio_cache') or file_id not in app.state.audio_cache:
logger.warning(f"🔇 Audio file not found in cache: {file_id}")
raise HTTPException(status_code=404, detail="Audio file not found or expired")
audio_content = app.state.audio_cache[file_id]
logger.info(f"🔊 Serving audio file: {file_id} ({len(audio_content)} bytes)")
from fastapi.responses import Response
return Response(
content=audio_content,
media_type="audio/wav",
headers={
"Content-Disposition": "inline",
"Cache-Control": "no-cache",
"Access-Control-Allow-Origin": "*",
"Accept-Ranges": "bytes"
}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Audio serving error: {e}")
raise HTTPException(status_code=500, detail=f"Audio serving failed: {str(e)}")
@app.post("/api/stt/transcribe")
async def stt_transcribe(file: UploadFile = File(...)):
"""STT transcription using Groq Whisper API."""
try:
from groq import Groq
import os
import tempfile
logger.info(f"🎤 STT transcription request: {file.filename} ({file.content_type})")
# Create Groq STT client
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
# Time the STT generation for performance monitoring
import time
start_time = time.time()
# For MP4 with Opus codec, convert to a format Groq accepts
audio_data = file.file
content_type = file.content_type
filename = file.filename
if content_type and "mp4" in content_type and "opus" in content_type:
# Convert MP4+Opus to WebM for better Groq compatibility
filename = filename.replace('.mp4', '.webm')
content_type = 'audio/webm'
logger.info(f"🔄 Converting MP4+Opus to WebM for Groq compatibility")
# Create transcription using Groq Whisper
transcription = client.audio.transcriptions.create(
file=(filename, audio_data, content_type),
model="whisper-large-v3-turbo",
response_format="json",
language="en",
temperature=0.0
)
transcription_time = time.time() - start_time
logger.info(f"⏱️ STT transcription took {transcription_time:.2f} seconds")
if transcription and transcription.text:
logger.info(f"🎤 STT transcription successful: \"{transcription.text[:100]}...\"")
return {
"success": True,
"text": transcription.text,
"processing_time": round(transcription_time, 2)
}
else:
raise HTTPException(status_code=500, detail="Empty transcription result")
except Exception as e:
# Enhanced error logging for Groq API issues
error_msg = str(e)
if "Error code: 401" in error_msg:
logger.error("Groq API authentication error - check GROQ_API_KEY")
raise HTTPException(status_code=500, detail="STT service authentication failed")
elif "Error code: 400" in error_msg:
logger.error(f"Groq API validation error: {error_msg}")
raise HTTPException(status_code=400, detail="STT request validation failed")
else:
logger.error(f"STT transcription error: {e}")
raise HTTPException(status_code=500, detail=f"STT transcription failed: {str(e)}")
@app.get("/auth/login", response_model=AuthResponse)
async def google_auth_login(request: Request, state: Optional[str] = None):
"""Initiate Google OAuth login."""
try:
auth_url, oauth_state = calendar_auth.get_authorization_url(state)
return AuthResponse(
auth_url=auth_url,
state=oauth_state
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Auth error: {str(e)}")
@app.get("/auth/callback")
async def google_auth_callback(request: Request, code: str, state: str):
"""Handle Google OAuth callback."""
try:
# Reconstruct the authorization response URL
authorization_response = str(request.url)
# Exchange code for credentials
credentials = calendar_auth.handle_callback(authorization_response, state)
return {
"message": "Authentication successful! Your calendar is now connected.",
"status": "success",
"expires_at": credentials.expiry.isoformat() if credentials.expiry else None
}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}")
@app.get("/auth/status")
async def auth_status():
"""Check authentication status."""
try:
is_authenticated = calendar_auth.is_authenticated()
return {
"authenticated": is_authenticated,
"calendar_id": settings.google_calendar_id if is_authenticated else None,
"message": "Connected to Google Calendar" if is_authenticated else "Not authenticated"
}
except Exception as e:
return {
"authenticated": False,
"error": str(e),
"message": "Authentication check failed"
}
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Custom HTTP exception handler."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": "HTTPException",
"message": exc.detail,
"details": {"status_code": exc.status_code},
"timestamp": datetime.utcnow().isoformat()
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""General exception handler."""
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": type(exc).__name__,
"message": str(exc),
"details": {"path": str(request.url)},
"timestamp": datetime.utcnow().isoformat()
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.api.main:app",
host=settings.app_host,
port=settings.app_port,
reload=True if settings.app_env == "development" else False
)