Spaces:
Build error
Build error
import json
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Tuple, List, Dict, Optional

import chardet
import docx
import nltk
import pypdf
import pytesseract
import spacy
import streamlit as st
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from pdf2image import convert_from_bytes
from PIL import Image
class DocumentProcessor:
    """Extract, chunk, tag and persist uploaded documents.

    For each uploaded file the processor stores the original bytes, the
    extracted text, a JSON list of NLP-annotated chunks and a JSON metadata
    record under ``<base_path>/processed/<doc_id>/``.

    Progress and error messages are reported through Streamlit (``st``).
    The instance can be used as a context manager; leaving the ``with``
    block empties the temporary directory.
    """

    def __init__(self, base_path: Optional[str] = None):
        """Set up directories, NLP components and the ontology.

        Args:
            base_path: Root data directory. When omitted, ``/data`` is used
                if the ``SPACE_ID`` environment variable is set, otherwise
                ``<cwd>/data``.
        """
        self.base_path = self._setup_data_directories(base_path)
        self.ontology_path = os.path.join(self.base_path, "legal_ontology.json")

        # NLP setup reads self.base_path, so it must run after the line above.
        self._initialize_nlp()

        self._ensure_ontology_exists()
        self.ontology = self._load_ontology()

        self.processed_path = os.path.join(self.base_path, "processed")
        self.temp_path = os.path.join(self.base_path, "temp")
        os.makedirs(self.processed_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def _setup_data_directories(self, base_path: Optional[str] = None) -> str:
        """Create the data directory tree and return its root path."""
        if base_path:
            data_path = base_path
        elif os.environ.get('SPACE_ID'):
            # SPACE_ID is set when running in Hugging Face Spaces.
            data_path = "/data"
        else:
            data_path = os.path.join(os.getcwd(), "data")

        for subdir in ("ontology", "processed", "temp", "indexes"):
            os.makedirs(os.path.join(data_path, subdir), exist_ok=True)
        return data_path

    def _initialize_nlp(self):
        """Load the spaCy model and the NLTK resources.

        Sets ``self.nlp`` and ``self.stop_words``. A missing spaCy model is
        downloaded once; NLTK download problems only produce warnings.

        Raises:
            Exception: Any failure to load spaCy (after the download attempt)
                is reported via ``st.error`` and re-raised.
        """
        try:
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except OSError:
                st.info("Downloading spaCy model...")
                # Use the running interpreter so the model lands in the same
                # environment, and fail loudly if the download fails.
                subprocess.run(
                    [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                    check=True,
                )
                self.nlp = spacy.load("en_core_web_sm")

            nltk_data_dir = os.path.join(self.base_path, "nltk_data")
            os.makedirs(nltk_data_dir, exist_ok=True)
            # Avoid growing the search path on every re-instantiation.
            if nltk_data_dir not in nltk.data.path:
                nltk.data.path.append(nltk_data_dir)

            required_resources = [
                'punkt',
                'averaged_perceptron_tagger',
                'maxent_ne_chunker',
                'words',
                'stopwords',
            ]
            for resource in required_resources:
                try:
                    downloaded = nltk.download(
                        resource, download_dir=nltk_data_dir, quiet=True
                    )
                except Exception as e:
                    st.warning(f"Could not download {resource}: {str(e)}")
                else:
                    # A falsy result is treated as a failed download.
                    if not downloaded:
                        st.warning(f"Could not download {resource}")

            try:
                self.stop_words = set(nltk.corpus.stopwords.words('english'))
            except Exception as e:
                st.warning(f"Could not load stopwords, using empty set: {str(e)}")
                self.stop_words = set()
        except Exception as e:
            st.error(f"Error initializing NLP components: {str(e)}")
            raise

    def _ensure_ontology_exists(self):
        """Write a small default ontology file if none exists yet."""
        if os.path.exists(self.ontology_path):
            return

        default_ontology = {
            "@graph": [
                {
                    "@id": "concept:Contract",
                    "@type": "vocab:LegalConcept",
                    "rdfs:label": "Contract",
                    "rdfs:comment": "A legally binding agreement between parties",
                    "vocab:relatedConcepts": ["Offer", "Acceptance", "Consideration"],
                },
                {
                    "@id": "concept:Judgment",
                    "@type": "vocab:LegalConcept",
                    "rdfs:label": "Judgment",
                    "rdfs:comment": "A court's final determination",
                    "vocab:relatedConcepts": ["Court Order", "Decision", "Ruling"],
                },
            ]
        }
        with open(self.ontology_path, 'w', encoding='utf-8') as f:
            json.dump(default_ontology, f, indent=2)

    def _load_ontology(self) -> Dict:
        """Load the ontology file.

        Returns:
            The parsed ontology, or ``{"@graph": []}`` when the file is
            missing, unreadable, or does not contain a JSON object.
        """
        empty_ontology = {"@graph": []}
        try:
            if not os.path.exists(self.ontology_path):
                return empty_ontology
            with open(self.ontology_path, 'r', encoding='utf-8') as f:
                ontology = json.load(f)
        except Exception as e:
            st.error(f"Error loading ontology: {str(e)}")
            return empty_ontology
        # Later lookups call .get() on the result, so the root must be a dict.
        return ontology if isinstance(ontology, dict) else empty_ontology

    def process_and_tag_document(self, file) -> Tuple[str, List[Dict], Dict]:
        """Run the full pipeline on an uploaded file and persist the results.

        Args:
            file: Uploaded file object; ``file.name`` and ``file.getvalue()``
                are used.

        Returns:
            ``(text, chunks, metadata)`` for the processed document.

        Raises:
            Exception: Any failure is reported via ``st.error`` and re-raised.
        """
        try:
            # Microseconds keep ids unique when several files are processed
            # within the same second (otherwise they would share a directory
            # and overwrite each other).
            doc_id = datetime.now().strftime('%Y%m%d_%H%M%S_%f')

            doc_dir = os.path.join(self.processed_path, doc_id)
            os.makedirs(doc_dir, exist_ok=True)

            original_path = os.path.join(doc_dir, "original" + Path(file.name).suffix)
            with open(original_path, 'wb') as f:
                f.write(file.getvalue())

            try:
                text, chunks = self.process_document(original_path)
            except Exception as e:
                st.error(f"Error processing document content: {str(e)}")
                raise

            try:
                metadata = self._extract_metadata(text, file.name)
                metadata['doc_id'] = doc_id
                metadata['original_path'] = original_path
            except Exception as e:
                st.error(f"Error extracting metadata: {str(e)}")
                raise

            try:
                text_path = os.path.join(doc_dir, "processed.txt")
                with open(text_path, 'w', encoding='utf-8') as f:
                    f.write(text)

                chunks_path = os.path.join(doc_dir, "chunks.json")
                with open(chunks_path, 'w', encoding='utf-8') as f:
                    json.dump(chunks, f, indent=2)

                metadata_path = os.path.join(doc_dir, "metadata.json")
                with open(metadata_path, 'w', encoding='utf-8') as f:
                    json.dump(metadata, f, indent=2)
            except Exception as e:
                st.error(f"Error saving processed content: {str(e)}")
                raise

            return text, chunks, metadata
        except Exception as e:
            st.error(f"Error in document processing pipeline: {str(e)}")
            raise

    def process_document(self, file_path: str) -> Tuple[str, List[Dict]]:
        """Extract text from a file according to its suffix and chunk it.

        Supported suffixes: ``.pdf``, ``.docx``, ``.txt``, ``.csv``.

        Raises:
            ValueError: If the suffix is not supported.
        """
        file_type = Path(file_path).suffix.lower()
        if file_type == '.pdf':
            text = self._process_pdf(file_path)
        elif file_type == '.docx':
            text = self._process_docx(file_path)
        elif file_type in ('.txt', '.csv'):
            text = self._process_text(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        chunks = self._create_chunks(text)
        return text, chunks

    def _process_pdf(self, file_path: str) -> str:
        """Extract text from a PDF, falling back to OCR for text-less pages.

        Each page is preceded by a ``--- Page N ---`` marker
        (``--- Page N (OCR) ---`` when OCR was used).
        """
        try:
            reader = pypdf.PdfReader(file_path)
            parts = []
            pdf_bytes = None  # read lazily, only if some page needs OCR

            for page_num, page in enumerate(reader.pages, 1):
                page_text = page.extract_text() or ""
                if page_text.strip():
                    parts.append(f"\n--- Page {page_num} ---\n{page_text}")
                    continue

                st.info(f"Performing OCR for page {page_num}...")
                if pdf_bytes is None:
                    with open(file_path, 'rb') as pdf_file:
                        pdf_bytes = pdf_file.read()
                # Render only the page that needs OCR instead of rasterising
                # the whole document once per text-less page.
                images = convert_from_bytes(
                    pdf_bytes, first_page=page_num, last_page=page_num
                )
                page_text = pytesseract.image_to_string(images[0]) if images else ""
                parts.append(f"\n--- Page {page_num} (OCR) ---\n{page_text}")

            return "".join(parts)
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            raise

    def _process_docx(self, file_path: str) -> str:
        """Return the non-empty paragraphs of a DOCX file, one per line."""
        try:
            doc = docx.Document(file_path)
            return "".join(
                f"{para.text}\n" for para in doc.paragraphs if para.text.strip()
            )
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            raise

    def _process_text(self, file_path: str) -> str:
        """Read a text file, decoding it with a detected encoding.

        The chardet guess is used when its confidence exceeds 0.7; otherwise
        UTF-8 is assumed. If decoding fails, the content is decoded as UTF-8
        with undecodable bytes replaced rather than aborting the pipeline.
        """
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read()

            result = chardet.detect(raw_data)
            detected = result.get('encoding')
            confidence = result.get('confidence') or 0.0
            # 'encoding' can be None (e.g. for empty input), so check it too.
            encoding = detected if detected and confidence > 0.7 else 'utf-8'

            try:
                return raw_data.decode(encoding)
            except (UnicodeDecodeError, LookupError):
                st.warning(
                    f"Could not decode text as {encoding}; "
                    "falling back to UTF-8 with replacement characters"
                )
                return raw_data.decode('utf-8', errors='replace')
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            raise

    def _create_chunks(self, text: str, chunk_size: int = 500) -> List[Dict]:
        """Group sentences into chunks and annotate each chunk.

        Sentences are accumulated until adding the next one would push the
        summed sentence length past ``chunk_size`` characters; a single
        sentence longer than ``chunk_size`` becomes its own chunk.

        Args:
            text: Full document text.
            chunk_size: Target chunk size in characters.

        Returns:
            One annotated dict per chunk (see ``_process_chunk``).
        """
        try:
            chunks = []
            current_chunk = []
            current_length = 0

            for sentence in self._tokenize_text(text):
                sentence_length = len(sentence)
                if current_chunk and current_length + sentence_length > chunk_size:
                    chunks.append(
                        self._process_chunk(' '.join(current_chunk), len(chunks))
                    )
                    current_chunk = []
                    current_length = 0
                current_chunk.append(sentence)
                current_length += sentence_length

            if current_chunk:
                chunks.append(
                    self._process_chunk(' '.join(current_chunk), len(chunks))
                )
            return chunks
        except Exception as e:
            st.error(f"Error creating chunks: {str(e)}")
            raise

    def _tokenize_text(self, text: str) -> List[str]:
        """Split text into sentences, falling back to splitting on '.'."""
        try:
            return sent_tokenize(text)
        except Exception:
            # Best-effort fallback, e.g. when the tokenizer data is missing.
            return [s.strip() for s in text.split('.') if s.strip()]

    def _process_chunk(self, text: str, chunk_id: int) -> Dict:
        """Run spaCy on one chunk and return its annotation record."""
        try:
            doc = self.nlp(text)
            return {
                'chunk_id': chunk_id,
                'text': text,
                'entities': [(ent.text, ent.label_) for ent in doc.ents],
                'noun_phrases': [chunk.text for chunk in doc.noun_chunks],
                'word_count': len([token for token in doc if not token.is_space]),
                'sentence_count': len(list(doc.sents)),
                'ontology_links': self._link_to_ontology(text),
            }
        except Exception as e:
            st.error(f"Error processing chunk: {str(e)}")
            raise

    def _extract_metadata(self, text: str, file_name: str) -> Dict:
        """Build the metadata record for a document.

        NLP-derived fields (counts, entities, key phrases, language stats)
        are computed from the first 10,000 characters only; regex-based
        fields (citations, dates), the document type and the ontology links
        use the full text.
        """
        try:
            doc = self.nlp(text[:10000])  # limit NLP cost on long documents
            key_phrases = [
                chunk.text for chunk in doc.noun_chunks
                if len(chunk.text.split()) > 1
            ][:10]
            return {
                'filename': file_name,
                'file_type': Path(file_name).suffix.lower(),
                'processed_at': datetime.now().isoformat(),
                'word_count': len([token for token in doc if not token.is_space]),
                'sentence_count': len(list(doc.sents)),
                'entities': self._extract_entities(doc),
                'document_type': self._infer_document_type(text),
                'language_stats': self._get_language_stats(doc),
                'citations': self._extract_citations(text),
                'dates': self._extract_dates(text),
                'key_phrases': key_phrases,
                'ontology_concepts': self._link_to_ontology(text),
            }
        except Exception as e:
            st.error(f"Error extracting metadata: {str(e)}")
            raise

    def _extract_entities(self, doc) -> Dict[str, List[str]]:
        """Group entity texts by label, keeping the first use of each text."""
        entities = {}
        seen = set()
        for ent in doc.ents:
            if ent.text in seen:
                continue
            seen.add(ent.text)
            entities.setdefault(ent.label_, []).append(ent.text)
        return entities

    def _infer_document_type(self, text: str) -> str:
        """Classify a document by counting type-specific keywords.

        Keywords are matched as whole words (an optional plural "s" is
        allowed) so that e.g. "act" does not match inside "facts" or
        "contract". The type with the most distinct keywords present wins;
        ties go to the type listed first.

        Returns:
            ``'contract'``, ``'judgment'``, ``'legislation'``, ``'memo'`` or
            ``'unknown'`` when no keyword is present.
        """
        type_patterns = {
            'contract': ['agreement', 'parties', 'obligations', 'terms and conditions'],
            'judgment': ['court', 'judge', 'ruling', 'ordered', 'judgment'],
            'legislation': ['act', 'statute', 'regulation', 'amended', 'parliament'],
            'memo': ['memorandum', 'memo', 'note', 'meeting minutes'],
        }
        text_lower = text.lower()
        scores = {
            doc_type: sum(
                1 for keyword in keywords
                if re.search(rf"\b{re.escape(keyword)}s?\b", text_lower)
            )
            for doc_type, keywords in type_patterns.items()
        }
        best_type, best_score = max(scores.items(), key=lambda item: item[1])
        return best_type if best_score > 0 else 'unknown'

    def _extract_citations(self, text: str) -> List[Dict]:
        """Find legal citations and their character offsets.

        Results are grouped by pattern (in the order listed below), not
        sorted by position in the text.
        """
        citation_patterns = [
            r'\[\d{4}\]\s+\w+\s+\d+',        # [2021] EWHC 123
            r'\d+\s+U\.S\.\s+\d+',           # 123 U.S. 456
            r'\(\d{4}\)\s+\d+\s+\w+\s+\d+',  # (2021) 12 ABC 345
        ]
        return [
            {
                'citation': match.group(),
                'start_idx': match.start(),
                'end_idx': match.end(),
            }
            for pattern in citation_patterns
            for match in re.finditer(pattern, text)
        ]

    def _extract_dates(self, text: str) -> List[str]:
        """Find date strings in slash, dash and 'D Month YYYY' formats.

        Results are grouped by pattern, not sorted by position in the text.
        """
        date_patterns = [
            r'\d{1,2}/\d{1,2}/\d{2,4}',
            r'\d{1,2}-\d{1,2}-\d{2,4}',
            r'\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}',
        ]
        return [
            match.group()
            for pattern in date_patterns
            for match in re.finditer(pattern, text, re.IGNORECASE)
        ]

    def _get_language_stats(self, doc) -> Dict:
        """Compute sentence/word counts and average sentence length.

        ``avg_sentence_length`` is measured in non-whitespace tokens and is
        0 when the document has no sentences.
        """
        # doc.sents is consumed via list() once: an iterator is always truthy,
        # so it cannot be used directly as an emptiness test.
        sentences = list(doc.sents)
        words = [token for token in doc if not token.is_space]
        sentence_count = len(sentences)

        if sentence_count:
            tokens_in_sentences = sum(
                len([token for token in sent if not token.is_space])
                for sent in sentences
            )
            avg_sentence_length = tokens_in_sentences / sentence_count
        else:
            avg_sentence_length = 0

        return {
            'sentence_count': sentence_count,
            'word_count': len(words),
            'avg_sentence_length': avg_sentence_length,
            'unique_words': len({token.text.lower() for token in words}),
        }

    def _link_to_ontology(self, text: str) -> List[Dict]:
        """Find ontology concepts whose label occurs in the text.

        Matching is a case-insensitive substring search; only the first
        occurrence of each label is reported, with up to 100 characters of
        context on either side.
        """
        relevant_concepts = []
        text_lower = text.lower()
        for concept in self.ontology.get("@graph", []):
            if "rdfs:label" not in concept:
                continue
            label = concept["rdfs:label"].lower()
            if not label:
                # An empty label would "match" every text at offset 0.
                continue
            start_idx = text_lower.find(label)
            if start_idx == -1:
                continue

            end_idx = start_idx + len(label)
            context_start = max(0, start_idx - 100)
            context_end = min(len(text), end_idx + 100)
            relevant_concepts.append({
                'concept': concept['rdfs:label'],
                'type': concept.get('@type', 'Unknown'),
                'description': concept.get('rdfs:comment', ''),
                'context': text[context_start:context_end].strip(),
                'location': {'start': start_idx, 'end': end_idx},
            })
        return relevant_concepts

    def get_document_path(self, doc_id: str) -> Optional[str]:
        """Return the directory of a processed document, or None if absent."""
        doc_dir = os.path.join(self.processed_path, doc_id)
        return doc_dir if os.path.isdir(doc_dir) else None

    def _load_document_json(self, doc_id: str, filename: str, label: str):
        """Load one JSON artefact of a processed document.

        Returns None when the document directory does not exist or the file
        cannot be read or parsed (the latter is reported via ``st.error``).
        """
        doc_dir = self.get_document_path(doc_id)
        if not doc_dir:
            return None
        try:
            with open(os.path.join(doc_dir, filename), 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            st.error(f"Error loading {label} for document {doc_id}: {str(e)}")
            return None

    def get_document_metadata(self, doc_id: str) -> Optional[Dict]:
        """Return the stored metadata of a processed document, or None."""
        return self._load_document_json(doc_id, "metadata.json", "metadata")

    def get_document_chunks(self, doc_id: str) -> Optional[List[Dict]]:
        """Return the stored chunks of a processed document, or None."""
        return self._load_document_json(doc_id, "chunks.json", "chunks")

    def cleanup(self):
        """Empty the temporary directory, recreating it if necessary."""
        try:
            # rmtree raises on a missing directory, which previously skipped
            # the re-creation below.
            if os.path.isdir(self.temp_path):
                shutil.rmtree(self.temp_path)
            os.makedirs(self.temp_path, exist_ok=True)
        except Exception as e:
            st.warning(f"Error cleaning up temporary files: {str(e)}")

    def __enter__(self):
        """Return the processor itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up temporary files; exceptions are not suppressed."""
        self.cleanup()