Launchlab / app.py
MuhammadSaad16's picture
change API Key
24b4e0b
# """
# FastAPI application for Launchlabs Chatbot API
# Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling
# Updated with language context support
# """
# import os
# import logging
# import time
# from typing import Optional
# from collections import defaultdict
# import resend
# from fastapi import FastAPI, Request, HTTPException, status, Depends, Header
# from fastapi.responses import StreamingResponse, JSONResponse
# from fastapi.middleware.cors import CORSMiddleware
# from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# from pydantic import BaseModel
# from slowapi import Limiter, _rate_limit_exceeded_handler
# from slowapi.util import get_remote_address
# from slowapi.errors import RateLimitExceeded
# from slowapi.middleware import SlowAPIMiddleware
# from dotenv import load_dotenv
# from agents import Runner, RunContextWrapper
# from agents.exceptions import InputGuardrailTripwireTriggered
# from openai.types.responses import ResponseTextDeltaEvent
# from chatbot.chatbot_agent import launchlabs_assistant
# from sessions.session_manager import session_manager
# # Load environment variables
# load_dotenv()
# # Configure Resend
# resend.api_key = os.getenv("RESEND_API_KEY")
# # Configure logging
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
# handlers=[
# logging.FileHandler('app.log'),
# logging.StreamHandler()
# ]
# )
# logger = logging.getLogger(__name__)
# # Initialize rate limiter with enhanced security
# limiter = Limiter(key_func=get_remote_address, default_limits=["100/day", "20/hour", "3/minute"])
# # Create FastAPI app
# app = FastAPI(
# title="Launchlabs Chatbot API",
# description="AI-powered chatbot API for Launchlabs services",
# version="1.0.0"
# )
# # Add rate limiter middleware
# app.state.limiter = limiter
# app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# app.add_middleware(SlowAPIMiddleware)
# # Configure CORS from environment variable
# allowed_origins = os.getenv("ALLOWED_ORIGINS", "").split(",")
# allowed_origins = [origin.strip() for origin in allowed_origins if origin.strip()]
# if allowed_origins:
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"] + allowed_origins,
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# logger.info(f"CORS enabled for origins: {allowed_origins}")
# else:
# logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled")
# # Security setup
# security = HTTPBearer()
# # Enhanced rate limiting dictionaries
# request_counts = defaultdict(list) # Track requests per IP
# TICKET_RATE_LIMIT = 5 # Max 5 tickets per hour per IP
# TICKET_TIME_WINDOW = 3600 # 1 hour in seconds
# MEETING_RATE_LIMIT = 3 # Max 3 meetings per hour per IP
# MEETING_TIME_WINDOW = 3600 # 1 hour in seconds
# # Request/Response models
# class ChatRequest(BaseModel):
# message: str
# language: Optional[str] = "english" # Default to English if not specified
# session_id: Optional[str] = None # Session ID for chat history
# class ChatResponse(BaseModel):
# response: str
# success: bool
# session_id: str # Include session ID in response
# class ErrorResponse(BaseModel):
# error: str
# detail: Optional[str] = None
# class TicketRequest(BaseModel):
# name: str
# email: str
# message: str
# class TicketResponse(BaseModel):
# success: bool
# message: str
# class MeetingRequest(BaseModel):
# name: str
# email: str
# date: str # ISO format date string
# time: str # Time in HH:MM format
# timezone: str # Timezone identifier
# duration: int # Duration in minutes
# topic: str # Meeting topic/title
# attendees: list[str] # List of attendee emails
# description: Optional[str] = None # Optional meeting description
# location: Optional[str] = "Google Meet" # Meeting location/platform
# class MeetingResponse(BaseModel):
# success: bool
# message: str
# meeting_id: Optional[str] = None # Unique identifier for the meeting
# # Security dependency for API key validation
# async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
# """Verify API key for protected endpoints"""
# # In production, you would check against a database of valid keys
# # For now, we'll use an environment variable
# expected_key = os.getenv("API_KEY")
# if not expected_key or credentials.credentials != expected_key:
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Invalid or missing API key",
# )
# return credentials.credentials
# def is_ticket_rate_limited(ip_address: str) -> bool:
# """Check if an IP address has exceeded ticket submission rate limits"""
# current_time = time.time()
# # Clean old requests outside the time window
# request_counts[ip_address] = [
# req_time for req_time in request_counts[ip_address]
# if current_time - req_time < TICKET_TIME_WINDOW
# ]
# # Check if limit exceeded
# if len(request_counts[ip_address]) >= TICKET_RATE_LIMIT:
# return True
# # Add current request
# request_counts[ip_address].append(current_time)
# return False
# def is_meeting_rate_limited(ip_address: str) -> bool:
# """Check if an IP address has exceeded meeting scheduling rate limits"""
# current_time = time.time()
# # Clean old requests outside the time window
# request_counts[ip_address] = [
# req_time for req_time in request_counts[ip_address]
# if current_time - req_time < MEETING_TIME_WINDOW
# ]
# # Check if limit exceeded
# if len(request_counts[ip_address]) >= MEETING_RATE_LIMIT:
# return True
# # Add current request
# request_counts[ip_address].append(current_time)
# return False
# def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
# """
# Query the Launchlabs bot with streaming - returns async generator.
# Now includes language context and session history.
# Implements fallback to non-streaming when streaming fails (e.g., with Gemini models).
# """
# logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")
# # Get session history if session_id is provided
# history = []
# if session_id:
# history = session_manager.get_session_history(session_id)
# logger.info(f"Retrieved {len(history)} history messages for session {session_id}")
# try:
# # Create context with language preference and history
# context_data = {"language": language}
# if history:
# context_data["history"] = history
# ctx = RunContextWrapper(context=context_data)
# result = Runner.run_streamed(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# async def generate_stream():
# try:
# previous = ""
# has_streamed = True
# try:
# # Attempt streaming with error handling for each event
# async for event in result.stream_events():
# try:
# if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
# delta = event.data.delta or ""
# # ---- Spacing Fix ----
# if (
# previous
# and not previous.endswith((" ", "\n"))
# and not delta.startswith((" ", ".", ",", "?", "!", ":", ";"))
# ):
# delta = " " + delta
# previous = delta
# # ---- End Fix ----
# yield f"data: {delta}\n\n"
# has_streamed = True
# except Exception as event_error:
# # Handle individual event errors (e.g., missing logprobs field)
# logger.warning(f"Event processing error: {event_error}")
# continue
# yield "data: [DONE]\n\n"
# logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully")
# except Exception as stream_error:
# # Fallback to non-streaming if streaming fails
# logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")
# if not has_streamed:
# # Get final output using the streaming result's final_output property
# # Wait for the stream to complete to get final output
# try:
# # Use the non-streaming API as fallback
# fallback_response = await Runner.run(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# if hasattr(fallback_response, 'final_output'):
# final_output = fallback_response.final_output
# else:
# final_output = fallback_response
# if hasattr(final_output, 'content'):
# response_text = final_output.content
# elif isinstance(final_output, str):
# response_text = final_output
# else:
# response_text = str(final_output)
# yield f"data: {response_text}\n\n"
# yield "data: [DONE]\n\n"
# logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully")
# except Exception as fallback_error:
# logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
# yield f"data: [ERROR] Unable to complete request.\n\n"
# else:
# # Already streamed some content, just end gracefully
# yield "data: [DONE]\n\n"
# except InputGuardrailTripwireTriggered as e:
# logger.warning(f"Guardrail blocked query during streaming: {e}")
# yield f"data: [ERROR] Query was blocked by content guardrail.\n\n"
# except Exception as e:
# logger.error(f"Streaming error: {e}", exc_info=True)
# yield f"data: [ERROR] {str(e)}\n\n"
# return generate_stream()
# except Exception as e:
# logger.error(f"Error setting up stream: {e}", exc_info=True)
# async def error_stream():
# yield f"data: [ERROR] Failed to initialize stream.\n\n"
# return error_stream()
# async def query_launchlabs_bot(user_message: str, language: str = "english", session_id: Optional[str] = None):
# """
# Query the Launchlabs bot - returns complete response.
# Now includes language context and session history.
# """
# logger.info(f"AGENT CALL: query_launchlabs_bot called with message='{user_message}', language='{language}', session_id='{session_id}'")
# # Get session history if session_id is provided
# history = []
# if session_id:
# history = session_manager.get_session_history(session_id)
# logger.info(f"Retrieved {len(history)} history messages for session {session_id}")
# try:
# # Create context with language preference and history
# context_data = {"language": language}
# if history:
# context_data["history"] = history
# ctx = RunContextWrapper(context=context_data)
# response = await Runner.run(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# logger.info("AGENT RESULT: query_launchlabs_bot completed successfully")
# return response.final_output
# except InputGuardrailTripwireTriggered as e:
# logger.warning(f"Guardrail blocked query: {e}")
# raise HTTPException(
# status_code=status.HTTP_403_FORBIDDEN,
# detail="Query was blocked by content guardrail. Please ensure your query is related to Launchlabs services."
# )
# except Exception as e:
# logger.error(f"Error in query_launchlabs_bot: {e}", exc_info=True)
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="An internal error occurred while processing your request."
# )
# @app.get("/")
# async def root():
# return {"status": "ok", "service": "Launchlabs Chatbot API"}
# @app.get("/health")
# async def health():
# return {"status": "healthy"}
# @app.post("/session")
# async def create_session():
# """
# Create a new chat session
# Returns a session ID that can be used to maintain chat history
# """
# try:
# session_id = session_manager.create_session()
# logger.info(f"Created new session: {session_id}")
# return {"session_id": session_id, "message": "Session created successfully"}
# except Exception as e:
# logger.error(f"Error creating session: {e}", exc_info=True)
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="Failed to create session"
# )
# @app.post("/chat", response_model=ChatResponse)
# @limiter.limit("10/minute") # Limit to 10 requests per minute per IP
# async def chat(request: Request, chat_request: ChatRequest):
# """
# Standard chat endpoint with language support and session history.
# Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}
# """
# try:
# # Create or use existing session
# session_id = chat_request.session_id
# if not session_id:
# session_id = session_manager.create_session()
# logger.info(f"Created new session for chat: {session_id}")
# logger.info(
# f"Chat request from {get_remote_address(request)}: "
# f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
# )
# # Add user message to session history
# session_manager.add_message_to_history(session_id, "user", chat_request.message)
# # Pass language and session to the bot
# response = await query_launchlabs_bot(
# chat_request.message,
# language=chat_request.language,
# session_id=session_id
# )
# if hasattr(response, 'content'):
# response_text = response.content
# elif isinstance(response, str):
# response_text = response
# else:
# response_text = str(response)
# # Add bot response to session history
# session_manager.add_message_to_history(session_id, "assistant", response_text)
# logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}")
# return ChatResponse(
# response=response_text,
# success=True,
# session_id=session_id
# )
# except HTTPException:
# raise
# except Exception as e:
# logger.error(f"Unexpected error in /chat: {e}", exc_info=True)
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="An internal error occurred while processing your request."
# )
# @app.post("/api/messages", response_model=ChatResponse)
# @limiter.limit("10/minute") # Same rate limit as /chat
# async def api_messages(request: Request, chat_request: ChatRequest):
# """
# Frontend-friendly chat endpoint at /api/messages.
# Exactly mirrors /chat logic for session/history support.
# Expects: {"message": "...", "language": "english", "session_id": "optional"}
# """
# client_ip = get_remote_address(request)
# logger.info(f"API Messages request from {client_ip}: message='{chat_request.message[:50]}...', lang='{chat_request.language}', session='{chat_request.session_id}'")
# try:
# # Create/use session (Firestore-backed)
# session_id = chat_request.session_id
# if not session_id:
# session_id = session_manager.create_session()
# logger.info(f"New session created for /api/messages: {session_id}")
# # Save user message to history
# session_manager.add_message_to_history(session_id, "user", chat_request.message)
# # Call your existing bot query function
# response = await query_launchlabs_bot(
# user_message=chat_request.message,
# language=chat_request.language,
# session_id=session_id
# )
# # Extract response text
# response_text = (
# response.content if hasattr(response, 'content')
# else response if isinstance(response, str)
# else str(response)
# )
# # Save AI response to history
# session_manager.add_message_to_history(session_id, "assistant", response_text)
# logger.info(f"API Messages success: Response sent for session {session_id}")
# return ChatResponse(
# response=response_text,
# success=True,
# session_id=session_id
# )
# except InputGuardrailTripwireTriggered as e:
# logger.warning(f"Guardrail blocked /api/messages: {e}")
# raise HTTPException(
# status_code=403,
# detail="Query blocked – please ask about Launchlabs services."
# )
# except Exception as e:
# logger.error(f"Error in /api/messages: {e}", exc_info=True)
# raise HTTPException(
# status_code=500,
# detail="Internal error – try again."
# )
# @app.post("/chat-stream")
# @limiter.limit("10/minute") # Limit to 10 requests per minute per IP
# async def chat_stream(request: Request, chat_request: ChatRequest):
# """
# Streaming chat endpoint with language support and session history.
# Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}
# """
# try:
# # Create or use existing session
# session_id = chat_request.session_id
# if not session_id:
# session_id = session_manager.create_session()
# logger.info(f"Created new session for streaming chat: {session_id}")
# logger.info(
# f"Stream request from {get_remote_address(request)}: "
# f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
# )
# # Add user message to session history
# session_manager.add_message_to_history(session_id, "user", chat_request.message)
# # Pass language and session to the streaming bot
# stream_generator = query_launchlabs_bot_stream(
# chat_request.message,
# language=chat_request.language,
# session_id=session_id
# )
# # Note: For streaming, we add the response to history after the stream completes
# # This would need to be handled in the frontend by making a separate call or
# # by modifying the stream generator to add the complete response to history
# return StreamingResponse(
# stream_generator,
# media_type="text/event-stream",
# headers={
# "Cache-Control": "no-cache",
# "Connection": "keep-alive",
# "X-Accel-Buffering": "no",
# "Session-ID": session_id # Include session ID in headers
# }
# )
# except HTTPException:
# raise
# except Exception as e:
# logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True)
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="An internal error occurred while processing your request."
# )
# @app.post("/ticket", response_model=TicketResponse)
# @limiter.limit("5/hour") # Limit to 5 tickets per hour per IP
# async def submit_ticket(request: Request, ticket_request: TicketRequest):
# """
# Submit a support ticket via email using Resend API.
# Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"}
# """
# try:
# client_ip = get_remote_address(request)
# logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}")
# # Additional rate limiting for tickets
# if is_ticket_rate_limited(client_ip):
# logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}")
# raise HTTPException(
# status_code=status.HTTP_429_TOO_MANY_REQUESTS,
# detail="Too many ticket submissions. Please try again later."
# )
# # Get admin email from environment variables or use a default
# admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
# # Use a verified sender email (you need to verify this in your Resend account)
# # For testing purposes, you can use your Resend account's verified domain
# sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")
# # Prepare the email using Resend
# params = {
# "from": sender_email,
# "to": [admin_email],
# "subject": f"Support Ticket from {ticket_request.name}",
# "html": f"""
# <p>Hello Admin,</p>
# <p>A new support ticket has been submitted:</p>
# <p><strong>Name:</strong> {ticket_request.name}</p>
# <p><strong>Email:</strong> {ticket_request.email}</p>
# <p><strong>Message:</strong></p>
# <p>{ticket_request.message}</p>
# <p><strong>IP Address:</strong> {client_ip}</p>
# <br>
# <p>Best regards,<br>Launchlabs Support Team</p>
# """
# }
# # Send the email
# email = resend.Emails.send(params)
# logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}")
# return TicketResponse(
# success=True,
# message="Ticket submitted successfully. We'll get back to you soon."
# )
# except HTTPException:
# raise
# except Exception as e:
# logger.error(f"Error submitting ticket: {e}", exc_info=True)
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="Failed to submit ticket. Please try again later."
# )
# @app.post("/schedule-meeting", response_model=MeetingResponse)
# @limiter.limit("3/hour") # Limit to 3 meetings per hour per IP
# async def schedule_meeting(request: Request, meeting_request: MeetingRequest):
# """
# Schedule a meeting and send email invitations using Resend API.
# Accepts meeting details and sends professional email invitations to organizer and attendees.
# """
# try:
# client_ip = get_remote_address(request)
# logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}")
# # Additional rate limiting for meetings
# if is_meeting_rate_limited(client_ip):
# logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}")
# raise HTTPException(
# status_code=status.HTTP_429_TOO_MANY_REQUESTS,
# detail="Too many meeting requests. Please try again later."
# )
# # Generate a unique meeting ID
# meeting_id = f"mtg_{int(time.time())}"
# # Get admin email from environment variables or use a default
# admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
# # Use a verified sender email (you need to verify this in your Resend account)
# sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")
# # For Resend testing limitations, we can only send to the owner's email
# # In production, you would verify a domain and use that instead
# owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
# # Format date and time for display
# formatted_datetime = f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}"
# # Create calendar link (Google Calendar link example)
# calendar_link = f"https://calendar.google.com/calendar/render?action=TEMPLATE&text={meeting_request.topic}&dates={meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z/{meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z&details={meeting_request.description or 'Meeting scheduled via Launchlabs'}&location={meeting_request.location}"
# # Combine all attendees (organizer + additional attendees)
# # Validate and format email addresses
# all_attendees = [meeting_request.email]
# # Validate additional attendees - they must be valid email addresses
# for attendee in meeting_request.attendees:
# # Simple email validation
# if "@" in attendee and "." in attendee:
# all_attendees.append(attendee)
# else:
# # If not a valid email, skip or treat as name only
# logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.")
# # Remove duplicates while preserving order
# seen = set()
# unique_attendees = []
# for email in all_attendees:
# if email not in seen:
# seen.add(email)
# unique_attendees.append(email)
# all_attendees = unique_attendees
# # Prepare the professional HTML email template
# html_template = f"""
# <!DOCTYPE html>
# <html>
# <head>
# <meta charset="UTF-8">
# <meta name="viewport" content="width=device-width, initial-scale=1.0">
# <title>Meeting Scheduled - {meeting_request.topic}</title>
# </head>
# <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; padding: 20px;">
# <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; text-align: center; border-radius: 10px 10px 0 0;">
# <h1 style="margin: 0; font-size: 28px;">Meeting Confirmed!</h1>
# <p style="font-size: 18px; margin-top: 10px;">Your meeting has been successfully scheduled</p>
# </div>
# <div style="background-color: #ffffff; padding: 30px; border: 1px solid #eaeaea; border-top: none; border-radius: 0 0 10px 10px;">
# <h2 style="color: #333;">Meeting Details</h2>
# <div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0;">
# <table style="width: 100%; border-collapse: collapse;">
# <tr>
# <td style="padding: 8px 0; font-weight: bold; width: 30%;">Topic:</td>
# <td style="padding: 8px 0;">{meeting_request.topic}</td>
# </tr>
# <tr style="background-color: #f0f0f0;">
# <td style="padding: 8px 0; font-weight: bold;">Date & Time:</td>
# <td style="padding: 8px 0;">{formatted_datetime}</td>
# </tr>
# <tr>
# <td style="padding: 8px 0; font-weight: bold;">Duration:</td>
# <td style="padding: 8px 0;">{meeting_request.duration} minutes</td>
# </tr>
# <tr style="background-color: #f0f0f0;">
# <td style="padding: 8px 0; font-weight: bold;">Location:</td>
# <td style="padding: 8px 0;">{meeting_request.location}</td>
# </tr>
# <tr>
# <td style="padding: 8px 0; font-weight: bold;">Organizer:</td>
# <td style="padding: 8px 0;">{meeting_request.name} ({meeting_request.email})</td>
# </tr>
# </table>
# </div>
# <div style="margin: 25px 0;">
# <h3 style="color: #333;">Description</h3>
# <p style="background-color: #f8f9fa; padding: 15px; border-radius: 8px; white-space: pre-wrap;">{meeting_request.description or 'No description provided.'}</p>
# </div>
# <div style="margin: 25px 0;">
# <h3 style="color: #333;">Attendees</h3>
# <ul style="background-color: #f8f9fa; padding: 15px; border-radius: 8px;">
# {''.join([f'<li>{attendee}</li>' for attendee in all_attendees])}
# </ul>
# <p style="font-size: 12px; color: #666; margin-top: 5px;">Note: Only valid email addresses will receive invitations.</p>
# </div>
# <div style="text-align: center; margin: 30px 0;">
# <a href="{calendar_link}" style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 12px 25px; text-decoration: none; border-radius: 5px; font-weight: bold; display: inline-block;">Add to Calendar</a>
# </div>
# <div style="background-color: #e3f2fd; padding: 15px; border-radius: 8px; margin-top: 25px;">
# <p style="margin: 0;"><strong>Meeting ID:</strong> {meeting_id}</p>
# <p style="margin: 10px 0 0 0; font-size: 14px; color: #666;">Need to make changes? Contact the organizer or reply to this email.</p>
# </div>
# </div>
# <div style="text-align: center; margin-top: 30px; color: #888; font-size: 14px;">
# <p>This meeting was scheduled through Launchlabs Chatbot Services</p>
# <p><strong>Note:</strong> Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.</p>
# <p>© 2025 Launchlabs. All rights reserved.</p>
# </div>
# </body>
# </html>
# """
# # Send email to all attendees
# # Check if we have valid attendees to send to
# if not all_attendees:
# logger.warning("No valid email addresses found for meeting attendees")
# return MeetingResponse(
# success=True,
# message="Meeting scheduled successfully, but no valid email addresses found for invitations.",
# meeting_id=meeting_id
# )
# # For Resend testing limitations, we can only send to the owner's email
# # In production, you would verify a domain and send to all attendees
# owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
# # Prepare email for owner with all attendee information
# attendee_list_html = ''.join([f'<li>{attendee}</li>' for attendee in all_attendees])
# # In a real implementation, you would send to all attendees after verifying your domain
# # For now, we're sending to the owner with information about all attendees
# params = {
# "from": sender_email,
# "to": [owner_email], # Only send to owner due to Resend testing limitations
# "subject": f"Meeting Scheduled: {meeting_request.topic}",
# "html": html_template
# }
# # Send the email
# try:
# email = resend.Emails.send(params)
# logger.info(f"Email sent successfully to {len(all_attendees)} attendees")
# except Exception as email_error:
# logger.error(f"Failed to send email: {email_error}", exc_info=True)
# # Even if email fails, we still consider the meeting scheduled
# return MeetingResponse(
# success=True,
# message="Meeting scheduled successfully, but failed to send email invitations.",
# meeting_id=meeting_id
# )
# logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}")
# return MeetingResponse(
# success=True,
# message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.",
# meeting_id=meeting_id
# )
# except HTTPException:
# raise
# except Exception as e:
# logger.error(f"Error scheduling meeting: {e}", exc_info=True)
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="Failed to schedule meeting. Please try again later."
# )
# @app.exception_handler(Exception)
# async def global_exception_handler(request: Request, exc: Exception):
# logger.error(
# f"Unhandled exception: {exc}",
# exc_info=True,
# extra={
# "path": request.url.path,
# "method": request.method,
# "client": get_remote_address(request)
# }
# )
# return JSONResponse(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# content={
# "error": "Internal server error",
# "detail": "An unexpected error occurred. Please try again later."
# }
# )
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)
"""
FastAPI application for Launchlabs Chatbot API
Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling
Updated with language context support and FIXED spacing issue in streaming
"""
import os
import logging
import time
from typing import Optional
from collections import defaultdict
import resend
from fastapi import FastAPI, Request, HTTPException, status, Depends, Header
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from dotenv import load_dotenv
from agents import Runner, RunContextWrapper
from agents.exceptions import InputGuardrailTripwireTriggered
from openai.types.responses import ResponseTextDeltaEvent
from chatbot.chatbot_agent import launchlabs_assistant
from sessions.session_manager import session_manager
# Read the .env file (if any) before any setting below is looked up.
load_dotenv()

# Give the Resend client its API key; this is None when RESEND_API_KEY is unset.
resend.api_key = os.environ.get("RESEND_API_KEY")

# Emit INFO-and-above records to both the app.log file and a stream handler.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# Initialize rate limiter with enhanced security.
# Clients are bucketed by get_remote_address and the default limits are
# 100/day, 20/hour and 3/minute.
# NOTE(review): if this service runs behind a reverse proxy, the remote address
# is presumably the proxy's, so all clients may share one bucket - confirm.
limiter = Limiter(key_func=get_remote_address, default_limits=["100/day", "20/hour", "3/minute"])
# Create FastAPI app
app = FastAPI(
    title="Launchlabs Chatbot API",
    description="AI-powered chatbot API for Launchlabs services",
    version="1.0.0"
)
# Add rate limiter middleware: expose the limiter on app.state, register the
# handler for RateLimitExceeded, then install the slowapi middleware.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# Configure CORS from the comma-separated ALLOWED_ORIGINS environment variable.
# Blank entries and surrounding whitespace are ignored.
allowed_origins = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
]
if allowed_origins:
    app.add_middleware(
        CORSMiddleware,
        # Trust only the configured origins. Prepending "*" here would accept
        # every origin and make the ALLOWED_ORIGINS allow-list meaningless.
        allow_origins=allowed_origins,
        # Credentialed requests must not be combined with a wildcard origin,
        # so credentials are turned off if the operator configures "*".
        allow_credentials="*" not in allowed_origins,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    logger.info(f"CORS enabled for origins: {allowed_origins}")
else:
    logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled")
# Bearer-token scheme; verify_api_key depends on it to obtain the credentials.
security = HTTPBearer()

# In-process rate-limit state: lists of request timestamps, one list per client bucket.
request_counts = defaultdict(list)

# Ticket submissions: at most 5 per client within a one-hour window.
TICKET_RATE_LIMIT = 5
TICKET_TIME_WINDOW = 60 * 60  # seconds

# Meeting requests: at most 3 per client within a one-hour window.
MEETING_RATE_LIMIT = 3
MEETING_TIME_WINDOW = 60 * 60  # seconds
# Request/Response models
# Chat request payload: the user's message plus optional language and
# session identifiers.
class ChatRequest(BaseModel):
    message: str  # Text sent by the user
    language: Optional[str] = "english" # Default to English if not specified
    session_id: Optional[str] = None # Session ID for chat history
# Chat response payload: the reply text, a success flag and the session ID.
class ChatResponse(BaseModel):
    response: str  # Reply text
    success: bool  # Success flag for the request
    session_id: str # Include session ID in response
# Error payload: a short error string with an optional longer detail.
class ErrorResponse(BaseModel):
    error: str  # Short error summary
    detail: Optional[str] = None  # Optional additional information
# Support-ticket payload: who is submitting and what they wrote.
# NOTE(review): email is a plain str, so no address validation happens at the
# model level - confirm whether callers validate it.
class TicketRequest(BaseModel):
    name: str  # Submitter's name
    email: str  # Submitter's email address (unvalidated string)
    message: str  # Ticket text
# Ticket submission result: a success flag and a human-readable message.
class TicketResponse(BaseModel):
    success: bool  # Success flag for the submission
    message: str  # Human-readable status message
# Meeting scheduling payload: organizer details, when the meeting takes place,
# and who should attend. Date, time and timezone are carried as plain strings.
class MeetingRequest(BaseModel):
    name: str  # Organizer's name
    email: str  # Organizer's email address (unvalidated string)
    date: str # ISO format date string
    time: str # Time in HH:MM format
    timezone: str # Timezone identifier
    duration: int # Duration in minutes
    topic: str # Meeting topic/title
    attendees: list[str] # List of attendee emails
    description: Optional[str] = None # Optional meeting description
    location: Optional[str] = "Google Meet" # Meeting location/platform
class MeetingResponse(BaseModel):
    """Response body for POST /schedule-meeting."""
    success: bool  # Whether the meeting was recorded as scheduled
    message: str  # Human-readable status message
    meeting_id: Optional[str] = None # Unique identifier for the meeting
# Security dependency for API key validation
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify API key for protected endpoints.

    Args:
        credentials: Bearer credentials supplied through the ``security`` dependency.

    Returns:
        The presented key, when it matches the ``API_KEY`` environment variable.

    Raises:
        HTTPException: 401 when ``API_KEY`` is not configured or the key does not match.
    """
    import secrets  # local import: only this dependency needs it

    # In production, you would check against a database of valid keys
    # For now, we'll use an environment variable
    expected_key = os.getenv("API_KEY")
    presented_key = credentials.credentials

    # Compare in constant time so response timing does not reveal how much of
    # the key was correct. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    keys_match = bool(expected_key) and secrets.compare_digest(
        presented_key.encode("utf-8"), expected_key.encode("utf-8")
    )
    if not keys_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
    return presented_key
def is_ticket_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded ticket submission rate limits.

    Keeps the timestamps of this IP's ticket submissions from the last
    TICKET_TIME_WINDOW seconds and allows at most TICKET_RATE_LIMIT of them.

    Args:
        ip_address: Client address used as the rate-limit identity.

    Returns:
        True when the request must be rejected; False when it is allowed, in
        which case the current request is recorded.
    """
    current_time = time.time()

    # request_counts is shared with the meeting limiter. Ticket data lives
    # under its own namespaced key so that meeting requests from the same IP
    # do not consume the ticket quota.
    bucket_key = ("ticket", ip_address)

    # Clean old requests outside the time window (.get avoids creating an
    # entry as a side effect of the lookup).
    recent_requests = [
        req_time for req_time in request_counts.get(bucket_key, ())
        if current_time - req_time < TICKET_TIME_WINDOW
    ]

    limited = len(recent_requests) >= TICKET_RATE_LIMIT
    if not limited:
        # Add current request
        recent_requests.append(current_time)
    request_counts[bucket_key] = recent_requests
    return limited
def is_meeting_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded meeting scheduling rate limits.

    Keeps the timestamps of this IP's meeting requests from the last
    MEETING_TIME_WINDOW seconds and allows at most MEETING_RATE_LIMIT of them.

    Args:
        ip_address: Client address used as the rate-limit identity.

    Returns:
        True when the request must be rejected; False when it is allowed, in
        which case the current request is recorded.
    """
    current_time = time.time()

    # request_counts is shared with the ticket limiter. Meeting data lives
    # under its own namespaced key so that ticket submissions from the same IP
    # do not consume the meeting quota.
    bucket_key = ("meeting", ip_address)

    # Clean old requests outside the time window (.get avoids creating an
    # entry as a side effect of the lookup).
    recent_requests = [
        req_time for req_time in request_counts.get(bucket_key, ())
        if current_time - req_time < MEETING_TIME_WINDOW
    ]

    limited = len(recent_requests) >= MEETING_RATE_LIMIT
    if not limited:
        # Add current request
        recent_requests.append(current_time)
    request_counts[bucket_key] = recent_requests
    return limited
# def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
# """
# Query the Launchlabs bot with streaming - returns async generator.
# Now includes language context and session history.
# FIXED: Proper spacing between words in streaming responses.
# Implements fallback to non-streaming when streaming fails (e.g., with Gemini models).
# """
# logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")
# # Get session history if session_id is provided
# history = []
# if session_id:
# history = session_manager.get_session_history(session_id)
# logger.info(f"Retrieved {len(history)} history messages for session {session_id}")
# try:
# # Create context with language preference and history
# context_data = {"language": language}
# if history:
# context_data["history"] = history
# ctx = RunContextWrapper(context=context_data)
# result = Runner.run_streamed(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# async def generate_stream():
# try:
# accumulated_text = "" # FIXED: Track full response for proper spacing
# has_streamed = False
# try:
# # Attempt streaming with error handling for each event
# async for event in result.stream_events():
# try:
# if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
# delta = event.data.delta or ""
# # ---- Spacing Fix (CORRECTED) ----
# # Check against accumulated text, not just previous chunk
# if (
# accumulated_text # Only add space if we have previous text
# and not accumulated_text.endswith((" ", "\n", "\t")) # Previous doesn't end with whitespace
# and not delta.startswith((" ", ".", ",", "?", "!", ":", ";", "\n", "\t", ")", "]", "}", "'", '"')) # Current doesn't start with punctuation/whitespace
# and delta # Make sure delta isn't empty
# ):
# delta = " " + delta
# accumulated_text += delta # Update accumulated text
# # ---- End Fix ----
# yield f"data: {delta}\n\n"
# has_streamed = True
# except Exception as event_error:
# # Handle individual event errors (e.g., missing logprobs field)
# logger.warning(f"Event processing error: {event_error}")
# continue
# # Add complete response to session history
# if accumulated_text and session_id:
# session_manager.add_message_to_history(session_id, "assistant", accumulated_text)
# logger.info(f"Added assistant response to session history: {session_id}")
# yield "data: [DONE]\n\n"
# logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully")
# except Exception as stream_error:
# # Fallback to non-streaming if streaming fails
# logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")
# if not has_streamed:
# # Get final output using the streaming result's final_output property
# try:
# # Use the non-streaming API as fallback
# fallback_response = await Runner.run(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# if hasattr(fallback_response, 'final_output'):
# final_output = fallback_response.final_output
# else:
# final_output = fallback_response
# if hasattr(final_output, 'content'):
# response_text = final_output.content
# elif isinstance(final_output, str):
# response_text = final_output
# else:
# response_text = str(final_output)
# # Add to session history
# if session_id:
# session_manager.add_message_to_history(session_id, "assistant", response_text)
# logger.info(f"Added fallback assistant response to session history: {session_id}")
# yield f"data: {response_text}\n\n"
# yield "data: [DONE]\n\n"
# logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully")
# except Exception as fallback_error:
# logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
# yield f"data: [ERROR] Unable to complete request.\n\n"
# else:
# # Already streamed some content, just end gracefully
# yield "data: [DONE]\n\n"
# except InputGuardrailTripwireTriggered as e:
# logger.warning(f"Guardrail blocked query during streaming: {e}")
# yield f"data: [ERROR] Query was blocked by content guardrail.\n\n"
# except Exception as e:
# logger.error(f"Streaming error: {e}", exc_info=True)
# yield f"data: [ERROR] {str(e)}\n\n"
# return generate_stream()
# except Exception as e:
# logger.error(f"Error setting up stream: {e}", exc_info=True)
# async def error_stream():
# yield f"data: [ERROR] Failed to initialize stream.\n\n"
# return error_stream()
# def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
# """
# Query the Launchlabs bot with streaming - returns async generator.
# COMPLETELY FIXED: Simple and reliable spacing logic
# """
# logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")
# # Get session history if session_id is provided
# history = []
# if session_id:
# history = session_manager.get_session_history(session_id)
# logger.info(f"Retrieved {len(history)} history messages for session {session_id}")
# try:
# # Create context with language preference and history
# context_data = {"language": language}
# if history:
# context_data["history"] = history
# ctx = RunContextWrapper(context=context_data)
# result = Runner.run_streamed(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# async def generate_stream():
# try:
# accumulated_text = ""
# has_streamed = False
# try:
# async for event in result.stream_events():
# try:
# if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
# delta = event.data.delta or ""
# if not delta:
# continue
# # COMPLETELY FIXED APPROACH: Just send the delta as-is from OpenAI
# # OpenAI already includes proper spaces, so we don't need to add them
# accumulated_text += delta
# # Send delta exactly as received
# yield f"data: {delta}\n\n"
# has_streamed = True
# except Exception as event_error:
# logger.warning(f"Event processing error: {event_error}")
# continue
# # Add complete response to session history
# if accumulated_text and session_id:
# session_manager.add_message_to_history(session_id, "assistant", accumulated_text)
# logger.info(f"Added assistant response to session history: {session_id}")
# yield "data: [DONE]\n\n"
# logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully")
# except Exception as stream_error:
# logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")
# if not has_streamed:
# try:
# fallback_response = await Runner.run(
# launchlabs_assistant,
# input=user_message,
# context=ctx.context
# )
# if hasattr(fallback_response, 'final_output'):
# final_output = fallback_response.final_output
# else:
# final_output = fallback_response
# if hasattr(final_output, 'content'):
# response_text = final_output.content
# elif isinstance(final_output, str):
# response_text = final_output
# else:
# response_text = str(final_output)
# if session_id:
# session_manager.add_message_to_history(session_id, "assistant", response_text)
# logger.info(f"Added fallback assistant response to session history: {session_id}")
# yield f"data: {response_text}\n\n"
# yield "data: [DONE]\n\n"
# logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully")
# except Exception as fallback_error:
# logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
# yield f"data: [ERROR] Unable to complete request.\n\n"
# else:
# yield "data: [DONE]\n\n"
# except InputGuardrailTripwireTriggered as e:
# logger.warning(f"Guardrail blocked query during streaming: {e}")
# yield f"data: [ERROR] Query was blocked by content guardrail.\n\n"
# except Exception as e:
# logger.error(f"Streaming error: {e}", exc_info=True)
# yield f"data: [ERROR] {str(e)}\n\n"
# return generate_stream()
# except Exception as e:
# logger.error(f"Error setting up stream: {e}", exc_info=True)
# async def error_stream():
# yield f"data: [ERROR] Failed to initialize stream.\n\n"
# return error_stream()
def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
    """
    Query the Launchlabs bot with streaming.

    Starts a streamed agent run and returns an async generator of
    server-sent-event frames ("data: ...\\n\\n"). Text deltas are forwarded
    exactly as received, without any spacing changes. The stream ends with
    "data: [DONE]" on success or a "data: [ERROR] ..." frame on failure.

    Args:
        user_message: The user's message, passed to the agent as input.
        language: Language preference placed in the run context.
        session_id: When given, stored history is added to the run context and
            the completed reply is appended to that session's history.

    Returns:
        An async generator yielding SSE-formatted strings.

    Behaviour on failure:
        * A guardrail trip yields the "blocked by content guardrail" error frame.
        * Any other streaming failure before the first delta falls back to a
          single non-streaming run; after the first delta the stream is just
          closed with [DONE].
    """
    logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")

    # Get session history if session_id is provided
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")

    try:
        # Create context with language preference and history
        context_data = {"language": language}
        if history:
            context_data["history"] = history
        ctx = RunContextWrapper(context=context_data)
        result = Runner.run_streamed(
            launchlabs_assistant,
            input=user_message,
            context=ctx.context
        )

        async def generate_stream():
            try:
                accumulated_text = ""
                has_streamed = False
                try:
                    async for event in result.stream_events():
                        try:
                            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                                delta = event.data.delta
                                if delta:  # Only process if delta has content
                                    # CRITICAL: Send delta exactly as received - NO MODIFICATIONS
                                    accumulated_text += delta
                                    yield f"data: {delta}\n\n"
                                    has_streamed = True
                        except Exception as event_error:
                            # A malformed single event must not abort the whole stream.
                            logger.warning(f"Event processing error: {event_error}")
                            continue

                    # Add complete response to session history
                    if accumulated_text and session_id:
                        session_manager.add_message_to_history(session_id, "assistant", accumulated_text)
                        logger.info(f"Added assistant response to session history: {session_id}")
                    yield "data: [DONE]\n\n"
                    logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully")
                except InputGuardrailTripwireTriggered:
                    # A guardrail block is a final answer, not a streaming failure.
                    # Without this clause the generic handler below would swallow
                    # it and re-run the blocked query through the fallback.
                    raise
                except Exception as stream_error:
                    logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")
                    if not has_streamed:
                        try:
                            fallback_response = await Runner.run(
                                launchlabs_assistant,
                                input=user_message,
                                context=ctx.context
                            )
                            if hasattr(fallback_response, 'final_output'):
                                final_output = fallback_response.final_output
                            else:
                                final_output = fallback_response
                            if hasattr(final_output, 'content'):
                                response_text = final_output.content
                            elif isinstance(final_output, str):
                                response_text = final_output
                            else:
                                response_text = str(final_output)
                            if session_id:
                                session_manager.add_message_to_history(session_id, "assistant", response_text)
                                logger.info(f"Added fallback assistant response to session history: {session_id}")
                            yield f"data: {response_text}\n\n"
                            yield "data: [DONE]\n\n"
                            logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully")
                        except InputGuardrailTripwireTriggered:
                            # Report a guardrail block from the fallback run as such.
                            raise
                        except Exception as fallback_error:
                            logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
                            yield "data: [ERROR] Unable to complete request.\n\n"
                    else:
                        # Part of the reply already reached the client; close the stream.
                        yield "data: [DONE]\n\n"
            except InputGuardrailTripwireTriggered as e:
                logger.warning(f"Guardrail blocked query during streaming: {e}")
                yield "data: [ERROR] Query was blocked by content guardrail.\n\n"
            except Exception as e:
                logger.error(f"Streaming error: {e}", exc_info=True)
                # Details stay in the log; the client gets a generic message,
                # as on the non-streaming endpoints.
                yield "data: [ERROR] An internal error occurred while processing your request.\n\n"

        return generate_stream()
    except Exception as e:
        logger.error(f"Error setting up stream: {e}", exc_info=True)

        async def error_stream():
            yield "data: [ERROR] Failed to initialize stream.\n\n"

        return error_stream()
async def query_launchlabs_bot(user_message: str, language: str = "english", session_id: Optional[str] = None):
    """
    Run the Launchlabs assistant once and return its final output.

    The run context carries the language preference and, when the session has
    stored history, that history as well.

    Raises:
        HTTPException: 403 when an input guardrail blocks the query, 500 for
            any other failure during the run.
    """
    logger.info(f"AGENT CALL: query_launchlabs_bot called with message='{user_message}', language='{language}', session_id='{session_id}'")

    # Only sessions can have history; without one the run starts from scratch.
    if session_id:
        past_messages = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(past_messages)} history messages for session {session_id}")
    else:
        past_messages = []

    try:
        run_context = {"language": language}
        if past_messages:
            run_context["history"] = past_messages
        wrapper = RunContextWrapper(context=run_context)

        run_result = await Runner.run(
            launchlabs_assistant,
            input=user_message,
            context=wrapper.context,
        )
        logger.info("AGENT RESULT: query_launchlabs_bot completed successfully")
        return run_result.final_output
    except InputGuardrailTripwireTriggered as guardrail_error:
        logger.warning(f"Guardrail blocked query: {guardrail_error}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Query was blocked by content guardrail. Please ensure your query is related to Launchlabs services.",
        )
    except Exception as run_error:
        logger.error(f"Error in query_launchlabs_bot: {run_error}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request.",
        )
@app.get("/")
async def root():
    """Return a small status payload that identifies the service."""
    service_info = {"status": "ok", "service": "Launchlabs Chatbot API"}
    return service_info
@app.get("/health")
async def health():
    """Health probe: always reports the service as healthy."""
    health_status = {"status": "healthy"}
    return health_status
@app.post("/session")
async def create_session():
    """
    Start a new chat session.

    Returns a JSON object with the new session ID, which callers pass back on
    later requests to keep their chat history, plus a confirmation message.

    Raises:
        HTTPException: 500 when the session manager fails to create a session.
    """
    try:
        new_session_id = session_manager.create_session()
        logger.info(f"Created new session: {new_session_id}")
        return {
            "session_id": new_session_id,
            "message": "Session created successfully",
        }
    except Exception as creation_error:
        logger.error(f"Error creating session: {creation_error}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create session",
        )
@app.post("/chat", response_model=ChatResponse)
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat(request: Request, chat_request: ChatRequest):
    """
    Handle one non-streaming chat turn with language and session support.

    Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}

    A session is created when the request carries none. The user message and
    the assistant reply are both appended to the session history, and the
    reply is returned together with the session ID.

    Raises:
        HTTPException: errors raised by the bot query are passed through
            unchanged; any other failure becomes a 500.
    """
    try:
        # Reuse the caller's session or start a fresh one.
        active_session = chat_request.session_id
        if not active_session:
            active_session = session_manager.create_session()
            logger.info(f"Created new session for chat: {active_session}")

        user_text = chat_request.message
        requested_language = chat_request.language
        logger.info(
            f"Chat request from {get_remote_address(request)}: "
            f"language={requested_language}, message={user_text[:50]}..., session_id={active_session}"
        )

        # Record the user's turn before querying so the history is complete.
        session_manager.add_message_to_history(active_session, "user", user_text)

        bot_output = await query_launchlabs_bot(
            user_text,
            language=requested_language,
            session_id=active_session,
        )

        # The bot may return an object exposing .content, a plain string, or something else.
        reply_text = (
            bot_output.content if hasattr(bot_output, 'content')
            else bot_output if isinstance(bot_output, str)
            else str(bot_output)
        )

        session_manager.add_message_to_history(active_session, "assistant", reply_text)
        logger.info(f"Chat response generated successfully in {requested_language} for session {active_session}")
        return ChatResponse(response=reply_text, success=True, session_id=active_session)
    except HTTPException:
        # Already a well-formed HTTP error (e.g. guardrail 403): pass it through.
        raise
    except Exception as unexpected:
        logger.error(f"Unexpected error in /chat: {unexpected}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request.",
        )
@app.post("/api/messages", response_model=ChatResponse)
@limiter.limit("10/minute")  # Same rate limit as /chat
async def api_messages(request: Request, chat_request: ChatRequest):
    """
    Frontend-friendly chat endpoint at /api/messages.
    Mirrors /chat logic for session/history support, including its error
    handling: HTTP errors raised by the bot query are passed through unchanged.
    Expects: {"message": "...", "language": "english", "session_id": "optional"}

    Raises:
        HTTPException: 403 when a guardrail blocks the query, 500 for any
            other failure.
    """
    client_ip = get_remote_address(request)
    logger.info(f"API Messages request from {client_ip}: message='{chat_request.message[:50]}...', lang='{chat_request.language}', session='{chat_request.session_id}'")
    try:
        # Create/use session (Firestore-backed)
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"New session created for /api/messages: {session_id}")

        # Save user message to history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        # Call your existing bot query function
        response = await query_launchlabs_bot(
            user_message=chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )

        # Extract response text
        response_text = (
            response.content if hasattr(response, 'content')
            else response if isinstance(response, str)
            else str(response)
        )

        # Save AI response to history
        session_manager.add_message_to_history(session_id, "assistant", response_text)
        logger.info(f"API Messages success: Response sent for session {session_id}")
        return ChatResponse(
            response=response_text,
            success=True,
            session_id=session_id
        )
    except HTTPException:
        # query_launchlabs_bot already converts failures into HTTP errors
        # (403 for guardrail blocks). Without this clause the generic handler
        # below rewrote that 403 into a 500.
        raise
    except InputGuardrailTripwireTriggered as e:
        logger.warning(f"Guardrail blocked /api/messages: {e}")
        raise HTTPException(
            status_code=403,
            detail="Query blocked – please ask about Launchlabs services."
        )
    except Exception as e:
        logger.error(f"Error in /api/messages: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Internal error – try again."
        )
@app.post("/chat-stream")
@limiter.limit("10/minute")  # Limit to 10 requests per minute per IP
async def chat_stream(request: Request, chat_request: ChatRequest):
    """
    Handle one streaming chat turn with language and session support.

    Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}

    Responds with a text/event-stream body produced by
    query_launchlabs_bot_stream; the session ID is returned in the
    "Session-ID" response header.

    Raises:
        HTTPException: passed through unchanged if raised while setting up;
            any other setup failure becomes a 500.
    """
    try:
        # Reuse the caller's session or start a fresh one.
        active_session = chat_request.session_id
        if not active_session:
            active_session = session_manager.create_session()
            logger.info(f"Created new session for streaming chat: {active_session}")

        user_text = chat_request.message
        requested_language = chat_request.language
        logger.info(
            f"Stream request from {get_remote_address(request)}: "
            f"language={requested_language}, message={user_text[:50]}..., session_id={active_session}"
        )

        # Only the user's turn is recorded here; the assistant's reply is
        # stored by the stream generator once it has completed.
        session_manager.add_message_to_history(active_session, "user", user_text)

        sse_frames = query_launchlabs_bot_stream(
            user_text,
            language=requested_language,
            session_id=active_session,
        )

        response_headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Session-ID": active_session,  # Include session ID in headers
        }
        return StreamingResponse(
            sse_frames,
            media_type="text/event-stream",
            headers=response_headers,
        )
    except HTTPException:
        raise
    except Exception as unexpected:
        logger.error(f"Unexpected error in /chat-stream: {unexpected}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request.",
        )
@app.post("/ticket", response_model=TicketResponse)
@limiter.limit("5/hour")  # Limit to 5 tickets per hour per IP
async def submit_ticket(request: Request, ticket_request: TicketRequest):
    """
    Submit a support ticket via email using Resend API.
    Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"}

    The ticket is emailed to ADMIN_EMAIL from SENDER_EMAIL (both read from the
    environment, with placeholder defaults). All user-supplied values are
    HTML-escaped before being placed in the email body.

    Raises:
        HTTPException: 429 when the per-IP ticket limit is exceeded, 500 when
            sending the email fails.
    """
    from html import escape  # local import: only the email rendering needs it

    try:
        client_ip = get_remote_address(request)
        logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}")

        # Additional rate limiting for tickets
        if is_ticket_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many ticket submissions. Please try again later."
            )

        # Get admin email from environment variables or use a default
        admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
        # Use a verified sender email (you need to verify this in your Resend account)
        # For testing purposes, you can use your Resend account's verified domain
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # The request body is untrusted: escape it so submitted markup is shown
        # as text instead of being rendered in the administrator's mail client.
        safe_name = escape(ticket_request.name)
        safe_email = escape(ticket_request.email)
        safe_message = escape(ticket_request.message)
        safe_ip = escape(str(client_ip))

        # Prepare the email using Resend
        params = {
            "from": sender_email,
            "to": [admin_email],
            "subject": f"Support Ticket from {ticket_request.name}",
            "html": f"""
<p>Hello Admin,</p>
<p>A new support ticket has been submitted:</p>
<p><strong>Name:</strong> {safe_name}</p>
<p><strong>Email:</strong> {safe_email}</p>
<p><strong>Message:</strong></p>
<p>{safe_message}</p>
<p><strong>IP Address:</strong> {safe_ip}</p>
<br>
<p>Best regards,<br>Launchlabs Support Team</p>
"""
        }

        # Send the email
        resend.Emails.send(params)
        logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}")
        return TicketResponse(
            success=True,
            message="Ticket submitted successfully. We'll get back to you soon."
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error submitting ticket: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to submit ticket. Please try again later."
        )
def _build_calendar_link(
    topic: str,
    date: str,
    time_of_day: str,
    timezone_name: str,
    duration: int,
    description: Optional[str],
    location: str,
) -> str:
    """Build the Google Calendar "add event" link for a meeting.

    The start is read as ``date`` (YYYY-MM-DD) plus ``time_of_day`` (HH:MM) in
    ``timezone_name`` and converted to UTC; the end is the start plus
    ``duration`` minutes. An unknown timezone is treated as UTC. If the date or
    time cannot be parsed, the raw digits are used with a zero-length range,
    which is what the link contained before. All parameters are URL-encoded.
    """
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    from zoneinfo import ZoneInfo

    try:
        local_start = datetime.strptime(f"{date} {time_of_day}", "%Y-%m-%d %H:%M")
        try:
            zone = ZoneInfo(timezone_name)
        except (KeyError, ValueError, OSError):
            # Unknown identifier or no tz database available.
            zone = timezone.utc
        start_utc = local_start.replace(tzinfo=zone).astimezone(timezone.utc)
        # Add the duration in UTC so a DST change during the meeting cannot skew it.
        end_utc = start_utc + timedelta(minutes=duration)
        stamp_format = "%Y%m%dT%H%M%SZ"
        dates = f"{start_utc.strftime(stamp_format)}/{end_utc.strftime(stamp_format)}"
    except (ValueError, OverflowError):
        stamp = f"{date.replace('-', '')}T{time_of_day.replace(':', '')}00Z"
        dates = f"{stamp}/{stamp}"

    query = urlencode(
        {
            "action": "TEMPLATE",
            "text": topic,
            "dates": dates,
            "details": description or "Meeting scheduled via Launchlabs",
            "location": location,
        },
        safe="/",  # keep the start/end separator of "dates" readable
    )
    return f"https://calendar.google.com/calendar/render?{query}"


@app.post("/schedule-meeting", response_model=MeetingResponse)
@limiter.limit("3/hour")  # Limit to 3 meetings per hour per IP
async def schedule_meeting(request: Request, meeting_request: MeetingRequest):
    """
    Schedule a meeting and send an email invitation using Resend API.

    Builds an HTML confirmation (details, attendee list, calendar link) and
    sends it to ADMIN_EMAIL only, because of Resend testing limitations; the
    attendees are listed in the email but do not receive it. All user-supplied
    values are HTML-escaped before being placed in the email.

    Returns:
        MeetingResponse with success=True and the generated meeting ID. A
        failed email send is reported in the message but does not fail the
        request.

    Raises:
        HTTPException: 429 when the per-IP meeting limit is exceeded, 500 for
            any other failure.
    """
    import uuid
    from html import escape

    try:
        client_ip = get_remote_address(request)
        logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}")

        # Additional rate limiting for meetings
        if is_meeting_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many meeting requests. Please try again later."
            )

        # Generate a unique meeting ID. The random suffix keeps two meetings
        # scheduled within the same second from sharing an ID.
        meeting_id = f"mtg_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        # Use a verified sender email (you need to verify this in your Resend account)
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")
        # For Resend testing limitations, we can only send to the owner's email
        # In production, you would verify a domain and use that instead
        owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")

        # An explicit null location falls back to the model's declared default.
        location = meeting_request.location or "Google Meet"

        # Format date and time for display
        formatted_datetime = f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}"

        calendar_link = _build_calendar_link(
            topic=meeting_request.topic,
            date=meeting_request.date,
            time_of_day=meeting_request.time,
            timezone_name=meeting_request.timezone,
            duration=meeting_request.duration,
            description=meeting_request.description,
            location=location,
        )

        # Combine all attendees (organizer + additional attendees).
        # The organizer is always listed, so the list is never empty.
        all_attendees = [meeting_request.email]
        for attendee in meeting_request.attendees:
            # Simple email validation
            if "@" in attendee and "." in attendee:
                all_attendees.append(attendee)
            else:
                logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.")
        # Remove duplicates while preserving order
        all_attendees = list(dict.fromkeys(all_attendees))

        # The request body is untrusted: escape every value placed in the HTML.
        topic_html = escape(meeting_request.topic)
        when_html = escape(formatted_datetime)
        location_html = escape(location)
        organizer_html = f"{escape(meeting_request.name)} ({escape(meeting_request.email)})"
        description_html = escape(meeting_request.description or 'No description provided.')
        attendee_items_html = ''.join(f'<li>{escape(attendee)}</li>' for attendee in all_attendees)
        calendar_href = escape(calendar_link)  # "&" must be "&amp;" inside an attribute

        # Prepare the professional HTML email template
        html_template = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Meeting Scheduled - {topic_html}</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; text-align: center; border-radius: 10px 10px 0 0;">
<h1 style="margin: 0; font-size: 28px;">Meeting Confirmed!</h1>
<p style="font-size: 18px; margin-top: 10px;">Your meeting has been successfully scheduled</p>
</div>
<div style="background-color: #ffffff; padding: 30px; border: 1px solid #eaeaea; border-top: none; border-radius: 0 0 10px 10px;">
<h2 style="color: #333;">Meeting Details</h2>
<div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0;">
<table style="width: 100%; border-collapse: collapse;">
<tr>
<td style="padding: 8px 0; font-weight: bold; width: 30%;">Topic:</td>
<td style="padding: 8px 0;">{topic_html}</td>
</tr>
<tr style="background-color: #f0f0f0;">
<td style="padding: 8px 0; font-weight: bold;">Date & Time:</td>
<td style="padding: 8px 0;">{when_html}</td>
</tr>
<tr>
<td style="padding: 8px 0; font-weight: bold;">Duration:</td>
<td style="padding: 8px 0;">{meeting_request.duration} minutes</td>
</tr>
<tr style="background-color: #f0f0f0;">
<td style="padding: 8px 0; font-weight: bold;">Location:</td>
<td style="padding: 8px 0;">{location_html}</td>
</tr>
<tr>
<td style="padding: 8px 0; font-weight: bold;">Organizer:</td>
<td style="padding: 8px 0;">{organizer_html}</td>
</tr>
</table>
</div>
<div style="margin: 25px 0;">
<h3 style="color: #333;">Description</h3>
<p style="background-color: #f8f9fa; padding: 15px; border-radius: 8px; white-space: pre-wrap;">{description_html}</p>
</div>
<div style="margin: 25px 0;">
<h3 style="color: #333;">Attendees</h3>
<ul style="background-color: #f8f9fa; padding: 15px; border-radius: 8px;">
{attendee_items_html}
</ul>
<p style="font-size: 12px; color: #666; margin-top: 5px;">Note: Only valid email addresses will receive invitations.</p>
</div>
<div style="text-align: center; margin: 30px 0;">
<a href="{calendar_href}" style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 12px 25px; text-decoration: none; border-radius: 5px; font-weight: bold; display: inline-block;">Add to Calendar</a>
</div>
<div style="background-color: #e3f2fd; padding: 15px; border-radius: 8px; margin-top: 25px;">
<p style="margin: 0;"><strong>Meeting ID:</strong> {meeting_id}</p>
<p style="margin: 10px 0 0 0; font-size: 14px; color: #666;">Need to make changes? Contact the organizer or reply to this email.</p>
</div>
</div>
<div style="text-align: center; margin-top: 30px; color: #888; font-size: 14px;">
<p>This meeting was scheduled through Launchlabs Chatbot Services</p>
<p><strong>Note:</strong> Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.</p>
<p>© 2025 Launchlabs. All rights reserved.</p>
</div>
</body>
</html>
"""

        # In a real implementation, you would send to all attendees after verifying your domain
        # For now, we're sending to the owner with information about all attendees
        params = {
            "from": sender_email,
            "to": [owner_email],  # Only send to owner due to Resend testing limitations
            "subject": f"Meeting Scheduled: {meeting_request.topic}",
            "html": html_template
        }

        # Send the email
        try:
            resend.Emails.send(params)
            # Only the administrator receives the email; attendees are listed in it.
            logger.info(f"Meeting email sent to administrator; {len(all_attendees)} attendee(s) listed")
        except Exception as email_error:
            logger.error(f"Failed to send email: {email_error}", exc_info=True)
            # Even if email fails, we still consider the meeting scheduled
            return MeetingResponse(
                success=True,
                message="Meeting scheduled successfully, but failed to send email invitations.",
                meeting_id=meeting_id
            )

        logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}")
        return MeetingResponse(
            success=True,
            message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.",
            meeting_id=meeting_id
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error scheduling meeting: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to schedule meeting. Please try again later."
        )
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """
    Catch-all handler for exceptions no endpoint handled.

    Logs the exception together with the request path, method and client
    address, and answers with a generic 500 JSON body so that no internal
    detail reaches the client.
    """
    log_message = f"Unhandled exception: {exc}"
    request_context = {
        "path": request.url.path,
        "method": request.method,
        "client": get_remote_address(request),
    }
    logger.error(log_message, exc_info=True, extra=request_context)

    error_body = {
        "error": "Internal server error",
        "detail": "An unexpected error occurred. Please try again later.",
    }
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=error_body,
    )
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, port=8000, host="0.0.0.0")