# Chatbot-Backend / main.py
# Hugging Face file-viewer header (not Python), kept here as a comment:
# uploaded by srilakshu012456 — commit "Update main.py" (cbd0fc7, verified)
# main.py — Hugging Face backend (drop-in)
import os
import json
import re
import requests
import builtins
from typing import Optional, Any, Dict, List, Tuple
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
from difflib import SequenceMatcher
# --- WMS API integration imports ---
# NOTE: these must exist in your repo (you shared them earlier)
from public_api.public_api_auth import login
from public_api.public_api_item import get_item
from public_api.public_api_inventory import get_inventory_holds
from public_api.public_api_location import get_location_status
from public_api.utils import flatten_json, extract_fields
from public_api.field_mapping import TOOL_REGISTRY, ALL_TOOLS
# KB services (keep your current modules)
from services.kb_creation import (
collection,
ingest_documents,
hybrid_search_knowledge_base,
get_section_text,
get_best_steps_section_text,
get_best_errors_section_text,
get_escalation_text,
)
# ServiceNow helpers (keep your current modules)
from services.login import router as login_router
from services.generate_ticket import get_valid_token, create_incident
# =============================================================================
# Environment / Gemini config
# =============================================================================
# Load variables from a local .env file before the os.getenv() lookups below.
load_dotenv()
# TLS verification switches: only "1", "true" or "yes" (case-insensitive) keep
# verification on; any other value turns it off. Both default to "true".
VERIFY_SSL = os.getenv("SERVICENOW_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
GEMINI_SSL_VERIFY = os.getenv("GEMINI_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
# May be None; _gemini_post() returns a 401-style stub when it is missing and
# the URL carries no key.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# Safer header-based auth; model can be flash or flash-lite.
# If you prefer your friend's exact style, set GEMINI_URL_WITH_KEY in secrets to override.
# When the configured URL contains "key=", _gemini_post() sends no API-key header.
GEMINI_URL = os.getenv(
    "GEMINI_URL_WITH_KEY",
    "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
)
# NOTE(review): presumably disables PostHog telemetry in a dependency — confirm
# which package reads this variable and whether it is read at import time
# (the service imports above run before this assignment).
os.environ["POSTHOG_DISABLED"] = "true"
# --- API WMS session cache ---
# Populated by _ensure_wms_session() after a successful login(); process-wide.
_WMS_SESSION_DATA: Optional[dict] = None
_WMS_WAREHOUSE_ID: Optional[str] = None
def _gemini_post(payload: dict, timeout: int = 25):
    """
    Central helper to call Gemini. Supports either header API key or URL ?key=...

    Args:
        payload: JSON-serialisable request body for ``generateContent``.
        timeout: Request timeout in seconds passed to ``requests.post``.

    Returns:
        The ``requests.Response`` from Gemini. When the URL carries no
        ``key=`` and ``GEMINI_API_KEY`` is unset, no request is sent and a
        small stub is returned instead, exposing ``status_code`` (401),
        ``text``, ``json()`` and ``raise_for_status()`` so callers can treat
        it like a response.
    """
    headers = {"Content-Type": "application/json"}
    # If URL doesn't contain ?key=..., send header
    if "key=" not in GEMINI_URL:
        if not GEMINI_API_KEY:
            # Return a mock-ish response object for graceful handling.
            # The callables are wrapped in staticmethod: a plain function
            # stored on a class is bound as a method on attribute access, so
            # the zero-argument lambdas would raise TypeError when called as
            # resp.json() / resp.raise_for_status().
            return type("Resp", (), {
                "status_code": 401,
                "json": staticmethod(lambda: {"error": {"message": "Missing GEMINI_API_KEY"}}),
                "text": "Missing GEMINI_API_KEY",
                "raise_for_status": staticmethod(lambda: None),
            })()
        headers["x-goog-api-key"] = GEMINI_API_KEY
    return requests.post(GEMINI_URL, headers=headers, json=payload, timeout=timeout, verify=GEMINI_SSL_VERIFY)
def _ensure_wms_session() -> bool:
    """Make sure a WMS session is cached, logging in on first use.

    Returns:
        True when both the session data and the warehouse id are already
        cached, or when ``login()`` reports success with a non-empty session.
        False when the login is unsuccessful or raises; the reason is printed.
    """
    global _WMS_SESSION_DATA, _WMS_WAREHOUSE_ID
    have_cached_session = bool(_WMS_SESSION_DATA and _WMS_WAREHOUSE_ID)
    if have_cached_session:
        return True
    try:
        wms_auth, wh_id, succeeded = login()
        if not (succeeded and wms_auth):
            print("[WMS] login unsuccessful: ok=", succeeded, "warehouse_id=", wh_id)
            return False
        # Cache both pieces so later tool calls can reuse them.
        _WMS_SESSION_DATA = {"wms_auth": wms_auth, "user_warehouse_id": wh_id}
        _WMS_WAREHOUSE_ID = wh_id
        return True
    except Exception as e:
        print("[WMS] login failed:", e)
    return False
# =============================================================================
# Minimal server-side cache (used to populate short description later)
# =============================================================================
# Process-wide hint (at most 100 characters) describing the last steps/errors
# query answered by /chat; used there as the fallback issue text when the
# request carries no `last_issue`. It is a single module global, so it is
# shared by every client of this process.
LAST_ISSUE_HINT: str = ""
def safe_str(e: Any) -> str:
    """Stringify *e* without ever raising.

    Returns ``builtins.str(e)``; if that conversion itself raises, the fixed
    placeholder ``"<error stringify failed>"`` is returned instead.
    """
    try:
        rendered = builtins.str(e)
    except Exception:
        rendered = "<error stringify failed>"
    return rendered
# =============================================================================
# App / Lifespan
# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup hook: ingest ./documents into the KB when the collection is empty.

    Any failure during the check or the ingestion is printed and otherwise
    ignored, so the application still starts. Nothing runs after the yield.
    """
    try:
        docs_dir = os.path.join(os.getcwd(), "documents")
        kb_is_empty = collection.count() == 0
        if not kb_is_empty:
            print(f"[KB] already populated with {collection.count()} entries. Skipping ingestion.")
        else:
            print("[KB] empty. Running ingestion...")
            ingest_documents(docs_dir)
    except Exception as exc:
        print(f"[KB] ingestion failed: {safe_str(exc)}")
    yield
# Application object; `lifespan` performs the KB ingestion check at startup.
app = FastAPI(lifespan=lifespan)
# Mount the routes exported by services.login.
app.include_router(login_router)
# CORS — add your Hugging Face frontend Space URL here
origins = [
    "https://chatbotnova-chatbot-frontend.hf.space",
    # "http://localhost:5173",
]
# Only the origins listed above are allowed; credentials, all methods and all
# headers are permitted for them.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# =============================================================================
# Models (align to your frontend)
# =============================================================================
# Request body for POST /chat.
class ChatInput(BaseModel):
    # Raw message typed by the user.
    user_message: str
    # Echoed back as "status" in the create-ticket reply (defaults to "PARTIAL" there).
    prev_status: Optional[str] = None
    # Issue text preferred over the server-side LAST_ISSUE_HINT when a tracking
    # incident is auto-created.
    last_issue: Optional[str] = None
# Incident creation payload.
# NOTE(review): the endpoint consuming this model is outside the visible part
# of the file — confirm how `mark_resolved` is applied.
class IncidentInput(BaseModel):
    short_description: str
    description: str
    mark_resolved: Optional[bool] = False
# Request body for POST /generate_ticket_desc.
class TicketDescInput(BaseModel):
    # Free-text issue that is embedded into the Gemini prompt.
    issue: str
# Request body for POST /incident_status; both identifiers are optional.
# NOTE(review): which one takes precedence is decided in the endpoint body,
# which is not fully visible here.
class TicketStatusInput(BaseModel):
    sys_id: Optional[str] = None
    number: Optional[str] = None
# Maps the incident "state" value (compared as a string) to the label shown to
# the user. Codes not listed here are displayed as-is by the /chat status
# lookup, which falls back to the raw code.
STATE_MAP = {
    "1": "New",
    "2": "In Progress",
    "3": "On Hold",
    "6": "Resolved",
    "7": "Closed",
    "8": "Canceled",
}
# =============================================================================
# WMS Tool Orchestration (fast-path + Gemini function-calling)
# =============================================================================
def _fastpath_error_reply(message: Any, intent: str) -> dict:
    """Build the reply returned when a WMS call reports an ``error`` value."""
    return {
        "bot_response": f"⚠️ {message}",
        "status": "PARTIAL",
        "context_found": False,
        "suggest_incident": True,
        "followup": "Should I raise a ServiceNow ticket?",
        "source": "ERROR",
        "debug": {"intent": intent},
    }


def _fastpath_empty_reply(intent: str) -> dict:
    """Build the reply returned when a WMS call succeeds but yields no rows."""
    return {
        "bot_response": "I searched WMS but couldn't find matching records.",
        "status": "PARTIAL",
        "context_found": False,
        "suggest_incident": True,
        "followup": "Do you want me to raise a ticket or try a different ID?",
        "source": "LIVE_API",
        "debug": {"intent": intent},
    }


def _wms_fastpath(user_text: str) -> Optional[dict]:
    """
    Deterministic regex-based path for Item/Location/Holds.
    Returns a LIVE_API table dict if matched; else None.

    The patterns are evaluated first and the WMS login is attempted only when
    one of them matches. Logging in up front would turn every chat message
    into a "could not login" reply whenever WMS is unreachable, and the caller
    would never fall through to the knowledge base.

    Branches are tried in the order item -> location -> holds. An exception
    inside a branch is printed and the next branch is tried.

    Args:
        user_text: Raw user message (``None``/empty is treated as "").

    Returns:
        A response dict (success, WMS error, empty result, or login failure),
        or None when no pattern matched or every matching branch raised.
    """
    text = (user_text or "").strip()
    # Item: "Get item number 100001 ..." / "Show item 100001 ..."
    m_item = re.search(r"\bitem\s+(?:number|id)?\s*[: ]?([A-Za-z0-9\-]+)", text, flags=re.IGNORECASE)
    # Location: "Check location A1-01-01"
    m_loc = re.search(r"\b(?:check|status|loc(?:ation)?)\s+(?:bin\s+)?([A-Za-z0-9\-]+)", text, flags=re.IGNORECASE)
    # Holds: "Show holds for LPN 123456" / "Show all inventory holds"
    m_lpn = re.search(r"\b(?:lpn|lodnum)\s*[: ]?([A-Za-z0-9\-]+)", text, flags=re.IGNORECASE)
    want_all_holds = re.search(r"\b(all\s+inventory\s+holds|list\s+holds|show\s+holds)\b", text, flags=re.IGNORECASE)

    if not (m_item or m_loc or m_lpn or want_all_holds):
        return None  # no fast-path match; do not touch WMS at all

    if not _ensure_wms_session():
        return {
            "bot_response": "⚠️ Could not login to WMS from this environment. Please try again or raise a ticket.",
            "status": "PARTIAL",
            "context_found": False,
            "ask_resolved": False,
            "suggest_incident": True,
            "followup": "Shall I raise an incident now?",
            "source": "ERROR",
            "debug": {"intent": "wms_login_failed_fastpath"},
        }
    session_data = _WMS_SESSION_DATA
    default_wh = _WMS_WAREHOUSE_ID

    if m_item:
        item_num = m_item.group(1)
        try:
            raw = get_item(session_data, default_wh, item_num)
            if isinstance(raw, dict) and raw.get("error"):
                return _fastpath_error_reply(raw["error"], "get_item_fastpath_error")
            data_list = raw.get("data", []) or []
            if not data_list:
                return _fastpath_empty_reply("get_item_fastpath_empty")
            cfg = TOOL_REGISTRY["get_item_data"]
            flat = flatten_json(data_list[0])
            # Try mapping labels automatically if the user lists fields.
            # Iterate the mapping itself (not a set of its keys) so the field
            # order in the reply is stable from run to run.
            requested_labels = [
                lbl for lbl in cfg["mapping"]
                if re.search(rf"\b{re.escape(lbl)}\b", text, flags=re.IGNORECASE)
            ]
            if requested_labels:
                filtered_map = {k: v for k, v in cfg["mapping"].items() if k in requested_labels or k == "Item"}
                order = ["Item"] + [f for f in requested_labels if f != "Item"]
            else:
                summary_keys = ["Item", "Description", "Item type", "Warehouse ID"]
                filtered_map = {k: v for k, v in cfg["mapping"].items() if k in summary_keys}
                order = summary_keys
            cleaned = extract_fields(flat, filtered_map, cfg.get("formatters", {}), requested_order=order)
            lines = [f"{k}: {cleaned.get(k, 'N/A')}" for k in order]
            msg = "Details:\n" + "\n".join(lines)
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": [data_list[0]],
                "show_export": False,
                "debug": {"intent": "get_item_fastpath", "warehouse_id": default_wh},
            }
        except Exception as e:
            print("[WMS fastpath:item] error:", e)

    if m_loc:
        stoloc = m_loc.group(1)
        try:
            raw = get_location_status(session_data, default_wh, stoloc)
            if isinstance(raw, dict) and raw.get("error"):
                return _fastpath_error_reply(raw["error"], "get_location_fastpath_error")
            data_list = raw.get("data", []) or []
            cfg = TOOL_REGISTRY["get_location_status"]
            rows = [
                extract_fields(flatten_json(item), cfg["mapping"], cfg.get("formatters", {}))
                for item in data_list
            ]
            # Only the first row is rendered as text; the full list goes in "data".
            msg = f"Location '{stoloc}' status:\n" + (("\n" + "; ".join([f"{k}: {v}" for k, v in rows[0].items()])) if rows else "No status returned")
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": data_list,
                "show_export": False,
                "debug": {"intent": "get_location_fastpath", "warehouse_id": default_wh},
            }
        except Exception as e:
            print("[WMS fastpath:location] error:", e)

    if m_lpn or want_all_holds:
        kwargs = {}
        if m_lpn:
            kwargs["lodnum"] = m_lpn.group(1)
        try:
            raw = get_inventory_holds(session_data, default_wh, **kwargs)
            if isinstance(raw, dict) and raw.get("error"):
                return _fastpath_error_reply(raw["error"], "get_holds_fastpath_error")
            data_list = raw.get("data", []) or []
            if not data_list:
                return _fastpath_empty_reply("get_holds_fastpath_empty")
            cfg = TOOL_REGISTRY["get_inventory_holds"]
            rows_for_text, total_qty = [], 0
            for item in data_list:
                row = extract_fields(flatten_json(item), cfg["mapping"], cfg.get("formatters", {}))
                rows_for_text.append("• " + "; ".join(f"{k}: {v}" for k, v in row.items()))
                try:
                    total_qty += int(item.get("untqty", 0) or 0)
                except Exception:
                    # Best effort: a non-numeric quantity is left out of the total.
                    pass
            # The text lists at most 10 rows; the full list goes in "data".
            msg = f"Found {len(rows_for_text)} hold records (Total Qty: {total_qty}).\n" + "\n".join(rows_for_text[:10])
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": data_list,
                "show_export": True,
                "debug": {"intent": "get_holds_fastpath", "warehouse_id": default_wh},
            }
        except Exception as e:
            print("[WMS fastpath:holds] error:", e)

    return None  # every matching branch raised
def _try_wms_tool(user_text: str) -> Optional[dict]:
    """
    Orchestrates WMS tool calls:
    1) Fast-path (regex)
    2) Gemini function-calling (fallback)

    Args:
        user_text: Raw user message.

    Returns:
        A response dict when a WMS tool handled the message (including error,
        empty-result and missing-argument replies), or None when the message
        should fall through to the caller's next stage (no function call
        proposed, unknown tool name, or any exception, which is printed).
    """
    # Fast-path first (robust on HF even if functionCall is missing)
    fast = _wms_fastpath(user_text)
    if fast is not None:
        return fast
    # Deep fallback: Gemini function calling.
    payload = {
        "contents": [{"role": "user", "parts": [{"text": user_text}]}],
        # Declare the tools once, under the canonical REST field name. Sending
        # the same declarations a second time under "function_declarations"
        # only duplicates every function.
        "tools": [{"functionDeclarations": ALL_TOOLS}],
        # AUTO lets the model decide. ANY constrains it to always emit a
        # function call, which would send every non-WMS question to a WMS tool
        # and keep the caller's KB fallback from ever running.
        "toolConfig": {"functionCallingConfig": {"mode": "AUTO"}},
    }
    try:
        resp = _gemini_post(payload, timeout=25)
        # Do not raise on HTTP errors (the body is still inspected below), but
        # leave a trace instead of swallowing the failure silently.
        http_status = getattr(resp, "status_code", 0) or 0
        if http_status >= 400:
            print("[WMS] Gemini function-calling HTTP status:", http_status)
        try:
            data = resp.json()
        except Exception:
            data = {}
        if not isinstance(data, dict):
            data = {}
        candidates = data.get("candidates", [])
        if not candidates:
            return None
        content = candidates[0].get("content", {}) or {}
        parts = content.get("parts", []) or []
        # The function call is not guaranteed to be the first part (a text
        # part may precede it), so take the first part that carries one.
        fn_call = next(
            (p["functionCall"] for p in parts if isinstance(p, dict) and p.get("functionCall")),
            None,
        )
        if not fn_call or "name" not in fn_call:
            return None
        if not _ensure_wms_session():
            return {
                "bot_response": "⚠️ Could not login to WMS from this environment.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Shall I raise an incident?",
                "source": "ERROR",
                "debug": {"intent": "wms_login_failed_functioncall"},
            }
        session_data = _WMS_SESSION_DATA
        default_wh = _WMS_WAREHOUSE_ID
        tool_name = fn_call["name"]
        args = fn_call.get("args", {}) or {}
        cfg = TOOL_REGISTRY.get(tool_name)
        if not cfg:
            return None
        # Warehouse override (first non-empty alias wins; later aliases are
        # left in args when an earlier one matched).
        wh = (
            args.pop("warehouse_id", None)
            or args.pop("warehouseId", None)
            or args.pop("wh_id", None)
            or default_wh
        )
        tool_fn = cfg.get("function")
        if not callable(tool_fn):
            return {
                "bot_response": f"⚠️ Tool '{tool_name}' is not callable. Check TOOL_REGISTRY.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Should I raise a ServiceNow ticket?",
                "source": "ERROR",
                "debug": {"intent": "wms_config_error", "tool": tool_name},
            }
        # Correct invocation by signature
        if tool_name == "get_inventory_holds":
            raw = tool_fn(session_data, wh, **args)
        elif tool_name == "get_location_status":
            stoloc = args.get("stoloc")
            raw = tool_fn(session_data, wh, stoloc)
        else:
            id_param = cfg.get("id_param")  # "item_number"
            target = args.get(id_param)
            if id_param and target is None:
                # Fall back to the first scalar argument the model supplied.
                simple_vals = [v for v in args.values() if isinstance(v, (str, int))]
                target = simple_vals[0] if simple_vals else None
            if not target:
                return {
                    "bot_response": f"Please share the {id_param} (e.g., item number).",
                    "status": "PARTIAL",
                    "context_found": False,
                    "suggest_incident": False,
                    "followup": None,
                    "source": "CLIENT",
                    "debug": {"intent": "wms_missing_id_param", "tool": tool_name},
                }
            extra_kwargs = {k: v for k, v in args.items() if k != id_param}
            raw = tool_fn(session_data, wh, target, **extra_kwargs)
        if isinstance(raw, dict) and raw.get("error"):
            return {
                "bot_response": f"⚠️ {raw['error']}",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Should I raise a ServiceNow ticket?",
                "source": "ERROR",
                "debug": {"intent": "wms_error", "tool": tool_name},
            }
        response_key = cfg.get("response_key", "data")
        data_list = (raw.get(response_key, []) if isinstance(raw, dict) else []) or []
        if not data_list:
            return {
                "bot_response": "I searched WMS but couldn't find matching records.",
                "status": "PARTIAL",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Do you want me to raise a ticket or try a different ID?",
                "source": "LIVE_API",
                "debug": {"intent": "wms_empty", "tool": tool_name},
            }
        # Table/summary formatting
        if tool_name in ("get_inventory_holds", "get_location_status"):
            rows_for_text, total_qty = [], 0
            for item in data_list:
                row = extract_fields(flatten_json(item), cfg["mapping"], cfg.get("formatters", {}))
                rows_for_text.append("• " + "; ".join(f"{k}: {v}" for k, v in row.items()))
                if tool_name == "get_inventory_holds":
                    try:
                        total_qty += int(item.get("untqty", 0) or 0)
                    except Exception:
                        # Best effort: a non-numeric quantity is left out of the total.
                        pass
            if tool_name == "get_inventory_holds":
                msg = f"Found {len(rows_for_text)} hold records (Total Qty: {total_qty}).\n" + "\n".join(rows_for_text[:10])
                show_export = True
            else:
                target_loc = args.get("stoloc") or ""
                msg = f"Location '{target_loc}' status:\n" + ("\n".join(rows_for_text[:1]) if rows_for_text else "No status returned")
                show_export = False
            return {
                "bot_response": msg,
                "status": "OK",
                "context_found": True,
                "source": "LIVE_API",
                "type": "table",
                "data": data_list,
                "show_export": show_export,
                "debug": {"intent": tool_name, "warehouse_id": wh},
            }
        # Item: summary + one-row table
        flat = flatten_json(data_list[0])
        requested = args.get("fields", [])
        if requested:
            filtered_map = {k: v for k, v in cfg["mapping"].items() if k in requested or k == "Item"}
            order = ["Item"] + [f for f in requested if f != "Item"]
        else:
            summary_keys = ["Item", "Description", "Item type", "Warehouse ID"]
            filtered_map = {k: v for k, v in cfg["mapping"].items() if k in summary_keys}
            order = summary_keys
        cleaned = extract_fields(flat, filtered_map, cfg.get("formatters", {}), requested_order=order)
        lines = [f"{k}: {cleaned.get(k, 'N/A')}" for k in order]
        msg = "Details:\n" + "\n".join(lines)
        return {
            "bot_response": msg,
            "status": "OK",
            "context_found": True,
            "source": "LIVE_API",
            "type": "table",
            "data": [data_list[0]],
            "show_export": False,
            "debug": {"intent": "get_item_data", "warehouse_id": wh},
        }
    except Exception as e:
        print("[WMS] tool call error:", e)
        return None
# =============================================================================
# Health
# =============================================================================
@app.get("/")
async def health_check():
    """Liveness probe: always answers with a fixed ``{"status": "ok"}`` body."""
    health_payload = {"status": "ok"}
    return health_payload
# =============================================================================
# Chat (aligns to your frontend)
# =============================================================================
@app.post("/chat")
async def chat_with_ai(input_data: ChatInput):
    """Answer one chat message.

    Stages, in order (the first one that produces a reply returns):
      1. Bare yes/no replies.
      2. Resolution acknowledgement -> auto-create a tracking incident and try
         to mark it resolved.
      3. "Create ticket" intent -> ask the frontend to show the incident form.
      4. Ticket-status intent -> look the incident up in ServiceNow.
      5. WMS tools (regex fast-path, then Gemini function calling).
      6. Hybrid knowledge-base search with steps / errors / prereqs shaping.

    Side effects: may create/update a ServiceNow incident and updates the
    module-level ``LAST_ISSUE_HINT`` for steps/errors answers.

    Raises:
        HTTPException: 500 with the stringified error for any unhandled
            failure (including a failed ServiceNow status lookup).

    NOTE(review): the source this was restored from had lost its indentation.
    Nesting was reconstructed from the control flow; the two spots where more
    than one reading is plausible are marked below.
    """
    global LAST_ISSUE_HINT
    assist_followup: Optional[str] = None
    try:
        msg_norm = (input_data.user_message or "").lower().strip()
        # yes/no handlers (keep simple UX)
        if msg_norm in ("yes", "y", "sure", "ok", "okay"):
            return {
                "bot_response": "Great! Tell me what you'd like to do next — check another ticket, create an incident, or describe your issue.",
                "status": "OK",
                "followup": "You can say: 'create ticket', 'incident status INC0012345', or describe your problem.",
                "options": [],
                "debug": {"intent": "continue_conversation"},
            }
        if msg_norm in ("no", "no thanks", "nope"):
            return {
                "bot_response": "No problem. Do you need assistance with any other issue?",
                "status": "OK",
                "end_chat": False,
                "followup": None,
                "options": [],
                "debug": {"intent": "end_conversation"},
            }
        # Resolution ack (optional auto incident).
        # The LLM classifier is consulted only when its answer can matter:
        # a negation always wins, and a heuristic hit already decides the
        # outcome, so in both cases the extra model call is skipped.
        negated_resolution = _has_negation_resolved(msg_norm)
        if (not negated_resolution) and (
            _is_resolution_ack_heuristic(msg_norm)
            or _classify_resolution_llm(input_data.user_message)
        ):
            try:
                issue_hint = (input_data.last_issue or "").strip() or LAST_ISSUE_HINT.strip()
                short_desc, long_desc = _build_tracking_descriptions(issue_hint, input_data.user_message)
                result = create_incident(short_desc, long_desc)
                if isinstance(result, dict) and not result.get("error"):
                    inc_number = result.get("number", "<unknown>")
                    sys_id = result.get("sys_id")
                    resolved_note = ""
                    if sys_id:
                        ok = _set_incident_resolved(sys_id)
                        resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)"
                    return {
                        "bot_response": f"✅ Incident created: {inc_number}{resolved_note}",
                        "status": "OK",
                        "context_found": False,
                        "ask_resolved": False,
                        "suggest_incident": False,
                        "followup": None,
                        "debug": {"intent": "resolved_ack", "auto_created": True},
                    }
                else:
                    err = (result or {}).get("error", "Unknown error")
                    return {
                        "bot_response": f"⚠️ I couldn't create the tracking incident automatically ({err}).",
                        "status": "PARTIAL",
                        "suggest_incident": True,
                        "followup": "Shall I create a ticket now?",
                        "options": [{"type": "yesno", "title": "Create ticket now?"}],
                        "debug": {"intent": "resolved_ack_error"},
                    }
            except Exception as e:
                return {
                    "bot_response": f"⚠️ Something went wrong while creating the tracking incident: {safe_str(e)}",
                    "status": "PARTIAL",
                    "suggest_incident": True,
                    "followup": "Shall I create a ticket now?",
                    "options": [{"type": "yesno", "title": "Create ticket now?"}],
                    "debug": {"intent": "resolved_ack_exception"},
                }
        # Incident intent
        if _is_incident_intent(msg_norm):
            return {
                "bot_response": "Okay, let's create a ServiceNow incident.",
                "status": (input_data.prev_status or "PARTIAL"),
                "context_found": False,
                "ask_resolved": False,
                "suggest_incident": False,
                "show_incident_form": True,
                "followup": None,
                "debug": {"intent": "create_ticket"},
            }
        # Ticket status intent
        status_intent = _parse_ticket_status_intent(msg_norm)
        if status_intent:
            if status_intent.get("ask_number"):
                return {
                    "bot_response": (
                        "To check a ticket status, please share the Incident ID (e.g., INC0012345).\n\n"
                        "You can paste the ID here or say 'cancel'."
                    ),
                    "status": "PARTIAL",
                    "show_status_form": True,
                    "debug": {"intent": "status_request_missing_id"},
                }
            try:
                token = get_valid_token()
                instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
                if not instance_url:
                    raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
                headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
                number = status_intent.get("number")
                # The number comes from user text: pass it via `params` so it is
                # URL-encoded instead of being pasted into the query string.
                url = f"{instance_url}/api/now/table/incident"
                response = requests.get(
                    url,
                    headers=headers,
                    params={"number": number},
                    verify=VERIFY_SSL,
                    timeout=25,
                )
                data = response.json()
                lst = data.get("result", [])
                result = (lst or [{}])[0] if response.status_code == 200 else {}
                state_code = builtins.str(result.get("state", "unknown"))
                state_label = STATE_MAP.get(state_code, state_code)
                short = result.get("short_description", "")
                num = result.get("number", number or "unknown")
                return {
                    "bot_response": (
                        f"**Ticket:** {num}\n"
                        f"**Status:** {state_label}\n"
                        f"**Issue description:** {short}"
                    ),
                    "status": "OK",
                    "show_assist_card": True,
                    "followup": "Is there anything else I can assist you with?",
                    "debug": {"intent": "status", "http_status": response.status_code},
                }
            except Exception as e:
                raise HTTPException(status_code=500, detail=safe_str(e))
        # --- Try WMS (fast-path + function-calling) BEFORE KB ---
        wms_res = _try_wms_tool(input_data.user_message)
        if wms_res is not None:
            return wms_res
        # ---------------- Hybrid KB fallback ----------------
        kb_results = hybrid_search_knowledge_base(input_data.user_message, top_k=10, alpha=0.6, beta=0.4)
        documents = kb_results.get("documents", [])
        metadatas = kb_results.get("metadatas", [])
        distances = kb_results.get("distances", [])
        combined = kb_results.get("combined_scores", [])
        items: List[Dict[str, Any]] = []
        for i, doc in enumerate(documents):
            text = doc.strip() if isinstance(doc, str) else ""
            if not text:
                continue
            meta = metadatas[i] if i < len(metadatas) and isinstance(metadatas[i], dict) else {}
            score = distances[i] if i < len(distances) else None
            comb = combined[i] if i < len(combined) else None
            m = dict(meta)
            if score is not None:
                m["distance"] = score
            if comb is not None:
                m["combined"] = comb
            items.append({"text": text, "meta": m})
        # Only the top two chunks feed the context.
        selected = items[:2]
        context_raw = "\n\n---\n\n".join([s["text"] for s in selected]) if selected else ""
        filtered_text, filt_info = _filter_context_for_query(context_raw, input_data.user_message)
        context = filtered_text
        context_found = bool(context.strip())
        best_distance = (min([d for d in distances if d is not None], default=None) if distances else None)
        best_combined = (max([c for c in combined if c is not None], default=None) if combined else None)
        detected_intent = kb_results.get("user_intent", "neutral")
        best_doc = kb_results.get("best_doc")
        top_meta = (metadatas or [{}])[0] if metadatas else {}
        msg_low = (input_data.user_message or "").lower()
        GENERIC_ERROR_TERMS = ("error", "issue", "problem", "not working", "failed", "failure")
        generic_error_signal = any(t in msg_low for t in GENERIC_ERROR_TERMS)
        # intent nudge for prereqs
        PREREQ_TERMS = (
            "pre req", "pre-requisite", "pre-requisites", "prerequisite",
            "prerequisites", "pre requirement", "pre-requirements", "requirements",
        )
        if detected_intent == "neutral" and any(t in msg_low for t in PREREQ_TERMS):
            detected_intent = "prereqs"
        # permission queries force 'errors'
        PERM_QUERY_TERMS = [
            "permission", "permissions", "access", "access right", "authorization", "authorisation",
            "role", "role access", "security", "security profile", "privilege",
            "not allowed", "not authorized", "denied",
        ]
        is_perm_query = any(t in msg_norm for t in PERM_QUERY_TERMS)
        if is_perm_query:
            detected_intent = "errors"
        sec_title = (top_meta or {}).get("section", "") or ""
        sec_title_low = sec_title.strip().lower()
        PREREQ_HEADINGS = ("pre-requisites", "prerequisites", "pre requisites", "pre-requirements", "requirements")
        if detected_intent == "neutral" and any(h in sec_title_low for h in PREREQ_HEADINGS):
            detected_intent = "prereqs"

        def _contains_any(s: str, keywords: tuple) -> bool:
            low = (s or "").lower()
            return any(k in low for k in keywords)

        DOMAIN_TERMS = (
            "trailer", "shipment", "order", "load", "wave",
            "inventory", "putaway", "receiving", "appointment",
            "dock", "door", "manifest", "pallet", "container",
            "asn", "grn", "pick", "picking",
        )
        ACTION_OR_ERROR_TERMS = (
            "how to", "procedure", "perform", "close", "closing", "open", "navigate", "scan",
            "confirm", "generate", "update", "receive", "receiving", "error", "issue", "fail", "failed",
            "not working", "locked", "mismatch", "access", "permission", "status",
        )
        matched_count = int(filt_info.get("matched_count") or 0)
        filter_mode = (filt_info.get("mode") or "").lower()
        has_any_action_or_error = _contains_any(msg_low, ACTION_OR_ERROR_TERMS)
        mentions_domain = _contains_any(msg_low, DOMAIN_TERMS)
        # Short queries need a higher combined score to count as a solid match.
        # These two values are reused for the final status below.
        short_query = len((input_data.user_message or "").split()) <= 4
        gate_combined_ok = 0.60 if short_query else 0.55
        combined_ok = (best_combined is not None and best_combined >= gate_combined_ok)
        weak_domain_only = (mentions_domain and not has_any_action_or_error)
        low_context_hit = (matched_count < 2 and filter_mode in ("concise", "exact"))
        # NOTE(review): hard-coded True, so the weak-match rejection below can
        # never trigger — confirm whether this switch is intentional.
        strong_steps_bypass = True
        strong_error_signal = len(_detect_error_families(msg_low)) > 0
        if (weak_domain_only or (low_context_hit and not combined_ok)) \
                and not strong_steps_bypass \
                and not (strong_error_signal or generic_error_signal):
            return {
                "bot_response": _build_clarifying_message(),
                "status": "NO_KB_MATCH",
                "context_found": False,
                "suggest_incident": True,
                "followup": "Share more details (module/screen/error), or say 'create ticket'.",
                "options": [{"type": "yesno", "title": "Share details or raise a ticket?"}],
                "debug": {
                    "intent": "sop_rejected_weak_match",
                    "matched_count": matched_count,
                    "filter_mode": filter_mode,
                    "best_combined": best_combined,
                    "mentions_domain": mentions_domain,
                    "has_any_action_or_error": has_any_action_or_error,
                },
            }
        # Build SOP/Errors/Prereqs context
        escalation_line: Optional[str] = None
        full_errors: Optional[str] = None
        next_step_applied = False
        next_step_info: Dict[str, Any] = {}
        context_preformatted = False
        steps_override_applied = False
        if best_doc and detected_intent == "steps":
            sec = (top_meta or {}).get("section")
            full_steps = get_section_text(best_doc, sec) if sec else get_best_steps_section_text(best_doc)
            if full_steps:
                numbered_full = _ensure_numbering(full_steps)
                next_only = _anchor_next_steps(input_data.user_message, numbered_full, max_next=6)
                if next_only is not None:
                    if len(next_only) == 0:
                        context = "You are at the final step of this SOP. No further steps."
                        next_step_applied = True
                        next_step_info = {"count": 0}
                        context_preformatted = True
                    else:
                        context = _dedupe_lines(_ensure_numbering("\n".join(next_only)))
                        next_step_applied = True
                        next_step_info = {"count": len(next_only)}
                        context_preformatted = True
                else:
                    context = numbered_full
                    context_preformatted = True
                # NOTE(review): nesting reconstructed — these two assignments
                # are assumed to apply whenever a steps section was found.
                # They only affect the debug fields of the reply.
                filt_info = {'mode': None, 'matched_count': None, 'all_sentences': None}
                context_found = True
        elif best_doc and detected_intent == "errors":
            said_not_resolved = _has_negation_resolved(msg_norm)
            if said_not_resolved:
                return {
                    "bot_response": "Select an option below.",
                    "status": "OK",
                    "context_found": False,
                    "show_incident_form": False,
                    "show_status_form": False,
                    "followup": None,
                    "debug": {"intent": "errors_not_resolved", "best_doc": best_doc},
                }
            full_errors = get_best_errors_section_text(best_doc)
            if full_errors:
                ctx_err = _extract_errors_only(full_errors, max_lines=30)
                if is_perm_query:
                    context = _filter_permission_lines(ctx_err, max_lines=6)
                else:
                    mentions_domain_local = any(t in msg_low for t in DOMAIN_TERMS)
                    is_specific_error = (len(_detect_error_families(msg_low)) > 0) or mentions_domain_local
                    if is_specific_error:
                        context = _filter_context_for_query(ctx_err, input_data.user_message)[0]
                        if not context.strip():
                            # Nothing matched the query: show the first few error bullets.
                            all_lines = _normalize_lines(ctx_err)
                            error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)]
                            context = "\n".join(error_bullets[:6]).strip()
                    else:
                        all_lines = _normalize_lines(ctx_err)
                        error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)]
                        context = "\n".join(error_bullets[:6]).strip()
                        assist_followup = (
                            "Please tell me which error above matches your screen (paste the exact text), "
                            "or share a screenshot. I can guide you further or raise a ServiceNow ticket."
                        )
                escalation_line = _extract_escalation_line(full_errors)
                # NOTE(review): nesting reconstructed — the steps override is
                # assumed to run only when an errors section was found.
                try:
                    steps_src = get_best_steps_section_text(best_doc)
                    if steps_src:
                        numbered_steps = _ensure_numbering(steps_src)
                        next_only = _anchor_next_steps(input_data.user_message, numbered_steps, max_next=6)
                        if next_only is not None and len(next_only) > 0:
                            context = _dedupe_lines(_ensure_numbering("\n".join(next_only)))
                            context_preformatted = True
                            next_step_applied = True
                            next_step_info = {"count": len(next_only), "source": "errors_domain_override"}
                            steps_override_applied = True
                except Exception:
                    # Best effort: keep the errors context when the override fails.
                    pass
        elif best_doc and detected_intent == "prereqs":
            full_prereqs = _find_prereq_section_text(best_doc)
            if full_prereqs:
                context = full_prereqs.strip()
                context_found = True
        # language hint & paraphrase (errors only when not overridden)
        language_hint = _detect_language_hint(input_data.user_message)
        lang_line = f"Respond in {language_hint}." if language_hint else "Respond in a clear, polite tone."
        use_gemini = (detected_intent == "errors") and not steps_override_applied
        enhanced_prompt = f"""You are a helpful support assistant. Rewrite the provided context ONLY into clear, user-friendly guidance.
- Do not add any information that is not present in the context.
- If the content is an error/access/permission note, paraphrase it into a helpful sentence users can understand.
- {lang_line}
### Context
{context}
### Question
{input_data.user_message}
### Output
Return ONLY the rewritten guidance."""
        bot_text = ""
        http_code = 0
        if use_gemini and GEMINI_API_KEY:
            try:
                payload = {"contents": [{"role": "user", "parts": [{"text": enhanced_prompt}]}]}
                resp = _gemini_post(payload, timeout=25)
                http_code = getattr(resp, "status_code", 0)
                try:
                    result = resp.json()
                except Exception:
                    result = {}
                try:
                    bot_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
                except Exception:
                    bot_text = ""
            except Exception:
                bot_text, http_code = "", 0
        # deterministic local formatting
        if detected_intent == "steps":
            bot_text = context if context_preformatted else _ensure_numbering(context)
        elif detected_intent == "errors":
            if not (bot_text or "").strip() or http_code == 429:
                bot_text = context.strip()
            if escalation_line:
                bot_text = (bot_text or "").rstrip() + "\n\n" + escalation_line
        else:
            bot_text = context
        # explicit escalation add if user asked
        needs_escalation = (" escalate" in msg_norm) or ("escalation" in msg_norm)
        if needs_escalation and best_doc:
            esc_text = get_escalation_text(best_doc) or full_errors or ""
            line = _extract_escalation_line(esc_text)
            if line:
                bot_text = (bot_text or "").rstrip() + "\n\n" + line
        if not (bot_text or "").strip():
            bot_text = context.strip() if (context or "").strip() else (
                "I found some related guidance but couldn’t assemble a reply. "
                "Share more detail (module/screen/error), or say ‘create ticket’."
            )
        # Same gate as above: a combined score at or over the threshold means OK.
        status = "OK" if combined_ok else "PARTIAL"
        lower = (bot_text or "").lower()
        if ("partial" in lower) or ("may be partial" in lower) or ("closest" in lower) or ("may not fully" in lower):
            status = "PARTIAL"
        options = [{"type": "yesno", "title": "Share details or raise a ticket?"}] if status == "PARTIAL" else []
        try:
            base_query = (input_data.user_message or "").strip()
            if detected_intent == "steps":
                LAST_ISSUE_HINT = base_query[:100]
            elif detected_intent == "errors":
                shown_lines = [ln.strip() for ln in (bot_text or "").splitlines() if ln.strip()]
                top_error_line = ""
                for ln in shown_lines:
                    if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln):
                        if "escalation" in ln.lower():
                            continue
                        top_error_line = ln
                        break
                if top_error_line:
                    top_error_line = re.split(r"\bif you want to escalate\b", top_error_line, flags=re.IGNORECASE)[0].strip()
                    # Drop the bullet marker so the hint reads as one phrase.
                    top_error_line = re.sub(r"^\s*[-*\u2022]\s*", "", top_error_line)
                # Join with an explicit separator; the two fragments were
                # previously glued together with nothing in between.
                LAST_ISSUE_HINT = (f"{base_query} - {top_error_line}" if top_error_line else base_query)[:100]
        except Exception:
            # The hint is optional; never fail the reply because of it.
            pass
        return {
            "bot_response": bot_text,
            "status": status,
            "context_found": True,
            "ask_resolved": (status == "OK" and detected_intent != "errors"),
            "suggest_incident": (status == "PARTIAL"),
            "followup": (assist_followup if assist_followup else ("Is this helpful, or should I raise a ticket?" if status == "PARTIAL" else None)),
            "options": options,
            "debug": {
                "used_chunks": len((context or "").split("\n\n---\n\n")) if context else 0,
                "best_distance": best_distance,
                "best_combined": best_combined,
                "http_status": http_code,
                "filter_mode": filt_info.get("mode") if isinstance(filt_info, dict) else None,
                "matched_count": filt_info.get("matched_count") if isinstance(filt_info, dict) else None,
                "user_intent": detected_intent,
                "best_doc": best_doc,
                "next_step": {"applied": next_step_applied, "info": next_step_info},
            },
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
# =============================================================================
# Ticket description generation
# =============================================================================
@app.post("/generate_ticket_desc")
async def generate_ticket_desc_ep(input_data: TicketDescInput):
    """Ask Gemini to draft a ServiceNow short/detailed description for an issue.

    Returns a dict with ``ShortDescription`` and ``DetailedDescription``. When the
    model reply cannot be decoded, both values are empty strings and an ``error``
    key names the stage that failed. Any other failure becomes an HTTP 500.
    """

    def _failure(reason: str) -> dict:
        # Same shape as a success so callers can always read both description keys.
        return {"ShortDescription": "", "DetailedDescription": "", "error": reason}

    try:
        prompt = "".join([
            f"You are helping generate ServiceNow ticket descriptions based on the issue: {input_data.issue}.\n",
            "Please return the output strictly in JSON format with the following keys:\n",
            "{\n",
            ' "ShortDescription": "A concise summary of the issue (max 100 characters)",\n',
            ' "DetailedDescription": "A detailed explanation of the issue"\n',
            "}\n",
            "Do not include any extra text, comments, or explanations outside the JSON.",
        ])
        request_body = {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
        gemini_resp = _gemini_post(request_body, timeout=25)

        try:
            reply = gemini_resp.json()
        except Exception:
            return _failure("Gemini returned non-JSON")

        try:
            first_candidate = reply.get("candidates", [{}])[0]
            first_part = first_candidate.get("content", {}).get("parts", [{}])[0]
            text = first_part.get("text", "").strip()
        except Exception:
            return _failure("Gemini parsing failed")

        # The model sometimes wraps its JSON in a Markdown code fence; drop fence lines.
        if text.startswith("```"):
            text = "\n".join(
                row for row in text.splitlines() if not row.strip().startswith("```")
            ).strip()

        try:
            parsed = json.loads(text)
            short_text = parsed.get("ShortDescription", "").strip()
            detailed_text = parsed.get("DetailedDescription", "").strip()
        except Exception:
            return _failure("Invalid JSON returned")
        return {"ShortDescription": short_text, "DetailedDescription": detailed_text}
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
# =============================================================================
# Incident status
# =============================================================================
@app.post("/incident_status")
async def incident_status(input_data: TicketStatusInput):
    """Fetch a ServiceNow incident by ``sys_id`` or ``number`` and report its state.

    ``sys_id`` takes precedence when both identifiers are supplied. A non-200
    reply from ServiceNow yields an empty record, so the response falls back to
    "unknown" values rather than failing.

    Returns:
        A dict with a Markdown ``bot_response`` (ticket, status, description),
        a follow-up prompt and UI flags.

    Raises:
        HTTPException: 400 when neither identifier is provided; 500 when
            SERVICENOW_INSTANCE_URL is not set or any unexpected error occurs.
    """
    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        if input_data.sys_id:
            url = f"{instance_url}/api/now/table/incident/{input_data.sys_id}"
            response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
            data = response.json()
            result = data.get("result", {}) if response.status_code == 200 else {}
        elif input_data.number:
            url = f"{instance_url}/api/now/table/incident"
            # Pass the number via ``params`` so requests URL-encodes user input
            # instead of splicing it raw into the query string.
            response = requests.get(
                url,
                headers=headers,
                params={"number": input_data.number},
                verify=VERIFY_SSL,
                timeout=25,
            )
            data = response.json()
            lst = data.get("result", [])
            result = (lst or [{}])[0] if response.status_code == 200 else {}
        else:
            raise HTTPException(status_code=400, detail="Provide IncidentID (number) or sys_id")
        state_code = builtins.str(result.get("state", "unknown"))
        # Unmapped state codes are shown as-is.
        state_label = STATE_MAP.get(state_code, state_code)
        short = result.get("short_description", "")
        number = result.get("number", input_data.number or "unknown")
        return {
            "bot_response": (
                f"**Ticket:** {number} \n"
                f"**Status:** {state_label} \n"
                f"**Issue description:** {short}"
            ).replace("\n", " \n"),
            "followup": "Is there anything else I can assist you with?",
            "show_assist_card": True,
            "persist": True,
            "debug": "Incident status fetched",
        }
    except HTTPException:
        # Keep deliberate status codes (e.g. the 400 above) instead of
        # letting the generic handler below rewrap them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
# =============================================================================
# Incident creation
# =============================================================================
def _classify_resolution_llm(user_message: str) -> bool:
    """Ask Gemini whether *user_message* says the issue is resolved.

    Returns False without calling the model when neither GEMINI_API_KEY is set
    nor GEMINI_URL carries a ``key=`` parameter, and also on any request or
    parsing error. Otherwise returns True when the reply text contains "true".
    """
    gemini_configured = bool(GEMINI_API_KEY) or ("key=" in GEMINI_URL)
    if not gemini_configured:
        return False

    prompt = (
        "Classify if the following user message indicates that the issue is resolved or working now.\n"
        "Return only 'true' or 'false'.\n"
        f"Message: {user_message}"
    )
    request_body = {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
    try:
        reply = _gemini_post(request_body, timeout=12).json()
        first_part = reply.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0]
        verdict = first_part.get("text", "") or ""
        return "true" in verdict.strip().lower()
    except Exception:
        # Best-effort classifier: any failure counts as "not resolved".
        return False
def _set_incident_resolved(sys_id: str) -> bool:
    """Mark a ServiceNow incident as resolved, trying several payload shapes.

    When SERVICENOW_REQUIRE_IN_PROGRESS_FIRST is truthy, the incident is first
    PATCHed to state "2" (best effort, failures are only logged). Then up to
    three resolve payloads are tried in order:

    * A: state "6" with the standard ``close_code`` / ``close_notes`` fields
    * B: state "Resolved" with the same fields
    * C: state "6" with the field names from SERVICENOW_RESOLUTION_CODE_FIELD /
      SERVICENOW_RESOLUTION_NOTES_FIELD

    Args:
        sys_id: sys_id of the incident record to patch.

    Returns:
        True as soon as one attempt answers 200 or 204. False when every
        attempt fails, the instance URL is missing, or an exception occurs
        (details are printed).
    """
    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            print("[SN PATCH resolve] missing SERVICENOW_INSTANCE_URL")
            return False
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{instance_url}/api/now/table/incident/{sys_id}"
        close_code_val = os.getenv("SERVICENOW_CLOSE_CODE", "Solution provided")
        close_notes_val = os.getenv("SERVICENOW_RESOLUTION_NOTES", "Issue resolved, user confirmed")
        caller_sysid = os.getenv("SERVICENOW_CALLER_SYSID")
        resolved_by_sysid = os.getenv("SERVICENOW_RESOLVED_BY_SYSID")
        assign_group = os.getenv("SERVICENOW_ASSIGNMENT_GROUP_SYSID")
        code_field = os.getenv("SERVICENOW_RESOLUTION_CODE_FIELD", "close_code")
        notes_field = os.getenv("SERVICENOW_RESOLUTION_NOTES_FIELD", "close_notes")
        require_progress = os.getenv("SERVICENOW_REQUIRE_IN_PROGRESS_FIRST", "false").lower() in ("1", "true", "yes")

        if require_progress:
            try:
                resp1 = requests.patch(url, headers=headers, json={"state": "2"}, verify=VERIFY_SSL, timeout=25)
                print(f"[SN PATCH progress] status={resp1.status_code} body={resp1.text[:500]}")
            except Exception as e:
                # The intermediate transition is optional; carry on to the resolve attempts.
                print(f"[SN PATCH progress] exception={safe_str(e)}")

        def clean(d: dict) -> dict:
            # Drop unset optional fields so they are not sent as nulls.
            return {k: v for k, v in d.items() if v is not None}

        def build_payload(state: str, code_key: str, notes_key: str) -> dict:
            # Single source for the resolve payload; only state and field names vary.
            return clean({
                "state": state,
                code_key: close_code_val,
                notes_key: close_notes_val,
                "caller_id": caller_sysid,
                "resolved_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                "work_notes": "Auto-resolve set by NOVA.",
                "resolved_by": resolved_by_sysid,
                "assignment_group": assign_group,
            })

        attempts = (
            ("A", "6", "close_code", "close_notes"),
            ("B", "Resolved", "close_code", "close_notes"),
            ("C", "6", code_field, notes_field),
        )
        tried = set()
        for label, state, code_key, notes_key in attempts:
            shape = (state, code_key, notes_key)
            if shape in tried:
                # With default field names C equals A; do not resend a payload that already failed.
                continue
            tried.add(shape)
            resp = requests.patch(
                url,
                headers=headers,
                json=build_payload(state, code_key, notes_key),
                verify=VERIFY_SSL,
                timeout=25,
            )
            if resp.status_code in (200, 204):
                return True
            print(f"[SN PATCH resolve {label}] status={resp.status_code} body={resp.text[:500]}")
        return False
    except Exception as e:
        print(f"[SN PATCH resolve] exception={safe_str(e)}")
        return False
# =============================================================================
# Helper functions used by KB logic
# =============================================================================
# Numbering preference read once at import time; expected values: 'digit' or 'step'.
NUMBERING_STYLE = os.getenv("NUMBERING_STYLE", "digit").lower()

# Warehouse-domain nouns. _is_domain_status_context() does a substring check
# against these to recognise a domain "status" question.
DOMAIN_STATUS_TERMS = (
    "shipment",
    "order",
    "load",
    "trailer",
    "wave",
    "inventory",
    "putaway",
    "receiving",
    "appointment",
    "dock",
    "door",
    "manifest",
    "pallet",
    "container",
    "asn",
    "grn",
    "pick",
    "picking",
)
ERROR_FAMILY_SYNS = {
"NOT_FOUND": ("not found", "missing", "does not exist", "doesn't exist", "unavailable", "not available", "cannot find", "no such", "not present", "absent"),
"MISMATCH": ("mismatch", "doesn't match", "does not match", "variance", "difference", "discrepancy", "not equal"),
"LOCKED": ("locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze"),
"PERMISSION": ("permission", "permissions", "access denied", "not authorized", "not authorised", "insufficient privileges", "no access", "authorization", "authorisation"),
"TIMEOUT": ("timeout", "timed out", "network", "connection", "unable to connect", "disconnected", "no network"),
"SYNC": ("sync", "synchronization", "synchronisation", "replication", "refresh", "out of sync", "stale", "delay", "lag"),
}
def _detect_error_families(msg: str) -> list:
low = (msg or "").lower()
low_norm = re.sub(r"[^\w\s]", " ", low)
low_norm = re.sub(r"\s+", " ", low_norm).strip()
fams = []
for fam, syns in ERROR_FAMILY_SYNS.items():
if any(s in low_norm for s in syns):
fams.append(fam)
return fams
def _is_domain_status_context(msg_norm: str) -> bool:
if "status locked" in msg_norm or "locked status" in msg_norm:
return True
return any(term in msg_norm for term in DOMAIN_STATUS_TERMS)
def _normalize_lines(text: str) -> List[str]:
raw = (text or "")
try:
return [ln.strip() for ln in raw.splitlines() if ln.strip()]
except Exception:
return [raw.strip()] if raw.strip() else []
def _filter_context_for_query(context: str, query: str) -> Tuple[str, Dict[str, Any]]:
STRICT_OVERLAP = 3
MAX_SENTENCES_STRICT = 4
MAX_SENTENCES_CONCISE = 3
def _norm(text: str) -> str:
t = (text or "").lower()
t = re.sub(r"[^\w\s]", " ", t)
t = re.sub(r"\s+", " ", t).strip()
return t
def _split_sents(ctx: str) -> List[str]:
raw_sents = re.split(r"(?<=\w[.!?])\s+", ctx or "")
return [s.strip() for s in raw_sents if s and len(s.strip()) > 2]
ctx = (context or "").strip()
if not ctx or not query:
return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
q_norm = _norm(query)
q_terms = [t for t in q_norm.split() if len(t) > 2]
if not q_terms:
return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
sentences = _split_sents(ctx)
matched_exact, matched_any = [], []
for s in sentences:
s_norm = _norm(s)
is_bullet = bool(re.match(r"^[-*\u2022]\s*", s))
overlap = sum(1 for t in q_terms if t in s_norm) + (1 if is_bullet else 0)
if overlap >= STRICT_OVERLAP:
matched_exact.append(s)
elif overlap > 0:
matched_any.append(s)
if matched_exact:
kept = matched_exact[:MAX_SENTENCES_STRICT]
return _dedupe_lines("\n".join(kept).strip()), {
'mode': 'exact', 'matched_count': len(kept), 'all_sentences': len(sentences),
}
if matched_any:
kept = matched_any[:MAX_SENTENCES_CONCISE]
return _dedupe_lines("\n".join(kept).strip()), {
'mode': 'concise', 'matched_count': len(kept), 'all_sentences': len(sentences),
}
kept = sentences[:MAX_SENTENCES_CONCISE]
return _dedupe_lines("\n".join(kept).strip()), {
'mode': 'concise', 'matched_count': 0, 'all_sentences': len(sentences),
}
def _extract_errors_only(text: str, max_lines: int = 12) -> str:
    """Keep only bullet lines or lines containing a colon, up to *max_lines*.

    Falls back to the stripped input when no line qualifies.
    """
    bullet_start = re.compile(r"^\s*[-*\u2022]\s*")
    picked: List[str] = []
    for row in _normalize_lines(text):
        if not (bullet_start.match(row) or ":" in row):
            continue
        picked.append(row)
        if len(picked) >= max_lines:
            break
    if not picked:
        return (text or "").strip()
    return "\n".join(picked).strip()
def _filter_permission_lines(text: str, max_lines: int = 6) -> str:
    """Keep the lines that mention a permission/access keyword, up to *max_lines*.

    Matching is a case-insensitive substring test. When no line matches, the
    stripped input is returned unchanged.
    """
    PERM_SYNONYMS = (
        "permission", "permissions", "access", "authorization", "authorisation",
        "role", "role mapping", "security profile", "not allowed", "not authorized", "denied", "insufficient",
    )
    picked: List[str] = []
    for row in _normalize_lines(text):
        lowered = row.lower()
        if not any(word in lowered for word in PERM_SYNONYMS):
            continue
        picked.append(row)
        if len(picked) >= max_lines:
            break
    if not picked:
        return (text or "").strip()
    return "\n".join(picked).strip()
def _extract_escalation_line(text: str) -> Optional[str]:
if not text:
return None
lines = _normalize_lines(text)
if not lines:
return None
start_idx = None
for i, ln in enumerate(lines):
low = ln.lower()
if "escalation" in low or "escalation path" in low or "escalate" in low:
start_idx = i
break
block: List[str] = []
if start_idx is not None:
for j in range(start_idx, min(len(lines), start_idx + 6)):
if not lines[j].strip():
break
block.append(lines[j].strip())
else:
block = [ln.strip() for ln in lines if ("->" in ln or "→" in ln)]
if not block:
return None
text_block = " ".join(block)
m = re.search(r"escalation[^:]*:\s*(.+)", text_block, flags=re.IGNORECASE)
path = m.group(1).strip() if m else None
if not path:
arrow_lines = [ln for ln in block if ("->" in ln or "→" in ln)]
if arrow_lines:
path = arrow_lines[0]
if not path:
m2 = re.search(r"(operator.*?administrator.*operator.*)", text_block, flags=re.IGNORECASE)
path = m2.group(1).strip() if m2 else None
if not path:
return None
path = path.replace("->", "→").strip()
path = re.sub(r"^(?i:escalation\s*path)\s*:\s*", "", path).strip()
return f"If you want to escalate the issue, follow: {path}"
def _detect_language_hint(msg: str) -> Optional[str]:
if re.search(r"[\u0B80-\u0BFF]", msg or ""): # Tamil
return "Tamil"
if re.search(r"[\u0900-\u097F]", msg or ""): # Hindi
return "Hindi"
return None
def _build_clarifying_message() -> str:
return (
"It seems the issue isn’t resolved yet. Would you like to share a few details so I can check further, "
"or should I raise a ServiceNow ticket for you?"
)
def _build_tracking_descriptions(issue_text: str, resolved_text: str) -> Tuple[str, str]:
issue = (issue_text or "").strip()
resolved = (resolved_text or "").strip()
is_process_query = bool(re.search(r"\b(how to|steps|procedure|process)\b", issue.lower()))
if is_process_query:
cleaned_issue = re.sub(r"^\s*(process\s*steps\s*[-–:]?\s*)", "", issue, flags=re.IGNORECASE).strip()
short_desc = f"Process Steps – {cleaned_issue or issue}"[:100]
else:
short_desc = issue[:100]
long_desc = (
f'User reported: "{issue}". '
f'User confirmation: "{resolved}". '
f"Tracking record created automatically by NOVA."
).strip()
return short_desc, long_desc
def _is_incident_intent(msg_norm: str) -> bool:
intent_phrases = [
"create ticket", "create a ticket", "raise ticket", "raise a ticket", "open ticket", "open a ticket",
"create incident", "create an incident", "raise incident", "raise an incident", "open incident", "open an incident",
"log ticket", "log an incident", "generate ticket", "create snow ticket", "raise snow ticket",
"raise service now ticket", "create service now ticket", "raise sr", "open sr",
]
return any(p in msg_norm for p in intent_phrases)
def _parse_ticket_status_intent(msg_norm: str) -> Dict[str, Optional[str]]:
status_keywords = ["status", "ticket status", "incident status", "check status", "check ticket status", "check incident status"]
base_has_status = any(k in msg_norm for k in status_keywords)
has_ticket_marker = (
any(w in msg_norm for w in ("ticket", "incident", "servicenow", "snow"))
or bool(re.search(r"\binc\d{5,}\b", msg_norm, flags=re.IGNORECASE))
)
if (not base_has_status) or (base_has_status and not has_ticket_marker and _is_domain_status_context(msg_norm)):
return {}
patterns = [r"(?:incident\s*id|incidentid|ticket\s*number|number)\s*[:=]?\s*(inc\d+)", r"(inc\d+)"]
for pat in patterns:
m = re.search(pat, msg_norm, flags=re.IGNORECASE)
if m:
val = m.group(1).strip()
if val:
return {"number": val.upper() if val.lower().startswith("inc") else val}
return {"number": None, "ask_number": True}
def _is_resolution_ack_heuristic(msg_norm: str) -> bool:
phrases = [
"it is resolved", "resolved", "issue resolved", "problem resolved",
"it's working", "working now", "works now", "fixed", "sorted",
"ok now", "fine now", "all good", "all set", "thanks works", "thank you it works", "back to normal",
]
return any(p in msg_norm for p in phrases)
def _has_negation_resolved(msg_norm: str) -> bool:
neg_phrases = [
"not resolved", "issue not resolved", "still not working", "not working",
"didn't work", "doesn't work", "no change", "not fixed", "still failing", "failed again", "broken", "fail",
]
return any(p in msg_norm for p in neg_phrases)
def _ensure_numbering(text: str) -> str:
text = re.sub(r"[\u2060\u200B]", "", text or "")
lines = [ln.strip() for ln in (text or "").splitlines() if ln and ln.strip()]
if not lines:
return text or ""
para = " ".join(lines).strip()
if not para:
return ""
para_clean = re.sub(r"(?:\b\d+\s*[\.\)])\s+", "\n\n\n", para)
para_clean = re.sub(r"(?:[\u2460-\u2473]\s+)", "\n\n\n", para_clean)
para_clean = re.sub(r"(?i)\bstep\s*\d+\s*:?\s*", "\n\n\n", para_clean)
segments = [seg.strip() for seg in para_clean.split("\n\n\n") if seg.strip()]
if len(segments) < 2:
tmp = [ln.strip() for ln in para.splitlines() if ln.strip()]
segments = tmp if len(tmp) > 1 else [seg.strip() for seg in re.split(r"(?<=\w[.!?])\s+", para) if seg.strip()]
def strip_prefix_any(s: str) -> str:
return re.sub(
r"^\s*(?:" r"(?:\d+\s*[\.\)])|" r"(?i:step\s*\d+:?)|" r"(?:[-*\u2022])|" r"(?:[\u2460-\u2473])" r")\s*",
"", (s or "").strip(),
)
clean_segments = [strip_prefix_any(seg) for seg in segments if seg.strip()]
circled = {i: chr(9311 + i) for i in range(1, 21)}
out = []
for idx, seg in enumerate(clean_segments, start=1):
marker = circled.get(idx, f"{idx})")
out.append(f"{marker} {seg}")
return "\n".join(out)
def _dedupe_lines(text: str) -> str:
seen, out = set(), []
for ln in (text or "").splitlines():
key = re.sub(r"\s+", " ", (ln or "").strip().lower())
if key and key not in seen:
out.append(ln)
seen.add(key)
return "\n".join(out).strip()
def _split_sentences(block: str) -> list:
parts = [t.strip() for t in re.split(r"(?<=\w[.!?])\s+", block or "") if t.strip()]
return parts if parts else ([block.strip()] if (block or "").strip() else [])
def _similarity(a: str, b: str) -> float:
def _norm_text(s: str) -> str:
s = (s or "").lower()
s = re.sub(r"[^\w\s]", " ", s)
s = re.sub(r"\s+", " ", s).strip()
return s
a_norm, b_norm = _norm_text(a), _norm_text(b)
ta, tb = set(a_norm.split()), set(b_norm.split())
inter = len(ta & tb)
union = len(ta | tb) or 1
jacc = inter / union
def _bigrams(tokens: list) -> set:
return set([" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]) if len(tokens) > 1 else set()
ab, bb = _bigrams(a_norm.split()), _bigrams(b_norm.split())
big_inter = len(ab & bb)
big_union = len(ab | bb) or 1
big = big_inter / big_union
char = SequenceMatcher(None, a_norm, b_norm).ratio()
return min(1.0, 0.45 * jacc + 0.30 * big + 0.35 * char)
def _extract_anchor_from_query(msg: str) -> dict:
raw = (msg or "").strip()
low = re.sub(r"[^\w\s]", " ", raw.lower()).strip()
FOLLOWUP_CUES = ("what next", "what is next", "what to do", "then", "after that", "next")
has_followup = any(cue in low for cue in FOLLOWUP_CUES)
parts = [p.strip() for p in re.split(r"[?.,;:\-\n]+", raw) if p.strip()]
if not parts:
return {"anchor": raw, "has_followup": has_followup}
last = parts[-1]
last_low = re.sub(r"[^\w\s]", " ", last.lower()).strip()
if any(cue in last_low for cue in FOLLOWUP_CUES) and len(parts) >= 2:
anchor = parts[-2]
else:
anchor = parts[-1] if len(parts) > 1 else parts[0]
return {"anchor": anchor.strip(), "has_followup": has_followup}
def _anchor_next_steps(user_message: str, numbered_text: str, max_next: int = 8) -> list | None:
steps = [ln.strip() for ln in (numbered_text or "").splitlines() if ln.strip()]
if not steps:
return None
info = _extract_anchor_from_query(user_message)
anchor = info.get("anchor", "").strip()
if not anchor:
return None
has_followup = bool(info.get("has_followup"))
candidates = []
for idx, step_line in enumerate(steps):
s_full = _similarity(anchor, step_line)
literal_hit = False
scores = [s_full]
for s in _split_sentences(step_line):
scores.append(_similarity(anchor, s))
a_flat = re.sub(r"\W+", "", re.sub(r"[^\w\s]", " ", anchor.lower()))
s_flat = re.sub(r"\W+", "", re.sub(r"[^\w\s]", " ", s.lower()))
if a_flat and (a_flat in s_flat or s_flat in a_flat):
literal_hit = True
score = max(scores)
candidates.append((idx, score, literal_hit))
candidates.sort(key=lambda t: (t[1], t[0]), reverse=True)
best_idx, best_score, best_literal = candidates[0]
tok_count = len([t for t in re.sub(r"[^\w\s]", " ", anchor.lower()).split() if len(t) > 1])
if best_literal:
accept = True
else:
base_ok = best_score >= (0.55 if not has_followup else 0.50)
len_ok = (best_score >= 0.40) and (tok_count >= 3)
accept = base_ok or len_ok
if not accept:
return None
start = best_idx + 1
if start >= len(steps):
return []
end = min(start + max_next, len(steps))
next_steps = steps[start:end]
return [ln for ln in _dedupe_lines("\n".join(next_steps)).splitlines() if ln.strip()]