Spaces:
Sleeping
Sleeping
| # guardrails/attachments/base.py | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, Any, Tuple, List | |
| import os | |
class AttachmentGuardrail(ABC):
    """
    Abstract base class for attachment guardrails.

    Each file type should have its own guardrail implementation. Concrete
    subclasses must implement ``get_supported_extensions`` and
    ``process_file``; until both are overridden the class cannot be
    instantiated (``TypeError`` is raised by ``ABC``).
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Store the configuration and cache the declared extensions.

        Args:
            config: Guardrail-specific configuration dictionary.
        """
        self.config = config
        # Cached once so can_handle_file() does not re-query the subclass.
        self.supported_extensions = self.get_supported_extensions()

    @abstractmethod
    def get_supported_extensions(self) -> List[str]:
        """Return list of supported file extensions (e.g., ['.txt', '.md'])"""

    @abstractmethod
    def process_file(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process the uploaded file and return safety assessment.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details)
            - is_safe: Boolean indicating if file is safe
            - analysis_details: Dict containing detailed analysis results
        """

    @staticmethod
    def _file_extension(file_path: str) -> str:
        """Return the lower-cased extension of *file_path* ('' if it has none)."""
        return os.path.splitext(file_path.lower())[1]

    def can_handle_file(self, file_path: str) -> bool:
        """Check if this guardrail can handle the given file type"""
        return self._file_extension(file_path) in self.supported_extensions

    def get_file_info(self, file_path: str, file_content: bytes) -> Dict[str, Any]:
        """
        Extract basic file information.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Dict with the base filename, the lower-cased extension, the size
            in bytes and the size in KiB rounded to two decimals.
        """
        file_size = len(file_content)
        return {
            "filename": os.path.basename(file_path),
            "extension": self._file_extension(file_path),
            "size_bytes": file_size,
            "size_kb": round(file_size / 1024, 2),
        }
class AttachmentGuardrailManager:
    """
    Manager class that handles multiple attachment guardrails and routes
    files to the appropriate guardrail based on file extension.

    Extension keys are stored lower-cased with a leading dot, matching the
    form produced by ``os.path.splitext`` on the lower-cased file name, so a
    guardrail that declares ``".TXT"`` or ``"txt"`` is still reachable.
    """

    def __init__(self, guardrail_configs: Dict[str, Dict[str, Any]]):
        """
        Load every enabled guardrail named in *guardrail_configs*.

        Args:
            guardrail_configs: Mapping of guardrail module name (e.g.
                ``txt_guardrail``) to its config dict. Only entries whose
                config has a truthy ``"enabled"`` value are loaded.

        A guardrail that fails to import or initialize is reported on stdout
        and skipped; it never prevents the remaining ones from loading.
        """
        # Function-scope import keeps this class self-contained without
        # touching the file-level import block.
        import importlib

        self.guardrails: List[AttachmentGuardrail] = []
        self.extension_map: Dict[str, AttachmentGuardrail] = {}
        print("\nInitializing Attachment Guardrail Manager...")
        # Load and initialize guardrails
        for name, config in guardrail_configs.items():
            if not config.get("enabled", False):
                continue
            try:
                # Import the guardrail module
                module = importlib.import_module(f"guardrails.attachments.{name}")
                # Get the class name (e.g., txt_guardrail -> TxtGuardrail)
                guardrail_class = getattr(module, self._get_class_name(name))
                # Initialize the guardrail and map its extensions to it
                guardrail_instance = guardrail_class(config)
                self._register_guardrail(guardrail_instance)
                print(f" ✅ Loaded attachment guardrail: {name} (extensions: {guardrail_instance.supported_extensions})")
            except Exception as e:
                # Best-effort loading: report the failure and keep going.
                print(f" ⚠️ Could not load attachment guardrail '{name}': {e}")

    @staticmethod
    def _normalize_extension(ext: str) -> str:
        """Return *ext* lower-cased, stripped, and with a leading dot (if non-empty)."""
        ext = ext.strip().lower()
        if ext and not ext.startswith("."):
            ext = "." + ext
        return ext

    def _register_guardrail(self, guardrail: "AttachmentGuardrail") -> None:
        """Record *guardrail* and route each of its declared extensions to it."""
        # Normalize first so a malformed extension list raises before any
        # state is touched, leaving no half-registered guardrail behind.
        extensions = [self._normalize_extension(ext) for ext in guardrail.supported_extensions]
        self.guardrails.append(guardrail)
        for ext in extensions:
            self.extension_map[ext] = guardrail

    def _get_class_name(self, module_name: str) -> str:
        """Convert module name to class name (e.g., txt_guardrail -> TxtGuardrail)"""
        return ''.join(word.capitalize() for word in module_name.split('_'))

    def process_attachment(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process an attachment through the appropriate guardrail.

        Args:
            file_path: Name/path of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details). Unsupported file types and
            guardrail errors are reported as ``(False, {...})`` with an
            ``"error"`` key rather than raised.
        """
        file_ext = os.path.splitext(file_path.lower())[1]
        # Check if we have a guardrail for this file type
        if file_ext not in self.extension_map:
            return False, {
                "error": f"Unsupported file type: {file_ext}",
                "supported_extensions": list(self.extension_map.keys()),
                "filename": os.path.basename(file_path),
                "extension": file_ext,
                "size_bytes": len(file_content)
            }
        # Process with the appropriate guardrail
        guardrail = self.extension_map[file_ext]
        try:
            is_safe, analysis = guardrail.process_file(file_path, file_content)
            # Add manager metadata (kept inside the try so a malformed
            # return value is reported as a processing error too).
            analysis["guardrail_used"] = guardrail.__class__.__name__
            analysis["file_extension"] = file_ext
            return is_safe, analysis
        except Exception as e:
            # Fail closed: any guardrail error marks the file as unsafe.
            return False, {
                "error": f"Error processing file with {guardrail.__class__.__name__}: {str(e)}",
                "filename": os.path.basename(file_path),
                "extension": file_ext,
                "size_bytes": len(file_content)
            }

    def get_supported_extensions(self) -> List[str]:
        """Get list of all supported file extensions"""
        return list(self.extension_map.keys())

    def get_guardrail_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Get information about loaded guardrails.

        Returns:
            Dict keyed by guardrail class name; each value holds that
            guardrail's ``supported_extensions`` and ``config``.
        """
        info = {}
        for guardrail in self.guardrails:
            class_name = guardrail.__class__.__name__
            info[class_name] = {
                "supported_extensions": guardrail.supported_extensions,
                "config": guardrail.config
            }
        return info