import PyPDF2
import pdfplumber
from docx import Document
from pathlib import Path
from typing import Any, Dict, List
import re
import warnings
import logging
# Suppress PyPDF2 warnings about font descriptors
warnings.filterwarnings('ignore', category=UserWarning, module='PyPDF2')
logging.getLogger('PyPDF2').setLevel(logging.ERROR)


class DocumentProcessor:
    """Process various document types and extract text content."""

    def __init__(self):
        self.supported_formats = ['.pdf', '.txt', '.docx']

    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """
        Process a single file and extract its content.

        Args:
            file_path: Path to the file

        Returns:
            Dictionary containing file metadata and content
        """
        suffix = file_path.suffix.lower()
        if suffix == '.pdf':
            content = self._extract_pdf(file_path)
        elif suffix == '.txt':
            content = self._extract_txt(file_path)
        elif suffix == '.docx':
            content = self._extract_docx(file_path)
        else:
            raise ValueError(f"Unsupported file format: {suffix}")
        return {
            'filename': file_path.name,
            'path': str(file_path),
            'content': content,
            'format': suffix
        }
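
    # Illustrative return value (hypothetical file; shape follows the code above):
    #   {'filename': 'notes.pdf', 'path': '/data/notes.pdf',
    #    'content': 'Lecture 1: Introduction ...', 'format': '.pdf'}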

    def _extract_pdf(self, file_path: Path) -> str:
        """Extract text from PDF using pdfplumber with PyPDF2 fallback."""
        text = ""
        try:
            # Primary: use pdfplumber (better for complex PDFs)
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
        except Exception:
            # Fallback: use PyPDF2 with warnings suppressed
            try:
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    with open(file_path, 'rb') as file:
                        pdf_reader = PyPDF2.PdfReader(file)
                        for page in pdf_reader.pages:
                            try:
                                page_text = page.extract_text()
                                if page_text:
                                    text += page_text + "\n"
                            except Exception:
                                continue  # Skip problematic pages
            except Exception as e2:
                raise ValueError(f"Could not extract text from PDF: {file_path.name}") from e2
        return self._clean_text(text)

    def _extract_txt(self, file_path: Path) -> str:
        """Extract text from TXT file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
        except UnicodeDecodeError:
            with open(file_path, 'r', encoding='latin-1') as file:
                text = file.read()
        return self._clean_text(text)

    def _extract_docx(self, file_path: Path) -> str:
        """Extract text from DOCX file."""
        doc = Document(file_path)
        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
        return self._clean_text(text)

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text while preserving paragraph breaks."""
        # Collapse blank-line runs into a single paragraph break; semantic
        # chunking relies on '\n\n' boundaries surviving normalization
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # Collapse remaining whitespace runs (but keep newlines)
        text = re.sub(r'[^\S\n]+', ' ', text)
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s.,!?;:()\-\'\"]+', '', text)
        return text.strip()
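
    # Illustrative behavior (hypothetical input):
    #   _clean_text("Intro\n\n\nHello   world §") -> "Intro\n\nHello world"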

    def chunk_text(self, text: str, chunk_size: int = 512, overlap: int = 50, semantic: bool = True) -> List[str]:
        """
        Split text into chunks using semantic or simple chunking.

        Args:
            text: The text to chunk
            chunk_size: Target size of each chunk in characters
            overlap: Number of overlapping characters between chunks
            semantic: Use semantic chunking (by headers/concepts) if True

        Returns:
            List of text chunks
        """
        if semantic:
            return self._semantic_chunk(text, chunk_size, overlap)
        else:
            return self._simple_chunk(text, chunk_size, overlap)
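
    # Illustrative usage (hypothetical `processor` and `doc` from process_file):
    #   chunks = processor.chunk_text(doc['content'], chunk_size=512, overlap=50)
    #   -> a list of roughly 512-character strings; with semantic=False,
    #      adjacent chunks share about 50 characters of overlap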

    def _semantic_chunk(self, text: str, target_size: int = 512, overlap: int = 50) -> List[str]:
        """
        Chunk text by detecting headers and logical sections.
        Well suited to lecture slides and structured documents.
        """
        chunks = []
        # Split on paragraph breaks, then look for common header patterns
        # (see _is_likely_header):
        #   Pattern 1: single lines in ALL CAPS or Title Case
        #   Pattern 2: lines starting with numbers like "1.", "1.1", etc.
        #   Pattern 3: lines beginning with "Chapter", "Section", or "Part"
        sections = text.split('\n\n')
        current_chunk = ""
        for section in sections:
            section = section.strip()
            if not section:
                continue
            # Check if this looks like a header
            is_header = self._is_likely_header(section)
            if is_header and len(current_chunk) > 100:
                # Save the previous chunk and start a new one with this header
                chunks.append(current_chunk.strip())
                current_chunk = section + "\n\n"
            else:
                # Add to the current chunk
                potential_chunk = current_chunk + section + "\n\n"
                # If the chunk is getting too large, split it
                if len(potential_chunk) > target_size * 1.5:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = section + "\n\n"
                else:
                    current_chunk = potential_chunk
        # Add the final chunk
        if current_chunk:
            chunks.append(current_chunk.strip())
        # If semantic chunking produced too few chunks, fall back to simple chunking
        if len(chunks) < len(text) / (target_size * 2):
            return self._simple_chunk(text, target_size, overlap)
        return chunks

    def _is_likely_header(self, text: str) -> bool:
        """Detect if text is likely a header/title."""
        # Too long to be a header
        if len(text) > 200:
            return False
        # Single-line headers only
        if '\n' not in text:
            # ALL CAPS
            if text.isupper() and len(text.split()) <= 10:
                return True
            # Title Case
            if text.istitle() and len(text.split()) <= 10:
                return True
            # Numbered sections like "1.", "1.1", or "Chapter 1"
            if re.match(r'^(\d+\.)+\s+', text) or re.match(r'^(Chapter|Section|Part)\s+\d+', text, re.IGNORECASE):
                return True
        return False
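
    # Illustrative results for the heuristics above (hypothetical inputs):
    #   _is_likely_header("INTRODUCTION TO MACHINE LEARNING")  -> True   (all caps)
    #   _is_likely_header("1. Introduction")                   -> True   (numbered)
    #   _is_likely_header("Chapter 3")                         -> True   (keyword)
    #   _is_likely_header("this is an ordinary sentence.")     -> False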

    def _simple_chunk(self, text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
        """
        Split text into overlapping chunks (original method).
        """
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]
            # Try to break at a sentence boundary
            if end < text_length:
                last_period = chunk.rfind('.')
                last_newline = chunk.rfind('\n')
                break_point = max(last_period, last_newline)
                if break_point > chunk_size * 0.5:  # At least 50% through the chunk
                    chunk = chunk[:break_point + 1]
                    end = start + break_point + 1
            chunks.append(chunk.strip())
            start = end - overlap
        return chunks
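

# Minimal usage sketch (illustrative; not part of the original module).
# 'sample.pdf' is a hypothetical path; substitute a real document.
if __name__ == "__main__":
    processor = DocumentProcessor()
    doc = processor.process_file(Path("sample.pdf"))
    chunks = processor.chunk_text(doc['content'], chunk_size=512, overlap=50)
    print(f"{doc['filename']}: {len(doc['content'])} characters, {len(chunks)} chunks")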