"""
CASCADE Universal File Extractor
Powered by Apache Tika - Professional document processing
Handles ANY file format with proper metadata and content extraction
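
Example (illustrative; assumes this module is importable as
``universal_extractor`` and that inputs expose a ``.name`` path):
    from universal_extractor import extract_from_files
    df, summary = extract_from_files(list_of_uploads)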
"""
import json
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional
import pandas as pd
import hashlib
from datetime import datetime
# Try to import Apache Tika (preferred extractor)
try:
from tika import parser
TIKA_AVAILABLE = True
except ImportError:
TIKA_AVAILABLE = False
print("⚠️ Apache Tika not installed. Install with: pip install tika")
# Fallback extractors
try:
import fitz # PyMuPDF
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
try:
import pdfplumber
PDFPLUMBER_AVAILABLE = True
except ImportError:
PDFPLUMBER_AVAILABLE = False
try:
from PyPDF2 import PdfReader
PYPDF2_AVAILABLE = True
except ImportError:
PYPDF2_AVAILABLE = False
try:
import docx
DOCX_AVAILABLE = True
except ImportError:
DOCX_AVAILABLE = False
try:
import openpyxl
XLSX_AVAILABLE = True
except ImportError:
XLSX_AVAILABLE = False
# NOTE: pandas is a hard dependency (imported unconditionally above),
# so no availability flag is needed for it.
class UniversalExtractor:
"""
Professional file extractor using Apache Tika
Can handle ANY file format known to man
"""
    def __init__(self):
        if TIKA_AVAILABLE:
            # Parsing an empty buffer forces tika-python to download and
            # start its local Tika server if it is not already running.
            parser.from_buffer('')
def extract_file(self, file_path: str) -> Dict[str, Any]:
"""
        Extract content and metadata from a single file.
Returns:
Dict with:
- content: Full text content
- metadata: File metadata
- file_info: Basic file info
- error: Error message if any
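
        Example (illustrative; "report.pdf" stands in for any real path):
            extractor = UniversalExtractor()
            result = extractor.extract_file("report.pdf")
            if result["error"] is None:
                print(result["content"][:200])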
"""
result = {
"content": "",
"metadata": {},
"file_info": self._get_file_info(file_path),
"error": None
}
try:
# Use Apache Tika if available (best option)
if TIKA_AVAILABLE:
                parsed = parser.from_file(file_path, serverEndpoint='http://localhost:9998')
                # Tika may return None for content/metadata on some inputs
                result["content"] = parsed.get("content") or ""
                result["metadata"] = parsed.get("metadata") or {}
# Add Tika-specific metadata
if result["metadata"]:
result["metadata"]["extractor"] = "Apache Tika"
result["metadata"]["extraction_timestamp"] = datetime.now().isoformat()
# Fallback to format-specific extractors
else:
result = self._fallback_extract(file_path, result)
except Exception as e:
result["error"] = str(e)
# Try fallback if Tika fails
if TIKA_AVAILABLE:
result = self._fallback_extract(file_path, result)
return result
def _get_file_info(self, file_path: str) -> Dict[str, Any]:
"""Get basic file information"""
path = Path(file_path)
        # Content fingerprint (MD5 is used for dedup, not security)
        hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return {
"name": path.name,
"extension": path.suffix.lower(),
"size": path.stat().st_size,
"hash_md5": hash_md5.hexdigest(),
"modified": datetime.fromtimestamp(path.stat().st_mtime).isoformat()
}
def _fallback_extract(self, file_path: str, result: Dict[str, Any]) -> Dict[str, Any]:
"""Fallback extraction without Tika"""
ext = Path(file_path).suffix.lower()
# PDF files
if ext == ".pdf":
content = self._extract_pdf(file_path)
if content:
result["content"] = content
result["metadata"]["extractor"] = "PDF fallback"
        # Word documents (python-docx reads .docx only; legacy .doc files
        # will fail in the extractor and simply yield no content)
        elif ext in [".docx", ".doc"]:
content = self._extract_docx(file_path)
if content:
result["content"] = content
result["metadata"]["extractor"] = "DOCX fallback"
# Excel files
elif ext in [".xlsx", ".xls"]:
content = self._extract_excel(file_path)
if content:
result["content"] = content
result["metadata"]["extractor"] = "Excel fallback"
# Images with OCR (if available)
elif ext in [".jpg", ".jpeg", ".png", ".tiff", ".bmp"]:
content = self._extract_image(file_path)
if content:
result["content"] = content
result["metadata"]["extractor"] = "Image OCR fallback"
        # Plain-text and code files
        elif ext in [".txt", ".md", ".csv", ".py", ".js", ".java", ".cpp", ".c", ".h",
                     ".css", ".html", ".xml", ".json", ".yaml", ".yml"]:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
result["content"] = f.read()
result["metadata"]["extractor"] = "Text reader"
return result
def _extract_pdf(self, file_path: str) -> Optional[str]:
"""Extract text from PDF using multiple methods"""
content = ""
# Try PyMuPDF first (best quality)
if PDF_AVAILABLE:
try:
doc = fitz.open(file_path)
for page in doc:
content += page.get_text() + "\n"
doc.close()
if content.strip():
return content
            except Exception:
                pass
# Try pdfplumber
        if PDFPLUMBER_AVAILABLE:
            try:
                with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
text = page.extract_text() or ""
content += text + "\n"
if content.strip():
return content
            except Exception:
                pass
# Try PyPDF2
if PYPDF2_AVAILABLE:
try:
reader = PdfReader(file_path)
for page in reader.pages:
text = page.extract_text() or ""
content += text + "\n"
if content.strip():
return content
            except Exception:
                pass
return content if content.strip() else None
def _extract_docx(self, file_path: str) -> Optional[str]:
"""Extract text from DOCX"""
if DOCX_AVAILABLE:
try:
doc = docx.Document(file_path)
content = ""
for paragraph in doc.paragraphs:
content += paragraph.text + "\n"
return content if content.strip() else None
            except Exception:
                pass
return None
def _extract_excel(self, file_path: str) -> Optional[str]:
"""Extract text from Excel"""
        if XLSX_AVAILABLE:
try:
                # Read all sheets through one ExcelFile handle
                content = ""
                excel_file = pd.ExcelFile(file_path)
                for sheet_name in excel_file.sheet_names:
                    df = excel_file.parse(sheet_name)
content += f"\n=== Sheet: {sheet_name} ===\n"
content += df.to_string() + "\n"
return content if content.strip() else None
            except Exception:
                pass
return None
def _extract_image(self, file_path: str) -> Optional[str]:
"""Extract text from image using OCR (if available)"""
        # Try OCR if pytesseract (and the tesseract binary it wraps) is available
try:
import pytesseract
from PIL import Image
image = Image.open(file_path)
text = pytesseract.image_to_string(image)
return text if text.strip() else None
        except Exception:
            return None
    def process_folder(self, folder_files: List[Any]) -> Tuple[Optional[pd.DataFrame], Dict[str, Any]]:
"""
Process multiple files and create a unified dataset
Args:
folder_files: List of uploaded file objects
        Returns:
            Tuple of (DataFrame with all content, processing summary).
            The DataFrame is None if no file could be processed.
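
        Example (illustrative; assumes upload objects exposing a ``.name``
        path, as e.g. Gradio file uploads do):
            df, summary = UniversalExtractor().process_folder(uploads)
            print(summary["processed"], "of", summary["total_files"], "extracted")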
"""
all_records = []
file_summary = []
for file_obj in folder_files:
try:
                # ``.name`` holds the on-disk path of the uploaded file object
                extracted = self.extract_file(file_obj.name)
# Create record
record = {
"file_name": extracted["file_info"]["name"],
"file_extension": extracted["file_info"]["extension"],
"file_size": extracted["file_info"]["size"],
"file_hash": extracted["file_info"]["hash_md5"],
"content": extracted["content"],
"extractor": extracted["metadata"].get("extractor", "unknown"),
"extraction_timestamp": datetime.now().isoformat(),
"error": extracted["error"]
}
# Add metadata as JSON
if extracted["metadata"]:
record["metadata"] = json.dumps(extracted["metadata"])
all_records.append(record)
# Summary
file_summary.append({
"file": extracted["file_info"]["name"],
"status": "success" if extracted["content"] else "failed",
"content_length": len(extracted["content"]),
"extractor": extracted["metadata"].get("extractor", "unknown"),
"error": extracted["error"]
})
except Exception as e:
file_summary.append({
"file": getattr(file_obj, 'name', 'unknown'),
"status": "error",
"error": str(e)
})
# Create DataFrame
if all_records:
df = pd.DataFrame(all_records)
summary = {
"total_files": len(folder_files),
"processed": len([s for s in file_summary if s["status"] == "success"]),
"failed": len([s for s in file_summary if s["status"] != "success"]),
"total_content_chars": df["content"].str.len().sum(),
"file_details": file_summary
}
return df, summary
return None, {"error": "No files processed", "details": file_summary}
# Convenience function
def extract_from_files(file_list: List[Any]) -> Tuple[Optional[pd.DataFrame], Dict[str, Any]]:
"""
Extract content from multiple files using the universal extractor
Args:
file_list: List of file objects
Returns:
Tuple of (DataFrame, summary)
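
    Example (illustrative; ``uploads`` is any list of objects with a
    ``.name`` path):
        df, summary = extract_from_files(uploads)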
"""
extractor = UniversalExtractor()
return extractor.process_folder(file_list)
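

if __name__ == "__main__":
    # Minimal manual smoke test, not part of the library API. This is a
    # sketch: it assumes file paths are passed on the command line and wraps
    # each path in a SimpleNamespace so it looks like an uploaded file object
    # (the only attribute process_folder reads is ``.name``).
    import sys
    from types import SimpleNamespace

    uploads = [SimpleNamespace(name=p) for p in sys.argv[1:]]
    if not uploads:
        print("usage: python universal_extractor.py <file> [<file> ...]")
    else:
        df, summary = extract_from_files(uploads)
        if df is not None:
            print(df[["file_name", "extractor", "error"]])
        print(json.dumps(
            {k: v for k, v in summary.items() if k not in ("file_details", "details")},
            indent=2, default=str))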