Spaces:
Runtime error
Runtime error
import re
from io import BytesIO
from pathlib import Path
from typing import Dict, List
from urllib.parse import urlparse

import docx
import PyPDF2
import requests
class AdvancedDocumentProcessor:
    """Extract text from PDF, DOCX and plain-text documents and chunk it.

    Documents can be fetched from a URL (``process_document``) or read from a
    local path (``process_file``). Text is split into overlapping windows of
    whitespace-separated words.

    Attributes:
        supported_formats: File suffixes handled by ``process_file``.
        chunk_size: Maximum number of words per chunk.
        chunk_overlap: Number of words shared by two consecutive chunks.
    """

    # Keyword patterns, checked in priority order by _detect_document_type.
    # "hr" is matched as a whole word: as a bare substring it also matches
    # ordinary words such as "three" or "through".
    _DOCUMENT_TYPE_PATTERNS = (
        ('insurance_policy', re.compile(r'policy|insurance|premium|coverage')),
        ('legal_contract', re.compile(r'contract|agreement|terms')),
        ('hr_document', re.compile(r'employee|\bhr\b|benefits|salary')),
    )
    # Whole-word match so that e.g. "acceptable" does not count as a table.
    _TABLE_PATTERN = re.compile(r'\btables?\b')
    _SECTION_PATTERN = re.compile(r'\b(section|clause|article)\s+\d+')
    # One or more blank (possibly whitespace-only) lines separate paragraphs.
    _PARAGRAPH_BREAK = re.compile(r'\n\s*\n')

    def __init__(self, chunk_size: int = 600, chunk_overlap: int = 100):
        """Create a processor.

        Args:
            chunk_size: Maximum number of words per chunk (must be > 0).
            chunk_overlap: Words repeated at the start of the next chunk
                (must satisfy ``0 <= chunk_overlap < chunk_size``).

        Raises:
            ValueError: If the chunking parameters are inconsistent.
        """
        self._chunk_step(chunk_size, chunk_overlap)  # validation only
        self.supported_formats = ['.pdf', '.docx', '.txt']
        self.chunk_size = chunk_size  # words per chunk
        self.chunk_overlap = chunk_overlap

    # === Public methods ===
    def process_document(self, url: str) -> Dict:
        """Download a document from ``url`` and extract its text and metadata.

        The format is chosen from the URL suffix or, failing that, from the
        response's ``content-type`` header; anything that is neither PDF nor
        DOCX is treated as plain text.

        Returns:
            A dict with the keys ``text``, ``metadata``, ``document_type``
            and ``url``.

        Raises:
            Exception: On any download or parsing failure; the original
                error is attached as ``__cause__``.
        """
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()

            # Check the raw URL (as before) and the parsed path, so that a
            # query string or fragment after the file name does not hide the
            # suffix. Comparison is case-insensitive.
            candidates = (url.lower(), urlparse(url).path.lower())
            content_type = response.headers.get('content-type', '').lower()

            if any(c.endswith('.pdf') for c in candidates) or 'pdf' in content_type:
                text = self._extract_pdf_text(response.content)
            elif any(c.endswith('.docx') for c in candidates) or 'word' in content_type:
                text = self._extract_docx_text(response.content)
            else:
                text = response.text

            return {
                'text': text,
                'metadata': self._extract_metadata(text),
                'document_type': self._detect_document_type(text),
                'url': url,
            }
        except Exception as e:
            # Same exception type and message as before, now chained so the
            # underlying traceback is preserved.
            raise Exception(f"Error processing document: {e}") from e

    def process_file(self, file_path: str) -> List[Dict]:
        """Process a local file into chunks with metadata.

        PDFs are chunked page by page; DOCX and TXT files are treated as a
        single page.

        Returns:
            A list of ``{"id", "text", "page", "metadata"}`` dicts, where
            ``page`` is 1-based and ``id`` is ``chunk-<running index>``.

        Raises:
            ValueError: If the file suffix is not .pdf, .docx or .txt.
        """
        path = Path(file_path)
        suffix = path.suffix.lower()

        if suffix == ".pdf":
            pages = self._pdf_pages_from_path(file_path)
        elif suffix == ".docx":
            pages = [self._extract_docx_text_from_path(file_path)]
        elif suffix == ".txt":
            pages = [path.read_text(encoding="utf-8", errors="ignore")]
        else:
            raise ValueError(f"Unsupported file format: {suffix}")

        # Chunk each page and add page number metadata
        chunks = []
        for pnum, page_text in enumerate(pages, start=1):
            for chunk in self._chunk_text(page_text):
                chunks.append({
                    "id": f"chunk-{len(chunks)}",
                    "text": chunk,
                    "page": pnum,
                    "metadata": self._extract_metadata(chunk),
                })
        return chunks

    # === Internal extractors ===
    def _extract_pdf_text(self, content: bytes) -> str:
        """Return the text of all pages of an in-memory PDF, one per line block."""
        reader = PyPDF2.PdfReader(BytesIO(content))
        # extract_text() may yield nothing for a page; treat that as empty.
        return "".join((page.extract_text() or "") + "\n" for page in reader.pages)

    def _pdf_pages_from_path(self, file_path: str) -> List[str]:
        """Return the extracted text of each page of the PDF at ``file_path``."""
        reader = PyPDF2.PdfReader(file_path)
        return [(p.extract_text() or "") for p in reader.pages]

    def _extract_docx_text(self, content: bytes) -> str:
        """Return the newline-joined paragraph text of an in-memory DOCX."""
        doc = docx.Document(BytesIO(content))
        return "\n".join(p.text for p in doc.paragraphs)

    def _extract_docx_text_from_path(self, file_path: str) -> str:
        """Return the newline-joined paragraph text of the DOCX at ``file_path``."""
        doc = docx.Document(file_path)
        return "\n".join(p.text for p in doc.paragraphs)

    # === Metadata & type detection ===
    def _extract_metadata(self, text: str) -> Dict:
        """Compute simple statistics and keyword flags for ``text``.

        Returns:
            A dict with ``word_count``, ``character_count``,
            ``paragraph_count`` (non-blank blocks separated by blank lines;
            0 for empty text), ``has_tables`` (the word "table"/"tables"
            occurs) and ``has_sections`` (e.g. "section 3", "clause 12").
        """
        lowered = text.lower()
        paragraphs = [p for p in self._PARAGRAPH_BREAK.split(text) if p.strip()]
        return {
            'word_count': len(text.split()),
            'character_count': len(text),
            'paragraph_count': len(paragraphs),
            'has_tables': bool(self._TABLE_PATTERN.search(lowered)),
            'has_sections': bool(self._SECTION_PATTERN.search(lowered)),
        }

    def _detect_document_type(self, text: str) -> str:
        """Classify ``text`` by keyword; the first matching category wins.

        Returns:
            One of ``'insurance_policy'``, ``'legal_contract'``,
            ``'hr_document'`` or ``'general_document'``.
        """
        text_lower = text.lower()
        for document_type, pattern in self._DOCUMENT_TYPE_PATTERNS:
            if pattern.search(text_lower):
                return document_type
        return 'general_document'

    # === Chunking ===
    @staticmethod
    def _chunk_step(chunk_size: int, chunk_overlap: int) -> int:
        """Validate the chunking parameters and return the window stride."""
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number of words")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError(
                "chunk_overlap must be non-negative and smaller than chunk_size"
            )
        return chunk_size - chunk_overlap

    def _chunk_text(self, text: str) -> List[str]:
        """Split ``text`` into overlapping chunks of at most ``chunk_size`` words.

        Consecutive chunks share ``chunk_overlap`` words. Whitespace is
        normalised to single spaces. Empty text yields an empty list.

        Raises:
            ValueError: If ``chunk_size``/``chunk_overlap`` were changed to
                values that would make the window stop advancing.
        """
        # Re-validate here because the attributes are public and may have
        # been reassigned after construction; a non-positive step would
        # otherwise loop forever.
        step = self._chunk_step(self.chunk_size, self.chunk_overlap)
        words = text.split()
        chunks = []
        for start in range(0, len(words), step):
            end = start + self.chunk_size
            chunks.append(" ".join(words[start:end]))
            # Once a window reaches the last word, any further window would
            # lie entirely inside this one's overlap region.
            if end >= len(words):
                break
        return chunks