import chardet import pypdf import docx from pdf2image import convert_from_bytes import pytesseract from PIL import Image from typing import Tuple, List, Dict, Optional import json import os import re from datetime import datetime import spacy import nltk from nltk.tokenize import sent_tokenize from nltk.corpus import stopwords from pathlib import Path import streamlit as st import shutil import poppler class DocumentProcessor: def __init__(self, base_path: str = None): """Initialize Document Processor with proper data directory handling.""" self.base_path = self._setup_data_directories(base_path) self.ontology_path = os.path.join(self.base_path, "ontology", "legal_ontology.json") # Initialize NLP components self._initialize_nlp() # Ensure ontology exists self._ensure_ontology_exists() # Load ontology self.ontology = self._load_ontology() # Create processing directories self.processed_path = os.path.join(self.base_path, "processed") self.temp_path = os.path.join(self.base_path, "temp") os.makedirs(self.processed_path, exist_ok=True) os.makedirs(self.temp_path, exist_ok=True) def _setup_data_directories(self, base_path: Optional[str] = None) -> str: """Set up data directories with error handling.""" data_path = base_path or os.path.join(os.getcwd(), "data") subdirs = ["ontology", "processed", "temp", "indexes"] for subdir in subdirs: os.makedirs(os.path.join(data_path, subdir), exist_ok=True) return data_path def _initialize_nlp(self): """Initialize NLP components.""" try: # Load spaCy model try: self.nlp = spacy.load("en_core_web_sm") except OSError: st.info("Downloading spaCy model...") os.system("python -m spacy download en_core_web_sm") self.nlp = spacy.load("en_core_web_sm") # Initialize NLTK nltk_data_dir = os.path.join(self.base_path, "nltk_data") os.makedirs(nltk_data_dir, exist_ok=True) nltk.data.path.append(nltk_data_dir) required_resources = ['punkt', 'averaged_perceptron_tagger', 'maxent_ne_chunker', 'words', 'stopwords'] for resource in required_resources: try: nltk.download(resource, download_dir=nltk_data_dir, quiet=True) except Exception as e: st.warning(f"Could not download {resource}: {str(e)}") self.stop_words = set(nltk.corpus.stopwords.words('english')) except Exception as e: st.error(f"Error initializing NLP components: {str(e)}") raise def _ensure_ontology_exists(self): """Ensure the legal ontology file exists, create if not.""" if not os.path.exists(self.ontology_path): default_ontology = { "@graph": [ { "@id": "concept:Contract", "@type": "vocab:LegalConcept", "rdfs:label": "Contract", "rdfs:comment": "A legally binding agreement between parties", "vocab:relatedConcepts": ["Offer", "Acceptance", "Consideration"] }, { "@id": "concept:Judgment", "@type": "vocab:LegalConcept", "rdfs:label": "Judgment", "rdfs:comment": "A court's final determination", "vocab:relatedConcepts": ["Court Order", "Decision", "Ruling"] } ] } with open(self.ontology_path, 'w') as f: json.dump(default_ontology, f, indent=2) def _load_ontology(self) -> Dict: """Load legal ontology with error handling.""" try: with open(self.ontology_path, 'r') as f: return json.load(f) except Exception as e: st.error(f"Error loading ontology: {str(e)}") return {"@graph": []} def process_and_tag_document(self, file) -> Tuple[str, List[Dict], Dict]: """Process document and generate metadata.""" try: doc_id = datetime.now().strftime('%Y%m%d_%H%M%S') doc_dir = os.path.join(self.processed_path, doc_id) os.makedirs(doc_dir, exist_ok=True) original_path = os.path.join(doc_dir, "original" + Path(file.name).suffix) with open(original_path, 'wb') as f: f.write(file.getvalue()) # Extract text and process document text, chunks = self.process_document(original_path) metadata = self._extract_metadata(text, file.name) metadata.update({"doc_id": doc_id, "original_path": original_path}) # Save processed data self._save_processed_data(doc_dir, text, chunks, metadata) return text, chunks, metadata except Exception as e: st.error(f"Error in document processing pipeline: {str(e)}") raise def _tokenize_text(self, text: str) -> List[str]: """Tokenize text into sentences using NLTK.""" try: return sent_tokenize(text) except Exception: return [sentence.strip() for sentence in text.split('.') if sentence.strip()] def process_document(self, file_path: str) -> Tuple[str, List[Dict]]: """Process a document based on its type.""" file_type = Path(file_path).suffix.lower() if file_type == '.pdf': text = self._process_pdf(file_path) elif file_type == '.docx': text = self._process_docx(file_path) elif file_type in ['.txt', '.csv']: text = self._process_text(file_path) else: raise ValueError(f"Unsupported file type: {file_type}") chunks = self._create_chunks(text) return text, chunks def process_pdf(self, file_path: str) -> Optional[str]: try: # First verify if poppler is installed via package manager try: subprocess.check_output(['pdftoppm', '-v'], stderr=subprocess.STDOUT) st.success("✓ Poppler found on system") except (subprocess.CalledProcessError, FileNotFoundError): # If not in default path, check common installation directories poppler_paths = [ "/usr/bin", "/usr/local/bin", "/opt/poppler/bin", "/app/.apt/usr/bin", # Common HF Spaces path os.path.expanduser("~/.local/bin") ] for poppler_dir in poppler_paths: if os.path.exists(os.path.join(poppler_dir, "pdftoppm")): # Update PATH os.environ["PATH"] = f"{poppler_dir}:{os.environ.get('PATH', '')}" st.success(f"✓ Found Poppler in {poppler_dir}") break else: st.error("❌ Poppler not found. Please ensure 'poppler-utils' is in packages.txt") return None # Attempt to read and convert the PDF try: with open(file_path, 'rb') as pdf_file: pdf_bytes = pdf_file.read() # Convert PDF to images images = convert_from_bytes( pdf_bytes, dpi=300, # Increase DPI for better OCR quality fmt='png' ) # Process each page text = "" total_pages = len(images) for page_num, image in enumerate(images, 1): st.progress(page_num / total_pages) st.info(f"📄 Processing page {page_num}/{total_pages}") # Perform OCR with custom configuration page_text = pytesseract.image_to_string( image, config='--psm 3 --oem 3' # Use default page segmentation and OCR Engine Mode ) text += f"\n{'='*20} Page {page_num} {'='*20}\n{page_text}\n" return text.strip() except Exception as e: st.error(f"Error processing PDF content: {str(e)}") return None except Exception as e: st.error(f"Unexpected error: {str(e)}") return None def _process_docx(self, file_path: str) -> str: """Extract text from DOCX files.""" try: doc = docx.Document(file_path) return "\n".join(para.text for para in doc.paragraphs if para.text.strip()) except Exception as e: st.error(f"Error processing DOCX: {str(e)}") raise def _process_text(self, file_path: str) -> str: """Process plain text files.""" try: with open(file_path, 'rb') as f: raw_data = f.read() encoding = chardet.detect(raw_data).get('encoding', 'utf-8') return raw_data.decode(encoding) except Exception as e: st.error(f"Error processing text file: {str(e)}") raise def _create_chunks(self, text: str) -> List[Dict]: """Chunk text for further processing.""" sentences = self._tokenize_text(text) chunk_size = 500 chunks = [] current_chunk, current_length = [], 0 for sentence in sentences: if current_length + len(sentence) > chunk_size and current_chunk: chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks))) current_chunk, current_length = [], 0 current_chunk.append(sentence) current_length += len(sentence) if current_chunk: chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks))) return chunks def _link_to_ontology(self, text: str) -> List[Dict]: """Link text to legal ontology concepts.""" relevant_concepts = [] text_lower = text.lower() for concept in self.ontology.get("@graph", []): if "rdfs:label" not in concept: continue label = concept["rdfs:label"].lower() if label in text_lower: # Get surrounding context start_idx = text_lower.index(label) context_start = max(0, start_idx - 100) context_end = min(len(text), start_idx + len(label) + 100) relevant_concepts.append({ "concept": concept["rdfs:label"], "type": concept.get("@type", "Unknown"), "description": concept.get("rdfs:comment", ""), "context": text[context_start:context_end].strip(), "location": {"start": start_idx, "end": start_idx + len(label)} }) return relevant_concepts def _process_chunk(self, text: str, chunk_id: int) -> Dict: """Process individual chunks with NLP and ontology linking.""" doc = self.nlp(text) return { 'chunk_id': chunk_id, 'text': text, 'entities': [(ent.text, ent.label_) for ent in doc.ents], 'noun_phrases': [np.text for np in doc.noun_chunks], 'ontology_links': self._link_to_ontology(text) } def _extract_metadata(self, text: str, file_name: str) -> Dict: """Extract metadata from text.""" doc = self.nlp(text[:10000]) return { 'filename': file_name, 'file_type': Path(file_name).suffix.lower(), 'processed_at': datetime.now().isoformat(), 'entities': [(ent.text, ent.label_) for ent in doc.ents], 'document_type': 'Legal Document' } def _save_processed_data(self, doc_dir: str, text: str, chunks: List[Dict], metadata: Dict): """Save processed data to disk.""" with open(os.path.join(doc_dir, "processed.txt"), 'w', encoding='utf-8') as f: f.write(text) with open(os.path.join(doc_dir, "chunks.json"), 'w') as f: json.dump(chunks, f, indent=2) with open(os.path.join(doc_dir, "metadata.json"), 'w') as f: json.dump(metadata, f, indent=2) def cleanup(self): """Clean up temporary files.""" shutil.rmtree(self.temp_path, ignore_errors=True) os.makedirs(self.temp_path, exist_ok=True) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.cleanup()