""" guardrails.py ============= Cross-cutting guardrails for the Morningstar RAG Pipeline. Three layers of protection: Layer 1 — Input Guardrails (before retrieval) Validates user queries before they touch the vector store or LLM. Catches: prompt injection, out-of-scope questions, PII, abusive queries, queries that are too short/long to be meaningful. Layer 2 — Retrieval Guardrails (after retrieval, before generation) Checks that retrieved chunks are relevant enough to answer the question. Catches: low-confidence retrievals that would force the LLM to hallucinate. Layer 3 — Output Guardrails (after generation) Validates the LLM response before returning it to the user. Catches: empty responses, uncertainty phrases in financial answers, missing grounding (delegates to verifier.py), missing disclaimer. No external API required — all checks are regex + threshold-based. Usage ----- from src.guardrails import RAGGuardrails guards = RAGGuardrails() # Check query before retrieval input_result = guards.check_input("What was Apple's revenue in FY2024?") if not input_result.passed: print(input_result.reason) else: # retrieve + generate ... # Check retrieval relevance retrieval_result = guards.check_retrieval(chunks) if not retrieval_result.passed: print(retrieval_result.reason) else: # generate ... # Check output output_result = guards.check_output(answer, chunks) final_answer = output_result.safe_answer Full pipeline wrapper --------------------- result = guards.run( query = "Apple gross margin FY2024?", chunks = retrieved_chunks, answer = llm_answer, ) print(result.final_answer) print(result.report()) """ import re import logging from dataclasses import dataclass, field from typing import Optional log = logging.getLogger(__name__) # ── Financial disclaimer appended to advisory-sounding answers ───────────── DISCLAIMER = ( "\n\n---\n" "_Disclaimer: This answer is generated from Morningstar research reports " "and Apple SEC filings for informational purposes only. It does not " "constitute investment advice. Always consult a qualified financial " "adviser before making investment decisions._" ) # ══════════════════════════════════════════════════════════════════════════════ # RESULT DATACLASS # ══════════════════════════════════════════════════════════════════════════════ @dataclass class GuardrailResult: """Outcome of a single guardrail check.""" layer : str # "input" | "retrieval" | "output" check : str # name of the specific check passed : bool reason : str = "" # human-readable explanation when not passed severity : str = "block" # "block" | "warn" — block stops pipeline, warn continues @dataclass class PipelineGuardrailResult: """Aggregated result from all guardrail layers.""" query : str input_results : list[GuardrailResult] = field(default_factory=list) retrieval_results: list[GuardrailResult] = field(default_factory=list) output_results: list[GuardrailResult] = field(default_factory=list) final_answer : str = "" blocked : bool = False block_reason : str = "" def report(self) -> str: all_results = self.input_results + self.retrieval_results + self.output_results lines = [ "=" * 65, f" Guardrail Report", "=" * 65, f" Query : {self.query[:80]}", f" Blocked : {self.blocked}" + (f" ({self.block_reason})" if self.blocked else ""), "-" * 65, ] for r in all_results: icon = "✓" if r.passed else ("✗" if r.severity == "block" else "⚠") lines.append( f" {icon} [{r.layer:10s}] [{r.check:30s}] " f"{'PASS' if r.passed else r.severity.upper()}" + (f" — {r.reason}" if not r.passed else "") ) lines.append("=" * 65) return "\n".join(lines) # ══════════════════════════════════════════════════════════════════════════════ # LAYER 1 — INPUT GUARDRAILS # ══════════════════════════════════════════════════════════════════════════════ # Prompt injection patterns — attempts to override the system prompt _INJECTION_PATTERNS = [ r"ignore\s+(all\s+)?(previous|prior|above)\s+instructions", r"you\s+are\s+now\s+(a|an)", r"disregard\s+(your|the)\s+(system\s+)?prompt", r"forget\s+(everything|all)\s+(you('ve)?\s+been\s+told|above)", r"(pretend|act|behave)\s+(as\s+if\s+you('re)?|like\s+you('re)?)\s+(a|an)", r"do\s+not\s+follow\s+your\s+(instructions|guidelines|rules)", r"system\s*:\s*you\s+are", r"<\s*system\s*>", r"\[INST\]", r"###\s*(system|instruction)", ] _INJECTION_RE = re.compile("|".join(_INJECTION_PATTERNS), re.IGNORECASE) # PII patterns _PII_PATTERNS = [ r"\b\d{3}-\d{2}-\d{4}\b", # SSN r"\b\d{16}\b", # credit card (16 digits) r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]{2,}\b",# email r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b", # US phone r"\bpassword\s*[:=]\s*\S+", # password=... ] _PII_RE = re.compile("|".join(_PII_PATTERNS), re.IGNORECASE) # In-scope keywords — query must relate to finance / Apple / Morningstar _IN_SCOPE_KEYWORDS = { # Company "apple", "aapl", "tim cook", "morningstar", # Document types "10-k", "10-q", "8-k", "annual report", "quarterly report", "sec", "edgar", "filing", # Financial terms "revenue", "net sales", "earnings", "eps", "profit", "loss", "income", "margin", "gross margin", "operating", "ebitda", "cash flow", "balance sheet", "assets", "liabilities", "equity", "debt", "dividend", "buyback", "repurchase", "capex", "capital expenditure", "guidance", "forecast", "outlook", # Products/segments "iphone", "ipad", "mac", "services", "wearables", "appstore", "app store", "vision pro", "airpods", "watch", # Analysis "moat", "valuation", "fair value", "risk", "competition", "competitive", "growth", "segment", "geographic", "americas", "china", "europe", "fiscal year", "fy", "quarter", "q1", "q2", "q3", "q4", } # Hard-block topics — completely out of scope for a financial RAG assistant _BLOCKED_TOPICS = [ r"\b(hack|exploit|malware|ransomware|phishing|ddos)\b", r"\b(bomb|weapon|drug|illegal)\b", r"\b(password|credential|login).{0,20}(steal|crack|bypass)\b", r"\bself.harm\b", ] _BLOCKED_RE = re.compile("|".join(_BLOCKED_TOPICS), re.IGNORECASE) class InputGuardrails: """Validates user queries before they reach the retriever.""" def __init__( self, min_query_len : int = 10, max_query_len : int = 500, scope_threshold : int = 1, # min in-scope keywords required ): self.min_query_len = min_query_len self.max_query_len = max_query_len self.scope_threshold = scope_threshold def check_length(self, query: str) -> GuardrailResult: n = len(query.strip()) if n < self.min_query_len: return GuardrailResult( layer="input", check="query_length", passed=False, reason=f"Query too short ({n} chars). Please ask a complete question.", severity="block", ) if n > self.max_query_len: return GuardrailResult( layer="input", check="query_length", passed=False, reason=f"Query too long ({n} chars, max {self.max_query_len}). Please shorten.", severity="block", ) return GuardrailResult(layer="input", check="query_length", passed=True) def check_injection(self, query: str) -> GuardrailResult: if _INJECTION_RE.search(query): return GuardrailResult( layer="input", check="prompt_injection", passed=False, reason="Query contains prompt injection patterns and cannot be processed.", severity="block", ) return GuardrailResult(layer="input", check="prompt_injection", passed=True) def check_pii(self, query: str) -> GuardrailResult: match = _PII_RE.search(query) if match: return GuardrailResult( layer="input", check="pii_detection", passed=False, reason="Query appears to contain personal information (PII). " "Please remove sensitive data before querying.", severity="block", ) return GuardrailResult(layer="input", check="pii_detection", passed=True) def check_blocked_topics(self, query: str) -> GuardrailResult: if _BLOCKED_RE.search(query): return GuardrailResult( layer="input", check="blocked_topics", passed=False, reason="Query contains content outside the scope of this financial assistant.", severity="block", ) return GuardrailResult(layer="input", check="blocked_topics", passed=True) def check_scope(self, query: str) -> GuardrailResult: q_lower = query.lower() hits = sum(1 for kw in _IN_SCOPE_KEYWORDS if kw in q_lower) if hits < self.scope_threshold: return GuardrailResult( layer="input", check="scope", passed=False, reason=( "Query does not appear to be related to Apple financials or " "Morningstar research. This pipeline only covers Apple SEC filings " "(10-K, 10-Q, 8-K) and Morningstar equity research." ), severity="warn", # warn not block — user may rephrase ) return GuardrailResult(layer="input", check="scope", passed=True) def run(self, query: str) -> list[GuardrailResult]: return [ self.check_length(query), self.check_injection(query), self.check_pii(query), self.check_blocked_topics(query), self.check_scope(query), ] # ══════════════════════════════════════════════════════════════════════════════ # LAYER 2 — RETRIEVAL GUARDRAILS # ══════════════════════════════════════════════════════════════════════════════ class RetrievalGuardrails: """ Checks retrieved chunks before passing them to the LLM. Prevents the LLM from hallucinating when there is no relevant context. """ def __init__( self, min_top_score : float = 0.30, # cosine sim or rerank score min_chunks : int = 1, ): self.min_top_score = min_top_score self.min_chunks = min_chunks def check_relevance(self, chunks: list[dict]) -> GuardrailResult: if not chunks or len(chunks) < self.min_chunks: return GuardrailResult( layer="retrieval", check="min_chunks", passed=False, reason=f"No relevant chunks retrieved. " "The question may be outside the scope of the indexed documents.", severity="block", ) top_score = chunks[0].get("score", 0) if top_score < self.min_top_score: return GuardrailResult( layer="retrieval", check="relevance_score", passed=False, reason=( f"Top retrieval score {top_score:.3f} is below threshold " f"{self.min_top_score}. Retrieved context is likely irrelevant — " "generating an answer could produce hallucinations." ), severity="block", ) return GuardrailResult(layer="retrieval", check="relevance_score", passed=True) def check_diversity(self, chunks: list[dict]) -> GuardrailResult: """Warn if all chunks come from the same single document.""" if len(chunks) < 2: return GuardrailResult( layer="retrieval", check="source_diversity", passed=True ) doc_ids = [c.get("metadata", {}).get("doc_id", "") for c in chunks] unique = len(set(doc_ids)) if unique == 1 and len(chunks) >= 3: return GuardrailResult( layer="retrieval", check="source_diversity", passed=False, reason=( f"All {len(chunks)} retrieved chunks are from the same document. " "Answer may lack cross-document perspective." ), severity="warn", ) return GuardrailResult(layer="retrieval", check="source_diversity", passed=True) def run(self, chunks: list[dict]) -> list[GuardrailResult]: return [ self.check_relevance(chunks), self.check_diversity(chunks), ] # ══════════════════════════════════════════════════════════════════════════════ # LAYER 3 — OUTPUT GUARDRAILS # ══════════════════════════════════════════════════════════════════════════════ # Phrases that indicate the LLM is guessing rather than citing _UNCERTAINTY_PHRASES = [ r"\bi think\b", r"\bi believe\b", r"\bprobably\b", r"\bmaybe\b", r"\bmight be\b", r"\bi('m| am) not sure\b", r"\bI('m| am) guessing\b", r"\bI('m| am) not certain\b", r"\bcould be\b", r"\bI('m| am) unsure\b", ] _UNCERTAINTY_RE = re.compile("|".join(_UNCERTAINTY_PHRASES), re.IGNORECASE) # Phrases that signal investment advice (trigger disclaimer) _ADVISORY_PHRASES = [ r"\b(should|recommend|advise|suggest)\s+(you\s+)?(buy|sell|invest|hold)\b", r"\b(good|great|strong|excellent)\s+(buy|investment|opportunity)\b", r"\bprice\s+target\b", r"\bupside\s+(potential|of)\b", ] _ADVISORY_RE = re.compile("|".join(_ADVISORY_PHRASES), re.IGNORECASE) class OutputGuardrails: """Validates and optionally transforms the LLM answer before returning it.""" def __init__( self, min_answer_len : int = 20, grounding_threshold : float = 0.50, add_disclaimer : bool = True, ): self.min_answer_len = min_answer_len self.grounding_threshold = grounding_threshold self.add_disclaimer = add_disclaimer def check_empty(self, answer: str) -> GuardrailResult: if not answer or len(answer.strip()) < self.min_answer_len: return GuardrailResult( layer="output", check="empty_response", passed=False, reason="LLM returned an empty or trivially short answer.", severity="block", ) return GuardrailResult(layer="output", check="empty_response", passed=True) def check_uncertainty(self, answer: str) -> GuardrailResult: """Flag answers where the LLM expresses uncertainty — shouldn't happen with RAG.""" match = _UNCERTAINTY_RE.search(answer) if match: phrase = match.group(0) return GuardrailResult( layer="output", check="uncertainty_phrases", passed=False, reason=( f"Answer contains uncertainty phrase: '{phrase}'. " "A grounded RAG answer should only state facts present in the context." ), severity="warn", ) return GuardrailResult(layer="output", check="uncertainty_phrases", passed=True) def check_grounding( self, answer: str, chunks: list[dict] ) -> GuardrailResult: """Delegate to AnswerVerifier for NLI-based grounding check.""" try: from src.verifier import AnswerVerifier verifier = AnswerVerifier(entail_threshold=self.grounding_threshold) result = verifier.verify(answer, chunks) score = result["grounding_score"] if score < self.grounding_threshold: return GuardrailResult( layer="output", check="grounding_score", passed=False, reason=( f"Answer grounding score {score:.0%} is below threshold " f"{self.grounding_threshold:.0%}. " f"({result['contradicted']} contradicted, " f"{result['unverified']} unverified sentences)" ), severity="block", ) return GuardrailResult( layer="output", check="grounding_score", passed=True, reason=f"Grounding score: {score:.0%}", ) except Exception as e: log.warning(f"Grounding check skipped: {e}") return GuardrailResult( layer="output", check="grounding_score", passed=True, reason="Grounding check unavailable — skipped.", ) def apply_disclaimer(self, answer: str) -> str: """Append financial disclaimer if answer sounds advisory.""" if self.add_disclaimer and _ADVISORY_RE.search(answer): return answer + DISCLAIMER return answer def run( self, answer : str, chunks : list[dict], run_grounding : bool = True, ) -> tuple[list[GuardrailResult], str]: """ Run all output checks. Returns (results, safe_answer). `safe_answer` has disclaimer appended if needed. """ results = [self.check_empty(answer), self.check_uncertainty(answer)] if run_grounding: results.append(self.check_grounding(answer, chunks)) safe_answer = self.apply_disclaimer(answer) return results, safe_answer # ══════════════════════════════════════════════════════════════════════════════ # COMBINED GUARDRAIL RUNNER # ══════════════════════════════════════════════════════════════════════════════ class RAGGuardrails: """ Runs all three guardrail layers in sequence. Attributes ---------- input_guards : InputGuardrails retrieval_guards : RetrievalGuardrails output_guards : OutputGuardrails Usage ----- guards = RAGGuardrails() # Layer 1 — before retrieval ok, reason = guards.gate_input(query) if not ok: return reason chunks = retriever.retrieve(query, ...) # Layer 2 — after retrieval ok, reason = guards.gate_retrieval(chunks) if not ok: return reason answer = llm_chain.invoke(...) # Layer 3 — after generation final_answer, warnings = guards.gate_output(answer, chunks) return final_answer Or use the convenience wrapper: result = guards.run(query=query, chunks=chunks, answer=answer) print(result.final_answer) print(result.report()) """ def __init__( self, min_query_len : int = 10, max_query_len : int = 500, scope_threshold : int = 1, min_retrieval_score : float = 0.30, grounding_threshold : float = 0.50, add_disclaimer : bool = True, run_grounding : bool = True, ): self.input_guards = InputGuardrails( min_query_len = min_query_len, max_query_len = max_query_len, scope_threshold = scope_threshold, ) self.retrieval_guards = RetrievalGuardrails( min_top_score = min_retrieval_score, ) self.output_guards = OutputGuardrails( grounding_threshold = grounding_threshold, add_disclaimer = add_disclaimer, ) self.run_grounding = run_grounding # ── Convenience gates ─────────────────────────────────────────────────── def gate_input(self, query: str) -> tuple[bool, str]: """ Returns (True, "") if query passes all input checks. Returns (False, reason) on the first blocking failure. """ for r in self.input_guards.run(query): if not r.passed and r.severity == "block": return False, r.reason return True, "" def gate_retrieval(self, chunks: list[dict]) -> tuple[bool, str]: """ Returns (True, "") if retrieved chunks are relevant enough. Returns (False, reason) if retrieval quality is too low. """ for r in self.retrieval_guards.run(chunks): if not r.passed and r.severity == "block": return False, r.reason return True, "" def gate_output( self, answer: str, chunks: list[dict] ) -> tuple[str, list[str]]: """ Returns (safe_answer, [warning_messages]). `safe_answer` has disclaimer appended if needed. Raises ValueError if a blocking output check fails. """ results, safe_answer = self.output_guards.run( answer, chunks, run_grounding=self.run_grounding ) warnings = [] for r in results: if not r.passed: if r.severity == "block": raise ValueError(f"Output guardrail blocked: {r.reason}") else: warnings.append(r.reason) return safe_answer, warnings # ── Full pipeline run ─────────────────────────────────────────────────── def run( self, query : str, chunks : list[dict], answer : str, ) -> PipelineGuardrailResult: """ Run all three layers and return a PipelineGuardrailResult. Does NOT raise on failure — check result.blocked instead. """ pr = PipelineGuardrailResult(query=query) # Layer 1 pr.input_results = self.input_guards.run(query) for r in pr.input_results: if not r.passed and r.severity == "block": pr.blocked = True pr.block_reason = f"[input/{r.check}] {r.reason}" pr.final_answer = f"Query blocked: {r.reason}" return pr # Layer 2 pr.retrieval_results = self.retrieval_guards.run(chunks) for r in pr.retrieval_results: if not r.passed and r.severity == "block": pr.blocked = True pr.block_reason = f"[retrieval/{r.check}] {r.reason}" pr.final_answer = ( "The provided documents do not contain enough information " "to answer this question reliably." ) return pr # Layer 3 output_results, safe_answer = self.output_guards.run( answer, chunks, run_grounding=self.run_grounding ) pr.output_results = output_results for r in pr.output_results: if not r.passed and r.severity == "block": pr.blocked = True pr.block_reason = f"[output/{r.check}] {r.reason}" pr.final_answer = ( "The generated answer could not be verified against the " "source documents. Please rephrase your question." ) return pr pr.final_answer = safe_answer return pr