# =============================================================
# File: backend/api/services/redflag_detector.py
# =============================================================
"""
Enterprise RedFlagDetector

- Loads per-tenant rules from Supabase REST (this can be swapped for a direct
  Postgres connection) and merges in rules from the local RulesStore
- Caches rules per tenant with a TTL
- Performs keyword, regex and embedding-similarity (semantic) matching
- Returns structured match objects with severity and rule metadata
- Sends best-effort notifications to the Admin MCP server and/or a webhook
"""

import hashlib
import json
import logging
import os
import re
import time
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple

import httpx

from ..models.redflag import RedFlagRule, RedFlagMatch
from ..storage.rules_store import RulesStore
from .semantic_encoder import embed_text, cosine_similarity

logger = logging.getLogger(__name__)

# Confidence assigned to a keyword hit and to a regex hit, respectively.
KEYWORD_CONFIDENCE = 0.8
REGEX_CONFIDENCE = 1.0
# Minimum cosine similarity for a rule to fire on semantic similarity alone
# (i.e. when neither a keyword nor the regex matched).
SEMANTIC_MATCH_THRESHOLD = 0.82


@lru_cache(maxsize=1024)
def _compile_pattern(pattern: str) -> Optional["re.Pattern[str]"]:
    """Compile *pattern* case-insensitively, or return None if it is not a valid regex.

    Cached so each rule pattern is compiled (and, when invalid, logged) once
    instead of on every ``check`` call.
    """
    try:
        return re.compile(pattern, re.IGNORECASE)
    except re.error as exc:
        logger.warning("Ignoring invalid red-flag regex %r: %s", pattern, exc)
        return None


def _normalize_keywords(value: Any) -> List[str]:
    """Coerce a ``keywords`` column value into a list of non-blank strings.

    Accepts a list, a JSON-encoded list, or a JSON-encoded single string.
    Anything else (including a non-JSON string) yields ``[]``. A bare string is
    never returned as-is: ``check`` iterates keywords, so a string would be
    matched character by character.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            return []
        if isinstance(value, str):
            value = [value]
    if not isinstance(value, list):
        return []
    # Whitespace-only keywords would match any text containing a space.
    return [kw for kw in value if isinstance(kw, str) and kw.strip()]


class RedFlagDetector:
    """Detect tenant-specific red flags in text and alert administrators.

    Rules are merged per tenant from two sources: the Supabase
    ``redflag_rules`` table (only when both URL and service key are configured)
    and the local ``RulesStore``. Call ``close()`` (or use ``async with``) to
    release the underlying HTTP client.
    """

    def __init__(
        self,
        supabase_url: Optional[str] = None,
        supabase_key: Optional[str] = None,
        admin_mcp_url: Optional[str] = None,
        cache_ttl: int = 300,
        rules_store: Optional[RulesStore] = None,
    ):
        """Create a detector.

        Args:
            supabase_url: Supabase project URL; defaults to ``$SUPABASE_URL``.
            supabase_key: Supabase service key; defaults to ``$SUPABASE_SERVICE_KEY``.
            admin_mcp_url: Base URL of the Admin MCP server; defaults to ``$ADMIN_MCP_URL``.
            cache_ttl: Seconds a tenant's rule set is reused before re-fetching.
            rules_store: Source of local rules; a default ``RulesStore()`` if omitted.
        """
        self.supabase_url = supabase_url or os.getenv("SUPABASE_URL")
        self.supabase_key = supabase_key or os.getenv("SUPABASE_SERVICE_KEY")
        self.admin_mcp_url = admin_mcp_url or os.getenv("ADMIN_MCP_URL")
        self.cache_ttl = cache_ttl
        self.rules_store = rules_store or RulesStore()
        # tenant_id -> {"fetched_at": monotonic ts, "rules": [...], "remote_rules": [...]}
        self._rules_cache: Dict[str, Dict[str, Any]] = {}
        # tenant_id -> {rule_id: embedding vector, or [] if embedding failed}
        self._rule_embeddings: Dict[str, Dict[str, List[float]]] = {}
        self._client = httpx.AsyncClient(timeout=15)

    async def __aenter__(self) -> "RedFlagDetector":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close()

    async def _fetch_rules_from_supabase(self, tenant_id: str) -> List[RedFlagRule]:
        """Fetch *tenant_id*'s rules from the Supabase ``redflag_rules`` table.

        Expects columns: id, tenant_id, pattern, description, severity, source,
        enabled, keywords (JSON array). Returns ``[]`` when Supabase is not
        configured or the response body is not a list. Malformed rows are
        skipped and logged.

        Raises:
            httpx.HTTPError: on transport failures or non-2xx responses.
            ValueError: if the response body is not valid JSON.
        """
        if not self.supabase_url or not self.supabase_key:
            return []
        url = self.supabase_url.rstrip("/") + "/rest/v1/redflag_rules"
        headers = {"apikey": self.supabase_key, "Authorization": f"Bearer {self.supabase_key}"}
        params = {"tenant_id": f"eq.{tenant_id}", "select": "*"}
        r = await self._client.get(url, headers=headers, params=params)
        r.raise_for_status()
        rows = r.json()
        if not isinstance(rows, list):
            logger.warning("Unexpected red-flag rules payload for tenant %s: %s", tenant_id, type(rows).__name__)
            return []

        rules: List[RedFlagRule] = []
        for row in rows:
            try:
                # A NULL `enabled` column means "not explicitly disabled".
                enabled = row.get("enabled")
                rules.append(
                    RedFlagRule(
                        id=str(row.get("id")),
                        pattern=row.get("pattern") or "",
                        description=row.get("description") or "",
                        severity=row.get("severity") or "medium",
                        source=row.get("source") or "admin",
                        enabled=True if enabled is None else enabled,
                        keywords=_normalize_keywords(row.get("keywords")),
                    )
                )
            except Exception:
                # Skip invalid rows defensively, but leave a trace.
                row_id = row.get("id") if isinstance(row, dict) else None
                logger.warning(
                    "Skipping malformed red-flag rule row (id=%r) for tenant %s",
                    row_id,
                    tenant_id,
                    exc_info=True,
                )
        return rules

    async def load_rules(self, tenant_id: str) -> List[RedFlagRule]:
        """Return *tenant_id*'s merged remote + local rules, cached for ``cache_ttl`` seconds.

        On a cache miss the rule embeddings used for semantic scoring are
        recomputed too. If the Supabase fetch fails, the remote rules from the
        previous (expired) cache entry are reused so a transient outage does not
        silently disable every remote rule until the next refresh.
        """
        now = time.monotonic()
        entry = self._rules_cache.get(tenant_id)
        if entry and now - entry["fetched_at"] < self.cache_ttl:
            return entry["rules"]

        remote_rules: List[RedFlagRule] = []
        if self.supabase_url and self.supabase_key:
            try:
                remote_rules = await self._fetch_rules_from_supabase(tenant_id)
            except Exception:
                logger.exception("Failed to fetch red-flag rules from Supabase for tenant %s", tenant_id)
                remote_rules = list(entry.get("remote_rules", [])) if entry else []
        local_rules = self._fetch_local_rules(tenant_id)

        rules: List[RedFlagRule] = [*remote_rules, *local_rules]
        self._rules_cache[tenant_id] = {"fetched_at": now, "rules": rules, "remote_rules": remote_rules}
        # Pre-compute embeddings for semantic scoring.
        self._rule_embeddings[tenant_id] = {rule.id: self._embed_rule(rule) for rule in rules}
        return rules

    @staticmethod
    def _embed_rule(rule: RedFlagRule) -> List[float]:
        """Embed a rule's description + pattern (falling back to its id); ``[]`` on failure."""
        try:
            text_for_embedding = " ".join(
                piece for piece in (rule.description, rule.pattern) if piece
            ).strip() or rule.id
            return embed_text(text_for_embedding)
        except Exception:
            logger.warning("Could not embed red-flag rule %s", rule.id, exc_info=True)
            return []

    def _fetch_local_rules(self, tenant_id: str) -> List[RedFlagRule]:
        """Build rules from the local RulesStore's raw text entries for *tenant_id*.

        Each non-blank entry becomes a high-severity rule whose id is the SHA-1 of
        ``"{tenant_id}:{text}"``. Entries of at most six words also get the
        lowercased text as a keyword.
        """
        if not self.rules_store:
            return []
        rules: List[RedFlagRule] = []
        for raw in self.rules_store.get_rules(tenant_id):
            text = (raw or "").strip()
            if not text:
                continue
            rule_id = hashlib.sha1(f"{tenant_id}:{text}".encode()).hexdigest()
            rules.append(
                RedFlagRule(
                    id=rule_id,
                    # NOTE(review): the raw text is also used as the regex pattern, so
                    # metacharacters ('?', '(', '$', ...) change its meaning; confirm
                    # whether RulesStore entries are meant to be regexes or literal phrases.
                    pattern=text,
                    description=text,
                    severity="high",
                    source="admin_local",
                    enabled=True,
                    keywords=[text.lower()] if len(text.split()) <= 6 else [],
                )
            )
        return rules

    async def check(self, tenant_id: str, text: str) -> List[RedFlagMatch]:
        """Return structured matches of *tenant_id*'s enabled rules against *text*.

        A rule matches via (in order) a case-insensitive keyword substring, its
        case-insensitive regex, or, if neither hits, a semantic similarity of at
        least ``SEMANTIC_MATCH_THRESHOLD``. Rules with an invalid regex can still
        match via keywords or semantics.
        """
        if not text:
            return []
        rules = await self.load_rules(tenant_id)
        if not rules:
            return []

        text_lower = text.lower()
        text_vector = self._embed_text_safely(text)
        matches: List[RedFlagMatch] = []
        for rule in rules:
            if not rule.enabled:
                continue
            source, matched_text, lexical_score = self._lexical_match(rule, text, text_lower)
            semantic_score = self._semantic_score(tenant_id, rule.id, text_vector)
            if source:
                confidence = max(semantic_score, lexical_score)
            elif semantic_score >= SEMANTIC_MATCH_THRESHOLD:
                source, confidence = "semantic", semantic_score
            else:
                continue
            matches.append(
                RedFlagMatch(
                    rule_id=rule.id,
                    pattern=rule.pattern,
                    severity=rule.severity,
                    description=rule.description,
                    matched_text=matched_text,
                    confidence=round(confidence, 2),
                    explanation=self._build_explanation(rule, source, matched_text, confidence),
                )
            )
        return matches

    @staticmethod
    def _lexical_match(rule: RedFlagRule, text: str, text_lower: str) -> Tuple[str, str, float]:
        """Return ``(source, matched_text, score)`` for the first keyword/regex hit, else ``("", "", 0.0)``."""
        # 1) Keyword quick-check (cheap).
        for kw in rule.keywords or []:
            if isinstance(kw, str) and kw and kw.lower() in text_lower:
                return "keyword", kw, KEYWORD_CONFIDENCE
        # 2) Regex check (more precise); invalid patterns compile to None.
        if rule.pattern:
            compiled = _compile_pattern(rule.pattern)
            if compiled is not None:
                m = compiled.search(text)
                if m:
                    return "regex", m.group(0), REGEX_CONFIDENCE
        return "", "", 0.0

    @staticmethod
    def _embed_text_safely(text: str) -> List[float]:
        """Embed the checked text; ``[]`` on failure so keyword/regex matching still runs."""
        try:
            return embed_text(text)
        except Exception:
            logger.warning("Could not embed text for red-flag semantic scoring", exc_info=True)
            return []

    async def notify_admin(
        self,
        tenant_id: str,
        violations: List[RedFlagMatch],
        source_payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Notify the Admin MCP server and/or the ``$ALERT_WEBHOOK`` webhook about *violations*.

        Delivery is best-effort: failures (including non-2xx responses) are
        logged and never raised, so notifications cannot crash orchestration.
        """
        payload = {
            "tenant_id": tenant_id,
            # NOTE(review): assumes RedFlagMatch instances expose their fields via __dict__.
            "violations": [v.__dict__ for v in violations],
            "source": source_payload or {},
        }
        # 1) POST to Admin MCP /alert if configured.
        if self.admin_mcp_url:
            await self._post_best_effort(self.admin_mcp_url.rstrip("/") + "/alert", payload, "Admin MCP")
        # 2) Optionally send to a Slack/Teams webhook.
        webhook = os.getenv("ALERT_WEBHOOK")
        if webhook:
            await self._post_best_effort(
                webhook,
                {"text": f"Red-flag for tenant {tenant_id}", "details": payload},
                "alert webhook",
            )

    async def _post_best_effort(self, url: str, body: Dict[str, Any], target: str) -> None:
        """POST *body* as JSON to *url*, logging (not raising) any failure.

        *target* is a human-readable label used in logs instead of the URL,
        because webhook URLs often embed secrets.
        """
        try:
            response = await self._client.post(url, json=body, timeout=10)
            response.raise_for_status()
        except Exception:
            logger.warning("Failed to deliver red-flag alert to %s", target, exc_info=True)

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()

    def _semantic_score(self, tenant_id: str, rule_id: str, text_vector: List[float]) -> float:
        """Cosine similarity between a rule's cached embedding and *text_vector* (0.0 if either is missing)."""
        rule_vector = self._rule_embeddings.get(tenant_id, {}).get(rule_id)
        if not rule_vector or not text_vector:
            return 0.0
        return cosine_similarity(rule_vector, text_vector)

    @staticmethod
    def _build_explanation(rule: RedFlagRule, source: str, matched_text: str, confidence: float) -> str:
        """Human-readable explanation naming the rule, match source, matched span and confidence."""
        base = f"Matched rule '{rule.description or rule.id}' via {source or 'heuristics'}"
        if matched_text:
            base += f" on span \"{matched_text}\""
        return f"{base}. confidence={round(confidence, 2)}"