""" FastAPI application for Digital Islamic Therapist API Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling Updated with language context support for Islamic therapeutic guidance """ import os import logging import time from typing import Optional from collections import defaultdict import resend from fastapi import FastAPI, Request, HTTPException, status, Depends, Header from fastapi.responses import StreamingResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from slowapi.middleware import SlowAPIMiddleware from dotenv import load_dotenv from agents import Runner, RunContextWrapper from agents.exceptions import InputGuardrailTripwireTriggered from openai.types.responses import ResponseTextDeltaEvent from chatbot.chatbot_agent import islamic_therapist_assistant from sessions.session_manager import session_manager # Load environment variables load_dotenv() # Configure Resend resend.api_key = os.getenv("RESEND_API_KEY") # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('app.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Initialize rate limiter with enhanced security limiter = Limiter(key_func=get_remote_address, default_limits=["100/day", "20/hour", "3/minute"]) # Create FastAPI app app = FastAPI( title="Digital Islamic Therapist API", description="AI-powered Islamic therapeutic chatbot API providing guidance through Islamic teachings, stories, and references", version="1.0.0" ) # Add rate limiter middleware app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) app.add_middleware(SlowAPIMiddleware) # Configure CORS from environment variable allowed_origins = os.getenv("ALLOWED_ORIGINS", "").split(",") allowed_origins = [origin.strip() for origin in allowed_origins if origin.strip()] if allowed_origins: app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) logger.info(f"CORS enabled for origins: {allowed_origins}") else: logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled") # Security setup security = HTTPBearer() # Enhanced rate limiting dictionaries request_counts = defaultdict(list) # Track requests per IP TICKET_RATE_LIMIT = 5 # Max 5 tickets per hour per IP TICKET_TIME_WINDOW = 3600 # 1 hour in seconds MEETING_RATE_LIMIT = 3 # Max 3 meetings per hour per IP MEETING_TIME_WINDOW = 3600 # 1 hour in seconds # Request/Response models class ChatRequest(BaseModel): message: str language: Optional[str] = "urdu" # Default to Urdu/Roman Urdu if not specified. Options: "urdu", "roman-urdu", "english", "arabic" session_id: Optional[str] = None # Session ID for chat history class ChatResponse(BaseModel): response: str success: bool session_id: str # Include session ID in response class ErrorResponse(BaseModel): error: str detail: Optional[str] = None class TicketRequest(BaseModel): name: str email: str message: str class TicketResponse(BaseModel): success: bool message: str class MeetingRequest(BaseModel): name: str email: str date: str # ISO format date string time: str # Time in HH:MM format timezone: str # Timezone identifier duration: int # Duration in minutes topic: str # Meeting topic/title attendees: list[str] # List of attendee emails description: Optional[str] = None # Optional meeting description location: Optional[str] = "Google Meet" # Meeting location/platform class MeetingResponse(BaseModel): success: bool message: str meeting_id: Optional[str] = None # Unique identifier for the meeting # Security dependency for API key validation async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)): """Verify API key for protected endpoints""" # In production, you would check against a database of valid keys # For now, we'll use an environment variable expected_key = os.getenv("API_KEY") if not expected_key or credentials.credentials != expected_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing API key", ) return credentials.credentials def is_ticket_rate_limited(ip_address: str) -> bool: """Check if an IP address has exceeded ticket submission rate limits""" current_time = time.time() # Clean old requests outside the time window request_counts[ip_address] = [ req_time for req_time in request_counts[ip_address] if current_time - req_time < TICKET_TIME_WINDOW ] # Check if limit exceeded if len(request_counts[ip_address]) >= TICKET_RATE_LIMIT: return True # Add current request request_counts[ip_address].append(current_time) return False def is_meeting_rate_limited(ip_address: str) -> bool: """Check if an IP address has exceeded meeting scheduling rate limits""" current_time = time.time() # Clean old requests outside the time window request_counts[ip_address] = [ req_time for req_time in request_counts[ip_address] if current_time - req_time < MEETING_TIME_WINDOW ] # Check if limit exceeded if len(request_counts[ip_address]) >= MEETING_RATE_LIMIT: return True # Add current request request_counts[ip_address].append(current_time) return False def query_islamic_therapist_stream(user_message: str, language: str = "urdu", session_id: Optional[str] = None): """ Query the Digital Islamic Therapist bot with streaming - returns async generator. Now includes language context and session history. Implements fallback to non-streaming when streaming fails (e.g., with Gemini models). """ logger.info(f"AGENT STREAM CALL: query_islamic_therapist_stream called with message='{user_message}', language='{language}', session_id='{session_id}'") # Get session history if session_id is provided history = [] if session_id: history = session_manager.get_session_history(session_id) logger.info(f"Retrieved {len(history)} history messages for session {session_id}") try: # Create context with language preference and history context_data = {"language": language} if history: context_data["history"] = history ctx = RunContextWrapper(context=context_data) result = Runner.run_streamed( islamic_therapist_assistant, input=user_message, context=ctx.context ) async def generate_stream(): try: previous = "" has_streamed = False try: # Attempt streaming with error handling for each event async for event in result.stream_events(): try: if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): delta = event.data.delta or "" # ---- Spacing Fix ---- if ( previous and not previous.endswith((" ", "\n")) and not delta.startswith((" ", ".", ",", "?", "!", ":", ";")) ): delta = " " + delta previous = delta # ---- End Fix ---- yield f"data: {delta}\n\n" has_streamed = True except Exception as event_error: # Handle individual event errors (e.g., missing logprobs field) logger.warning(f"Event processing error: {event_error}") continue yield "data: [DONE]\n\n" logger.info("AGENT STREAM RESULT: query_innscribe_bot_stream completed successfully") except Exception as stream_error: # Fallback to non-streaming if streaming fails logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}") if not has_streamed: # Get final output using the streaming result's final_output property # Wait for the stream to complete to get final output try: # Use the non-streaming API as fallback fallback_response = await Runner.run( islamic_therapist_assistant, input=user_message, context=ctx.context ) if hasattr(fallback_response, 'final_output'): final_output = fallback_response.final_output else: final_output = fallback_response if hasattr(final_output, 'content'): response_text = final_output.content elif isinstance(final_output, str): response_text = final_output else: response_text = str(final_output) yield f"data: {response_text}\n\n" yield "data: [DONE]\n\n" logger.info("AGENT STREAM RESULT: query_innscribe_bot_stream fallback completed successfully") except Exception as fallback_error: logger.error(f"Fallback also failed: {fallback_error}", exc_info=True) yield f"data: [ERROR] Unable to complete request.\n\n" else: # Already streamed some content, just end gracefully yield "data: [DONE]\n\n" except InputGuardrailTripwireTriggered as e: logger.warning(f"Guardrail blocked query during streaming: {e}") yield f"data: [ERROR] Query was blocked by content guardrail.\n\n" except Exception as e: logger.error(f"Streaming error: {e}", exc_info=True) yield f"data: [ERROR] {str(e)}\n\n" return generate_stream() except Exception as e: logger.error(f"Error setting up stream: {e}", exc_info=True) async def error_stream(): yield f"data: [ERROR] Failed to initialize stream.\n\n" return error_stream() async def query_islamic_therapist(user_message: str, language: str = "urdu", session_id: Optional[str] = None): """ Query the Digital Islamic Therapist bot - returns complete response. Now includes language context and session history. """ logger.info(f"AGENT CALL: query_islamic_therapist called with message='{user_message}', language='{language}', session_id='{session_id}'") # Get session history if session_id is provided history = [] if session_id: history = session_manager.get_session_history(session_id) logger.info(f"Retrieved {len(history)} history messages for session {session_id}") try: # Create context with language preference and history context_data = {"language": language} if history: context_data["history"] = history ctx = RunContextWrapper(context=context_data) response = await Runner.run( islamic_therapist_assistant, input=user_message, context=ctx.context ) logger.info("AGENT RESULT: query_innscribe_bot completed successfully") return response.final_output except InputGuardrailTripwireTriggered as e: logger.warning(f"Guardrail blocked query: {e}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Query was blocked by content guardrail. Please ensure your query is related to Islamic therapy and guidance." ) except Exception as e: logger.error(f"Error in query_innscribe_bot: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An internal error occurred while processing your request." ) @app.get("/") async def root(): return {"status": "ok", "service": "Digital Islamic Therapist API"} @app.get("/health") async def health(): return {"status": "healthy"} @app.post("/session") async def create_session(): """ Create a new chat session Returns a session ID that can be used to maintain chat history """ try: session_id = session_manager.create_session() logger.info(f"Created new session: {session_id}") return {"session_id": session_id, "message": "Session created successfully"} except Exception as e: logger.error(f"Error creating session: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create session" ) @app.post("/chat", response_model=ChatResponse) @limiter.limit("10/minute") # Limit to 10 requests per minute per IP async def chat(request: Request, chat_request: ChatRequest): """ Standard chat endpoint with language support and session history. Accepts: {"message": "...", "language": "urdu" (or "roman-urdu", "english", "arabic"), "session_id": "optional-session-id"} """ try: # Create or use existing session session_id = chat_request.session_id if not session_id: session_id = session_manager.create_session() logger.info(f"Created new session for chat: {session_id}") logger.info( f"Chat request from {get_remote_address(request)}: " f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}" ) # Add user message to session history session_manager.add_message_to_history(session_id, "user", chat_request.message) # Pass language and session to the bot response = await query_islamic_therapist( chat_request.message, language=chat_request.language, session_id=session_id ) if hasattr(response, 'content'): response_text = response.content elif isinstance(response, str): response_text = response else: response_text = str(response) # Add bot response to session history session_manager.add_message_to_history(session_id, "assistant", response_text) logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}") return ChatResponse( response=response_text, success=True, session_id=session_id ) except HTTPException: raise except Exception as e: logger.error(f"Unexpected error in /chat: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An internal error occurred while processing your request." ) @app.post("/chat-stream") @limiter.limit("10/minute") # Limit to 10 requests per minute per IP async def chat_stream(request: Request, chat_request: ChatRequest): """ Streaming chat endpoint with language support and session history. Accepts: {"message": "...", "language": "urdu" (or "roman-urdu", "english", "arabic"), "session_id": "optional-session-id"} """ try: # Create or use existing session session_id = chat_request.session_id if not session_id: session_id = session_manager.create_session() logger.info(f"Created new session for streaming chat: {session_id}") logger.info( f"Stream request from {get_remote_address(request)}: " f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}" ) # Add user message to session history session_manager.add_message_to_history(session_id, "user", chat_request.message) # Pass language and session to the streaming bot stream_generator = query_islamic_therapist_stream( chat_request.message, language=chat_request.language, session_id=session_id ) # Note: For streaming, we add the response to history after the stream completes # This would need to be handled in the frontend by making a separate call or # by modifying the stream generator to add the complete response to history return StreamingResponse( stream_generator, media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", "Session-ID": session_id # Include session ID in headers } ) except HTTPException: raise except Exception as e: logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An internal error occurred while processing your request." ) @app.post("/ticket", response_model=TicketResponse) @limiter.limit("5/hour") # Limit to 5 tickets per hour per IP async def submit_ticket(request: Request, ticket_request: TicketRequest): """ Submit a support ticket via email using Resend API. Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"} """ try: client_ip = get_remote_address(request) logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}") # Additional rate limiting for tickets if is_ticket_rate_limited(client_ip): logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}") raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many ticket submissions. Please try again later." ) # Get admin email from environment variables or use a default admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") # Use a verified sender email (you need to verify this in your Resend account) # For testing purposes, you can use your Resend account's verified domain sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev") # Prepare the email using Resend params = { "from": sender_email, "to": [admin_email], "subject": f"Support Ticket from {ticket_request.name}", "html": f"""
Hello Admin,
A new support ticket has been submitted:
Name: {ticket_request.name}
Email: {ticket_request.email}
Message:
{ticket_request.message}
IP Address: {client_ip}
Best regards,
Digital Islamic Therapist Support Team
Your meeting has been successfully scheduled
| Topic: | {meeting_request.topic} |
| Date & Time: | {formatted_datetime} |
| Duration: | {meeting_request.duration} minutes |
| Location: | {meeting_request.location} |
| Organizer: | {meeting_request.name} ({meeting_request.email}) |
{meeting_request.description or 'No description provided.'}
Note: Only valid email addresses will receive invitations.
Meeting ID: {meeting_id}
Need to make changes? Contact the organizer or reply to this email.
This meeting was scheduled through Digital Islamic Therapist Services
Note: Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.
© 2025 Digital Islamic Therapist. All rights reserved.