from flask import Flask, request, make_response, jsonify
import os
import logging
import re
from dotenv import load_dotenv
import assemblyai as aai
from utility import (
    generateResponse,
    parse_multiple_transactions,
    process_image_and_generate_query,
    persist_temporary_transaction,
    process_intent,
    format_transaction_response,
    detect_and_translate_input,
    translate_output,
)
import whatsapp_client as wa_client
import base64
import requests
import dns.resolver
import socket
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
import json
import sys
import threading
from collections import OrderedDict
from time import time
import uuid

load_dotenv()

# Configuration
IMGUR_CLIENT_ID = os.getenv("IMGUR_CLIENT_ID")
URL_IMGUR = "https://api.imgur.com/3/image"
# Empty dict (falsy) when no client id is configured.
HEADERS_IMGUR = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}

# --- TTS Configuration ---
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"

# AssemblyAI Configuration
# `transcriber` is None whenever the key is missing or initialisation fails.
try:
    aai.settings.api_key = os.environ["aai_key"]
    transcriber = aai.Transcriber()
except KeyError:
    transcriber = None
    # Name the variable exactly as it is read above (lower-case).
    logging.warning("aai_key environment variable not found. Audio transcription disabled.")
except Exception as e:
    transcriber = None
    logging.error(f"Failed to initialize AssemblyAI: {e}")

import firebase_admin
from firebase_admin import credentials, firestore, storage


def init_firestore_from_env(env_var: str = "FIREBASE"):
    """
    Initialise firebase-admin with the service-account JSON and Storage Bucket.

    Args:
        env_var: Name of the environment variable holding the service-account
            JSON document as a string.

    Returns:
        The client returned by ``firestore.client()``. If a firebase-admin app
        is already initialised, it is reused and no new app is created.

    Raises:
        KeyError: ``env_var`` is not set.
        json.JSONDecodeError, ValueError: the service-account JSON is invalid.
        Exception: any other initialisation failure (logged, then re-raised).
    """
    try:
        if firebase_admin._apps:
            return firestore.client()

        sa_json = os.environ[env_var]
        sa_info = json.loads(sa_json)
        cred = credentials.Certificate(sa_info)

        # The bucket name is passed explicitly so the storage SDK has a
        # default bucket to work with.
        bucket_name = os.environ.get("FIREBASE_STORAGE_BUCKET")
        if not bucket_name:
            logging.warning(
                "FIREBASE_STORAGE_BUCKET is not set; uploads that rely on the "
                "default storage bucket will not work."
            )
        firebase_admin.initialize_app(cred, {
            'storageBucket': bucket_name
        })
        return firestore.client()
    except KeyError as e:
        logging.error("%s environment variable is not set", e)
        raise
    except (json.JSONDecodeError, ValueError) as e:
        logging.error("Invalid service-account JSON in %s: %s", env_var, e)
        raise
    except Exception as e:
        logging.exception("Failed to initialise Firestore: %s", e)
        raise


db = init_firestore_from_env()
app = Flask(__name__)

# Logging Configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s",
    stream=sys.stdout,
    force=True
)
logger = logging.getLogger(__name__)

# --- DNS Workaround ---
nameserver1 = os.getenv('nameserver1', '8.8.8.8')
nameserver2 = os.getenv('nameserver2', '8.8.4.4')


def setup_dns():
    """Monkey-patch ``socket.getaddrinfo`` to resolve graph.facebook.com via custom nameservers.

    Hosts containing 'graph.facebook.com' are resolved with a dnspython
    resolver pointed at ``nameserver1``/``nameserver2``; every other lookup,
    and any lookup whose override fails, goes to the original
    ``socket.getaddrinfo``. Failures during setup are logged, not raised.
    """
    try:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [nameserver1, nameserver2]
        _orig_getaddrinfo = socket.getaddrinfo

        def new_getaddrinfo(*args, **kwargs):
            try:
                if args and isinstance(args[0], str) and 'graph.facebook.com' in args[0]:
                    answers = resolver.resolve(args[0], 'A')
                    ip = str(answers[0])
                    logger.info(f"DNS Override: Resolved {args[0]} to {ip} using {resolver.nameservers}")
                    # Substitute the resolved IP for the host, keep the other arguments.
                    return _orig_getaddrinfo(ip, *args[1:], **kwargs)
            except dns.resolver.NoAnswer:
                logger.warning(f"DNS Override: No A record found for {args[0]} using {resolver.nameservers}")
            except Exception as e:
                logger.error(f"DNS resolution override failed for {args}: {e}")
            # Fall back to the system resolver.
            return _orig_getaddrinfo(*args, **kwargs)

        socket.getaddrinfo = new_getaddrinfo
        logger.info(f"DNS resolver configured with nameservers: {resolver.nameservers}")
    except Exception as e:
        logger.error(f"Failed to setup custom DNS resolver: {e}")
setup_dns()
# --- End DNS Workaround ---

# NOTE(review): a fallback verify token is hard-coded here; prefer requiring
# VERIFY_TOKEN from the environment so the secret is not kept in source.
VERIFY_TOKEN = os.environ.get("VERIFY_TOKEN", "30cca545-3838-48b2-80a7-9e43b1ae8ce4")
GREETING_PATTERN = re.compile(r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b.*$', re.IGNORECASE)

# --- Duplicate Message Handling ---
PROCESSED_MESSAGES_TTL_HOURS = 24


class MessageDeduplicator:
    """Tracks processed message ids in a bounded in-memory cache plus an optional Firestore collection."""

    def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
        """Store settings and start the daemon thread that expires cache entries.

        Args:
            ttl_hours: Lifetime of an entry, in hours.
            max_cache_size: Maximum number of ids kept in memory; the oldest
                entry is dropped when the limit is exceeded.
            db_client: Optional Firestore client; when falsy only the
                in-memory cache is used.
        """
        self.ttl_seconds = ttl_hours * 3600
        self.max_cache_size = max_cache_size
        self.db_client = db_client
        self.cache = OrderedDict()  # message_id -> time() at insertion
        # Re-entrant because is_duplicate() calls _mark_processed() while holding it.
        self.lock = threading.RLock()
        self._cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True)
        self._cleanup_thread.start()

    def is_duplicate(self, message_id):
        """Return True if ``message_id`` was seen before; otherwise record it and return False.

        An empty id is never treated as a duplicate and is not recorded.
        A database error during the lookup is logged and the message is
        treated as new.
        """
        if not message_id:
            return False
        with self.lock:
            if message_id in self.cache:
                self.cache.move_to_end(message_id)
                return True
            if self.db_client:
                try:
                    doc_ref = self.db_client.collection("processed_messages").document(message_id)
                    if doc_ref.get().exists:
                        # Known to the database but not to this process: cache it.
                        self.cache[message_id] = time()
                        if len(self.cache) > self.max_cache_size:
                            self.cache.popitem(last=False)
                        return True
                except Exception as e:
                    logger.error(f"Database error in is_duplicate: {e}", exc_info=True)
            self._mark_processed(message_id)
            return False

    def _mark_processed(self, message_id):
        """Record ``message_id`` in the cache and, when configured, in Firestore."""
        # Local import keeps this block self-contained.
        from datetime import timezone

        current_time = time()
        with self.lock:
            self.cache[message_id] = current_time
            if len(self.cache) > self.max_cache_size:
                self.cache.popitem(last=False)
        if self.db_client:
            try:
                # Timezone-aware UTC: a naive local datetime would shift the
                # stored expiry by the server's UTC offset.
                expiry = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
                doc_ref = self.db_client.collection("processed_messages").document(message_id)
                doc_ref.set({"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry})
            except Exception as e:
                logger.error(f"Database error in _mark_processed: {e}", exc_info=True)

    def _periodic_cleanup(self):
        """Loop forever, dropping cache entries older than the TTL once an hour."""
        while True:
            try:
                with self.lock:
                    current_time = time()
                    expired_ids = [
                        msg_id
                        for msg_id, timestamp in list(self.cache.items())
                        if current_time - timestamp > self.ttl_seconds
                    ]
                    for msg_id in expired_ids:
                        self.cache.pop(msg_id, None)
                # Wait outside the lock so request threads are not blocked.
                threading.Event().wait(3600)
            except Exception as e:
                logger.error(f"Error in cleanup thread: {e}", exc_info=True)
                threading.Event().wait(300)


message_deduplicator = MessageDeduplicator(ttl_hours=PROCESSED_MESSAGES_TTL_HOURS, db_client=db)


def check_and_mark_processed(message_id: str) -> bool:
    """Return True if the message was already processed; otherwise mark it and return False."""
    if not message_id:
        logger.warning("Empty message ID provided to check_and_mark_processed")
        return False
    return message_deduplicator.is_duplicate(message_id)


def is_user_approved(mobile: str) -> Tuple[bool, Optional[Dict]]:
    """Checks approval AND returns user data (for currency preferences).

    The user document is looked up under ``users/<mobile>`` with a leading
    '+' added when missing.

    Returns:
        ``(True, user_data)`` when the document exists and its ``status`` is
        "approved" (case-insensitive); ``(False, None)`` otherwise, including
        when Firestore is unavailable or the lookup raises.
    """
    if not db:
        logger.error("Authorization check failed: Firestore client is not available.")
        return False, None
    try:
        normalized_mobile = mobile if mobile.startswith('+') else f'+{mobile}'
        logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}'")
        user_ref = db.collection("users").document(normalized_mobile)
        user_doc = user_ref.get()
        if user_doc.exists:
            user_data = user_doc.to_dict()
            status = user_data.get("status", "N/A").lower()
            if status == "approved":
                return True, user_data
        return False, None
    except Exception as e:
        logger.error(f"AUTHORIZATION: An exception occurred for {mobile}: {e}", exc_info=True)
        return False, None


def send_confirmation_buttons(
    mobile: str,
    message_summary: str,
    is_variance: bool = False,
    *,
    include_header: bool = True,
) -> None:
    """Send the confirm/cancel reply buttons for a pending transaction.

    Args:
        mobile: Recipient id passed to the WhatsApp client.
        message_summary: Summary text shown above the buttons.
        is_variance: When True, offer Settled / Pending / Cancel instead of
            Confirm / Cancel.
        include_header: When True (default), prefix the summary with the
            English "Please confirm..." header. Pass False when
            ``message_summary`` already contains its own (e.g. translated)
            header, so the prompt is not shown twice.
    """
    # --- UPGRADE 5: Variance Flow Buttons ---
    if is_variance:
        buttons = [
            {"reply": {"id": "confirm_resolved", "title": "✅ Settled"}},
            {"reply": {"id": "confirm_unresolved", "title": "⚠️ Pending"}},
            {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}}
        ]
    else:
        buttons = [
            {"reply": {"id": "confirm_transaction", "title": "✅ Confirm"}},
            {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}}
        ]

    if include_header:
        body = f"Please confirm the following transaction(s):\n\n{message_summary}"
    else:
        body = message_summary
    wa_client.send_reply_buttons(recipient_id=mobile, body_text=body, button_data=buttons)


def handle_interactive_response(mobile: str, button_id: str) -> None:
    """Apply a button press to the user's pending transaction and report the result.

    Reads ``users/<mobile>/temp_transactions/pending``; on a confirm or
    cancel button the pending document is deleted after handling. An
    unrecognised button id leaves the pending document in place.
    """
    if not db:
        wa_client.send_text_message(mobile, "Sorry, cannot process confirmation due to a database issue.")
        return

    # NOTE(review): `mobile` is used as-is here, while is_user_approved()
    # adds a leading '+' — confirm both address the same user document.
    doc_ref = db.collection("users").document(mobile).collection("temp_transactions").document('pending')
    try:
        transaction_doc = doc_ref.get()
        if not transaction_doc.exists:
            wa_client.send_text_message(mobile, "Hmm, I couldn't find a transaction waiting for confirmation.")
            return

        pending_transactions = transaction_doc.to_dict().get("transactions", [])
        if not pending_transactions:
            wa_client.send_text_message(mobile, "There seems to be an issue with the pending transaction data.")
            doc_ref.delete()
            return

        # --- Button Logic ---
        if button_id == "confirm_transaction":
            # Standard Confirm
            msg = process_intent(pending_transactions, mobile, force_settled=False)
            doc_ref.delete()
        elif button_id == "confirm_resolved":
            # "Settled" - Force Settled
            msg = process_intent(pending_transactions, mobile, force_settled=True)
            doc_ref.delete()
        elif button_id == "confirm_unresolved":
            # "Pending" - Keep Debt/Change
            msg = process_intent(pending_transactions, mobile, force_settled=False)
            doc_ref.delete()
        elif button_id == "cancel_transaction":
            msg = "Transaction cancelled as requested."
            doc_ref.delete()
        else:
            msg = "Sorry, I didn't understand that button press."

        wa_client.send_text_message(mobile, msg)
    except Exception as e:
        logger.error(f"Error handling interactive response for user {mobile}: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Sorry, something went wrong while handling your confirmation.")


def _send_report_image(mobile: str, image_path: str) -> bool:
    """Upload a local image to Imgur and send it to the user; return True on success.

    The local file is removed whether or not the upload succeeds.
    """
    try:
        with open(image_path, "rb") as f:
            # Timeout so a stalled upload cannot block the worker indefinitely.
            response_imgur = requests.post(
                URL_IMGUR, headers=HEADERS_IMGUR, files={"image": f}, timeout=30
            )
        response_imgur.raise_for_status()
        imgur_data = response_imgur.json()
        if imgur_data.get("success"):
            wa_client.send_image_message(recipient_id=mobile, image_url=imgur_data["data"]["link"])
            return True
        logger.error("Imgur upload did not report success: %s", imgur_data)
        return False
    except Exception as e:
        logger.error(f"Image upload failed: {e}", exc_info=True)
        return False
    finally:
        if os.path.exists(image_path):
            os.remove(image_path)


def process_text_message(message_text: str, mobile: str, user_settings: Optional[Dict] = None) -> Optional[str]:
    """
    Processes incoming text messages using the Language Sandwich pattern:
    Detect -> Translate to English -> Process -> Translate Output -> Send.
    Also handles Variance Flow logic.

    Args:
        message_text: Raw text received from the user.
        mobile: Sender id, also used as the reply recipient.
        user_settings: Optional user document; ``currency`` (top level or
            under ``settings``) selects the currency, defaulting to "R".

    Returns:
        The text that was sent to the user, or None when the reply was an
        image or a set of confirmation buttons (or nothing was sent).
    """
    logger.info(f"Processing text message from {mobile}: '{message_text}'")

    # 1. Detect Language and Translate to English (Upgrade 4)
    lang_data = detect_and_translate_input(message_text)
    english_text = lang_data.get('english_text', message_text)
    detected_lang = lang_data.get('detected_lang', 'English')
    logger.info(f"Detected language: {detected_lang}. Process text: {english_text}")

    if GREETING_PATTERN.match(english_text):
        base_msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?"
        final_msg = translate_output(base_msg, detected_lang)
        wa_client.send_text_message(mobile, final_msg)
        return final_msg

    user_currency = "R"
    if user_settings:
        # `or {}` tolerates a stored `settings` value of None.
        nested_settings = user_settings.get('settings') or {}
        user_currency = user_settings.get('currency') or nested_settings.get('currency', "R")

    # 2. Process logic using English Text
    llm_response_str = generateResponse(english_text, currency=user_currency)
    parsed_trans_data = parse_multiple_transactions(llm_response_str)

    response_msg = ""
    image_path = None

    if not parsed_trans_data:
        response_msg = "Sorry, I couldn't quite understand that. Could you please rephrase?"
    else:
        primary_intent = parsed_trans_data[0].get('intent', '').lower()
        primary_type = parsed_trans_data[0].get('transaction_type', '').lower()

        if primary_intent == 'read' or primary_type == 'query':
            response_data = process_intent(parsed_trans_data, mobile)
            # A string naming an existing file is treated as an image reply,
            # but only when Imgur credentials are configured.
            if isinstance(response_data, str) and os.path.isfile(response_data) and HEADERS_IMGUR:
                image_path = response_data
            else:
                response_msg = str(response_data)
        elif primary_intent in ['create', 'update', 'delete', 'reset_account']:
            if primary_intent == 'reset_account':
                response_msg = process_intent(parsed_trans_data, mobile)
            else:
                if persist_temporary_transaction(parsed_trans_data, mobile):
                    transaction_summary = format_transaction_response(parsed_trans_data)

                    # --- VARIANCE DETECTION FOR BUTTONS ---
                    has_payment_input = any('amount_paid' in t.get('details', {}) for t in parsed_trans_data)
                    is_variance = (has_payment_input and primary_intent == 'create' and primary_type == 'sale')

                    prompt_text = f"Please confirm the following transaction(s):\n\n{transaction_summary}"
                    trans_summary_translated = translate_output(prompt_text, detected_lang)
                    # The translated text already carries the header, so the
                    # button helper must not prepend a second one.
                    send_confirmation_buttons(
                        mobile,
                        trans_summary_translated,
                        is_variance=is_variance,
                        include_header=False,
                    )
                    return None
                else:
                    response_msg = "Sorry, I couldn't save your transaction for confirmation."
        else:
            response_msg = f"I'm not sure how to handle the action '{primary_intent}'."

    # Image replies carry no text, so they are handled before (and
    # independently of) the text path below.
    if image_path:
        if _send_report_image(mobile, image_path):
            return None
        response_msg = "Sorry, I couldn't send your report image. Please try again."

    # 3. Translate Output back to Source Language (Upgrade 4)
    if response_msg:
        final_response = translate_output(response_msg, detected_lang)
        wa_client.send_text_message(mobile, final_response)
        return final_response
    return None


# --- NEW FEATURE: Audio Helpers ---
def _deepgram_tts_to_mp3(text: str) -> Optional[str]:
    """Generates MP3 from text using DeepGram.

    Returns:
        Path of the MP3 written to the current working directory, or None
        when no API key is configured or the request fails.
    """
    if not DEEPGRAM_API_KEY:
        logger.warning("DEEPGRAM_API_KEY not found. Skipping TTS.")
        return None
    try:
        headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {"text": text}
        # Timeout so a stalled TTS request cannot block the worker indefinitely.
        response = requests.post(DEEPGRAM_TTS_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        filename = f"tts_{uuid.uuid4()}.mp3"
        filepath = os.path.join(os.getcwd(), filename)
        with open(filepath, "wb") as f:
            f.write(response.content)
        return filepath
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}", exc_info=True)
        return None


def _upload_to_firebase_storage(file_path: str) -> Optional[str]:
    """Uploads file to Firebase Storage and returns signed URL.

    The blob is stored under ``audio_responses/<basename>`` and the URL is
    requested with a one-hour expiration. Returns None on any failure.
    """
    try:
        # Uses the default bucket of the initialised firebase-admin app.
        bucket = storage.bucket()
        blob = bucket.blob(f"audio_responses/{os.path.basename(file_path)}")
        blob.upload_from_filename(file_path)
        url = blob.generate_signed_url(expiration=timedelta(hours=1))
        return url
    except Exception as e:
        logger.error(f"Firebase Storage upload failed: {e}", exc_info=True)
        return None


def process_audio_message(audio_id: str, mobile: str, user_settings: Optional[Dict]) -> None:
    """Processes incoming audio messages: Transcribe -> Process -> TTS -> Send Audio.

    The downloaded audio and any generated MP3 are removed from disk before
    returning, including when an exception propagates.
    """
    if not transcriber:
        wa_client.send_text_message(mobile, "Sorry, I cannot process audio messages at the moment.")
        return

    media_url = wa_client.get_media_url(audio_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Sorry, I couldn't retrieve your audio message.")
        return

    temp_dir = "temp_audio"
    os.makedirs(temp_dir, exist_ok=True)
    audio_filename = os.path.join(temp_dir, f"{mobile}_{audio_id}.ogg")
    downloaded_path = wa_client.download_media(media_url, audio_filename)
    if not downloaded_path:
        wa_client.send_text_message(mobile, "Sorry, I couldn't download your audio message.")
        return

    try:
        transcript = transcriber.transcribe(downloaded_path)
        if transcript.status == aai.TranscriptStatus.error:
            wa_client.send_text_message(mobile, f"Sorry, I couldn't transcribe your audio. Error: {transcript.error}")
        elif not transcript.text:
            wa_client.send_text_message(mobile, "Sorry, I couldn't understand the audio.")
        else:
            text_response = process_text_message(transcript.text, mobile, user_settings)
            if text_response:
                mp3_path = _deepgram_tts_to_mp3(text_response)
                if mp3_path:
                    try:
                        audio_url = _upload_to_firebase_storage(mp3_path)
                        if audio_url:
                            wa_client.send_audio_message(mobile, audio_url=audio_url)
                        else:
                            # No hosted URL available: send the local file instead.
                            wa_client.send_audio_message(mobile, audio_path=mp3_path)
                    finally:
                        # Remove the generated file even if sending raised.
                        if os.path.exists(mp3_path):
                            os.remove(mp3_path)
    finally:
        if os.path.exists(downloaded_path):
            os.remove(downloaded_path)


def process_image_message(image_id: str, caption: Optional[str], mobile: str, user_settings: Optional[Dict]) -> None:
    """Processes incoming image messages.

    Downloads the image, turns it (plus the optional caption) into a text
    query, and hands that query to ``process_text_message``. The downloaded
    file is removed afterwards.
    """
    logger.info(f"Processing image message (ID: {image_id}) from {mobile} with caption: '{caption}'")
    wa_client.send_text_message(mobile, "Analyzing your image, please wait a moment...")

    media_url = wa_client.get_media_url(image_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Sorry, I couldn't retrieve your image from WhatsApp's servers.")
        return

    temp_dir = "temp_images"
    os.makedirs(temp_dir, exist_ok=True)
    image_filename = os.path.join(temp_dir, f"{mobile}_{image_id}.jpg")
    downloaded_path = wa_client.download_media(media_url, image_filename)
    if not downloaded_path:
        wa_client.send_text_message(mobile, "Sorry, I couldn't download your image for processing.")
        return

    try:
        with open(downloaded_path, "rb") as f:
            image_bytes = f.read()
        generated_query = process_image_and_generate_query(image_bytes, caption)
        if not generated_query or "Error:" in generated_query:
            wa_client.send_text_message(mobile, f"I had some trouble understanding that image. {generated_query}")
        else:
            process_text_message(generated_query, mobile, user_settings)
    except Exception as e:
        logger.error(f"Error during vision processing: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Sorry, an unexpected error occurred while analyzing your image.")
    finally:
        if os.path.exists(downloaded_path):
            try:
                os.remove(downloaded_path)
            except OSError as e:
                # Best-effort cleanup: a leftover temp file must not fail the request.
                logger.warning(f"Could not remove temporary image {downloaded_path}: {e}")


@app.route("/", methods=["GET", "POST"])
def webhook_handler():
    """Webhook endpoint: GET answers the verification handshake, POST handles messages.

    POST always answers 200 — errors are logged rather than surfaced in the
    response.
    """
    if request.method == "GET":
        mode = request.args.get("hub.mode")
        token = request.args.get("hub.verify_token")
        challenge = request.args.get("hub.challenge")
        if mode == "subscribe" and token == VERIFY_TOKEN:
            return make_response(challenge, 200)
        else:
            return make_response("Verification failed", 403)

    if request.method == "POST":
        try:
            data = request.get_json()
            msg_details = wa_client.get_message_details(data)
            if not msg_details:
                return make_response("ok", 200)

            message_id, mobile, message_type = msg_details.get("id"), msg_details.get("from"), msg_details.get("type")

            if check_and_mark_processed(message_id):
                return make_response("ok - duplicate", 200)

            is_approved, user_data = is_user_approved(mobile)
            if not is_approved:
                wa_client.send_text_message(mobile, "Access denied. Please contact your administrator.")
                return make_response("ok", 200)

            if message_type == "text":
                process_text_message(msg_details.get("text"), mobile, user_data)
            elif message_type == "audio":
                process_audio_message(msg_details.get("audio_id"), mobile, user_data)
            elif message_type == "image":
                process_image_message(msg_details.get("image_id"), msg_details.get("caption"), mobile, user_data)
            elif message_type == "interactive":
                handle_interactive_response(mobile, msg_details.get("button_reply_id"))
        except Exception as e:
            logger.error(f"Unhandled exception: {e}", exc_info=True)

    return make_response("ok", 200)


if __name__ == '__main__':
    port = int(os.environ.get("PORT", 7860))
    debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
    if not debug_mode:
        # Serve with waitress unless FLASK_DEBUG=true.
        from waitress import serve
        serve(app, host="0.0.0.0", port=port)
    else:
        app.run(debug=True, host="0.0.0.0", port=port)