"""
WhatsApp Webhook Server for Epigen Chatbot
This server receives webhook events from WhatsApp via Green API,
processes them using Google's Gemini AI model, and sends responses
back to the user.
The server is built with Flask and runs on Uvicorn for improved performance.
"""
import os
import json
import time
from typing import Dict, List, Any, Optional
import requests
from flask import Flask, request, jsonify
from loguru import logger
from dotenv import load_dotenv
import sys
# Load environment variables from a local .env file (development convenience).
# In production (e.g. Hugging Face Spaces) variables are injected by the platform
# and this call is a harmless no-op.
load_dotenv()
# Initialize Flask application
app = Flask(__name__)
# ==================== CONFIGURATION ====================
# API credentials are read from the environment; they are expected to be set
# as secrets in Hugging Face Spaces or the equivalent in other cloud hosts.
GREEN_API_ID = os.environ.get("GREEN_API_ID")
GREEN_API_TOKEN = os.environ.get("GREEN_API_TOKEN")
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
# Warn (but do not crash) when credentials are missing so the server can still
# boot for health checks and webhook verification.
if not GREEN_API_ID or not GREEN_API_TOKEN:
    logger.warning("WhatsApp API credentials not set. Webhook will not be able to send messages.")
if not GOOGLE_API_KEY:
    logger.warning("Google API key not set. AI responses will not work.")
# Configure logging — file sink is disabled; everything goes to stdout so the
# hosting platform captures it.
#logger.add("webhook.log", rotation="500 MB", level="INFO", retention="10 days")
logger.add(sys.stdout, level="INFO")
# ==================== DATA STORAGE ====================
# In-memory chat histories keyed by sender phone number.
# NOTE: state is lost on restart and not shared between workers; a production
# deployment would replace this with a database or cache.
whatsapp_chat_histories: Dict[str, List[Dict[str, str]]] = {}
# Knowledge base injected into the AI system prompt.
# Replace with the full content from the Streamlit app.
knowledge_content = """
# Datos de Epigen
- WhatsApp: 5544918977
- Direccion: Avenida de los Insurgentes 601, 03810 Col. Nápoles, CDMX, CP:03100
- Sitio Web: https://epigen.mx/
""" # Add your full knowledge base here
# ==================== ROUTE HANDLERS ====================
@app.route('/', methods=['GET'])
def home():
    """
    Root endpoint confirming the server is up.

    Useful for liveness checks, basic monitoring, and quick
    browser-based verification of a deployment.

    Returns:
        Tuple of (JSON status payload, HTTP 200)
    """
    payload = {
        "status": "online",
        "message": "Epigen WhatsApp webhook server is running",
        "version": "1.0.0",
    }
    return jsonify(payload), 200
@app.route('/webhook', methods=['GET', 'POST'])
def webhook():
    """
    Main webhook endpoint for WhatsApp (via Green API).

    Handles two types of requests:
    - GET: used by Green API to verify the webhook URL
    - POST: receives incoming message notifications

    Returns:
        Tuple of (JSON status payload, HTTP status code)
    """
    # Webhook verification handshake (GET request)
    if request.method == 'GET':
        logger.info("Received webhook verification request")
        return jsonify({"status": "webhook is active"}), 200
    # Incoming webhook event (POST request)
    try:
        # silent=True returns None instead of raising on a missing/malformed
        # JSON body, so we can reject it gracefully rather than 500.
        data = request.get_json(silent=True)
        if not data:
            logger.warning("Webhook POST received without a valid JSON body")
            return jsonify({"status": "ignored", "message": "no JSON payload"}), 200
        logger.info(f"Received webhook data: {json.dumps(data)}")
        if data.get("typeWebhook") == "incomingMessageReceived":
            message_data = data.get("messageData", {})
            # Use .get() chains so a malformed payload can't raise KeyError
            # and turn into a 500 response.
            sender_raw = data.get("senderData", {}).get("sender", "")
            sender = sender_raw.split("@")[0]  # strip "@c.us" -> phone number
            if message_data.get("typeMessage") == "textMessage":
                message_text = message_data.get("textMessageData", {}).get("textMessage", "")
                logger.info(f"Received message from {sender}: {message_text}")
                # Generate the AI reply and send it back to the user.
                ai_response = process_message(sender, message_text)
                send_whatsapp_message(sender, ai_response)
            # Voice messages are acknowledged but not transcribed (future enhancement).
            elif message_data.get("typeMessage") == "audioMessage":
                logger.info(f"Received audio message from {sender}")
                send_whatsapp_message(
                    sender,
                    "Recibí tu mensaje de voz, pero actualmente solo puedo procesar mensajes de texto."
                )
        return jsonify({"status": "message processed"}), 200
    except Exception as e:
        logger.error(f"Error processing webhook: {str(e)}")
        return jsonify({"status": "error", "message": str(e)}), 500
# ==================== MESSAGE PROCESSING ====================
def process_message(sender: str, message_text: str) -> str:
    """
    Process an incoming message and generate an AI response.

    This function:
    1. Initializes chat history for new users
    2. Appends the user message to history
    3. Generates an AI response (with retries)
    4. Appends the response to history

    Args:
        sender (str): The phone number of the sender
        message_text (str): The content of the message

    Returns:
        str: The AI-generated response, or a Spanish fallback message
             if every attempt failed
    """
    try:
        # First contact: seed the history with a greeting from the assistant.
        if sender not in whatsapp_chat_histories:
            whatsapp_chat_histories[sender] = [
                {"role": "assistant", "content": "¡Hola! Soy el asistente de Epigen. ¿Cómo puedo ayudarte hoy? 🧬"}
            ]
            logger.info(f"Initialized new chat history for {sender}")
        history = whatsapp_chat_histories[sender]
        # Record the user's turn before generating a reply.
        history.append({"role": "user", "content": message_text})
        # Retry transient AI failures up to max_retries times.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = generate_ai_response(history, message_text)
                history.append({"role": "assistant", "content": response})
                logger.info(f"Generated response for {sender}: {response[:50]}...")
                return response
            except Exception as e:
                logger.error(f"Attempt {attempt+1}/{max_retries} failed: {str(e)}")
                if attempt == max_retries - 1:  # Last attempt: give up
                    raise
                time.sleep(1)  # Brief pause before retrying
    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        # Roll back the unanswered user turn so a failed attempt does not
        # permanently skew the stored conversation context.
        history = whatsapp_chat_histories.get(sender)
        if history and history[-1] == {"role": "user", "content": message_text}:
            history.pop()
        return "Lo siento, tuve un problema procesando tu mensaje. Por favor intenta de nuevo."
def generate_ai_response(chat_history: List[Dict[str, str]], user_message: str) -> str:
    """
    Generate a response using the Google Gemini model.

    This function:
    1. Configures the Gemini API
    2. Formats the conversation history (deduplicating the latest user turn)
    3. Prepends the system message with the knowledge base
    4. Generates and returns the response

    Args:
        chat_history (List[Dict[str, str]]): The conversation history.
            May already include the latest user message as its final entry
            (process_message appends it before calling us).
        user_message (str): The latest user message

    Returns:
        str: The generated AI response text
    """
    # Imported lazily so the module loads even when the library is absent.
    import google.generativeai as genai
    genai.configure(api_key=GOOGLE_API_KEY)
    # Generation parameters
    generation_config = {
        "temperature": 0.7,        # Randomness (0.0 = deterministic, 1.0 = creative)
        "top_p": 0.95,             # Nucleus sampling parameter
        "top_k": 0,                # Limits vocabulary to top K tokens (0 = disabled)
        "max_output_tokens": 1000, # Maximum length of response
    }
    # Block harmful or inappropriate content at medium severity and above.
    safety_settings = [
        {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
    ]
    model = genai.GenerativeModel(
        model_name="gemini-2.0-flash",  # Efficient model for faster responses
        generation_config=generation_config,
        safety_settings=safety_settings,
    )
    # The caller appends the latest user message to the history before calling,
    # but we also send it via chat.send_message() below. Drop the trailing
    # duplicate so the model does not see the same user turn twice.
    history_to_format = chat_history
    if history_to_format and history_to_format[-1] == {"role": "user", "content": user_message}:
        history_to_format = history_to_format[:-1]
    # Gemini uses "user"/"model" roles instead of "user"/"assistant".
    formatted_history = []
    for message in history_to_format:
        role = "user" if message["role"] == "user" else "model"
        formatted_history.append({"role": role, "parts": [message["content"]]})
    # Prepend the system message carrying the Epigen knowledge base.
    system_message = (
        "Eres un agente conversacional de IA experto en epigenética y en los productos de Epigen. "
        "Usa la siguiente información para responder preguntas sobre Epigen:\n\n" + knowledge_content
    )
    formatted_history.insert(0, {"role": "model", "parts": [system_message]})
    # Generate the response for the latest user turn.
    chat = model.start_chat(history=formatted_history)
    response = chat.send_message(user_message)
    return response.text
# ==================== WHATSAPP INTEGRATION ====================
def send_whatsapp_message(recipient: str, message: str) -> Optional[Dict[str, Any]]:
    """
    Send a message back to the user via WhatsApp (Green API).

    Args:
        recipient (str): The phone number to send the message to
        message (str): The content of the message

    Returns:
        Optional[Dict[str, Any]]: The parsed JSON response from Green API,
        or None if the request or response parsing failed
    """
    # Green API sendMessage endpoint for this instance
    url = f"https://api.green-api.com/waInstance{GREEN_API_ID}/sendMessage/{GREEN_API_TOKEN}"
    payload = {
        "chatId": f"{recipient}@c.us",  # chat id format required by WhatsApp
        "message": message
    }
    try:
        # Timeout prevents a hung connection from blocking the worker forever.
        response = requests.post(url, json=payload, timeout=30)
        # Green API error responses are not guaranteed to be JSON; guard the decode.
        try:
            response_data = response.json()
        except ValueError:
            logger.error(f"Non-JSON response from Green API (HTTP {response.status_code}): {response.text[:200]}")
            return None
        # Success is a 200 with an idMessage in the body.
        if response.status_code == 200 and response_data.get("idMessage"):
            logger.info(f"Message sent to {recipient}: {message[:50]}...")
        else:
            logger.error(f"Error sending message: {response_data}")
        return response_data
    except Exception as e:
        logger.error(f"Exception when sending message: {str(e)}")
        return None
# ==================== UTILITY ROUTES ====================
@app.route('/health', methods=['GET'])
def health_check():
    """
    Health-check endpoint for monitoring services.

    Reports whether the external API credentials are configured and
    how many chats are currently held in memory.

    Returns:
        Tuple of (JSON health payload, HTTP 200)
    """
    # Credentials are "configured" when the relevant env vars are present.
    services = {
        "green_api": "configured" if (GREEN_API_ID and GREEN_API_TOKEN) else "not configured",
        "google_ai": "configured" if GOOGLE_API_KEY else "not configured",
    }
    body = {
        "status": "healthy",
        "timestamp": time.time(),
        "services": services,
        "active_chats": len(whatsapp_chat_histories),
    }
    return jsonify(body), 200
# ==================== SERVER STARTUP ====================
# This block only runs when executing this file directly.
# In production, the hosting platform imports the `app` object and serves it.
if __name__ == "__main__":
    import uvicorn
    # Port comes from the environment (e.g. Hugging Face Spaces sets PORT);
    # default to 7860, the Spaces convention.
    port = int(os.environ.get('PORT', 7860))
    logger.info(f"Starting server on port {port}")
    # Flask is a WSGI application, so run Uvicorn with its WSGI interface
    # ("app:app" = module `app`, attribute `app`).
    uvicorn.run("app:app", host="0.0.0.0", port=port, interface="wsgi")