"""
FastAPI application for Digital Islamic Therapist API
Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling
Updated with language context support for Islamic therapeutic guidance
"""

import asyncio
import html
import logging
import os
import secrets
import time
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import urlencode

import resend
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException, status, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

from agents import Runner, RunContextWrapper
from agents.exceptions import InputGuardrailTripwireTriggered
from openai.types.responses import ResponseTextDeltaEvent

from chatbot.chatbot_agent import islamic_therapist_assistant
from sessions.session_manager import session_manager

# Load environment variables
load_dotenv()

# Configure Resend
resend.api_key = os.getenv("RESEND_API_KEY")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Initialize rate limiter with enhanced security
limiter = Limiter(key_func=get_remote_address, default_limits=["100/day", "20/hour", "3/minute"])

# Create FastAPI app
app = FastAPI(
    title="Digital Islamic Therapist API",
    description="AI-powered Islamic therapeutic chatbot API providing guidance through Islamic teachings, stories, and references",
    version="1.0.0"
)

# Add rate limiter middleware
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# Configure CORS from the comma-separated ALLOWED_ORIGINS environment variable
allowed_origins = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
]

if allowed_origins:
    app.add_middleware(
        CORSMiddleware,
        # Use the configured origins: a "*" wildcard together with
        # allow_credentials=True would accept credentialed requests from any site.
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
        # Allow browser clients to read the session header set by /chat-stream.
        expose_headers=["Session-ID"],
    )
    logger.info(f"CORS enabled for origins: {allowed_origins}")
else:
    logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled")

# Security setup
security = HTTPBearer()

# In-memory request timestamps per IP. Tickets and meetings are tracked in
# separate dictionaries so one kind of request does not consume the other's quota.
request_counts = defaultdict(list)  # Ticket submissions per IP
meeting_request_counts = defaultdict(list)  # Meeting requests per IP
TICKET_RATE_LIMIT = 5  # Max 5 tickets per hour per IP
TICKET_TIME_WINDOW = 3600  # 1 hour in seconds
MEETING_RATE_LIMIT = 3  # Max 3 meetings per hour per IP
MEETING_TIME_WINDOW = 3600  # 1 hour in seconds


# Request/Response models
class ChatRequest(BaseModel):
    """Payload accepted by /chat and /chat-stream."""

    message: str
    # Default to Urdu/Roman Urdu if not specified.
    # Options: "urdu", "roman-urdu", "english", "arabic"
    language: Optional[str] = "urdu"
    session_id: Optional[str] = None  # Session ID for chat history


class ChatResponse(BaseModel):
    """Payload returned by /chat."""

    response: str
    success: bool
    session_id: str  # Include session ID in response


class ErrorResponse(BaseModel):
    """Generic error payload."""

    error: str
    detail: Optional[str] = None


class TicketRequest(BaseModel):
    """Payload accepted by /ticket."""

    name: str
    email: str
    message: str


class TicketResponse(BaseModel):
    """Payload returned by /ticket."""

    success: bool
    message: str


class MeetingRequest(BaseModel):
    """Payload accepted by /schedule-meeting."""

    name: str
    email: str
    date: str  # ISO format date string
    time: str  # Time in HH:MM format
    timezone: str  # Timezone identifier
    duration: int  # Duration in minutes
    topic: str  # Meeting topic/title
    attendees: list[str]  # List of attendee emails
    description: Optional[str] = None  # Optional meeting description
    location: Optional[str] = "Google Meet"  # Meeting location/platform


class MeetingResponse(BaseModel):
    """Payload returned by /schedule-meeting."""

    success: bool
    message: str
    meeting_id: Optional[str] = None  # Unique identifier for the meeting


# Security dependency for API key validation
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Verify the bearer token against the API_KEY environment variable.

    Returns the supplied key on success.
    Raises HTTPException (401) when API_KEY is unset or the token does not match.
    """
    # In production, you would check against a database of valid keys
    # For now, we'll use an environment variable
    expected_key = os.getenv("API_KEY")
    supplied_key = credentials.credentials

    # Constant-time comparison avoids leaking key contents through timing;
    # bytes are compared because compare_digest rejects non-ASCII str values.
    if not expected_key or not secrets.compare_digest(
        supplied_key.encode("utf-8"), expected_key.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
    return supplied_key


def _is_rate_limited(bucket, ip_address: str, limit: int, window: int) -> bool:
    """Sliding-window check: record the request in `bucket` unless `limit` is already reached."""
    current_time = time.time()

    # Drop timestamps that fell out of the time window
    recent = [stamp for stamp in bucket[ip_address] if current_time - stamp < window]
    bucket[ip_address] = recent

    if len(recent) >= limit:
        return True

    # Only accepted requests are recorded
    recent.append(current_time)
    return False


def is_ticket_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded ticket submission rate limits"""
    return _is_rate_limited(request_counts, ip_address, TICKET_RATE_LIMIT, TICKET_TIME_WINDOW)


def is_meeting_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded meeting scheduling rate limits"""
    return _is_rate_limited(
        meeting_request_counts, ip_address, MEETING_RATE_LIMIT, MEETING_TIME_WINDOW
    )


def _extract_response_text(output) -> str:
    """Return the text of an agent output: its `content` attribute, the string itself, or str()."""
    if hasattr(output, 'content'):
        return output.content
    if isinstance(output, str):
        return output
    return str(output)


def _load_context_data(language: str, session_id: Optional[str]) -> dict:
    """Build the agent context dict: language preference plus session history when available."""
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")

    context_data = {"language": language}
    if history:
        context_data["history"] = history
    return context_data


def _record_assistant_reply(session_id: Optional[str], text: str) -> None:
    """Store a streamed assistant reply in the session history without failing the stream."""
    if not session_id or not text:
        return
    try:
        session_manager.add_message_to_history(session_id, "assistant", text)
    except Exception as history_error:
        logger.warning(f"Could not store streamed reply for session {session_id}: {history_error}")


def query_islamic_therapist_stream(user_message: str, language: str = "urdu", session_id: Optional[str] = None):
    """
    Query the Digital Islamic Therapist bot with streaming - returns async generator.
    Now includes language context and session history.
    Implements fallback to non-streaming when streaming fails (e.g., with Gemini models).

    The generator yields server-sent-event strings: one "data: <text>" event per
    text delta, then "data: [DONE]", or a "data: [ERROR] ..." event on failure.
    When a session_id is given, the text sent to the client is also appended to
    that session's history as the assistant message.
    """
    # Only a short prefix of the message is logged, matching the endpoint logging
    logger.info(
        f"AGENT STREAM CALL: query_islamic_therapist_stream called with "
        f"message='{user_message[:50]}...', language='{language}', session_id='{session_id}'"
    )

    # Get session history if session_id is provided
    context_data = _load_context_data(language, session_id)

    try:
        # Create context with language preference and history
        ctx = RunContextWrapper(context=context_data)
        result = Runner.run_streamed(
            islamic_therapist_assistant,
            input=user_message,
            context=ctx.context
        )
    except Exception as e:
        logger.error(f"Error setting up stream: {e}", exc_info=True)

        async def error_stream():
            yield "data: [ERROR] Failed to initialize stream.\n\n"

        return error_stream()

    async def generate_stream():
        previous = ""
        has_streamed = False
        emitted = []  # Deltas as sent to the client, for the session history

        try:
            try:
                # Attempt streaming with error handling for each event
                async for event in result.stream_events():
                    try:
                        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                            delta = event.data.delta or ""

                            # ---- Spacing Fix ----
                            # NOTE(review): this inserts a space between consecutive deltas;
                            # confirm the model never splits a word across deltas.
                            if (
                                previous
                                and not previous.endswith((" ", "\n"))
                                and not delta.startswith((" ", ".", ",", "?", "!", ":", ";"))
                            ):
                                delta = " " + delta
                            previous = delta
                            # ---- End Fix ----

                            # NOTE(review): a delta containing "\n" produces lines without a
                            # "data:" prefix; confirm how the client parses the stream.
                            yield f"data: {delta}\n\n"
                            has_streamed = True
                            emitted.append(delta)
                    except Exception as event_error:
                        # Handle individual event errors (e.g., missing logprobs field)
                        logger.warning(f"Event processing error: {event_error}")
                        continue

                _record_assistant_reply(session_id, "".join(emitted))
                yield "data: [DONE]\n\n"
                logger.info("AGENT STREAM RESULT: query_islamic_therapist_stream completed successfully")

            except InputGuardrailTripwireTriggered:
                # A guardrail block is not a streaming failure: skip the fallback
                # (which would re-run the agent) and report it via the outer handler.
                raise
            except Exception as stream_error:
                # Fallback to non-streaming if streaming fails
                logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")

                if not has_streamed:
                    try:
                        # Use the non-streaming API as fallback
                        fallback_response = await Runner.run(
                            islamic_therapist_assistant,
                            input=user_message,
                            context=ctx.context
                        )
                        final_output = getattr(fallback_response, 'final_output', fallback_response)
                        response_text = _extract_response_text(final_output)

                        yield f"data: {response_text}\n\n"
                        _record_assistant_reply(session_id, response_text)
                        yield "data: [DONE]\n\n"
                        logger.info("AGENT STREAM RESULT: query_islamic_therapist_stream fallback completed successfully")
                    except InputGuardrailTripwireTriggered:
                        raise
                    except Exception as fallback_error:
                        logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
                        yield "data: [ERROR] Unable to complete request.\n\n"
                else:
                    # Already streamed some content, just end gracefully
                    _record_assistant_reply(session_id, "".join(emitted))
                    yield "data: [DONE]\n\n"

        except InputGuardrailTripwireTriggered as e:
            logger.warning(f"Guardrail blocked query during streaming: {e}")
            yield "data: [ERROR] Query was blocked by content guardrail.\n\n"
        except Exception as e:
            logger.error(f"Streaming error: {e}", exc_info=True)
            # Do not expose internal exception text to the client
            yield "data: [ERROR] Unable to complete request.\n\n"

    return generate_stream()


async def query_islamic_therapist(user_message: str, language: str = "urdu", session_id: Optional[str] = None):
    """
    Query the Digital Islamic Therapist bot - returns complete response.
    Now includes language context and session history.

    Returns the agent run's `final_output`.
    Raises HTTPException 403 when the input guardrail blocks the query and
    HTTPException 500 for any other failure of the agent run.
    """
    # Only a short prefix of the message is logged, matching the endpoint logging
    logger.info(
        f"AGENT CALL: query_islamic_therapist called with "
        f"message='{user_message[:50]}...', language='{language}', session_id='{session_id}'"
    )

    # Get session history if session_id is provided
    context_data = _load_context_data(language, session_id)

    try:
        # Create context with language preference and history
        ctx = RunContextWrapper(context=context_data)
        response = await Runner.run(
            islamic_therapist_assistant,
            input=user_message,
            context=ctx.context
        )
        logger.info("AGENT RESULT: query_islamic_therapist completed successfully")
        return response.final_output

    except InputGuardrailTripwireTriggered as e:
        logger.warning(f"Guardrail blocked query: {e}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Query was blocked by content guardrail. Please ensure your query is related to Islamic therapy and guidance."
        )
    except Exception as e:
        logger.error(f"Error in query_islamic_therapist: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )


@app.get("/")
async def root():
    """Service banner."""
    return {"status": "ok", "service": "Digital Islamic Therapist API"}


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "healthy"}


@app.post("/session")
async def create_session():
    """
    Create a new chat session
    Returns a session ID that can be used to maintain chat history
    """
    try:
        session_id = session_manager.create_session()
        logger.info(f"Created new session: {session_id}")
        return {"session_id": session_id, "message": "Session created successfully"}
    except Exception as e:
        logger.error(f"Error creating session: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create session"
        )


@app.post("/chat", response_model=ChatResponse)
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat(request: Request, chat_request: ChatRequest):
    """
    Standard chat endpoint with language support and session history.
    Accepts: {"message": "...", "language": "urdu" (or "roman-urdu", "english", "arabic"), "session_id": "optional-session-id"}
    """
    try:
        # Create or use existing session
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for chat: {session_id}")

        logger.info(
            f"Chat request from {get_remote_address(request)}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Add user message to session history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        # Pass language and session to the bot
        response = await query_islamic_therapist(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )
        response_text = _extract_response_text(response)

        # Add bot response to session history
        session_manager.add_message_to_history(session_id, "assistant", response_text)

        logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}")
        return ChatResponse(
            response=response_text,
            success=True,
            session_id=session_id
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in /chat: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )


@app.post("/chat-stream")
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat_stream(request: Request, chat_request: ChatRequest):
    """
    Streaming chat endpoint with language support and session history.
    Accepts: {"message": "...", "language": "urdu" (or "roman-urdu", "english", "arabic"), "session_id": "optional-session-id"}
    """
    try:
        # Create or use existing session
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for streaming chat: {session_id}")

        logger.info(
            f"Stream request from {get_remote_address(request)}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Add user message to session history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        # Pass language and session to the streaming bot; the generator stores
        # the assistant reply in the session history once the stream ends
        stream_generator = query_islamic_therapist_stream(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )

        return StreamingResponse(
            stream_generator,
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "Session-ID": session_id  # Include session ID in headers
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )


def _ticket_email_html(ticket_request: TicketRequest, client_ip) -> str:
    """Render the support-ticket email body with every user-supplied value HTML-escaped."""
    name = html.escape(ticket_request.name)
    email_address = html.escape(ticket_request.email)
    message = html.escape(ticket_request.message)
    ip_text = html.escape(str(client_ip))
    return f"""

Hello Admin,

A new support ticket has been submitted:

Name: {name}

Email: {email_address}

Message:

{message}

IP Address: {ip_text}


Best regards,
Digital Islamic Therapist Support Team

"""


@app.post("/ticket", response_model=TicketResponse)
@limiter.limit("5/hour")  # Limit to 5 tickets per hour per IP
async def submit_ticket(request: Request, ticket_request: TicketRequest):
    """
    Submit a support ticket via email using Resend API.
    Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"}
    """
    try:
        client_ip = get_remote_address(request)
        logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}")

        # Additional rate limiting for tickets
        if is_ticket_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many ticket submissions. Please try again later."
            )

        # Get admin email from environment variables or use a default
        admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")

        # Use a verified sender email (you need to verify this in your Resend account)
        # For testing purposes, you can use your Resend account's verified domain
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # Prepare the email using Resend
        params = {
            "from": sender_email,
            "to": [admin_email],
            "subject": f"Support Ticket from {ticket_request.name}",
            "html": _ticket_email_html(ticket_request, client_ip),
        }

        # Send the email in a worker thread so the blocking HTTP call
        # does not stall the event loop
        await asyncio.to_thread(resend.Emails.send, params)

        logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}")
        return TicketResponse(
            success=True,
            message="Ticket submitted successfully. We'll get back to you soon."
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error submitting ticket: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to submit ticket. Please try again later."
        )


def _build_calendar_link(meeting_request: MeetingRequest) -> str:
    """Build a URL-encoded Google Calendar template link for the meeting."""
    try:
        start = datetime.strptime(
            f"{meeting_request.date} {meeting_request.time}", "%Y-%m-%d %H:%M"
        )
        end = start + timedelta(minutes=meeting_request.duration)
        dates = f"{start:%Y%m%dT%H%M%S}Z/{end:%Y%m%dT%H%M%S}Z"
    except (ValueError, OverflowError):
        # Unparseable date/time: fall back to the raw digits with start == end
        stamp = f"{meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z"
        dates = f"{stamp}/{stamp}"

    # NOTE(review): the "Z" suffix marks the times as UTC although the request
    # carries its own timezone - confirm the intended behaviour.
    query = urlencode({
        "action": "TEMPLATE",
        "text": meeting_request.topic,
        "dates": dates,
        "details": meeting_request.description or 'Meeting scheduled via Innoscribe',
        "location": meeting_request.location,
    })
    return f"https://calendar.google.com/calendar/render?{query}"


def _collect_attendees(meeting_request: MeetingRequest) -> list:
    """Return the organizer plus plausible attendee emails, de-duplicated in first-seen order."""
    all_attendees = [meeting_request.email]

    for attendee in meeting_request.attendees:
        # Simple email validation
        if "@" in attendee and "." in attendee:
            all_attendees.append(attendee)
        else:
            logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.")

    # dict preserves insertion order, so this removes duplicates while keeping order
    return list(dict.fromkeys(all_attendees))


def _meeting_email_html(meeting_request: MeetingRequest, meeting_id: str, all_attendees: list) -> str:
    """Render the meeting confirmation email with every user-supplied value HTML-escaped."""
    topic = html.escape(meeting_request.topic)
    formatted_datetime = html.escape(
        f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}"
    )
    location = html.escape(str(meeting_request.location))
    organizer_name = html.escape(meeting_request.name)
    organizer_email = html.escape(meeting_request.email)
    description = html.escape(meeting_request.description or 'No description provided.')
    attendee_list_html = "".join(f"<li>{html.escape(attendee)}</li>" for attendee in all_attendees)
    calendar_link = html.escape(_build_calendar_link(meeting_request))

    return f"""
Meeting Scheduled - {topic}

Meeting Confirmed!

Your meeting has been successfully scheduled

Meeting Details

Topic: {topic}
Date & Time: {formatted_datetime}
Duration: {meeting_request.duration} minutes
Location: {location}
Organizer: {organizer_name} ({organizer_email})

Description

{description}

Attendees
<ul>{attendee_list_html}</ul>

Note: Only valid email addresses will receive invitations.

<a href="{calendar_link}">Add to Calendar</a>

Meeting ID: {meeting_id}

Need to make changes? Contact the organizer or reply to this email.

This meeting was scheduled through Digital Islamic Therapist Services

Note: Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.

© 2025 Digital Islamic Therapist. All rights reserved.

"""


@app.post("/schedule-meeting", response_model=MeetingResponse)
@limiter.limit("3/hour")  # Limit to 3 meetings per hour per IP
async def schedule_meeting(request: Request, meeting_request: MeetingRequest):
    """
    Schedule a meeting and send email invitations using Resend API.
    Accepts meeting details and sends a confirmation email listing the organizer and attendees.
    """
    try:
        client_ip = get_remote_address(request)
        logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}")

        # Additional rate limiting for meetings
        if is_meeting_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many meeting requests. Please try again later."
            )

        # Timestamp alone collides for requests in the same second,
        # so a random suffix makes the ID unique
        meeting_id = f"mtg_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        # Use a verified sender email (you need to verify this in your Resend account)
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # For Resend testing limitations, we can only send to the owner's email
        # In production, you would verify a domain and send to all attendees
        owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")

        # Organizer + additional attendees (always contains at least the organizer)
        all_attendees = _collect_attendees(meeting_request)

        params = {
            "from": sender_email,
            "to": [owner_email],  # Only send to owner due to Resend testing limitations
            "subject": f"Meeting Scheduled: {meeting_request.topic}",
            "html": _meeting_email_html(meeting_request, meeting_id, all_attendees),
        }

        # Send the email in a worker thread so the blocking HTTP call
        # does not stall the event loop
        try:
            await asyncio.to_thread(resend.Emails.send, params)
            logger.info(f"Meeting email sent to owner, listing {len(all_attendees)} attendees")
        except Exception as email_error:
            logger.error(f"Failed to send email: {email_error}", exc_info=True)
            # Even if email fails, we still consider the meeting scheduled
            return MeetingResponse(
                success=True,
                message="Meeting scheduled successfully, but failed to send email invitations.",
                meeting_id=meeting_id
            )

        logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}")
        return MeetingResponse(
            success=True,
            message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.",
            meeting_id=meeting_id
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error scheduling meeting: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to schedule meeting. Please try again later."
        )


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log any unhandled exception with request details and return a generic 500 payload."""
    logger.error(
        f"Unhandled exception: {exc}",
        exc_info=True,
        extra={
            "path": request.url.path,
            "method": request.method,
            "client": get_remote_address(request)
        }
    )
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": "Internal server error",
            "detail": "An unexpected error occurred. Please try again later."
        }
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)