Spaces:
Running
Running
Made some final touches integrating LLMs running and verifying judging script and making README as per requirements
abb7bf9 | """ | |
| security.py β Vera Message Engine | |
| Prompt Guard middleware using Groq's meta-llama/llama-prompt-guard-2-86m. | |
| Pillar 4: Security Shield | |
| - Every inbound /v1/reply message passes through Prompt Guard BEFORE | |
| touching the DB or invoking any main LLM. | |
| - On injection detection β return {"action": "end", "rationale": "Security violation detected."} | |
| - Fail-open on API errors (logged) to avoid blocking legitimate requests. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import requests | |
| from typing import Dict, Any | |
| logger = logging.getLogger("vera.security") | |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY", "") | |
| GROQ_CHAT_URL = "https://api.groq.com/openai/v1/chat/completions" | |
| GUARD_MODEL = os.getenv("GUARD_MODEL", "meta-llama/llama-prompt-guard-2-86m") | |
| def check_prompt_injection(text: str) -> bool: | |
| """ | |
| Run inbound text through Groq Prompt Guard. | |
| Returns True if the text is SAFE, False if injection detected. | |
| """ | |
| if not text or not text.strip(): | |
| return True # Empty is safe | |
| if not GROQ_API_KEY: | |
| logger.warning("GROQ_API_KEY not set β prompt guard DISABLED (fail-open)") | |
| return True | |
| try: | |
| resp = requests.post( | |
| GROQ_CHAT_URL, | |
| headers={ | |
| "Authorization": f"Bearer {GROQ_API_KEY}", | |
| "Content-Type": "application/json", | |
| }, | |
| json={ | |
| "model": GUARD_MODEL, | |
| "messages": [{"role": "user", "content": text}], | |
| "temperature": 0.0, | |
| "max_tokens": 32, | |
| }, | |
| timeout=8, | |
| ) | |
| if resp.status_code != 200: | |
| logger.error("Prompt Guard returned %d: %s", resp.status_code, resp.text[:300]) | |
| return True # Fail open | |
| data = resp.json() | |
| guard_output = ( | |
| data.get("choices", [{}])[0] | |
| .get("message", {}) | |
| .get("content", "") | |
| .strip() | |
| .lower() | |
| ) | |
| # Check for injection indicators | |
| injection_words = ["unsafe", "injection", "jailbreak", "malicious", "attack", "yes"] | |
| safe_words = ["safe", "benign", "clean", "no injection", "legitimate", "no"] | |
| for w in safe_words: | |
| if w in guard_output: | |
| return True | |
| for w in injection_words: | |
| if w in guard_output: | |
| logger.warning("INJECTION DETECTED: %s β guard said: %s", text[:100], guard_output) | |
| return False | |
| return True # Ambiguous β fail open | |
| except requests.Timeout: | |
| logger.warning("Prompt Guard timed out β fail-open") | |
| return True | |
| except Exception as e: | |
| logger.error("Prompt Guard error: %s", str(e)) | |
| return True | |
| def injection_response() -> Dict[str, Any]: | |
| """Return the security violation response per judge contract.""" | |
| return { | |
| "action": "end", | |
| "rationale": "Security violation detected.", | |
| } | |