# SuoMoto.AI_Demo / utils/document_processor.py
# cryogenic22's picture
# Update utils/document_processor.py
# de83dc0 verified
import json
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Tuple, List, Dict, Optional

import chardet
import docx
import nltk
import poppler
import pypdf
import pytesseract
import spacy
import streamlit as st
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from pdf2image import convert_from_bytes
from PIL import Image
class DocumentProcessor:
    """Extract text from uploaded documents, chunk it, tag it and persist it.

    Supported inputs are PDF (rendered to images and OCR'd), DOCX and plain
    text/CSV files. Each processed document gets its own directory under
    ``<base_path>/processed`` holding the original file, the extracted text,
    the chunk list and a metadata record.

    The instance can be used as a context manager; leaving the ``with`` block
    empties the temporary directory.
    """

    # Directories probed for Poppler's ``pdftoppm`` when it is not on PATH.
    _POPPLER_FALLBACK_DIRS = (
        "/usr/bin",
        "/usr/local/bin",
        "/opt/poppler/bin",
        "/Library/Frameworks/Poppler.framework/Versions/Current/bin",  # for macOS
    )

    def __init__(self, base_path: Optional[str] = None):
        """Initialize Document Processor with proper data directory handling.

        Args:
            base_path: Root data directory. Defaults to ``<cwd>/data``.
        """
        self.base_path = self._setup_data_directories(base_path)
        self.ontology_path = os.path.join(self.base_path, "ontology", "legal_ontology.json")

        # Initialize NLP components
        self._initialize_nlp()

        # Ensure ontology exists, then load it
        self._ensure_ontology_exists()
        self.ontology = self._load_ontology()

        # Create processing directories
        self.processed_path = os.path.join(self.base_path, "processed")
        self.temp_path = os.path.join(self.base_path, "temp")
        os.makedirs(self.processed_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def _setup_data_directories(self, base_path: Optional[str] = None) -> str:
        """Create the data directory tree and return its root.

        Args:
            base_path: Root directory to use; falls back to ``<cwd>/data``
                when empty or ``None``.

        Returns:
            The root data directory path.
        """
        data_path = base_path or os.path.join(os.getcwd(), "data")
        for subdir in ("ontology", "processed", "temp", "indexes"):
            os.makedirs(os.path.join(data_path, subdir), exist_ok=True)
        return data_path

    def _initialize_nlp(self):
        """Load the spaCy model and the NLTK resources used by this class.

        Sets ``self.nlp`` and ``self.stop_words``.

        Raises:
            Exception: Any failure is reported through Streamlit and re-raised.
        """
        try:
            # Load spaCy model, downloading it on first use.
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except OSError:
                st.info("Downloading spaCy model...")
                # Use the running interpreter so the model is installed into
                # the same environment that will load it.
                subprocess.run(
                    [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                    check=True,
                )
                self.nlp = spacy.load("en_core_web_sm")

            # Initialize NLTK with a data directory under base_path.
            nltk_data_dir = os.path.join(self.base_path, "nltk_data")
            os.makedirs(nltk_data_dir, exist_ok=True)
            # Avoid growing the search path every time an instance is created.
            if nltk_data_dir not in nltk.data.path:
                nltk.data.path.append(nltk_data_dir)

            required_resources = [
                'punkt',
                'averaged_perceptron_tagger',
                'maxent_ne_chunker',
                'words',
                'stopwords',
            ]
            for resource in required_resources:
                try:
                    nltk.download(resource, download_dir=nltk_data_dir, quiet=True)
                except Exception as e:
                    # Best effort: a missing resource is reported, not fatal here.
                    st.warning(f"Could not download {resource}: {str(e)}")

            self.stop_words = set(nltk.corpus.stopwords.words('english'))
        except Exception as e:
            st.error(f"Error initializing NLP components: {str(e)}")
            raise

    def _ensure_ontology_exists(self):
        """Ensure the legal ontology file exists, create a default one if not."""
        if os.path.exists(self.ontology_path):
            return

        default_ontology = {
            "@graph": [
                {
                    "@id": "concept:Contract",
                    "@type": "vocab:LegalConcept",
                    "rdfs:label": "Contract",
                    "rdfs:comment": "A legally binding agreement between parties",
                    "vocab:relatedConcepts": ["Offer", "Acceptance", "Consideration"]
                },
                {
                    "@id": "concept:Judgment",
                    "@type": "vocab:LegalConcept",
                    "rdfs:label": "Judgment",
                    "rdfs:comment": "A court's final determination",
                    "vocab:relatedConcepts": ["Court Order", "Decision", "Ruling"]
                }
            ]
        }
        ontology_dir = os.path.dirname(self.ontology_path)
        if ontology_dir:
            os.makedirs(ontology_dir, exist_ok=True)
        with open(self.ontology_path, 'w', encoding='utf-8') as f:
            json.dump(default_ontology, f, indent=2)

    def _load_ontology(self) -> Dict:
        """Load the legal ontology from ``self.ontology_path``.

        Returns:
            The parsed ontology, or ``{"@graph": []}`` when the file cannot be
            read or parsed (the error is reported through Streamlit).
        """
        try:
            with open(self.ontology_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            st.error(f"Error loading ontology: {str(e)}")
            return {"@graph": []}

    def process_and_tag_document(self, file) -> Tuple[str, List[Dict], Dict]:
        """Store an uploaded file, extract its text and generate metadata.

        Args:
            file: Uploaded file object exposing ``.name`` and ``.getvalue()``.

        Returns:
            A ``(text, chunks, metadata)`` tuple.

        Raises:
            Exception: Any failure is reported through Streamlit and re-raised.
        """
        try:
            # Microseconds keep two uploads within the same second from
            # sharing (and overwriting) one output directory.
            doc_id = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
            doc_dir = os.path.join(self.processed_path, doc_id)
            os.makedirs(doc_dir, exist_ok=True)

            original_path = os.path.join(doc_dir, "original" + Path(file.name).suffix)
            with open(original_path, 'wb') as f:
                f.write(file.getvalue())

            # Extract text and process document
            text, chunks = self.process_document(original_path)
            metadata = self._extract_metadata(text, file.name)
            metadata.update({"doc_id": doc_id, "original_path": original_path})

            # Save processed data
            self._save_processed_data(doc_dir, text, chunks, metadata)
            return text, chunks, metadata
        except Exception as e:
            st.error(f"Error in document processing pipeline: {str(e)}")
            raise

    def _tokenize_text(self, text: str) -> List[str]:
        """Split text into sentences using NLTK.

        Falls back to splitting on ``.`` when the NLTK tokenizer fails.
        """
        try:
            return sent_tokenize(text)
        except Exception:
            return [sentence.strip() for sentence in text.split('.') if sentence.strip()]

    def process_document(self, file_path: str) -> Tuple[str, List[Dict]]:
        """Extract and chunk the text of a document, dispatching on its suffix.

        Args:
            file_path: Path to a ``.pdf``, ``.docx``, ``.txt`` or ``.csv`` file.

        Returns:
            A ``(text, chunks)`` tuple.

        Raises:
            ValueError: If the file suffix is not supported.
        """
        file_type = Path(file_path).suffix.lower()
        if file_type == '.pdf':
            text = self._process_pdf(file_path)
        elif file_type == '.docx':
            text = self._process_docx(file_path)
        elif file_type in ('.txt', '.csv'):
            text = self._process_text(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        chunks = self._create_chunks(text)
        return text, chunks

    def _find_pdftoppm(self) -> str:
        """Return the path of Poppler's ``pdftoppm`` executable.

        PATH is searched first, then the known fallback directories.

        Raises:
            ValueError: If the executable cannot be found.
        """
        located = shutil.which("pdftoppm")
        if located:
            return located
        for candidate_dir in self._POPPLER_FALLBACK_DIRS:
            candidate = os.path.join(candidate_dir, "pdftoppm")
            if os.path.exists(candidate):
                return candidate
        raise ValueError("Poppler not found in any of the common installation paths.")

    def _process_pdf(self, file_path: str) -> str:
        """Extract text from a PDF by rendering each page and running OCR.

        Args:
            file_path: Path to the PDF file.

        Returns:
            The OCR text of all pages, each preceded by a
            ``--- Page N ---`` marker.

        Raises:
            Exception: Any failure is reported through Streamlit and re-raised.
        """
        try:
            pdftoppm = self._find_pdftoppm()
            poppler_dir = os.path.dirname(pdftoppm)

            # Test the Poppler installation
            try:
                subprocess.check_output([pdftoppm, "-v"], stderr=subprocess.STDOUT)
            except (subprocess.CalledProcessError, OSError):
                st.error("Unable to find Poppler. Please check the installation.")
                raise

            with open(file_path, 'rb') as pdf_file:
                pdf_bytes = pdf_file.read()

            # Hand the Poppler location to pdf2image directly instead of
            # rewriting PATH/LD_LIBRARY_PATH in os.environ on every call.
            images = convert_from_bytes(pdf_bytes, poppler_path=poppler_dir)

            pages = []
            for page_num, image in enumerate(images, 1):
                st.info(f"Performing OCR on page {page_num}...")
                page_text = pytesseract.image_to_string(image)
                pages.append(f"\n--- Page {page_num} ---\n{page_text}")
            return "".join(pages)
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            raise

    def _process_docx(self, file_path: str) -> str:
        """Extract text from DOCX files, one non-empty paragraph per line."""
        try:
            doc = docx.Document(file_path)
            return "\n".join(para.text for para in doc.paragraphs if para.text.strip())
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            raise

    def _process_text(self, file_path: str) -> str:
        """Read a plain text file, detecting its encoding with chardet.

        Raises:
            Exception: Any failure is reported through Streamlit and re-raised.
        """
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read()
            # The 'encoding' key is present but None when detection fails
            # (e.g. an empty file), so a .get() default would never apply.
            encoding = chardet.detect(raw_data).get('encoding') or 'utf-8'
            try:
                return raw_data.decode(encoding)
            except (UnicodeDecodeError, LookupError):
                # Detection is a guess; degrade to lossy UTF-8 rather than fail.
                return raw_data.decode('utf-8', errors='replace')
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            raise

    def _create_chunks(self, text: str, chunk_size: int = 500) -> List[Dict]:
        """Group sentences into chunks and process each chunk.

        Sentences are accumulated until adding the next one would push the
        summed sentence length past ``chunk_size``; a single sentence longer
        than ``chunk_size`` still forms its own chunk.

        Args:
            text: Text to split.
            chunk_size: Target maximum number of characters per chunk.

        Returns:
            The processed chunks, in document order.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        chunks = []
        current_chunk, current_length = [], 0
        for sentence in self._tokenize_text(text):
            if current_chunk and current_length + len(sentence) > chunk_size:
                chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks)))
                current_chunk, current_length = [], 0
            current_chunk.append(sentence)
            current_length += len(sentence)

        if current_chunk:
            chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks)))
        return chunks

    def _link_to_ontology(self, text: str) -> List[Dict]:
        """Link text to legal ontology concepts.

        For each ontology concept whose label occurs in ``text`` (matched
        case-insensitively as a substring), the first occurrence is reported
        with up to 100 characters of context on either side.

        Returns:
            One dict per matched concept with ``concept``, ``type``,
            ``description``, ``context`` and ``location`` keys.
        """
        relevant_concepts = []
        for concept in self.ontology.get("@graph", []):
            label = concept.get("rdfs:label")
            if not isinstance(label, str):
                continue

            # Search the original text so the offsets index into ``text``;
            # offsets taken from text.lower() can drift because lowercasing
            # may change the length of the string.
            match = re.search(re.escape(label), text, flags=re.IGNORECASE)
            if match is None:
                continue

            start_idx, end_idx = match.span()
            context_start = max(0, start_idx - 100)
            context_end = min(len(text), end_idx + 100)
            relevant_concepts.append({
                "concept": label,
                "type": concept.get("@type", "Unknown"),
                "description": concept.get("rdfs:comment", ""),
                "context": text[context_start:context_end].strip(),
                "location": {"start": start_idx, "end": end_idx}
            })
        return relevant_concepts

    def _process_chunk(self, text: str, chunk_id: int) -> Dict:
        """Run spaCy and ontology linking over a single chunk.

        Returns:
            A dict with ``chunk_id``, ``text``, ``entities``,
            ``noun_phrases`` and ``ontology_links`` keys.
        """
        doc = self.nlp(text)
        return {
            'chunk_id': chunk_id,
            'text': text,
            'entities': [(ent.text, ent.label_) for ent in doc.ents],
            'noun_phrases': [phrase.text for phrase in doc.noun_chunks],
            'ontology_links': self._link_to_ontology(text)
        }

    def _extract_metadata(self, text: str, file_name: str) -> Dict:
        """Build the metadata record for a document.

        Only the first 10,000 characters of ``text`` are run through spaCy.
        """
        doc = self.nlp(text[:10000])
        return {
            'filename': file_name,
            'file_type': Path(file_name).suffix.lower(),
            'processed_at': datetime.now().isoformat(),
            'entities': [(ent.text, ent.label_) for ent in doc.ents],
            'document_type': 'Legal Document'
        }

    def _save_processed_data(self, doc_dir: str, text: str, chunks: List[Dict], metadata: Dict):
        """Write the text, chunks and metadata of a document into ``doc_dir``."""
        with open(os.path.join(doc_dir, "processed.txt"), 'w', encoding='utf-8') as f:
            f.write(text)
        with open(os.path.join(doc_dir, "chunks.json"), 'w', encoding='utf-8') as f:
            json.dump(chunks, f, indent=2)
        with open(os.path.join(doc_dir, "metadata.json"), 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2)

    def cleanup(self):
        """Delete everything in the temporary directory and recreate it empty."""
        shutil.rmtree(self.temp_path, ignore_errors=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so exceptions raised inside the ``with`` block propagate.
        self.cleanup()