#!/usr/bin/env python3
"""
Flask Web Frontend for Guardrails System
A sleek, modern ChatGPT-like interface with detailed backend insights
"""

import os
import json
import time
from typing import Dict, Any, List
from flask import Flask, render_template, request, jsonify, session
from werkzeug.utils import secure_filename
from datetime import datetime
import uuid
import tempfile

# Apply performance optimizations early
from llm_clients.performance_utils import apply_all_optimizations
apply_all_optimizations()

from backend import Backend
import config
from english_detector import is_english_by_ascii_letters_only

app = Flask(__name__)
# Use environment variable for secret key in production (HF Spaces)
app.secret_key = os.environ.get('SECRET_KEY', 'guardrails-frontend-secret-key-change-in-production')

# Configure file uploads
app.config['MAX_CONTENT_LENGTH'] = 60 * 1024 * 1024  # 60MB max file size (to accommodate PDFs)
ALLOWED_EXTENSIONS = {'.txt', '.md', '.text', '.rtf', '.pdf', '.docx'}

# Temporary storage for safe attachments (in production, use Redis or database)
safe_attachments = {}
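# Illustrative only: entries in safe_attachments are keyed by the attachment_id returned
# from /api/upload, and (as stored in upload_file() below) each looks roughly like:
#   {
#       "filename": "report.pdf",           # example name, not a real file
#       "content": b"...raw file bytes...",
#       "extension": ".pdf",
#       "analysis": {...}                    # result dict from process_attachment()
#   }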


def allowed_file(filename):
    """Check if the uploaded file has an allowed extension"""
    if '.' not in filename:
        return False
    ext = '.' + filename.rsplit('.', 1)[1].lower()
    return ext in ALLOWED_EXTENSIONS


class DetailedBackend(Backend):
    """Extended backend that returns detailed information for the frontend"""

    def process_request_detailed(self, prompt: str, attachments: List[Dict[str, Any]] = None) -> dict:
        """
        Process request and return detailed information including:
        - AI detection results (confidence, latency, attack type)
        - LLM response
        - Output guardrail results
        - Timestamps and metadata
        """
        start_time = time.time()

        result = {
            "message_id": str(uuid.uuid4()),
            "timestamp": datetime.now().isoformat(),
            "user_prompt": prompt,
            "ai_detection": {},
            "llm_response": {},
            "output_guardrails": {},
            "total_latency_ms": 0,
            "is_safe": True,
            "final_response": ""
        }

        # Step 1: AI Detection (Input Guardrails)
        # Handle translation and classification with detailed logging
        if not self.output_test_mode:
            detection_start = time.time()

            # Check if non-English and translate if needed
            was_translated = False
            translated_prompt = prompt
            original_prompt = prompt

            try:
                # Translate if non-English
                if not is_english_by_ascii_letters_only(prompt):
                    print("🌍 Detected non-English input (web). Translating to English...", flush=True)
                    print(f"   Original text: '{prompt[:100]}...'", flush=True)
                    try:
                        translator_client = self._get_translator_client()
                        translation_start = time.time()
                        translated_prompt = translator_client.generate_content(prompt)
                        translation_time = (time.time() - translation_start) * 1000
                        was_translated = True
                        print(f"   ✅ Translated to English ({translation_time:.1f}ms): '{translated_prompt[:200]}...'", flush=True)
                        print(f"   🔍 Will classify translated text (length: {len(translated_prompt)} chars)", flush=True)
                    except Exception as e:
                        error_msg = str(e)
                        print(f"⚠️ Translation failed: {error_msg}", flush=True)
                        print("   Proceeding with original text (may cause classification issues).", flush=True)
                        # Continue with original - classifier may still work
                        translated_prompt = prompt
                        was_translated = False
                else:
                    print("   ✅ Text is English, no translation needed", flush=True)
                    translated_prompt = prompt

                # Classify with ModernBERT (always on English/translated text)
                print(f"   🔍 Classifying text: '{translated_prompt[:100]}...'", flush=True)
                print(f"      Text length: {len(translated_prompt)} chars, was_translated: {was_translated}", flush=True)
                ai_response = self.attack_detector.generate_content(translated_prompt)
                json_response = self._extract_json_from_response(ai_response)
                ai_result = json.loads(json_response)
                detection_end = time.time()

                safety_status = ai_result.get("safety_status", "unsafe")
                is_safe = safety_status.lower() == "safe"
                confidence = ai_result.get("confidence", 0.0)
                print(f"   📊 Classification result: safety_status='{safety_status}', is_safe={is_safe}, confidence={confidence:.2f}", flush=True)

                result["ai_detection"] = {
                    "is_safe": is_safe,
                    "safety_status": ai_result.get("safety_status", "unknown"),
                    "attack_type": ai_result.get("attack_type", "none"),
                    "confidence": ai_result.get("confidence", 0.0),
                    "reason": ai_result.get("reason", "No reason provided"),
                    "latency_ms": round((detection_end - detection_start) * 1000, 1),
                    "model_used": "zazaman/fmb" + (" (via Qwen translation)" if was_translated else ""),
                    "was_translated": was_translated
                }

                if not is_safe:
                    attack_type = ai_result.get("attack_type", "unknown")
                    confidence = ai_result.get("confidence", 1.0)
                    reason = ai_result.get("reason", "No specific reason provided")
                    latency_ms = result["ai_detection"]["latency_ms"]

                    block_reason = f"🤖 AI Security Scanner: Detected {attack_type} attack (confidence: {confidence:.2f}, latency: {latency_ms}ms). Reason: {reason}"
                    if was_translated:
                        block_reason += " [Original non-English text was translated to English for analysis]"

                    result["is_safe"] = False
                    result["final_response"] = block_reason
                    result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
                    return result

            except Exception as e:
                detection_end = time.time()
                result["ai_detection"] = {
                    "is_safe": False,
                    "error": str(e),
                    "latency_ms": round((detection_end - detection_start) * 1000, 1),
                    "model_used": "zazaman/fmb",
                    "was_translated": was_translated
                }
                result["is_safe"] = False
                result["final_response"] = f"🤖 AI Security Scanner: Error during security analysis: {str(e)}. Request blocked for safety."
                result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
                return result

        # Step 2: LLM Generation
        llm_start = time.time()
        try:
            if config.LLM_PROVIDER == "manual":
                # For manual mode, we'll use a default response for the web interface
                llm_response = f"This is a manual LLM response to: '{prompt}'. In the web interface, manual responses would typically be pre-configured or generated by a real LLM."
            else:
                # Send files to LLM if available (currently only Gemini supports this)
                files_for_llm = None
                if attachments and hasattr(self.llm_client, 'generate_content'):
                    # Check if this LLM client supports files (has overridden the method)
                    try:
                        import inspect
                        sig = inspect.signature(self.llm_client.generate_content)
                        if 'files' in sig.parameters:
                            files_for_llm = attachments
                            print(f"   📎 Sending {len(attachments)} attachment(s) to LLM")
                    except Exception:
                        # Signature introspection failed; fall back to text-only generation
                        pass

                llm_response = self.llm_client.generate_content(prompt, files=files_for_llm)

            llm_end = time.time()
            result["llm_response"] = {
                "content": llm_response,
                "provider": config.LLM_PROVIDER,
                "model": config.LLM_CONFIG.get(config.LLM_PROVIDER, {}).get("model", "unknown"),
                "latency_ms": round((llm_end - llm_start) * 1000, 1),
                "character_count": len(llm_response)
            }
        except Exception as e:
            result["llm_response"] = {
                "error": str(e),
                "latency_ms": round((time.time() - llm_start) * 1000, 1)
            }
            llm_response = f"Error generating response: {str(e)}"

        # Step 3: Output Guardrails
        guardrail_start = time.time()
        processed_response, output_safe = self.output_guardrail_manager.process_complete_output(llm_response)
        guardrail_end = time.time()

        # Analyze what the guardrails did
        pii_detected = processed_response != llm_response

        result["output_guardrails"] = {
            "is_safe": output_safe,
            "original_length": len(llm_response),
            "processed_length": len(processed_response),
            "was_modified": pii_detected,
            "latency_ms": round((guardrail_end - guardrail_start) * 1000, 1),
            "guardrails_active": list(config.OUTPUT_GUARDRAILS_CONFIG.keys()),
            "processing_details": []
        }

        if pii_detected:
            result["output_guardrails"]["processing_details"].append({
                "type": "PII_ANONYMIZATION",
                "description": "Personal information was detected and anonymized",
                "characters_changed": abs(len(processed_response) - len(llm_response))
            })

        if not output_safe:
            result["is_safe"] = False
            result["final_response"] = processed_response  # This would be a block message
        else:
            result["final_response"] = processed_response

        result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
        return result
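
    # Illustrative shape of the dict returned by process_request_detailed (example values, not real output):
    #   {
    #       "message_id": "<uuid>",
    #       "timestamp": "<ISO-8601 timestamp>",
    #       "user_prompt": "...",
    #       "ai_detection": {"is_safe": True, "attack_type": "none", "confidence": 0.97, "latency_ms": 42.0, ...},
    #       "llm_response": {"content": "...", "provider": "...", "model": "...", "latency_ms": 850.3, ...},
    #       "output_guardrails": {"is_safe": True, "was_modified": False, "processing_details": [], ...},
    #       "is_safe": True,
    #       "final_response": "...",
    #       "total_latency_ms": 900.5
    #   }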

    def process_attachment(self, file_path: str, file_content: bytes) -> dict:
        """
        Process an uploaded attachment through attachment guardrails.

        Args:
            file_path: Name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Dict containing attachment analysis results
        """
        start_time = time.time()

        result = {
            "attachment_id": str(uuid.uuid4()),
            "timestamp": datetime.now().isoformat(),
            "filename": file_path,
            "is_safe": True,
            "analysis_time_ms": 0,
            "guardrail_analysis": {}
        }

        try:
            if not self.attachment_guardrail_manager:
                result["is_safe"] = False
                result["error"] = "Attachment guardrails not available"
                return result

            # Process attachment through guardrails
            is_safe, analysis = self.attachment_guardrail_manager.process_attachment(file_path, file_content)

            result["is_safe"] = is_safe
            result["guardrail_analysis"] = analysis
            result["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)

            return result

        except Exception as e:
            result["is_safe"] = False
            result["error"] = f"Error processing attachment: {str(e)}"
            result["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
            return result
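
# Minimal sketch of using DetailedBackend directly (assumption: run outside Flask with the
# same config and model backends available; constructing it loads the underlying models):
#
#   backend = DetailedBackend()
#   details = backend.process_request_detailed("What is the capital of France?")
#   print(details["is_safe"], details["final_response"])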


# Initialize detailed backend
print("Initializing Guardrails Web Interface...")
try:
    detailed_backend = DetailedBackend()
    print("✅ Detailed backend initialized successfully")
except Exception as e:
    print(f"❌ Error initializing detailed backend: {e}")
    print("   Make sure you have all required dependencies installed:")
    print("   pip install flask transformers torch presidio-analyzer presidio-anonymizer")
    detailed_backend = None


@app.route('/')
def index():
    """Main chat interface"""
    return render_template('index.html')


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Handle file uploads and process them through attachment guardrails"""
    if not detailed_backend:
        return jsonify({
            "error": "Backend not initialized",
            "message": "The guardrails system is not available"
        }), 500

    try:
        # Check if file was uploaded
        if 'file' not in request.files:
            return jsonify({"error": "No file uploaded"}), 400

        file = request.files['file']

        # Check if file was selected
        if file.filename == '':
            return jsonify({"error": "No file selected"}), 400

        # Check file extension
        if not allowed_file(file.filename):
            return jsonify({
                "error": f"Unsupported file type. Allowed extensions: {', '.join(ALLOWED_EXTENSIONS)}"
            }), 400

        # Read file content
        file_content = file.read()

        # Process file through attachment guardrails
        result = detailed_backend.process_attachment(file.filename, file_content)

        # If file is safe, store it temporarily for potential use with LLM
        if result.get("is_safe", False):
            attachment_id = result["attachment_id"]
            safe_attachments[attachment_id] = {
                "filename": file.filename,
                "content": file_content,
                "extension": os.path.splitext(file.filename.lower())[1],
                "analysis": result
            }
            result["attachment_id"] = attachment_id
            print(f"   💾 Stored safe attachment: {file.filename} (ID: {attachment_id})")

        return jsonify(result)

    except Exception as e:
        return jsonify({
            "error": str(e),
            "message": "An error occurred while processing the file"
        }), 500


@app.route('/api/chat', methods=['POST'])
def chat():
    """Handle chat messages and return detailed response"""
    if not detailed_backend:
        return jsonify({
            "error": "Backend not initialized",
            "message": "The guardrails system is not available"
        }), 500

    data = request.get_json()
    user_message = data.get('message', '').strip()
    attachments = data.get('attachments', [])  # List of attachment IDs or data

    if not user_message and not attachments:
        return jsonify({"error": "Empty message and no attachments"}), 400

    try:
        # Process attachments first if any
        attachment_results = []
        safe_attachment_files = []
        safe_to_proceed = True

        for attachment in attachments:
            attachment_id = attachment.get("id")
            if attachment_id and attachment_id in safe_attachments:
                stored_attachment = safe_attachments[attachment_id]
                attachment_results.append({
                    "id": attachment_id,
                    "filename": stored_attachment["filename"],
                    "is_safe": True,
                    "analysis": stored_attachment["analysis"]
                })
                # Prepare file for LLM
                safe_attachment_files.append({
                    "filename": stored_attachment["filename"],
                    "content": stored_attachment["content"],
                    "extension": stored_attachment["extension"]
                })
            else:
                # Attachment not found or not safe
                safe_to_proceed = False
                attachment_results.append({
                    "id": attachment_id,
                    "is_safe": False,
                    "error": "Attachment not found or not safe"
                })

        # Process the message with detailed backend only if attachments are safe
        if safe_to_proceed:
            result = detailed_backend.process_request_detailed(user_message, safe_attachment_files if safe_attachment_files else None)
            result["attachments"] = attachment_results

            # Clean up used attachments
            for attachment in attachments:
                attachment_id = attachment.get("id")
                if attachment_id in safe_attachments:
                    del safe_attachments[attachment_id]
        else:
            result = {
                "message_id": str(uuid.uuid4()),
                "timestamp": datetime.now().isoformat(),
                "user_prompt": user_message,
                "is_safe": False,
                "final_response": "Request blocked due to unsafe attachments",
                "attachments": attachment_results,
                "total_latency_ms": 0
            }

        # Store in session for history
        if 'chat_history' not in session:
            session['chat_history'] = []
        session['chat_history'].append(result)

        return jsonify(result)

    except Exception as e:
        return jsonify({
            "error": str(e),
            "message": "An error occurred while processing your message"
        }), 500
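

# Illustrative client flow against the routes above (host/port and file name are assumptions):
#
#   # 1) Upload an attachment; the response includes an attachment_id if the file is safe
#   curl -X POST http://localhost:7860/api/upload -F "file=@report.pdf"
#
#   # 2) Reference that attachment by id when sending a chat message
#   curl -X POST http://localhost:7860/api/chat \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Summarize the attached report", "attachments": [{"id": "<attachment_id>"}]}'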


@app.route('/api/config')
def get_config():
    """Get current system configuration"""
    return jsonify({
        "llm_provider": config.LLM_PROVIDER,
        "ai_detection_enabled": config.AI_DETECTION_MODE["enabled"],
        "model_name": config.AI_DETECTION_MODE["attack_llm_config"].get("model_name", "unknown"),
        "output_guardrails": {
            name: guard_config.get("enabled", False)
            for name, guard_config in config.OUTPUT_GUARDRAILS_CONFIG.items()
        }
    })


@app.route('/api/stats')
def get_stats():
    """Get session statistics"""
    history = session.get('chat_history', [])

    if not history:
        return jsonify({
            "total_messages": 0,
            "avg_latency": 0,
            "blocks_count": 0,
            "pii_anonymizations": 0
        })

    total_messages = len(history)
    total_latency = sum(msg.get('total_latency_ms', 0) for msg in history)
    avg_latency = round(total_latency / total_messages, 1) if total_messages > 0 else 0
    blocks_count = sum(1 for msg in history if not msg.get('is_safe', True))
    pii_count = sum(1 for msg in history if msg.get('output_guardrails', {}).get('was_modified', False))

    return jsonify({
        "total_messages": total_messages,
        "avg_latency": avg_latency,
        "blocks_count": blocks_count,
        "pii_anonymizations": pii_count
    })


if __name__ == '__main__':
    print("=" * 60)
    print("🌐 Guardrails Web Interface")
    print("🔒 AI-powered attack detection with sleek UI")
    print("=" * 60)

    # Check if running on HF Spaces or locally
    port = int(os.environ.get('PORT', 7860))
    host = '0.0.0.0'  # Accept connections from any IP
    debug_mode = os.environ.get('DEBUG', 'false').lower() == 'true'

    if port == 7860:
        print("🚀 Starting server for Hugging Face Spaces at http://0.0.0.0:7860")
    else:
        print(f"🚀 Starting server at http://{host}:{port}")

    print("💡 Press Ctrl+C to stop the server")
    print("=" * 60)

    app.run(debug=debug_mode, host=host, port=port)
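

# Illustrative local run (file name and environment are assumptions; PORT and DEBUG are read above):
#
#   pip install flask transformers torch presidio-analyzer presidio-anonymizer
#   DEBUG=true PORT=8000 python app.py
#
# The chat UI is then served at http://localhost:8000, with /api/config and /api/stats
# available for configuration and per-session statistics.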