"""LangGraph-based requirement-collection consultation state machine. Implements a 5-phase flow (Discovery → Exploration → Constraints → Solution → Refinement) controlled by confidence. Supports both clear requests (e.g. build X) and problems (e.g. Y is wrong). CODE-LEVEL QUALITY ENFORCEMENT: - Requirement-oriented question categories (scope, what they have, behavior/look, success) - Anti-repetition: sentence overlap detection and re-generation - Anti-hallucination: verifies claims against conversation history - Deferral counter: "I don't know" → switch to proposal mode - Turn-aware instructions and understanding validation """ from __future__ import annotations import json import re from difflib import SequenceMatcher from typing import Any, TypedDict from langchain_core.messages import HumanMessage, SystemMessage from llm import llm from models import Message, Phase, SessionState, PHASE_ORDER, PHASE_THRESHOLDS from prompts import PHASE_PROMPTS, QUESTIONNAIRE_SECTIONS, CURRENT_SECTION_PLACEHOLDER # --------------------------------------------------------------------------- # Questionnaire-driven section progression (same intent → same question order) # --------------------------------------------------------------------------- def _current_section_index(questions_asked: int) -> int: """Section index 0..10 from number of questions asked. ~2 questions per section.""" return min(questions_asked // 2, len(QUESTIONNAIRE_SECTIONS) - 1) def _build_section_block(section_index: int) -> str: """Build the 'Current section' block to inject into the prompt.""" num, title, intents = QUESTIONNAIRE_SECTIONS[section_index] intents_str = "\n".join(f"- {i}" for i in intents) return ( f"Current section: Section {num} — {title}\n" f"Question intents to cover (adapt to domain and conversation):\n" f"{intents_str}\n" f"If the client has ALREADY answered these intents earlier in the conversation, skip this section or ask only what's missing, then move to the next section.\n" ) # --------------------------------------------------------------------------- # Graph state type # --------------------------------------------------------------------------- class GraphState(TypedDict): session_state: SessionState user_message: str assistant_reply: str new_confidence: float new_understanding: str new_phase: Phase # --------------------------------------------------------------------------- # Wrap-up guidance (when to ask "Is there anything else?") # --------------------------------------------------------------------------- def _build_wrap_up_guidance(questions_asked: int, messages: list[Message], section_index: int) -> str: """Tell the bot when to keep gathering vs when to ask 'Is there anything else?'""" if _can_wrap_up(questions_asked, messages): return "\nYou've gathered enough info. Ask: 'Is there anything else you'd like to add?' Only that — no proposal or solution.\n" section_num = section_index + 1 total_sections = len(QUESTIONNAIRE_SECTIONS) return ( f"\nKeep gathering. You are on section {section_num} of {total_sections} (follow the questionnaire in order). " f"Do NOT ask 'Is there anything else?' until you have gone through ALL 11 sections (through Section 11 - Open Ends) " f"and asked at least {MIN_QUESTIONS_BEFORE_WRAP_UP} questions. You have asked {questions_asked} so far.\n" ) # --------------------------------------------------------------------------- # Conversation intelligence helpers # --------------------------------------------------------------------------- DEFERRAL_PATTERNS = [ r"\bi\s*(?:don'?t|dont|dotn|do\s*not)\s*know\b", r"\bno\s*idea\b", r"\bnot\s*sure\b", r"\byou\s*decide\b", r"\byou\s*tell\s*me\b", r"\bidk\b", r"\bno\s*clue\b", r"\bi\s*have\s*no\s*(idea|clue)\b", r"\bwhat\s*do\s*you\s*(mean|suggest|think|recommend)\b", r"\bcan\s*you\s*explain\b", r"\bnope\b", r"\bi\s*(?:don'?t|dont)\s*(?:really\s*)?know\b", ] def _count_deferrals(messages: list[Message]) -> int: """Count how many times the user deferred or said 'I don't know'.""" count = 0 for msg in messages: if msg.role == "user": text = msg.content.lower() for pattern in DEFERRAL_PATTERNS: if re.search(pattern, text): count += 1 break return count def _count_bot_questions(messages: list[Message]) -> int: """Count total questions the bot has asked across all turns.""" return sum(msg.content.count("?") for msg in messages if msg.role == "assistant") def _get_turn_number(messages: list[Message]) -> int: """Get the current turn number (count of user messages).""" return sum(1 for msg in messages if msg.role == "user") + 1 def _get_previous_bot_responses(messages: list[Message]) -> list[str]: """Get all previous bot responses.""" return [msg.content for msg in messages if msg.role == "assistant"] # Specific patterns that indicate the bot ACTUALLY asked about data STORAGE # Must be specific enough to not false-positive on generic mentions DATA_STORAGE_QUESTION_PATTERNS = [ r"(?:how|where) do you (?:keep track|track|record|store|log|manage)", r"(?:do you|are you) (?:use|using) (?:any |a |)(?:excel|spreadsheet|notebook|software|tool|system|app)", r"(?:excel|spreadsheet|notebook|software|database|app|tool|system).+(?:to track|to record|to manage|to store|to keep)", r"(?:keep|store|record|track|log).+(?:in a |in |on |using |with )(?:excel|spreadsheet|notebook|software|database|app|system|paper)", r"how (?:do you|are you) (?:currently |)(?:tracking|recording|storing|managing|keeping) (?:your |the |all |this )", r"(?:track|record|manage) your (?:business |)(?:information|data|sales|inventory)", r"(?:track or record|record or track|store or track)", ] def _has_asked_about_data(messages: list[Message]) -> bool: """Check if the bot has SPECIFICALLY asked about data storage/tracking tools. Only returns True if the bot asked WHERE/HOW data is stored (Excel, notebook, etc.) Not just any mention of 'data' or 'track' in general context. """ for msg in messages: if msg.role == "assistant" and "?" in msg.content: content_lower = msg.content.lower() question_parts = content_lower.split("?") for part in question_parts: for pattern in DATA_STORAGE_QUESTION_PATTERNS: if re.search(pattern, part): return True return False # Require ALL 11 sections before wrap-up MIN_QUESTIONS_BEFORE_WRAP_UP = 22 # ~2 per section so we reach Section 11 MIN_SECTION_BEFORE_WRAP_UP = 10 # Section 11 (Open Ends) — must complete all sections def _can_wrap_up(questions_asked: int, messages: list[Message]) -> bool: """True only after we've gone through ALL 11 questionnaire sections (no early exit).""" section_index = _current_section_index(questions_asked) # Must reach Section 11 (Open Ends, index 10) and ask at least 22 questions return ( questions_asked >= MIN_QUESTIONS_BEFORE_WRAP_UP and section_index >= MIN_SECTION_BEFORE_WRAP_UP ) def _user_has_said_where_data_is(messages: list[Message]) -> bool: """True if the user has already said where they keep their data (spreadsheet, database, etc.).""" data_location_words = [ "spreadsheet", "database", "excel", "notebook", "paper", "software", "app", "system", "cloud", "computer", "file", "files", ] for msg in messages: if msg.role != "user": continue lower = msg.content.lower().strip() # Short reply that looks like an answer (e.g. "spreadsheet", "database") if len(lower) < 80 and any(w in lower for w in data_location_words): return True if any(w in lower for w in data_location_words) and ("keep" in lower or "store" in lower or "track" in lower or "use" in lower or "in " in lower): return True return False def _check_repetition(new_reply: str, previous_replies: list[str]) -> float: """Check sentence-level overlap with previous replies. Returns 0.0-1.0.""" if not previous_replies: return 0.0 new_sentences = set( s.strip().lower() for s in re.split(r'[.!?\n]', new_reply) if len(s.strip()) > 20 ) if not new_sentences: return 0.0 max_overlap = 0.0 for prev in previous_replies: prev_sentences = set( s.strip().lower() for s in re.split(r'[.!?\n]', prev) if len(s.strip()) > 20 ) if not prev_sentences: continue overlap_count = 0 for new_s in new_sentences: for prev_s in prev_sentences: if SequenceMatcher(None, new_s, prev_s).ratio() > 0.75: overlap_count += 1 break overlap_ratio = overlap_count / len(new_sentences) max_overlap = max(max_overlap, overlap_ratio) return max_overlap # --------------------------------------------------------------------------- # Reply quality enforcement (code-level, can't be bypassed by LLM) # --------------------------------------------------------------------------- # Budget / timeline — NEVER ask (policy) BUDGET_TIMELINE_PATTERNS = [ r"budget", r"timeline", r"when (?:do you |would you |)need (?:it|this|the)", r"deadline", r"how (?:much |long |)(?:time|money)", r"expected (?:timeline|date|duration)", r"time (?:frame|line|scale)", r"(?:within|in) (?:how many|what) (?:weeks|months|days)", r"expectations? (?:for|about) (?:the )?timeline", ] # Policy-only checks: budget/timeline and tech. All other behavior is guided by prompts and LLM context. # Non-tech client: no "involvement", "development", "implementation", "progress updates", "system design" TECH_PROCESS_PATTERNS = [ r"involved in (?:the |)(?:development|implementation)", r"involvement in (?:development|implementation|design)", r"progress updates?", r"regular (?:meetings?|updates?)", r"system design", r"input on (?:system |)(?:design|implementation)", r"project management (?:portal|tool)", r"preferred method for (?:receiving|getting) (?:updates?|progress)", r"receiving (?:progress )?updates?", r"providing input on", r"implementation (?:of the |)(?:system|solution)", r"development and implementation", r"first meeting to discuss (?:project|details|expectations)", r"ready to proceed", r"prioritize .*(?:goals?|objectives?)", ] # Filler SENTENCES that should be stripped from ANYWHERE in the response # These are full-sentence patterns that add no value and make responses robotic FILLER_SENTENCES = [ # "I'm glad / I'm happy" openers r"i'm glad you (?:shared|told|mentioned|brought)", r"i'm happy (?:to hear|you shared|you told)", # "I'd love to" filler r"i'd love to (?:learn|help|explore|understand|hear|know|dig)", r"i'd like to (?:learn|help|explore|understand|hear|know|dig)", # Restating the problem robotically (domain-agnostic) r"(?:can be|is) a (?:challenging|significant|difficult|tough|particularly challenging|common) (?:issue|problem|situation|challenge)", r"(?:this|it) can be a (?:common|challenging|significant) (?:issue|problem|situation)", r"especially (?:given|with) the (?:high value|constantly changing|competitive)", r"especially given the (?:high value|unique|complex) (?:nature|aspects)", r"(?:market|industry) can be (?:often |)(?:subject to|a (?:high|competitive))", r"are often (?:purchased|used|needed) for (?:special |)(?:occasions|reasons)", r"the (?:quality and )?characteristics of the", r"the (?:market|industry) can be", r"it's likely that this is affecting", r"this (?:can|may|might) (?:be|also be) (?:affecting|impacting|taking)", # Robotic transitions r"it sounds like you(?:'re| are) (?:dealing with|experiencing|having|facing|going through)", r"^it(?:'s| is) (?:clear|evident|apparent|obvious|understandable|interesting) that", r"it can be (?:particularly |especially |quite |very |)(?:challenging|concerning|frustrating|difficult|tough)", r"i (?:can |)(?:understand|see|imagine|appreciate) (?:that |how |why )(?:this|it|you)", r"that(?:'s| is) (?:a |definitely a |certainly a |)(?:great|good|excellent|interesting)", r"thank you for sharing", r"i'm (?:starting to |)(?:get|getting) a sense", r"^(?:a |this is a )(?:challenging|significant|difficult|tough|concerning) (?:situation|challenge|problem|issue)", r"you(?:'d| would) be (?:concerned|worried|frustrated|surprised)", r"it seems like (?:we(?:'ve| have)|you(?:'ve| have))", # Recap openers — bot restates known facts every turn r"you(?:'re| are) (?:in|involved in) the .{3,30} business", r"you(?:'ve| have) (?:been using|mentioned|shared|told|noted|indicated|acknowledged)", r"given the (?:specifics|broad range|unique aspects|diverse nature|complexity)", r"to better understand (?:the scope|your situation|the issue|your)", r"this (?:is a good starting point|indicates you have)", r"it'?s possible that .+(?:could be|might be|is)", r"(?:could be|might be) (?:impacting|affecting|contributing)", r"(?:understanding|this information) (?:the specifics|helps us understand)", r"can (?:help|provide) (?:identify|valuable insights)", r"understanding the (?:current state|specifics) .+ (?:can provide|can help)", r"i'm excited to learn more", r"i'm looking forward to", r"understanding (?:your|the) specific", r"(?:this|that) (?:can be|is) (?:particularly |especially |)(?:challenging|concerning|frustrating)", # Repeated robotic phrases r"(?:can be |is )particularly (?:challenging|concerning)", r"a (?:challenging|significant) (?:situation|challenge|financial burden)", r"(?:\w+ )?industry(?:,|) where trends and (?:consumer |customer |)preferences can shift", r"wasted resources,? tied-?up capital", r"high value and luxury nature", r"it(?:'s| is) essential to (?:identify|address|tackle|consider)", r"(?:affecting|impacting) your business(?:'s|'s) (?:overall |)(?:performance|cash flow|profitability)", r"(?:overall |)(?:performance|appearance|freshness) and (?:customer engagement|profitability)", r"(?:get|getting) your business back on track", # "I'm imagining" / "I'm thinking" r"i'm (?:imagining|thinking) that (?:this|it) (?:might|may|could|is)", # "It seems like" (broad — catches any usage) r"it seems like (?:the|your|this|that)", # "This could indicate/be due to" r"this (?:could|may|might) (?:indicate|suggest|mean|be due to|be related to|be a sign)", # "It's likely that you're" r"it(?:'s| is) likely that (?:you(?:'re| are)|this|there)", # "Given the high value / slow movement" r"given the (?:high value|slow movement|nature|importance|significance)", # "It's great that you..." r"it(?:'s| is) great that you(?:'ve| have)", # "You've just started our conversation" r"you(?:'ve| have) just started", # Solution pitching — requirement gathering only r"our approach (?:at |)stacklogix (?:involves|would involve|will involve)", r"stacklogix will (?:develop|implement|provide|deliver)", r"how we will address (?:it|this)", r"what we will deliver", r"implementation (?:of a |)(?:robust |)(?:management |)system", r"key components? of the solution", r"we will (?:develop|implement|provide|schedule|work closely)", ] # Section headers that create robotic formatting RECAP_HEADERS = [ r"\*\*(?:key points?|current understanding|summary|what (?:we|i) know(?: so far)?|what you(?:'ve| have) (?:told|shared|mentioned))(?:\s*so far)?\s*:?\*\*", r"\*\*(?:key (?:observations?|takeaways?|insights?|details?|information))\s*:?\*\*", r"\*\*(?:current challenges?|situation overview|current situation)\s*:?\*\*", ] def _detect_bad_question_patterns(reply: str) -> str | None: """Detect self-solving, cause-suggesting, budget/timeline, or technical process questions. Returns a correction instruction if bad patterns found, None otherwise. """ reply_lower = reply.lower() # Only enforce hard policy via patterns; let the LLM handle the rest from conversation context. for pattern in BUDGET_TIMELINE_PATTERNS: if re.search(pattern, reply_lower): return ( "⚠️ You must NOT ask about budget or timeline. " "Remove any question about when they need it, deadlines, or cost. Ask something else from the questionnaire in plain words." ) for pattern in TECH_PROCESS_PATTERNS: if re.search(pattern, reply_lower): return ( "⚠️ The client is non-technical. Do NOT ask about involvement in development, " "progress updates, system design, project management, or implementation. " "Ask a simple, plain-language question from the current section." ) return None def _strip_filler_sentences(reply: str) -> str: """Strip filler sentences from the response. Strategy: Only strip filler from the OPENING of each paragraph. This removes robotic openers while preserving analytical content. If stripping would make the response too short, keep the original. """ paragraphs = reply.split("\n") result_paragraphs = [] for para in paragraphs: if not para.strip(): result_paragraphs.append(para) continue # Split paragraph into sentences sentences = re.split(r'(?<=[.!])\s+', para.strip()) if not sentences: result_paragraphs.append(para) continue # Only check the FIRST 2 sentences of each paragraph for filler # This preserves analytical content in the middle/end kept_sentences = list(sentences) # start with all # Strip leading filler sentences (up to 2) stripped = 0 while stripped < min(2, len(kept_sentences)): sent_lower = kept_sentences[0].strip().lower() is_filler = False for pattern in FILLER_SENTENCES: if re.search(pattern, sent_lower): is_filler = True break if is_filler: kept_sentences.pop(0) stripped += 1 else: break if kept_sentences: rebuilt = " ".join(s.strip() for s in kept_sentences if s.strip()) if rebuilt and rebuilt[0].islower(): rebuilt = rebuilt[0].upper() + rebuilt[1:] result_paragraphs.append(rebuilt) result = "\n".join(result_paragraphs).strip() result = re.sub(r"\n{3,}", "\n\n", result) result = result.strip() # SAFEGUARD: If stripping made the response too short, keep the original # Count non-question content length non_question = re.sub(r'[^\n]*\?[^\n]*', '', result) if len(non_question.strip()) < 80 and len(reply.strip()) > len(result.strip()): return reply return result if result else reply def _break_wall_of_text(reply: str) -> str: """Break long wall-of-text paragraphs into readable chunks. If any paragraph is longer than 400 chars without a newline, split it into sentences and group them into shorter paragraphs. """ paragraphs = reply.split("\n\n") result = [] for para in paragraphs: if len(para) > 400 and "\n" not in para.strip(): # Split into sentences sentences = re.split(r'(?<=[.!?])\s+', para.strip()) if len(sentences) <= 2: result.append(para) continue # Group into chunks of 2-3 sentences chunks = [] current_chunk = [] for sent in sentences: current_chunk.append(sent) if len(current_chunk) >= 2: chunks.append(" ".join(current_chunk)) current_chunk = [] if current_chunk: chunks.append(" ".join(current_chunk)) result.append("\n\n".join(chunks)) else: result.append(para) return "\n\n".join(result) def _strip_observation_blocks(reply: str) -> str: """Remove 'Key Observations', 'Understanding the situation', and similar blocks with bullet lists. Keep only short acknowledgment and questions. Skip until we see a question line. """ lines = reply.split("\n") result = [] in_observation_block = False for line in lines: stripped = line.strip() # Start of an observation/recap block (bold header, no question) if re.match(r"^\*\*(?:key observations?|understanding the situation|key points?|current (?:understanding|situation)|summary)\s*", stripped, re.IGNORECASE) and "?" not in stripped: in_observation_block = True continue # While in block: skip bullets and any non-question line (transition sentences, etc.) if in_observation_block: if stripped.startswith("-") or stripped.startswith("*") or re.match(r"^[\d]+[.)]\s", stripped): continue if "?" not in stripped: continue in_observation_block = False result.append(line) text = "\n".join(result) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() def _extract_question_sentences(text: str) -> list[str]: """Get lines or sentence fragments that contain a question mark (normalized for comparison).""" questions = [] for line in text.split("\n"): part = line.strip().lower() if "?" in part and len(part) > 15: questions.append(part) return questions def _check_question_repetition(new_reply: str, previous_replies: list[str]) -> bool: """True if the new reply asks something very similar to a previous reply's question.""" new_qs = _extract_question_sentences(new_reply) if not new_qs: return False all_prev_qs = [] for prev in previous_replies: all_prev_qs.extend(_extract_question_sentences(prev)) if not all_prev_qs: return False for nq in new_qs: for pq in all_prev_qs: if SequenceMatcher(None, nq, pq).ratio() > 0.55: return True return False def _ensure_text_before_questions(reply: str) -> str: """If the reply starts with a question (no preceding acknowledgment), prepend a short line.""" stripped = reply.strip() if not stripped: return reply # Find first line that contains ? lines = stripped.split("\n") first_question_idx = None for i, line in enumerate(lines): if "?" in line and len(line.strip()) > 5: first_question_idx = i break if first_question_idx is None: return reply # Is there any non-empty, non-question text before the first question? text_before = "\n".join(lines[:first_question_idx]).strip() text_before = re.sub(r"\*\*[^*]+\*\*", "", text_before).strip() # remove bold markers for length check if len(text_before) < 50: # require at least ~2 sentences of context before first question lines.insert(0, "Thanks for sharing that.") return "\n".join(lines).strip() return reply def _format_questions(reply: str) -> str: """Ensure each question is on its own paragraph with blank lines around it. The frontend renders questions in styled boxes only when they're in their own

tag, which requires blank lines around them in the markdown. """ lines = reply.split("\n") result = [] for i, line in enumerate(lines): stripped = line.strip() # Check if this line contains a question mark (likely a question) if "?" in stripped and len(stripped) > 10: # Add blank line before if the previous line isn't blank if result and result[-1].strip(): result.append("") result.append(line) # Mark that we need a blank line after # (handled by checking if next non-empty line is also a question) if i + 1 < len(lines) and lines[i + 1].strip(): result.append("") else: result.append(line) text = "\n".join(result) # Clean up triple+ blank lines text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() # --------------------------------------------------------------------------- # Anti-hallucination checks # --------------------------------------------------------------------------- def _extract_user_text(messages: list[Message]) -> str: """Get all user messages combined as a single text block.""" return " ".join(msg.content.lower() for msg in messages if msg.role == "user") def _validate_understanding(understanding: str, messages: list[Message]) -> str: """Validate that the 'understanding' field doesn't contain fabricated facts. If the understanding mentions specific numbers, tools, or details not found in any user message, strip it back to a safer version. """ if not understanding: return understanding user_text = _extract_user_text(messages) if not user_text: return understanding # Check for specific numbers in understanding that aren't in user text numbers_in_understanding = re.findall(r'\b\d[\d,.]+\b', understanding) fabricated_numbers = [ n for n in numbers_in_understanding if n not in user_text ] # If more than half the specific numbers are fabricated, reset understanding if numbers_in_understanding and len(fabricated_numbers) > len(numbers_in_understanding) / 2: # Rebuild understanding from just user messages user_messages = [msg.content for msg in messages if msg.role == "user"] return "Client has said: " + " | ".join(user_messages[-5:]) return understanding def _check_hallucination(reply: str, messages: list[Message]) -> bool: """Check if the reply contains likely hallucinated specific claims. Returns True if hallucination is detected. """ user_text = _extract_user_text(messages) if not user_text: return False reply_lower = reply.lower() # Check for specific dollar/rupee amounts the user never mentioned amounts_in_reply = re.findall( r'(?:₹|rs\.?|inr|\$|usd)\s*[\d,.]+(?:\s*(?:lakh|crore|k|m|million|billion))?', reply_lower ) for amount in amounts_in_reply: # Extract just the number part to check num_part = re.findall(r'[\d,.]+', amount) if num_part and not any(n in user_text for n in num_part): return True # Check for percentage claims the user never mentioned percentages_in_reply = re.findall(r'\d+(?:\.\d+)?%', reply_lower) for pct in percentages_in_reply: num = pct.replace('%', '') if num not in user_text and pct not in user_text: return True return False # --------------------------------------------------------------------------- # Core processing # --------------------------------------------------------------------------- def _build_message_list(state: SessionState, user_message: str) -> list: """Build the message list with intelligence injection.""" turn_number = _get_turn_number(state.messages) deferral_count = _count_deferrals(state.messages) questions_asked = _count_bot_questions(state.messages) user_msg_lower = user_message.lower().strip() # --- SOLUTION SHORTCUT --- # If user says "give me solution" / "tell me what to do" etc., jump to final answer SOLUTION_SHORTCUT_PATTERNS = [ r"give me (?:the |a |)(?:solution|answer|recommendation|plan|advice)", r"tell me what to do", r"just (?:give me|tell me|skip to)", r"what (?:should|do) (?:i|we) do", r"(?:skip|jump) (?:to |)(?:the |)(?:solution|answer|recommendation|advice)", r"i (?:just |)(?:want|need) (?:the |a |your |)(?:solution|answer|recommendation|plan|advice)", ] if any(re.search(p, user_msg_lower) for p in SOLUTION_SHORTCUT_PATTERNS): state.phase = Phase.REFINEMENT state.confidence = max(state.confidence, 0.85) # --- NEW TOPIC RESET --- # If we're in SOLUTION/REFINEMENT and user mentions a new topic, reset to DISCOVERY END_CONVO_PATTERNS = [ r"^no$", r"^nope$", r"^nothing$", r"^that'?s? (?:all|it)$", r"^thanks?$", r"^thank you", r"^bye$", r"^ok$", r"^okay$", r"^i'm good$", r"^i'm good enough$", r"^i'm good enough with that$", r"^i'm good with that$", r"^i'm good with the solution$", r"^i'm good with the approach$", r"^i'm good with the recommendations$", r"^i'm good with the advice$", r"^i'm good with the plan$", r"^no thank you$", r"^no thank you very much$", r"^no thank you so much$", r"^no thank you very much$", r"^no problem$",r"^no thank u$", r"^nah", r"^it's alright$", r"^it's all good$", r"^it's all good with that$", r"^it's all good with the solution$", r"^it's all good with the approach$", r"^it's all good with the recommendations$", r"^it's all good with the advice$", r"^it's all good with the plan$", r"^it's all good with that$", ] is_end_of_convo = any(re.search(p, user_msg_lower) for p in END_CONVO_PATTERNS) if state.phase in (Phase.SOLUTION, Phase.REFINEMENT) and not is_end_of_convo: # Check if the last bot message had "anything else" — signals end of a topic last_bot_msg = "" for msg in reversed(state.messages): if msg.role == "assistant": last_bot_msg = msg.content.lower() break if "anything else" in last_bot_msg or "other challenges" in last_bot_msg: # User is bringing up a new topic — reset to discovery state.phase = Phase.DISCOVERY state.confidence = 0.1 state.topic_start_turn = turn_number # Reset the topic turn counter # Keep the conversation history but mark the new topic state.understanding = state.understanding + f"\n\n--- NEW TOPIC: {user_message} ---\n" if state.understanding else "" # --- HARD PHASE FORCING — require minimum 10 questions (or lots of client info) before wrap-up --- topic_turns = turn_number - state.topic_start_turn current_phase = state.phase current_confidence = state.confidence if not _can_wrap_up(questions_asked, state.messages) and current_phase in (Phase.SOLUTION, Phase.REFINEMENT): # Keep in EXPLORATION until we've asked at least 10 questions (or client gave lots of info) current_phase = Phase.EXPLORATION current_confidence = min(current_confidence, 0.5) state.phase = current_phase state.confidence = current_confidence elif topic_turns >= 12 and current_phase != Phase.REFINEMENT: current_phase = Phase.REFINEMENT current_confidence = max(current_confidence, 0.85) state.phase = current_phase state.confidence = current_confidence elif topic_turns >= 10 and current_phase in (Phase.DISCOVERY, Phase.EXPLORATION): current_phase = Phase.SOLUTION current_confidence = max(current_confidence, 0.65) state.phase = current_phase state.confidence = current_confidence # --- USE TURN-1 SPECIFIC PROMPT FOR FIRST MESSAGE --- if turn_number <= 1: from prompts import TURN_1_PROMPT phase_prompt = TURN_1_PROMPT else: phase_prompt = PHASE_PROMPTS[current_phase].format(confidence=current_confidence) # Inject current questionnaire section so the same intent leads to the same question order section_index = _current_section_index(questions_asked) if current_phase in (Phase.DISCOVERY, Phase.EXPLORATION, Phase.CONSTRAINTS): phase_prompt = phase_prompt.replace(CURRENT_SECTION_PLACEHOLDER, _build_section_block(section_index)) else: phase_prompt = phase_prompt.replace(CURRENT_SECTION_PLACEHOLDER, "(Wrapping up — ask if there is anything else.)") # Add understanding context (validated) if state.understanding: phase_prompt += ( f"\n\nCLIENT INFO (for YOUR reference only — do NOT copy this back to the client):" f"\n{state.understanding}\n" f"Do NOT list 'Key Points', 'Summary', or 'How we will address it'. Just ask your next question(s) in simple words.\n" ) # Remind LLM to use the full conversation (messages below) to adapt and improve if turn_number > 1: phase_prompt += "\nThe messages below are the full conversation. Use them to understand what the client said and what you already asked; avoid repeating, infer context, and choose the next 1–2 questions that add the most value.\n" # --- CONVERSATION STATS --- stats = f"\nTurn {turn_number} | {questions_asked} questions asked so far | {deferral_count} deferrals\n" sec_idx = _current_section_index(questions_asked) if not _can_wrap_up(questions_asked, state.messages): stats += f"Follow the questionnaire: you are on section {sec_idx + 1} of {len(QUESTIONNAIRE_SECTIONS)}. Do NOT ask 'Is there anything else?' until you have completed ALL 11 sections and asked at least {MIN_QUESTIONS_BEFORE_WRAP_UP} questions.\n" phase_prompt += stats # --- ANTI-REPEAT REMINDER --- Follow the current section only; do not repeat past topics if turn_number >= 4: phase_prompt += "\n⚠️ Do NOT repeat questions from earlier sections. Ask only from the current section above. If the client already answered or said 'I don't know', move to the next intent in the section or the next section.\n" # --- "I ALREADY ANSWERED" --- Do not re-ask; move to next intent or section if re.search(r"already (?:answered|said|mentioned|told)|i (?:already )?said (?:that )?(?:already )?|said (?:that )?already", user_msg_lower): phase_prompt += "\n⚠️ The client said they already answered. Acknowledge briefly (e.g. 'Got it.') and move to the NEXT intent or next section. Do NOT ask the same or a similar question again.\n" # --- DATA: do not repeat once they've said where data is (questionnaire Section 4 covers this) --- if _user_has_said_where_data_is(state.messages): phase_prompt += """ ⚠️ The client has ALREADY said where they keep their data (e.g. spreadsheet, database, notebook). Do NOT ask again about where it's stored, what it looks like, or where the file is. Continue with the current section or move on.\n""" # --- WRAP-UP GUIDANCE (questionnaire sections drive what to ask; this only says when to end) --- if turn_number > 1: sec_idx = _current_section_index(questions_asked) wrap_guidance = _build_wrap_up_guidance(questions_asked, state.messages, sec_idx) phase_prompt += wrap_guidance # --- DYNAMIC BEHAVIORAL OVERRIDES --- if deferral_count >= 2: phase_prompt += """ ⚠️ THE USER SAID "I DON'T KNOW" MULTIPLE TIMES. Move on — do not keep asking that. - Skip that topic. Ask about a DIFFERENT requirement (e.g. where they keep their data, or what they want in simple terms). - Do NOT pitch a solution or explain "how we will address it". Just gather what you can and then ask "Is there anything else?" """ # Question count limits only apply during diagnostic phases # In solution/final phases, the prompt itself handles question limits if state.phase in (Phase.DISCOVERY, Phase.EXPLORATION): if questions_asked >= 20: phase_prompt += "\n⚠️ STOP ASKING. Ask only: 'Is there anything else you'd like to add?' Nothing else. No proposal.\n" elif questions_asked >= 14: phase_prompt += "\nYou've asked enough. Ask: 'Is there anything else you'd like to add?' Only that — no solution or approach.\n" if turn_number >= 8: phase_prompt += "\n⚠️ Turn 8+. Ask 'Is there anything else you'd like to add?' Do NOT give a proposal or solution.\n" messages = [SystemMessage(content=phase_prompt)] # Conversation history (last 20 messages) for msg in state.messages[-20:]: if msg.role == "user": messages.append(HumanMessage(content=msg.content)) else: messages.append(SystemMessage(content=f"[Your previous response]: {msg.content}")) messages.append(HumanMessage(content=user_message)) return messages def _parse_llm_output(raw: str) -> dict[str, Any]: """Parse the JSON output from the LLM with robust fallback.""" text = raw.strip() # Strip markdown code fences json_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL) if json_match: text = json_match.group(1) else: brace_match = re.search(r"\{.*\}", text, re.DOTALL) if brace_match: text = brace_match.group(0) # Attempt 1: direct parse try: data = json.loads(text) return { "reply": str(data.get("reply", text)), "confidence": float(data.get("confidence", 0.1)), "understanding": str(data.get("understanding", "")), } except (json.JSONDecodeError, ValueError): pass # Attempt 2: sanitise newlines try: sanitised = re.sub(r'(?<=": ")(.*?)(?="[,\s]*\n?\s*"|\"\s*\})', lambda m: m.group(0).replace('\n', '\\n'), text, flags=re.DOTALL) data = json.loads(sanitised) return { "reply": str(data.get("reply", text)), "confidence": float(data.get("confidence", 0.1)), "understanding": str(data.get("understanding", "")), } except (json.JSONDecodeError, ValueError, TypeError): pass # Attempt 3: regex extraction reply = "" confidence = 0.1 understanding = "" reply_match = re.search( r'"reply"\s*:\s*"(.*?)"\s*,\s*"(?:confidence|understanding)', text, re.DOTALL, ) if reply_match: reply = reply_match.group(1).replace('\\n', '\n').replace('\\"', '"') conf_match = re.search(r'"confidence"\s*:\s*([\d.]+)', text) if conf_match: try: confidence = float(conf_match.group(1)) except ValueError: pass und_match = re.search( r'"understanding"\s*:\s*"(.*?)"\s*\}', text, re.DOTALL, ) if und_match: understanding = und_match.group(1).replace('\\n', '\n').replace('\\"', '"') if reply: return {"reply": reply, "confidence": confidence, "understanding": understanding} # Ultimate fallback fallback = text for pattern in [r'"reply"\s*:\s*"?', r'"confidence"\s*:.*', r'"understanding"\s*:.*', r'[{}]']: fallback = re.sub(pattern, '', fallback, flags=re.DOTALL) fallback = fallback.strip().strip('"').strip(',').strip() return { "reply": fallback if fallback else raw.strip(), "confidence": confidence, "understanding": understanding, } def _determine_phase(confidence: float) -> Phase: """Map confidence score to the appropriate consultation phase.""" for phase in reversed(PHASE_ORDER): low, _ = PHASE_THRESHOLDS[phase] if confidence >= low: return phase return Phase.DISCOVERY # Thank-you message when client says "no" to "Is there anything else?" THANK_YOU_MESSAGE = ( "Thank you. We have everything we need. Our team at Stacklogix will be in touch with you shortly." ) # User said "no" / "nothing else" / "bye" to "Is there anything else?" — respond with thank-you ONCE, do not ask again ANYTHING_ELSE_NO_PATTERNS = [ r"^\s*no\s*$", r"^\s*nope\s*$", r"^\s*nothing\s*$", r"^\s*that'?s? (?:all|it)\s*$", r"^\s*that is all\s*$", r"^\s*no\s*,?\s*thanks?\s*$", r"^\s*thanks?\s*,?\s*that'?s? all\s*$", r"^\s*(?:we'?re|i'?m) good\s*$", r"^\s*all good\s*$", r"^\s*not really\s*$", r"^\s*nothing else\s*$", r"^\s*no\s+not\s+at\s+all\s*$", r"^\s*not\s+at\s+all\s*$", r"^\s*bye\s*$", r"^\s*i'?m\s+good\s*$", r"^\s*that'?s\s+it\s*$", # "no thank you" / "no thank u" / "nope thanks" and variants r"^\s*no\s+thank\s+you\s*$", r"^\s*no\s+thank\s+u\s*$", r"^\s*no\s+thank\s+you\s+.*$", r"^\s*no\s+thank\s+u\s+.*$", # "no thank you very much" etc r"^\s*nope\s*,?\s*thanks?\s*$", r"^\s*nope\s*,?\s*thank\s+you\s*$", r"^\s*nope\s*,?\s*thank\s+u\s*$", r"^\s*that'?s\s+all\s*,?\s*thanks?\s*$", r"^\s*that'?s\s+all\s*,?\s*thank\s+you\s*$", # From END_CONVO_PATTERNS (standalone thanks, ok, i'm good variants, nah, it's alright/good) r"^\s*thanks?\s*$", r"^\s*thank\s+you\s*$", r"^\s*thank\s+you\s+.*$", r"^\s*ok\s*$", r"^\s*okay\s*$", r"^\s*i'?m\s+good\s+enough\s*$", r"^\s*i'?m\s+good\s+enough\s+with\s+that\s*$", r"^\s*i'?m\s+good\s+with\s+that\s*$", r"^\s*i'?m\s+good\s+with\s+the\s+solution\s*$", r"^\s*i'?m\s+good\s+with\s+the\s+approach\s*$", r"^\s*i'?m\s+good\s+with\s+the\s+recommendations\s*$", r"^\s*i'?m\s+good\s+with\s+the\s+advice\s*$", r"^\s*i'?m\s+good\s+with\s+the\s+plan\s*$", r"^\s*no\s+problem\s*$", r"^\s*nah\s*$", r"^\s*nah\s+.*$", r"^\s*it'?s\s+alright\s*$", r"^\s*it'?s\s+all\s+good\s*$", r"^\s*it'?s\s+all\s+good\s+with\s+that\s*$", r"^\s*it'?s\s+all\s+good\s+with\s+the\s+solution\s*$", r"^\s*it'?s\s+all\s+good\s+with\s+the\s+approach\s*$", r"^\s*it'?s\s+all\s+good\s+with\s+the\s+recommendations\s*$", r"^\s*it'?s\s+all\s+good\s+with\s+the\s+advice\s*$", r"^\s*it'?s\s+all\s+good\s+with\s+the\s+plan\s*$", ] def _last_bot_asked_anything_else(messages: list[Message]) -> bool: """True if the last assistant message asks 'anything else'.""" for msg in reversed(messages): if msg.role == "assistant": lower = msg.content.lower() return "anything else" in lower or "anything more" in lower return False def _user_said_no_to_anything_else(user_message: str) -> bool: """True if user is saying no / nothing else.""" stripped = user_message.strip().lower().rstrip(".,!?") return any(re.search(p, stripped, re.IGNORECASE) for p in ANYTHING_ELSE_NO_PATTERNS) # --------------------------------------------------------------------------- # Main graph execution # --------------------------------------------------------------------------- async def run_consultation(session_state: SessionState, user_message: str) -> GraphState: """Run one turn of the consultation with quality enforcement.""" # 0. If bot just asked "Is there anything else?" and user said no → thank you and end if session_state.messages and _last_bot_asked_anything_else(session_state.messages) and _user_said_no_to_anything_else(user_message): session_state.messages.append(Message(role="user", content=user_message)) session_state.messages.append(Message(role="assistant", content=THANK_YOU_MESSAGE)) return GraphState( session_state=session_state, user_message=user_message, assistant_reply=THANK_YOU_MESSAGE, new_confidence=1.0, new_understanding=session_state.understanding, new_phase=Phase.REFINEMENT, ) # 1. Build messages with intelligence injection messages = _build_message_list(session_state, user_message) turn_number = _get_turn_number(session_state.messages) # 2. Call LLM response = await llm.ainvoke(messages) parsed = _parse_llm_output(response.content) # 3. Anti-repetition check (content and questions) previous_replies = _get_previous_bot_responses(session_state.messages) overlap = _check_repetition(parsed["reply"], previous_replies) question_repeat = _check_question_repetition(parsed["reply"], previous_replies) if (overlap > 0.35 or question_repeat) and previous_replies: anti_repeat_msg = SystemMessage(content=( "⚠️ You are REPEATING yourself. Either your content or your questions are too similar to what you already asked.\n" "The client has already answered (or said 'I don't know') to topics you've covered. Do NOT ask about those again.\n" "Reply with ONLY 1-2 short sentences, then ask about something you have NOT asked yet (from the current questionnaire section), OR ask: 'Is there anything else you'd like to add?'\n" "No 'Key Observations' or bullet lists. Just a short acknowledgment and one new question." )) messages.insert(-1, anti_repeat_msg) response = await llm.ainvoke(messages) parsed = _parse_llm_output(response.content) # 3.5. HARD question limit enforcement — max 2 questions reply = parsed["reply"] question_count = reply.count("?") if question_count > 2: # Keep only the first 2 questions, remove the rest parts = reply.split("?") if len(parts) > 3: # Keep everything up to and including the 2nd "?" kept = "?".join(parts[:2]) + "?" # Check if there's meaningful non-question content after remaining = "?".join(parts[2:]) remaining_lines = remaining.split("\n") non_question_lines = [ line for line in remaining_lines if line.strip() and "?" not in line ] if non_question_lines: kept += "\n" + "\n".join(non_question_lines) parsed["reply"] = kept.strip() # 3.6. Anti-hallucination check if _check_hallucination(parsed["reply"], session_state.messages): anti_hallucinate_msg = SystemMessage(content=( "⚠️ YOUR RESPONSE CONTAINS SPECIFIC NUMBERS, AMOUNTS, OR PERCENTAGES " "THAT THE CLIENT NEVER MENTIONED. This is hallucination.\n" "REWRITE your response:\n" "- Remove ALL specific numbers/amounts that the client did not explicitly state\n" "- If you need numbers, ASK the client instead of guessing\n" "- Only reference facts the client actually told you" )) messages.insert(-1, anti_hallucinate_msg) response = await llm.ainvoke(messages) parsed = _parse_llm_output(response.content) # 3.7. Minimum response length check — replies must be substantive # (Run BEFORE bad question check so expanded responses get checked too) reply_text = parsed["reply"] non_q_lines = [line for line in reply_text.split("\n") if "?" not in line] non_q_content = "\n".join(non_q_lines).strip() # SOLUTION/REFINEMENT = only "Is there anything else?" — keep it short is_final_phase = session_state.phase in (Phase.SOLUTION, Phase.REFINEMENT) min_length = 80 if is_final_phase else 120 # require at least 2-3 sentences before questions if len(non_q_content) < min_length: if is_final_phase: expand_msg = SystemMessage(content=( "⚠️ You must ONLY ask: 'Is there anything else you'd like to add?'\n" "Do NOT give a summary, proposal, or 'how we will address it'. Requirement gathering only. Short reply with that one question." )) else: expand_msg = SystemMessage(content=( "⚠️ Your response is too short. You MUST include 2-3 sentences of acknowledgment before your question(s). " "Briefly reflect what they said and why it matters for understanding their needs. Then ask your question(s). " "Do NOT add 'Key Observations' or bullet lists. No single-line intros." )) messages.insert(-1, expand_msg) response = await llm.ainvoke(messages) parsed = _parse_llm_output(response.content) # 3.8. Bad question pattern check (self-solving, cause-suggesting) # Runs AFTER length expansion so expanded responses are also checked correction = _detect_bad_question_patterns(parsed["reply"]) if correction: correction_msg = SystemMessage(content=correction) messages.insert(-1, correction_msg) response = await llm.ainvoke(messages) parsed = _parse_llm_output(response.content) # 3.9. Strip leading filler sentences (no LLM call) parsed["reply"] = _strip_filler_sentences(parsed["reply"]) # 3.10. Strip "Key Observations" and similar observation/recap blocks parsed["reply"] = _strip_observation_blocks(parsed["reply"]) # 3.10b. Ensure there is some text before the first question (no bare questions) parsed["reply"] = _ensure_text_before_questions(parsed["reply"]) # 3.11. Break wall-of-text responses into readable paragraphs parsed["reply"] = _break_wall_of_text(parsed["reply"]) # 3.12. Ensure questions are properly formatted for frontend box styling parsed["reply"] = _format_questions(parsed["reply"]) # 4. Confidence progression MAX_INCREASE = 0.15 MIN_INCREASE = 0.06 llm_confidence = min(parsed["confidence"], 1.0) if llm_confidence > session_state.confidence: new_confidence = session_state.confidence + min( llm_confidence - session_state.confidence, MAX_INCREASE ) else: new_confidence = min(session_state.confidence + MIN_INCREASE, 1.0) # 5. Route phase new_phase = _determine_phase(new_confidence) # Phase forcing — require at least 10 questions (or lots of client info) before wrap-up topic_turns = turn_number - session_state.topic_start_turn questions_after_this_turn = _count_bot_questions(session_state.messages) + parsed["reply"].count("?") messages_incl_current = session_state.messages + [Message(role="user", content=user_message)] if not _can_wrap_up(questions_after_this_turn, messages_incl_current) and new_phase in (Phase.SOLUTION, Phase.REFINEMENT): new_phase = Phase.EXPLORATION new_confidence = min(new_confidence, 0.5) elif topic_turns >= 12 and _can_wrap_up(questions_after_this_turn, messages_incl_current): new_phase = Phase.REFINEMENT new_confidence = max(new_confidence, 0.85) elif topic_turns >= 10 and _can_wrap_up(questions_after_this_turn, messages_incl_current) and new_phase in (Phase.DISCOVERY, Phase.EXPLORATION): new_phase = Phase.SOLUTION new_confidence = max(new_confidence, 0.65) # 6. Update understanding (with validation) raw_understanding = parsed["understanding"] if parsed["understanding"] else session_state.understanding new_understanding = _validate_understanding(raw_understanding, session_state.messages) # 7. Update session state session_state.messages.append(Message(role="user", content=user_message)) session_state.messages.append(Message(role="assistant", content=parsed["reply"])) session_state.confidence = new_confidence session_state.phase = new_phase session_state.understanding = new_understanding return GraphState( session_state=session_state, user_message=user_message, assistant_reply=parsed["reply"], new_confidence=new_confidence, new_understanding=new_understanding, new_phase=new_phase, )