# smart-w / main_fixed.py
# rairo's picture
# Upload main_fixed.py
# ccba1da verified
from flask import Flask, request, make_response
import os
import logging
import re
import sys
import json
import uuid
import threading
from collections import OrderedDict
from time import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple, List
from dotenv import load_dotenv
import assemblyai as aai
import requests
import dns.resolver
import socket
from utility import (
generateResponse,
parse_multiple_transactions,
parse_vision_sale_transactions,
process_image_and_generate_query,
parse_inventory_json,
format_inventory_message,
confirm_stock_in,
persist_temporary_transaction,
persist_pending_image,
retrieve_pending_image,
process_intent,
format_transaction_response,
detect_and_translate_input,
translate_output,
cap_for_tts,
check_expiry_nudge,
apply_price_override,
get_user_currency,
set_user_currency,
CURRENCY_PROMPT,
)
import whatsapp_client as wa_client
load_dotenv()
# ── Config ─────────────────────────────────────────────────────────────────────
# Imgur uploads: the Authorization header is only built when a client id is
# configured; an empty dict means "Imgur disabled" to the code that checks it.
IMGUR_CLIENT_ID = os.environ.get("IMGUR_CLIENT_ID")
URL_IMGUR = "https://api.imgur.com/3/image"
HEADERS_IMGUR = {}
if IMGUR_CLIENT_ID:
    HEADERS_IMGUR["Authorization"] = f"Client-ID {IMGUR_CLIENT_ID}"

# Deepgram text-to-speech endpoint and credentials.
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY")
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"
# AssemblyAI
# The API key is read from the lower-case ``aai_key`` environment variable.
# ``transcriber`` is left as None when the key is missing or the client cannot
# be created.
try:
    aai.settings.api_key = os.environ["aai_key"]
    transcriber = aai.Transcriber()
except KeyError:
    transcriber = None
    # The message names the variable exactly as it is looked up above and uses
    # an escaped em dash instead of the previously mis-encoded bytes.
    logging.warning("aai_key not found \u2014 audio transcription disabled.")
except Exception as e:
    transcriber = None
    logging.error("Failed to initialise AssemblyAI: %s", e)
# Firebase
import firebase_admin
from firebase_admin import credentials, firestore, storage
def init_firestore_from_env(env_var: str = "FIREBASE"):
    """Return a Firestore client, initialising the Firebase app if needed.

    The service-account JSON is read from the environment variable named by
    ``env_var``; the storage bucket comes from ``FIREBASE_STORAGE_BUCKET``.
    If a Firebase app is already registered, it is reused.

    Raises:
        KeyError: ``env_var`` is not set (logged, then re-raised).
        Exception: any other initialisation failure (logged, then re-raised).
    """
    try:
        if not firebase_admin._apps:
            service_account = json.loads(os.environ[env_var])
            certificate = credentials.Certificate(service_account)
            app_options = {"storageBucket": os.environ.get("FIREBASE_STORAGE_BUCKET")}
            firebase_admin.initialize_app(certificate, app_options)
        return firestore.client()
    except KeyError as missing:
        logging.error("%s env var not set", missing)
        raise
    except Exception as exc:
        logging.exception("Failed to initialise Firestore: %s", exc)
        raise
# Module-level Firestore client. init_firestore_from_env() re-raises on failure,
# so a missing or invalid credential aborts the import of this module.
db = init_firestore_from_env()
# Wire Firestore into whatsapp_client (kept for compat)
wa_client.set_db(db)
# Flask application object for this module.
app = Flask(__name__)
# ── Logging ────────────────────────────────────────────────────────────────────
# force=True replaces any handlers installed earlier so every record goes to
# stdout in a single format.
logging.basicConfig(
    stream=sys.stdout,
    force=True,
    format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ── DNS setup ──────────────────────────────────────────────────────────────────
# Nameservers used for the explicit lookups below (the defaults are 8.8.8.8 and 8.8.4.4).
nameserver1 = os.environ.get("nameserver1", "8.8.8.8")
nameserver2 = os.environ.get("nameserver2", "8.8.4.4")


def setup_dns() -> None:
    """Resolve the WhatsApp API hosts up front and pass the results to wa_client.

    ``graph.facebook.com`` and, when ``WHATSAPP_PROXY_URL`` is set, the proxy's
    hostname are resolved through ``nameserver1``/``nameserver2``. The first A
    record of each host that resolves is collected into a host -> IP mapping,
    which is handed to ``wa_client.configure_session`` if it is non-empty.
    Failures are logged as warnings and never raised.
    """
    try:
        lookup = dns.resolver.Resolver()
        lookup.nameservers = [nameserver1, nameserver2]
        pinned = {}

        for hostname in ("graph.facebook.com",):
            try:
                address = str(lookup.resolve(hostname, "A")[0])
                pinned[hostname] = address
                logger.info(f"DNS override: {hostname} -> {address}")
            except Exception as err:
                logger.warning(f"Could not resolve {hostname}: {err}")

        proxy_name = None
        proxy_setting = os.getenv("WHATSAPP_PROXY_URL", "").strip()
        if proxy_setting:
            from urllib.parse import urlparse
            proxy_name = urlparse(proxy_setting).hostname

        # Skip the proxy lookup when there is no host or it is already pinned.
        if proxy_name and proxy_name not in pinned:
            try:
                address = str(lookup.resolve(proxy_name, "A")[0])
                pinned[proxy_name] = address
                logger.info(f"DNS override: {proxy_name} -> {address}")
            except Exception as err:
                logger.warning(f"Could not resolve proxy host {proxy_name}: {err}")

        if pinned:
            wa_client.configure_session(pinned)
    except Exception as err:
        logger.warning(f"DNS setup failed: {err}")


setup_dns()
# ── Constants ──────────────────────────────────────────────────────────────────
# Presumably the webhook verification token (its use is outside this view).
# NOTE(review): the fallback is a concrete token committed to source; consider
# requiring the environment variable instead — confirm with deployment.
VERIFY_TOKEN = os.environ.get("VERIFY_TOKEN", "30cca545-3838-48b2-80a7-9e43b1ae8ce4")
# Matches a message whose first word is a greeting. Because of the trailing
# ``.*`` the rest of that line is part of the match, so "hi, sold 3 apples"
# also counts as a greeting.
GREETING_PATTERN = re.compile(r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b.*$', re.IGNORECASE)
# Known currency codes/symbols for reply detection
_KNOWN_CURRENCIES = {
"r": "R", "zar": "R", "rand": "R", "rands": "R",
"south africa rand": "R", "south african rand": "R",
"sa rand": "R", "sa rands": "R",
"usd": "USD", "$": "USD", "dollar": "USD", "dollars": "USD",
"us dollar": "USD", "us dollars": "USD", "american dollar": "USD",
"zwg": "ZWG", "zig": "ZWG", "zimbabwe gold": "ZWG",
"kes": "KES", "ksh": "KES", "shilling": "KES", "kenyan shilling": "KES",
"ngn": "NGN", "naira": "NGN", "nigerian naira": "NGN",
"ghs": "GHS", "cedi": "GHS", "ghana cedi": "GHS",
"eur": "EUR", "euro": "EUR", "euros": "EUR",
"gbp": "GBP", "pound": "GBP", "pounds": "GBP",
"mwk": "MWK", "kwacha": "MWK", "malawi kwacha": "MWK",
"tzs": "TZS", "tshs": "TZS", "tanzanian shilling": "TZS",
"ugx": "UGX", "uganda shilling": "UGX",
"zamk": "ZMW", "zmw": "ZMW", "zambian kwacha": "ZMW",
"bwp": "BWP", "pula": "BWP", "botswana pula": "BWP",
"mzn": "MZN", "metical": "MZN",
}
# Ordered phrase list for mid-sentence currency detection
_CURRENCY_PHRASES = [
("south african rand", "R"), ("south africa rand", "R"),
("sa rands", "R"), ("sa rand", "R"),
("us dollars", "USD"), ("us dollar", "USD"), ("american dollar", "USD"),
("zimbabwe gold", "ZWG"),
("kenyan shilling", "KES"),
("nigerian naira", "NGN"),
("ghana cedi", "GHS"),
("zambian kwacha", "ZMW"),
("botswana pula", "BWP"),
("tanzanian shilling", "TZS"),
("malawi kwacha", "MWK"),
("rands", "R"), ("rand", "R"),
("dollars", "USD"), ("dollar", "USD"),
("euros", "EUR"), ("euro", "EUR"),
("pounds", "GBP"), ("pound", "GBP"),
("kwacha", "MWK"),
("pula", "BWP"),
("shilling", "KES"),
("naira", "NGN"),
("cedi", "GHS"),
("metical", "MZN"),
]
def _parse_currency_reply(text: str) -> Optional[str]:
"""
Detect currency from any natural language phrasing.
Handles: 'R', 'USD', 'Rands', 'South Africa Rands',
'I would like to use South African Rand', 'Please use USD'.
"""
t = text.strip().lower().rstrip(".")
# Direct lookup
if t in _KNOWN_CURRENCIES:
return _KNOWN_CURRENCIES[t]
# Phrase match anywhere in text (handles "please use south africa rands")
for phrase, code in _CURRENCY_PHRASES:
if phrase in t:
return code
# Single short token that looks like a currency code
if re.match(r'^[a-z$£€₦₡]{1,4}$', t):
return t.upper()
return None
def _store_context(db, mobile: str, role: str, text: str) -> None:
    """Append one message to the user's short-term context and prune old ones.

    The message is written to ``users/{mobile}/context`` with its text capped
    at 500 characters. Afterwards the newest 20 entries are read back and all
    but the newest 6 of them are deleted. Errors are logged, never raised.

    Args:
        db: Firestore client.
        mobile: User document id.
        role: "user" | "bot".
        text: Message text to remember.
    """
    keep = 6
    try:
        history = db.collection("users").document(mobile).collection("context")
        entry = {
            "role": role,
            "text": text[:500],  # cap to avoid Firestore bloat
            "ts": firestore.SERVER_TIMESTAMP,
        }
        history.add(entry)

        newest_first = history.order_by(
            "ts", direction=firestore.Query.DESCENDING
        ).limit(20)
        snapshots = list(newest_first.get())
        # Slicing past the end yields nothing, so short histories are untouched.
        for stale in snapshots[keep:]:
            stale.reference.delete()
    except Exception as err:
        logger.warning(f"_store_context: {err}")
def _load_context(db, mobile: str) -> List[Dict]:
    """Load the newest stored context messages, oldest first.

    Reads up to 6 documents from ``users/{mobile}/context`` ordered by ``ts``
    and returns them as ``[{"role": ..., "text": ...}]``. Used to resolve
    pronouns like 'it', 'that', 'this', 'them'.

    Returns:
        The context entries oldest-first, or ``[]`` if the read fails.
    """
    try:
        query = (
            db.collection("users").document(mobile).collection("context")
            .order_by("ts", direction=firestore.Query.DESCENDING)
            .limit(6)
        )
        # Materialise before reversing (as _store_context does): reversed()
        # needs a sequence, and .get() is not guaranteed to return one.
        newest_first = list(query.get())
        history = []
        for snapshot in reversed(newest_first):
            data = snapshot.to_dict() or {}
            # Skip a malformed entry rather than discarding the whole history.
            if "role" not in data or "text" not in data:
                continue
            history.append({"role": data["role"], "text": data["text"]})
        return history
    except Exception as e:
        logger.warning(f"_load_context: {e}")
        return []
# Pronoun patterns that signal the current message refers to something
# mentioned in a previous exchange
_PRONOUN_RE = re.compile(
r'\b(it|its|that|this|them|they|those|these|the same|same one|'
r'that one|this one|that transaction|this transaction|'
r'the item|the product|the sale|the loan|the expense)\b',
re.IGNORECASE
)
def _resolve_context(text: str, context: List[Dict]) -> str:
"""
If the current message contains pronouns/references that need context,
prepend a brief context hint so the NLP can resolve them.
Only adds context if pronouns are detected β€” avoids polluting clean messages.
"""
if not context or not _PRONOUN_RE.search(text):
return text
# Build a concise context hint from the last 3 exchanges
# Only include user messages (what they were talking about)
user_msgs = [c["text"] for c in context if c["role"] == "user"][-3:]
if not user_msgs:
return text
hint = "Recent context: " + " | ".join(user_msgs)
# Cap hint length
if len(hint) > 200:
hint = hint[:200] + "..."
return f"[{hint}]\n\nCurrent message: {text}"
def _parse_price_reply(text: str) -> Optional[Dict[str, float]]:
"""
Parse a price reply like:
banana: 3.50
orange: 4
apple: 5.00
Returns {item_name: price} or None if not a price reply.
"""
if text.strip().lower() == "skip":
return {} # Empty dict = skip signal
lines = [l.strip() for l in text.strip().splitlines() if l.strip()]
prices = {}
for line in lines:
# Match "item: price" or "item - price" or "item 3.50"
match = re.match(r'^([a-zA-Z\s]+)[:\-]\s*\$?([0-9]+(?:\.[0-9]{1,2})?)$', line)
if match:
item = match.group(1).strip().lower()
price = float(match.group(2))
prices[item] = price
return prices if prices else None
def _apply_price_replies(db, mobile: str, prices: Dict[str, float]) -> str:
    """Store user-supplied selling prices and apply any pending discounts.

    For every ``item_name -> price`` pair, each matching ``stock_batches``
    document whose ``price_each`` is None is updated. If a ``pending_discounts``
    document exists for the item, the discount is applied through
    ``apply_price_override`` and the pending document is deleted.

    Args:
        db: Firestore client.
        mobile: User document id.
        prices: Mapping of item name to price.

    Returns:
        A confirmation message listing the items, or a "nothing updated"
        message when every item failed.
    """
    # NOTE: this name is defined a second time further down the file; the
    # later definition is the one bound once the module has finished loading.
    from utility import get_user_currency, apply_price_override
    currency = get_user_currency(db, mobile) or ""
    user_ref = db.collection("users").document(mobile)
    updated = []
    for item_name, price in prices.items():
        try:
            # Update all unpriced stock_batches for this item
            batches = user_ref.collection("stock_batches").where("name", "==", item_name).get()
            for batch in batches:
                if batch.to_dict().get("price_each") is None:
                    batch.reference.update({"price_each": price})

            # Check if there's a pending discount to apply
            disc_ref = user_ref.collection("pending_discounts").document(item_name)
            disc_doc = disc_ref.get()
            if disc_doc.exists:
                disc_pct = disc_doc.to_dict().get("discount_pct", 0)
                new_p = apply_price_override(db, mobile, item_name, price, disc_pct)
                # "\u2192" is the right arrow that was previously mis-encoded.
                updated.append(
                    f"*{item_name.title()}*: {currency}{price} "
                    f"({disc_pct}% discount active \u2192 {currency}{new_p})"
                )
                disc_ref.delete()
            else:
                updated.append(f"*{item_name.title()}*: {currency}{price}")
        except Exception as e:
            # Best effort per item: one failure must not block the others.
            logger.warning(f"_apply_price_replies failed for {item_name}: {e}")
    if not updated:
        return "No matching stock items found to update."
    body = "\n".join(f" {u}" for u in updated)
    return "\U0001f4b0 Prices saved:\n" + body
def _looks_like_name(text: str) -> bool:
"""
Heuristic: is this text a person's name?
Names: 2-4 words, each capitalised, no digits, no punctuation.
"""
words = text.strip().split()
if not (2 <= len(words) <= 4):
return False
return all(
w[0].isupper() and w.isalpha()
for w in words
)
def _try_fill_missing_details(
    reply_text: str, transactions: list
) -> Optional[Tuple[list, Optional[str]]]:
    """Fill missing transaction details from a user's follow-up reply.

    Handles:
      - Amounts: "total 45", "R45", "45 paid 50", "paid 50 total 45"
      - Names (party/lender/customer): "Siyabonga Dlamini", "John Smith"

    The transactions are updated in place.

    Args:
        reply_text: The user's reply.
        transactions: Pending transactions, each a dict with
            ``transaction_type`` and ``details``.

    Returns:
        ``(transactions, still_missing_prompt)`` where the prompt is None once
        nothing is missing, or ``None`` if the reply is neither a name nor an
        amount.
    """
    text = reply_text.strip()
    lowered = text.lower()
    number = r'[r$]?(\d+(?:\.\d{1,2})?)'

    # Check if this looks like a name answer
    is_name = _looks_like_name(text)

    # An explicitly labelled total wins, so "paid 50 total 45" records 45 as
    # the total; without a label, fall back to the first number in the reply.
    total_m = (
        re.search(r'\b(?:total|amount|price)\s*[:=]?\s*' + number, lowered)
        or re.search(r'(?:total|for|amount|price)?\s*' + number, lowered)
    )
    paid_m = re.search(r'(?:paid|customer paid|received)\s*' + number, lowered)

    # Neither a name nor an amount: not a fill reply
    if not is_name and not total_m and not paid_m:
        return None

    for txn in transactions:
        details = txn.get("details") or {}
        txn_type = (txn.get("transaction_type") or "").lower()

        if is_name:
            # Sales record the person as "customer"; every other type uses "party".
            name_field = "customer" if txn_type == "sale" else "party"
            # dict.setdefault() keeps an existing None/"" placeholder, which
            # the missing-details check still reports as missing. Fill any
            # falsy value so the user is not asked the same question again.
            if not details.get(name_field):
                details[name_field] = text

        # Only set the amount when the transaction has no valid positive one.
        current_amount = details.get("amount") or details.get("total")
        amount_ok = False
        if current_amount is not None:
            try:
                amount_ok = float(str(current_amount)) > 0
            except (TypeError, ValueError):
                amount_ok = False
        if total_m and not amount_ok:
            details["amount"] = float(total_m.group(1))
        if paid_m:
            details["amount_paid"] = float(paid_m.group(1))
        txn["details"] = details

    still_missing = _check_missing_details(transactions)
    return transactions, still_missing
def _check_missing_details(transactions: list, currency: str = "?") -> Optional[str]:
"""
Check each transaction for missing critical fields.
Returns a prompt string if anything is missing, None if complete.
Required by type:
sale : amount (total) β€” prompt if absent AND no item-level prices
stock_in : quantity per item (usually present); price optional (prompted after confirm)
expense : amount + category
asset : amount + description
loan : amount + party
"""
prompts = []
for txn in transactions:
txn_type = txn.get("transaction_type", "").lower()
details = txn.get("details", {})
items = details.get("items", [])
amount = details.get("amount") or details.get("total")
cur = currency if currency and currency not in ("?", "null", "None") else ""
# Validate amount is a real number
amount_ok = False
if amount is not None:
try:
v = float(str(amount))
amount_ok = v > 0
except (ValueError, TypeError):
pass
if txn_type == "sale":
desc = details.get("description", "") or (
", ".join(f"{it.get('quantity','')} {it.get('item','')}"
for it in items) if items else "items"
)
if not amount_ok:
amt_prompt = f"How much did you sell {desc} for in total?"
if not details.get("amount_paid"):
amt_prompt += f"\nReply: total {cur}[amount] and paid {cur}[amount]"
else:
amt_prompt += f"\nReply: total {cur}[amount]"
prompts.append(amt_prompt)
elif txn_type == "expense":
desc = details.get("description", "this expense")
if not amount_ok:
prompts.append(f"How much did {desc} cost? Reply: {cur}[amount]")
if not details.get("category"):
prompts.append(
"What category is this expense? "
"Reply: rent | transport | airtime | wages | packaging | other"
)
elif txn_type == "asset":
desc = details.get("description", "this asset")
if not amount_ok:
prompts.append(f"How much did {desc} cost? Reply: {cur}[amount]")
elif txn_type == "loan":
if not amount_ok:
prompts.append(f"What is the loan amount? Reply: {cur}[amount]")
if not details.get("party"):
prompts.append("Who is the lender/borrower? Reply with their name.")
if not prompts:
return None
lines = ["I need a few more details before saving:"]
for i, p in enumerate(prompts, 1):
lines.append(f"\n{i}. {p}")
return "\n".join(lines)
def _parse_price_reply(text: str) -> Optional[Dict[str, float]]:
"""Parse price reply: 'banana: 3.50\norange: 4'"""
if text.strip().lower() == "skip":
return {}
lines = [l.strip() for l in text.strip().splitlines() if l.strip()]
prices = {}
for line in lines:
match = re.match(r'^([a-zA-Z\s]+)[:\-]\s*\$?([0-9]+(?:\.[0-9]{1,2})?)$', line)
if match:
prices[match.group(1).strip().lower()] = float(match.group(2))
return prices if prices else None
def _apply_price_replies(db, mobile: str, prices: Dict[str, float]) -> str:
    """Save selling prices for stock items and apply pending discounts.

    For each ``item_name -> price`` pair: set ``price_each`` on matching
    ``stock_batches`` documents that have none, then, if a
    ``pending_discounts`` document exists for the item, apply it with
    ``apply_price_override`` and delete it.

    Args:
        db: Firestore client.
        mobile: User document id.
        prices: Mapping of item name to price.

    Returns:
        A confirmation message with one line per item, or
        "No matching stock items found." when every item failed.
    """
    from utility import get_user_currency, apply_price_override
    currency = get_user_currency(db, mobile) or ""
    user_doc = db.collection("users").document(mobile)
    updated = []
    for item_name, price in prices.items():
        try:
            batches = user_doc.collection("stock_batches").where("name", "==", item_name).get()
            for batch in batches:
                # Never overwrite a price that is already set.
                if batch.to_dict().get("price_each") is None:
                    batch.reference.update({"price_each": price})

            disc_ref = user_doc.collection("pending_discounts").document(item_name)
            disc_doc = disc_ref.get()
            line = f"*{item_name.title()}*: {currency}{price}"
            if disc_doc.exists:
                disc_pct = disc_doc.to_dict().get("discount_pct", 0)
                new_p = apply_price_override(db, mobile, item_name, price, disc_pct)
                # "\u2192" is the right arrow that was previously mis-encoded.
                line += f" ({disc_pct}% discount active \u2192 {currency}{new_p})"
                updated.append(line)
                disc_ref.delete()
            else:
                updated.append(line)
        except Exception as e:
            # Best effort per item: log and carry on with the next one.
            logger.warning(f"_apply_price_replies {item_name}: {e}")
    if not updated:
        return "No matching stock items found."
    items_joined = "\n".join(f" {u}" for u in updated)
    return "\U0001f4b0 Prices saved:\n" + items_joined
# ── Deduplication ──────────────────────────────────────────────────────────────
# Retention window in hours, passed as ``ttl_hours`` to the module-level
# MessageDeduplicator instance.
PROCESSED_MESSAGES_TTL_HOURS = 24
class MessageDeduplicator:
    """Remember processed message IDs so each message is handled once.

    IDs are kept in a size-bounded in-memory ``OrderedDict`` (oldest entry
    evicted first) and, when ``db_client`` is given, in the Firestore
    collection ``processed_messages``. A daemon thread drops in-memory
    entries older than the TTL.
    """

    def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
        """
        Args:
            ttl_hours: How long an ID is remembered.
            max_cache_size: Maximum number of IDs held in memory.
            db_client: Optional Firestore client for the persistent record.
        """
        self.ttl_seconds = ttl_hours * 3600
        self.max_cache_size = max_cache_size
        self.db_client = db_client
        self.cache = OrderedDict()  # message_id -> time() when first recorded
        # Re-entrant because is_duplicate() calls _mark_processed() while holding it.
        self.lock = threading.RLock()
        threading.Thread(target=self._periodic_cleanup, daemon=True).start()

    def _remember(self, message_id):
        """Record ``message_id`` in memory, evicting the oldest entry if full."""
        self.cache[message_id] = time()
        if len(self.cache) > self.max_cache_size:
            self.cache.popitem(last=False)

    def is_duplicate(self, message_id):
        """Return True if ``message_id`` was seen before; otherwise record it.

        A falsy ID is never a duplicate and is not recorded. Lookup order is
        the in-memory cache first, then Firestore (when configured). A
        Firestore error is logged and the message is treated as new.
        """
        if not message_id:
            return False
        with self.lock:
            if message_id in self.cache:
                self.cache.move_to_end(message_id)
                return True
            if self.db_client:
                try:
                    doc = self.db_client.collection("processed_messages").document(message_id).get()
                    if doc.exists:
                        self._remember(message_id)
                        return True
                except Exception as e:
                    logger.error(f"is_duplicate DB error: {e}")
            self._mark_processed(message_id)
            return False

    def _mark_processed(self, message_id):
        """Record ``message_id`` in memory and, when configured, in Firestore."""
        with self.lock:
            self._remember(message_id)
        if self.db_client:
            try:
                # Use an aware UTC timestamp: a naive local datetime.now() is
                # stored as if it were UTC, which shifts the expiry by the
                # server's UTC offset.
                from datetime import timezone
                expiry = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
                self.db_client.collection("processed_messages").document(message_id).set(
                    {"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry}
                )
            except Exception as e:
                logger.error(f"_mark_processed DB error: {e}")

    def _periodic_cleanup(self):
        """Background loop: hourly, drop in-memory entries older than the TTL."""
        while True:
            try:
                with self.lock:
                    now = time()
                    expired = [mid for mid, ts in self.cache.items()
                               if now - ts > self.ttl_seconds]
                    for mid in expired:
                        self.cache.pop(mid, None)
                # Sleep outside the lock so callers are never blocked by the wait.
                threading.Event().wait(3600)
            except Exception as e:
                logger.error(f"cleanup thread error: {e}")
                threading.Event().wait(300)
# Shared deduplicator used by check_and_mark_processed(); it is given the
# module-level Firestore client ``db``.
message_deduplicator = MessageDeduplicator(
    ttl_hours=PROCESSED_MESSAGES_TTL_HOURS, db_client=db
)
def check_and_mark_processed(message_id: str) -> bool:
    """Return True if ``message_id`` was already processed, else record it.

    An empty ID is logged and reported as not processed (False).
    """
    if message_id:
        return message_deduplicator.is_duplicate(message_id)
    logger.warning("Empty message ID")
    return False
# ── Auth ───────────────────────────────────────────────────────────────────────
def is_user_approved(mobile: str) -> Tuple[bool, Optional[Dict]]:
    """Check whether ``mobile`` belongs to an approved user.

    The number is looked up in the ``users`` collection with a leading "+"
    added when missing. A user is approved when the document exists and its
    ``status`` equals "approved" (case-insensitive).

    Returns:
        ``(True, user_data)`` for an approved user; ``(False, None)`` when the
        user is unknown, not approved, Firestore is unavailable, or the lookup
        fails.
    """
    denied = (False, None)
    if not db:
        logger.error("Firestore not available for auth")
        return denied
    try:
        normalized = f"+{mobile}" if not mobile.startswith("+") else mobile
        logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}'")
        snapshot = db.collection("users").document(normalized).get()
        if not snapshot.exists:
            return denied
        data = snapshot.to_dict()
        if data.get("status", "").lower() != "approved":
            return denied
        return True, data
    except Exception as e:
        logger.error(f"is_user_approved error for {mobile}: {e}", exc_info=True)
        return denied
# ── Button senders ─────────────────────────────────────────────────────────────
def send_confirmation_buttons(mobile: str, message_summary: str,
                              is_variance: bool = False) -> None:
    """Send a transaction summary with confirm/cancel reply buttons.

    Args:
        mobile: Recipient id.
        message_summary: Pre-formatted summary text used as the button body.
        is_variance: When True, offer "Settled" / "Pending" / "Cancel" instead
            of "Confirm" / "Cancel".
    """
    # Button titles use escaped code points; the previous literals were
    # mis-encoded and reached users as garbage such as "βœ…".
    if is_variance:
        buttons = [
            {"reply": {"id": "confirm_resolved", "title": "\u2705 Settled"}},           # check mark
            {"reply": {"id": "confirm_unresolved", "title": "\u26a0\ufe0f Pending"}},    # warning sign
            {"reply": {"id": "cancel_transaction", "title": "\u274c Cancel"}},           # cross mark
        ]
    else:
        buttons = [
            {"reply": {"id": "confirm_transaction", "title": "\u2705 Confirm"}},
            {"reply": {"id": "cancel_transaction", "title": "\u274c Cancel"}},
        ]
    # Summary is pre-formatted by format_transaction_response, so send it directly.
    # WhatsApp button body max 1024 chars
    body = message_summary[:1020] + "..." if len(message_summary) > 1024 else message_summary
    wa_client.send_reply_buttons(recipient_id=mobile, body_text=body, button_data=buttons)
def send_image_intent_buttons(mobile: str) -> None:
    """Ask the vendor whether an image is a stock-in or a sale.

    The docstring in the original says this is shown when the image has no
    caption; the call site is outside this block.
    """
    # Titles use escaped code points; the previous literals were mis-encoded.
    wa_client.send_reply_buttons(
        recipient_id=mobile,
        body_text="What is this image for?",
        button_data=[
            {"reply": {"id": "image_stock_in", "title": "\U0001f4e6 Stock In"}},        # package
            {"reply": {"id": "image_record_sale", "title": "\U0001f4b0 Record Sale"}},  # money bag
        ],
    )
def send_discount_buttons(mobile: str, item: Dict, idx: int) -> None:
    """Send a discount suggestion with Apply/Skip buttons for one flagged item.

    No prices are shown, only the suggested discount percentage.

    Args:
        mobile: Recipient id.
        item: Dict with ``name``, ``discount_pct``, ``quality`` and ``quantity``.
        idx: Position of the item, embedded in the button ids as
            ``discount_confirm_{idx}_{name}`` / ``discount_skip_{idx}_{name}``.
    """
    name = item["name"]
    disc = item["discount_pct"]
    quality = item["quality"]
    qty = item["quantity"]
    # Red circle for "urgent_move", orange circle otherwise. Emoji and the em
    # dash are written as escapes; the previous literals were mis-encoded.
    icon = "\U0001f534" if quality == "urgent_move" else "\U0001f7e0"
    body = (
        f"{icon} *{name.title()}* \u2014 {qty} units\n"
        f"Quality: {quality.replace('_', ' ')}\n"
        f"Suggested discount: *{disc}% off* selling price\n\n"
        f"Apply this discount for the next 24 hours?"
    )
    wa_client.send_reply_buttons(
        recipient_id=mobile,
        body_text=body,
        button_data=[
            {"reply": {"id": f"discount_confirm_{idx}_{name}", "title": "\u2705 Apply Discount"}},
            {"reply": {"id": f"discount_skip_{idx}_{name}", "title": "\u23ed\ufe0f Skip"}},
        ],
    )
# ── Interactive response handler ───────────────────────────────────────────────
def _interactive_image_intent(mobile: str, button_id: str) -> None:
    """Run vision on the stored pending image in stock-in or sale mode."""
    image_bytes, caption = retrieve_pending_image(mobile, db)
    if not image_bytes:
        wa_client.send_text_message(
            mobile, "Couldn't find your image \u2014 please send it again."
        )
        return
    mode = "stock_in" if button_id == "image_stock_in" else "sale"
    wa_client.send_text_message(mobile, "Analysing your image... \U0001f50d")
    result = process_image_and_generate_query(image_bytes, caption, mode=mode)
    _handle_vision_result(mobile, result, caption)


def _interactive_inventory_confirm(mobile: str) -> None:
    """Persist the pending inventory, then offer discounts for flagged items."""
    inv_ref = db.collection("users").document(mobile) \
        .collection("temp_inventory").document("pending")
    inv_doc = inv_ref.get()
    if not inv_doc.exists:
        wa_client.send_text_message(mobile, "No pending inventory found.")
        return
    inventory_data = inv_doc.to_dict().get("inventory", {})
    inv_ref.delete()
    # The user's currency is passed to confirm_stock_in.
    currency = get_user_currency(db, mobile) or "?"
    msg = confirm_stock_in(db, mobile, inventory_data, currency=currency)
    wa_client.send_text_message(mobile, msg)
    # Send discount suggestion buttons for quality-flagged items
    _, discount_items = format_inventory_message(inventory_data)
    for i, item in enumerate(discount_items):
        send_discount_buttons(mobile, item, i)


def _interactive_discount_confirm(mobile: str, button_id: str) -> None:
    """Apply a suggested discount now, or park it until the item has a price."""
    # Format: discount_confirm_{idx}_{item_name}; maxsplit keeps underscores
    # inside the item name intact.
    parts = button_id.split("_", 3)
    if len(parts) != 4:
        return
    item_name = parts[3]
    user_ref = db.collection("users").document(mobile)
    meta_doc = user_ref.collection("temp_inventory").document("discount_meta").get()
    disc = 0
    if meta_doc.exists:
        disc = meta_doc.to_dict().get(item_name, {}).get("discount_pct", 20)
    # Look up actual price from stock_batches
    from utility import _lookup_item_price
    price = _lookup_item_price(db, mobile, item_name)
    if price:
        new_p = apply_price_override(db, mobile, item_name, price, disc)
        cur = get_user_currency(db, mobile) or ""
        wa_client.send_text_message(
            mobile,
            f"\u2705 Discount applied: *{item_name.title()}* \u2192 {cur}{new_p} for 24h ({disc}% off)."
        )
        return
    # No price in DB yet: store the discount pct and apply it when the price is set.
    user_ref.collection("pending_discounts").document(item_name).set({
        "item_name": item_name,
        "discount_pct": disc,
        "created_at": firestore.SERVER_TIMESTAMP,
    })
    wa_client.send_text_message(
        mobile,
        f"\u2705 {disc}% discount noted for *{item_name.title()}*. "
        f"I'll apply it once you set the selling price."
    )


def _interactive_transaction_decision(mobile: str, button_id: str) -> None:
    """Confirm or cancel the pending transaction and report the outcome."""
    doc_ref = db.collection("users").document(mobile) \
        .collection("temp_transactions").document("pending")
    try:
        txn_doc = doc_ref.get()
        if not txn_doc.exists:
            wa_client.send_text_message(mobile, "No transaction waiting for confirmation.")
            return
        pending = txn_doc.to_dict().get("transactions", [])
        if not pending:
            wa_client.send_text_message(mobile, "Issue with pending transaction data.")
            doc_ref.delete()
            return

        if button_id in ("confirm_transaction", "confirm_unresolved", "confirm_reset"):
            result = process_intent(pending, mobile, force_settled=False)
        elif button_id == "confirm_resolved":
            result = process_intent(pending, mobile, force_settled=True)
        elif button_id == "cancel_transaction":
            result = "Transaction cancelled."
        else:
            result = "Unrecognised button."
        doc_ref.delete()

        # TX_IDS tuple: send the summary, then the IDs as a separate message.
        if isinstance(result, tuple) and len(result) == 3 and result[0] == "TX_IDS":
            _, summary, tx_ids = result
            wa_client.send_text_message(mobile, summary)
            if tx_ids:
                id_block = "\n".join(tx_ids)
                label = "Transaction IDs" if len(tx_ids) > 1 else "Transaction ID"
                wa_client.send_text_message(
                    mobile,
                    f"\U0001f194 *{label}* (tap to copy):\n{id_block}"
                )
        elif isinstance(result, str) and result:
            # Plain-text outcomes (cancellation, unknown button, or a string
            # returned by process_intent) were previously computed but never
            # sent, leaving the user without any reply.
            wa_client.send_text_message(mobile, result)
    except Exception as e:
        logger.error(f"handle_interactive_response error for {mobile}: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Something went wrong handling your confirmation.")


def handle_interactive_response(mobile: str, button_id: str) -> None:
    """Dispatch a WhatsApp reply-button press to the matching flow.

    Handled ids: ``image_stock_in`` / ``image_record_sale``,
    ``confirm_inventory``, ``discount_confirm_*``, ``discount_skip_*``;
    anything else is treated as a decision on the pending transaction.

    Args:
        mobile: Sender id (also the user document id).
        button_id: The id of the pressed button.
    """
    if not db:
        wa_client.send_text_message(mobile, "Database unavailable \u2014 cannot process.")
        return

    # ── Stock in / Sale intent buttons
    if button_id in ("image_stock_in", "image_record_sale"):
        _interactive_image_intent(mobile, button_id)
        return

    # ── Inventory confirm
    if button_id == "confirm_inventory":
        _interactive_inventory_confirm(mobile)
        return

    # ── Discount buttons
    if button_id.startswith("discount_confirm_"):
        _interactive_discount_confirm(mobile, button_id)
        return
    if button_id.startswith("discount_skip_"):
        parts = button_id.split("_", 3)
        item_name = parts[3] if len(parts) == 4 else "item"
        wa_client.send_text_message(
            mobile, f"\u23ed\ufe0f Skipped discount for {item_name.title()}."
        )
        return

    # ── Transaction confirm / cancel
    _interactive_transaction_decision(mobile, button_id)
# ── Vision result dispatcher ───────────────────────────────────────────────────
def _handle_vision_result(mobile: str, result: str, caption: Optional[str]) -> None:
    """Route the output of the vision step.

    - Inventory JSON (as recognised by ``parse_inventory_json``): send the
      summary, store it for confirmation and offer Save/Cancel buttons plus
      discount buttons.
    - Text starting with "Error:": forward it to the user.
    - Path of an existing file (and Imgur configured): upload it and send it
      as an image; on failure send ``result`` as text.
    - Anything else: try the per-item sale parser, falling back to the text
      pipeline.

    Args:
        mobile: Recipient id (also the user document id).
        result: Output of ``process_image_and_generate_query``.
        caption: Caption that came with the image (not used here).
    """
    inventory_data = parse_inventory_json(result)
    if inventory_data:
        # Format and send inventory summary
        summary, discount_items = format_inventory_message(inventory_data)
        wa_client.send_text_message(mobile, summary)
        # Store inventory + discount metadata for button confirmations
        temp_inventory = db.collection("users").document(mobile).collection("temp_inventory")
        temp_inventory.document("pending").set({
            "inventory": inventory_data,
            "created_at": firestore.SERVER_TIMESTAMP,
        })
        # Store discount metadata keyed by item name
        if discount_items:
            meta = {
                item["name"]: {
                    "original_price": item["original_price"],
                    "discount_pct": item["discount_pct"],
                    "new_price": item["new_price"],
                }
                for item in discount_items
            }
            temp_inventory.document("discount_meta").set(meta)
        # Ask vendor to confirm stock-in. Titles use escaped code points; the
        # previous literals were mis-encoded.
        wa_client.send_reply_buttons(
            recipient_id=mobile,
            body_text="Save this as new stock?",
            button_data=[
                {"reply": {"id": "confirm_inventory", "title": "\u2705 Save Stock"}},
                {"reply": {"id": "cancel_transaction", "title": "\u274c Cancel"}},
            ],
        )
        # Send discount buttons for flagged items
        for i, item in enumerate(discount_items):
            send_discount_buttons(mobile, item, i)
    elif result.startswith("Error:"):
        wa_client.send_text_message(mobile, result)
    elif os.path.isfile(result) and HEADERS_IMGUR:
        # Chart file: upload to Imgur and send
        try:
            with open(result, "rb") as f:
                # A timeout keeps a stalled upload from blocking the handler forever.
                resp = requests.post(
                    URL_IMGUR, headers=HEADERS_IMGUR, files={"image": f}, timeout=30
                )
            resp.raise_for_status()
            imgur_data = resp.json()
            if imgur_data.get("success"):
                wa_client.send_image_message(recipient_id=mobile,
                                             image_url=imgur_data["data"]["link"])
                os.remove(result)
                return
        except Exception as e:
            logger.error(f"Imgur upload failed: {e}")
        # Upload failed or was rejected: fall back to text and clean up the file.
        wa_client.send_text_message(mobile, result)
        if os.path.exists(result):
            os.remove(result)
    else:
        # Sale mode: try direct per-item parser first (avoids NLP collapsing items)
        currency = get_user_currency(db, mobile) or "?"
        sale_txns = parse_vision_sale_transactions(result, currency=currency)
        if sale_txns:
            # Parsed directly: skip generateResponse and go straight to confirmation
            if persist_temporary_transaction(sale_txns, mobile):
                summary = format_transaction_response(sale_txns)
                has_variance = any(
                    "amount_paid" in t.get("details", {}) for t in sale_txns
                )
                send_confirmation_buttons(mobile, summary, is_variance=has_variance)
            else:
                wa_client.send_text_message(mobile, "Sorry, could not save your sale.")
        else:
            # Fallback: plain text through NLP pipeline
            process_text_message(result, mobile)
def _extract_clean_message(message_text: Optional[str]) -> str:
"""
Clean WhatsApp text before sending it into the NLP pipeline.
Main purpose:
- Prevent crashes when users paste/forward the bot's Transaction ID block.
- Remove WhatsApp markdown around transaction IDs.
- Preserve ordinary business messages unchanged.
Examples handled:
"πŸ†” *Transaction ID* (tap to copy):\nABC123"
"Reverse this πŸ†” *Transaction ID* (tap to copy):\nABC123"
"Transaction IDs:\nABC123\nDEF456"
"""
if message_text is None:
return ""
text = str(message_text).strip()
if not text:
return ""
# Remove invisible/control characters that sometimes arrive from WhatsApp copy/paste.
text = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)
# Only do aggressive cleanup for transaction-ID style messages.
tx_label_re = re.compile(r"transaction\s+ids?", re.IGNORECASE)
if not tx_label_re.search(text):
return text
# Capture likely transaction IDs. Keep this broad because the utility layer may
# generate IDs with dashes/underscores, not only UUIDs.
tx_ids = re.findall(r"\b[A-Za-z0-9][A-Za-z0-9_-]{5,}\b", text)
# Remove common label/UX text, icons and WhatsApp markdown.
cleaned = text
cleaned = re.sub(r"[πŸ†”*]", "", cleaned)
cleaned = re.sub(r"transaction\s+ids?", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"\(?\s*tap\s+to\s+copy\s*\)?", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"[:\-]+", " ", cleaned)
# Remove IDs from the instruction portion, then add them back cleanly once.
instruction = cleaned
for tx_id in tx_ids:
instruction = instruction.replace(tx_id, " ")
instruction = re.sub(r"\s+", " ", instruction).strip()
if tx_ids:
unique_ids = list(dict.fromkeys(tx_ids))
id_text = " ".join(unique_ids)
return f"{instruction} {id_text}".strip() if instruction else id_text
# Fallback: return a readable cleaned message instead of failing.
return re.sub(r"\s+", " ", cleaned).strip() or text
# ── Text message processor ─────────────────────────────────────────────────────
def process_text_message(message_text: str, mobile: str,
                         user_settings: Optional[Dict] = None) -> Optional[str]:
    """
    Handle one inbound text message (typed, or transcribed from audio).

    Steps, in order:
      1. Clean pasted Transaction-ID blocks, translate to English, resolve
         context against stored history and store the user message.
      2. Greeting -> canned welcome.
      3. Currency reply -> store currency; no currency known -> send prompt.
      4. Price reply -> apply prices.
      5. Pending transaction with missing details -> try to fill them.
      6. Otherwise run the LLM parser and route by intent (read/query,
         create/update/delete, reset_account).

    Args:
        message_text: Raw text of the message.
        mobile: Sender identifier; used as the recipient id and Firestore user key.
        user_settings: Optional user record; consulted for a currency
            (``currency`` or ``settings.currency``) when none is stored.

    Returns:
        The text that was sent back to the user, or None when the reply was an
        interactive prompt/buttons (or there was nothing to say). The audio
        processor uses this return value as the text to speak.
    """
    logger.info(f"Processing text message from {mobile}: '{message_text}'")
    # The user may have forwarded/pasted the bot's Transaction ID block,
    # possibly with surrounding text such as "Reverse this <ID>".
    message_text = _extract_clean_message(message_text)

    # Language sandwich: work in English internally, answer in the user's language.
    lang_data = detect_and_translate_input(message_text)
    english_text = lang_data.get("english_text", message_text)
    detected_lang = lang_data.get("detected_lang", "English")
    logger.info(f"Detected language: {detected_lang}. Process text: {english_text}")

    # Short-term context: resolve references using the stored history.
    context_history = _load_context(db, mobile)
    english_text = _resolve_context(english_text, context_history)
    # Store this user message for future context resolution.
    _store_context(db, mobile, "user", message_text)

    # Perishable nudge (prepended/sent alongside replies when stock is expiring).
    nudge = check_expiry_nudge(db, mobile)

    if GREETING_PATTERN.match(english_text):
        base_msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?"
        if nudge:
            base_msg = nudge + "\n\n" + base_msg
        final_msg = translate_output(base_msg, detected_lang)
        wa_client.send_text_message(mobile, final_msg)
        return final_msg

    # Currency resolution: order matters.
    # 1. Check if THIS message is a currency reply first (breaks the prompt loop).
    currency_reply = _parse_currency_reply(english_text)
    if currency_reply:
        set_user_currency(db, mobile, currency_reply)
        reply = (f"Got it \u2014 I'll use *{currency_reply}* for all your transactions. "
                 "Now, how can I help?")
        final = translate_output(reply, detected_lang)
        wa_client.send_text_message(mobile, final)
        return final

    # 2. Load stored currency, falling back to the user record.
    user_currency = get_user_currency(db, mobile)
    if not user_currency and user_settings:
        user_currency = (user_settings.get("currency") or
                         (user_settings.get("settings") or {}).get("currency"))

    # 3. Still no currency: prompt. (Greetings already returned above, so no
    #    second greeting check is needed here.)
    if not user_currency:
        wa_client.send_text_message(mobile, CURRENCY_PROMPT)
        return None

    # 4. Check if this message is a price reply (e.g. "banana: 3.50\norange: 4").
    price_reply = _parse_price_reply(english_text)
    if price_reply is not None:
        if price_reply == {}:
            msg = translate_output(
                "No problem \u2014 you can set prices anytime by telling me.", detected_lang)
            wa_client.send_text_message(mobile, msg)
            return msg
        msg = _apply_price_replies(db, mobile, price_reply)
        final = translate_output(msg, detected_lang)
        wa_client.send_text_message(mobile, final)
        return final

    # 5. Check if this message provides missing details for a pending transaction,
    #    e.g. "total 45" or "paid 50" after the bot asked for an amount.
    pending_ref = (db.collection("users").document(mobile)
                   .collection("temp_transactions").document("pending"))
    pending_doc = pending_ref.get()
    if pending_doc.exists:
        missing_fill = _try_fill_missing_details(
            english_text, pending_doc.to_dict().get("transactions", [])
        )
        if missing_fill is not None:
            filled_txns, still_missing = missing_fill
            pending_ref.set({"transactions": filled_txns,
                             "created_at": firestore.SERVER_TIMESTAMP})
            if still_missing:
                msg = translate_output(still_missing, detected_lang)
                wa_client.send_text_message(mobile, msg)
                return msg
            # All details now present: show confirmation.
            summary = format_transaction_response(filled_txns)
            has_variance = any("amount_paid" in t.get("details", {}) for t in filled_txns)
            is_sale = bool(filled_txns) and filled_txns[0].get("transaction_type") == "sale"
            send_confirmation_buttons(mobile, translate_output(summary, detected_lang),
                                      is_variance=has_variance and is_sale)
            return None

    llm_response_str = generateResponse(english_text, currency=user_currency)
    parsed_trans_data = parse_multiple_transactions(llm_response_str)

    response_msg = ""
    send_image = False
    image_path = None

    if not parsed_trans_data:
        response_msg = "Sorry, I couldn't quite understand that. Could you rephrase?"
    else:
        # "or ''" guards against an explicit null from the parser.
        primary_intent = str(parsed_trans_data[0].get("intent") or "").lower()
        primary_type = str(parsed_trans_data[0].get("transaction_type") or "").lower()

        if primary_intent == "read" or primary_type == "query":
            # Pass currency into transaction details for codegen context.
            for t in parsed_trans_data:
                t.setdefault("details", {})["currency"] = user_currency
            response_data = process_intent(parsed_trans_data, mobile)

            # Route based on result type.
            if (isinstance(response_data, tuple) and len(response_data) == 3
                    and response_data[0] == "TX_IDS"):
                # Shouldn't happen on read, but handle gracefully.
                _, summary, _ = response_data
                response_msg = summary
            elif isinstance(response_data, tuple) and len(response_data) == 2:
                # Plot: ("PLOT:filepath", insight_string)
                plot_path, insight = response_data
                if isinstance(plot_path, str) and plot_path.startswith("PLOT:"):
                    plot_path = plot_path[5:]
                if isinstance(plot_path, str) and os.path.isfile(plot_path):
                    send_image = True
                    image_path = plot_path
                    # Insight is sent as follow-up text after the image.
                    if insight:
                        response_msg = f"\U0001F4A1 {insight}"  # light-bulb emoji
                else:
                    # File missing: fall back to insight text.
                    response_msg = insight or "Chart could not be generated."
            else:
                response_msg = str(response_data) if response_data else "No data found."

        elif primary_intent in ("create", "update", "delete", "reset_account"):
            if primary_intent == "reset_account":
                # Never execute reset directly; always require explicit confirmation.
                if persist_temporary_transaction(parsed_trans_data, mobile):
                    warning = (
                        "\u26a0\ufe0f *This will permanently delete ALL your transactions, "
                        "stock records, and price history.*\n\n"
                        "This cannot be undone. Are you sure?"
                    )
                    warning_translated = translate_output(warning, detected_lang)
                    wa_client.send_reply_buttons(
                        recipient_id=mobile,
                        body_text=warning_translated,
                        button_data=[
                            {"reply": {"id": "confirm_reset",
                                       "title": "\U0001F5D1\ufe0f Yes, Delete All"}},
                            {"reply": {"id": "cancel_transaction",
                                       "title": "\u274c Cancel"}},
                        ],
                    )
                    if nudge:
                        wa_client.send_text_message(mobile, nudge)
                    return None
                else:
                    response_msg = "Could not process your request."
            else:
                # Check for missing required details before confirming.
                missing_prompt = _check_missing_details(parsed_trans_data, user_currency)
                if missing_prompt:
                    # Store the partially-parsed transaction and ask for missing info.
                    persist_temporary_transaction(parsed_trans_data, mobile)
                    final_prompt = translate_output(missing_prompt, detected_lang)
                    wa_client.send_text_message(mobile, final_prompt)
                    return None
                if persist_temporary_transaction(parsed_trans_data, mobile):
                    transaction_summary = format_transaction_response(parsed_trans_data)
                    has_payment_input = any(
                        "amount_paid" in t.get("details", {}) for t in parsed_trans_data
                    )
                    is_variance = (
                        has_payment_input and
                        primary_intent == "create" and
                        primary_type == "sale"
                    )
                    trans_summary_translated = translate_output(transaction_summary,
                                                                detected_lang)
                    send_confirmation_buttons(mobile, trans_summary_translated,
                                              is_variance=is_variance)
                    if nudge:
                        wa_client.send_text_message(mobile, nudge)
                    return None
                else:
                    response_msg = "Sorry, I couldn't save your transaction for confirmation."
        else:
            response_msg = f"I'm not sure how to handle '{primary_intent}'."

    # Prepend nudge to response.
    if nudge and response_msg:
        response_msg = nudge + "\n\n" + response_msg
    final_response = translate_output(response_msg, detected_lang) if response_msg else ""

    if send_image and image_path:
        # The chart is delivered even when there is no accompanying text, and any
        # accompanying text (nudge and/or insight) is always sent as a follow-up.
        # NOTE(review): chart replies are not written to the context store,
        # matching the previous behaviour; confirm whether that is intended.
        try:
            if not HEADERS_IMGUR:
                # No Imgur configured: skip image, send insight text only.
                logger.warning("Imgur not configured \u2014 sending insight text only")
                wa_client.send_text_message(
                    mobile, final_response or "Chart generated but Imgur not configured.")
                return final_response or None
            with open(image_path, "rb") as f:
                resp = requests.post(URL_IMGUR, headers=HEADERS_IMGUR,
                                     files={"image": f}, timeout=30)
            resp.raise_for_status()
            imgur_data = resp.json()
            if imgur_data.get("success"):
                wa_client.send_image_message(recipient_id=mobile,
                                             image_url=imgur_data["data"]["link"])
                if final_response:
                    wa_client.send_text_message(mobile, final_response)
            else:
                wa_client.send_text_message(mobile,
                                            final_response or "Chart could not be sent.")
            return final_response or None
        except Exception as e:
            logger.error(f"Image upload failed: {e}", exc_info=True)
            wa_client.send_text_message(mobile, final_response or "Chart could not be sent.")
            return final_response or None
        finally:
            # Always drop the temporary plot file, whichever path was taken.
            try:
                if os.path.exists(image_path):
                    os.remove(image_path)
            except OSError as cleanup_err:
                logger.warning(f"Could not remove plot file {image_path}: {cleanup_err}")

    if not response_msg:
        return None
    wa_client.send_text_message(mobile, final_response)
    # Store bot response for future context.
    _store_context(db, mobile, "bot", final_response)
    return final_response
# ── Audio processor ───────────────────────────────────────────────────────────
def _deepgram_tts_to_mp3(text: str) -> Optional[str]:
    """
    Synthesise ``text`` to speech via the Deepgram TTS endpoint.

    The text is passed through ``cap_for_tts`` before being sent. On success
    the audio bytes are written to ``tts_<uuid4>.mp3`` in the current working
    directory and that path is returned; the caller owns the file.

    Returns:
        Path of the written MP3, or None when no Deepgram API key is
        configured or the request / file write fails (the failure is logged).
    """
    if not DEEPGRAM_API_KEY:
        return None

    tts_text = cap_for_tts(text)  # hard cap before sending to Deepgram
    mp3_path = None
    try:
        auth_headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "application/json",
        }
        response = requests.post(
            DEEPGRAM_TTS_URL,
            headers=auth_headers,
            json={"text": tts_text},
            timeout=30,
        )
        response.raise_for_status()
        out_path = os.path.join(os.getcwd(), f"tts_{uuid.uuid4()}.mp3")
        with open(out_path, "wb") as mp3_file:
            mp3_file.write(response.content)
        mp3_path = out_path
    except Exception as e:
        logger.error(f"DeepGram TTS failed: {e}", exc_info=True)
    return mp3_path
def _upload_to_firebase_storage(file_path: str) -> Optional[str]:
    """
    Upload a local file to the Firebase Storage bucket under ``audio_responses/``.

    The object is named after the file's basename.

    Returns:
        A signed URL for the uploaded object that expires after one hour, or
        None if any step fails (the failure is logged).
    """
    signed_url = None
    try:
        target_bucket = storage.bucket()
        object_name = f"audio_responses/{os.path.basename(file_path)}"
        uploaded = target_bucket.blob(object_name)
        uploaded.upload_from_filename(file_path)
        signed_url = uploaded.generate_signed_url(expiration=timedelta(hours=1))
    except Exception as e:
        logger.error(f"Firebase Storage upload failed: {e}", exc_info=True)
        return None
    return signed_url
def process_audio_message(audio_id: str, mobile: str,
                          user_settings: Optional[Dict]) -> None:
    """
    Handle an inbound WhatsApp voice note.

    The audio is downloaded to ``temp_audio/``, transcribed, and the transcript
    is run through ``process_text_message``. When that returns reply text, the
    text is synthesised to MP3 and sent back as audio: by signed URL when the
    Firebase upload succeeds, otherwise from the local file path.

    Both temporary files (the downloaded audio and the generated MP3) are
    removed even when a later step raises.

    Args:
        audio_id: WhatsApp media id of the audio.
        mobile: Sender identifier, also used as the recipient id.
        user_settings: User record forwarded to ``process_text_message``.
    """
    if not transcriber:
        wa_client.send_text_message(mobile, "Audio processing is unavailable right now.")
        return

    media_url = wa_client.get_media_url(audio_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Couldn't retrieve your audio.")
        return

    os.makedirs("temp_audio", exist_ok=True)
    audio_path = os.path.join("temp_audio", f"{mobile}_{audio_id}.ogg")
    downloaded = wa_client.download_media(media_url, audio_path)
    if not downloaded:
        wa_client.send_text_message(mobile, "Couldn't download your audio.")
        return

    try:
        transcript = transcriber.transcribe(downloaded)
        if transcript.status == aai.TranscriptStatus.error:
            wa_client.send_text_message(mobile, f"Transcription error: {transcript.error}")
            return
        if not transcript.text:
            wa_client.send_text_message(mobile, "Couldn't understand the audio.")
            return

        text_response = process_text_message(transcript.text, mobile, user_settings)
        if not text_response:
            return
        mp3_path = _deepgram_tts_to_mp3(text_response)
        if not mp3_path:
            return
        try:
            audio_url = _upload_to_firebase_storage(mp3_path)
            if audio_url:
                wa_client.send_audio_message(mobile, audio_url=audio_url)
            else:
                wa_client.send_audio_message(mobile, audio_path=mp3_path)
        finally:
            # Previously skipped when upload/send raised, leaking the MP3.
            if os.path.exists(mp3_path):
                os.remove(mp3_path)
    finally:
        if os.path.exists(downloaded):
            os.remove(downloaded)
# ── Image processor ───────────────────────────────────────────────────────────
def process_image_message(image_id: str, caption: Optional[str],
                          mobile: str, user_settings: Optional[Dict]) -> None:
    """
    Handle an inbound WhatsApp image.

    The image is downloaded to ``temp_images/`` and read into memory. Without a
    caption, the bytes are persisted as a pending image and the user is asked
    (via buttons) what the image is for. With a caption, the image is analysed
    immediately in "auto" mode and the result handed to ``_handle_vision_result``.

    Any processing error is logged and reported to the user with a generic
    message; the downloaded file is removed afterwards in all cases.

    Args:
        image_id: WhatsApp media id of the image.
        caption: Optional caption sent with the image.
        mobile: Sender identifier, also used as the recipient id.
        user_settings: User record; not used by this function.
    """
    logger.info(f"Processing image (ID: {image_id}) from {mobile}, caption: '{caption}'")
    # Escapes: U+2014 em dash, U+1F50D magnifying glass.
    wa_client.send_text_message(mobile, "Got your image \u2014 analysing... \U0001F50D")

    media_url = wa_client.get_media_url(image_id)
    if not media_url:
        wa_client.send_text_message(mobile, "Couldn't retrieve your image from WhatsApp.")
        return

    os.makedirs("temp_images", exist_ok=True)
    image_path = os.path.join("temp_images", f"{mobile}_{image_id}.jpg")
    downloaded = wa_client.download_media(media_url, image_path)
    if not downloaded:
        wa_client.send_text_message(mobile, "Couldn't download your image.")
        return

    try:
        with open(downloaded, "rb") as f:
            image_bytes = f.read()

        # No caption: ask the vendor what this image is for.
        if not caption or not caption.strip():
            persist_pending_image(mobile, image_bytes, caption, db)
            send_image_intent_buttons(mobile)
            return

        # Caption present: infer mode and process directly.
        result = process_image_and_generate_query(image_bytes, caption, mode="auto")
        _handle_vision_result(mobile, result, caption)
    except Exception as e:
        logger.error(f"Vision processing error: {e}", exc_info=True)
        wa_client.send_text_message(mobile, "Something went wrong analysing your image.")
    finally:
        # Best-effort cleanup: never let a failed delete mask the real outcome,
        # but leave a trace instead of swallowing it silently.
        try:
            if os.path.exists(downloaded):
                os.remove(downloaded)
        except OSError as cleanup_err:
            logger.warning(f"Could not remove temp image {downloaded}: {cleanup_err}")
# ── Webhook ───────────────────────────────────────────────────────────────────
@app.route("/", methods=["GET", "POST"])
def webhook_handler():
    """
    WhatsApp webhook endpoint.

    GET: subscription handshake. Echoes ``hub.challenge`` with 200 when
    ``hub.mode`` is "subscribe" and ``hub.verify_token`` equals VERIFY_TOKEN,
    otherwise answers 403.

    POST: inbound message. Duplicate message ids and unapproved senders are
    short-circuited; otherwise the message is dispatched by type (text, audio,
    image, interactive). Any exception is logged and the response is still
    200 "ok".
    """
    if request.method == "GET":
        query = request.args
        hub_mode = query.get("hub.mode")
        hub_token = query.get("hub.verify_token")
        hub_challenge = query.get("hub.challenge")
        verified = hub_mode == "subscribe" and hub_token == VERIFY_TOKEN
        if not verified:
            return make_response("Verification failed", 403)
        return make_response(hub_challenge, 200)

    if request.method == "POST":
        try:
            payload = request.get_json()
            details = wa_client.get_message_details(payload)
            if not details:
                return make_response("ok", 200)

            msg_id = details.get("id")
            sender = details.get("from")
            kind = details.get("type")

            # Skip messages that were already handled.
            if check_and_mark_processed(msg_id):
                return make_response("ok - duplicate", 200)

            approved, profile = is_user_approved(sender)
            if not approved:
                wa_client.send_text_message(
                    sender, "Access denied. Please contact your administrator.")
                return make_response("ok", 200)

            # Dispatch on message type; unknown types are ignored.
            if kind == "text":
                process_text_message(details.get("text"), sender, profile)
            elif kind == "audio":
                process_audio_message(details.get("audio_id"), sender, profile)
            elif kind == "image":
                process_image_message(details.get("image_id"),
                                      details.get("caption"), sender, profile)
            elif kind == "interactive":
                handle_interactive_response(sender, details.get("button_reply_id"))
        except Exception as e:
            logger.error(f"Unhandled exception in webhook: {e}", exc_info=True)
    return make_response("ok", 200)
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Listen port comes from PORT (default 7860); FLASK_DEBUG=true selects the
    # Flask development server, anything else serves through waitress.
    port = int(os.environ.get("PORT", 7860))
    flask_debug_flag = os.environ.get("FLASK_DEBUG", "False")
    debug_mode = flask_debug_flag.lower() == "true"
    print(f"===== Application Startup at {datetime.now()} =====")
    if debug_mode:
        app.run(debug=True, host="0.0.0.0", port=port)
    else:
        # Imported lazily so waitress is only required outside debug mode.
        from waitress import serve
        serve(app, host="0.0.0.0", port=port)