""" CASCADE Universal File Extractor Powered by Apache Tika - Professional document processing Handles ANY file format with proper metadata and content extraction """ import os import json import tempfile from pathlib import Path from typing import List, Dict, Any, Tuple, Optional import pandas as pd import hashlib from datetime import datetime # Try to import Apache Tika (professional solution) try: from tika import parser TIKA_AVAILABLE = True except ImportError: TIKA_AVAILABLE = False print("⚠️ Apache Tika not installed. Install with: pip install tika") # Fallback extractors try: import fitz # PyMuPDF PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False try: import pdfplumber PDFPLUMBER_AVAILABLE = True except ImportError: PDFPLUMBER_AVAILABLE = False try: from PyPDF2 import PdfReader PYPDF2_AVAILABLE = True except ImportError: PYPDF2_AVAILABLE = False try: import docx DOCX_AVAILABLE = True except ImportError: DOCX_AVAILABLE = False try: import openpyxl XLSX_AVAILABLE = True except ImportError: XLSX_AVAILABLE = False try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False class UniversalExtractor: """ Professional file extractor using Apache Tika Can handle ANY file format known to man """ def __init__(self): self.session = None if TIKA_AVAILABLE: # Start Tika server if not running parser.from_buffer('') def extract_file(self, file_path: str) -> Dict[str, Any]: """ Extract content and metadata from ANY file Returns: Dict with: - content: Full text content - metadata: File metadata - file_info: Basic file info - error: Error message if any """ result = { "content": "", "metadata": {}, "file_info": self._get_file_info(file_path), "error": None } try: # Use Apache Tika if available (best option) if TIKA_AVAILABLE: parsed = parser.from_file(file_path, service_url='http://localhost:9998') result["content"] = parsed.get("content", "") result["metadata"] = parsed.get("metadata", {}) # Add Tika-specific metadata if result["metadata"]: result["metadata"]["extractor"] = "Apache Tika" result["metadata"]["extraction_timestamp"] = datetime.now().isoformat() # Fallback to format-specific extractors else: result = self._fallback_extract(file_path, result) except Exception as e: result["error"] = str(e) # Try fallback if Tika fails if TIKA_AVAILABLE: result = self._fallback_extract(file_path, result) return result def _get_file_info(self, file_path: str) -> Dict[str, Any]: """Get basic file information""" path = Path(file_path) # Calculate file hash hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return { "name": path.name, "extension": path.suffix.lower(), "size": path.stat().st_size, "hash_md5": hash_md5.hexdigest(), "modified": datetime.fromtimestamp(path.stat().st_mtime).isoformat() } def _fallback_extract(self, file_path: str, result: Dict[str, Any]) -> Dict[str, Any]: """Fallback extraction without Tika""" ext = Path(file_path).suffix.lower() # PDF files if ext == ".pdf": content = self._extract_pdf(file_path) if content: result["content"] = content result["metadata"]["extractor"] = "PDF fallback" # Office documents elif ext in [".docx", ".doc"]: content = self._extract_docx(file_path) if content: result["content"] = content result["metadata"]["extractor"] = "DOCX fallback" # Excel files elif ext in [".xlsx", ".xls"]: content = self._extract_excel(file_path) if content: result["content"] = content result["metadata"]["extractor"] = "Excel fallback" # Images with OCR (if available) elif ext in [".jpg", ".jpeg", ".png", ".tiff", ".bmp"]: content = self._extract_image(file_path) if content: result["content"] = content result["metadata"]["extractor"] = "Image OCR fallback" # Code files elif ext in [".py", ".js", ".java", ".cpp", ".c", ".h", ".css", ".html", ".xml", ".json", ".yaml", ".yml"]: with open(file_path, "r", encoding="utf-8", errors="ignore") as f: result["content"] = f.read() result["metadata"]["extractor"] = "Text reader" return result def _extract_pdf(self, file_path: str) -> Optional[str]: """Extract text from PDF using multiple methods""" content = "" # Try PyMuPDF first (best quality) if PDF_AVAILABLE: try: doc = fitz.open(file_path) for page in doc: content += page.get_text() + "\n" doc.close() if content.strip(): return content except: pass # Try pdfplumber if PDFPLUMBER_AVAILABLE: try: import pdfplumber with pdfplumber.open(file_path) as pdf: for page in pdf.pages: text = page.extract_text() or "" content += text + "\n" if content.strip(): return content except: pass # Try PyPDF2 if PYPDF2_AVAILABLE: try: reader = PdfReader(file_path) for page in reader.pages: text = page.extract_text() or "" content += text + "\n" if content.strip(): return content except: pass return content if content.strip() else None def _extract_docx(self, file_path: str) -> Optional[str]: """Extract text from DOCX""" if DOCX_AVAILABLE: try: doc = docx.Document(file_path) content = "" for paragraph in doc.paragraphs: content += paragraph.text + "\n" return content if content.strip() else None except: pass return None def _extract_excel(self, file_path: str) -> Optional[str]: """Extract text from Excel""" if XLSX_AVAILABLE and PANDAS_AVAILABLE: try: # Read all sheets content = "" excel_file = pd.ExcelFile(file_path) for sheet_name in excel_file.sheet_names: df = pd.read_excel(file_path, sheet_name=sheet_name) content += f"\n=== Sheet: {sheet_name} ===\n" content += df.to_string() + "\n" return content if content.strip() else None except: pass return None def _extract_image(self, file_path: str) -> Optional[str]: """Extract text from image using OCR (if available)""" # Try OCR if pytesseract is available try: import pytesseract from PIL import Image image = Image.open(file_path) text = pytesseract.image_to_string(image) return text if text.strip() else None except: return None def process_folder(self, folder_files: List[Any]) -> Tuple[pd.DataFrame, Dict[str, Any]]: """ Process multiple files and create a unified dataset Args: folder_files: List of uploaded file objects Returns: Tuple of (DataFrame with all content, processing_summary) """ all_records = [] file_summary = [] for file_obj in folder_files: try: # Extract from file extracted = self.extract_file(file_obj.name) # Create record record = { "file_name": extracted["file_info"]["name"], "file_extension": extracted["file_info"]["extension"], "file_size": extracted["file_info"]["size"], "file_hash": extracted["file_info"]["hash_md5"], "content": extracted["content"], "extractor": extracted["metadata"].get("extractor", "unknown"), "extraction_timestamp": datetime.now().isoformat(), "error": extracted["error"] } # Add metadata as JSON if extracted["metadata"]: record["metadata"] = json.dumps(extracted["metadata"]) all_records.append(record) # Summary file_summary.append({ "file": extracted["file_info"]["name"], "status": "success" if extracted["content"] else "failed", "content_length": len(extracted["content"]), "extractor": extracted["metadata"].get("extractor", "unknown"), "error": extracted["error"] }) except Exception as e: file_summary.append({ "file": getattr(file_obj, 'name', 'unknown'), "status": "error", "error": str(e) }) # Create DataFrame if all_records: df = pd.DataFrame(all_records) summary = { "total_files": len(folder_files), "processed": len([s for s in file_summary if s["status"] == "success"]), "failed": len([s for s in file_summary if s["status"] != "success"]), "total_content_chars": df["content"].str.len().sum(), "file_details": file_summary } return df, summary return None, {"error": "No files processed", "details": file_summary} # Convenience function def extract_from_files(file_list: List[Any]) -> Tuple[pd.DataFrame, Dict[str, Any]]: """ Extract content from multiple files using the universal extractor Args: file_list: List of file objects Returns: Tuple of (DataFrame, summary) """ extractor = UniversalExtractor() return extractor.process_folder(file_list)