# Source: smart-w / main.py
# Last commit by rairo: "Update main.py" (62506bf, verified)
from flask import Flask, request, make_response, jsonify
import os
import logging
import re
from dotenv import load_dotenv
import assemblyai as aai
from utility import (
generateResponse,
parse_multiple_transactions,
process_image_and_generate_query,
persist_temporary_transaction,
process_intent,
format_transaction_response,
detect_and_translate_input,
translate_output
)
import whatsapp_client as wa_client
import base64
import requests
import dns.resolver
import socket
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
import json
import sys
import threading
from collections import OrderedDict
from time import time
import uuid
load_dotenv()
# Configuration
IMGUR_CLIENT_ID = os.getenv("IMGUR_CLIENT_ID")
URL_IMGUR = "https://api.imgur.com/3/image"
# Imgur auth header; an empty dict when IMGUR_CLIENT_ID is unset.
HEADERS_IMGUR = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}
# --- TTS Configuration ---
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"
# AssemblyAI Configuration
# The key used to be read only from "aai_key", while the warning told operators to
# set "AAI_KEY". Accept both spellings so either configuration works.
_aai_key = os.environ.get("aai_key") or os.environ.get("AAI_KEY")
transcriber = None
if not _aai_key:
    logging.warning("AssemblyAI key not found (set 'aai_key' or 'AAI_KEY'). Audio transcription disabled.")
else:
    try:
        aai.settings.api_key = _aai_key
        transcriber = aai.Transcriber()
    except Exception as e:
        transcriber = None
        logging.error(f"Failed to initialize AssemblyAI: {e}")
import firebase_admin
from firebase_admin import credentials, firestore, storage
def init_firestore_from_env(env_var: str = "FIREBASE"):
    """Return a Firestore client, initialising firebase-admin on first use.

    When no firebase-admin app exists yet, the service-account JSON is read from
    the environment variable ``env_var`` and the app is created with the
    ``storageBucket`` option taken from ``FIREBASE_STORAGE_BUCKET``. That option
    lets ``storage.bucket()`` be called without arguments later. If an app already
    exists, it is reused.

    Raises:
        KeyError: ``env_var`` is not set.
        ValueError: the JSON (or the credential built from it) is invalid.
        Exception: any other initialisation failure; it is logged, then re-raised.
    """
    try:
        if not firebase_admin._apps:
            service_account = json.loads(os.environ[env_var])
            firebase_admin.initialize_app(
                credentials.Certificate(service_account),
                {'storageBucket': os.environ.get("FIREBASE_STORAGE_BUCKET")},
            )
        return firestore.client()
    except KeyError as missing:
        logging.error("%s environment variable is not set", missing)
        raise
    except (json.JSONDecodeError, ValueError) as invalid:
        logging.error("Invalid service-account JSON in %s: %s", env_var, invalid)
        raise
    except Exception as failure:
        logging.exception("Failed to initialise Firestore: %s", failure)
        raise
# Logging Configuration
# Configured before Firestore initialisation so that errors raised while connecting
# are written to stdout in this format. force=True removes any root handler that was
# installed implicitly by the earlier module-level logging.warning/error calls.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s",
    stream=sys.stdout,
    force=True,
)
logger = logging.getLogger(__name__)
db = init_firestore_from_env()
app = Flask(__name__)
# --- DNS Workaround ---
# Nameservers used to resolve graph.facebook.com (overridable via env vars).
nameserver1 = os.getenv('nameserver1', '8.8.8.8')
nameserver2 = os.getenv('nameserver2', '8.8.4.4')


def setup_dns():
    """Monkey-patch ``socket.getaddrinfo`` to resolve graph.facebook.com via dnspython.

    Any host string containing 'graph.facebook.com' is resolved to the first A
    record returned by the configured nameservers, and that IP is handed to the
    original ``getaddrinfo``. Other hosts, and lookups where the override fails,
    go straight to the original function. Failures while installing the patch
    are logged, not raised.
    """
    try:
        custom_resolver = dns.resolver.Resolver()
        custom_resolver.nameservers = [nameserver1, nameserver2]
        system_getaddrinfo = socket.getaddrinfo

        def patched_getaddrinfo(*args, **kwargs):
            host = args[0] if args else None
            try:
                if isinstance(host, str) and 'graph.facebook.com' in host:
                    ip = str(custom_resolver.resolve(host, 'A')[0])
                    logger.info(f"DNS Override: Resolved {host} to {ip} using {custom_resolver.nameservers}")
                    return system_getaddrinfo(ip, *args[1:], **kwargs)
            except dns.resolver.NoAnswer:
                logger.warning(f"DNS Override: No A record found for {host} using {custom_resolver.nameservers}")
            except Exception as exc:
                logger.error(f"DNS resolution override failed for {args}: {exc}")
            # Fallback: the untouched system resolver with the caller's original arguments.
            return system_getaddrinfo(*args, **kwargs)

        socket.getaddrinfo = patched_getaddrinfo
        logger.info(f"DNS resolver configured with nameservers: {custom_resolver.nameservers}")
    except Exception as exc:
        logger.error(f"Failed to setup custom DNS resolver: {exc}")


setup_dns()
# --- End DNS Workaround ---
# NOTE(review): this fallback verify token is committed to source control. Set
# VERIFY_TOKEN in the environment so the checked-in value is never relied upon.
VERIFY_TOKEN = os.environ.get("VERIFY_TOKEN", "30cca545-3838-48b2-80a7-9e43b1ae8ce4")
# A message is treated as a greeting only if it consists solely of a greeting word,
# optionally followed by "there" and punctuation/emoji. The previous `\b.*$` tail also
# matched messages such as "Hi, I sold 5 loaves for R50". Those got the canned
# greeting and the transaction was never processed.
GREETING_PATTERN = re.compile(
    r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b(?:\s+there)?\W*$',
    re.IGNORECASE,
)
# --- Duplicate Message Handling ---
PROCESSED_MESSAGES_TTL_HOURS = 24
class MessageDeduplicator:
    """Remember which WhatsApp message IDs have already been handled.

    IDs are kept in a bounded, LRU-ordered in-memory cache. When ``db_client`` is
    supplied, they are also stored in the Firestore ``processed_messages``
    collection, so duplicates are recognised after a restart or by another
    process. An ID is recorded the first time ``is_duplicate`` sees it, i.e.
    *before* the message is processed. If processing then fails, a redelivery
    of the same message is still reported as a duplicate.
    """

    def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
        self.ttl_seconds = ttl_hours * 3600
        self.max_cache_size = max_cache_size
        self.db_client = db_client
        # message_id -> time() at which it was recorded; insertion order doubles as LRU order.
        self.cache = OrderedDict()
        self.lock = threading.RLock()
        # Daemon thread that periodically drops cache entries older than the TTL.
        self._cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True)
        self._cleanup_thread.start()

    def _cache_put(self, message_id, timestamp):
        """Store message_id as most recently used, evicting the oldest entry when over capacity.

        The caller must hold ``self.lock``.
        """
        self.cache[message_id] = timestamp
        self.cache.move_to_end(message_id)
        if len(self.cache) > self.max_cache_size:
            self.cache.popitem(last=False)

    def is_duplicate(self, message_id):
        """Return True if message_id was seen before; otherwise record it and return False.

        Empty IDs are never duplicates. Cache entries older than the TTL are ignored.
        Firestore read errors are logged and treated as "not seen".
        """
        if not message_id:
            return False
        now = time()
        with self.lock:
            seen_at = self.cache.get(message_id)
            if seen_at is not None and now - seen_at <= self.ttl_seconds:
                self.cache.move_to_end(message_id)
                return True
            # Reserve the ID before any network I/O. A concurrent delivery of the
            # same message in this process is then reported as a duplicate, and the
            # lock is not held across Firestore round-trips.
            self._cache_put(message_id, now)
        if self.db_client:
            try:
                doc_ref = self.db_client.collection("processed_messages").document(message_id)
                if doc_ref.get().exists:
                    return True
            except Exception as e:
                logger.error(f"Database error in is_duplicate: {e}", exc_info=True)
        self._persist(message_id)
        return False

    def _mark_processed(self, message_id):
        """Record message_id in the cache and, if configured, in Firestore."""
        with self.lock:
            self._cache_put(message_id, time())
        self._persist(message_id)

    def _persist(self, message_id):
        """Write the processed marker for message_id to Firestore; does nothing without a db_client."""
        if not self.db_client:
            return
        from datetime import timezone  # the module header imports only datetime and timedelta

        try:
            # Timezone-aware UTC. A naive datetime.now() is local time but would be
            # stored as if it were UTC, which skews expires_at on non-UTC hosts.
            expiry = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
            doc_ref = self.db_client.collection("processed_messages").document(message_id)
            doc_ref.set({"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry})
        except Exception as e:
            logger.error(f"Database error in _mark_processed: {e}", exc_info=True)

    def _periodic_cleanup(self):
        """Loop forever: drop expired cache entries, then sleep an hour (5 minutes after an error)."""
        while True:
            try:
                with self.lock:
                    now = time()
                    expired = [mid for mid, ts in self.cache.items() if now - ts > self.ttl_seconds]
                    for mid in expired:
                        del self.cache[mid]
                threading.Event().wait(3600)
            except Exception as e:
                logger.error(f"Error in cleanup thread: {e}", exc_info=True)
                threading.Event().wait(300)
# Process-wide deduplicator backed by the Firestore client created above.
message_deduplicator = MessageDeduplicator(
    ttl_hours=PROCESSED_MESSAGES_TTL_HOURS,
    db_client=db,
)


def check_and_mark_processed(message_id: str) -> bool:
    """Return True if message_id was already handled; otherwise record it and return False.

    An empty ID is logged and reported as not processed.
    """
    if message_id:
        return message_deduplicator.is_duplicate(message_id)
    logger.warning("Empty message ID provided to check_and_mark_processed")
    return False
def is_user_approved(mobile: str) -> Tuple[bool, Optional[Dict]]:
    """Return ``(approved, user_data)`` for a sender.

    Looks up ``users/<mobile>`` in Firestore, prefixing ``+`` when it is missing.
    The user document (used for currency preferences) is returned only when its
    "status" is "approved" (case-insensitive). A missing client, a missing
    document, any other status, or any exception yields ``(False, None)``.
    """
    if not db:
        logger.error("Authorization check failed: Firestore client is not available.")
        return False, None
    try:
        doc_id = mobile if mobile.startswith('+') else f'+{mobile}'
        logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}'")
        snapshot = db.collection("users").document(doc_id).get()
        if not snapshot.exists:
            return False, None
        profile = snapshot.to_dict()
        if profile.get("status", "N/A").lower() != "approved":
            return False, None
        return True, profile
    except Exception as e:
        logger.error(f"AUTHORIZATION: An exception occurred for {mobile}: {e}", exc_info=True)
        return False, None
def send_confirmation_buttons(mobile: str, message_summary: str, is_variance: bool = False) -> None:
    """Send a confirmation prompt with WhatsApp reply buttons.

    The body is the fixed English header "Please confirm the following
    transaction(s):" followed by ``message_summary``. Variance prompts offer
    Settled / Pending / Cancel; all others offer Confirm / Cancel.
    """
    # --- UPGRADE 5: Variance Flow Buttons ---
    if is_variance:
        choices = [
            {"reply": {"id": "confirm_resolved", "title": "✅ Settled"}},
            {"reply": {"id": "confirm_unresolved", "title": "⚠️ Pending"}},
        ]
    else:
        choices = [{"reply": {"id": "confirm_transaction", "title": "✅ Confirm"}}]
    # Every prompt ends with the same Cancel button.
    choices.append({"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}})
    wa_client.send_reply_buttons(
        recipient_id=mobile,
        body_text=f"Please confirm the following transaction(s):\n\n{message_summary}",
        button_data=choices,
    )
def handle_interactive_response(mobile: str, button_id: str) -> None:
    """Act on a confirmation-button press for the user's pending transaction.

    Reads ``users/<mobile>/temp_transactions/pending``.
    - A confirm button commits it via process_intent and deletes the pending doc.
      Only "confirm_resolved" passes ``force_settled=True``.
    - "cancel_transaction" just deletes the pending doc.
    - Unknown buttons leave it untouched.
    A text reply is always sent.
    """
    if not db:
        wa_client.send_text_message(mobile, "Sorry, cannot process confirmation due to a database issue.")
        return
    pending_ref = db.collection("users").document(mobile).collection("temp_transactions").document('pending')
    # Confirm-style buttons -> force_settled flag for process_intent.
    settle_flag_by_button = {
        "confirm_transaction": False,  # standard confirm
        "confirm_resolved": True,      # "Settled": force the transaction settled
        "confirm_unresolved": False,   # "Pending": keep the outstanding debt/change
    }
    try:
        snapshot = pending_ref.get()
        if not snapshot.exists:
            wa_client.send_text_message(mobile, "Hmm, I couldn't find a transaction waiting for confirmation.")
            return
        pending = snapshot.to_dict().get("transactions", [])
        if not pending:
            wa_client.send_text_message(mobile, "There seems to be an issue with the pending transaction data.")
            pending_ref.delete()
            return
        if button_id in settle_flag_by_button:
            reply = process_intent(pending, mobile, force_settled=settle_flag_by_button[button_id])
            pending_ref.delete()
        elif button_id == "cancel_transaction":
            reply = "Transaction cancelled as requested."
            pending_ref.delete()
        else:
            reply = "Sorry, I didn't understand that button press."
        wa_client.send_text_message(mobile, reply)
    except Exception as e:
        logger.error(f"Error handling interactive response for user {mobile}: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Sorry, something went wrong while handling your confirmation.")
def _send_report_image(mobile: str, image_path: str, detected_lang: str) -> Optional[str]:
    """Upload a generated report image to Imgur and send its URL to the user as an image.

    The local file at ``image_path`` is always deleted. Returns None when the image
    was delivered. Otherwise a translated apology is sent as text and returned.
    """
    try:
        if not HEADERS_IMGUR:
            logger.warning("IMGUR_CLIENT_ID is not configured; cannot send report image.")
        else:
            with open(image_path, "rb") as image_file:
                upload = requests.post(URL_IMGUR, headers=HEADERS_IMGUR, files={"image": image_file}, timeout=60)
            upload.raise_for_status()
            imgur_data = upload.json()
            if imgur_data.get("success"):
                wa_client.send_image_message(recipient_id=mobile, image_url=imgur_data["data"]["link"])
                return None
            logger.error(f"Imgur upload was not successful: {imgur_data}")
    except Exception as e:
        logger.error(f"Image upload failed: {e}", exc_info=True)
    finally:
        try:
            os.remove(image_path)
        except OSError as e:
            logger.warning(f"Could not delete report image {image_path}: {e}")
    fallback = translate_output(
        "Sorry, I prepared your report but couldn't send the image. Please try again later.",
        detected_lang,
    )
    wa_client.send_text_message(mobile, fallback)
    return fallback


def process_text_message(message_text: str, mobile: str, user_settings: Optional[Dict] = None) -> Optional[str]:
    """Handle one text message and reply on WhatsApp (the "Language Sandwich").

    Detect language -> translate to English -> process -> translate the reply back -> send.

    - Greetings get a canned introduction.
    - Read/query intents are answered directly. If the answer is the path of an
      existing file, it is sent as an image instead of text.
    - reset_account is executed immediately.
    - create/update/delete are stored as a pending transaction and the user is
      asked to confirm via reply buttons. Settled/Pending/Cancel buttons are used
      when a sale includes ``amount_paid`` (variance flow).

    Returns the text that was sent, or None when buttons or an image were sent instead.
    """
    logger.info(f"Processing text message from {mobile}: '{message_text}'")
    # 1. Detect language and translate to English (Upgrade 4).
    lang_data = detect_and_translate_input(message_text) or {}
    english_text = lang_data.get('english_text') or message_text
    detected_lang = lang_data.get('detected_lang') or 'English'
    logger.info(f"Detected language: {detected_lang}. Process text: {english_text}")

    if GREETING_PATTERN.match(english_text):
        base_msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?"
        final_msg = translate_output(base_msg, detected_lang)
        wa_client.send_text_message(mobile, final_msg)
        return final_msg

    # Currency may live at the top level or under a nested "settings" map; default "R".
    user_currency = "R"
    if user_settings:
        nested_settings = user_settings.get('settings') or {}
        user_currency = user_settings.get('currency') or nested_settings.get('currency') or "R"

    # 2. Process the English text.
    llm_response_str = generateResponse(english_text, currency=user_currency)
    parsed_trans_data = parse_multiple_transactions(llm_response_str)

    if not parsed_trans_data:
        response_msg = "Sorry, I couldn't quite understand that. Could you please rephrase?"
    else:
        first = parsed_trans_data[0]
        # `or ''` guards against fields that are present but null.
        primary_intent = (first.get('intent') or '').lower()
        primary_type = (first.get('transaction_type') or '').lower()
        if primary_intent == 'read' or primary_type == 'query':
            response_data = process_intent(parsed_trans_data, mobile)
            if isinstance(response_data, str) and os.path.isfile(response_data):
                # A file path means a rendered report. Never send the raw server path as text.
                return _send_report_image(mobile, response_data, detected_lang)
            response_msg = str(response_data)
        elif primary_intent == 'reset_account':
            response_msg = process_intent(parsed_trans_data, mobile)
        elif primary_intent in ('create', 'update', 'delete'):
            if not persist_temporary_transaction(parsed_trans_data, mobile):
                response_msg = "Sorry, I couldn't save your transaction for confirmation."
            else:
                transaction_summary = format_transaction_response(parsed_trans_data)
                # --- VARIANCE DETECTION FOR BUTTONS ---
                has_payment_input = any('amount_paid' in (t.get('details') or {}) for t in parsed_trans_data)
                is_variance = has_payment_input and primary_intent == 'create' and primary_type == 'sale'
                # send_confirmation_buttons prepends its own "Please confirm the following
                # transaction(s):" header, so only the summary is translated here.
                # Adding the header here too would show it twice.
                send_confirmation_buttons(
                    mobile,
                    translate_output(transaction_summary, detected_lang),
                    is_variance=is_variance,
                )
                return None
        else:
            response_msg = f"I'm not sure how to handle the action '{primary_intent}'."

    # 3. Translate the reply back to the user's language (Upgrade 4) and send it.
    if not response_msg:
        return None
    final_response = translate_output(str(response_msg), detected_lang)
    wa_client.send_text_message(mobile, final_response)
    return final_response
# --- NEW FEATURE: Audio Helpers ---
def _deepgram_tts_to_mp3(text: str, timeout: float = 30.0) -> Optional[str]:
    """Synthesize ``text`` with Deepgram TTS and save the audio as an MP3 file.

    Args:
        text: The text to speak. Empty or whitespace-only text is skipped.
        timeout: Seconds to wait for Deepgram before giving up.

    Returns:
        The path of a new ``tts_<uuid>.mp3`` in the current working directory
        (the caller deletes it), or None if the API key is missing, the text is
        empty, or the request fails.
    """
    if not DEEPGRAM_API_KEY:
        logger.warning("DEEPGRAM_API_KEY not found. Skipping TTS.")
        return None
    if not text or not str(text).strip():
        logger.warning("Empty text passed to TTS. Skipping.")
        return None
    try:
        headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "application/json",
        }
        # Without a timeout, a stalled Deepgram request would block this worker indefinitely.
        response = requests.post(DEEPGRAM_TTS_URL, headers=headers, json={"text": text}, timeout=timeout)
        response.raise_for_status()
        filepath = os.path.join(os.getcwd(), f"tts_{uuid.uuid4()}.mp3")
        with open(filepath, "wb") as f:
            f.write(response.content)
        return filepath
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}", exc_info=True)
        return None
def _upload_to_firebase_storage(file_path: str) -> Optional[str]:
    """Upload ``file_path`` to the default bucket under ``audio_responses/``.

    Returns a signed download URL valid for one hour, or None if anything fails.
    The failure is logged. ``storage.bucket()`` relies on the ``storageBucket``
    option set during firebase-admin initialisation.
    """
    try:
        target = storage.bucket().blob(f"audio_responses/{os.path.basename(file_path)}")
        target.upload_from_filename(file_path)
        return target.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as exc:
        logger.error(f"Firebase Storage upload failed: {exc}", exc_info=True)
        return None
def process_audio_message(audio_id: str, mobile: str, user_settings: Optional[Dict]) -> None:
    """Handle a voice note: transcribe it, process the text, and reply with synthesized audio.

    Steps:
    1. Fetch the media from WhatsApp and transcribe it with AssemblyAI.
    2. Pass the transcript to process_text_message, which sends the text reply itself.
    3. Convert that reply to MP3 with Deepgram and send it as audio: by Firebase
       Storage URL when the upload succeeds, otherwise by local path.

    Both the downloaded voice note and the generated MP3 are deleted afterwards,
    even if a step raises.
    """
    if not transcriber:
        wa_client.send_text_message(mobile, "Sorry, I cannot process audio messages at the moment.")
        return
    media_url = wa_client.get_media_url(audio_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Sorry, I couldn't retrieve your audio message.")
        return
    temp_dir = "temp_audio"
    os.makedirs(temp_dir, exist_ok=True)
    audio_filename = os.path.join(temp_dir, f"{mobile}_{audio_id}.ogg")
    downloaded_path = wa_client.download_media(media_url, audio_filename)
    if not downloaded_path:
        wa_client.send_text_message(mobile, "Sorry, I couldn't download your audio message.")
        return
    mp3_path = None
    try:
        transcript = transcriber.transcribe(downloaded_path)
        if transcript.status == aai.TranscriptStatus.error:
            wa_client.send_text_message(mobile, f"Sorry, I couldn't transcribe your audio. Error: {transcript.error}")
            return
        if not transcript.text:
            wa_client.send_text_message(mobile, "Sorry, I couldn't understand the audio.")
            return
        text_response = process_text_message(transcript.text, mobile, user_settings)
        if not text_response:
            return  # buttons/image were sent instead of text; nothing to speak
        mp3_path = _deepgram_tts_to_mp3(text_response)
        if not mp3_path:
            return
        audio_url = _upload_to_firebase_storage(mp3_path)
        if audio_url:
            wa_client.send_audio_message(mobile, audio_url=audio_url)
        else:
            wa_client.send_audio_message(mobile, audio_path=mp3_path)
    finally:
        # Cleanup runs even if upload/sending raises; the MP3 previously leaked in that case.
        for temp_path in (downloaded_path, mp3_path):
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.warning(f"Could not delete temporary audio file {temp_path}: {e}")
def process_image_message(image_id: str, caption: Optional[str], mobile: str, user_settings: Optional[Dict]) -> None:
    """Handle an incoming image by turning it into a text query and processing that query.

    Steps:
    1. Tell the user the image is being analysed and download it from WhatsApp.
    2. Pass its bytes (plus the optional caption) to process_image_and_generate_query.
    3. Hand the generated query to process_text_message as if the user had typed it.

    The downloaded file is always removed afterwards.
    """
    logger.info(f"Processing image message (ID: {image_id}) from {mobile} with caption: '{caption}'")
    wa_client.send_text_message(mobile, "Analyzing your image, please wait a moment...")
    media_url = wa_client.get_media_url(image_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Sorry, I couldn't retrieve your image from WhatsApp's servers.")
        return
    temp_dir = "temp_images"
    os.makedirs(temp_dir, exist_ok=True)
    image_filename = os.path.join(temp_dir, f"{mobile}_{image_id}.jpg")
    downloaded_path = wa_client.download_media(media_url, image_filename)
    if not downloaded_path:
        wa_client.send_text_message(mobile, "Sorry, I couldn't download your image for processing.")
        return
    try:
        with open(downloaded_path, "rb") as f:
            image_bytes = f.read()
        generated_query = process_image_and_generate_query(image_bytes, caption)
        # The vision helper signals failure with an empty result or an "Error:" string.
        if not generated_query or "Error:" in generated_query:
            wa_client.send_text_message(mobile, f"I had some trouble understanding that image. {generated_query}")
        else:
            process_text_message(generated_query, mobile, user_settings)
    except Exception as e:
        logger.error(f"Error during vision processing: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Sorry, an unexpected error occurred while analyzing your image.")
    finally:
        if os.path.exists(downloaded_path):
            try:
                os.remove(downloaded_path)
            except OSError as e:
                # Best effort: a leftover temp file must not mask the result, but it is worth knowing about.
                logger.warning(f"Could not delete temporary image {downloaded_path}: {e}")
@app.route("/", methods=["GET", "POST"])
def webhook_handler():
    """WhatsApp Cloud API webhook endpoint.

    GET: subscription handshake. Echoes ``hub.challenge`` when ``hub.mode`` is
    "subscribe" and ``hub.verify_token`` equals VERIFY_TOKEN; otherwise returns 403.

    POST: extracts one message and drops duplicates and unapproved senders. It then
    dispatches to the text/audio/image/interactive handlers. It always answers 200;
    processing errors are only logged.

    NOTE(review): POST payloads are not authenticated (no X-Hub-Signature-256 check
    against the app secret), so anyone who can reach this URL can inject messages.
    """
    if request.method == "GET":
        mode = request.args.get("hub.mode")
        token = request.args.get("hub.verify_token")
        challenge = request.args.get("hub.challenge")
        if mode == "subscribe" and token == VERIFY_TOKEN:
            # Flask cannot build a response from None, so a missing challenge becomes "".
            return make_response(challenge or "", 200)
        return make_response("Verification failed", 403)

    try:
        # silent=True: a non-JSON or malformed body yields None instead of raising.
        data = request.get_json(silent=True)
        if not data:
            return make_response("ok", 200)
        msg_details = wa_client.get_message_details(data)
        if not msg_details:
            return make_response("ok", 200)
        message_id, mobile, message_type = msg_details.get("id"), msg_details.get("from"), msg_details.get("type")
        if check_and_mark_processed(message_id):
            return make_response("ok - duplicate", 200)
        is_approved, user_data = is_user_approved(mobile)
        if not is_approved:
            wa_client.send_text_message(mobile, "Access denied. Please contact your administrator.")
            return make_response("ok", 200)
        if message_type == "text":
            process_text_message(msg_details.get("text"), mobile, user_data)
        elif message_type == "audio":
            process_audio_message(msg_details.get("audio_id"), mobile, user_data)
        elif message_type == "image":
            process_image_message(msg_details.get("image_id"), msg_details.get("caption"), mobile, user_data)
        elif message_type == "interactive":
            handle_interactive_response(mobile, msg_details.get("button_reply_id"))
    except Exception as e:
        logger.error(f"Unhandled exception: {e}", exc_info=True)
    return make_response("ok", 200)
if __name__ == '__main__':
    port = int(os.environ.get("PORT", 7860))
    use_debug_server = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
    if use_debug_server:
        # FLASK_DEBUG=true: Flask's built-in development server with the debugger.
        app.run(debug=True, host="0.0.0.0", port=port)
    else:
        # Default: serve through waitress rather than the development server.
        from waitress import serve
        serve(app, host="0.0.0.0", port=port)