rairo commited on
Commit
e9d7492
·
verified ·
1 Parent(s): 60f5448

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +155 -189
main.py CHANGED
@@ -1,4 +1,3 @@
1
- # main.py
2
  from flask import Flask, request, make_response, jsonify
3
  import os
4
  import logging
@@ -19,7 +18,7 @@ import requests
19
  import dns.resolver
20
  import socket
21
  from datetime import datetime, timedelta
22
- from typing import Optional, Dict, Any
23
  import json
24
  import sys
25
  import threading
@@ -33,10 +32,9 @@ IMGUR_CLIENT_ID = os.getenv("IMGUR_CLIENT_ID")
33
  URL_IMGUR = "https://api.imgur.com/3/image"
34
  HEADERS_IMGUR = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}
35
 
36
- # Deepgram (TTS) Configuration (ONLY used for audio-input pipeline)
37
- DEEPGRAM_API = os.getenv("DEEPGRAM_API") # user specified: Kenya’s DEEPGRAM_API
38
- DEEPGRAM_TTS_MODEL = os.getenv("DEEPGRAM_TTS_MODEL", "aura-asteria-en")
39
- DEEPGRAM_TTS_VOICE_PARAMS = os.getenv("DEEPGRAM_TTS_VOICE_PARAMS", "") # optional querystring extras
40
 
41
  # AssemblyAI Configuration
42
  try:
@@ -50,7 +48,7 @@ except Exception as e:
50
  logging.error(f"Failed to initialize AssemblyAI: {e}")
51
 
52
  import firebase_admin
53
- from firebase_admin import credentials, firestore
54
 
55
  def init_firestore_from_env(env_var: str = "FIREBASE"):
56
  """
@@ -79,7 +77,7 @@ db = init_firestore_from_env()
79
 
80
  app = Flask(__name__)
81
 
82
- # Logging Configuration (Improved Format)
83
  logging.basicConfig(
84
  level=logging.INFO,
85
  format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s",
@@ -91,13 +89,11 @@ logger = logging.getLogger(__name__)
91
  # --- DNS Workaround ---
92
  nameserver1 = os.getenv('nameserver1', '8.8.8.8')
93
  nameserver2 = os.getenv('nameserver2', '8.8.4.4')
94
-
95
  def setup_dns():
96
  try:
97
  resolver = dns.resolver.Resolver()
98
  resolver.nameservers = [nameserver1, nameserver2]
99
  _orig_getaddrinfo = socket.getaddrinfo
100
-
101
  def new_getaddrinfo(*args, **kwargs):
102
  try:
103
  if args and isinstance(args[0], str) and 'graph.facebook.com' in args[0]:
@@ -106,16 +102,14 @@ def setup_dns():
106
  logger.info(f"DNS Override: Resolved {args[0]} to {ip} using {resolver.nameservers}")
107
  return _orig_getaddrinfo(ip, *args[1:], **kwargs)
108
  except dns.resolver.NoAnswer:
109
- logger.warning(f"DNS Override: No A record found for {args[0]} using {resolver.nameservers}")
110
  except Exception as e:
111
  logger.error(f"DNS resolution override failed for {args}: {e}")
112
  return _orig_getaddrinfo(*args, **kwargs)
113
-
114
  socket.getaddrinfo = new_getaddrinfo
115
  logger.info(f"DNS resolver configured with nameservers: {resolver.nameservers}")
116
  except Exception as e:
117
  logger.error(f"Failed to setup custom DNS resolver: {e}")
118
-
119
  setup_dns()
120
  # --- End DNS Workaround ---
121
 
@@ -126,7 +120,6 @@ GREETING_PATTERN = re.compile(r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b.*$'
126
  PROCESSED_MESSAGES_TTL_HOURS = 24
127
 
128
  class MessageDeduplicator:
129
- """Thread-safe in-memory cache for message IDs with TTL and Firestore backup."""
130
  def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
131
  self.ttl_seconds = ttl_hours * 3600
132
  self.max_cache_size = max_cache_size
@@ -135,55 +128,45 @@ class MessageDeduplicator:
135
  self.lock = threading.RLock()
136
  self._cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True)
137
  self._cleanup_thread.start()
138
-
139
  def is_duplicate(self, message_id):
140
- if not message_id:
141
- return False
142
  with self.lock:
143
  if message_id in self.cache:
144
  self.cache.move_to_end(message_id)
145
  return True
146
-
147
  if self.db_client:
148
  try:
149
  doc_ref = self.db_client.collection("processed_messages").document(message_id)
150
  if doc_ref.get().exists:
151
  self.cache[message_id] = time()
152
- if len(self.cache) > self.max_cache_size:
153
- self.cache.popitem(last=False)
154
  return True
155
  except Exception as e:
156
  logger.error(f"Database error in is_duplicate: {e}", exc_info=True)
157
-
158
  self._mark_processed(message_id)
159
  return False
160
-
161
  def _mark_processed(self, message_id):
162
  current_time = time()
163
  with self.lock:
164
  self.cache[message_id] = current_time
165
- if len(self.cache) > self.max_cache_size:
166
- self.cache.popitem(last=False)
167
-
168
  if self.db_client:
169
  try:
170
- expiry = datetime.now() + timedelta(seconds=self.ttl_seconds)
171
  doc_ref = self.db_client.collection("processed_messages").document(message_id)
172
  doc_ref.set({"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry})
173
  except Exception as e:
174
  logger.error(f"Database error in _mark_processed: {e}", exc_info=True)
175
-
176
  def _periodic_cleanup(self):
177
  while True:
178
  try:
179
  with self.lock:
180
  current_time = time()
181
- expired_ids = [
182
- msg_id for msg_id, timestamp in list(self.cache.items())
183
- if current_time - timestamp > self.ttl_seconds
184
- ]
185
- for msg_id in expired_ids:
186
- self.cache.pop(msg_id, None)
187
  threading.Event().wait(3600)
188
  except Exception as e:
189
  logger.error(f"Error in cleanup thread: {e}", exc_info=True)
@@ -192,34 +175,33 @@ class MessageDeduplicator:
192
  message_deduplicator = MessageDeduplicator(ttl_hours=PROCESSED_MESSAGES_TTL_HOURS, db_client=db)
193
 
194
  def check_and_mark_processed(message_id: str) -> bool:
195
- """Check if a message has been processed already using the deduplicator."""
196
  if not message_id:
197
  logger.warning("Empty message ID provided to check_and_mark_processed")
198
  return False
199
  return message_deduplicator.is_duplicate(message_id)
200
 
201
- def is_user_approved(mobile: str) -> bool:
202
- """Checks if a user's phone number is in the 'users' collection and has status 'approved'."""
203
  if not db:
204
  logger.error("Authorization check failed: Firestore client is not available.")
205
- return False
206
  try:
207
  normalized_mobile = mobile if mobile.startswith('+') else f'+{mobile}'
208
- logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}', using normalized ID: '{normalized_mobile}'")
209
  user_ref = db.collection("users").document(normalized_mobile)
210
  user_doc = user_ref.get()
211
 
212
  if user_doc.exists:
213
  user_data = user_doc.to_dict()
214
- status = str(user_data.get("status", "N/A")).lower()
215
- return status == "approved"
216
- return False
 
217
  except Exception as e:
218
  logger.error(f"AUTHORIZATION: An exception occurred for {mobile}: {e}", exc_info=True)
219
- return False
220
 
221
  def send_confirmation_buttons(mobile: str, message_summary: str) -> None:
222
- """Send interactive confirmation buttons using the new client."""
223
  buttons = [
224
  {"reply": {"id": "confirm_transaction", "title": "✅ Confirm"}},
225
  {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}}
@@ -228,7 +210,6 @@ def send_confirmation_buttons(mobile: str, message_summary: str) -> None:
228
  wa_client.send_reply_buttons(recipient_id=mobile, body_text=body, button_data=buttons)
229
 
230
  def handle_interactive_response(mobile: str, button_id: str) -> None:
231
- """Process user's button response (Confirm/Cancel)."""
232
  if not db:
233
  wa_client.send_text_message(mobile, "Sorry, cannot process confirmation due to a database issue.")
234
  return
@@ -242,9 +223,9 @@ def handle_interactive_response(mobile: str, button_id: str) -> None:
242
 
243
  pending_transactions = transaction_doc.to_dict().get("transactions", [])
244
  if not pending_transactions:
245
- wa_client.send_text_message(mobile, "There seems to be an issue with the pending transaction data.")
246
- doc_ref.delete()
247
- return
248
 
249
  if button_id == "confirm_transaction":
250
  msg = process_intent(pending_transactions, mobile)
@@ -254,86 +235,45 @@ def handle_interactive_response(mobile: str, button_id: str) -> None:
254
  doc_ref.delete()
255
  else:
256
  msg = "Sorry, I didn't understand that button press."
257
-
258
  wa_client.send_text_message(mobile, msg)
259
  except Exception as e:
260
  logger.error(f"Error handling interactive response for user {mobile}: {e}", exc_info=True)
261
  wa_client.send_text_message(mobile, "Sorry, something went wrong while handling your confirmation.")
262
 
263
- def _deepgram_tts_to_mp3(text: str) -> Optional[str]:
264
- """
265
- Generate an MP3 using Deepgram TTS and return local file path.
266
- Only used when input originated from STT/audio.
267
- """
268
- if not DEEPGRAM_API:
269
- logger.warning("DEEPGRAM_API not set. Skipping TTS.")
270
- return None
271
-
272
- try:
273
- safe_text = (text or "").strip()
274
- if not safe_text:
275
- return None
276
-
277
- # Keep it short-ish to avoid huge audio
278
- # (WhatsApp media limits + faster UX)
279
- if len(safe_text) > 1800:
280
- safe_text = safe_text[:1800] + "..."
281
-
282
- base_url = f"https://api.deepgram.com/v1/speak?model={DEEPGRAM_TTS_MODEL}"
283
- if DEEPGRAM_TTS_VOICE_PARAMS:
284
- base_url += f"&{DEEPGRAM_TTS_VOICE_PARAMS.lstrip('&')}"
285
-
286
- headers = {
287
- "Authorization": f"Token {DEEPGRAM_API}",
288
- "Content-Type": "application/json",
289
- "Accept": "audio/mpeg"
290
- }
291
- payload = {"text": safe_text}
292
-
293
- resp = requests.post(base_url, headers=headers, json=payload, timeout=60)
294
- resp.raise_for_status()
295
-
296
- temp_dir = "temp_tts"
297
- os.makedirs(temp_dir, exist_ok=True)
298
- out_path = os.path.join(temp_dir, f"tts_{int(time())}.mp3")
299
- with open(out_path, "wb") as f:
300
- f.write(resp.content)
301
-
302
- return out_path
303
-
304
- except Exception as e:
305
- logger.error(f"Deepgram TTS failed: {e}", exc_info=True)
306
- return None
307
-
308
- def process_text_message(message_text: str, mobile: str, input_mode: str = "text") -> Optional[str]:
309
  """
310
  Processes incoming text messages.
311
- If input_mode == 'speech', returns the final text response string for TTS usage.
312
- Otherwise sends responses as before and returns None.
313
  """
314
- logger.info(f"Processing {input_mode} message from {mobile}: '{message_text}'")
315
-
316
  if GREETING_PATTERN.match(message_text):
317
  msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?"
318
  wa_client.send_text_message(mobile, msg)
319
- return msg if input_mode == "speech" else None
320
-
321
- llm_response_str = generateResponse(message_text)
 
 
 
 
 
 
 
322
  parsed_trans_data = parse_multiple_transactions(llm_response_str)
323
-
324
  if not parsed_trans_data:
325
  msg = "Sorry, I couldn't quite understand that. Could you please rephrase?"
326
  wa_client.send_text_message(mobile, msg)
327
- return msg if input_mode == "speech" else None
328
 
329
- primary_intent = (parsed_trans_data[0].get('intent', '') or '').lower()
330
- primary_type = (parsed_trans_data[0].get('transaction_type', '') or '').lower()
331
 
332
- # Reads / Queries
333
  if primary_intent == 'read' or primary_type == 'query':
334
  response_data = process_intent(parsed_trans_data, mobile)
335
-
336
- # If the response is a generated image report path, we send image and stop
337
  if isinstance(response_data, str) and os.path.isfile(response_data) and HEADERS_IMGUR:
338
  try:
339
  with open(response_data, "rb") as f:
@@ -342,41 +282,77 @@ def process_text_message(message_text: str, mobile: str, input_mode: str = "text
342
  imgur_data = response_imgur.json()
343
  if imgur_data.get("success"):
344
  wa_client.send_image_message(recipient_id=mobile, image_url=imgur_data["data"]["link"])
345
- # For speech pipeline: we already delivered an image; don't TTS this
346
- return None
347
  else:
348
- wa_client.send_text_message(mobile, f"Couldn't generate the image report, but here's the data:\n{response_data}")
349
- return str(response_data) if input_mode == "speech" else None
 
 
350
  except Exception as e:
351
  logger.error(f"Image report processing/upload failed: {e}", exc_info=True)
352
- wa_client.send_text_message(mobile, f"Error generating image report. Here's the text data:\n{response_data}")
353
- return str(response_data) if input_mode == "speech" else None
354
- finally:
355
- try:
356
- os.remove(response_data)
357
- except Exception:
358
- pass
359
  else:
360
  wa_client.send_text_message(mobile, str(response_data))
361
- return str(response_data) if input_mode == "speech" else None
362
 
363
- # Create/Update/Delete with confirmation flow
364
- if primary_intent in ['create', 'update', 'delete']:
365
  if persist_temporary_transaction(parsed_trans_data, mobile):
366
  transaction_summary = format_transaction_response(parsed_trans_data)
367
  send_confirmation_buttons(mobile, transaction_summary)
368
- return None
369
  else:
370
  msg = "Sorry, I couldn't save your transaction for confirmation."
371
  wa_client.send_text_message(mobile, msg)
372
- return msg if input_mode == "speech" else None
 
 
 
 
 
 
373
 
374
- msg = f"I'm not sure how to handle the action '{primary_intent}'."
375
- wa_client.send_text_message(mobile, msg)
376
- return msg if input_mode == "speech" else None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
377
 
378
- def process_audio_message(audio_id: str, mobile: str) -> None:
379
- """Processes incoming audio messages: STT -> normal pipeline -> ALSO TTS back (new feature)."""
380
  if not transcriber:
381
  wa_client.send_text_message(mobile, "Sorry, I cannot process audio messages at the moment.")
382
  return
@@ -396,47 +372,39 @@ def process_audio_message(audio_id: str, mobile: str) -> None:
396
 
397
  try:
398
  transcript = transcriber.transcribe(downloaded_path)
399
-
400
  if transcript.status == aai.TranscriptStatus.error:
401
  wa_client.send_text_message(mobile, f"Sorry, I couldn't transcribe your audio. Error: {transcript.error}")
402
- return
403
-
404
- if not transcript.text:
405
- wa_client.send_text_message(mobile, "Sorry, I couldn't understand the audio.")
406
- return
407
-
408
- # 1) Send normal text response as before (but we also need the text for TTS)
409
- response_text = process_text_message(transcript.text, mobile, input_mode="speech")
410
-
411
- # 2) If we got a textual response, produce TTS and send it as WhatsApp audio
412
- # (We skip if response is None e.g. interactive flow or image report)
413
- if response_text:
414
- mp3_path = _deepgram_tts_to_mp3(response_text)
415
- if mp3_path:
416
- try:
417
- wa_client.send_audio_message(recipient_id=mobile, audio_path=mp3_path)
418
- except Exception as e:
419
- logger.error(f"Failed to send TTS audio: {e}", exc_info=True)
420
- finally:
421
- try:
 
 
 
422
  os.remove(mp3_path)
423
- except Exception:
424
- pass
425
 
426
  finally:
427
  if os.path.exists(downloaded_path):
428
- try:
429
- os.remove(downloaded_path)
430
- except Exception:
431
- pass
432
 
433
- def process_image_message(image_id: str, caption: Optional[str], mobile: str) -> None:
434
- """
435
- Processes incoming image messages.
436
- NOTE:
437
- - Product photos (like dresses) with no caption should auto-record as sale (vision prompt already handles this).
438
- - Receipts should not become sales unless they truly indicate sales lines (vision prompt already handles this).
439
- """
440
  logger.info(f"Processing image message (ID: {image_id}) from {mobile} with caption: '{caption}'")
441
  wa_client.send_text_message(mobile, "Analyzing your image, please wait a moment...")
442
 
@@ -457,7 +425,7 @@ def process_image_message(image_id: str, caption: Optional[str], mobile: str) ->
457
  try:
458
  with open(downloaded_path, "rb") as f:
459
  image_bytes = f.read()
460
-
461
  generated_query = process_image_and_generate_query(image_bytes, caption)
462
 
463
  if not generated_query or "Error:" in generated_query:
@@ -465,7 +433,8 @@ def process_image_message(image_id: str, caption: Optional[str], mobile: str) ->
465
  wa_client.send_text_message(mobile, f"I had some trouble understanding that image. {generated_query}")
466
  else:
467
  logger.info(f"Vision processing successful. Generated query: '{generated_query}'")
468
- process_text_message(generated_query, mobile, input_mode="text")
 
469
 
470
  except Exception as e:
471
  logger.error(f"Error during vision processing of image {downloaded_path}: {e}", exc_info=True)
@@ -487,10 +456,10 @@ def webhook_handler():
487
  challenge = request.args.get("hub.challenge")
488
  if mode == "subscribe" and token == VERIFY_TOKEN:
489
  return make_response(challenge, 200)
490
- return make_response("Verification failed", 403)
 
491
 
492
  if request.method == "POST":
493
- mobile = None
494
  try:
495
  data = request.get_json()
496
  if not data:
@@ -500,32 +469,29 @@ def webhook_handler():
500
  if not msg_details:
501
  return make_response("ok", 200)
502
 
503
- message_id = msg_details.get("id")
504
- mobile = msg_details.get("from")
505
- message_type = msg_details.get("type")
506
 
507
  if not message_id or not mobile:
508
  return make_response("ok - invalid message", 200)
509
-
510
  if check_and_mark_processed(message_id):
511
  return make_response("ok - duplicate", 200)
512
-
513
- if not is_user_approved(mobile):
514
- wa_client.send_text_message(
515
- mobile,
516
- "Access denied. Please sign up at https://smartqx.co.za or contact your administrator."
517
- )
518
  return make_response("ok", 200)
519
 
520
  if message_type == "text":
521
- process_text_message(msg_details.get("text"), mobile, input_mode="text")
522
  elif message_type == "audio":
523
- process_audio_message(msg_details.get("audio_id"), mobile)
524
  elif message_type == "image":
525
  image_id = msg_details.get("image_id")
526
  caption = msg_details.get("caption")
527
  if image_id:
528
- process_image_message(image_id, caption, mobile)
529
  else:
530
  logger.warning(f"Missing image ID for image message: {msg_details}")
531
  elif message_type == "interactive":
@@ -536,7 +502,7 @@ def webhook_handler():
536
 
537
  except Exception as e:
538
  logger.error(f"Unhandled exception: {e}", exc_info=True)
539
- if mobile:
540
  wa_client.send_text_message(mobile, "Sorry, an unexpected error occurred.")
541
  return make_response("ok", 200)
542
 
@@ -548,12 +514,12 @@ if __name__ == '__main__':
548
  logger.info(f"Starting Flask app. Debug mode: {debug_mode}, Port: {port}")
549
 
550
  if not db:
551
- logger.critical("Firestore client failed to initialize. Application cannot run.")
552
  else:
553
- if not debug_mode:
554
- from waitress import serve
555
- logger.info("Running in production mode using Waitress.")
556
- serve(app, host="0.0.0.0", port=port)
557
- else:
558
- logger.info("Running in debug mode using Flask development server.")
559
- app.run(debug=True, host="0.0.0.0", port=port)
 
 
1
  from flask import Flask, request, make_response, jsonify
2
  import os
3
  import logging
 
18
  import dns.resolver
19
  import socket
20
  from datetime import datetime, timedelta
21
+ from typing import Optional, Dict, Any, Tuple
22
  import json
23
  import sys
24
  import threading
 
32
  URL_IMGUR = "https://api.imgur.com/3/image"
33
  HEADERS_IMGUR = {"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"} if IMGUR_CLIENT_ID else {}
34
 
35
+ # --- New Feature: TTS Configuration ---
36
+ DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY") # Ensure this is in .env
37
+ DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"
 
38
 
39
  # AssemblyAI Configuration
40
  try:
 
48
  logging.error(f"Failed to initialize AssemblyAI: {e}")
49
 
50
  import firebase_admin
51
+ from firebase_admin import credentials, firestore, storage
52
 
53
  def init_firestore_from_env(env_var: str = "FIREBASE"):
54
  """
 
77
 
78
  app = Flask(__name__)
79
 
80
+ # Logging Configuration
81
  logging.basicConfig(
82
  level=logging.INFO,
83
  format="%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s",
 
89
  # --- DNS Workaround ---
90
  nameserver1 = os.getenv('nameserver1', '8.8.8.8')
91
  nameserver2 = os.getenv('nameserver2', '8.8.4.4')
 
92
  def setup_dns():
93
  try:
94
  resolver = dns.resolver.Resolver()
95
  resolver.nameservers = [nameserver1, nameserver2]
96
  _orig_getaddrinfo = socket.getaddrinfo
 
97
  def new_getaddrinfo(*args, **kwargs):
98
  try:
99
  if args and isinstance(args[0], str) and 'graph.facebook.com' in args[0]:
 
102
  logger.info(f"DNS Override: Resolved {args[0]} to {ip} using {resolver.nameservers}")
103
  return _orig_getaddrinfo(ip, *args[1:], **kwargs)
104
  except dns.resolver.NoAnswer:
105
+ logger.warning(f"DNS Override: No A record found for {args[0]} using {resolver.nameservers}")
106
  except Exception as e:
107
  logger.error(f"DNS resolution override failed for {args}: {e}")
108
  return _orig_getaddrinfo(*args, **kwargs)
 
109
  socket.getaddrinfo = new_getaddrinfo
110
  logger.info(f"DNS resolver configured with nameservers: {resolver.nameservers}")
111
  except Exception as e:
112
  logger.error(f"Failed to setup custom DNS resolver: {e}")
 
113
  setup_dns()
114
  # --- End DNS Workaround ---
115
 
 
120
  PROCESSED_MESSAGES_TTL_HOURS = 24
121
 
122
  class MessageDeduplicator:
 
123
  def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
124
  self.ttl_seconds = ttl_hours * 3600
125
  self.max_cache_size = max_cache_size
 
128
  self.lock = threading.RLock()
129
  self._cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True)
130
  self._cleanup_thread.start()
131
+
132
  def is_duplicate(self, message_id):
133
+ if not message_id: return False
 
134
  with self.lock:
135
  if message_id in self.cache:
136
  self.cache.move_to_end(message_id)
137
  return True
 
138
  if self.db_client:
139
  try:
140
  doc_ref = self.db_client.collection("processed_messages").document(message_id)
141
  if doc_ref.get().exists:
142
  self.cache[message_id] = time()
143
+ if len(self.cache) > self.max_cache_size: self.cache.popitem(last=False)
 
144
  return True
145
  except Exception as e:
146
  logger.error(f"Database error in is_duplicate: {e}", exc_info=True)
 
147
  self._mark_processed(message_id)
148
  return False
149
+
150
  def _mark_processed(self, message_id):
151
  current_time = time()
152
  with self.lock:
153
  self.cache[message_id] = current_time
154
+ if len(self.cache) > self.max_cache_size: self.cache.popitem(last=False)
 
 
155
  if self.db_client:
156
  try:
157
+ expiry = datetime.now() + timedelta(hours=self.ttl_seconds/3600)
158
  doc_ref = self.db_client.collection("processed_messages").document(message_id)
159
  doc_ref.set({"processed_at": firestore.SERVER_TIMESTAMP, "expires_at": expiry})
160
  except Exception as e:
161
  logger.error(f"Database error in _mark_processed: {e}", exc_info=True)
162
+
163
  def _periodic_cleanup(self):
164
  while True:
165
  try:
166
  with self.lock:
167
  current_time = time()
168
+ expired_ids = [msg_id for msg_id, timestamp in list(self.cache.items()) if current_time - timestamp > self.ttl_seconds]
169
+ for msg_id in expired_ids: self.cache.pop(msg_id, None)
 
 
 
 
170
  threading.Event().wait(3600)
171
  except Exception as e:
172
  logger.error(f"Error in cleanup thread: {e}", exc_info=True)
 
175
  message_deduplicator = MessageDeduplicator(ttl_hours=PROCESSED_MESSAGES_TTL_HOURS, db_client=db)
176
 
177
  def check_and_mark_processed(message_id: str) -> bool:
 
178
  if not message_id:
179
  logger.warning("Empty message ID provided to check_and_mark_processed")
180
  return False
181
  return message_deduplicator.is_duplicate(message_id)
182
 
183
+ def is_user_approved(mobile: str) -> Tuple[bool, Optional[Dict]]:
184
+ """Checks approval AND returns user data (for currency preferences)."""
185
  if not db:
186
  logger.error("Authorization check failed: Firestore client is not available.")
187
+ return False, None
188
  try:
189
  normalized_mobile = mobile if mobile.startswith('+') else f'+{mobile}'
190
+ logger.info(f"AUTHORIZATION: Checking approval for mobile: '{mobile}'")
191
  user_ref = db.collection("users").document(normalized_mobile)
192
  user_doc = user_ref.get()
193
 
194
  if user_doc.exists:
195
  user_data = user_doc.to_dict()
196
+ status = user_data.get("status", "N/A").lower()
197
+ if status == "approved":
198
+ return True, user_data
199
+ return False, None
200
  except Exception as e:
201
  logger.error(f"AUTHORIZATION: An exception occurred for {mobile}: {e}", exc_info=True)
202
+ return False, None
203
 
204
  def send_confirmation_buttons(mobile: str, message_summary: str) -> None:
 
205
  buttons = [
206
  {"reply": {"id": "confirm_transaction", "title": "✅ Confirm"}},
207
  {"reply": {"id": "cancel_transaction", "title": "❌ Cancel"}}
 
210
  wa_client.send_reply_buttons(recipient_id=mobile, body_text=body, button_data=buttons)
211
 
212
  def handle_interactive_response(mobile: str, button_id: str) -> None:
 
213
  if not db:
214
  wa_client.send_text_message(mobile, "Sorry, cannot process confirmation due to a database issue.")
215
  return
 
223
 
224
  pending_transactions = transaction_doc.to_dict().get("transactions", [])
225
  if not pending_transactions:
226
+ wa_client.send_text_message(mobile, "There seems to be an issue with the pending transaction data.")
227
+ doc_ref.delete()
228
+ return
229
 
230
  if button_id == "confirm_transaction":
231
  msg = process_intent(pending_transactions, mobile)
 
235
  doc_ref.delete()
236
  else:
237
  msg = "Sorry, I didn't understand that button press."
 
238
  wa_client.send_text_message(mobile, msg)
239
  except Exception as e:
240
  logger.error(f"Error handling interactive response for user {mobile}: {e}", exc_info=True)
241
  wa_client.send_text_message(mobile, "Sorry, something went wrong while handling your confirmation.")
242
 
243
+ def process_text_message(message_text: str, mobile: str, user_settings: Optional[Dict] = None) -> Optional[str]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
244
  """
245
  Processes incoming text messages.
246
+ Returns the response text string (for TTS pipeline), or None if no direct text response (e.g. image/button).
 
247
  """
248
+ logger.info(f"Processing text message from {mobile}: '{message_text}'")
249
+
250
  if GREETING_PATTERN.match(message_text):
251
  msg = "Hi there! I'm Qx-SmartLedger, your business assistant. How can I help?"
252
  wa_client.send_text_message(mobile, msg)
253
+ return msg
254
+
255
+ # --- FIX BUG 4: Get User Currency ---
256
+ user_currency = "R"
257
+ if user_settings:
258
+ # Assuming currency might be in 'settings' dict or top level. Check both.
259
+ user_currency = user_settings.get('currency') or user_settings.get('settings', {}).get('currency', "R")
260
+
261
+ # Inject currency into LLM Prompt
262
+ llm_response_str = generateResponse(message_text, currency=user_currency)
263
  parsed_trans_data = parse_multiple_transactions(llm_response_str)
264
+
265
  if not parsed_trans_data:
266
  msg = "Sorry, I couldn't quite understand that. Could you please rephrase?"
267
  wa_client.send_text_message(mobile, msg)
268
+ return msg
269
 
270
+ primary_intent = parsed_trans_data[0].get('intent', '').lower()
271
+ primary_type = parsed_trans_data[0].get('transaction_type', '').lower()
272
 
 
273
  if primary_intent == 'read' or primary_type == 'query':
274
  response_data = process_intent(parsed_trans_data, mobile)
275
+
276
+ # Handle Image Response
277
  if isinstance(response_data, str) and os.path.isfile(response_data) and HEADERS_IMGUR:
278
  try:
279
  with open(response_data, "rb") as f:
 
282
  imgur_data = response_imgur.json()
283
  if imgur_data.get("success"):
284
  wa_client.send_image_message(recipient_id=mobile, image_url=imgur_data["data"]["link"])
285
+ os.remove(response_data)
286
+ return None # No text response to TTS
287
  else:
288
+ msg = f"Couldn't generate the image report, but here's the data:\n{response_data}"
289
+ wa_client.send_text_message(mobile, msg)
290
+ os.remove(response_data)
291
+ return msg
292
  except Exception as e:
293
  logger.error(f"Image report processing/upload failed: {e}", exc_info=True)
294
+ msg = f"Error generating image report. Here's the text data:\n{response_data}"
295
+ wa_client.send_text_message(mobile, msg)
296
+ return msg
 
 
 
 
297
  else:
298
  wa_client.send_text_message(mobile, str(response_data))
299
+ return str(response_data)
300
 
301
+ elif primary_intent in ['create', 'update', 'delete']:
 
302
  if persist_temporary_transaction(parsed_trans_data, mobile):
303
  transaction_summary = format_transaction_response(parsed_trans_data)
304
  send_confirmation_buttons(mobile, transaction_summary)
305
+ return None # Interactive buttons, no TTS
306
  else:
307
  msg = "Sorry, I couldn't save your transaction for confirmation."
308
  wa_client.send_text_message(mobile, msg)
309
+ return msg
310
+ else:
311
+ msg = f"I'm not sure how to handle the action '{primary_intent}'."
312
+ wa_client.send_text_message(mobile, msg)
313
+ return msg
314
+
315
+ # --- NEW FEATURE: Audio Helpers ---
316
 
317
+ def _deepgram_tts_to_mp3(text: str) -> Optional[str]:
318
+ """Generates MP3 from text using DeepGram."""
319
+ if not DEEPGRAM_API_KEY:
320
+ logger.warning("DEEPGRAM_API_KEY not found. Skipping TTS.")
321
+ return None
322
+ try:
323
+ headers = {
324
+ "Authorization": f"Token {DEEPGRAM_API_KEY}",
325
+ "Content-Type": "application/json"
326
+ }
327
+ payload = {"text": text}
328
+ response = requests.post(DEEPGRAM_TTS_URL, headers=headers, json=payload)
329
+ response.raise_for_status()
330
+
331
+ filename = f"tts_{uuid.uuid4()}.mp3"
332
+ filepath = os.path.join(os.getcwd(), filename)
333
+ with open(filepath, "wb") as f:
334
+ f.write(response.content)
335
+ return filepath
336
+ except Exception as e:
337
+ logger.error(f"DeepGram TTS failed: {e}", exc_info=True)
338
+ return None
339
+
340
+ def _upload_to_firebase_storage(file_path: str) -> Optional[str]:
341
+ """Uploads file to Firebase Storage and returns signed URL. Tries default bucket."""
342
+ try:
343
+ bucket = storage.bucket() # Uses default bucket from init
344
+ blob = bucket.blob(f"audio_responses/{os.path.basename(file_path)}")
345
+ blob.upload_from_filename(file_path)
346
+
347
+ # Generate a signed URL valid for 1 hour
348
+ url = blob.generate_signed_url(expiration=timedelta(hours=1))
349
+ return url
350
+ except Exception as e:
351
+ logger.error(f"Firebase Storage upload failed: {e}. Ensure storage bucket is configured.", exc_info=True)
352
+ return None
353
 
354
+ def process_audio_message(audio_id: str, mobile: str, user_settings: Optional[Dict]) -> None:
355
+ """Processes incoming audio messages: Transcribe -> Process -> TTS -> Send Audio."""
356
  if not transcriber:
357
  wa_client.send_text_message(mobile, "Sorry, I cannot process audio messages at the moment.")
358
  return
 
372
 
373
  try:
374
  transcript = transcriber.transcribe(downloaded_path)
 
375
  if transcript.status == aai.TranscriptStatus.error:
376
  wa_client.send_text_message(mobile, f"Sorry, I couldn't transcribe your audio. Error: {transcript.error}")
377
+ elif not transcript.text:
378
+ wa_client.send_text_message(mobile, "Sorry, I couldn't understand the audio.")
379
+ else:
380
+ # 1. Process as text and get the response string
381
+ text_response = process_text_message(transcript.text, mobile, user_settings)
382
+
383
+ # 2. TTS Pipeline
384
+ if text_response:
385
+ mp3_path = _deepgram_tts_to_mp3(text_response)
386
+ if mp3_path:
387
+ # Try Firebase first (as requested)
388
+ audio_url = _upload_to_firebase_storage(mp3_path)
389
+
390
+ if audio_url:
391
+ # Send via URL
392
+ wa_client.send_audio_message(mobile, audio_url=audio_url)
393
+ else:
394
+ # Fallback: Upload directly to WhatsApp
395
+ logger.info("Falling back to direct WhatsApp upload for TTS audio.")
396
+ wa_client.send_audio_message(mobile, audio_path=mp3_path)
397
+
398
+ # Cleanup TTS file
399
+ if os.path.exists(mp3_path):
400
  os.remove(mp3_path)
 
 
401
 
402
  finally:
403
  if os.path.exists(downloaded_path):
404
+ os.remove(downloaded_path)
 
 
 
405
 
406
+ def process_image_message(image_id: str, caption: Optional[str], mobile: str, user_settings: Optional[Dict]) -> None:
407
+ """Processes incoming image messages."""
 
 
 
 
 
408
  logger.info(f"Processing image message (ID: {image_id}) from {mobile} with caption: '{caption}'")
409
  wa_client.send_text_message(mobile, "Analyzing your image, please wait a moment...")
410
 
 
425
  try:
426
  with open(downloaded_path, "rb") as f:
427
  image_bytes = f.read()
428
+
429
  generated_query = process_image_and_generate_query(image_bytes, caption)
430
 
431
  if not generated_query or "Error:" in generated_query:
 
433
  wa_client.send_text_message(mobile, f"I had some trouble understanding that image. {generated_query}")
434
  else:
435
  logger.info(f"Vision processing successful. Generated query: '{generated_query}'")
436
+ # Process the generated query like a normal text message
437
+ process_text_message(generated_query, mobile, user_settings)
438
 
439
  except Exception as e:
440
  logger.error(f"Error during vision processing of image {downloaded_path}: {e}", exc_info=True)
 
456
  challenge = request.args.get("hub.challenge")
457
  if mode == "subscribe" and token == VERIFY_TOKEN:
458
  return make_response(challenge, 200)
459
+ else:
460
+ return make_response("Verification failed", 403)
461
 
462
  if request.method == "POST":
 
463
  try:
464
  data = request.get_json()
465
  if not data:
 
469
  if not msg_details:
470
  return make_response("ok", 200)
471
 
472
+ message_id, mobile, message_type = msg_details.get("id"), msg_details.get("from"), msg_details.get("type")
 
 
473
 
474
  if not message_id or not mobile:
475
  return make_response("ok - invalid message", 200)
 
476
  if check_and_mark_processed(message_id):
477
  return make_response("ok - duplicate", 200)
478
+
479
+ # --- AUTH & USER DATA FETCH ---
480
+ is_approved, user_data = is_user_approved(mobile)
481
+
482
+ if not is_approved:
483
+ wa_client.send_text_message(mobile, "Access denied. Please sign up at https://smartqx.co.za or contact your administrator.")
484
  return make_response("ok", 200)
485
 
486
  if message_type == "text":
487
+ process_text_message(msg_details.get("text"), mobile, user_data)
488
  elif message_type == "audio":
489
+ process_audio_message(msg_details.get("audio_id"), mobile, user_data)
490
  elif message_type == "image":
491
  image_id = msg_details.get("image_id")
492
  caption = msg_details.get("caption")
493
  if image_id:
494
+ process_image_message(image_id, caption, mobile, user_data)
495
  else:
496
  logger.warning(f"Missing image ID for image message: {msg_details}")
497
  elif message_type == "interactive":
 
502
 
503
  except Exception as e:
504
  logger.error(f"Unhandled exception: {e}", exc_info=True)
505
+ if 'mobile' in locals() and mobile:
506
  wa_client.send_text_message(mobile, "Sorry, an unexpected error occurred.")
507
  return make_response("ok", 200)
508
 
 
514
  logger.info(f"Starting Flask app. Debug mode: {debug_mode}, Port: {port}")
515
 
516
  if not db:
517
+ logger.critical("Firestore client failed to initialize. Application cannot run.")
518
  else:
519
+ if not debug_mode:
520
+ from waitress import serve
521
+ logger.info("Running in production mode using Waitress.")
522
+ serve(app, host="0.0.0.0", port=port)
523
+ else:
524
+ logger.info("Running in debug mode using Flask development server.")
525
+ app.run(debug=True, host="0.0.0.0", port=port)