# NOTE: removed Hugging Face Spaces page residue ("Spaces: Sleeping Sleeping")
# that was accidentally pasted above the module source.
| # main.py — Hugging Face backend (drop-in) | |
| import os | |
| import json | |
| import re | |
| import requests | |
| import builtins | |
| from typing import Optional, Any, Dict, List, Tuple | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from difflib import SequenceMatcher | |
| # --- WMS API integration imports --- | |
| # NOTE: these must exist in your repo (you shared them earlier) | |
| from public_api.public_api_auth import login | |
| from public_api.public_api_item import get_item | |
| from public_api.public_api_inventory import get_inventory_holds | |
| from public_api.public_api_location import get_location_status | |
| from public_api.utils import flatten_json, extract_fields | |
| from public_api.field_mapping import TOOL_REGISTRY, ALL_TOOLS | |
| # KB services (keep your current modules) | |
| from services.kb_creation import ( | |
| collection, | |
| ingest_documents, | |
| hybrid_search_knowledge_base, | |
| get_section_text, | |
| get_best_steps_section_text, | |
| get_best_errors_section_text, | |
| get_escalation_text, | |
| ) | |
| # ServiceNow helpers (keep your current modules) | |
| from services.login import router as login_router | |
| from services.generate_ticket import get_valid_token, create_incident | |
| # ============================================================================= | |
| # Environment / Gemini config | |
| # ============================================================================= | |
load_dotenv()
# SSL verification toggles (default: verify). Set the env var to "0"/"false"/"no" to disable.
VERIFY_SSL = os.getenv("SERVICENOW_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
GEMINI_SSL_VERIFY = os.getenv("GEMINI_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# Safer header-based auth; model can be flash or flash-lite.
# If you prefer your friend's exact style, set GEMINI_URL_WITH_KEY in secrets to override
# (a URL containing "key=" switches _gemini_post to URL-key auth instead of the header).
GEMINI_URL = os.getenv(
    "GEMINI_URL_WITH_KEY",
    "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
)
os.environ["POSTHOG_DISABLED"] = "true"  # opt out of PostHog telemetry in dependencies
# --- API WMS session cache ---
_WMS_SESSION_DATA: Optional[dict] = None  # cached session payload built by _ensure_wms_session()
_WMS_WAREHOUSE_ID: Optional[str] = None  # default warehouse id captured at login
def _gemini_post(payload: dict, timeout: int = 25):
    """
    Central helper to call Gemini. Supports either header API key or URL ?key=...

    Args:
        payload: JSON body for the generateContent endpoint.
        timeout: Request timeout in seconds.

    Returns:
        A ``requests.Response``, or — when no API key is configured — a
        lightweight stand-in object exposing ``status_code``, ``json()``,
        ``text`` and ``raise_for_status()`` so callers degrade gracefully.
    """
    headers = {"Content-Type": "application/json"}
    # If URL doesn't contain ?key=..., authenticate via the x-goog-api-key header.
    if "key=" not in GEMINI_URL:
        if not GEMINI_API_KEY:
            # Return a mock-ish response object for graceful handling.
            # FIX: these lambdas live in the class dict of the ad-hoc type, so
            # instance calls bind `self`; the previous zero-arg lambdas made
            # resp.json()/resp.raise_for_status() raise TypeError.
            return type("Resp", (), {
                "status_code": 401,
                "json": lambda self: {"error": {"message": "Missing GEMINI_API_KEY"}},
                "text": "Missing GEMINI_API_KEY",
                "raise_for_status": lambda self: None,
            })()
        headers["x-goog-api-key"] = GEMINI_API_KEY
    return requests.post(GEMINI_URL, headers=headers, json=payload, timeout=timeout, verify=GEMINI_SSL_VERIFY)
def _ensure_wms_session() -> bool:
    """Login once to WMS and cache session_data + default warehouse."""
    global _WMS_SESSION_DATA, _WMS_WAREHOUSE_ID
    # A previous call already logged in; reuse the cached session.
    if _WMS_SESSION_DATA and _WMS_WAREHOUSE_ID:
        return True
    try:
        auth, wh_id, success = login()
        if success and auth:
            _WMS_SESSION_DATA = {"wms_auth": auth, "user_warehouse_id": wh_id}
            _WMS_WAREHOUSE_ID = wh_id
            return True
        # Login returned but did not succeed — record why for the logs.
        print("[WMS] login unsuccessful: ok=", success, "warehouse_id=", wh_id)
    except Exception as exc:
        print("[WMS] login failed:", exc)
    return False
| # ============================================================================= | |
| # Minimal server-side cache (used to populate short description later) | |
| # ============================================================================= | |
LAST_ISSUE_HINT: str = ""  # most recent user-described issue, used for ticket descriptions
def safe_str(e: Any) -> str:
    """Stringify *e* defensively; never raises, even if __str__ itself fails."""
    try:
        rendered = builtins.str(e)
    except Exception:
        rendered = "<error stringify failed>"
    return rendered
| # ============================================================================= | |
| # App / Lifespan | |
| # ============================================================================= | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan hook: ingest KB documents on startup when the collection
    is empty, then yield control for the app's lifetime.

    FIX: the @asynccontextmanager decorator (imported at the top of the file
    but never applied) is required — FastAPI's ``lifespan=`` parameter expects
    an async context manager factory, not a bare async generator function.
    """
    try:
        folder_path = os.path.join(os.getcwd(), "documents")
        if collection.count() == 0:
            print("[KB] empty. Running ingestion...")
            ingest_documents(folder_path)
        else:
            print(f"[KB] already populated with {collection.count()} entries. Skipping ingestion.")
    except Exception as e:
        # Startup must not crash the app on ingestion failure; log and continue.
        print(f"[KB] ingestion failed: {safe_str(e)}")
    yield
app = FastAPI(lifespan=lifespan)
# ServiceNow login endpoints come from a dedicated router module.
app.include_router(login_router)
# CORS — add your Hugging Face frontend Space URL here
origins = [
    "https://chatbotnova-chatbot-frontend.hf.space",
    # "http://localhost:5173",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ============================================================================= | |
| # Models (align to your frontend) | |
| # ============================================================================= | |
class ChatInput(BaseModel):
    """Chat request payload sent by the frontend."""
    user_message: str
    prev_status: Optional[str] = None  # status string from the previous bot turn, if any
    last_issue: Optional[str] = None  # last issue description, reused for ticket text
class IncidentInput(BaseModel):
    """Payload for creating a ServiceNow incident."""
    short_description: str
    description: str
    mark_resolved: Optional[bool] = False  # when True, mark the incident Resolved after creation
class TicketDescInput(BaseModel):
    """Free-text issue description used to draft ticket fields."""
    issue: str
class TicketStatusInput(BaseModel):
    """Lookup key for a ticket status query: either sys_id or incident number."""
    sys_id: Optional[str] = None
    number: Optional[str] = None  # e.g. "INC0012345"
# ServiceNow incident `state` codes mapped to human-readable labels.
STATE_MAP = {
    "1": "New",
    "2": "In Progress",
    "3": "On Hold",
    "6": "Resolved",
    "7": "Closed",
    "8": "Canceled",
}
| # ============================================================================= | |
| # WMS Tool Orchestration (fast-path + Gemini function-calling) | |
| # ============================================================================= | |
def _wms_fastpath(user_text: str) -> Optional[dict]:
    """
    Deterministic regex-based path for Item/Location/Holds.
    Returns a LIVE_API table dict if matched; else None.

    Checks run in order: item lookup, location status, inventory holds. Each
    branch calls the corresponding WMS public API and shapes the result into
    the response dict the frontend renders (bot_response text + table data).
    """
    # NOTE(review): this login check runs BEFORE any pattern matching, so a WMS
    # login failure returns this error dict for EVERY message — including
    # messages with no WMS intent — which prevents the caller from falling back
    # to the knowledge base. Confirm this is intended.
    if not _ensure_wms_session():
        return {
            "bot_response": "⚠️ Could not login to WMS from this environment. Please try again or raise a ticket.",
            "status": "PARTIAL",
            "context_found": False,
            "ask_resolved": False,
            "suggest_incident": True,
            "followup": "Shall I raise an incident now?",
            "source": "ERROR",
            "debug": {"intent": "wms_login_failed_fastpath"},
        }
    session_data = _WMS_SESSION_DATA
    default_wh = _WMS_WAREHOUSE_ID
    text = (user_text or "").strip()
    # Item: "Get item number 100001 ..." / "Show item 100001 ..."
    m_item = re.search(r"\bitem\s+(?:number|id)?\s*[: ]?([A-Za-z0-9\-]+)", text, flags=re.IGNORECASE)
    if m_item:
        item_num = m_item.group(1)
        try:
            raw = get_item(session_data, default_wh, item_num)
            # WMS-level error payload: surface it and offer a ticket.
            if isinstance(raw, dict) and raw.get("error"):
                return {
                    "bot_response": f"⚠️ {raw['error']}",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": True,
                    "followup": "Should I raise a ServiceNow ticket?",
                    "source": "ERROR",
                    "debug": {"intent": "get_item_fastpath_error"},
                }
            data_list = raw.get("data", []) or []
            if not data_list:
                return {
                    "bot_response": "I searched WMS but couldn't find matching records.",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": True,
                    "followup": "Do you want me to raise a ticket or try a different ID?",
                    "source": "LIVE_API",
                    "debug": {"intent": "get_item_fastpath_empty"},
                }
            cfg = TOOL_REGISTRY["get_item_data"]
            flat = flatten_json(data_list[0])
            requested_labels = []
            # Try mapping labels automatically if the user lists fields
            labels = set(cfg["mapping"].keys())
            for lbl in labels:
                if re.search(rf"\b{re.escape(lbl)}\b", text, flags=re.IGNORECASE):
                    requested_labels.append(lbl)
            if requested_labels:
                # User named specific fields: show those; always keep "Item" first.
                filtered_map = {k: v for k, v in cfg["mapping"].items() if k in requested_labels or k == "Item"}
                order = ["Item"] + [f for f in requested_labels if f != "Item"]
            else:
                # No explicit fields requested: show a compact default summary.
                summary_keys = ["Item", "Description", "Item type", "Warehouse ID"]
                filtered_map = {k: v for k, v in cfg["mapping"].items() if k in summary_keys}
                order = summary_keys
            cleaned = extract_fields(flat, filtered_map, cfg.get("formatters", {}), requested_order=order)
            lines = [f"{k}: {cleaned.get(k, 'N/A')}" for k in order]
            msg = "Details:\n" + "\n".join(lines)
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": [data_list[0]],
                "show_export": False,
                "debug": {"intent": "get_item_fastpath", "warehouse_id": default_wh},
            }
        except Exception as e:
            # Best-effort: on unexpected failure, log and fall through to the
            # next pattern rather than failing the whole request.
            print("[WMS fastpath:item] error:", e)
    # Location: "Check location A1-01-01"
    # NOTE(review): this regex also matches "check <any-word>" / "status <word>",
    # so generic sentences can be misread as location lookups — confirm intended.
    m_loc = re.search(r"\b(?:check|status|loc(?:ation)?)\s+(?:bin\s+)?([A-Za-z0-9\-]+)", text, flags=re.IGNORECASE)
    if m_loc:
        stoloc = m_loc.group(1)
        try:
            raw = get_location_status(session_data, default_wh, stoloc)
            if isinstance(raw, dict) and raw.get("error"):
                return {
                    "bot_response": f"⚠️ {raw['error']}",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": True,
                    "followup": "Should I raise a ServiceNow ticket?",
                    "source": "ERROR",
                    "debug": {"intent": "get_location_fastpath_error"},
                }
            data_list = raw.get("data", []) or []
            cfg = TOOL_REGISTRY["get_location_status"]
            rows = []
            for item in data_list:
                rows.append(extract_fields(flatten_json(item), cfg["mapping"], cfg.get("formatters", {})))
            # Only the first row is summarized in the text; full data goes in "data".
            msg = f"Location '{stoloc}' status:\n" + (("\n" + "; ".join([f"{k}: {v}" for k, v in rows[0].items()])) if rows else "No status returned")
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": data_list,
                "show_export": False,
                "debug": {"intent": "get_location_fastpath", "warehouse_id": default_wh},
            }
        except Exception as e:
            print("[WMS fastpath:location] error:", e)
    # Holds: "Show holds for LPN 123456" / "Show all inventory holds"
    m_lpn = re.search(r"\b(?:lpn|lodnum)\s*[: ]?([A-Za-z0-9\-]+)", text, flags=re.IGNORECASE)
    want_all_holds = re.search(r"\b(all\s+inventory\s+holds|list\s+holds|show\s+holds)\b", text, flags=re.IGNORECASE)
    if m_lpn or want_all_holds:
        kwargs = {}
        if m_lpn:
            # Narrow the holds query to a single LPN when one was given.
            kwargs["lodnum"] = m_lpn.group(1)
        try:
            raw = get_inventory_holds(session_data, default_wh, **kwargs)
            if isinstance(raw, dict) and raw.get("error"):
                return {
                    "bot_response": f"⚠️ {raw['error']}",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": True,
                    "followup": "Should I raise a ServiceNow ticket?",
                    "source": "ERROR",
                    "debug": {"intent": "get_holds_fastpath_error"},
                }
            data_list = raw.get("data", []) or []
            if not data_list:
                return {
                    "bot_response": "I searched WMS but couldn't find matching records.",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": True,
                    "followup": "Do you want me to raise a ticket or try a different ID?",
                    "source": "LIVE_API",
                    "debug": {"intent": "get_holds_fastpath_empty"},
                }
            cfg = TOOL_REGISTRY["get_inventory_holds"]
            rows_for_text, total_qty = [], 0
            for item in data_list:
                row = extract_fields(flatten_json(item), cfg["mapping"], cfg.get("formatters", {}))
                rows_for_text.append("• " + "; ".join(f"{k}: {v}" for k, v in row.items()))
                try:
                    # presumably "untqty" is the unit quantity on a hold record — TODO confirm
                    total_qty += int(item.get("untqty", 0) or 0)
                except Exception:
                    pass  # non-numeric quantity: skip rather than fail the summary
            # Text shows at most 10 rows; full list still returned in "data".
            msg = f"Found {len(rows_for_text)} hold records (Total Qty: {total_qty}).\n" + "\n".join(rows_for_text[:10])
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": data_list,
                "show_export": True,
                "debug": {"intent": "get_holds_fastpath", "warehouse_id": default_wh},
            }
        except Exception as e:
            print("[WMS fastpath:holds] error:", e)
    return None  # no fast-path match
def _try_wms_tool(user_text: str) -> Optional[dict]:
    """
    Orchestrates WMS tool calls:
        1) Fast-path (regex)
        2) Gemini function-calling (fallback)

    Returns a frontend-ready response dict, or None when neither path produced
    a WMS answer (the caller then falls back to the knowledge base).
    """
    # Fast-path first (robust on HF even if functionCall is missing)
    fast = _wms_fastpath(user_text)
    if fast is not None:
        return fast
    # Deep fallback: Gemini function calling (try both camelCase & snake_case tool keys)
    payload = {
        "contents": [{"role": "user", "parts": [{"text": user_text}]}],
        # We include both casings to maximize compatibility.
        "tools": [
            {"functionDeclarations": ALL_TOOLS},  # REST canonical
            {"function_declarations": ALL_TOOLS},  # friend-style
        ],
        "toolConfig": {"functionCallingConfig": {"mode": "ANY"}},  # nudge model to call tools
    }
    try:
        resp = _gemini_post(payload, timeout=25)
        # If REST key mismatch, this ensures no crash
        try:
            resp.raise_for_status()
        except Exception:
            pass
        data = {}
        try:
            data = resp.json()
        except Exception:
            pass  # non-JSON body: treat as no candidates below
        candidates = data.get("candidates", [])
        if not candidates:
            return None
        content = candidates[0].get("content", {})
        parts = content.get("parts", [])
        part = parts[0] if parts else {}
        fn_call = part.get("functionCall")
        # Model answered with plain text instead of a tool call → no WMS path.
        if not fn_call or "name" not in fn_call:
            return None
        if not _ensure_wms_session():
            return {
                "bot_response": "⚠️ Could not login to WMS from this environment.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Shall I raise an incident?",
                "source": "ERROR",
                "debug": {"intent": "wms_login_failed_functioncall"},
            }
        session_data = _WMS_SESSION_DATA
        default_wh = _WMS_WAREHOUSE_ID
        tool_name = fn_call["name"]
        args = fn_call.get("args", {}) or {}
        cfg = TOOL_REGISTRY.get(tool_name)
        # Model referenced a tool we don't have registered.
        if not cfg:
            return None
        # Warehouse override: accept any of the spellings the model may emit,
        # popping them so they are not passed through as tool kwargs.
        wh = (
            args.pop("warehouse_id", None)
            or args.pop("warehouseId", None)
            or args.pop("wh_id", None)
            or default_wh
        )
        tool_fn = cfg.get("function")
        if not callable(tool_fn):
            return {
                "bot_response": f"⚠️ Tool '{tool_name}' is not callable. Check TOOL_REGISTRY.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Should I raise a ServiceNow ticket?",
                "source": "ERROR",
                "debug": {"intent": "wms_config_error", "tool": tool_name},
            }
        # Correct invocation by signature
        if tool_name == "get_inventory_holds":
            raw = tool_fn(session_data, wh, **args)
        elif tool_name == "get_location_status":
            stoloc = args.get("stoloc")
            raw = tool_fn(session_data, wh, stoloc)
        else:
            id_param = cfg.get("id_param")  # "item_number"
            target = args.get(id_param)
            if id_param and target is None:
                # Fall back to the first simple-valued arg as the identifier.
                simple_vals = [v for v in args.values() if isinstance(v, (str, int))]
                target = simple_vals[0] if simple_vals else None
            if not target:
                # Can't call the tool without its identifier: ask the user.
                return {
                    "bot_response": f"Please share the {id_param} (e.g., item number).",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": False,
                    "followup": None,
                    "source": "CLIENT",
                    "debug": {"intent": "wms_missing_id_param", "tool": tool_name},
                }
            extra_kwargs = {k: v for k, v in args.items() if k != id_param}
            raw = tool_fn(session_data, wh, target, **extra_kwargs)
        if isinstance(raw, dict) and raw.get("error"):
            return {
                "bot_response": f"⚠️ {raw['error']}",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Should I raise a ServiceNow ticket?",
                "source": "ERROR",
                "debug": {"intent": "wms_error", "tool": tool_name},
            }
        response_key = cfg.get("response_key", "data")
        data_list = (raw.get(response_key, []) if isinstance(raw, dict) else []) or []
        if not data_list:
            return {
                "bot_response": "I searched WMS but couldn't find matching records.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Do you want me to raise a ticket or try a different ID?",
                "source": "LIVE_API",
                "debug": {"intent": "wms_empty", "tool": tool_name},
            }
        # Table/summary formatting
        if tool_name in ("get_inventory_holds", "get_location_status"):
            rows_for_text, total_qty = [], 0
            for item in data_list:
                row = extract_fields(flatten_json(item), cfg["mapping"], cfg.get("formatters", {}))
                rows_for_text.append("• " + "; ".join(f"{k}: {v}" for k, v in row.items()))
                if tool_name == "get_inventory_holds":
                    try:
                        # presumably "untqty" is the unit quantity field — TODO confirm
                        total_qty += int(item.get("untqty", 0) or 0)
                    except Exception:
                        pass
            if tool_name == "get_inventory_holds":
                # Text shows at most 10 rows; the full list rides in "data".
                msg = f"Found {len(rows_for_text)} hold records (Total Qty: {total_qty}).\n" + "\n".join(rows_for_text[:10])
                show_export = True
            else:
                target_loc = args.get("stoloc") or ""
                msg = f"Location '{target_loc}' status:\n" + ("\n".join(rows_for_text[:1]) if rows_for_text else "No status returned")
                show_export = False
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": data_list,
                "show_export": show_export,
                "debug": {"intent": tool_name, "warehouse_id": wh},
            }
        # Item: summary + one-row table
        flat = flatten_json(data_list[0])
        requested = args.get("fields", [])
        if requested:
            # Model passed explicit field labels: show those; keep "Item" first.
            filtered_map = {k: v for k, v in cfg["mapping"].items() if k in requested or k == "Item"}
            order = ["Item"] + [f for f in requested if f != "Item"]
        else:
            summary_keys = ["Item", "Description", "Item type", "Warehouse ID"]
            filtered_map = {k: v for k, v in cfg["mapping"].items() if k in summary_keys}
            order = summary_keys
        cleaned = extract_fields(flat, filtered_map, cfg.get("formatters", {}), requested_order=order)
        lines = [f"{k}: {cleaned.get(k, 'N/A')}" for k in order]
        msg = "Details:\n" + "\n".join(lines)
        return {
            "bot_response": msg,
            "status": "OK",
            "context_found": True,
            "source": "LIVE_API",
            "type": "table",
            "data": [data_list[0]],
            "show_export": False,
            "debug": {"intent": "get_item_data", "warehouse_id": wh},
        }
    except Exception as e:
        # Any failure in the Gemini/tool path is non-fatal: log and let the
        # caller fall back to the knowledge base.
        print("[WMS] tool call error:", e)
    return None
| # ============================================================================= | |
| # Health | |
| # ============================================================================= | |
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    return dict(status="ok")
| # ============================================================================= | |
| # Chat (aligns to your frontend) | |
| # ============================================================================= | |
| async def chat_with_ai(input_data: ChatInput): | |
| global LAST_ISSUE_HINT | |
| assist_followup: Optional[str] = None | |
| try: | |
| msg_norm = (input_data.user_message or "").lower().strip() | |
| # yes/no handlers (keep simple UX) | |
| if msg_norm in ("yes", "y", "sure", "ok", "okay"): | |
| return { | |
| "bot_response": "Great! Tell me what you'd like to do next — check another ticket, create an incident, or describe your issue.", | |
| "status": "OK", | |
| "followup": "You can say: 'create ticket', 'incident status INC0012345', or describe your problem.", | |
| "options": [], | |
| "debug": {"intent": "continue_conversation"}, | |
| } | |
| if msg_norm in ("no", "no thanks", "nope"): | |
| return { | |
| "bot_response": "No problem. Do you need assistance with any other issue?", | |
| "status": "OK", | |
| "end_chat": False, | |
| "followup": None, | |
| "options": [], | |
| "debug": {"intent": "end_conversation"}, | |
| } | |
| # Resolution ack (optional auto incident) | |
| is_llm_resolved = _classify_resolution_llm(input_data.user_message) | |
| if _has_negation_resolved(msg_norm): | |
| is_llm_resolved = False | |
| if (not _has_negation_resolved(msg_norm)) and (_is_resolution_ack_heuristic(msg_norm) or is_llm_resolved): | |
| try: | |
| issue_hint = (input_data.last_issue or "").strip() or LAST_ISSUE_HINT.strip() | |
| short_desc, long_desc = _build_tracking_descriptions(issue_hint, input_data.user_message) | |
| result = create_incident(short_desc, long_desc) | |
| if isinstance(result, dict) and not result.get("error"): | |
| inc_number = result.get("number", "<unknown>") | |
| sys_id = result.get("sys_id") | |
| resolved_note = "" | |
| if sys_id: | |
| ok = _set_incident_resolved(sys_id) | |
| resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)" | |
| return { | |
| "bot_response": f"✅ Incident created: {inc_number}{resolved_note}", | |
| "status": "OK", | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": False, | |
| "followup": None, | |
| "debug": {"intent": "resolved_ack", "auto_created": True}, | |
| } | |
| else: | |
| err = (result or {}).get("error", "Unknown error") | |
| return { | |
| "bot_response": f"⚠️ I couldn't create the tracking incident automatically ({err}).", | |
| "status": "PARTIAL", | |
| "suggest_incident": True, | |
| "followup": "Shall I create a ticket now?", | |
| "options": [{"type": "yesno", "title": "Create ticket now?"}], | |
| "debug": {"intent": "resolved_ack_error"}, | |
| } | |
| except Exception as e: | |
| return { | |
| "bot_response": f"⚠️ Something went wrong while creating the tracking incident: {safe_str(e)}", | |
| "status": "PARTIAL", | |
| "suggest_incident": True, | |
| "followup": "Shall I create a ticket now?", | |
| "options": [{"type": "yesno", "title": "Create ticket now?"}], | |
| "debug": {"intent": "resolved_ack_exception"}, | |
| } | |
| # Incident intent | |
| if _is_incident_intent(msg_norm): | |
| return { | |
| "bot_response": "Okay, let's create a ServiceNow incident.", | |
| "status": (input_data.prev_status or "PARTIAL"), | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": False, | |
| "show_incident_form": True, | |
| "followup": None, | |
| "debug": {"intent": "create_ticket"}, | |
| } | |
| # Ticket status intent | |
| status_intent = _parse_ticket_status_intent(msg_norm) | |
| if status_intent: | |
| if status_intent.get("ask_number"): | |
| return { | |
| "bot_response": ( | |
| "To check a ticket status, please share the Incident ID (e.g., INC0012345).\n\n" | |
| "You can paste the ID here or say 'cancel'." | |
| ), | |
| "status": "PARTIAL", | |
| "show_status_form": True, | |
| "debug": {"intent": "status_request_missing_id"}, | |
| } | |
| try: | |
| token = get_valid_token() | |
| instance_url = os.getenv("SERVICENOW_INSTANCE_URL") | |
| if not instance_url: | |
| raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing") | |
| headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} | |
| number = status_intent.get("number") | |
| url = f"{instance_url}/api/now/table/incident?number={number}" | |
| response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25) | |
| data = response.json() | |
| lst = data.get("result", []) | |
| result = (lst or [{}])[0] if response.status_code == 200 else {} | |
| state_code = builtins.str(result.get("state", "unknown")) | |
| state_label = STATE_MAP.get(state_code, state_code) | |
| short = result.get("short_description", "") | |
| num = result.get("number", number or "unknown") | |
| return { | |
| "bot_response": ( | |
| f"**Ticket:** {num}\n" | |
| f"**Status:** {state_label}\n" | |
| f"**Issue description:** {short}" | |
| ), | |
| "status": "OK", | |
| "show_assist_card": True, | |
| "followup": "Is there anything else I can assist you with?", | |
| "debug": {"intent": "status", "http_status": response.status_code}, | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=safe_str(e)) | |
| # --- Try WMS (fast-path + function-calling) BEFORE KB --- | |
| wms_res = _try_wms_tool(input_data.user_message) | |
| if wms_res is not None: | |
| return wms_res | |
| # ---------------- Hybrid KB fallback ---------------- | |
| kb_results = hybrid_search_knowledge_base(input_data.user_message, top_k=10, alpha=0.6, beta=0.4) | |
| documents = kb_results.get("documents", []) | |
| metadatas = kb_results.get("metadatas", []) | |
| distances = kb_results.get("distances", []) | |
| combined = kb_results.get("combined_scores", []) | |
| items: List[Dict[str, Any]] = [] | |
| for i, doc in enumerate(documents): | |
| text = doc.strip() if isinstance(doc, str) else "" | |
| if not text: | |
| continue | |
| meta = metadatas[i] if i < len(metadatas) and isinstance(metadatas[i], dict) else {} | |
| score = distances[i] if i < len(distances) else None | |
| comb = combined[i] if i < len(combined) else None | |
| m = dict(meta) | |
| if score is not None: | |
| m["distance"] = score | |
| if comb is not None: | |
| m["combined"] = comb | |
| items.append({"text": text, "meta": m}) | |
| selected = items[:max(1, 2)] | |
| context_raw = "\n\n---\n\n".join([s["text"] for s in selected]) if selected else "" | |
| filtered_text, filt_info = _filter_context_for_query(context_raw, input_data.user_message) | |
| context = filtered_text | |
| context_found = bool(context.strip()) | |
| best_distance = (min([d for d in distances if d is not None], default=None) if distances else None) | |
| best_combined = (max([c for c in combined if c is not None], default=None) if combined else None) | |
| detected_intent = kb_results.get("user_intent", "neutral") | |
| best_doc = kb_results.get("best_doc") | |
| top_meta = (metadatas or [{}])[0] if metadatas else {} | |
| msg_low = (input_data.user_message or "").lower() | |
| GENERIC_ERROR_TERMS = ("error", "issue", "problem", "not working", "failed", "failure") | |
| generic_error_signal = any(t in msg_low for t in GENERIC_ERROR_TERMS) | |
| # intent nudge for prereqs | |
| PREREQ_TERMS = ( | |
| "pre req", "pre-requisite", "pre-requisites", "prerequisite", | |
| "prerequisites", "pre requirement", "pre-requirements", "requirements", | |
| ) | |
| if detected_intent == "neutral" and any(t in msg_low for t in PREREQ_TERMS): | |
| detected_intent = "prereqs" | |
| # permission queries force 'errors' | |
| PERM_QUERY_TERMS = [ | |
| "permission", "permissions", "access", "access right", "authorization", "authorisation", | |
| "role", "role access", "security", "security profile", "privilege", | |
| "not allowed", "not authorized", "denied", | |
| ] | |
| is_perm_query = any(t in msg_norm for t in PERM_QUERY_TERMS) | |
| if is_perm_query: | |
| detected_intent = "errors" | |
| sec_title = (top_meta or {}).get("section", "") or "" | |
| sec_title_low = sec_title.strip().lower() | |
| PREREQ_HEADINGS = ("pre-requisites", "prerequisites", "pre requisites", "pre-requirements", "requirements") | |
| if detected_intent == "neutral" and any(h in sec_title_low for h in PREREQ_HEADINGS): | |
| detected_intent = "prereqs" | |
| def _contains_any(s: str, keywords: tuple) -> bool: | |
| low = (s or "").lower() | |
| return any(k in low for k in keywords) | |
| DOMAIN_TERMS = ( | |
| "trailer", "shipment", "order", "load", "wave", | |
| "inventory", "putaway", "receiving", "appointment", | |
| "dock", "door", "manifest", "pallet", "container", | |
| "asn", "grn", "pick", "picking", | |
| ) | |
| ACTION_OR_ERROR_TERMS = ( | |
| "how to", "procedure", "perform", "close", "closing", "open", "navigate", "scan", | |
| "confirm", "generate", "update", "receive", "receiving", "error", "issue", "fail", "failed", | |
| "not working", "locked", "mismatch", "access", "permission", "status", | |
| ) | |
| matched_count = int(filt_info.get("matched_count") or 0) | |
| filter_mode = (filt_info.get("mode") or "").lower() | |
| has_any_action_or_error = _contains_any(msg_low, ACTION_OR_ERROR_TERMS) | |
| mentions_domain = _contains_any(msg_low, DOMAIN_TERMS) | |
| short_query = len((input_data.user_message or "").split()) <= 4 | |
| gate_combined_ok = 0.60 if short_query else 0.55 | |
| combined_ok = (best_combined is not None and best_combined >= gate_combined_ok) | |
| weak_domain_only = (mentions_domain and not has_any_action_or_error) | |
| low_context_hit = (matched_count < 2 and filter_mode in ("concise", "exact")) | |
| strong_steps_bypass = True | |
| strong_error_signal = len(_detect_error_families(msg_low)) > 0 | |
| if (weak_domain_only or (low_context_hit and not combined_ok)) \ | |
| and not strong_steps_bypass \ | |
| and not (strong_error_signal or generic_error_signal): | |
| return { | |
| "bot_response": _build_clarifying_message(), | |
| "status": "NO_KB_MATCH", | |
| "context_found": False, | |
| "suggest_incident": True, | |
| "followup": "Share more details (module/screen/error), or say 'create ticket'.", | |
| "options": [{"type": "yesno", "title": "Share details or raise a ticket?"}], | |
| "debug": { | |
| "intent": "sop_rejected_weak_match", | |
| "matched_count": matched_count, | |
| "filter_mode": filter_mode, | |
| "best_combined": best_combined, | |
| "mentions_domain": mentions_domain, | |
| "has_any_action_or_error": has_any_action_or_error, | |
| }, | |
| } | |
| # Build SOP/Errors/Prereqs context | |
| escalation_line: Optional[str] = None | |
| full_errors: Optional[str] = None | |
| next_step_applied = False | |
| next_step_info: Dict[str, Any] = {} | |
| context_preformatted = False | |
| steps_override_applied = False | |
| if best_doc and detected_intent == "steps": | |
| sec = (top_meta or {}).get("section") | |
| full_steps = get_section_text(best_doc, sec) if sec else get_best_steps_section_text(best_doc) | |
| if full_steps: | |
| numbered_full = _ensure_numbering(full_steps) | |
| next_only = _anchor_next_steps(input_data.user_message, numbered_full, max_next=6) | |
| if next_only is not None: | |
| if len(next_only) == 0: | |
| context = "You are at the final step of this SOP. No further steps." | |
| next_step_applied = True | |
| next_step_info = {"count": 0} | |
| context_preformatted = True | |
| else: | |
| context = _dedupe_lines(_ensure_numbering("\n".join(next_only))) | |
| next_step_applied = True | |
| next_step_info = {"count": len(next_only)} | |
| context_preformatted = True | |
| else: | |
| context = numbered_full | |
| context_preformatted = True | |
| filt_info = {'mode': None, 'matched_count': None, 'all_sentences': None} | |
| context_found = True | |
| elif best_doc and detected_intent == "errors": | |
| said_not_resolved = _has_negation_resolved(msg_norm) | |
| if said_not_resolved: | |
| return { | |
| "bot_response": "Select an option below.", | |
| "status": "OK", | |
| "context_found": False, | |
| "show_incident_form": False, | |
| "show_status_form": False, | |
| "followup": None, | |
| "debug": {"intent": "errors_not_resolved", "best_doc": best_doc}, | |
| } | |
| full_errors = get_best_errors_section_text(best_doc) | |
| if full_errors: | |
| ctx_err = _extract_errors_only(full_errors, max_lines=30) | |
| if is_perm_query: | |
| context = _filter_permission_lines(ctx_err, max_lines=6) | |
| else: | |
| mentions_domain_local = any(t in msg_low for t in DOMAIN_TERMS) | |
| is_specific_error = (len(_detect_error_families(msg_low)) > 0) or mentions_domain_local | |
| if is_specific_error: | |
| context = _filter_context_for_query(ctx_err, input_data.user_message)[0] | |
| if not context.strip(): | |
| all_lines = _normalize_lines(ctx_err) | |
| error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)] | |
| context = "\n".join(error_bullets[:6]).strip() | |
| else: | |
| all_lines = _normalize_lines(ctx_err) | |
| error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)] | |
| context = "\n".join(error_bullets[:6]).strip() | |
| assist_followup = ( | |
| "Please tell me which error above matches your screen (paste the exact text), " | |
| "or share a screenshot. I can guide you further or raise a ServiceNow ticket." | |
| ) | |
| escalation_line = _extract_escalation_line(full_errors) | |
| try: | |
| steps_src = get_best_steps_section_text(best_doc) | |
| if steps_src: | |
| numbered_steps = _ensure_numbering(steps_src) | |
| next_only = _anchor_next_steps(input_data.user_message, numbered_steps, max_next=6) | |
| if next_only is not None and len(next_only) > 0: | |
| context = _dedupe_lines(_ensure_numbering("\n".join(next_only))) | |
| context_preformatted = True | |
| next_step_applied = True | |
| next_step_info = {"count": len(next_only), "source": "errors_domain_override"} | |
| steps_override_applied = True | |
| except Exception: | |
| pass | |
| elif best_doc and detected_intent == "prereqs": | |
| full_prereqs = _find_prereq_section_text(best_doc) | |
| if full_prereqs: | |
| context = full_prereqs.strip() | |
| context_found = True | |
| # language hint & paraphrase (errors only when not overridden) | |
| language_hint = _detect_language_hint(input_data.user_message) | |
| lang_line = f"Respond in {language_hint}." if language_hint else "Respond in a clear, polite tone." | |
| use_gemini = (detected_intent == "errors") and not steps_override_applied | |
| enhanced_prompt = f"""You are a helpful support assistant. Rewrite the provided context ONLY into clear, user-friendly guidance. | |
| - Do not add any information that is not present in the context. | |
| - If the content is an error/access/permission note, paraphrase it into a helpful sentence users can understand. | |
| - {lang_line} | |
| ### Context | |
| {context} | |
| ### Question | |
| {input_data.user_message} | |
| ### Output | |
| Return ONLY the rewritten guidance.""" | |
| bot_text = "" | |
| http_code = 0 | |
| if use_gemini and GEMINI_API_KEY: | |
| try: | |
| payload = {"contents": [{"role": "user", "parts": [{"text": enhanced_prompt}]}]} | |
| resp = _gemini_post(payload, timeout=25) | |
| http_code = getattr(resp, "status_code", 0) | |
| try: | |
| result = resp.json() | |
| except Exception: | |
| result = {} | |
| try: | |
| bot_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") | |
| except Exception: | |
| bot_text = "" | |
| except Exception: | |
| bot_text, http_code = "", 0 | |
| # deterministic local formatting | |
| if detected_intent == "steps": | |
| bot_text = context if context_preformatted else _ensure_numbering(context) | |
| elif detected_intent == "errors": | |
| if not (bot_text or "").strip() or http_code == 429: | |
| bot_text = context.strip() | |
| if escalation_line: | |
| bot_text = (bot_text or "").rstrip() + "\n\n" + escalation_line | |
| else: | |
| bot_text = context | |
| # explicit escalation add if user asked | |
| needs_escalation = (" escalate" in msg_norm) or ("escalation" in msg_norm) | |
| if needs_escalation and best_doc: | |
| esc_text = get_escalation_text(best_doc) or full_errors or "" | |
| line = _extract_escalation_line(esc_text) | |
| if line: | |
| bot_text = (bot_text or "").rstrip() + "\n\n" + line | |
| if not (bot_text or "").strip(): | |
| bot_text = context.strip() if (context or "").strip() else ( | |
| "I found some related guidance but couldn’t assemble a reply. " | |
| "Share more detail (module/screen/error), or say ‘create ticket’." | |
| ) | |
| short_query = len((input_data.user_message or "").split()) <= 4 | |
| gate_combined_ok = 0.60 if short_query else 0.55 | |
| status = "OK" if (best_combined is not None and best_combined >= gate_combined_ok) else "PARTIAL" | |
| lower = (bot_text or "").lower() | |
| if ("partial" in lower) or ("may be partial" in lower) or ("closest" in lower) or ("may not fully" in lower): | |
| status = "PARTIAL" | |
| options = [{"type": "yesno", "title": "Share details or raise a ticket?"}] if status == "PARTIAL" else [] | |
| try: | |
| base_query = (input_data.user_message or "").strip() | |
| if detected_intent == "steps": | |
| LAST_ISSUE_HINT = base_query[:100] | |
| elif detected_intent == "errors": | |
| shown_lines = [ln.strip() for ln in (bot_text or "").splitlines() if ln.strip()] | |
| top_error_line = "" | |
| for ln in shown_lines: | |
| if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln): | |
| if "escalation" in ln.lower(): | |
| continue | |
| top_error_line = ln | |
| break | |
| if top_error_line: | |
| top_error_line = re.split(r"\bif you want to escalate\b", top_error_line, flags=re.IGNORECASE)[0].strip() | |
| LAST_ISSUE_HINT = (f"{base_query} — {top_error_line}" if top_error_line else base_query)[:100] | |
| except Exception: | |
| pass | |
| return { | |
| "bot_response": bot_text, | |
| "status": status, | |
| "context_found": True, | |
| "ask_resolved": (status == "OK" and detected_intent != "errors"), | |
| "suggest_incident": (status == "PARTIAL"), | |
| "followup": (assist_followup if assist_followup else ("Is this helpful, or should I raise a ticket?" if status == "PARTIAL" else None)), | |
| "options": options, | |
| "debug": { | |
| "used_chunks": len((context or "").split("\n\n---\n\n")) if context else 0, | |
| "best_distance": best_distance, | |
| "best_combined": best_combined, | |
| "http_status": http_code, | |
| "filter_mode": filt_info.get("mode") if isinstance(filt_info, dict) else None, | |
| "matched_count": filt_info.get("matched_count") if isinstance(filt_info, dict) else None, | |
| "user_intent": detected_intent, | |
| "best_doc": best_doc, | |
| "next_step": {"applied": next_step_applied, "info": next_step_info}, | |
| }, | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=safe_str(e)) | |
| # ============================================================================= | |
| # Ticket description generation | |
| # ============================================================================= | |
async def generate_ticket_desc_ep(input_data: TicketDescInput):
    """Generate ServiceNow ticket descriptions for an issue via Gemini.

    Asks the model to answer strictly in JSON, strips any markdown code
    fences from the reply, and returns the two description fields. Any
    parsing problem yields empty fields plus an ``error`` key instead of
    raising, so the caller can degrade gracefully.
    """
    try:
        instruction = (
            f"You are helping generate ServiceNow ticket descriptions based on the issue: {input_data.issue}.\n"
            "Please return the output strictly in JSON format with the following keys:\n"
            "{\n"
            ' "ShortDescription": "A concise summary of the issue (max 100 characters)",\n'
            ' "DetailedDescription": "A detailed explanation of the issue"\n'
            "}\n"
            "Do not include any extra text, comments, or explanations outside the JSON."
        )
        body = {"contents": [{"role": "user", "parts": [{"text": instruction}]}]}
        response = _gemini_post(body, timeout=25)
        try:
            parsed = response.json()
        except Exception:
            return {"ShortDescription": "", "DetailedDescription": "", "error": "Gemini returned non-JSON"}
        try:
            raw_text = parsed.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "").strip()
        except Exception:
            return {"ShortDescription": "", "DetailedDescription": "", "error": "Gemini parsing failed"}
        # Drop any ``` fences the model may have wrapped the JSON in.
        if raw_text.startswith("```"):
            raw_text = "\n".join(
                ln for ln in raw_text.splitlines() if not ln.strip().startswith("```")
            ).strip()
        try:
            ticket_doc = json.loads(raw_text)
            return {
                "ShortDescription": ticket_doc.get("ShortDescription", "").strip(),
                "DetailedDescription": ticket_doc.get("DetailedDescription", "").strip(),
            }
        except Exception:
            return {"ShortDescription": "", "DetailedDescription": "", "error": "Invalid JSON returned"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
| # ============================================================================= | |
| # Incident status | |
| # ============================================================================= | |
async def incident_status(input_data: TicketStatusInput):
    """Fetch a ServiceNow incident by sys_id or number and render a status card.

    Queries the Table API with the cached OAuth token. A failed lookup
    (non-200 response) falls back to empty fields rather than erroring.

    Raises:
        HTTPException: 400 when neither sys_id nor number is supplied,
            500 for missing configuration or unexpected errors.
    """
    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        if input_data.sys_id:
            url = f"{instance_url}/api/now/table/incident/{input_data.sys_id}"
            response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
            data = response.json()
            result = data.get("result", {}) if response.status_code == 200 else {}
        elif input_data.number:
            url = f"{instance_url}/api/now/table/incident?number={input_data.number}"
            response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
            data = response.json()
            lst = data.get("result", [])
            result = (lst or [{}])[0] if response.status_code == 200 else {}
        else:
            raise HTTPException(status_code=400, detail="Provide IncidentID (number) or sys_id")
        # NOTE(review): builtins.str suggests `str` may be shadowed elsewhere
        # in this module — kept as-is to be safe.
        state_code = builtins.str(result.get("state", "unknown"))
        state_label = STATE_MAP.get(state_code, state_code)
        short = result.get("short_description", "")
        number = result.get("number", input_data.number or "unknown")
        return {
            "bot_response": (
                f"**Ticket:** {number} \n"
                f"**Status:** {state_label} \n"
                f"**Issue description:** {short}"
            ).replace("\n", " \n"),
            "followup": "Is there anything else I can assist you with?",
            "show_assist_card": True,
            "persist": True,
            "debug": "Incident status fetched",
        }
    except HTTPException:
        # BUGFIX: previously the generic handler below re-wrapped the
        # deliberate 400/500 HTTPExceptions raised above into a generic 500.
        # Re-raise them untouched, consistent with the chat endpoint.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
| # ============================================================================= | |
| # Incident creation | |
| # ============================================================================= | |
def _classify_resolution_llm(user_message: str) -> bool:
    """Ask Gemini whether *user_message* says the issue is now resolved.

    Best-effort signal only: returns False when no API key is configured,
    or on any request/parsing failure.
    """
    if not GEMINI_API_KEY and "key=" not in GEMINI_URL:
        return False
    prompt = f"""Classify if the following user message indicates that the issue is resolved or working now.
Return only 'true' or 'false'.
Message: {user_message}"""
    try:
        reply = _gemini_post({"contents": [{"role": "user", "parts": [{"text": prompt}]}]}, timeout=12)
        doc = reply.json()
        answer = doc.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") or ""
        return "true" in answer.strip().lower()
    except Exception:
        return False
def _set_incident_resolved(sys_id: str) -> bool:
    """Mark a ServiceNow incident as Resolved via the Table API.

    Tries up to three PATCH payload variants in order, because instances
    differ in how they accept resolution data:
      A) numeric state "6" with standard close_code/close_notes fields;
      B) display value "Resolved" for state;
      C) numeric state with instance-specific resolution field names taken
         from SERVICENOW_RESOLUTION_CODE_FIELD / SERVICENOW_RESOLUTION_NOTES_FIELD.

    When SERVICENOW_REQUIRE_IN_PROGRESS_FIRST is set, the incident is first
    moved to In Progress (state "2"), since some instances reject a direct
    transition to Resolved.

    Returns True on the first 200/204 response; False when every variant
    fails or on any unexpected exception (failures are logged to stdout).
    """
    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            print("[SN PATCH resolve] missing SERVICENOW_INSTANCE_URL")
            return False
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{instance_url}/api/now/table/incident/{sys_id}"

        close_code_val = os.getenv("SERVICENOW_CLOSE_CODE", "Solution provided")
        close_notes_val = os.getenv("SERVICENOW_RESOLUTION_NOTES", "Issue resolved, user confirmed")
        caller_sysid = os.getenv("SERVICENOW_CALLER_SYSID")
        resolved_by_sysid = os.getenv("SERVICENOW_RESOLVED_BY_SYSID")
        assign_group = os.getenv("SERVICENOW_ASSIGNMENT_GROUP_SYSID")
        require_progress = os.getenv("SERVICENOW_REQUIRE_IN_PROGRESS_FIRST", "false").lower() in ("1", "true", "yes")

        if require_progress:
            try:
                resp1 = requests.patch(url, headers=headers, json={"state": "2"}, verify=VERIFY_SSL, timeout=25)
                print(f"[SN PATCH progress] status={resp1.status_code} body={resp1.text[:500]}")
            except Exception as e:
                print(f"[SN PATCH progress] exception={safe_str(e)}")

        # Fields shared by all payload variants.
        # NOTE(review): datetime.utcnow() is naive UTC, which is what the SN
        # Table API expects for resolved_at — confirm against the instance TZ.
        common = {
            "caller_id": caller_sysid,
            "resolved_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
            "work_notes": "Auto-resolve set by NOVA.",
            "resolved_by": resolved_by_sysid,
            "assignment_group": assign_group,
        }
        code_field = os.getenv("SERVICENOW_RESOLUTION_CODE_FIELD", "close_code")
        notes_field = os.getenv("SERVICENOW_RESOLUTION_NOTES_FIELD", "close_notes")
        variants = [
            ("A", {"state": "6", "close_code": close_code_val, "close_notes": close_notes_val, **common}),
            ("B", {"state": "Resolved", "close_code": close_code_val, "close_notes": close_notes_val, **common}),
            # BUGFIX: variant C previously sent close_notes_val in the
            # resolution-code field; it now sends close_code_val like A/B.
            ("C", {"state": "6", code_field: close_code_val, notes_field: close_notes_val, **common}),
        ]
        for label, payload in variants:
            # Drop unset optional fields so the API doesn't reject nulls.
            payload = {k: v for k, v in payload.items() if v is not None}
            resp = requests.patch(url, headers=headers, json=payload, verify=VERIFY_SSL, timeout=25)
            if resp.status_code in (200, 204):
                return True
            print(f"[SN PATCH resolve {label}] status={resp.status_code} body={resp.text[:500]}")
        return False
    except Exception as e:
        print(f"[SN PATCH resolve] exception={safe_str(e)}")
        return False
| # ============================================================================= | |
| # Helper functions used by KB logic | |
| # ============================================================================= | |
# Numbering style preference ('digit' or 'step'); not referenced in this
# chunk — presumably consumed by formatting code elsewhere in the module.
NUMBERING_STYLE = os.getenv("NUMBERING_STYLE", "digit").lower()  # 'digit' or 'step'
# WMS domain nouns: when one of these appears, "status" refers to a warehouse
# object rather than a ServiceNow ticket (see _is_domain_status_context).
DOMAIN_STATUS_TERMS = (
    "shipment", "order", "load", "trailer", "wave",
    "inventory", "putaway", "receiving", "appointment",
    "dock", "door", "manifest", "pallet", "container",
    "asn", "grn", "pick", "picking",
)
# Synonym sets grouping raw error phrasings into coarse families; consumed by
# _detect_error_families to classify user-reported errors.
ERROR_FAMILY_SYNS = {
    "NOT_FOUND": ("not found", "missing", "does not exist", "doesn't exist", "unavailable", "not available", "cannot find", "no such", "not present", "absent"),
    "MISMATCH": ("mismatch", "doesn't match", "does not match", "variance", "difference", "discrepancy", "not equal"),
    "LOCKED": ("locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze"),
    "PERMISSION": ("permission", "permissions", "access denied", "not authorized", "not authorised", "insufficient privileges", "no access", "authorization", "authorisation"),
    "TIMEOUT": ("timeout", "timed out", "network", "connection", "unable to connect", "disconnected", "no network"),
    "SYNC": ("sync", "synchronization", "synchronisation", "replication", "refresh", "out of sync", "stale", "delay", "lag"),
}
def _detect_error_families(msg: str) -> list:
    """Return the ERROR_FAMILY_SYNS family names whose synonyms occur in *msg*.

    The message is lowercased, stripped of punctuation, and whitespace-collapsed
    before the substring checks.
    """
    normalized = re.sub(r"[^\w\s]", " ", (msg or "").lower())
    normalized = re.sub(r"\s+", " ", normalized).strip()
    return [
        family
        for family, synonyms in ERROR_FAMILY_SYNS.items()
        if any(term in normalized for term in synonyms)
    ]
| def _is_domain_status_context(msg_norm: str) -> bool: | |
| if "status locked" in msg_norm or "locked status" in msg_norm: | |
| return True | |
| return any(term in msg_norm for term in DOMAIN_STATUS_TERMS) | |
| def _normalize_lines(text: str) -> List[str]: | |
| raw = (text or "") | |
| try: | |
| return [ln.strip() for ln in raw.splitlines() if ln.strip()] | |
| except Exception: | |
| return [raw.strip()] if raw.strip() else [] | |
def _filter_context_for_query(context: str, query: str) -> Tuple[str, Dict[str, Any]]:
    """Keep only the context sentences most relevant to *query*.

    Sentences that share at least STRICT_OVERLAP query terms (bullet lines
    get a +1 bonus) count as "exact" matches; sentences with any overlap are
    "concise" fallbacks; with no overlap the first few sentences are kept.

    Returns:
        (filtered_text, info) where info has keys 'mode' ('exact'/'concise'),
        'matched_count' (sentences kept) and 'all_sentences' (total found).
    """
    STRICT_OVERLAP = 3          # min term hits for an "exact" sentence
    MAX_SENTENCES_STRICT = 4    # cap for exact matches
    MAX_SENTENCES_CONCISE = 3   # cap for fallback matches
    def _norm(text: str) -> str:
        # lowercase, strip punctuation, collapse whitespace
        t = (text or "").lower()
        t = re.sub(r"[^\w\s]", " ", t)
        t = re.sub(r"\s+", " ", t).strip()
        return t
    def _split_sents(ctx: str) -> List[str]:
        # split after sentence-final punctuation; drop fragments <= 2 chars
        raw_sents = re.split(r"(?<=\w[.!?])\s+", ctx or "")
        return [s.strip() for s in raw_sents if s and len(s.strip()) > 2]
    ctx = (context or "").strip()
    if not ctx or not query:
        return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
    q_norm = _norm(query)
    # ignore very short query tokens (<= 2 chars)
    q_terms = [t for t in q_norm.split() if len(t) > 2]
    if not q_terms:
        return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
    sentences = _split_sents(ctx)
    matched_exact, matched_any = [], []
    for s in sentences:
        s_norm = _norm(s)
        is_bullet = bool(re.match(r"^[-*\u2022]\s*", s))
        # count query terms present in the sentence; bullets get a +1 bonus
        overlap = sum(1 for t in q_terms if t in s_norm) + (1 if is_bullet else 0)
        if overlap >= STRICT_OVERLAP:
            matched_exact.append(s)
        elif overlap > 0:
            matched_any.append(s)
    if matched_exact:
        kept = matched_exact[:MAX_SENTENCES_STRICT]
        return _dedupe_lines("\n".join(kept).strip()), {
            'mode': 'exact', 'matched_count': len(kept), 'all_sentences': len(sentences),
        }
    if matched_any:
        kept = matched_any[:MAX_SENTENCES_CONCISE]
        return _dedupe_lines("\n".join(kept).strip()), {
            'mode': 'concise', 'matched_count': len(kept), 'all_sentences': len(sentences),
        }
    # no overlap at all: return the leading sentences as generic context
    kept = sentences[:MAX_SENTENCES_CONCISE]
    return _dedupe_lines("\n".join(kept).strip()), {
        'mode': 'concise', 'matched_count': 0, 'all_sentences': len(sentences),
    }
def _extract_errors_only(text: str, max_lines: int = 12) -> str:
    """Keep only likely error lines (bullets or 'Name: detail' pairs).

    Collects at most *max_lines* matching lines; when nothing matches,
    the whole (stripped) text is returned unchanged.
    """
    selected: List[str] = []
    for line in _normalize_lines(text):
        if ":" in line or re.match(r"^\s*[-*\u2022]\s*", line):
            selected.append(line)
            if len(selected) >= max_lines:
                break
    if selected:
        return "\n".join(selected).strip()
    return (text or "").strip()
def _filter_permission_lines(text: str, max_lines: int = 6) -> str:
    """Return up to *max_lines* lines mentioning permissions/access.

    Falls back to the whole (stripped) text when no line matches.
    """
    keywords = (
        "permission", "permissions", "access", "authorization", "authorisation",
        "role", "role mapping", "security profile", "not allowed", "not authorized", "denied", "insufficient",
    )
    hits: List[str] = []
    for line in _normalize_lines(text):
        lowered = line.lower()
        if any(word in lowered for word in keywords):
            hits.append(line)
            if len(hits) >= max_lines:
                break
    return "\n".join(hits).strip() if hits else (text or "").strip()
| def _extract_escalation_line(text: str) -> Optional[str]: | |
| if not text: | |
| return None | |
| lines = _normalize_lines(text) | |
| if not lines: | |
| return None | |
| start_idx = None | |
| for i, ln in enumerate(lines): | |
| low = ln.lower() | |
| if "escalation" in low or "escalation path" in low or "escalate" in low: | |
| start_idx = i | |
| break | |
| block: List[str] = [] | |
| if start_idx is not None: | |
| for j in range(start_idx, min(len(lines), start_idx + 6)): | |
| if not lines[j].strip(): | |
| break | |
| block.append(lines[j].strip()) | |
| else: | |
| block = [ln.strip() for ln in lines if ("->" in ln or "→" in ln)] | |
| if not block: | |
| return None | |
| text_block = " ".join(block) | |
| m = re.search(r"escalation[^:]*:\s*(.+)", text_block, flags=re.IGNORECASE) | |
| path = m.group(1).strip() if m else None | |
| if not path: | |
| arrow_lines = [ln for ln in block if ("->" in ln or "→" in ln)] | |
| if arrow_lines: | |
| path = arrow_lines[0] | |
| if not path: | |
| m2 = re.search(r"(operator.*?administrator.*operator.*)", text_block, flags=re.IGNORECASE) | |
| path = m2.group(1).strip() if m2 else None | |
| if not path: | |
| return None | |
| path = path.replace("->", "→").strip() | |
| path = re.sub(r"^(?i:escalation\s*path)\s*:\s*", "", path).strip() | |
| return f"If you want to escalate the issue, follow: {path}" | |
| def _detect_language_hint(msg: str) -> Optional[str]: | |
| if re.search(r"[\u0B80-\u0BFF]", msg or ""): # Tamil | |
| return "Tamil" | |
| if re.search(r"[\u0900-\u097F]", msg or ""): # Hindi | |
| return "Hindi" | |
| return None | |
| def _build_clarifying_message() -> str: | |
| return ( | |
| "It seems the issue isn’t resolved yet. Would you like to share a few details so I can check further, " | |
| "or should I raise a ServiceNow ticket for you?" | |
| ) | |
| def _build_tracking_descriptions(issue_text: str, resolved_text: str) -> Tuple[str, str]: | |
| issue = (issue_text or "").strip() | |
| resolved = (resolved_text or "").strip() | |
| is_process_query = bool(re.search(r"\b(how to|steps|procedure|process)\b", issue.lower())) | |
| if is_process_query: | |
| cleaned_issue = re.sub(r"^\s*(process\s*steps\s*[-–:]?\s*)", "", issue, flags=re.IGNORECASE).strip() | |
| short_desc = f"Process Steps – {cleaned_issue or issue}"[:100] | |
| else: | |
| short_desc = issue[:100] | |
| long_desc = ( | |
| f'User reported: "{issue}". ' | |
| f'User confirmation: "{resolved}". ' | |
| f"Tracking record created automatically by NOVA." | |
| ).strip() | |
| return short_desc, long_desc | |
| def _is_incident_intent(msg_norm: str) -> bool: | |
| intent_phrases = [ | |
| "create ticket", "create a ticket", "raise ticket", "raise a ticket", "open ticket", "open a ticket", | |
| "create incident", "create an incident", "raise incident", "raise an incident", "open incident", "open an incident", | |
| "log ticket", "log an incident", "generate ticket", "create snow ticket", "raise snow ticket", | |
| "raise service now ticket", "create service now ticket", "raise sr", "open sr", | |
| ] | |
| return any(p in msg_norm for p in intent_phrases) | |
| def _parse_ticket_status_intent(msg_norm: str) -> Dict[str, Optional[str]]: | |
| status_keywords = ["status", "ticket status", "incident status", "check status", "check ticket status", "check incident status"] | |
| base_has_status = any(k in msg_norm for k in status_keywords) | |
| has_ticket_marker = ( | |
| any(w in msg_norm for w in ("ticket", "incident", "servicenow", "snow")) | |
| or bool(re.search(r"\binc\d{5,}\b", msg_norm, flags=re.IGNORECASE)) | |
| ) | |
| if (not base_has_status) or (base_has_status and not has_ticket_marker and _is_domain_status_context(msg_norm)): | |
| return {} | |
| patterns = [r"(?:incident\s*id|incidentid|ticket\s*number|number)\s*[:=]?\s*(inc\d+)", r"(inc\d+)"] | |
| for pat in patterns: | |
| m = re.search(pat, msg_norm, flags=re.IGNORECASE) | |
| if m: | |
| val = m.group(1).strip() | |
| if val: | |
| return {"number": val.upper() if val.lower().startswith("inc") else val} | |
| return {"number": None, "ask_number": True} | |
| def _is_resolution_ack_heuristic(msg_norm: str) -> bool: | |
| phrases = [ | |
| "it is resolved", "resolved", "issue resolved", "problem resolved", | |
| "it's working", "working now", "works now", "fixed", "sorted", | |
| "ok now", "fine now", "all good", "all set", "thanks works", "thank you it works", "back to normal", | |
| ] | |
| return any(p in msg_norm for p in phrases) | |
| def _has_negation_resolved(msg_norm: str) -> bool: | |
| neg_phrases = [ | |
| "not resolved", "issue not resolved", "still not working", "not working", | |
| "didn't work", "doesn't work", "no change", "not fixed", "still failing", "failed again", "broken", "fail", | |
| ] | |
| return any(p in msg_norm for p in neg_phrases) | |
| def _ensure_numbering(text: str) -> str: | |
| text = re.sub(r"[\u2060\u200B]", "", text or "") | |
| lines = [ln.strip() for ln in (text or "").splitlines() if ln and ln.strip()] | |
| if not lines: | |
| return text or "" | |
| para = " ".join(lines).strip() | |
| if not para: | |
| return "" | |
| para_clean = re.sub(r"(?:\b\d+\s*[\.\)])\s+", "\n\n\n", para) | |
| para_clean = re.sub(r"(?:[\u2460-\u2473]\s+)", "\n\n\n", para_clean) | |
| para_clean = re.sub(r"(?i)\bstep\s*\d+\s*:?\s*", "\n\n\n", para_clean) | |
| segments = [seg.strip() for seg in para_clean.split("\n\n\n") if seg.strip()] | |
| if len(segments) < 2: | |
| tmp = [ln.strip() for ln in para.splitlines() if ln.strip()] | |
| segments = tmp if len(tmp) > 1 else [seg.strip() for seg in re.split(r"(?<=\w[.!?])\s+", para) if seg.strip()] | |
| def strip_prefix_any(s: str) -> str: | |
| return re.sub( | |
| r"^\s*(?:" r"(?:\d+\s*[\.\)])|" r"(?i:step\s*\d+:?)|" r"(?:[-*\u2022])|" r"(?:[\u2460-\u2473])" r")\s*", | |
| "", (s or "").strip(), | |
| ) | |
| clean_segments = [strip_prefix_any(seg) for seg in segments if seg.strip()] | |
| circled = {i: chr(9311 + i) for i in range(1, 21)} | |
| out = [] | |
| for idx, seg in enumerate(clean_segments, start=1): | |
| marker = circled.get(idx, f"{idx})") | |
| out.append(f"{marker} {seg}") | |
| return "\n".join(out) | |
| def _dedupe_lines(text: str) -> str: | |
| seen, out = set(), [] | |
| for ln in (text or "").splitlines(): | |
| key = re.sub(r"\s+", " ", (ln or "").strip().lower()) | |
| if key and key not in seen: | |
| out.append(ln) | |
| seen.add(key) | |
| return "\n".join(out).strip() | |
| def _split_sentences(block: str) -> list: | |
| parts = [t.strip() for t in re.split(r"(?<=\w[.!?])\s+", block or "") if t.strip()] | |
| return parts if parts else ([block.strip()] if (block or "").strip() else []) | |
| def _similarity(a: str, b: str) -> float: | |
| def _norm_text(s: str) -> str: | |
| s = (s or "").lower() | |
| s = re.sub(r"[^\w\s]", " ", s) | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
| a_norm, b_norm = _norm_text(a), _norm_text(b) | |
| ta, tb = set(a_norm.split()), set(b_norm.split()) | |
| inter = len(ta & tb) | |
| union = len(ta | tb) or 1 | |
| jacc = inter / union | |
| def _bigrams(tokens: list) -> set: | |
| return set([" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]) if len(tokens) > 1 else set() | |
| ab, bb = _bigrams(a_norm.split()), _bigrams(b_norm.split()) | |
| big_inter = len(ab & bb) | |
| big_union = len(ab | bb) or 1 | |
| big = big_inter / big_union | |
| char = SequenceMatcher(None, a_norm, b_norm).ratio() | |
| return min(1.0, 0.45 * jacc + 0.30 * big + 0.35 * char) | |
| def _extract_anchor_from_query(msg: str) -> dict: | |
| raw = (msg or "").strip() | |
| low = re.sub(r"[^\w\s]", " ", raw.lower()).strip() | |
| FOLLOWUP_CUES = ("what next", "what is next", "what to do", "then", "after that", "next") | |
| has_followup = any(cue in low for cue in FOLLOWUP_CUES) | |
| parts = [p.strip() for p in re.split(r"[?.,;:\-\n]+", raw) if p.strip()] | |
| if not parts: | |
| return {"anchor": raw, "has_followup": has_followup} | |
| last = parts[-1] | |
| last_low = re.sub(r"[^\w\s]", " ", last.lower()).strip() | |
| if any(cue in last_low for cue in FOLLOWUP_CUES) and len(parts) >= 2: | |
| anchor = parts[-2] | |
| else: | |
| anchor = parts[-1] if len(parts) > 1 else parts[0] | |
| return {"anchor": anchor.strip(), "has_followup": has_followup} | |
def _anchor_next_steps(user_message: str, numbered_text: str, max_next: int = 8) -> list | None:
    """Return the SOP steps that come AFTER the step the user appears to be on.

    The query's anchor clause (see _extract_anchor_from_query) is matched
    against every numbered step using _similarity on the whole line and on
    each sentence, plus a punctuation-free literal-containment check.

    Returns:
        None when no step matches confidently; [] when the matched step is
        the last one; otherwise up to *max_next* deduplicated follow-on steps.
    """
    steps = [ln.strip() for ln in (numbered_text or "").splitlines() if ln.strip()]
    if not steps:
        return None
    info = _extract_anchor_from_query(user_message)
    anchor = info.get("anchor", "").strip()
    if not anchor:
        return None
    has_followup = bool(info.get("has_followup"))
    candidates = []
    for idx, step_line in enumerate(steps):
        # score the full line and each sentence; keep the best
        s_full = _similarity(anchor, step_line)
        literal_hit = False
        scores = [s_full]
        for s in _split_sentences(step_line):
            scores.append(_similarity(anchor, s))
            # literal containment on punctuation-free text (either direction)
            a_flat = re.sub(r"\W+", "", re.sub(r"[^\w\s]", " ", anchor.lower()))
            s_flat = re.sub(r"\W+", "", re.sub(r"[^\w\s]", " ", s.lower()))
            if a_flat and (a_flat in s_flat or s_flat in a_flat):
                literal_hit = True
        score = max(scores)
        candidates.append((idx, score, literal_hit))
    # highest score wins; ties resolve to the later step
    candidates.sort(key=lambda t: (t[1], t[0]), reverse=True)
    best_idx, best_score, best_literal = candidates[0]
    # count anchor tokens longer than one character
    tok_count = len([t for t in re.sub(r"[^\w\s]", " ", anchor.lower()).split() if len(t) > 1])
    if best_literal:
        accept = True
    else:
        # looser threshold when the user asked a follow-up ("what next")
        base_ok = best_score >= (0.55 if not has_followup else 0.50)
        # longer anchors may pass with a lower score
        len_ok = (best_score >= 0.40) and (tok_count >= 3)
        accept = base_ok or len_ok
    if not accept:
        return None
    start = best_idx + 1
    if start >= len(steps):
        # matched step is the final one: nothing comes after it
        return []
    end = min(start + max_next, len(steps))
    next_steps = steps[start:end]
    return [ln for ln in _dedupe_lines("\n".join(next_steps)).splitlines() if ln.strip()]