IntegraChat / backend /api /services /redflag_detector.py
nothingworry's picture
improve admin page & backend
4dc66a3
raw
history blame
9.92 kB
# =============================================================
# File: backend/api/services/redflag_detector.py
# =============================================================
"""
Enterprise RedFlagDetector
- Loads per-tenant rules from Supabase REST (or you can swap to Postgres direct) and from the local RulesStore
- Caches rules per tenant with TTL
- Performs keyword, regex, and embedding-based semantic matching
- Returns structured match objects with severity and rule metadata
- Sends notifications to Admin MCP or a webhook
"""
import hashlib
import json
import logging
import os
import re
import time
from typing import List, Dict, Any, Optional, Tuple

import httpx

from ..models.redflag import RedFlagRule, RedFlagMatch
from ..storage.rules_store import RulesStore
from .semantic_encoder import embed_text, cosine_similarity
class RedFlagDetector:
    """Detect per-tenant red-flag rule violations in free text.

    Rules are gathered from the Supabase REST table ``redflag_rules`` (when a
    URL and key are configured) and from the local ``RulesStore``.  They are
    cached per tenant for ``cache_ttl`` seconds, together with one embedding
    per rule.  ``check`` evaluates every enabled rule with a keyword
    substring test, then a case-insensitive regex search, and finally a
    similarity comparison between the text embedding and the rule embedding.
    """

    # Score reported for a keyword substring hit.
    KEYWORD_SCORE = 0.8
    # Score reported for a regex hit.
    REGEX_SCORE = 1.0
    # Minimum similarity for a rule to match on semantics alone.
    SEMANTIC_THRESHOLD = 0.82

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        supabase_url: Optional[str] = None,
        supabase_key: Optional[str] = None,
        admin_mcp_url: Optional[str] = None,
        cache_ttl: int = 300,
        rules_store: Optional[RulesStore] = None,
    ):
        """Configure rule sources, the notification target and the caches.

        Args:
            supabase_url: Supabase base URL; falls back to ``SUPABASE_URL``.
            supabase_key: Supabase key; falls back to ``SUPABASE_SERVICE_KEY``.
            admin_mcp_url: Admin MCP base URL; falls back to ``ADMIN_MCP_URL``.
            cache_ttl: Seconds a tenant's rule list stays cached.
            rules_store: Local rule store; a new ``RulesStore`` is created
                when none is given.
        """
        self.supabase_url = supabase_url or os.getenv("SUPABASE_URL")
        self.supabase_key = supabase_key or os.getenv("SUPABASE_SERVICE_KEY")
        self.admin_mcp_url = admin_mcp_url or os.getenv("ADMIN_MCP_URL")
        self.cache_ttl = cache_ttl
        # Explicit None check so a caller-supplied store that happens to be
        # falsy is not silently replaced by a fresh one.
        self.rules_store = rules_store if rules_store is not None else RulesStore()
        # tenant_id -> {"fetched_at": ts, "rules": [...]}
        self._rules_cache: Dict[str, Dict[str, Any]] = {}
        # tenant_id -> {rule_id: embedding vector}
        self._rule_embeddings: Dict[str, Dict[str, List[float]]] = {}
        self._client = httpx.AsyncClient(timeout=15)

    @staticmethod
    def _normalize_keywords(raw: Any) -> List[str]:
        """Coerce a raw ``keywords`` column value into a list of strings.

        A string value is decoded as JSON first.  A bare string result is
        wrapped in a list so it is never iterated character by character.
        Non-string items and values of any other shape are dropped.
        """
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except ValueError:
                return []
        if isinstance(raw, str):
            return [raw] if raw else []
        if isinstance(raw, (list, tuple)):
            return [kw for kw in raw if isinstance(kw, str) and kw]
        return []

    async def _fetch_rules_from_supabase(self, tenant_id: str) -> List[RedFlagRule]:
        """Fetch the tenant's rows from the ``redflag_rules`` REST table.

        Expected columns: id, tenant_id, pattern, description, severity,
        source, enabled, keywords (json array).

        Returns:
            The rows converted to ``RedFlagRule`` objects, or an empty list
            when the Supabase URL or key is missing.

        Raises:
            httpx.HTTPStatusError: via ``raise_for_status`` on an error
                response.
        """
        if not self.supabase_url or not self.supabase_key:
            return []
        url = self.supabase_url.rstrip("/") + "/rest/v1/redflag_rules"
        headers = {"apikey": self.supabase_key, "Authorization": f"Bearer {self.supabase_key}"}
        params = {"tenant_id": f"eq.{tenant_id}", "select": "*"}
        response = await self._client.get(url, headers=headers, params=params)
        response.raise_for_status()

        rules: List[RedFlagRule] = []
        for row in response.json():
            try:
                rules.append(
                    RedFlagRule(
                        id=str(row.get("id")),
                        pattern=row.get("pattern") or "",
                        description=row.get("description") or "",
                        severity=row.get("severity") or "medium",
                        source=row.get("source") or "admin",
                        enabled=row.get("enabled", True),
                        keywords=self._normalize_keywords(row.get("keywords")),
                    )
                )
            except Exception:
                # Skip invalid rows defensively, but leave a trace.
                self._log.warning(
                    "Skipping invalid red-flag rule row for tenant %s",
                    tenant_id,
                    exc_info=True,
                )
        return rules

    async def load_rules(self, tenant_id: str) -> List[RedFlagRule]:
        """Return the tenant's rules, refreshing the cache once the TTL expires.

        A failed Supabase fetch is logged and treated as "no remote rules";
        local rules are still loaded.  After a refresh, one embedding per rule
        is stored for semantic scoring; a rule whose embedding fails gets an
        empty vector.
        """
        now = int(time.time())
        entry = self._rules_cache.get(tenant_id)
        if entry and now - entry["fetched_at"] < self.cache_ttl:
            return entry["rules"]

        remote_rules: List[RedFlagRule] = []
        if self.supabase_url and self.supabase_key:
            try:
                remote_rules = await self._fetch_rules_from_supabase(tenant_id)
            except Exception:
                self._log.warning(
                    "Could not fetch red-flag rules from Supabase for tenant %s",
                    tenant_id,
                    exc_info=True,
                )
                remote_rules = []
        rules: List[RedFlagRule] = [*remote_rules, *self._fetch_local_rules(tenant_id)]
        self._rules_cache[tenant_id] = {"fetched_at": now, "rules": rules}

        # Pre-compute embeddings for semantic scoring.
        embed_map: Dict[str, List[float]] = {}
        for rule in rules:
            try:
                text_for_embedding = " ".join(
                    piece for piece in (rule.description, rule.pattern) if piece
                ).strip() or rule.id
                embed_map[rule.id] = embed_text(text_for_embedding)
            except Exception:
                self._log.warning(
                    "Could not embed red-flag rule %s", rule.id, exc_info=True
                )
                embed_map[rule.id] = []
        self._rule_embeddings[tenant_id] = embed_map
        return rules

    def _fetch_local_rules(self, tenant_id: str) -> List[RedFlagRule]:
        """Build rules from the raw text entries held in the local store.

        Each non-blank entry becomes a high-severity rule whose id is the
        SHA-1 of ``"{tenant_id}:{text}"``.  Entries of at most six words are
        also registered as a lowercase keyword.
        """
        if self.rules_store is None:
            return []
        rules: List[RedFlagRule] = []
        for raw in self.rules_store.get_rules(tenant_id):
            text = (raw or "").strip()
            if not text:
                continue
            rule_id = hashlib.sha1(f"{tenant_id}:{text}".encode()).hexdigest()
            rules.append(
                RedFlagRule(
                    id=rule_id,
                    pattern=text,
                    description=text,
                    severity="high",
                    source="admin_local",
                    enabled=True,
                    keywords=[text.lower()] if len(text.split()) <= 6 else [],
                )
            )
        return rules

    @classmethod
    def _match_rule(cls, rule: RedFlagRule, text: str, text_lower: str) -> Tuple[str, str, float]:
        """Run the keyword and regex checks for one rule.

        Returns:
            ``(matched_text, source, score)`` where ``source`` is
            ``"keyword"``, ``"regex"`` or ``""`` when nothing matched.  An
            invalid regex is logged and counts as no match.
        """
        # 1) Keyword quick-check (cheap).
        for kw in (rule.keywords or []):
            if isinstance(kw, str) and kw and kw.lower() in text_lower:
                return kw, "keyword", cls.KEYWORD_SCORE
        # 2) Regex check (more precise).
        if rule.pattern:
            try:
                found = re.search(rule.pattern, text, re.IGNORECASE)
            except re.error:
                cls._log.warning("Invalid regex in red-flag rule %s", rule.id)
                return "", "", 0.0
            if found:
                return found.group(0), "regex", cls.REGEX_SCORE
        return "", "", 0.0

    async def check(self, tenant_id: str, text: str) -> List[RedFlagMatch]:
        """Return structured matches for the given tenant and text.

        A rule matches through a keyword or regex hit, or when its semantic
        score reaches ``SEMANTIC_THRESHOLD``.  If the text cannot be embedded
        the failure is logged and only keyword/regex matching is applied.
        """
        if not text:
            return []
        rules = await self.load_rules(tenant_id)
        if not rules:
            # Nothing to compare against, so skip embedding the text.
            return []
        try:
            text_vector = embed_text(text)
        except Exception:
            self._log.warning(
                "Could not embed text for tenant %s; semantic matching disabled",
                tenant_id,
                exc_info=True,
            )
            text_vector = []
        text_lower = text.lower()

        matches: List[RedFlagMatch] = []
        for rule in rules:
            if not rule.enabled:
                continue
            matched_text, source, direct_score = self._match_rule(rule, text, text_lower)
            semantic_score = self._semantic_score(tenant_id, rule.id, text_vector)
            if source:
                confidence = max(semantic_score, direct_score)
            elif semantic_score >= self.SEMANTIC_THRESHOLD:
                source = "semantic"
                confidence = semantic_score
            else:
                continue
            matches.append(
                RedFlagMatch(
                    rule_id=rule.id,
                    pattern=rule.pattern,
                    severity=rule.severity,
                    description=rule.description,
                    matched_text=matched_text,
                    confidence=round(confidence, 2),
                    explanation=self._build_explanation(rule, source, matched_text, confidence),
                )
            )
        return matches

    async def notify_admin(self, tenant_id: str, violations: List[RedFlagMatch], source_payload: Optional[Dict[str, Any]] = None) -> None:
        """Notify the Admin MCP server (or a webhook) about the matches.

        Posts to ``{admin_mcp_url}/alert`` when configured, and to the URL in
        the ``ALERT_WEBHOOK`` environment variable when set.  Delivery
        failures are logged and never raised.
        """
        payload = {
            "tenant_id": tenant_id,
            "violations": [v.__dict__ for v in violations],
            "source": source_payload or {},
        }
        # 1) POST to Admin MCP /alert if configured.
        if self.admin_mcp_url:
            try:
                await self._client.post(self.admin_mcp_url.rstrip("/") + "/alert", json=payload, timeout=10)
            except Exception:
                # Notifications should not crash orchestration.
                self._log.warning(
                    "Admin MCP alert failed for tenant %s", tenant_id, exc_info=True
                )
        # 2) Optionally send to a Slack/Teams webhook.
        webhook = os.getenv("ALERT_WEBHOOK")
        if webhook:
            try:
                await self._client.post(webhook, json={"text": f"Red-flag for tenant {tenant_id}", "details": payload}, timeout=10)
            except Exception:
                self._log.warning(
                    "Webhook alert failed for tenant %s", tenant_id, exc_info=True
                )

    async def close(self):
        """Close the shared HTTP client."""
        await self._client.aclose()

    def _semantic_score(self, tenant_id: str, rule_id: str, text_vector: List[float]) -> float:
        """Return the similarity between a cached rule embedding and the text.

        Returns 0.0 when either vector is missing or empty.
        """
        rule_vector = self._rule_embeddings.get(tenant_id, {}).get(rule_id)
        if not rule_vector or not text_vector:
            return 0.0
        return cosine_similarity(rule_vector, text_vector)

    @staticmethod
    def _build_explanation(rule: RedFlagRule, source: str, matched_text: str, confidence: float) -> str:
        """Compose the human-readable explanation attached to a match."""
        base = f"Matched rule '{rule.description or rule.id}' via {source or 'heuristics'}"
        if matched_text:
            base += f" on span \"{matched_text}\""
        return f"{base}. confidence={round(confidence, 2)}"