# guardrails/attachments/docx_guardrail.py
import time
import json
from typing import Dict, Any, Tuple, List
from .base import AttachmentGuardrail
class DocxGuardrail(AttachmentGuardrail):
"""
Guardrail for Word documents (.docx).
Extracts text content using python-docx and analyzes each chunk for unsafe content.
"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.chunk_size = config.get("chunk_size", 500) # tokens per chunk
self.confidence_threshold = config.get("confidence_threshold", 0.8) # >80% confidence for blocking
self.max_file_size = config.get("max_file_size_mb", 25) * 1024 * 1024 # Convert MB to bytes (moderate limit for Word docs)
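        # Example config (all keys optional; defaults shown):
        #   {"chunk_size": 500, "confidence_threshold": 0.8, "max_file_size_mb": 25}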
# Initialize the finetuned model for analysis
self.model_client = None
self._init_model()
# Initialize python-docx
self.docx_available = False
self._init_docx()
def _init_model(self):
"""Initialize the finetuned model client for text analysis (using shared model)"""
try:
from llm_clients.shared_models import shared_model_manager
self.model_client = shared_model_manager.get_finetuned_guard_client("zazaman/fmb")
if self.model_client:
print(f" 🔍 DOCX Guardrail: Using shared model zazaman/fmb")
else:
print(f" ⚠️ DOCX Guardrail: Could not get shared model")
except Exception as e:
print(f" ⚠️ DOCX Guardrail: Could not initialize shared model: {e}")
self.model_client = None
def _init_docx(self):
"""Initialize python-docx for Word document text extraction"""
try:
import docx # python-docx
self.docx_available = True
print(f" 📄 DOCX Guardrail: python-docx initialized successfully")
except ImportError:
print(f" ⚠️ DOCX Guardrail: python-docx not available. Install with: pip install python-docx")
self.docx_available = False
def get_supported_extensions(self) -> List[str]:
"""Return supported Word document file extensions"""
return ['.docx']
def process_file(self, file_path: str, file_content: bytes) -> Tuple[bool, Dict[str, Any]]:
"""
Process a Word document by extracting text, chunking, and analyzing each chunk for threats.
Args:
file_path: Path/name of the uploaded file
file_content: Raw bytes content of the file
Returns:
Tuple of (is_safe, analysis_details)
"""
start_time = time.time()
# Get basic file info
file_info = self.get_file_info(file_path, file_content)
analysis_details = {
**file_info,
"chunk_size": self.chunk_size,
"confidence_threshold": self.confidence_threshold,
"chunks_analyzed": 0,
"chunks_unsafe": 0,
"max_confidence": 0.0,
"analysis_time_ms": 0,
"chunks_details": [],
"model_used": "zazaman/fmb",
"paragraphs_processed": 0,
"text_length": 0
}
try:
# Check file size
if len(file_content) > self.max_file_size:
analysis_details["error"] = f"File too large: {file_info['size_kb']}KB > {self.max_file_size/1024/1024}MB"
return False, analysis_details
# Check if python-docx is available
if not self.docx_available:
analysis_details["error"] = "python-docx not available. Cannot process Word documents."
return False, analysis_details
# Check if model is available
if not self.model_client:
analysis_details["error"] = "Text analysis model not available"
return False, analysis_details
# Extract text from Word document
text_content, paragraphs_processed = self._extract_text_from_docx(file_content)
analysis_details["paragraphs_processed"] = paragraphs_processed
analysis_details["text_length"] = len(text_content)
if not text_content.strip():
analysis_details["warning"] = "No extractable text found in Word document"
return True, analysis_details
# Chunk the text
chunks = self._chunk_text(text_content)
analysis_details["chunks_analyzed"] = len(chunks)
if not chunks:
analysis_details["warning"] = "No processable content after chunking"
return True, analysis_details
# Analyze each chunk
unsafe_chunks = 0
max_confidence = 0.0
for i, chunk in enumerate(chunks):
chunk_start_time = time.time()
try:
# Analyze chunk with the finetuned model
response = self.model_client.generate_content(chunk)
# Parse the JSON response
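                    # Expected response shape, inferred from the fields read
                    # below (the attack_type value here is illustrative):
                    #   {"safety_status": "unsafe", "confidence": 0.92,
                    #    "attack_type": "prompt_injection"}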
ai_result = json.loads(response)
confidence = ai_result.get("confidence", 0.0)
safety_status = ai_result.get("safety_status", "unsafe")
attack_type = ai_result.get("attack_type", "unknown")
is_chunk_safe = safety_status.lower() == "safe"
chunk_latency = round((time.time() - chunk_start_time) * 1000, 1)
chunk_detail = {
"chunk_index": i,
"chunk_length": len(chunk),
"is_safe": is_chunk_safe,
"confidence": confidence,
"safety_status": safety_status,
"attack_type": attack_type,
"latency_ms": chunk_latency,
"preview": chunk[:100] + "..." if len(chunk) > 100 else chunk
}
analysis_details["chunks_details"].append(chunk_detail)
# Track statistics
max_confidence = max(max_confidence, confidence)
                    # Flag the chunk only when it is unsafe AND confidence exceeds the configured threshold (default 0.8)
if not is_chunk_safe and confidence > self.confidence_threshold:
unsafe_chunks += 1
chunk_detail["flagged"] = True
print(f" 🚨 DOCX Guardrail: Unsafe chunk {i+1}/{len(chunks)} detected (confidence: {confidence:.3f})")
except Exception as e:
                    # Fail closed: if a chunk cannot be analyzed, treat it as unsafe
chunk_detail = {
"chunk_index": i,
"chunk_length": len(chunk),
"is_safe": False,
"error": str(e),
"latency_ms": round((time.time() - chunk_start_time) * 1000, 1),
"preview": chunk[:100] + "..." if len(chunk) > 100 else chunk
}
analysis_details["chunks_details"].append(chunk_detail)
unsafe_chunks += 1
analysis_details["chunks_unsafe"] = unsafe_chunks
analysis_details["max_confidence"] = max_confidence
analysis_details["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
# File is safe if no chunks were flagged as unsafe
is_file_safe = unsafe_chunks == 0
if not is_file_safe:
analysis_details["threat_summary"] = f"Detected {unsafe_chunks} unsafe chunks out of {len(chunks)} total chunks"
return is_file_safe, analysis_details
except Exception as e:
analysis_details["error"] = f"Unexpected error during Word document analysis: {str(e)}"
analysis_details["analysis_time_ms"] = round((time.time() - start_time) * 1000, 1)
return False, analysis_details
def _extract_text_from_docx(self, docx_content: bytes) -> Tuple[str, int]:
"""
Extract text content from Word document using python-docx.
Args:
docx_content: Raw bytes content of the Word document
Returns:
Tuple of (extracted_text, paragraphs_processed)
"""
try:
import docx
import io
# Open Word document from bytes
doc = docx.Document(io.BytesIO(docx_content))
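            # Note: doc.paragraphs and doc.tables cover only the document body;
            # headers, footers, and text boxes are skipped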
extracted_text = ""
paragraphs_processed = 0
# Extract text from each paragraph
for paragraph in doc.paragraphs:
paragraph_text = paragraph.text.strip()
if paragraph_text: # Only add non-empty paragraphs
extracted_text += paragraph_text + "\n\n"
paragraphs_processed += 1
# Extract text from tables if any
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
cell_text = cell.text.strip()
if cell_text:
extracted_text += cell_text + " "
extracted_text += "\n"
return extracted_text.strip(), paragraphs_processed
except Exception as e:
raise Exception(f"Failed to extract text from Word document: {str(e)}")
def _chunk_text(self, text: str) -> List[str]:
"""
Chunk text into pieces of approximately chunk_size tokens.
Uses a simple word-based approximation (1 token ≈ 0.75 words).
"""
if not text.strip():
return []
        # Approximate tokens using word count (1 token ≈ 0.75 words),
        # so a 500-token chunk holds roughly 375 words
        words_per_chunk = int(self.chunk_size * 0.75)
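        # Worked example: with the default chunk_size of 500 tokens,
        # words_per_chunk = int(500 * 0.75) = 375 words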
# Split text into words
words = text.split()
if len(words) <= words_per_chunk:
# Text is small enough to be a single chunk
return [text]
chunks = []
current_chunk_words = []
for word in words:
current_chunk_words.append(word)
# If we've reached the target chunk size, create a chunk
if len(current_chunk_words) >= words_per_chunk:
chunk_text = ' '.join(current_chunk_words)
chunks.append(chunk_text)
current_chunk_words = []
# Add remaining words as the last chunk
if current_chunk_words:
chunk_text = ' '.join(current_chunk_words)
chunks.append(chunk_text)
return chunks
def _estimate_tokens(self, text: str) -> int:
"""Estimate token count using word count approximation"""
        words = len(text.split())
        return int(words / 0.75)  # 1 token ≈ 0.75 words, so ~1.33 tokens per word
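

# Minimal usage sketch. Assumptions: the guardrails package is on the import
# path, the shared zazaman/fmb model can be loaded, and "sample.docx" is a
# hypothetical placeholder path.
if __name__ == "__main__":
    guardrail = DocxGuardrail({"chunk_size": 500, "confidence_threshold": 0.8})
    with open("sample.docx", "rb") as f:  # hypothetical sample file
        content = f.read()
    is_safe, details = guardrail.process_file("sample.docx", content)
    print(f"safe={is_safe}, "
          f"chunks={details['chunks_analyzed']}, "
          f"unsafe={details['chunks_unsafe']}, "
          f"max_confidence={details['max_confidence']}")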