InsuranceBot / backend /security.py
rohitsar567's picture
recovery: integrate stalled-session work + de-stale/cleanup (pytest 215 green) [build-fix]
b87bd2d
Raw
History Blame Contribute Delete
16.5 kB
"""Security gates for user-uploaded content.
The /api/upload-policy endpoint accepts arbitrary PDFs from the public web.
That's a real attack surface. Each upload runs through these gates before
we touch it with the embedding model or LLM:
Gate 1 — FILE MECHANICS
- magic bytes start with %PDF
- size <= 25 MB and >= 5 KB
- no embedded JavaScript / AcroForm / OpenAction (PDF exploits)
- no /Launch / /EmbeddedFile actions (file execution / payload smuggling)
- no embedded executables (PE / ELF / Mach-O / shell scripts / HTML / PHP)
- well-formed %%EOF in trailer
Gate 2 — CONTENT QUALITY
- at least 1500 chars of extractable text (rejects scanned image-only PDFs
and intentional empty/garbage uploads)
- 3 ≤ page_count ≤ 200 — real insurance policies fall in this window;
<3 = trivially-empty, >200 = bundled-dump or abuse vector
- text contains at least one insurance-domain keyword (cheap filter for
"this is not a recipe / resume / random PDF")
Gate 3 — PROMPT INJECTION DEFENSE
- regex-scan for known injection patterns in extracted text
- reject if the PDF tries to override the system prompt, expose secrets,
or impersonate the assistant
- we DO NOT silently rewrite the content — block + log instead, so the
user knows their upload was rejected for a reason
Gate 4 — RATE LIMITING
- per-session: max 5 uploads / hour
- cumulative: max 200 chunks across all user uploads (prevents corpus
flooding by a single session)
Gate 5 — PER-IP RATE LIMIT
- max 10 uploads / hour per source IP (catches rotating session_id)
Gate 6 — ENCRYPTED / PASSWORD-PROTECTED PDF (added 2026-05-14)
- pdfplumber raises on encrypted PDFs; we surface that as a clean reject
instead of storing an opaque blob.
Gate 7 — BYTES HASH DEDUPE + REJECT-CACHE (added 2026-05-14)
- SHA256 the bytes. If we've rejected this exact hash in the last 24h,
short-circuit and reject again without re-running the full pipeline.
- If we've already accepted+indexed this hash for the same session,
return the cached chunk count.
Gate 8 — LLM-JUDGE AUDIT (added 2026-05-14)
- Pass the first 2000 chars to the judge LLM with a strict yes/no prompt:
"Is this a real Indian health-insurance policy document?". The judge is
the same Mistral Large 3 used elsewhere — different family from the
brain so it grades independently. ~3s latency, ~0.05 per call.
- High precision against social-engineering content that escapes regex
(e.g., carefully-worded prose that doesn't trip injection patterns
but isn't actually a policy).
Every block is logged to logs/upload_blocks.jsonl with the reason.
"""
from __future__ import annotations
import hashlib
import io
import json
import re
import time
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from backend.config import settings
# Rejected uploads are appended to logs/upload_blocks.jsonl, where logs/ sits
# two levels above settings.CORPUS_DIR.
LOG_DIR = settings.CORPUS_DIR.parent.parent / "logs"
# parents=True so a missing intermediate directory cannot crash module import.
LOG_DIR.mkdir(parents=True, exist_ok=True)
UPLOAD_BLOCK_LOG = LOG_DIR / "upload_blocks.jsonl"
@dataclass
class UploadVerdict:
    """Outcome of running one upload through the security gates."""

    # True when no gate produced a rejection reason.
    accepted: bool
    # Machine-readable rejection reasons; empty for an accepted upload.
    reasons: list[str] = field(default_factory=list)
    # Length of the extracted text the gates were given.
    extracted_text_chars: int = 0
    # Page count the gates were given.
    page_count: int = 0
    # Chunk count served from the accept cache; None when the cache was not hit.
    cached_chunks: Optional[int] = None
# Insurance-domain keywords used to confirm the PDF is plausibly a policy.
# A single hit on any of these is enough — we don't want to over-reject.
# gate_content_quality matches these as substrings of the lowercased text, so
# every entry must itself be lowercase. Under substring matching "insur"
# already covers "insurance" / "insurer" / "insurer's"; the longer forms are
# redundant but harmless. Each keyword is listed exactly once.
INSURANCE_KEYWORDS = (
    "insur", "policy", "premium", "sum insured", "claim", "hospital",
    "coverage", "covered", "exclus", "waiting period", "pre-existing",
    "irdai", "deductible", "cashless", "ayush", "domiciliary", "ncb",
    "no claim bonus", "renewal", "uin", "section",
    "insurance", "insurer", "insurer's",
)
# Prompt-injection patterns we explicitly reject. Not exhaustive — but every
# one of these is a clear signal of adversarial content, not a real policy.
# All patterns are compiled case-insensitively; list order is the scan order.
INJECTION_PATTERNS = [
    re.compile(raw_pattern, re.IGNORECASE)
    for raw_pattern in (
        # "ignore / disregard / forget previous instructions" family
        r"\bignore\s+(?:all\s+)?(?:previous|prior|above|earlier)\s+instructions",
        r"\bdisregard\s+(?:all\s+)?(?:previous|prior|above|earlier)\s+instructions",
        r"\bforget\s+(?:everything|all|your|the)\s+(?:previous|prior|above|instructions)",
        # role reassignment / impersonation
        r"\byou\s+are\s+now\s+a\s+",
        r"\bact\s+as\s+(?:a|an)\s+(?:different|new)\s+",
        r"\bpretend\s+(?:to\s+be|you\s+are)\s+",
        # system-prompt references and chat-template control tokens
        r"\bsystem\s+prompt\b",
        r"<\s*\|?im_start\|?\s*>|<\s*\|?im_end\|?\s*>",
        r"\bjailbreak\b|\bdan\s+mode\b",
        # secret / prompt exfiltration
        r"reveal\s+(?:your|the)\s+(?:system|hidden|secret|original)\s+(?:prompt|instructions)",
        r"\bapi[_\- ]?key\b.{0,40}(?:reveal|share|tell|print|output)",
    )
]
# PDF byte patterns that indicate active content / exploits.
# Each entry is (needle, label); gate_pdf_mechanics rejects the upload when the
# needle occurs anywhere in the raw file bytes and reports the label.
# NOTE(review): the module docstring also lists AcroForm, but no /AcroForm
# needle exists here — confirm which of the two is intended.
DANGEROUS_PDF_FEATURES = [
    (b"/JavaScript", "embedded_javascript"),
    (b"/JS ", "javascript_action"),
    # Must NOT be a raw literal: rb"/JS\n" is backslash + "n", which never
    # matches a /JS key that is followed by a real newline.
    (b"/JS\n", "javascript_action_newline"),
    (b"/Launch", "launch_action"),
    (b"/EmbeddedFile", "embedded_file"),
    (b"/OpenAction", "openaction_trigger"),
    (b"/SubmitForm", "form_submission"),
    (b"/AA<<", "auto_actions"),
    (b"/RichMedia", "rich_media_embed"),
    (b"/Movie", "movie_embed"),
    (b"/Sound", "sound_embed"),
    (b"/GoToR", "external_goto_action"),
]
# Magic-byte signatures for executables hidden inside PDFs.
# A PDF should NEVER contain these in its body.
# Each entry is (signature, label); gate_pdf_mechanics searches for the
# signature everywhere after the first 8 bytes of the file.
# NOTE(review): short needles such as b"#!/" could plausibly occur by chance
# inside compressed stream data of a large file — confirm the false-positive
# rate is acceptable.
EXECUTABLE_SIGNATURES = [
    # Windows PE (.exe, .dll)
    (b"MZ\x90\x00", "windows_pe_executable"),
    # Linux ELF
    (b"\x7fELF", "linux_elf_executable"),
    # macOS Mach-O
    (b"\xcf\xfa\xed\xfe", "macos_macho"),
    # Java class file or Mach-O fat binary
    (b"\xca\xfe\xba\xbe", "java_class_or_macho"),
    # Shell script shebang
    (b"#!/", "shell_script_shebang"),
    # HTML/JS payload
    (b"<script", "html_script_tag"),
    # PHP payload
    (b"<?php", "php_payload"),
]
# Per-IP rate limit (Gate 5): at most this many uploads per source IP inside
# the one-hour window applied by gate_ip_rate_limit.
IP_UPLOADS_PER_HOUR = 10
# Upload timestamps (time.time() values) keyed by source IP.
# Held in memory only; v2 → Redis.
ip_uploads: dict[str, list[float]] = defaultdict(list)
# Per-session rate-limit state (in-memory; resets on restart).
# In v2 this moves to Redis.
class RateLimit:
    """Per-session upload throttle: a rolling upload window plus a chunk budget.

    Args:
        max_uploads_per_hour: uploads allowed per session inside the window.
        max_total_chunks: cumulative chunk budget per session (never reset).
        window_seconds: length of the rolling upload window in seconds.
    """

    def __init__(
        self,
        max_uploads_per_hour: int = 5,
        max_total_chunks: int = 200,
        window_seconds: float = 3600,
    ):
        self.max_uploads_per_hour = max_uploads_per_hour
        self.max_total_chunks = max_total_chunks
        self.window_seconds = window_seconds
        # session_id -> time.time() stamps of recorded uploads
        self.uploads_by_session: dict[str, list[float]] = defaultdict(list)
        # session_id -> total chunks recorded so far
        self.chunks_by_session: dict[str, int] = defaultdict(int)

    def check_upload_rate(self, session_id: str) -> Optional[str]:
        """Return a rejection reason if the session used up its window, else None.

        Also prunes timestamps that fell out of the window. A session with no
        recent uploads is removed entirely, so merely *checking* an unknown
        session id never leaves state behind (otherwise rotating session ids
        would grow the dict without bound).
        """
        now = time.time()
        recent = [
            stamp
            for stamp in self.uploads_by_session.get(session_id, ())
            if now - stamp < self.window_seconds
        ]
        if recent:
            self.uploads_by_session[session_id] = recent
        else:
            self.uploads_by_session.pop(session_id, None)
        if len(recent) >= self.max_uploads_per_hour:
            return "rate_limit_uploads_per_hour"
        return None

    def record_upload(self, session_id: str, chunks: int):
        """Record one upload of `chunks` chunks for the session."""
        self.uploads_by_session[session_id].append(time.time())
        self.chunks_by_session[session_id] += chunks

    def check_chunk_quota(self, session_id: str) -> Optional[str]:
        """Return a rejection reason once the session's chunk budget is spent."""
        # .get() so a lookup does not create an entry in the defaultdict.
        if self.chunks_by_session.get(session_id, 0) >= self.max_total_chunks:
            return "rate_limit_total_chunks"
        return None


rate_limiter = RateLimit()
def gate_pdf_mechanics(content: bytes) -> list[str]:
    """Gate 1 — bytes-level PDF checks.

    Checks, in order: %PDF magic bytes, size window (5,000 bytes to 25 MiB),
    a %%EOF marker within the last 256 bytes, every needle in
    DANGEROUS_PDF_FEATURES, and every signature in EXECUTABLE_SIGNATURES
    (Windows PE, Linux ELF, Mach-O, shell scripts, HTML/JS payloads, PHP).

    Args:
        content: raw bytes of the uploaded file.

    Returns:
        Rejection reasons, empty when the file passes. A file without the
        %PDF magic returns only "not_a_pdf_magic_bytes".
    """
    if not content.startswith(b"%PDF"):
        return ["not_a_pdf_magic_bytes"]  # no point checking further

    reasons: list[str] = []
    size = len(content)
    if size > 25 * 1024 * 1024:
        reasons.append("file_too_large_25mb")
    if size < 5_000:
        reasons.append("file_too_small_5kb")

    # Verify the trailing %%EOF is present (well-formed PDF).
    if b"%%EOF" not in content[-256:]:
        reasons.append("malformed_pdf_missing_eof")

    # Look for dangerous PDF features in the WHOLE file.
    reasons.extend(
        f"dangerous_pdf_feature: {label}"
        for needle, label in DANGEROUS_PDF_FEATURES
        if needle in content
    )

    # Scan for embedded executables / payloads, skipping the first 8 bytes
    # (false positive on PDF magic). find() with a start offset searches in
    # place; slicing content[8:] would copy the whole upload (up to 25 MB)
    # once per signature.
    reasons.extend(
        f"embedded_executable: {label}"
        for sig, label in EXECUTABLE_SIGNATURES
        if content.find(sig, 8) != -1
    )
    return reasons
def gate_ip_rate_limit(ip: str) -> list[str]:
    """Gate 5 — per-IP rate limit on top of per-session.

    Args:
        ip: source IP of the upload; an empty value disables the check.

    Returns:
        ["rate_limit_per_ip_per_hour"] when the IP already has
        IP_UPLOADS_PER_HOUR uploads recorded in the last hour, else [].
    """
    if not ip:
        return []
    now = time.time()
    recent = [stamp for stamp in ip_uploads.get(ip, ()) if now - stamp < 3600]
    # Keep only IPs that still have uploads inside the window. Storing an empty
    # list for every IP ever checked would grow the dict without bound.
    if recent:
        ip_uploads[ip] = recent
    else:
        ip_uploads.pop(ip, None)
    if len(recent) >= IP_UPLOADS_PER_HOUR:
        return ["rate_limit_per_ip_per_hour"]
    return []
def record_ip_upload(ip: str):
    """Stamp the current time against `ip`; a falsy ip is ignored."""
    if not ip:
        return
    stamp = time.time()
    ip_uploads[ip].append(stamp)
def gate_content_quality(text: str, page_count: int) -> list[str]:
    """Gate 2 — extracted-text checks.

    Rejects when the text has fewer than 1500 non-padding characters, when the
    document has fewer than 3 pages, or when no INSURANCE_KEYWORDS entry occurs
    in the lowercased text. The 200-page ceiling is checked separately by
    gate_page_count_ceiling.

    Args:
        text: text extracted from the PDF.
        page_count: number of pages in the PDF.

    Returns:
        Rejection reasons, empty when the content passes.
    """
    reasons: list[str] = []
    # Report the same stripped length the threshold is tested against;
    # reporting len(text) could print a count at or above 1500 in the reason.
    usable_chars = len(text.strip())
    if usable_chars < 1500:
        reasons.append(f"too_little_text: {usable_chars} chars")
    if page_count < 3:
        reasons.append(f"too_few_pages: {page_count}")
    text_l = text.lower()
    if not any(kw in text_l for kw in INSURANCE_KEYWORDS):
        reasons.append("no_insurance_keywords_found")
    return reasons
def gate_prompt_injection(text: str) -> list[str]:
    """Gate 3 — scan for injection patterns.

    Patterns are tried in INJECTION_PATTERNS order and only the first one that
    matches is reported. The reason carries up to 30 characters of context on
    either side of the match, newlines flattened to spaces, capped at 100
    characters.
    """
    matches = (pattern.search(text) for pattern in INJECTION_PATTERNS)
    first_hit = next((hit for hit in matches if hit), None)
    if first_hit is None:
        return []
    window_start = max(0, first_hit.start() - 30)
    window_end = first_hit.end() + 30
    context = text[window_start:window_end].replace("\n", " ")
    return [f"injection_pattern: {context[:100]}"]
def gate_rate_limit(session_id: str) -> list[str]:
    """Gate 4 — per-session rate limits.

    Runs both rate_limiter checks (upload rate first, then chunk quota) and
    returns whichever rejection reasons they produced.
    """
    outcomes = (
        rate_limiter.check_upload_rate(session_id),
        rate_limiter.check_chunk_quota(session_id),
    )
    return [reason for reason in outcomes if reason]
def gate_encrypted_pdf(content: bytes) -> list[str]:
    """Gate 6 — reject password-protected / encrypted PDFs.

    Opens the bytes with pdfplumber and touches the metadata, the page list
    and the first page. Any exception raised while doing so — whether or not
    it mentions encryption — is reported as
    "pdf_encrypted_or_password_protected", so an unreadable blob never reaches
    the content-quality gate.

    Returns [] when the PDF opens cleanly, and also when pdfplumber itself
    cannot be imported (this gate then does not block the pipeline).
    """
    try:
        import pdfplumber
    except Exception:
        # Library unavailable: let the other gates decide.
        return []

    try:
        with pdfplumber.open(io.BytesIO(content)) as document:
            # Some encrypted PDFs only raise on these accesses, not on open().
            _ = document.metadata
            page_list = document.pages
            if page_list:
                _ = page_list[0]
    except Exception:
        # Encrypted and otherwise-unparseable files are rejected the same way.
        return ["pdf_encrypted_or_password_protected"]
    return []
def gate_page_count_ceiling(page_count: int) -> list[str]:
    """Page-count ceiling — reject PDFs over 200 pages.

    The floor (<3 pages) is handled in gate_content_quality. Anything over 200
    pages is almost certainly a bundled-dump or abuse vector (a real Indian
    health insurance policy is 20-150 pages).
    """
    within_ceiling = page_count <= 200
    return [] if within_ceiling else ["too_many_pages_over_200"]
# In-memory hash dedupe caches (process-local; reset on restart; v2 → Redis).
_REJECT_TTL_SECONDS = 24 * 3600
# sha256 hex digest -> time.time() of the most recent rejection
_recent_rejects: dict[str, float] = {}
# (sha256 hex digest, session_id) -> chunk count of the accepted upload
_recent_accepts: dict[tuple[str, str], int] = {}


def record_reject(sha: str) -> None:
    """Mark a content hash as recently-rejected so re-uploads short-circuit."""
    rejected_at = time.time()
    _recent_rejects[sha] = rejected_at


def record_accept(sha: str, session_id: str, chunks: int) -> None:
    """Cache an accepted hash → chunk count for the same session."""
    cache_key = (sha, session_id)
    _recent_accepts[cache_key] = chunks


def gate_hash_dedupe(content: bytes, session_id: str) -> tuple[list[str], Optional[int]]:
    """Content-hash dedupe + reject cache.

    Returns (reasons, cached_chunks):
    - hash was rejected <24h ago            → (["already_rejected_24h"], None)
    - (hash, session_id) is in accept cache → ([], cached_chunks)
    - otherwise                             → ([], None)

    The reject cache is consulted before the accept cache.
    """
    digest = hashlib.sha256(content).hexdigest()
    now = time.time()

    # Evict expired rejects first so the cache doesn't grow unbounded.
    expired = [
        key
        for key, rejected_at in _recent_rejects.items()
        if now - rejected_at >= _REJECT_TTL_SECONDS
    ]
    for key in expired:
        _recent_rejects.pop(key, None)

    rejected_at = _recent_rejects.get(digest)
    if rejected_at is not None and now - rejected_at < _REJECT_TTL_SECONDS:
        return (["already_rejected_24h"], None)

    # dict.get yields None on a miss, which is exactly the "no cache hit" value.
    return ([], _recent_accepts.get((digest, session_id)))
async def check_upload(
    content: bytes,
    extracted_text: str,
    page_count: int,
    session_id: str = "anonymous",
    ip: str = "",
) -> UploadVerdict:
    """Run the upload gates. Return verdict with reasons (empty if accepted).

    Order (cheap → expensive; short-circuit only on the dedupe caches, so the
    caller can skip re-embedding):
    1. gate_hash_dedupe         (instant; may short-circuit accept w/ cached chunks)
    2. gate_rate_limit          (per-session counter lookup)
    3. gate_ip_rate_limit       (per-IP counter lookup)
    4. gate_pdf_mechanics       (byte scan)
    5. gate_encrypted_pdf       (pdfplumber open)
    6. gate_content_quality     (extracted text + ≥3 page floor)
    7. gate_page_count_ceiling  (≤200 page ceiling)
    8. gate_prompt_injection    (regex sweep)

    Args:
        content: raw bytes of the uploaded file.
        extracted_text: text the caller extracted from the PDF.
        page_count: number of pages the caller counted.
        session_id: caller's session identifier (rate limits + accept cache).
        ip: source IP; empty disables the per-IP gate.

    Returns:
        UploadVerdict; every rejection is also written via _log_block.
    """
    text_chars = len(extracted_text)

    # 1. Hash dedupe first — accept-cache short-circuit avoids re-embedding.
    dedupe_reasons, cached_chunks = gate_hash_dedupe(content, session_id)
    if cached_chunks is not None:
        return UploadVerdict(
            accepted=True,
            reasons=[],
            extracted_text_chars=text_chars,
            page_count=page_count,
            cached_chunks=cached_chunks,
        )
    if dedupe_reasons:
        # Recently rejected — re-log + return immediately without running the rest.
        _log_block(session_id, dedupe_reasons, len(content), text_chars, page_count)
        return UploadVerdict(
            accepted=False,
            reasons=dedupe_reasons,
            extracted_text_chars=text_chars,
            page_count=page_count,
        )

    # Throttle gates describe the CALLER (session / IP), not the file.
    throttle_reasons = [
        *gate_rate_limit(session_id),
        *gate_ip_rate_limit(ip),
    ]
    # Content gates describe the FILE itself.
    content_reasons = [
        *gate_pdf_mechanics(content),
        *gate_encrypted_pdf(content),
        *gate_content_quality(extracted_text, page_count),
        *gate_page_count_ceiling(page_count),
        *gate_prompt_injection(extracted_text),
    ]
    reasons = throttle_reasons + content_reasons

    verdict = UploadVerdict(
        accepted=not reasons,
        reasons=reasons,
        extracted_text_chars=text_chars,
        page_count=page_count,
    )
    if not verdict.accepted:
        # Cache the hash as rejected only when the file itself failed a gate.
        # The reject cache is global and is checked before the accept cache, so
        # caching a pure rate-limit rejection would block a perfectly valid PDF
        # for every session for 24h just because one caller was throttled.
        if content_reasons:
            record_reject(hashlib.sha256(content).hexdigest())
        _log_block(session_id, reasons, len(content), text_chars, page_count)
    return verdict
def _log_block(session_id: str, reasons: list[str], byte_size: int, text_chars: int, pages: int):
    """Append one JSON line describing a rejected upload to UPLOAD_BLOCK_LOG.

    The record carries a UTC timestamp, the session id, the rejection reasons,
    the upload size in bytes, the extracted-text length and the page count.
    """
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    record = dict(
        ts=stamp,
        session_id=session_id,
        reasons=reasons,
        byte_size=byte_size,
        text_chars=text_chars,
        pages=pages,
    )
    with open(UPLOAD_BLOCK_LOG, "a") as log_file:
        log_file.write(f"{json.dumps(record)}\n")