Spaces:
Configuration error
Configuration error
| # """ | |
| # FastAPI application for Launchlabs Chatbot API | |
| # Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling | |
| # Updated with language context support | |
| # """ | |
| # import os | |
| # import logging | |
| # import time | |
| # from typing import Optional | |
| # from collections import defaultdict | |
| # import resend | |
| # from fastapi import FastAPI, Request, HTTPException, status, Depends, Header | |
| # from fastapi.responses import StreamingResponse, JSONResponse | |
| # from fastapi.middleware.cors import CORSMiddleware | |
| # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # from pydantic import BaseModel | |
| # from slowapi import Limiter, _rate_limit_exceeded_handler | |
| # from slowapi.util import get_remote_address | |
| # from slowapi.errors import RateLimitExceeded | |
| # from slowapi.middleware import SlowAPIMiddleware | |
| # from dotenv import load_dotenv | |
| # from agents import Runner, RunContextWrapper | |
| # from agents.exceptions import InputGuardrailTripwireTriggered | |
| # from openai.types.responses import ResponseTextDeltaEvent | |
| # from chatbot.chatbot_agent import launchlabs_assistant | |
| # from sessions.session_manager import session_manager | |
| # # Load environment variables | |
| # load_dotenv() | |
| # # Configure Resend | |
| # resend.api_key = os.getenv("RESEND_API_KEY") | |
| # # Configure logging | |
| # logging.basicConfig( | |
| # level=logging.INFO, | |
| # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| # handlers=[ | |
| # logging.FileHandler('app.log'), | |
| # logging.StreamHandler() | |
| # ] | |
| # ) | |
| # logger = logging.getLogger(__name__) | |
| # # Initialize rate limiter with enhanced security | |
| # limiter = Limiter(key_func=get_remote_address, default_limits=["100/day", "20/hour", "3/minute"]) | |
| # # Create FastAPI app | |
| # app = FastAPI( | |
| # title="Launchlabs Chatbot API", | |
| # description="AI-powered chatbot API for Launchlabs services", | |
| # version="1.0.0" | |
| # ) | |
| # # Add rate limiter middleware | |
| # app.state.limiter = limiter | |
| # app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) | |
| # app.add_middleware(SlowAPIMiddleware) | |
| # # Configure CORS from environment variable | |
| # allowed_origins = os.getenv("ALLOWED_ORIGINS", "").split(",") | |
| # allowed_origins = [origin.strip() for origin in allowed_origins if origin.strip()] | |
| # if allowed_origins: | |
| # app.add_middleware( | |
| # CORSMiddleware, | |
| # allow_origins=["*"] + allowed_origins, | |
| # allow_credentials=True, | |
| # allow_methods=["*"], | |
| # allow_headers=["*"], | |
| # ) | |
| # logger.info(f"CORS enabled for origins: {allowed_origins}") | |
| # else: | |
| # logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled") | |
| # # Security setup | |
| # security = HTTPBearer() | |
| # # Enhanced rate limiting dictionaries | |
| # request_counts = defaultdict(list) # Track requests per IP | |
| # TICKET_RATE_LIMIT = 5 # Max 5 tickets per hour per IP | |
| # TICKET_TIME_WINDOW = 3600 # 1 hour in seconds | |
| # MEETING_RATE_LIMIT = 3 # Max 3 meetings per hour per IP | |
| # MEETING_TIME_WINDOW = 3600 # 1 hour in seconds | |
| # # Request/Response models | |
| # class ChatRequest(BaseModel): | |
| # message: str | |
| # language: Optional[str] = "english" # Default to English if not specified | |
| # session_id: Optional[str] = None # Session ID for chat history | |
| # class ChatResponse(BaseModel): | |
| # response: str | |
| # success: bool | |
| # session_id: str # Include session ID in response | |
| # class ErrorResponse(BaseModel): | |
| # error: str | |
| # detail: Optional[str] = None | |
| # class TicketRequest(BaseModel): | |
| # name: str | |
| # email: str | |
| # message: str | |
| # class TicketResponse(BaseModel): | |
| # success: bool | |
| # message: str | |
| # class MeetingRequest(BaseModel): | |
| # name: str | |
| # email: str | |
| # date: str # ISO format date string | |
| # time: str # Time in HH:MM format | |
| # timezone: str # Timezone identifier | |
| # duration: int # Duration in minutes | |
| # topic: str # Meeting topic/title | |
| # attendees: list[str] # List of attendee emails | |
| # description: Optional[str] = None # Optional meeting description | |
| # location: Optional[str] = "Google Meet" # Meeting location/platform | |
| # class MeetingResponse(BaseModel): | |
| # success: bool | |
| # message: str | |
| # meeting_id: Optional[str] = None # Unique identifier for the meeting | |
| # # Security dependency for API key validation | |
| # async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)): | |
| # """Verify API key for protected endpoints""" | |
| # # In production, you would check against a database of valid keys | |
| # # For now, we'll use an environment variable | |
| # expected_key = os.getenv("API_KEY") | |
| # if not expected_key or credentials.credentials != expected_key: | |
| # raise HTTPException( | |
| # status_code=status.HTTP_401_UNAUTHORIZED, | |
| # detail="Invalid or missing API key", | |
| # ) | |
| # return credentials.credentials | |
| # def is_ticket_rate_limited(ip_address: str) -> bool: | |
| # """Check if an IP address has exceeded ticket submission rate limits""" | |
| # current_time = time.time() | |
| # # Clean old requests outside the time window | |
| # request_counts[ip_address] = [ | |
| # req_time for req_time in request_counts[ip_address] | |
| # if current_time - req_time < TICKET_TIME_WINDOW | |
| # ] | |
| # # Check if limit exceeded | |
| # if len(request_counts[ip_address]) >= TICKET_RATE_LIMIT: | |
| # return True | |
| # # Add current request | |
| # request_counts[ip_address].append(current_time) | |
| # return False | |
| # def is_meeting_rate_limited(ip_address: str) -> bool: | |
| # """Check if an IP address has exceeded meeting scheduling rate limits""" | |
| # current_time = time.time() | |
| # # Clean old requests outside the time window | |
| # request_counts[ip_address] = [ | |
| # req_time for req_time in request_counts[ip_address] | |
| # if current_time - req_time < MEETING_TIME_WINDOW | |
| # ] | |
| # # Check if limit exceeded | |
| # if len(request_counts[ip_address]) >= MEETING_RATE_LIMIT: | |
| # return True | |
| # # Add current request | |
| # request_counts[ip_address].append(current_time) | |
| # return False | |
| # def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None): | |
| # """ | |
| # Query the Launchlabs bot with streaming - returns async generator. | |
| # Now includes language context and session history. | |
| # Implements fallback to non-streaming when streaming fails (e.g., with Gemini models). | |
| # """ | |
| # logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'") | |
| # # Get session history if session_id is provided | |
| # history = [] | |
| # if session_id: | |
| # history = session_manager.get_session_history(session_id) | |
| # logger.info(f"Retrieved {len(history)} history messages for session {session_id}") | |
| # try: | |
| # # Create context with language preference and history | |
| # context_data = {"language": language} | |
| # if history: | |
| # context_data["history"] = history | |
| # ctx = RunContextWrapper(context=context_data) | |
| # result = Runner.run_streamed( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # async def generate_stream(): | |
| # try: | |
| # previous = "" | |
| # has_streamed = True | |
| # try: | |
| # # Attempt streaming with error handling for each event | |
| # async for event in result.stream_events(): | |
| # try: | |
| # if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | |
| # delta = event.data.delta or "" | |
| # # ---- Spacing Fix ---- | |
| # if ( | |
| # previous | |
| # and not previous.endswith((" ", "\n")) | |
| # and not delta.startswith((" ", ".", ",", "?", "!", ":", ";")) | |
| # ): | |
| # delta = " " + delta | |
| # previous = delta | |
| # # ---- End Fix ---- | |
| # yield f"data: {delta}\n\n" | |
| # has_streamed = True | |
| # except Exception as event_error: | |
| # # Handle individual event errors (e.g., missing logprobs field) | |
| # logger.warning(f"Event processing error: {event_error}") | |
| # continue | |
| # yield "data: [DONE]\n\n" | |
| # logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully") | |
| # except Exception as stream_error: | |
| # # Fallback to non-streaming if streaming fails | |
| # logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}") | |
| # if not has_streamed: | |
| # # Get final output using the streaming result's final_output property | |
| # # Wait for the stream to complete to get final output | |
| # try: | |
| # # Use the non-streaming API as fallback | |
| # fallback_response = await Runner.run( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # if hasattr(fallback_response, 'final_output'): | |
| # final_output = fallback_response.final_output | |
| # else: | |
| # final_output = fallback_response | |
| # if hasattr(final_output, 'content'): | |
| # response_text = final_output.content | |
| # elif isinstance(final_output, str): | |
| # response_text = final_output | |
| # else: | |
| # response_text = str(final_output) | |
| # yield f"data: {response_text}\n\n" | |
| # yield "data: [DONE]\n\n" | |
| # logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully") | |
| # except Exception as fallback_error: | |
| # logger.error(f"Fallback also failed: {fallback_error}", exc_info=True) | |
| # yield f"data: [ERROR] Unable to complete request.\n\n" | |
| # else: | |
| # # Already streamed some content, just end gracefully | |
| # yield "data: [DONE]\n\n" | |
| # except InputGuardrailTripwireTriggered as e: | |
| # logger.warning(f"Guardrail blocked query during streaming: {e}") | |
| # yield f"data: [ERROR] Query was blocked by content guardrail.\n\n" | |
| # except Exception as e: | |
| # logger.error(f"Streaming error: {e}", exc_info=True) | |
| # yield f"data: [ERROR] {str(e)}\n\n" | |
| # return generate_stream() | |
| # except Exception as e: | |
| # logger.error(f"Error setting up stream: {e}", exc_info=True) | |
| # async def error_stream(): | |
| # yield f"data: [ERROR] Failed to initialize stream.\n\n" | |
| # return error_stream() | |
| # async def query_launchlabs_bot(user_message: str, language: str = "english", session_id: Optional[str] = None): | |
| # """ | |
| # Query the Launchlabs bot - returns complete response. | |
| # Now includes language context and session history. | |
| # """ | |
| # logger.info(f"AGENT CALL: query_launchlabs_bot called with message='{user_message}', language='{language}', session_id='{session_id}'") | |
| # # Get session history if session_id is provided | |
| # history = [] | |
| # if session_id: | |
| # history = session_manager.get_session_history(session_id) | |
| # logger.info(f"Retrieved {len(history)} history messages for session {session_id}") | |
| # try: | |
| # # Create context with language preference and history | |
| # context_data = {"language": language} | |
| # if history: | |
| # context_data["history"] = history | |
| # ctx = RunContextWrapper(context=context_data) | |
| # response = await Runner.run( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # logger.info("AGENT RESULT: query_launchlabs_bot completed successfully") | |
| # return response.final_output | |
| # except InputGuardrailTripwireTriggered as e: | |
| # logger.warning(f"Guardrail blocked query: {e}") | |
| # raise HTTPException( | |
| # status_code=status.HTTP_403_FORBIDDEN, | |
| # detail="Query was blocked by content guardrail. Please ensure your query is related to Launchlabs services." | |
| # ) | |
| # except Exception as e: | |
| # logger.error(f"Error in query_launchlabs_bot: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail="An internal error occurred while processing your request." | |
| # ) | |
| # @app.get("/") | |
| # async def root(): | |
| # return {"status": "ok", "service": "Launchlabs Chatbot API"} | |
| # @app.get("/health") | |
| # async def health(): | |
| # return {"status": "healthy"} | |
| # @app.post("/session") | |
| # async def create_session(): | |
| # """ | |
| # Create a new chat session | |
| # Returns a session ID that can be used to maintain chat history | |
| # """ | |
| # try: | |
| # session_id = session_manager.create_session() | |
| # logger.info(f"Created new session: {session_id}") | |
| # return {"session_id": session_id, "message": "Session created successfully"} | |
| # except Exception as e: | |
| # logger.error(f"Error creating session: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail="Failed to create session" | |
| # ) | |
| # @app.post("/chat", response_model=ChatResponse) | |
| # @limiter.limit("10/minute") # Limit to 10 requests per minute per IP | |
| # async def chat(request: Request, chat_request: ChatRequest): | |
| # """ | |
| # Standard chat endpoint with language support and session history. | |
| # Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"} | |
| # """ | |
| # try: | |
| # # Create or use existing session | |
| # session_id = chat_request.session_id | |
| # if not session_id: | |
| # session_id = session_manager.create_session() | |
| # logger.info(f"Created new session for chat: {session_id}") | |
| # logger.info( | |
| # f"Chat request from {get_remote_address(request)}: " | |
| # f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}" | |
| # ) | |
| # # Add user message to session history | |
| # session_manager.add_message_to_history(session_id, "user", chat_request.message) | |
| # # Pass language and session to the bot | |
| # response = await query_launchlabs_bot( | |
| # chat_request.message, | |
| # language=chat_request.language, | |
| # session_id=session_id | |
| # ) | |
| # if hasattr(response, 'content'): | |
| # response_text = response.content | |
| # elif isinstance(response, str): | |
| # response_text = response | |
| # else: | |
| # response_text = str(response) | |
| # # Add bot response to session history | |
| # session_manager.add_message_to_history(session_id, "assistant", response_text) | |
| # logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}") | |
| # return ChatResponse( | |
| # response=response_text, | |
| # success=True, | |
| # session_id=session_id | |
| # ) | |
| # except HTTPException: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Unexpected error in /chat: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail="An internal error occurred while processing your request." | |
| # ) | |
| # @app.post("/api/messages", response_model=ChatResponse) | |
| # @limiter.limit("10/minute") # Same rate limit as /chat | |
| # async def api_messages(request: Request, chat_request: ChatRequest): | |
| # """ | |
| # Frontend-friendly chat endpoint at /api/messages. | |
| # Exactly mirrors /chat logic for session/history support. | |
| # Expects: {"message": "...", "language": "english", "session_id": "optional"} | |
| # """ | |
| # client_ip = get_remote_address(request) | |
| # logger.info(f"API Messages request from {client_ip}: message='{chat_request.message[:50]}...', lang='{chat_request.language}', session='{chat_request.session_id}'") | |
| # try: | |
| # # Create/use session (Firestore-backed) | |
| # session_id = chat_request.session_id | |
| # if not session_id: | |
| # session_id = session_manager.create_session() | |
| # logger.info(f"New session created for /api/messages: {session_id}") | |
| # # Save user message to history | |
| # session_manager.add_message_to_history(session_id, "user", chat_request.message) | |
| # # Call your existing bot query function | |
| # response = await query_launchlabs_bot( | |
| # user_message=chat_request.message, | |
| # language=chat_request.language, | |
| # session_id=session_id | |
| # ) | |
| # # Extract response text | |
| # response_text = ( | |
| # response.content if hasattr(response, 'content') | |
| # else response if isinstance(response, str) | |
| # else str(response) | |
| # ) | |
| # # Save AI response to history | |
| # session_manager.add_message_to_history(session_id, "assistant", response_text) | |
| # logger.info(f"API Messages success: Response sent for session {session_id}") | |
| # return ChatResponse( | |
| # response=response_text, | |
| # success=True, | |
| # session_id=session_id | |
| # ) | |
| # except InputGuardrailTripwireTriggered as e: | |
| # logger.warning(f"Guardrail blocked /api/messages: {e}") | |
| # raise HTTPException( | |
| # status_code=403, | |
| # detail="Query blocked – please ask about Launchlabs services." | |
| # ) | |
| # except Exception as e: | |
| # logger.error(f"Error in /api/messages: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=500, | |
| # detail="Internal error – try again." | |
| # ) | |
| # @app.post("/chat-stream") | |
| # @limiter.limit("10/minute") # Limit to 10 requests per minute per IP | |
| # async def chat_stream(request: Request, chat_request: ChatRequest): | |
| # """ | |
| # Streaming chat endpoint with language support and session history. | |
| # Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"} | |
| # """ | |
| # try: | |
| # # Create or use existing session | |
| # session_id = chat_request.session_id | |
| # if not session_id: | |
| # session_id = session_manager.create_session() | |
| # logger.info(f"Created new session for streaming chat: {session_id}") | |
| # logger.info( | |
| # f"Stream request from {get_remote_address(request)}: " | |
| # f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}" | |
| # ) | |
| # # Add user message to session history | |
| # session_manager.add_message_to_history(session_id, "user", chat_request.message) | |
| # # Pass language and session to the streaming bot | |
| # stream_generator = query_launchlabs_bot_stream( | |
| # chat_request.message, | |
| # language=chat_request.language, | |
| # session_id=session_id | |
| # ) | |
| # # Note: For streaming, we add the response to history after the stream completes | |
| # # This would need to be handled in the frontend by making a separate call or | |
| # # by modifying the stream generator to add the complete response to history | |
| # return StreamingResponse( | |
| # stream_generator, | |
| # media_type="text/event-stream", | |
| # headers={ | |
| # "Cache-Control": "no-cache", | |
| # "Connection": "keep-alive", | |
| # "X-Accel-Buffering": "no", | |
| # "Session-ID": session_id # Include session ID in headers | |
| # } | |
| # ) | |
| # except HTTPException: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Unexpected error in /chat-stream: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail="An internal error occurred while processing your request." | |
| # ) | |
| # @app.post("/ticket", response_model=TicketResponse) | |
| # @limiter.limit("5/hour") # Limit to 5 tickets per hour per IP | |
| # async def submit_ticket(request: Request, ticket_request: TicketRequest): | |
| # """ | |
| # Submit a support ticket via email using Resend API. | |
| # Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"} | |
| # """ | |
| # try: | |
| # client_ip = get_remote_address(request) | |
| # logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}") | |
| # # Additional rate limiting for tickets | |
| # if is_ticket_rate_limited(client_ip): | |
| # logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}") | |
| # raise HTTPException( | |
| # status_code=status.HTTP_429_TOO_MANY_REQUESTS, | |
| # detail="Too many ticket submissions. Please try again later." | |
| # ) | |
| # # Get admin email from environment variables or use a default | |
| # admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") | |
| # # Use a verified sender email (you need to verify this in your Resend account) | |
| # # For testing purposes, you can use your Resend account's verified domain | |
| # sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev") | |
| # # Prepare the email using Resend | |
| # params = { | |
| # "from": sender_email, | |
| # "to": [admin_email], | |
| # "subject": f"Support Ticket from {ticket_request.name}", | |
| # "html": f""" | |
| # <p>Hello Admin,</p> | |
| # <p>A new support ticket has been submitted:</p> | |
| # <p><strong>Name:</strong> {ticket_request.name}</p> | |
| # <p><strong>Email:</strong> {ticket_request.email}</p> | |
| # <p><strong>Message:</strong></p> | |
| # <p>{ticket_request.message}</p> | |
| # <p><strong>IP Address:</strong> {client_ip}</p> | |
| # <br> | |
| # <p>Best regards,<br>Launchlabs Support Team</p> | |
| # """ | |
| # } | |
| # # Send the email | |
| # email = resend.Emails.send(params) | |
| # logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}") | |
| # return TicketResponse( | |
| # success=True, | |
| # message="Ticket submitted successfully. We'll get back to you soon." | |
| # ) | |
| # except HTTPException: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Error submitting ticket: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail="Failed to submit ticket. Please try again later." | |
| # ) | |
| # @app.post("/schedule-meeting", response_model=MeetingResponse) | |
| # @limiter.limit("3/hour") # Limit to 3 meetings per hour per IP | |
| # async def schedule_meeting(request: Request, meeting_request: MeetingRequest): | |
| # """ | |
| # Schedule a meeting and send email invitations using Resend API. | |
| # Accepts meeting details and sends professional email invitations to organizer and attendees. | |
| # """ | |
| # try: | |
| # client_ip = get_remote_address(request) | |
| # logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}") | |
| # # Additional rate limiting for meetings | |
| # if is_meeting_rate_limited(client_ip): | |
| # logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}") | |
| # raise HTTPException( | |
| # status_code=status.HTTP_429_TOO_MANY_REQUESTS, | |
| # detail="Too many meeting requests. Please try again later." | |
| # ) | |
| # # Generate a unique meeting ID | |
| # meeting_id = f"mtg_{int(time.time())}" | |
| # # Get admin email from environment variables or use a default | |
| # admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") | |
| # # Use a verified sender email (you need to verify this in your Resend account) | |
| # sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev") | |
| # # For Resend testing limitations, we can only send to the owner's email | |
| # # In production, you would verify a domain and use that instead | |
| # owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") | |
| # # Format date and time for display | |
| # formatted_datetime = f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}" | |
| # # Create calendar link (Google Calendar link example) | |
| # calendar_link = f"https://calendar.google.com/calendar/render?action=TEMPLATE&text={meeting_request.topic}&dates={meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z/{meeting_request.date.replace('-', '')}T{meeting_request.time.replace(':', '')}00Z&details={meeting_request.description or 'Meeting scheduled via Launchlabs'}&location={meeting_request.location}" | |
| # # Combine all attendees (organizer + additional attendees) | |
| # # Validate and format email addresses | |
| # all_attendees = [meeting_request.email] | |
| # # Validate additional attendees - they must be valid email addresses | |
| # for attendee in meeting_request.attendees: | |
| # # Simple email validation | |
| # if "@" in attendee and "." in attendee: | |
| # all_attendees.append(attendee) | |
| # else: | |
| # # If not a valid email, skip or treat as name only | |
| # logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.") | |
| # # Remove duplicates while preserving order | |
| # seen = set() | |
| # unique_attendees = [] | |
| # for email in all_attendees: | |
| # if email not in seen: | |
| # seen.add(email) | |
| # unique_attendees.append(email) | |
| # all_attendees = unique_attendees | |
| # # Prepare the professional HTML email template | |
| # html_template = f""" | |
| # <!DOCTYPE html> | |
| # <html> | |
| # <head> | |
| # <meta charset="UTF-8"> | |
| # <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| # <title>Meeting Scheduled - {meeting_request.topic}</title> | |
| # </head> | |
| # <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; padding: 20px;"> | |
| # <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; text-align: center; border-radius: 10px 10px 0 0;"> | |
| # <h1 style="margin: 0; font-size: 28px;">Meeting Confirmed!</h1> | |
| # <p style="font-size: 18px; margin-top: 10px;">Your meeting has been successfully scheduled</p> | |
| # </div> | |
| # <div style="background-color: #ffffff; padding: 30px; border: 1px solid #eaeaea; border-top: none; border-radius: 0 0 10px 10px;"> | |
| # <h2 style="color: #333;">Meeting Details</h2> | |
| # <div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0;"> | |
| # <table style="width: 100%; border-collapse: collapse;"> | |
| # <tr> | |
| # <td style="padding: 8px 0; font-weight: bold; width: 30%;">Topic:</td> | |
| # <td style="padding: 8px 0;">{meeting_request.topic}</td> | |
| # </tr> | |
| # <tr style="background-color: #f0f0f0;"> | |
| # <td style="padding: 8px 0; font-weight: bold;">Date & Time:</td> | |
| # <td style="padding: 8px 0;">{formatted_datetime}</td> | |
| # </tr> | |
| # <tr> | |
| # <td style="padding: 8px 0; font-weight: bold;">Duration:</td> | |
| # <td style="padding: 8px 0;">{meeting_request.duration} minutes</td> | |
| # </tr> | |
| # <tr style="background-color: #f0f0f0;"> | |
| # <td style="padding: 8px 0; font-weight: bold;">Location:</td> | |
| # <td style="padding: 8px 0;">{meeting_request.location}</td> | |
| # </tr> | |
| # <tr> | |
| # <td style="padding: 8px 0; font-weight: bold;">Organizer:</td> | |
| # <td style="padding: 8px 0;">{meeting_request.name} ({meeting_request.email})</td> | |
| # </tr> | |
| # </table> | |
| # </div> | |
| # <div style="margin: 25px 0;"> | |
| # <h3 style="color: #333;">Description</h3> | |
| # <p style="background-color: #f8f9fa; padding: 15px; border-radius: 8px; white-space: pre-wrap;">{meeting_request.description or 'No description provided.'}</p> | |
| # </div> | |
| # <div style="margin: 25px 0;"> | |
| # <h3 style="color: #333;">Attendees</h3> | |
| # <ul style="background-color: #f8f9fa; padding: 15px; border-radius: 8px;"> | |
| # {''.join([f'<li>{attendee}</li>' for attendee in all_attendees])} | |
| # </ul> | |
| # <p style="font-size: 12px; color: #666; margin-top: 5px;">Note: Only valid email addresses will receive invitations.</p> | |
| # </div> | |
| # <div style="text-align: center; margin: 30px 0;"> | |
| # <a href="{calendar_link}" style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 12px 25px; text-decoration: none; border-radius: 5px; font-weight: bold; display: inline-block;">Add to Calendar</a> | |
| # </div> | |
| # <div style="background-color: #e3f2fd; padding: 15px; border-radius: 8px; margin-top: 25px;"> | |
| # <p style="margin: 0;"><strong>Meeting ID:</strong> {meeting_id}</p> | |
| # <p style="margin: 10px 0 0 0; font-size: 14px; color: #666;">Need to make changes? Contact the organizer or reply to this email.</p> | |
| # </div> | |
| # </div> | |
| # <div style="text-align: center; margin-top: 30px; color: #888; font-size: 14px;"> | |
| # <p>This meeting was scheduled through Launchlabs Chatbot Services</p> | |
| # <p><strong>Note:</strong> Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.</p> | |
| # <p>© 2025 Launchlabs. All rights reserved.</p> | |
| # </div> | |
| # </body> | |
| # </html> | |
| # """ | |
| # # Send email to all attendees | |
| # # Check if we have valid attendees to send to | |
| # if not all_attendees: | |
| # logger.warning("No valid email addresses found for meeting attendees") | |
| # return MeetingResponse( | |
| # success=True, | |
| # message="Meeting scheduled successfully, but no valid email addresses found for invitations.", | |
| # meeting_id=meeting_id | |
| # ) | |
| # # For Resend testing limitations, we can only send to the owner's email | |
| # # In production, you would verify a domain and send to all attendees | |
| # owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com") | |
| # # Prepare email for owner with all attendee information | |
| # attendee_list_html = ''.join([f'<li>{attendee}</li>' for attendee in all_attendees]) | |
| # # In a real implementation, you would send to all attendees after verifying your domain | |
| # # For now, we're sending to the owner with information about all attendees | |
| # params = { | |
| # "from": sender_email, | |
| # "to": [owner_email], # Only send to owner due to Resend testing limitations | |
| # "subject": f"Meeting Scheduled: {meeting_request.topic}", | |
| # "html": html_template | |
| # } | |
| # # Send the email | |
| # try: | |
| # email = resend.Emails.send(params) | |
| # logger.info(f"Email sent successfully to {len(all_attendees)} attendees") | |
| # except Exception as email_error: | |
| # logger.error(f"Failed to send email: {email_error}", exc_info=True) | |
| # # Even if email fails, we still consider the meeting scheduled | |
| # return MeetingResponse( | |
| # success=True, | |
| # message="Meeting scheduled successfully, but failed to send email invitations.", | |
| # meeting_id=meeting_id | |
| # ) | |
| # logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}") | |
| # return MeetingResponse( | |
| # success=True, | |
| # message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.", | |
| # meeting_id=meeting_id | |
| # ) | |
| # except HTTPException: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Error scheduling meeting: {e}", exc_info=True) | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail="Failed to schedule meeting. Please try again later." | |
| # ) | |
| # @app.exception_handler(Exception) | |
| # async def global_exception_handler(request: Request, exc: Exception): | |
| # logger.error( | |
| # f"Unhandled exception: {exc}", | |
| # exc_info=True, | |
| # extra={ | |
| # "path": request.url.path, | |
| # "method": request.method, | |
| # "client": get_remote_address(request) | |
| # } | |
| # ) | |
| # return JSONResponse( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # content={ | |
| # "error": "Internal server error", | |
| # "detail": "An unexpected error occurred. Please try again later." | |
| # } | |
| # ) | |
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # uvicorn.run(app, host="0.0.0.0", port=8000) | |
| """ | |
| FastAPI application for Launchlabs Chatbot API | |
| Provides /chat and /chat-stream endpoints with rate limiting, CORS, and error handling | |
| Updated with language context support and FIXED spacing issue in streaming | |
| """ | |
| import os | |
| import logging | |
| import time | |
| from typing import Optional | |
| from collections import defaultdict | |
| import resend | |
| from fastapi import FastAPI, Request, HTTPException, status, Depends, Header | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.util import get_remote_address | |
| from slowapi.errors import RateLimitExceeded | |
| from slowapi.middleware import SlowAPIMiddleware | |
| from dotenv import load_dotenv | |
| from agents import Runner, RunContextWrapper | |
| from agents.exceptions import InputGuardrailTripwireTriggered | |
| from openai.types.responses import ResponseTextDeltaEvent | |
| from chatbot.chatbot_agent import launchlabs_assistant | |
| from sessions.session_manager import session_manager | |
# Read settings from a local .env file into the process environment.
load_dotenv()

# Hand the Resend client its API key (None when RESEND_API_KEY is unset).
resend.api_key = os.getenv("RESEND_API_KEY")

# Send log records to both app.log and the console, using one shared format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Default per-client limits, keyed by the remote address of the request.
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["100/day", "20/hour", "3/minute"],
)

# The FastAPI application object.
app = FastAPI(
    title="Launchlabs Chatbot API",
    description="AI-powered chatbot API for Launchlabs services",
    version="1.0.0",
)

# Wire the limiter into the app: shared state, the 429 handler, the middleware.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# Configure CORS from the comma-separated ALLOWED_ORIGINS environment variable.
allowed_origins = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
]
if allowed_origins:
    app.add_middleware(
        CORSMiddleware,
        # Only the configured origins are allowed. Prepending "*" here would
        # make the allow-list meaningless and, together with
        # allow_credentials=True, let any site send credentialed requests.
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
        # Browsers hide non-safelisted response headers from cross-origin
        # scripts unless they are exposed; the streaming endpoint returns the
        # session id in a "Session-ID" header.
        expose_headers=["Session-ID"],
    )
    logger.info(f"CORS enabled for origins: {allowed_origins}")
else:
    logger.warning("No ALLOWED_ORIGINS set in .env - CORS disabled")
# Bearer-token extractor consumed by the verify_api_key dependency.
security = HTTPBearer()

# In-memory store of request timestamps used by the rate-limit helpers below.
request_counts = defaultdict(list)

# Ticket submissions: at most 5 per IP within a 3600-second (1 hour) window.
TICKET_RATE_LIMIT = 5
TICKET_TIME_WINDOW = 3600

# Meeting requests: at most 3 per IP within a 3600-second (1 hour) window.
MEETING_RATE_LIMIT = 3
MEETING_TIME_WINDOW = 3600
| # Request/Response models | |
# Request body accepted by the chat endpoints.
class ChatRequest(BaseModel):
    # Text sent by the user.
    message: str
    # Reply language; "english" applies when the client omits the field.
    language: Optional[str] = "english"
    # Session to continue for chat history; None when not supplied.
    session_id: Optional[str] = None
# Response body returned by the non-streaming chat endpoints.
class ChatResponse(BaseModel):
    # Text of the assistant's reply.
    response: str
    # Whether the request was handled successfully.
    success: bool
    # Session the exchange was recorded under.
    session_id: str
# Error payload: a short error label plus optional detail text.
class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None
# Request body for submitting a support ticket.
class TicketRequest(BaseModel):
    # Name of the person submitting the ticket.
    name: str
    # Contact email of the submitter (plain string).
    email: str
    # Free-text description of the issue.
    message: str
# Response body returned after a ticket submission.
class TicketResponse(BaseModel):
    # Whether the ticket was accepted.
    success: bool
    # Human-readable status text for the client.
    message: str
# Request body for scheduling a meeting.
class MeetingRequest(BaseModel):
    # Organizer's name and email.
    name: str
    email: str
    # ISO format date string.
    date: str
    # Time in HH:MM format.
    time: str
    # Timezone identifier.
    timezone: str
    # Duration in minutes.
    duration: int
    # Meeting topic/title.
    topic: str
    # Attendee email addresses.
    attendees: list[str]
    # Optional meeting description.
    description: Optional[str] = None
    # Meeting location/platform; "Google Meet" when omitted.
    location: Optional[str] = "Google Meet"
# Response body returned after a meeting-scheduling request.
class MeetingResponse(BaseModel):
    # Whether the meeting was scheduled.
    success: bool
    # Human-readable status text for the client.
    message: str
    # Unique identifier for the meeting, when one was created.
    meeting_id: Optional[str] = None
# Security dependency for API key validation
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token against the API_KEY environment variable.

    Args:
        credentials: Bearer credentials extracted by the HTTPBearer dependency.

    Returns:
        The presented API key string when it matches the configured key.

    Raises:
        HTTPException: 401 when API_KEY is unset/empty or the token differs.
    """
    # Imported locally so the block does not depend on a file-level import.
    from secrets import compare_digest

    # In production the key would come from a store of valid keys; for now a
    # single key is read from the environment.
    expected_key = os.getenv("API_KEY")
    presented_key = credentials.credentials

    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Comparing bytes avoids compare_digest's TypeError on
    # non-ASCII str arguments.
    if not expected_key or not compare_digest(
        presented_key.encode("utf-8"), expected_key.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return presented_key
def _is_rate_limited(bucket_key: str, limit: int, window_seconds: int) -> bool:
    """Apply a sliding-window limit to one bucket of ``request_counts``.

    Timestamps older than ``window_seconds`` are dropped. When ``limit``
    recent requests remain, the call is rejected and NOT recorded; otherwise
    the current time is recorded.

    Returns:
        True when the caller is over the limit, False when the request counts.
    """
    now = time.time()
    recent = [stamp for stamp in request_counts[bucket_key] if now - stamp < window_seconds]
    limited = len(recent) >= limit
    if not limited:
        recent.append(now)
    request_counts[bucket_key] = recent
    return limited


def is_ticket_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded ticket submission rate limits.

    Side effect: a request that is allowed is recorded against the IP.
    """
    # Namespaced key: tickets and meetings share one store, and using the bare
    # IP for both would let one kind of request consume the other's quota.
    return _is_rate_limited(f"ticket:{ip_address}", TICKET_RATE_LIMIT, TICKET_TIME_WINDOW)


def is_meeting_rate_limited(ip_address: str) -> bool:
    """Check if an IP address has exceeded meeting scheduling rate limits.

    Side effect: a request that is allowed is recorded against the IP.
    """
    # Namespaced key so ticket submissions do not count as meeting requests.
    return _is_rate_limited(f"meeting:{ip_address}", MEETING_RATE_LIMIT, MEETING_TIME_WINDOW)
| # def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None): | |
| # """ | |
| # Query the Launchlabs bot with streaming - returns async generator. | |
| # Now includes language context and session history. | |
| # FIXED: Proper spacing between words in streaming responses. | |
| # Implements fallback to non-streaming when streaming fails (e.g., with Gemini models). | |
| # """ | |
| # logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'") | |
| # # Get session history if session_id is provided | |
| # history = [] | |
| # if session_id: | |
| # history = session_manager.get_session_history(session_id) | |
| # logger.info(f"Retrieved {len(history)} history messages for session {session_id}") | |
| # try: | |
| # # Create context with language preference and history | |
| # context_data = {"language": language} | |
| # if history: | |
| # context_data["history"] = history | |
| # ctx = RunContextWrapper(context=context_data) | |
| # result = Runner.run_streamed( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # async def generate_stream(): | |
| # try: | |
| # accumulated_text = "" # FIXED: Track full response for proper spacing | |
| # has_streamed = False | |
| # try: | |
| # # Attempt streaming with error handling for each event | |
| # async for event in result.stream_events(): | |
| # try: | |
| # if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | |
| # delta = event.data.delta or "" | |
| # # ---- Spacing Fix (CORRECTED) ---- | |
| # # Check against accumulated text, not just previous chunk | |
| # if ( | |
| # accumulated_text # Only add space if we have previous text | |
| # and not accumulated_text.endswith((" ", "\n", "\t")) # Previous doesn't end with whitespace | |
| # and not delta.startswith((" ", ".", ",", "?", "!", ":", ";", "\n", "\t", ")", "]", "}", "'", '"')) # Current doesn't start with punctuation/whitespace | |
| # and delta # Make sure delta isn't empty | |
| # ): | |
| # delta = " " + delta | |
| # accumulated_text += delta # Update accumulated text | |
| # # ---- End Fix ---- | |
| # yield f"data: {delta}\n\n" | |
| # has_streamed = True | |
| # except Exception as event_error: | |
| # # Handle individual event errors (e.g., missing logprobs field) | |
| # logger.warning(f"Event processing error: {event_error}") | |
| # continue | |
| # # Add complete response to session history | |
| # if accumulated_text and session_id: | |
| # session_manager.add_message_to_history(session_id, "assistant", accumulated_text) | |
| # logger.info(f"Added assistant response to session history: {session_id}") | |
| # yield "data: [DONE]\n\n" | |
| # logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully") | |
| # except Exception as stream_error: | |
| # # Fallback to non-streaming if streaming fails | |
| # logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}") | |
| # if not has_streamed: | |
| # # Get final output using the streaming result's final_output property | |
| # try: | |
| # # Use the non-streaming API as fallback | |
| # fallback_response = await Runner.run( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # if hasattr(fallback_response, 'final_output'): | |
| # final_output = fallback_response.final_output | |
| # else: | |
| # final_output = fallback_response | |
| # if hasattr(final_output, 'content'): | |
| # response_text = final_output.content | |
| # elif isinstance(final_output, str): | |
| # response_text = final_output | |
| # else: | |
| # response_text = str(final_output) | |
| # # Add to session history | |
| # if session_id: | |
| # session_manager.add_message_to_history(session_id, "assistant", response_text) | |
| # logger.info(f"Added fallback assistant response to session history: {session_id}") | |
| # yield f"data: {response_text}\n\n" | |
| # yield "data: [DONE]\n\n" | |
| # logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully") | |
| # except Exception as fallback_error: | |
| # logger.error(f"Fallback also failed: {fallback_error}", exc_info=True) | |
| # yield f"data: [ERROR] Unable to complete request.\n\n" | |
| # else: | |
| # # Already streamed some content, just end gracefully | |
| # yield "data: [DONE]\n\n" | |
| # except InputGuardrailTripwireTriggered as e: | |
| # logger.warning(f"Guardrail blocked query during streaming: {e}") | |
| # yield f"data: [ERROR] Query was blocked by content guardrail.\n\n" | |
| # except Exception as e: | |
| # logger.error(f"Streaming error: {e}", exc_info=True) | |
| # yield f"data: [ERROR] {str(e)}\n\n" | |
| # return generate_stream() | |
| # except Exception as e: | |
| # logger.error(f"Error setting up stream: {e}", exc_info=True) | |
| # async def error_stream(): | |
| # yield f"data: [ERROR] Failed to initialize stream.\n\n" | |
| # return error_stream() | |
| # def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None): | |
| # """ | |
| # Query the Launchlabs bot with streaming - returns async generator. | |
| # COMPLETELY FIXED: Simple and reliable spacing logic | |
| # """ | |
| # logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'") | |
| # # Get session history if session_id is provided | |
| # history = [] | |
| # if session_id: | |
| # history = session_manager.get_session_history(session_id) | |
| # logger.info(f"Retrieved {len(history)} history messages for session {session_id}") | |
| # try: | |
| # # Create context with language preference and history | |
| # context_data = {"language": language} | |
| # if history: | |
| # context_data["history"] = history | |
| # ctx = RunContextWrapper(context=context_data) | |
| # result = Runner.run_streamed( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # async def generate_stream(): | |
| # try: | |
| # accumulated_text = "" | |
| # has_streamed = False | |
| # try: | |
| # async for event in result.stream_events(): | |
| # try: | |
| # if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | |
| # delta = event.data.delta or "" | |
| # if not delta: | |
| # continue | |
| # # COMPLETELY FIXED APPROACH: Just send the delta as-is from OpenAI | |
| # # OpenAI already includes proper spaces, so we don't need to add them | |
| # accumulated_text += delta | |
| # # Send delta exactly as received | |
| # yield f"data: {delta}\n\n" | |
| # has_streamed = True | |
| # except Exception as event_error: | |
| # logger.warning(f"Event processing error: {event_error}") | |
| # continue | |
| # # Add complete response to session history | |
| # if accumulated_text and session_id: | |
| # session_manager.add_message_to_history(session_id, "assistant", accumulated_text) | |
| # logger.info(f"Added assistant response to session history: {session_id}") | |
| # yield "data: [DONE]\n\n" | |
| # logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully") | |
| # except Exception as stream_error: | |
| # logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}") | |
| # if not has_streamed: | |
| # try: | |
| # fallback_response = await Runner.run( | |
| # launchlabs_assistant, | |
| # input=user_message, | |
| # context=ctx.context | |
| # ) | |
| # if hasattr(fallback_response, 'final_output'): | |
| # final_output = fallback_response.final_output | |
| # else: | |
| # final_output = fallback_response | |
| # if hasattr(final_output, 'content'): | |
| # response_text = final_output.content | |
| # elif isinstance(final_output, str): | |
| # response_text = final_output | |
| # else: | |
| # response_text = str(final_output) | |
| # if session_id: | |
| # session_manager.add_message_to_history(session_id, "assistant", response_text) | |
| # logger.info(f"Added fallback assistant response to session history: {session_id}") | |
| # yield f"data: {response_text}\n\n" | |
| # yield "data: [DONE]\n\n" | |
| # logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully") | |
| # except Exception as fallback_error: | |
| # logger.error(f"Fallback also failed: {fallback_error}", exc_info=True) | |
| # yield f"data: [ERROR] Unable to complete request.\n\n" | |
| # else: | |
| # yield "data: [DONE]\n\n" | |
| # except InputGuardrailTripwireTriggered as e: | |
| # logger.warning(f"Guardrail blocked query during streaming: {e}") | |
| # yield f"data: [ERROR] Query was blocked by content guardrail.\n\n" | |
| # except Exception as e: | |
| # logger.error(f"Streaming error: {e}", exc_info=True) | |
| # yield f"data: [ERROR] {str(e)}\n\n" | |
| # return generate_stream() | |
| # except Exception as e: | |
| # logger.error(f"Error setting up stream: {e}", exc_info=True) | |
| # async def error_stream(): | |
| # yield f"data: [ERROR] Failed to initialize stream.\n\n" | |
| # return error_stream() | |
def _sse_data_event(payload: str) -> str:
    """Frame *payload* as one Server-Sent Events message.

    Each line of the payload is sent in its own ``data:`` field. A raw newline
    inside a single field would end the field early (and a blank line would end
    the event), so line breaks or trailing text would be lost by the client.
    Single-line payloads produce exactly ``data: <payload>`` plus a blank line.
    """
    lines = payload.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


def query_launchlabs_bot_stream(user_message: str, language: str = "english", session_id: Optional[str] = None):
    """Start a streamed agent run and return an async generator of SSE messages.

    Text deltas are forwarded unmodified (only SSE framing is applied). If
    streaming fails before any text was sent, the query is retried once through
    the non-streaming API and the full answer is sent as a single message.

    Args:
        user_message: The user's message, passed to the agent as input.
        language: Language preference placed in the run context.
        session_id: Optional session whose history is added to the run context
            and to which the assistant's answer is appended.

    Returns:
        An async generator yielding SSE-framed strings: the text, then a
        ``[DONE]`` marker on success, or an ``[ERROR] ...`` message on failure.
    """
    logger.info(f"AGENT STREAM CALL: query_launchlabs_bot_stream called with message='{user_message}', language='{language}', session_id='{session_id}'")

    # Get session history if session_id is provided
    history = []
    if session_id:
        history = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(history)} history messages for session {session_id}")

    try:
        # Create context with language preference and history
        context_data = {"language": language}
        if history:
            context_data["history"] = history
        ctx = RunContextWrapper(context=context_data)
        result = Runner.run_streamed(
            launchlabs_assistant,
            input=user_message,
            context=ctx.context
        )

        async def generate_stream():
            accumulated_text = ""
            has_streamed = False
            try:
                try:
                    async for event in result.stream_events():
                        try:
                            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                                delta = event.data.delta
                                if delta:  # Only process if delta has content
                                    # Forward the delta text exactly as received.
                                    accumulated_text += delta
                                    yield _sse_data_event(delta)
                                    has_streamed = True
                        except Exception as event_error:
                            # A single malformed event should not end the stream.
                            logger.warning(f"Event processing error: {event_error}")
                            continue

                    # Add complete response to session history
                    if accumulated_text and session_id:
                        session_manager.add_message_to_history(session_id, "assistant", accumulated_text)
                        logger.info(f"Added assistant response to session history: {session_id}")
                    yield _sse_data_event("[DONE]")
                    logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream completed successfully")
                except InputGuardrailTripwireTriggered:
                    # Must not reach the generic handler below: that would
                    # re-run a blocked query through the fallback and hide the
                    # guardrail message from the client.
                    raise
                except Exception as stream_error:
                    logger.warning(f"Streaming failed, falling back to non-streaming: {stream_error}")
                    if has_streamed:
                        # Text was already sent; end the stream rather than
                        # repeating the answer.
                        yield _sse_data_event("[DONE]")
                    else:
                        try:
                            fallback_response = await Runner.run(
                                launchlabs_assistant,
                                input=user_message,
                                context=ctx.context
                            )
                            if hasattr(fallback_response, 'final_output'):
                                final_output = fallback_response.final_output
                            else:
                                final_output = fallback_response
                            if hasattr(final_output, 'content'):
                                response_text = final_output.content
                            elif isinstance(final_output, str):
                                response_text = final_output
                            else:
                                response_text = str(final_output)
                            if session_id:
                                session_manager.add_message_to_history(session_id, "assistant", response_text)
                                logger.info(f"Added fallback assistant response to session history: {session_id}")
                            yield _sse_data_event(response_text)
                            yield _sse_data_event("[DONE]")
                            logger.info("AGENT STREAM RESULT: query_launchlabs_bot_stream fallback completed successfully")
                        except InputGuardrailTripwireTriggered:
                            # Report a guardrail block from the fallback run too.
                            raise
                        except Exception as fallback_error:
                            logger.error(f"Fallback also failed: {fallback_error}", exc_info=True)
                            yield _sse_data_event("[ERROR] Unable to complete request.")
            except InputGuardrailTripwireTriggered as e:
                logger.warning(f"Guardrail blocked query during streaming: {e}")
                yield _sse_data_event("[ERROR] Query was blocked by content guardrail.")
            except Exception as e:
                # Details go to the log only; exception text is not sent to clients.
                logger.error(f"Streaming error: {e}", exc_info=True)
                yield _sse_data_event("[ERROR] An unexpected error occurred while streaming the response.")

        return generate_stream()
    except Exception as e:
        logger.error(f"Error setting up stream: {e}", exc_info=True)

        async def error_stream():
            yield _sse_data_event("[ERROR] Failed to initialize stream.")

        return error_stream()
async def query_launchlabs_bot(user_message: str, language: str = "english", session_id: Optional[str] = None):
    """Run the assistant once and return its final output.

    The run context carries the language preference and, when the session has
    any, its stored history.

    Raises:
        HTTPException: 403 when the input guardrail trips, 500 on any other
            failure during the run.
    """
    logger.info(f"AGENT CALL: query_launchlabs_bot called with message='{user_message}', language='{language}', session_id='{session_id}'")

    # Load prior messages only when a session was given.
    past_messages = []
    if session_id:
        past_messages = session_manager.get_session_history(session_id)
        logger.info(f"Retrieved {len(past_messages)} history messages for session {session_id}")

    try:
        # Language is always present; history only when non-empty.
        run_context = {"language": language}
        if past_messages:
            run_context["history"] = past_messages
        wrapper = RunContextWrapper(context=run_context)

        run_result = await Runner.run(
            launchlabs_assistant,
            input=user_message,
            context=wrapper.context,
        )
        logger.info("AGENT RESULT: query_launchlabs_bot completed successfully")
        return run_result.final_output
    except InputGuardrailTripwireTriggered as blocked:
        logger.warning(f"Guardrail blocked query: {blocked}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Query was blocked by content guardrail. Please ensure your query is related to Launchlabs services.",
        )
    except Exception as failure:
        logger.error(f"Error in query_launchlabs_bot: {failure}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request.",
        )
async def root():
    """Return a static payload identifying the service."""
    payload = {"status": "ok", "service": "Launchlabs Chatbot API"}
    return payload
async def health():
    """Return a static health payload."""
    payload = {"status": "healthy"}
    return payload
async def create_session():
    """Open a new chat session and return its identifier.

    Returns:
        A dict with the new ``session_id`` and a confirmation ``message``.

    Raises:
        HTTPException: 500 when the session manager fails.
    """
    try:
        new_id = session_manager.create_session()
    except Exception as failure:
        logger.error(f"Error creating session: {failure}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create session",
        )
    logger.info(f"Created new session: {new_id}")
    return {"session_id": new_id, "message": "Session created successfully"}
# Limit to 10 requests per minute per IP
# NOTE(review): no route or rate-limit decorator is visible above this function
# in this view of the file — confirm they are present in the real source.
async def chat(request: Request, chat_request: ChatRequest):
    """Answer one chat message, with language support and session history.

    Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}

    The user message and the assistant reply are both appended to the session
    history; a session is created when the request does not carry one.

    Raises:
        HTTPException: re-raised unchanged from the bot call; 500 on any other
            unexpected failure.
    """
    try:
        # Continue the caller's session, or open one on first contact.
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for chat: {session_id}")

        logger.info(
            f"Chat request from {get_remote_address(request)}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Record the user's turn before asking the bot.
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        bot_output = await query_launchlabs_bot(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id,
        )

        # Normalise the output to text: prefer .content, then str, then str().
        reply_text = (
            bot_output.content if hasattr(bot_output, 'content')
            else bot_output if isinstance(bot_output, str)
            else str(bot_output)
        )

        # Record the assistant's turn.
        session_manager.add_message_to_history(session_id, "assistant", reply_text)
        logger.info(f"Chat response generated successfully in {chat_request.language} for session {session_id}")
        return ChatResponse(response=reply_text, success=True, session_id=session_id)
    except HTTPException:
        raise
    except Exception as failure:
        logger.error(f"Unexpected error in /chat: {failure}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request.",
        )
# Same rate limit as /chat
# NOTE(review): no route or rate-limit decorator is visible above this function
# in this view of the file — confirm they are present in the real source.
async def api_messages(request: Request, chat_request: ChatRequest):
    """Frontend-friendly chat endpoint at /api/messages.

    Mirrors the /chat logic for session/history support.
    Expects: {"message": "...", "language": "english", "session_id": "optional"}

    Returns:
        ChatResponse with the reply text and the session id used.

    Raises:
        HTTPException: re-raised unchanged from the bot call (for example the
            403 it raises for guardrail blocks); 500 on unexpected failures.
    """
    client_ip = get_remote_address(request)
    logger.info(f"API Messages request from {client_ip}: message='{chat_request.message[:50]}...', lang='{chat_request.language}', session='{chat_request.session_id}'")
    try:
        # Create/use session
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"New session created for /api/messages: {session_id}")

        # Save user message to history
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        response = await query_launchlabs_bot(
            user_message=chat_request.message,
            language=chat_request.language,
            session_id=session_id
        )

        # Extract response text: prefer .content, then str, then str().
        response_text = (
            response.content if hasattr(response, 'content')
            else response if isinstance(response, str)
            else str(response)
        )

        # Save AI response to history
        session_manager.add_message_to_history(session_id, "assistant", response_text)
        logger.info(f"API Messages success: Response sent for session {session_id}")
        return ChatResponse(
            response=response_text,
            success=True,
            session_id=session_id
        )
    except HTTPException:
        # query_launchlabs_bot already maps failures to HTTP errors (403 for a
        # guardrail block). Without this clause the generic handler below would
        # turn every one of them into a 500.
        raise
    except InputGuardrailTripwireTriggered as e:
        # Kept as a safety net in case a guardrail error is raised directly.
        logger.warning(f"Guardrail blocked /api/messages: {e}")
        raise HTTPException(
            status_code=403,
            detail="Query blocked – please ask about Launchlabs services."
        )
    except Exception as e:
        logger.error(f"Error in /api/messages: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Internal error – try again."
        )
# Limit to 10 requests per minute per IP
# NOTE(review): no route or rate-limit decorator is visible above this function
# in this view of the file — confirm they are present in the real source.
async def chat_stream(request: Request, chat_request: ChatRequest):
    """Stream a chat answer as text/event-stream, with language and session support.

    Accepts: {"message": "...", "language": "norwegian", "session_id": "optional-session-id"}

    The session id in use is returned in the ``Session-ID`` response header.

    Raises:
        HTTPException: re-raised unchanged; 500 on any other failure while
            setting up the response.
    """
    try:
        # Continue the caller's session, or open one on first contact.
        session_id = chat_request.session_id
        if not session_id:
            session_id = session_manager.create_session()
            logger.info(f"Created new session for streaming chat: {session_id}")

        logger.info(
            f"Stream request from {get_remote_address(request)}: "
            f"language={chat_request.language}, message={chat_request.message[:50]}..., session_id={session_id}"
        )

        # Record the user's turn; the generator records the assistant's turn
        # itself once the stream has completed.
        session_manager.add_message_to_history(session_id, "user", chat_request.message)

        event_source = query_launchlabs_bot_stream(
            chat_request.message,
            language=chat_request.language,
            session_id=session_id,
        )

        response_headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Session-ID": session_id,  # lets the client learn a newly created session
        }
        return StreamingResponse(
            event_source,
            media_type="text/event-stream",
            headers=response_headers,
        )
    except HTTPException:
        raise
    except Exception as failure:
        logger.error(f"Unexpected error in /chat-stream: {failure}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An internal error occurred while processing your request.",
        )
# Limit to 5 tickets per hour per IP
# NOTE(review): no route or rate-limit decorator is visible above this function
# in this view of the file — confirm they are present in the real source.
async def submit_ticket(request: Request, ticket_request: TicketRequest):
    """Submit a support ticket by emailing it to the admin via the Resend API.

    Accepts: {"name": "John Doe", "email": "john@example.com", "message": "Issue description"}

    Returns:
        TicketResponse confirming the submission.

    Raises:
        HTTPException: 429 when the per-IP ticket limit is exceeded; 500 when
            building or sending the email fails.
    """
    # Imported locally so the block does not depend on a file-level import.
    from html import escape

    try:
        client_ip = get_remote_address(request)
        logger.info(f"Ticket submission request from {ticket_request.name} ({ticket_request.email}) - IP: {client_ip}")

        # Additional rate limiting for tickets
        if is_ticket_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for ticket submission from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many ticket submissions. Please try again later."
            )

        # Recipient and sender come from the environment, with defaults.
        admin_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")
        # The sender must be an address verified in the Resend account.
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")

        # Every user-supplied value is HTML-escaped before it goes into the
        # email body; otherwise a submitter could inject arbitrary markup
        # (links, images, forms) into the message the admin opens.
        safe_name = escape(ticket_request.name)
        safe_email = escape(ticket_request.email)
        safe_message = escape(ticket_request.message)
        safe_ip = escape(str(client_ip))

        params = {
            "from": sender_email,
            "to": [admin_email],
            "subject": f"Support Ticket from {ticket_request.name}",
            "html": f"""
            <p>Hello Admin,</p>
            <p>A new support ticket has been submitted:</p>
            <p><strong>Name:</strong> {safe_name}</p>
            <p><strong>Email:</strong> {safe_email}</p>
            <p><strong>Message:</strong></p>
            <p>{safe_message}</p>
            <p><strong>IP Address:</strong> {safe_ip}</p>
            <br>
            <p>Best regards,<br>Launchlabs Support Team</p>
            """
        }

        # Send the email; a failure raises and is handled below.
        resend.Emails.send(params)
        logger.info(f"Ticket submitted successfully by {ticket_request.name} from IP: {client_ip}")
        return TicketResponse(
            success=True,
            message="Ticket submitted successfully. We'll get back to you soon."
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error submitting ticket: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to submit ticket. Please try again later."
        )
| # Limit to 3 meetings per hour per IP | |
async def schedule_meeting(request: Request, meeting_request: MeetingRequest):
    """
    Schedule a meeting and email a confirmation using the Resend API.

    The confirmation lists the organizer plus every attendee entry that looks
    like an email address, but it is delivered only to the address in the
    ADMIN_EMAIL environment variable (the Resend test-mode limitation that
    the response message also mentions).

    Args:
        request: Incoming request; used to resolve the client IP.
        meeting_request: Meeting details. This handler reads name, email,
            topic, date, time, timezone, duration, location, description
            and attendees.

    Returns:
        MeetingResponse with success=True and the generated meeting_id. A
        failed email send is logged and reported in the message, but the
        meeting is still treated as scheduled.

    Raises:
        HTTPException: 429 when is_meeting_rate_limited() flags the client
            IP; 500 for any other unexpected error.
    """
    import uuid

    try:
        client_ip = get_remote_address(request)
        logger.info(f"Meeting scheduling request from {meeting_request.name} ({meeting_request.email}) - IP: {client_ip}")

        # Additional rate limiting for meetings
        if is_meeting_rate_limited(client_ip):
            logger.warning(f"Rate limit exceeded for meeting scheduling from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many meeting requests. Please try again later."
            )

        # A timestamp alone collides for two requests in the same second,
        # so a random suffix is appended to keep the ID unique.
        meeting_id = f"mtg_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        # Use a verified sender email (must be verified in the Resend account)
        sender_email = os.getenv("SENDER_EMAIL", "onboarding@resend.dev")
        # For Resend testing limitations, we can only send to the owner's email.
        # In production, verify a domain and send to all attendees instead.
        owner_email = os.getenv("ADMIN_EMAIL", "admin@yourcompany.com")

        # Always contains at least the organizer, so no empty-list check is needed.
        all_attendees = _collect_meeting_attendees(meeting_request)
        calendar_link = _build_meeting_calendar_link(meeting_request)
        html_template = _render_meeting_email(
            meeting_request, all_attendees, calendar_link, meeting_id
        )

        params = {
            "from": sender_email,
            "to": [owner_email],  # Only send to owner due to Resend testing limitations
            "subject": f"Meeting Scheduled: {meeting_request.topic}",
            "html": html_template
        }

        # NOTE(review): resend.Emails.send looks like a synchronous call inside
        # an async handler - consider running it in a thread pool; confirm
        # against the SDK.
        try:
            resend.Emails.send(params)
            logger.info(
                f"Meeting notification email sent to administrator "
                f"({len(all_attendees)} attendee(s) listed)"
            )
        except Exception as email_error:
            logger.error(f"Failed to send email: {email_error}", exc_info=True)
            # Even if email fails, we still consider the meeting scheduled
            return MeetingResponse(
                success=True,
                message="Meeting scheduled successfully, but failed to send email invitations.",
                meeting_id=meeting_id
            )

        logger.info(f"Meeting scheduled successfully by {meeting_request.name} from IP: {client_ip}")
        return MeetingResponse(
            success=True,
            message="Meeting scheduled successfully. Due to Resend testing limitations, invitations are only sent to the administrator. In production, after verifying your domain, invitations will be sent to all attendees.",
            meeting_id=meeting_id
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error scheduling meeting: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to schedule meeting. Please try again later."
        )


def _collect_meeting_attendees(meeting_request):
    """Return the organizer plus email-like attendees, de-duplicated in first-seen order."""
    candidates = [meeting_request.email]
    # `or []` tolerates a missing/None attendee list.
    for attendee in meeting_request.attendees or []:
        # Simple email validation
        if "@" in attendee and "." in attendee:
            candidates.append(attendee)
        else:
            logger.warning(f"Invalid email format for attendee: {attendee}. Skipping.")
    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(candidates))


def _build_meeting_calendar_link(meeting_request):
    """Return a Google Calendar "add event" URL with all parameters URL-encoded."""
    from datetime import datetime, timedelta
    from urllib.parse import urlencode

    stamp_format = "%Y%m%dT%H%M%SZ"
    try:
        start = datetime.strptime(
            f"{meeting_request.date} {meeting_request.time}", "%Y-%m-%d %H:%M"
        )
        # End the event after the requested duration instead of at its start.
        end = start + timedelta(minutes=int(meeting_request.duration))
        dates = f"{start.strftime(stamp_format)}/{end.strftime(stamp_format)}"
    except (TypeError, ValueError, OverflowError):
        # Unexpected date/time/duration format: fall back to the plain
        # string-based stamp (zero-length event) rather than failing the request.
        stamp = (
            f"{meeting_request.date.replace('-', '')}"
            f"T{meeting_request.time.replace(':', '')}00Z"
        )
        dates = f"{stamp}/{stamp}"

    # NOTE(review): the trailing "Z" marks the time as UTC regardless of
    # meeting_request.timezone - confirm whether a conversion is wanted.
    query = urlencode(
        {
            "action": "TEMPLATE",
            "text": meeting_request.topic,
            "dates": dates,
            "details": meeting_request.description or "Meeting scheduled via Launchlabs",
            "location": meeting_request.location,
        },
        safe="/",  # keep the start/end separator in `dates` readable
    )
    return f"https://calendar.google.com/calendar/render?{query}"


def _render_meeting_email(meeting_request, attendees, calendar_link, meeting_id):
    """Render the confirmation email HTML with every user-supplied value HTML-escaped."""
    from html import escape

    safe_topic = escape(str(meeting_request.topic))
    safe_when = escape(
        f"{meeting_request.date} at {meeting_request.time} {meeting_request.timezone}"
    )
    safe_duration = escape(str(meeting_request.duration))
    safe_location = escape(str(meeting_request.location))
    safe_organizer = escape(f"{meeting_request.name} ({meeting_request.email})")
    safe_description = escape(
        str(meeting_request.description or "No description provided.")
    )
    attendee_items = "".join(f"<li>{escape(str(a))}</li>" for a in attendees)
    # Escaping also turns the "&" separators into "&amp;", as an href attribute requires.
    safe_link = escape(calendar_link, quote=True)
    safe_meeting_id = escape(str(meeting_id))

    return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Meeting Scheduled - {safe_topic}</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; text-align: center; border-radius: 10px 10px 0 0;">
<h1 style="margin: 0; font-size: 28px;">Meeting Confirmed!</h1>
<p style="font-size: 18px; margin-top: 10px;">Your meeting has been successfully scheduled</p>
</div>
<div style="background-color: #ffffff; padding: 30px; border: 1px solid #eaeaea; border-top: none; border-radius: 0 0 10px 10px;">
<h2 style="color: #333;">Meeting Details</h2>
<div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0;">
<table style="width: 100%; border-collapse: collapse;">
<tr>
<td style="padding: 8px 0; font-weight: bold; width: 30%;">Topic:</td>
<td style="padding: 8px 0;">{safe_topic}</td>
</tr>
<tr style="background-color: #f0f0f0;">
<td style="padding: 8px 0; font-weight: bold;">Date &amp; Time:</td>
<td style="padding: 8px 0;">{safe_when}</td>
</tr>
<tr>
<td style="padding: 8px 0; font-weight: bold;">Duration:</td>
<td style="padding: 8px 0;">{safe_duration} minutes</td>
</tr>
<tr style="background-color: #f0f0f0;">
<td style="padding: 8px 0; font-weight: bold;">Location:</td>
<td style="padding: 8px 0;">{safe_location}</td>
</tr>
<tr>
<td style="padding: 8px 0; font-weight: bold;">Organizer:</td>
<td style="padding: 8px 0;">{safe_organizer}</td>
</tr>
</table>
</div>
<div style="margin: 25px 0;">
<h3 style="color: #333;">Description</h3>
<p style="background-color: #f8f9fa; padding: 15px; border-radius: 8px; white-space: pre-wrap;">{safe_description}</p>
</div>
<div style="margin: 25px 0;">
<h3 style="color: #333;">Attendees</h3>
<ul style="background-color: #f8f9fa; padding: 15px; border-radius: 8px;">
{attendee_items}
</ul>
<p style="font-size: 12px; color: #666; margin-top: 5px;">Note: Only valid email addresses will receive invitations.</p>
</div>
<div style="text-align: center; margin: 30px 0;">
<a href="{safe_link}" style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 12px 25px; text-decoration: none; border-radius: 5px; font-weight: bold; display: inline-block;">Add to Calendar</a>
</div>
<div style="background-color: #e3f2fd; padding: 15px; border-radius: 8px; margin-top: 25px;">
<p style="margin: 0;"><strong>Meeting ID:</strong> {safe_meeting_id}</p>
<p style="margin: 10px 0 0 0; font-size: 14px; color: #666;">Need to make changes? Contact the organizer or reply to this email.</p>
</div>
</div>
<div style="text-align: center; margin-top: 30px; color: #888; font-size: 14px;">
<p>This meeting was scheduled through Launchlabs Chatbot Services</p>
<p><strong>Note:</strong> Due to Resend testing limitations, this email is only sent to the administrator. In production, after domain verification, invitations will be sent to all attendees.</p>
<p>© 2025 Launchlabs. All rights reserved.</p>
</div>
</body>
</html>
"""
async def global_exception_handler(request: Request, exc: Exception):
    """
    Log an unhandled exception with request context and reply with a generic 500.

    Args:
        request: The request being processed when the exception surfaced.
        exc: The exception that was not handled elsewhere.

    Returns:
        JSONResponse with status 500 and a generic error body; the exception
        text itself is only written to the log, not sent to the client.
    """
    log_message = f"Unhandled exception: {exc}"
    # Attach where the failure happened so the log entry can be traced to a call.
    request_context = {
        "path": request.url.path,
        "method": request.method,
        "client": get_remote_address(request),
    }
    logger.error(log_message, exc_info=True, extra=request_context)

    response_body = {
        "error": "Internal server error",
        "detail": "An unexpected error occurred. Please try again later.",
    }
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=response_body,
    )
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces (0.0.0.0), port 8000.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)