import json
import logging
import os
import re
import socket
import sys
import threading
import uuid
from collections import OrderedDict
from datetime import datetime, timedelta
from time import time
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import assemblyai as aai
import dns.resolver
import firebase_admin
import requests
from dotenv import load_dotenv
from firebase_admin import credentials, firestore, storage
from flask import Flask, make_response, request

import whatsapp_client as wa_client
from utility import (
    generateResponse,
    parse_multiple_transactions,
    parse_vision_sale_transactions,
    process_image_and_generate_query,
    parse_inventory_json,
    format_inventory_message,
    confirm_stock_in,
    persist_temporary_transaction,
    persist_pending_image,
    retrieve_pending_image,
    process_intent,
    format_transaction_response,
    detect_and_translate_input,
    translate_output,
    cap_for_tts,
    check_expiry_nudge,
    apply_price_override,
    get_user_currency,
    set_user_currency,
    CURRENCY_PROMPT,
)

# ── Logging ────────────────────────────────────────────────────────────────────
# Configured before any start-up step logs, so AssemblyAI / Firestore / DNS
# warnings go to stdout in the same format as everything else (previously they
# hit the root logger's implicit default config and were then replaced).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s",
    stream=sys.stdout,
    force=True,
)
logger = logging.getLogger(__name__)

load_dotenv()

# ── Config ─────────────────────────────────────────────────────────────────────
IMGUR_CLIENT_ID = os.getenv("IMGUR_CLIENT_ID")
URL_IMGUR = "https://api.imgur.com/3/image"
HEADERS_IMGUR = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}

DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"

# AssemblyAI — optional: transcription is disabled (transcriber = None) when the
# key is missing or the client fails to initialise.
try:
    aai.settings.api_key = os.environ["aai_key"]
    transcriber = aai.Transcriber()
except KeyError:
    transcriber = None
    logger.warning("aai_key env var not set — audio transcription disabled.")
except Exception as e:
    transcriber = None
    logger.error(f"Failed to initialise AssemblyAI: {e}")


# Firebase
def init_firestore_from_env(env_var: str = "FIREBASE"):
    """
    Initialise the Firebase app (once) and return a Firestore client.

    The service-account JSON is read from the environment variable ``env_var``;
    the storage bucket comes from ``FIREBASE_STORAGE_BUCKET``. If an app is
    already initialised, its Firestore client is returned directly.

    Raises:
        KeyError: ``env_var`` is not set (logged, then re-raised).
        Exception: any other initialisation failure (logged, then re-raised).
    """
    try:
        if firebase_admin._apps:
            return firestore.client()
        sa_info = json.loads(os.environ[env_var])
        cred = credentials.Certificate(sa_info)
        bucket = os.environ.get("FIREBASE_STORAGE_BUCKET")
        firebase_admin.initialize_app(cred, {"storageBucket": bucket})
        return firestore.client()
    except KeyError as e:
        logger.error("%s env var not set", e)
        raise
    except Exception as e:
        logger.exception("Failed to initialise Firestore: %s", e)
        raise


db = init_firestore_from_env()

# Wire Firestore into whatsapp_client (kept for compat)
wa_client.set_db(db)

app = Flask(__name__)

# ── DNS setup ──────────────────────────────────────────────────────────────────
nameserver1 = os.getenv("nameserver1", "8.8.8.8")
nameserver2 = os.getenv("nameserver2", "8.8.4.4")


def setup_dns() -> None:
    """
    Resolve graph.facebook.com (and the WhatsApp proxy host, if configured)
    through the configured nameservers and hand the resulting host→IP overrides
    to ``wa_client.configure_session``. Best-effort: every failure is logged and
    swallowed so start-up continues.
    """
    try:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [nameserver1, nameserver2]
        overrides = {}

        for host in ["graph.facebook.com"]:
            try:
                ip = str(resolver.resolve(host, "A")[0])
                overrides[host] = ip
                logger.info(f"DNS override: {host} -> {ip}")
            except Exception as e:
                logger.warning(f"Could not resolve {host}: {e}")

        proxy_url = os.getenv("WHATSAPP_PROXY_URL", "").strip()
        if proxy_url:
            proxy_host = urlparse(proxy_url).hostname
            if proxy_host and proxy_host not in overrides:
                try:
                    ip = str(resolver.resolve(proxy_host, "A")[0])
                    overrides[proxy_host] = ip
                    logger.info(f"DNS override: {proxy_host} -> {ip}")
                except Exception as e:
                    logger.warning(f"Could not resolve proxy host {proxy_host}: {e}")

        if overrides:
            wa_client.configure_session(overrides)
    except Exception as e:
        logger.warning(f"DNS setup failed: {e}")


setup_dns()

# ── Constants ──────────────────────────────────────────────────────────────────
VERIFY_TOKEN = os.environ.get("VERIFY_TOKEN")
if not VERIFY_TOKEN:
    # NOTE(review): this fallback token is committed in source; set VERIFY_TOKEN
    # in every real deployment.
    logger.warning("VERIFY_TOKEN not set — using the built-in default webhook verify token.")
    VERIFY_TOKEN = "30cca545-3838-48b2-80a7-9e43b1ae8ce4"

GREETING_PATTERN = re.compile(r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b.*$', re.IGNORECASE)
# Known currency codes/symbols for reply detection
_KNOWN_CURRENCIES = {
    "r": "R", "zar": "R", "rand": "R", "rands": "R",
    "south africa rand": "R", "south african rand": "R",
    "sa rand": "R", "sa rands": "R",
    "usd": "USD", "$": "USD", "dollar": "USD", "dollars": "USD",
    "us dollar": "USD", "us dollars": "USD", "american dollar": "USD",
    "zwg": "ZWG", "zig": "ZWG", "zimbabwe gold": "ZWG",
    "kes": "KES", "ksh": "KES", "shilling": "KES", "kenyan shilling": "KES",
    "ngn": "NGN", "naira": "NGN", "nigerian naira": "NGN", "₦": "NGN",
    "ghs": "GHS", "cedi": "GHS", "ghana cedi": "GHS", "₵": "GHS",
    "eur": "EUR", "euro": "EUR", "euros": "EUR", "€": "EUR",
    "gbp": "GBP", "pound": "GBP", "pounds": "GBP", "£": "GBP",
    "mwk": "MWK", "kwacha": "MWK", "malawi kwacha": "MWK",
    "tzs": "TZS", "tshs": "TZS", "tanzanian shilling": "TZS",
    "ugx": "UGX", "uganda shilling": "UGX",
    "zamk": "ZMW", "zmw": "ZMW", "zambian kwacha": "ZMW",
    "bwp": "BWP", "pula": "BWP", "botswana pula": "BWP",
    "mzn": "MZN", "metical": "MZN",
}

# Ordered phrase list for mid-sentence currency detection. More specific phrases
# come first so e.g. "uganda shilling" wins over the generic "shilling" (KES).
_CURRENCY_PHRASES = [
    ("south african rand", "R"), ("south africa rand", "R"),
    ("sa rands", "R"), ("sa rand", "R"),
    ("us dollars", "USD"), ("us dollar", "USD"), ("american dollar", "USD"),
    ("zimbabwe gold", "ZWG"),
    ("kenyan shilling", "KES"), ("nigerian naira", "NGN"),
    ("ghana cedi", "GHS"), ("zambian kwacha", "ZMW"),
    ("botswana pula", "BWP"), ("tanzanian shilling", "TZS"),
    ("ugandan shilling", "UGX"), ("uganda shilling", "UGX"),
    ("malawi kwacha", "MWK"),
    ("rands", "R"), ("rand", "R"),
    ("dollars", "USD"), ("dollar", "USD"),
    ("euros", "EUR"), ("euro", "EUR"),
    ("pounds", "GBP"), ("pound", "GBP"),
    ("kwacha", "MWK"), ("pula", "BWP"),
    ("shilling", "KES"), ("naira", "NGN"),
    ("cedi", "GHS"), ("metical", "MZN"),
]

# Whole-word, optionally plural matchers: "rand" matches "rands" but not "brand",
# "pula" does not match "population".
_CURRENCY_PHRASE_PATTERNS = [
    (re.compile(rf"\b{re.escape(phrase)}s?\b"), code)
    for phrase, code in _CURRENCY_PHRASES
]

# Common one-word, three-letter replies that must never be taken as an ISO code.
_NOT_CURRENCY_WORDS = frozenset({
    "yes", "yep", "yup", "nah", "bye", "thx", "hey", "hmm", "lol", "wow",
    "how", "why", "who", "and", "the", "add", "buy", "pay", "all", "new",
    "end", "off", "out", "sup",
})

# Wrapper format produced by _resolve_context (kept in one place so
# _strip_context_hint can undo it exactly).
_CONTEXT_PREFIX = "[Recent context: "
_CONTEXT_MARKER = "\n\nCurrent message: "


def _strip_context_hint(text: str) -> str:
    """
    Undo _resolve_context: if *text* carries the "[Recent context: …]" wrapper,
    return only the current message; otherwise return *text* unchanged.

    Deterministic parsers (currency, price, missing-detail replies) call this so
    words and numbers from earlier messages cannot leak into their result.
    """
    if text and text.startswith(_CONTEXT_PREFIX):
        _, sep, current = text.partition(_CONTEXT_MARKER)
        if sep:
            return current
    return text


def _parse_currency_reply(text: str) -> Optional[str]:
    """
    Detect a currency choice from natural-language phrasing.

    Handles: 'R', 'USD', 'Rands', 'South Africa Rands',
    'I would like to use South African Rand', 'Please use USD', '€'.

    Messages containing digits are never treated as a currency choice
    ("sold 3 loaves for 20 rand" is a transaction, not a setting).

    Returns the currency code/symbol to store, or None.
    """
    t = _strip_context_hint(text).strip().lower().rstrip(".")
    if not t:
        return None

    # Direct lookup (whole message is a known code/name/symbol)
    if t in _KNOWN_CURRENCIES:
        return _KNOWN_CURRENCIES[t]

    if any(ch.isdigit() for ch in t):
        return None

    # Whole-word phrase match anywhere in text ("please use south africa rands")
    for pattern, code in _CURRENCY_PHRASE_PATTERNS:
        if pattern.search(t):
            return code

    # Known code or symbol used as a word ("please use usd", "pay in $").
    # The lone letter "r" is only accepted as a whole message (handled above).
    for token in re.findall(r"[a-z$£€₦₵]+", t):
        if token in _KNOWN_CURRENCIES and (len(token) > 1 or not token.isalpha()):
            return _KNOWN_CURRENCIES[token]

    # Unknown single word that looks like an ISO 4217 code (three letters)
    if re.fullmatch(r"[a-z]{3}", t) and t not in _NOT_CURRENCY_WORDS:
        return t.upper()

    return None


def _store_context(db, mobile: str, role: str, text: str) -> None:
    """
    Append one message to users/{mobile}/context and prune that collection to
    the 6 newest documents (3 user+bot pairs). Failures are logged, not raised.

    role: "user" | "bot"
    """
    try:
        col = db.collection("users").document(mobile).collection("context")
        col.add({
            "role": role,
            "text": text[:500],  # cap to avoid Firestore bloat
            "ts": firestore.SERVER_TIMESTAMP,
        })
        # Prune to last 6 messages
        newest = list(col.order_by("ts", direction=firestore.Query.DESCENDING).limit(20).get())
        for stale in newest[6:]:
            stale.reference.delete()
    except Exception as e:
        logger.warning(f"_store_context: {e}")


def _load_context(db, mobile: str) -> List[Dict]:
    """
    Load the 6 newest context messages as [{role, text}], oldest first.
    Used to resolve pronouns like 'it', 'that', 'this', 'them'.
    Returns [] on any Firestore error (logged).
    """
    try:
        docs = (
            db.collection("users").document(mobile).collection("context")
            .order_by("ts", direction=firestore.Query.DESCENDING)
            .limit(6)
            .get()
        )
        history = []
        for doc in reversed(docs):
            data = doc.to_dict()
            history.append({"role": data["role"], "text": data["text"]})
        return history
    except Exception as e:
        logger.warning(f"_load_context: {e}")
        return []


# Pronoun patterns that signal the current message refers to something
# mentioned in a previous exchange
_PRONOUN_RE = re.compile(
    r'\b(it|its|that|this|them|they|those|these|the same|same one|'
    r'that one|this one|that transaction|this transaction|'
    r'the item|the product|the sale|the loan|the expense)\b',
    re.IGNORECASE
)


def _resolve_context(text: str, context: List[Dict]) -> str:
    """
    If the current message contains pronouns/references that need context,
    prepend a brief hint built from the last 3 user messages so the NLP can
    resolve them. Messages without pronouns are returned unchanged.

    Output format: "[Recent context: a | b | c]\\n\\nCurrent message: <text>",
    hint capped at 200 chars + "...".
    """
    if not context or not _PRONOUN_RE.search(text):
        return text

    # Only include user messages (what they were talking about)
    user_msgs = [c["text"] for c in context if c["role"] == "user"][-3:]
    if not user_msgs:
        return text

    hint = "Recent context: " + " | ".join(user_msgs)
    if len(hint) > 200:
        hint = hint[:200] + "..."
    return f"[{hint}]{_CONTEXT_MARKER}{text}"


# "item: price" or "item - price", optional "$" before the price.
_PRICE_LINE_RE = re.compile(r'^([a-zA-Z\s]+)[:\-]\s*\$?([0-9]+(?:\.[0-9]{1,2})?)$')


def _parse_price_reply(text: str) -> Optional[Dict[str, float]]:
    """
    Parse a price reply such as:
        banana: 3.50
        orange - 4
    One "item: price" or "item - price" per line; lines that don't match are
    ignored.

    Returns {item_name_lowercase: price}, {} when the reply is "skip",
    or None if no line looks like a price.
    """
    text = _strip_context_hint(text)
    if text.strip().lower() == "skip":
        return {}  # Empty dict = skip signal

    prices = {}
    for raw_line in text.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue
        match = _PRICE_LINE_RE.match(line)
        if match:
            prices[match.group(1).strip().lower()] = float(match.group(2))
    return prices or None


def _apply_price_replies(db, mobile: str, prices: Dict[str, float]) -> str:
    """
    Save selling prices the vendor replied with.

    For each item: set ``price_each`` on every stock batch of that name that has
    no price yet, then apply (and clear) any pending discount recorded for it.
    Items with no stock batches are reported as not found instead of "saved".

    Returns the confirmation message to send back.
    """
    currency = get_user_currency(db, mobile) or ""
    user_ref = db.collection("users").document(mobile)
    updated, not_found = [], []
    for item_name, price in prices.items():
        try:
            batches = list(
                user_ref.collection("stock_batches").where("name", "==", item_name).get()
            )
            if not batches:
                not_found.append(item_name)
                continue
            for batch in batches:
                if batch.to_dict().get("price_each") is None:
                    batch.reference.update({"price_each": price})

            # A discount accepted before a price existed is applied now.
            disc_ref = user_ref.collection("pending_discounts").document(item_name)
            disc_doc = disc_ref.get()
            if disc_doc.exists:
                disc_pct = disc_doc.to_dict().get("discount_pct", 0)
                new_p = apply_price_override(db, mobile, item_name, price, disc_pct)
                updated.append(f"*{item_name.title()}*: {currency}{price} "
                               f"({disc_pct}% discount active → {currency}{new_p})")
                disc_ref.delete()
            else:
                updated.append(f"*{item_name.title()}*: {currency}{price}")
        except Exception as e:
            logger.warning(f"_apply_price_replies {item_name}: {e}")

    if not updated:
        return "No matching stock items found."
    message = "\U0001f4b0 Prices saved:\n" + "\n".join(f" {u}" for u in updated)
    if not_found:
        message += "\nNot found in stock: " + ", ".join(n.title() for n in not_found)
    return message


def _looks_like_name(text: str) -> bool:
    """
    Heuristic: is this text a person's name?
    Names: 2-4 words, each starting with an uppercase letter and purely alphabetic.
    """
    words = text.strip().split()
    if len(words) < 2 or len(words) > 4:
        return False
    return all(word[0].isupper() and word.isalpha() for word in words)


def _is_positive_amount(value) -> bool:
    """True when *value* parses as a number greater than zero."""
    if value is None:
        return False
    try:
        return float(str(value)) > 0
    except (TypeError, ValueError):
        return False


_PAID_AMOUNT_RE = re.compile(r'(?:paid|customer paid|received)\s*[r$]?(\d+(?:\.\d{1,2})?)')
_TOTAL_AMOUNT_RE = re.compile(r'(?:total|for|amount|price)?\s*[r$]?(\d+(?:\.\d{1,2})?)')


def _try_fill_missing_details(reply_text: str, transactions: list):
    """
    Try to fill missing details of pending transactions from a user reply.

    Handles:
      - Amounts: "total 45", "R45", "45 paid 50"
      - Names (party/lender/customer): "Siyabonga Dlamini", "John Smith"

    Only applies when the pending transactions actually have missing details;
    otherwise the reply is a new message, not an answer.

    Returns (updated_transactions, still_missing_prompt_or_None), or None if
    this is not a fill reply. Mutates the transaction dicts in place.
    """
    if not transactions or _check_missing_details(transactions) is None:
        return None

    text = _strip_context_hint(reply_text).strip()
    lowered = text.lower()

    is_name = _looks_like_name(text)
    paid_m = _PAID_AMOUNT_RE.search(lowered)
    # Look for the total outside the "paid N" clause, so "paid 50" on its own
    # is not also recorded as the total.
    total_source = (lowered[:paid_m.start()] + " " + lowered[paid_m.end():]) if paid_m else lowered
    total_m = _TOTAL_AMOUNT_RE.search(total_source)

    # Neither a name nor an amount — not a fill reply
    if not is_name and not total_m and not paid_m:
        return None

    for txn in transactions:
        details = txn.get("details") or {}
        txn_type = (txn.get("transaction_type") or "").lower()

        # Sales record the name as the customer; all other types as the party.
        if is_name:
            name_field = "customer" if txn_type == "sale" else "party"
            if not details.get(name_field):  # fills absent AND empty values
                details[name_field] = text

        amount_ok = _is_positive_amount(details.get("amount") or details.get("total"))
        if total_m and not amount_ok:
            details["amount"] = float(total_m.group(1))
        if paid_m:
            details["amount_paid"] = float(paid_m.group(1))

        txn["details"] = details

    still_missing = _check_missing_details(transactions)
    return transactions, still_missing


def _check_missing_details(transactions: list, currency: str = "?") -> Optional[str]:
    """
    Check each transaction for missing critical fields.
    Returns a numbered prompt string if anything is missing, None if complete.

    Checked by type:
        sale    : amount (total) — prompted when absent or not > 0
                  (item-level prices are not consulted)
        expense : amount + category
        asset   : amount
        loan    : amount + party
    Other types (e.g. stock_in) are not checked here.
    """
    cur = currency if currency and currency not in ("?", "null", "None") else ""
    prompts = []
    for txn in transactions:
        txn_type = (txn.get("transaction_type") or "").lower()
        details = txn.get("details") or {}
        items = details.get("items") or []
        amount_ok = _is_positive_amount(details.get("amount") or details.get("total"))

        if txn_type == "sale":
            desc = details.get("description", "") or (
                ", ".join(f"{it.get('quantity','')} {it.get('item','')}" for it in items)
                if items else "items"
            )
            if not amount_ok:
                amt_prompt = f"How much did you sell {desc} for in total?"
                if not details.get("amount_paid"):
                    amt_prompt += f"\nReply: total {cur}[amount] and paid {cur}[amount]"
                else:
                    amt_prompt += f"\nReply: total {cur}[amount]"
                prompts.append(amt_prompt)

        elif txn_type == "expense":
            desc = details.get("description", "this expense")
            if not amount_ok:
                prompts.append(f"How much did {desc} cost? Reply: {cur}[amount]")
            if not details.get("category"):
                prompts.append(
                    "What category is this expense? "
                    "Reply: rent | transport | airtime | wages | packaging | other"
                )

        elif txn_type == "asset":
            desc = details.get("description", "this asset")
            if not amount_ok:
                prompts.append(f"How much did {desc} cost? Reply: {cur}[amount]")

        elif txn_type == "loan":
            if not amount_ok:
                prompts.append(f"What is the loan amount? Reply: {cur}[amount]")
            if not details.get("party"):
                prompts.append("Who is the lender/borrower? Reply with their name.")

    if not prompts:
        return None

    lines = ["I need a few more details before saving:"]
    for i, p in enumerate(prompts, 1):
        lines.append(f"\n{i}. {p}")
    return "\n".join(lines)


# ── Deduplication ──────────────────────────────────────────────────────────────
PROCESSED_MESSAGES_TTL_HOURS = 24


class MessageDeduplicator:
    """
    Remembers processed message IDs in an in-memory LRU (bounded by
    ``max_cache_size``) backed by the Firestore ``processed_messages``
    collection, so duplicates are detected across restarts. A daemon thread
    drops in-memory entries older than the TTL.
    """

    def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
        self.ttl_seconds = ttl_hours * 3600
        self.max_cache_size = max_cache_size
        self.db_client = db_client
        self.cache = OrderedDict()  # message_id -> time() when cached
        self.lock = threading.RLock()
        threading.Thread(target=self._periodic_cleanup, daemon=True).start()

    def _remember(self, message_id):
        """Record *message_id* in the LRU, evicting the oldest entry when full."""
        with self.lock:
            self.cache[message_id] = time()
            if len(self.cache) > self.max_cache_size:
                self.cache.popitem(last=False)

    def is_duplicate(self, message_id):
        """
        Return True if *message_id* was already seen; otherwise mark it as
        processed and return False. Empty IDs are never duplicates.
        The lock is held across the Firestore lookup so check-and-mark is
        atomic within this process.
        """
        if not message_id:
            return False
        with self.lock:
            if message_id in self.cache:
                self.cache.move_to_end(message_id)
                return True
            if self.db_client:
                try:
                    doc = self.db_client.collection("processed_messages").document(message_id).get()
                    if doc.exists:
                        self._remember(message_id)
                        return True
                except Exception as e:
                    logger.error(f"is_duplicate DB error: {e}")
            self._mark_processed(message_id)
            return False

    def _mark_processed(self, message_id):
        """Cache *message_id* and persist it to Firestore with an expiry time."""
        from datetime import timezone

        self._remember(message_id)
        if self.db_client:
            try:
                # Timezone-aware UTC: Firestore treats naive datetimes as UTC,
                # so a naive local time would be off by the server's offset.
                expiry = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
                self.db_client.collection("processed_messages").document(message_id).set(
                    {"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry}
                )
            except Exception as e:
                logger.error(f"_mark_processed DB error: {e}")

    def _periodic_cleanup(self):
        """Hourly, drop in-memory entries older than the TTL (retry in 5 min on error)."""
        while True:
            try:
                with self.lock:
                    cutoff = time() - self.ttl_seconds
                    expired = [mid for mid, ts in self.cache.items() if ts < cutoff]
                    for mid in expired:
                        self.cache.pop(mid, None)
                pause = 3600
            except Exception as e:
                logger.error(f"cleanup thread error: {e}")
                pause = 300
            threading.Event().wait(pause)


message_deduplicator = MessageDeduplicator(
    ttl_hours=PROCESSED_MESSAGES_TTL_HOURS, db_client=db
)


def check_and_mark_processed(message_id: str) -> bool:
    """Return True if *message_id* was already processed (and mark it if not)."""
    if not message_id:
        logger.warning("Empty message ID")
        return False
    return message_deduplicator.is_duplicate(message_id)


# ── Auth ───────────────────────────────────────────────────────────────────────
def is_user_approved(mobile: str) -> Tuple[bool, Optional[Dict]]:
    """
    Look up users/{+mobile} and return (True, user_data) when its status is
    "approved" (case-insensitive), else (False, None). Errors return (False, None).
    """
    if not db:
        logger.error("Firestore not available for auth")
        return False, None
    try:
        normalized = mobile if mobile.startswith("+") else f"+{mobile}"
        logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}'")
        doc = db.collection("users").document(normalized).get()
        if doc.exists:
            data = doc.to_dict()
            if (data.get("status") or "").lower() == "approved":
                return True, data
        return False, None
    except Exception as e:
        logger.error(f"is_user_approved error for {mobile}: {e}", exc_info=True)
        return False, None


# ── Button senders ─────────────────────────────────────────────────────────────
def send_confirmation_buttons(mobile: str, message_summary: str, is_variance: bool = False) -> None:
    """
    Send *message_summary* with Confirm/Cancel buttons, or Settled/Pending/Cancel
    when *is_variance* is set. The body is truncated to WhatsApp's 1024-char limit.
    """
    if is_variance:
        buttons = [
            {"reply": {"id": "confirm_resolved", "title": "✅ Settled"}},
            {"reply": {"id": "confirm_unresolved", "title": "⚠️ Pending"}},
            {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}},
        ]
    else:
        buttons = [
            {"reply": {"id": "confirm_transaction", "title": "✅ Confirm"}},
            {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}},
        ]
    # Summary is pre-formatted by format_transaction_response — send directly
    # WhatsApp button body max 1024 chars
    body = message_summary[:1020] + "..." if len(message_summary) > 1024 else message_summary
    wa_client.send_reply_buttons(recipient_id=mobile, body_text=body, button_data=buttons)


def send_image_intent_buttons(mobile: str) -> None:
    """Ask vendor whether the image is stock-in or a sale — shown when no caption."""
    wa_client.send_reply_buttons(
        recipient_id=mobile,
        body_text="What is this image for?",
        button_data=[
            {"reply": {"id": "image_stock_in", "title": "📦 Stock In"}},
            {"reply": {"id": "image_record_sale", "title": "💰 Record Sale"}},
        ],
    )


def send_discount_buttons(mobile: str, item: Dict, idx: int) -> None:
    """
    Send a discount suggestion (Apply/Skip) for one quality-flagged item.
    No prices shown — discount % only. Button ids encode idx and item name:
    "discount_confirm_{idx}_{name}" / "discount_skip_{idx}_{name}".
    """
    name = item["name"]
    disc = item["discount_pct"]
    quality = item["quality"]
    qty = item["quantity"]
    icon = "🔴" if quality == "urgent_move" else "🟠"
    body = (
        f"{icon} *{name.title()}* — {qty} units\n"
        f"Quality: {quality.replace('_', ' ')}\n"
        f"Suggested discount: *{disc}% off* selling price\n\n"
        f"Apply this discount for the next 24 hours?"
    )
    wa_client.send_reply_buttons(
        recipient_id=mobile,
        body_text=body,
        button_data=[
            {"reply": {"id": f"discount_confirm_{idx}_{name}", "title": "✅ Apply Discount"}},
            {"reply": {"id": f"discount_skip_{idx}_{name}", "title": "⏭️ Skip"}},
        ],
    )


# ── Interactive response handler ───────────────────────────────────────────────
def handle_interactive_response(mobile: str, button_id: str) -> None:
    """
    Dispatch a WhatsApp reply-button press by *button_id*: image intent,
    inventory confirm/cancel, discount apply/skip, or confirm/cancel of the
    pending transaction in users/{mobile}/temp_transactions/pending.
    """
    if not db:
        wa_client.send_text_message(mobile, "Database unavailable — cannot process.")
        return

    user_ref = db.collection("users").document(mobile)

    # ── Stock in / Sale intent buttons ────────────────────────────────────────
    if button_id in ("image_stock_in", "image_record_sale"):
        image_bytes, caption = retrieve_pending_image(mobile, db)
        if not image_bytes:
            wa_client.send_text_message(mobile, "Couldn't find your image — please send it again.")
            return
        mode = "stock_in" if button_id == "image_stock_in" else "sale"
        wa_client.send_text_message(mobile, "Analysing your image... 🔍")
        result = process_image_and_generate_query(image_bytes, caption, mode=mode)
        _handle_vision_result(mobile, result, caption)
        return

    # ── Inventory confirm ─────────────────────────────────────────────────────
    if button_id == "confirm_inventory":
        inv_ref = user_ref.collection("temp_inventory").document("pending")
        inv_doc = inv_ref.get()
        if not inv_doc.exists:
            wa_client.send_text_message(mobile, "No pending inventory found.")
            return
        inventory_data = inv_doc.to_dict().get("inventory", {})
        inv_ref.delete()
        # Get user currency for price prompt
        currency = get_user_currency(db, mobile) or "?"
        msg = confirm_stock_in(db, mobile, inventory_data, currency=currency)
        wa_client.send_text_message(mobile, msg)
        # Send discount suggestion buttons for quality-flagged items
        _, discount_items = format_inventory_message(inventory_data)
        for i, item in enumerate(discount_items):
            send_discount_buttons(mobile, item, i)
        return

    # ── Inventory cancel ──────────────────────────────────────────────────────
    # Separate id so cancelling a stock-in never deletes a pending transaction.
    if button_id == "cancel_inventory":
        temp_inv = user_ref.collection("temp_inventory")
        temp_inv.document("pending").delete()
        temp_inv.document("discount_meta").delete()
        wa_client.send_text_message(mobile, "Stock-in cancelled.")
        return

    # ── Discount buttons ──────────────────────────────────────────────────────
    if button_id.startswith("discount_confirm_"):
        # Format: discount_confirm_{idx}_{item_name}
        parts = button_id.split("_", 3)
        if len(parts) == 4:
            item_name = parts[3]
            meta_doc = user_ref.collection("temp_inventory").document("discount_meta").get()
            # NOTE(review): falls back to 0% when the meta doc is missing but 20%
            # when only this item is missing from it — confirm intended default.
            disc = 0
            if meta_doc.exists:
                meta = meta_doc.to_dict()
                disc = meta.get(item_name, {}).get("discount_pct", 20)

            # Look up actual price from stock_batches
            from utility import _lookup_item_price
            price = _lookup_item_price(db, mobile, item_name)
            if price:
                new_p = apply_price_override(db, mobile, item_name, price, disc)
                cur = get_user_currency(db, mobile) or ""
                wa_client.send_text_message(
                    mobile,
                    f"✅ Discount applied: *{item_name.title()}* → {cur}{new_p} for 24h ({disc}% off)."
                )
            else:
                # No price in DB yet — store discount pct, apply when price is set
                user_ref.collection("pending_discounts").document(item_name).set({
                    "item_name": item_name,
                    "discount_pct": disc,
                    "created_at": firestore.SERVER_TIMESTAMP,
                })
                wa_client.send_text_message(
                    mobile,
                    f"✅ {disc}% discount noted for *{item_name.title()}*. "
                    f"I'll apply it once you set the selling price."
                )
        return

    if button_id.startswith("discount_skip_"):
        parts = button_id.split("_", 3)
        item_name = parts[3] if len(parts) == 4 else "item"
        wa_client.send_text_message(mobile, f"⏭️ Skipped discount for {item_name.title()}.")
        return

    # ── Transaction confirm / cancel ──────────────────────────────────────────
    doc_ref = user_ref.collection("temp_transactions").document("pending")
    try:
        txn_doc = doc_ref.get()
        if not txn_doc.exists:
            wa_client.send_text_message(mobile, "No transaction waiting for confirmation.")
            return

        pending = txn_doc.to_dict().get("transactions", [])
        if not pending:
            wa_client.send_text_message(mobile, "Issue with pending transaction data.")
            doc_ref.delete()
            return

        if button_id in ("confirm_transaction", "confirm_unresolved", "confirm_reset"):
            result = process_intent(pending, mobile, force_settled=False)
        elif button_id == "confirm_resolved":
            result = process_intent(pending, mobile, force_settled=True)
        elif button_id == "cancel_transaction":
            result = "Transaction cancelled."
        else:
            # Stale/unknown button: keep the pending transaction intact.
            wa_client.send_text_message(mobile, "Unrecognised button.")
            return

        doc_ref.delete()

        # Handle TX_IDS tuple — send summary then IDs as separate messages
        if isinstance(result, tuple) and len(result) == 3 and result[0] == "TX_IDS":
            _, summary, tx_ids = result
            wa_client.send_text_message(mobile, summary)
            if tx_ids:
                id_block = "\n".join(tx_ids)
                label = "Transaction IDs" if len(tx_ids) > 1 else "Transaction ID"
                wa_client.send_text_message(
                    mobile, f"\U0001f194 *{label}* (tap to copy):\n{id_block}"
                )
        elif isinstance(result, str) and result:
            # Plain-text outcomes (e.g. "Transaction cancelled.") were previously dropped.
            wa_client.send_text_message(mobile, result)
    except Exception as e:
        logger.error(f"handle_interactive_response error for {mobile}: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Something went wrong handling your confirmation.")


# ── Vision result dispatcher ───────────────────────────────────────────────────
def _handle_vision_result(mobile: str, result: str, caption: Optional[str]) -> None:
    """
    Route vision output:
      - inventory JSON (parse_inventory_json succeeds) → show summary, store it
        for confirmation and ask Save/Cancel
      - "Error:" prefix → relay to the user
      - existing file path (with Imgur configured) → upload chart and send image
      - anything else → sale text: direct per-item parse, else the NLP pipeline
    """
    inventory_data = parse_inventory_json(result)

    if inventory_data:
        summary, discount_items = format_inventory_message(inventory_data)
        wa_client.send_text_message(mobile, summary)

        temp_inv = db.collection("users").document(mobile).collection("temp_inventory")
        # Store inventory for the confirm_inventory button
        temp_inv.document("pending").set({
            "inventory": inventory_data,
            "created_at": firestore.SERVER_TIMESTAMP,
        })
        # Store discount metadata keyed by item name (read by discount buttons)
        if discount_items:
            meta = {
                item["name"]: {
                    "original_price": item["original_price"],
                    "discount_pct": item["discount_pct"],
                    "new_price": item["new_price"],
                }
                for item in discount_items
            }
            temp_inv.document("discount_meta").set(meta)

        # Ask vendor to confirm stock-in. Discount buttons are sent by the
        # confirm_inventory handler once stock is saved (sending them here too
        # duplicated every suggestion).
        wa_client.send_reply_buttons(
            recipient_id=mobile,
            body_text="Save this as new stock?",
            button_data=[
                {"reply": {"id": "confirm_inventory", "title": "✅ Save Stock"}},
                {"reply": {"id": "cancel_inventory", "title": "❌ Cancel"}},
            ],
        )

    elif result.startswith("Error:"):
        wa_client.send_text_message(mobile, result)

    elif os.path.isfile(result) and HEADERS_IMGUR:
        # Chart file — upload to Imgur and send
        try:
            with open(result, "rb") as f:
                resp = requests.post(URL_IMGUR, headers=HEADERS_IMGUR, files={"image": f}, timeout=30)
            resp.raise_for_status()
            imgur_data = resp.json()
            if imgur_data.get("success"):
                wa_client.send_image_message(recipient_id=mobile, image_url=imgur_data["data"]["link"])
                os.remove(result)
                return
        except Exception as e:
            logger.error(f"Imgur upload failed: {e}")
        # Never send the local file path to the user.
        wa_client.send_text_message(mobile, "Sorry, I couldn't send the chart image.")
        if os.path.exists(result):
            os.remove(result)

    else:
        # Sale mode: try direct per-item parser first (avoids NLP collapsing items)
        currency = get_user_currency(db, mobile) or "?"
        sale_txns = parse_vision_sale_transactions(result, currency=currency)
        if sale_txns:
            # Direct parsed — skip generateResponse, go straight to confirmation
            if persist_temporary_transaction(sale_txns, mobile):
                summary = format_transaction_response(sale_txns)
                has_variance = any("amount_paid" in t.get("details", {}) for t in sale_txns)
                send_confirmation_buttons(mobile, summary, is_variance=has_variance)
            else:
                wa_client.send_text_message(mobile, "Sorry, could not save your sale.")
        else:
            # Fallback — plain text through NLP pipeline
            process_text_message(result, mobile)


_TX_LABEL_RE = re.compile(r"transaction\s+ids?", re.IGNORECASE)
_TAP_TO_COPY_RE = re.compile(r"\(?\s*tap\s+to\s+copy\s*\)?", re.IGNORECASE)
_TX_ID_CANDIDATE_RE = re.compile(r"\b[A-Za-z0-9][A-Za-z0-9_-]{5,}\b")


def _looks_like_tx_id(token: str) -> bool:
    """An ID candidate must contain a digit or be long (e.g. a 20-char auto-ID);
    this keeps ordinary words like "Reverse" from being taken as IDs."""
    return any(ch.isdigit() for ch in token) or len(token) >= 16


def _extract_clean_message(message_text: Optional[str]) -> str:
    """
    Clean WhatsApp text before sending it into the NLP pipeline.

    Main purpose:
    - Prevent crashes when users paste/forward the bot's Transaction ID block.
    - Remove WhatsApp markdown around transaction IDs.
    - Preserve ordinary business messages unchanged (apart from zero-width chars).

    Examples handled:
        "🆔 *Transaction ID* (tap to copy):\\nABC123"           -> "ABC123"
        "Reverse this 🆔 *Transaction ID* (tap to copy):\\nABC123" -> "Reverse this ABC123"
        "Transaction IDs:\\nABC123\\nDEF456"                     -> "ABC123 DEF456"
    """
    if message_text is None:
        return ""

    text = str(message_text).strip()
    if not text:
        return ""

    # Remove invisible/control characters that sometimes arrive from WhatsApp copy/paste.
    text = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)

    # Only do aggressive cleanup for transaction-ID style messages.
    if not _TX_LABEL_RE.search(text):
        return text

    # Strip icons, markdown and label text first, so label words are not IDs.
    cleaned = re.sub(r"[🆔*]", "", text)
    cleaned = _TX_LABEL_RE.sub("", cleaned)
    cleaned = _TAP_TO_COPY_RE.sub("", cleaned)

    # Extract IDs BEFORE replacing ':'/'-' so dashed IDs (UUIDs) stay intact.
    tx_ids = list(dict.fromkeys(
        tok for tok in _TX_ID_CANDIDATE_RE.findall(cleaned) if _looks_like_tx_id(tok)
    ))

    instruction = cleaned
    for tx_id in tx_ids:
        instruction = instruction.replace(tx_id, " ")
    instruction = re.sub(r"[:\-]+", " ", instruction)
    instruction = re.sub(r"\s+", " ", instruction).strip()

    if tx_ids:
        id_text = " ".join(tx_ids)
        return f"{instruction} {id_text}".strip() if instruction else id_text

    # Fallback: return a readable cleaned message instead of failing.
    readable = re.sub(r"\s+", " ", re.sub(r"[:\-]+", " ", cleaned)).strip()
    return readable or text


# ── Text message processor ─────────────────────────────────────────────────────
def process_text_message(message_text: str, mobile: str, user_settings: Optional[Dict] = None) -> Optional[str]:
    logger.info(f"Processing text message from {mobile}: '{message_text}'")

    # Extract raw TX ID if user forwarded/pasted a transaction ID message
    # Format: "🆔 *Transaction ID* (tap to copy):\nABC123"
    # or the user may include surrounding text like "Reverse this [ID]"
    message_text = _extract_clean_message(message_text)

    # Language sandwich
    lang_data = detect_and_translate_input(message_text)
    english_text = lang_data.get("english_text", message_text)
    detected_lang = lang_data.get("detected_lang", "English")
    logger.info(f"Detected language: {detected_lang}. Process text: {english_text}")

    # Short-term context: resolve pronouns using last 3 exchanges
    context_history = _load_context(db, mobile)
    english_text = _resolve_context(english_text, context_history)
    # NOTE(review): from here english_text may carry the "[Recent context: …]"
    # wrapper. The currency/price/fill parsers strip it, but GREETING_PATTERN
    # does not, so a greeting containing a pronoun will not match.

    # Store this user message for future context resolution
    _store_context(db, mobile, "user", message_text)

    # ── Perishable nudge (prepend if any stock expiring) ──────────────────────
    nudge = check_expiry_nudge(db, mobile)

    if GREETING_PATTERN.match(english_text):
        base_msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?"
        if nudge:
            base_msg = nudge + "\n\n" + base_msg
        final_msg = translate_output(base_msg, detected_lang)
        wa_client.send_text_message(mobile, final_msg)
        return final_msg

    # ── Currency resolution — order matters ──────────────────────────────────
    # 1. Check if THIS message is a currency reply first (breaks the loop)
    currency_reply = _parse_currency_reply(english_text)
    if currency_reply:
        set_user_currency(db, mobile, currency_reply)
        reply = f"Got it — I'll use *{currency_reply}* for all your transactions. Now, how can I help?"
        final = translate_output(reply, detected_lang)
        wa_client.send_text_message(mobile, final)
        return final

    # 2. Load stored currency
    user_currency = get_user_currency(db, mobile)
    if not user_currency and user_settings:
        user_currency = (user_settings.get("currency")
                         or user_settings.get("settings", {}).get("currency"))

    # 3. If still no currency and this isn't a greeting, prompt once
    if not user_currency and not GREETING_PATTERN.match(english_text):
        wa_client.send_text_message(mobile, CURRENCY_PROMPT)
        return None

    # 4. Check if this message is a price reply (e.g. "banana: 3.50\norange: 4")
    price_reply = _parse_price_reply(english_text)
    if price_reply is not None:
        if price_reply == {}:
            msg = translate_output("No problem — you can set prices anytime by telling me.", detected_lang)
            wa_client.send_text_message(mobile, msg)
            return msg
        msg = _apply_price_replies(db, mobile, price_reply)
        final = translate_output(msg, detected_lang)
        wa_client.send_text_message(mobile, final)
        return final

    # 5. Check if this message provides missing details for a pending transaction
    #    e.g. "total 45" or "paid 50" after bot asked for amount
    pending_ref = (db.collection("users").document(mobile)
                   .collection("temp_transactions").document("pending"))
    pending_doc = pending_ref.get()
    if pending_doc.exists:
        missing_fill = _try_fill_missing_details(
            english_text,
            pending_doc.to_dict().get("transactions", [])
        )
        if missing_fill is not None:
            filled_txns, still_missing = missing_fill
            pending_ref.set({"transactions": filled_txns,
                             "created_at": firestore.SERVER_TIMESTAMP})
            if still_missing:
                msg = translate_output(still_missing, detected_lang)
                wa_client.send_text_message(mobile, msg)
                return msg
            # All details now present — show confirmation
            summary = format_transaction_response(filled_txns)
            has_variance = any("amount_paid" in t.get("details", {}) for t in filled_txns)
            is_sale = filled_txns[0].get("transaction_type") == "sale"
            send_confirmation_buttons(mobile, translate_output(summary, detected_lang),
                                      is_variance=has_variance and is_sale)
            return None

    llm_response_str = generateResponse(english_text, currency=user_currency)
parsed_trans_data = parse_multiple_transactions(llm_response_str) response_msg = "" send_image = False image_path = None if not parsed_trans_data: response_msg = "Sorry, I couldn't quite understand that. Could you rephrase?" else: primary_intent = parsed_trans_data[0].get("intent", "").lower() primary_type = parsed_trans_data[0].get("transaction_type", "").lower() if primary_intent == "read" or primary_type == "query": # Pass currency into transaction details for codegen context for t in parsed_trans_data: t.setdefault("details", {})["currency"] = user_currency response_data = process_intent(parsed_trans_data, mobile) # Route based on result type if isinstance(response_data, tuple) and len(response_data) == 3 and response_data[0] == "TX_IDS": # Shouldn't happen on read, but handle gracefully _, summary, tx_ids = response_data response_msg = summary elif isinstance(response_data, tuple) and len(response_data) == 2: # Plot: ("PLOT:filepath", insight_string) plot_path, insight = response_data if isinstance(plot_path, str) and plot_path.startswith("PLOT:"): plot_path = plot_path[5:] if isinstance(plot_path, str) and os.path.isfile(plot_path): send_image = True image_path = plot_path # Insight sent as follow-up after the image if insight: response_msg = f"💡 {insight}" else: # File missing — fall back to insight text response_msg = insight or "Chart could not be generated." else: response_msg = str(response_data) if response_data else "No data found." elif primary_intent in ("create", "update", "delete", "reset_account"): if primary_intent == "reset_account": # Never execute reset directly — always require explicit confirmation if persist_temporary_transaction(parsed_trans_data, mobile): warning = ( "⚠️ *This will permanently delete ALL your transactions, " "stock records, and price history.*\n\n" "This cannot be undone. Are you sure?" 
) warning_translated = translate_output(warning, detected_lang) wa_client.send_reply_buttons( recipient_id=mobile, body_text=warning_translated, button_data=[ {"reply": {"id": "confirm_reset", "title": "🗑️ Yes, Delete All"}}, {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}}, ], ) if nudge: wa_client.send_text_message(mobile, nudge) return None else: response_msg = "Could not process your request." else: # Check for missing required details before confirming missing_prompt = _check_missing_details(parsed_trans_data, user_currency) if missing_prompt: # Store partially-parsed transaction and ask for missing info persist_temporary_transaction(parsed_trans_data, mobile) final_prompt = translate_output(missing_prompt, detected_lang) wa_client.send_text_message(mobile, final_prompt) return None if persist_temporary_transaction(parsed_trans_data, mobile): transaction_summary = format_transaction_response(parsed_trans_data) has_payment_input = any( "amount_paid" in t.get("details", {}) for t in parsed_trans_data ) is_variance = ( has_payment_input and primary_intent == "create" and primary_type == "sale" ) trans_summary_translated = translate_output(transaction_summary, detected_lang) send_confirmation_buttons(mobile, trans_summary_translated, is_variance=is_variance) if nudge: wa_client.send_text_message(mobile, nudge) return None else: response_msg = "Sorry, I couldn't save your transaction for confirmation." else: response_msg = f"I'm not sure how to handle '{primary_intent}'." 
# Prepend nudge to response if nudge and response_msg: response_msg = nudge + "\n\n" + response_msg if response_msg: final_response = translate_output(response_msg, detected_lang) if send_image and image_path: try: if not HEADERS_IMGUR: # No Imgur configured — skip image, send insight text only logger.warning("Imgur not configured — sending insight text only") wa_client.send_text_message(mobile, final_response or "Chart generated but Imgur not configured.") if os.path.exists(image_path): os.remove(image_path) return final_response with open(image_path, "rb") as f: resp = requests.post(URL_IMGUR, headers=HEADERS_IMGUR, files={"image": f}) resp.raise_for_status() imgur_data = resp.json() if imgur_data.get("success"): wa_client.send_image_message(recipient_id=mobile, image_url=imgur_data["data"]["link"]) os.remove(image_path) # Send insight as follow-up text if present if final_response and final_response.startswith("💡"): wa_client.send_text_message(mobile, final_response) return final_response or None else: wa_client.send_text_message(mobile, final_response) os.remove(image_path) return final_response except Exception as e: logger.error(f"Image upload failed: {e}", exc_info=True) wa_client.send_text_message(mobile, final_response or "Chart could not be sent.") if os.path.exists(image_path): os.remove(image_path) return final_response else: wa_client.send_text_message(mobile, final_response) # Store bot response for future context _store_context(db, mobile, "bot", final_response) return final_response return None # ── Audio processor ──────────────────────────────────────────────────────────── def _deepgram_tts_to_mp3(text: str) -> Optional[str]: if not DEEPGRAM_API_KEY: return None capped = cap_for_tts(text) # Hard cap before sending to Deepgram try: resp = requests.post( DEEPGRAM_TTS_URL, headers={"Authorization": f"Token {DEEPGRAM_API_KEY}", "Content-Type": "application/json"}, json={"text": capped}, timeout=30, ) resp.raise_for_status() filepath = 
os.path.join(os.getcwd(), f"tts_{uuid.uuid4()}.mp3") with open(filepath, "wb") as f: f.write(resp.content) return filepath except Exception as e: logger.error(f"DeepGram TTS failed: {e}", exc_info=True) return None def _upload_to_firebase_storage(file_path: str) -> Optional[str]: try: bucket = storage.bucket() blob = bucket.blob(f"audio_responses/{os.path.basename(file_path)}") blob.upload_from_filename(file_path) url = blob.generate_signed_url(expiration=timedelta(hours=1)) return url except Exception as e: logger.error(f"Firebase Storage upload failed: {e}", exc_info=True) return None def process_audio_message(audio_id: str, mobile: str, user_settings: Optional[Dict]) -> None: if not transcriber: wa_client.send_text_message(mobile, "Audio processing is unavailable right now.") return media_url = wa_client.get_media_url(audio_id) if not media_url: wa_client.send_text_message(mobile, "Couldn't retrieve your audio.") return os.makedirs("temp_audio", exist_ok=True) audio_path = os.path.join("temp_audio", f"{mobile}_{audio_id}.ogg") downloaded = wa_client.download_media(media_url, audio_path) if not downloaded: wa_client.send_text_message(mobile, "Couldn't download your audio.") return try: transcript = transcriber.transcribe(downloaded) if transcript.status == aai.TranscriptStatus.error: wa_client.send_text_message(mobile, f"Transcription error: {transcript.error}") elif not transcript.text: wa_client.send_text_message(mobile, "Couldn't understand the audio.") else: text_response = process_text_message(transcript.text, mobile, user_settings) if text_response: mp3_path = _deepgram_tts_to_mp3(text_response) if mp3_path: audio_url = _upload_to_firebase_storage(mp3_path) if audio_url: wa_client.send_audio_message(mobile, audio_url=audio_url) else: wa_client.send_audio_message(mobile, audio_path=mp3_path) if os.path.exists(mp3_path): os.remove(mp3_path) finally: if os.path.exists(downloaded): os.remove(downloaded) # ── Image processor 
def process_image_message(image_id: str, caption: Optional[str], mobile: str, user_settings: Optional[Dict]) -> None:
    """Download an inbound WhatsApp image and run it through the vision pipeline.

    Behaviour depends on the caption:
      * No caption: the image bytes are parked with ``persist_pending_image``
        and the user is asked, via buttons, what the image is for.
      * Caption present: the image is analysed immediately with ``mode="auto"``.

    The downloaded temp file is always removed.

    Args:
        image_id: WhatsApp media id of the image.
        caption: Optional caption sent with the image.
        mobile: Sender's WhatsApp number.
        user_settings: User record from the approval check. It is not read in
            this function.
    """
    logger.info(f"Processing image (ID: {image_id}) from {mobile}, caption: '{caption}'")
    wa_client.send_text_message(mobile, "Got your image — analysing... 🔍")

    media_url = wa_client.get_media_url(image_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Couldn't retrieve your image from WhatsApp.")
        return

    os.makedirs("temp_images", exist_ok=True)
    image_path = os.path.join("temp_images", f"{mobile}_{image_id}.jpg")
    downloaded = wa_client.download_media(media_url, image_path)
    if not downloaded:
        wa_client.send_text_message(mobile, "Couldn't download your image.")
        return

    try:
        with open(downloaded, "rb") as f:
            image_bytes = f.read()

        # If no caption — ask vendor what this image is for
        if not caption or not caption.strip():
            persist_pending_image(mobile, image_bytes, caption, db)
            send_image_intent_buttons(mobile)
            return

        # Caption present — infer mode and process directly
        result = process_image_and_generate_query(image_bytes, caption, mode="auto")
        _handle_vision_result(mobile, result, caption)

    except Exception as e:
        logger.error(f"Vision processing error: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Something went wrong analysing your image.")
    finally:
        # Best-effort cleanup: log instead of silently swallowing failures.
        if os.path.exists(downloaded):
            try:
                os.remove(downloaded)
            except OSError as e:
                logger.warning(f"Could not remove temp image {downloaded}: {e}")


# ── Webhook ────────────────────────────────────────────────────────────────────

@app.route("/", methods=["GET", "POST"])
def webhook_handler():
    """WhatsApp webhook endpoint.

    GET requests perform the ``hub.*`` subscription verification against
    ``VERIFY_TOKEN``.

    POST requests parse the message and drop duplicates. They then check that
    the sender is approved and dispatch by message type. POST always answers
    200, even when processing raises; the exception is logged.
    """
    if request.method == "GET":
        mode = request.args.get("hub.mode")
        token = request.args.get("hub.verify_token")
        challenge = request.args.get("hub.challenge")
        if mode == "subscribe" and token == VERIFY_TOKEN:
            # make_response(None) raises, so echo an empty body if the challenge is absent.
            return make_response(challenge or "", 200)
        return make_response("Verification failed", 403)

    if request.method == "POST":
        try:
            # silent=True: a missing or invalid JSON body yields None instead of raising.
            data = request.get_json(silent=True)
            if not data:
                return make_response("ok", 200)

            msg_details = wa_client.get_message_details(data)
            if not msg_details:
                return make_response("ok", 200)

            message_id = msg_details.get("id")
            mobile = msg_details.get("from")
            message_type = msg_details.get("type")

            if check_and_mark_processed(message_id):
                return make_response("ok - duplicate", 200)

            is_approved, user_data = is_user_approved(mobile)
            if not is_approved:
                wa_client.send_text_message(mobile, "Access denied. Please contact your administrator.")
                return make_response("ok", 200)

            if message_type == "text":
                process_text_message(msg_details.get("text"), mobile, user_data)
            elif message_type == "audio":
                process_audio_message(msg_details.get("audio_id"), mobile, user_data)
            elif message_type == "image":
                process_image_message(msg_details.get("image_id"), msg_details.get("caption"), mobile, user_data)
            elif message_type == "interactive":
                handle_interactive_response(mobile, msg_details.get("button_reply_id"))
            else:
                logger.info(f"Ignoring unsupported message type '{message_type}' from {mobile}")

        except Exception as e:
            logger.error(f"Unhandled exception in webhook: {e}", exc_info=True)

    return make_response("ok", 200)


# ── Entry point ────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
    print(f"===== Application Startup at {datetime.now()} =====")
    if not debug_mode:
        from waitress import serve
        serve(app, host="0.0.0.0", port=port)
    else:
        app.run(debug=True, host="0.0.0.0", port=port)