# """# FastAPI application for JOBObike Chatbot API # Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling # Updated with language context support # """ # import os # import logging # import time # from typing import Optional # from collections import defaultdict # import resend # from fastapi import FastAPI, Request, HTTPException, status, Depends, Header # from fastapi.responses import StreamingResponse, JSONResponse # from fastapi.middleware.cors import CORSMiddleware # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials # from pydantic import BaseModel # from slowapi import Limiter, _rate_limit_exceeded_handler # from slowapi.util import get_remote_address # from slowapi.errors import RateLimitExceeded # from slowapi.middleware import SlowAPIMiddleware # from dotenv import load_dotenv # from agents import Runner, RunContextWrapper # from agents.exceptions import InputGuardrailTripwireTriggered # from openai.types.responses import ResponseTextDeltaEvent # from chatbot.chatbot_agent import jobobike_assistant # from sessions.session_manager import session_manager # # Load environment variables # load_dotenv() # # Configure Resend # resend.api_key = os.getenv("RESEND_API_KEY") # # Configure logging # logging.basicConfig( # level=logging.INFO, # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # handlers=[ # logging.FileHandler('app.log'), # logging.StreamHandler() # ] # ) # logger = logging.getLogger(__name__) # # Initialize rate limiter with enhanced security # limiter = Limiter(key_func=get_remote_address, default_limits=["100/day", "20/hour", "3/minute"]) # # Create FastAPI app # app = FastAPI( # title="JOBObike Chatbot API", # description="AI-powered chatbot API for JOBObike services", # version="1.0.0" # ) # # Add rate limiter middleware # app.state.limiter = limiter # app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # app.add_middleware(SlowAPIMiddleware) # # Configure 
CORS from environment variable # allowed_origins = os.getenv("ALLOWED_ORIGINS", "").split(",") # allowed_origins = [origin.strip() for origin in allowed_origins if origin.strip()] # if allowed_origins: # app.add_middleware( # CORSMiddleware, # allow_origins=["*"] + allowed_origins, # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) # logger.info(f"CORS enabled for origins: {allowed_origins}") # else: # logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled") # # Security setup # security = HTTPBearer() # # Enhanced rate limiting dictionaries # request_counts = defaultdict(list) # Track requests per IP # TICKET_RATE_LIMIT = 5 # Max 5 tickets per hour per IP # TICKET_TIME_WINDOW = 3600 # 1 hour in seconds # MEETING_RATE_LIMIT = 3 # Max 3 meetings per hour per IP # MEETING_TIME_WINDOW = 3600 # 1 hour in seconds # # Request/Response models # class ChatRequest(BaseModel): # message: str # language: Optional[str] = "english" # Default to English if not specified # session_id: Optional[str] = None # Session ID for chat history # class ChatResponse(BaseModel): # response: str # success: bool # session_id: str # Include session ID in response # class ErrorResponse(BaseModel): # error: str # detail: Optional[str] = None # class TicketRequest(BaseModel): # name: str # email: str # message: str # class TicketResponse(BaseModel): # success: bool # message: str # class MeetingRequest(BaseModel): # name: str # email: str # date: str # ISO format date string # time: str # Time in HH:MM format # timezone: str # Timezone identifier # duration: int # Duration in minutes # topic: str # Meeting topic/title # attendees: list[str] # List of attendee emails # description: Optional[str] = None # Optional meeting description # location: Optional[str] = "Google Meet" # Meeting location/platform # class MeetingResponse(BaseModel): # success: bool # message: str # meeting_id: Optional[str] = None # Unique identifier for the meeting # # Security dependency for 
API key validation # async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)): # """Verify API key for protected endpoints""" # # In production, you would check against a database of valid keys # # For now, we'll use an environment variable # expected_key = os.getenv("API_KEY") # if not expected_key or credentials.credentials != expected_key: # raise HTTPException( # status_code=status.HTTP_401_UNAUTHORIZED, # detail="Invalid or missing API key", # ) # return credentials.credentials # def is_ticket_rate_limited(ip_address: str) -> bool: # """Check if an IP address has exceeded ticket submission rate limits""" # current_time = time.time() # # Clean old requests outside the time window # request_counts[ip_address] = [ # req_time for req_time in request_counts[ip_address] # if current_time - req_time < TICKET_TIME_WINDOW # ] # # Check if limit exceeded # if len(request_counts[ip_address]) >= TICKET_RATE_LIMIT: # return True # # Add current request # request_counts[ip_address].append(current_time) # return False # def is_meeting_rate_limited(ip_address: str) -> bool: # """Check if an IP address has exceeded meeting scheduling rate limits""" # current_time = time.time() # # Clean old requests outside the time window # request_counts[ip_address] = [ # req_time for req_time in request_counts[ip_address] # if current_time - req_time < MEETING_TIME_WINDOW # ] # # Check if limit exceeded # if len(request_counts[ip_address]) >= MEETING_RATE_LIMIT: # return True # # Add current request # request_counts[ip_address].append(current_time) # return False # def query_jobobike_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None): # """ # Query the JOBObike bot with streaming - returns async generator. # Now includes language context and session history. # Implements fallback to non-streaming when streaming fails (e.g., with Gemini models). 
# """ # logger.info(f"AGENT STREAM CALL: query_jobobike_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'") # # Get session history if session_id is provided # history = [] # if session_id: # history = session_manager.get_session_history(session_id) # logger.info(f"Retrieved {len(history)} history messages for session {session_id}") # try: # # Create context with language preference and history # context_data = {"language": language} # if history: # context_data["history"] = history # ctx = RunContextWrapper(context=context_data) # result = Runner.run_streamed( # jobobike_assistant, # input=user_message, # context=ctx.context # ) # async def generate_stream(): # try: # previous = "" # has_streamed = True # try: # # Attempt streaming with error handling for each event # async for event in result.stream_events(): # try: # if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): # delta = event.data.delta or "" # # ---- Spacing Fix ---- # if ( # previous # and not previous.endswith((" ", "\n")) # and not delta.startswith((" ", ".", ",", "?", "!", ":", ";")) # ): # delta = " " + delta # previous = delta # # ---- End Fix ---- # yield f"data: {delta}\n\n" # has_streamed = True # except Exception as event_error: # # Handle individual event errors (e.g., missing logprobs field) # logger.warning(f"Event processing error: {event_error}") # continue # yield "data: [DONE]\n\n" # logger.info("AGENT STREAM RESULT: query_jobobike_bot_stream completed successfully") # except Exception as stream_error: # # Fallback to non-streaming if streaming fails # logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}") # if not has_streamed: # # Get final output using the streaming result's final_output property # # Wait for the stream to complete to get final output # try: # # Use the non-streaming API as fallback # fallback_response = await Runner.run( # jobobike_assistant, # 
input=user_message, # context=ctx.context # ) # if hasattr(fallback_response, 'final_output'): # final_output = fallback_response.final_output # else: # final_output = fallback_response # if hasattr(final_output, 'content'): # response_text = final_output.content # elif isinstance(final_output, str): # response_text = final_output # else: # response_text = str(final_output) # yield f"data: {response_text}\n\n" # yield "data: [DONE]\n\n" # logger.info("AGENT STREAM RESULT: query_jobobike_bot_stream fallback completed successfully") # except Exception as fallback_error: # logger.error(f"Fallback also failed: {fallback_error}", exc_info=True) # yield f"data: [ERROR] Unable to complete request.\n\n" # else: # # Already streamed some content, just end gracefully # yield "data: [DONE]\n\n" # except InputGuardrailTripwireTriggered as e: # logger.warning(f"Guardrail blocked query during streaming: {e}") # yield f"data: [ERROR] Query was blocked by content guardrail.\n\n" # except Exception as e: # logger.error(f"Streaming error: {e}", exc_info=True) # yield f"data: [ERROR] {str(e)}\n\n" # return generate_stream() # except Exception as e: # logger.error(f"Error setting up stream: {e}", exc_info=True) # async def error_stream(): # yield f"data: [ERROR] Failed to initialize stream.\n\n" # return error_stream() # async def query_jobobike_bot(user_message: str, language: str = "english", session_id: Optional[str] = None): # """ # Query the JOBObike bot - returns complete response. # Now includes language context and session history. 
# """ # logger.info(f"AGENT CALL: query_jobobike_bot called with message='{user_message}', language='{language}', session_id='{session_id}'") # # Get session history if session_id is provided # history = [] # if session_id: # history = session_manager.get_session_history(session_id) # logger.info(f"Retrieved {len(history)} history messages for session {session_id}") # try: # # Create context with language preference and history # context_data = {"language": language} # if history: # context_data["history"] = history # ctx = RunContextWrapper(context=context_data) # response = await Runner.run( # jobobike_assistant, # input=user_message, # context=ctx.context # ) # logger.info("AGENT RESULT: query_jobobike_bot completed successfully") # return response.final_output # except InputGuardrailTripwireTriggered as e: # logger.warning(f"Guardrail blocked query: {e}") # raise HTTPException( # status_code=status.HTTP_403_FORBIDDEN, # detail="Query was blocked by content guardrail. Please ensure your query is related to JOBObike services." # ) # except Exception as e: # logger.error(f"Error in query_jobobike_bot: {e}", exc_info=True) # raise HTTPException( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # detail="An internal error occurred while processing your request." 
# ) # @app.get("/") # async def root(): # return {"status": "ok", "service": "JOBObike Chatbot API"} # @app.get("/health") # async def health(): # return {"status": "healthy"} # @app.post("/session") # async def create_session(): # """ # Create a new chat session # Returns a session ID that can be used to maintain chat history # """ # try: # session_id = session_manager.create_session() # logger.info(f"Created new session: {session_id}") # return {"session_id": session_id, "message": "Session created successfully"} # except Exception as e: # logger.error(f"Error creating session: {e}", exc_info=True) # raise HTTPException( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # detail="Failed to create session" # ) # @app.post("/chat", response_model=ChatResponse) # @limiter.limit("10/minute") # Limit to 10 requests per minute per IP # async def chat(request: Request, chat_request: ChatRequest): # """ # Standard chat endpoint with language support and session history. # Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"} # """ # try: # # Create or use existing session # session_id = chat_request.session_id # if not session_id: # session_id = session_manager.create_session() # logger.info(f"Created new session for chat: {session_id}") # logger.info( # f"Chat request from {get_remote_address(request)}: " # f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}" # ) # # Add user message to session history # session_manager.add_message_to_history(session_id, "user", chat_request.message) # # Pass language and session to the bot # response = await query_jobobike_bot( # chat_request.message, # language=chat_request.language, # session_id=session_id # ) # if hasattr(response, 'content'): # response_text = response.content # elif isinstance(response, str): # response_text = response # else: # response_text = str(response) # # Add bot response to session history # 
session_manager.add_message_to_history(session_id, "assistant", response_text) # logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}") # return ChatResponse( # response=response_text, # success=True, # session_id=session_id # ) # except HTTPException: # raise # except Exception as e: # logger.error(f"Unexpected error in /chat: {e}", exc_info=True) # raise HTTPException( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # detail="An internal error occurred while processing your request." # ) # @app.post("/api/messages", response_model=ChatResponse) # @limiter.limit("10/minute") # Same rate limit as /chat # async def api_messages(request: Request, chat_request: ChatRequest): # """ # Frontend-friendly chat endpoint at /api/messages. # Exactly mirrors /chat logic for session/history support. # Expects: {"message": "...", "language": "english", "session_id": "optional"} # """ # client_ip = get_remote_address(request) # logger.info(f"API Messages request from {client_ip}: message='{chat_request.message[:50]}...', lang='{chat_request.language}', session='{chat_request.session_id}'") # try: # # Create/use session (Firestore-backed) # session_id = chat_request.session_id # if not session_id: # session_id = session_manager.create_session() # logger.info(f"New session created for /api/messages: {session_id}") # # Save user message to history # session_manager.add_message_to_history(session_id, "user", chat_request.message) # # Call your existing bot query function # response = await query_jobobike_bot( # user_message=chat_request.message, # language=chat_request.language, # session_id=session_id # ) # # Extract response text # response_text = ( # response.content if hasattr(response, 'content') # else response if isinstance(response, str) # else str(response) # ) # # Save AI response to history # session_manager.add_message_to_history(session_id, "assistant", response_text) # logger.info(f"API Messages success: Response 
sent for session {session_id}") # return ChatResponse( # response=response_text, # success=True, # session_id=session_id # ) # except InputGuardrailTripwireTriggered as e: # logger.warning(f"Guardrail blocked /api/messages: {e}") # raise HTTPException( # status_code=403, # detail="Query blocked – please ask about JOBObike services." # ) # except Exception as e: # logger.error(f"Error in /api/messages: {e}", exc_info=True) # raise HTTPException( # status_code=500, # detail="Internal error – try again." # ) # @app.post("/chat-stream") # @limiter.limit("10/minute") # Limit to 10 requests per minute per IP # async def chat_stream(request: Request, chat_request: ChatRequest): # """ # Streaming chat endpoint with language support and session history. # Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"} # """ # try: # # Create or use existing session # session_id = chat_request.session_id # if not session_id: # session_id = session_manager.create_session() # logger.info(f"Created new session for streaming chat: {session_id}") # logger.info( # f"Stream request from {get_remote_address(request)}: " # f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}" # ) # # Add user message to session history # session_manager.add_message_to_history(session_id, "user", chat_request.message) # # Pass language and session to the streaming bot # stream_generator = query_jobobike_bot_stream( # chat_request.message, # language=chat_request.language, # session_id=session_id # ) # # Note: For streaming, we add the response to history after the stream completes # # This would need to be handled in the frontend by making a separate call or # # by modifying the stream generator to add the complete response to history # return StreamingResponse( # stream_generator, # media_type="text/event-stream", # headers={ # "Cache-Control": "no-cache", # "Connection": "keep-alive", # "X-Accel-Buffering": "no", # 
"Session-ID": session_id # Include session ID in headers # } # ) # except HTTPException: # raise # except Exception as e: # logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True) # raise HTTPException( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # detail="An internal error occurred while processing your request." # ) # @app.post("/ticket", response_model=TicketResponse) # @limiter.limit("5/hour") # Limit to 5 tickets per hour per IP # async def submit_ticket(request: Request, ticket_request: TicketRequest): # """ # Submit a support ticket via email using Resend API. # Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"} # """ # try: # client_ip = get_remote_address(request) # logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}") # # Additional rate limiting for tickets # if is_ticket_rate_limited(client_ip): # logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}") # raise HTTPException( # status_code=status.HTTP_429_TOO_MANY_REQUESTS, # detail="Too many ticket submissions. Please try again later." # ) # # Get admin email from environment variables or use a default # admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") # # Use a verified sender email (you need to verify this in your Resend account) # # For testing purposes, you can use your Resend account's verified domain # sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev") # # Prepare the email using Resend # params = { # "from": sender_email, # "to": [admin_email], # "subject": f"Support Ticket from {ticket_request.name}", # "html": f""" #

Hello Admin,

#

A new support ticket has been submitted:

#

Name: {ticket_request.name}

#

Email: {ticket_request.email}

#

Message:

#

{ticket_request.message}

#

IP Address: {client_ip}

#
#

Best regards,
JOBObike Support Team

# """ # } # # Send the email # email = resend.Emails.send(params) # logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}") # return TicketResponse( # success=True, # message="Ticket submitted successfully. We'll get back to you soon." # ) # except HTTPException: # raise # except Exception as e: # logger.error(f"Error submitting ticket: {e}", exc_info=True) # raise HTTPException( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # detail="Failed to submit ticket. Please try again later." # ) # @app.post("/schedule-meeting", response_model=MeetingResponse) # @limiter.limit("3/hour") # Limit to 3 meetings per hour per IP # async def schedule_meeting(request: Request, meeting_request: MeetingRequest): # """ # Schedule a meeting and send email invitations using Resend API. # Accepts meeting details and sends professional email invitations to organizer and attendees. # """ # try: # client_ip = get_remote_address(request) # logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}") # # Additional rate limiting for meetings # if is_meeting_rate_limited(client_ip): # logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}") # raise HTTPException( # status_code=status.HTTP_429_TOO_MANY_REQUESTS, # detail="Too many meeting requests. Please try again later." 
# ) # # Generate a unique meeting ID # meeting_id = f"mtg_{int(time.time())}" # # Get admin email from environment variables or use a default # admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") # # Use a verified sender email (you need to verify this in your Resend account) # sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev") # # For Resend testing limitations, we can only send to the owner's email # # In production, you would verify a domain and use that instead # owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") # # Format date and time for display # formatted_datetime = f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}" # # Create calendar link (Google Calendar link example) # calendar_link = f"https://calendar.google.com/calendar/render?action=TEMPLATE&text={meeting_request.topic}&dates={meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z/{meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z&details={meeting_request.description or 'Meeting scheduled via JOBObike'}&location={meeting_request.location}" # # Combine all attendees (organizer + additional attendees) # # Validate and format email addresses # all_attendees = [meeting_request.email] # # Validate additional attendees - they must be valid email addresses # for attendee in meeting_request.attendees: # # Simple email validation # if "@" in attendee and "." in attendee: # all_attendees.append(attendee) # else: # # If not a valid email, skip or treat as name only # logger.warning(f"Invalid email format for attendee: {attendee}. 
Skipping.") # # Remove duplicates while preserving order # seen = set() # unique_attendees = [] # for email in all_attendees: # if email not in seen: # seen.add(email) # unique_attendees.append(email) # all_attendees = unique_attendees # # Prepare the professional HTML email template # html_template = f""" # # # # # # Meeting Scheduled - {meeting_request.topic} # # #
#

Meeting Confirmed!

#

Your meeting has been successfully scheduled

#
#
#

Meeting Details

#
# # # # # # # # # # # # # # # # # # # # # #
Topic:{meeting_request.topic}
Date & Time:{formatted_datetime}
Duration:{meeting_request.duration} minutes
Location:{meeting_request.location}
Organizer:{meeting_request.name} ({meeting_request.email})
#
#
#

Description

#

{meeting_request.description or 'No description provided.'}

#
#
#

Attendees

# #

Note: Only valid email addresses will receive invitations.

#
#
# Add to Calendar #
#
#

Meeting ID: {meeting_id}

#

Need to make changes? Contact the organizer or reply to this email.

#
#
#
#

This meeting was scheduled through JOBObike Chatbot Services

#

Note: Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.

#

© 2025 JOBObike. All rights reserved.

#
# # # """ # # Send email to all attendees # # Check if we have valid attendees to send to # if not all_attendees: # logger.warning("No valid email addresses found for meeting attendees") # return MeetingResponse( # success=True, # message="Meeting scheduled successfully, but no valid email addresses found for invitations.", # meeting_id=meeting_id # ) # # For Resend testing limitations, we can only send to the owner's email # # In production, you would verify a domain and send to all attendees # owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") # # Prepare email for owner with all attendee information # attendee_list_html = ''.join([f'
  • {attendee}
  • ' for attendee in all_attendees]) # # In a real implementation, you would send to all attendees after verifying your domain # # For now, we're sending to the owner with information about all attendees # params = { # "from": sender_email, # "to": [owner_email], # Only send to owner due to Resend testing limitations # "subject": f"Meeting Scheduled: {meeting_request.topic}", # "html": html_template # } # # Send the email # try: # email = resend.Emails.send(params) # logger.info(f"Email sent successfully to {len(all_attendees)} attendees") # except Exception as email_error: # logger.error(f"Failed to send email: {email_error}", exc_info=True) # # Even if email fails, we still consider the meeting scheduled # return MeetingResponse( # success=True, # message="Meeting scheduled successfully, but failed to send email invitations.", # meeting_id=meeting_id # ) # logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}") # return MeetingResponse( # success=True, # message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.", # meeting_id=meeting_id # ) # except HTTPException: # raise # except Exception as e: # logger.error(f"Error scheduling meeting: {e}", exc_info=True) # raise HTTPException( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # detail="Failed to schedule meeting. Please try again later." # ) # @app.exception_handler(Exception) # async def global_exception_handler(request: Request, exc: Exception): # logger.error( # f"Unhandled exception: {exc}", # exc_info=True, # extra={ # "path": request.url.path, # "method": request.method, # "client": get_remote_address(request) # } # ) # return JSONResponse( # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # content={ # "error": "Internal server error", # "detail": "An unexpected error occurred. Please try again later." 
#         }
#     )
#
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)

"""
FastAPI application for JOBObike Chatbot API
Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling
Updated with language context support and FIXED spacing issue in streaming
"""

import os
import logging
import time
from typing import Optional
from collections import defaultdict

import resend
from fastapi import FastAPI, Request, HTTPException, status, Depends, Header
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from dotenv import load_dotenv
from agents import Runner, RunContextWrapper
from agents.exceptions import InputGuardrailTripwireTriggered
from openai.types.responses import ResponseTextDeltaEvent
from openai import BadRequestError as OpenAIBadRequestError

from chatbot.chatbot_agent import jobobike_assistant
from sessions.session_manager import session_manager

# Pull settings from a local .env file into the process environment
# before any os.getenv() lookup below.
load_dotenv()

# The Resend client reads its credential from this module attribute;
# it stays None when RESEND_API_KEY is not set.
resend.api_key = os.getenv("RESEND_API_KEY")

# Root logging: INFO and above goes both to app.log and to the console.
_log_handlers = [
    logging.FileHandler('app.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)

# Default per-client limits, keyed on the remote address of the request.
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["100/day", "20/hour", "3/minute"],
)

# The ASGI application object.
app = FastAPI(
    title="JOBObike Chatbot API",
    description="AI-powered chatbot API for JOBObike services",
    version="1.0.0",
)

# slowapi looks the limiter up on app.state.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# Configure CORS from environment variable (comma-separated list; blanks dropped)
allowed_origins = os.getenv("ALLOWED_ORIGINS", "").split(",")
allowed_origins = [origin.strip() for origin in allowed_origins if origin.strip()]

# Always allow common localhost origins for development
default_origins = [
    "http://localhost:3000",
    "http://localhost:3001",
    "http://localhost:5173",
    "http://localhost:5174",
    "http://127.0.0.1:3000",
    "http://127.0.0.1:3001",
    "http://127.0.0.1:5173",
    "http://127.0.0.1:5174",
]

# Combine default origins with environment origins
all_origins = default_origins + allowed_origins

app.add_middleware(
    CORSMiddleware,
    allow_origins=all_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

logger.info(f"CORS enabled for origins: {all_origins}")

# Security setup
security = HTTPBearer()

# Enhanced rate limiting dictionaries.
# Each maps an IP address to the timestamps (time.time()) of its accepted
# requests.  Tickets and meetings are tracked separately so that one kind of
# request does not consume the other's quota.
request_counts = defaultdict(list)  # Ticket submissions per IP (name kept for compatibility)
meeting_request_counts = defaultdict(list)  # Meeting requests per IP
TICKET_RATE_LIMIT = 5  # Max 5 tickets per hour per IP
TICKET_TIME_WINDOW = 3600  # 1 hour in seconds
MEETING_RATE_LIMIT = 3  # Max 3 meetings per hour per IP
MEETING_TIME_WINDOW = 3600  # 1 hour in seconds


# Request/Response models
# NOTE: the models are documented with comments rather than class docstrings
# so that the generated OpenAPI schema descriptions stay unchanged.

# Body accepted by the chat endpoints.
class ChatRequest(BaseModel):
    message: str
    language: Optional[str] = "english"  # Default to English if not specified
    session_id: Optional[str] = None  # Session ID for chat history


# Body returned by the non-streaming chat endpoints.
class ChatResponse(BaseModel):
    response: str
    success: bool
    session_id: str  # Include session ID in response


# Generic error payload.
class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None


# Body accepted by the ticket endpoint.
class TicketRequest(BaseModel):
    name: str
    email: str
    message: str


# Body returned by the ticket endpoint.
class TicketResponse(BaseModel):
    success: bool
    message: str


# Body accepted by the meeting-scheduling endpoint.
class MeetingRequest(BaseModel):
    name: str
    email: str
    date: str  # ISO format date string
    time: str  # Time in HH:MM format
    timezone: str  # Timezone identifier
    duration: int  # Duration in minutes
    topic: str  # Meeting topic/title
    attendees: list[str]  # List of attendee emails
    description: Optional[str] = None  # Optional meeting description
    location: Optional[str] = "Google Meet"  # Meeting location/platform


# Body returned by the meeting-scheduling endpoint.
class MeetingResponse(BaseModel):
    success: bool
    message: str
    meeting_id: Optional[str] = None  # Unique identifier for the meeting


# Security dependency for API key validation
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the bearer API key for protected endpoints.

    The supplied bearer token is compared against the ``API_KEY`` environment
    variable.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The validated API key string.

    Raises:
        HTTPException: 401 when ``API_KEY`` is unset/empty or the token does
            not match it.
    """
    # Local import keeps this block self-contained.
    import hmac

    # In production, you would check against a database of valid keys
    # For now, we'll use an environment variable
    expected_key = os.getenv("API_KEY")
    supplied_key = credentials.credentials

    # compare_digest runs in time independent of where the inputs differ, so
    # the key cannot be recovered through response-timing measurements.
    # Encoding to bytes lets it accept non-ASCII input without raising.
    key_matches = bool(expected_key) and hmac.compare_digest(
        supplied_key.encode("utf-8"), expected_key.encode("utf-8")
    )
    if not key_matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
    return supplied_key


def _is_rate_limited(
    history: dict[str, list[float]],
    ip_address: str,
    limit: int,
    window: float,
) -> bool:
    """Sliding-window check shared by the ticket and meeting limiters.

    Drops timestamps older than ``window`` seconds for ``ip_address``; if
    ``limit`` or more remain, the request is rejected and not recorded,
    otherwise the current time is recorded and the request is allowed.
    """
    now = time.time()
    recent = [stamp for stamp in history[ip_address] if now - stamp < window]
    history[ip_address] = recent
    if len(recent) >= limit:
        return True
    recent.append(now)
    return False


def is_ticket_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded ticket submission rate limits.

    Returns True when the IP already has ``TICKET_RATE_LIMIT`` accepted ticket
    submissions within the last ``TICKET_TIME_WINDOW`` seconds; otherwise
    records this submission and returns False.
    """
    return _is_rate_limited(
        request_counts, ip_address, TICKET_RATE_LIMIT, TICKET_TIME_WINDOW
    )


def is_meeting_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded meeting scheduling rate limits.

    Returns True when the IP already has ``MEETING_RATE_LIMIT`` accepted
    meeting requests within the last ``MEETING_TIME_WINDOW`` seconds;
    otherwise records this request and returns False.  Meeting requests are
    counted separately from ticket submissions.
    """
    return _is_rate_limited(
        meeting_request_counts, ip_address, MEETING_RATE_LIMIT, MEETING_TIME_WINDOW
    )


# def query_jobobike_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
#     """
#     Query the JOBObike bot with streaming - returns async generator.
# Substrings that identify an upstream "bad / expired API key" failure.
_API_KEY_ERROR_MARKERS = ("API key", "INVALID_ARGUMENT", "API_KEY_INVALID")

# Server-sent-event payloads emitted by the streaming endpoint.
_SSE_DONE = "data: [DONE]\n\n"
_SSE_GUARDRAIL = "data: [ERROR] Query was blocked by content guardrail.\n\n"
# Deliberately generic: the client must not learn which environment variable
# or provider is misconfigured; the details go to the server log only.
_SSE_SERVICE_UNAVAILABLE = "data: [ERROR] API service unavailable. Please check server configuration.\n\n"
_SSE_INVALID_REQUEST = "data: [ERROR] Invalid request to AI service. Please try again.\n\n"
_SSE_UNABLE = "data: [ERROR] Unable to complete request. Please try again.\n\n"


def _is_api_key_error(error_text: str) -> bool:
    """Return True when an upstream error message points at an API key problem."""
    if "expired" in error_text.lower():
        return True
    return any(marker in error_text for marker in _API_KEY_ERROR_MARKERS)


def _extract_response_text(output) -> str:
    """Turn an agent output into plain text.

    Uses ``output.content`` when that attribute exists, the value itself when
    it is already a string, and ``str(output)`` otherwise.
    """
    if hasattr(output, "content"):
        return output.content
    if isinstance(output, str):
        return output
    return str(output)


def query_jobobike_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
    """Query the JOBObike bot with streaming.

    Text deltas are forwarded exactly as the model sends them. If streaming
    fails before anything was sent, the request is retried once through the
    non-streaming API.

    Args:
        user_message: The user's message.
        language: Language preference placed in the run context.
        session_id: Optional session whose history is placed in the run
            context and to which the assistant reply is appended.

    Returns:
        An async generator of SSE-formatted strings. It ends with
        ``data: [DONE]`` on success or a single ``data: [ERROR] ...`` event
        on failure; it never raises to the caller.
    """
    logger.info(f"AGENT STREAM CALL: query_jobobike_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")

    # Get session history if session_id is provided
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")

    try:
        # Create context with language preference and history
        context_data = {"language": language}
        if history:
            context_data["history"] = history

        ctx = RunContextWrapper(context=context_data)
        result = Runner.run_streamed(
            jobobike_assistant,
            input=user_message,
            context=ctx.context
        )
    except Exception as e:
        logger.error(f"Error setting up stream: {e}", exc_info=True)

        async def error_stream():
            yield "data: [ERROR] Failed to initialize stream.\n\n"

        return error_stream()

    async def fallback_stream():
        """Answer through the non-streaming API after streaming failed."""
        try:
            fallback_response = await Runner.run(
                jobobike_assistant,
                input=user_message,
                context=ctx.context
            )
            final_output = getattr(fallback_response, "final_output", fallback_response)
            response_text = _extract_response_text(final_output)

            if session_id:
                session_manager.add_message_to_history(session_id, "assistant", response_text)
                logger.info(f"Added fallback assistant response to session history: {session_id}")

            yield f"data: {response_text}\n\n"
            yield _SSE_DONE
            logger.info("AGENT STREAM RESULT: query_jobobike_bot_stream fallback completed successfully")
        except InputGuardrailTripwireTriggered as guardrail_error:
            logger.warning(f"Guardrail blocked query during streaming: {guardrail_error}")
            yield _SSE_GUARDRAIL
        except OpenAIBadRequestError as fallback_api_error:
            error_str = str(fallback_api_error)
            logger.error(f"API Error in fallback: {error_str[:300]}", exc_info=True)
            if _is_api_key_error(error_str):
                logger.error("Gemini API key error in fallback - please check your GEMINI_API_KEY")
                yield _SSE_SERVICE_UNAVAILABLE
            else:
                yield _SSE_INVALID_REQUEST
        except Exception as fallback_error:
            error_str = str(fallback_error)
            logger.error(f"Fallback also failed: {error_str[:200]}", exc_info=True)
            yield _SSE_SERVICE_UNAVAILABLE if _is_api_key_error(error_str) else _SSE_UNABLE

    async def generate_stream():
        accumulated_text = ""
        has_streamed = False
        try:
            async for event in result.stream_events():
                try:
                    if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                        delta = event.data.delta
                        if delta:  # Only process if delta has content
                            # Send the delta exactly as received - no modifications.
                            # NOTE(review): a delta containing "\n" does not fit the
                            # one-line SSE "data:" framing; the wire format is left
                            # untouched because the client parser is not visible here.
                            accumulated_text += delta
                            yield f"data: {delta}\n\n"
                            has_streamed = True
                except Exception as event_error:
                    # A single malformed event must not abort the whole stream.
                    logger.warning(f"Event processing error: {event_error}")
                    continue

            # Add complete response to session history
            if accumulated_text and session_id:
                session_manager.add_message_to_history(session_id, "assistant", accumulated_text)
                logger.info(f"Added assistant response to session history: {session_id}")

            yield _SSE_DONE
            logger.info("AGENT STREAM RESULT: query_jobobike_bot_stream completed successfully")
        except InputGuardrailTripwireTriggered as guardrail_error:
            # Must come before the generic handler: a blocked query is neither
            # retried through the fallback nor reported as a generic failure.
            logger.warning(f"Guardrail blocked query during streaming: {guardrail_error}")
            yield _SSE_GUARDRAIL
        except OpenAIBadRequestError as api_error:
            error_str = str(api_error)
            logger.error(f"API Error in streaming: {error_str[:300]}", exc_info=True)
            if _is_api_key_error(error_str):
                logger.error("Gemini API key error detected in streaming - please check your GEMINI_API_KEY")
                yield _SSE_SERVICE_UNAVAILABLE
            else:
                yield _SSE_INVALID_REQUEST
        except Exception as stream_error:
            error_str = str(stream_error)
            logger.warning(f"Streaming failed, falling back to non-streaming: {error_str[:200]}")

            if _is_api_key_error(error_str):
                # Retrying cannot succeed with a bad key.
                logger.error("API key error detected - skipping fallback")
                yield _SSE_SERVICE_UNAVAILABLE
            elif has_streamed:
                # Part of the answer already reached the client; end gracefully.
                yield _SSE_DONE
            else:
                async for chunk in fallback_stream():
                    yield chunk

    return generate_stream()


async def query_jobobike_bot(user_message: str, language: str = "english", session_id: Optional[str] = None):
    """Query the JOBObike bot and return the complete response.

    Args:
        user_message: The user's message.
        language: Language preference placed in the run context.
        session_id: Optional session whose history is placed in the run context.

    Returns:
        The ``final_output`` of the agent run.

    Raises:
        HTTPException: 403 when the input guardrail trips, 503 for API key
            problems, 400 for other bad requests, 500 for anything else.
    """
    logger.info(f"AGENT CALL: query_jobobike_bot called with message='{user_message}', language='{language}', session_id='{session_id}'")

    # Get session history if session_id is provided
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")

    try:
        # Create context with language preference and history
        context_data = {"language": language}
        if history:
            context_data["history"] = history

        ctx = RunContextWrapper(context=context_data)
        response = await Runner.run(
            jobobike_assistant,
            input=user_message,
            context=ctx.context
        )
        logger.info("AGENT RESULT: query_jobobike_bot completed successfully")
        return response.final_output
    except InputGuardrailTripwireTriggered as e:
        logger.warning(f"Guardrail blocked query: {e}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Query was blocked by content guardrail. Please ensure your query is related to JOBObike services."
        )
    except OpenAIBadRequestError as e:
        error_str = str(e)
        logger.error(f"API Error in query_jobobike_bot: {error_str[:300]}", exc_info=True)
        if _is_api_key_error(error_str):
            logger.error("Gemini API key error detected - please check your GEMINI_API_KEY in .env file")
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="The API service is currently unavailable. Please check the server configuration or try again later."
            )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid request to the AI service. Please try again."
        )
    except Exception as e:
        error_str = str(e)
        logger.error(f"Error in query_jobobike_bot: {error_str[:200]}", exc_info=True)
        if _is_api_key_error(error_str):
            logger.error("Gemini API key error detected - please check your GEMINI_API_KEY in .env file")
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="The API service is currently unavailable. Please check the server configuration."
            )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )


@app.get("/")
async def root():
    """Return a static service banner."""
    return {"status": "ok", "service": "JOBObike Chatbot API"}


@app.get("/health")
async def health():
    """Return a static health indicator."""
    return {"status": "healthy"}


@app.post("/session")
async def create_session():
    """
    Create a new chat session
    Returns a session ID that can be used to maintain chat history
    """
    try:
        session_id = session_manager.create_session()
        logger.info(f"Created new session: {session_id}")
        return {"session_id": session_id, "message": "Session created successfully"}
    except Exception as e:
        logger.error(f"Error creating session: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create session"
        )


@app.post("/chat", response_model=ChatResponse)
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat(request: Request, chat_request: ChatRequest):
    """
    Standard chat endpoint with language support and session history.
    Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}
    """
    try:
        # Create or use existing session
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for chat: {session_id}")

        logger.info(
            f"Chat request from {get_remote_address(request)}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Add user message to session history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        # Pass language and session to the bot
        response = await query_jobobike_bot(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )
        response_text = _extract_response_text(response)

        # Add bot response to session history
        session_manager.add_message_to_history(session_id, "assistant", response_text)

        logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}")
        return ChatResponse(
            response=response_text,
            success=True,
            session_id=session_id
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in /chat: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )


@app.post("/api/messages", response_model=ChatResponse)
@limiter.limit("10/minute")  # Same rate limit as /chat
async def api_messages(request: Request, chat_request: ChatRequest):
    """
    Frontend-friendly chat endpoint at /api/messages.
    Mirrors /chat logic for session/history support.
    Expects: {"message": "...", "language": "english", "session_id": "optional"}
    """
    client_ip = get_remote_address(request)
    logger.info(f"API Messages request from {client_ip}: message='{chat_request.message[:50]}...', lang='{chat_request.language}', session='{chat_request.session_id}'")

    try:
        # Create/use session
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"New session created for /api/messages: {session_id}")

        # Save user message to history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        response = await query_jobobike_bot(
            user_message=chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )
        response_text = _extract_response_text(response)

        # Save AI response to history
        session_manager.add_message_to_history(session_id, "assistant", response_text)

        logger.info(f"API Messages success: Response sent for session {session_id}")
        return ChatResponse(
            response=response_text,
            success=True,
            session_id=session_id
        )
    except HTTPException:
        # query_jobobike_bot already maps failures to the right status code;
        # without this clause the generic handler below turned them into 500.
        raise
    except InputGuardrailTripwireTriggered as e:
        logger.warning(f"Guardrail blocked /api/messages: {e}")
        raise HTTPException(
            status_code=403,
            detail="Query blocked – please ask about JOBObike services."
        )
    except OpenAIBadRequestError as e:
        error_str = str(e)
        logger.error(f"API Error in /api/messages: {error_str[:300]}", exc_info=True)
        if _is_api_key_error(error_str):
            logger.error("Gemini API key error detected in /api/messages")
            raise HTTPException(
                status_code=503,
                detail="AI service is currently unavailable. Please check server configuration."
            )
        raise HTTPException(
            status_code=400,
            detail="Invalid request to AI service. Please try again."
        )
    except Exception as e:
        error_str = str(e)
        logger.error(f"Error in /api/messages: {error_str[:200]}", exc_info=True)
        if _is_api_key_error(error_str):
            logger.error("Gemini API key error detected in /api/messages")
            raise HTTPException(
                status_code=503,
                detail="AI service is currently unavailable. Please check server configuration."
            )
        raise HTTPException(
            status_code=500,
            detail="Internal error – try again."
        )


@app.post("/chat-stream")
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat_stream(request: Request, chat_request: ChatRequest):
    """
    Streaming chat endpoint with language support and session history.
    Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}
    """
    try:
        # Create or use existing session
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for streaming chat: {session_id}")

        logger.info(
            f"Stream request from {get_remote_address(request)}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Add user message to session history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        # Pass language and session to the streaming bot
        stream_generator = query_jobobike_bot_stream(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )

        # Note: the response is added to history inside the stream generator after completion
        return StreamingResponse(
            stream_generator,
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "Session-ID": session_id  # Include session ID in headers
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request."
        )


@app.post("/ticket", response_model=TicketResponse)
@limiter.limit("5/hour")  # Limit to 5 tickets per hour per IP
async def submit_ticket(request: Request, ticket_request: TicketRequest):
    """
    Submit a support ticket via email using Resend API.
    Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"}
    """
    import html

    try:
        client_ip = get_remote_address(request)
        logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}")

        # Additional rate limiting for tickets
        if is_ticket_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many ticket submissions. Please try again later."
            )

        # Get admin email from environment variables or use a default
        admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")

        # Use a verified sender email (it must be verified in the Resend account)
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # Everything below is user-controlled and ends up in an HTML mail,
        # so it is escaped before interpolation.
        safe_name = html.escape(str(ticket_request.name))
        safe_email = html.escape(str(ticket_request.email))
        safe_message = html.escape(str(ticket_request.message))
        safe_ip = html.escape(str(client_ip))

        # Prepare the email using Resend
        params = {
            "from": sender_email,
            "to": [admin_email],
            "subject": f"Support Ticket from {ticket_request.name}",
            "html": f"""
<p>Hello Admin,</p>
<p>A new support ticket has been submitted:</p>
<p><strong>Name:</strong> {safe_name}</p>
<p><strong>Email:</strong> {safe_email}</p>
<p><strong>Message:</strong></p>
<p style="white-space: pre-wrap;">{safe_message}</p>
<p><strong>IP Address:</strong> {safe_ip}</p>
<p>Best regards,<br>
JOBObike Support Team</p>
"""
        }

        # Send the email
        resend.Emails.send(params)

        logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}")
        return TicketResponse(
            success=True,
            message="Ticket submitted successfully. We'll get back to you soon."
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error submitting ticket: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to submit ticket. Please try again later."
        )


@app.post("/schedule-meeting", response_model=MeetingResponse)
@limiter.limit("3/hour")  # Limit to 3 meetings per hour per IP
async def schedule_meeting(request: Request, meeting_request: MeetingRequest):
    """
    Schedule a meeting and send an email notification using Resend API.

    Due to Resend testing limitations the notification is sent only to the
    address in ADMIN_EMAIL; it lists the organizer and all valid attendees.
    A failure to send the email does not fail the request.
    """
    import html
    import uuid
    from urllib.parse import urlencode

    try:
        client_ip = get_remote_address(request)
        logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}")

        # Additional rate limiting for meetings
        if is_meeting_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many meeting requests. Please try again later."
            )

        # The timestamp alone collides for two requests in the same second,
        # so a random suffix is appended.
        meeting_id = f"mtg_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        # Use a verified sender email (it must be verified in the Resend account)
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # For Resend testing limitations, we can only send to the owner's email.
        # In production, verify a domain and send to all attendees instead.
        owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")

        # Format date and time for display
        formatted_datetime = f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}"
        description_text = meeting_request.description or "No description provided."

        # Create calendar link (Google Calendar template link), URL-encoding
        # every user-supplied value.
        # NOTE(review): start and end are the same instant and the local time
        # is labelled "Z" regardless of meeting_request.timezone - kept as in
        # the previous implementation; confirm the intended behaviour.
        compact_stamp = (
            f"{meeting_request.date.replace('-', '')}"
            f"T{meeting_request.time.replace(':', '')}00Z"
        )
        calendar_query = urlencode(
            {
                "action": "TEMPLATE",
                "text": meeting_request.topic,
                "dates": f"{compact_stamp}/{compact_stamp}",
                "details": meeting_request.description or "Meeting scheduled via JOBObike",
                "location": meeting_request.location or "",
            },
            safe="/",
        )
        calendar_link = f"https://calendar.google.com/calendar/render?{calendar_query}"

        # Combine all attendees (organizer + additional attendees)
        all_attendees = [meeting_request.email]
        for attendee in meeting_request.attendees or []:
            # Simple email validation
            if "@" in attendee and "." in attendee:
                all_attendees.append(attendee)
            else:
                logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.")

        # Remove duplicates while preserving order
        all_attendees = list(dict.fromkeys(all_attendees))

        # User-controlled values are escaped before they enter the HTML mail.
        safe_topic = html.escape(str(meeting_request.topic))
        safe_datetime = html.escape(formatted_datetime)
        safe_duration = html.escape(str(meeting_request.duration))
        safe_location = html.escape(str(meeting_request.location))
        safe_name = html.escape(str(meeting_request.name))
        safe_email = html.escape(str(meeting_request.email))
        safe_description = html.escape(description_text)
        safe_calendar_link = html.escape(calendar_link, quote=True)
        attendee_list_html = "".join(
            f"<li>{html.escape(str(attendee))}</li>" for attendee in all_attendees
        )

        # Prepare the HTML email template
        html_template = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Meeting Scheduled - {safe_topic}</title>
</head>
<body>
<h1>Meeting Confirmed!</h1>
<p>Your meeting has been successfully scheduled</p>
<h2>Meeting Details</h2>
<p>
<strong>Topic:</strong> {safe_topic}<br>
<strong>Date &amp; Time:</strong> {safe_datetime}<br>
<strong>Duration:</strong> {safe_duration} minutes<br>
<strong>Location:</strong> {safe_location}<br>
<strong>Organizer:</strong> {safe_name} ({safe_email})
</p>
<h2>Description</h2>
<p style="white-space: pre-wrap;">{safe_description}</p>
<h2>Attendees</h2>
<ul>{attendee_list_html}</ul>
<p>Note: Only valid email addresses will receive invitations.</p>
<p><a href="{safe_calendar_link}">Add to Calendar</a></p>
<p>Meeting ID: {meeting_id}</p>
<p>Need to make changes? Contact the organizer or reply to this email.</p>
<p>This meeting was scheduled through JOBObike Chatbot Services</p>
<p>Note: Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.</p>
<p>&copy; 2025 JOBObike. All rights reserved.</p>
</body>
</html>
"""

        params = {
            "from": sender_email,
            "to": [owner_email],  # Only send to owner due to Resend testing limitations
            "subject": f"Meeting Scheduled: {meeting_request.topic}",
            "html": html_template
        }

        # Send the email
        try:
            resend.Emails.send(params)
            logger.info(
                f"Meeting notification email sent to administrator "
                f"(lists {len(all_attendees)} attendees)"
            )
        except Exception as email_error:
            logger.error(f"Failed to send email: {email_error}", exc_info=True)
            # Even if email fails, we still consider the meeting scheduled
            return MeetingResponse(
                success=True,
                message="Meeting scheduled successfully, but failed to send email invitations.",
                meeting_id=meeting_id
            )

        logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}")
        return MeetingResponse(
            success=True,
            message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.",
            meeting_id=meeting_id
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error scheduling meeting: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to schedule meeting. Please try again later."
        )


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log any unhandled exception and answer with a generic 500 JSON body."""
    logger.error(
        f"Unhandled exception: {exc}",
        exc_info=True,
        extra={
            "path": request.url.path,
            "method": request.method,
            "client": get_remote_address(request)
        }
    )
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": "Internal server error",
            "detail": "An unexpected error occurred. Please try again later."
        }
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)