Spaces:
Sleeping
Sleeping
| # main_hugging_phase_recent.py | |
| import os | |
| import json | |
| import re | |
| import requests | |
| import builtins | |
| from typing import Optional, Any, Dict, List, Tuple | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| # Import shared vocab from KB services | |
| from services.kb_creation import ACTION_SYNONYMS, MODULE_VOCAB | |
| from services.kb_creation import ( | |
| collection, | |
| ingest_documents, | |
| hybrid_search_knowledge_base, | |
| get_section_text, | |
| get_best_steps_section_text, | |
| get_best_errors_section_text, | |
| get_escalation_text, # for escalation heading | |
| ) | |
| from services.login import router as login_router | |
| from services.generate_ticket import get_valid_token, create_incident | |
# Load .env BEFORE any os.getenv() lookup below; otherwise values that only
# exist in the .env file (e.g. SERVICENOW_SSL_VERIFY) would be silently ignored
# and the flags would always fall back to their defaults.
load_dotenv()
os.environ["POSTHOG_DISABLED"] = "true"

# TLS verification switches for outbound HTTP calls. Any of "1", "true", "yes"
# (case-insensitive) enables verification; the default is enabled.
VERIFY_SSL = os.getenv("SERVICENOW_SSL_VERIFY", "true").lower() in ("1", "true", "yes")
GEMINI_SSL_VERIFY = os.getenv("GEMINI_SSL_VERIFY", "true").lower() in ("1", "true", "yes")


def safe_str(e: Any) -> str:
    """Return ``str(e)``, or a fixed placeholder if stringifying *e* itself raises.

    ``builtins.str`` is referenced explicitly so the conversion still works if a
    local name ``str`` is shadowed elsewhere in the module.
    """
    try:
        return builtins.str(e)
    except Exception:
        return "<error stringify failed>"
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: populate the knowledge base once at startup.

    If the KB collection is empty, documents under ``<cwd>/documents`` are
    ingested; otherwise ingestion is skipped. Any failure is logged and
    swallowed on purpose so the API still starts without a populated KB.

    The function is wrapped with ``asynccontextmanager`` because
    ``FastAPI(lifespan=...)`` expects an async context manager factory; passing
    a bare async generator relies on a deprecated compatibility path.
    """
    try:
        folder_path = os.path.join(os.getcwd(), "documents")
        # Query the collection size once and reuse it for both the check and the log.
        entry_count = collection.count()
        if entry_count == 0:
            print("[KB] empty. Running ingestion...")
            ingest_documents(folder_path)
        else:
            print(f"[KB] already populated with {entry_count} entries. Skipping ingestion.")
    except Exception as e:
        print(f"[KB] ingestion failed: {safe_str(e)}")
    # Nothing to tear down on shutdown; control returns to the server here.
    yield
# Create the ASGI application; `lifespan` is passed as its startup/shutdown hook.
app = FastAPI(lifespan=lifespan)
# Mount the routes exported by services.login.
app.include_router(login_router)
# CORS: only the listed frontend origin may call this API from a browser.
# Credentials are allowed, with any method and any header.
origins = ["https://chatbotnova-chatbot-frontend.hf.space"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ------------------------------ Models ------------------------------ | |
# Request body for the chat handler.
class ChatInput(BaseModel):
    # Raw text typed by the user.
    user_message: str
    # Used as the response "status" in the create-ticket branch of the chat
    # handler (falls back to "PARTIAL" when missing).
    prev_status: Optional[str] = None
    # Previously reported issue text; used when building tracking-incident descriptions.
    last_issue: Optional[str] = None


# Request body carrying the fields of an incident to create.
class IncidentInput(BaseModel):
    short_description: str
    description: str
    # NOTE(review): presumably asks for the incident to be resolved right after
    # creation; the consumer is not visible in this chunk, so confirm.
    mark_resolved: Optional[bool] = False


# Request body carrying a free-text issue.
class TicketDescInput(BaseModel):
    issue: str


# Request body identifying a ticket; both identifiers are optional.
class TicketStatusInput(BaseModel):
    sys_id: Optional[str] = None
    number: Optional[str] = None
# Maps the incident "state" code (as a string) to the label shown to the user.
# Codes missing from this map are displayed as-is by the chat handler.
STATE_MAP = {
    "1": "New",
    "2": "In Progress",
    "3": "On Hold",
    "6": "Resolved",
    "7": "Closed",
    "8": "Canceled",
}
# Gemini API key read from the environment; may be None when not configured.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# generateContent endpoint for the gemini-2.5-flash-lite model.
# NOTE(review): the key is embedded in the URL at import time, so the URL ends
# in "key=None" when GEMINI_API_KEY is unset; callers must check the key first.
GEMINI_URL = (
    f"https://generativelanguage.googleapis.com/v1beta/models/"
    f"gemini-2.5-flash-lite:generateContent?key={GEMINI_API_KEY}"
)
| # ------------------------------ Helpers ------------------------------ | |
# Requested numbering style, lower-cased; expected values are 'digit' or 'step'.
# NOTE(review): no reader of this setting is visible in this chunk.
NUMBERING_STYLE = os.getenv("NUMBERING_STYLE", "digit").lower()  # 'digit' or 'step'
# Domain status terms (generic WMS domain words).
# A message containing any of these is treated as a domain "status" question
# rather than a ticket-status request (see _is_domain_status_context).
DOMAIN_STATUS_TERMS = (
    "shipment", "order", "load", "trailer", "wave",
    "inventory", "putaway", "receiving", "appointment",
    "dock", "door", "manifest", "pallet", "container",
    "asn", "grn", "pick", "picking"
)
| # --- Generic error families (SOP-wide, reusable in gating and line selection) --- | |
# Error family name -> phrases that signal that family.
# Matching is done by substring test against lower-cased text, so short
# entries such as "lag" or "sync" also match inside longer words.
ERROR_FAMILY_SYNS = {
    # Entity could not be located.
    "NOT_FOUND": (
        "not found", "missing", "does not exist", "doesn't exist",
        "unavailable", "not available", "cannot find", "no such",
        "not present", "absent"
    ),
    # Two values disagree.
    "MISMATCH": (
        "mismatch", "doesn't match", "does not match", "variance",
        "difference", "discrepancy", "not equal"
    ),
    # Record cannot be edited in its current state.
    "LOCKED": (
        "locked", "status locked", "blocked", "read only", "read-only", "frozen", "freeze"
    ),
    # Authorization / role problems.
    "PERMISSION": (
        "permission", "permissions", "access denied", "not authorized",
        "not authorised", "insufficient privileges", "no access",
        "authorization", "authorisation"
    ),
    # Connectivity problems.
    "TIMEOUT": (
        "timeout", "timed out", "network", "connection",
        "unable to connect", "disconnected", "no network"
    ),
    # Data freshness / replication problems.
    "SYNC": (
        "sync", "synchronization", "synchronisation", "replication",
        "refresh", "out of sync", "stale", "delay", "lag"
    ),
}
| # ----- local extension so runtime filtering is precise even without re-ingest ----- | |
| # (Does NOT override your KB synonyms—just augments them at runtime.) | |
# Start from a per-action copy of the shared KB synonyms so the imported
# ACTION_SYNONYMS lists are never mutated.
ACTION_SYNONYMS_EXT: Dict[str, List[str]] = {
    action: list(synonyms) for action, synonyms in ACTION_SYNONYMS.items()
}
# Extend with SOP phrasing (appointments often say 'updation', 'deletion', 'reschedule')
for _action, _extra_phrases in (
    ("create", ["appointment creation", "create appointment"]),
    ("update", [
        "updation", "reschedule", "change time", "change date", "change slot",
        "update time", "update date", "update slot", "update appointment", "edit appointment"
    ]),
    ("delete", ["deletion", "delete appointment", "cancel appointment"]),
):
    ACTION_SYNONYMS_EXT.setdefault(_action, []).extend(_extra_phrases)
def _detect_error_families(msg: str) -> list:
    """Return the error-family names whose synonyms occur in *msg*.

    Both the message and every synonym are lower-cased, punctuation is replaced
    by spaces and whitespace is collapsed before the substring test. Normalizing
    the synonyms as well is required: otherwise entries containing punctuation
    (e.g. "doesn't exist", "read-only") could never match the
    punctuation-stripped message.

    Families are returned in the iteration order of ``ERROR_FAMILY_SYNS``.
    """
    def _flatten(s: str) -> str:
        s = re.sub(r"[^\w\s]", " ", (s or "").lower())
        return re.sub(r"\s+", " ", s).strip()

    low_norm = _flatten(msg)
    fams = []
    for fam, syns in ERROR_FAMILY_SYNS.items():
        normalized = (_flatten(s) for s in syns)
        # Skip synonyms that normalize to "" — an empty string is "in" everything.
        if any(s and s in low_norm for s in normalized):
            fams.append(fam)
    return fams
| def _is_domain_status_context(msg_norm: str) -> bool: | |
| if "status locked" in msg_norm or "locked status" in msg_norm: | |
| return True | |
| return any(term in msg_norm for term in DOMAIN_STATUS_TERMS) | |
| def _normalize_lines(text: str) -> List[str]: | |
| raw = (text or "") | |
| try: | |
| return [ln.strip() for ln in raw.splitlines() if ln.strip()] | |
| except Exception: | |
| return [raw.strip()] if raw.strip() else [] | |
| def _ensure_numbering(text: str) -> str: | |
| """ | |
| Normalize raw SOP steps into a clean numbered list using circled digits. | |
| Robust against '1.', '1)', 'Step 1:', bullets ('-', '*', '•'), and circled digits. | |
| """ | |
| text = re.sub(r"[\u2060\u200B]", "", text or "") | |
| lines = [ln.strip() for ln in (text or "").splitlines() if ln and ln.strip()] | |
| if not lines: | |
| return text or "" | |
| # Collapse lines into a block and then split on common step markers | |
| para = " ".join(lines).strip() | |
| if not para: | |
| return "" | |
| # Create hard breaks at typical step boundaries | |
| para_clean = re.sub(r"(?:\b\d+[.)]\s+)", "\n\n\n", para) # 1. / 1) | |
| para_clean = re.sub(r"(?:[\u2460-\u2473]\s+)", "\n\n\n", para_clean) # circled digits | |
| para_clean = re.sub(r"(?i)\bstep\s*\d+\s*:\s*", "\n\n\n", para_clean) # Step 1: | |
| segments = [seg.strip() for seg in para_clean.split("\n\n\n") if seg.strip()] | |
| # Fallback splitting if we didn't detect separators | |
| if len(segments) < 2: | |
| tmp = [ln.strip() for ln in para.splitlines() if ln.strip()] | |
| segments = tmp if len(tmp) > 1 else [seg.strip() for seg in re.split(r"(?<=[.!?])\s+|\s+;\s+", para) if seg.strip()] | |
| # Strip any step prefixes | |
| def strip_prefix_any(s: str) -> str: | |
| return re.sub( | |
| r"^\s*(?:" | |
| r"(?:\d+\s*[.)])" # leading numbers 1., 2) | |
| r"|(?:step\s*\d+:?)" # Step 1: | |
| r"|(?:[-*\u2022])" # bullets | |
| r"|(?:[\u2460-\u2473])" # circled digits | |
| r")\s*", | |
| "", | |
| (s or "").strip(), | |
| flags=re.IGNORECASE | |
| ) | |
| clean_segments = [strip_prefix_any(seg) for seg in segments if seg.strip()] | |
| circled = { | |
| 1: "\u2460", 2: "\u2461", 3: "\u2462", 4: "\u2463", 5: "\u2464", | |
| 6: "\u2465", 7: "\u2466", 8: "\u2467", 9: "\u2468", 10: "\u2469", | |
| 11: "\u246a", 12: "\u246b", 13: "\u246c", 14: "\u246d", 15: "\u246e", | |
| 16: "\u246f", 17: "\u2470", 18: "\u2471", 19: "\u2472", 20: "\u2473" | |
| } | |
| out = [] | |
| for idx, seg in enumerate(clean_segments, start=1): | |
| marker = circled.get(idx, f"{idx})") | |
| out.append(f"{marker} {seg}") | |
| return "\n".join(out) | |
| # --- Next-step helpers (generic; SOP-agnostic) --- | |
| def _norm_text(s: str) -> str: | |
| s = (s or "").lower() | |
| s = re.sub(r"[^\w\s]", " ", s) | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
| def _split_sop_into_steps(numbered_text: str) -> list: | |
| """ | |
| Split a numbered/bulleted SOP block (already passed through _ensure_numbering) | |
| into atomic steps. Returns a list of raw step strings (order preserved). | |
| Safe for circled digits, '1.' styles, and bullets. | |
| """ | |
| lines = [ln.strip() for ln in (numbered_text or "").splitlines() if ln.strip()] | |
| steps = [] | |
| for ln in lines: | |
| cleaned = re.sub(r"^\s*(?:[\u2460-\u2473]|\d+[.)]|[-*•])\s*", "", ln) | |
| if cleaned: | |
| steps.append(cleaned) | |
| return steps | |
def _soft_match_score(a: str, b: str) -> float:
    """Jaccard similarity of the normalized token sets of *a* and *b*.

    Returns 0.0 when either side has no tokens.
    """
    tokens_a = set(_norm_text(a).split())
    tokens_b = set(_norm_text(b).split())
    if not (tokens_a and tokens_b):
        return 0.0
    union_size = len(tokens_a | tokens_b)
    if not union_size:
        return 0.0
    return len(tokens_a & tokens_b) / union_size
def _detect_next_intent(user_query: str) -> bool:
    """Tell whether the query asks for what comes next.

    The test is a plain substring match of the cue phrases below against the
    normalized query.
    """
    normalized = _norm_text(user_query)
    cue_phrases = (
        "after", "after this", "what next", "whats next", "next step",
        "then what", "following step", "continue", "subsequent", "proceed",
    )
    for cue in cue_phrases:
        if cue in normalized:
            return True
    return False
def _resolve_next_steps(user_query: str, numbered_text: str, max_next: int = 6, min_score: float = 0.35):
    """
    Return the SOP steps that follow the step the user is referring to.

    Returns ``None`` when the query has no "what's next" intent, when the SOP
    has no steps, or when no step matches the query with at least
    ``min_score``. Returns ``[]`` when the matched step is the last one.
    Otherwise returns up to ``max_next`` steps after the best match.

    A step whose normalized text is contained in the normalized query scores
    1.0; any other step gets its soft token-overlap score. On ties the
    earliest step wins.
    """
    if not _detect_next_intent(user_query):
        return None
    steps = _split_sop_into_steps(numbered_text)
    if not steps:
        return None

    query_text = user_query or ""
    query_norm = _norm_text(query_text)
    scores = [
        1.0 if _norm_text(step) in query_norm else _soft_match_score(query_text, step)
        for step in steps
    ]
    # max() keeps the first maximal index, i.e. the earliest step on ties.
    best_idx = max(range(len(scores)), key=scores.__getitem__)
    if scores[best_idx] < min_score:
        return None  # fallback to full SOP

    first_following = best_idx + 1
    if first_following >= len(steps):
        return []  # user is at final step
    last_following = min(first_following + max_next, len(steps))
    return steps[first_following:last_following]
| def _format_steps_as_numbered(steps: list) -> str: | |
| """Render a small list of steps with circled numbers for visual continuity.""" | |
| circled = { | |
| 1: "\u2460", 2: "\u2461", 3: "\u2462", 4: "\u2463", 5: "\u2464", | |
| 6: "\u2465", 7: "\u2466", 8: "\u2467", 9: "\u2468", 10: "\u2469", | |
| 11: "\u246a", 12: "\u246b", 13: "\u246c", 14: "\u246d", 15: "\u246e", | |
| 16: "\u246f", 17: "\u2470", 18: "\u2471", 19: "\u2472", 20: "\u2473" | |
| } | |
| out = [] | |
| for i, s in enumerate(steps, start=1): | |
| out.append(f"{circled.get(i, str(i))} {s}") | |
| return "\n".join(out) | |
def _filter_error_lines_by_query(text: str, query: str, max_lines: int = 1) -> str:
    """
    Pick the most relevant 'Common Errors & Resolution' line(s) for the user's message.

    Generic (SOP-agnostic) scoring, per line:
      1) error-family overlap with the query (NOT_FOUND/MISMATCH/LOCKED/PERMISSION/TIMEOUT/SYNC),
      2) anchored start (line begins with the first 2-3 query tokens),
      3) trigram / bigram hits,
      4) exact query phrase contained in the line,
      5) single-token overlap,
      6) small bonuses for bullet lines and for a matching heading
         (the part of the line before the first ':').

    Returns up to ``max_lines`` best-scoring lines with a positive score, joined
    by newlines. If no line scores above zero, the first ``max_lines`` lines are
    returned; if *text* has no lines, the stripped text is returned.
    """
    def _norm(s: str) -> str:
        s = (s or "").lower()
        s = re.sub(r"[^\w\s]", " ", s)
        s = re.sub(r"\s+", " ", s).strip()
        return s

    def _ngrams(tokens: List[str], n: int) -> List[str]:
        return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

    # Normalize synonyms exactly like the text they are compared against;
    # otherwise entries with punctuation ("doesn't exist", "read-only") can
    # never match. Empty results are dropped ("" is a substring of anything).
    norm_syns = {
        fam: tuple(k for k in (_norm(s) for s in syns) if k)
        for fam, syns in ERROR_FAMILY_SYNS.items()
    }

    def _families_for(s: str) -> set:
        low = _norm(s)
        return {fam for fam, syns in norm_syns.items() if any(k in low for k in syns)}

    q = _norm(query)
    q_tokens = [t for t in q.split() if len(t) > 1]
    q_bi = _ngrams(q_tokens, 2)
    q_tri = _ngrams(q_tokens, 3)
    q_fams = _families_for(query)
    # Query prefixes used for the "anchored" and heading checks (loop-invariant).
    first2 = " ".join(q_tokens[:2]) if len(q_tokens) >= 2 else ""
    first3 = " ".join(q_tokens[:3]) if len(q_tokens) >= 3 else ""

    lines = _normalize_lines(text)
    if not lines:
        return (text or "").strip()

    scored: List[Tuple[float, str]] = []
    for ln in lines:
        ln_norm = _norm(ln)
        fam_overlap = len(q_fams & _families_for(ln))
        anchored = 0.0
        if (first3 and ln_norm.startswith(first3)) or (first2 and ln_norm.startswith(first2)):
            anchored = 1.0
        bigram_hits = sum(1 for bg in q_bi if bg and bg in ln_norm)
        trigram_hits = sum(1 for tg in q_tri if tg and tg in ln_norm)
        token_overlap = sum(1 for t in q_tokens if t and t in ln_norm)
        exact_phrase = 1.0 if (q and q in ln_norm) else 0.0
        score = (
            1.70 * fam_overlap +
            1.00 * anchored +
            0.80 * trigram_hits +
            0.55 * bigram_hits +
            0.40 * exact_phrase +
            0.30 * token_overlap
        )
        if re.match(r"^\s*[-*\u2022]\s*", ln):  # bullet
            score += 0.10
        # Take the heading from the RAW line: the normalized line no longer
        # contains ':' so splitting it would always yield the whole line.
        heading = _norm(ln.split(":", 1)[0])
        if heading and (heading in q or (first2 and first2 in heading)):
            score += 0.15
        scored.append((score, ln))

    # Stable sort: lines with equal score keep their document order.
    scored.sort(key=lambda x: x[0], reverse=True)
    top = [ln for s, ln in scored[:max_lines] if s > 0.0]
    if not top:
        top = lines[:max_lines]
    return "\n".join(top).strip()
| def _friendly_permission_reply(raw: str) -> str: | |
| line = (raw or "").strip() | |
| line = re.sub(r"^\s*[-*\u2022]\s*", "", line) | |
| if not line: | |
| return "It looks like you may not have access for this action. Please verify your WMS role/permission with your supervisor or IT." | |
| if "verify role access" in line.lower(): | |
| return "It looks like you may not have access for this action. Please verify your WMS role/permission with your supervisor or IT." | |
| if ("permission" in line.lower()) or ("access" in line.lower()) or ("authorization" in line.lower()): | |
| return f"It seems to be an access issue: {line}. Please check your role mapping or request access." | |
| return line | |
| def _detect_language_hint(msg: str) -> Optional[str]: | |
| if re.search(r"[\u0B80-\u0BFF]", msg or ""): # Tamil | |
| return "Tamil" | |
| if re.search(r"[\u0900-\u097F]", msg or ""): # Hindi | |
| return "Hindi" | |
| return None | |
| def _build_clarifying_message() -> str: | |
| return ( | |
| "It seems the issue isn’t resolved yet. Would you like to share a few details so I can check further, " | |
| "or should I raise a ServiceNow ticket for you?" | |
| ) | |
| def _build_tracking_descriptions(issue_text: str, resolved_text: str) -> Tuple[str, str]: | |
| issue = (issue_text or "").strip() | |
| resolved = (resolved_text or "").strip() | |
| short_desc = issue[:100] if issue else (resolved[:100] or "Issue resolved (user confirmation)") | |
| long_desc = ( | |
| f"User reported: \"{issue}\". " | |
| f"User confirmation: \"{resolved}\". " | |
| f"Tracking record created automatically by NOVA." | |
| ).strip() | |
| return short_desc, long_desc | |
| def _is_incident_intent(msg_norm: str) -> bool: | |
| intent_phrases = [ | |
| "create ticket", "create a ticket", "raise ticket", "raise a ticket", "open ticket", "open a ticket", | |
| "create incident", "create an incident", "raise incident", "raise an incident", "open incident", "open an incident", | |
| "log ticket", "log an incident", "generate ticket", "create snow ticket", "raise snow ticket", | |
| "raise service now ticket", "create service now ticket", "raise sr", "open sr", | |
| ] | |
| return any(p in msg_norm for p in intent_phrases) | |
| def _parse_ticket_status_intent(msg_norm: str) -> Dict[str, Optional[str]]: | |
| status_keywords = ["status", "ticket status", "incident status", "check status", "check ticket status", "check incident status"] | |
| base_has_status = any(k in msg_norm for k in status_keywords) | |
| has_ticket_marker = ( | |
| any(w in msg_norm for w in ("ticket", "incident", "servicenow", "snow")) or | |
| bool(re.search(r"\binc\d{5,}\b", msg_norm, flags=re.IGNORECASE)) | |
| ) | |
| # Disambiguation: if it's a domain status query and not clearly ticket/incident, do NOT route to ticket-status. | |
| if (not base_has_status) or (base_has_status and not has_ticket_marker and _is_domain_status_context(msg_norm)): | |
| return {} | |
| patterns = [ | |
| r"(?:incident\s*id|incidentid|ticket\s*number|number)\s*[:=]?\s*(inc\d+)", | |
| r"(inc\d+)" | |
| ] | |
| for pat in patterns: | |
| m = re.search(pat, msg_norm, flags=re.IGNORECASE) | |
| if m: | |
| val = m.group(1).strip() | |
| if val: | |
| return {"number": val.upper() if val.lower().startswith("inc") else val} | |
| return {"number": None, "ask_number": True} | |
| def _is_resolution_ack_heuristic(msg_norm: str) -> bool: | |
| phrases = [ | |
| "it is resolved", "resolved", "issue resolved", "problem resolved", | |
| "it's working", "working now", "works now", "fixed", "sorted", | |
| "ok now", "fine now", "all good", "all set", "thanks works", "thank you it works", "back to normal", | |
| ] | |
| return any(p in msg_norm for p in phrases) | |
| def _has_negation_resolved(msg_norm: str) -> bool: | |
| neg_phrases = [ | |
| "not resolved", "issue not resolved", "still not working", "not working", | |
| "didn't work", "doesn't work", "no change", "not fixed", "still failing", "failed again", "broken", "fail", | |
| ] | |
| return any(p in msg_norm for p in neg_phrases) | |
| def _filter_context_for_query(context: str, query: str) -> Tuple[str, Dict[str, Any]]: | |
| STRICT_OVERLAP = 3 | |
| MAX_SENTENCES_STRICT = 4 | |
| MAX_SENTENCES_CONCISE = 3 | |
| def _norm(text: str) -> str: | |
| t = (text or "").lower() | |
| t = re.sub(r"[^\w\s]", " ", t) | |
| t = re.sub(r"\s+", " ", t).strip() | |
| return t | |
| def _split_sentences(ctx: str) -> List[str]: | |
| raw_sents = re.split(r"(?<=[.!?])\s+|\n+|-\s*|\*\s*", ctx or "") | |
| return [s.strip() for s in raw_sents if s and len(s.strip()) > 2] | |
| ctx = (context or "").strip() | |
| if not ctx or not query: | |
| return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0} | |
| q_norm = _norm(query) | |
| q_terms = [t for t in q_norm.split() if len(t) > 2] | |
| if not q_terms: | |
| return ctx, {'mode': 'concise', 'matched_count': 0, 'all_sentences': 0} | |
| sentences = _split_sentences(ctx) | |
| matched_exact, matched_any = [], [] | |
| for s in sentences: | |
| s_norm = _norm(s) | |
| is_bullet = bool(re.match(r"^[\-\*]\s*", s)) | |
| overlap = sum(1 for t in q_terms if t in s_norm) + (1 if is_bullet else 0) | |
| if overlap >= STRICT_OVERLAP: | |
| matched_exact.append(s) | |
| elif overlap > 0: | |
| matched_any.append(s) | |
| if matched_exact: | |
| kept = matched_exact[:MAX_SENTENCES_STRICT] | |
| return "\n".join(kept).strip(), {'mode': 'exact', 'matched_count': len(kept), 'all_sentences': len(sentences)} | |
| if matched_any: | |
| kept = matched_any[:MAX_SENTENCES_CONCISE] | |
| return "\n".join(kept).strip(), {'mode': 'concise', 'matched_count': len(kept), 'all_sentences': len(sentences)} | |
| kept = sentences[:MAX_SENTENCES_CONCISE] | |
| return "\n".join(kept).strip(), {'mode': 'concise', 'matched_count': 0, 'all_sentences': len(sentences)} | |
def _extract_errors_only(text: str, max_lines: int = 12) -> str:
    """
    Collect the error-looking lines of an SOP errors section.

    A line is kept when it starts with a bullet (•, -, *) or contains a ':'
    ("Heading: details"). Collection stops once ``max_lines`` lines are kept.
    If nothing qualifies, the stripped input text is returned instead.
    """
    bullet_start = re.compile(r"^\s*[-*\u2022]\s*")
    collected: List[str] = []
    for candidate in _normalize_lines(text):
        if not (bullet_start.match(candidate) or ":" in candidate):
            continue
        collected.append(candidate)
        if len(collected) >= max_lines:
            break
    if collected:
        return "\n".join(collected).strip()
    return (text or "").strip()
def _filter_permission_lines(text: str, max_lines: int = 6) -> str:
    """Keep the lines of *text* that mention a permission/access keyword.

    Matching is a case-insensitive substring test. Collection stops once
    ``max_lines`` lines are kept. If no line matches, the stripped input text
    is returned instead.
    """
    keywords = (
        "permission", "permissions", "access", "authorization", "authorisation",
        "role", "role mapping", "security profile", "not allowed", "not authorized", "denied", "insufficient"
    )
    matches: List[str] = []
    for candidate in _normalize_lines(text):
        lowered = candidate.lower()
        if not any(word in lowered for word in keywords):
            continue
        matches.append(candidate)
        if len(matches) >= max_lines:
            break
    if matches:
        return "\n".join(matches).strip()
    return (text or "").strip()
def _extract_escalation_line(text: str) -> Optional[str]:
    """Extract the escalation path from SOP text as a ready-to-show sentence.

    The block examined is the first line mentioning escalation plus up to five
    following lines; if no such line exists, all lines containing an arrow
    ("->" or "→") are used. Within the block the path is taken from, in order:
    the text after "escalation...:", the first arrow line, or a phrase starting
    at "operator". Returns ``None`` when nothing usable is found.
    """
    if not text:
        return None
    lines = _normalize_lines(text)
    if not lines:
        return None

    def _has_arrow(candidate: str) -> bool:
        return "->" in candidate or "→" in candidate

    markers = ("escalation", "escalation path", "escalate")
    start_idx = next(
        (pos for pos, ln in enumerate(lines) if any(mark in ln.lower() for mark in markers)),
        None,
    )

    if start_idx is not None:
        block = []
        for candidate in lines[start_idx:start_idx + 6]:
            if not candidate.strip():
                break
            block.append(candidate.strip())
    else:
        block = [ln.strip() for ln in lines if _has_arrow(ln)]
    if not block:
        return None

    joined = " ".join(block)
    labelled = re.search(r"escalation[^:]*:\s*(.+)", joined, flags=re.IGNORECASE)
    path = labelled.group(1).strip() if labelled else None
    if not path:
        # Fall back to the first line that draws the path with arrows.
        path = next((ln for ln in block if _has_arrow(ln)), None)
    if not path:
        role_chain = re.search(r"(operator.*?administrator|operator.*)", joined, flags=re.IGNORECASE)
        path = role_chain.group(1).strip() if role_chain else None
    if not path:
        return None

    path = path.replace("->", "→").strip()
    path = re.sub(r"^(?i:escalation\s*path)\s*:\s*", "", path).strip()
    return f"If you want to escalate the issue, follow: {path}"
def _classify_resolution_llm(user_message: str) -> bool:
    """Ask Gemini whether *user_message* says the issue is resolved.

    Returns True only when the model's reply text contains "true". Returns
    False when no API key is configured or when the request / response
    handling raises for any reason (best-effort classifier).
    """
    if not GEMINI_API_KEY:
        return False
    prompt = f"""Classify if the following user message indicates that the issue is resolved or working now.
Return only 'true' or 'false'.
Message: {user_message}"""
    request_body = {"contents": [{"parts": [{"text": prompt}]}]}
    request_headers = {"Content-Type": "application/json"}
    try:
        resp = requests.post(
            GEMINI_URL,
            headers=request_headers,
            json=request_body,
            timeout=12,
            verify=GEMINI_SSL_VERIFY,
        )
        data = resp.json()
        # Walk candidates[0].content.parts[0].text, tolerating missing keys.
        first_candidate = data.get("candidates", [{}])[0]
        first_part = first_candidate.get("content", {}).get("parts", [{}])[0]
        answer = first_part.get("text", "")
        return "true" in (answer or "").strip().lower()
    except Exception:
        return False
def _set_incident_resolved(sys_id: str) -> bool:
    """Mark the ServiceNow incident *sys_id* as Resolved via PATCH.

    Optionally moves the incident to state "2" first (when
    SERVICENOW_REQUIRE_IN_PROGRESS_FIRST is truthy; failures of that step are
    only logged). Then up to three payload variants are tried in order:

    A) state "6" with close_code / close_notes,
    B) state "Resolved" with close_code / close_notes,
    C) state "6" with the field names from SERVICENOW_RESOLUTION_CODE_FIELD /
       SERVICENOW_RESOLUTION_NOTES_FIELD.

    Returns True on the first 200/204 response; False if all variants fail,
    the instance URL is missing, or any exception is raised.
    """
    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            print("[SN PATCH resolve] missing SERVICENOW_INSTANCE_URL")
            return False

        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{instance_url}/api/now/table/incident/{sys_id}"

        close_code_val = os.getenv("SERVICENOW_CLOSE_CODE", "Solution provided")
        close_notes_val = os.getenv("SERVICENOW_RESOLUTION_NOTES", "Issue resolved, user confirmed")
        caller_sysid = os.getenv("SERVICENOW_CALLER_SYSID")
        resolved_by_sysid = os.getenv("SERVICENOW_RESOLVED_BY_SYSID")
        assign_group = os.getenv("SERVICENOW_ASSIGNMENT_GROUP_SYSID")
        require_progress = os.getenv("SERVICENOW_REQUIRE_IN_PROGRESS_FIRST", "false").lower() in ("1", "true", "yes")

        if require_progress:
            try:
                progress_resp = requests.patch(url, headers=headers, json={"state": "2"}, verify=VERIFY_SSL, timeout=25)
                print(f"[SN PATCH progress] status={progress_resp.status_code} body={progress_resp.text[:500]}")
            except Exception as e:
                print(f"[SN PATCH progress] exception={safe_str(e)}")

        def _attempt(label: str, state: str, code_key: str, notes_key: str) -> bool:
            # Build one resolve payload, drop unset (None) fields, and PATCH it.
            candidate = {
                "state": state,
                code_key: close_code_val,
                notes_key: close_notes_val,
                "caller_id": caller_sysid,
                "resolved_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                "work_notes": "Auto-resolve set by NOVA.",
                "resolved_by": resolved_by_sysid,
                "assignment_group": assign_group,
            }
            body = {key: value for key, value in candidate.items() if value is not None}
            resp = requests.patch(url, headers=headers, json=body, verify=VERIFY_SSL, timeout=25)
            if resp.status_code in (200, 204):
                return True
            print(f"[SN PATCH resolve {label}] status={resp.status_code} body={resp.text[:500]}")
            return False

        if _attempt("A", "6", "close_code", "close_notes"):
            return True
        if _attempt("B", "Resolved", "close_code", "close_notes"):
            return True
        # Last resort: field names may be customized on this instance.
        code_field = os.getenv("SERVICENOW_RESOLUTION_CODE_FIELD", "close_code")
        notes_field = os.getenv("SERVICENOW_RESOLUTION_NOTES_FIELD", "close_notes")
        return _attempt("C", "6", code_field, notes_field)
    except Exception as e:
        print(f"[SN PATCH resolve] exception={safe_str(e)}")
        return False
| # ------------------------------ Prereq helper ------------------------------ | |
def _find_prereq_section_text(best_doc: str) -> str:
    """
    Look up the prerequisites section of *best_doc* under common heading spellings.

    Headings are tried in the order listed; the stripped text of the first
    non-blank section is returned, or "" when none is found.
    """
    heading_variants = (
        "Pre-Requisites",
        "Prerequisites",
        "Pre Requisites",
        "Pre-Requirements",
        "Requirements",
    )
    # Lazy generator: lookups stop at the first heading that yields text.
    sections = (get_section_text(best_doc, heading) for heading in heading_variants)
    return next((section.strip() for section in sections if section and section.strip()), "")
| # ------------------------------ Health ------------------------------ | |
async def health_check():
    """Liveness probe: always reports an "ok" status.

    NOTE(review): no route decorator is visible on this handler in this chunk;
    confirm it is registered on ``app``.
    """
    payload = {"status": "ok"}
    return payload
| # ------------------------------ Chat ------------------------------ | |
| async def chat_with_ai(input_data: ChatInput): | |
| assist_followup: Optional[str] = None | |
| try: | |
| msg_norm = (input_data.user_message or "").lower().strip() | |
| # Yes/No handlers | |
| if msg_norm in ("yes", "y", "sure", "ok", "okay"): | |
| return { | |
| "bot_response": "Great! Tell me what you'd like to do next — check another ticket, create an incident, or describe your issue.", | |
| "status": "OK", | |
| "followup": "You can say: 'create ticket', 'incident status INC0012345', or describe your problem.", | |
| "options": [], | |
| "debug": {"intent": "continue_conversation"}, | |
| } | |
| if msg_norm in ("no", "no thanks", "nope"): | |
| return { | |
| "bot_response": "No problem. Do you need assistance with any other issue?", | |
| "status": "OK", | |
| "end_chat": False, | |
| "followup": None, | |
| "options": [], | |
| "debug": {"intent": "end_conversation"}, | |
| } | |
| # Resolution ack (auto incident + mark Resolved) | |
| is_llm_resolved = _classify_resolution_llm(input_data.user_message) | |
| if _has_negation_resolved(msg_norm): | |
| is_llm_resolved = False | |
| if (not _has_negation_resolved(msg_norm)) and (_is_resolution_ack_heuristic(msg_norm) or is_llm_resolved): | |
| try: | |
| short_desc, long_desc = _build_tracking_descriptions(input_data.last_issue, input_data.user_message) | |
| result = create_incident(short_desc, long_desc) | |
| if isinstance(result, dict) and not result.get("error"): | |
| inc_number = result.get("number", "<unknown>") | |
| sys_id = result.get("sys_id") | |
| resolved_note = "" | |
| if sys_id: | |
| ok = _set_incident_resolved(sys_id) | |
| resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)" | |
| return { | |
| "bot_response": f"✅ Incident created: {inc_number}{resolved_note}", | |
| "status": "OK", | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": False, | |
| "followup": None, | |
| "top_hits": [], | |
| "sources": [], | |
| "auto_incident": True, | |
| "debug": {"intent": "resolved_ack", "auto_created": True}, | |
| } | |
| else: | |
| err = (result or {}).get("error", "Unknown error") | |
| return { | |
| "bot_response": f"⚠️ I couldn't create the tracking incident automatically ({err}).", | |
| "status": "PARTIAL", | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": True, | |
| "followup": "Shall I create a ticket now?", | |
| "options": [{"type": "yesno", "title": "Create ticket now?"}], | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": {"intent": "resolved_ack", "auto_created": False, "error": err}, | |
| } | |
| except Exception as e: | |
| return { | |
| "bot_response": f"⚠️ Something went wrong while creating the tracking incident: {safe_str(e)}", | |
| "status": "PARTIAL", | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": True, | |
| "followup": "Shall I create a ticket now?", | |
| "options": [{"type": "yesno", "title": "Create ticket now?"}], | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": {"intent": "resolved_ack", "exception": True}, | |
| } | |
| # Incident intent | |
| if _is_incident_intent(msg_norm): | |
| return { | |
| "bot_response": ( | |
| "Okay, let's create a ServiceNow incident.\n\n" | |
| "Please provide:\n- Short Description (one line)\n" | |
| "- Detailed Description (steps, error text, IDs, site, environment)" | |
| ), | |
| "status": (input_data.prev_status or "PARTIAL"), | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": False, | |
| "show_incident_form": True, | |
| "followup": None, | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": {"intent": "create_ticket"}, | |
| } | |
| # Status intent (ticket/incident) — disambiguated | |
| status_intent = _parse_ticket_status_intent(msg_norm) | |
| if status_intent: | |
| if status_intent.get("ask_number"): | |
| return { | |
| "bot_response": ( | |
| "To check a ticket status, please share the Incident ID (e.g., INC0012345).\n\n" | |
| "You can paste the ID here or say 'cancel'." | |
| ), | |
| "status": "PARTIAL", | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": False, | |
| "followup": None, | |
| "show_status_form": True, | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": {"intent": "status_request_missing_id"}, | |
| } | |
| try: | |
| token = get_valid_token() | |
| instance_url = os.getenv("SERVICENOW_INSTANCE_URL") | |
| if not instance_url: | |
| raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing") | |
| headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} | |
| number = status_intent.get("number") | |
| url = f"{instance_url}/api/now/table/incident?number={number}" | |
| response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25) | |
| data = response.json() | |
| lst = data.get("result", []) | |
| result = (lst or [{}])[0] if response.status_code == 200 else {} | |
| state_code = builtins.str(result.get("state", "unknown")) | |
| state_label = STATE_MAP.get(state_code, state_code) | |
| short = result.get("short_description", "") | |
| num = result.get("number", number or "unknown") | |
| return { | |
| "bot_response": ( | |
| f"**Ticket:** {num}\n" | |
| f"**Status:** {state_label}\n" | |
| f"**Issue description:** {short}" | |
| ), | |
| "status": "OK", | |
| "show_assist_card": True, | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": False, | |
| "followup": "Is there anything else I can assist you with?", | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": {"intent": "status", "http_status": response.status_code}, | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=safe_str(e)) | |
| # Hybrid KB search | |
| kb_results = hybrid_search_knowledge_base(input_data.user_message, top_k=10, alpha=0.6, beta=0.4) | |
| documents = kb_results.get("documents", []) | |
| metadatas = kb_results.get("metadatas", []) | |
| distances = kb_results.get("distances", []) | |
| combined = kb_results.get("combined_scores", []) | |
| items: List[Dict[str, Any]] = [] | |
| for i, doc in enumerate(documents): | |
| text = doc.strip() if isinstance(doc, str) else "" | |
| if not text: | |
| continue | |
| meta = metadatas[i] if i < len(metadatas) and isinstance(metadatas[i], dict) else {} | |
| score = distances[i] if i < len(distances) else None | |
| comb = combined[i] if i < len(combined) else None | |
| m = dict(meta) | |
| if score is not None: | |
| m["distance"] = score | |
| if comb is not None: | |
| m["combined"] = comb | |
| items.append({"text": text, "meta": m}) | |
| selected = items[:max(1, 2)] | |
| context_raw = "\n\n---\n\n".join([s["text"] for s in selected]) if selected else "" | |
| filtered_text, filt_info = _filter_context_for_query(context_raw, input_data.user_message) | |
| context = filtered_text | |
| context_found = bool(context.strip()) | |
| best_distance = min([d for d in distances if d is not None], default=None) if distances else None | |
| best_combined = max([c for c in combined if c is not None], default=None) if combined else None | |
| detected_intent = kb_results.get("user_intent", "neutral") | |
| best_doc = kb_results.get("best_doc") | |
| top_meta = (metadatas or [{}])[0] if metadatas else {} | |
| msg_low = (input_data.user_message or "").lower() | |
| GENERIC_ERROR_TERMS = ("error", "issue", "problem", "not working", "failed", "failure") | |
| generic_error_signal = any(t in msg_low for t in GENERIC_ERROR_TERMS) | |
| # Query-based prereq nudge | |
| PREREQ_TERMS = ( | |
| "pre req", "pre-requisite", "pre-requisites", "prerequisite", | |
| "prerequisites", "pre requirement", "pre-requirements", "requirements" | |
| ) | |
| if detected_intent == "neutral" and any(t in msg_low for t in PREREQ_TERMS): | |
| detected_intent = "prereqs" | |
| # Force errors intent for permissions | |
| PERM_QUERY_TERMS = [ | |
| "permission", "permissions", "access", "access right", "authorization", "authorisation", | |
| "role", "role access", "security", "security profile", "privilege", "not allowed", "not authorized", "denied", | |
| ] | |
| is_perm_query = any(t in msg_norm for t in PERM_QUERY_TERMS) | |
| if is_perm_query: | |
| detected_intent = "errors" | |
| # Heading-aware prereq nudge | |
| sec_title = ((top_meta or {}).get("section") or "").strip().lower() | |
| PREREQ_HEADINGS = ( | |
| "pre-requisites", "prerequisites", "pre requisites", | |
| "pre-requirements", "requirements" | |
| ) | |
| if detected_intent == "neutral" and any(h in sec_title for h in PREREQ_HEADINGS): | |
| detected_intent = "prereqs" | |
| # --- Module-aware steps nudge (appointments, picking, shipping, etc.) --- | |
| STEPS_TERMS = ("how to", "procedure", "perform", "steps", "do", "navigate") | |
| ACTIONS_PRESENT = any(s in msg_low for syns in ACTION_SYNONYMS_EXT.values() for s in syns) | |
| mod_tags = ((top_meta or {}).get("module_tags") or "").lower() | |
| MODULE_TOKENS = tuple({term for syns in MODULE_VOCAB.values() for term in syns}) | |
| looks_like_module = ( | |
| any(t in msg_low for t in MODULE_TOKENS) | |
| or any(m in mod_tags for m in ["appointments", "receiving", "picking", "putaway", "shipping", "inventory", "replenishment"]) | |
| or any(m in sec_title for m in ["appointment", "appointments", "schedule", "dock", "door", "picking", "putaway", "shipping", "inventory"]) | |
| ) | |
| looks_like_steps_query = any(t in msg_low for t in STEPS_TERMS) or ACTIONS_PRESENT | |
| if detected_intent in ("neutral", "prereqs") and looks_like_steps_query and looks_like_module: | |
| detected_intent = "steps" | |
| # --- Meaning-aware SOP gating --- | |
| def _contains_any(s: str, keywords: tuple) -> bool: | |
| low = (s or "").lower() | |
| return any(k in low for k in keywords) | |
| DOMAIN_TERMS = ( | |
| "trailer", "shipment", "order", "load", "wave", | |
| "inventory", "putaway", "receiving", "appointment", | |
| "dock", "door", "manifest", "pallet", "container", | |
| "asn", "grn", "pick", "picking" | |
| ) | |
| ACTION_OR_ERROR_TERMS = ( | |
| "how to", "procedure", "perform", # added | |
| "close", "closing", "open", "navigate", "scan", "confirm", "generate", "update", | |
| "receive", "receiving", # added | |
| "error", "issue", "fail", "failed", "not working", "locked", "mismatch", | |
| "access", "permission", "status" | |
| ) | |
| matched_count = int(filt_info.get("matched_count") or 0) | |
| filter_mode = (filt_info.get("mode") or "").lower() | |
| has_any_action_or_error = _contains_any(msg_low, ACTION_OR_ERROR_TERMS) | |
| mentions_domain = _contains_any(msg_low, DOMAIN_TERMS) | |
| short_query = len((input_data.user_message or "").split()) <= 4 | |
| gate_combined_ok = 0.60 if short_query else 0.55 | |
| combined_ok = (best_combined is not None and best_combined >= gate_combined_ok) | |
| weak_domain_only = (mentions_domain and not has_any_action_or_error) | |
| low_context_hit = (matched_count < 2 and filter_mode in ("concise", "exact")) | |
| strong_steps_bypass = looks_like_steps_query and looks_like_module | |
| strong_error_signal = len(_detect_error_families(msg_low)) > 0 | |
| if (weak_domain_only or (low_context_hit and not combined_ok)) \ | |
| and not strong_steps_bypass \ | |
| and not (strong_error_signal or generic_error_signal): | |
| return { | |
| "bot_response": _build_clarifying_message(), | |
| "status": "NO_KB_MATCH", | |
| "context_found": False, | |
| "ask_resolved": False, | |
| "suggest_incident": True, | |
| "followup": "Share more details (module/screen/error), or say 'create ticket'.", | |
| "options": [{"type": "yesno", "title": "Share details or raise a ticket?"}], | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": { | |
| "intent": "sop_rejected_weak_match", | |
| "matched_count": matched_count, | |
| "filter_mode": filter_mode, | |
| "best_combined": best_combined, | |
| "mentions_domain": mentions_domain, | |
| "has_any_action_or_error": has_any_action_or_error, | |
| "strong_steps_bypass": strong_steps_bypass, | |
| "strong_error_signal": strong_error_signal, | |
| "generic_error_signal": generic_error_signal | |
| }, | |
| } | |
| # Build SOP context if allowed | |
| if is_perm_query: | |
| detected_intent = "errors" | |
| escalation_line = None # SOP escalation candidate | |
| full_errors = None # keep for possible escalation extraction | |
| next_step_applied = False | |
| next_step_info: Dict[str, Any] = {} | |
| # Helper: detect asked action from query using extended synonyms | |
| def _detect_action_from_query(q: str) -> Optional[str]: | |
| qlow = (q or "").lower() | |
| for act, syns in ACTION_SYNONYMS_EXT.items(): | |
| if any(s in qlow for s in syns): | |
| return act | |
| return None | |
| # Helper: filter lines in a steps context to keep only the asked action | |
| def _filter_steps_by_action(raw_context: str, asked_act: Optional[str]) -> str: | |
| if not asked_act or not raw_context.strip(): | |
| return raw_context | |
| other_terms: List[str] = [] | |
| for act, syns in ACTION_SYNONYMS_EXT.items(): | |
| if act != asked_act: | |
| other_terms.extend(syns) | |
| lines = _normalize_lines(raw_context) | |
| kept = [ln for ln in lines if not any(t in ln.lower() for t in other_terms)] | |
| return "\n".join(kept).strip() if kept else raw_context | |
| if best_doc: | |
| if detected_intent == "steps": | |
| full_steps = get_best_steps_section_text(best_doc) | |
| if not full_steps: | |
| sec = (top_meta or {}).get("section") | |
| if sec: | |
| full_steps = get_section_text(best_doc, sec) | |
| if full_steps: | |
| # Apply action-focused filtering FIRST (to avoid mixing create/update/delete) | |
| asked_action = _detect_action_from_query(input_data.user_message) | |
| full_steps = _filter_steps_by_action(full_steps, asked_action) | |
| # Use numbered form only for matching; keep raw for full output | |
| numbered_full = _ensure_numbering(full_steps) | |
| next_only = _resolve_next_steps(input_data.user_message, numbered_full, max_next=6, min_score=0.35) | |
| if next_only is not None: | |
| # "what's next" mode | |
| if len(next_only) == 0: | |
| context = "You are at the final step of this SOP. No further steps." | |
| next_step_applied = True | |
| next_step_info = {"count": 0} | |
| context_preformatted = True | |
| else: | |
| context = _format_steps_as_numbered(next_only) | |
| next_step_applied = True | |
| next_step_info = {"count": len(next_only)} | |
| context_preformatted = True | |
| else: | |
| # Normal mode: return the full SOP section (raw), and we'll number it below once. | |
| context = full_steps | |
| context_preformatted = False | |
| elif detected_intent == "errors": | |
| full_errors = get_best_errors_section_text(best_doc) | |
| if full_errors: | |
| ctx_err = _extract_errors_only(full_errors, max_lines=30) | |
| if is_perm_query: | |
| context = _filter_permission_lines(ctx_err, max_lines=6) | |
| else: | |
| # Decide specific vs generic: | |
| is_specific_error = len(_detect_error_families(msg_low)) > 0 | |
| if is_specific_error: | |
| context = _filter_error_lines_by_query(ctx_err, input_data.user_message, max_lines=1) | |
| else: | |
| all_lines: List[str] = _normalize_lines(ctx_err) | |
| error_bullets = [ln for ln in all_lines if re.match(r"^\s*[-*\u2022]\s*", ln) or (":" in ln)] | |
| context = "\n".join(error_bullets[:8]).strip() | |
| assist_followup = ( | |
| "Please tell me which error above matches your screen (paste the exact text), " | |
| "or share a screenshot. I can guide you further or raise a ServiceNow ticket." | |
| ) | |
| escalation_line = _extract_escalation_line(full_errors) | |
| else: | |
| # Fallback when Errors section is missing: show steps | |
| full_steps = get_best_steps_section_text(best_doc) or get_section_text(best_doc, sec_title or "") | |
| if full_steps: | |
| # Apply action filter to keep responses tight | |
| asked_action = _detect_action_from_query(input_data.user_message) | |
| full_steps = _filter_steps_by_action(full_steps, asked_action) | |
| context = full_steps | |
| detected_intent = "steps" | |
| context_preformatted = False | |
| elif detected_intent == "prereqs": | |
| full_prereqs = _find_prereq_section_text(best_doc) | |
| if full_prereqs: | |
| context = full_prereqs.strip() | |
| else: | |
| # Fallback when Prereqs section is missing: show steps | |
| full_steps = get_best_steps_section_text(best_doc) or get_section_text(best_doc, sec_title or "") | |
| if full_steps: | |
| asked_action = _detect_action_from_query(input_data.user_message) | |
| full_steps = _filter_steps_by_action(full_steps, asked_action) | |
| context = full_steps | |
| detected_intent = "steps" | |
| context_preformatted = False | |
| language_hint = _detect_language_hint(input_data.user_message) | |
| lang_line = f"Respond in {language_hint}." if language_hint else "Respond in a clear, polite tone." | |
| use_gemini = (detected_intent == "errors") | |
| enhanced_prompt = f"""You are a helpful support assistant. Rewrite the provided context ONLY into clear, user-friendly guidance. | |
| - Do not add any information that is not present in the context. | |
| - If the content is an error/access/permission note, paraphrase it into a helpful sentence users can understand. | |
| - {lang_line} | |
| ### Context | |
| {context} | |
| ### Question | |
| {input_data.user_message} | |
| ### Output | |
| Return ONLY the rewritten guidance.""" | |
| headers = {"Content-Type": "application/json"} | |
| payload = {"contents": [{"parts": [{"text": enhanced_prompt}]}]} | |
| bot_text = "" | |
| http_code = 0 | |
| if use_gemini and GEMINI_API_KEY: | |
| try: | |
| resp = requests.post(GEMINI_URL, headers=headers, json=payload, timeout=25, verify=GEMINI_SSL_VERIFY) | |
| http_code = getattr(resp, "status_code", 0) | |
| try: | |
| result = resp.json() | |
| except Exception: | |
| result = {} | |
| try: | |
| bot_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") | |
| except Exception: | |
| bot_text = "" | |
| except Exception: | |
| bot_text, http_code = "", 0 | |
| # Deterministic local formatting | |
| if detected_intent == "steps": | |
| # If we trimmed to next steps, 'context' is already formatted (or a sentence). | |
| # Only number when returning full SOP raw text. | |
| if ('context_preformatted' in locals()) and context_preformatted: | |
| bot_text = context | |
| else: | |
| bot_text = _ensure_numbering(context) | |
| elif detected_intent == "errors": | |
| if not bot_text.strip() or http_code == 429: | |
| bot_text = context.strip() | |
| if escalation_line: | |
| bot_text = (bot_text or "").rstrip() + "\n\n" + escalation_line | |
| else: | |
| bot_text = context | |
| # If the user explicitly asks to escalate, append escalation even in 'steps' intent | |
| needs_escalation = (" escalate" in msg_norm) or ("escalation" in msg_norm) | |
| if needs_escalation and best_doc: | |
| esc_text = get_escalation_text(best_doc) | |
| if not esc_text and full_errors: | |
| esc_text = full_errors | |
| line = _extract_escalation_line(esc_text or "") | |
| if line: | |
| bot_text = (bot_text or "").rstrip() + "\n\n" + line | |
| # Guarantee non-empty bot response | |
| if not (bot_text or "").strip(): | |
| if context.strip(): | |
| bot_text = context.strip() | |
| else: | |
| bot_text = ( | |
| "I found some related guidance but couldn’t assemble a reply. " | |
| "Share a bit more detail (module/screen/error), or say ‘create ticket’." | |
| ) | |
| short_query = len((input_data.user_message or "").split()) <= 4 | |
| gate_combined_ok = 0.60 if short_query else 0.55 | |
| status = "OK" if (best_combined is not None and best_combined >= gate_combined_ok) else "PARTIAL" | |
| lower = (bot_text or "").lower() | |
| if ("partial" in lower) or ("may be partial" in lower) or ("closest" in lower) or ("may not fully" in lower): | |
| status = "PARTIAL" | |
| options = [{"type": "yesno", "title": "Share details or raise a ticket?"}] if status == "PARTIAL" else [] | |
| return { | |
| "bot_response": bot_text, | |
| "status": status, | |
| "context_found": True, | |
| "ask_resolved": (status == "OK" and detected_intent != "errors"), | |
| "suggest_incident": (status == "PARTIAL"), | |
| "followup": (assist_followup if assist_followup else ("Is this helpful, or should I raise a ticket?" if status == "PARTIAL" else None)), | |
| "options": options, | |
| "top_hits": [], | |
| "sources": [], | |
| "debug": { | |
| "used_chunks": len((context or "").split("\n\n---\n\n")) if context else 0, | |
| "best_distance": best_distance, | |
| "best_combined": best_combined, | |
| "http_status": http_code, | |
| "filter_mode": filt_info.get("mode"), | |
| "matched_count": filt_info.get("matched_count"), | |
| "user_intent": detected_intent, | |
| "best_doc": best_doc, | |
| "next_step": { | |
| "applied": next_step_applied, | |
| "info": next_step_info, | |
| }, | |
| }, | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=safe_str(e)) | |
| # ------------------------------ Ticket description generation ------------------------------ | |
async def generate_ticket_desc_ep(input_data: TicketDescInput):
    """Ask Gemini to draft a short and a detailed ServiceNow ticket description.

    The issue text from ``input_data.issue`` is embedded in a prompt that asks
    the model for a JSON object with ``ShortDescription`` and
    ``DetailedDescription`` keys.

    Returns:
        A dict with ``ShortDescription`` and ``DetailedDescription``. When the
        model reply cannot be used, both values are empty strings and an
        ``error`` key explains why ("Gemini returned non-JSON",
        "Gemini parsing failed" or "Invalid JSON returned").

    Raises:
        HTTPException: status 500 for any other failure (e.g. the HTTP request
            itself raising).
    """
    try:
        prompt = (
            f"You are helping generate ServiceNow ticket descriptions based on the issue: {input_data.issue}.\n"
            "Please return the output strictly in JSON format with the following keys:\n"
            "{\n"
            ' "ShortDescription": "A concise summary of the issue (max 100 characters)",\n'
            ' "DetailedDescription": "A detailed explanation of the issue"\n'
            "}\n"
            "Do not include any extra text, comments, or explanations outside the JSON."
        )
        headers = {"Content-Type": "application/json"}
        payload = {"contents": [{"parts": [{"text": prompt}]}]}
        resp = requests.post(GEMINI_URL, headers=headers, json=payload, timeout=25, verify=GEMINI_SSL_VERIFY)
        try:
            data = resp.json()
        except Exception:
            return {"ShortDescription": "", "DetailedDescription": "", "error": "Gemini returned non-JSON"}
        try:
            text = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "").strip()
        except Exception:
            return {"ShortDescription": "", "DetailedDescription": "", "error": "Gemini parsing failed"}
        fields = _extract_ticket_desc_fields(text)
        if fields is None:
            return {"ShortDescription": "", "DetailedDescription": "", "error": "Invalid JSON returned"}
        return fields
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e)) from e


def _extract_ticket_desc_fields(text: str) -> Optional[Dict[str, str]]:
    """Pull the two ticket fields out of a model reply, or return None if no JSON object is found."""
    cleaned = (text or "").strip()
    if cleaned.startswith("```"):
        # Drop Markdown code-fence lines (``` / ```json) that wrap the payload.
        cleaned = "\n".join(
            ln for ln in cleaned.splitlines() if not ln.strip().startswith("```")
        ).strip()

    # Try the whole reply first; if the model added prose around the object,
    # fall back to the outermost {...} span.
    candidates = [cleaned]
    start, end = cleaned.find("{"), cleaned.rfind("}")
    if 0 <= start < end:
        candidates.append(cleaned[start:end + 1])

    parsed = None
    for candidate in candidates:
        try:
            loaded = json.loads(candidate)
        except ValueError:
            continue
        if isinstance(loaded, dict):
            parsed = loaded
            break
    if parsed is None:
        return None

    def _as_text(value: Any) -> str:
        # A null or non-string value must not discard the other field.
        if value is None:
            return ""
        return value.strip() if isinstance(value, builtins.str) else builtins.str(value).strip()

    return {
        "ShortDescription": _as_text(parsed.get("ShortDescription")),
        "DetailedDescription": _as_text(parsed.get("DetailedDescription")),
    }
| # ------------------------------ Incident status ------------------------------ | |
async def incident_status(input_data: TicketStatusInput):
    """Fetch a ServiceNow incident and report its state as a chat reply.

    The incident is looked up by ``input_data.sys_id`` when present, otherwise
    by ``input_data.number``. The numeric state is translated through
    ``STATE_MAP``; an unmapped code is shown as-is, and a lookup that does not
    return HTTP 200 yields the state "unknown".

    Returns:
        A dict with ``bot_response`` (Markdown with ticket number, status and
        short description), ``followup``, ``show_assist_card``, ``persist``
        and ``debug``.

    Raises:
        HTTPException: status 400 when neither identifier is supplied;
            status 500 when ``SERVICENOW_INSTANCE_URL`` is unset or any other
            error occurs.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    try:
        token = get_valid_token()
        instance_url = os.getenv("SERVICENOW_INSTANCE_URL")
        if not instance_url:
            raise HTTPException(status_code=500, detail="SERVICENOW_INSTANCE_URL missing")
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        if input_data.sys_id:
            # Percent-encode the caller-supplied id so it stays one path segment.
            sys_id = quote(builtins.str(input_data.sys_id), safe="")
            url = f"{instance_url}/api/now/table/incident/{sys_id}"
            response = requests.get(url, headers=headers, verify=VERIFY_SSL, timeout=25)
            data = response.json()
            result = data.get("result", {}) if response.status_code == 200 else {}
        elif input_data.number:
            # Let requests encode the value so it cannot inject extra query parameters.
            url = f"{instance_url}/api/now/table/incident"
            response = requests.get(
                url,
                params={"number": input_data.number},
                headers=headers,
                verify=VERIFY_SSL,
                timeout=25,
            )
            data = response.json()
            lst = data.get("result", [])
            result = (lst or [{}])[0] if response.status_code == 200 else {}
        else:
            raise HTTPException(status_code=400, detail="Provide IncidentID (number) or sys_id")
        state_code = builtins.str(result.get("state", "unknown"))
        state_label = STATE_MAP.get(state_code, state_code)
        short = result.get("short_description", "")
        number = result.get("number", input_data.number or "unknown")
        return {
            "bot_response": (
                f"**Ticket:** {number} \n"
                f"**Status:** {state_label} \n"
                f"**Issue description:** {short}"
            ).replace("\n", " \n"),
            "followup": "Is there anything else I can assist you with?",
            "show_assist_card": True,
            "persist": True,
            "debug": "Incident status fetched",
        }
    except HTTPException:
        # Keep deliberate status codes (e.g. the 400 above) instead of
        # collapsing them into a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e)) from e
| # ------------------------------ Incident ------------------------------ | |
async def raise_incident(input_data: IncidentInput):
    """Create a ServiceNow incident and optionally mark it resolved.

    Calls ``create_incident`` with the short and detailed descriptions. When
    ``input_data.mark_resolved`` is truthy and a ``sys_id`` came back,
    ``_set_incident_resolved`` is called and its outcome is appended to the
    confirmation text.

    Returns:
        A dict with ``bot_response`` (confirmation text), ``debug``,
        ``persist``, ``show_assist_card`` and ``followup``.

    Raises:
        HTTPException: status 500 when ``create_incident`` reports an error,
            returns something that is not a dict, or any other failure occurs.
    """
    try:
        result = create_incident(input_data.short_description, input_data.description)
        if not isinstance(result, dict) or result.get("error"):
            # Guard the non-dict case explicitly; calling .get() on it would
            # raise AttributeError and hide the real failure.
            detail = result.get("error") if isinstance(result, dict) else None
            raise HTTPException(status_code=500, detail=detail or "Unknown error")

        inc_number = result.get("number", "<unknown>")
        sys_id = result.get("sys_id")
        resolved_note = ""
        if bool(input_data.mark_resolved) and sys_id not in ("<unknown>", None):
            ok = _set_incident_resolved(sys_id)
            resolved_note = " (marked Resolved)" if ok else " (could not mark Resolved; please update manually)"
        ticket_text = f"Incident created: {inc_number}{resolved_note}" if inc_number else "Incident created."
        return {
            "bot_response": f"✅ {ticket_text}",
            "debug": "Incident created via ServiceNow",
            "persist": True,
            "show_assist_card": True,
            "followup": "Is there anything else I can assist you with?",
        }
    except HTTPException:
        # Pass the deliberate error through unchanged rather than re-wrapping
        # it via safe_str() in the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=safe_str(e)) from e