import logging from fastapi import APIRouter, Request, Response, HTTPException from twilio.request_validator import RequestValidator from twilio.twiml.messaging_response import MessagingResponse from ..config import settings from ..services.ingest import handle_incoming_message router = APIRouter() logger = logging.getLogger(__name__) def _form_data_to_payload(form_data): payload = {} for key, value in form_data.multi_items(): if key in payload: existing_value = payload[key] if isinstance(existing_value, list): existing_value.append(value) else: payload[key] = [existing_value, value] else: payload[key] = value return payload def _validate_twilio_signature(request: Request, form_data): if not settings.TWILIO_VALIDATE_SIGNATURES: return signature = request.headers.get("X-Twilio-Signature") if not signature: raise HTTPException(status_code=403, detail="Missing Twilio signature") validator = RequestValidator(settings.TWILIO_AUTH_TOKEN) payload = _form_data_to_payload(form_data) request_url = str(request.url) is_valid = validator.validate(request_url, payload, signature) if not is_valid: raise HTTPException(status_code=403, detail="Invalid Twilio signature") async def _handle_twilio_webhook(request: Request, channel: str) -> Response: form_data = await request.form() _validate_twilio_signature(request, form_data) response_message = await handle_incoming_message(form_data, channel=channel) twiml = MessagingResponse() if response_message: twiml.message(response_message) logger.info("Returning %s TwiML response for channel=%s", "non-empty" if response_message else "empty", channel) return Response(content=str(twiml), media_type="application/xml") @router.post("/twilio/whatsapp") async def twilio_whatsapp_webhook(request: Request): return await _handle_twilio_webhook(request, channel="whatsapp") @router.post("/twilio/sms") async def twilio_sms_webhook(request: Request): return await _handle_twilio_webhook(request, channel="sms")