| from flask import Flask, request, make_response |
| import os |
| import logging |
| import re |
| import sys |
| import json |
| import uuid |
| import threading |
| from collections import OrderedDict |
| from time import time |
| from datetime import datetime, timedelta |
| from typing import Optional, Dict, Any, Tuple, List |
|
|
| from dotenv import load_dotenv |
| import assemblyai as aai |
| import requests |
| import dns.resolver |
| import socket |
|
|
| from utility import ( |
| generateResponse, |
| parse_multiple_transactions, |
| parse_vision_sale_transactions, |
| process_image_and_generate_query, |
| parse_inventory_json, |
| format_inventory_message, |
| confirm_stock_in, |
| persist_temporary_transaction, |
| persist_pending_image, |
| retrieve_pending_image, |
| process_intent, |
| format_transaction_response, |
| detect_and_translate_input, |
| translate_output, |
| cap_for_tts, |
| check_expiry_nudge, |
| apply_price_override, |
| get_user_currency, |
| set_user_currency, |
| CURRENCY_PROMPT, |
| ) |
| import whatsapp_client as wa_client |
|
|
| load_dotenv() |
|
|
| |
| IMGUR_CLIENT_ID = os.getenv("IMGUR_CLIENT_ID") |
| URL_IMGUR = "https://api.imgur.com/3/image" |
| HEADERS_IMGUR = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {} |
|
|
| DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY") |
| DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en" |
|
|
| |
| try: |
| aai.settings.api_key = os.environ["aai_key"] |
| transcriber = aai.Transcriber() |
| except KeyError: |
| transcriber = None |
| logging.warning("AAI_KEY not found β audio transcription disabled.") |
| except Exception as e: |
| transcriber = None |
| logging.error(f"Failed to initialise AssemblyAI: {e}") |
|
|
| |
| import firebase_admin |
| from firebase_admin import credentials, firestore, storage |
|
|
| def init_firestore_from_env(env_var: str = "FIREBASE"): |
| try: |
| if firebase_admin._apps: |
| return firestore.client() |
| sa_json = os.environ[env_var] |
| sa_info = json.loads(sa_json) |
| cred = credentials.Certificate(sa_info) |
| bucket = os.environ.get("FIREBASE_STORAGE_BUCKET") |
| firebase_admin.initialize_app(cred, {"storageBucket": bucket}) |
| return firestore.client() |
| except KeyError as e: |
| logging.error("%s env var not set", e); raise |
| except Exception as e: |
| logging.exception("Failed to initialise Firestore: %s", e); raise |
|
|
| db = init_firestore_from_env() |
|
|
| |
| wa_client.set_db(db) |
|
|
| app = Flask(__name__) |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s", |
| stream=sys.stdout, |
| force=True, |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
| nameserver1 = os.getenv("nameserver1", "8.8.8.8") |
| nameserver2 = os.getenv("nameserver2", "8.8.4.4") |
|
|
| def setup_dns() -> None: |
| try: |
| resolver = dns.resolver.Resolver() |
| resolver.nameservers = [nameserver1, nameserver2] |
| overrides = {} |
|
|
| for host in ["graph.facebook.com"]: |
| try: |
| ip = str(resolver.resolve(host, "A")[0]) |
| overrides[host] = ip |
| logger.info(f"DNS override: {host} -> {ip}") |
| except Exception as e: |
| logger.warning(f"Could not resolve {host}: {e}") |
|
|
| proxy_url = os.getenv("WHATSAPP_PROXY_URL", "").strip() |
| if proxy_url: |
| from urllib.parse import urlparse as _up |
| proxy_host = _up(proxy_url).hostname |
| if proxy_host and proxy_host not in overrides: |
| try: |
| ip = str(resolver.resolve(proxy_host, "A")[0]) |
| overrides[proxy_host] = ip |
| logger.info(f"DNS override: {proxy_host} -> {ip}") |
| except Exception as e: |
| logger.warning(f"Could not resolve proxy host {proxy_host}: {e}") |
|
|
| if overrides: |
| wa_client.configure_session(overrides) |
| except Exception as e: |
| logger.warning(f"DNS setup failed: {e}") |
|
|
| setup_dns() |
|
|
| |
| VERIFY_TOKEN = os.environ.get("VERIFY_TOKEN", "30cca545-3838-48b2-80a7-9e43b1ae8ce4") |
| GREETING_PATTERN = re.compile(r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b.*$', re.IGNORECASE) |
|
|
| |
| _KNOWN_CURRENCIES = { |
| "r": "R", "zar": "R", "rand": "R", "rands": "R", |
| "south africa rand": "R", "south african rand": "R", |
| "sa rand": "R", "sa rands": "R", |
| "usd": "USD", "$": "USD", "dollar": "USD", "dollars": "USD", |
| "us dollar": "USD", "us dollars": "USD", "american dollar": "USD", |
| "zwg": "ZWG", "zig": "ZWG", "zimbabwe gold": "ZWG", |
| "kes": "KES", "ksh": "KES", "shilling": "KES", "kenyan shilling": "KES", |
| "ngn": "NGN", "naira": "NGN", "nigerian naira": "NGN", |
| "ghs": "GHS", "cedi": "GHS", "ghana cedi": "GHS", |
| "eur": "EUR", "euro": "EUR", "euros": "EUR", |
| "gbp": "GBP", "pound": "GBP", "pounds": "GBP", |
| "mwk": "MWK", "kwacha": "MWK", "malawi kwacha": "MWK", |
| "tzs": "TZS", "tshs": "TZS", "tanzanian shilling": "TZS", |
| "ugx": "UGX", "uganda shilling": "UGX", |
| "zamk": "ZMW", "zmw": "ZMW", "zambian kwacha": "ZMW", |
| "bwp": "BWP", "pula": "BWP", "botswana pula": "BWP", |
| "mzn": "MZN", "metical": "MZN", |
| } |
|
|
| |
| _CURRENCY_PHRASES = [ |
| ("south african rand", "R"), ("south africa rand", "R"), |
| ("sa rands", "R"), ("sa rand", "R"), |
| ("us dollars", "USD"), ("us dollar", "USD"), ("american dollar", "USD"), |
| ("zimbabwe gold", "ZWG"), |
| ("kenyan shilling", "KES"), |
| ("nigerian naira", "NGN"), |
| ("ghana cedi", "GHS"), |
| ("zambian kwacha", "ZMW"), |
| ("botswana pula", "BWP"), |
| ("tanzanian shilling", "TZS"), |
| ("malawi kwacha", "MWK"), |
| ("rands", "R"), ("rand", "R"), |
| ("dollars", "USD"), ("dollar", "USD"), |
| ("euros", "EUR"), ("euro", "EUR"), |
| ("pounds", "GBP"), ("pound", "GBP"), |
| ("kwacha", "MWK"), |
| ("pula", "BWP"), |
| ("shilling", "KES"), |
| ("naira", "NGN"), |
| ("cedi", "GHS"), |
| ("metical", "MZN"), |
| ] |
|
|
|
|
| def _parse_currency_reply(text: str) -> Optional[str]: |
| """ |
| Detect currency from any natural language phrasing. |
| Handles: 'R', 'USD', 'Rands', 'South Africa Rands', |
| 'I would like to use South African Rand', 'Please use USD'. |
| """ |
| t = text.strip().lower().rstrip(".") |
|
|
| |
| if t in _KNOWN_CURRENCIES: |
| return _KNOWN_CURRENCIES[t] |
|
|
| |
| for phrase, code in _CURRENCY_PHRASES: |
| if phrase in t: |
| return code |
|
|
| |
| if re.match(r'^[a-z$Β£β¬β¦β΅]{1,4}$', t): |
| return t.upper() |
|
|
| return None |
|
|
| def _store_context(db, mobile: str, role: str, text: str) -> None: |
| """ |
| Store the last exchange in Firestore for short-term context. |
| Keeps only the last 3 user+bot pairs (6 messages). |
| role: "user" | "bot" |
| """ |
| try: |
| col = db.collection("users").document(mobile).collection("context") |
| col.add({ |
| "role": role, |
| "text": text[:500], |
| "ts": firestore.SERVER_TIMESTAMP, |
| }) |
| |
| docs = list(col.order_by("ts", direction=firestore.Query.DESCENDING).limit(20).get()) |
| if len(docs) > 6: |
| for doc in docs[6:]: |
| doc.reference.delete() |
| except Exception as e: |
| logger.warning(f"_store_context: {e}") |
|
|
|
|
| def _load_context(db, mobile: str) -> List[Dict]: |
| """ |
| Load last 3 user+bot exchanges as [{role, text}] oldest-first. |
| Used to resolve pronouns like 'it', 'that', 'this', 'them'. |
| """ |
| try: |
| docs = db.collection("users").document(mobile).collection("context") .order_by("ts", direction=firestore.Query.DESCENDING) .limit(6).get() |
| return [{"role": d.to_dict()["role"], "text": d.to_dict()["text"]} |
| for d in reversed(docs)] |
| except Exception as e: |
| logger.warning(f"_load_context: {e}") |
| return [] |
|
|
|
|
| |
| |
| _PRONOUN_RE = re.compile( |
| r'\b(it|its|that|this|them|they|those|these|the same|same one|' |
| r'that one|this one|that transaction|this transaction|' |
| r'the item|the product|the sale|the loan|the expense)\b', |
| re.IGNORECASE |
| ) |
|
|
|
|
| def _resolve_context(text: str, context: List[Dict]) -> str: |
| """ |
| If the current message contains pronouns/references that need context, |
| prepend a brief context hint so the NLP can resolve them. |
| Only adds context if pronouns are detected β avoids polluting clean messages. |
| """ |
| if not context or not _PRONOUN_RE.search(text): |
| return text |
|
|
| |
| |
| user_msgs = [c["text"] for c in context if c["role"] == "user"][-3:] |
| if not user_msgs: |
| return text |
|
|
| hint = "Recent context: " + " | ".join(user_msgs) |
| |
| if len(hint) > 200: |
| hint = hint[:200] + "..." |
|
|
| return f"[{hint}]\n\nCurrent message: {text}" |
|
|
|
|
| def _parse_price_reply(text: str) -> Optional[Dict[str, float]]: |
| """ |
| Parse a price reply like: |
| banana: 3.50 |
| orange: 4 |
| apple: 5.00 |
| Returns {item_name: price} or None if not a price reply. |
| """ |
| if text.strip().lower() == "skip": |
| return {} |
|
|
| lines = [l.strip() for l in text.strip().splitlines() if l.strip()] |
| prices = {} |
| for line in lines: |
| |
| match = re.match(r'^([a-zA-Z\s]+)[:\-]\s*\$?([0-9]+(?:\.[0-9]{1,2})?)$', line) |
| if match: |
| item = match.group(1).strip().lower() |
| price = float(match.group(2)) |
| prices[item] = price |
|
|
| return prices if prices else None |
|
|
|
|
| def _apply_price_replies(db, mobile: str, prices: Dict[str, float]) -> str: |
| """ |
| Update price_each on stock_batches for named items. |
| Also apply any pending_discounts for those items. |
| Returns confirmation message. |
| """ |
| from utility import get_user_currency, apply_price_override |
| currency = get_user_currency(db, mobile) or "" |
| updated = [] |
|
|
| for item_name, price in prices.items(): |
| try: |
| |
| batches = db.collection("users").document(mobile) .collection("stock_batches") .where("name", "==", item_name).get() |
| for batch in batches: |
| if batch.to_dict().get("price_each") is None: |
| batch.reference.update({"price_each": price}) |
|
|
| |
| disc_ref = db.collection("users").document(mobile) .collection("pending_discounts").document(item_name) |
| disc_doc = disc_ref.get() |
| if disc_doc.exists: |
| disc_pct = disc_doc.to_dict().get("discount_pct", 0) |
| new_p = apply_price_override(db, mobile, item_name, price, disc_pct) |
| updated.append(f"*{item_name.title()}*: {currency}{price} ({disc_pct}% discount active β {currency}{new_p})") |
| disc_ref.delete() |
| else: |
| updated.append(f"*{item_name.title()}*: {currency}{price}") |
| except Exception as e: |
| logger.warning(f"_apply_price_replies failed for {item_name}: {e}") |
|
|
| if not updated: |
| return "No matching stock items found to update." |
| body = "\n".join(f" {u}" for u in updated) |
| return "\U0001f4b0 Prices saved:\n" + body |
|
|
|
|
|
|
| def _looks_like_name(text: str) -> bool: |
| """ |
| Heuristic: is this text a person's name? |
| Names: 2-4 words, each capitalised, no digits, no punctuation. |
| """ |
| words = text.strip().split() |
| if not (2 <= len(words) <= 4): |
| return False |
| return all( |
| w[0].isupper() and w.isalpha() |
| for w in words |
| ) |
|
|
|
|
| def _try_fill_missing_details(reply_text: str, transactions: list): |
| """ |
| Try to fill missing details from a user reply. |
| Handles: |
| - Amounts: "total 45", "R45", "45 paid 50" |
| - Names (party/lender/customer): "Siyabonga Dlamini", "John Smith" |
| Returns (updated_transactions, still_missing_prompt) or None if not a fill reply. |
| """ |
| text = reply_text.strip() |
|
|
| |
| is_name = _looks_like_name(text) |
|
|
| |
| total_m = re.search( |
| r'(?:total|for|amount|price)?\s*[r$]?(\d+(?:\.\d{1,2})?)', |
| text.lower() |
| ) |
| paid_m = re.search( |
| r'(?:paid|customer paid|received)\s*[r$]?(\d+(?:\.\d{1,2})?)', |
| text.lower() |
| ) |
|
|
| |
| if not is_name and not total_m and not paid_m: |
| return None |
|
|
| for txn in transactions: |
| details = txn.get("details", {}) |
| txn_type = txn.get("transaction_type", "").lower() |
|
|
| |
| if is_name: |
| if txn_type in ("loan",): |
| details.setdefault("party", text) |
| elif txn_type == "sale": |
| details.setdefault("customer", text) |
| elif txn_type == "expense": |
| details.setdefault("party", text) |
| else: |
| details.setdefault("party", text) |
|
|
| |
| current_amount = details.get("amount") or details.get("total") |
| amount_ok = False |
| if current_amount is not None: |
| try: |
| amount_ok = float(str(current_amount)) > 0 |
| except Exception: |
| pass |
|
|
| if total_m and not amount_ok: |
| details["amount"] = float(total_m.group(1)) |
| if paid_m: |
| details["amount_paid"] = float(paid_m.group(1)) |
|
|
| txn["details"] = details |
|
|
| still_missing = _check_missing_details(transactions) |
| return transactions, still_missing |
|
|
|
|
| def _check_missing_details(transactions: list, currency: str = "?") -> Optional[str]: |
| """ |
| Check each transaction for missing critical fields. |
| Returns a prompt string if anything is missing, None if complete. |
| |
| Required by type: |
| sale : amount (total) β prompt if absent AND no item-level prices |
| stock_in : quantity per item (usually present); price optional (prompted after confirm) |
| expense : amount + category |
| asset : amount + description |
| loan : amount + party |
| """ |
| prompts = [] |
|
|
| for txn in transactions: |
| txn_type = txn.get("transaction_type", "").lower() |
| details = txn.get("details", {}) |
| items = details.get("items", []) |
| amount = details.get("amount") or details.get("total") |
| cur = currency if currency and currency not in ("?", "null", "None") else "" |
|
|
| |
| amount_ok = False |
| if amount is not None: |
| try: |
| v = float(str(amount)) |
| amount_ok = v > 0 |
| except (ValueError, TypeError): |
| pass |
|
|
| if txn_type == "sale": |
| desc = details.get("description", "") or ( |
| ", ".join(f"{it.get('quantity','')} {it.get('item','')}" |
| for it in items) if items else "items" |
| ) |
| if not amount_ok: |
| amt_prompt = f"How much did you sell {desc} for in total?" |
| if not details.get("amount_paid"): |
| amt_prompt += f"\nReply: total {cur}[amount] and paid {cur}[amount]" |
| else: |
| amt_prompt += f"\nReply: total {cur}[amount]" |
| prompts.append(amt_prompt) |
|
|
| elif txn_type == "expense": |
| desc = details.get("description", "this expense") |
| if not amount_ok: |
| prompts.append(f"How much did {desc} cost? Reply: {cur}[amount]") |
| if not details.get("category"): |
| prompts.append( |
| "What category is this expense? " |
| "Reply: rent | transport | airtime | wages | packaging | other" |
| ) |
|
|
| elif txn_type == "asset": |
| desc = details.get("description", "this asset") |
| if not amount_ok: |
| prompts.append(f"How much did {desc} cost? Reply: {cur}[amount]") |
|
|
| elif txn_type == "loan": |
| if not amount_ok: |
| prompts.append(f"What is the loan amount? Reply: {cur}[amount]") |
| if not details.get("party"): |
| prompts.append("Who is the lender/borrower? Reply with their name.") |
|
|
| if not prompts: |
| return None |
|
|
| lines = ["I need a few more details before saving:"] |
| for i, p in enumerate(prompts, 1): |
| lines.append(f"\n{i}. {p}") |
| return "\n".join(lines) |
|
|
|
|
| def _parse_price_reply(text: str) -> Optional[Dict[str, float]]: |
| """Parse price reply: 'banana: 3.50\norange: 4'""" |
| if text.strip().lower() == "skip": |
| return {} |
| lines = [l.strip() for l in text.strip().splitlines() if l.strip()] |
| prices = {} |
| for line in lines: |
| match = re.match(r'^([a-zA-Z\s]+)[:\-]\s*\$?([0-9]+(?:\.[0-9]{1,2})?)$', line) |
| if match: |
| prices[match.group(1).strip().lower()] = float(match.group(2)) |
| return prices if prices else None |
|
|
|
|
| def _apply_price_replies(db, mobile: str, prices: Dict[str, float]) -> str: |
| from utility import get_user_currency, apply_price_override |
| currency = get_user_currency(db, mobile) or "" |
| updated = [] |
| for item_name, price in prices.items(): |
| try: |
| batches = db.collection("users").document(mobile) .collection("stock_batches") .where("name", "==", item_name).get() |
| for batch in batches: |
| if batch.to_dict().get("price_each") is None: |
| batch.reference.update({"price_each": price}) |
| disc_ref = db.collection("users").document(mobile) .collection("pending_discounts").document(item_name) |
| disc_doc = disc_ref.get() |
| if disc_doc.exists: |
| disc_pct = disc_doc.to_dict().get("discount_pct", 0) |
| new_p = apply_price_override(db, mobile, item_name, price, disc_pct) |
| updated.append(f"*{item_name.title()}*: {currency}{price} " |
| f"({disc_pct}% discount active β {currency}{new_p})") |
| disc_ref.delete() |
| else: |
| updated.append(f"*{item_name.title()}*: {currency}{price}") |
| except Exception as e: |
| logger.warning(f"_apply_price_replies {item_name}: {e}") |
| if not updated: |
| return "No matching stock items found." |
| items_joined = "\n".join(f" {u}" for u in updated) |
| return "\U0001f4b0 Prices saved:\n" + items_joined |
|
|
| |
| PROCESSED_MESSAGES_TTL_HOURS = 24 |
|
|
| class MessageDeduplicator: |
| def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None): |
| self.ttl_seconds = ttl_hours * 3600 |
| self.max_cache_size = max_cache_size |
| self.db_client = db_client |
| self.cache = OrderedDict() |
| self.lock = threading.RLock() |
| threading.Thread(target=self._periodic_cleanup, daemon=True).start() |
|
|
| def is_duplicate(self, message_id): |
| if not message_id: return False |
| with self.lock: |
| if message_id in self.cache: |
| self.cache.move_to_end(message_id) |
| return True |
| if self.db_client: |
| try: |
| doc = self.db_client.collection("processed_messages").document(message_id).get() |
| if doc.exists: |
| self.cache[message_id] = time() |
| if len(self.cache) > self.max_cache_size: |
| self.cache.popitem(last=False) |
| return True |
| except Exception as e: |
| logger.error(f"is_duplicate DB error: {e}") |
| self._mark_processed(message_id) |
| return False |
|
|
| def _mark_processed(self, message_id): |
| with self.lock: |
| self.cache[message_id] = time() |
| if len(self.cache) > self.max_cache_size: |
| self.cache.popitem(last=False) |
| if self.db_client: |
| try: |
| expiry = datetime.now() + timedelta(hours=self.ttl_seconds / 3600) |
| self.db_client.collection("processed_messages").document(message_id).set( |
| {"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry} |
| ) |
| except Exception as e: |
| logger.error(f"_mark_processed DB error: {e}") |
|
|
| def _periodic_cleanup(self): |
| while True: |
| try: |
| with self.lock: |
| now = time() |
| expired = [mid for mid, ts in list(self.cache.items()) |
| if now - ts > self.ttl_seconds] |
| for mid in expired: |
| self.cache.pop(mid, None) |
| threading.Event().wait(3600) |
| except Exception as e: |
| logger.error(f"cleanup thread error: {e}") |
| threading.Event().wait(300) |
|
|
| message_deduplicator = MessageDeduplicator( |
| ttl_hours=PROCESSED_MESSAGES_TTL_HOURS, db_client=db |
| ) |
|
|
| def check_and_mark_processed(message_id: str) -> bool: |
| if not message_id: |
| logger.warning("Empty message ID") |
| return False |
| return message_deduplicator.is_duplicate(message_id) |
|
|
| |
|
|
| def is_user_approved(mobile: str) -> Tuple[bool, Optional[Dict]]: |
| if not db: |
| logger.error("Firestore not available for auth") |
| return False, None |
| try: |
| normalized = mobile if mobile.startswith("+") else f"+{mobile}" |
| logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}'") |
| doc = db.collection("users").document(normalized).get() |
| if doc.exists: |
| data = doc.to_dict() |
| if data.get("status", "").lower() == "approved": |
| return True, data |
| return False, None |
| except Exception as e: |
| logger.error(f"is_user_approved error for {mobile}: {e}", exc_info=True) |
| return False, None |
|
|
| |
|
|
| def send_confirmation_buttons(mobile: str, message_summary: str, |
| is_variance: bool = False) -> None: |
| if is_variance: |
| buttons = [ |
| {"reply": {"id": "confirm_resolved", "title": "β
Settled"}}, |
| {"reply": {"id": "confirm_unresolved", "title": "β οΈ Pending"}}, |
| {"reply": {"id": "cancel_transaction", "title": "β Cancel"}}, |
| ] |
| else: |
| buttons = [ |
| {"reply": {"id": "confirm_transaction", "title": "β
Confirm"}}, |
| {"reply": {"id": "cancel_transaction", "title": "β Cancel"}}, |
| ] |
| |
| |
| body = message_summary[:1020] + "..." if len(message_summary) > 1024 else message_summary |
| wa_client.send_reply_buttons(recipient_id=mobile, body_text=body, button_data=buttons) |
|
|
|
|
| def send_image_intent_buttons(mobile: str) -> None: |
| """Ask vendor whether the image is stock-in or a sale β shown when no caption.""" |
| wa_client.send_reply_buttons( |
| recipient_id=mobile, |
| body_text="What is this image for?", |
| button_data=[ |
| {"reply": {"id": "image_stock_in", "title": "π¦ Stock In"}}, |
| {"reply": {"id": "image_record_sale", "title": "π° Record Sale"}}, |
| ], |
| ) |
|
|
|
|
| def send_discount_buttons(mobile: str, item: Dict, idx: int) -> None: |
| """ |
| Send a discount suggestion button per flagged item. |
| No prices shown β discount % only. Price set later by user. |
| """ |
| name = item["name"] |
| disc = item["discount_pct"] |
| quality = item["quality"] |
| qty = item["quantity"] |
|
|
| icon = "π΄" if quality == "urgent_move" else "π " |
| body = ( |
| f"{icon} *{name.title()}* β {qty} units\n" |
| f"Quality: {quality.replace('_', ' ')}\n" |
| f"Suggested discount: *{disc}% off* selling price\n\n" |
| f"Apply this discount for the next 24 hours?" |
| ) |
| wa_client.send_reply_buttons( |
| recipient_id=mobile, |
| body_text=body, |
| button_data=[ |
| {"reply": {"id": f"discount_confirm_{idx}_{name}", "title": "β
Apply Discount"}}, |
| {"reply": {"id": f"discount_skip_{idx}_{name}", "title": "βοΈ Skip"}}, |
| ], |
| ) |
|
|
| |
|
|
| def handle_interactive_response(mobile: str, button_id: str) -> None: |
| if not db: |
| wa_client.send_text_message(mobile, "Database unavailable β cannot process.") |
| return |
|
|
| |
| if button_id in ("image_stock_in", "image_record_sale"): |
| image_bytes, caption = retrieve_pending_image(mobile, db) |
| if not image_bytes: |
| wa_client.send_text_message(mobile, "Couldn't find your image β please send it again.") |
| return |
|
|
| mode = "stock_in" if button_id == "image_stock_in" else "sale" |
| wa_client.send_text_message(mobile, "Analysing your image... π") |
| result = process_image_and_generate_query(image_bytes, caption, mode=mode) |
| _handle_vision_result(mobile, result, caption) |
| return |
|
|
| |
| if button_id == "confirm_inventory": |
| inv_ref = db.collection("users").document(mobile) \ |
| .collection("temp_inventory").document("pending") |
| inv_doc = inv_ref.get() |
| if not inv_doc.exists: |
| wa_client.send_text_message(mobile, "No pending inventory found.") |
| return |
| inventory_data = inv_doc.to_dict().get("inventory", {}) |
| inv_ref.delete() |
| |
| from utility import get_user_currency |
| currency = get_user_currency(db, mobile) or "?" |
| msg = confirm_stock_in(db, mobile, inventory_data, currency=currency) |
| wa_client.send_text_message(mobile, msg) |
|
|
| |
| _, discount_items = format_inventory_message(inventory_data) |
| for i, item in enumerate(discount_items): |
| send_discount_buttons(mobile, item, i) |
| return |
|
|
| |
| if button_id.startswith("discount_confirm_"): |
| |
| parts = button_id.split("_", 3) |
| if len(parts) == 4: |
| item_name = parts[3] |
| inv_ref = db.collection("users").document(mobile) .collection("temp_inventory").document("discount_meta") |
| meta_doc = inv_ref.get() |
| disc = 0 |
| if meta_doc.exists: |
| meta = meta_doc.to_dict() |
| disc = meta.get(item_name, {}).get("discount_pct", 20) |
|
|
| |
| from utility import _lookup_item_price |
| price = _lookup_item_price(db, mobile, item_name) |
| if price: |
| new_p = apply_price_override(db, mobile, item_name, price, disc) |
| from utility import get_user_currency |
| cur = get_user_currency(db, mobile) or "" |
| wa_client.send_text_message( |
| mobile, |
| f"β
Discount applied: *{item_name.title()}* β {cur}{new_p} for 24h ({disc}% off)." |
| ) |
| else: |
| |
| db.collection("users").document(mobile) .collection("pending_discounts").document(item_name).set({ |
| "item_name": item_name, |
| "discount_pct": disc, |
| "created_at": firestore.SERVER_TIMESTAMP, |
| }) |
| wa_client.send_text_message( |
| mobile, |
| f"β
{disc}% discount noted for *{item_name.title()}*. " |
| f"I'll apply it once you set the selling price." |
| ) |
| return |
|
|
| if button_id.startswith("discount_skip_"): |
| parts = button_id.split("_", 3) |
| item_name = parts[3] if len(parts) == 4 else "item" |
| wa_client.send_text_message(mobile, f"βοΈ Skipped discount for {item_name.title()}.") |
| return |
|
|
| |
| doc_ref = db.collection("users").document(mobile) \ |
| .collection("temp_transactions").document("pending") |
| try: |
| txn_doc = doc_ref.get() |
| if not txn_doc.exists: |
| wa_client.send_text_message(mobile, "No transaction waiting for confirmation.") |
| return |
|
|
| pending = txn_doc.to_dict().get("transactions", []) |
| if not pending: |
| wa_client.send_text_message(mobile, "Issue with pending transaction data.") |
| doc_ref.delete() |
| return |
|
|
| if button_id == "confirm_transaction": |
| result = process_intent(pending, mobile, force_settled=False) |
| elif button_id == "confirm_resolved": |
| result = process_intent(pending, mobile, force_settled=True) |
| elif button_id == "confirm_unresolved": |
| result = process_intent(pending, mobile, force_settled=False) |
| elif button_id == "confirm_reset": |
| result = process_intent(pending, mobile, force_settled=False) |
| elif button_id == "cancel_transaction": |
| result = "Transaction cancelled." |
| else: |
| result = "Unrecognised button." |
|
|
| doc_ref.delete() |
|
|
| |
| if isinstance(result, tuple) and len(result) == 3 and result[0] == "TX_IDS": |
| _, summary, tx_ids = result |
| wa_client.send_text_message(mobile, summary) |
| if tx_ids: |
| id_block = "\n".join(tx_ids) |
| label = "Transaction IDs" if len(tx_ids) > 1 else "Transaction ID" |
| wa_client.send_text_message( |
| mobile, |
| f"\U0001f194 *{label}* (tap to copy):\n{id_block}" |
| ) |
|
|
| except Exception as e: |
| logger.error(f"handle_interactive_response error for {mobile}: {e}", exc_info=True) |
| wa_client.send_text_message(mobile, "Something went wrong handling your confirmation.") |
|
|
| |
|
|
| def _handle_vision_result(mobile: str, result: str, caption: Optional[str]) -> None: |
| """ |
| Route vision output: |
| - INVENTORY_JSON prefix β show count summary + store for confirmation |
| - Plain text β send to NLP pipeline as transaction description |
| - File path β upload as image |
| """ |
| inventory_data = parse_inventory_json(result) |
|
|
| if inventory_data: |
| |
| summary, discount_items = format_inventory_message(inventory_data) |
| wa_client.send_text_message(mobile, summary) |
|
|
| |
| db.collection("users").document(mobile) \ |
| .collection("temp_inventory").document("pending").set({ |
| "inventory": inventory_data, |
| "created_at": firestore.SERVER_TIMESTAMP, |
| }) |
|
|
| |
| if discount_items: |
| meta = { |
| item["name"]: { |
| "original_price": item["original_price"], |
| "discount_pct": item["discount_pct"], |
| "new_price": item["new_price"], |
| } |
| for item in discount_items |
| } |
| db.collection("users").document(mobile) \ |
| .collection("temp_inventory").document("discount_meta").set(meta) |
|
|
| |
| wa_client.send_reply_buttons( |
| recipient_id=mobile, |
| body_text="Save this as new stock?", |
| button_data=[ |
| {"reply": {"id": "confirm_inventory", "title": "β
Save Stock"}}, |
| {"reply": {"id": "cancel_transaction", "title": "β Cancel"}}, |
| ], |
| ) |
|
|
| |
| for i, item in enumerate(discount_items): |
| send_discount_buttons(mobile, item, i) |
|
|
| elif result.startswith("Error:"): |
| wa_client.send_text_message(mobile, result) |
|
|
| elif os.path.isfile(result) and HEADERS_IMGUR: |
| |
| try: |
| with open(result, "rb") as f: |
| resp = requests.post(URL_IMGUR, headers=HEADERS_IMGUR, files={"image": f}) |
| resp.raise_for_status() |
| imgur_data = resp.json() |
| if imgur_data.get("success"): |
| wa_client.send_image_message(recipient_id=mobile, |
| image_url=imgur_data["data"]["link"]) |
| os.remove(result) |
| return |
| except Exception as e: |
| logger.error(f"Imgur upload failed: {e}") |
| wa_client.send_text_message(mobile, result) |
| if os.path.exists(result): os.remove(result) |
|
|
| else: |
| |
| currency = get_user_currency(db, mobile) or "?" |
| sale_txns = parse_vision_sale_transactions(result, currency=currency) |
|
|
| if sale_txns: |
| |
| if persist_temporary_transaction(sale_txns, mobile): |
| summary = format_transaction_response(sale_txns) |
| has_variance = any( |
| "amount_paid" in t.get("details", {}) for t in sale_txns |
| ) |
| send_confirmation_buttons(mobile, summary, is_variance=has_variance) |
| else: |
| wa_client.send_text_message(mobile, "Sorry, could not save your sale.") |
| else: |
| |
| process_text_message(result, mobile) |
|
|
|
|
|
|
| def _extract_clean_message(message_text: Optional[str]) -> str: |
| """ |
| Clean WhatsApp text before sending it into the NLP pipeline. |
| |
| Main purpose: |
| - Prevent crashes when users paste/forward the bot's Transaction ID block. |
| - Remove WhatsApp markdown around transaction IDs. |
| - Preserve ordinary business messages unchanged. |
| |
| Examples handled: |
| "π *Transaction ID* (tap to copy):\nABC123" |
| "Reverse this π *Transaction ID* (tap to copy):\nABC123" |
| "Transaction IDs:\nABC123\nDEF456" |
| """ |
| if message_text is None: |
| return "" |
|
|
| text = str(message_text).strip() |
| if not text: |
| return "" |
|
|
| |
| text = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text) |
|
|
| |
| tx_label_re = re.compile(r"transaction\s+ids?", re.IGNORECASE) |
| if not tx_label_re.search(text): |
| return text |
|
|
| |
| |
| tx_ids = re.findall(r"\b[A-Za-z0-9][A-Za-z0-9_-]{5,}\b", text) |
|
|
| |
| cleaned = text |
| cleaned = re.sub(r"[π*]", "", cleaned) |
| cleaned = re.sub(r"transaction\s+ids?", "", cleaned, flags=re.IGNORECASE) |
| cleaned = re.sub(r"\(?\s*tap\s+to\s+copy\s*\)?", "", cleaned, flags=re.IGNORECASE) |
| cleaned = re.sub(r"[:\-]+", " ", cleaned) |
|
|
| |
| instruction = cleaned |
| for tx_id in tx_ids: |
| instruction = instruction.replace(tx_id, " ") |
| instruction = re.sub(r"\s+", " ", instruction).strip() |
|
|
| if tx_ids: |
| unique_ids = list(dict.fromkeys(tx_ids)) |
| id_text = " ".join(unique_ids) |
| return f"{instruction} {id_text}".strip() if instruction else id_text |
|
|
| |
| return re.sub(r"\s+", " ", cleaned).strip() or text |
|
|
| |
|
|
| def process_text_message(message_text: str, mobile: str, |
| user_settings: Optional[Dict] = None) -> Optional[str]: |
| logger.info(f"Processing text message from {mobile}: '{message_text}'") |
|
|
| |
| |
| |
| message_text = _extract_clean_message(message_text) |
|
|
| |
| lang_data = detect_and_translate_input(message_text) |
| english_text = lang_data.get("english_text", message_text) |
| detected_lang = lang_data.get("detected_lang", "English") |
| logger.info(f"Detected language: {detected_lang}. Process text: {english_text}") |
|
|
| |
| context_history = _load_context(db, mobile) |
| english_text = _resolve_context(english_text, context_history) |
|
|
| |
| _store_context(db, mobile, "user", message_text) |
|
|
| |
| nudge = check_expiry_nudge(db, mobile) |
|
|
| if GREETING_PATTERN.match(english_text): |
| base_msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?" |
| if nudge: |
| base_msg = nudge + "\n\n" + base_msg |
| final_msg = translate_output(base_msg, detected_lang) |
| wa_client.send_text_message(mobile, final_msg) |
| return final_msg |
|
|
| |
| |
| currency_reply = _parse_currency_reply(english_text) |
| if currency_reply: |
| set_user_currency(db, mobile, currency_reply) |
| reply = f"Got it β I'll use *{currency_reply}* for all your transactions. Now, how can I help?" |
| final = translate_output(reply, detected_lang) |
| wa_client.send_text_message(mobile, final) |
| return final |
|
|
| |
| user_currency = get_user_currency(db, mobile) |
| if not user_currency and user_settings: |
| user_currency = (user_settings.get("currency") or |
| user_settings.get("settings", {}).get("currency")) |
|
|
| |
| if not user_currency and not GREETING_PATTERN.match(english_text): |
| wa_client.send_text_message(mobile, CURRENCY_PROMPT) |
| return None |
|
|
| |
| price_reply = _parse_price_reply(english_text) |
| if price_reply is not None: |
| if price_reply == {}: |
| msg = translate_output("No problem β you can set prices anytime by telling me.", detected_lang) |
| wa_client.send_text_message(mobile, msg) |
| return msg |
| msg = _apply_price_replies(db, mobile, price_reply) |
| final = translate_output(msg, detected_lang) |
| wa_client.send_text_message(mobile, final) |
| return final |
|
|
| |
| |
| pending_ref = db.collection("users").document(mobile) .collection("temp_transactions").document("pending") |
| pending_doc = pending_ref.get() |
| if pending_doc.exists: |
| missing_fill = _try_fill_missing_details( |
| english_text, pending_doc.to_dict().get("transactions", []) |
| ) |
| if missing_fill is not None: |
| filled_txns, still_missing = missing_fill |
| pending_ref.set({"transactions": filled_txns, |
| "created_at": firestore.SERVER_TIMESTAMP}) |
| if still_missing: |
| msg = translate_output(still_missing, detected_lang) |
| wa_client.send_text_message(mobile, msg) |
| return msg |
| |
| summary = format_transaction_response(filled_txns) |
| has_variance = any("amount_paid" in t.get("details", {}) for t in filled_txns) |
| is_sale = filled_txns[0].get("transaction_type") == "sale" |
| send_confirmation_buttons(mobile, translate_output(summary, detected_lang), |
| is_variance=has_variance and is_sale) |
| return None |
|
|
| llm_response_str = generateResponse(english_text, currency=user_currency) |
| parsed_trans_data = parse_multiple_transactions(llm_response_str) |
|
|
| response_msg = "" |
| send_image = False |
| image_path = None |
|
|
| if not parsed_trans_data: |
| response_msg = "Sorry, I couldn't quite understand that. Could you rephrase?" |
| else: |
| primary_intent = parsed_trans_data[0].get("intent", "").lower() |
| primary_type = parsed_trans_data[0].get("transaction_type", "").lower() |
|
|
| if primary_intent == "read" or primary_type == "query": |
| |
| for t in parsed_trans_data: |
| t.setdefault("details", {})["currency"] = user_currency |
| response_data = process_intent(parsed_trans_data, mobile) |
|
|
| |
| if isinstance(response_data, tuple) and len(response_data) == 3 and response_data[0] == "TX_IDS": |
| |
| _, summary, tx_ids = response_data |
| response_msg = summary |
|
|
| elif isinstance(response_data, tuple) and len(response_data) == 2: |
| |
| plot_path, insight = response_data |
| if isinstance(plot_path, str) and plot_path.startswith("PLOT:"): |
| plot_path = plot_path[5:] |
| if isinstance(plot_path, str) and os.path.isfile(plot_path): |
| send_image = True |
| image_path = plot_path |
| |
| if insight: |
| response_msg = f"π‘ {insight}" |
| else: |
| |
| response_msg = insight or "Chart could not be generated." |
|
|
| else: |
| response_msg = str(response_data) if response_data else "No data found." |
|
|
| elif primary_intent in ("create", "update", "delete", "reset_account"): |
| if primary_intent == "reset_account": |
| |
| if persist_temporary_transaction(parsed_trans_data, mobile): |
| warning = ( |
| "β οΈ *This will permanently delete ALL your transactions, " |
| "stock records, and price history.*\n\n" |
| "This cannot be undone. Are you sure?" |
| ) |
| warning_translated = translate_output(warning, detected_lang) |
| wa_client.send_reply_buttons( |
| recipient_id=mobile, |
| body_text=warning_translated, |
| button_data=[ |
| {"reply": {"id": "confirm_reset", "title": "ποΈ Yes, Delete All"}}, |
| {"reply": {"id": "cancel_transaction", "title": "β Cancel"}}, |
| ], |
| ) |
| if nudge: |
| wa_client.send_text_message(mobile, nudge) |
| return None |
| else: |
| response_msg = "Could not process your request." |
| else: |
| |
| missing_prompt = _check_missing_details(parsed_trans_data, user_currency) |
| if missing_prompt: |
| |
| persist_temporary_transaction(parsed_trans_data, mobile) |
| final_prompt = translate_output(missing_prompt, detected_lang) |
| wa_client.send_text_message(mobile, final_prompt) |
| return None |
|
|
| if persist_temporary_transaction(parsed_trans_data, mobile): |
| transaction_summary = format_transaction_response(parsed_trans_data) |
| has_payment_input = any( |
| "amount_paid" in t.get("details", {}) for t in parsed_trans_data |
| ) |
| is_variance = ( |
| has_payment_input and |
| primary_intent == "create" and |
| primary_type == "sale" |
| ) |
| trans_summary_translated = translate_output(transaction_summary, detected_lang) |
| send_confirmation_buttons(mobile, trans_summary_translated, |
| is_variance=is_variance) |
| if nudge: |
| wa_client.send_text_message(mobile, nudge) |
| return None |
| else: |
| response_msg = "Sorry, I couldn't save your transaction for confirmation." |
| else: |
| response_msg = f"I'm not sure how to handle '{primary_intent}'." |
|
|
| |
| if nudge and response_msg: |
| response_msg = nudge + "\n\n" + response_msg |
|
|
| if response_msg: |
| final_response = translate_output(response_msg, detected_lang) |
|
|
| if send_image and image_path: |
| try: |
| if not HEADERS_IMGUR: |
| |
| logger.warning("Imgur not configured β sending insight text only") |
| wa_client.send_text_message(mobile, final_response or "Chart generated but Imgur not configured.") |
| if os.path.exists(image_path): os.remove(image_path) |
| return final_response |
| with open(image_path, "rb") as f: |
| resp = requests.post(URL_IMGUR, headers=HEADERS_IMGUR, files={"image": f}) |
| resp.raise_for_status() |
| imgur_data = resp.json() |
| if imgur_data.get("success"): |
| wa_client.send_image_message(recipient_id=mobile, |
| image_url=imgur_data["data"]["link"]) |
| os.remove(image_path) |
| |
| if final_response and final_response.startswith("π‘"): |
| wa_client.send_text_message(mobile, final_response) |
| return final_response or None |
| else: |
| wa_client.send_text_message(mobile, final_response) |
| os.remove(image_path) |
| return final_response |
| except Exception as e: |
| logger.error(f"Image upload failed: {e}", exc_info=True) |
| wa_client.send_text_message(mobile, final_response or "Chart could not be sent.") |
| if os.path.exists(image_path): os.remove(image_path) |
| return final_response |
| else: |
| wa_client.send_text_message(mobile, final_response) |
| |
| _store_context(db, mobile, "bot", final_response) |
| return final_response |
|
|
| return None |
|
|
| |
|
|
| def _deepgram_tts_to_mp3(text: str) -> Optional[str]: |
| if not DEEPGRAM_API_KEY: |
| return None |
| capped = cap_for_tts(text) |
| try: |
| resp = requests.post( |
| DEEPGRAM_TTS_URL, |
| headers={"Authorization": f"Token {DEEPGRAM_API_KEY}", |
| "Content-Type": "application/json"}, |
| json={"text": capped}, |
| timeout=30, |
| ) |
| resp.raise_for_status() |
| filepath = os.path.join(os.getcwd(), f"tts_{uuid.uuid4()}.mp3") |
| with open(filepath, "wb") as f: |
| f.write(resp.content) |
| return filepath |
| except Exception as e: |
| logger.error(f"DeepGram TTS failed: {e}", exc_info=True) |
| return None |
|
|
|
|
| def _upload_to_firebase_storage(file_path: str) -> Optional[str]: |
| try: |
| bucket = storage.bucket() |
| blob = bucket.blob(f"audio_responses/{os.path.basename(file_path)}") |
| blob.upload_from_filename(file_path) |
| url = blob.generate_signed_url(expiration=timedelta(hours=1)) |
| return url |
| except Exception as e: |
| logger.error(f"Firebase Storage upload failed: {e}", exc_info=True) |
| return None |
|
|
|
|
| def process_audio_message(audio_id: str, mobile: str, |
| user_settings: Optional[Dict]) -> None: |
| if not transcriber: |
| wa_client.send_text_message(mobile, "Audio processing is unavailable right now.") |
| return |
|
|
| media_url = wa_client.get_media_url(audio_id) |
| if not media_url: |
| wa_client.send_text_message(mobile, "Couldn't retrieve your audio.") |
| return |
|
|
| os.makedirs("temp_audio", exist_ok=True) |
| audio_path = os.path.join("temp_audio", f"{mobile}_{audio_id}.ogg") |
| downloaded = wa_client.download_media(media_url, audio_path) |
| if not downloaded: |
| wa_client.send_text_message(mobile, "Couldn't download your audio.") |
| return |
|
|
| try: |
| transcript = transcriber.transcribe(downloaded) |
| if transcript.status == aai.TranscriptStatus.error: |
| wa_client.send_text_message(mobile, f"Transcription error: {transcript.error}") |
| elif not transcript.text: |
| wa_client.send_text_message(mobile, "Couldn't understand the audio.") |
| else: |
| text_response = process_text_message(transcript.text, mobile, user_settings) |
| if text_response: |
| mp3_path = _deepgram_tts_to_mp3(text_response) |
| if mp3_path: |
| audio_url = _upload_to_firebase_storage(mp3_path) |
| if audio_url: |
| wa_client.send_audio_message(mobile, audio_url=audio_url) |
| else: |
| wa_client.send_audio_message(mobile, audio_path=mp3_path) |
| if os.path.exists(mp3_path): os.remove(mp3_path) |
| finally: |
| if os.path.exists(downloaded): os.remove(downloaded) |
|
|
| |
|
|
| def process_image_message(image_id: str, caption: Optional[str], |
| mobile: str, user_settings: Optional[Dict]) -> None: |
| logger.info(f"Processing image (ID: {image_id}) from {mobile}, caption: '{caption}'") |
| wa_client.send_text_message(mobile, "Got your image β analysing... π") |
|
|
| media_url = wa_client.get_media_url(image_id) |
| if not media_url: |
| wa_client.send_text_message(mobile, "Couldn't retrieve your image from WhatsApp.") |
| return |
|
|
| os.makedirs("temp_images", exist_ok=True) |
| image_path = os.path.join("temp_images", f"{mobile}_{image_id}.jpg") |
| downloaded = wa_client.download_media(media_url, image_path) |
| if not downloaded: |
| wa_client.send_text_message(mobile, "Couldn't download your image.") |
| return |
|
|
| try: |
| with open(downloaded, "rb") as f: |
| image_bytes = f.read() |
|
|
| |
| if not caption or not caption.strip(): |
| persist_pending_image(mobile, image_bytes, caption, db) |
| send_image_intent_buttons(mobile) |
| return |
|
|
| |
| result = process_image_and_generate_query(image_bytes, caption, mode="auto") |
| _handle_vision_result(mobile, result, caption) |
|
|
| except Exception as e: |
| logger.error(f"Vision processing error: {e}", exc_info=True) |
| wa_client.send_text_message(mobile, "Something went wrong analysing your image.") |
| finally: |
| if os.path.exists(downloaded): |
| try: os.remove(downloaded) |
| except Exception: pass |
|
|
| |
|
|
| @app.route("/", methods=["GET", "POST"]) |
| def webhook_handler(): |
| if request.method == "GET": |
| mode = request.args.get("hub.mode") |
| token = request.args.get("hub.verify_token") |
| challenge = request.args.get("hub.challenge") |
| if mode == "subscribe" and token == VERIFY_TOKEN: |
| return make_response(challenge, 200) |
| return make_response("Verification failed", 403) |
|
|
| if request.method == "POST": |
| try: |
| data = request.get_json() |
| msg_details = wa_client.get_message_details(data) |
| if not msg_details: |
| return make_response("ok", 200) |
|
|
| message_id = msg_details.get("id") |
| mobile = msg_details.get("from") |
| message_type = msg_details.get("type") |
|
|
| if check_and_mark_processed(message_id): |
| return make_response("ok - duplicate", 200) |
|
|
| is_approved, user_data = is_user_approved(mobile) |
| if not is_approved: |
| wa_client.send_text_message(mobile, "Access denied. Please contact your administrator.") |
| return make_response("ok", 200) |
|
|
| if message_type == "text": |
| process_text_message(msg_details.get("text"), mobile, user_data) |
| elif message_type == "audio": |
| process_audio_message(msg_details.get("audio_id"), mobile, user_data) |
| elif message_type == "image": |
| process_image_message(msg_details.get("image_id"), |
| msg_details.get("caption"), mobile, user_data) |
| elif message_type == "interactive": |
| handle_interactive_response(mobile, msg_details.get("button_reply_id")) |
|
|
| except Exception as e: |
| logger.error(f"Unhandled exception in webhook: {e}", exc_info=True) |
|
|
| return make_response("ok", 200) |
|
|
| |
|
|
| if __name__ == "__main__": |
| port = int(os.environ.get("PORT", 7860)) |
| debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true" |
| print(f"===== Application Startup at {datetime.now()} =====") |
| if not debug_mode: |
| from waitress import serve |
| serve(app, host="0.0.0.0", port=port) |
| else: |
| app.run(debug=True, host="0.0.0.0", port=port) |