#!/usr/bin/env python3
"""
Flask Web Frontend for Guardrails System
A sleek, modern ChatGPT-like interface with detailed backend insights
"""
import os
import json
import time
from typing import Dict, Any, List, Optional
from flask import Flask, render_template, request, jsonify, session
from werkzeug.utils import secure_filename
from datetime import datetime
import uuid
import tempfile

# Apply performance optimizations early
from llm_clients.performance_utils import apply_all_optimizations
apply_all_optimizations()

from backend import Backend
import config
from english_detector import is_english_by_ascii_letters_only

app = Flask(__name__)
# Use environment variable for secret key in production (HF Spaces)
app.secret_key = os.environ.get('SECRET_KEY', 'guardrails-frontend-secret-key-change-in-production')

# Configure file uploads
app.config['MAX_CONTENT_LENGTH'] = 60 * 1024 * 1024  # 60MB max file size (to accommodate PDFs)
ALLOWED_EXTENSIONS = {'.txt', '.md', '.text', '.rtf', '.pdf', '.docx'}

# Temporary storage for safe attachments (in production, use Redis or a database)
safe_attachments = {}
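# A minimal sketch of the production alternative mentioned above, assuming a running
# Redis instance and the `redis` package (illustrative only; key name and TTL are made up):
#   import redis
#   r = redis.Redis()
#   r.setex(f"attachment:{attachment_id}", 3600, file_content)  # auto-expire after 1 hour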

def allowed_file(filename):
    """Check if the uploaded file has an allowed extension"""
    if '.' not in filename:
        return False
    ext = '.' + filename.rsplit('.', 1)[1].lower()
    return ext in ALLOWED_EXTENSIONS
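# For example: allowed_file('report.PDF') -> True (the extension check is case-insensitive),
# allowed_file('script.exe') -> False, allowed_file('README') -> False (no extension at all).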

class DetailedBackend(Backend):
    """Extended backend that returns detailed information for the frontend"""

    def process_request_detailed(self, prompt: str, attachments: Optional[List[Dict[str, Any]]] = None) -> dict:
        """
        Process request and return detailed information including:
        - AI detection results (confidence, latency, attack type)
        - LLM response
        - Output guardrail results
        - Timestamps and metadata
        """
        start_time = time.time()
        result = {
            "message_id": str(uuid.uuid4()),
            "timestamp": datetime.now().isoformat(),
            "user_prompt": prompt,
            "ai_detection": {},
            "llm_response": {},
            "output_guardrails": {},
            "total_latency_ms": 0,
            "is_safe": True,
            "final_response": ""
        }
        # Step 1: AI Detection (Input Guardrails)
        # Handle translation and classification with detailed logging
        if not self.output_test_mode:
            detection_start = time.time()
            # Check if non-English and translate if needed
            was_translated = False
            translated_prompt = prompt
            original_prompt = prompt
            try:
                # Translate if non-English
                if not is_english_by_ascii_letters_only(prompt):
                    print("🌐 Detected non-English input (web). Translating to English...", flush=True)
                    print(f"   Original text: '{prompt[:100]}...'", flush=True)
                    try:
                        translator_client = self._get_translator_client()
                        translation_start = time.time()
                        translated_prompt = translator_client.generate_content(prompt)
                        translation_time = (time.time() - translation_start) * 1000
                        was_translated = True
                        print(f"   ✅ Translated to English ({translation_time:.1f}ms): '{translated_prompt[:200]}...'", flush=True)
                        print(f"   🔍 Will classify translated text (length: {len(translated_prompt)} chars)", flush=True)
                    except Exception as e:
                        error_msg = str(e)
                        print(f"⚠️ Translation failed: {error_msg}", flush=True)
                        print("   Proceeding with original text (may cause classification issues).", flush=True)
                        # Continue with original - classifier may still work
                        translated_prompt = prompt
                        was_translated = False
                else:
                    print("   ✅ Text is English, no translation needed", flush=True)
                    translated_prompt = prompt

                # Classify with ModernBERT (always on English/translated text)
                print(f"   🔍 Classifying text: '{translated_prompt[:100]}...'", flush=True)
                print(f"   Text length: {len(translated_prompt)} chars, was_translated: {was_translated}", flush=True)
                ai_response = self.attack_detector.generate_content(translated_prompt)
                json_response = self._extract_json_from_response(ai_response)
                ai_result = json.loads(json_response)
                detection_end = time.time()

                safety_status = ai_result.get("safety_status", "unsafe")
                is_safe = safety_status.lower() == "safe"
                confidence = ai_result.get("confidence", 0.0)
                print(f"   📊 Classification result: safety_status='{safety_status}', is_safe={is_safe}, confidence={confidence:.2f}", flush=True)

                result["ai_detection"] = {
                    "is_safe": is_safe,
                    "safety_status": ai_result.get("safety_status", "unknown"),
                    "attack_type": ai_result.get("attack_type", "none"),
                    "confidence": ai_result.get("confidence", 0.0),
                    "reason": ai_result.get("reason", "No reason provided"),
                    "latency_ms": round((detection_end - detection_start) * 1000, 1),
                    "model_used": "zazaman/fmb" + (" (via Qwen translation)" if was_translated else ""),
                    "was_translated": was_translated
                }

                if not is_safe:
                    attack_type = ai_result.get("attack_type", "unknown")
                    confidence = ai_result.get("confidence", 1.0)
                    reason = ai_result.get("reason", "No specific reason provided")
                    latency_ms = result["ai_detection"]["latency_ms"]
                    block_reason = f"🤖 AI Security Scanner: Detected {attack_type} attack (confidence: {confidence:.2f}, latency: {latency_ms}ms). Reason: {reason}"
                    if was_translated:
                        block_reason += " [Original non-English text was translated to English for analysis]"
                    result["is_safe"] = False
                    result["final_response"] = block_reason
                    result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
                    return result
            except Exception as e:
                detection_end = time.time()
                result["ai_detection"] = {
                    "is_safe": False,
                    "error": str(e),
                    "latency_ms": round((detection_end - detection_start) * 1000, 1),
                    "model_used": "zazaman/fmb",
                    "was_translated": was_translated
                }
                result["is_safe"] = False
                result["final_response"] = f"🤖 AI Security Scanner: Error during security analysis: {str(e)}. Request blocked for safety."
                result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
                return result
        # Step 2: LLM Generation
        llm_start = time.time()
        try:
            if config.LLM_PROVIDER == "manual":
                # For manual mode, we'll use a default response for the web interface
                llm_response = f"This is a manual LLM response to: '{prompt}'. In the web interface, manual responses would typically be pre-configured or generated by a real LLM."
            else:
                # Send files to LLM if available (currently only Gemini supports this)
                files_for_llm = None
                if attachments and hasattr(self.llm_client, 'generate_content'):
                    # Check if this LLM client supports files (has overridden the method)
                    try:
                        import inspect
                        sig = inspect.signature(self.llm_client.generate_content)
                        if 'files' in sig.parameters:
                            files_for_llm = attachments
                            print(f"   📎 Sending {len(attachments)} attachment(s) to LLM")
                    except Exception:
                        # Signature introspection can fail (e.g. for C extensions); fall back to text-only
                        pass
                # Only pass the files kwarg when the client accepts it; passing it
                # unconditionally would raise TypeError for text-only clients
                if files_for_llm is not None:
                    llm_response = self.llm_client.generate_content(prompt, files=files_for_llm)
                else:
                    llm_response = self.llm_client.generate_content(prompt)
            llm_end = time.time()
            result["llm_response"] = {
                "content": llm_response,
                "provider": config.LLM_PROVIDER,
                "model": config.LLM_CONFIG.get(config.LLM_PROVIDER, {}).get("model", "unknown"),
                "latency_ms": round((llm_end - llm_start) * 1000, 1),
                "character_count": len(llm_response)
            }
        except Exception as e:
            result["llm_response"] = {
                "error": str(e),
                "latency_ms": round((time.time() - llm_start) * 1000, 1)
            }
            llm_response = f"Error generating response: {str(e)}"
        # Step 3: Output Guardrails
        guardrail_start = time.time()
        processed_response, output_safe = self.output_guardrail_manager.process_complete_output(llm_response)
        guardrail_end = time.time()

        # Analyze what the guardrails did
        pii_detected = processed_response != llm_response
        result["output_guardrails"] = {
            "is_safe": output_safe,
            "original_length": len(llm_response),
            "processed_length": len(processed_response),
            "was_modified": pii_detected,
            "latency_ms": round((guardrail_end - guardrail_start) * 1000, 1),
            "guardrails_active": list(config.OUTPUT_GUARDRAILS_CONFIG.keys()),
            "processing_details": []
        }
        if pii_detected:
            result["output_guardrails"]["processing_details"].append({
                "type": "PII_ANONYMIZATION",
                "description": "Personal information was detected and anonymized",
                "characters_changed": abs(len(processed_response) - len(llm_response))
            })
        if not output_safe:
            result["is_safe"] = False
            result["final_response"] = processed_response  # This would be a block message
        else:
            result["final_response"] = processed_response
        result["total_latency_ms"] = round((time.time() - start_time) * 1000, 1)
        return result
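    # Illustrative shape of a successful result (values below are made-up examples,
    # not real output):
    #   {
    #     "message_id": "…", "is_safe": True,
    #     "ai_detection": {"is_safe": True, "attack_type": "none", "confidence": 0.97, ...},
    #     "llm_response": {"content": "…", "provider": "gemini", "latency_ms": 850.3, ...},
    #     "output_guardrails": {"is_safe": True, "was_modified": False, ...},
    #     "final_response": "…", "total_latency_ms": 901.4
    #   }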

    def process_attachment(self, file_path: str, file_content: bytes) -> dict:
        """
        Process an uploaded attachment through attachment guardrails.

        Args:
            file_path: Name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Dict containing attachment analysis results
        """
        start_time = time.time()
        result = {
            "attachment_id": str(uuid.uuid4()),
            "timestamp": datetime.now().isoformat(),
            "filename": file_path,
            "is_safe": True,
            "analysis_time_ms": 0,
            "guardrail_analysis": {}
        }
        try:
            if not self.attachment_guardrail_manager:
                result["is_safe"] = False
                result["error"] = "Attachment guardrails not available"
                return result
            # Process attachment through guardrails
            is_safe, analysis = self.attachment_guardrail_manager.process_attachment(file_path, file_content)
            result["is_safe"] = is_safe
            result["guardrail_analysis"] = analysis
            result["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
            return result
        except Exception as e:
            result["is_safe"] = False
            result["error"] = f"Error processing attachment: {str(e)}"
            result["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
            return result
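
# Minimal standalone usage sketch (assumes the backend's models and config are set up;
# not part of the request flow below):
#   backend = DetailedBackend()
#   outcome = backend.process_request_detailed("What is the capital of France?")
#   print(outcome["is_safe"], outcome["final_response"])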

# Initialize detailed backend
print("Initializing Guardrails Web Interface...")
try:
    detailed_backend = DetailedBackend()
    print("✅ Detailed backend initialized successfully")
except Exception as e:
    print(f"❌ Error initializing detailed backend: {e}")
    print("   Make sure you have all required dependencies installed:")
    print("   pip install flask transformers torch presidio-analyzer presidio-anonymizer")
    detailed_backend = None

@app.route('/')  # route path assumed; should match the frontend's links
def index():
    """Main chat interface"""
    return render_template('index.html')

@app.route('/upload', methods=['POST'])  # route path assumed
def upload_file():
    """Handle file uploads and process them through attachment guardrails"""
    if not detailed_backend:
        return jsonify({
            "error": "Backend not initialized",
            "message": "The guardrails system is not available"
        }), 500
    try:
        # Check if file was uploaded
        if 'file' not in request.files:
            return jsonify({"error": "No file uploaded"}), 400
        file = request.files['file']
        # Check if file was selected
        if file.filename == '':
            return jsonify({"error": "No file selected"}), 400
        # Check file extension
        if not allowed_file(file.filename):
            return jsonify({
                "error": f"Unsupported file type. Allowed extensions: {', '.join(ALLOWED_EXTENSIONS)}"
            }), 400
        # Read file content
        file_content = file.read()
        # Process file through attachment guardrails
        result = detailed_backend.process_attachment(file.filename, file_content)
        # If file is safe, store it temporarily for potential use with LLM
        if result.get("is_safe", False):
            attachment_id = result["attachment_id"]
            safe_attachments[attachment_id] = {
                "filename": file.filename,
                "content": file_content,
                "extension": os.path.splitext(file.filename.lower())[1],
                "analysis": result
            }
            result["attachment_id"] = attachment_id
            print(f"   💾 Stored safe attachment: {file.filename} (ID: {attachment_id})")
        return jsonify(result)
    except Exception as e:
        return jsonify({
            "error": str(e),
            "message": "An error occurred while processing the file"
        }), 500

@app.route('/chat', methods=['POST'])  # route path assumed
def chat():
    """Handle chat messages and return detailed response"""
    if not detailed_backend:
        return jsonify({
            "error": "Backend not initialized",
            "message": "The guardrails system is not available"
        }), 500
    data = request.get_json()
    user_message = data.get('message', '').strip()
    attachments = data.get('attachments', [])  # List of attachment IDs or data
    if not user_message and not attachments:
        return jsonify({"error": "Empty message and no attachments"}), 400
    try:
        # Process attachments first if any
        attachment_results = []
        safe_attachment_files = []
        safe_to_proceed = True
        for attachment in attachments:
            attachment_id = attachment.get("id")
            if attachment_id and attachment_id in safe_attachments:
                stored_attachment = safe_attachments[attachment_id]
                attachment_results.append({
                    "id": attachment_id,
                    "filename": stored_attachment["filename"],
                    "is_safe": True,
                    "analysis": stored_attachment["analysis"]
                })
                # Prepare file for LLM
                safe_attachment_files.append({
                    "filename": stored_attachment["filename"],
                    "content": stored_attachment["content"],
                    "extension": stored_attachment["extension"]
                })
            else:
                # Attachment not found or not safe
                safe_to_proceed = False
                attachment_results.append({
                    "id": attachment_id,
                    "is_safe": False,
                    "error": "Attachment not found or not safe"
                })
        # Process the message with detailed backend only if attachments are safe
        if safe_to_proceed:
            result = detailed_backend.process_request_detailed(
                user_message, safe_attachment_files if safe_attachment_files else None)
            result["attachments"] = attachment_results
            # Clean up used attachments
            for attachment in attachments:
                attachment_id = attachment.get("id")
                if attachment_id in safe_attachments:
                    del safe_attachments[attachment_id]
        else:
            result = {
                "message_id": str(uuid.uuid4()),
                "timestamp": datetime.now().isoformat(),
                "user_prompt": user_message,
                "is_safe": False,
                "final_response": "Request blocked due to unsafe attachments",
                "attachments": attachment_results,
                "total_latency_ms": 0
            }
        # Store in session for history
        if 'chat_history' not in session:
            session['chat_history'] = []
        session['chat_history'].append(result)
        session.modified = True  # in-place list mutation is not auto-detected by Flask sessions
        return jsonify(result)
    except Exception as e:
        return jsonify({
            "error": str(e),
            "message": "An error occurred while processing your message"
        }), 500

@app.route('/api/config')  # route path assumed
def get_config():
    """Get current system configuration"""
    return jsonify({
        "llm_provider": config.LLM_PROVIDER,
        "ai_detection_enabled": config.AI_DETECTION_MODE["enabled"],
        "model_name": config.AI_DETECTION_MODE["attack_llm_config"].get("model_name", "unknown"),
        "output_guardrails": {
            name: guard_config.get("enabled", False)
            for name, guard_config in config.OUTPUT_GUARDRAILS_CONFIG.items()
        }
    })

@app.route('/api/stats')  # route path assumed
def get_stats():
    """Get session statistics"""
    history = session.get('chat_history', [])
    if not history:
        return jsonify({
            "total_messages": 0,
            "avg_latency": 0,
            "blocks_count": 0,
            "pii_anonymizations": 0
        })
    total_messages = len(history)
    total_latency = sum(msg.get('total_latency_ms', 0) for msg in history)
    avg_latency = round(total_latency / total_messages, 1) if total_messages > 0 else 0
    blocks_count = sum(1 for msg in history if not msg.get('is_safe', True))
    pii_count = sum(1 for msg in history
                    if msg.get('output_guardrails', {}).get('was_modified', False))
    return jsonify({
        "total_messages": total_messages,
        "avg_latency": avg_latency,
        "blocks_count": blocks_count,
        "pii_anonymizations": pii_count
    })

if __name__ == '__main__':
    print("=" * 60)
    print("🛡️ Guardrails Web Interface")
    print("🤖 AI-powered attack detection with sleek UI")
    print("=" * 60)
    # Check if running on HF Spaces or locally
    port = int(os.environ.get('PORT', 7860))
    host = '0.0.0.0'  # Accept connections from any IP
    debug_mode = os.environ.get('DEBUG', 'false').lower() == 'true'
    if port == 7860:
        print("🚀 Starting server for Hugging Face Spaces at http://0.0.0.0:7860")
    else:
        print(f"🚀 Starting server at http://{host}:{port}")
    print("💡 Press Ctrl+C to stop the server")
    print("=" * 60)
    app.run(debug=debug_mode, host=host, port=port)
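
# Quick smoke test once the server is running (paths match the assumed routes above):
#   curl -X POST http://localhost:7860/chat \
#        -H 'Content-Type: application/json' \
#        -d '{"message": "Hello there"}'
#   curl -X POST http://localhost:7860/upload -F 'file=@notes.txt'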