# guardrails/attachments/docx_guardrail.py
import time
import json
from typing import Dict, Any, Tuple, List

from .base import AttachmentGuardrail


class DocxGuardrail(AttachmentGuardrail):
    """
    Guardrail for Word documents (.docx).

    Extracts text content using python-docx and analyzes each chunk
    for unsafe content.
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.chunk_size = config.get("chunk_size", 500)  # tokens per chunk
        self.confidence_threshold = config.get("confidence_threshold", 0.8)  # >80% confidence required to block
        self.max_file_size = config.get("max_file_size_mb", 25) * 1024 * 1024  # MB to bytes (moderate limit for Word docs)

        # Initialize the finetuned model for analysis
        self.model_client = None
        self._init_model()

        # Initialize python-docx
        self.docx_available = False
        self._init_docx()

    def _init_model(self):
        """Initialize the finetuned model client for text analysis (using shared model)."""
        try:
            from llm_clients.shared_models import shared_model_manager
            self.model_client = shared_model_manager.get_finetuned_guard_client("zazaman/fmb")
            if self.model_client:
                print(" 🔍 DOCX Guardrail: Using shared model zazaman/fmb")
            else:
                print(" ⚠️ DOCX Guardrail: Could not get shared model")
        except Exception as e:
            print(f" ⚠️ DOCX Guardrail: Could not initialize shared model: {e}")
            self.model_client = None

    def _init_docx(self):
        """Initialize python-docx for Word document text extraction."""
        try:
            import docx  # python-docx; imported only to verify availability
            self.docx_available = True
            print(" 📄 DOCX Guardrail: python-docx initialized successfully")
        except ImportError:
            print(" ⚠️ DOCX Guardrail: python-docx not available. Install with: pip install python-docx")
            self.docx_available = False

    def get_supported_extensions(self) -> List[str]:
        """Return supported Word document file extensions."""
        return ['.docx']

    def process_file(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
        """
        Process a Word document by extracting text, chunking, and analyzing
        each chunk for threats.

        Args:
            file_path: Path/name of the uploaded file
            file_content: Raw bytes content of the file

        Returns:
            Tuple of (is_safe, analysis_details)
        """
        start_time = time.time()

        # Get basic file info
        file_info = self.get_file_info(file_path, file_content)
        analysis_details = {
            **file_info,
            "chunk_size": self.chunk_size,
            "confidence_threshold": self.confidence_threshold,
            "chunks_analyzed": 0,
            "chunks_unsafe": 0,
            "max_confidence": 0.0,
            "analysis_time_ms": 0,
            "chunks_details": [],
            "model_used": "zazaman/fmb",
            "paragraphs_processed": 0,
            "text_length": 0
        }

        try:
            # Check file size
            if len(file_content) > self.max_file_size:
                analysis_details["error"] = (
                    f"File too large: {file_info['size_kb']}KB > "
                    f"{self.max_file_size / 1024 / 1024}MB"
                )
                return False, analysis_details

            # Check if python-docx is available
            if not self.docx_available:
                analysis_details["error"] = "python-docx not available. Cannot process Word documents."
                return False, analysis_details

            # Check if the analysis model is available
            if not self.model_client:
                analysis_details["error"] = "Text analysis model not available"
                return False, analysis_details

            # Extract text from the Word document
            text_content, paragraphs_processed = self._extract_text_from_docx(file_content)
            analysis_details["paragraphs_processed"] = paragraphs_processed
            analysis_details["text_length"] = len(text_content)

            if not text_content.strip():
                analysis_details["warning"] = "No extractable text found in Word document"
                return True, analysis_details

            # Chunk the text
            chunks = self._chunk_text(text_content)
            analysis_details["chunks_analyzed"] = len(chunks)

            if not chunks:
                analysis_details["warning"] = "No processable content after chunking"
                return True, analysis_details

            # Analyze each chunk
            unsafe_chunks = 0
            max_confidence = 0.0

            for i, chunk in enumerate(chunks):
                chunk_start_time = time.time()
                try:
                    # Analyze the chunk with the finetuned model
                    response = self.model_client.generate_content(chunk)

                    # Parse the JSON response
                    ai_result = json.loads(response)
                    confidence = ai_result.get("confidence", 0.0)
                    safety_status = ai_result.get("safety_status", "unsafe")
                    attack_type = ai_result.get("attack_type", "unknown")

                    is_chunk_safe = safety_status.lower() == "safe"
                    chunk_latency = round((time.time() - chunk_start_time) * 1000, 1)

                    chunk_detail = {
                        "chunk_index": i,
                        "chunk_length": len(chunk),
                        "is_safe": is_chunk_safe,
                        "confidence": confidence,
                        "safety_status": safety_status,
                        "attack_type": attack_type,
                        "latency_ms": chunk_latency,
                        "preview": chunk[:100] + "..." if len(chunk) > 100 else chunk
                    }
                    analysis_details["chunks_details"].append(chunk_detail)

                    # Track statistics
                    max_confidence = max(max_confidence, confidence)

                    # Flag the chunk only when it is unsafe with high confidence (>80%)
                    if not is_chunk_safe and confidence > self.confidence_threshold:
                        unsafe_chunks += 1
                        chunk_detail["flagged"] = True
                        print(f" 🚨 DOCX Guardrail: Unsafe chunk {i + 1}/{len(chunks)} detected (confidence: {confidence:.3f})")

                except Exception as e:
                    # If a chunk cannot be analyzed, treat it as unsafe (fail closed)
                    chunk_detail = {
                        "chunk_index": i,
                        "chunk_length": len(chunk),
                        "is_safe": False,
                        "error": str(e),
                        "latency_ms": round((time.time() - chunk_start_time) * 1000, 1),
                        "preview": chunk[:100] + "..." if len(chunk) > 100 else chunk
                    }
                    analysis_details["chunks_details"].append(chunk_detail)
                    unsafe_chunks += 1

            analysis_details["chunks_unsafe"] = unsafe_chunks
            analysis_details["max_confidence"] = max_confidence
            analysis_details["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)

            # The file is safe only if no chunks were flagged as unsafe
            is_file_safe = unsafe_chunks == 0

            if not is_file_safe:
                analysis_details["threat_summary"] = (
                    f"Detected {unsafe_chunks} unsafe chunks out of {len(chunks)} total chunks"
                )

            return is_file_safe, analysis_details

        except Exception as e:
            analysis_details["error"] = f"Unexpected error during Word document analysis: {str(e)}"
            analysis_details["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
            return False, analysis_details
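
    # A sketch of the JSON response shape that process_file expects from the
    # finetuned model. This is inferred from the fields the code reads
    # (safety_status, confidence, attack_type); the concrete values and any
    # extra fields depend on zazaman/fmb's actual output contract, so treat
    # this example as an assumption:
    #
    #   {"safety_status": "unsafe", "confidence": 0.93, "attack_type": "prompt_injection"}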

    def _extract_text_from_docx(self, docx_content: bytes) -> Tuple[str, int]:
        """
        Extract text content from a Word document using python-docx.

        Args:
            docx_content: Raw bytes content of the Word document

        Returns:
            Tuple of (extracted_text, paragraphs_processed)
        """
        try:
            import docx
            import io

            # Open the Word document from bytes
            doc = docx.Document(io.BytesIO(docx_content))

            extracted_text = ""
            paragraphs_processed = 0

            # Extract text from each paragraph
            for paragraph in doc.paragraphs:
                paragraph_text = paragraph.text.strip()
                if paragraph_text:  # Only add non-empty paragraphs
                    extracted_text += paragraph_text + "\n\n"
                    paragraphs_processed += 1

            # Extract text from tables, if any
            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        cell_text = cell.text.strip()
                        if cell_text:
                            extracted_text += cell_text + " "
                    extracted_text += "\n"

            return extracted_text.strip(), paragraphs_processed

        except Exception as e:
            raise Exception(f"Failed to extract text from Word document: {str(e)}") from e
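
    # Worked example of the chunking arithmetic used below: with the default
    # chunk_size of 500 tokens and the 1 word ≈ 0.75 tokens heuristic,
    # words_per_chunk = int(500 / 0.75) = 666, so a 2,000-word document
    # yields four chunks (666 + 666 + 666 + 2 words).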
    def _chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into pieces of approximately chunk_size tokens.
        Uses a simple word-based approximation (1 word ≈ 0.75 tokens).
        """
        if not text.strip():
            return []

        # Convert the token budget to a word budget (1 word ≈ 0.75 tokens),
        # so a 500-token chunk holds int(500 / 0.75) = 666 words
        words_per_chunk = int(self.chunk_size / 0.75)

        # Split text into words
        words = text.split()

        if len(words) <= words_per_chunk:
            # Text is small enough to be a single chunk
            return [text]

        chunks = []
        current_chunk_words = []

        for word in words:
            current_chunk_words.append(word)

            # Once the target chunk size is reached, emit a chunk
            if len(current_chunk_words) >= words_per_chunk:
                chunks.append(' '.join(current_chunk_words))
                current_chunk_words = []

        # Add any remaining words as the last chunk
        if current_chunk_words:
            chunks.append(' '.join(current_chunk_words))

        return chunks

    def _estimate_tokens(self, text: str) -> int:
        """Estimate token count from word count (1 word ≈ 0.75 tokens)."""
        words = len(text.split())
        return int(words * 0.75)
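

if __name__ == "__main__":
    # Minimal usage sketch, not part of the guardrail itself. The config keys
    # match the ones read in __init__; the file path and the exact fields
    # available in analysis_details beyond those set above are hypothetical
    # placeholders.
    import sys

    config = {
        "chunk_size": 500,            # tokens per chunk
        "confidence_threshold": 0.8,  # block only above 80% confidence
        "max_file_size_mb": 25,
    }
    guardrail = DocxGuardrail(config)

    path = sys.argv[1] if len(sys.argv) > 1 else "example.docx"  # hypothetical file
    with open(path, "rb") as f:
        content = f.read()

    is_safe, details = guardrail.process_file(path, content)
    print(f"safe={is_safe}, chunks={details['chunks_analyzed']}, "
          f"unsafe={details['chunks_unsafe']}, max_conf={details['max_confidence']}")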