Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import fitz # PyMuPDF | |
| from typing import List, Optional | |
| from docx import Document | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| import tempfile | |
| import logging | |
| import warnings | |
# Configure logging: basicConfig sets the root logger level to INFO as soon as
# this module is imported; `logger` is the module-level logger that the
# functions in this file use for their info/warning/error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def clean_text(text: str) -> str:
    """Normalize whitespace in extracted text while keeping line structure.

    Steps, in order:
      1. Drop characters that are neither printable nor whitespace
         (e.g. NUL bytes and other control characters).
      2. Normalize ``\\r\\n`` / ``\\r`` line endings to ``\\n``.
      3. Collapse runs of horizontal whitespace (spaces, tabs, ...) into a
         single space.
      4. Remove spaces that sit directly next to a newline.
      5. Collapse three or more consecutive newlines into one blank line.

    Args:
        text: Raw text as returned by an extractor.

    Returns:
        The cleaned text with leading/trailing whitespace stripped. Single
        newlines and paragraph breaks (one blank line) are preserved.
    """
    # Remove control characters first so that dropping them cannot leave two
    # adjacent spaces behind after the whitespace collapse below.
    text = ''.join(ch for ch in text if ch.isprintable() or ch.isspace())
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    # `[^\S\n]` is "any whitespace except newline": collapsing with plain `\s+`
    # would turn every newline into a space and destroy paragraph boundaries.
    text = re.sub(r'[^\S\n]+', ' ', text)
    text = re.sub(r' ?\n ?', '\n', text)  # Trim spaces around line breaks
    text = re.sub(r'\n{3,}', '\n\n', text)  # At most one blank line in a row
    return text.strip()
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """Extract the text of every page of a PDF using PyMuPDF.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The cleaned text of all pages joined by newlines, or ``None`` when the
        document yields no text or any error occurs (the error is logged).
    """
    try:
        # Open the document as a context manager so the underlying file
        # handle is released even if reading a page fails.
        with fitz.open(file_path) as doc:
            text = "\n".join(page.get_text("text") for page in doc)
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using PyMuPDF: {e}")
        return None
def extract_text_from_docx(file_path: str) -> Optional[str]:
    """Read the non-blank paragraphs of a DOCX file and return them cleaned.

    Args:
        file_path: Path of the DOCX file to read.

    Returns:
        The cleaned paragraph text joined by newlines, or ``None`` when the
        document has no non-blank paragraphs or any error occurs (the error
        is logged).
    """
    try:
        document = Document(file_path)

        # Keep only paragraphs that contain something other than whitespace.
        non_blank = []
        for paragraph in document.paragraphs:
            content = paragraph.text
            if content.strip():
                non_blank.append(content)

        joined = '\n'.join(non_blank)
        if not joined:
            return None
        return clean_text(joined)
    except Exception as e:
        logger.error(f"Failed to process DOCX {file_path}: {e}")
        return None
def extract_text_from_txt(file_path: str) -> Optional[str]:
    """Read a plain-text file, trying UTF-8 first and Latin-1 as a fallback.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The cleaned file contents, or ``None`` when the file cannot be read
        (the error is logged). Errors such as a missing file or denied
        permission are handled the same way for both encoding attempts, so
        this function never raises for I/O problems.
    """
    for encoding in ("utf-8", "latin-1"):
        try:
            with open(file_path, "r", encoding=encoding) as file:
                return clean_text(file.read())
        except UnicodeDecodeError:
            # Not valid in this encoding; try the next candidate.
            continue
        except Exception as e:
            logger.error(f"Failed to read text file {file_path}: {e}")
            return None

    # Latin-1 can decode any byte sequence, so this is only a safety net.
    logger.error(f"Failed to read text file {file_path}: no supported encoding")
    return None
def extract_text_from_file(uploaded_file) -> Optional[str]:
    """Extract text from a PDF, DOCX or TXT file.

    Args:
        uploaded_file: Either a filesystem path (``str`` or ``os.PathLike``)
            or a file-like object exposing ``read()``. For file-like objects
            the file type is taken from the extension of their ``name``
            attribute, when one is present.

    Returns:
        The extracted, cleaned text, or ``None`` when the file does not exist,
        the type is unsupported, no text could be extracted, or extraction
        failed (each case is logged).
    """
    temp_path = None
    try:
        if isinstance(uploaded_file, (str, os.PathLike)):  # Direct file paths
            file_path = os.fspath(uploaded_file)
        else:  # File-like objects (e.g., uploaded files)
            # Carry the original extension over to the temporary file;
            # without it the type dispatch below could never recognise an
            # upload and would reject every one as unsupported.
            source_name = getattr(uploaded_file, "name", None)
            suffix = os.path.splitext(source_name)[1] if isinstance(source_name, str) else ""
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                temp_path = temp_file.name
                temp_file.write(uploaded_file.read())  # Save contents temporarily
            file_path = temp_path

        if not os.path.exists(file_path):
            logger.error(f"File not found: {file_path}")
            return None

        file_extension = os.path.splitext(file_path)[1].lower()
        try:
            if file_extension == ".pdf":
                text = extract_text_from_pdf(file_path)  # Use PyMuPDF
            elif file_extension == ".docx":
                text = extract_text_from_docx(file_path)
            elif file_extension == ".txt":
                text = extract_text_from_txt(file_path)
            else:
                logger.warning(f"Unsupported file type: {file_extension}")
                return None

            if not text:
                logger.warning(f"No text content extracted from {file_path}")
                return None
            return text
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            return None
    finally:
        # The temporary copy is created with delete=False so it can be
        # reopened by path; remove it ourselves once extraction is done.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                logger.warning(f"Could not remove temporary file {temp_path}")
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Break text into overlapping chunks with RecursiveCharacterTextSplitter.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Number of characters shared between adjacent chunks.

    Returns:
        The list of chunks; an empty list when ``text`` is empty or when the
        splitter raises (the error is logged).
    """
    if text:
        try:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                is_separator_regex=False,
            )
            splits = splitter.split_text(text)
            logger.info(f"Split text into {len(splits)} chunks")
            return splits
        except Exception as e:
            logger.error(f"Error splitting text: {e}")
            return []

    logger.warning("Empty text provided for splitting")
    return []