Spaces:
Running
Running
"""
guardrails.py
=============
Cross-cutting guardrails for the Morningstar RAG Pipeline.

Three layers of protection:

Layer 1 — Input Guardrails (before retrieval)
    Validates user queries before they touch the vector store or LLM.
    Catches: prompt injection, out-of-scope questions, PII, abusive queries,
    queries that are too short/long to be meaningful.

Layer 2 — Retrieval Guardrails (after retrieval, before generation)
    Checks that retrieved chunks are relevant enough to answer the question.
    Catches: low-confidence retrievals that would force the LLM to hallucinate.

Layer 3 — Output Guardrails (after generation)
    Validates the LLM response before returning it to the user.
    Catches: empty responses, uncertainty phrases in financial answers,
    missing grounding (delegates to verifier.py), and a missing disclaimer
    on advisory-sounding answers (appended automatically).

No external API required — all checks are regex + threshold-based
(the optional grounding check is delegated to src.verifier).

Usage
-----
    from src.guardrails import RAGGuardrails

    guards = RAGGuardrails()

    # Check query before retrieval
    ok, reason = guards.gate_input("What was Apple's revenue in FY2024?")
    if not ok:
        print(reason)
    else:
        # retrieve + generate ...
        ...

    # Check retrieval relevance
    ok, reason = guards.gate_retrieval(chunks)
    if not ok:
        print(reason)
    else:
        # generate ...
        ...

    # Check output (raises ValueError if a blocking output check fails)
    final_answer, warnings = guards.gate_output(answer, chunks)

Full pipeline wrapper
---------------------
    result = guards.run(
        query  = "Apple gross margin FY2024?",
        chunks = retrieved_chunks,
        answer = llm_answer,
    )
    print(result.final_answer)
    print(result.report())
"""
| import re | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
log = logging.getLogger(__name__)

# ── Financial disclaimer appended to advisory-sounding answers ──────────────
# A Markdown horizontal rule followed by an italicised notice; appended
# verbatim to the answer by OutputGuardrails.apply_disclaimer.
DISCLAIMER = "\n\n---\n" + " ".join(
    (
        "_Disclaimer: This answer is generated from Morningstar research reports",
        "and Apple SEC filings for informational purposes only. It does not",
        "constitute investment advice. Always consult a qualified financial",
        "adviser before making investment decisions._",
    )
)
# ──────────────────────────────────────────────────────────────────────────────
# RESULT DATACLASS
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class GuardrailResult:
    """Outcome of a single guardrail check.

    Every check in this module builds instances with keyword arguments
    (``GuardrailResult(layer=..., check=..., passed=...)``), which relies on
    the ``__init__`` generated by ``@dataclass``.
    """
    layer    : str                 # "input" | "retrieval" | "output"
    check    : str                 # name of the specific check
    passed   : bool
    reason   : str = ""            # human-readable explanation when not passed
    severity : str = "block"       # "block" | "warn" — block stops pipeline, warn continues
@dataclass
class PipelineGuardrailResult:
    """Aggregated result from all guardrail layers.

    Produced by ``RAGGuardrails.run``.  Each ``*_results`` list holds the
    results of one layer; a layer's list stays empty when an earlier layer
    blocked and the pipeline returned early.  ``field(default_factory=list)``
    gives every instance its own lists (requires ``@dataclass``).
    """
    query            : str
    input_results    : list["GuardrailResult"] = field(default_factory=list)
    retrieval_results: list["GuardrailResult"] = field(default_factory=list)
    output_results   : list["GuardrailResult"] = field(default_factory=list)
    final_answer     : str  = ""
    blocked          : bool = False
    block_reason     : str  = ""

    def report(self) -> str:
        """Return a fixed-width, human-readable table of every check result.

        Passed checks are marked ✓, blocking failures ✗ and warnings ⚠;
        failed checks also show their severity and reason.
        """
        all_results = self.input_results + self.retrieval_results + self.output_results

        blocked_line = f" Blocked : {self.blocked}"
        if self.blocked:
            blocked_line += f" ({self.block_reason})"

        lines = [
            "=" * 65,
            " Guardrail Report",
            "=" * 65,
            f" Query : {self.query[:80]}",
            blocked_line,
            "-" * 65,
        ]
        for r in all_results:
            if r.passed:
                icon, status = "✓", "PASS"
            else:
                icon = "✗" if r.severity == "block" else "⚠"
                status = f"{r.severity.upper()} — {r.reason}"
            lines.append(f" {icon} [{r.layer:10s}] [{r.check:30s}] {status}")
        lines.append("=" * 65)
        return "\n".join(lines)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAYER 1 β INPUT GUARDRAILS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Prompt injection patterns β attempts to override the system prompt | |
| _INJECTION_PATTERNS = [ | |
| r"ignore\s+(all\s+)?(previous|prior|above)\s+instructions", | |
| r"you\s+are\s+now\s+(a|an)", | |
| r"disregard\s+(your|the)\s+(system\s+)?prompt", | |
| r"forget\s+(everything|all)\s+(you('ve)?\s+been\s+told|above)", | |
| r"(pretend|act|behave)\s+(as\s+if\s+you('re)?|like\s+you('re)?)\s+(a|an)", | |
| r"do\s+not\s+follow\s+your\s+(instructions|guidelines|rules)", | |
| r"system\s*:\s*you\s+are", | |
| r"<\s*system\s*>", | |
| r"\[INST\]", | |
| r"###\s*(system|instruction)", | |
| ] | |
| _INJECTION_RE = re.compile("|".join(_INJECTION_PATTERNS), re.IGNORECASE) | |
| # PII patterns | |
| _PII_PATTERNS = [ | |
| r"\b\d{3}-\d{2}-\d{4}\b", # SSN | |
| r"\b\d{16}\b", # credit card (16 digits) | |
| r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]{2,}\b",# email | |
| r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b", # US phone | |
| r"\bpassword\s*[:=]\s*\S+", # password=... | |
| ] | |
| _PII_RE = re.compile("|".join(_PII_PATTERNS), re.IGNORECASE) | |
| # In-scope keywords β query must relate to finance / Apple / Morningstar | |
| _IN_SCOPE_KEYWORDS = { | |
| # Company | |
| "apple", "aapl", "tim cook", "morningstar", | |
| # Document types | |
| "10-k", "10-q", "8-k", "annual report", "quarterly report", | |
| "sec", "edgar", "filing", | |
| # Financial terms | |
| "revenue", "net sales", "earnings", "eps", "profit", "loss", "income", | |
| "margin", "gross margin", "operating", "ebitda", "cash flow", | |
| "balance sheet", "assets", "liabilities", "equity", "debt", | |
| "dividend", "buyback", "repurchase", "capex", "capital expenditure", | |
| "guidance", "forecast", "outlook", | |
| # Products/segments | |
| "iphone", "ipad", "mac", "services", "wearables", "appstore", "app store", | |
| "vision pro", "airpods", "watch", | |
| # Analysis | |
| "moat", "valuation", "fair value", "risk", "competition", "competitive", | |
| "growth", "segment", "geographic", "americas", "china", "europe", | |
| "fiscal year", "fy", "quarter", "q1", "q2", "q3", "q4", | |
| } | |
# Hard-block topics — completely out of scope for a financial RAG assistant.
# Any match makes InputGuardrails.check_blocked_topics fail with severity
# "block".  Matching is case-insensitive (re.IGNORECASE below).
# Because the word lists end in \b, plural forms such as "drugs" or
# "weapons" are NOT matched.
# NOTE(review): whole words like "illegal" and "exploit" also occur in
# legitimate finance questions (e.g. "Was Apple's Irish tax arrangement
# illegal?") and will be hard-blocked — confirm this is intended.
_BLOCKED_TOPICS = [
    r"\b(hack|exploit|malware|ransomware|phishing|ddos)\b",
    r"\b(bomb|weapon|drug|illegal)\b",
    # a credential word followed within 20 characters by steal/crack/bypass
    r"\b(password|credential|login).{0,20}(steal|crack|bypass)\b",
    # "." accepts any single separator, e.g. "self-harm" or "self harm"
    r"\bself.harm\b",
]
_BLOCKED_RE = re.compile("|".join(_BLOCKED_TOPICS), re.IGNORECASE)
class InputGuardrails:
    """Validates user queries before they reach the retriever.

    Every ``check_*`` method returns one GuardrailResult with
    ``layer="input"``; :meth:`run` executes all of them in a fixed order.
    All checks block on failure except :meth:`check_scope`, which only warns.
    """

    def __init__(
        self,
        min_query_len   : int = 10,
        max_query_len   : int = 500,
        scope_threshold : int = 1,    # min in-scope keywords required
    ):
        self.min_query_len   = min_query_len
        self.max_query_len   = max_query_len
        self.scope_threshold = scope_threshold

    def check_length(self, query: str) -> GuardrailResult:
        """Block queries whose stripped length is outside [min_query_len, max_query_len]."""
        n = len(query.strip())
        if n < self.min_query_len:
            return GuardrailResult(
                layer="input", check="query_length", passed=False,
                reason=f"Query too short ({n} chars). Please ask a complete question.",
                severity="block",
            )
        if n > self.max_query_len:
            return GuardrailResult(
                layer="input", check="query_length", passed=False,
                reason=f"Query too long ({n} chars, max {self.max_query_len}). Please shorten.",
                severity="block",
            )
        return GuardrailResult(layer="input", check="query_length", passed=True)

    def check_injection(self, query: str) -> GuardrailResult:
        """Block queries matching any known prompt-injection pattern."""
        if _INJECTION_RE.search(query):
            return GuardrailResult(
                layer="input", check="prompt_injection", passed=False,
                reason="Query contains prompt injection patterns and cannot be processed.",
                severity="block",
            )
        return GuardrailResult(layer="input", check="prompt_injection", passed=True)

    def check_pii(self, query: str) -> GuardrailResult:
        """Block queries that appear to contain personal data (SSN, card, e-mail, phone, password)."""
        if _PII_RE.search(query):
            return GuardrailResult(
                layer="input", check="pii_detection", passed=False,
                reason="Query appears to contain personal information (PII). "
                       "Please remove sensitive data before querying.",
                severity="block",
            )
        return GuardrailResult(layer="input", check="pii_detection", passed=True)

    def check_blocked_topics(self, query: str) -> GuardrailResult:
        """Block queries touching hard-blocked (non-financial, harmful) topics."""
        if _BLOCKED_RE.search(query):
            return GuardrailResult(
                layer="input", check="blocked_topics", passed=False,
                reason="Query contains content outside the scope of this financial assistant.",
                severity="block",
            )
        return GuardrailResult(layer="input", check="blocked_topics", passed=True)

    def check_scope(self, query: str) -> GuardrailResult:
        """Warn when fewer than ``scope_threshold`` in-scope keywords occur in the query.

        A keyword only counts where it starts a word.  A plain substring test
        lets short keywords fire inside unrelated words ("fy" in "justify",
        "eps" in "steps"), so almost any query would pass.  Suffixes are
        still accepted ("revenues", "iphones", "fy2024").  Trade-off: a
        keyword glued to a preceding letter (e.g. "mac" in "imac") no longer
        counts.  Severity is "warn" so the user may rephrase rather than be
        blocked.
        """
        q_lower = query.lower()
        hits = sum(
            1
            for kw in _IN_SCOPE_KEYWORDS
            if re.search(r"\b" + re.escape(kw), q_lower)
        )
        if hits < self.scope_threshold:
            return GuardrailResult(
                layer="input", check="scope", passed=False,
                reason=(
                    "Query does not appear to be related to Apple financials or "
                    "Morningstar research. This pipeline only covers Apple SEC filings "
                    "(10-K, 10-Q, 8-K) and Morningstar equity research."
                ),
                severity="warn",  # warn not block — user may rephrase
            )
        return GuardrailResult(layer="input", check="scope", passed=True)

    def run(self, query: str) -> list[GuardrailResult]:
        """Run every input check and return the results in a fixed order."""
        return [
            self.check_length(query),
            self.check_injection(query),
            self.check_pii(query),
            self.check_blocked_topics(query),
            self.check_scope(query),
        ]
# ──────────────────────────────────────────────────────────────────────────────
# LAYER 2 — RETRIEVAL GUARDRAILS
# ──────────────────────────────────────────────────────────────────────────────
class RetrievalGuardrails:
    """
    Checks retrieved chunks before passing them to the LLM.

    Prevents the LLM from hallucinating when there is no relevant context.
    Each chunk is a dict; this class reads ``chunk["score"]`` (higher means
    more relevant) and, when present, ``chunk["metadata"]["doc_id"]``.
    """

    def __init__(
        self,
        min_top_score : float = 0.30,   # cosine sim or rerank score
        min_chunks    : int   = 1,
    ):
        self.min_top_score = min_top_score
        self.min_chunks    = min_chunks

    def check_relevance(self, chunks: list[dict]) -> GuardrailResult:
        """Block when too few chunks were retrieved or the best score is too low.

        The best score is the maximum over all chunks, so the check no longer
        depends on the retriever returning chunks sorted by score.  A missing
        or ``None`` score counts as 0.
        """
        if not chunks:
            return GuardrailResult(
                layer="retrieval", check="min_chunks", passed=False,
                reason="No relevant chunks retrieved. "
                       "The question may be outside the scope of the indexed documents.",
                severity="block",
            )
        if len(chunks) < self.min_chunks:
            return GuardrailResult(
                layer="retrieval", check="min_chunks", passed=False,
                reason=(
                    f"Only {len(chunks)} chunk(s) retrieved (minimum {self.min_chunks}). "
                    "The question may be outside the scope of the indexed documents."
                ),
                severity="block",
            )

        top_score = max((c.get("score") or 0) for c in chunks)
        if top_score < self.min_top_score:
            return GuardrailResult(
                layer="retrieval", check="relevance_score", passed=False,
                reason=(
                    f"Top retrieval score {top_score:.3f} is below threshold "
                    f"{self.min_top_score}. Retrieved context is likely irrelevant — "
                    "generating an answer could produce hallucinations."
                ),
                severity="block",
            )
        return GuardrailResult(layer="retrieval", check="relevance_score", passed=True)

    def check_diversity(self, chunks: list[dict]) -> GuardrailResult:
        """Warn if three or more chunks all come from the same single document.

        A missing/empty ``doc_id`` means "source unknown", not "same source",
        so chunks without document metadata no longer trigger the warning.
        """
        if not chunks or len(chunks) < 3:
            return GuardrailResult(
                layer="retrieval", check="source_diversity", passed=True
            )

        doc_ids = {(c.get("metadata") or {}).get("doc_id") for c in chunks}
        single_known_doc = len(doc_ids) == 1 and doc_ids.isdisjoint({None, ""})
        if single_known_doc:
            return GuardrailResult(
                layer="retrieval", check="source_diversity", passed=False,
                reason=(
                    f"All {len(chunks)} retrieved chunks are from the same document. "
                    "Answer may lack cross-document perspective."
                ),
                severity="warn",
            )
        return GuardrailResult(layer="retrieval", check="source_diversity", passed=True)

    def run(self, chunks: list[dict]) -> list[GuardrailResult]:
        """Run every retrieval check and return the results in a fixed order."""
        return [
            self.check_relevance(chunks),
            self.check_diversity(chunks),
        ]
# ──────────────────────────────────────────────────────────────────────────────
# LAYER 3 — OUTPUT GUARDRAILS
# ──────────────────────────────────────────────────────────────────────────────
# Phrases that indicate the LLM is guessing rather than citing.
# OutputGuardrails.check_uncertainty reports the first match with severity
# "warn".  The regex is compiled with re.IGNORECASE, so the mixed "i"/"I"
# spellings below behave identically.
# NOTE(review): "could be" / "might be" also appear in legitimately quoted
# risk-factor language (e.g. "results could be adversely affected"), so
# expect some warnings on grounded answers.
_UNCERTAINTY_PHRASES = [
    r"\bi think\b",
    r"\bi believe\b",
    r"\bprobably\b",
    r"\bmaybe\b",
    r"\bmight be\b",
    r"\bi('m| am) not sure\b",
    r"\bI('m| am) guessing\b",
    r"\bI('m| am) not certain\b",
    r"\bcould be\b",
    r"\bI('m| am) unsure\b",
]
_UNCERTAINTY_RE = re.compile("|".join(_UNCERTAINTY_PHRASES), re.IGNORECASE)
# Phrases that signal investment advice.  When disclaimers are enabled,
# OutputGuardrails.apply_disclaimer appends DISCLAIMER to any matching answer.
_ADVISORY_PHRASES = [
    # "should / recommend / advise / suggest (you) buy / sell / invest / hold"
    r"\b(should|recommend|advise|suggest)\s+(you\s+)?(buy|sell|invest|hold)\b",
    # "good / great / strong / excellent buy / investment / opportunity"
    r"\b(good|great|strong|excellent)\s+(buy|investment|opportunity)\b",
    r"\bprice\s+target\b",
    r"\bupside\s+(potential|of)\b",
]
_ADVISORY_RE = re.compile("|".join(_ADVISORY_PHRASES), re.IGNORECASE)
class OutputGuardrails:
    """Validates and optionally transforms the LLM answer before returning it.

    Checks: empty/short answer (block), uncertainty phrases (warn) and,
    optionally, grounding via ``src.verifier.AnswerVerifier`` (block).
    Advisory-sounding answers get DISCLAIMER appended when enabled.
    """

    def __init__(
        self,
        min_answer_len      : int   = 20,
        grounding_threshold : float = 0.50,
        add_disclaimer      : bool  = True,
    ):
        self.min_answer_len      = min_answer_len
        self.grounding_threshold = grounding_threshold
        self.add_disclaimer      = add_disclaimer

    def check_empty(self, answer: str) -> GuardrailResult:
        """Block a missing answer or one shorter than ``min_answer_len`` after stripping."""
        if not answer or len(answer.strip()) < self.min_answer_len:
            return GuardrailResult(
                layer="output", check="empty_response", passed=False,
                reason="LLM returned an empty or trivially short answer.",
                severity="block",
            )
        return GuardrailResult(layer="output", check="empty_response", passed=True)

    def check_uncertainty(self, answer: str) -> GuardrailResult:
        """Flag answers where the LLM expresses uncertainty — shouldn't happen with RAG.

        The first matching phrase is reported as a warning.  A ``None``
        answer is treated as empty (``check_empty`` blocks that case)
        instead of raising ``TypeError``.
        """
        match = _UNCERTAINTY_RE.search(answer or "")
        if match:
            return GuardrailResult(
                layer="output", check="uncertainty_phrases", passed=False,
                reason=(
                    f"Answer contains uncertainty phrase: '{match.group(0)}'. "
                    "A grounded RAG answer should only state facts present in the context."
                ),
                severity="warn",
            )
        return GuardrailResult(layer="output", check="uncertainty_phrases", passed=True)

    def check_grounding(
        self, answer: str, chunks: list[dict]
    ) -> GuardrailResult:
        """Delegate to AnswerVerifier for the NLI-based grounding check.

        Best effort: the check is skipped and reported as passed only when
        the verifier cannot be imported, constructed or run, or returns no
        usable ``grounding_score``.  Evaluating the score happens outside the
        ``try``, so a low score can no longer be silently turned into a pass
        by an error while building the failure message.
        """
        try:
            from src.verifier import AnswerVerifier
            verifier = AnswerVerifier(entail_threshold=self.grounding_threshold)
            result = verifier.verify(answer, chunks)
            score = float(result["grounding_score"])
            # NOTE(review): assumes `result` is a dict — confirm against src.verifier
            contradicted = result.get("contradicted", "?")
            unverified = result.get("unverified", "?")
        except Exception as e:
            # Deliberate: the verifier is treated as optional, so any failure
            # to obtain a score skips the check instead of failing the request.
            log.warning("Grounding check skipped: %s", e)
            return GuardrailResult(
                layer="output", check="grounding_score", passed=True,
                reason="Grounding check unavailable — skipped.",
            )

        if score < self.grounding_threshold:
            return GuardrailResult(
                layer="output", check="grounding_score", passed=False,
                reason=(
                    f"Answer grounding score {score:.0%} is below threshold "
                    f"{self.grounding_threshold:.0%}. "
                    f"({contradicted} contradicted, "
                    f"{unverified} unverified sentences)"
                ),
                severity="block",
            )
        return GuardrailResult(
            layer="output", check="grounding_score", passed=True,
            reason=f"Grounding score: {score:.0%}",
        )

    def apply_disclaimer(self, answer: str) -> str:
        """Append DISCLAIMER if disclaimers are enabled and the answer sounds advisory.

        Empty or ``None`` answers are returned unchanged.
        """
        if self.add_disclaimer and answer and _ADVISORY_RE.search(answer):
            return answer + DISCLAIMER
        return answer

    def run(
        self,
        answer        : str,
        chunks        : list[dict],
        run_grounding : bool = True,
    ) -> tuple[list[GuardrailResult], str]:
        """
        Run all output checks. Returns (results, safe_answer).

        `results` holds the empty-response and uncertainty checks, plus the
        grounding check when `run_grounding` is true.
        `safe_answer` has disclaimer appended if needed.
        """
        results = [self.check_empty(answer), self.check_uncertainty(answer)]
        if run_grounding:
            results.append(self.check_grounding(answer, chunks))
        safe_answer = self.apply_disclaimer(answer)
        return results, safe_answer
# ──────────────────────────────────────────────────────────────────────────────
# COMBINED GUARDRAIL RUNNER
# ──────────────────────────────────────────────────────────────────────────────
class RAGGuardrails:
    """
    Runs all three guardrail layers in sequence.

    Attributes
    ----------
    input_guards     : InputGuardrails
    retrieval_guards : RetrievalGuardrails
    output_guards    : OutputGuardrails
    run_grounding    : bool — whether Layer 3 runs the grounding check

    Usage
    -----
        guards = RAGGuardrails()

        # Layer 1 — before retrieval
        ok, reason = guards.gate_input(query)
        if not ok:
            return reason
        chunks = retriever.retrieve(query, ...)

        # Layer 2 — after retrieval
        ok, reason = guards.gate_retrieval(chunks)
        if not ok:
            return reason
        answer = llm_chain.invoke(...)

        # Layer 3 — after generation (raises ValueError on a blocking failure)
        final_answer, warnings = guards.gate_output(answer, chunks)
        return final_answer

    Or use the convenience wrapper:
        result = guards.run(query=query, chunks=chunks, answer=answer)
        print(result.final_answer)
        print(result.report())
    """

    def __init__(
        self,
        min_query_len       : int   = 10,
        max_query_len       : int   = 500,
        scope_threshold     : int   = 1,
        min_retrieval_score : float = 0.30,
        grounding_threshold : float = 0.50,
        add_disclaimer      : bool  = True,
        run_grounding       : bool  = True,
        min_chunks          : int   = 1,
        min_answer_len      : int   = 20,
    ):
        """Build the three layer objects from one flat set of thresholds.

        ``min_chunks`` and ``min_answer_len`` are appended last with the
        layers' own defaults, so existing positional callers are unaffected.
        """
        self.input_guards = InputGuardrails(
            min_query_len   = min_query_len,
            max_query_len   = max_query_len,
            scope_threshold = scope_threshold,
        )
        self.retrieval_guards = RetrievalGuardrails(
            min_top_score = min_retrieval_score,
            min_chunks    = min_chunks,
        )
        self.output_guards = OutputGuardrails(
            min_answer_len      = min_answer_len,
            grounding_threshold = grounding_threshold,
            add_disclaimer      = add_disclaimer,
        )
        self.run_grounding = run_grounding

    @staticmethod
    def _first_block(results):
        """Return the first failed result with severity "block", or None."""
        return next(
            (r for r in results if not r.passed and r.severity == "block"),
            None,
        )

    # ── Convenience gates ────────────────────────────────────────────────────
    def gate_input(self, query: str) -> tuple[bool, str]:
        """
        Returns (True, "") if query passes all input checks.
        Returns (False, reason) on the first blocking failure.
        """
        blocker = self._first_block(self.input_guards.run(query))
        if blocker is not None:
            return False, blocker.reason
        return True, ""

    def gate_retrieval(self, chunks: list[dict]) -> tuple[bool, str]:
        """
        Returns (True, "") if retrieved chunks are relevant enough.
        Returns (False, reason) if retrieval quality is too low.
        """
        blocker = self._first_block(self.retrieval_guards.run(chunks))
        if blocker is not None:
            return False, blocker.reason
        return True, ""

    def gate_output(
        self, answer: str, chunks: list[dict]
    ) -> tuple[str, list[str]]:
        """
        Returns (safe_answer, [warning_messages]).
        `safe_answer` has disclaimer appended if needed.
        Raises ValueError if a blocking output check fails.
        """
        results, safe_answer = self.output_guards.run(
            answer, chunks, run_grounding=self.run_grounding
        )
        blocker = self._first_block(results)
        if blocker is not None:
            raise ValueError(f"Output guardrail blocked: {blocker.reason}")
        # No blocker remains, so every failed result here is a warning.
        warnings = [r.reason for r in results if not r.passed]
        return safe_answer, warnings

    # ── Full pipeline run ────────────────────────────────────────────────────
    def run(
        self,
        query  : str,
        chunks : list[dict],
        answer : str,
    ) -> PipelineGuardrailResult:
        """
        Run all three layers and return a PipelineGuardrailResult.
        Does NOT raise on failure — check result.blocked instead.
        Layers after the first blocking one are not run.
        """
        pr = PipelineGuardrailResult(query=query)

        # Layer 1
        pr.input_results = self.input_guards.run(query)
        blocker = self._first_block(pr.input_results)
        if blocker is not None:
            pr.blocked = True
            pr.block_reason = f"[input/{blocker.check}] {blocker.reason}"
            pr.final_answer = f"Query blocked: {blocker.reason}"
            return pr

        # Layer 2
        pr.retrieval_results = self.retrieval_guards.run(chunks)
        blocker = self._first_block(pr.retrieval_results)
        if blocker is not None:
            pr.blocked = True
            pr.block_reason = f"[retrieval/{blocker.check}] {blocker.reason}"
            pr.final_answer = (
                "The provided documents do not contain enough information "
                "to answer this question reliably."
            )
            return pr

        # Layer 3
        pr.output_results, safe_answer = self.output_guards.run(
            answer, chunks, run_grounding=self.run_grounding
        )
        blocker = self._first_block(pr.output_results)
        if blocker is not None:
            pr.blocked = True
            pr.block_reason = f"[output/{blocker.check}] {blocker.reason}"
            pr.final_answer = (
                "The generated answer could not be verified against the "
                "source documents. Please rephrase your question."
            )
            return pr

        pr.final_answer = safe_answer
        return pr