Financial_bot / src /guardrails.py
Pushkya's picture
Upload 30 files
8299003 verified
Raw
History Blame Contribute Delete
25.2 kB
"""
guardrails.py
=============
Cross-cutting guardrails for the Morningstar RAG Pipeline.
Three layers of protection:
Layer 1 β€” Input Guardrails (before retrieval)
Validates user queries before they touch the vector store or LLM.
Catches: prompt injection, out-of-scope questions, PII, abusive queries,
queries that are too short/long to be meaningful.
Layer 2 β€” Retrieval Guardrails (after retrieval, before generation)
Checks that retrieved chunks are relevant enough to answer the question.
Catches: low-confidence retrievals that would force the LLM to hallucinate.
Layer 3 β€” Output Guardrails (after generation)
Validates the LLM response before returning it to the user.
Catches: empty responses, uncertainty phrases in financial answers,
missing grounding (delegates to verifier.py), missing disclaimer.
No external API required β€” all checks are regex + threshold-based.
Usage
-----
from src.guardrails import RAGGuardrails
guards = RAGGuardrails()
# Check query before retrieval
input_result = guards.check_input("What was Apple's revenue in FY2024?")
if not input_result.passed:
print(input_result.reason)
else:
# retrieve + generate ...
# Check retrieval relevance
retrieval_result = guards.check_retrieval(chunks)
if not retrieval_result.passed:
print(retrieval_result.reason)
else:
# generate ...
# Check output
output_result = guards.check_output(answer, chunks)
final_answer = output_result.safe_answer
Full pipeline wrapper
---------------------
result = guards.run(
query = "Apple gross margin FY2024?",
chunks = retrieved_chunks,
answer = llm_answer,
)
print(result.final_answer)
print(result.report())
"""
import re
import logging
from dataclasses import dataclass, field
from typing import Optional
log = logging.getLogger(__name__)
# ── Financial disclaimer appended to advisory-sounding answers ─────────────
DISCLAIMER = (
"\n\n---\n"
"_Disclaimer: This answer is generated from Morningstar research reports "
"and Apple SEC filings for informational purposes only. It does not "
"constitute investment advice. Always consult a qualified financial "
"adviser before making investment decisions._"
)
# ══════════════════════════════════════════════════════════════════════════════
# RESULT DATACLASS
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class GuardrailResult:
"""Outcome of a single guardrail check."""
layer : str # "input" | "retrieval" | "output"
check : str # name of the specific check
passed : bool
reason : str = "" # human-readable explanation when not passed
severity : str = "block" # "block" | "warn" β€” block stops pipeline, warn continues
@dataclass
class PipelineGuardrailResult:
"""Aggregated result from all guardrail layers."""
query : str
input_results : list[GuardrailResult] = field(default_factory=list)
retrieval_results: list[GuardrailResult] = field(default_factory=list)
output_results: list[GuardrailResult] = field(default_factory=list)
final_answer : str = ""
blocked : bool = False
block_reason : str = ""
def report(self) -> str:
all_results = self.input_results + self.retrieval_results + self.output_results
lines = [
"=" * 65,
f" Guardrail Report",
"=" * 65,
f" Query : {self.query[:80]}",
f" Blocked : {self.blocked}" + (f" ({self.block_reason})" if self.blocked else ""),
"-" * 65,
]
for r in all_results:
icon = "βœ“" if r.passed else ("βœ—" if r.severity == "block" else "⚠")
lines.append(
f" {icon} [{r.layer:10s}] [{r.check:30s}] "
f"{'PASS' if r.passed else r.severity.upper()}"
+ (f" β€” {r.reason}" if not r.passed else "")
)
lines.append("=" * 65)
return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════════
# LAYER 1 β€” INPUT GUARDRAILS
# ══════════════════════════════════════════════════════════════════════════════
# Prompt injection patterns β€” attempts to override the system prompt
_INJECTION_PATTERNS = [
r"ignore\s+(all\s+)?(previous|prior|above)\s+instructions",
r"you\s+are\s+now\s+(a|an)",
r"disregard\s+(your|the)\s+(system\s+)?prompt",
r"forget\s+(everything|all)\s+(you('ve)?\s+been\s+told|above)",
r"(pretend|act|behave)\s+(as\s+if\s+you('re)?|like\s+you('re)?)\s+(a|an)",
r"do\s+not\s+follow\s+your\s+(instructions|guidelines|rules)",
r"system\s*:\s*you\s+are",
r"<\s*system\s*>",
r"\[INST\]",
r"###\s*(system|instruction)",
]
_INJECTION_RE = re.compile("|".join(_INJECTION_PATTERNS), re.IGNORECASE)
# PII patterns
_PII_PATTERNS = [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b\d{16}\b", # credit card (16 digits)
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]{2,}\b",# email
r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b", # US phone
r"\bpassword\s*[:=]\s*\S+", # password=...
]
_PII_RE = re.compile("|".join(_PII_PATTERNS), re.IGNORECASE)
# In-scope keywords β€” query must relate to finance / Apple / Morningstar
_IN_SCOPE_KEYWORDS = {
# Company
"apple", "aapl", "tim cook", "morningstar",
# Document types
"10-k", "10-q", "8-k", "annual report", "quarterly report",
"sec", "edgar", "filing",
# Financial terms
"revenue", "net sales", "earnings", "eps", "profit", "loss", "income",
"margin", "gross margin", "operating", "ebitda", "cash flow",
"balance sheet", "assets", "liabilities", "equity", "debt",
"dividend", "buyback", "repurchase", "capex", "capital expenditure",
"guidance", "forecast", "outlook",
# Products/segments
"iphone", "ipad", "mac", "services", "wearables", "appstore", "app store",
"vision pro", "airpods", "watch",
# Analysis
"moat", "valuation", "fair value", "risk", "competition", "competitive",
"growth", "segment", "geographic", "americas", "china", "europe",
"fiscal year", "fy", "quarter", "q1", "q2", "q3", "q4",
}
# Hard-block topics β€” completely out of scope for a financial RAG assistant
_BLOCKED_TOPICS = [
r"\b(hack|exploit|malware|ransomware|phishing|ddos)\b",
r"\b(bomb|weapon|drug|illegal)\b",
r"\b(password|credential|login).{0,20}(steal|crack|bypass)\b",
r"\bself.harm\b",
]
_BLOCKED_RE = re.compile("|".join(_BLOCKED_TOPICS), re.IGNORECASE)
class InputGuardrails:
"""Validates user queries before they reach the retriever."""
def __init__(
self,
min_query_len : int = 10,
max_query_len : int = 500,
scope_threshold : int = 1, # min in-scope keywords required
):
self.min_query_len = min_query_len
self.max_query_len = max_query_len
self.scope_threshold = scope_threshold
def check_length(self, query: str) -> GuardrailResult:
n = len(query.strip())
if n < self.min_query_len:
return GuardrailResult(
layer="input", check="query_length", passed=False,
reason=f"Query too short ({n} chars). Please ask a complete question.",
severity="block",
)
if n > self.max_query_len:
return GuardrailResult(
layer="input", check="query_length", passed=False,
reason=f"Query too long ({n} chars, max {self.max_query_len}). Please shorten.",
severity="block",
)
return GuardrailResult(layer="input", check="query_length", passed=True)
def check_injection(self, query: str) -> GuardrailResult:
if _INJECTION_RE.search(query):
return GuardrailResult(
layer="input", check="prompt_injection", passed=False,
reason="Query contains prompt injection patterns and cannot be processed.",
severity="block",
)
return GuardrailResult(layer="input", check="prompt_injection", passed=True)
def check_pii(self, query: str) -> GuardrailResult:
match = _PII_RE.search(query)
if match:
return GuardrailResult(
layer="input", check="pii_detection", passed=False,
reason="Query appears to contain personal information (PII). "
"Please remove sensitive data before querying.",
severity="block",
)
return GuardrailResult(layer="input", check="pii_detection", passed=True)
def check_blocked_topics(self, query: str) -> GuardrailResult:
if _BLOCKED_RE.search(query):
return GuardrailResult(
layer="input", check="blocked_topics", passed=False,
reason="Query contains content outside the scope of this financial assistant.",
severity="block",
)
return GuardrailResult(layer="input", check="blocked_topics", passed=True)
def check_scope(self, query: str) -> GuardrailResult:
q_lower = query.lower()
hits = sum(1 for kw in _IN_SCOPE_KEYWORDS if kw in q_lower)
if hits < self.scope_threshold:
return GuardrailResult(
layer="input", check="scope", passed=False,
reason=(
"Query does not appear to be related to Apple financials or "
"Morningstar research. This pipeline only covers Apple SEC filings "
"(10-K, 10-Q, 8-K) and Morningstar equity research."
),
severity="warn", # warn not block β€” user may rephrase
)
return GuardrailResult(layer="input", check="scope", passed=True)
def run(self, query: str) -> list[GuardrailResult]:
return [
self.check_length(query),
self.check_injection(query),
self.check_pii(query),
self.check_blocked_topics(query),
self.check_scope(query),
]
# ══════════════════════════════════════════════════════════════════════════════
# LAYER 2 β€” RETRIEVAL GUARDRAILS
# ══════════════════════════════════════════════════════════════════════════════
class RetrievalGuardrails:
"""
Checks retrieved chunks before passing them to the LLM.
Prevents the LLM from hallucinating when there is no relevant context.
"""
def __init__(
self,
min_top_score : float = 0.30, # cosine sim or rerank score
min_chunks : int = 1,
):
self.min_top_score = min_top_score
self.min_chunks = min_chunks
def check_relevance(self, chunks: list[dict]) -> GuardrailResult:
if not chunks or len(chunks) < self.min_chunks:
return GuardrailResult(
layer="retrieval", check="min_chunks", passed=False,
reason=f"No relevant chunks retrieved. "
"The question may be outside the scope of the indexed documents.",
severity="block",
)
top_score = chunks[0].get("score", 0)
if top_score < self.min_top_score:
return GuardrailResult(
layer="retrieval", check="relevance_score", passed=False,
reason=(
f"Top retrieval score {top_score:.3f} is below threshold "
f"{self.min_top_score}. Retrieved context is likely irrelevant β€” "
"generating an answer could produce hallucinations."
),
severity="block",
)
return GuardrailResult(layer="retrieval", check="relevance_score", passed=True)
def check_diversity(self, chunks: list[dict]) -> GuardrailResult:
"""Warn if all chunks come from the same single document."""
if len(chunks) < 2:
return GuardrailResult(
layer="retrieval", check="source_diversity", passed=True
)
doc_ids = [c.get("metadata", {}).get("doc_id", "") for c in chunks]
unique = len(set(doc_ids))
if unique == 1 and len(chunks) >= 3:
return GuardrailResult(
layer="retrieval", check="source_diversity", passed=False,
reason=(
f"All {len(chunks)} retrieved chunks are from the same document. "
"Answer may lack cross-document perspective."
),
severity="warn",
)
return GuardrailResult(layer="retrieval", check="source_diversity", passed=True)
def run(self, chunks: list[dict]) -> list[GuardrailResult]:
return [
self.check_relevance(chunks),
self.check_diversity(chunks),
]
# ══════════════════════════════════════════════════════════════════════════════
# LAYER 3 β€” OUTPUT GUARDRAILS
# ══════════════════════════════════════════════════════════════════════════════
# Phrases that indicate the LLM is guessing rather than citing
_UNCERTAINTY_PHRASES = [
r"\bi think\b",
r"\bi believe\b",
r"\bprobably\b",
r"\bmaybe\b",
r"\bmight be\b",
r"\bi('m| am) not sure\b",
r"\bI('m| am) guessing\b",
r"\bI('m| am) not certain\b",
r"\bcould be\b",
r"\bI('m| am) unsure\b",
]
_UNCERTAINTY_RE = re.compile("|".join(_UNCERTAINTY_PHRASES), re.IGNORECASE)
# Phrases that signal investment advice (trigger disclaimer)
_ADVISORY_PHRASES = [
r"\b(should|recommend|advise|suggest)\s+(you\s+)?(buy|sell|invest|hold)\b",
r"\b(good|great|strong|excellent)\s+(buy|investment|opportunity)\b",
r"\bprice\s+target\b",
r"\bupside\s+(potential|of)\b",
]
_ADVISORY_RE = re.compile("|".join(_ADVISORY_PHRASES), re.IGNORECASE)
class OutputGuardrails:
"""Validates and optionally transforms the LLM answer before returning it."""
def __init__(
self,
min_answer_len : int = 20,
grounding_threshold : float = 0.50,
add_disclaimer : bool = True,
):
self.min_answer_len = min_answer_len
self.grounding_threshold = grounding_threshold
self.add_disclaimer = add_disclaimer
def check_empty(self, answer: str) -> GuardrailResult:
if not answer or len(answer.strip()) < self.min_answer_len:
return GuardrailResult(
layer="output", check="empty_response", passed=False,
reason="LLM returned an empty or trivially short answer.",
severity="block",
)
return GuardrailResult(layer="output", check="empty_response", passed=True)
def check_uncertainty(self, answer: str) -> GuardrailResult:
"""Flag answers where the LLM expresses uncertainty β€” shouldn't happen with RAG."""
match = _UNCERTAINTY_RE.search(answer)
if match:
phrase = match.group(0)
return GuardrailResult(
layer="output", check="uncertainty_phrases", passed=False,
reason=(
f"Answer contains uncertainty phrase: '{phrase}'. "
"A grounded RAG answer should only state facts present in the context."
),
severity="warn",
)
return GuardrailResult(layer="output", check="uncertainty_phrases", passed=True)
def check_grounding(
self, answer: str, chunks: list[dict]
) -> GuardrailResult:
"""Delegate to AnswerVerifier for NLI-based grounding check."""
try:
from src.verifier import AnswerVerifier
verifier = AnswerVerifier(entail_threshold=self.grounding_threshold)
result = verifier.verify(answer, chunks)
score = result["grounding_score"]
if score < self.grounding_threshold:
return GuardrailResult(
layer="output", check="grounding_score", passed=False,
reason=(
f"Answer grounding score {score:.0%} is below threshold "
f"{self.grounding_threshold:.0%}. "
f"({result['contradicted']} contradicted, "
f"{result['unverified']} unverified sentences)"
),
severity="block",
)
return GuardrailResult(
layer="output", check="grounding_score", passed=True,
reason=f"Grounding score: {score:.0%}",
)
except Exception as e:
log.warning(f"Grounding check skipped: {e}")
return GuardrailResult(
layer="output", check="grounding_score", passed=True,
reason="Grounding check unavailable β€” skipped.",
)
def apply_disclaimer(self, answer: str) -> str:
"""Append financial disclaimer if answer sounds advisory."""
if self.add_disclaimer and _ADVISORY_RE.search(answer):
return answer + DISCLAIMER
return answer
def run(
self,
answer : str,
chunks : list[dict],
run_grounding : bool = True,
) -> tuple[list[GuardrailResult], str]:
"""
Run all output checks. Returns (results, safe_answer).
`safe_answer` has disclaimer appended if needed.
"""
results = [self.check_empty(answer), self.check_uncertainty(answer)]
if run_grounding:
results.append(self.check_grounding(answer, chunks))
safe_answer = self.apply_disclaimer(answer)
return results, safe_answer
# ══════════════════════════════════════════════════════════════════════════════
# COMBINED GUARDRAIL RUNNER
# ══════════════════════════════════════════════════════════════════════════════
class RAGGuardrails:
"""
Runs all three guardrail layers in sequence.
Attributes
----------
input_guards : InputGuardrails
retrieval_guards : RetrievalGuardrails
output_guards : OutputGuardrails
Usage
-----
guards = RAGGuardrails()
# Layer 1 β€” before retrieval
ok, reason = guards.gate_input(query)
if not ok:
return reason
chunks = retriever.retrieve(query, ...)
# Layer 2 β€” after retrieval
ok, reason = guards.gate_retrieval(chunks)
if not ok:
return reason
answer = llm_chain.invoke(...)
# Layer 3 β€” after generation
final_answer, warnings = guards.gate_output(answer, chunks)
return final_answer
Or use the convenience wrapper:
result = guards.run(query=query, chunks=chunks, answer=answer)
print(result.final_answer)
print(result.report())
"""
def __init__(
self,
min_query_len : int = 10,
max_query_len : int = 500,
scope_threshold : int = 1,
min_retrieval_score : float = 0.30,
grounding_threshold : float = 0.50,
add_disclaimer : bool = True,
run_grounding : bool = True,
):
self.input_guards = InputGuardrails(
min_query_len = min_query_len,
max_query_len = max_query_len,
scope_threshold = scope_threshold,
)
self.retrieval_guards = RetrievalGuardrails(
min_top_score = min_retrieval_score,
)
self.output_guards = OutputGuardrails(
grounding_threshold = grounding_threshold,
add_disclaimer = add_disclaimer,
)
self.run_grounding = run_grounding
# ── Convenience gates ───────────────────────────────────────────────────
def gate_input(self, query: str) -> tuple[bool, str]:
"""
Returns (True, "") if query passes all input checks.
Returns (False, reason) on the first blocking failure.
"""
for r in self.input_guards.run(query):
if not r.passed and r.severity == "block":
return False, r.reason
return True, ""
def gate_retrieval(self, chunks: list[dict]) -> tuple[bool, str]:
"""
Returns (True, "") if retrieved chunks are relevant enough.
Returns (False, reason) if retrieval quality is too low.
"""
for r in self.retrieval_guards.run(chunks):
if not r.passed and r.severity == "block":
return False, r.reason
return True, ""
def gate_output(
self, answer: str, chunks: list[dict]
) -> tuple[str, list[str]]:
"""
Returns (safe_answer, [warning_messages]).
`safe_answer` has disclaimer appended if needed.
Raises ValueError if a blocking output check fails.
"""
results, safe_answer = self.output_guards.run(
answer, chunks, run_grounding=self.run_grounding
)
warnings = []
for r in results:
if not r.passed:
if r.severity == "block":
raise ValueError(f"Output guardrail blocked: {r.reason}")
else:
warnings.append(r.reason)
return safe_answer, warnings
# ── Full pipeline run ───────────────────────────────────────────────────
def run(
self,
query : str,
chunks : list[dict],
answer : str,
) -> PipelineGuardrailResult:
"""
Run all three layers and return a PipelineGuardrailResult.
Does NOT raise on failure β€” check result.blocked instead.
"""
pr = PipelineGuardrailResult(query=query)
# Layer 1
pr.input_results = self.input_guards.run(query)
for r in pr.input_results:
if not r.passed and r.severity == "block":
pr.blocked = True
pr.block_reason = f"[input/{r.check}] {r.reason}"
pr.final_answer = f"Query blocked: {r.reason}"
return pr
# Layer 2
pr.retrieval_results = self.retrieval_guards.run(chunks)
for r in pr.retrieval_results:
if not r.passed and r.severity == "block":
pr.blocked = True
pr.block_reason = f"[retrieval/{r.check}] {r.reason}"
pr.final_answer = (
"The provided documents do not contain enough information "
"to answer this question reliably."
)
return pr
# Layer 3
output_results, safe_answer = self.output_guards.run(
answer, chunks, run_grounding=self.run_grounding
)
pr.output_results = output_results
for r in pr.output_results:
if not r.passed and r.severity == "block":
pr.blocked = True
pr.block_reason = f"[output/{r.check}] {r.reason}"
pr.final_answer = (
"The generated answer could not be verified against the "
"source documents. Please rephrase your question."
)
return pr
pr.final_answer = safe_answer
return pr