# main.py — Hugging Face backend (drop-in)
import os
import json
import re
import requests
import builtins
from typing import Optional, Any, Dict, List, Tuple
from contextlib import asynccontextmanager
from datetime import datetime

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
from difflib import SequenceMatcher

# --- WMS API integration imports ---
# NOTE: these must exist in your repo (you shared them earlier)
from public_api.public_api_auth import login
from public_api.public_api_item import get_item
from public_api.public_api_inventory import get_inventory_holds
from public_api.public_api_location import get_location_status
from public_api.utils import flatten_json, extract_fields
from public_api.field_mapping import TOOL_REGISTRY, ALL_TOOLS

# KB services (keep your current modules)
from services.kb_creation import (
    collection,
    ingest_documents,
    hybrid_search_knowledge_base,
    get_section_text,
    get_best_steps_section_text,
    get_best_errors_section_text,
    get_escalation_text,
)

# ServiceNow helpers (keep your current modules)
from services.login import router as login_router
from services.generate_ticket import get_valid_token, create_incident

# =============================================================================
# Environment / Gemini config
# =============================================================================
# Load variables from a local .env file (if any) before reading them below.
load_dotenv()

# TLS verification switches; only "1", "true" or "yes" (case-insensitive)
# enable verification, and verification is on when the variable is unset.
VERIFY_SSL = os.getenv("SERVICENOW_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
GEMINI_SSL_VERIFY = os.getenv("GEMINI_SSL_VERIFY", "true").lower() in ("1", "true", "yes")

# None when the variable is not set.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Safer header-based auth; model can be flash or flash-lite.
# If you prefer your friend's exact style, set GEMINI_URL_WITH_KEY in secrets to override.
GEMINI_URL = os.getenv(
    "GEMINI_URL_WITH_KEY",
    "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
)
os.environ["POSTHOG_DISABLED"] = "true"

# --- API WMS session cache ---
_WMS_SESSION_DATA: Optional[dict] = None
_WMS_WAREHOUSE_ID: Optional[str] = None


class _MissingKeyResponse:
    """Stand-in for a ``requests`` response when no Gemini API key is set.

    Exposes the attributes and methods the callers use (``status_code``,
    ``text``, ``json()``, ``raise_for_status()``).  The methods take ``self``
    so they can be called on an instance; zero-argument lambdas stored as
    class attributes would raise ``TypeError`` when invoked that way.
    """

    status_code = 401
    text = "Missing GEMINI_API_KEY"

    def json(self) -> dict:
        """Return an error payload carrying the missing-key message."""
        return {"error": {"message": "Missing GEMINI_API_KEY"}}

    def raise_for_status(self) -> None:
        """Do nothing, so callers can handle the 401 gracefully."""
        return None


def _gemini_post(payload: dict, timeout: int = 25):
    """
    Central helper to call Gemini.
    Supports either header API key or URL ?key=...

    Args:
        payload: JSON body posted to ``GEMINI_URL``.
        timeout: Request timeout in seconds.

    Returns:
        The ``requests`` response, or a ``_MissingKeyResponse`` (status 401)
        when the URL carries no ``key=`` and ``GEMINI_API_KEY`` is not set.
    """
    headers = {"Content-Type": "application/json"}
    # If URL doesn't contain ?key=..., send header
    if "key=" not in GEMINI_URL:
        if not GEMINI_API_KEY:
            # No credentials at all: skip the network call and hand back a
            # response-like object instead of raising.
            return _MissingKeyResponse()
        headers["x-goog-api-key"] = GEMINI_API_KEY
    return requests.post(
        GEMINI_URL,
        headers=headers,
        json=payload,
        timeout=timeout,
        verify=GEMINI_SSL_VERIFY,
    )


def _ensure_wms_session() -> bool:
    """Login once to WMS and cache session_data + default warehouse.

    Returns:
        True when a cached or freshly created session is available, False
        when ``login()`` raised or reported failure (the reason is printed).
    """
    global _WMS_SESSION_DATA, _WMS_WAREHOUSE_ID
    if _WMS_SESSION_DATA and _WMS_WAREHOUSE_ID:
        return True
    try:
        session, warehouse_id, ok = login()
        if ok and session:
            _WMS_SESSION_DATA = {"wms_auth": session, "user_warehouse_id": warehouse_id}
            _WMS_WAREHOUSE_ID = warehouse_id
            return True
        print("[WMS] login unsuccessful: ok=", ok, "warehouse_id=", warehouse_id)
    except Exception as e:
        print("[WMS] login failed:", e)
    return False


# =============================================================================
# Minimal server-side cache (used to populate short description later)
# =============================================================================
LAST_ISSUE_HINT: str = ""


def safe_str(e: Any) -> str:
    """Return ``str(e)``, or an empty string if the conversion itself raises.

    ``builtins.str`` is used explicitly so the result does not depend on any
    local rebinding of the name ``str``.
    """
    try:
        return builtins.str(e)
    except Exception:
        return ""


# =============================================================================
# App / Lifespan
# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Populate the knowledge base at startup when the collection is empty.

    Any failure while counting or ingesting is printed and swallowed so the
    application still starts.
    """
    try:
        folder_path = os.path.join(os.getcwd(), "documents")
        entry_count = collection.count()
        if entry_count == 0:
            print("[KB] empty. Running ingestion...")
            ingest_documents(folder_path)
        else:
            print(f"[KB] already populated with {entry_count} entries. Skipping ingestion.")
    except Exception as e:
        print(f"[KB] ingestion failed: {safe_str(e)}")
    yield


app = FastAPI(lifespan=lifespan)
app.include_router(login_router)

# CORS — add your Hugging Face frontend Space URL here
origins = [
    "https://chatbotnova-chatbot-frontend.hf.space",
    # "http://localhost:5173",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# =============================================================================
# Models (align to your frontend)
# =============================================================================
class ChatInput(BaseModel):
    """Request body for the chat endpoint."""

    user_message: str
    prev_status: Optional[str] = None
    last_issue: Optional[str] = None


class IncidentInput(BaseModel):
    """Request body carrying the fields of an incident to create."""

    short_description: str
    description: str
    mark_resolved: Optional[bool] = False


class TicketDescInput(BaseModel):
    """Request body for ticket-description generation."""

    issue: str


class TicketStatusInput(BaseModel):
    """Request body identifying an incident by ``sys_id`` or ``number``."""

    sys_id: Optional[str] = None
    number: Optional[str] = None


# Incident state code -> display label.
STATE_MAP = {
    "1": "New",
    "2": "In Progress",
    "3": "On Hold",
    "6": "Resolved",
    "7": "Closed",
    "8": "Canceled",
}

# =============================================================================
# WMS Tool Orchestration (fast-path + Gemini function-calling)
# =============================================================================
# Item: "Get item number 100001 ..." / "Show item 100001 ..."
_ITEM_RE = re.compile(r"\bitem\s+(?:number|id)?\s*[: ]?([A-Za-z0-9\-]+)", re.IGNORECASE)
# Location: "Check location A1-01-01"
_LOCATION_RE = re.compile(
    r"\b(?:check|status|loc(?:ation)?)\s+(?:bin\s+)?([A-Za-z0-9\-]+)", re.IGNORECASE
)
# Holds: "Show holds for LPN 123456" / "Show all inventory holds"
_LPN_RE = re.compile(r"\b(?:lpn|lodnum)\s*[: ]?([A-Za-z0-9\-]+)", re.IGNORECASE)
_ALL_HOLDS_RE = re.compile(
    r"\b(all\s+inventory\s+holds|list\s+holds|show\s+holds)\b", re.IGNORECASE
)

# Fields shown for an item when the user did not ask for specific ones.
_ITEM_SUMMARY_KEYS = ["Item", "Description", "Item type", "Warehouse ID"]


def _wms_error_response(message: str, intent: str, **debug_extra) -> dict:
    """Build the PARTIAL/ERROR reply used when a WMS call reports an error."""
    return {
        "bot_response": f"⚠️ {message}",
        "status": "PARTIAL",
        "context_found": False,
        "suggest_incident": True,
        "followup": "Should I raise a ServiceNow ticket?",
        "source": "ERROR",
        "debug": {"intent": intent, **debug_extra},
    }


def _wms_empty_response(intent: str, **debug_extra) -> dict:
    """Build the PARTIAL reply used when a WMS call returned no records."""
    return {
        "bot_response": "I searched WMS but couldn't find matching records.",
        "status": "PARTIAL",
        "context_found": False,
        "suggest_incident": True,
        "followup": "Do you want me to raise a ticket or try a different ID?",
        "source": "LIVE_API",
        "debug": {"intent": intent, **debug_extra},
    }


def _wms_table_response(
    message: str, rows: list, intent: str, warehouse_id, *, show_export: bool = False
) -> dict:
    """Build the OK/LIVE_API table reply shared by all successful WMS lookups."""
    return {
        "bot_response": message,
        "status": "OK",
        "context_found": True,
        "source": "LIVE_API",
        "type": "table",
        "data": rows,
        "show_export": show_export,
        "debug": {"intent": intent, "warehouse_id": warehouse_id},
    }


def _item_details_message(cfg: dict, record: dict, requested) -> str:
    """Render the "Details:" text for one item record.

    ``requested`` is the list of mapping labels to show; when empty, the
    summary fields are used.  "Item" is always listed first.
    """
    if isinstance(requested, str):
        # A bare string would otherwise be searched by substring below.
        requested = [requested]
    requested = list(requested or [])
    if requested:
        filtered_map = {
            k: v for k, v in cfg["mapping"].items() if k in requested or k == "Item"
        }
        order = ["Item"] + [f for f in requested if f != "Item"]
    else:
        filtered_map = {
            k: v for k, v in cfg["mapping"].items() if k in _ITEM_SUMMARY_KEYS
        }
        order = list(_ITEM_SUMMARY_KEYS)
    cleaned = extract_fields(
        flatten_json(record), filtered_map, cfg.get("formatters", {}), requested_order=order
    )
    lines = [f"{k}: {cleaned.get(k, 'N/A')}" for k in order]
    return "Details:\n" + "\n".join(lines)


def _wms_rows_as_text(cfg: dict, records: list, *, sum_quantity: bool = False):
    """Return (bullet lines, total quantity) for a list of WMS records.

    The total is the sum of each record's ``untqty`` and is only computed
    when ``sum_quantity`` is true; values that cannot be read as an integer
    are left out of the total.
    """
    lines, total = [], 0
    for rec in records:
        row = extract_fields(flatten_json(rec), cfg["mapping"], cfg.get("formatters", {}))
        lines.append("• " + "; ".join(f"{k}: {v}" for k, v in row.items()))
        if sum_quantity:
            try:
                total += int(rec.get("untqty", 0) or 0)
            except (TypeError, ValueError, AttributeError):
                pass  # best-effort total: skip unreadable quantities
    return lines, total


def _fastpath_item(session_data, warehouse_id, item_num: str, text: str) -> dict:
    """Look up one item and format it; raises on unexpected payload shapes."""
    raw = get_item(session_data, warehouse_id, item_num)
    if isinstance(raw, dict) and raw.get("error"):
        return _wms_error_response(raw["error"], "get_item_fastpath_error")
    data_list = raw.get("data", []) or []
    if not data_list:
        return _wms_empty_response("get_item_fastpath_empty")
    cfg = TOOL_REGISTRY["get_item_data"]
    # Try mapping labels automatically if the user lists fields.  Iterating
    # the mapping itself (not a set of its keys) keeps the field order stable.
    requested_labels = [
        lbl
        for lbl in cfg["mapping"]
        if re.search(rf"\b{re.escape(lbl)}\b", text, flags=re.IGNORECASE)
    ]
    msg = _item_details_message(cfg, data_list[0], requested_labels)
    return _wms_table_response(msg, [data_list[0]], "get_item_fastpath", warehouse_id)


def _fastpath_location(session_data, warehouse_id, stoloc: str) -> dict:
    """Look up one location and format the first returned status row."""
    raw = get_location_status(session_data, warehouse_id, stoloc)
    if isinstance(raw, dict) and raw.get("error"):
        return _wms_error_response(raw["error"], "get_location_fastpath_error")
    data_list = raw.get("data", []) or []
    cfg = TOOL_REGISTRY["get_location_status"]
    if data_list:
        # Only the first row is shown in the text; the table gets all rows.
        first = extract_fields(
            flatten_json(data_list[0]), cfg["mapping"], cfg.get("formatters", {})
        )
        detail = "\n" + "; ".join(f"{k}: {v}" for k, v in first.items())
    else:
        detail = "No status returned"
    msg = f"Location '{stoloc}' status:\n" + detail
    return _wms_table_response(msg, data_list, "get_location_fastpath", warehouse_id)


def _fastpath_holds(session_data, warehouse_id, lodnum: Optional[str]) -> dict:
    """List inventory holds, optionally restricted to one LPN (``lodnum``)."""
    kwargs = {"lodnum": lodnum} if lodnum else {}
    raw = get_inventory_holds(session_data, warehouse_id, **kwargs)
    if isinstance(raw, dict) and raw.get("error"):
        return _wms_error_response(raw["error"], "get_holds_fastpath_error")
    data_list = raw.get("data", []) or []
    if not data_list:
        return _wms_empty_response("get_holds_fastpath_empty")
    cfg = TOOL_REGISTRY["get_inventory_holds"]
    rows_for_text, total_qty = _wms_rows_as_text(cfg, data_list, sum_quantity=True)
    msg = (
        f"Found {len(rows_for_text)} hold records (Total Qty: {total_qty}).\n"
        + "\n".join(rows_for_text[:10])
    )
    return _wms_table_response(
        msg, data_list, "get_holds_fastpath", warehouse_id, show_export=True
    )


def _wms_fastpath(user_text: str) -> Optional[dict]:
    """
    Deterministic regex-based path for Item/Location/Holds.
    Returns a LIVE_API table dict if matched; else None.

    The WMS login is attempted only after one of the patterns has matched,
    so messages unrelated to WMS return None without touching the session
    and can still reach the later fallbacks when WMS is unavailable.  If a
    matched lookup raises, the error is printed and the remaining patterns
    are tried.
    """
    text = (user_text or "").strip()
    m_item = _ITEM_RE.search(text)
    m_loc = _LOCATION_RE.search(text)
    m_lpn = _LPN_RE.search(text)
    want_all_holds = _ALL_HOLDS_RE.search(text)
    if not (m_item or m_loc or m_lpn or want_all_holds):
        return None  # no fast-path match

    if not _ensure_wms_session():
        return {
            "bot_response": "⚠️ Could not login to WMS from this environment. Please try again or raise a ticket.",
            "status": "PARTIAL",
            "context_found": False,
            "ask_resolved": False,
            "suggest_incident": True,
            "followup": "Shall I raise an incident now?",
            "source": "ERROR",
            "debug": {"intent": "wms_login_failed_fastpath"},
        }

    session_data = _WMS_SESSION_DATA
    default_wh = _WMS_WAREHOUSE_ID

    if m_item:
        try:
            return _fastpath_item(session_data, default_wh, m_item.group(1), text)
        except Exception as e:
            print("[WMS fastpath:item] error:", e)

    if m_loc:
        try:
            return _fastpath_location(session_data, default_wh, m_loc.group(1))
        except Exception as e:
            print("[WMS fastpath:location] error:", e)

    if m_lpn or want_all_holds:
        try:
            return _fastpath_holds(
                session_data, default_wh, m_lpn.group(1) if m_lpn else None
            )
        except Exception as e:
            print("[WMS fastpath:holds] error:", e)

    return None  # every matched lookup failed


def _first_function_call(data: dict) -> Optional[dict]:
    """Return the first named ``functionCall`` in the first candidate, if any."""
    candidates = data.get("candidates", [])
    if not candidates:
        return None
    content = candidates[0].get("content", {}) or {}
    for part in content.get("parts", []) or []:
        fn_call = part.get("functionCall") if isinstance(part, dict) else None
        if fn_call and "name" in fn_call:
            return fn_call
    return None


def _try_wms_tool(user_text: str) -> Optional[dict]:
    """
    Orchestrates WMS tool calls:
    1) Fast-path (regex)
    2) Gemini function-calling (fallback)

    Returns:
        A response dict when a WMS tool handled the message (successfully or
        with a user-facing error), or None when no tool applies or the
        function-calling step failed (the error is printed).
    """
    # Fast-path first (robust on HF even if functionCall is missing)
    fast = _wms_fastpath(user_text)
    if fast is not None:
        return fast

    # Deep fallback: Gemini function calling (try both camelCase & snake_case tool keys)
    payload = {
        "contents": [{"role": "user", "parts": [{"text": user_text}]}],
        # We include both casings to maximize compatibility.
        "tools": [
            {"functionDeclarations": ALL_TOOLS},  # REST canonical
            {"function_declarations": ALL_TOOLS},  # friend-style
        ],
        "toolConfig": {"functionCallingConfig": {"mode": "ANY"}},  # nudge model to call tools
    }
    try:
        resp = _gemini_post(payload, timeout=25)
        try:
            data = resp.json()
        except Exception:
            data = {}
        if not isinstance(data, dict):
            return None

        fn_call = _first_function_call(data)
        if not fn_call:
            return None

        if not _ensure_wms_session():
            return {
                "bot_response": "⚠️ Could not login to WMS from this environment.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Shall I raise an incident?",
                "source": "ERROR",
                "debug": {"intent": "wms_login_failed_functioncall"},
            }

        session_data = _WMS_SESSION_DATA
        default_wh = _WMS_WAREHOUSE_ID
        tool_name = fn_call["name"]
        args = fn_call.get("args", {}) or {}
        cfg = TOOL_REGISTRY.get(tool_name)
        if not cfg:
            return None

        # Warehouse override.  Pop every alias so none of them is forwarded
        # to the tool as a stray keyword argument; first truthy value wins.
        wh_candidates = [
            args.pop(key, None) for key in ("warehouse_id", "warehouseId", "wh_id")
        ]
        wh = next((c for c in wh_candidates if c), default_wh)

        tool_fn = cfg.get("function")
        if not callable(tool_fn):
            return _wms_error_response(
                f"Tool '{tool_name}' is not callable. Check TOOL_REGISTRY.",
                "wms_config_error",
                tool=tool_name,
            )

        # Correct invocation by signature
        if tool_name == "get_inventory_holds":
            raw = tool_fn(session_data, wh, **args)
        elif tool_name == "get_location_status":
            stoloc = args.get("stoloc")
            raw = tool_fn(session_data, wh, stoloc)
        else:
            id_param = cfg.get("id_param")  # "item_number"
            target = args.get(id_param)
            if id_param and target is None:
                # Fall back to the first plain string/int argument.
                simple_vals = [v for v in args.values() if isinstance(v, (str, int))]
                target = simple_vals[0] if simple_vals else None
            if not target:
                return {
                    "bot_response": f"Please share the {id_param} (e.g., item number).",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": False,
                    "followup": None,
                    "source": "CLIENT",
                    "debug": {"intent": "wms_missing_id_param", "tool": tool_name},
                }
            extra_kwargs = {k: v for k, v in args.items() if k != id_param}
            raw = tool_fn(session_data, wh, target, **extra_kwargs)

        if isinstance(raw, dict) and raw.get("error"):
            return _wms_error_response(raw["error"], "wms_error", tool=tool_name)

        response_key = cfg.get("response_key", "data")
        data_list = (raw.get(response_key, []) if isinstance(raw, dict) else []) or []
        if not data_list:
            return _wms_empty_response("wms_empty", tool=tool_name)

        # Table/summary formatting
        if tool_name in ("get_inventory_holds", "get_location_status"):
            is_holds = tool_name == "get_inventory_holds"
            rows_for_text, total_qty = _wms_rows_as_text(
                cfg, data_list, sum_quantity=is_holds
            )
            if is_holds:
                msg = (
                    f"Found {len(rows_for_text)} hold records (Total Qty: {total_qty}).\n"
                    + "\n".join(rows_for_text[:10])
                )
            else:
                target_loc = args.get("stoloc") or ""
                msg = f"Location '{target_loc}' status:\n" + (
                    "\n".join(rows_for_text[:1]) if rows_for_text else "No status returned"
                )
            return _wms_table_response(
                msg, data_list, tool_name, wh, show_export=is_holds
            )

        # Item: summary + one-row table
        msg = _item_details_message(cfg, data_list[0], args.get("fields", []))
        return _wms_table_response(msg, [data_list[0]], "get_item_data", wh)
    except Exception as e:
        print("[WMS] tool call error:", e)
        return None


# =============================================================================
# Health
# =============================================================================
@app.get("/")
async def health_check():
    """Liveness probe; always returns ``{"status": "ok"}``."""
    return {"status": "ok"}
============================================================================= # Chat (aligns to your frontend) # ============================================================================= @app.post("/chat") async def chat_with_ai(input_data: ChatInput): global LAST_ISSUE_HINT assist_followup: Optional[str] = None try: msg_norm = (input_data.user_message or "").lower().strip() # yes/no handlers (keep simple UX) if msg_norm in ("yes", "y", "sure", "ok", "okay"): return { "bot_response": "Great! Tell me what you'd like to do next — check another ticket, create an incident, or describe your issue.", "status": "OK", "followup": "You can say: 'create ticket', 'incident status INC0012345', or describe your problem.", "options": [], "debug": {"intent": "continue_conversation"}, } if msg_norm in ("no", "no thanks", "nope"): return { "bot_response": "No problem. Do you need assistance with any other issue?", "status": "OK", "end_chat": False, "followup": None, "options": [], "debug": {"intent": "end_conversation"}, } # Resolution ack (optional auto incident) is_llm_resolved = _classify_resolution_llm(input_data.user_message) if _has_negation_resolved(msg_norm): is_llm_resolved = False if (not _has_negation_resolved(msg_norm)) and (_is_resolution_ack_heuristic(msg_norm) or is_llm_resolved): try: issue_hint = (input_data.last_issue or "").strip() or LAST_ISSUE_HINT.strip() short_desc, long_desc = _build_tracking_descriptions(issue_hint, input_data.user_message) result = create_incident(short_desc, long_desc) if isinstance(result, dict) and not result.get("error"): inc_number = result.get("number", "") sys_id = result.get("sys_id") resolved_note = "" if sys_id: ok = _set_incident_resolved(sys_id) resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)" return { "bot_response": f"✅ Incident created: {inc_number}{resolved_note}", "status": "OK", "context_found": False, "ask_resolved": False, "suggest_incident": False, "followup": 
None, "debug": {"intent": "resolved_ack", "auto_created": True}, } else: err = (result or {}).get("error", "Unknown error") return { "bot_response": f"⚠️ I couldn't create the tracking incident automatically ({err}).", "status": "PARTIAL", "suggest_incident": True, "followup": "Shall I create a ticket now?", "options": [{"type": "yesno", "title": "Create ticket now?"}], "debug": {"intent": "resolved_ack_error"}, } except Exception as e: return { "bot_response": f"⚠️ Something went wrong while creating the tracking incident: {safe_str(e)}", "status": "PARTIAL", "suggest_incident": True, "followup": "Shall I create a ticket now?", "options": [{"type": "yesno", "title": "Create ticket now?"}], "debug": {"intent": "resolved_ack_exception"}, } # Incident intent if _is_incident_intent(msg_norm): return { "bot_response": "Okay, let's create a ServiceNow incident.", "status": (input_data.prev_status or "PARTIAL"), "context_found": False, "ask_resolved": False, "suggest_incident": False, "show_incident_form": True, "followup": None, "debug": {"intent": "create_ticket"}, } # Ticket status intent status_intent = _parse_ticket_status_intent(msg_norm) if status_intent: if status_intent.get("ask_number"): return { "bot_response": ( "To check a ticket status, please share the Incident ID (e.g., INC0012345).\n\n" "You can paste the ID here or say 'cancel'." 
), "status": "PARTIAL", "show_status_form": True, "debug": {"intent": "status_request_missing_id"}, } try: token = get_valid_token() instance_url = os.getenv("SERVICENOW_INSTANCE_URL") if not instance_url: raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing") headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} number = status_intent.get("number") url = f"{instance_url}/api/now/table/incident?number={number}" response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25) data = response.json() lst = data.get("result", []) result = (lst or [{}])[0] if response.status_code == 200 else {} state_code = builtins.str(result.get("state", "unknown")) state_label = STATE_MAP.get(state_code, state_code) short = result.get("short_description", "") num = result.get("number", number or "unknown") return { "bot_response": ( f"**Ticket:** {num}\n" f"**Status:** {state_label}\n" f"**Issue description:** {short}" ), "status": "OK", "show_assist_card": True, "followup": "Is there anything else I can assist you with?", "debug": {"intent": "status", "http_status": response.status_code}, } except Exception as e: raise HTTPException(status_code=500, detail=safe_str(e)) # --- Try WMS (fast-path + function-calling) BEFORE KB --- wms_res = _try_wms_tool(input_data.user_message) if wms_res is not None: return wms_res # ---------------- Hybrid KB fallback ---------------- kb_results = hybrid_search_knowledge_base(input_data.user_message, top_k=10, alpha=0.6, beta=0.4) documents = kb_results.get("documents", []) metadatas = kb_results.get("metadatas", []) distances = kb_results.get("distances", []) combined = kb_results.get("combined_scores", []) items: List[Dict[str, Any]] = [] for i, doc in enumerate(documents): text = doc.strip() if isinstance(doc, str) else "" if not text: continue meta = metadatas[i] if i < len(metadatas) and isinstance(metadatas[i], dict) else {} score = distances[i] if i < len(distances) else None comb = 
combined[i] if i < len(combined) else None m = dict(meta) if score is not None: m["distance"] = score if comb is not None: m["combined"] = comb items.append({"text": text, "meta": m}) selected = items[:max(1, 2)] context_raw = "\n\n---\n\n".join([s["text"] for s in selected]) if selected else "" filtered_text, filt_info = _filter_context_for_query(context_raw, input_data.user_message) context = filtered_text context_found = bool(context.strip()) best_distance = (min([d for d in distances if d is not None], default=None) if distances else None) best_combined = (max([c for c in combined if c is not None], default=None) if combined else None) detected_intent = kb_results.get("user_intent", "neutral") best_doc = kb_results.get("best_doc") top_meta = (metadatas or [{}])[0] if metadatas else {} msg_low = (input_data.user_message or "").lower() GENERIC_ERROR_TERMS = ("error", "issue", "problem", "not working", "failed", "failure") generic_error_signal = any(t in msg_low for t in GENERIC_ERROR_TERMS) # intent nudge for prereqs PREREQ_TERMS = ( "pre req", "pre-requisite", "pre-requisites", "prerequisite", "prerequisites", "pre requirement", "pre-requirements", "requirements", ) if detected_intent == "neutral" and any(t in msg_low for t in PREREQ_TERMS): detected_intent = "prereqs" # permission queries force 'errors' PERM_QUERY_TERMS = [ "permission", "permissions", "access", "access right", "authorization", "authorisation", "role", "role access", "security", "security profile", "privilege", "not allowed", "not authorized", "denied", ] is_perm_query = any(t in msg_norm for t in PERM_QUERY_TERMS) if is_perm_query: detected_intent = "errors" sec_title = (top_meta or {}).get("section", "") or "" sec_title_low = sec_title.strip().lower() PREREQ_HEADINGS = ("pre-requisites", "prerequisites", "pre requisites", "pre-requirements", "requirements") if detected_intent == "neutral" and any(h in sec_title_low for h in PREREQ_HEADINGS): detected_intent = "prereqs" def _contains_any(s: 
str, keywords: tuple) -> bool: low = (s or "").lower() return any(k in low for k in keywords) DOMAIN_TERMS = ( "trailer", "shipment", "order", "load", "wave", "inventory", "putaway", "receiving", "appointment", "dock", "door", "manifest", "pallet", "container", "asn", "grn", "pick", "picking", ) ACTION_OR_ERROR_TERMS = ( "how to", "procedure", "perform", "close", "closing", "open", "navigate", "scan", "confirm", "generate", "update", "receive", "receiving", "error", "issue", "fail", "failed", "not working", "locked", "mismatch", "access", "permission", "status", ) matched_count = int(filt_info.get("matched_count") or 0) filter_mode = (filt_info.get("mode") or "").lower() has_any_action_or_error = _contains_any(msg_low, ACTION_OR_ERROR_TERMS) mentions_domain = _contains_any(msg_low, DOMAIN_TERMS) short_query = len((input_data.user_message or "").split()) <= 4 gate_combined_ok = 0.60 if short_query else 0.55 combined_ok = (best_combined is not None and best_combined >= gate_combined_ok) weak_domain_only = (mentions_domain and not has_any_action_or_error) low_context_hit = (matched_count < 2 and filter_mode in ("concise", "exact")) strong_steps_bypass = True strong_error_signal = len(_detect_error_families(msg_low)) > 0 if (weak_domain_only or (low_context_hit and not combined_ok)) \ and not strong_steps_bypass \ and not (strong_error_signal or generic_error_signal): return { "bot_response": _build_clarifying_message(), "status": "NO_KB_MATCH", "context_found": False, "suggest_incident": True, "followup": "Share more details (module/screen/error), or say 'create ticket'.", "options": [{"type": "yesno", "title": "Share details or raise a ticket?"}], "debug": { "intent": "sop_rejected_weak_match", "matched_count": matched_count, "filter_mode": filter_mode, "best_combined": best_combined, "mentions_domain": mentions_domain, "has_any_action_or_error": has_any_action_or_error, }, } # Build SOP/Errors/Prereqs context escalation_line: Optional[str] = None full_errors: 
Optional[str] = None next_step_applied = False next_step_info: Dict[str, Any] = {} context_preformatted = False steps_override_applied = False if best_doc and detected_intent == "steps": sec = (top_meta or {}).get("section") full_steps = get_section_text(best_doc, sec) if sec else get_best_steps_section_text(best_doc) if full_steps: numbered_full = _ensure_numbering(full_steps) next_only = _anchor_next_steps(input_data.user_message, numbered_full, max_next=6) if next_only is not None: if len(next_only) == 0: context = "You are at the final step of this SOP. No further steps." next_step_applied = True next_step_info = {"count": 0} context_preformatted = True else: context = _dedupe_lines(_ensure_numbering("\n".join(next_only))) next_step_applied = True next_step_info = {"count": len(next_only)} context_preformatted = True else: context = numbered_full context_preformatted = True filt_info = {'mode': None, 'matched_count': None, 'all_sentences': None} context_found = True elif best_doc and detected_intent == "errors": said_not_resolved = _has_negation_resolved(msg_norm) if said_not_resolved: return { "bot_response": "Select an option below.", "status": "OK", "context_found": False, "show_incident_form": False, "show_status_form": False, "followup": None, "debug": {"intent": "errors_not_resolved", "best_doc": best_doc}, } full_errors = get_best_errors_section_text(best_doc) if full_errors: ctx_err = _extract_errors_only(full_errors, max_lines=30) if is_perm_query: context = _filter_permission_lines(ctx_err, max_lines=6) else: mentions_domain_local = any(t in msg_low for t in DOMAIN_TERMS) is_specific_error = (len(_detect_error_families(msg_low)) > 0) or mentions_domain_local if is_specific_error: context = _filter_context_for_query(ctx_err, input_data.user_message)[0] if not context.strip(): all_lines = _normalize_lines(ctx_err) error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)] context = "\n".join(error_bullets[:6]).strip() 
else: all_lines = _normalize_lines(ctx_err) error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)] context = "\n".join(error_bullets[:6]).strip() assist_followup = ( "Please tell me which error above matches your screen (paste the exact text), " "or share a screenshot. I can guide you further or raise a ServiceNow ticket." ) escalation_line = _extract_escalation_line(full_errors) try: steps_src = get_best_steps_section_text(best_doc) if steps_src: numbered_steps = _ensure_numbering(steps_src) next_only = _anchor_next_steps(input_data.user_message, numbered_steps, max_next=6) if next_only is not None and len(next_only) > 0: context = _dedupe_lines(_ensure_numbering("\n".join(next_only))) context_preformatted = True next_step_applied = True next_step_info = {"count": len(next_only), "source": "errors_domain_override"} steps_override_applied = True except Exception: pass elif best_doc and detected_intent == "prereqs": full_prereqs = _find_prereq_section_text(best_doc) if full_prereqs: context = full_prereqs.strip() context_found = True # language hint & paraphrase (errors only when not overridden) language_hint = _detect_language_hint(input_data.user_message) lang_line = f"Respond in {language_hint}." if language_hint else "Respond in a clear, polite tone." use_gemini = (detected_intent == "errors") and not steps_override_applied enhanced_prompt = f"""You are a helpful support assistant. Rewrite the provided context ONLY into clear, user-friendly guidance. - Do not add any information that is not present in the context. - If the content is an error/access/permission note, paraphrase it into a helpful sentence users can understand. 
- {lang_line} ### Context {context} ### Question {input_data.user_message} ### Output Return ONLY the rewritten guidance.""" bot_text = "" http_code = 0 if use_gemini and GEMINI_API_KEY: try: payload = {"contents": [{"role": "user", "parts": [{"text": enhanced_prompt}]}]} resp = _gemini_post(payload, timeout=25) http_code = getattr(resp, "status_code", 0) try: result = resp.json() except Exception: result = {} try: bot_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") except Exception: bot_text = "" except Exception: bot_text, http_code = "", 0 # deterministic local formatting if detected_intent == "steps": bot_text = context if context_preformatted else _ensure_numbering(context) elif detected_intent == "errors": if not (bot_text or "").strip() or http_code == 429: bot_text = context.strip() if escalation_line: bot_text = (bot_text or "").rstrip() + "\n\n" + escalation_line else: bot_text = context # explicit escalation add if user asked needs_escalation = (" escalate" in msg_norm) or ("escalation" in msg_norm) if needs_escalation and best_doc: esc_text = get_escalation_text(best_doc) or full_errors or "" line = _extract_escalation_line(esc_text) if line: bot_text = (bot_text or "").rstrip() + "\n\n" + line if not (bot_text or "").strip(): bot_text = context.strip() if (context or "").strip() else ( "I found some related guidance but couldn’t assemble a reply. " "Share more detail (module/screen/error), or say ‘create ticket’." 
) short_query = len((input_data.user_message or "").split()) <= 4 gate_combined_ok = 0.60 if short_query else 0.55 status = "OK" if (best_combined is not None and best_combined >= gate_combined_ok) else "PARTIAL" lower = (bot_text or "").lower() if ("partial" in lower) or ("may be partial" in lower) or ("closest" in lower) or ("may not fully" in lower): status = "PARTIAL" options = [{"type": "yesno", "title": "Share details or raise a ticket?"}] if status == "PARTIAL" else [] try: base_query = (input_data.user_message or "").strip() if detected_intent == "steps": LAST_ISSUE_HINT = base_query[:100] elif detected_intent == "errors": shown_lines = [ln.strip() for ln in (bot_text or "").splitlines() if ln.strip()] top_error_line = "" for ln in shown_lines: if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln): if "escalation" in ln.lower(): continue top_error_line = ln break if top_error_line: top_error_line = re.split(r"\bif you want to escalate\b", top_error_line, flags=re.IGNORECASE)[0].strip() LAST_ISSUE_HINT = (f"{base_query} — {top_error_line}" if top_error_line else base_query)[:100] except Exception: pass return { "bot_response": bot_text, "status": status, "context_found": True, "ask_resolved": (status == "OK" and detected_intent != "errors"), "suggest_incident": (status == "PARTIAL"), "followup": (assist_followup if assist_followup else ("Is this helpful, or should I raise a ticket?" 
# =============================================================================
# Ticket description generation
# =============================================================================
@app.post("/generate_ticket_desc")
async def generate_ticket_desc_ep(input_data: TicketDescInput):
    """Ask Gemini for a short and a detailed ticket description of an issue.

    Args:
        input_data: request body; only ``input_data.issue`` is read.

    Returns:
        A dict with ``ShortDescription`` and ``DetailedDescription``. When the
        Gemini reply cannot be decoded, navigated or parsed as JSON, both
        values are empty strings and an ``error`` key names the failing stage.

    Raises:
        HTTPException: status 500 for any other unexpected failure.
    """

    def _failed(reason: str) -> dict:
        # Uniform "nothing usable" answer; the caller still gets both keys.
        return {"ShortDescription": "", "DetailedDescription": "", "error": reason}

    try:
        prompt = (
            f"You are helping generate ServiceNow ticket descriptions based on the issue: {input_data.issue}.\n"
            "Please return the output strictly in JSON format with the following keys:\n"
            "{\n"
            ' "ShortDescription": "A concise summary of the issue (max 100 characters)",\n'
            ' "DetailedDescription": "A detailed explanation of the issue"\n'
            "}\n"
            "Do not include any extra text, comments, or explanations outside the JSON."
        )
        payload = {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
        resp = _gemini_post(payload, timeout=25)

        try:
            data = resp.json()
        except Exception:
            return _failed("Gemini returned non-JSON")

        try:
            text = (
                data.get("candidates", [{}])[0]
                .get("content", {})
                .get("parts", [{}])[0]
                .get("text", "")
                .strip()
            )
        except Exception:
            return _failed("Gemini parsing failed")

        # The model may wrap its JSON in a Markdown code fence; drop fence lines.
        if text.startswith("```"):
            unfenced = [ln for ln in text.splitlines() if not ln.strip().startswith("```")]
            text = "\n".join(unfenced).strip()

        try:
            ticket_json = json.loads(text)
            return {
                "ShortDescription": ticket_json.get("ShortDescription", "").strip(),
                "DetailedDescription": ticket_json.get("DetailedDescription", "").strip(),
            }
        except Exception:
            # Covers invalid JSON as well as a payload that is not a dict of strings.
            return _failed("Invalid JSON returned")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))


# =============================================================================
# Incident status
# =============================================================================
@app.post("/incident_status")
async def incident_status(input_data: TicketStatusInput):
    """Look up a ServiceNow incident and return a formatted status message.

    The incident is fetched by ``input_data.sys_id`` when present, otherwise
    by ``input_data.number``.

    Returns:
        A dict with the Markdown ``bot_response`` (ticket number, state label
        resolved through ``STATE_MAP``, short description) plus UI hints.

    Raises:
        HTTPException: 400 when neither identifier is supplied, 500 when
            ``SERVICENOW_INSTANCE_URL`` is missing or on any other failure.
    """
    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}

        if input_data.sys_id:
            url = f"{instance_url}/api/now/table/incident/{input_data.sys_id}"
            response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
            data = response.json()
            result = data.get("result", {}) if response.status_code == 200 else {}
        elif input_data.number:
            url = f"{instance_url}/api/now/table/incident"
            # Pass the number through `params` so it is URL-encoded instead of
            # being pasted into the query string verbatim.
            response = requests.get(
                url,
                headers=headers,
                params={"number": input_data.number},
                verify=VERIFY_SSL,
                timeout=25,
            )
            data = response.json()
            lst = data.get("result", [])
            # An empty result list falls back to an empty record.
            result = (lst or [{}])[0] if response.status_code == 200 else {}
        else:
            raise HTTPException(status_code=400, detail="Provide IncidentID (number) or sys_id")

        state_code = builtins.str(result.get("state", "unknown"))
        state_label = STATE_MAP.get(state_code, state_code)
        short = result.get("short_description", "")
        number = result.get("number", input_data.number or "unknown")

        # Two trailing spaces before each newline force a Markdown line break.
        bot_response = "  \n".join(
            [
                f"**Ticket:** {number}",
                f"**Status:** {state_label}",
                f"**Issue description:** {short}",
            ]
        )
        return {
            "bot_response": bot_response,
            "followup": "Is there anything else I can assist you with?",
            "show_assist_card": True,
            "persist": True,
            "debug": "Incident status fetched",
        }
    except HTTPException:
        # Keep deliberate status codes (e.g. the 400 above) instead of
        # rewrapping them as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
# =============================================================================
# Incident creation
# =============================================================================
def _classify_resolution_llm(user_message: str) -> bool:
    """Ask Gemini whether *user_message* says the issue is resolved.

    Returns ``False`` without calling the API when no Gemini key is configured
    (neither ``GEMINI_API_KEY`` nor a ``key=`` in ``GEMINI_URL``), and also
    when the call or the parsing of its reply fails.
    """
    if not GEMINI_API_KEY and "key=" not in GEMINI_URL:
        return False
    prompt = (
        "Classify if the following user message indicates that the issue is resolved or working now.\n"
        "Return only 'true' or 'false'.\n"
        "\n"
        f"Message: {user_message}"
    )
    payload = {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
    try:
        resp = _gemini_post(payload, timeout=12)
        data = resp.json()
        text = (
            data.get("candidates", [{}])[0]
            .get("content", {})
            .get("parts", [{}])[0]
            .get("text", "")
            or ""
        )
        return "true" in text.strip().lower()
    except Exception:
        # Best effort: any failure is treated as "not resolved".
        return False


def _set_incident_resolved(sys_id: str) -> bool:
    """PATCH a ServiceNow incident into the resolved state.

    Up to three payload variants are tried in order, stopping at the first
    response with status 200 or 204:

    * A: ``state="6"`` with ``close_code`` / ``close_notes``
    * B: ``state="Resolved"`` with the same fields
    * C: ``state="6"`` with the field names taken from
      ``SERVICENOW_RESOLUTION_CODE_FIELD`` / ``SERVICENOW_RESOLUTION_NOTES_FIELD``

    When ``SERVICENOW_REQUIRE_IN_PROGRESS_FIRST`` is truthy, a ``state="2"``
    PATCH is sent first; its outcome is only logged.

    Args:
        sys_id: sys_id of the incident record to update.

    Returns:
        ``True`` if one of the attempts succeeded; ``False`` otherwise,
        including when ``SERVICENOW_INSTANCE_URL`` is unset or an exception
        occurs (the exception is logged, not raised).
    """
    # Local import: the file's top-level import only provides `datetime`.
    from datetime import timezone

    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            print("[SN PATCH resolve] missing SERVICENOW_INSTANCE_URL")
            return False

        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{instance_url}/api/now/table/incident/{sys_id}"

        close_code_val = os.getenv("SERVICENOW_CLOSE_CODE", "Solution provided")
        close_notes_val = os.getenv("SERVICENOW_RESOLUTION_NOTES", "Issue resolved, user confirmed")
        caller_sysid = os.getenv("SERVICENOW_CALLER_SYSID")
        resolved_by_sysid = os.getenv("SERVICENOW_RESOLVED_BY_SYSID")
        assign_group = os.getenv("SERVICENOW_ASSIGNMENT_GROUP_SYSID")
        code_field = os.getenv("SERVICENOW_RESOLUTION_CODE_FIELD", "close_code")
        notes_field = os.getenv("SERVICENOW_RESOLUTION_NOTES_FIELD", "close_notes")
        require_progress = os.getenv(
            "SERVICENOW_REQUIRE_IN_PROGRESS_FIRST", "false"
        ).lower() in ("1", "true", "yes")

        if require_progress:
            # Optional pre-step; a failure here must not abort the resolve attempts.
            try:
                resp1 = requests.patch(
                    url, headers=headers, json={"state": "2"}, verify=VERIFY_SSL, timeout=25
                )
                print(f"[SN PATCH progress] status={resp1.status_code} body={resp1.text[:500]}")
            except Exception as e:
                print(f"[SN PATCH progress] exception={safe_str(e)}")

        def build_payload(state: str, code_key: str, notes_key: str) -> dict:
            """Return the resolve payload, omitting fields whose value is None."""
            fields = {
                "state": state,
                # The code field carries the close code and the notes field
                # the close notes (variant C used to send notes in both).
                code_key: close_code_val,
                notes_key: close_notes_val,
                "caller_id": caller_sysid,
                # Same "YYYY-MM-DD HH:MM:SS" UTC string as before, without the
                # deprecated datetime.utcnow().
                "resolved_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
                "work_notes": "Auto-resolve set by NOVA.",
                "resolved_by": resolved_by_sysid,
                "assignment_group": assign_group,
            }
            return {k: v for k, v in fields.items() if v is not None}

        attempts = (
            ("A", "6", "close_code", "close_notes"),
            ("B", "Resolved", "close_code", "close_notes"),
            ("C", "6", code_field, notes_field),
        )
        for label, state, code_key, notes_key in attempts:
            resp = requests.patch(
                url,
                headers=headers,
                json=build_payload(state, code_key, notes_key),
                verify=VERIFY_SSL,
                timeout=25,
            )
            if resp.status_code in (200, 204):
                return True
            print(f"[SN PATCH resolve {label}] status={resp.status_code} body={resp.text[:500]}")
        return False
    except Exception as e:
        print(f"[SN PATCH resolve] exception={safe_str(e)}")
        return False
# =============================================================================
# Helper functions used by KB logic
# =============================================================================
NUMBERING_STYLE = os.getenv("NUMBERING_STYLE", "digit").lower()  # 'digit' or 'step'

# Warehouse-domain nouns: a "status" question mentioning one of these is
# treated as a domain question rather than a ticket-status request.
DOMAIN_STATUS_TERMS = (
    "shipment", "order", "load", "trailer", "wave", "inventory", "putaway",
    "receiving", "appointment", "dock", "door", "manifest", "pallet",
    "container", "asn", "grn", "pick", "picking",
)

# Error family -> phrases that indicate it (matched as substrings).
ERROR_FAMILY_SYNS = {
    "NOT_FOUND": (
        "not found", "missing", "does not exist", "doesn't exist", "unavailable",
        "not available", "cannot find", "no such", "not present", "absent",
    ),
    "MISMATCH": (
        "mismatch", "doesn't match", "does not match", "variance", "difference",
        "discrepancy", "not equal",
    ),
    "LOCKED": (
        "locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze",
    ),
    "PERMISSION": (
        "permission", "permissions", "access denied", "not authorized", "not authorised",
        "insufficient privileges", "no access", "authorization", "authorisation",
    ),
    "TIMEOUT": (
        "timeout", "timed out", "network", "connection", "unable to connect",
        "disconnected", "no network",
    ),
    "SYNC": (
        "sync", "synchronization", "synchronisation", "replication", "refresh",
        "out of sync", "stale", "delay", "lag",
    ),
}


def _detect_error_families(msg: str) -> list:
    """Return the ``ERROR_FAMILY_SYNS`` keys whose synonyms occur in *msg*.

    Message and synonyms are both lower-cased, punctuation is replaced by
    spaces and whitespace is collapsed before the substring test. Normalizing
    the synonyms too is what lets entries such as "doesn't exist" or
    "read-only" match; previously only the message was normalized, so any
    synonym containing punctuation could never be found.

    Families are returned in dictionary order; an empty or ``None`` message
    yields ``[]``.
    """

    def _norm(text: str) -> str:
        stripped = re.sub(r"[^\w\s]", " ", (text or "").lower())
        return re.sub(r"\s+", " ", stripped).strip()

    low_norm = _norm(msg)
    return [
        fam
        for fam, syns in ERROR_FAMILY_SYNS.items()
        if any(_norm(s) in low_norm for s in syns)
    ]


def _is_domain_status_context(msg_norm: str) -> bool:
    """Tell whether a normalized message talks about a warehouse-domain status.

    True when the message contains "status locked" / "locked status" or any of
    ``DOMAIN_STATUS_TERMS`` (substring match).
    """
    if "status locked" in msg_norm or "locked status" in msg_norm:
        return True
    return any(term in msg_norm for term in DOMAIN_STATUS_TERMS)


def _normalize_lines(text: str) -> List[str]:
    """Split *text* into stripped, non-empty lines (``None`` gives ``[]``)."""
    return [ln.strip() for ln in (text or "").splitlines() if ln.strip()]


def _filter_context_for_query(context: str, query: str) -> Tuple[str, Dict[str, Any]]:
    """Reduce *context* to the few sentences that best overlap with *query*.

    Query terms are the normalized query words longer than two characters. A
    sentence's overlap is the number of terms found in it (substring match on
    the normalized sentence), plus one if the sentence starts with a bullet.

    Returns:
        ``(text, info)``. ``info`` has ``mode`` ("exact" when at least one
        sentence reaches the strict overlap, otherwise "concise"),
        ``matched_count`` and ``all_sentences``. With an empty context or
        query, or a query without usable terms, the stripped context is
        returned unchanged.
    """
    STRICT_OVERLAP = 3
    MAX_SENTENCES_STRICT = 4
    MAX_SENTENCES_CONCISE = 3

    def _norm(text: str) -> str:
        t = re.sub(r"[^\w\s]", " ", (text or "").lower())
        return re.sub(r"\s+", " ", t).strip()

    def _split_sents(ctx_text: str) -> List[str]:
        raw_sents = re.split(r"(?<=\w[.!?])\s+", ctx_text or "")
        # Fragments of one or two characters are noise, not sentences.
        return [s.strip() for s in raw_sents if s and len(s.strip()) > 2]

    ctx = (context or "").strip()
    if not ctx or not query:
        return ctx, {"mode": "concise", "matched_count": 0, "all_sentences": 0}

    q_terms = [t for t in _norm(query).split() if len(t) > 2]
    if not q_terms:
        return ctx, {"mode": "concise", "matched_count": 0, "all_sentences": 0}

    sentences = _split_sents(ctx)
    matched_exact: List[str] = []
    matched_any: List[str] = []
    for s in sentences:
        s_norm = _norm(s)
        is_bullet = bool(re.match(r"^[-*\u2022]\s*", s))
        overlap = sum(1 for t in q_terms if t in s_norm) + (1 if is_bullet else 0)
        if overlap >= STRICT_OVERLAP:
            matched_exact.append(s)
        elif overlap > 0:
            matched_any.append(s)

    if matched_exact:
        kept = matched_exact[:MAX_SENTENCES_STRICT]
        mode, matched_count = "exact", len(kept)
    elif matched_any:
        kept = matched_any[:MAX_SENTENCES_CONCISE]
        mode, matched_count = "concise", len(kept)
    else:
        # Nothing overlaps: fall back to the leading sentences.
        kept = sentences[:MAX_SENTENCES_CONCISE]
        mode, matched_count = "concise", 0

    return _dedupe_lines("\n".join(kept).strip()), {
        "mode": mode,
        "matched_count": matched_count,
        "all_sentences": len(sentences),
    }
def _extract_errors_only(text: str, max_lines: int = 12) -> str:
    """Keep only the lines of *text* that look like error entries.

    A line qualifies when it starts with a bullet (``-``, ``*`` or ``•``) or
    contains a colon. Collection stops once *max_lines* lines are kept. When
    nothing qualifies, the stripped input is returned instead.
    """
    bullet_re = re.compile(r"^\s*[-*\u2022]\s*")
    picked: List[str] = []
    for candidate in _normalize_lines(text):
        if not (bullet_re.match(candidate) or ":" in candidate):
            continue
        picked.append(candidate)
        if len(picked) >= max_lines:
            break
    if picked:
        return "\n".join(picked).strip()
    return (text or "").strip()


def _filter_permission_lines(text: str, max_lines: int = 6) -> str:
    """Keep only the lines of *text* that mention permissions or access.

    Matching is a case-insensitive substring test against a fixed keyword
    list; at most *max_lines* lines are kept. When nothing matches, the
    stripped input is returned instead.
    """
    PERM_SYNONYMS = (
        "permission", "permissions", "access", "authorization", "authorisation",
        "role", "role mapping", "security profile", "not allowed", "not authorized",
        "denied", "insufficient",
    )
    picked: List[str] = []
    for candidate in _normalize_lines(text):
        lowered = candidate.lower()
        if not any(word in lowered for word in PERM_SYNONYMS):
            continue
        picked.append(candidate)
        if len(picked) >= max_lines:
            break
    if picked:
        return "\n".join(picked).strip()
    return (text or "").strip()


def _extract_escalation_line(text: str) -> Optional[str]:
    """Build the "If you want to escalate ..." sentence from section text.

    The first line mentioning "escalation"/"escalate" starts a window of up to
    six lines; without such a line, all lines containing an arrow (``->`` or
    ``→``) are used. The path is taken, in order of preference, from the text
    after "escalation...:", from the first arrow line, or from an
    "operator ... administrator ... operator" span.

    Returns:
        The formatted sentence, or ``None`` when no path can be found.
    """
    if not text:
        return None
    lines = _normalize_lines(text)
    if not lines:
        return None

    def _has_arrow(entry: str) -> bool:
        return "->" in entry or "→" in entry

    def _mentions_escalation(entry: str) -> bool:
        lowered = entry.lower()
        return "escalation" in lowered or "escalation path" in lowered or "escalate" in lowered

    start_idx = next((i for i, ln in enumerate(lines) if _mentions_escalation(ln)), None)

    block: List[str] = []
    if start_idx is None:
        block = [ln.strip() for ln in lines if _has_arrow(ln)]
    else:
        for entry in lines[start_idx:start_idx + 6]:
            entry = entry.strip()
            if not entry:
                break
            block.append(entry)
    if not block:
        return None

    text_block = " ".join(block)
    path = None

    labelled = re.search(r"escalation[^:]*:\s*(.+)", text_block, flags=re.IGNORECASE)
    if labelled:
        path = labelled.group(1).strip()
    if not path:
        arrow_lines = [ln for ln in block if _has_arrow(ln)]
        if arrow_lines:
            path = arrow_lines[0]
    if not path:
        roles = re.search(
            r"(operator.*?administrator.*operator.*)", text_block, flags=re.IGNORECASE
        )
        if roles:
            path = roles.group(1).strip()
    if not path:
        return None

    # Normalize ASCII arrows and drop a leading "Escalation path:" label.
    path = path.replace("->", "→").strip()
    path = re.sub(r"^(?i:escalation\s*path)\s*:\s*", "", path).strip()
    return f"If you want to escalate the issue, follow: {path}"


def _detect_language_hint(msg: str) -> Optional[str]:
    """Return "Tamil" or "Hindi" when *msg* contains characters from the
    U+0B80–U+0BFF or U+0900–U+097F range respectively, else ``None``.
    The Tamil range is checked first."""
    sample = msg or ""
    script_ranges = (
        (r"[\u0B80-\u0BFF]", "Tamil"),
        (r"[\u0900-\u097F]", "Hindi"),
    )
    for pattern, language in script_ranges:
        if re.search(pattern, sample):
            return language
    return None


def _build_clarifying_message() -> str:
    """Return the fixed prompt offering more troubleshooting or a ticket."""
    message = (
        "It seems the issue isn’t resolved yet. Would you like to share a few details so I can check further, "
        "or should I raise a ServiceNow ticket for you?"
    )
    return message


def _build_tracking_descriptions(issue_text: str, resolved_text: str) -> Tuple[str, str]:
    """Compose the short and long descriptions of an auto-created tracking record.

    Issues that read like a how-to question ("how to", "steps", "procedure",
    "process") get a "Process Steps – " prefix. The short description is cut
    to 100 characters.

    Returns:
        ``(short_desc, long_desc)``.
    """
    issue = (issue_text or "").strip()
    resolved = (resolved_text or "").strip()

    if re.search(r"\b(how to|steps|procedure|process)\b", issue.lower()):
        # Avoid doubling the prefix when the issue already starts with it.
        cleaned_issue = re.sub(
            r"^\s*(process\s*steps\s*[-–:]?\s*)", "", issue, flags=re.IGNORECASE
        ).strip()
        short_desc = f"Process Steps – {cleaned_issue or issue}"[:100]
    else:
        short_desc = issue[:100]

    pieces = [
        f'User reported: "{issue}". ',
        f'User confirmation: "{resolved}". ',
        "Tracking record created automatically by NOVA.",
    ]
    long_desc = "".join(pieces).strip()
    return short_desc, long_desc
def _is_incident_intent(msg_norm: str) -> bool:
    """Tell whether a normalized message asks to create a ticket/incident.

    True when any of the known phrases occurs as a substring.
    """
    intent_phrases = [
        "create ticket", "create a ticket", "raise ticket", "raise a ticket",
        "open ticket", "open a ticket", "create incident", "create an incident",
        "raise incident", "raise an incident", "open incident", "open an incident",
        "log ticket", "log an incident", "generate ticket", "create snow ticket",
        "raise snow ticket", "raise service now ticket", "create service now ticket",
        "raise sr", "open sr",
    ]
    return any(p in msg_norm for p in intent_phrases)


def _parse_ticket_status_intent(msg_norm: str) -> Dict[str, Any]:
    """Detect a "ticket status" request and extract the incident number.

    Returns:
        ``{}`` when the message is not a ticket-status request: either it has
        no status keyword, or it has no ticket marker and
        ``_is_domain_status_context`` classifies it as a warehouse-domain
        status question.
        ``{"number": "INC..."}`` (upper-cased) when a number is present.
        ``{"number": None, "ask_number": True}`` when the number is missing.
    """
    status_keywords = [
        "status", "ticket status", "incident status", "check status",
        "check ticket status", "check incident status",
    ]
    if not any(k in msg_norm for k in status_keywords):
        return {}

    has_ticket_marker = any(
        w in msg_norm for w in ("ticket", "incident", "servicenow", "snow")
    ) or bool(re.search(r"\binc\d{5,}\b", msg_norm, flags=re.IGNORECASE))
    if not has_ticket_marker and _is_domain_status_context(msg_norm):
        return {}

    # A labelled number ("ticket number: INC...") wins over a bare INC token.
    patterns = [
        r"(?:incident\s*id|incidentid|ticket\s*number|number)\s*[:=]?\s*(inc\d+)",
        r"(inc\d+)",
    ]
    for pat in patterns:
        m = re.search(pat, msg_norm, flags=re.IGNORECASE)
        if m:
            val = m.group(1).strip()
            if val:
                return {"number": val.upper()}
    return {"number": None, "ask_number": True}


def _is_resolution_ack_heuristic(msg_norm: str) -> bool:
    """Tell whether a normalized message contains a "it works now" phrase.

    Plain substring matching: negated forms such as "not resolved" also match
    here, so this does not distinguish them on its own.
    """
    phrases = [
        "it is resolved", "resolved", "issue resolved", "problem resolved",
        "it's working", "working now", "works now", "fixed", "sorted",
        "ok now", "fine now", "all good", "all set", "thanks works",
        "thank you it works", "back to normal",
    ]
    return any(p in msg_norm for p in phrases)


def _has_negation_resolved(msg_norm: str) -> bool:
    """Tell whether a normalized message says the issue is still not fixed."""
    neg_phrases = [
        "not resolved", "issue not resolved", "still not working", "not working",
        "didn't work", "doesn't work", "no change", "not fixed", "still failing",
        "failed again", "broken", "fail",
    ]
    return any(p in msg_norm for p in neg_phrases)


def _ensure_numbering(text: str) -> str:
    """Re-number the steps found in *text* with circled digits, one per line.

    Existing markers ("1.", "2)", "Step 3:", "①", bullets) are removed and the
    steps are numbered afresh: ① to ⑳ for the first twenty, then "21)", "22)"
    and so on.

    When fewer than two marker-separated segments are found, the steps are
    taken from the input lines if there are several, otherwise from the
    sentences of the single line.

    Returns:
        The numbered steps joined by newlines; an input without any
        non-blank line is returned as is (after zero-width characters are
        removed).
    """
    # Zero-width characters would otherwise hide markers from the regexes.
    text = re.sub(r"[\u2060\u200B]", "", text or "")
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        return text
    para = " ".join(lines)

    # `para` contains no newline, so a triple newline is a safe separator.
    sep = "\n\n\n"
    # "Step N" goes first and may swallow a trailing ":", "." or ")";
    # otherwise "Step 1." would be cut by the digit rule and leave a stray
    # "Step" segment behind.
    marked = re.sub(r"(?i)\bstep\s*\d+\s*[:.\)]?\s*", sep, para)
    marked = re.sub(r"\b\d+\s*[\.\)]\s+", sep, marked)
    marked = re.sub(r"[\u2460-\u2473]\s+", sep, marked)
    segments = [seg.strip() for seg in marked.split(sep) if seg.strip()]

    if len(segments) < 2:
        if len(lines) > 1:
            # One step per input line. (This fallback used to re-split the
            # already joined paragraph and therefore never applied.)
            segments = lines
        else:
            segments = [
                seg.strip() for seg in re.split(r"(?<=\w[.!?])\s+", para) if seg.strip()
            ]

    prefix_re = re.compile(
        r"^\s*(?:"
        r"(?:\d+\s*[\.\)])|"
        r"(?i:step\s*\d+:?)|"
        r"(?:[-*\u2022])|"
        r"(?:[\u2460-\u2473])"
        r")\s*"
    )
    stripped = (prefix_re.sub("", seg.strip()) for seg in segments)
    # Drop segments that were nothing but a marker (e.g. a lone "1.").
    clean_segments = [seg for seg in stripped if seg]

    circled = {i: chr(9311 + i) for i in range(1, 21)}  # U+2460 .. U+2473
    out = [
        f"{circled.get(idx, f'{idx})')} {seg}"
        for idx, seg in enumerate(clean_segments, start=1)
    ]
    return "\n".join(out)


def _dedupe_lines(text: str) -> str:
    """Drop blank lines and repeated lines, keeping first occurrences in order.

    Lines are compared case-insensitively with whitespace collapsed; the kept
    lines retain their original spelling.
    """
    seen, out = set(), []
    for ln in (text or "").splitlines():
        key = re.sub(r"\s+", " ", (ln or "").strip().lower())
        if key and key not in seen:
            out.append(ln)
            seen.add(key)
    return "\n".join(out).strip()


def _split_sentences(block: str) -> list:
    """Split *block* into sentences at whitespace following ``.``, ``!`` or ``?``.

    Returns the stripped block as a single item when no split applies, and
    ``[]`` for empty input.
    """
    parts = [t.strip() for t in re.split(r"(?<=\w[.!?])\s+", block or "") if t.strip()]
    if parts:
        return parts
    return [block.strip()] if (block or "").strip() else []


def _similarity(a: str, b: str) -> float:
    """Score how similar two texts are, in the range 0.0 to 1.0.

    Weighted mix of word-set Jaccard (0.45), word-bigram Jaccard (0.30) and
    ``SequenceMatcher`` character ratio (0.35), capped at 1.0. Texts are
    lower-cased and punctuation-stripped first. If either text is empty after
    normalization the score is 0.0, so that two empty strings do not count as
    a partial match through the character ratio.
    """

    def _norm_text(s: str) -> str:
        s = re.sub(r"[^\w\s]", " ", (s or "").lower())
        return re.sub(r"\s+", " ", s).strip()

    def _bigrams(tokens: list) -> set:
        return {" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)}

    a_norm, b_norm = _norm_text(a), _norm_text(b)
    if not a_norm or not b_norm:
        return 0.0

    a_tokens, b_tokens = a_norm.split(), b_norm.split()
    ta, tb = set(a_tokens), set(b_tokens)
    jacc = len(ta & tb) / (len(ta | tb) or 1)

    ab, bb = _bigrams(a_tokens), _bigrams(b_tokens)
    big = len(ab & bb) / (len(ab | bb) or 1)

    char = SequenceMatcher(None, a_norm, b_norm).ratio()
    # The weights add up to 1.10, hence the cap.
    return min(1.0, 0.45 * jacc + 0.30 * big + 0.35 * char)
def _extract_anchor_from_query(msg: str) -> dict:
    """Find the part of a user message that names the step they are at.

    The message is split on punctuation (``? . , ; : -`` and newlines). If the
    last part is a follow-up cue ("what next", "then", "after that", ...) and
    there is an earlier part, that earlier part is the anchor; otherwise the
    last part is.

    Cues are matched as whole words on the lower-cased, punctuation-free
    text, so "then" inside "authentication" is not mistaken for a cue.

    Returns:
        ``{"anchor": str, "has_followup": bool}``; ``has_followup`` tells
        whether any cue occurs anywhere in the message.
    """
    cue_re = re.compile(r"\b(?:what next|what is next|what to do|then|after that|next)\b")

    def _norm(text: str) -> str:
        flat = re.sub(r"[^\w\s]", " ", text.lower())
        return re.sub(r"\s+", " ", flat).strip()

    raw = (msg or "").strip()
    has_followup = bool(cue_re.search(_norm(raw)))

    parts = [p.strip() for p in re.split(r"[?.,;:\-\n]+", raw) if p.strip()]
    if not parts:
        return {"anchor": raw, "has_followup": has_followup}

    if len(parts) >= 2 and cue_re.search(_norm(parts[-1])):
        anchor = parts[-2]
    else:
        anchor = parts[-1]
    return {"anchor": anchor.strip(), "has_followup": has_followup}


def _anchor_next_steps(user_message: str, numbered_text: str, max_next: int = 8) -> Optional[List[str]]:
    """Return the steps that follow the one the user says they have reached.

    Each line of *numbered_text* is a step. The anchor extracted from
    *user_message* is compared with every step, both as a whole line and
    sentence by sentence, and the best-scoring step is selected (the later
    step wins a tie). The match is accepted when the anchor literally
    contains, or is contained in, one of the step's sentences (compared on
    letters and digits only), or when the similarity score is high enough.

    Args:
        user_message: the user's message.
        numbered_text: the steps, one per line.
        max_next: maximum number of following steps to return.

    Returns:
        ``None`` when there are no steps, no anchor, or no acceptable match;
        ``[]`` when the matched step is the last one; otherwise up to
        *max_next* following steps with duplicates removed.
    """
    steps = [ln.strip() for ln in (numbered_text or "").splitlines() if ln.strip()]
    if not steps:
        return None

    info = _extract_anchor_from_query(user_message)
    anchor = info.get("anchor", "").strip()
    if not anchor:
        return None
    has_followup = bool(info.get("has_followup"))

    def _flat(text: str) -> str:
        # Letters and digits only, for the literal containment test.
        return re.sub(r"\W+", "", re.sub(r"[^\w\s]", " ", text.lower()))

    anchor_flat = _flat(anchor)  # loop-invariant

    candidates = []
    for idx, step_line in enumerate(steps):
        score = _similarity(anchor, step_line)
        literal_hit = False
        for sentence in _split_sentences(step_line):
            score = max(score, _similarity(anchor, sentence))
            sentence_flat = _flat(sentence)
            # Both sides must be non-empty: an empty string is "contained" in
            # everything and would otherwise count as a literal hit.
            if anchor_flat and sentence_flat and (
                anchor_flat in sentence_flat or sentence_flat in anchor_flat
            ):
                literal_hit = True
        candidates.append((idx, score, literal_hit))

    # Highest score wins; on equal scores the later step is preferred.
    best_idx, best_score, best_literal = max(candidates, key=lambda c: (c[1], c[0]))

    if not best_literal:
        tok_count = len(
            [t for t in re.sub(r"[^\w\s]", " ", anchor.lower()).split() if len(t) > 1]
        )
        base_ok = best_score >= (0.50 if has_followup else 0.55)
        # Longer anchors (three or more tokens) are accepted at a lower score.
        len_ok = best_score >= 0.40 and tok_count >= 3
        if not (base_ok or len_ok):
            return None

    start = best_idx + 1
    if start >= len(steps):
        return []
    end = min(start + max_next, len(steps))
    next_steps = steps[start:end]
    return [ln for ln in _dedupe_lines("\n".join(next_steps)).splitlines() if ln.strip()]