"""
main.py — Iris AI Service (v1.1 - April 2026)
AI layer for the Iris Support Portal (IrisPlus / Unified Spark Desk).
Deployed as a HuggingFace Space monofile (Flask + Gemini + AssemblyAI + Firebase).
CHANGELOG v1.1:
- Model: gemini-3.1-flash-lite-preview (multimodal reasoning)
- /api/kb/whatsapp-import: now accepts multipart ZIP upload
* Extracts _chat.txt + maps image files to <Media omitted> pointers
* Sliding-window chunking (~10k tokens / ~40k chars with overlap)
* Multimodal: sends images inline with their surrounding text chunk
* Strict JSON enforcement + pre-save validation
* JSON parse error recovery (regex extraction fallback)
- All other endpoints unchanged from v1.0
FEATURES:
1. WhatsApp Export → Knowledge Base (ZIP multimodal, chunked, additive)
2. Bulk KB Upload (CSV / Excel / PDF)
3. Natural Language + Voice Ticket Submission
4. System Tutorial Ingestion (timestamped transcripts)
5. Agent NL/Voice Solution Writing
6. Iris Chatbot (KB RAG)
ENV VARS:
GOOGLE_API_KEY — Gemini API key
ASSEMBLYAI_API_KEY — AssemblyAI API key
FIREBASE — JSON string of Firebase service account
GEMINI_MODEL — Override model (default: gemini-3.1-flash-lite-preview)
PORT — Server port (default 7860)
"""
import os
import io
import re
import json
import time
import logging
import base64
import hashlib
import zipfile
import tempfile
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
import requests
from flask import Flask, request, jsonify
from flask_cors import CORS
# ─── Logging ──────────────────────────────────────────────────────────────────
# Process-wide logging: INFO and above, rendered as "time | LEVEL | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
# Named logger shared by the helpers and routes in this module.
logger = logging.getLogger("iris-ai-service")
# ─── Gemini SDK ───────────────────────────────────────────────────────────────
# The google-genai SDK is treated as optional at import time. When the import
# fails, BOTH SDK names are bound to None so later references to genai_types
# never hit an unbound name.
try:
    from google import genai
    from google.genai import types as genai_types
except Exception as e:
    genai = None
    genai_types = None
    logger.error("google-genai not installed: %s", e)

# API key for Gemini, read from the environment (empty string when unset).
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
# v1.1: upgraded to gemini-3.1-flash-lite-preview for multimodal reasoning
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite-preview")

# Shared Gemini client; stays None when the SDK or the API key is missing,
# or when client construction fails.
_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ─── AssemblyAI ───────────────────────────────────────────────────────────────
# AssemblyAI API key, read from the environment (empty string when unset).
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "")
# Base URL that the AssemblyAI upload / transcript requests are built on.
ASSEMBLYAI_BASE = "https://api.assemblyai.com/v2"
# ─── Firebase ─────────────────────────────────────────────────────────────────
# firebase-admin is optional: without it the module still loads and
# FIREBASE_AVAILABLE records that persistence cannot be used.
try:
    import firebase_admin
    from firebase_admin import credentials, firestore
except ImportError:
    FIREBASE_AVAILABLE = False
    logger.warning("firebase-admin not installed. Persistence disabled.")
else:
    FIREBASE_AVAILABLE = True

# Raw FIREBASE environment value (parsed as JSON by init_firestore).
FIREBASE_ENV = os.environ.get("FIREBASE", "")
def init_firestore() -> Optional[Any]:
    """
    Return a Firestore client, initialising the Firebase app on first use.

    Returns None when firebase-admin is not importable, when the FIREBASE
    env var is empty, or when initialisation raises. If a Firebase app is
    already registered, its client is returned without re-initialising.
    """
    if not FIREBASE_AVAILABLE:
        return None

    # An app registered earlier in this process is reused as-is.
    if firebase_admin._apps:
        return firestore.client()

    if not FIREBASE_ENV:
        logger.warning("FIREBASE env var missing. Persistence disabled.")
        return None

    try:
        certificate = credentials.Certificate(json.loads(FIREBASE_ENV))
        firebase_admin.initialize_app(certificate)
        logger.info("Firebase initialized.")
        client = firestore.client()
    except Exception as exc:
        logger.critical("Firebase init failed: %s", exc)
        return None
    return client
# Module-level Firestore handle; None whenever init_firestore() could not produce a client.
db = init_firestore()
# ─── Optional libs ────────────────────────────────────────────────────────────
# Each optional dependency gets a boolean flag recording whether its import succeeded.
try:
    import pandas as pd
except ImportError:
    PANDAS_AVAILABLE = False
else:
    PANDAS_AVAILABLE = True

try:
    import pypdf
except ImportError:
    PYPDF_AVAILABLE = False
else:
    PYPDF_AVAILABLE = True
# ─── Flask App ────────────────────────────────────────────────────────────────
# Flask application object that the @app.post routes in this module attach to.
app = Flask(__name__)
# Enable CORS on the app using flask_cors' default settings (no options passed).
CORS(app)
# ══════════════════════════════════════════════════════════════════════════════
# SHARED HELPERS
# ══════════════════════════════════════════════════════════════════════════════
# Lower-case file extensions (with leading dot) that WhatsApp ZIP ingestion
# treats as images to extract for multimodal processing.
SUPPORTED_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
# Approx chars per token (conservative for mixed Shona/English/emoji content)
CHARS_PER_TOKEN = 4
# Chunk size threshold in characters: ~10k tokens at 4 chars per token.
CHUNK_CHARS = 40_000
# Characters of trailing context repeated at the start of the next chunk (~1k tokens).
OVERLAP_CHARS = 4_000
def _safe_json(text: str, fallback: Any) -> Any:
"""
Multi-strategy JSON parser.
1. Direct parse after stripping markdown fences.
2. Regex extraction of first [...] or {...} block.
3. Return fallback.
"""
if not text:
return fallback
# Strategy 1: strip fences
clean = text.strip()
for fence in ("```json", "```JSON", "```"):
if fence in clean:
parts = clean.split(fence)
# take the content between the first pair of fences
if len(parts) >= 3:
clean = parts[1].strip()
elif len(parts) == 2:
clean = parts[1].split("```")[0].strip()
break
try:
return json.loads(clean)
except json.JSONDecodeError:
pass
# Strategy 2: regex — find outermost [...] array
arr_match = re.search(r'\[[\s\S]*\]', clean)
if arr_match:
try:
return json.loads(arr_match.group())
except json.JSONDecodeError:
pass
# Strategy 3: regex — find outermost {...} object
obj_match = re.search(r'\{[\s\S]*\}', clean)
if obj_match:
try:
return json.loads(obj_match.group())
except json.JSONDecodeError:
pass
logger.error("JSON parse exhausted all strategies. First 300 chars: %s", text[:300])
return fallback
def _validate_articles(data: Any) -> List[Dict]:
"""
Validate that extracted articles are a list of dicts with required fields.
Filters out malformed items rather than failing the whole batch.
"""
if not isinstance(data, list):
logger.warning("Expected list from Gemini, got %s", type(data))
return []
valid = []
for item in data:
if not isinstance(item, dict):
continue
title = str(item.get("title", "")).strip()
content = str(item.get("content", "")).strip()
if len(title) < 3 or len(content) < 10:
continue
valid.append({
"title": title,
"content": content,
"category": str(item.get("category", "General")).strip() or "General",
"tags": item.get("tags", []) if isinstance(item.get("tags"), list) else [],
})
return valid
def _gemini_text(prompt: str, json_mode: bool = False) -> str:
    """
    Send a text-only prompt to Gemini and return the response text.

    Returns "" when no client is configured, when the call raises, or when
    the response carries no text. With ``json_mode`` the request asks for an
    ``application/json`` response MIME type.
    """
    if not _gemini_client:
        return ""

    config = None
    if json_mode:
        config = genai_types.GenerateContentConfig(
            response_mime_type="application/json"
        )

    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=prompt,
            config=config,
        )
        return response.text or ""
    except Exception as exc:
        logger.error("Gemini text call error: %s", exc)
        return ""
def _gemini_multimodal(parts: list, json_mode: bool = False) -> str:
    """
    Send a mixed list of text strings and image Parts to Gemini.

    Returns the response text, or "" when no client is configured, the call
    raises, or the response has no text. With ``json_mode`` the request asks
    for an ``application/json`` response MIME type.
    """
    if not _gemini_client:
        return ""

    config = None
    if json_mode:
        config = genai_types.GenerateContentConfig(
            response_mime_type="application/json"
        )

    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=parts,
            config=config,
        )
        return response.text or ""
    except Exception as exc:
        logger.error("Gemini multimodal call error: %s", exc)
        return ""
def _article_fingerprint(title: str, content: str) -> str:
raw = f"{title.strip().lower()}::{content.strip().lower()[:300]}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def _get_existing_fingerprints() -> set:
    """
    Collect the fingerprints already stored in ``iris_kb_articles``.

    Only the ``fingerprint`` field is selected. Returns an empty set when
    Firestore is unavailable or the query raises (the error is logged).
    """
    if not db:
        return set()
    try:
        docs = db.collection("iris_kb_articles").select(["fingerprint"]).stream()
        fingerprints = set()
        for doc in docs:
            # Convert each snapshot once instead of twice per document.
            fingerprint = (doc.to_dict() or {}).get("fingerprint")
            if fingerprint:
                fingerprints.add(fingerprint)
        return fingerprints
    except Exception as e:
        logger.error("Fingerprint fetch error: %s", e)
        return set()
def _save_kb_articles(articles: List[Dict], source_label: str) -> Dict:
    """
    Add articles to ``iris_kb_articles``, skipping known fingerprints.

    Each article is fingerprinted from its title and content; articles whose
    fingerprint is already stored (or was added earlier in this call) are
    counted as skipped. Timestamp/video fields are copied only when the
    article has a non-None ``timestamp_start``.

    Returns:
        {"saved": int, "skipped": int}, or the same with zero counts plus an
        "error" key when Firestore is unavailable.
    """
    if not db:
        return {"saved": 0, "skipped": 0, "error": "Firebase unavailable"}

    known = _get_existing_fingerprints()
    counts = {"saved": 0, "skipped": 0}

    for entry in articles:
        title = entry.get("title", "Untitled")
        content = entry.get("content", "")
        fingerprint = _article_fingerprint(title, content)

        if fingerprint in known:
            counts["skipped"] += 1
            continue

        record = {
            "title": title,
            "content": content,
            "category": entry.get("category", "General"),
            "tags": entry.get("tags", []),
            "source": source_label,
            "fingerprint": fingerprint,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        if entry.get("timestamp_start") is not None:
            record.update(
                timestamp_start=entry["timestamp_start"],
                timestamp_end=entry.get("timestamp_end"),
                video_url=entry.get("video_url", ""),
            )

        db.collection("iris_kb_articles").add(record)
        known.add(fingerprint)
        counts["saved"] += 1

    return counts
# ══════════════════════════════════════════════════════════════════════════════
# WHATSAPP ZIP PROCESSOR
# ══════════════════════════════════════════════════════════════════════════════
# Regex to match WhatsApp timestamp lines
# Handles both: DD/MM/YYYY, HH:MM - Sender: message
# and: DD/MM/YYYY, HH:MM am/pm - Sender: message
# Matches the "DD/MM/YYYY, HH:MM[ am/pm] - " prefix at the start of a message line.
WA_LINE_RE = re.compile(
    r'^\d{1,2}/\d{1,2}/\d{4},\s+\d{1,2}:\d{2}(?:\s*[ap]m)?\s+-\s+',
    re.IGNORECASE
)
# Matches <Media omitted> or a media filename, optionally wrapped in [ ].
# Group 1 is the bare filename (None for <Media omitted>). The filename token
# excludes whitespace, brackets, angle brackets, ':', path separators and the
# LRM/RLM direction marks, so it can no longer swallow the timestamp and
# sender that precede the filename on the same line.
MEDIA_POINTER_RE = re.compile(
    r'<Media omitted>|\[?([^\s\[\]<>:/\\\u200e\u200f]+\.(?:jpg|jpeg|png|webp|gif|mp4|opus|aac|m4a))\]?',
    re.IGNORECASE
)
class WhatsAppZipProcessor:
    """
    Handles extraction and multimodal chunking of a WhatsApp .zip export.

    A WhatsApp export zip typically contains:
      _chat.txt          — the full conversation
      IMG-YYYYMMDD-*.jpg — attached images
      VID-*.mp4          — videos (we skip these, too large)
      PTT-*.opus         — voice notes (skipped)

    Attributes are filled by ``extract()``; ``build_chunks()`` reads them.
    """

    # A filename-shaped token ending in an image extension. Whitespace,
    # brackets, angle brackets, ':', path separators and LRM/RLM marks end
    # the token, so the timestamp/sender prefix of a chat line is never
    # captured as part of the name.
    _IMAGE_NAME_RE = re.compile(
        r'[^\s\[\]<>:/\\\u200e\u200f]+\.(?:jpg|jpeg|png|webp|gif)',
        re.IGNORECASE,
    )

    def __init__(self, zip_bytes: bytes):
        self.zip_bytes = zip_bytes
        self.chat_text = ""
        self.media_map: Dict[str, bytes] = {}  # filename -> raw bytes

    def extract(self) -> bool:
        """Extract chat text and image files from ZIP. Returns True on success."""
        try:
            with zipfile.ZipFile(io.BytesIO(self.zip_bytes)) as zf:
                names = zf.namelist()
                logger.info("ZIP contains %d files: %s", len(names), names[:20])

                # Find chat file — WhatsApp names it _chat.txt or WhatsApp Chat with *.txt
                chat_file = None
                for name in names:
                    base = os.path.basename(name).lower()
                    if base == "_chat.txt" or (base.endswith(".txt") and "chat" in base):
                        chat_file = name
                        break
                if not chat_file:
                    # Fallback: any .txt file
                    txts = [n for n in names if n.lower().endswith(".txt")]
                    if txts:
                        chat_file = txts[0]
                if not chat_file:
                    logger.error("No chat .txt found in ZIP")
                    return False

                raw = zf.read(chat_file)
                self.chat_text = raw.decode("utf-8", errors="replace")
                logger.info("Chat text extracted: %d chars from %s", len(self.chat_text), chat_file)

                # Extract images (skip videos and audio — too large / not useful for KB)
                for name in names:
                    ext = os.path.splitext(name.lower())[1]
                    if ext in SUPPORTED_IMAGE_EXTS:
                        try:
                            self.media_map[os.path.basename(name)] = zf.read(name)
                        except Exception as e:
                            logger.warning("Could not read media file %s: %s", name, e)
                logger.info("Media files extracted: %d images", len(self.media_map))
                return True
        except zipfile.BadZipFile as e:
            logger.error("Bad ZIP file: %s", e)
            return False
        except Exception as e:
            logger.error("ZIP extraction error: %s", e)
            return False

    def _resolve_media_in_line(self, line: str) -> Optional[bytes]:
        """
        Given a chat line, check if it references an image file we have.

        Every filename-shaped token in the line is looked up in
        ``media_map``; the bytes of the first hit are returned. Lines that
        only say <Media omitted> carry no filename and resolve to None,
        because that file was not part of the export.
        """
        for candidate in self._IMAGE_NAME_RE.findall(line):
            data = self.media_map.get(candidate)
            if data is not None:
                return data
        return None

    def build_chunks(self) -> List[Dict]:
        """
        Split chat text into overlapping chunks, each annotated with
        the image bytes found within that chunk.

        A chunk is closed once it reaches CHUNK_CHARS characters or the last
        line is consumed. The next chunk starts with the trailing lines of
        the previous one, totalling roughly OVERLAP_CHARS. At most 5 images
        are attached per chunk; images on carried-over lines are not
        re-attached.

        Returns list of:
          { "text": str, "images": [bytes, ...], "line_range": (start, end) }
        """
        lines = self.chat_text.splitlines()
        last_index = len(lines) - 1
        chunks: List[Dict] = []
        chunk_lines: List[str] = []
        chunk_images: List[bytes] = []
        char_count = 0

        for i, line in enumerate(lines):
            chunk_lines.append(line)
            char_count += len(line) + 1  # +1 for newline

            # Check if this line has an image we can include
            img_bytes = self._resolve_media_in_line(line)
            if img_bytes and len(chunk_images) < 5:  # cap images per chunk
                chunk_images.append(img_bytes)

            if char_count < CHUNK_CHARS and i != last_index:
                continue

            chunks.append({
                "text": "\n".join(chunk_lines),
                "images": list(chunk_images),
                "line_range": (i - len(chunk_lines) + 1, i)
            })
            logger.info(
                "Chunk %d: %d lines, %d chars, %d images",
                len(chunks), len(chunk_lines), char_count, len(chunk_images)
            )

            # Overlap: carry the trailing lines forward. The index is moved
            # BEFORE a line is counted, so exactly the counted lines are
            # kept, and it never reaches 0, so the carried part is always a
            # strict suffix — a chunk can never be carried forward whole.
            keep_from = len(chunk_lines)
            carried_chars = 0
            while keep_from > 1 and carried_chars < OVERLAP_CHARS:
                keep_from -= 1
                carried_chars += len(chunk_lines[keep_from]) + 1
            chunk_lines = chunk_lines[keep_from:]
            chunk_images = []
            char_count = carried_chars

        logger.info("Total chunks: %d", len(chunks))
        return chunks
# ══════════════════════════════════════════════════════════════════════════════
# WHATSAPP EXTRACTION PROMPT
# ══════════════════════════════════════════════════════════════════════════════
# Instruction text placed in front of every chat chunk before it is sent to
# Gemini (the chunk text is appended directly after the final "Chat segment:"
# line). It asks for a JSON array of {title, content, category, tags} items.
WHATSAPP_EXTRACTION_PROMPT = """You are a support knowledge base curator for the Iris platform, deployed across Zimbabwe.
Your task: analyse this WhatsApp support group chat segment and extract ONLY clear problem→solution pairs.
CONTEXT ABOUT THIS PLATFORM:
- "Iris" is an integrated POS (Point of Sale) and fiscalisation system with a mobile attendance and
location-tracking module used by field sales reps and in-store tellers at retail stores.
- The POS and fiscalisation layer handles sales transactions, receipt generation, and ZIMRA fiscal
compliance. The mobile module handles teller clock-in/out, GPS location verification, and hours tracking.
- Common POS/fiscal issues: fiscalisation failures, receipt errors, device not syncing to ZIMRA servers,
Elixir (fiscal device software) login/password problems.
- Common mobile attendance issues: GPS location not detected, clock-in failures, app killed by Android
battery optimiser, teller passkey problems, hours recording incorrectly, store radius too small,
wrong teller name shown after login, app not running in the background.
- Messages mix English, Shona, and Ndebele. Understand regional vernacular (e.g. "irikudzima" = switching
off, "ndakashanda" = I worked, "short yemahours" = hours shortage, "gadzirisayi" = fix it, "hupfu" = flour,
"yakuda kulogwa patsva" = needs to be logged in fresh).
- If screenshots show Android error dialogs (e.g. "Service killed by system", "App stopped", "Abrupt stop"),
reason through what that means for Android background restriction and background service killing, and include
that diagnosis and fix in the solution content.
- If screenshots show fiscal device or POS screens, extract the error code or state shown and reason through
the likely cause from the Elixir/ZIMRA integration context.
STRICT RULES:
1. Extract ONLY exchanges where a user described a problem AND a named support person (Tendayi, Tony, Violet,
Rufaro, Albrighton, Ishmael, or any named responder) provided a working solution or clear instruction.
2. Ignore: greetings, media-only messages, deleted messages, clock-in screenshots with no text context,
messages from unknown numbers with no solution attached.
3. Each article must be self-contained and usable by a support agent in future.
4. Translate all Shona/Ndebele problem descriptions to English in the article content.
5. If a screenshot appears to show an Android error or GPS issue, reason through the likely cause and
include that reasoning in the solution content.
OUTPUT FORMAT: Return ONLY a valid JSON array. No preamble, no explanation, no markdown fences.
Every string value MUST be properly JSON-escaped. Do not use unescaped newlines, tabs, or quotes inside strings.
Use \\n for line breaks within content strings.
Schema per item:
{"title": "string (max 80 chars)", "content": "string (escaped, solution steps)", "category": "one of: Account|Technical|Location|Attendance|Device|Other", "tags": ["array", "of", "strings"]}
If no valid problem→solution pairs exist in this segment, return an empty array: []
Chat segment:
"""
def _sniff_image_mime(img_bytes: bytes) -> str:
    """Guess an image MIME type from its magic bytes; JPEG when unrecognised."""
    if img_bytes[:4] == b'\x89PNG':
        return "image/png"
    if img_bytes[:6] in (b'GIF87a', b'GIF89a'):
        # .gif is an accepted upload extension, so label it truthfully
        # instead of letting it fall through to image/jpeg.
        # NOTE(review): confirm the configured model accepts image/gif;
        # if not, GIFs should be dropped from SUPPORTED_IMAGE_EXTS.
        return "image/gif"
    if img_bytes[:4] == b'RIFF' and img_bytes[8:12] == b'WEBP':
        # RIFF is a generic container; the form type at offset 8 marks WEBP.
        return "image/webp"
    return "image/jpeg"


def _process_chunk_with_gemini(chunk: Dict) -> List[Dict]:
    """
    Send a single chunk (text + optional images) to Gemini.

    The extraction prompt is prepended to the chunk text. When the chunk has
    images and a client is configured, each image is attached as an inline
    Part with a MIME type sniffed from its bytes; otherwise the text-only
    call is used.

    Args:
        chunk: Dict with "text" (str) and optionally "images" (list of bytes).

    Returns:
        Validated list of article dicts; [] when the response is empty.
    """
    text_part = WHATSAPP_EXTRACTION_PROMPT + chunk["text"]
    images = chunk.get("images", [])

    if images and _gemini_client:
        # Build multimodal content list: prompt text first, then the images.
        parts = [text_part]
        for img_bytes in images:
            parts.append(
                genai_types.Part.from_bytes(
                    data=img_bytes,
                    mime_type=_sniff_image_mime(img_bytes),
                )
            )
        raw = _gemini_multimodal(parts, json_mode=True)
    else:
        raw = _gemini_text(text_part, json_mode=True)

    if not raw:
        logger.warning("Empty Gemini response for chunk")
        return []

    parsed = _safe_json(raw, [])
    return _validate_articles(parsed)
# ══════════════════════════════════════════════════════════════════════════════
# FEATURE 1 — WhatsApp Export → Knowledge Base (v1.1: ZIP multimodal + chunked)
# ══════════════════════════════════════════════════════════════════════════════
@app.post("/api/kb/whatsapp-import")
def whatsapp_import():
    """
    Import a WhatsApp chat export into the knowledge base.

    Accepts EITHER:
      (a) multipart file upload with field "file" containing a .zip WhatsApp export, OR
      (b) JSON body { "chat_text": "..." } for plain text (legacy support)

    Both inputs are split into sliding-window chunks; each chunk is sent to
    Gemini (with images when the ZIP provided them). New articles are saved
    additively, deduplicated by fingerprint.

    Returns:
        400 with {"ok": False, "error": ...} for a missing, non-zip,
        unreadable or too-short input; otherwise 200 with the counts and,
        when any were found, the full article list.
    """
    source_label = "whatsapp_export"

    # ── Branch A: ZIP upload ──────────────────────────────────────────────────
    if "file" in request.files:
        f = request.files["file"]
        filename = f.filename or ""
        if not filename.lower().endswith(".zip"):
            return jsonify({"ok": False, "error": "Expected a .zip WhatsApp export file"}), 400
        zip_bytes = f.read()
        logger.info("WhatsApp ZIP upload: %d bytes, filename=%s", len(zip_bytes), filename)
        processor = WhatsAppZipProcessor(zip_bytes)
        if not processor.extract():
            return jsonify({"ok": False, "error": "Could not extract chat from ZIP. Ensure it is a valid WhatsApp export."}), 400
        if len(processor.chat_text) < 100:
            return jsonify({"ok": False, "error": "Extracted chat text too short to process"}), 400
        source_label = f"whatsapp_zip:{filename}"

    # ── Branch B: Legacy plain text JSON body ─────────────────────────────────
    else:
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            # A JSON array/scalar body has no fields to read.
            body = {}
        chat_value = body.get("chat_text")
        # A null or non-string chat_text is treated as missing, not a crash.
        raw_chat = chat_value.strip() if isinstance(chat_value, str) else ""
        if not raw_chat:
            return jsonify({"ok": False, "error": "Provide a .zip file upload or chat_text in JSON body"}), 400
        if len(raw_chat) < 100:
            return jsonify({"ok": False, "error": "Chat text too short to process"}), 400
        logger.info("WhatsApp plain text import: %d chars", len(raw_chat))
        # Reuse the ZIP processor purely for its chunking; there is no media.
        processor = WhatsAppZipProcessor(b"")
        processor.chat_text = raw_chat
        processor.media_map = {}

    # Shared path: chunk, then extract articles chunk by chunk.
    chunks = processor.build_chunks()
    all_articles: List[Dict] = []
    for idx, chunk in enumerate(chunks, start=1):
        logger.info("Processing chunk %d/%d", idx, len(chunks))
        articles = _process_chunk_with_gemini(chunk)
        all_articles.extend(articles)
        logger.info("Chunk %d yielded %d articles (running total: %d)", idx, len(articles), len(all_articles))

    if not all_articles:
        logger.info("No articles extracted from this export")
        return jsonify({
            "ok": True,
            "articles_found": 0,
            "saved": 0,
            "skipped_dupes": 0,
            "note": "No clear problem→solution pairs found in this chat segment"
        })

    stats = _save_kb_articles(all_articles, source_label=source_label)
    logger.info("WhatsApp import complete: found=%d, %s", len(all_articles), stats)
    return jsonify({
        "ok": True,
        "articles_found": len(all_articles),
        "articles": all_articles,  # full list — frontend INSERTs to Supabase kb_articles
        "saved": stats["saved"],
        "skipped_dupes": stats["skipped"],
    })
# ══════════════════════════════════════════════════════════════════════════════
# FEATURE 2 — Bulk KB Upload (CSV / Excel / PDF)
# ══════════════════════════════════════════════════════════════════════════════
def _extract_text_from_pdf_bytes(pdf_bytes: bytes) -> str:
    """
    Extract plain text from a PDF given as bytes.

    pypdf is tried first when available; if it is missing, raises, or yields
    no text, the PDF is sent to Gemini with a plain-text extraction
    instruction. Returns "" when neither route produces text.
    """
    if PYPDF_AVAILABLE:
        try:
            reader = pypdf.PdfReader(io.BytesIO(pdf_bytes))
            extracted = "\n\n".join(
                (page.extract_text() or "") for page in reader.pages
            ).strip()
        except Exception as exc:
            logger.warning("pypdf extraction failed: %s", exc)
        else:
            if extracted:
                return extracted

    if not _gemini_client:
        return ""

    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[
                "Extract all text from this PDF document. Return plain text only.",
                genai_types.Part.from_bytes(data=pdf_bytes, mime_type="application/pdf"),
            ],
        )
        return response.text or ""
    except Exception as exc:
        logger.error("Gemini PDF extraction failed: %s", exc)
        return ""
# Instruction text placed in front of extracted PDF text in bulk_upload; the
# document text is appended directly after the final "Document content:" line.
PDF_KB_PROMPT = """You are a support knowledge base curator.
Convert the following document content into structured KB articles.
Each article covers one distinct topic, issue, or procedure.
Return ONLY a valid JSON array — no preamble, no markdown fences.
All string values must be properly JSON-escaped (no raw newlines inside strings, use \\n).
Schema per item:
{"title": "string", "content": "string", "category": "one of: Account|Billing|Technical|Feature|Other", "tags": ["string"]}
Document content:
"""
@app.post("/api/kb/bulk-upload")
def bulk_upload():
    """
    Bulk-import KB articles from an uploaded CSV, Excel or PDF file.

    CSV/Excel files must have 'title' and 'content' columns (matched
    case-insensitively); optional 'category' and 'tags' columns are used
    when present, with tags split on ',', ';' or '|'. Rows whose title or
    content is empty/NaN are skipped. PDFs are converted to text and
    structured into articles by Gemini (first 50,000 characters only).

    Returns:
        400/500 with {"ok": False, "error": ...} on invalid input or
        processing failure; otherwise the article list and save counts.
    """
    if "file" not in request.files:
        return jsonify({"ok": False, "error": "No file uploaded"}), 400
    f = request.files["file"]
    filename = f.filename or ""
    # splitext yields "" for a name without a dot, rather than the whole name.
    ext = os.path.splitext(filename)[1].lstrip(".").lower()
    file_data = f.read()
    articles = []

    if ext in ("csv", "xlsx", "xls"):
        if not PANDAS_AVAILABLE:
            return jsonify({"ok": False, "error": "pandas not installed on server"}), 500
        try:
            df = pd.read_csv(io.BytesIO(file_data)) if ext == "csv" else pd.read_excel(io.BytesIO(file_data))
            # Headers may be non-strings (e.g. numeric), so coerce before normalising.
            df.columns = [str(c).strip().lower() for c in df.columns]
            if "title" not in df.columns or "content" not in df.columns:
                return jsonify({"ok": False, "error": "CSV/Excel must have 'title' and 'content' columns"}), 400
            for _, row in df.iterrows():
                # Empty cells arrive as NaN; str(NaN) would save a literal "nan" article.
                if pd.isna(row["title"]) or pd.isna(row["content"]):
                    continue
                title = str(row["title"]).strip()
                content = str(row["content"]).strip()
                if not title or not content:
                    continue
                tags = []
                if "tags" in df.columns and pd.notna(row.get("tags")):
                    tags = [t.strip() for t in re.split(r"[,;|]", str(row["tags"])) if t.strip()]
                category = "General"
                if pd.notna(row.get("category")):
                    category = str(row.get("category", "General")).strip() or "General"
                articles.append({
                    "title": title,
                    "content": content,
                    "category": category,
                    "tags": tags,
                })
        except Exception as e:
            return jsonify({"ok": False, "error": f"Could not parse file: {e}"}), 400

    elif ext == "pdf":
        text = _extract_text_from_pdf_bytes(file_data)
        if not text:
            return jsonify({"ok": False, "error": "Could not extract text from PDF"}), 400
        raw = _gemini_text(PDF_KB_PROMPT + text[:50000], json_mode=True)
        parsed = _safe_json(raw, [])
        articles = _validate_articles(parsed)
        if not articles:
            return jsonify({"ok": False, "error": "Gemini PDF structuring returned no valid articles"}), 500

    else:
        return jsonify({"ok": False, "error": f"Unsupported file type .{ext}. Use csv, xlsx, or pdf"}), 400

    if not articles:
        return jsonify({"ok": False, "error": "No articles extracted from file"}), 400

    stats = _save_kb_articles(articles, source_label=f"bulk_upload:{filename}")
    return jsonify({
        "ok": True,
        "articles_found": len(articles),
        "articles": articles,  # full list — frontend INSERTs to Supabase kb_articles
        "saved": stats["saved"],
        "skipped_dupes": stats["skipped"],
    })
# ══════════════════════════════════════════════════════════════════════════════
# FEATURE 3 — Ticket Submission via NL Text or Voice
# ══════════════════════════════════════════════════════════════════════════════
# Instruction text placed in front of a user's free-text message when asking
# Gemini for structured ticket fields; the message is appended directly after
# the final "User message:" line.
TICKET_EXTRACTION_PROMPT = """You are a support ticket intake system for a software support portal.
A user has described their issue in natural language. Extract structured ticket fields.
Return ONLY a valid JSON object — no preamble, no markdown fences.
All string values must be properly JSON-escaped.
Schema:
{"title": "string (max 80 chars)", "description": "string (full clear description)", "category_hint": "one of: Account|Billing|Technical|Feature|Other", "priority_hint": "one of: low|medium|high|critical", "keywords": ["string"]}
User message:
"""
def _transcribe_audio_assemblyai(audio_b64: str, audio_format: str = "wav") -> str:
    """Transcribe base64-encoded audio via AssemblyAI and return the text.

    Flow: decode the payload, upload the raw bytes, create a transcript job
    with language detection enabled, then poll the job every 3 seconds for up
    to 30 attempts (~90 s).

    Args:
        audio_b64: Base64-encoded audio payload.
        audio_format: Accepted for interface compatibility; this function does
            not use it (the bytes are uploaded as an opaque octet stream).

    Returns:
        The transcript text, or "" when the API key is not configured, the
        payload is not valid base64 / is empty, a request fails, the service
        reports an error, or the job does not complete within the poll window.
        This function never raises.
    """
    if not ASSEMBLYAI_API_KEY:
        return ""

    # Client-supplied data: a malformed payload must not bubble up as a 500.
    try:
        audio_bytes = base64.b64decode(audio_b64)
    except (TypeError, ValueError) as e:  # binascii.Error subclasses ValueError
        logger.error("AssemblyAI audio decode error: %s", e)
        return ""
    if not audio_bytes:
        logger.error("AssemblyAI audio decode error: %s", "empty audio payload")
        return ""

    headers = {"authorization": ASSEMBLYAI_API_KEY}

    # Step 1 - upload the raw audio bytes.
    try:
        upload_resp = requests.post(
            f"{ASSEMBLYAI_BASE}/upload",
            headers={**headers, "Content-Type": "application/octet-stream"},
            data=audio_bytes, timeout=30
        )
        upload_resp.raise_for_status()
        upload_url = upload_resp.json().get("upload_url")
    except Exception as e:
        logger.error("AssemblyAI upload error: %s", e)
        return ""
    if not upload_url:
        # Without a URL the transcript request would be sent with audio_url=None.
        logger.error("AssemblyAI upload error: %s", "response contained no upload_url")
        return ""

    # Step 2 - create the transcript job.
    try:
        tx_resp = requests.post(
            f"{ASSEMBLYAI_BASE}/transcript",
            headers={**headers, "Content-Type": "application/json"},
            json={"audio_url": upload_url, "language_detection": True}, timeout=15
        )
        tx_resp.raise_for_status()
        tx_id = tx_resp.json().get("id")
    except Exception as e:
        logger.error("AssemblyAI transcript request error: %s", e)
        return ""
    if not tx_id:
        # Polling ".../transcript/None" could never succeed.
        logger.error("AssemblyAI transcript request error: %s", "response contained no id")
        return ""

    # Step 3 - poll until the job completes, errors, or the window elapses.
    for _ in range(30):
        time.sleep(3)
        try:
            poll = requests.get(f"{ASSEMBLYAI_BASE}/transcript/{tx_id}", headers=headers, timeout=15)
            poll.raise_for_status()
            result = poll.json()
            status = result.get("status")
            if status == "completed":
                # "text" may be present but null; keep the str return contract.
                return result.get("text") or ""
            elif status == "error":
                logger.error("AssemblyAI error: %s", result.get("error"))
                return ""
        except Exception as e:
            # A single failed poll is logged and the next attempt proceeds.
            logger.error("AssemblyAI poll error: %s", e)

    # Poll window exhausted without a terminal status.
    return ""
@app.post("/api/tickets/submit-nl")
def submit_ticket_nl():
    """Turn a free-text issue description into a structured ticket draft.

    Request JSON:
        message: the user's description (required, non-empty string).
        user_id: optional; defaults to "anonymous".

    Responses:
        200 {"ok": True, "ticket": {...}} on success.
        400 when ``message`` is missing, empty, or not a string.
        500 when the model output is not a JSON object with a ``title``.

    When Firestore (``db``) is available the raw input and the extracted
    ticket are also stored in ``iris_ai_ticket_drafts``.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        # e.g. a JSON array or scalar body - treat as "no fields supplied".
        body = {}

    # JSON null / numbers would otherwise crash on .strip() and yield a 500.
    raw_message = body.get("message")
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    user_id = body.get("user_id", "anonymous")
    if not message:
        return jsonify({"ok": False, "error": "message is required"}), 400

    raw = _gemini_text(TICKET_EXTRACTION_PROMPT + message, json_mode=True)
    ticket = _safe_json(raw, {})
    if not isinstance(ticket, dict) or not ticket.get("title"):
        return jsonify({"ok": False, "error": "Could not extract ticket info from message"}), 500

    if db:
        db.collection("iris_ai_ticket_drafts").add({
            "user_id": user_id, "raw_input": message,
            "extracted": ticket, "channel": "nl_text",
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
    return jsonify({"ok": True, "ticket": ticket})
@app.post("/api/tickets/submit-voice")
def submit_ticket_voice():
    """Create a structured ticket draft from a base64-encoded voice recording.

    Reads ``audio_b64`` (required), ``audio_format`` (default "wav") and
    ``user_id`` (default "anonymous") from the JSON body, transcribes the
    audio, then runs the transcript through the ticket-extraction prompt.
    Responds 400 when no audio is supplied and 500 when AssemblyAI is not
    configured, transcription yields nothing, or no titled ticket object can
    be extracted. On success the draft is stored in ``iris_ai_ticket_drafts``
    (when ``db`` is set) and the transcript and ticket are returned.
    """
    payload = request.get_json(silent=True) or {}
    encoded_audio = payload.get("audio_b64", "")
    fmt = payload.get("audio_format", "wav")
    requester = payload.get("user_id", "anonymous")

    if not encoded_audio:
        return jsonify({"ok": False, "error": "audio_b64 is required"}), 400
    if not ASSEMBLYAI_API_KEY:
        return jsonify({"ok": False, "error": "AssemblyAI not configured on server"}), 500

    spoken_text = _transcribe_audio_assemblyai(encoded_audio, fmt)
    if not spoken_text:
        return jsonify({"ok": False, "error": "Transcription failed or returned empty result"}), 500

    model_output = _gemini_text(TICKET_EXTRACTION_PROMPT + spoken_text, json_mode=True)
    extracted = _safe_json(model_output, {})
    has_title = isinstance(extracted, dict) and extracted.get("title")
    if not has_title:
        return jsonify({"ok": False, "error": "Could not extract ticket info from transcript"}), 500

    if db:
        drafts = db.collection("iris_ai_ticket_drafts")
        drafts.add({
            "user_id": requester,
            "raw_input": spoken_text,
            "extracted": extracted,
            "channel": "voice",
            "created_at": datetime.now(timezone.utc).isoformat(),
        })

    return jsonify({"ok": True, "transcript": spoken_text, "ticket": extracted})
# ══════════════════════════════════════════════════════════════════════════════
# FEATURE 4 — System Tutorial Ingestion
# ══════════════════════════════════════════════════════════════════════════════
# Prompt sent alongside an uploaded tutorial video (see tutorial_ingest). It asks
# the model for a JSON array of KB articles, each carrying timestamp_start /
# timestamp_end in seconds.
# NOTE: the "\\n" below is deliberate - in a regular (non-raw) string a bare \n
# is an actual newline, which would turn the instruction into
# "Use <newline> for line breaks" instead of naming the two-character escape.
TUTORIAL_VIDEO_PROMPT = """You are a knowledge base curator watching a tutorial video about the Iris platform.
CONTEXT ABOUT IRIS:
- Iris is an integrated POS (Point of Sale) and fiscalisation system with a mobile attendance and
location-tracking module used by tellers and field reps at retail stores in Zimbabwe.
- The POS/fiscal layer handles sales, receipts, and ZIMRA fiscal compliance (Elixir device).
- The mobile module handles teller clock-in/out, GPS location, store radius, and hours tracking.
- The Iris Support Portal is a customer support desk used by admin staff, agents, and support tiers
to manage tickets, agents, customers, and the knowledge base.
YOUR TASK:
Watch this tutorial video in full. For every distinct feature, workflow, or task you observe being
demonstrated, extract one self-contained KB article. Identify the exact timestamp range in the video
where each demonstration occurs so users can jump directly to the relevant moment.
Be precise about timestamps — state the second at which the demonstration starts and ends.
Write step-by-step instructions based on what you see happening on screen, not generic descriptions.
If the presenter speaks, incorporate their narration into the steps.
Return ONLY a valid JSON array. No preamble, no markdown fences. All strings properly JSON-escaped.
Use \\n for line breaks within content strings.
Schema per item:
{
"title": "string — concise how-to title, max 80 chars",
"content": "string — numbered step-by-step instructions based on what is shown",
"category": "one of: Account|Tickets|Agents|Reports|Admin|POS|Attendance|Other",
"tags": ["string"],
"timestamp_start": <integer — seconds from video start where this demo begins>,
"timestamp_end": <integer — seconds from video start where this demo ends>
}
If the video contains no discernible how-to demonstrations, return an empty array: []
"""
def _upload_video_to_gemini(video_bytes: bytes, mime_type: str, display_name: str) -> Optional[Any]:
    """Upload a video to the Gemini Files API and wait until it is ACTIVE.

    The bytes are spooled to a named temporary file (the upload call is given
    a file path), uploaded, and the remote file is then polled every 5 seconds
    for up to 36 attempts (~3 minutes).

    Original author's notes, kept for context: the Files API processes video
    at 1 FPS and retains files for 48 hours; callers delete the file after use.

    Args:
        video_bytes: Raw video content.
        mime_type: MIME type passed to the upload; its subtype is also used as
            the temp file suffix.
        display_name: Display name passed to the upload.

    Returns:
        The file object from the successful poll, or None when the client is
        not initialised, the upload raises, processing reports FAILED, or the
        file does not become ACTIVE within the poll window.
    """
    if not _gemini_client:
        return None

    tmp_path = None
    try:
        # Write bytes to a named temp file - Files API needs a file path or IO object
        with tempfile.NamedTemporaryFile(suffix=f".{mime_type.split('/')[-1]}", delete=False) as tmp:
            # Record the path before writing so a failed write is still cleaned up.
            tmp_path = tmp.name
            tmp.write(video_bytes)
        logger.info("Uploading video to Gemini Files API: %s (%d bytes)", display_name, len(video_bytes))
        uploaded = _gemini_client.files.upload(
            file=tmp_path,
            config={"mime_type": mime_type, "display_name": display_name}
        )
        logger.info("Upload complete. File name: %s — polling for ACTIVE state...", uploaded.name)
    except Exception as e:
        logger.error("Gemini Files API upload error: %s", e)
        return None
    finally:
        # delete=False means nobody else removes the file; do it on every path
        # (previously a failed upload left the full video on disk).
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as e:
                logger.warning("Could not remove temp video file %s: %s", tmp_path, e)

    # Poll until state is ACTIVE (video processing complete) - max ~3 minutes
    for attempt in range(36):
        time.sleep(5)
        try:
            file_info = _gemini_client.files.get(name=uploaded.name)
            state = getattr(file_info, "state", None)
            state_str = str(state).upper() if state else ""
            logger.info("Poll %d: file state = %s", attempt + 1, state_str)
            if "ACTIVE" in state_str:
                logger.info("Video ACTIVE after %d polls (~%ds)", attempt + 1, (attempt + 1) * 5)
                return file_info
            elif "FAILED" in state_str:
                logger.error("Gemini Files API processing failed for %s", uploaded.name)
                return None
        except Exception as e:
            # A failed poll is not fatal; the next attempt may succeed.
            logger.warning("Poll error: %s", e)

    logger.error("Video did not reach ACTIVE state within timeout")
    return None
def _delete_gemini_file(file_obj: Any) -> None:
    """Best-effort cleanup of a file from the Gemini Files API.

    Never raises: a missing client, a ``file_obj`` without a usable ``name``,
    or a failing delete call are all tolerated (the latter is logged).

    Args:
        file_obj: Object returned by the Files API; only its ``name`` is used.
    """
    # Resolve the name once, outside the try. Previously the except handler
    # read file_obj.name again, so a None/malformed object made this
    # "best-effort" helper raise from its own error path.
    file_name = getattr(file_obj, "name", None)
    if not _gemini_client or not file_name:
        return
    try:
        _gemini_client.files.delete(name=file_name)
        logger.info("Deleted Gemini file: %s", file_name)
    except Exception as e:
        logger.warning("Could not delete Gemini file %s: %s", file_name, e)
# Tutorial upload whitelist: lower-cased file extension (with leading dot)
# mapped to the MIME type handed to Gemini. Insertion order is the order the
# extensions are listed in the "unsupported format" error message.
SUPPORTED_VIDEO_MIMES = {
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    ".avi": "video/x-msvideo",
    ".webm": "video/webm",
    ".mkv": "video/x-matroska",
    ".3gp": "video/3gpp",
    ".flv": "video/x-flv",
}
@app.post("/api/kb/tutorial-ingest")
def tutorial_ingest():
    """Ingest a tutorial video and turn it into timestamped KB articles.

    Accepts a multipart upload (field name "file") plus optional form fields
    ``video_title`` (defaults to the filename) and ``video_url`` (defaults to
    ""). The video is uploaded to Gemini, which is asked to extract one KB
    article per demonstrated feature or task with self-generated timestamps -
    no transcript is required.

    Supported: mp4, mov, avi, webm, mkv, 3gp, flv
    Max practical size: ~500MB (Files API limit is 2GB, but HF Space upload
    limit applies) - original author's note.

    Responses:
        200 with the articles (``timestamp_start`` / ``timestamp_end`` as
            integer seconds so the frontend can deep-link into the video) and
            save statistics.
        400 when no file is supplied or the extension is not supported.
        500 when Gemini is not initialised, upload/processing fails, analysis
            raises, or no valid articles are extracted.
    """
    if "file" not in request.files:
        return jsonify({"ok": False, "error": "No file uploaded. Use multipart field name 'file'."}), 400

    f = request.files["file"]
    filename = f.filename or "tutorial"
    ext = os.path.splitext(filename.lower())[1]
    video_title = request.form.get("video_title", filename)
    video_url = request.form.get("video_url", "")

    mime_type = SUPPORTED_VIDEO_MIMES.get(ext)
    if not mime_type:
        return jsonify({
            "ok": False,
            "error": f"Unsupported video format '{ext}'. Supported: {', '.join(SUPPORTED_VIDEO_MIMES)}"
        }), 400
    if not _gemini_client:
        return jsonify({"ok": False, "error": "Gemini client not initialised — check GOOGLE_API_KEY"}), 500

    video_bytes = f.read()
    logger.info("Tutorial ingest: '%s', %d bytes, mime=%s", video_title, len(video_bytes), mime_type)

    # Upload to Gemini Files API and wait for processing
    gemini_file = _upload_video_to_gemini(video_bytes, mime_type, display_name=video_title)
    if not gemini_file:
        return jsonify({"ok": False, "error": "Video upload or processing by Gemini failed. Try a smaller file or check the format."}), 500

    # Ask Gemini to watch and extract articles with self-generated timestamps
    try:
        logger.info("Sending video to Gemini for tutorial extraction...")
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[gemini_file, TUTORIAL_VIDEO_PROMPT],
            config=genai_types.GenerateContentConfig(
                response_mime_type="application/json"
            )
        )
        raw = resp.text or ""
    except Exception as e:
        logger.error("Gemini video analysis error: %s", e)
        # Cleanup is handled by the finally clause below; deleting here as
        # well made the error path delete the same remote file twice.
        return jsonify({"ok": False, "error": f"Gemini analysis failed: {e}"}), 500
    finally:
        # Always attempt cleanup - runs exactly once on success and on error.
        _delete_gemini_file(gemini_file)

    parsed = _safe_json(raw, [])
    articles = _validate_articles(parsed) if isinstance(parsed, list) else []
    if not articles:
        return jsonify({
            "ok": False,
            "error": "Gemini could not extract any how-to articles from this video. "
                     "Ensure the video contains on-screen demonstrations of Iris features."
        }), 500

    # Attach video metadata and normalise timestamp types
    for a in articles:
        a["video_url"] = video_url
        a["video_title"] = video_title
        for ts_key in ("timestamp_start", "timestamp_end"):
            val = a.get(ts_key)
            if not isinstance(val, int):
                # Missing or unparseable timestamps fall back to 0.
                try:
                    a[ts_key] = int(val) if val is not None else 0
                except (TypeError, ValueError):
                    a[ts_key] = 0

    stats = _save_kb_articles(articles, source_label=f"tutorial:{video_title}")
    logger.info("Tutorial ingest complete: %d articles, saved=%d, skipped=%d",
                len(articles), stats["saved"], stats["skipped"])
    return jsonify({
        "ok": True,
        "video_title": video_title,
        "articles_found": len(articles),
        "articles": articles,  # full list — frontend INSERTs to Supabase kb_articles
        "saved": stats["saved"],
        "skipped_dupes": stats["skipped"],
    })
# ══════════════════════════════════════════════════════════════════════════════
# FEATURE 5 — Agent Solution Writing (NL Text + Voice)
# ══════════════════════════════════════════════════════════════════════════════
# Prompt prefix used by the agent-solution endpoints. The agent's text (or the
# voice transcript) is appended directly after the trailing "Agent description:"
# line and sent with json_mode=True; the reply is expected to be a single JSON
# object shaped like the schema line in the prompt.
SOLUTION_EXTRACTION_PROMPT = """You are a support knowledge base curator.
An agent has described a solution they used to resolve a ticket.
Structure this into a reusable KB article.
Return ONLY a valid JSON object — no preamble, no markdown fences.
All strings must be properly JSON-escaped.
Schema:
{"title": "string", "content": "string (clear step-by-step solution)", "category": "one of: Account|Billing|Technical|Feature|Other", "tags": ["string"]}
Agent description:
"""
@app.post("/api/kb/agent-solution-nl")
def agent_solution_nl():
    """Structure an agent's free-text solution into a KB article and save it.

    Request JSON:
        message: the agent's description (required, non-empty string).
        agent_id: optional; defaults to "unknown" and is used in the source label.
        ticket_id: optional; when set, a ``ticket:<id>`` tag is added.

    Responses:
        200 with the article (as ``article`` and as a one-item ``articles``
            list) and the number saved.
        400 when ``message`` is missing, empty, or not a string.
        500 when the model output is not a JSON object with a ``title``.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}

    # JSON null / numbers would otherwise crash on .strip() and yield a 500.
    raw_message = body.get("message")
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    agent_id = body.get("agent_id", "unknown")
    ticket_id = body.get("ticket_id", "")
    if not message:
        return jsonify({"ok": False, "error": "message is required"}), 400

    raw = _gemini_text(SOLUTION_EXTRACTION_PROMPT + message, json_mode=True)
    article = _safe_json(raw, {})
    if not isinstance(article, dict) or not article.get("title"):
        return jsonify({"ok": False, "error": "Could not structure solution"}), 500

    if ticket_id:
        # The model may return "tags" as null or a bare string; setdefault()
        # would hand that back unchanged and .append() would then crash.
        tags = article.get("tags")
        if isinstance(tags, str):
            tags = [tags] if tags.strip() else []
        elif not isinstance(tags, list):
            tags = []
        tags.append(f"ticket:{ticket_id}")
        article["tags"] = tags

    stats = _save_kb_articles([article], source_label=f"agent:{agent_id}")
    return jsonify({"ok": True, "saved": stats["saved"],
                    "article": article,  # single article — frontend INSERTs to Supabase kb_articles
                    "articles": [article]})
@app.post("/api/kb/agent-solution-voice")
def agent_solution_voice():
    """Structure an agent's spoken solution into a KB article and save it.

    Request JSON:
        audio_b64: base64-encoded recording (required).
        audio_format: optional; defaults to "wav".
        agent_id: optional; defaults to "unknown" and is used in the source label.
        ticket_id: optional; when set, a ``ticket:<id>`` tag is added.

    Responses:
        200 with the transcript, the article (as ``article`` and as a one-item
            ``articles`` list) and the number saved.
        400 when ``audio_b64`` is missing.
        500 when AssemblyAI is not configured, transcription yields nothing,
            or the model output is not a JSON object with a ``title``.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    audio_b64 = body.get("audio_b64", "")
    audio_format = body.get("audio_format", "wav")
    agent_id = body.get("agent_id", "unknown")
    ticket_id = body.get("ticket_id", "")
    if not audio_b64:
        return jsonify({"ok": False, "error": "audio_b64 is required"}), 400
    # Same explicit configuration check as the ticket voice endpoint, so a
    # missing key is not reported as a generic transcription failure.
    if not ASSEMBLYAI_API_KEY:
        return jsonify({"ok": False, "error": "AssemblyAI not configured on server"}), 500

    transcript = _transcribe_audio_assemblyai(audio_b64, audio_format)
    if not transcript:
        return jsonify({"ok": False, "error": "Transcription failed"}), 500

    raw = _gemini_text(SOLUTION_EXTRACTION_PROMPT + transcript, json_mode=True)
    article = _safe_json(raw, {})
    if not isinstance(article, dict) or not article.get("title"):
        return jsonify({"ok": False, "error": "Could not structure solution from transcript"}), 500

    if ticket_id:
        # The model may return "tags" as null or a bare string; setdefault()
        # would hand that back unchanged and .append() would then crash.
        tags = article.get("tags")
        if isinstance(tags, str):
            tags = [tags] if tags.strip() else []
        elif not isinstance(tags, list):
            tags = []
        tags.append(f"ticket:{ticket_id}")
        article["tags"] = tags

    stats = _save_kb_articles([article], source_label=f"agent:{agent_id}")
    return jsonify({"ok": True, "transcript": transcript, "saved": stats["saved"],
                    "article": article,  # single article — frontend INSERTs to Supabase kb_articles
                    "articles": [article]})
# ══════════════════════════════════════════════════════════════════════════════
# FEATURE 6 — Iris Chatbot (RAG over KB + Tutorials)
# ══════════════════════════════════════════════════════════════════════════════
def _search_kb(query: str, limit: int = 5) -> List[Dict]:
    """Keyword search over the most recent KB articles.

    Query words longer than two characters are lower-cased and matched as
    substrings against each article's title, content and tags. An article's
    score is the number of query terms that match; articles with a score of 0
    are dropped. Only the 200 most recently created articles are examined.

    Args:
        query: Free-text user query.
        limit: Maximum number of results to return.

    Returns:
        Up to ``limit`` article dicts, highest score first, each with an added
        ``score`` key. Returns [] when Firestore is unavailable, no usable
        query terms remain, or the lookup raises.
    """
    if not db:
        return []
    query_terms = [t.lower() for t in query.split() if len(t) > 2]
    if not query_terms:
        # Nothing could score above 0, so skip the Firestore read entirely.
        return []
    try:
        docs = db.collection("iris_kb_articles").order_by(
            "created_at", direction=firestore.Query.DESCENDING
        ).limit(200).stream()
        results = []
        for doc in docs:
            d = doc.to_dict() or {}
            # Tolerate malformed tags: previously a single article with null
            # or non-string tags raised here and emptied the whole search.
            tags = d.get("tags")
            if isinstance(tags, str):
                tag_text = tags
            elif isinstance(tags, (list, tuple)):
                tag_text = " ".join(str(t) for t in tags)
            else:
                tag_text = ""
            text = f"{d.get('title','')} {d.get('content','')} {tag_text}".lower()
            score = sum(1 for term in query_terms if term in text)
            if score > 0:
                # Computed score goes last so a stored "score" field on the
                # document cannot override it.
                results.append({**d, "score": score})
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]
    except Exception as e:
        logger.error("KB search error: %s", e)
        return []
# Instruction preamble for the chatbot. chatbot_query() places it at the start
# of the full prompt, followed by the retrieved knowledge base context and the
# user's question.
CHATBOT_SYSTEM_PROMPT = """You are Iris, an intelligent support assistant for the Iris Support Portal.
Answer ONLY from the provided knowledge base context.
If the answer is in a tutorial with a timestamp, mention the video and timestamp.
Be concise, clear, and friendly. Format step-by-step answers as numbered lists.
If you cannot find the answer, say so honestly and suggest submitting a ticket.
"""
@app.post("/api/chatbot/query")
def chatbot_query():
    """Answer a user question from the knowledge base (retrieve, then generate).

    Request JSON:
        message: the question (required, non-empty string).
        session_id: optional; defaults to "default".
        user_id: optional; defaults to "anonymous".

    The top 5 KB matches are formatted into a context section (tutorial
    articles also get an MM:SS timestamp and, when present, the video URL),
    combined with the system prompt and the question, and sent to Gemini.
    An empty model reply is replaced by a fixed apology message.

    Responses:
        200 {"ok": True, "answer": str, "sources": [...]}.
        400 when ``message`` is missing, empty, or not a string.

    When Firestore is available the exchange is logged to
    ``iris_chatbot_logs``; a failed log write is reported via the logger and
    does not affect the response.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}

    # JSON null / numbers would otherwise crash on .strip() and yield a 500.
    raw_message = body.get("message")
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    session_id = body.get("session_id", "default")
    user_id = body.get("user_id", "anonymous")
    if not message:
        return jsonify({"ok": False, "error": "message is required"}), 400

    kb_results = _search_kb(message, limit=5)
    context_blocks = []
    sources = []
    for r in kb_results:
        block = f"[Article: {r.get('title')}]\n{r.get('content', '')}"

        # Stored timestamps are not guaranteed to be ints; a float or string
        # would break the ":02d" formatting below, so coerce defensively and
        # skip the tutorial annotation when the value is unusable.
        ts = r.get("timestamp_start")
        if ts is not None:
            try:
                ts = int(ts)
            except (TypeError, ValueError, OverflowError):
                ts = None
        if ts is not None:
            block += f"\n(Tutorial: {r.get('video_title','Video')} at {ts//60:02d}:{ts%60:02d}"
            if r.get("video_url"):
                block += f" — {r['video_url']}"
            block += ")"

        context_blocks.append(block)
        sources.append({
            "title": r.get("title"),
            "category": r.get("category"),
            "source": r.get("source"),
            "ts_start": r.get("timestamp_start"),
            "video_url": r.get("video_url"),
        })

    context_str = "\n\n---\n\n".join(context_blocks) if context_blocks else "No relevant articles found."
    full_prompt = f"{CHATBOT_SYSTEM_PROMPT}\n\nKNOWLEDGE BASE CONTEXT:\n{context_str}\n\nUSER QUESTION: {message}\n\nAnswer:"
    answer = _gemini_text(full_prompt)
    if not answer:
        answer = "Sorry, I could not process your question right now. Please try again or submit a support ticket."

    if db:
        # Logging is secondary: the user should still get the answer that was
        # already computed even if the log write fails.
        try:
            db.collection("iris_chatbot_logs").add({
                "user_id": user_id, "session_id": session_id,
                "message": message, "answer": answer, "sources": sources,
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
        except Exception as e:
            logger.warning("Chatbot log write failed: %s", e)

    return jsonify({"ok": True, "answer": answer, "sources": sources})
# ══════════════════════════════════════════════════════════════════════════════
# KB READ / DELETE ENDPOINTS
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/api/kb/articles")
def list_kb_articles():
    """List KB articles, newest first.

    Query parameters:
        category: optional exact-match filter on the ``category`` field.
        limit: optional positive integer; defaults to 50.

    Responses:
        200 {"ok": True, "articles": [...], "count": n}; each article carries
            its Firestore document ``id``.
        400 when ``limit`` is not a positive integer.
        500 when Firestore is unavailable or the query raises.
    """
    category = request.args.get("category", "")

    # A non-numeric limit used to raise an unhandled ValueError (HTTP 500);
    # zero or negative values were passed straight through to Firestore.
    try:
        limit = int(request.args.get("limit", 50))
    except (TypeError, ValueError):
        limit = 0
    if limit < 1:
        return jsonify({"ok": False, "error": "limit must be a positive integer"}), 400

    if not db:
        return jsonify({"ok": False, "error": "Firebase unavailable"}), 500
    try:
        query = db.collection("iris_kb_articles").order_by("created_at", direction=firestore.Query.DESCENDING)
        if category:
            query = query.where("category", "==", category)
        docs = query.limit(limit).stream()
        articles = [{"id": d.id, **d.to_dict()} for d in docs]
        return jsonify({"ok": True, "articles": articles, "count": len(articles)})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
@app.delete("/api/kb/articles/<article_id>")
def delete_kb_article(article_id: str):
    """Delete one KB article by its Firestore document id.

    Responds {"ok": True} after the delete call returns, or a 500 JSON error
    when Firestore is unavailable or the delete raises.
    """
    if db:
        try:
            target = db.collection("iris_kb_articles").document(article_id)
            target.delete()
            return jsonify({"ok": True})
        except Exception as err:
            failure = {"ok": False, "error": str(err)}
            return jsonify(failure), 500
    return jsonify({"ok": False, "error": "Firebase unavailable"}), 500
# ══════════════════════════════════════════════════════════════════════════════
# HEALTH
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/health")
def health():
    """Report service status and which integrations are configured.

    Always responds 200 with ``ok: True``. ``kb_articles`` is the article
    count from Firestore, or 0 when Firestore is unavailable or the count
    query fails (the failure is logged rather than silently discarded).
    """
    article_count = 0
    if db:
        try:
            docs = db.collection("iris_kb_articles").count().get()
            article_count = docs[0][0].value
        except Exception as e:
            # Best-effort: the health endpoint must still answer, but a
            # failing count should be visible in the logs.
            logger.warning("Health check could not count KB articles: %s", e)
    return jsonify({
        "ok": True,
        "service": "Iris AI Service v1.1",
        "model": GEMINI_MODEL,
        "gemini": bool(_gemini_client),
        "assemblyai": bool(ASSEMBLYAI_API_KEY),
        "firebase": bool(db),
        "kb_articles": article_count,
    })
# ══════════════════════════════════════════════════════════════════════════════
# ENTRYPOINT
# ══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Script entry point: bind on all interfaces, on $PORT when set
    # (default 7860). The stray trailing "|" after app.run(...) was a
    # syntax error and has been removed.
    port = int(os.environ.get("PORT", 7860))
    logger.info("Iris AI Service v1.1 starting on port %d (model=%s)", port, GEMINI_MODEL)
    app.run(host="0.0.0.0", port=port)