Spaces:
Sleeping
Sleeping
File size: 6,048 Bytes
a2e1879 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# guardrails/attachments/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Tuple, List
import os
class AttachmentGuardrail(ABC):
    """
    Abstract base class for attachment guardrails.

    Each file type should have its own guardrail implementation. Concrete
    subclasses must implement ``get_supported_extensions`` and
    ``process_file``.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Store the configuration and cache the subclass's extension list.

        Args:
            config: Guardrail settings; stored unchanged on ``self.config``.
        """
        self.config = config
        # Queried once at construction and kept exactly as the subclass
        # returned it; case is only folded at comparison time.
        self.supported_extensions = self.get_supported_extensions()

    @abstractmethod
    def get_supported_extensions(self) -> List[str]:
        """Return list of supported file extensions (e.g., ['.txt', '.md'])"""
        pass

    @abstractmethod
    def process_file(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process the uploaded file and return safety assessment.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details)
            - is_safe: Boolean indicating if file is safe
            - analysis_details: Dict containing detailed analysis results
        """
        pass

    @staticmethod
    def _extension_of(file_path: str) -> str:
        """Return the lower-cased last extension of ``file_path`` ('' if none)."""
        return os.path.splitext(file_path.lower())[1]

    def can_handle_file(self, file_path: str) -> bool:
        """
        Check if this guardrail can handle the given file type.

        The comparison is case-insensitive on both sides: the file's
        extension is lower-cased, and so is each declared extension. Without
        the second fold, a subclass declaring '.TXT' could never match.
        """
        file_ext = self._extension_of(file_path)
        return any(file_ext == ext.lower() for ext in self.supported_extensions)

    def get_file_info(self, file_path: str, file_content: bytes) -> Dict[str, Any]:
        """
        Extract basic file information.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Dict with ``filename`` (base name of the path), ``extension``
            (lower-cased), ``size_bytes`` and ``size_kb`` (bytes / 1024,
            rounded to two decimals).
        """
        file_size = len(file_content)
        return {
            "filename": os.path.basename(file_path),
            "extension": self._extension_of(file_path),
            "size_bytes": file_size,
            "size_kb": round(file_size / 1024, 2),
        }
class AttachmentGuardrailManager:
    """
    Manager class that handles multiple attachment guardrails and routes
    files to the appropriate guardrail based on file extension.
    """

    def __init__(self, guardrail_configs: Dict[str, Dict[str, Any]]):
        """
        Load every enabled guardrail and build the extension routing table.

        For each entry whose config has a truthy ``"enabled"`` value, the
        module ``guardrails.attachments.<name>`` is imported, the class named
        by ``_get_class_name(name)`` is instantiated with the config, and its
        extensions are registered. Load failures are printed and skipped;
        they never propagate out of the constructor.

        Args:
            guardrail_configs: Mapping of guardrail module name to its config.
        """
        # Local import keeps this block self-contained; import_module is the
        # documented replacement for calling __import__ directly.
        import importlib

        self.guardrails: List[AttachmentGuardrail] = []
        self.extension_map: Dict[str, AttachmentGuardrail] = {}
        print("\nInitializing Attachment Guardrail Manager...")

        for name, config in guardrail_configs.items():
            if not config.get("enabled", False):
                continue
            try:
                module = importlib.import_module(f"guardrails.attachments.{name}")
                # Class name is derived from the module name
                # (e.g., txt_guardrail -> TxtGuardrail).
                guardrail_class = getattr(module, self._get_class_name(name))
                guardrail_instance = guardrail_class(config)

                # Lookups in process_attachment use a lower-cased extension,
                # so keys must be lower-cased too. Computing them before any
                # state is touched avoids a half-registered guardrail if the
                # extension list is malformed.
                extension_keys = [ext.lower() for ext in guardrail_instance.supported_extensions]

                self.guardrails.append(guardrail_instance)
                # A later guardrail replaces an earlier one for the same key.
                for ext in extension_keys:
                    self.extension_map[ext] = guardrail_instance
                print(f" ✅ Loaded attachment guardrail: {name} (extensions: {guardrail_instance.supported_extensions})")
            except Exception as e:
                # Best-effort loading: one broken guardrail must not prevent
                # the others from being registered.
                print(f" ⚠️ Could not load attachment guardrail '{name}': {e}")

    def _get_class_name(self, module_name: str) -> str:
        """Convert module name to class name (e.g., txt_guardrail -> TxtGuardrail)"""
        return ''.join(word.capitalize() for word in module_name.split('_'))

    def process_attachment(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process an attachment through the appropriate guardrail.

        Files with no registered guardrail, and files whose guardrail raises,
        are both reported as not safe with an ``"error"`` entry.

        Args:
            file_path: Name/path of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details). On success the guardrail's
            own analysis dict is returned with ``guardrail_used`` and
            ``file_extension`` added to it.
        """
        file_ext = os.path.splitext(file_path.lower())[1]

        # No guardrail registered for this file type: reject.
        if file_ext not in self.extension_map:
            return False, {
                "error": f"Unsupported file type: {file_ext}",
                "supported_extensions": list(self.extension_map),
                "filename": os.path.basename(file_path),
                "extension": file_ext,
                "size_bytes": len(file_content)
            }

        guardrail = self.extension_map[file_ext]
        try:
            is_safe, analysis = guardrail.process_file(file_path, file_content)
            # Add manager metadata (mutates the dict the guardrail returned).
            analysis["guardrail_used"] = guardrail.__class__.__name__
            analysis["file_extension"] = file_ext
            return is_safe, analysis
        except Exception as e:
            # Any failure inside the guardrail is treated as "not safe".
            return False, {
                "error": f"Error processing file with {guardrail.__class__.__name__}: {e}",
                "filename": os.path.basename(file_path),
                "extension": file_ext,
                "size_bytes": len(file_content)
            }

    def get_supported_extensions(self) -> List[str]:
        """Get list of all supported file extensions (lower-cased routing keys)"""
        return list(self.extension_map)

    def get_guardrail_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Get information about loaded guardrails.

        Returns:
            Dict keyed by guardrail class name. Each value holds that
            guardrail's ``supported_extensions`` and ``config`` objects
            (the same objects, not copies).
        """
        return {
            guardrail.__class__.__name__: {
                "supported_extensions": guardrail.supported_extensions,
                "config": guardrail.config
            }
            for guardrail in self.guardrails
        }