""" main.py — Iris AI Service (v1.1 - April 2026) AI layer for the Iris Support Portal (IrisPlus / Unified Spark Desk). Deployed as a HuggingFace Space monofile (Flask + Gemini + AssemblyAI + Firebase). CHANGELOG v1.1: - Model: gemini-3.1-flash-lite-preview (multimodal reasoning) - /api/kb/whatsapp-import: now accepts multipart ZIP upload * Extracts _chat.txt + maps image files to pointers * Sliding-window chunking (~10k tokens / ~40k chars with overlap) * Multimodal: sends images inline with their surrounding text chunk * Strict JSON enforcement + pre-save validation * JSON parse error recovery (regex extraction fallback) - All other endpoints unchanged from v1.0 FEATURES: 1. WhatsApp Export → Knowledge Base (ZIP multimodal, chunked, additive) 2. Bulk KB Upload (CSV / Excel / PDF) 3. Natural Language + Voice Ticket Submission 4. System Tutorial Ingestion (timestamped transcripts) 5. Agent NL/Voice Solution Writing 6. Iris Chatbot (KB RAG) ENV VARS: GOOGLE_API_KEY — Gemini API key ASSEMBLYAI_API_KEY — AssemblyAI API key FIREBASE — JSON string of Firebase service account GEMINI_MODEL — Override model (default: gemini-3.1-flash-lite-preview) PORT — Server port (default 7860) """ import os import io import re import json import time import logging import base64 import hashlib import zipfile import tempfile from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple import requests from flask import Flask, request, jsonify from flask_cors import CORS # ─── Logging ────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s" ) logger = logging.getLogger("iris-ai-service") # ─── Gemini SDK ─────────────────────────────────────────────────────────────── try: from google import genai from google.genai import types as genai_types except Exception as e: genai = None logger.error("google-genai not installed: %s", e) GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "") # v1.1: upgraded to gemini-3.1-flash-lite-preview for multimodal reasoning GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite-preview") _gemini_client = None if genai and GOOGLE_API_KEY: try: _gemini_client = genai.Client(api_key=GOOGLE_API_KEY) logger.info("Gemini client ready (model=%s).", GEMINI_MODEL) except Exception as e: logger.error("Failed to init Gemini client: %s", e) # ─── AssemblyAI ─────────────────────────────────────────────────────────────── ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "") ASSEMBLYAI_BASE = "https://api.assemblyai.com/v2" # ─── Firebase ───────────────────────────────────────────────────────────────── try: import firebase_admin from firebase_admin import credentials, firestore FIREBASE_AVAILABLE = True except ImportError: FIREBASE_AVAILABLE = False logger.warning("firebase-admin not installed. Persistence disabled.") FIREBASE_ENV = os.environ.get("FIREBASE", "") def init_firestore() -> Optional[Any]: if not FIREBASE_AVAILABLE: return None if firebase_admin._apps: return firestore.client() if not FIREBASE_ENV: logger.warning("FIREBASE env var missing. Persistence disabled.") return None try: sa_info = json.loads(FIREBASE_ENV) cred = credentials.Certificate(sa_info) firebase_admin.initialize_app(cred) logger.info("Firebase initialized.") return firestore.client() except Exception as e: logger.critical("Firebase init failed: %s", e) return None db = init_firestore() # ─── Optional libs ──────────────────────────────────────────────────────────── try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False try: import pypdf PYPDF_AVAILABLE = True except ImportError: PYPDF_AVAILABLE = False # ─── Flask App ──────────────────────────────────────────────────────────────── app = Flask(__name__) CORS(app) # ══════════════════════════════════════════════════════════════════════════════ # SHARED HELPERS # ══════════════════════════════════════════════════════════════════════════════ # Supported image extensions for multimodal WhatsApp ingestion SUPPORTED_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} # Approx chars per token (conservative for mixed Shona/English/emoji content) CHARS_PER_TOKEN = 4 # Target ~10k tokens per chunk with ~1k token overlap CHUNK_CHARS = 40_000 OVERLAP_CHARS = 4_000 def _safe_json(text: str, fallback: Any) -> Any: """ Multi-strategy JSON parser. 1. Direct parse after stripping markdown fences. 2. Regex extraction of first [...] or {...} block. 3. Return fallback. """ if not text: return fallback # Strategy 1: strip fences clean = text.strip() for fence in ("```json", "```JSON", "```"): if fence in clean: parts = clean.split(fence) # take the content between the first pair of fences if len(parts) >= 3: clean = parts[1].strip() elif len(parts) == 2: clean = parts[1].split("```")[0].strip() break try: return json.loads(clean) except json.JSONDecodeError: pass # Strategy 2: regex — find outermost [...] array arr_match = re.search(r'\[[\s\S]*\]', clean) if arr_match: try: return json.loads(arr_match.group()) except json.JSONDecodeError: pass # Strategy 3: regex — find outermost {...} object obj_match = re.search(r'\{[\s\S]*\}', clean) if obj_match: try: return json.loads(obj_match.group()) except json.JSONDecodeError: pass logger.error("JSON parse exhausted all strategies. First 300 chars: %s", text[:300]) return fallback def _validate_articles(data: Any) -> List[Dict]: """ Validate that extracted articles are a list of dicts with required fields. Filters out malformed items rather than failing the whole batch. """ if not isinstance(data, list): logger.warning("Expected list from Gemini, got %s", type(data)) return [] valid = [] for item in data: if not isinstance(item, dict): continue title = str(item.get("title", "")).strip() content = str(item.get("content", "")).strip() if len(title) < 3 or len(content) < 10: continue valid.append({ "title": title, "content": content, "category": str(item.get("category", "General")).strip() or "General", "tags": item.get("tags", []) if isinstance(item.get("tags"), list) else [], }) return valid def _gemini_text(prompt: str, json_mode: bool = False) -> str: """Call Gemini with text-only content.""" if not _gemini_client: return "" cfg = genai_types.GenerateContentConfig( response_mime_type="application/json" ) if json_mode else None try: resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=prompt, config=cfg ) return resp.text or "" except Exception as e: logger.error("Gemini text call error: %s", e) return "" def _gemini_multimodal(parts: list, json_mode: bool = False) -> str: """Call Gemini with a mixed list of text strings and image Parts.""" if not _gemini_client: return "" cfg = genai_types.GenerateContentConfig( response_mime_type="application/json" ) if json_mode else None try: resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=parts, config=cfg ) return resp.text or "" except Exception as e: logger.error("Gemini multimodal call error: %s", e) return "" def _article_fingerprint(title: str, content: str) -> str: raw = f"{title.strip().lower()}::{content.strip().lower()[:300]}" return hashlib.sha256(raw.encode()).hexdigest()[:16] def _get_existing_fingerprints() -> set: if not db: return set() try: docs = db.collection("iris_kb_articles").select(["fingerprint"]).stream() return {d.to_dict().get("fingerprint") for d in docs if d.to_dict().get("fingerprint")} except Exception as e: logger.error("Fingerprint fetch error: %s", e) return set() def _save_kb_articles(articles: List[Dict], source_label: str) -> Dict: if not db: return {"saved": 0, "skipped": 0, "error": "Firebase unavailable"} existing = _get_existing_fingerprints() saved, skipped = 0, 0 for article in articles: title = article.get("title", "Untitled") content = article.get("content", "") fp = _article_fingerprint(title, content) if fp in existing: skipped += 1 continue doc = { "title": title, "content": content, "category": article.get("category", "General"), "tags": article.get("tags", []), "source": source_label, "fingerprint": fp, "created_at": datetime.now(timezone.utc).isoformat(), } if article.get("timestamp_start") is not None: doc["timestamp_start"] = article["timestamp_start"] doc["timestamp_end"] = article.get("timestamp_end") doc["video_url"] = article.get("video_url", "") db.collection("iris_kb_articles").add(doc) existing.add(fp) saved += 1 return {"saved": saved, "skipped": skipped} # ══════════════════════════════════════════════════════════════════════════════ # WHATSAPP ZIP PROCESSOR # ══════════════════════════════════════════════════════════════════════════════ # Regex to match WhatsApp timestamp lines # Handles both: DD/MM/YYYY, HH:MM - Sender: message # and: DD/MM/YYYY, HH:MM am/pm - Sender: message WA_LINE_RE = re.compile( r'^\d{1,2}/\d{1,2}/\d{4},\s+\d{1,2}:\d{2}(?:\s*[ap]m)?\s+-\s+', re.IGNORECASE ) # Matches or [filename.jpg] style media pointers MEDIA_POINTER_RE = re.compile( r'|\[?([^\]]+\.(?:jpg|jpeg|png|webp|gif|mp4|opus|aac|m4a))\]?', re.IGNORECASE ) class WhatsAppZipProcessor: """ Handles extraction and multimodal chunking of a WhatsApp .zip export. A WhatsApp export zip typically contains: _chat.txt — the full conversation IMG-YYYYMMDD-*.jpg — attached images VID-*.mp4 — videos (we skip these, too large) PTT-*.opus — voice notes (skipped) """ def __init__(self, zip_bytes: bytes): self.zip_bytes = zip_bytes self.chat_text = "" self.media_map: Dict[str, bytes] = {} # filename -> raw bytes def extract(self) -> bool: """Extract chat text and image files from ZIP. Returns True on success.""" try: with zipfile.ZipFile(io.BytesIO(self.zip_bytes)) as zf: names = zf.namelist() logger.info("ZIP contains %d files: %s", len(names), names[:20]) # Find chat file — WhatsApp names it _chat.txt or WhatsApp Chat with *.txt chat_file = None for name in names: base = os.path.basename(name).lower() if base == "_chat.txt" or (base.endswith(".txt") and "chat" in base): chat_file = name break if not chat_file: # Fallback: any .txt file txts = [n for n in names if n.lower().endswith(".txt")] if txts: chat_file = txts[0] if not chat_file: logger.error("No chat .txt found in ZIP") return False raw = zf.read(chat_file) self.chat_text = raw.decode("utf-8", errors="replace") logger.info("Chat text extracted: %d chars from %s", len(self.chat_text), chat_file) # Extract images (skip videos and audio — too large / not useful for KB) for name in names: ext = os.path.splitext(name.lower())[1] if ext in SUPPORTED_IMAGE_EXTS: try: self.media_map[os.path.basename(name)] = zf.read(name) except Exception as e: logger.warning("Could not read media file %s: %s", name, e) logger.info("Media files extracted: %d images", len(self.media_map)) return True except zipfile.BadZipFile as e: logger.error("Bad ZIP file: %s", e) return False except Exception as e: logger.error("ZIP extraction error: %s", e) return False def _resolve_media_in_line(self, line: str) -> Optional[bytes]: """ Given a chat line, check if it references a media file we have. Returns the image bytes if found, else None. """ match = MEDIA_POINTER_RE.search(line) if not match: return None filename = match.group(1) # group 1 = explicit filename, None for if filename: fname = os.path.basename(filename) if fname in self.media_map: return self.media_map[fname] # — we can't recover the file since it wasn't exported return None def build_chunks(self) -> List[Dict]: """ Split chat text into overlapping chunks, each annotated with the image bytes found within that chunk. Returns list of: { "text": str, "images": [bytes, ...], "line_range": (start, end) } """ lines = self.chat_text.splitlines() chunks = [] i = 0 total = len(lines) char_count = 0 chunk_lines: List[str] = [] chunk_images: List[bytes] = [] while i < total: line = lines[i] chunk_lines.append(line) char_count += len(line) + 1 # +1 for newline # Check if this line has an image we can include img_bytes = self._resolve_media_in_line(line) if img_bytes and len(chunk_images) < 5: # cap images per chunk chunk_images.append(img_bytes) if char_count >= CHUNK_CHARS or i == total - 1: chunks.append({ "text": "\n".join(chunk_lines), "images": chunk_images[:], "line_range": (i - len(chunk_lines) + 1, i) }) logger.info( "Chunk %d: %d lines, %d chars, %d images", len(chunks), len(chunk_lines), char_count, len(chunk_images) ) # Overlap: keep last OVERLAP_CHARS worth of lines for next chunk overlap_text = 0 overlap_start = len(chunk_lines) - 1 while overlap_start > 0 and overlap_text < OVERLAP_CHARS: overlap_text += len(chunk_lines[overlap_start]) + 1 overlap_start -= 1 chunk_lines = chunk_lines[overlap_start:] chunk_images = [] char_count = sum(len(l) + 1 for l in chunk_lines) i += 1 logger.info("Total chunks: %d", len(chunks)) return chunks # ══════════════════════════════════════════════════════════════════════════════ # WHATSAPP EXTRACTION PROMPT # ══════════════════════════════════════════════════════════════════════════════ WHATSAPP_EXTRACTION_PROMPT = """You are a support knowledge base curator for the Iris platform, deployed across Zimbabwe. Your task: analyse this WhatsApp support group chat segment and extract ONLY clear problem→solution pairs. CONTEXT ABOUT THIS PLATFORM: - "Iris" is an integrated POS (Point of Sale) and fiscalisation system with a mobile attendance and location-tracking module used by field sales reps and in-store tellers at retail stores. - The POS and fiscalisation layer handles sales transactions, receipt generation, and ZIMRA fiscal compliance. The mobile module handles teller clock-in/out, GPS location verification, and hours tracking. - Common POS/fiscal issues: fiscalisation failures, receipt errors, device not syncing to ZIMRA servers, Elixir (fiscal device software) login/password problems. - Common mobile attendance issues: GPS location not detected, clock-in failures, app killed by Android battery optimiser, teller passkey problems, hours recording incorrectly, store radius too small, wrong teller name shown after login, app not running in the background. - Messages mix English, Shona, and Ndebele. Understand regional vernacular (e.g. "irikudzima" = switching off, "ndakashanda" = I worked, "short yemahours" = hours shortage, "gadzirisayi" = fix it, "hupfu" = flour, "yakuda kulogwa patsva" = needs to be logged in fresh). - If screenshots show Android error dialogs (e.g. "Service killed by system", "App stopped", "Abrupt stop"), reason through what that means for Android background restriction and background service killing, and include that diagnosis and fix in the solution content. - If screenshots show fiscal device or POS screens, extract the error code or state shown and reason through the likely cause from the Elixir/ZIMRA integration context. STRICT RULES: 1. Extract ONLY exchanges where a user described a problem AND a named support person (Tendayi, Tony, Violet, Rufaro, Albrighton, Ishmael, or any named responder) provided a working solution or clear instruction. 2. Ignore: greetings, media-only messages, deleted messages, clock-in screenshots with no text context, messages from unknown numbers with no solution attached. 3. Each article must be self-contained and usable by a support agent in future. 4. Translate all Shona/Ndebele problem descriptions to English in the article content. 5. If a screenshot appears to show an Android error or GPS issue, reason through the likely cause and include that reasoning in the solution content. OUTPUT FORMAT: Return ONLY a valid JSON array. No preamble, no explanation, no markdown fences. Every string value MUST be properly JSON-escaped. Do not use unescaped newlines, tabs, or quotes inside strings. Use \\n for line breaks within content strings. Schema per item: {"title": "string (max 80 chars)", "content": "string (escaped, solution steps)", "category": "one of: Account|Technical|Location|Attendance|Device|Other", "tags": ["array", "of", "strings"]} If no valid problem→solution pairs exist in this segment, return an empty array: [] Chat segment: """ def _process_chunk_with_gemini(chunk: Dict) -> List[Dict]: """ Send a single chunk (text + optional images) to Gemini. Returns validated list of article dicts. """ text_part = WHATSAPP_EXTRACTION_PROMPT + chunk["text"] images = chunk.get("images", []) if images and _gemini_client: # Build multimodal content list parts = [text_part] for img_bytes in images: # Detect mime type from magic bytes mime = "image/jpeg" if img_bytes[:4] == b'\x89PNG': mime = "image/png" elif img_bytes[:4] == b'RIFF': mime = "image/webp" parts.append( genai_types.Part.from_bytes(data=img_bytes, mime_type=mime) ) raw = _gemini_multimodal(parts, json_mode=True) else: raw = _gemini_text(text_part, json_mode=True) if not raw: logger.warning("Empty Gemini response for chunk") return [] parsed = _safe_json(raw, []) return _validate_articles(parsed) # ══════════════════════════════════════════════════════════════════════════════ # FEATURE 1 — WhatsApp Export → Knowledge Base (v1.1: ZIP multimodal + chunked) # ══════════════════════════════════════════════════════════════════════════════ @app.post("/api/kb/whatsapp-import") def whatsapp_import(): """ Accepts EITHER: (a) multipart file upload with field "file" containing a .zip WhatsApp export, OR (b) JSON body { "chat_text": "..." } for plain text (legacy support) Processes in sliding-window chunks, sends images to Gemini multimodally. Saves new articles only (additive, dedup by fingerprint). """ all_articles: List[Dict] = [] source_label = "whatsapp_export" # ── Branch A: ZIP upload ────────────────────────────────────────────────── if "file" in request.files: f = request.files["file"] filename = f.filename or "" if not filename.lower().endswith(".zip"): return jsonify({"ok": False, "error": "Expected a .zip WhatsApp export file"}), 400 zip_bytes = f.read() logger.info("WhatsApp ZIP upload: %d bytes, filename=%s", len(zip_bytes), filename) processor = WhatsAppZipProcessor(zip_bytes) if not processor.extract(): return jsonify({"ok": False, "error": "Could not extract chat from ZIP. Ensure it is a valid WhatsApp export."}), 400 if len(processor.chat_text) < 100: return jsonify({"ok": False, "error": "Extracted chat text too short to process"}), 400 chunks = processor.build_chunks() source_label = f"whatsapp_zip:{filename}" for idx, chunk in enumerate(chunks): logger.info("Processing chunk %d/%d", idx + 1, len(chunks)) articles = _process_chunk_with_gemini(chunk) all_articles.extend(articles) logger.info("Chunk %d yielded %d articles (running total: %d)", idx + 1, len(articles), len(all_articles)) # ── Branch B: Legacy plain text JSON body ───────────────────────────────── else: body = request.get_json(silent=True) or {} raw_chat = body.get("chat_text", "").strip() if not raw_chat: return jsonify({"ok": False, "error": "Provide a .zip file upload or chat_text in JSON body"}), 400 if len(raw_chat) < 100: return jsonify({"ok": False, "error": "Chat text too short to process"}), 400 logger.info("WhatsApp plain text import: %d chars", len(raw_chat)) # Chunk the plain text too (handles large exports) lines = raw_chat.splitlines() pseudo_zip = type("PseudoZip", (), { "chat_text": raw_chat, "media_map": {} })() processor = WhatsAppZipProcessor(b"") processor.chat_text = raw_chat processor.media_map = {} chunks = processor.build_chunks() for idx, chunk in enumerate(chunks): logger.info("Processing text chunk %d/%d", idx + 1, len(chunks)) articles = _process_chunk_with_gemini(chunk) all_articles.extend(articles) if not all_articles: logger.info("No articles extracted from this export") return jsonify({ "ok": True, "articles_found": 0, "saved": 0, "skipped_dupes": 0, "note": "No clear problem→solution pairs found in this chat segment" }) stats = _save_kb_articles(all_articles, source_label=source_label) logger.info("WhatsApp import complete: found=%d, %s", len(all_articles), stats) return jsonify({ "ok": True, "articles_found": len(all_articles), "articles": all_articles, # full list — frontend INSERTs to Supabase kb_articles "saved": stats["saved"], "skipped_dupes": stats["skipped"], }) # ══════════════════════════════════════════════════════════════════════════════ # FEATURE 2 — Bulk KB Upload (CSV / Excel / PDF) # ══════════════════════════════════════════════════════════════════════════════ def _extract_text_from_pdf_bytes(pdf_bytes: bytes) -> str: if PYPDF_AVAILABLE: try: reader = pypdf.PdfReader(io.BytesIO(pdf_bytes)) pages = [p.extract_text() or "" for p in reader.pages] text = "\n\n".join(pages).strip() if text: return text except Exception as e: logger.warning("pypdf extraction failed: %s", e) if _gemini_client: try: resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=[ "Extract all text from this PDF document. Return plain text only.", genai_types.Part.from_bytes(data=pdf_bytes, mime_type="application/pdf") ] ) return resp.text or "" except Exception as e: logger.error("Gemini PDF extraction failed: %s", e) return "" PDF_KB_PROMPT = """You are a support knowledge base curator. Convert the following document content into structured KB articles. Each article covers one distinct topic, issue, or procedure. Return ONLY a valid JSON array — no preamble, no markdown fences. All string values must be properly JSON-escaped (no raw newlines inside strings, use \\n). Schema per item: {"title": "string", "content": "string", "category": "one of: Account|Billing|Technical|Feature|Other", "tags": ["string"]} Document content: """ @app.post("/api/kb/bulk-upload") def bulk_upload(): if "file" not in request.files: return jsonify({"ok": False, "error": "No file uploaded"}), 400 f = request.files["file"] filename = f.filename or "" ext = filename.rsplit(".", 1)[-1].lower() file_data = f.read() articles = [] if ext in ("csv", "xlsx", "xls"): if not PANDAS_AVAILABLE: return jsonify({"ok": False, "error": "pandas not installed on server"}), 500 try: df = pd.read_csv(io.BytesIO(file_data)) if ext == "csv" else pd.read_excel(io.BytesIO(file_data)) df.columns = [c.strip().lower() for c in df.columns] if "title" not in df.columns or "content" not in df.columns: return jsonify({"ok": False, "error": "CSV/Excel must have 'title' and 'content' columns"}), 400 for _, row in df.iterrows(): tags = [] if "tags" in df.columns and pd.notna(row.get("tags")): tags = [t.strip() for t in re.split(r"[,;|]", str(row["tags"])) if t.strip()] articles.append({ "title": str(row["title"]).strip(), "content": str(row["content"]).strip(), "category": str(row.get("category", "General")).strip() if pd.notna(row.get("category")) else "General", "tags": tags, }) except Exception as e: return jsonify({"ok": False, "error": f"Could not parse file: {e}"}), 400 elif ext == "pdf": text = _extract_text_from_pdf_bytes(file_data) if not text: return jsonify({"ok": False, "error": "Could not extract text from PDF"}), 400 raw = _gemini_text(PDF_KB_PROMPT + text[:50000], json_mode=True) parsed = _safe_json(raw, []) articles = _validate_articles(parsed) if not articles: return jsonify({"ok": False, "error": "Gemini PDF structuring returned no valid articles"}), 500 else: return jsonify({"ok": False, "error": f"Unsupported file type .{ext}. Use csv, xlsx, or pdf"}), 400 if not articles: return jsonify({"ok": False, "error": "No articles extracted from file"}), 400 stats = _save_kb_articles(articles, source_label=f"bulk_upload:{filename}") return jsonify({"ok": True, "articles_found": len(articles), "articles": articles, # full list — frontend INSERTs to Supabase kb_articles "saved": stats["saved"], "skipped_dupes": stats["skipped"]}) # ══════════════════════════════════════════════════════════════════════════════ # FEATURE 3 — Ticket Submission via NL Text or Voice # ══════════════════════════════════════════════════════════════════════════════ TICKET_EXTRACTION_PROMPT = """You are a support ticket intake system for a software support portal. A user has described their issue in natural language. Extract structured ticket fields. Return ONLY a valid JSON object — no preamble, no markdown fences. All string values must be properly JSON-escaped. Schema: {"title": "string (max 80 chars)", "description": "string (full clear description)", "category_hint": "one of: Account|Billing|Technical|Feature|Other", "priority_hint": "one of: low|medium|high|critical", "keywords": ["string"]} User message: """ def _transcribe_audio_assemblyai(audio_b64: str, audio_format: str = "wav") -> str: if not ASSEMBLYAI_API_KEY: return "" audio_bytes = base64.b64decode(audio_b64) headers = {"authorization": ASSEMBLYAI_API_KEY} try: upload_resp = requests.post( f"{ASSEMBLYAI_BASE}/upload", headers={**headers, "Content-Type": "application/octet-stream"}, data=audio_bytes, timeout=30 ) upload_resp.raise_for_status() upload_url = upload_resp.json().get("upload_url") except Exception as e: logger.error("AssemblyAI upload error: %s", e) return "" try: tx_resp = requests.post( f"{ASSEMBLYAI_BASE}/transcript", headers={**headers, "Content-Type": "application/json"}, json={"audio_url": upload_url, "language_detection": True}, timeout=15 ) tx_resp.raise_for_status() tx_id = tx_resp.json().get("id") except Exception as e: logger.error("AssemblyAI transcript request error: %s", e) return "" for _ in range(30): time.sleep(3) try: poll = requests.get(f"{ASSEMBLYAI_BASE}/transcript/{tx_id}", headers=headers, timeout=15) poll.raise_for_status() result = poll.json() status = result.get("status") if status == "completed": return result.get("text", "") elif status == "error": logger.error("AssemblyAI error: %s", result.get("error")) return "" except Exception as e: logger.error("AssemblyAI poll error: %s", e) return "" @app.post("/api/tickets/submit-nl") def submit_ticket_nl(): body = request.get_json(silent=True) or {} message = body.get("message", "").strip() user_id = body.get("user_id", "anonymous") if not message: return jsonify({"ok": False, "error": "message is required"}), 400 raw = _gemini_text(TICKET_EXTRACTION_PROMPT + message, json_mode=True) ticket = _safe_json(raw, {}) if not isinstance(ticket, dict) or not ticket.get("title"): return jsonify({"ok": False, "error": "Could not extract ticket info from message"}), 500 if db: db.collection("iris_ai_ticket_drafts").add({ "user_id": user_id, "raw_input": message, "extracted": ticket, "channel": "nl_text", "created_at": datetime.now(timezone.utc).isoformat(), }) return jsonify({"ok": True, "ticket": ticket}) @app.post("/api/tickets/submit-voice") def submit_ticket_voice(): body = request.get_json(silent=True) or {} audio_b64 = body.get("audio_b64", "") audio_format = body.get("audio_format", "wav") user_id = body.get("user_id", "anonymous") if not audio_b64: return jsonify({"ok": False, "error": "audio_b64 is required"}), 400 if not ASSEMBLYAI_API_KEY: return jsonify({"ok": False, "error": "AssemblyAI not configured on server"}), 500 transcript = _transcribe_audio_assemblyai(audio_b64, audio_format) if not transcript: return jsonify({"ok": False, "error": "Transcription failed or returned empty result"}), 500 raw = _gemini_text(TICKET_EXTRACTION_PROMPT + transcript, json_mode=True) ticket = _safe_json(raw, {}) if not isinstance(ticket, dict) or not ticket.get("title"): return jsonify({"ok": False, "error": "Could not extract ticket info from transcript"}), 500 if db: db.collection("iris_ai_ticket_drafts").add({ "user_id": user_id, "raw_input": transcript, "extracted": ticket, "channel": "voice", "created_at": datetime.now(timezone.utc).isoformat(), }) return jsonify({"ok": True, "transcript": transcript, "ticket": ticket}) # ══════════════════════════════════════════════════════════════════════════════ # FEATURE 4 — System Tutorial Ingestion # ══════════════════════════════════════════════════════════════════════════════ TUTORIAL_VIDEO_PROMPT = """You are a knowledge base curator watching a tutorial video about the Iris platform. CONTEXT ABOUT IRIS: - Iris is an integrated POS (Point of Sale) and fiscalisation system with a mobile attendance and location-tracking module used by tellers and field reps at retail stores in Zimbabwe. - The POS/fiscal layer handles sales, receipts, and ZIMRA fiscal compliance (Elixir device). - The mobile module handles teller clock-in/out, GPS location, store radius, and hours tracking. - The Iris Support Portal is a customer support desk used by admin staff, agents, and support tiers to manage tickets, agents, customers, and the knowledge base. YOUR TASK: Watch this tutorial video in full. For every distinct feature, workflow, or task you observe being demonstrated, extract one self-contained KB article. Identify the exact timestamp range in the video where each demonstration occurs so users can jump directly to the relevant moment. Be precise about timestamps — state the second at which the demonstration starts and ends. Write step-by-step instructions based on what you see happening on screen, not generic descriptions. If the presenter speaks, incorporate their narration into the steps. Return ONLY a valid JSON array. No preamble, no markdown fences. All strings properly JSON-escaped. Use \n for line breaks within content strings. Schema per item: { "title": "string — concise how-to title, max 80 chars", "content": "string — numbered step-by-step instructions based on what is shown", "category": "one of: Account|Tickets|Agents|Reports|Admin|POS|Attendance|Other", "tags": ["string"], "timestamp_start": , "timestamp_end": } If the video contains no discernible how-to demonstrations, return an empty array: [] """ def _upload_video_to_gemini(video_bytes: bytes, mime_type: str, display_name: str) -> Optional[Any]: """ Upload a video to the Gemini Files API and poll until processing is ACTIVE. Returns the uploaded file object (with .uri and .name) or None on failure. Gemini Files API processes video at 1 FPS, adding timestamps every second. Files are retained for 48 hours. We delete after use to be tidy. """ if not _gemini_client: return None try: # Write bytes to a named temp file — Files API needs a file path or IO object with tempfile.NamedTemporaryFile(suffix=f".{mime_type.split('/')[-1]}", delete=False) as tmp: tmp.write(video_bytes) tmp_path = tmp.name logger.info("Uploading video to Gemini Files API: %s (%d bytes)", display_name, len(video_bytes)) uploaded = _gemini_client.files.upload( file=tmp_path, config={"mime_type": mime_type, "display_name": display_name} ) os.unlink(tmp_path) logger.info("Upload complete. File name: %s — polling for ACTIVE state...", uploaded.name) except Exception as e: logger.error("Gemini Files API upload error: %s", e) return None # Poll until state is ACTIVE (video processing complete) — max ~3 minutes for attempt in range(36): time.sleep(5) try: file_info = _gemini_client.files.get(name=uploaded.name) state = getattr(file_info, "state", None) state_str = str(state).upper() if state else "" logger.info("Poll %d: file state = %s", attempt + 1, state_str) if "ACTIVE" in state_str: logger.info("Video ACTIVE after %d polls (~%ds)", attempt + 1, (attempt + 1) * 5) return file_info elif "FAILED" in state_str: logger.error("Gemini Files API processing failed for %s", uploaded.name) return None except Exception as e: logger.warning("Poll error: %s", e) logger.error("Video did not reach ACTIVE state within timeout") return None def _delete_gemini_file(file_obj: Any) -> None: """Best-effort cleanup of a file from the Gemini Files API.""" try: _gemini_client.files.delete(name=file_obj.name) logger.info("Deleted Gemini file: %s", file_obj.name) except Exception as e: logger.warning("Could not delete Gemini file %s: %s", file_obj.name, e) # Supported video MIME types for tutorial upload SUPPORTED_VIDEO_MIMES = { ".mp4": "video/mp4", ".mov": "video/quicktime", ".avi": "video/x-msvideo", ".webm": "video/webm", ".mkv": "video/x-matroska", ".3gp": "video/3gpp", ".flv": "video/x-flv", } @app.post("/api/kb/tutorial-ingest") def tutorial_ingest(): """ Accepts a tutorial video file upload (multipart, field name "file"). Gemini watches the full video, self-generates timestamps, and extracts one KB article per distinct feature or task demonstrated. No transcript required — Gemini reasons directly from video + audio. Supported: mp4, mov, avi, webm, mkv, 3gp, flv Max practical size: ~500MB (Files API limit is 2GB, but HF Space upload limit applies) Returns articles with timestamp_start/end in seconds so the frontend can generate deep-links into the video. """ if "file" not in request.files: return jsonify({"ok": False, "error": "No file uploaded. Use multipart field name 'file'."}), 400 f = request.files["file"] filename = f.filename or "tutorial" ext = os.path.splitext(filename.lower())[1] video_title = request.form.get("video_title", filename) video_url = request.form.get("video_url", "") mime_type = SUPPORTED_VIDEO_MIMES.get(ext) if not mime_type: return jsonify({ "ok": False, "error": f"Unsupported video format '{ext}'. Supported: {', '.join(SUPPORTED_VIDEO_MIMES)}" }), 400 if not _gemini_client: return jsonify({"ok": False, "error": "Gemini client not initialised — check GOOGLE_API_KEY"}), 500 video_bytes = f.read() logger.info("Tutorial ingest: '%s', %d bytes, mime=%s", video_title, len(video_bytes), mime_type) # Upload to Gemini Files API and wait for processing gemini_file = _upload_video_to_gemini(video_bytes, mime_type, display_name=video_title) if not gemini_file: return jsonify({"ok": False, "error": "Video upload or processing by Gemini failed. Try a smaller file or check the format."}), 500 # Ask Gemini to watch and extract articles with self-generated timestamps try: logger.info("Sending video to Gemini for tutorial extraction...") resp = _gemini_client.models.generate_content( model=GEMINI_MODEL, contents=[gemini_file, TUTORIAL_VIDEO_PROMPT], config=genai_types.GenerateContentConfig( response_mime_type="application/json" ) ) raw = resp.text or "" except Exception as e: logger.error("Gemini video analysis error: %s", e) _delete_gemini_file(gemini_file) return jsonify({"ok": False, "error": f"Gemini analysis failed: {e}"}), 500 finally: # Always attempt cleanup — files expire in 48h anyway but clean up early _delete_gemini_file(gemini_file) parsed = _safe_json(raw, []) articles = _validate_articles(parsed) if isinstance(parsed, list) else [] if not articles: return jsonify({ "ok": False, "error": "Gemini could not extract any how-to articles from this video. " "Ensure the video contains on-screen demonstrations of Iris features." }), 500 # Attach video metadata and normalise timestamp types for a in articles: a["video_url"] = video_url a["video_title"] = video_title for ts_key in ("timestamp_start", "timestamp_end"): val = a.get(ts_key) if not isinstance(val, int): try: a[ts_key] = int(val) if val is not None else 0 except (TypeError, ValueError): a[ts_key] = 0 stats = _save_kb_articles(articles, source_label=f"tutorial:{video_title}") logger.info("Tutorial ingest complete: %d articles, saved=%d, skipped=%d", len(articles), stats["saved"], stats["skipped"]) return jsonify({ "ok": True, "video_title": video_title, "articles_found": len(articles), "articles": articles, # full list — frontend INSERTs to Supabase kb_articles "saved": stats["saved"], "skipped_dupes": stats["skipped"], }) # ══════════════════════════════════════════════════════════════════════════════ # FEATURE 5 — Agent Solution Writing (NL Text + Voice) # ══════════════════════════════════════════════════════════════════════════════ SOLUTION_EXTRACTION_PROMPT = """You are a support knowledge base curator. An agent has described a solution they used to resolve a ticket. Structure this into a reusable KB article. Return ONLY a valid JSON object — no preamble, no markdown fences. All strings must be properly JSON-escaped. Schema: {"title": "string", "content": "string (clear step-by-step solution)", "category": "one of: Account|Billing|Technical|Feature|Other", "tags": ["string"]} Agent description: """ @app.post("/api/kb/agent-solution-nl") def agent_solution_nl(): body = request.get_json(silent=True) or {} message = body.get("message", "").strip() agent_id = body.get("agent_id", "unknown") ticket_id = body.get("ticket_id", "") if not message: return jsonify({"ok": False, "error": "message is required"}), 400 raw = _gemini_text(SOLUTION_EXTRACTION_PROMPT + message, json_mode=True) article = _safe_json(raw, {}) if not isinstance(article, dict) or not article.get("title"): return jsonify({"ok": False, "error": "Could not structure solution"}), 500 if ticket_id: article.setdefault("tags", []).append(f"ticket:{ticket_id}") stats = _save_kb_articles([article], source_label=f"agent:{agent_id}") return jsonify({"ok": True, "saved": stats["saved"], "article": article, # single article — frontend INSERTs to Supabase kb_articles "articles": [article]}) @app.post("/api/kb/agent-solution-voice") def agent_solution_voice(): body = request.get_json(silent=True) or {} audio_b64 = body.get("audio_b64", "") audio_format = body.get("audio_format", "wav") agent_id = body.get("agent_id", "unknown") ticket_id = body.get("ticket_id", "") if not audio_b64: return jsonify({"ok": False, "error": "audio_b64 is required"}), 400 transcript = _transcribe_audio_assemblyai(audio_b64, audio_format) if not transcript: return jsonify({"ok": False, "error": "Transcription failed"}), 500 raw = _gemini_text(SOLUTION_EXTRACTION_PROMPT + transcript, json_mode=True) article = _safe_json(raw, {}) if not isinstance(article, dict) or not article.get("title"): return jsonify({"ok": False, "error": "Could not structure solution from transcript"}), 500 if ticket_id: article.setdefault("tags", []).append(f"ticket:{ticket_id}") stats = _save_kb_articles([article], source_label=f"agent:{agent_id}") return jsonify({"ok": True, "transcript": transcript, "saved": stats["saved"], "article": article, # single article — frontend INSERTs to Supabase kb_articles "articles": [article]}) # ══════════════════════════════════════════════════════════════════════════════ # FEATURE 6 — Iris Chatbot (RAG over KB + Tutorials) # ══════════════════════════════════════════════════════════════════════════════ def _search_kb(query: str, limit: int = 5) -> List[Dict]: if not db: return [] query_terms = [t.lower() for t in query.split() if len(t) > 2] try: docs = db.collection("iris_kb_articles").order_by( "created_at", direction=firestore.Query.DESCENDING ).limit(200).stream() results = [] for doc in docs: d = doc.to_dict() text = f"{d.get('title','')} {d.get('content','')} {' '.join(d.get('tags',[]))}".lower() score = sum(1 for term in query_terms if term in text) if score > 0: results.append({"score": score, **d}) results.sort(key=lambda x: x["score"], reverse=True) return results[:limit] except Exception as e: logger.error("KB search error: %s", e) return [] CHATBOT_SYSTEM_PROMPT = """You are Iris, an intelligent support assistant for the Iris Support Portal. Answer ONLY from the provided knowledge base context. If the answer is in a tutorial with a timestamp, mention the video and timestamp. Be concise, clear, and friendly. Format step-by-step answers as numbered lists. If you cannot find the answer, say so honestly and suggest submitting a ticket. """ @app.post("/api/chatbot/query") def chatbot_query(): body = request.get_json(silent=True) or {} message = body.get("message", "").strip() session_id = body.get("session_id", "default") user_id = body.get("user_id", "anonymous") if not message: return jsonify({"ok": False, "error": "message is required"}), 400 kb_results = _search_kb(message, limit=5) context_blocks = [] sources = [] for r in kb_results: block = f"[Article: {r.get('title')}]\n{r.get('content', '')}" if r.get("timestamp_start") is not None: ts = r["timestamp_start"] block += f"\n(Tutorial: {r.get('video_title','Video')} at {ts//60:02d}:{ts%60:02d}" if r.get("video_url"): block += f" — {r['video_url']}" block += ")" context_blocks.append(block) sources.append({ "title": r.get("title"), "category": r.get("category"), "source": r.get("source"), "ts_start": r.get("timestamp_start"), "video_url": r.get("video_url"), }) context_str = "\n\n---\n\n".join(context_blocks) if context_blocks else "No relevant articles found." full_prompt = f"{CHATBOT_SYSTEM_PROMPT}\n\nKNOWLEDGE BASE CONTEXT:\n{context_str}\n\nUSER QUESTION: {message}\n\nAnswer:" answer = _gemini_text(full_prompt) if not answer: answer = "Sorry, I could not process your question right now. Please try again or submit a support ticket." if db: db.collection("iris_chatbot_logs").add({ "user_id": user_id, "session_id": session_id, "message": message, "answer": answer, "sources": sources, "created_at": datetime.now(timezone.utc).isoformat(), }) return jsonify({"ok": True, "answer": answer, "sources": sources}) # ══════════════════════════════════════════════════════════════════════════════ # KB READ / DELETE ENDPOINTS # ══════════════════════════════════════════════════════════════════════════════ @app.get("/api/kb/articles") def list_kb_articles(): category = request.args.get("category", "") limit = int(request.args.get("limit", 50)) if not db: return jsonify({"ok": False, "error": "Firebase unavailable"}), 500 try: query = db.collection("iris_kb_articles").order_by("created_at", direction=firestore.Query.DESCENDING) if category: query = query.where("category", "==", category) docs = query.limit(limit).stream() articles = [{"id": d.id, **d.to_dict()} for d in docs] return jsonify({"ok": True, "articles": articles, "count": len(articles)}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.delete("/api/kb/articles/") def delete_kb_article(article_id: str): if not db: return jsonify({"ok": False, "error": "Firebase unavailable"}), 500 try: db.collection("iris_kb_articles").document(article_id).delete() return jsonify({"ok": True}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 # ══════════════════════════════════════════════════════════════════════════════ # HEALTH # ══════════════════════════════════════════════════════════════════════════════ @app.get("/health") def health(): article_count = 0 if db: try: docs = db.collection("iris_kb_articles").count().get() article_count = docs[0][0].value except Exception: pass return jsonify({ "ok": True, "service": "Iris AI Service v1.1", "model": GEMINI_MODEL, "gemini": bool(_gemini_client), "assemblyai": bool(ASSEMBLYAI_API_KEY), "firebase": bool(db), "kb_articles": article_count, }) # ══════════════════════════════════════════════════════════════════════════════ # ENTRYPOINT # ══════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) logger.info("Iris AI Service v1.1 starting on port %d (model=%s)", port, GEMINI_MODEL) app.run(host="0.0.0.0", port=port)