# Hosting-page residue (not part of the program):
# Tahasaif3's picture / 'code' / 39fc07a
"""
FastAPI application for the Digital Islamic Therapist API.
Provides the /chat and /chat-stream endpoints, plus /session, /ticket and /schedule-meeting,
with rate limiting, CORS, and error handling.
Chat requests carry a language preference that is passed to the agent as context.
"""
import html
import logging
import os
import secrets
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import urlencode

import resend
from agents import Runner, RunContextWrapper
from agents.exceptions import InputGuardrailTripwireTriggered
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException, status, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

from chatbot.chatbot_agent import islamic_therapist_assistant
from sessions.session_manager import session_manager
# Load environment variables (from a .env file, if present)
load_dotenv()

# Hand the Resend API key from the environment to the resend module
resend.api_key = os.environ.get("RESEND_API_KEY")

# Send log records to both app.log and a stream handler, using one shared format
logging.basicConfig(
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used throughout this file
logger = logging.getLogger(__name__)
# Rate limiter keyed by the remote address, with per-day/hour/minute default limits
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["100/day", "20/hour", "3/minute"],
)

# The FastAPI application object that all routes below attach to
app = FastAPI(
    version="1.0.0",
    title="Digital Islamic Therapist API",
    description="AI-powered Islamic therapeutic chatbot API providing guidance through Islamic teachings, stories, and references",
)

# Wire the limiter into the app: shared state, the rate-limit-exceeded
# exception handler, and the slowapi middleware
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# Configure CORS from the comma-separated ALLOWED_ORIGINS environment variable.
# Blank entries (e.g. from trailing commas) are dropped.
allowed_origins = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
]
if allowed_origins:
    app.add_middleware(
        CORSMiddleware,
        # Use the configured list. The previous hard-coded ["*"] combined with
        # allow_credentials=True ignored ALLOWED_ORIGINS entirely while the log
        # line below claimed the restriction was in force.
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
        # /chat-stream returns the session id only in this response header;
        # browsers hide non-safelisted headers from cross-origin scripts
        # unless they are explicitly exposed.
        expose_headers=["Session-ID"],
    )
    logger.info(f"CORS enabled for origins: {allowed_origins}")
else:
    # No middleware is added at all in this case
    logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled")
# Bearer-token scheme consumed by the API-key dependency
security = HTTPBearer()

# Timestamps of recent ticket/meeting requests, used by the manual rate-limit helpers
request_counts = defaultdict(list)

# Ticket submissions: at most 5 per one-hour window (in seconds)
TICKET_RATE_LIMIT = 5
TICKET_TIME_WINDOW = 60 * 60

# Meeting requests: at most 3 per one-hour window (in seconds)
MEETING_RATE_LIMIT = 3
MEETING_TIME_WINDOW = 60 * 60
# Request/Response models
class ChatRequest(BaseModel):
    # Request body accepted by the /chat and /chat-stream endpoints.
    # Text of the user's message; forwarded to the agent as its input
    message: str
    language: Optional[str] = "urdu"  # Default to Urdu/Roman Urdu if not specified. Options: "urdu", "roman-urdu", "english", "arabic"
    session_id: Optional[str] = None  # Session ID for chat history; the endpoints create a new session when this is empty
class ChatResponse(BaseModel):
    # Response body returned by the /chat endpoint.
    # The assistant's reply text
    response: str
    # Set to True by /chat on the success path; failures are raised as HTTP errors instead
    success: bool
    session_id: str  # Include session ID in response
class ErrorResponse(BaseModel):
    # Error payload shape: a short error label plus optional detail text.
    # NOTE(review): not referenced by any endpoint visible in this file.
    error: str
    detail: Optional[str] = None
class TicketRequest(BaseModel):
    # Request body accepted by the /ticket endpoint.
    # Submitter's display name (used in the email subject and body)
    name: str
    # Submitter's email address; typed as a plain string, so no format validation happens here
    email: str
    # Free-text description of the issue
    message: str
class TicketResponse(BaseModel):
    # Response body returned by the /ticket endpoint.
    # Set to True by /ticket on the success path
    success: bool
    # Human-readable confirmation text
    message: str
class MeetingRequest(BaseModel):
    # Request body accepted by the /schedule-meeting endpoint.
    # All date/time fields are plain strings; no parsing or validation happens in this model.
    # Organizer's display name
    name: str
    # Organizer's email address (always listed first among the attendees)
    email: str
    date: str  # ISO format date string
    time: str  # Time in HH:MM format
    timezone: str  # Timezone identifier
    duration: int  # Duration in minutes
    topic: str  # Meeting topic/title
    attendees: list[str]  # List of attendee emails
    description: Optional[str] = None  # Optional meeting description
    location: Optional[str] = "Google Meet"  # Meeting location/platform
class MeetingResponse(BaseModel):
    # Response body returned by the /schedule-meeting endpoint.
    # Set to True by /schedule-meeting whenever it returns a body, including when the email fails
    success: bool
    # Human-readable status text
    message: str
    meeting_id: Optional[str] = None  # Unique identifier for the meeting
# Security dependency for API key validation
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token against the API_KEY environment variable.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The presented API key string when it matches the configured key.

    Raises:
        HTTPException: 401 when API_KEY is unset or empty, or when the
            presented token does not match it.
    """
    # In production, you would check against a database of valid keys
    # For now, we'll use an environment variable
    expected_key = os.getenv("API_KEY")
    supplied_key = credentials.credentials

    # Compare in constant time so response timing does not reveal how much of
    # the key matched. Both sides are encoded to bytes because compare_digest
    # rejects non-ASCII str arguments.
    keys_match = bool(expected_key) and secrets.compare_digest(
        supplied_key.encode("utf-8"), expected_key.encode("utf-8")
    )
    if not keys_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
    return supplied_key
def is_ticket_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded ticket submission rate limits.

    Keeps a sliding window of timestamps in ``request_counts``. A request that
    is allowed is recorded; a rejected request is not.

    Args:
        ip_address: Client address used to identify the caller.

    Returns:
        True when the caller already has TICKET_RATE_LIMIT requests inside the
        last TICKET_TIME_WINDOW seconds, otherwise False.
    """
    current_time = time.time()
    # request_counts is shared with is_meeting_rate_limited. A namespaced key
    # keeps ticket submissions from consuming the meeting quota (and vice
    # versa), which happened when both helpers used the bare IP as the key.
    bucket_key = f"ticket:{ip_address}"

    # Keep only requests that are still inside the time window
    recent_requests = [
        req_time for req_time in request_counts[bucket_key]
        if current_time - req_time < TICKET_TIME_WINDOW
    ]
    limited = len(recent_requests) >= TICKET_RATE_LIMIT
    if not limited:
        # Record the current request only when it is allowed
        recent_requests.append(current_time)
    request_counts[bucket_key] = recent_requests
    return limited
def is_meeting_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded meeting scheduling rate limits.

    Keeps a sliding window of timestamps in ``request_counts``. A request that
    is allowed is recorded; a rejected request is not.

    Args:
        ip_address: Client address used to identify the caller.

    Returns:
        True when the caller already has MEETING_RATE_LIMIT requests inside
        the last MEETING_TIME_WINDOW seconds, otherwise False.
    """
    current_time = time.time()
    # request_counts is shared with is_ticket_rate_limited. A namespaced key
    # keeps meeting requests from consuming the ticket quota (and vice versa),
    # which happened when both helpers used the bare IP as the key.
    bucket_key = f"meeting:{ip_address}"

    # Keep only requests that are still inside the time window
    recent_requests = [
        req_time for req_time in request_counts[bucket_key]
        if current_time - req_time < MEETING_TIME_WINDOW
    ]
    limited = len(recent_requests) >= MEETING_RATE_LIMIT
    if not limited:
        # Record the current request only when it is allowed
        recent_requests.append(current_time)
    request_counts[bucket_key] = recent_requests
    return limited
def query_islamic_therapist_stream(user_message: str, language: str = "urdu", session_id: Optional[str] = None):
    """
    Query the Digital Islamic Therapist bot with streaming - returns async generator.

    The generator yields text chunks of the form "data: <text>" followed by a
    blank line. A normal run ends with "data: [DONE]"; failures are reported
    as "data: [ERROR] ..." chunks instead of raising.

    Language and (when a session id is given) the session history are passed
    to the agent as run context. If iterating the stream fails before any text
    was sent, the request is retried once through the non-streaming API.

    Args:
        user_message: The user's message, used as the agent input.
        language: Language preference placed in the run context.
        session_id: Optional session whose stored history is added to the context.

    Returns:
        An async generator of chunk strings. If the streamed run cannot even
        be set up, a generator yielding a single [ERROR] chunk is returned.
    """
    logger.info(f"AGENT STREAM CALL: query_islamic_therapist_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")
    # Get session history if session_id is provided
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")
    try:
        # Create context with language preference and history
        context_data = {"language": language}
        if history:
            context_data["history"] = history
        ctx = RunContextWrapper(context=context_data)
        result = Runner.run_streamed(
            islamic_therapist_assistant,
            input=user_message,
            context=ctx.context
        )

        async def generate_stream():
            try:
                previous = ""
                has_streamed = False
                try:
                    # Attempt streaming with error handling for each event
                    async for event in result.stream_events():
                        try:
                            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                                delta = event.data.delta or ""
                                # ---- Spacing Fix ----
                                # Inserts a space when the previous chunk did not end in
                                # whitespace and this one does not start with a space or
                                # punctuation.
                                # NOTE(review): if deltas can be sub-word fragments this
                                # would split words - confirm against the model in use.
                                if (
                                    previous
                                    and not previous.endswith((" ", "\n"))
                                    and not delta.startswith((" ", ".", ",", "?", "!", ":", ";"))
                                ):
                                    delta = " " + delta
                                previous = delta
                                # ---- End Fix ----
                                # NOTE(review): a delta containing "\n" spans several
                                # lines, but only the first carries the "data: " prefix.
                                yield f"data: {delta}\n\n"
                                has_streamed = True
                        except Exception as event_error:
                            # Handle individual event errors (e.g., missing logprobs field)
                            logger.warning(f"Event processing error: {event_error}")
                            continue
                    yield "data: [DONE]\n\n"
                    logger.info("AGENT STREAM RESULT: query_islamic_therapist_stream completed successfully")
                except InputGuardrailTripwireTriggered:
                    # Without this clause the generic handler below would catch
                    # guardrail trips first, retry the blocked query through the
                    # fallback, and never reach the guardrail message.
                    raise
                except Exception as stream_error:
                    # Fallback to non-streaming if streaming fails
                    logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")
                    if not has_streamed:
                        try:
                            # Use the non-streaming API as fallback
                            fallback_response = await Runner.run(
                                islamic_therapist_assistant,
                                input=user_message,
                                context=ctx.context
                            )
                            if hasattr(fallback_response, 'final_output'):
                                final_output = fallback_response.final_output
                            else:
                                final_output = fallback_response
                            if hasattr(final_output, 'content'):
                                response_text = final_output.content
                            elif isinstance(final_output, str):
                                response_text = final_output
                            else:
                                response_text = str(final_output)
                            yield f"data: {response_text}\n\n"
                            yield "data: [DONE]\n\n"
                            logger.info("AGENT STREAM RESULT: query_islamic_therapist_stream fallback completed successfully")
                        except InputGuardrailTripwireTriggered:
                            # Report guardrail trips from the fallback run the same way
                            raise
                        except Exception as fallback_error:
                            logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
                            yield "data: [ERROR] Unable to complete request.\n\n"
                    else:
                        # Already streamed some content, just end gracefully
                        yield "data: [DONE]\n\n"
            except InputGuardrailTripwireTriggered as e:
                logger.warning(f"Guardrail blocked query during streaming: {e}")
                yield "data: [ERROR] Query was blocked by content guardrail.\n\n"
            except Exception as e:
                logger.error(f"Streaming error: {e}", exc_info=True)
                # Keep exception text in the server log only; the client gets the
                # same generic wording the non-streaming endpoint uses.
                yield "data: [ERROR] An internal error occurred while processing your request.\n\n"

        return generate_stream()
    except Exception as e:
        logger.error(f"Error setting up stream: {e}", exc_info=True)

        async def error_stream():
            yield "data: [ERROR] Failed to initialize stream.\n\n"

        return error_stream()
async def query_islamic_therapist(user_message: str, language: str = "urdu", session_id: Optional[str] = None):
    """
    Query the Digital Islamic Therapist bot - returns complete response.

    Language and (when a session id is given) the session history are passed
    to the agent as run context.

    Args:
        user_message: The user's message, used as the agent input.
        language: Language preference placed in the run context.
        session_id: Optional session whose stored history is added to the context.

    Returns:
        The ``final_output`` of the agent run.

    Raises:
        HTTPException: 403 when the input guardrail blocks the query; 500 for
            any other failure during the run.
    """
    logger.info(f"AGENT CALL: query_islamic_therapist called with message='{user_message}', language='{language}', session_id='{session_id}'")
    # Get session history if session_id is provided
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")
    try:
        # Create context with language preference and history
        context_data = {"language": language}
        if history:
            context_data["history"] = history
        ctx = RunContextWrapper(context=context_data)
        response = await Runner.run(
            islamic_therapist_assistant,
            input=user_message,
            context=ctx.context
        )
        # Log messages now name this function (they previously referred to a
        # non-existent "query_innscribe_bot")
        logger.info("AGENT RESULT: query_islamic_therapist completed successfully")
        return response.final_output
    except InputGuardrailTripwireTriggered as e:
        logger.warning(f"Guardrail blocked query: {e}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Query was blocked by content guardrail. Please ensure your query is related to Islamic therapy and guidance."
        ) from e
    except Exception as e:
        logger.error(f"Error in query_islamic_therapist: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        ) from e
@app.get("/")
async def root():
    """Return a static payload identifying the service."""
    service_info = {"status": "ok"}
    service_info["service"] = "Digital Islamic Therapist API"
    return service_info
@app.get("/health")
async def health():
    """Return a static health payload; no dependencies are probed."""
    health_payload = dict(status="healthy")
    return health_payload
@app.post("/session")
async def create_session():
    """
    Open a new chat session.

    Returns a JSON object with the new ``session_id`` and a confirmation
    message. Any failure is logged and reported as HTTP 500.
    """
    try:
        new_session_id = session_manager.create_session()
        logger.info(f"Created new session: {new_session_id}")
        payload = {
            "session_id": new_session_id,
            "message": "Session created successfully",
        }
        return payload
    except Exception as e:
        logger.error(f"Error creating session: {e}", exc_info=True)
        failure = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create session"
        )
        raise failure
@app.post("/chat", response_model=ChatResponse)
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat(request: Request, chat_request: ChatRequest):
    """
    Non-streaming chat endpoint with language support and session history.

    Accepts: {"message": "...", "language": "urdu" (or "roman-urdu", "english", "arabic"), "session_id": "optional-session-id"}

    The user message is stored in the session before the agent is called and
    the reply is stored afterwards. HTTP errors raised while querying the
    agent pass through unchanged; anything else becomes HTTP 500.
    """
    try:
        # Reuse the caller's session, or open one when none was supplied
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for chat: {session_id}")

        caller = get_remote_address(request)
        logger.info(
            f"Chat request from {caller}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Record the user's turn, then ask the agent
        session_manager.add_message_to_history(session_id, "user", chat_request.message)
        response = await query_islamic_therapist(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )

        # Reduce the agent output to plain text: prefer a `.content`
        # attribute, then a raw string, then the str() form
        response_text = (
            response.content if hasattr(response, 'content')
            else response if isinstance(response, str)
            else str(response)
        )

        # Record the assistant's turn
        session_manager.add_message_to_history(session_id, "assistant", response_text)
        logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}")
        return ChatResponse(session_id=session_id, success=True, response=response_text)
    except HTTPException:
        # Already carries the intended status code and detail
        raise
    except Exception as e:
        logger.error(f"Unexpected error in /chat: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )
@app.post("/chat-stream")
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat_stream(request: Request, chat_request: ChatRequest):
    """
    Streaming variant of the chat endpoint, with language support and session history.

    Accepts: {"message": "...", "language": "urdu" (or "roman-urdu", "english", "arabic"), "session_id": "optional-session-id"}

    The reply is streamed as text/event-stream and the session id is returned
    in the ``Session-ID`` response header. Only the user's message is written
    to the session history here; the streamed reply is not stored by this
    endpoint.
    """
    try:
        # Reuse the caller's session, or open one when none was supplied
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for streaming chat: {session_id}")

        caller = get_remote_address(request)
        logger.info(
            f"Stream request from {caller}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Record the user's turn before starting the stream
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        # Async generator producing the response chunks
        stream_generator = query_islamic_therapist_stream(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )

        # Note: persisting the assistant's reply would have to happen after
        # the stream completes, either via a separate call from the frontend
        # or inside the stream generator itself.
        stream_headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Session-ID": session_id,  # lets the client learn a newly created session id
        }
        return StreamingResponse(
            stream_generator,
            headers=stream_headers,
            media_type="text/event-stream",
        )
    except HTTPException:
        # Already carries the intended status code and detail
        raise
    except Exception as e:
        logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )
@app.post("/ticket", response_model=TicketResponse)
@limiter.limit("5/hour")  # Limit to 5 tickets per hour per IP
async def submit_ticket(request: Request, ticket_request: TicketRequest):
    """
    Submit a support ticket via email using Resend API.
    Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"}

    The email goes to ADMIN_EMAIL from SENDER_EMAIL (both read from the
    environment, with defaults). All submitter-supplied values are
    HTML-escaped before being placed in the email body.

    Raises:
        HTTPException: 429 when the manual per-IP ticket limit is exceeded;
            500 when anything else fails, including the send itself.
    """
    try:
        client_ip = get_remote_address(request)
        logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}")
        # Additional rate limiting for tickets
        if is_ticket_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many ticket submissions. Please try again later."
            )
        # Get admin email from environment variables or use a default
        admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
        # Use a verified sender email (you need to verify this in your Resend account)
        # For testing purposes, you can use your Resend account's verified domain
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # The fields below come straight from the request body, so escape them
        # before embedding in HTML; otherwise a submitter could inject markup
        # into the administrator's email.
        safe_name = html.escape(ticket_request.name)
        safe_email = html.escape(ticket_request.email)
        safe_message = html.escape(ticket_request.message)
        safe_ip = html.escape(str(client_ip))

        # Prepare the email using Resend
        params = {
            "from": sender_email,
            "to": [admin_email],
            "subject": f"Support Ticket from {ticket_request.name}",
            "html": f"""
<p>Hello Admin,</p>
<p>A new support ticket has been submitted:</p>
<p><strong>Name:</strong> {safe_name}</p>
<p><strong>Email:</strong> {safe_email}</p>
<p><strong>Message:</strong></p>
<p>{safe_message}</p>
<p><strong>IP Address:</strong> {safe_ip}</p>
<br>
<p>Best regards,<br>Digital Islamic Therapist Support Team</p>
"""
        }
        # Send the email
        # NOTE(review): this call is synchronous inside an async endpoint -
        # confirm whether it should be moved off the event loop.
        resend.Emails.send(params)
        logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}")
        return TicketResponse(
            success=True,
            message="Ticket submitted successfully. We'll get back to you soon."
        )
    except HTTPException:
        # Preserve the 429 raised above
        raise
    except Exception as e:
        logger.error(f"Error submitting ticket: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to submit ticket. Please try again later."
        )
def _build_calendar_link(meeting_request: MeetingRequest) -> str:
    """Return a Google Calendar "add event" URL with URL-encoded query values."""
    compact_date = meeting_request.date.replace('-', '')
    compact_time = meeting_request.time.replace(':', '')
    # Fallback stamps, built by stripping separators from the raw strings
    # (a zero-length event). Used only when the date/time cannot be parsed.
    start_stamp = end_stamp = f"{compact_date}T{compact_time}00Z"
    try:
        start = datetime.fromisoformat(f"{meeting_request.date}T{meeting_request.time}")
        end = start + timedelta(minutes=meeting_request.duration)
    except (ValueError, OverflowError):
        logger.warning("Could not parse meeting date/time; calendar link uses the raw values")
    else:
        # NOTE(review): the trailing "Z" marks the stamps as UTC although the
        # request carries its own timezone field; no conversion is done here.
        start_stamp = start.strftime("%Y%m%dT%H%M%SZ")
        end_stamp = end.strftime("%Y%m%dT%H%M%SZ")
    query = urlencode(
        {
            "action": "TEMPLATE",
            "text": meeting_request.topic,
            "dates": f"{start_stamp}/{end_stamp}",
            "details": meeting_request.description or 'Meeting scheduled via Innoscribe',
            "location": meeting_request.location or "",
        },
        safe="/",  # keep the start/end separator in "dates" readable
    )
    return f"https://calendar.google.com/calendar/render?{query}"


def _render_meeting_email_html(
    meeting_request: MeetingRequest,
    formatted_datetime: str,
    calendar_link: str,
    all_attendees: list,
    meeting_id: str,
) -> str:
    """Render the confirmation email body, HTML-escaping every request-supplied value."""
    esc = html.escape
    topic = esc(meeting_request.topic)
    when = esc(formatted_datetime)
    location = esc(str(meeting_request.location))
    organizer = f"{esc(meeting_request.name)} ({esc(meeting_request.email)})"
    description = esc(meeting_request.description or 'No description provided.')
    attendee_items = ''.join(f'<li>{esc(attendee)}</li>' for attendee in all_attendees)
    # Escaping the URL turns its "&" separators into "&amp;" for the href attribute
    calendar_href = esc(calendar_link)
    return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Meeting Scheduled - {topic}</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; text-align: center; border-radius: 10px 10px 0 0;">
<h1 style="margin: 0; font-size: 28px;">Meeting Confirmed!</h1>
<p style="font-size: 18px; margin-top: 10px;">Your meeting has been successfully scheduled</p>
</div>
<div style="background-color: #ffffff; padding: 30px; border: 1px solid #eaeaea; border-top: none; border-radius: 0 0 10px 10px;">
<h2 style="color: #333;">Meeting Details</h2>
<div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0;">
<table style="width: 100%; border-collapse: collapse;">
<tr>
<td style="padding: 8px 0; font-weight: bold; width: 30%;">Topic:</td>
<td style="padding: 8px 0;">{topic}</td>
</tr>
<tr style="background-color: #f0f0f0;">
<td style="padding: 8px 0; font-weight: bold;">Date & Time:</td>
<td style="padding: 8px 0;">{when}</td>
</tr>
<tr>
<td style="padding: 8px 0; font-weight: bold;">Duration:</td>
<td style="padding: 8px 0;">{meeting_request.duration} minutes</td>
</tr>
<tr style="background-color: #f0f0f0;">
<td style="padding: 8px 0; font-weight: bold;">Location:</td>
<td style="padding: 8px 0;">{location}</td>
</tr>
<tr>
<td style="padding: 8px 0; font-weight: bold;">Organizer:</td>
<td style="padding: 8px 0;">{organizer}</td>
</tr>
</table>
</div>
<div style="margin: 25px 0;">
<h3 style="color: #333;">Description</h3>
<p style="background-color: #f8f9fa; padding: 15px; border-radius: 8px; white-space: pre-wrap;">{description}</p>
</div>
<div style="margin: 25px 0;">
<h3 style="color: #333;">Attendees</h3>
<ul style="background-color: #f8f9fa; padding: 15px; border-radius: 8px;">
{attendee_items}
</ul>
<p style="font-size: 12px; color: #666; margin-top: 5px;">Note: Only valid email addresses will receive invitations.</p>
</div>
<div style="text-align: center; margin: 30px 0;">
<a href="{calendar_href}" style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 12px 25px; text-decoration: none; border-radius: 5px; font-weight: bold; display: inline-block;">Add to Calendar</a>
</div>
<div style="background-color: #e3f2fd; padding: 15px; border-radius: 8px; margin-top: 25px;">
<p style="margin: 0;"><strong>Meeting ID:</strong> {meeting_id}</p>
<p style="margin: 10px 0 0 0; font-size: 14px; color: #666;">Need to make changes? Contact the organizer or reply to this email.</p>
</div>
</div>
<div style="text-align: center; margin-top: 30px; color: #888; font-size: 14px;">
<p>This meeting was scheduled through Digital Islamic Therapist Services</p>
<p><strong>Note:</strong> Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.</p>
<p>© 2025 Digital Islamic Therapist. All rights reserved.</p>
</div>
</body>
</html>
"""


@app.post("/schedule-meeting", response_model=MeetingResponse)
@limiter.limit("3/hour")  # Limit to 3 meetings per hour per IP
async def schedule_meeting(request: Request, meeting_request: MeetingRequest):
    """
    Schedule a meeting and email a confirmation using Resend API.

    The confirmation lists the organizer and every attendee entry that passes
    a simple "@" and "." check, but it is sent only to ADMIN_EMAIL (see the
    Resend testing note below). A failed send is logged and still reported as
    a scheduled meeting.

    Returns:
        MeetingResponse with ``success=True`` and the generated ``meeting_id``.

    Raises:
        HTTPException: 429 when the manual per-IP meeting limit is exceeded;
            500 for any other unexpected failure.
    """
    try:
        client_ip = get_remote_address(request)
        logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}")
        # Additional rate limiting for meetings
        if is_meeting_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many meeting requests. Please try again later."
            )
        # Generate a unique meeting ID. The random suffix keeps two requests
        # arriving within the same second from sharing an ID.
        meeting_id = f"mtg_{int(time.time())}_{secrets.token_hex(4)}"
        # Use a verified sender email (you need to verify this in your Resend account)
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")
        # For Resend testing limitations, we can only send to the owner's email
        # In production, you would verify a domain and use that instead
        owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
        # Format date and time for display
        formatted_datetime = f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}"
        # Create calendar link (Google Calendar link example)
        calendar_link = _build_calendar_link(meeting_request)

        # Organizer first, then each additional attendee that looks like an email
        all_attendees = [meeting_request.email]
        for attendee in meeting_request.attendees:
            # Simple email validation
            if "@" in attendee and "." in attendee:
                all_attendees.append(attendee)
            else:
                logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.")
        # Remove duplicates while preserving order. The list always holds at
        # least the organizer, so no empty-list case needs handling.
        all_attendees = list(dict.fromkeys(all_attendees))

        # In a real implementation, you would send to all attendees after verifying your domain
        # For now, we're sending to the owner with information about all attendees
        params = {
            "from": sender_email,
            "to": [owner_email],  # Only send to owner due to Resend testing limitations
            "subject": f"Meeting Scheduled: {meeting_request.topic}",
            "html": _render_meeting_email_html(
                meeting_request, formatted_datetime, calendar_link, all_attendees, meeting_id
            ),
        }
        # Send the email
        try:
            resend.Emails.send(params)
            logger.info(f"Meeting email sent to administrator; {len(all_attendees)} attendees listed")
        except Exception as email_error:
            logger.error(f"Failed to send email: {email_error}", exc_info=True)
            # Even if email fails, we still consider the meeting scheduled
            return MeetingResponse(
                success=True,
                message="Meeting scheduled successfully, but failed to send email invitations.",
                meeting_id=meeting_id
            )
        logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}")
        return MeetingResponse(
            success=True,
            message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.",
            meeting_id=meeting_id
        )
    except HTTPException:
        # Preserve the 429 raised above
        raise
    except Exception as e:
        logger.error(f"Error scheduling meeting: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to schedule meeting. Please try again later."
        )
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an otherwise unhandled exception and answer with a generic HTTP 500 body.

    The request path, method and client address are attached to the log
    record; none of the exception text is returned to the caller.
    """
    request_details = {
        "path": request.url.path,
        "method": request.method,
        "client": get_remote_address(request),
    }
    logger.error(f"Unhandled exception: {exc}", exc_info=True, extra=request_details)

    error_body = {
        "error": "Internal server error",
        "detail": "An unexpected error occurred. Please try again later.",
    }
    return JSONResponse(
        content=error_body,
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    )
if __name__ == "__main__":
    # Serve the app with uvicorn on all interfaces, port 7860, when this
    # file is executed directly
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=7860)