# Chatbot-Backend / main.py
# srilakshu012456's picture
# Update main.py
# 5eddee2 verified
# raw
# history blame
# 58.5 kB
# main_hugging_phase_recent.py
import os
import json
import re
import requests
import builtins
from typing import Optional, Any, Dict, List, Tuple
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
from datetime import datetime
# Import shared vocab from KB services
from services.kb_creation import ACTION_SYNONYMS, MODULE_VOCAB
from services.kb_creation import (
collection,
ingest_documents,
hybrid_search_knowledge_base,
get_section_text,
get_best_steps_section_text,
get_best_errors_section_text,
get_escalation_text, # for escalation heading
)
from services.login import router as login_router
from services.generate_ticket import get_valid_token, create_incident
# Load .env before reading the flags below; otherwise values that exist only in
# the .env file are ignored because os.getenv() runs first. Calling
# load_dotenv() again later in this module is harmless.
load_dotenv()
# TLS certificate verification toggles for outbound HTTP calls. Both default to
# enabled; "1", "true" and "yes" (case-insensitive) count as enabled.
VERIFY_SSL = os.getenv("SERVICENOW_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
GEMINI_SSL_VERIFY = os.getenv("GEMINI_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
def safe_str(e: Any) -> str:
    """Return ``builtins.str(e)``; if stringifying raises, return a fixed placeholder."""
    try:
        rendered = builtins.str(e)
    except Exception:
        return "<error stringify failed>"
    else:
        return rendered
# Populate os.environ from a .env file (python-dotenv).
load_dotenv()
# NOTE(review): presumably disables PostHog telemetry in a dependency — confirm
# which library reads this flag and whether it is read before this line runs.
os.environ["POSTHOG_DISABLED"] = "true"
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup hook: ingest the ``documents`` folder when the KB collection is empty.

    Any exception raised while counting or ingesting is printed and swallowed,
    so control is always yielded to the application afterwards.
    """
    try:
        docs_dir = os.path.join(os.getcwd(), "documents")
        if collection.count() != 0:
            # Collection already has entries: report the size and leave it alone.
            print(f"[KB] already populated with {collection.count()} entries. Skipping ingestion.")
        else:
            print("[KB] empty. Running ingestion...")
            ingest_documents(docs_dir)
    except Exception as exc:
        print(f"[KB] ingestion failed: {safe_str(exc)}")
    yield
# Application instance; `lifespan` runs the KB ingestion check at startup.
app = FastAPI(lifespan=lifespan)
# Mount the routes exported by services.login.
app.include_router(login_router)
# CORS: only this frontend origin is allowed; any method and header is accepted,
# and credentials are permitted.
origins = ["https://chatbotnova-chatbot-frontend.hf.space"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ------------------------------ Models ------------------------------
class ChatInput(BaseModel):
    """Request body for the ``/chat`` endpoint."""
    # Raw user text; the chat handler lower-cases and strips it for intent checks.
    user_message: str
    # Optional status supplied by the client; used as the response status for the create-ticket intent.
    prev_status: Optional[str] = None
    # Optional earlier issue text; used to describe the auto-created tracking incident.
    last_issue: Optional[str] = None
class IncidentInput(BaseModel):
    """Payload describing an incident to create.

    NOTE(review): the endpoint consuming this model is outside the visible part
    of the file — confirm how ``mark_resolved`` is honoured.
    """
    # One-line summary of the incident.
    short_description: str
    # Full description of the incident.
    description: str
    # Optional flag, False by default.
    mark_resolved: Optional[bool] = False
class TicketDescInput(BaseModel):
    """Payload carrying a single issue text.

    NOTE(review): the consumer is outside the visible part of the file; the name
    suggests it is used to draft a ticket description — confirm.
    """
    issue: str
class TicketStatusInput(BaseModel):
    """Payload identifying a ticket by ``sys_id`` and/or ``number`` (both optional).

    NOTE(review): the consumer is outside the visible part of the file — confirm
    which identifier takes precedence when both are supplied.
    """
    sys_id: Optional[str] = None
    number: Optional[str] = None
# Maps ServiceNow incident `state` codes (as strings) to display labels.
# The ticket-status branch of the chat handler looks codes up here and shows
# the raw code when it is not listed.
STATE_MAP = {
"1": "New",
"2": "In Progress",
"3": "On Hold",
"6": "Resolved",
"7": "Closed",
"8": "Canceled",
}
# Gemini credentials and endpoint. The API key is embedded in the URL query
# string at import time; when the variable is unset the URL contains "None",
# and _classify_resolution_llm() returns early instead of calling it.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_URL = (
f"https://generativelanguage.googleapis.com/v1beta/models/"
f"gemini-2.5-flash-lite:generateContent?key={GEMINI_API_KEY}"
)
# ------------------------------ Helpers ------------------------------
# Numbering style read from the environment and lower-cased.
# NOTE(review): not referenced in the visible part of this file — confirm it is still used.
NUMBERING_STYLE = os.getenv("NUMBERING_STYLE", "digit").lower() # 'digit' or 'step'
# Domain status terms (generic WMS domain words)
# _is_domain_status_context() substring-matches these against the message so a
# warehouse "status" question is not routed to the ticket-status flow.
DOMAIN_STATUS_TERMS = (
"shipment", "order", "load", "trailer", "wave",
"inventory", "putaway", "receiving", "appointment",
"dock", "door", "manifest", "pallet", "container",
"asn", "grn", "pick", "picking"
)
# --- Generic error families (SOP-wide, reusable in gating and line selection) ---
# Keys are family names; values are lower-case phrases that are substring-matched
# against user messages and SOP error lines.
ERROR_FAMILY_SYNS = {
"NOT_FOUND": (
"not found", "missing", "does not exist", "doesn't exist",
"unavailable", "not available", "cannot find", "no such",
"not present", "absent"
),
"MISMATCH": (
"mismatch", "doesn't match", "does not match", "variance",
"difference", "discrepancy", "not equal"
),
"LOCKED": (
"locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze"
),
"PERMISSION": (
"permission", "permissions", "access denied", "not authorized",
"not authorised", "insufficient privileges", "no access",
"authorization", "authorisation"
),
"TIMEOUT": (
"timeout", "timed out", "network", "connection",
"unable to connect", "disconnected", "no network"
),
"SYNC": (
"sync", "synchronization", "synchronisation", "replication",
"refresh", "out of sync", "stale", "delay", "lag"
),
}
# ----- local extension so runtime filtering is precise even without re-ingest -----
# (Does NOT override your KB synonyms—just augments them at runtime.)
# Start from a per-action copy of the KB synonyms so extending the lists below
# never mutates the shared ACTION_SYNONYMS values.
ACTION_SYNONYMS_EXT: Dict[str, List[str]] = {
    action: list(phrases) for action, phrases in ACTION_SYNONYMS.items()
}
# Extend with SOP phrasing (appointments often say 'updation', 'deletion', 'reschedule')
ACTION_SYNONYMS_EXT.setdefault("create", []).extend(
    ["appointment creation", "create appointment"]
)
ACTION_SYNONYMS_EXT.setdefault("update", []).extend(
    [
        "updation", "reschedule", "change time", "change date", "change slot",
        "update time", "update date", "update slot", "update appointment", "edit appointment",
    ]
)
ACTION_SYNONYMS_EXT.setdefault("delete", []).extend(
    ["deletion", "delete appointment", "cancel appointment"]
)
def _detect_error_families(msg: str) -> list:
"""Return matching error family names found in the message (generic across SOPs)."""
low = (msg or "").lower()
low_norm = re.sub(r"[^\w\s]", " ", low)
low_norm = re.sub(r"\s+", " ", low_norm).strip()
fams = []
for fam, syns in ERROR_FAMILY_SYNS.items():
if any(s in low_norm for s in syns):
fams.append(fam)
return fams
def _is_domain_status_context(msg_norm: str) -> bool:
if "status locked" in msg_norm or "locked status" in msg_norm:
return True
return any(term in msg_norm for term in DOMAIN_STATUS_TERMS)
def _normalize_lines(text: str) -> List[str]:
raw = (text or "")
try:
return [ln.strip() for ln in raw.splitlines() if ln.strip()]
except Exception:
return [raw.strip()] if raw.strip() else []
def _ensure_numbering(text: str) -> str:
"""
Normalize raw SOP steps into a clean numbered list using circled digits.
Robust against '1.', '1)', 'Step 1:', bullets ('-', '*', '•'), and circled digits.
"""
text = re.sub(r"[\u2060\u200B]", "", text or "")
lines = [ln.strip() for ln in (text or "").splitlines() if ln and ln.strip()]
if not lines:
return text or ""
# Collapse lines into a block and then split on common step markers
para = " ".join(lines).strip()
if not para:
return ""
# Create hard breaks at typical step boundaries
para_clean = re.sub(r"(?:\b\d+[.)]\s+)", "\n\n\n", para) # 1. / 1)
para_clean = re.sub(r"(?:[\u2460-\u2473]\s+)", "\n\n\n", para_clean) # circled digits
para_clean = re.sub(r"(?i)\bstep\s*\d+\s*:\s*", "\n\n\n", para_clean) # Step 1:
segments = [seg.strip() for seg in para_clean.split("\n\n\n") if seg.strip()]
# Fallback splitting if we didn't detect separators
if len(segments) < 2:
tmp = [ln.strip() for ln in para.splitlines() if ln.strip()]
segments = tmp if len(tmp) > 1 else [seg.strip() for seg in re.split(r"(?<=[.!?])\s+|\s+;\s+", para) if seg.strip()]
# Strip any step prefixes
def strip_prefix_any(s: str) -> str:
return re.sub(
r"^\s*(?:"
r"(?:\d+\s*[.)])" # leading numbers 1., 2)
r"|(?:step\s*\d+:?)" # Step 1:
r"|(?:[-*\u2022])" # bullets
r"|(?:[\u2460-\u2473])" # circled digits
r")\s*",
"",
(s or "").strip(),
flags=re.IGNORECASE
)
clean_segments = [strip_prefix_any(seg) for seg in segments if seg.strip()]
circled = {
1: "\u2460", 2: "\u2461", 3: "\u2462", 4: "\u2463", 5: "\u2464",
6: "\u2465", 7: "\u2466", 8: "\u2467", 9: "\u2468", 10: "\u2469",
11: "\u246a", 12: "\u246b", 13: "\u246c", 14: "\u246d", 15: "\u246e",
16: "\u246f", 17: "\u2470", 18: "\u2471", 19: "\u2472", 20: "\u2473"
}
out = []
for idx, seg in enumerate(clean_segments, start=1):
marker = circled.get(idx, f"{idx})")
out.append(f"{marker} {seg}")
return "\n".join(out)
# --- Next-step helpers (generic; SOP-agnostic) ---
def _norm_text(s: str) -> str:
s = (s or "").lower()
s = re.sub(r"[^\w\s]", " ", s)
s = re.sub(r"\s+", " ", s).strip()
return s
def _split_sop_into_steps(numbered_text: str) -> list:
"""
Split a numbered/bulleted SOP block (already passed through _ensure_numbering)
into atomic steps. Returns a list of raw step strings (order preserved).
Safe for circled digits, '1.' styles, and bullets.
"""
lines = [ln.strip() for ln in (numbered_text or "").splitlines() if ln.strip()]
steps = []
for ln in lines:
cleaned = re.sub(r"^\s*(?:[\u2460-\u2473]|\d+[.)]|[-*•])\s*", "", ln)
if cleaned:
steps.append(cleaned)
return steps
def _soft_match_score(a: str, b: str) -> float:
    """Jaccard similarity of the normalized token sets of ``a`` and ``b``.

    Returns 0.0 when either side has no tokens.
    """
    tokens_a = set(_norm_text(a).split())
    tokens_b = set(_norm_text(b).split())
    if not (tokens_a and tokens_b):
        return 0.0
    shared = tokens_a.intersection(tokens_b)
    combined = tokens_a.union(tokens_b)
    if not combined:
        return 0.0
    return len(shared) / len(combined)
def _detect_next_intent(user_query: str) -> bool:
    """Return True when the query asks what comes next in a procedure.

    The query is normalized with ``_norm_text`` (punctuation becomes spaces), so
    every phrase is normalized the same way before the substring test. Without
    that, "what's next" — which normalizes to "what s next" — was never detected.
    """
    q = _norm_text(user_query)
    phrases = (
        "after", "after this", "what next", "what's next", "whats next", "next step",
        "then what", "following step", "continue", "subsequent", "proceed",
    )
    return any(_norm_text(p) in q for p in phrases)
def _resolve_next_steps(user_query: str, numbered_text: str, max_next: int = 6, min_score: float = 0.35):
    """
    If 'what's next' intent is detected and we can reliably match the user's
    referenced line to a SOP step, return ONLY the subsequent steps.
    Else return None to preserve current behavior.

    Args:
        user_query: Raw user message.
        numbered_text: SOP text, one step per line.
        max_next: Maximum number of following steps to return.
        min_score: Minimum match score needed to trust the anchor step.

    Returns:
        ``None`` when there is no next-step intent, no steps, or no reliable
        match; ``[]`` when the matched step is the last one; otherwise the steps
        that follow the matched one.
    """
    if not _detect_next_intent(user_query):
        return None
    steps = _split_sop_into_steps(numbered_text)
    if not steps:
        return None
    q = user_query or ""
    q_norm = _norm_text(q)  # loop-invariant, so normalize once
    best_idx, best_score = -1, -1.0
    for idx, step in enumerate(steps):
        step_norm = _norm_text(step)
        # A punctuation-only step normalizes to "", which is a substring of
        # every query; it must not count as a verbatim match.
        if step_norm and step_norm in q_norm:
            score = 1.0
        else:
            score = _soft_match_score(q, step)
        if score > best_score:
            best_score, best_idx = score, idx
    if best_idx < 0 or best_score < min_score:
        return None  # fallback to full SOP
    start = best_idx + 1
    if start >= len(steps):
        return []  # user is at final step
    end = min(start + max_next, len(steps))
    return steps[start:end]
def _format_steps_as_numbered(steps: list) -> str:
"""Render a small list of steps with circled numbers for visual continuity."""
circled = {
1: "\u2460", 2: "\u2461", 3: "\u2462", 4: "\u2463", 5: "\u2464",
6: "\u2465", 7: "\u2466", 8: "\u2467", 9: "\u2468", 10: "\u2469",
11: "\u246a", 12: "\u246b", 13: "\u246c", 14: "\u246d", 15: "\u246e",
16: "\u246f", 17: "\u2470", 18: "\u2471", 19: "\u2472", 20: "\u2473"
}
out = []
for i, s in enumerate(steps, start=1):
out.append(f"{circled.get(i, str(i))} {s}")
return "\n".join(out)
def _filter_error_lines_by_query(text: str, query: str, max_lines: int = 1) -> str:
    """
    Pick the most relevant 'Common Errors & Resolution' bullet(s) for the user's message.
    Generic (SOP-agnostic) scoring:
    1) error-family match (NOT_FOUND/MISMATCH/LOCKED/PERMISSION/TIMEOUT/SYNC),
    2) anchored starts (line begins with the first words of the query),
    3) multi-word overlap (bigrams/trigrams),
    4) token overlap,
    5) formatting bonus for bullets/headings.

    Returns up to `max_lines` best-scoring lines (defaults to 1) joined by
    newlines. When no line scores above zero the first `max_lines` lines are
    returned; when the text has no non-blank lines it is returned stripped.
    """
    def _norm(s: str) -> str:
        s = (s or "").lower()
        s = re.sub(r"[^\w\s]", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def _ngrams(tokens: List[str], n: int) -> List[str]:
        return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

    # Normalize synonyms exactly like the text they are matched against;
    # otherwise phrases with punctuation ("doesn't exist", "read-only") never hit.
    family_syns = {
        fam: tuple(n for n in (_norm(k) for k in syns) if n)
        for fam, syns in ERROR_FAMILY_SYNS.items()
    }

    def _families_for(s: str) -> set:
        low = _norm(s)
        return {fam for fam, syns in family_syns.items() if any(k in low for k in syns)}

    lines = _normalize_lines(text)
    if not lines:
        return (text or "").strip()

    q = _norm(query)
    q_tokens = [t for t in q.split() if len(t) > 1]
    q_bi = _ngrams(q_tokens, 2)
    q_tri = _ngrams(q_tokens, 3)
    q_fams = _families_for(query)
    # Query prefixes used for the anchored-start and heading checks.
    first2 = " ".join(q_tokens[:2]) if len(q_tokens) >= 2 else ""
    first3 = " ".join(q_tokens[:3]) if len(q_tokens) >= 3 else ""

    scored: List[Tuple[float, str]] = []
    for ln in lines:
        ln_norm = _norm(ln)
        fam_overlap = len(q_fams & _families_for(ln))
        anchored = 0.0
        if (first3 and ln_norm.startswith(first3)) or (first2 and ln_norm.startswith(first2)):
            anchored = 1.0
        bigram_hits = sum(1 for bg in q_bi if bg and bg in ln_norm)
        trigram_hits = sum(1 for tg in q_tri if tg and tg in ln_norm)
        token_overlap = sum(1 for t in q_tokens if t and t in ln_norm)
        exact_phrase = 1.0 if (q and q in ln_norm) else 0.0
        score = (
            1.70 * fam_overlap +
            1.00 * anchored +
            0.80 * trigram_hits +
            0.55 * bigram_hits +
            0.40 * exact_phrase +
            0.30 * token_overlap
        )
        if re.match(r"^\s*[-*\u2022]\s*", ln):  # bullet
            score += 0.10
        # Take the heading from the raw line: the normalized line has no ":"
        # left, so splitting it always returned the whole line.
        heading = _norm(ln.split(":")[0])
        if heading and (heading in q or (first2 and first2 in heading)):
            score += 0.15
        scored.append((score, ln))

    scored.sort(key=lambda x: x[0], reverse=True)
    top = [ln for s, ln in scored[:max_lines] if s > 0.0]
    if not top:
        top = lines[:max_lines]
    return "\n".join(top).strip()
def _friendly_permission_reply(raw: str) -> str:
line = (raw or "").strip()
line = re.sub(r"^\s*[-*\u2022]\s*", "", line)
if not line:
return "It looks like you may not have access for this action. Please verify your WMS role/permission with your supervisor or IT."
if "verify role access" in line.lower():
return "It looks like you may not have access for this action. Please verify your WMS role/permission with your supervisor or IT."
if ("permission" in line.lower()) or ("access" in line.lower()) or ("authorization" in line.lower()):
return f"It seems to be an access issue: {line}. Please check your role mapping or request access."
return line
def _detect_language_hint(msg: str) -> Optional[str]:
if re.search(r"[\u0B80-\u0BFF]", msg or ""): # Tamil
return "Tamil"
if re.search(r"[\u0900-\u097F]", msg or ""): # Hindi
return "Hindi"
return None
def _build_clarifying_message() -> str:
return (
"It seems the issue isn’t resolved yet. Would you like to share a few details so I can check further, "
"or should I raise a ServiceNow ticket for you?"
)
def _build_tracking_descriptions(issue_text: str, resolved_text: str) -> Tuple[str, str]:
issue = (issue_text or "").strip()
resolved = (resolved_text or "").strip()
short_desc = issue[:100] if issue else (resolved[:100] or "Issue resolved (user confirmation)")
long_desc = (
f"User reported: \"{issue}\". "
f"User confirmation: \"{resolved}\". "
f"Tracking record created automatically by NOVA."
).strip()
return short_desc, long_desc
def _is_incident_intent(msg_norm: str) -> bool:
intent_phrases = [
"create ticket", "create a ticket", "raise ticket", "raise a ticket", "open ticket", "open a ticket",
"create incident", "create an incident", "raise incident", "raise an incident", "open incident", "open an incident",
"log ticket", "log an incident", "generate ticket", "create snow ticket", "raise snow ticket",
"raise service now ticket", "create service now ticket", "raise sr", "open sr",
]
return any(p in msg_norm for p in intent_phrases)
def _parse_ticket_status_intent(msg_norm: str) -> Dict[str, Optional[str]]:
status_keywords = ["status", "ticket status", "incident status", "check status", "check ticket status", "check incident status"]
base_has_status = any(k in msg_norm for k in status_keywords)
has_ticket_marker = (
any(w in msg_norm for w in ("ticket", "incident", "servicenow", "snow")) or
bool(re.search(r"\binc\d{5,}\b", msg_norm, flags=re.IGNORECASE))
)
# Disambiguation: if it's a domain status query and not clearly ticket/incident, do NOT route to ticket-status.
if (not base_has_status) or (base_has_status and not has_ticket_marker and _is_domain_status_context(msg_norm)):
return {}
patterns = [
r"(?:incident\s*id|incidentid|ticket\s*number|number)\s*[:=]?\s*(inc\d+)",
r"(inc\d+)"
]
for pat in patterns:
m = re.search(pat, msg_norm, flags=re.IGNORECASE)
if m:
val = m.group(1).strip()
if val:
return {"number": val.upper() if val.lower().startswith("inc") else val}
return {"number": None, "ask_number": True}
def _is_resolution_ack_heuristic(msg_norm: str) -> bool:
phrases = [
"it is resolved", "resolved", "issue resolved", "problem resolved",
"it's working", "working now", "works now", "fixed", "sorted",
"ok now", "fine now", "all good", "all set", "thanks works", "thank you it works", "back to normal",
]
return any(p in msg_norm for p in phrases)
def _has_negation_resolved(msg_norm: str) -> bool:
neg_phrases = [
"not resolved", "issue not resolved", "still not working", "not working",
"didn't work", "doesn't work", "no change", "not fixed", "still failing", "failed again", "broken", "fail",
]
return any(p in msg_norm for p in neg_phrases)
def _filter_context_for_query(context: str, query: str) -> Tuple[str, Dict[str, Any]]:
STRICT_OVERLAP = 3
MAX_SENTENCES_STRICT = 4
MAX_SENTENCES_CONCISE = 3
def _norm(text: str) -> str:
t = (text or "").lower()
t = re.sub(r"[^\w\s]", " ", t)
t = re.sub(r"\s+", " ", t).strip()
return t
def _split_sentences(ctx: str) -> List[str]:
raw_sents = re.split(r"(?<=[.!?])\s+|\n+|-\s*|\*\s*", ctx or "")
return [s.strip() for s in raw_sents if s and len(s.strip()) > 2]
ctx = (context or "").strip()
if not ctx or not query:
return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
q_norm = _norm(query)
q_terms = [t for t in q_norm.split() if len(t) > 2]
if not q_terms:
return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0}
sentences = _split_sentences(ctx)
matched_exact, matched_any = [], []
for s in sentences:
s_norm = _norm(s)
is_bullet = bool(re.match(r"^[\-\*]\s*", s))
overlap = sum(1 for t in q_terms if t in s_norm) + (1 if is_bullet else 0)
if overlap >= STRICT_OVERLAP:
matched_exact.append(s)
elif overlap > 0:
matched_any.append(s)
if matched_exact:
kept = matched_exact[:MAX_SENTENCES_STRICT]
return "\n".join(kept).strip(), {'mode': 'exact', 'matched_count': len(kept), 'all_sentences': len(sentences)}
if matched_any:
kept = matched_any[:MAX_SENTENCES_CONCISE]
return "\n".join(kept).strip(), {'mode': 'concise', 'matched_count': len(kept), 'all_sentences': len(sentences)}
kept = sentences[:MAX_SENTENCES_CONCISE]
return "\n".join(kept).strip(), {'mode': 'concise', 'matched_count': 0, 'all_sentences': len(sentences)}
def _extract_errors_only(text: str, max_lines: int = 12) -> str:
    """
    Gather error entries from an SOP errors section.

    A line is kept when it starts with a bullet (•, -, *) or contains a colon
    ("Heading: details"). Collection stops once `max_lines` lines are kept. If
    nothing qualifies, the stripped input text is returned instead.
    """
    bullet = re.compile(r"^\s*[-*\u2022]\s*")
    kept: List[str] = []
    for ln in _normalize_lines(text):
        if not (bullet.match(ln) or ":" in ln):
            continue
        kept.append(ln)
        if len(kept) >= max_lines:
            break
    if kept:
        return "\n".join(kept).strip()
    return (text or "").strip()
def _filter_permission_lines(text: str, max_lines: int = 6) -> str:
    """Keep the lines that mention a permission/access keyword.

    Collection stops once `max_lines` lines are kept. If no line matches, the
    stripped input text is returned instead.
    """
    PERM_SYNONYMS = (
        "permission", "permissions", "access", "authorization", "authorisation",
        "role", "role mapping", "security profile", "not allowed", "not authorized", "denied", "insufficient"
    )
    kept: List[str] = []
    for ln in _normalize_lines(text):
        lowered = ln.lower()
        if not any(k in lowered for k in PERM_SYNONYMS):
            continue
        kept.append(ln)
        if len(kept) >= max_lines:
            break
    if kept:
        return "\n".join(kept).strip()
    return (text or "").strip()
def _extract_escalation_line(text: str) -> Optional[str]:
if not text:
return None
lines = _normalize_lines(text)
if not lines:
return None
start_idx = None
for i, ln in enumerate(lines):
low = ln.lower()
if "escalation" in low or "escalation path" in low or "escalate" in low:
start_idx = i
break
block = []
if start_idx is not None:
for j in range(start_idx, min(len(lines), start_idx + 6)):
if not lines[j].strip():
break
block.append(lines[j].strip())
else:
block = [ln.strip() for ln in lines if ("->" in ln or "→" in ln)]
if not block:
return None
text_block = " ".join(block)
m = re.search(r"escalation[^:]*:\s*(.+)", text_block, flags=re.IGNORECASE)
path = m.group(1).strip() if m else None
if not path:
arrow_lines = [ln for ln in block if ("->" in ln or "→" in ln)]
if arrow_lines:
path = arrow_lines[0]
if not path:
m2 = re.search(r"(operator.*?administrator|operator.*)", text_block, flags=re.IGNORECASE)
path = m2.group(1).strip() if m2 else None
if not path:
return None
path = path.replace("->", "→").strip()
path = re.sub(r"^(?i:escalation\s*path)\s*:\s*", "", path).strip()
return f"If you want to escalate the issue, follow: {path}"
def _classify_resolution_llm(user_message: str) -> bool:
    """Ask Gemini whether the message says the issue is resolved.

    Returns False when no API key is configured or when the request or response
    parsing fails; otherwise True if the model's text contains "true".
    """
    if not GEMINI_API_KEY:
        return False
    prompt = f"""Classify if the following user message indicates that the issue is resolved or working now.
Return only 'true' or 'false'.
Message: {user_message}"""
    request_body = {"contents": [{"parts": [{"text": prompt}]}]}
    try:
        resp = requests.post(
            GEMINI_URL,
            headers={"Content-Type": "application/json"},
            json=request_body,
            timeout=12,
            verify=GEMINI_SSL_VERIFY,
        )
        # First candidate -> first content part -> its text; any missing piece
        # falls back to an empty value or lands in the except branch.
        first_candidate = resp.json().get("candidates", [{}])[0]
        first_part = first_candidate.get("content", {}).get("parts", [{}])[0]
        answer = first_part.get("text", "")
        return "true" in (answer or "").strip().lower()
    except Exception:
        return False
def _set_incident_resolved(sys_id: str) -> bool:
    """Mark a ServiceNow incident as Resolved via the Table API.

    Optionally moves the incident to state "2" first (when
    SERVICENOW_REQUIRE_IN_PROGRESS_FIRST is truthy), then tries up to three
    PATCH payloads in order, stopping at the first 200/204 response:
      A) numeric state "6" with close_code / close_notes,
      B) state label "Resolved" with the same fields,
      C) numeric state "6" with the code/notes field names taken from
         SERVICENOW_RESOLUTION_CODE_FIELD / SERVICENOW_RESOLUTION_NOTES_FIELD.
    Fields whose value is None (unset env vars) are omitted from every payload.

    Args:
        sys_id: sys_id of the incident record to update.

    Returns:
        True when one of the attempts succeeded; False otherwise, including
        when the instance URL is missing or any exception is raised.
    """
    # Local import: the module only imports the `datetime` class, and
    # datetime.utcnow() is deprecated in favour of an aware "now" in UTC.
    from datetime import timezone

    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            print("[SN PATCH resolve] missing SERVICENOW_INSTANCE_URL")
            return False
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{instance_url}/api/now/table/incident/{sys_id}"
        close_code_val = os.getenv("SERVICENOW_CLOSE_CODE", "Solution provided")
        close_notes_val = os.getenv("SERVICENOW_RESOLUTION_NOTES", "Issue resolved, user confirmed")
        code_field = os.getenv("SERVICENOW_RESOLUTION_CODE_FIELD", "close_code")
        notes_field = os.getenv("SERVICENOW_RESOLUTION_NOTES_FIELD", "close_notes")
        require_progress = os.getenv("SERVICENOW_REQUIRE_IN_PROGRESS_FIRST", "false").lower() in ("1", "true", "yes")
        if require_progress:
            # Best effort: a failure here is logged and the resolve is still attempted.
            try:
                resp1 = requests.patch(url, headers=headers, json={"state": "2"}, verify=VERIFY_SSL, timeout=25)
                print(f"[SN PATCH progress] status={resp1.status_code} body={resp1.text[:500]}")
            except Exception as e:
                print(f"[SN PATCH progress] exception={safe_str(e)}")

        # Fields shared by every attempt.
        common = {
            "caller_id": os.getenv("SERVICENOW_CALLER_SYSID"),
            "resolved_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            "work_notes": "Auto-resolve set by NOVA.",
            "resolved_by": os.getenv("SERVICENOW_RESOLVED_BY_SYSID"),
            "assignment_group": os.getenv("SERVICENOW_ASSIGNMENT_GROUP_SYSID"),
        }
        attempts = (
            ("A", {"state": "6", "close_code": close_code_val, "close_notes": close_notes_val}),
            ("B", {"state": "Resolved", "close_code": close_code_val, "close_notes": close_notes_val}),
            ("C", {"state": "6", code_field: close_code_val, notes_field: close_notes_val}),
        )
        for label, specific in attempts:
            # Common fields come last so they win on a key clash, as in the
            # original dict literals.
            payload = {k: v for k, v in {**specific, **common}.items() if v is not None}
            resp = requests.patch(url, headers=headers, json=payload, verify=VERIFY_SSL, timeout=25)
            if resp.status_code in (200, 204):
                return True
            print(f"[SN PATCH resolve {label}] status={resp.status_code} body={resp.text[:500]}")
        return False
    except Exception as e:
        print(f"[SN PATCH resolve] exception={safe_str(e)}")
        return False
# ------------------------------ Prereq helper ------------------------------
def _find_prereq_section_text(best_doc: str) -> str:
    """
    Look up the prerequisites section of a document under several common
    heading spellings and return the first non-blank one, stripped.
    Returns an empty string when none of the headings yields text.
    """
    heading_variants = (
        "Pre-Requisites",
        "Prerequisites",
        "Pre Requisites",
        "Pre-Requirements",
        "Requirements",
    )
    for heading in heading_variants:
        section = get_section_text(best_doc, heading)
        body = section.strip() if section else ""
        if body:
            return body
    return ""
# ------------------------------ Health ------------------------------
@app.get("/")
async def health_check():
    """Health endpoint: always responds with a static ok status."""
    status_payload = {"status": "ok"}
    return status_payload
# ------------------------------ Chat ------------------------------
@app.post("/chat")
async def chat_with_ai(input_data: ChatInput):
assist_followup: Optional[str] = None
try:
msg_norm = (input_data.user_message or "").lower().strip()
# Yes/No handlers
if msg_norm in ("yes", "y", "sure", "ok", "okay"):
return {
"bot_response": "Great! Tell me what you'd like to do next — check another ticket, create an incident, or describe your issue.",
"status": "OK",
"followup": "You can say: 'create ticket', 'incident status INC0012345', or describe your problem.",
"options": [],
"debug": {"intent": "continue_conversation"},
}
if msg_norm in ("no", "no thanks", "nope"):
return {
"bot_response": "No problem. Do you need assistance with any other issue?",
"status": "OK",
"end_chat": False,
"followup": None,
"options": [],
"debug": {"intent": "end_conversation"},
}
# Resolution ack (auto incident + mark Resolved)
is_llm_resolved = _classify_resolution_llm(input_data.user_message)
if _has_negation_resolved(msg_norm):
is_llm_resolved = False
if (not _has_negation_resolved(msg_norm)) and (_is_resolution_ack_heuristic(msg_norm) or is_llm_resolved):
try:
short_desc, long_desc = _build_tracking_descriptions(input_data.last_issue, input_data.user_message)
result = create_incident(short_desc, long_desc)
if isinstance(result, dict) and not result.get("error"):
inc_number = result.get("number", "<unknown>")
sys_id = result.get("sys_id")
resolved_note = ""
if sys_id:
ok = _set_incident_resolved(sys_id)
resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)"
return {
"bot_response": f"✅ Incident created: {inc_number}{resolved_note}",
"status": "OK",
"context_found": False,
"ask_resolved": False,
"suggest_incident": False,
"followup": None,
"top_hits": [],
"sources": [],
"auto_incident": True,
"debug": {"intent": "resolved_ack", "auto_created": True},
}
else:
err = (result or {}).get("error", "Unknown error")
return {
"bot_response": f"⚠️ I couldn't create the tracking incident automatically ({err}).",
"status": "PARTIAL",
"context_found": False,
"ask_resolved": False,
"suggest_incident": True,
"followup": "Shall I create a ticket now?",
"options": [{"type": "yesno", "title": "Create ticket now?"}],
"top_hits": [],
"sources": [],
"debug": {"intent": "resolved_ack", "auto_created": False, "error": err},
}
except Exception as e:
return {
"bot_response": f"⚠️ Something went wrong while creating the tracking incident: {safe_str(e)}",
"status": "PARTIAL",
"context_found": False,
"ask_resolved": False,
"suggest_incident": True,
"followup": "Shall I create a ticket now?",
"options": [{"type": "yesno", "title": "Create ticket now?"}],
"top_hits": [],
"sources": [],
"debug": {"intent": "resolved_ack", "exception": True},
}
# Incident intent
if _is_incident_intent(msg_norm):
return {
"bot_response": (
"Okay, let's create a ServiceNow incident.\n\n"
"Please provide:\n- Short Description (one line)\n"
"- Detailed Description (steps, error text, IDs, site, environment)"
),
"status": (input_data.prev_status or "PARTIAL"),
"context_found": False,
"ask_resolved": False,
"suggest_incident": False,
"show_incident_form": True,
"followup": None,
"top_hits": [],
"sources": [],
"debug": {"intent": "create_ticket"},
}
# Status intent (ticket/incident) — disambiguated
status_intent = _parse_ticket_status_intent(msg_norm)
if status_intent:
if status_intent.get("ask_number"):
return {
"bot_response": (
"To check a ticket status, please share the Incident ID (e.g., INC0012345).\n\n"
"You can paste the ID here or say 'cancel'."
),
"status": "PARTIAL",
"context_found": False,
"ask_resolved": False,
"suggest_incident": False,
"followup": None,
"show_status_form": True,
"top_hits": [],
"sources": [],
"debug": {"intent": "status_request_missing_id"},
}
try:
token = get_valid_token()
instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
if not instance_url:
raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
number = status_intent.get("number")
url = f"{instance_url}/api/now/table/incident?number={number}"
response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
data = response.json()
lst = data.get("result", [])
result = (lst or [{}])[0] if response.status_code == 200 else {}
state_code = builtins.str(result.get("state", "unknown"))
state_label = STATE_MAP.get(state_code, state_code)
short = result.get("short_description", "")
num = result.get("number", number or "unknown")
return {
"bot_response": (
f"**Ticket:** {num}\n"
f"**Status:** {state_label}\n"
f"**Issue description:** {short}"
),
"status": "OK",
"show_assist_card": True,
"context_found": False,
"ask_resolved": False,
"suggest_incident": False,
"followup": "Is there anything else I can assist you with?",
"top_hits": [],
"sources": [],
"debug": {"intent": "status", "http_status": response.status_code},
}
except Exception as e:
raise HTTPException(status_code=500, detail=safe_str(e))
# Hybrid KB search
kb_results = hybrid_search_knowledge_base(input_data.user_message, top_k=10, alpha=0.6, beta=0.4)
documents = kb_results.get("documents", [])
metadatas = kb_results.get("metadatas", [])
distances = kb_results.get("distances", [])
combined = kb_results.get("combined_scores", [])
items: List[Dict[str, Any]] = []
for i, doc in enumerate(documents):
text = doc.strip() if isinstance(doc, str) else ""
if not text:
continue
meta = metadatas[i] if i < len(metadatas) and isinstance(metadatas[i], dict) else {}
score = distances[i] if i < len(distances) else None
comb = combined[i] if i < len(combined) else None
m = dict(meta)
if score is not None:
m["distance"] = score
if comb is not None:
m["combined"] = comb
items.append({"text": text, "meta": m})
selected = items[:max(1, 2)]
context_raw = "\n\n---\n\n".join([s["text"] for s in selected]) if selected else ""
filtered_text, filt_info = _filter_context_for_query(context_raw, input_data.user_message)
context = filtered_text
context_found = bool(context.strip())
best_distance = min([d for d in distances if d is not None], default=None) if distances else None
best_combined = max([c for c in combined if c is not None], default=None) if combined else None
detected_intent = kb_results.get("user_intent", "neutral")
best_doc = kb_results.get("best_doc")
top_meta = (metadatas or [{}])[0] if metadatas else {}
msg_low = (input_data.user_message or "").lower()
GENERIC_ERROR_TERMS = ("error", "issue", "problem", "not working", "failed", "failure")
generic_error_signal = any(t in msg_low for t in GENERIC_ERROR_TERMS)
# Query-based prereq nudge
PREREQ_TERMS = (
"pre req", "pre-requisite", "pre-requisites", "prerequisite",
"prerequisites", "pre requirement", "pre-requirements", "requirements"
)
if detected_intent == "neutral" and any(t in msg_low for t in PREREQ_TERMS):
detected_intent = "prereqs"
# Force errors intent for permissions
PERM_QUERY_TERMS = [
"permission", "permissions", "access", "access right", "authorization", "authorisation",
"role", "role access", "security", "security profile", "privilege", "not allowed", "not authorized", "denied",
]
is_perm_query = any(t in msg_norm for t in PERM_QUERY_TERMS)
if is_perm_query:
detected_intent = "errors"
# Heading-aware prereq nudge
sec_title = ((top_meta or {}).get("section") or "").strip().lower()
PREREQ_HEADINGS = (
"pre-requisites", "prerequisites", "pre requisites",
"pre-requirements", "requirements"
)
if detected_intent == "neutral" and any(h in sec_title for h in PREREQ_HEADINGS):
detected_intent = "prereqs"
# --- Module-aware steps nudge (appointments, picking, shipping, etc.) ---
STEPS_TERMS = ("how to", "procedure", "perform", "steps", "do", "navigate")
ACTIONS_PRESENT = any(s in msg_low for syns in ACTION_SYNONYMS_EXT.values() for s in syns)
mod_tags = ((top_meta or {}).get("module_tags") or "").lower()
MODULE_TOKENS = tuple({term for syns in MODULE_VOCAB.values() for term in syns})
looks_like_module = (
any(t in msg_low for t in MODULE_TOKENS)
or any(m in mod_tags for m in ["appointments", "receiving", "picking", "putaway", "shipping", "inventory", "replenishment"])
or any(m in sec_title for m in ["appointment", "appointments", "schedule", "dock", "door", "picking", "putaway", "shipping", "inventory"])
)
looks_like_steps_query = any(t in msg_low for t in STEPS_TERMS) or ACTIONS_PRESENT
if detected_intent in ("neutral", "prereqs") and looks_like_steps_query and looks_like_module:
detected_intent = "steps"
# --- Meaning-aware SOP gating ---
def _contains_any(s: str, keywords: tuple) -> bool:
low = (s or "").lower()
return any(k in low for k in keywords)
DOMAIN_TERMS = (
"trailer", "shipment", "order", "load", "wave",
"inventory", "putaway", "receiving", "appointment",
"dock", "door", "manifest", "pallet", "container",
"asn", "grn", "pick", "picking"
)
ACTION_OR_ERROR_TERMS = (
"how to", "procedure", "perform", # added
"close", "closing", "open", "navigate", "scan", "confirm", "generate", "update",
"receive", "receiving", # added
"error", "issue", "fail", "failed", "not working", "locked", "mismatch",
"access", "permission", "status"
)
matched_count = int(filt_info.get("matched_count") or 0)
filter_mode = (filt_info.get("mode") or "").lower()
has_any_action_or_error = _contains_any(msg_low, ACTION_OR_ERROR_TERMS)
mentions_domain = _contains_any(msg_low, DOMAIN_TERMS)
short_query = len((input_data.user_message or "").split()) <= 4
gate_combined_ok = 0.60 if short_query else 0.55
combined_ok = (best_combined is not None and best_combined >= gate_combined_ok)
weak_domain_only = (mentions_domain and not has_any_action_or_error)
low_context_hit = (matched_count < 2 and filter_mode in ("concise", "exact"))
strong_steps_bypass = looks_like_steps_query and looks_like_module
strong_error_signal = len(_detect_error_families(msg_low)) > 0
if (weak_domain_only or (low_context_hit and not combined_ok)) \
and not strong_steps_bypass \
and not (strong_error_signal or generic_error_signal):
return {
"bot_response": _build_clarifying_message(),
"status": "NO_KB_MATCH",
"context_found": False,
"ask_resolved": False,
"suggest_incident": True,
"followup": "Share more details (module/screen/error), or say 'create ticket'.",
"options": [{"type": "yesno", "title": "Share details or raise a ticket?"}],
"top_hits": [],
"sources": [],
"debug": {
"intent": "sop_rejected_weak_match",
"matched_count": matched_count,
"filter_mode": filter_mode,
"best_combined": best_combined,
"mentions_domain": mentions_domain,
"has_any_action_or_error": has_any_action_or_error,
"strong_steps_bypass": strong_steps_bypass,
"strong_error_signal": strong_error_signal,
"generic_error_signal": generic_error_signal
},
}
# Build SOP context if allowed
if is_perm_query:
detected_intent = "errors"
escalation_line = None # SOP escalation candidate
full_errors = None # keep for possible escalation extraction
next_step_applied = False
next_step_info: Dict[str, Any] = {}
# Helper: detect asked action from query using extended synonyms
def _detect_action_from_query(q: str) -> Optional[str]:
qlow = (q or "").lower()
for act, syns in ACTION_SYNONYMS_EXT.items():
if any(s in qlow for s in syns):
return act
return None
# Helper: filter lines in a steps context to keep only the asked action
def _filter_steps_by_action(raw_context: str, asked_act: Optional[str]) -> str:
if not asked_act or not raw_context.strip():
return raw_context
other_terms: List[str] = []
for act, syns in ACTION_SYNONYMS_EXT.items():
if act != asked_act:
other_terms.extend(syns)
lines = _normalize_lines(raw_context)
kept = [ln for ln in lines if not any(t in ln.lower() for t in other_terms)]
return "\n".join(kept).strip() if kept else raw_context
if best_doc:
if detected_intent == "steps":
full_steps = get_best_steps_section_text(best_doc)
if not full_steps:
sec = (top_meta or {}).get("section")
if sec:
full_steps = get_section_text(best_doc, sec)
if full_steps:
# Apply action-focused filtering FIRST (to avoid mixing create/update/delete)
asked_action = _detect_action_from_query(input_data.user_message)
full_steps = _filter_steps_by_action(full_steps, asked_action)
# Use numbered form only for matching; keep raw for full output
numbered_full = _ensure_numbering(full_steps)
next_only = _resolve_next_steps(input_data.user_message, numbered_full, max_next=6, min_score=0.35)
if next_only is not None:
# "what's next" mode
if len(next_only) == 0:
context = "You are at the final step of this SOP. No further steps."
next_step_applied = True
next_step_info = {"count": 0}
context_preformatted = True
else:
context = _format_steps_as_numbered(next_only)
next_step_applied = True
next_step_info = {"count": len(next_only)}
context_preformatted = True
else:
# Normal mode: return the full SOP section (raw), and we'll number it below once.
context = full_steps
context_preformatted = False
elif detected_intent == "errors":
full_errors = get_best_errors_section_text(best_doc)
if full_errors:
ctx_err = _extract_errors_only(full_errors, max_lines=30)
if is_perm_query:
context = _filter_permission_lines(ctx_err, max_lines=6)
else:
# Decide specific vs generic:
is_specific_error = len(_detect_error_families(msg_low)) > 0
if is_specific_error:
context = _filter_error_lines_by_query(ctx_err, input_data.user_message, max_lines=1)
else:
all_lines: List[str] = _normalize_lines(ctx_err)
error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)]
context = "\n".join(error_bullets[:8]).strip()
assist_followup = (
"Please tell me which error above matches your screen (paste the exact text), "
"or share a screenshot. I can guide you further or raise a ServiceNow ticket."
)
escalation_line = _extract_escalation_line(full_errors)
else:
# Fallback when Errors section is missing: show steps
full_steps = get_best_steps_section_text(best_doc) or get_section_text(best_doc, sec_title or "")
if full_steps:
# Apply action filter to keep responses tight
asked_action = _detect_action_from_query(input_data.user_message)
full_steps = _filter_steps_by_action(full_steps, asked_action)
context = full_steps
detected_intent = "steps"
context_preformatted = False
elif detected_intent == "prereqs":
full_prereqs = _find_prereq_section_text(best_doc)
if full_prereqs:
context = full_prereqs.strip()
else:
# Fallback when Prereqs section is missing: show steps
full_steps = get_best_steps_section_text(best_doc) or get_section_text(best_doc, sec_title or "")
if full_steps:
asked_action = _detect_action_from_query(input_data.user_message)
full_steps = _filter_steps_by_action(full_steps, asked_action)
context = full_steps
detected_intent = "steps"
context_preformatted = False
language_hint = _detect_language_hint(input_data.user_message)
lang_line = f"Respond in {language_hint}." if language_hint else "Respond in a clear, polite tone."
use_gemini = (detected_intent == "errors")
enhanced_prompt = f"""You are a helpful support assistant. Rewrite the provided context ONLY into clear, user-friendly guidance.
- Do not add any information that is not present in the context.
- If the content is an error/access/permission note, paraphrase it into a helpful sentence users can understand.
- {lang_line}
### Context
{context}
### Question
{input_data.user_message}
### Output
Return ONLY the rewritten guidance."""
headers = {"Content-Type": "application/json"}
payload = {"contents": [{"parts": [{"text": enhanced_prompt}]}]}
bot_text = ""
http_code = 0
if use_gemini and GEMINI_API_KEY:
try:
resp = requests.post(GEMINI_URL, headers=headers, json=payload, timeout=25, verify=GEMINI_SSL_VERIFY)
http_code = getattr(resp, "status_code", 0)
try:
result = resp.json()
except Exception:
result = {}
try:
bot_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
except Exception:
bot_text = ""
except Exception:
bot_text, http_code = "", 0
# Deterministic local formatting
if detected_intent == "steps":
# If we trimmed to next steps, 'context' is already formatted (or a sentence).
# Only number when returning full SOP raw text.
if ('context_preformatted' in locals()) and context_preformatted:
bot_text = context
else:
bot_text = _ensure_numbering(context)
elif detected_intent == "errors":
if not bot_text.strip() or http_code == 429:
bot_text = context.strip()
if escalation_line:
bot_text = (bot_text or "").rstrip() + "\n\n" + escalation_line
else:
bot_text = context
# If the user explicitly asks to escalate, append escalation even in 'steps' intent
needs_escalation = (" escalate" in msg_norm) or ("escalation" in msg_norm)
if needs_escalation and best_doc:
esc_text = get_escalation_text(best_doc)
if not esc_text and full_errors:
esc_text = full_errors
line = _extract_escalation_line(esc_text or "")
if line:
bot_text = (bot_text or "").rstrip() + "\n\n" + line
# Guarantee non-empty bot response
if not (bot_text or "").strip():
if context.strip():
bot_text = context.strip()
else:
bot_text = (
"I found some related guidance but couldn’t assemble a reply. "
"Share a bit more detail (module/screen/error), or say ‘create ticket’."
)
short_query = len((input_data.user_message or "").split()) <= 4
gate_combined_ok = 0.60 if short_query else 0.55
status = "OK" if (best_combined is not None and best_combined >= gate_combined_ok) else "PARTIAL"
lower = (bot_text or "").lower()
if ("partial" in lower) or ("may be partial" in lower) or ("closest" in lower) or ("may not fully" in lower):
status = "PARTIAL"
options = [{"type": "yesno", "title": "Share details or raise a ticket?"}] if status == "PARTIAL" else []
return {
"bot_response": bot_text,
"status": status,
"context_found": True,
"ask_resolved": (status == "OK" and detected_intent != "errors"),
"suggest_incident": (status == "PARTIAL"),
"followup": (assist_followup if assist_followup else ("Is this helpful, or should I raise a ticket?" if status == "PARTIAL" else None)),
"options": options,
"top_hits": [],
"sources": [],
"debug": {
"used_chunks": len((context or "").split("\n\n---\n\n")) if context else 0,
"best_distance": best_distance,
"best_combined": best_combined,
"http_status": http_code,
"filter_mode": filt_info.get("mode"),
"matched_count": filt_info.get("matched_count"),
"user_intent": detected_intent,
"best_doc": best_doc,
"next_step": {
"applied": next_step_applied,
"info": next_step_info,
},
},
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=safe_str(e))
# ------------------------------ Ticket description generation ------------------------------
@app.post("/generate_ticket_desc")
async def generate_ticket_desc_ep(input_data: TicketDescInput):
    """Ask Gemini to draft a short and a detailed ticket description.

    The model is prompted to answer with a JSON object holding the keys
    ``ShortDescription`` and ``DetailedDescription``. Problems with the
    reply (non-JSON body, unexpected shape, unparsable text) are reported
    through an ``error`` key next to two empty descriptions. Any other
    exception is surfaced as an HTTP 500.
    """

    def _failure(reason):
        # Uniform payload for every recoverable Gemini problem.
        return {"ShortDescription": "", "DetailedDescription": "", "error": reason}

    try:
        issue_prompt = "".join(
            (
                f"You are helping generate ServiceNow ticket descriptions based on the issue: {input_data.issue}.\n",
                "Please return the output strictly in JSON format with the following keys:\n",
                "{\n",
                ' "ShortDescription": "A concise summary of the issue (max 100 characters)",\n',
                ' "DetailedDescription": "A detailed explanation of the issue"\n',
                "}\n",
                "Do not include any extra text, comments, or explanations outside the JSON.",
            )
        )
        request_body = {"contents": [{"parts": [{"text": issue_prompt}]}]}
        gemini_reply = requests.post(
            GEMINI_URL,
            headers={"Content-Type": "application/json"},
            json=request_body,
            timeout=25,
            verify=GEMINI_SSL_VERIFY,
        )

        try:
            reply_json = gemini_reply.json()
        except Exception:
            return _failure("Gemini returned non-JSON")

        # Walk candidates[0].content.parts[0].text; any shape mismatch is
        # treated as a parsing failure.
        try:
            first_candidate = reply_json.get("candidates", [{}])[0]
            first_part = first_candidate.get("content", {}).get("parts", [{}])[0]
            raw_text = first_part.get("text", "").strip()
        except Exception:
            return _failure("Gemini parsing failed")

        # Drop Markdown code-fence lines when the reply is wrapped in one.
        if raw_text.startswith("```"):
            raw_text = "\n".join(
                row for row in raw_text.splitlines() if not row.strip().startswith("```")
            ).strip()

        try:
            parsed = json.loads(raw_text)
            short_desc = parsed.get("ShortDescription", "").strip()
            long_desc = parsed.get("DetailedDescription", "").strip()
        except Exception:
            return _failure("Invalid JSON returned")
        return {"ShortDescription": short_desc, "DetailedDescription": long_desc}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=safe_str(exc))
# ------------------------------ Incident status ------------------------------
@app.post("/incident_status")
async def incident_status(input_data: TicketStatusInput):
    """Fetch one ServiceNow incident and summarise its state for the chat UI.

    The incident is looked up by ``sys_id`` when provided, otherwise by
    ``number``. A non-200 ServiceNow response yields an empty record, so the
    summary falls back to ``unknown`` values.

    Raises:
        HTTPException: 400 when neither identifier is supplied; 500 when
            ``SERVICENOW_INSTANCE_URL`` is not set or any other error occurs.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        if input_data.sys_id:
            # Percent-encode the caller-supplied id so it cannot add path
            # segments or a query string to the request URL.
            safe_sys_id = quote(builtins.str(input_data.sys_id), safe="")
            url = f"{instance_url}/api/now/table/incident/{safe_sys_id}"
            response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
            data = response.json()
            result = data.get("result", {}) if response.status_code == 200 else {}
        elif input_data.number:
            # Let requests encode the value instead of splicing it into the URL.
            url = f"{instance_url}/api/now/table/incident"
            response = requests.get(
                url,
                headers=headers,
                params={"number": input_data.number},
                verify=VERIFY_SSL,
                timeout=25,
            )
            data = response.json()
            lst = data.get("result", [])
            result = (lst or [{}])[0] if response.status_code == 200 else {}
        else:
            raise HTTPException(status_code=400, detail="Provide IncidentID (number) or sys_id")
        state_code = builtins.str(result.get("state", "unknown"))
        # Codes missing from STATE_MAP are shown as-is.
        state_label = STATE_MAP.get(state_code, state_code)
        short = result.get("short_description", "")
        number = result.get("number", input_data.number or "unknown")
        return {
            "bot_response": (
                f"**Ticket:** {number} \n"
                f"**Status:** {state_label} \n"
                f"**Issue description:** {short}"
            ).replace("\n", " \n"),
            "followup": "Is there anything else I can assist you with?",
            "show_assist_card": True,
            "persist": True,
            "debug": "Incident status fetched",
        }
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) intact instead of
        # re-wrapping them as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))
# ------------------------------ Incident ------------------------------
@app.post("/incident")
async def raise_incident(input_data: IncidentInput):
    """Create a ServiceNow incident and optionally mark it resolved.

    Calls ``create_incident`` with the supplied short and long descriptions.
    When ``mark_resolved`` is truthy and a ``sys_id`` came back, it also
    calls ``_set_incident_resolved`` and notes the outcome in the reply.

    Raises:
        HTTPException: 500 carrying the ``error`` value from
            ``create_incident`` (or ``"Unknown error"``), or the text of any
            unexpected exception.
    """
    try:
        result = create_incident(input_data.short_description, input_data.description)
        if not isinstance(result, dict) or result.get("error"):
            # A non-dict result has no ``error`` key to read, so fall back to
            # the generic message rather than failing on ``.get``.
            detail = result.get("error") if isinstance(result, dict) else None
            raise HTTPException(status_code=500, detail=detail or "Unknown error")
        inc_number = result.get("number", "<unknown>")
        sys_id = result.get("sys_id")
        resolved_note = ""
        if bool(input_data.mark_resolved) and sys_id not in ("<unknown>", None):
            ok = _set_incident_resolved(sys_id)
            resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)"
        ticket_text = f"Incident created: {inc_number}{resolved_note}" if inc_number else "Incident created."
        return {
            "bot_response": f"✅ {ticket_text}",
            "debug": "Incident created via ServiceNow",
            "persist": True,
            "show_assist_card": True,
            "followup": "Is there anything else I can assist you with?",
        }
    except HTTPException:
        # Pass the failure above through unchanged; re-wrapping it would
        # replace ``detail`` with the stringified exception.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e))