Spaces:
Sleeping
Sleeping
| """ | |
| WhatsApp Webhook Server for Epigen Chatbot | |
| This server receives webhook events from WhatsApp via Green API, | |
| processes them using Google's Gemini AI model, and sends responses | |
| back to the user. | |
| The server is built with Flask and runs on Uvicorn for improved performance. | |
| """ | |
| import os | |
| import json | |
| import time | |
| from typing import Dict, List, Any, Optional | |
| import requests | |
| from flask import Flask, request, jsonify | |
| from loguru import logger | |
| from dotenv import load_dotenv | |
| import sys | |
| # Load environment variables from .env file (for local development) | |
| # This has no effect in production where environment variables are set differently | |
| load_dotenv() | |
| # Initialize Flask application | |
| app = Flask(__name__) | |
| # ==================== CONFIGURATION ==================== | |
| # Get API credentials from environment variables | |
| # These will be set as secrets in Hugging Face Spaces or other cloud environments | |
| GREEN_API_ID = os.environ.get("GREEN_API_ID") | |
| GREEN_API_TOKEN = os.environ.get("GREEN_API_TOKEN") | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| # Check if required environment variables are set | |
| if not GREEN_API_ID or not GREEN_API_TOKEN: | |
| logger.warning("WhatsApp API credentials not set. Webhook will not be able to send messages.") | |
| if not GOOGLE_API_KEY: | |
| logger.warning("Google API key not set. AI responses will not work.") | |
| # Configure logging | |
| #logger.add("webhook.log", rotation="500 MB", level="INFO", retention="10 days") | |
| logger.add(sys.stdout, level="INFO") | |
| # ==================== DATA STORAGE ==================== | |
| # In-memory storage for chat histories | |
| # In a production environment, this would be replaced with a database | |
| whatsapp_chat_histories: Dict[str, List[Dict[str, str]]] = {} | |
| # Knowledge base content - replace with your actual content from the Streamlit app | |
| knowledge_content = """ | |
| # Datos de Epigen | |
| - WhatsApp: 5544918977 | |
| - Direccion: Avenida de los Insurgentes 601, 03810 Col. Nápoles, CDMX, CP:03100 | |
| - Sitio Web: https://epigen.mx/ | |
| """ # Add your full knowledge base here | |
| # ==================== ROUTE HANDLERS ==================== | |
| def home(): | |
| """ | |
| Home route to confirm the server is running. | |
| This endpoint is useful for: | |
| 1. Checking if the server is alive | |
| 2. Basic health monitoring | |
| 3. Browser-based verification | |
| Returns: | |
| JSON response with status message | |
| """ | |
| return jsonify({ | |
| "status": "online", | |
| "message": "Epigen WhatsApp webhook server is running", | |
| "version": "1.0.0" | |
| }), 200 | |
| def webhook(): | |
| """ | |
| Main webhook endpoint for WhatsApp. | |
| Handles two types of requests: | |
| - GET: Used by Green API to verify the webhook URL | |
| - POST: Receives incoming message notifications | |
| Returns: | |
| JSON response indicating success or error | |
| """ | |
| # Handle webhook verification (GET request) | |
| if request.method == 'GET': | |
| logger.info("Received webhook verification request") | |
| return jsonify({"status": "webhook is active"}), 200 | |
| # Handle incoming webhook events (POST request) | |
| try: | |
| # Get the JSON data from the request | |
| data = request.get_json() | |
| logger.info(f"Received webhook data: {json.dumps(data)}") | |
| # Process incoming messages | |
| if data.get("typeWebhook") == "incomingMessageReceived": | |
| message_data = data.get("messageData", {}) | |
| # Handle text messages | |
| if message_data.get("typeMessage") == "textMessage": | |
| sender = data["senderData"]["sender"].split("@")[0] # Get phone number | |
| message_text = message_data["textMessageData"]["textMessage"] | |
| logger.info(f"Received message from {sender}: {message_text}") | |
| # Process the message and get a response | |
| ai_response = process_message(sender, message_text) | |
| # Send the response back to the user | |
| send_whatsapp_message(sender, ai_response) | |
| # Handle voice messages (future enhancement) | |
| elif message_data.get("typeMessage") == "audioMessage": | |
| sender = data["senderData"]["sender"].split("@")[0] | |
| logger.info(f"Received audio message from {sender}") | |
| # Currently we don't process audio, so just send a default response | |
| send_whatsapp_message( | |
| sender, | |
| "Recibí tu mensaje de voz, pero actualmente solo puedo procesar mensajes de texto." | |
| ) | |
| return jsonify({"status": "message processed"}), 200 | |
| except Exception as e: | |
| logger.error(f"Error processing webhook: {str(e)}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| # ==================== MESSAGE PROCESSING ==================== | |
| def process_message(sender: str, message_text: str) -> str: | |
| """ | |
| Process a message and generate an AI response. | |
| This function: | |
| 1. Initializes chat history for new users | |
| 2. Adds the user message to history | |
| 3. Generates an AI response | |
| 4. Adds the response to history | |
| Args: | |
| sender (str): The phone number of the sender | |
| message_text (str): The content of the message | |
| Returns: | |
| str: The AI-generated response | |
| """ | |
| try: | |
| # Initialize chat history for new users | |
| if sender not in whatsapp_chat_histories: | |
| whatsapp_chat_histories[sender] = [ | |
| {"role": "assistant", "content": "¡Hola! Soy el asistente de Epigen. ¿Cómo puedo ayudarte hoy? 🧬"} | |
| ] | |
| logger.info(f"Initialized new chat history for {sender}") | |
| # Add user message to history | |
| whatsapp_chat_histories[sender].append({"role": "user", "content": message_text}) | |
| # Generate AI response with retry mechanism | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| # Generate response using AI | |
| response = generate_ai_response( | |
| whatsapp_chat_histories[sender], | |
| message_text | |
| ) | |
| # Add AI response to history | |
| whatsapp_chat_histories[sender].append({"role": "assistant", "content": response}) | |
| logger.info(f"Generated response for {sender}: {response[:50]}...") | |
| return response | |
| except Exception as e: | |
| logger.error(f"Attempt {attempt+1}/{max_retries} failed: {str(e)}") | |
| if attempt == max_retries - 1: # Last attempt | |
| raise | |
| time.sleep(1) # Wait before retrying | |
| except Exception as e: | |
| logger.error(f"Error processing message: {str(e)}") | |
| return "Lo siento, tuve un problema procesando tu mensaje. Por favor intenta de nuevo." | |
| def generate_ai_response(chat_history: List[Dict[str, str]], user_message: str) -> str: | |
| """ | |
| Generate a response using the Google Gemini model. | |
| This function: | |
| 1. Configures the Gemini API | |
| 2. Formats the conversation history | |
| 3. Adds the system message with knowledge base | |
| 4. Generates and returns the response | |
| Args: | |
| chat_history (List[Dict[str, str]]): The conversation history | |
| user_message (str): The latest user message | |
| Returns: | |
| str: The generated AI response | |
| """ | |
| # Import the Gemini API library | |
| # We import here to avoid loading it unless needed | |
| import google.generativeai as genai | |
| # Configure the Gemini API | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| # Set up the model with appropriate parameters | |
| generation_config = { | |
| "temperature": 0.7, # Controls randomness (0.0 = deterministic, 1.0 = creative) | |
| "top_p": 0.95, # Nucleus sampling parameter | |
| "top_k": 0, # Limits vocabulary to top K tokens | |
| "max_output_tokens": 1000, # Maximum length of response | |
| } | |
| # Safety settings to prevent harmful or inappropriate content | |
| safety_settings = [ | |
| {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| ] | |
| # Initialize the generative model | |
| model = genai.GenerativeModel( | |
| model_name="gemini-2.0-flash", # Using the more efficient model for faster responses | |
| generation_config=generation_config, | |
| safety_settings=safety_settings, | |
| ) | |
| # Format the conversation history for Gemini | |
| # Gemini uses "user" and "model" roles instead of "user" and "assistant" | |
| formatted_history = [] | |
| for message in chat_history: | |
| role = "user" if message["role"] == "user" else "model" | |
| formatted_history.append({"role": role, "parts": [message["content"]]}) | |
| # Add system message with knowledge base | |
| # This provides context about Epigen to inform the AI's responses | |
| system_message = ( | |
| "Eres un agente conversacional de IA experto en epigenética y en los productos de Epigen. " | |
| "Usa la siguiente información para responder preguntas sobre Epigen:\n\n" + knowledge_content | |
| ) | |
| formatted_history.insert(0, {"role": "model", "parts": [system_message]}) | |
| # Generate response | |
| chat = model.start_chat(history=formatted_history) | |
| response = chat.send_message(user_message) | |
| return response.text | |
| # ==================== WHATSAPP INTEGRATION ==================== | |
| def send_whatsapp_message(recipient: str, message: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Send a message back to the user via WhatsApp. | |
| Uses Green API to send messages to WhatsApp users. | |
| Args: | |
| recipient (str): The phone number to send the message to | |
| message (str): The content of the message | |
| Returns: | |
| Optional[Dict[str, Any]]: The response from the Green API, or None if failed | |
| """ | |
| # Construct the URL for the Green API endpoint | |
| url = f"https://api.green-api.com/waInstance{GREEN_API_ID}/sendMessage/{GREEN_API_TOKEN}" | |
| # Prepare the payload with the recipient and message | |
| payload = { | |
| "chatId": f"{recipient}@c.us", # Format required by WhatsApp | |
| "message": message | |
| } | |
| try: | |
| # Send the request to Green API | |
| response = requests.post(url, json=payload) | |
| response_data = response.json() | |
| # Log the result | |
| if response.status_code == 200 and response_data.get("idMessage"): | |
| logger.info(f"Message sent to {recipient}: {message[:50]}...") | |
| else: | |
| logger.error(f"Error sending message: {response_data}") | |
| return response_data | |
| except Exception as e: | |
| logger.error(f"Exception when sending message: {str(e)}") | |
| return None | |
| # ==================== UTILITY ROUTES ==================== | |
| def health_check(): | |
| """ | |
| Health check endpoint for monitoring services. | |
| Returns detailed information about the server's status, | |
| including environment configuration and service availability. | |
| Returns: | |
| JSON response with health information | |
| """ | |
| # Check Green API connectivity | |
| green_api_status = "configured" if GREEN_API_ID and GREEN_API_TOKEN else "not configured" | |
| # Check Google API connectivity | |
| google_api_status = "configured" if GOOGLE_API_KEY else "not configured" | |
| # Return comprehensive health status | |
| return jsonify({ | |
| "status": "healthy", | |
| "timestamp": time.time(), | |
| "services": { | |
| "green_api": green_api_status, | |
| "google_ai": google_api_status | |
| }, | |
| "active_chats": len(whatsapp_chat_histories) | |
| }), 200 | |
| # ==================== SERVER STARTUP ==================== | |
| # This block only runs when executing this file directly | |
| # In production, Uvicorn will import and run the Flask app object | |
| if __name__ == "__main__": | |
| import uvicorn | |
| # Get port from environment or use default | |
| port = int(os.environ.get('PORT', 7860)) | |
| # Log the server startup | |
| logger.info(f"Starting server on port {port}") | |
| # Run the server using Uvicorn | |
| # Using WSGI interface since Flask is a WSGI application | |
| uvicorn.run("app:app", host="0.0.0.0", port=port, interface="wsgi") | |