Spaces:
Sleeping
Sleeping
| """ | |
| main.py — Iris AI Service (v1.1 - April 2026) | |
| AI layer for the Iris Support Portal (IrisPlus / Unified Spark Desk). | |
| Deployed as a HuggingFace Space monofile (Flask + Gemini + AssemblyAI + Firebase). | |
| CHANGELOG v1.1: | |
| - Model: gemini-3.1-flash-lite-preview (multimodal reasoning) | |
| - /api/kb/whatsapp-import: now accepts multipart ZIP upload | |
| * Extracts _chat.txt + maps image files to <Media omitted> pointers | |
| * Sliding-window chunking (~10k tokens / ~40k chars with overlap) | |
| * Multimodal: sends images inline with their surrounding text chunk | |
| * Strict JSON enforcement + pre-save validation | |
| * JSON parse error recovery (regex extraction fallback) | |
| - All other endpoints unchanged from v1.0 | |
| FEATURES: | |
| 1. WhatsApp Export → Knowledge Base (ZIP multimodal, chunked, additive) | |
| 2. Bulk KB Upload (CSV / Excel / PDF) | |
| 3. Natural Language + Voice Ticket Submission | |
| 4. System Tutorial Ingestion (timestamped transcripts) | |
| 5. Agent NL/Voice Solution Writing | |
| 6. Iris Chatbot (KB RAG) | |
| ENV VARS: | |
| GOOGLE_API_KEY — Gemini API key | |
| ASSEMBLYAI_API_KEY — AssemblyAI API key | |
| FIREBASE — JSON string of Firebase service account | |
| GEMINI_MODEL — Override model (default: gemini-3.1-flash-lite-preview) | |
| PORT — Server port (default 7860) | |
| """ | |
| import os | |
| import io | |
| import re | |
| import json | |
| import time | |
| import logging | |
| import base64 | |
| import hashlib | |
| import zipfile | |
| import tempfile | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
# ─── Logging ──────────────────────────────────────────────────────────────────
# Root logging is configured once at import time; the rest of this module logs
# through the named "iris-ai-service" logger created here.
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("iris-ai-service")
# ─── Gemini SDK ───────────────────────────────────────────────────────────────
# The SDK is optional at import time. When it cannot be imported, or when no
# API key is configured, `_gemini_client` stays None.
try:
    from google import genai
    from google.genai import types as genai_types
except Exception as e:
    genai = None
    # Bind the name as well, so a later reference is a None check rather than
    # a NameError when the SDK is missing.
    genai_types = None
    logger.error("google-genai not installed: %s", e)

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
# v1.1: upgraded to gemini-3.1-flash-lite-preview for multimodal reasoning
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite-preview")

_gemini_client = None
if genai and GOOGLE_API_KEY:
    try:
        _gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
        logger.info("Gemini client ready (model=%s).", GEMINI_MODEL)
    except Exception as e:
        logger.error("Failed to init Gemini client: %s", e)
# ─── AssemblyAI ───────────────────────────────────────────────────────────────
ASSEMBLYAI_BASE = "https://api.assemblyai.com/v2"
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "")

# ─── Firebase ─────────────────────────────────────────────────────────────────
# firebase-admin is optional; FIREBASE_AVAILABLE records whether it imported.
try:
    import firebase_admin
    from firebase_admin import credentials, firestore
except ImportError:
    FIREBASE_AVAILABLE = False
    logger.warning("firebase-admin not installed. Persistence disabled.")
else:
    FIREBASE_AVAILABLE = True

# JSON string of the Firebase service account (see ENV VARS in the module docstring).
FIREBASE_ENV = os.environ.get("FIREBASE", "")
def init_firestore() -> Optional[Any]:
    """Return a Firestore client, initialising the Firebase app when needed.

    Returns:
        A Firestore client, or None when firebase-admin is not installed, the
        FIREBASE env var is empty, or initialisation raises.
    """
    if not FIREBASE_AVAILABLE:
        return None

    # An app is already registered with firebase_admin: just hand out a client.
    if firebase_admin._apps:
        return firestore.client()

    if not FIREBASE_ENV:
        logger.warning("FIREBASE env var missing. Persistence disabled.")
        return None

    try:
        service_account = json.loads(FIREBASE_ENV)
        certificate = credentials.Certificate(service_account)
        firebase_admin.initialize_app(certificate)
        logger.info("Firebase initialized.")
        return firestore.client()
    except Exception as e:
        logger.critical("Firebase init failed: %s", e)
        return None
# Module-wide Firestore handle; None means persistence is disabled.
db = init_firestore()

# ─── Optional libs ────────────────────────────────────────────────────────────
# Each flag records whether the corresponding optional dependency imported.
try:
    import pandas as pd
except ImportError:
    PANDAS_AVAILABLE = False
else:
    PANDAS_AVAILABLE = True

try:
    import pypdf
except ImportError:
    PYPDF_AVAILABLE = False
else:
    PYPDF_AVAILABLE = True

# ─── Flask App ────────────────────────────────────────────────────────────────
app = Flask(__name__)
CORS(app)
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # SHARED HELPERS | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
# Image extensions picked up from a WhatsApp export for multimodal ingestion
SUPPORTED_IMAGE_EXTS = {".gif", ".jpeg", ".jpg", ".png", ".webp"}

# Approx chars per token (conservative for mixed Shona/English/emoji content)
CHARS_PER_TOKEN = 4

# Chunk budget: ~10k tokens per chunk, with ~1k tokens of overlap between chunks
CHUNK_CHARS = 10_000 * CHARS_PER_TOKEN
OVERLAP_CHARS = 1_000 * CHARS_PER_TOKEN
def _safe_json(text: str, fallback: Any) -> Any:
    """Parse JSON out of a model response, tolerating fences and stray text.

    Strategies, in order:
      1. Strip a markdown code fence (if any) and parse the remainder directly.
      2. Decode the first JSON array or object embedded in the text, trying
         whichever of "[" / "{" appears first and ignoring anything after the
         decoded value.
      3. Give up and return ``fallback``.

    Args:
        text: Raw response text (may be empty).
        fallback: Value returned when nothing parseable is found.

    Returns:
        The decoded JSON value, or ``fallback``.
    """
    if not text:
        return fallback

    # Strategy 1: strip fences
    clean = text.strip()
    for fence in ("```json", "```JSON", "```"):
        if fence in clean:
            parts = clean.split(fence)
            # take the content between the first pair of fences
            if len(parts) >= 3:
                clean = parts[1].strip()
            elif len(parts) == 2:
                clean = parts[1].split("```")[0].strip()
            break
    try:
        return json.loads(clean)
    except json.JSONDecodeError:
        pass

    # Strategy 2: decode the first embedded array/object. Candidates are tried
    # in order of position so that an object containing an array yields the
    # object rather than its inner array. raw_decode stops at the end of the
    # first complete value, so trailing prose or stray brackets are ignored.
    decoder = json.JSONDecoder()
    candidates = sorted(
        pos for pos in (clean.find("["), clean.find("{")) if pos != -1
    )
    for pos in candidates:
        try:
            value, _end = decoder.raw_decode(clean, pos)
        except json.JSONDecodeError:
            continue
        return value

    logger.error("JSON parse exhausted all strategies. First 300 chars: %s", text[:300])
    return fallback
def _validate_articles(data: Any) -> List[Dict]:
    """Keep only well-formed article dicts from a parsed model response.

    Malformed items are dropped individually instead of failing the batch.
    An item is kept when it is a dict whose ``title`` has at least 3
    characters and whose ``content`` has at least 10 (after stripping).
    JSON ``null`` values count as missing, so they are never turned into the
    literal text "None".

    Args:
        data: Parsed JSON, expected to be a list of dicts.

    Returns:
        A list of dicts with exactly the keys ``title``, ``content``,
        ``category`` (defaults to "General") and ``tags`` (a list, defaults
        to []). Returns [] when ``data`` is not a list.
    """
    if not isinstance(data, list):
        logger.warning("Expected list from Gemini, got %s", type(data))
        return []

    def _as_text(value: Any) -> str:
        # str(None) would be the 4-character string "None", which passes the
        # title length check below, so None is mapped to "" explicitly.
        return "" if value is None else str(value).strip()

    valid = []
    for item in data:
        if not isinstance(item, dict):
            continue
        title = _as_text(item.get("title"))
        content = _as_text(item.get("content"))
        if len(title) < 3 or len(content) < 10:
            continue
        tags = item.get("tags")
        valid.append({
            "title": title,
            "content": content,
            "category": _as_text(item.get("category")) or "General",
            "tags": tags if isinstance(tags, list) else [],
        })
    return valid
def _gemini_text(prompt: str, json_mode: bool = False) -> str:
    """Send a text-only prompt to Gemini and return the response text.

    Args:
        prompt: The full prompt string.
        json_mode: When True, request an ``application/json`` response.

    Returns:
        The response text, or "" when no client is configured, the response
        has no text, or the call raises.
    """
    if not _gemini_client:
        return ""

    config = None
    if json_mode:
        config = genai_types.GenerateContentConfig(
            response_mime_type="application/json"
        )

    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=prompt,
            config=config,
        )
        return response.text or ""
    except Exception as e:
        logger.error("Gemini text call error: %s", e)
        return ""
def _gemini_multimodal(parts: list, json_mode: bool = False) -> str:
    """Send a mixed list of text strings and image Parts to Gemini.

    Args:
        parts: Content list passed straight through as ``contents``.
        json_mode: When True, request an ``application/json`` response.

    Returns:
        The response text, or "" when no client is configured, the response
        has no text, or the call raises.
    """
    if not _gemini_client:
        return ""

    config = None
    if json_mode:
        config = genai_types.GenerateContentConfig(
            response_mime_type="application/json"
        )

    try:
        response = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=parts,
            config=config,
        )
        return response.text or ""
    except Exception as e:
        logger.error("Gemini multimodal call error: %s", e)
        return ""
def _article_fingerprint(title: str, content: str) -> str:
    """Return a 16-hex-char dedup key for an article.

    The key is the truncated SHA-256 of the stripped, lower-cased title joined
    by "::" to the first 300 characters of the stripped, lower-cased content.
    """
    norm_title = title.strip().lower()
    norm_content = content.strip().lower()[:300]
    digest = hashlib.sha256(f"{norm_title}::{norm_content}".encode())
    return digest.hexdigest()[:16]
def _get_existing_fingerprints() -> set:
    """Collect the non-empty ``fingerprint`` values stored in iris_kb_articles.

    Returns:
        A set of fingerprints; empty when Firestore is unavailable or the
        query raises.
    """
    if not db:
        return set()
    try:
        snapshots = db.collection("iris_kb_articles").select(["fingerprint"]).stream()
        found = set()
        for snap in snapshots:
            fingerprint = snap.to_dict().get("fingerprint")
            if fingerprint:
                found.add(fingerprint)
        return found
    except Exception as e:
        logger.error("Fingerprint fetch error: %s", e)
        return set()
def _save_kb_articles(articles: List[Dict], source_label: str) -> Dict:
    """Add articles to the iris_kb_articles collection, skipping duplicates.

    An article is a duplicate when its fingerprint is already stored, or was
    already saved earlier in this same call.

    Args:
        articles: Article dicts (``title``, ``content``, optional ``category``,
            ``tags``, ``timestamp_start``, ``timestamp_end``, ``video_url``).
        source_label: Stored on each document as ``source``.

    Returns:
        ``{"saved": int, "skipped": int}``; when Firestore is unavailable the
        counts are 0 and an ``"error"`` key is included.
    """
    if not db:
        return {"saved": 0, "skipped": 0, "error": "Firebase unavailable"}

    known = _get_existing_fingerprints()
    saved_count = 0
    skipped_count = 0

    for entry in articles:
        title = entry.get("title", "Untitled")
        content = entry.get("content", "")
        fingerprint = _article_fingerprint(title, content)
        if fingerprint in known:
            skipped_count += 1
            continue

        record = {
            "title": title,
            "content": content,
            "category": entry.get("category", "General"),
            "tags": entry.get("tags", []),
            "source": source_label,
            "fingerprint": fingerprint,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        # Timestamp/video fields are only stored when a start time is present.
        if entry.get("timestamp_start") is not None:
            record["timestamp_start"] = entry["timestamp_start"]
            record["timestamp_end"] = entry.get("timestamp_end")
            record["video_url"] = entry.get("video_url", "")

        db.collection("iris_kb_articles").add(record)
        known.add(fingerprint)
        saved_count += 1

    return {"saved": saved_count, "skipped": skipped_count}
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # WHATSAPP ZIP PROCESSOR | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
# Regex to match WhatsApp timestamp lines
# Handles both: DD/MM/YYYY, HH:MM - Sender: message
# and: DD/MM/YYYY, HH:MM am/pm - Sender: message
WA_LINE_RE = re.compile(
    r'^\d{1,2}/\d{1,2}/\d{4},\s+\d{1,2}:\d{2}(?:\s*[ap]m)?\s+-\s+',
    re.IGNORECASE
)

# Matches <Media omitted> or a media filename pointer (optionally in [brackets]).
# Group 1 is the bare filename, or None for <Media omitted>.
# The filename may not contain whitespace, brackets, angle brackets, colons,
# slashes or the U+200E/U+200F direction marks. Without that restriction the
# group also swallows the "DD/MM/YYYY, HH:MM - Sender: " prefix of the line,
# and the captured text never equals an exported file name.
MEDIA_POINTER_RE = re.compile(
    r'<Media omitted>'
    r'|\[?([^\s\[\]<>:/\\\u200e\u200f]+\.(?:jpg|jpeg|png|webp|gif|mp4|opus|aac|m4a))\b\]?',
    re.IGNORECASE
)
class WhatsAppZipProcessor:
    """
    Handles extraction and multimodal chunking of a WhatsApp .zip export.

    A WhatsApp export zip typically contains:
        _chat.txt           — the full conversation
        IMG-YYYYMMDD-*.jpg  — attached images
        VID-*.mp4           — videos (we skip these, too large)
        PTT-*.opus          — voice notes (skipped)

    Usage: construct with the raw ZIP bytes, call extract(), then
    build_chunks(). `chat_text` and `media_map` may also be assigned directly
    (e.g. for plain-text input) before calling build_chunks().
    """

    def __init__(self, zip_bytes: bytes):
        self.zip_bytes = zip_bytes
        self.chat_text = ""
        self.media_map: Dict[str, bytes] = {}  # filename -> raw bytes

    def extract(self) -> bool:
        """Extract chat text and image files from ZIP. Returns True on success.

        The chat file is the first entry named ``_chat.txt`` or any ``.txt``
        whose name contains "chat"; failing that, the first ``.txt`` entry.
        Images with an extension in SUPPORTED_IMAGE_EXTS are loaded into
        ``media_map`` keyed by base name. Returns False when no chat file is
        found or the archive cannot be read.
        """
        try:
            with zipfile.ZipFile(io.BytesIO(self.zip_bytes)) as zf:
                names = zf.namelist()
                logger.info("ZIP contains %d files: %s", len(names), names[:20])

                # Find chat file — WhatsApp names it _chat.txt or WhatsApp Chat with *.txt
                chat_file = None
                for name in names:
                    base = os.path.basename(name).lower()
                    if base == "_chat.txt" or (base.endswith(".txt") and "chat" in base):
                        chat_file = name
                        break
                if not chat_file:
                    # Fallback: any .txt file
                    txts = [n for n in names if n.lower().endswith(".txt")]
                    if txts:
                        chat_file = txts[0]
                if not chat_file:
                    logger.error("No chat .txt found in ZIP")
                    return False

                raw = zf.read(chat_file)
                # Undecodable bytes become U+FFFD instead of aborting the import.
                self.chat_text = raw.decode("utf-8", errors="replace")
                logger.info("Chat text extracted: %d chars from %s", len(self.chat_text), chat_file)

                # Extract images (skip videos and audio — too large / not useful for KB)
                for name in names:
                    ext = os.path.splitext(name.lower())[1]
                    if ext in SUPPORTED_IMAGE_EXTS:
                        try:
                            self.media_map[os.path.basename(name)] = zf.read(name)
                        except Exception as e:
                            logger.warning("Could not read media file %s: %s", name, e)
                logger.info("Media files extracted: %d images", len(self.media_map))
                return True
        except zipfile.BadZipFile as e:
            logger.error("Bad ZIP file: %s", e)
            return False
        except Exception as e:
            logger.error("ZIP extraction error: %s", e)
            return False

    def _resolve_media_in_line(self, line: str) -> Optional[bytes]:
        """
        Given a chat line, check if it references a media file we have.
        Returns the image bytes if found, else None.
        """
        match = MEDIA_POINTER_RE.search(line)
        if not match:
            return None
        filename = match.group(1)  # group 1 = explicit filename, None for <Media omitted>
        if filename:
            fname = os.path.basename(filename)
            if fname in self.media_map:
                return self.media_map[fname]
        # <Media omitted> — we can't recover the file since it wasn't exported
        return None

    def build_chunks(self) -> List[Dict]:
        """
        Split chat text into overlapping chunks, each annotated with
        the image bytes found within that chunk.

        A chunk is closed once it holds at least CHUNK_CHARS characters, or
        at the last line. The next chunk starts with the trailing lines of
        the previous one: the fewest whole lines that total at least
        OVERLAP_CHARS. A line is not carried over if doing so would bring the
        overlap to CHUNK_CHARS or more, because the next chunk would then be
        full before receiving any new text. At most 5 images are attached per
        chunk; images referenced on carried-over lines are not re-attached.

        Returns list of:
            { "text": str, "images": [bytes, ...], "line_range": (start, end) }
        where line_range holds 0-based, inclusive line indices.
        """
        lines = self.chat_text.splitlines()
        last_index = len(lines) - 1
        chunks: List[Dict] = []
        chunk_lines: List[str] = []
        chunk_images: List[bytes] = []
        char_count = 0

        for i, line in enumerate(lines):
            chunk_lines.append(line)
            char_count += len(line) + 1  # +1 for newline

            # Check if this line has an image we can include
            img_bytes = self._resolve_media_in_line(line)
            if img_bytes and len(chunk_images) < 5:  # cap images per chunk
                chunk_images.append(img_bytes)

            if char_count < CHUNK_CHARS and i != last_index:
                continue

            chunks.append({
                "text": "\n".join(chunk_lines),
                "images": chunk_images,
                "line_range": (i - len(chunk_lines) + 1, i),
            })
            logger.info(
                "Chunk %d: %d lines, %d chars, %d images",
                len(chunks), len(chunk_lines), char_count, len(chunk_images)
            )

            # Overlap: walk backwards from the end of the chunk and keep whole
            # lines until OVERLAP_CHARS is reached.
            keep = 0
            overlap_chars = 0
            for prev in reversed(chunk_lines):
                if overlap_chars >= OVERLAP_CHARS:
                    break
                size = len(prev) + 1
                if overlap_chars + size >= CHUNK_CHARS:
                    # Carrying this line would fill the next chunk by itself.
                    break
                overlap_chars += size
                keep += 1

            # Note: a slice of [-0:] would keep everything, hence the guard.
            chunk_lines = chunk_lines[-keep:] if keep else []
            chunk_images = []
            char_count = overlap_chars

        logger.info("Total chunks: %d", len(chunks))
        return chunks
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # WHATSAPP EXTRACTION PROMPT | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
# Instruction prefix for WhatsApp chat extraction. The chat segment text is
# appended directly after the trailing "Chat segment:" line, and the model is
# asked for a JSON array of {title, content, category, tags} items.
WHATSAPP_EXTRACTION_PROMPT = """You are a support knowledge base curator for the Iris platform, deployed across Zimbabwe.
Your task: analyse this WhatsApp support group chat segment and extract ONLY clear problem→solution pairs.
CONTEXT ABOUT THIS PLATFORM:
- "Iris" is an integrated POS (Point of Sale) and fiscalisation system with a mobile attendance and
location-tracking module used by field sales reps and in-store tellers at retail stores.
- The POS and fiscalisation layer handles sales transactions, receipt generation, and ZIMRA fiscal
compliance. The mobile module handles teller clock-in/out, GPS location verification, and hours tracking.
- Common POS/fiscal issues: fiscalisation failures, receipt errors, device not syncing to ZIMRA servers,
Elixir (fiscal device software) login/password problems.
- Common mobile attendance issues: GPS location not detected, clock-in failures, app killed by Android
battery optimiser, teller passkey problems, hours recording incorrectly, store radius too small,
wrong teller name shown after login, app not running in the background.
- Messages mix English, Shona, and Ndebele. Understand regional vernacular (e.g. "irikudzima" = switching
off, "ndakashanda" = I worked, "short yemahours" = hours shortage, "gadzirisayi" = fix it, "hupfu" = flour,
"yakuda kulogwa patsva" = needs to be logged in fresh).
- If screenshots show Android error dialogs (e.g. "Service killed by system", "App stopped", "Abrupt stop"),
reason through what that means for Android background restriction and background service killing, and include
that diagnosis and fix in the solution content.
- If screenshots show fiscal device or POS screens, extract the error code or state shown and reason through
the likely cause from the Elixir/ZIMRA integration context.
STRICT RULES:
1. Extract ONLY exchanges where a user described a problem AND a named support person (Tendayi, Tony, Violet,
Rufaro, Albrighton, Ishmael, or any named responder) provided a working solution or clear instruction.
2. Ignore: greetings, media-only messages, deleted messages, clock-in screenshots with no text context,
messages from unknown numbers with no solution attached.
3. Each article must be self-contained and usable by a support agent in future.
4. Translate all Shona/Ndebele problem descriptions to English in the article content.
5. If a screenshot appears to show an Android error or GPS issue, reason through the likely cause and
include that reasoning in the solution content.
OUTPUT FORMAT: Return ONLY a valid JSON array. No preamble, no explanation, no markdown fences.
Every string value MUST be properly JSON-escaped. Do not use unescaped newlines, tabs, or quotes inside strings.
Use \\n for line breaks within content strings.
Schema per item:
{"title": "string (max 80 chars)", "content": "string (escaped, solution steps)", "category": "one of: Account|Technical|Location|Attendance|Device|Other", "tags": ["array", "of", "strings"]}
If no valid problem→solution pairs exist in this segment, return an empty array: []
Chat segment:
"""
def _sniff_image_mime(img_bytes: bytes) -> str:
    """Guess an image MIME type from its magic bytes; JPEG when unrecognised."""
    if img_bytes[:4] == b"\x89PNG":
        return "image/png"
    # RIFF is a generic container; only the WEBP form type is an image.
    if img_bytes[:4] == b"RIFF" and img_bytes[8:12] == b"WEBP":
        return "image/webp"
    if img_bytes[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    return "image/jpeg"


def _process_chunk_with_gemini(chunk: Dict) -> List[Dict]:
    """
    Send a single chunk (text + optional images) to Gemini.

    Args:
        chunk: Dict with a "text" string and an optional "images" list of raw
            image bytes.

    Returns:
        Validated list of article dicts ([] when the model returns nothing
        usable).
    """
    text_part = WHATSAPP_EXTRACTION_PROMPT + chunk["text"]
    images = chunk.get("images", [])

    if images and _gemini_client:
        # Build multimodal content list: the prompt first, then each image
        # labelled with the MIME type detected from its magic bytes.
        parts = [text_part]
        for img_bytes in images:
            parts.append(
                genai_types.Part.from_bytes(
                    data=img_bytes, mime_type=_sniff_image_mime(img_bytes)
                )
            )
        raw = _gemini_multimodal(parts, json_mode=True)
        if not raw:
            # An empty result means the call failed or produced no text. Retry
            # without images so the chunk's text is still processed.
            logger.warning("Empty multimodal response; retrying chunk as text only")
            raw = _gemini_text(text_part, json_mode=True)
    else:
        raw = _gemini_text(text_part, json_mode=True)

    if not raw:
        logger.warning("Empty Gemini response for chunk")
        return []

    parsed = _safe_json(raw, [])
    return _validate_articles(parsed)
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # FEATURE 1 — WhatsApp Export → Knowledge Base (v1.1: ZIP multimodal + chunked) | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def whatsapp_import():
    """
    Accepts EITHER:
    (a) multipart file upload with field "file" containing a .zip WhatsApp export, OR
    (b) JSON body { "chat_text": "..." } for plain text (legacy support)

    Processes in sliding-window chunks, sends images to Gemini multimodally.
    Saves new articles only (additive, dedup by fingerprint).

    Returns a JSON response; 400 for a missing/invalid upload or for text
    shorter than 100 characters.
    """
    source_label = "whatsapp_export"

    # ── Branch A: ZIP upload ──────────────────────────────────────────────────
    if "file" in request.files:
        f = request.files["file"]
        filename = f.filename or ""
        if not filename.lower().endswith(".zip"):
            return jsonify({"ok": False, "error": "Expected a .zip WhatsApp export file"}), 400
        zip_bytes = f.read()
        logger.info("WhatsApp ZIP upload: %d bytes, filename=%s", len(zip_bytes), filename)
        processor = WhatsAppZipProcessor(zip_bytes)
        if not processor.extract():
            return jsonify({"ok": False, "error": "Could not extract chat from ZIP. Ensure it is a valid WhatsApp export."}), 400
        if len(processor.chat_text) < 100:
            return jsonify({"ok": False, "error": "Extracted chat text too short to process"}), 400
        source_label = f"whatsapp_zip:{filename}"
        chunk_label = "chunk"

    # ── Branch B: Legacy plain text JSON body ─────────────────────────────────
    else:
        body = request.get_json(silent=True)
        # Anything other than a JSON object with a string chat_text is treated
        # as "no text supplied" and answered with 400 below.
        raw_chat = body.get("chat_text") if isinstance(body, dict) else None
        raw_chat = raw_chat.strip() if isinstance(raw_chat, str) else ""
        if not raw_chat:
            return jsonify({"ok": False, "error": "Provide a .zip file upload or chat_text in JSON body"}), 400
        if len(raw_chat) < 100:
            return jsonify({"ok": False, "error": "Chat text too short to process"}), 400
        logger.info("WhatsApp plain text import: %d chars", len(raw_chat))
        # Chunk the plain text too (handles large exports); there is no media.
        processor = WhatsAppZipProcessor(b"")
        processor.chat_text = raw_chat
        processor.media_map = {}
        chunk_label = "text chunk"

    # ── Shared: chunk → Gemini → collect ──────────────────────────────────────
    all_articles: List[Dict] = []
    chunks = processor.build_chunks()
    for idx, chunk in enumerate(chunks, start=1):
        logger.info("Processing %s %d/%d", chunk_label, idx, len(chunks))
        articles = _process_chunk_with_gemini(chunk)
        all_articles.extend(articles)
        logger.info("Chunk %d yielded %d articles (running total: %d)", idx, len(articles), len(all_articles))

    if not all_articles:
        logger.info("No articles extracted from this export")
        return jsonify({
            "ok": True,
            "articles_found": 0,
            "saved": 0,
            "skipped_dupes": 0,
            "note": "No clear problem→solution pairs found in this chat segment"
        })

    stats = _save_kb_articles(all_articles, source_label=source_label)
    logger.info("WhatsApp import complete: found=%d, %s", len(all_articles), stats)
    return jsonify({
        "ok": True,
        "articles_found": len(all_articles),
        "articles": all_articles,  # full list — frontend INSERTs to Supabase kb_articles
        "saved": stats["saved"],
        "skipped_dupes": stats["skipped"],
    })
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # FEATURE 2 — Bulk KB Upload (CSV / Excel / PDF) | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def _extract_text_from_pdf_bytes(pdf_bytes: bytes) -> str:
    """Extract plain text from a PDF, trying pypdf first and Gemini second.

    Returns:
        The extracted text, or "" when neither route produces any.
    """
    # Route 1: local extraction with pypdf (when installed).
    if PYPDF_AVAILABLE:
        try:
            reader = pypdf.PdfReader(io.BytesIO(pdf_bytes))
            page_texts = []
            for page in reader.pages:
                page_texts.append(page.extract_text() or "")
            joined = "\n\n".join(page_texts).strip()
            if joined:
                return joined
        except Exception as e:
            logger.warning("pypdf extraction failed: %s", e)

    # Route 2: hand the raw PDF to Gemini and ask for its text.
    if _gemini_client:
        try:
            pdf_part = genai_types.Part.from_bytes(
                data=pdf_bytes, mime_type="application/pdf"
            )
            response = _gemini_client.models.generate_content(
                model=GEMINI_MODEL,
                contents=[
                    "Extract all text from this PDF document. Return plain text only.",
                    pdf_part,
                ],
            )
            return response.text or ""
        except Exception as e:
            logger.error("Gemini PDF extraction failed: %s", e)

    return ""
# Instruction prefix for turning document text into KB articles. The document
# text is appended directly after the trailing "Document content:" line.
PDF_KB_PROMPT = """You are a support knowledge base curator.
Convert the following document content into structured KB articles.
Each article covers one distinct topic, issue, or procedure.
Return ONLY a valid JSON array — no preamble, no markdown fences.
All string values must be properly JSON-escaped (no raw newlines inside strings, use \\n).
Schema per item:
{"title": "string", "content": "string", "category": "one of: Account|Billing|Technical|Feature|Other", "tags": ["string"]}
Document content:
"""
def bulk_upload():
    """Import KB articles from an uploaded CSV, Excel or PDF file.

    CSV/Excel files must have ``title`` and ``content`` columns; ``category``
    and ``tags`` (separated by ``,``, ``;`` or ``|``) are optional. Rows whose
    title or content cell is empty are skipped. PDFs are converted to text and
    structured into articles by Gemini (first 50,000 characters only).

    Returns a JSON response with the extracted articles and save statistics,
    or an error payload with status 400/500.
    """
    if "file" not in request.files:
        return jsonify({"ok": False, "error": "No file uploaded"}), 400
    f = request.files["file"]
    filename = f.filename or ""
    ext = filename.rsplit(".", 1)[-1].lower()
    file_data = f.read()
    articles = []

    if ext in ("csv", "xlsx", "xls"):
        if not PANDAS_AVAILABLE:
            return jsonify({"ok": False, "error": "pandas not installed on server"}), 500
        try:
            buffer = io.BytesIO(file_data)
            df = pd.read_csv(buffer) if ext == "csv" else pd.read_excel(buffer)
            # str() first: headers are not always strings (e.g. numeric headers).
            df.columns = [str(c).strip().lower() for c in df.columns]
            if "title" not in df.columns or "content" not in df.columns:
                return jsonify({"ok": False, "error": "CSV/Excel must have 'title' and 'content' columns"}), 400
            has_tags = "tags" in df.columns
            has_category = "category" in df.columns
            for _, row in df.iterrows():
                # Empty cells come back as NaN; str(NaN) would become the
                # literal text "nan", so such rows are skipped instead.
                if pd.isna(row["title"]) or pd.isna(row["content"]):
                    continue
                title = str(row["title"]).strip()
                content = str(row["content"]).strip()
                if not title or not content:
                    continue
                tags = []
                if has_tags and pd.notna(row["tags"]):
                    tags = [t.strip() for t in re.split(r"[,;|]", str(row["tags"])) if t.strip()]
                category = "General"
                if has_category and pd.notna(row["category"]):
                    category = str(row["category"]).strip() or "General"
                articles.append({
                    "title": title,
                    "content": content,
                    "category": category,
                    "tags": tags,
                })
        except Exception as e:
            return jsonify({"ok": False, "error": f"Could not parse file: {e}"}), 400

    elif ext == "pdf":
        text = _extract_text_from_pdf_bytes(file_data)
        if not text:
            return jsonify({"ok": False, "error": "Could not extract text from PDF"}), 400
        raw = _gemini_text(PDF_KB_PROMPT + text[:50000], json_mode=True)
        parsed = _safe_json(raw, [])
        articles = _validate_articles(parsed)
        if not articles:
            return jsonify({"ok": False, "error": "Gemini PDF structuring returned no valid articles"}), 500

    else:
        return jsonify({"ok": False, "error": f"Unsupported file type .{ext}. Use csv, xlsx, or pdf"}), 400

    if not articles:
        return jsonify({"ok": False, "error": "No articles extracted from file"}), 400

    stats = _save_kb_articles(articles, source_label=f"bulk_upload:{filename}")
    return jsonify({
        "ok": True,
        "articles_found": len(articles),
        "articles": articles,  # full list — frontend INSERTs to Supabase kb_articles
        "saved": stats["saved"],
        "skipped_dupes": stats["skipped"],
    })
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # FEATURE 3 — Ticket Submission via NL Text or Voice | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
# Instruction prefix for ticket intake. The user's message (or a voice
# transcript) is appended directly after the trailing "User message:" line,
# and the model is asked for a single JSON object.
TICKET_EXTRACTION_PROMPT = """You are a support ticket intake system for a software support portal.
A user has described their issue in natural language. Extract structured ticket fields.
Return ONLY a valid JSON object — no preamble, no markdown fences.
All string values must be properly JSON-escaped.
Schema:
{"title": "string (max 80 chars)", "description": "string (full clear description)", "category_hint": "one of: Account|Billing|Technical|Feature|Other", "priority_hint": "one of: low|medium|high|critical", "keywords": ["string"]}
User message:
"""
def _transcribe_audio_assemblyai(audio_b64: str, audio_format: str = "wav") -> str:
    """Transcribe base64-encoded audio with AssemblyAI.

    Uploads the decoded bytes, requests a transcript with language detection,
    then polls every 3 seconds, up to 30 times, for the result.

    Args:
        audio_b64: Base64-encoded audio data.
        audio_format: Accepted for interface compatibility; not used here.

    Returns:
        The transcript text, or "" when the API key is missing, the input is
        not valid base64, any request fails, or polling times out.
    """
    if not ASSEMBLYAI_API_KEY:
        return ""

    # The payload comes from the request body, so decoding can fail.
    # binascii.Error is a ValueError subclass; TypeError covers non-string input.
    try:
        audio_bytes = base64.b64decode(audio_b64)
    except (ValueError, TypeError) as e:
        logger.error("Invalid base64 audio payload: %s", e)
        return ""

    headers = {"authorization": ASSEMBLYAI_API_KEY}

    try:
        upload_resp = requests.post(
            f"{ASSEMBLYAI_BASE}/upload",
            headers={**headers, "Content-Type": "application/octet-stream"},
            data=audio_bytes, timeout=30
        )
        upload_resp.raise_for_status()
        upload_url = upload_resp.json().get("upload_url")
    except Exception as e:
        logger.error("AssemblyAI upload error: %s", e)
        return ""
    if not upload_url:
        logger.error("AssemblyAI upload error: %s", "response contained no upload_url")
        return ""

    try:
        tx_resp = requests.post(
            f"{ASSEMBLYAI_BASE}/transcript",
            headers={**headers, "Content-Type": "application/json"},
            json={"audio_url": upload_url, "language_detection": True}, timeout=15
        )
        tx_resp.raise_for_status()
        tx_id = tx_resp.json().get("id")
    except Exception as e:
        logger.error("AssemblyAI transcript request error: %s", e)
        return ""
    if not tx_id:
        logger.error("AssemblyAI transcript request error: %s", "response contained no id")
        return ""

    for _ in range(30):
        time.sleep(3)
        try:
            poll = requests.get(f"{ASSEMBLYAI_BASE}/transcript/{tx_id}", headers=headers, timeout=15)
            poll.raise_for_status()
            result = poll.json()
            status = result.get("status")
            if status == "completed":
                return result.get("text", "")
            elif status == "error":
                logger.error("AssemblyAI error: %s", result.get("error"))
                return ""
        except Exception as e:
            # A single failed poll is not fatal; keep polling until the budget runs out.
            logger.error("AssemblyAI poll error: %s", e)

    logger.error("AssemblyAI poll error: %s", "transcript not ready after 30 polls")
    return ""
def submit_ticket_nl():
    """Turn a natural-language message into structured ticket fields.

    Expects a JSON body ``{"message": str, "user_id": str (optional)}``.
    When Firestore is available, the raw input and the extracted ticket are
    also stored in ``iris_ai_ticket_drafts``.

    Returns a JSON response: 400 when ``message`` is missing, empty or not a
    string; 500 when no ticket with a title could be extracted.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    message = body.get("message")
    # A non-string message is treated like a missing one (400), not a crash.
    message = message.strip() if isinstance(message, str) else ""
    user_id = body.get("user_id", "anonymous")
    if not message:
        return jsonify({"ok": False, "error": "message is required"}), 400

    raw = _gemini_text(TICKET_EXTRACTION_PROMPT + message, json_mode=True)
    ticket = _safe_json(raw, {})
    if not isinstance(ticket, dict) or not ticket.get("title"):
        return jsonify({"ok": False, "error": "Could not extract ticket info from message"}), 500

    if db:
        db.collection("iris_ai_ticket_drafts").add({
            "user_id": user_id, "raw_input": message,
            "extracted": ticket, "channel": "nl_text",
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
    return jsonify({"ok": True, "ticket": ticket})
def submit_ticket_voice():
    """Transcribe a voice message and turn it into structured ticket fields.

    Expects a JSON body with ``audio_b64`` (required), ``audio_format``
    (default "wav") and ``user_id`` (default "anonymous"). When Firestore is
    available, the transcript and the extracted ticket are also stored in
    ``iris_ai_ticket_drafts``.

    Returns a JSON response: 400 when ``audio_b64`` is missing; 500 when
    AssemblyAI is not configured, transcription yields nothing, or no ticket
    with a title could be extracted.
    """
    payload = request.get_json(silent=True) or {}
    audio_b64 = payload.get("audio_b64", "")
    audio_format = payload.get("audio_format", "wav")
    user_id = payload.get("user_id", "anonymous")

    if not audio_b64:
        return jsonify({"ok": False, "error": "audio_b64 is required"}), 400
    if not ASSEMBLYAI_API_KEY:
        return jsonify({"ok": False, "error": "AssemblyAI not configured on server"}), 500

    transcript = _transcribe_audio_assemblyai(audio_b64, audio_format)
    if not transcript:
        return jsonify({"ok": False, "error": "Transcription failed or returned empty result"}), 500

    model_output = _gemini_text(TICKET_EXTRACTION_PROMPT + transcript, json_mode=True)
    ticket = _safe_json(model_output, {})
    extracted_ok = isinstance(ticket, dict) and ticket.get("title")
    if not extracted_ok:
        return jsonify({"ok": False, "error": "Could not extract ticket info from transcript"}), 500

    if db:
        draft = {
            "user_id": user_id,
            "raw_input": transcript,
            "extracted": ticket,
            "channel": "voice",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        db.collection("iris_ai_ticket_drafts").add(draft)

    return jsonify({"ok": True, "transcript": transcript, "ticket": ticket})
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # FEATURE 4 — System Tutorial Ingestion | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
# Prompt for extracting timestamped how-to articles from a tutorial video.
# The line-break instruction uses "\\n" so the model sees the two characters
# backslash + n, as in the other prompts in this file. A bare "\n" in this
# non-raw string would put a real newline into the prompt instead.
TUTORIAL_VIDEO_PROMPT = """You are a knowledge base curator watching a tutorial video about the Iris platform.
CONTEXT ABOUT IRIS:
- Iris is an integrated POS (Point of Sale) and fiscalisation system with a mobile attendance and
location-tracking module used by tellers and field reps at retail stores in Zimbabwe.
- The POS/fiscal layer handles sales, receipts, and ZIMRA fiscal compliance (Elixir device).
- The mobile module handles teller clock-in/out, GPS location, store radius, and hours tracking.
- The Iris Support Portal is a customer support desk used by admin staff, agents, and support tiers
to manage tickets, agents, customers, and the knowledge base.
YOUR TASK:
Watch this tutorial video in full. For every distinct feature, workflow, or task you observe being
demonstrated, extract one self-contained KB article. Identify the exact timestamp range in the video
where each demonstration occurs so users can jump directly to the relevant moment.
Be precise about timestamps — state the second at which the demonstration starts and ends.
Write step-by-step instructions based on what you see happening on screen, not generic descriptions.
If the presenter speaks, incorporate their narration into the steps.
Return ONLY a valid JSON array. No preamble, no markdown fences. All strings properly JSON-escaped.
Use \\n for line breaks within content strings.
Schema per item:
{
"title": "string — concise how-to title, max 80 chars",
"content": "string — numbered step-by-step instructions based on what is shown",
"category": "one of: Account|Tickets|Agents|Reports|Admin|POS|Attendance|Other",
"tags": ["string"],
"timestamp_start": <integer — seconds from video start where this demo begins>,
"timestamp_end": <integer — seconds from video start where this demo ends>
}
If the video contains no discernible how-to demonstrations, return an empty array: []
"""
def _upload_video_to_gemini(video_bytes: bytes, mime_type: str, display_name: str) -> Optional[Any]:
    """
    Upload a video to the Gemini Files API and poll until processing is ACTIVE.

    Args:
        video_bytes:  raw video content.
        mime_type:    MIME type sent to the Files API; its subtype is also used
                      as the temp-file suffix.
        display_name: label attached to the uploaded file.

    Returns:
        The file object returned by files.get() once its state contains
        "ACTIVE", or None when the client is missing, the upload raises,
        processing reports FAILED, or the file is not ACTIVE after 36 polls
        spaced 5 seconds apart (~3 minutes).

    The local temp file is always removed, and the remote file is deleted
    (best effort) when processing fails or times out, so nothing is left
    behind on either side.
    """
    if not _gemini_client:
        return None

    tmp_path = None
    try:
        # Write bytes to a named temp file — Files API needs a file path or IO object
        suffix = f".{mime_type.split('/')[-1]}"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp_path = tmp.name  # record first so cleanup works even if write() fails
            tmp.write(video_bytes)
        logger.info("Uploading video to Gemini Files API: %s (%d bytes)", display_name, len(video_bytes))
        uploaded = _gemini_client.files.upload(
            file=tmp_path,
            config={"mime_type": mime_type, "display_name": display_name}
        )
        logger.info("Upload complete. File name: %s — polling for ACTIVE state...", uploaded.name)
    except Exception as e:
        logger.error("Gemini Files API upload error: %s", e)
        return None
    finally:
        # delete=False means nobody else removes the file; do it on every path
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as e:
                logger.warning("Could not remove temp video file %s: %s", tmp_path, e)

    # Poll until state is ACTIVE (video processing complete) — max ~3 minutes
    for attempt in range(36):
        time.sleep(5)
        try:
            file_info = _gemini_client.files.get(name=uploaded.name)
        except Exception as e:
            # Transient lookup errors are tolerated; keep polling
            logger.warning("Poll error: %s", e)
            continue
        state = getattr(file_info, "state", None)
        state_str = str(state).upper() if state else ""
        logger.info("Poll %d: file state = %s", attempt + 1, state_str)
        if "ACTIVE" in state_str:
            logger.info("Video ACTIVE after %d polls (~%ds)", attempt + 1, (attempt + 1) * 5)
            return file_info
        if "FAILED" in state_str:
            logger.error("Gemini Files API processing failed for %s", uploaded.name)
            _delete_gemini_file(uploaded)
            return None

    logger.error("Video did not reach ACTIVE state within timeout")
    _delete_gemini_file(uploaded)
    return None
| def _delete_gemini_file(file_obj: Any) -> None: | |
| """Best-effort cleanup of a file from the Gemini Files API.""" | |
| try: | |
| _gemini_client.files.delete(name=file_obj.name) | |
| logger.info("Deleted Gemini file: %s", file_obj.name) | |
| except Exception as e: | |
| logger.warning("Could not delete Gemini file %s: %s", file_obj.name, e) | |
# Lower-case file extension (with leading dot) → MIME type accepted for
# tutorial video uploads. Insertion order is the order shown in error messages.
SUPPORTED_VIDEO_MIMES = dict(
    (
        (".mp4", "video/mp4"),
        (".mov", "video/quicktime"),
        (".avi", "video/x-msvideo"),
        (".webm", "video/webm"),
        (".mkv", "video/x-matroska"),
        (".3gp", "video/3gpp"),
        (".flv", "video/x-flv"),
    )
)
def tutorial_ingest():
    """
    Accepts a tutorial video file upload (multipart, field name "file").

    Gemini watches the full video, self-generates timestamps, and extracts
    one KB article per distinct feature or task demonstrated.
    No transcript required — Gemini reasons directly from video + audio.

    Optional form fields:
      video_title — label stored on each article (defaults to the filename)
      video_url   — link stored on each article (defaults to "")

    Supported: mp4, mov, avi, webm, mkv, 3gp, flv
    Max practical size: ~500MB (Files API limit is 2GB, but HF Space upload limit applies)

    Returns articles with timestamp_start/end in seconds so the frontend
    can generate deep-links into the video. Responds 400 for a missing, empty
    or unsupported file and 500 when Gemini is unavailable, the upload or
    analysis fails, or no articles could be extracted.
    """
    if "file" not in request.files:
        return jsonify({"ok": False, "error": "No file uploaded. Use multipart field name 'file'."}), 400
    f = request.files["file"]
    filename = f.filename or "tutorial"
    ext = os.path.splitext(filename.lower())[1]
    # Fall back to the filename when the form field is absent or blank
    video_title = request.form.get("video_title") or filename
    video_url = request.form.get("video_url", "")
    mime_type = SUPPORTED_VIDEO_MIMES.get(ext)
    if not mime_type:
        return jsonify({
            "ok": False,
            "error": f"Unsupported video format '{ext}'. Supported: {', '.join(SUPPORTED_VIDEO_MIMES)}"
        }), 400
    if not _gemini_client:
        return jsonify({"ok": False, "error": "Gemini client not initialised — check GOOGLE_API_KEY"}), 500

    video_bytes = f.read()
    if not video_bytes:
        # Nothing to analyse — fail fast instead of uploading and polling for minutes
        return jsonify({"ok": False, "error": "Uploaded file is empty."}), 400
    logger.info("Tutorial ingest: '%s', %d bytes, mime=%s", video_title, len(video_bytes), mime_type)

    # Upload to Gemini Files API and wait for processing
    gemini_file = _upload_video_to_gemini(video_bytes, mime_type, display_name=video_title)
    if not gemini_file:
        return jsonify({"ok": False, "error": "Video upload or processing by Gemini failed. Try a smaller file or check the format."}), 500

    # Ask Gemini to watch and extract articles with self-generated timestamps
    try:
        logger.info("Sending video to Gemini for tutorial extraction...")
        resp = _gemini_client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[gemini_file, TUTORIAL_VIDEO_PROMPT],
            config=genai_types.GenerateContentConfig(
                response_mime_type="application/json"
            )
        )
        raw = resp.text or ""
    except Exception as e:
        logger.error("Gemini video analysis error: %s", e)
        return jsonify({"ok": False, "error": f"Gemini analysis failed: {e}"}), 500
    finally:
        # Single cleanup point: runs on success and on the error return above.
        # Files expire in 48h anyway but clean up early.
        _delete_gemini_file(gemini_file)

    parsed = _safe_json(raw, [])
    articles = _validate_articles(parsed) if isinstance(parsed, list) else []
    if not articles:
        return jsonify({
            "ok": False,
            "error": "Gemini could not extract any how-to articles from this video. "
                     "Ensure the video contains on-screen demonstrations of Iris features."
        }), 500

    # Attach video metadata and normalise timestamp types
    for a in articles:
        a["video_url"] = video_url
        a["video_title"] = video_title
        for ts_key in ("timestamp_start", "timestamp_end"):
            val = a.get(ts_key)
            try:
                seconds = int(val) if val is not None else 0
            except (TypeError, ValueError, OverflowError):
                seconds = 0
            # Offsets are seconds from the start of the video, so never negative
            a[ts_key] = max(0, seconds)
        # A range that ends before it starts is unusable for deep-links
        if a["timestamp_end"] < a["timestamp_start"]:
            a["timestamp_end"] = a["timestamp_start"]

    stats = _save_kb_articles(articles, source_label=f"tutorial:{video_title}")
    logger.info("Tutorial ingest complete: %d articles, saved=%d, skipped=%d",
                len(articles), stats["saved"], stats["skipped"])
    return jsonify({
        "ok": True,
        "video_title": video_title,
        "articles_found": len(articles),
        "articles": articles,  # full list — frontend INSERTs to Supabase kb_articles
        "saved": stats["saved"],
        "skipped_dupes": stats["skipped"],
    })
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # FEATURE 5 — Agent Solution Writing (NL Text + Voice) | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
| SOLUTION_EXTRACTION_PROMPT = """You are a support knowledge base curator. | |
| An agent has described a solution they used to resolve a ticket. | |
| Structure this into a reusable KB article. | |
| Return ONLY a valid JSON object — no preamble, no markdown fences. | |
| All strings must be properly JSON-escaped. | |
| Schema: | |
| {"title": "string", "content": "string (clear step-by-step solution)", "category": "one of: Account|Billing|Technical|Feature|Other", "tags": ["string"]} | |
| Agent description: | |
| """ | |
def agent_solution_nl():
    """
    Turn an agent's written description of a fix into a KB article.

    JSON body fields:
      message   — the agent's free-text solution (required)
      agent_id  — used in the article's source label (default "unknown")
      ticket_id — when given, a "ticket:<id>" tag is added to the article

    Responses: 400 when message is missing or not a non-empty string; 500 when
    Gemini's output has no title; otherwise the structured article plus the
    number saved by _save_kb_articles.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    raw_message = body.get("message")
    # Non-string values (null, numbers, objects) are treated as missing
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    agent_id = body.get("agent_id", "unknown")
    ticket_id = body.get("ticket_id", "")
    if not message:
        return jsonify({"ok": False, "error": "message is required"}), 400

    raw = _gemini_text(SOLUTION_EXTRACTION_PROMPT + message, json_mode=True)
    article = _safe_json(raw, {})
    if not isinstance(article, dict) or not article.get("title"):
        return jsonify({"ok": False, "error": "Could not structure solution"}), 500

    if ticket_id:
        # The model may return tags as null or a bare string; normalise to a
        # list before appending so the ticket link never crashes the request.
        tags = article.get("tags")
        if isinstance(tags, str):
            tags = [tags] if tags else []
        elif not isinstance(tags, list):
            tags = []
        article["tags"] = tags + [f"ticket:{ticket_id}"]

    stats = _save_kb_articles([article], source_label=f"agent:{agent_id}")
    return jsonify({"ok": True, "saved": stats["saved"],
                    "article": article,  # single article — frontend INSERTs to Supabase kb_articles
                    "articles": [article]})
def agent_solution_voice():
    """
    Turn an agent's spoken description of a fix into a KB article.

    JSON body fields:
      audio_b64    — base64 audio payload (required)
      audio_format — format hint forwarded to the transcriber (default "wav")
      agent_id     — used in the article's source label (default "unknown")
      ticket_id    — when given, a "ticket:<id>" tag is added to the article

    Responses: 400 when audio_b64 is missing; 500 when AssemblyAI is not
    configured, transcription yields nothing, or Gemini's output has no
    title; otherwise the transcript, the structured article and the number
    saved by _save_kb_articles.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    audio_b64 = body.get("audio_b64", "")
    audio_format = body.get("audio_format", "wav")
    agent_id = body.get("agent_id", "unknown")
    ticket_id = body.get("ticket_id", "")
    if not audio_b64:
        return jsonify({"ok": False, "error": "audio_b64 is required"}), 400
    # Same configuration guard as the voice ticket endpoint: report a clear
    # server-side cause instead of a generic "Transcription failed".
    if not ASSEMBLYAI_API_KEY:
        return jsonify({"ok": False, "error": "AssemblyAI not configured on server"}), 500

    transcript = _transcribe_audio_assemblyai(audio_b64, audio_format)
    if not transcript:
        return jsonify({"ok": False, "error": "Transcription failed"}), 500

    raw = _gemini_text(SOLUTION_EXTRACTION_PROMPT + transcript, json_mode=True)
    article = _safe_json(raw, {})
    if not isinstance(article, dict) or not article.get("title"):
        return jsonify({"ok": False, "error": "Could not structure solution from transcript"}), 500

    if ticket_id:
        # The model may return tags as null or a bare string; normalise to a
        # list before appending so the ticket link never crashes the request.
        tags = article.get("tags")
        if isinstance(tags, str):
            tags = [tags] if tags else []
        elif not isinstance(tags, list):
            tags = []
        article["tags"] = tags + [f"ticket:{ticket_id}"]

    stats = _save_kb_articles([article], source_label=f"agent:{agent_id}")
    return jsonify({"ok": True, "transcript": transcript, "saved": stats["saved"],
                    "article": article,  # single article — frontend INSERTs to Supabase kb_articles
                    "articles": [article]})
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # FEATURE 6 — Iris Chatbot (RAG over KB + Tutorials) | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def _search_kb(query: str, limit: int = 5) -> List[Dict]:
    """
    Keyword search over the 200 most recently created KB articles.

    Each article is scored by how many distinct query terms (longer than two
    characters, lower-cased, surrounding punctuation removed) occur as
    substrings of its title, content and tags. Articles with a score of zero
    are dropped.

    Returns up to `limit` article dicts, highest score first, each with an
    added "score" key. Returns [] when Firestore is unavailable, the query
    has no usable terms, or the lookup fails.
    """
    if not db:
        return []

    # Strip punctuation hugging a word ("login?" → "login") and de-duplicate
    # while keeping order so a repeated word is not counted twice.
    cleaned = (t.strip(".,;:!?\"'()[]{}").lower() for t in query.split())
    query_terms = list(dict.fromkeys(t for t in cleaned if len(t) > 2))
    if not query_terms:
        # Nothing can match; skip the Firestore read entirely
        return []

    try:
        docs = db.collection("iris_kb_articles").order_by(
            "created_at", direction=firestore.Query.DESCENDING
        ).limit(200).stream()
        results = []
        for doc in docs:
            d = doc.to_dict() or {}
            # Tolerate malformed tags (None, a bare string, non-string items)
            # so one bad document cannot abort the whole search.
            tags = d.get("tags") or []
            if not isinstance(tags, (list, tuple)):
                tags = [tags]
            tag_text = " ".join(str(tag) for tag in tags)
            text = f"{d.get('title') or ''} {d.get('content') or ''} {tag_text}".lower()
            score = sum(1 for term in query_terms if term in text)
            if score > 0:
                # Computed score goes last so a stored "score" field cannot override it
                results.append({**d, "score": score})
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]
    except Exception as e:
        logger.error("KB search error: %s", e)
        return []
| CHATBOT_SYSTEM_PROMPT = """You are Iris, an intelligent support assistant for the Iris Support Portal. | |
| Answer ONLY from the provided knowledge base context. | |
| If the answer is in a tutorial with a timestamp, mention the video and timestamp. | |
| Be concise, clear, and friendly. Format step-by-step answers as numbered lists. | |
| If you cannot find the answer, say so honestly and suggest submitting a ticket. | |
| """ | |
def chatbot_query():
    """
    Answer a user question from the knowledge base.

    JSON body fields:
      message    — the user's question (required)
      session_id — stored with the chat log (default "default")
      user_id    — stored with the chat log (default "anonymous")

    The top five keyword matches from _search_kb are formatted into a context
    section (tutorial articles get a "mm:ss" pointer and optional URL), the
    prompt is sent to Gemini, and the exchange is logged to Firestore when
    available.

    Responses: 400 when message is missing or not a non-empty string;
    otherwise {"ok": True, "answer": ..., "sources": [...]}, with a fixed
    apology as the answer when Gemini returns nothing.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    raw_message = body.get("message")
    # Non-string values (null, numbers, objects) are treated as missing
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    session_id = body.get("session_id", "default")
    user_id = body.get("user_id", "anonymous")
    if not message:
        return jsonify({"ok": False, "error": "message is required"}), 400

    kb_results = _search_kb(message, limit=5)
    context_blocks = []
    sources = []
    for r in kb_results:
        block = f"[Article: {r.get('title')}]\n{r.get('content', '')}"
        # Stored timestamps may be floats or strings; the :02d format needs an
        # int, so coerce and skip the tutorial pointer if that is impossible.
        try:
            ts = max(0, int(r.get("timestamp_start")))
        except (TypeError, ValueError, OverflowError):
            ts = None
        if ts is not None:
            minutes, seconds = divmod(ts, 60)
            block += f"\n(Tutorial: {r.get('video_title','Video')} at {minutes:02d}:{seconds:02d}"
            if r.get("video_url"):
                block += f" — {r['video_url']}"
            block += ")"
        context_blocks.append(block)
        sources.append({
            "title": r.get("title"),
            "category": r.get("category"),
            "source": r.get("source"),
            "ts_start": r.get("timestamp_start"),
            "video_url": r.get("video_url"),
        })

    context_str = "\n\n---\n\n".join(context_blocks) if context_blocks else "No relevant articles found."
    full_prompt = f"{CHATBOT_SYSTEM_PROMPT}\n\nKNOWLEDGE BASE CONTEXT:\n{context_str}\n\nUSER QUESTION: {message}\n\nAnswer:"
    answer = _gemini_text(full_prompt)
    if not answer:
        answer = "Sorry, I could not process your question right now. Please try again or submit a support ticket."

    if db:
        # Logging is already optional (skipped when db is None); a Firestore
        # error must not cost the user an answer that was already generated.
        try:
            db.collection("iris_chatbot_logs").add({
                "user_id": user_id, "session_id": session_id,
                "message": message, "answer": answer, "sources": sources,
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
        except Exception as e:
            logger.warning("Could not write chatbot log: %s", e)
    return jsonify({"ok": True, "answer": answer, "sources": sources})
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # KB READ / DELETE ENDPOINTS | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def list_kb_articles():
    """
    List KB articles, newest first.

    Query parameters:
      category — optional exact-match filter on the article's category
      limit    — maximum number of articles to return (positive integer,
                 default 50)

    Responses: 400 when limit is not a positive integer; 500 when Firestore is
    unavailable or the query fails; otherwise
    {"ok": True, "articles": [...], "count": n} with each article carrying
    its Firestore document id under "id".
    """
    category = request.args.get("category", "")
    # Validate before use: int() on arbitrary user input used to raise outside
    # the try block and surface as an unhandled 500.
    try:
        limit = int(request.args.get("limit", 50))
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "limit must be an integer"}), 400
    if limit < 1:
        return jsonify({"ok": False, "error": "limit must be a positive integer"}), 400
    if not db:
        return jsonify({"ok": False, "error": "Firebase unavailable"}), 500
    try:
        query = db.collection("iris_kb_articles").order_by("created_at", direction=firestore.Query.DESCENDING)
        if category:
            query = query.where("category", "==", category)
        docs = query.limit(limit).stream()
        articles = [{"id": d.id, **d.to_dict()} for d in docs]
        return jsonify({"ok": True, "articles": articles, "count": len(articles)})
    except Exception as e:
        logger.error("KB list error: %s", e)
        return jsonify({"ok": False, "error": str(e)}), 500
def delete_kb_article(article_id: str):
    """
    Delete the KB article stored under `article_id`.

    Responds {"ok": True} after the delete call returns, or a 500 JSON error
    when Firestore is unavailable or the delete call raises.
    """
    if db:
        try:
            kb_collection = db.collection("iris_kb_articles")
            target_doc = kb_collection.document(article_id)
            target_doc.delete()
            return jsonify({"ok": True})
        except Exception as exc:
            failure = {"ok": False, "error": str(exc)}
            return jsonify(failure), 500
    return jsonify({"ok": False, "error": "Firebase unavailable"}), 500
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # HEALTH | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def health():
    """
    Report service status.

    Always responds {"ok": True, ...} with the configured model name, flags
    showing whether Gemini, AssemblyAI and Firebase are configured, and the
    KB article count. The count is best effort: it stays 0 when Firestore is
    unavailable or the count query fails.
    """
    article_count = 0
    if db:
        try:
            docs = db.collection("iris_kb_articles").count().get()
            article_count = docs[0][0].value
        except Exception as e:
            # Keep the health check non-fatal, but leave a trace instead of
            # silently reporting zero articles.
            logger.warning("Health check could not count KB articles: %s", e)
    return jsonify({
        "ok": True,
        "service": "Iris AI Service v1.1",
        "model": GEMINI_MODEL,
        "gemini": bool(_gemini_client),
        "assemblyai": bool(ASSEMBLYAI_API_KEY),
        "firebase": bool(db),
        "kb_articles": article_count,
    })
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # ENTRYPOINT | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
if __name__ == "__main__":
    # Port comes from the PORT env var; 7860 is the fallback.
    port = int(os.environ.get("PORT", 7860))
    logger.info(
        "Iris AI Service v1.1 starting on port %d (model=%s)",
        port,
        GEMINI_MODEL,
    )
    # Listen on all interfaces
    app.run(host="0.0.0.0", port=port)