import chardet import pypdf import docx from pdf2image import convert_from_bytes import pytesseract from PIL import Image from typing import Tuple, List, Dict, Optional import json import os import re from datetime import datetime import spacy import nltk from nltk.tokenize import sent_tokenize from nltk.corpus import stopwords from pathlib import Path import streamlit as st import shutil import poppler class DocumentProcessor: def __init__(self, base_path: str = None): """Initialize Document Processor with proper data directory handling.""" self.base_path = self._setup_data_directories(base_path) self.ontology_path = os.path.join(self.base_path, "ontology", "legal_ontology.json") # Initialize NLP components self._initialize_nlp() # Ensure ontology exists self._ensure_ontology_exists() # Load ontology self.ontology = self._load_ontology() # Create processing directories self.processed_path = os.path.join(self.base_path, "processed") self.temp_path = os.path.join(self.base_path, "temp") os.makedirs(self.processed_path, exist_ok=True) os.makedirs(self.temp_path, exist_ok=True) def _setup_data_directories(self, base_path: Optional[str] = None) -> str: """Set up data directories with error handling.""" data_path = base_path or os.path.join(os.getcwd(), "data") subdirs = ["ontology", "processed", "temp", "indexes"] for subdir in subdirs: os.makedirs(os.path.join(data_path, subdir), exist_ok=True) return data_path def _initialize_nlp(self): """Initialize NLP components.""" try: # Load spaCy model try: self.nlp = spacy.load("en_core_web_sm") except OSError: st.info("Downloading spaCy model...") os.system("python -m spacy download en_core_web_sm") self.nlp = spacy.load("en_core_web_sm") # Initialize NLTK nltk_data_dir = os.path.join(self.base_path, "nltk_data") os.makedirs(nltk_data_dir, exist_ok=True) nltk.data.path.append(nltk_data_dir) required_resources = ['punkt', 'averaged_perceptron_tagger', 'maxent_ne_chunker', 'words', 'stopwords'] for resource in required_resources: try: nltk.download(resource, download_dir=nltk_data_dir, quiet=True) except Exception as e: st.warning(f"Could not download {resource}: {str(e)}") self.stop_words = set(nltk.corpus.stopwords.words('english')) except Exception as e: st.error(f"Error initializing NLP components: {str(e)}") raise def _ensure_ontology_exists(self): """Ensure the legal ontology file exists, create if not.""" if not os.path.exists(self.ontology_path): default_ontology = { "@graph": [ { "@id": "concept:Contract", "@type": "vocab:LegalConcept", "rdfs:label": "Contract", "rdfs:comment": "A legally binding agreement between parties", "vocab:relatedConcepts": ["Offer", "Acceptance", "Consideration"] }, { "@id": "concept:Judgment", "@type": "vocab:LegalConcept", "rdfs:label": "Judgment", "rdfs:comment": "A court's final determination", "vocab:relatedConcepts": ["Court Order", "Decision", "Ruling"] } ] } with open(self.ontology_path, 'w') as f: json.dump(default_ontology, f, indent=2) def _load_ontology(self) -> Dict: """Load legal ontology with error handling.""" try: with open(self.ontology_path, 'r') as f: return json.load(f) except Exception as e: st.error(f"Error loading ontology: {str(e)}") return {"@graph": []} def process_and_tag_document(self, file) -> Tuple[str, List[Dict], Dict]: """Process document and generate metadata.""" try: doc_id = datetime.now().strftime('%Y%m%d_%H%M%S') doc_dir = os.path.join(self.processed_path, doc_id) os.makedirs(doc_dir, exist_ok=True) original_path = os.path.join(doc_dir, "original" + Path(file.name).suffix) with open(original_path, 'wb') as f: f.write(file.getvalue()) # Extract text and process document text, chunks = self.process_document(original_path) metadata = self._extract_metadata(text, file.name) metadata.update({"doc_id": doc_id, "original_path": original_path}) # Save processed data self._save_processed_data(doc_dir, text, chunks, metadata) return text, chunks, metadata except Exception as e: st.error(f"Error in document processing pipeline: {str(e)}") raise def _tokenize_text(self, text: str) -> List[str]: """Tokenize text into sentences using NLTK.""" try: return sent_tokenize(text) except Exception: return [sentence.strip() for sentence in text.split('.') if sentence.strip()] def process_document(self, file_path: str) -> Tuple[str, List[Dict]]: """Process a document based on its type.""" file_type = Path(file_path).suffix.lower() if file_type == '.pdf': text = self._process_pdf(file_path) elif file_type == '.docx': text = self._process_docx(file_path) elif file_type in ['.txt', '.csv']: text = self._process_text(file_path) else: raise ValueError(f"Unsupported file type: {file_type}") chunks = self._create_chunks(text) return text, chunks def _process_pdf(self, file_path: str) -> str: try: # Try some common Poppler installation paths poppler_paths = [ "/usr/bin", "/usr/local/bin", "/opt/poppler/bin", "/Library/Frameworks/Poppler.framework/Versions/Current/bin", # for macOS ] # Find the first valid Poppler path for poppler_dir in poppler_paths: if os.path.exists(os.path.join(poppler_dir, "pdftoppm")): break else: raise ValueError("Poppler not found in any of the common installation paths.") # Update the PATH and LD_LIBRARY_PATH environment variables os.environ["PATH"] = f"{poppler_dir}:{os.environ['PATH']}" os.environ["LD_LIBRARY_PATH"] = f"{poppler_dir}:{os.environ.get('LD_LIBRARY_PATH', '')}" # Test the Poppler installation try: subprocess.check_output(["pdftoppm", "-v"]) st.info("Poppler is installed and in the PATH.") except (subprocess.CalledProcessError, FileNotFoundError): st.error("Unable to find Poppler. Please check the installation.") raise # Process the PDF file using pdf2image and Tesseract OCR images = convert_from_bytes(open(file_path, 'rb').read()) text = "" for page_num, image in enumerate(images, 1): st.info(f"Performing OCR on page {page_num}...") page_text = pytesseract.image_to_string(image) text += f"\n--- Page {page_num} ---\n{page_text}" return text except Exception as e: st.error(f"Error processing PDF: {str(e)}") raise def _process_docx(self, file_path: str) -> str: """Extract text from DOCX files.""" try: doc = docx.Document(file_path) return "\n".join(para.text for para in doc.paragraphs if para.text.strip()) except Exception as e: st.error(f"Error processing DOCX: {str(e)}") raise def _process_text(self, file_path: str) -> str: """Process plain text files.""" try: with open(file_path, 'rb') as f: raw_data = f.read() encoding = chardet.detect(raw_data).get('encoding', 'utf-8') return raw_data.decode(encoding) except Exception as e: st.error(f"Error processing text file: {str(e)}") raise def _create_chunks(self, text: str) -> List[Dict]: """Chunk text for further processing.""" sentences = self._tokenize_text(text) chunk_size = 500 chunks = [] current_chunk, current_length = [], 0 for sentence in sentences: if current_length + len(sentence) > chunk_size and current_chunk: chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks))) current_chunk, current_length = [], 0 current_chunk.append(sentence) current_length += len(sentence) if current_chunk: chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks))) return chunks def _link_to_ontology(self, text: str) -> List[Dict]: """Link text to legal ontology concepts.""" relevant_concepts = [] text_lower = text.lower() for concept in self.ontology.get("@graph", []): if "rdfs:label" not in concept: continue label = concept["rdfs:label"].lower() if label in text_lower: # Get surrounding context start_idx = text_lower.index(label) context_start = max(0, start_idx - 100) context_end = min(len(text), start_idx + len(label) + 100) relevant_concepts.append({ "concept": concept["rdfs:label"], "type": concept.get("@type", "Unknown"), "description": concept.get("rdfs:comment", ""), "context": text[context_start:context_end].strip(), "location": {"start": start_idx, "end": start_idx + len(label)} }) return relevant_concepts def _process_chunk(self, text: str, chunk_id: int) -> Dict: """Process individual chunks with NLP and ontology linking.""" doc = self.nlp(text) return { 'chunk_id': chunk_id, 'text': text, 'entities': [(ent.text, ent.label_) for ent in doc.ents], 'noun_phrases': [np.text for np in doc.noun_chunks], 'ontology_links': self._link_to_ontology(text) } def _extract_metadata(self, text: str, file_name: str) -> Dict: """Extract metadata from text.""" doc = self.nlp(text[:10000]) return { 'filename': file_name, 'file_type': Path(file_name).suffix.lower(), 'processed_at': datetime.now().isoformat(), 'entities': [(ent.text, ent.label_) for ent in doc.ents], 'document_type': 'Legal Document' } def _save_processed_data(self, doc_dir: str, text: str, chunks: List[Dict], metadata: Dict): """Save processed data to disk.""" with open(os.path.join(doc_dir, "processed.txt"), 'w', encoding='utf-8') as f: f.write(text) with open(os.path.join(doc_dir, "chunks.json"), 'w') as f: json.dump(chunks, f, indent=2) with open(os.path.join(doc_dir, "metadata.json"), 'w') as f: json.dump(metadata, f, indent=2) def cleanup(self): """Clean up temporary files.""" shutil.rmtree(self.temp_path, ignore_errors=True) os.makedirs(self.temp_path, exist_ok=True) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.cleanup()