Legal_AI_Agent / utils /document_processor.py
cryogenic22's picture
Update utils/document_processor.py
ca49412 verified
import chardet
import pypdf
import docx
from pdf2image import convert_from_bytes
import pytesseract
from PIL import Image
from typing import Tuple, List, Dict, Optional
import json
import os
import re
from datetime import datetime
import spacy
import nltk
from nltk.tokenize import sent_tokenize
from nltk.corpus import stopwords
from pathlib import Path
import streamlit as st
import shutil
import poppler
class DocumentProcessor:
def __init__(self, base_path: str = None):
"""Initialize Document Processor with proper data directory handling."""
self.base_path = self._setup_data_directories(base_path)
self.ontology_path = os.path.join(self.base_path, "ontology", "legal_ontology.json")
# Initialize NLP components
self._initialize_nlp()
# Ensure ontology exists
self._ensure_ontology_exists()
# Load ontology
self.ontology = self._load_ontology()
# Create processing directories
self.processed_path = os.path.join(self.base_path, "processed")
self.temp_path = os.path.join(self.base_path, "temp")
os.makedirs(self.processed_path, exist_ok=True)
os.makedirs(self.temp_path, exist_ok=True)
def _setup_data_directories(self, base_path: Optional[str] = None) -> str:
"""Set up data directories with error handling."""
data_path = base_path or os.path.join(os.getcwd(), "data")
subdirs = ["ontology", "processed", "temp", "indexes"]
for subdir in subdirs:
os.makedirs(os.path.join(data_path, subdir), exist_ok=True)
return data_path
def _initialize_nlp(self):
"""Initialize NLP components."""
try:
# Load spaCy model
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:
st.info("Downloading spaCy model...")
os.system("python -m spacy download en_core_web_sm")
self.nlp = spacy.load("en_core_web_sm")
# Initialize NLTK
nltk_data_dir = os.path.join(self.base_path, "nltk_data")
os.makedirs(nltk_data_dir, exist_ok=True)
nltk.data.path.append(nltk_data_dir)
required_resources = ['punkt', 'averaged_perceptron_tagger', 'maxent_ne_chunker', 'words', 'stopwords']
for resource in required_resources:
try:
nltk.download(resource, download_dir=nltk_data_dir, quiet=True)
except Exception as e:
st.warning(f"Could not download {resource}: {str(e)}")
self.stop_words = set(nltk.corpus.stopwords.words('english'))
except Exception as e:
st.error(f"Error initializing NLP components: {str(e)}")
raise
def _ensure_ontology_exists(self):
"""Ensure the legal ontology file exists, create if not."""
if not os.path.exists(self.ontology_path):
default_ontology = {
"@graph": [
{
"@id": "concept:Contract",
"@type": "vocab:LegalConcept",
"rdfs:label": "Contract",
"rdfs:comment": "A legally binding agreement between parties",
"vocab:relatedConcepts": ["Offer", "Acceptance", "Consideration"]
},
{
"@id": "concept:Judgment",
"@type": "vocab:LegalConcept",
"rdfs:label": "Judgment",
"rdfs:comment": "A court's final determination",
"vocab:relatedConcepts": ["Court Order", "Decision", "Ruling"]
}
]
}
with open(self.ontology_path, 'w') as f:
json.dump(default_ontology, f, indent=2)
def _load_ontology(self) -> Dict:
"""Load legal ontology with error handling."""
try:
with open(self.ontology_path, 'r') as f:
return json.load(f)
except Exception as e:
st.error(f"Error loading ontology: {str(e)}")
return {"@graph": []}
def process_and_tag_document(self, file) -> Tuple[str, List[Dict], Dict]:
"""Process document and generate metadata."""
try:
doc_id = datetime.now().strftime('%Y%m%d_%H%M%S')
doc_dir = os.path.join(self.processed_path, doc_id)
os.makedirs(doc_dir, exist_ok=True)
original_path = os.path.join(doc_dir, "original" + Path(file.name).suffix)
with open(original_path, 'wb') as f:
f.write(file.getvalue())
# Extract text and process document
text, chunks = self.process_document(original_path)
metadata = self._extract_metadata(text, file.name)
metadata.update({"doc_id": doc_id, "original_path": original_path})
# Save processed data
self._save_processed_data(doc_dir, text, chunks, metadata)
return text, chunks, metadata
except Exception as e:
st.error(f"Error in document processing pipeline: {str(e)}")
raise
def _tokenize_text(self, text: str) -> List[str]:
"""Tokenize text into sentences using NLTK."""
try:
return sent_tokenize(text)
except Exception:
return [sentence.strip() for sentence in text.split('.') if sentence.strip()]
def process_document(self, file_path: str) -> Tuple[str, List[Dict]]:
"""Process a document based on its type."""
file_type = Path(file_path).suffix.lower()
if file_type == '.pdf':
text = self._process_pdf(file_path)
elif file_type == '.docx':
text = self._process_docx(file_path)
elif file_type in ['.txt', '.csv']:
text = self._process_text(file_path)
else:
raise ValueError(f"Unsupported file type: {file_type}")
chunks = self._create_chunks(text)
return text, chunks
def process_pdf(self, file_path: str) -> Optional[str]:
try:
# First verify if poppler is installed via package manager
try:
subprocess.check_output(['pdftoppm', '-v'], stderr=subprocess.STDOUT)
st.success("✓ Poppler found on system")
except (subprocess.CalledProcessError, FileNotFoundError):
# If not in default path, check common installation directories
poppler_paths = [
"/usr/bin",
"/usr/local/bin",
"/opt/poppler/bin",
"/app/.apt/usr/bin", # Common HF Spaces path
os.path.expanduser("~/.local/bin")
]
for poppler_dir in poppler_paths:
if os.path.exists(os.path.join(poppler_dir, "pdftoppm")):
# Update PATH
os.environ["PATH"] = f"{poppler_dir}:{os.environ.get('PATH', '')}"
st.success(f"✓ Found Poppler in {poppler_dir}")
break
else:
st.error("❌ Poppler not found. Please ensure 'poppler-utils' is in packages.txt")
return None
# Attempt to read and convert the PDF
try:
with open(file_path, 'rb') as pdf_file:
pdf_bytes = pdf_file.read()
# Convert PDF to images
images = convert_from_bytes(
pdf_bytes,
dpi=300, # Increase DPI for better OCR quality
fmt='png'
)
# Process each page
text = ""
total_pages = len(images)
for page_num, image in enumerate(images, 1):
st.progress(page_num / total_pages)
st.info(f"📄 Processing page {page_num}/{total_pages}")
# Perform OCR with custom configuration
page_text = pytesseract.image_to_string(
image,
config='--psm 3 --oem 3' # Use default page segmentation and OCR Engine Mode
)
text += f"\n{'='*20} Page {page_num} {'='*20}\n{page_text}\n"
return text.strip()
except Exception as e:
st.error(f"Error processing PDF content: {str(e)}")
return None
except Exception as e:
st.error(f"Unexpected error: {str(e)}")
return None
def _process_docx(self, file_path: str) -> str:
"""Extract text from DOCX files."""
try:
doc = docx.Document(file_path)
return "\n".join(para.text for para in doc.paragraphs if para.text.strip())
except Exception as e:
st.error(f"Error processing DOCX: {str(e)}")
raise
def _process_text(self, file_path: str) -> str:
"""Process plain text files."""
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
encoding = chardet.detect(raw_data).get('encoding', 'utf-8')
return raw_data.decode(encoding)
except Exception as e:
st.error(f"Error processing text file: {str(e)}")
raise
def _create_chunks(self, text: str) -> List[Dict]:
"""Chunk text for further processing."""
sentences = self._tokenize_text(text)
chunk_size = 500
chunks = []
current_chunk, current_length = [], 0
for sentence in sentences:
if current_length + len(sentence) > chunk_size and current_chunk:
chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks)))
current_chunk, current_length = [], 0
current_chunk.append(sentence)
current_length += len(sentence)
if current_chunk:
chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks)))
return chunks
def _link_to_ontology(self, text: str) -> List[Dict]:
"""Link text to legal ontology concepts."""
relevant_concepts = []
text_lower = text.lower()
for concept in self.ontology.get("@graph", []):
if "rdfs:label" not in concept:
continue
label = concept["rdfs:label"].lower()
if label in text_lower:
# Get surrounding context
start_idx = text_lower.index(label)
context_start = max(0, start_idx - 100)
context_end = min(len(text), start_idx + len(label) + 100)
relevant_concepts.append({
"concept": concept["rdfs:label"],
"type": concept.get("@type", "Unknown"),
"description": concept.get("rdfs:comment", ""),
"context": text[context_start:context_end].strip(),
"location": {"start": start_idx, "end": start_idx + len(label)}
})
return relevant_concepts
def _process_chunk(self, text: str, chunk_id: int) -> Dict:
"""Process individual chunks with NLP and ontology linking."""
doc = self.nlp(text)
return {
'chunk_id': chunk_id,
'text': text,
'entities': [(ent.text, ent.label_) for ent in doc.ents],
'noun_phrases': [np.text for np in doc.noun_chunks],
'ontology_links': self._link_to_ontology(text)
}
def _extract_metadata(self, text: str, file_name: str) -> Dict:
"""Extract metadata from text."""
doc = self.nlp(text[:10000])
return {
'filename': file_name,
'file_type': Path(file_name).suffix.lower(),
'processed_at': datetime.now().isoformat(),
'entities': [(ent.text, ent.label_) for ent in doc.ents],
'document_type': 'Legal Document'
}
def _save_processed_data(self, doc_dir: str, text: str, chunks: List[Dict], metadata: Dict):
"""Save processed data to disk."""
with open(os.path.join(doc_dir, "processed.txt"), 'w', encoding='utf-8') as f:
f.write(text)
with open(os.path.join(doc_dir, "chunks.json"), 'w') as f:
json.dump(chunks, f, indent=2)
with open(os.path.join(doc_dir, "metadata.json"), 'w') as f:
json.dump(metadata, f, indent=2)
def cleanup(self):
"""Clean up temporary files."""
shutil.rmtree(self.temp_path, ignore_errors=True)
os.makedirs(self.temp_path, exist_ok=True)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()