magicpin-vera-bot / security.py
SmartKapila's picture
Made some final touches integrating LLMs running and verifying judging script and making README as per requirements
abb7bf9
"""
security.py — Vera Message Engine
Prompt Guard middleware using Groq's meta-llama/llama-prompt-guard-2-86m.
Pillar 4: Security Shield
- Every inbound /v1/reply message passes through Prompt Guard BEFORE
touching the DB or invoking any main LLM.
- On injection detection → return {"action": "end", "rationale": "Security violation detected."}
- Fail-open on API errors (logged) to avoid blocking legitimate requests.
"""
import os
import json
import logging
import requests
from typing import Dict, Any
# Dedicated logger so security events can be filtered separately from the
# rest of the application's log output.
logger = logging.getLogger("vera.security")

# Groq connection settings. The API key and guard model are read from the
# environment once, at import time; the endpoint URL is fixed.
GROQ_CHAT_URL = "https://api.groq.com/openai/v1/chat/completions"
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
GUARD_MODEL = os.environ.get("GUARD_MODEL", "meta-llama/llama-prompt-guard-2-86m")
# Numeric guard scores at or above this value are treated as an injection attempt.
_INJECTION_SCORE_THRESHOLD = 0.5

# Verdict vocabulary, matched as whole words/phrases rather than substrings.
_SAFE_LABELS = ("safe", "benign", "clean", "no injection", "legitimate", "no")
_INJECTION_LABELS = ("unsafe", "injection", "jailbreak", "malicious", "attack", "yes")


def _guard_output_is_safe(guard_output: str) -> bool:
    """Interpret the guard model's reply; return False only on a clear injection verdict."""
    verdict = guard_output.strip().lower()

    # NOTE(review): Groq's hosted Prompt Guard 2 appears to answer with a bare
    # probability-like score (higher = more likely malicious) rather than a
    # word label - confirm against the current API docs. A reply that parses
    # as a number in [0, 1] is therefore compared against the threshold.
    try:
        score = float(verdict)
    except ValueError:
        score = None
    if score is not None and 0.0 <= score <= 1.0:
        return score < _INJECTION_SCORE_THRESHOLD

    # Normalise to space-padded whole words so that "safe" can no longer match
    # inside "unsafe" (the substring test used previously let every "unsafe"
    # verdict through as safe).
    words = "".join(ch if ch.isalnum() else " " for ch in verdict).split()
    padded = " " + " ".join(words) + " "

    def mentions(label: str) -> bool:
        return f" {label} " in padded

    # Safe labels keep precedence over injection labels, so a phrase such as
    # "no injection" is still read as safe.
    if any(mentions(label) for label in _SAFE_LABELS):
        return True
    if any(mentions(label) for label in _INJECTION_LABELS):
        return False
    return True  # Ambiguous -> fail open


def check_prompt_injection(text: str) -> bool:
    """
    Run inbound text through Groq Prompt Guard.

    Args:
        text: The raw inbound message to screen.

    Returns:
        True if the text is considered SAFE, False if the guard model flags an
        injection. Empty/blank text is safe. The check fails open (returns
        True) when the API key is missing, the request times out or errors,
        the API answers with a non-200 status, or the verdict is ambiguous.
    """
    if not text or not text.strip():
        return True  # Empty is safe
    if not GROQ_API_KEY:
        logger.warning("GROQ_API_KEY not set — prompt guard DISABLED (fail-open)")
        return True
    try:
        resp = requests.post(
            GROQ_CHAT_URL,
            headers={
                "Authorization": f"Bearer {GROQ_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "model": GUARD_MODEL,
                "messages": [{"role": "user", "content": text}],
                "temperature": 0.0,
                "max_tokens": 32,
            },
            timeout=8,
        )
        if resp.status_code != 200:
            logger.error("Prompt Guard returned %d: %s", resp.status_code, resp.text[:300])
            return True  # Fail open

        data = resp.json()
        # Tolerate an empty "choices" list or null message/content instead of
        # relying on the broad except below to absorb the resulting error.
        choices = data.get("choices") or [{}]
        message = choices[0].get("message") or {}
        guard_output = (message.get("content") or "").strip().lower()

        if _guard_output_is_safe(guard_output):
            return True
        logger.warning("INJECTION DETECTED: %s → guard said: %s", text[:100], guard_output)
        return False
    except requests.Timeout:
        logger.warning("Prompt Guard timed out — fail-open")
        return True
    except Exception as e:
        # Deliberate catch-all: the guard must never block legitimate traffic
        # because of its own failure, so any unexpected error fails open.
        logger.error("Prompt Guard error: %s", str(e))
        return True
def injection_response() -> Dict[str, Any]:
    """Build the fixed reply sent when a security violation is detected (judge contract)."""
    # A new dict is created on every call so callers may mutate the result freely.
    payload: Dict[str, Any] = dict(
        action="end",
        rationale="Security violation detected.",
    )
    return payload