Spaces:
Sleeping
Sleeping
| # ============================================================= | |
| # File: backend/api/services/redflag_detector.py | |
| # ============================================================= | |
| """ | |
| Enterprise RedFlagDetector | |
| - Loads per-tenant rules from Supabase REST (can be swapped for a direct Postgres connection) and merges them with local rules from RulesStore | |
| - Caches rules per tenant with TTL | |
| - Performs keyword, regex, and embedding-based semantic matching | |
| - Returns structured match objects with severity and rule metadata | |
| - Sends notifications to Admin MCP or a webhook | |
| """ | |
import hashlib
import json
import logging
import os
import re
import time
from typing import List, Dict, Any, Optional

import httpx

from ..models.redflag import RedFlagRule, RedFlagMatch
from ..storage.rules_store import RulesStore
from .semantic_encoder import embed_text, cosine_similarity
class RedFlagDetector:
    """Detect red-flag rule violations in text on a per-tenant basis.

    Rules come from two sources that are merged on load: the Supabase REST
    table ``redflag_rules`` and the local ``RulesStore``. The merged list is
    cached per tenant for ``cache_ttl`` seconds together with one embedding
    per rule. ``check`` evaluates every enabled rule by keyword substring,
    then by regex, and finally by embedding similarity.

    The instance owns an ``httpx.AsyncClient``; call ``close()`` (or use the
    detector in an ``async with`` block) to release it.
    """

    # Confidence reported for a keyword (substring) hit.
    KEYWORD_CONFIDENCE = 0.8
    # Confidence reported for a regex hit.
    REGEX_CONFIDENCE = 1.0
    # Minimum cosine similarity for a rule to match on semantics alone.
    SEMANTIC_THRESHOLD = 0.82

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        supabase_url: Optional[str] = None,
        supabase_key: Optional[str] = None,
        admin_mcp_url: Optional[str] = None,
        cache_ttl: int = 300,
        rules_store: Optional[RulesStore] = None,
    ):
        """Configure the detector.

        Args:
            supabase_url: Base URL of the Supabase project; falls back to
                the ``SUPABASE_URL`` environment variable.
            supabase_key: Key sent as both ``apikey`` header and bearer
                token; falls back to ``SUPABASE_SERVICE_KEY``.
            admin_mcp_url: Base URL that receives ``/alert`` POSTs; falls
                back to ``ADMIN_MCP_URL``.
            cache_ttl: Seconds a tenant's merged rule list stays cached.
            rules_store: Local rule source; a default ``RulesStore()`` is
                created when omitted.
        """
        self.supabase_url = supabase_url or os.getenv("SUPABASE_URL")
        self.supabase_key = supabase_key or os.getenv("SUPABASE_SERVICE_KEY")
        self.admin_mcp_url = admin_mcp_url or os.getenv("ADMIN_MCP_URL")
        self.cache_ttl = cache_ttl
        # Compare against None so a caller-supplied store that happens to be
        # falsy is not silently replaced by a fresh default instance.
        self.rules_store = rules_store if rules_store is not None else RulesStore()
        # tenant_id -> {"fetched_at": ts, "rules": [...], "remote_rules": [...]}
        self._rules_cache: Dict[str, Dict[str, Any]] = {}
        # tenant_id -> {rule_id: embedding}
        self._rule_embeddings: Dict[str, Dict[str, List[float]]] = {}
        self._client = httpx.AsyncClient(timeout=15)

    async def __aenter__(self) -> "RedFlagDetector":
        """Allow ``async with RedFlagDetector(...) as detector:``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving the ``async with`` block."""
        await self.close()

    @staticmethod
    def _parse_keywords(raw: Any) -> List[Any]:
        """Normalize the ``keywords`` column value to a list.

        Accepts a list, a JSON-encoded string, or a missing value. Anything
        that does not end up as a list yields ``[]``.
        """
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError
                return []
        return raw if isinstance(raw, list) else []

    async def _fetch_rules_from_supabase(self, tenant_id: str) -> List[RedFlagRule]:
        """Fetch the tenant's rules from the Supabase ``redflag_rules`` table.

        Expected columns: id, tenant_id, pattern, description, severity,
        source, enabled, keywords (JSON array).

        Returns:
            The parsed rules, or ``[]`` when Supabase is not configured or
            the response body is not a JSON list. Rows that cannot be turned
            into a ``RedFlagRule`` are logged and skipped.

        Raises:
            Whatever the HTTP request, ``raise_for_status()`` or JSON
            decoding raises; ``load_rules`` handles these.
        """
        if not self.supabase_url or not self.supabase_key:
            return []

        url = self.supabase_url.rstrip("/") + "/rest/v1/redflag_rules"
        headers = {"apikey": self.supabase_key, "Authorization": f"Bearer {self.supabase_key}"}
        params = {"tenant_id": f"eq.{tenant_id}", "select": "*"}
        response = await self._client.get(url, headers=headers, params=params)
        response.raise_for_status()

        rows = response.json()
        if not isinstance(rows, list):
            self._log.warning(
                "Unexpected redflag_rules payload for tenant %s: %s",
                tenant_id,
                type(rows).__name__,
            )
            return []

        rules: List[RedFlagRule] = []
        for row in rows:
            try:
                rules.append(
                    RedFlagRule(
                        id=str(row.get("id")),
                        pattern=row.get("pattern") or "",
                        description=row.get("description") or "",
                        severity=row.get("severity") or "medium",
                        source=row.get("source") or "admin",
                        enabled=row.get("enabled", True),
                        keywords=self._parse_keywords(row.get("keywords")),
                    )
                )
            except Exception:
                # The model's validation errors are project-defined, so the
                # catch stays broad; one bad row must not drop the others.
                self._log.warning(
                    "Skipping invalid redflag rule row for tenant %s",
                    tenant_id,
                    exc_info=True,
                )
        return rules

    async def load_rules(self, tenant_id: str) -> List[RedFlagRule]:
        """Return the tenant's merged rule list, refreshing the cache if stale.

        A refresh fetches remote rules (when Supabase is configured), appends
        the local rules, stores the result in the cache and recomputes one
        embedding per rule. If the remote fetch fails, the remote rules from
        the previous cache entry (if any) are reused, so a transient outage
        does not silently disable them.
        """
        now = int(time.time())
        entry = self._rules_cache.get(tenant_id)
        if entry and now - entry["fetched_at"] < self.cache_ttl:
            return entry["rules"]

        remote_rules: List[RedFlagRule] = []
        if self.supabase_url and self.supabase_key:
            try:
                remote_rules = await self._fetch_rules_from_supabase(tenant_id)
            except Exception:
                remote_rules = list(entry.get("remote_rules", [])) if entry else []
                self._log.warning(
                    "Remote rule fetch failed for tenant %s; reusing %d previously loaded rule(s)",
                    tenant_id,
                    len(remote_rules),
                    exc_info=True,
                )

        rules: List[RedFlagRule] = [*remote_rules, *self._fetch_local_rules(tenant_id)]
        self._rules_cache[tenant_id] = {
            "fetched_at": now,
            "rules": rules,
            "remote_rules": remote_rules,
        }
        # Pre-compute embeddings for semantic scoring.
        self._rule_embeddings[tenant_id] = {rule.id: self._embed_rule(rule) for rule in rules}
        return rules

    def _embed_rule(self, rule: RedFlagRule) -> List[float]:
        """Embed a rule's description and pattern (or its id if both are empty).

        Returns ``[]`` when embedding fails; ``_semantic_score`` treats an
        empty vector as "no semantic signal".
        """
        try:
            text_for_embedding = " ".join(
                piece for piece in (rule.description, rule.pattern) if piece
            ).strip() or rule.id
            return embed_text(text_for_embedding)
        except Exception:
            self._log.warning("Could not embed rule %s", rule.id, exc_info=True)
            return []

    def _fetch_local_rules(self, tenant_id: str) -> List[RedFlagRule]:
        """Build rules from the raw rule strings held by the local store.

        Each non-blank string becomes a high-severity ``admin_local`` rule
        whose pattern and description are the string itself and whose id is
        the SHA-1 of ``"<tenant_id>:<text>"``. Strings of at most six words
        are also registered as a lower-cased keyword.
        """
        if self.rules_store is None:
            return []

        rules: List[RedFlagRule] = []
        for raw in self.rules_store.get_rules(tenant_id) or []:
            text = (raw or "").strip()
            if not text:
                continue
            rule_id = hashlib.sha1(f"{tenant_id}:{text}".encode()).hexdigest()
            rules.append(
                RedFlagRule(
                    id=rule_id,
                    # NOTE(review): free text is later evaluated as a regex in
                    # check(); confirm whether local rules should be escaped.
                    pattern=text,
                    description=text,
                    severity="high",
                    source="admin_local",
                    enabled=True,
                    keywords=[text.lower()] if len(text.split()) <= 6 else [],
                )
            )
        return rules

    def _direct_match(self, rule: RedFlagRule, text: str, text_lower: str) -> tuple:
        """Run the keyword and regex checks for one rule.

        Returns:
            ``(matched_text, source, score)`` where ``source`` is
            ``"keyword"``, ``"regex"`` or ``""`` when neither check hit.
        """
        # 1) Keyword quick-check (cheap). Non-string entries are ignored so a
        #    malformed keywords column cannot abort the whole check.
        for kw in rule.keywords or []:
            if isinstance(kw, str) and kw and kw.lower() in text_lower:
                return kw, "keyword", self.KEYWORD_CONFIDENCE

        # 2) Regex check (more precise).
        if rule.pattern:
            try:
                found = re.search(rule.pattern, text, re.IGNORECASE)
            except re.error:
                # An invalid pattern only disables the regex step; the rule
                # can still match semantically.
                self._log.debug("Rule %s has an invalid regex pattern", rule.id)
                found = None
            if found:
                return found.group(0), "regex", self.REGEX_CONFIDENCE

        return "", "", 0.0

    async def check(self, tenant_id: str, text: str) -> List[RedFlagMatch]:
        """Return structured matches for the given tenant and text.

        Every enabled rule is tested by keyword, then regex. A rule that hits
        neither still matches when its semantic score reaches
        ``SEMANTIC_THRESHOLD``. The reported confidence is the highest of the
        keyword, regex and semantic scores, rounded to two decimals.
        """
        if not text:
            return []

        rules = await self.load_rules(tenant_id)
        text_lower = text.lower()
        try:
            text_vector = embed_text(text)
        except Exception:
            # Same fallback as for rule embeddings: lose the semantic signal
            # but keep keyword and regex detection working.
            self._log.warning("Could not embed input text; semantic scoring disabled", exc_info=True)
            text_vector = []

        matches: List[RedFlagMatch] = []
        for rule in rules:
            if not rule.enabled:
                continue

            matched_text, match_source, direct_score = self._direct_match(rule, text, text_lower)
            semantic_score = self._semantic_score(tenant_id, rule.id, text_vector)

            if not match_source:
                if semantic_score < self.SEMANTIC_THRESHOLD:
                    continue
                match_source = "semantic"

            confidence = max(semantic_score, direct_score)
            matches.append(
                RedFlagMatch(
                    rule_id=rule.id,
                    pattern=rule.pattern,
                    severity=rule.severity,
                    description=rule.description,
                    matched_text=matched_text,
                    confidence=round(confidence, 2),
                    explanation=self._build_explanation(rule, match_source, matched_text, confidence),
                )
            )
        return matches

    async def notify_admin(self, tenant_id: str, violations: List[RedFlagMatch], source_payload: Optional[Dict[str, Any]] = None) -> None:
        """Notify the Admin MCP server and/or a webhook about the matches.

        Delivery is best-effort: failures are logged, never raised, so that
        notifications cannot crash orchestration. Nothing is sent when
        ``violations`` is empty.
        """
        if not violations:
            return

        payload = {
            "tenant_id": tenant_id,
            "violations": [v.__dict__ for v in violations],
            "source": source_payload or {},
        }

        # 1) POST to Admin MCP /alert if configured.
        if self.admin_mcp_url:
            await self._post_alert("Admin MCP", self.admin_mcp_url.rstrip("/") + "/alert", payload)

        # 2) Optionally send to a Slack/Teams webhook.
        webhook = os.getenv("ALERT_WEBHOOK")
        if webhook:
            await self._post_alert(
                "alert webhook",
                webhook,
                {"text": f"Red-flag for tenant {tenant_id}", "details": payload},
            )

    async def _post_alert(self, target: str, url: str, body: Dict[str, Any]) -> None:
        """POST ``body`` as JSON to ``url``, logging instead of raising.

        ``target`` is a label for the log message; the URL itself is not
        logged because webhook URLs can embed credentials.
        """
        try:
            response = await self._client.post(url, json=body, timeout=10)
            response.raise_for_status()
        except Exception:
            self._log.warning("Red-flag alert delivery to %s failed", target, exc_info=True)

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()

    def _semantic_score(self, tenant_id: str, rule_id: str, text_vector: List[float]) -> float:
        """Return the cosine similarity between a cached rule embedding and the text.

        Returns ``0.0`` when either vector is missing or empty.
        """
        rule_vector = self._rule_embeddings.get(tenant_id, {}).get(rule_id)
        if not rule_vector or not text_vector:
            return 0.0
        return cosine_similarity(rule_vector, text_vector)

    @staticmethod
    def _build_explanation(rule: RedFlagRule, source: str, matched_text: str, confidence: float) -> str:
        """Build the human-readable explanation stored on a match.

        Declared static because it takes no ``self``; call sites invoke it as
        ``self._build_explanation(rule, ...)``.
        """
        base = f"Matched rule '{rule.description or rule.id}' via {source or 'heuristics'}"
        if matched_text:
            base += f" on span \"{matched_text}\""
        return f"{base}. confidence={round(confidence, 2)}"