#!/usr/bin/env python3
"""
Flask Web Frontend for Guardrails System
A sleek, modern ChatGPT-like interface with detailed backend insights
"""
import os
import json
import time
from typing import Dict, Any, List
from flask import Flask, render_template, request, jsonify, session
from werkzeug.utils import secure_filename
from datetime import datetime
import uuid
import tempfile
# Apply performance optimizations early
from llm_clients.performance_utils import apply_all_optimizations
apply_all_optimizations()
from backend import Backend
import config
from english_detector import is_english_by_ascii_letters_only
app = Flask(__name__)
# Use environment variable for secret key in production (HF Spaces)
app.secret_key = os.environ.get('SECRET_KEY', 'guardrails-frontend-secret-key-change-in-production')
# Configure file uploads
app.config['MAX_CONTENT_LENGTH'] = 60 * 1024 * 1024 # 60MB max file size (to accommodate PDFs)
ALLOWED_EXTENSIONS = {'.txt', '.md', '.text', '.rtf', '.pdf', '.docx'}
# Temporary storage for safe attachments (in production, use Redis or database)
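# Keyed by attachment_id; each entry holds {"filename", "content", "extension", "analysis"} (see /api/upload)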
safe_attachments = {}
def allowed_file(filename):
"""Check if the uploaded file has an allowed extension"""
if '.' not in filename:
return False
ext = '.' + filename.rsplit('.', 1)[1].lower()
return ext in ALLOWED_EXTENSIONS
class DetailedBackend(Backend):
"""Extended backend that returns detailed information for the frontend"""
def process_request_detailed(self, prompt: str, attachments: List[Dict[str, Any]] = None) -> dict:
"""
Process request and return detailed information including:
- AI detection results (confidence, latency, attack type)
- LLM response
- Output guardrail results
- Timestamps and metadata
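        Attachments, when provided, are dicts with "filename", "content" (raw bytes), and "extension" keys.
        The returned dict is shaped roughly like:
            {"message_id": ..., "is_safe": bool, "ai_detection": {...}, "llm_response": {...},
             "output_guardrails": {...}, "final_response": str, "total_latency_ms": float}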
"""
start_time = time.time()
result = {
"message_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"user_prompt": prompt,
"ai_detection": {},
"llm_response": {},
"output_guardrails": {},
"total_latency_ms": 0,
"is_safe": True,
"final_response": ""
}
# Step 1: AI Detection (Input Guardrails)
# Handle translation and classification with detailed logging
if not self.output_test_mode:
detection_start = time.time()
# Check if non-English and translate if needed
was_translated = False
translated_prompt = prompt
original_prompt = prompt
try:
# Translate if non-English
if not is_english_by_ascii_letters_only(prompt):
print("🌍 Detected non-English input (web). Translating to English...", flush=True)
print(f" Original text: '{prompt[:100]}...'", flush=True)
try:
translator_client = self._get_translator_client()
translation_start = time.time()
translated_prompt = translator_client.generate_content(prompt)
translation_time = (time.time() - translation_start) * 1000
was_translated = True
print(f" βœ… Translated to English ({translation_time:.1f}ms): '{translated_prompt[:200]}...'", flush=True)
print(f" πŸ” Will classify translated text (length: {len(translated_prompt)} chars)", flush=True)
except Exception as e:
error_msg = str(e)
print(f"⚠️ Translation failed: {error_msg}", flush=True)
print(f" Proceeding with original text (may cause classification issues).", flush=True)
# Continue with original - classifier may still work
translated_prompt = prompt
was_translated = False
else:
print(f" βœ… Text is English, no translation needed", flush=True)
translated_prompt = prompt
# Classify with ModernBERT (always on English/translated text)
print(f" πŸ” Classifying text: '{translated_prompt[:100]}...'", flush=True)
print(f" Text length: {len(translated_prompt)} chars, was_translated: {was_translated}", flush=True)
ai_response = self.attack_detector.generate_content(translated_prompt)
json_response = self._extract_json_from_response(ai_response)
ai_result = json.loads(json_response)
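                # ai_result is expected to contain "safety_status", "attack_type", "confidence", and "reason" (all read below)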
detection_end = time.time()
safety_status = ai_result.get("safety_status", "unsafe")
is_safe = safety_status.lower() == "safe"
confidence = ai_result.get("confidence", 0.0)
print(f" πŸ“Š Classification result: safety_status='{safety_status}', is_safe={is_safe}, confidence={confidence:.2f}", flush=True)
result["ai_detection"] = {
"is_safe": is_safe,
"safety_status": ai_result.get("safety_status", "unknown"),
"attack_type": ai_result.get("attack_type", "none"),
"confidence": ai_result.get("confidence", 0.0),
"reason": ai_result.get("reason", "No reason provided"),
"latency_ms": round((detection_end - detection_start) * 1000, 1),
"model_used": "zazaman/fmb" + (" (via Qwen translation)" if was_translated else ""),
"was_translated": was_translated
}
if not is_safe:
attack_type = ai_result.get("attack_type", "unknown")
confidence = ai_result.get("confidence", 1.0)
reason = ai_result.get("reason", "No specific reason provided")
latency_ms = result["ai_detection"]["latency_ms"]
block_reason = f"πŸ€– AI Security Scanner: Detected {attack_type} attack (confidence: {confidence:.2f}, latency: {latency_ms}ms). Reason: {reason}"
if was_translated:
block_reason += " [Original non-English text was translated to English for analysis]"
result["is_safe"] = False
result["final_response"] = block_reason
result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
return result
except Exception as e:
detection_end = time.time()
result["ai_detection"] = {
"is_safe": False,
"error": str(e),
"latency_ms": round((detection_end - detection_start) * 1000, 1),
"model_used": "zazaman/fmb",
"was_translated": was_translated
}
result["is_safe"] = False
result["final_response"] = f"πŸ€– AI Security Scanner: Error during security analysis: {str(e)}. Request blocked for safety."
result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
return result
# Step 2: LLM Generation
llm_start = time.time()
try:
if config.LLM_PROVIDER == "manual":
# For manual mode, we'll use a default response for the web interface
llm_response = f"This is a manual LLM response to: '{prompt}'. In the web interface, manual responses would typically be pre-configured or generated by a real LLM."
else:
# Send files to LLM if available (currently only Gemini supports this)
files_for_llm = None
if attachments and hasattr(self.llm_client, 'generate_content'):
# Check if this LLM client supports files (has overridden the method)
try:
import inspect
sig = inspect.signature(self.llm_client.generate_content)
if 'files' in sig.parameters:
files_for_llm = attachments
print(f" πŸ“Ž Sending {len(attachments)} attachment(s) to LLM")
                    except Exception:
                        # If signature inspection fails, fall back to sending no files
                        pass
llm_response = self.llm_client.generate_content(prompt, files=files_for_llm)
llm_end = time.time()
result["llm_response"] = {
"content": llm_response,
"provider": config.LLM_PROVIDER,
"model": config.LLM_CONFIG.get(config.LLM_PROVIDER, {}).get("model", "unknown"),
"latency_ms": round((llm_end - llm_start) * 1000, 1),
"character_count": len(llm_response)
}
except Exception as e:
result["llm_response"] = {
"error": str(e),
"latency_ms": round((time.time() - llm_start) * 1000, 1)
}
llm_response = f"Error generating response: {str(e)}"
# Step 3: Output Guardrails
guardrail_start = time.time()
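        # process_complete_output returns a (processed_text, is_safe) tuple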
processed_response, output_safe = self.output_guardrail_manager.process_complete_output(llm_response)
guardrail_end = time.time()
        # Analyze what the guardrails did; any change to the output text is attributed to PII anonymization below
pii_detected = processed_response != llm_response
result["output_guardrails"] = {
"is_safe": output_safe,
"original_length": len(llm_response),
"processed_length": len(processed_response),
"was_modified": pii_detected,
"latency_ms": round((guardrail_end - guardrail_start) * 1000, 1),
"guardrails_active": list(config.OUTPUT_GUARDRAILS_CONFIG.keys()),
"processing_details": []
}
if pii_detected:
result["output_guardrails"]["processing_details"].append({
"type": "PII_ANONYMIZATION",
"description": "Personal information was detected and anonymized",
"characters_changed": abs(len(processed_response) - len(llm_response))
})
if not output_safe:
result["is_safe"] = False
result["final_response"] = processed_response # This would be a block message
else:
result["final_response"] = processed_response
result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
return result
def process_attachment(self, file_path: str, file_content: bytes) -> dict:
"""
Process an uploaded attachment through attachment guardrails.
Args:
file_path: Name of the uploaded file
file_content: Raw bytes content of the file
Returns:
Dict containing attachment analysis results
"""
start_time = time.time()
result = {
"attachment_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"filename": file_path,
"is_safe": True,
"analysis_time_ms": 0,
"guardrail_analysis": {}
}
try:
if not self.attachment_guardrail_manager:
result["is_safe"] = False
result["error"] = "Attachment guardrails not available"
return result
# Process attachment through guardrails
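            # The guardrail manager returns an (is_safe, analysis_dict) tuple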
is_safe, analysis = self.attachment_guardrail_manager.process_attachment(file_path, file_content)
result["is_safe"] = is_safe
result["guardrail_analysis"] = analysis
result["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
return result
except Exception as e:
result["is_safe"] = False
result["error"] = f"Error processing attachment: {str(e)}"
result["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
return result
# Initialize detailed backend
print("Initializing Guardrails Web Interface...")
try:
detailed_backend = DetailedBackend()
print("βœ… Detailed backend initialized successfully")
except Exception as e:
print(f"❌ Error initializing detailed backend: {e}")
print(" Make sure you have all required dependencies installed:")
print(" pip install flask transformers torch presidio-analyzer presidio-anonymizer")
detailed_backend = None
@app.route('/')
def index():
"""Main chat interface"""
return render_template('index.html')
@app.route('/api/upload', methods=['POST'])
def upload_file():
"""Handle file uploads and process them through attachment guardrails"""
if not detailed_backend:
return jsonify({
"error": "Backend not initialized",
"message": "The guardrails system is not available"
}), 500
try:
# Check if file was uploaded
if 'file' not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files['file']
# Check if file was selected
if file.filename == '':
return jsonify({"error": "No file selected"}), 400
# Check file extension
if not allowed_file(file.filename):
return jsonify({
"error": f"Unsupported file type. Allowed extensions: {', '.join(ALLOWED_EXTENSIONS)}"
}), 400
# Read file content
file_content = file.read()
# Process file through attachment guardrails
result = detailed_backend.process_attachment(file.filename, file_content)
# If file is safe, store it temporarily for potential use with LLM
if result.get("is_safe", False):
attachment_id = result["attachment_id"]
safe_attachments[attachment_id] = {
"filename": file.filename,
"content": file_content,
"extension": os.path.splitext(file.filename.lower())[1],
"analysis": result
}
result["attachment_id"] = attachment_id
print(f" πŸ’Ύ Stored safe attachment: {file.filename} (ID: {attachment_id})")
return jsonify(result)
except Exception as e:
return jsonify({
"error": str(e),
"message": "An error occurred while processing the file"
}), 500
@app.route('/api/chat', methods=['POST'])
def chat():
"""Handle chat messages and return detailed response"""
if not detailed_backend:
return jsonify({
"error": "Backend not initialized",
"message": "The guardrails system is not available"
}), 500
data = request.get_json()
user_message = data.get('message', '').strip()
attachments = data.get('attachments', []) # List of attachment IDs or data
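    # Expected JSON body, e.g. {"message": "...", "attachments": [{"id": "<attachment_id>"}]}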
if not user_message and not attachments:
return jsonify({"error": "Empty message and no attachments"}), 400
try:
# Process attachments first if any
attachment_results = []
safe_attachment_files = []
safe_to_proceed = True
for attachment in attachments:
attachment_id = attachment.get("id")
if attachment_id and attachment_id in safe_attachments:
stored_attachment = safe_attachments[attachment_id]
attachment_results.append({
"id": attachment_id,
"filename": stored_attachment["filename"],
"is_safe": True,
"analysis": stored_attachment["analysis"]
})
# Prepare file for LLM
safe_attachment_files.append({
"filename": stored_attachment["filename"],
"content": stored_attachment["content"],
"extension": stored_attachment["extension"]
})
else:
# Attachment not found or not safe
safe_to_proceed = False
attachment_results.append({
"id": attachment_id,
"is_safe": False,
"error": "Attachment not found or not safe"
})
# Process the message with detailed backend only if attachments are safe
if safe_to_proceed:
result = detailed_backend.process_request_detailed(user_message, safe_attachment_files if safe_attachment_files else None)
result["attachments"] = attachment_results
# Clean up used attachments
for attachment in attachments:
attachment_id = attachment.get("id")
if attachment_id in safe_attachments:
del safe_attachments[attachment_id]
else:
result = {
"message_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"user_prompt": user_message,
"is_safe": False,
"final_response": "Request blocked due to unsafe attachments",
"attachments": attachment_results,
"total_latency_ms": 0
}
# Store in session for history
if 'chat_history' not in session:
session['chat_history'] = []
        session['chat_history'].append(result)
        # Flask only auto-detects key reassignment, so flag the in-place list mutation explicitly
        session.modified = True
return jsonify(result)
except Exception as e:
return jsonify({
"error": str(e),
"message": "An error occurred while processing your message"
}), 500
@app.route('/api/config')
def get_config():
"""Get current system configuration"""
return jsonify({
"llm_provider": config.LLM_PROVIDER,
"ai_detection_enabled": config.AI_DETECTION_MODE["enabled"],
"model_name": config.AI_DETECTION_MODE["attack_llm_config"].get("model_name", "unknown"),
"output_guardrails": {
name: guard_config.get("enabled", False)
for name, guard_config in config.OUTPUT_GUARDRAILS_CONFIG.items()
}
})
@app.route('/api/stats')
def get_stats():
"""Get session statistics"""
history = session.get('chat_history', [])
if not history:
return jsonify({
"total_messages": 0,
"avg_latency": 0,
"blocks_count": 0,
"pii_anonymizations": 0
})
total_messages = len(history)
total_latency = sum(msg.get('total_latency_ms', 0) for msg in history)
avg_latency = round(total_latency / total_messages, 1) if total_messages > 0 else 0
blocks_count = sum(1 for msg in history if not msg.get('is_safe', True))
pii_count = sum(1 for msg in history
if msg.get('output_guardrails', {}).get('was_modified', False))
return jsonify({
"total_messages": total_messages,
"avg_latency": avg_latency,
"blocks_count": blocks_count,
"pii_anonymizations": pii_count
})
if __name__ == '__main__':
print("="*60)
print("🌐 Guardrails Web Interface")
print("πŸ”’ AI-powered attack detection with sleek UI")
print("="*60)
# Check if running on HF Spaces or locally
port = int(os.environ.get('PORT', 7860))
host = '0.0.0.0' # Accept connections from any IP
debug_mode = os.environ.get('DEBUG', 'false').lower() == 'true'
if port == 7860:
print("πŸš€ Starting server for Hugging Face Spaces at http://0.0.0.0:7860")
else:
print(f"πŸš€ Starting server at http://{host}:{port}")
print("πŸ’‘ Press Ctrl+C to stop the server")
print("="*60)
app.run(debug=debug_mode, host=host, port=port)