File size: 6,048 Bytes
a2e1879
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# guardrails/attachments/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Tuple, List
import os


class AttachmentGuardrail(ABC):
    """
    Abstract base class for attachment guardrails.

    Each file type should have its own guardrail implementation. A subclass
    declares the extensions it handles in ``get_supported_extensions`` and
    implements the safety analysis itself in ``process_file``.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Store the configuration and cache the supported extensions.

        Args:
            config: Guardrail-specific configuration dictionary.
        """
        self.config = config
        # File extensions are lower-cased before every lookup, so the declared
        # extensions must be stored in the same canonical form; otherwise a
        # subclass declaring '.TXT' or 'txt' could never match any file.
        # dict.fromkeys drops duplicates while keeping the declared order.
        self.supported_extensions = list(dict.fromkeys(
            self._normalize_extension(ext)
            for ext in self.get_supported_extensions()
        ))

    @staticmethod
    def _normalize_extension(extension: str) -> str:
        """Return the extension lower-cased and with a leading dot.

        An empty string is returned unchanged because that is what
        ``os.path.splitext`` yields for a file without an extension.
        """
        ext = extension.lower()
        if ext and not ext.startswith("."):
            ext = "." + ext
        return ext

    @abstractmethod
    def get_supported_extensions(self) -> List[str]:
        """Return list of supported file extensions (e.g., ['.txt', '.md'])"""
        pass

    @abstractmethod
    def process_file(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process the uploaded file and return safety assessment.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details)
            - is_safe: Boolean indicating if file is safe
            - analysis_details: Dict containing detailed analysis results
        """
        pass

    def can_handle_file(self, file_path: str) -> bool:
        """
        Check if this guardrail can handle the given file type.

        The comparison is case-insensitive: the extension of ``file_path`` is
        lower-cased and matched against the normalized supported extensions.

        Args:
            file_path: Path/name of the file to check

        Returns:
            True if the file's extension is one of the supported extensions.
        """
        file_ext = os.path.splitext(file_path.lower())[1]
        return file_ext in self.supported_extensions

    def get_file_info(self, file_path: str, file_content: bytes) -> Dict[str, Any]:
        """
        Extract basic file information.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Dict with the base filename, the lower-cased extension, the size
            in bytes and the size in KiB rounded to two decimals.
        """
        file_ext = os.path.splitext(file_path.lower())[1]
        file_size = len(file_content)

        return {
            "filename": os.path.basename(file_path),
            "extension": file_ext,
            "size_bytes": file_size,
            "size_kb": round(file_size / 1024, 2),
        }


class AttachmentGuardrailManager:
    """
    Manager class that handles multiple attachment guardrails and routes
    files to the appropriate guardrail based on file extension.

    Extension matching is case-insensitive: both the registered extensions
    and the extension of an incoming file are lower-cased before comparison.
    """

    def __init__(self, guardrail_configs: Dict[str, Dict[str, Any]]):
        """
        Load and register every enabled guardrail.

        Each key of ``guardrail_configs`` names a module under
        ``guardrails.attachments``; the guardrail class inside it is located
        with ``_get_class_name`` and instantiated with the key's config.
        Loading is best-effort: a guardrail that fails to import or
        initialize is reported on stdout and skipped.

        Args:
            guardrail_configs: Mapping of module name to guardrail config.
                Only entries whose config has a truthy "enabled" are loaded.
        """
        # Imported locally so this class does not depend on a module-level
        # import that the rest of the file does not provide.
        import importlib

        self.guardrails: List[AttachmentGuardrail] = []
        self.extension_map: Dict[str, AttachmentGuardrail] = {}

        print("\nInitializing Attachment Guardrail Manager...")

        # Load and initialize guardrails
        for name, config in guardrail_configs.items():
            if not config.get("enabled", False):
                continue
            try:
                # Import the guardrail module
                module = importlib.import_module(f"guardrails.attachments.{name}")

                # Get the class (e.g., txt_guardrail -> TxtGuardrail)
                guardrail_class = getattr(module, self._get_class_name(name))

                # Initialize the guardrail and map its extensions to it
                guardrail_instance = guardrail_class(config)
                self._register_guardrail(guardrail_instance)

                print(f"   ✅ Loaded attachment guardrail: {name} (extensions: {guardrail_instance.supported_extensions})")

            except Exception as e:
                # Deliberately broad: one broken guardrail must not prevent
                # the remaining ones from loading.
                print(f"   ⚠️  Could not load attachment guardrail '{name}': {e}")

    @staticmethod
    def _normalize_extension(extension: str) -> str:
        """Return the extension lower-cased and with a leading dot.

        An empty string is returned unchanged because that is what
        ``os.path.splitext`` yields for a file without an extension.
        """
        ext = extension.lower()
        if ext and not ext.startswith("."):
            ext = "." + ext
        return ext

    def _register_guardrail(self, guardrail: "AttachmentGuardrail") -> None:
        """Add a guardrail instance and route its extensions to it.

        If two guardrails declare the same extension, the one registered
        last handles it.
        """
        # Compute every key before touching any state so that a malformed
        # extension list cannot leave the guardrail half-registered.
        keys = [self._normalize_extension(ext) for ext in guardrail.supported_extensions]
        self.guardrails.append(guardrail)
        for key in keys:
            # Keys must be in the same canonical form that process_attachment
            # uses for lookup, otherwise the guardrail would be unreachable.
            self.extension_map[key] = guardrail

    def _get_class_name(self, module_name: str) -> str:
        """Convert module name to class name (e.g., txt_guardrail -> TxtGuardrail)"""
        return ''.join(word.capitalize() for word in module_name.split('_'))

    def process_attachment(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process an attachment through the appropriate guardrail.

        Fails closed: an unsupported file type, or any exception raised by
        the guardrail, yields ``is_safe == False`` with an "error" entry.

        Args:
            file_path: Name/path of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details). On success the guardrail's
            analysis dict is returned with "guardrail_used" and
            "file_extension" added.
        """
        file_ext = os.path.splitext(file_path.lower())[1]

        # Check if we have a guardrail for this file type
        guardrail = self.extension_map.get(file_ext)
        if guardrail is None:
            return False, {
                "error": f"Unsupported file type: {file_ext}",
                "supported_extensions": list(self.extension_map.keys()),
                "filename": os.path.basename(file_path),
                "extension": file_ext,
                "size_bytes": len(file_content)
            }

        # Process with the appropriate guardrail
        try:
            is_safe, analysis = guardrail.process_file(file_path, file_content)

            # Add manager metadata
            analysis["guardrail_used"] = guardrail.__class__.__name__
            analysis["file_extension"] = file_ext

            return is_safe, analysis

        except Exception as e:
            return False, {
                "error": f"Error processing file with {guardrail.__class__.__name__}: {str(e)}",
                "filename": os.path.basename(file_path),
                "extension": file_ext,
                "size_bytes": len(file_content)
            }

    def get_supported_extensions(self) -> List[str]:
        """Get list of all supported file extensions"""
        return list(self.extension_map.keys())

    def get_guardrail_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Get information about loaded guardrails.

        Returns:
            Dict keyed by guardrail class name, each value holding that
            guardrail's "supported_extensions" and "config". Guardrails that
            share a class name overwrite one another (last one wins).
        """
        info = {}
        for guardrail in self.guardrails:
            class_name = guardrail.__class__.__name__
            info[class_name] = {
                "supported_extensions": guardrail.supported_extensions,
                "config": guardrail.config
            }
        return info