Spaces:
Running
Running
| import os | |
| import io | |
| import logging | |
| import zipfile | |
| import tarfile | |
| import time | |
| import uvicorn | |
| import fitz # PyMuPDF | |
| import docx # python-docx | |
| import pptx # python-pptx | |
| import openpyxl | |
| import pandas as pd | |
| from PIL import Image | |
| import pytesseract | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Header, BackgroundTasks, Body | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import List, Optional, Tuple | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| import magic | |
| import chardet | |
| import json | |
| import xml.etree.ElementTree as ET | |
| from pathlib import Path | |
| import tempfile | |
| import shutil | |
| import subprocess | |
| from pdf2image import convert_from_bytes | |
| import concurrent.futures | |
| from vector import vdb | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from typing import List, Dict | |
| from fastapi.responses import JSONResponse | |
| import numpy as np | |
| import re | |
| # ==================== CONFIGURATION ==================== | |
# Root logging: INFO level, "timestamp | level | logger name | message" lines.
_LOG_FORMAT = '%(asctime)s | %(levelname)s | %(name)s | %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module-wide logger used by the extractors and endpoints in this file.
logger = logging.getLogger("ProductionExtractor")
class Config:
    """Static limits and feature switches read as ``Config.NAME``."""

    # --- Archive handling ---
    MAX_ZIP_DEPTH = 3          # deepest zip-inside-zip nesting that is followed
    MAX_FILES_IN_ZIP = 100     # per-archive file cap (applied to zip and tar)
    MAX_FILE_SIZE_MB = 50      # per-file cap for uploads and archive members
    MAX_TOTAL_SIZE_MB = 500    # cumulative cap on member sizes within one zip

    # --- Execution ---
    TIMEOUT_SECONDS = 300      # not referenced in this part of the file
    WORKER_THREADS = 4         # size of the shared extraction thread pool
    TEXTRACT_TIMEOUT = 30      # not referenced in this part of the file

    # --- PDF / OCR ---
    MAX_PDF_PAGES = 100        # pages read (or OCR'd) per PDF
    TESSERACT_TIMEOUT = 60     # seconds allowed for a PDF's OCR futures overall
    ENABLE_OCR = True          # master switch for every OCR code path
    MAX_IMAGE_PIXELS = 80_000_000  # 80 megapixels; larger images are downscaled before OCR
    OCR_LANGUAGE = os.getenv("TESSERACT_LANGUAGE", "eng+hin")  # passed to tesseract via -l
class SearchRequest(BaseModel):
    """Request body carrying a search query and an optional target."""

    # Required search text.
    query: str
    # Optional; how it narrows the search is not shown in this part of the file.
    target: Optional[str] = None
# Process-wide counters; the extraction functions below mutate this in place.
metrics = dict(
    files_processed=0,
    total_bytes=0,
    processing_time=0,
    errors=[],
)
# FastAPI application object; interactive docs are exposed at /docs and /redoc.
_APP_SETTINGS = {
    "title": "NeuralStream Production Extractor",
    "version": "1.0.0",
    "description": "High-performance file extraction service with support for 50+ file types",
    "docs_url": "/docs",
    "redoc_url": "/redoc",
}
app = FastAPI(**_APP_SETTINGS)

# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# That lets any site make credentialed requests; confirm this is intended or
# replace "*" with an explicit origin list.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
# Shared pool on which blocking extraction work is run via run_in_executor.
executor = ThreadPoolExecutor(max_workers=Config.WORKER_THREADS)

# On Windows, point pytesseract at the default install path when it exists;
# elsewhere pytesseract's own default command is left untouched.
if os.name == 'nt':
    tesseract_path = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
    tesseract_present = os.path.exists(tesseract_path)
    if tesseract_present:
        pytesseract.pytesseract.tesseract_cmd = tesseract_path
| # ==================== UTILITY FUNCTIONS ==================== | |
def sanitize_filename(filename: str) -> str:
    """Return only the final path component of *filename*.

    Both ``/`` and ``\\`` are treated as separators on every platform, so a
    Windows-style name such as ``..\\..\\secret.txt`` cannot survive as a
    relative path on POSIX hosts.

    Args:
        filename: Client- or archive-supplied name; may contain directories.

    Returns:
        The bare file name, or ``""`` when nothing usable remains (empty or
        missing input, a trailing separator, or ``.``/``..``).
    """
    if not filename:
        return ""
    # Normalise separators BEFORE taking the basename; doing it afterwards
    # would turn backslash-separated components into real path segments.
    name = os.path.basename(filename.replace('\\', '/'))
    if name in ('.', '..'):
        return ""
    return name
def get_file_extension(filename: str) -> str:
    """Return the lower-cased extension of *filename*, including the dot.

    Compressed tarball names keep their two-part suffix (``.tar.gz``,
    ``.tar.bz2``, ``.tar.xz``) so they can be told apart from a bare ``.gz``
    or ``.xz`` file; every other name yields its last suffix only.

    Args:
        filename: File name or path.

    Returns:
        The extension (e.g. ``".pdf"``, ``".tar.gz"``) or ``""`` if none.
    """
    lowered = Path(filename).name.lower()
    for compound in ('.tar.gz', '.tar.bz2', '.tar.xz'):
        # Require a stem so a file literally named ".tar.gz" is not matched.
        if lowered.endswith(compound) and len(lowered) > len(compound):
            return compound
    return Path(lowered).suffix
def detect_file_type(content: bytes, filename: str) -> str:
    """Describe the file type, preferring content sniffing over the name.

    Args:
        content: Raw file bytes; only the first 2048 are inspected.
        filename: Name used for the extension-based fallback.

    Returns:
        The MIME type reported by ``magic``; if that call raises, the string
        ``"extension/<ext>"`` built from the file's extension.
    """
    try:
        return magic.from_buffer(content[:2048], mime=True)
    except Exception:
        # Sniffing failed, so fall back to a pseudo-type derived from the name.
        return "extension/" + get_file_extension(filename)
def is_binary_file(content: bytes) -> bool:
    """Heuristically decide whether *content* looks like binary data.

    Only the first 1024 bytes are examined. The data is treated as binary
    when that sample contains a NUL byte, or when more than 30% of it falls
    outside the "text" byte set (a few control characters plus 0x20-0xFF
    excluding DEL).

    Args:
        content: Raw file bytes.

    Returns:
        ``True`` if the sample looks binary; ``False`` otherwise, including
        for empty input.
    """
    if not content:
        return False
    sample = content[:1024]
    if b'\x00' in sample:
        return True
    # BEL, BS, TAB, LF, FF, CR, ESC plus every byte >= 0x20 except DEL.
    text_bytes = bytes(({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100))) - {0x7f})
    # translate(None, delete) strips the text bytes, leaving the rest.
    non_text = sample.translate(None, text_bytes)
    return len(non_text) / len(sample) > 0.3
def truncate_content(content: str, max_length: int = 100000) -> str:
    """Shorten *content* to about *max_length* characters, keeping both ends.

    Text within the limit is returned unchanged. Longer text keeps its first
    and last ``max_length // 2`` characters, joined by a marker that states
    how many characters were dropped.

    Args:
        content: Text to shorten.
        max_length: Maximum number of original characters to keep.

    Returns:
        The original text, or head + marker + tail.
    """
    if len(content) <= max_length:
        return content
    half = max(max_length, 0) // 2
    head = content[:half]
    # Slice from an explicit start index: content[-0:] would be the WHOLE
    # string, defeating truncation for limits below 2.
    tail = content[len(content) - half:]
    omitted = len(content) - 2 * half
    return f"{head}\n\n[... TRUNCATED {omitted} CHARACTERS ...]\n\n{tail}"
| # ==================== EXTRACTION ENGINES ==================== | |
def decode_text_safe(content: bytes, filename: str) -> str:
    """Decode *content* as text and wrap it in the TEXT FILE report block.

    Content that does not look binary is decoded with the first codec that
    succeeds: UTF-8 (BOM-aware), then cp1252, then latin-1 (which accepts
    every byte). Content that looks binary goes to ``chardet`` and is decoded
    with replacement characters.

    Args:
        content: Raw file bytes.
        filename: Name shown in the report header.

    Returns:
        The formatted text block, or a bracketed placeholder/error message
        when the bytes cannot be interpreted.
    """
    try:
        # The heuristic depends only on the bytes, so evaluate it once.
        if not is_binary_file(content):
            # utf-8-sig strips a leading BOM that plain utf-8 would keep as
            # a stray U+FEFF at the start of the text.
            has_bom = content.startswith(b'\xef\xbb\xbf')
            first_choice = 'utf-8-sig' if has_bom else 'utf-8'
            # cp1252 comes before latin-1: latin-1 never fails, so anything
            # listed after it could not be reached.
            for encoding in (first_choice, 'cp1252', 'latin-1'):
                try:
                    decoded = content.decode(encoding)
                except UnicodeDecodeError:
                    continue
                return format_text_content(decoded, filename, encoding)
        # Binary-looking data: let chardet guess and decode leniently.
        try:
            detection = chardet.detect(content)
            encoding = detection['encoding'] or 'utf-8'
            decoded = content.decode(encoding, errors='replace')
            return format_text_content(decoded, filename, f"{encoding} (detected)")
        except Exception:
            return f"\n--- BINARY/TEXT FILE: {filename} ---\n[Content appears to be binary or has unknown encoding]\n"
    except Exception as e:
        logger.error(f"Text extraction error for {filename}: {e}")
        return f"\n[Error extracting text from {filename}: {str(e)}]\n"
def format_text_content(content: str, filename: str, encoding: str) -> str:
    """Wrap decoded text in a TEXT FILE header/footer block.

    The text is passed through ``truncate_content`` first; the reported size
    is the length after that step.

    Args:
        content: Decoded file text.
        filename: Name shown in the header line.
        encoding: Encoding label shown in the metadata.

    Returns:
        The report block as one string, starting and ending with a newline.
    """
    body = truncate_content(content)
    report_lines = [
        "",
        f"--- TEXT FILE: {filename} ---",
        f"Encoding: {encoding}",
        f"Size: {len(body)} characters",
        f"{body}",
        "--- END TEXT FILE ---",
        "",
    ]
    return "\n".join(report_lines)
| # ==================== DOCUMENT EXTRACTION ==================== | |
def extract_pdf(content: bytes, filename: str) -> str:
    """Extract text and basic metadata from a PDF, with an OCR fallback.

    At most ``Config.MAX_PDF_PAGES`` pages are read. If fewer than 10
    non-whitespace characters are found and OCR is enabled, the document is
    passed to ``extract_text_from_image_pdf`` and its result is reported.

    Args:
        content: Raw PDF bytes.
        filename: Name shown in the report.

    Returns:
        A PDF report block, or a bracketed message when the PDF is locked or
        cannot be parsed.
    """
    start_time = time.time()
    try:
        text_buffer = []
        metadata_info = []
        with fitz.open(stream=content, filetype="pdf") as doc:
            if doc.is_encrypted:
                # PyMuPDF's authenticate() reports a wrong password through a
                # zero return value rather than an exception, so the result
                # must be checked explicitly.
                try:
                    unlocked = doc.authenticate("")
                except Exception:
                    unlocked = 0
                if not unlocked:
                    return f"\n[ENCRYPTED PDF: {filename} - Cannot extract content]\n"
            metadata = doc.metadata
            if metadata:
                for label, key in (
                    ("Title", "title"),
                    ("Author", "author"),
                    ("Subject", "subject"),
                    ("Created", "creationDate"),
                ):
                    metadata_info.append(f"{label}: {metadata.get(key, 'N/A')}")
            total_pages = len(doc)
            pages_extracted = 0
            for i, page in enumerate(doc):
                if i >= Config.MAX_PDF_PAGES:
                    text_buffer.append(f"\n[... Truncated at {Config.MAX_PDF_PAGES} pages from total {total_pages} ...]\n")
                    break
                page_text = page.get_text("text")
                if page_text.strip():
                    text_buffer.append(f"\n--- Page {i+1} ---")
                    text_buffer.append(page_text)
                    pages_extracted += 1
        full_text = "\n".join(text_buffer)
        metadata_block = "\n".join(metadata_info)
        # Almost no text layer usually means scanned pages, so try OCR.
        if len(full_text.strip()) < 10 and Config.ENABLE_OCR:
            logger.info(f"PDF appears to be scanned, attempting OCR: {filename}")
            ocr_result = extract_text_from_image_pdf(content, filename)
            if ocr_result:
                elapsed = time.time() - start_time
                return (
                    f"\n=== PDF DOCUMENT (OCR): {filename} ===\n"
                    "Metadata:\n"
                    f"{metadata_block}\n"
                    f"Processing Time: {elapsed:.2f}s\n"
                    f"Pages: {pages_extracted}/{total_pages}\n"
                    f"{ocr_result}\n"
                    "=== END PDF ===\n"
                )
        elapsed = time.time() - start_time
        return (
            f"\n=== PDF DOCUMENT: {filename} ===\n"
            "Metadata:\n"
            f"{metadata_block}\n"
            f"Extraction Time: {elapsed:.2f}s\n"
            f"Pages: {pages_extracted}/{total_pages}\n"
            f"{full_text}\n"
            "=== END PDF ===\n"
        )
    except Exception as e:
        logger.error(f"PDF extraction error for {filename}: {e}")
        return f"\n[Error parsing PDF {filename}: {str(e)}]\n"
def extract_docx(content: bytes, filename: str) -> str:
    """Extract paragraphs, tables and core properties from a DOCX file.

    Args:
        content: Raw .docx bytes.
        filename: Name shown in the report.

    Returns:
        A WORD DOCUMENT report block, or a bracketed error message when the
        document cannot be parsed.
    """
    try:
        document = docx.Document(io.BytesIO(content))
        core = document.core_properties

        # Only properties that are actually set are listed.
        properties = []
        if core.title:
            properties.append(f"Title: {core.title}")
        if core.author:
            properties.append(f"Author: {core.author}")
        if core.created:
            properties.append(f"Created: {core.created}")

        # Body text: every paragraph that is not blank.
        paragraphs = [para.text for para in document.paragraphs if para.text.strip()]

        # Tables: one line per row, cells separated by " | ".
        tables_text = []
        for table_no, table in enumerate(document.tables, start=1):
            rows = [" | ".join(cell.text for cell in row.cells) for row in table.rows]
            if rows:
                tables_text.append(f"\n--- Table {table_no} ---")
                tables_text.append("\n".join(rows))

        result = "\n".join(paragraphs)
        if tables_text:
            result += "\n" + "\n".join(tables_text)

        metadata_block = "\n".join(properties)
        return (
            f"\n=== WORD DOCUMENT: {filename} ===\n"
            "Metadata:\n"
            f"{metadata_block}\n"
            f"{result}\n"
            "=== END DOCUMENT ===\n"
        )
    except Exception as e:
        logger.error(f"DOCX extraction error for {filename}: {e}")
        return f"\n[Error parsing DOCX {filename}: {str(e)}]\n"
def extract_pptx(content: bytes, filename: str) -> str:
    """Extract shape text and table cell text from a PowerPoint file.

    Args:
        content: Raw .pptx bytes.
        filename: Name shown in the report.

    Returns:
        A POWERPOINT report block listing the text found per slide, or a
        bracketed error message when the file cannot be parsed.
    """
    try:
        presentation = pptx.Presentation(io.BytesIO(content))
        report_lines = []
        for slide_no, slide in enumerate(presentation.slides, start=1):
            found = []
            for shape in slide.shapes:
                # Shapes without a text attribute are skipped here.
                if hasattr(shape, "text") and shape.text and shape.text.strip():
                    found.append(shape.text)
                # Check for table text
                if shape.has_table:
                    found.extend(
                        cell.text
                        for row in shape.table.rows
                        for cell in row.cells
                        if cell.text.strip()
                    )
            if found:
                report_lines.append(f"\n--- Slide {slide_no} ---")
                report_lines.extend(found)
        slide_block = "\n".join(report_lines)
        return (
            f"\n=== POWERPOINT: {filename} ===\n"
            f"Slides: {len(presentation.slides)}\n"
            f"{slide_block}\n"
            "=== END POWERPOINT ===\n"
        )
    except Exception as e:
        logger.error(f"PPTX extraction error for {filename}: {e}")
        return f"\n[Error parsing PPTX {filename}: {str(e)}]\n"
def extract_excel(content: bytes, filename: str) -> str:
    """Extract cell values from an Excel workbook.

    Each sheet contributes up to 100 rows rendered as " | "-separated lines.
    A pandas rendering of up to 50 rows is appended when pandas can read the
    workbook; a pandas failure is logged and otherwise ignored.

    Args:
        content: Raw workbook bytes.
        filename: Name shown in the report.

    Returns:
        An EXCEL FILE report block, or a bracketed error message when the
        workbook cannot be opened.
    """
    max_rows = 100
    try:
        wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
        sheets_data = []
        try:
            for sheet_name in wb.sheetnames:
                sheet = wb[sheet_name]
                sheet_rows = []
                truncated = False
                for i, row in enumerate(sheet.iter_rows(values_only=True)):
                    if i >= max_rows:
                        # Flag only when a row beyond the limit really exists,
                        # so sheets with exactly max_rows rows are not
                        # reported as cut.
                        truncated = True
                        break
                    row_data = [str(cell) if cell is not None else "" for cell in row]
                    sheet_rows.append(" | ".join(row_data))
                if sheet_rows:
                    sheets_data.append(f"\n--- Sheet: {sheet_name} ---")
                    sheets_data.extend(sheet_rows)
                    if truncated:
                        sheets_data.append(f"[... Only first {max_rows} rows shown ...]")
        finally:
            # Read-only workbooks keep their source open until closed explicitly.
            wb.close()
        try:
            df = pd.read_excel(io.BytesIO(content), engine='openpyxl')
            pandas_output = df.head(50).to_string(index=False, max_rows=50, max_colwidth=50)
            if pandas_output:
                sheets_data.append("\n--- Pandas Format (First 50 rows) ---")
                sheets_data.append(pandas_output)
                if len(df) > 50:
                    sheets_data.append(f"[... {len(df) - 50} more rows truncated ...]")
        except Exception as pandas_error:
            logger.warning(f"Pandas extraction failed: {pandas_error}")
        sheet_block = "\n".join(sheets_data)
        return (
            f"\n=== EXCEL FILE: {filename} ===\n"
            f"{sheet_block}\n"
            "=== END EXCEL ===\n"
        )
    except Exception as e:
        logger.error(f"Excel extraction error for {filename}: {e}")
        return f"\n[Error parsing Excel {filename}: {str(e)}]\n"
| # ==================== IMAGE EXTRACTION ==================== | |
def extract_text_from_image_pdf(pdf_content: bytes, filename: str) -> Optional[str]:
    """OCR an image-only PDF by rasterising its pages with pdf2image.

    At most ``Config.MAX_PDF_PAGES`` pages are rasterised and OCR'd on a
    two-worker thread pool. Page texts are returned in page order. If the
    overall ``Config.TESSERACT_TIMEOUT`` expires, pages already finished are
    kept.

    Args:
        pdf_content: Raw PDF bytes.
        filename: Name used in log messages.

    Returns:
        The OCR text with per-page headers, or ``None`` when OCR is disabled,
        fails, or finds no text.
    """
    if not Config.ENABLE_OCR:
        return None
    try:
        # last_page keeps poppler from rendering pages that would be dropped
        # by the page cap anyway.
        images = convert_from_bytes(
            pdf_content,
            dpi=300,
            fmt='jpeg',
            thread_count=2,
            poppler_path=None,  # Will use system poppler
            last_page=Config.MAX_PDF_PAGES,
        )
        logger.info(f"Converted {len(images)} pages from {filename} for OCR")
        page_texts = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ocr_pool:
            future_to_page = {
                ocr_pool.submit(perform_ocr_on_image, image, page_num): page_num
                for page_num, image in enumerate(images[:Config.MAX_PDF_PAGES])
            }
            try:
                for future in concurrent.futures.as_completed(
                    future_to_page, timeout=Config.TESSERACT_TIMEOUT
                ):
                    page_num = future_to_page[future]
                    try:
                        text = future.result()
                    except Exception as e:
                        logger.warning(f"OCR failed for page {page_num + 1}: {e}")
                        continue
                    if text and text.strip():
                        page_texts[page_num] = text
                        logger.info(f"OCR completed for page {page_num + 1}")
            except concurrent.futures.TimeoutError:
                # Keep what has finished; drop work that has not started yet.
                logger.warning(f"OCR timed out for {filename}; keeping {len(page_texts)} completed pages")
                for pending in future_to_page:
                    pending.cancel()
        # Futures complete in arbitrary order, so emit pages by page number.
        extracted_text = []
        for page_num in sorted(page_texts):
            extracted_text.append(f"\n--- Page {page_num + 1} (OCR) ---")
            extracted_text.append(page_texts[page_num])
        return "\n".join(extracted_text) if extracted_text else None
    except Exception as e:
        logger.error(f"PDF to image conversion or OCR failed for {filename}: {e}")
        return None
def perform_ocr_on_image(image: Image.Image, page_num: int) -> str:
    """Run Tesseract on one page image and return the recognised text.

    Images with more than ``Config.MAX_IMAGE_PIXELS`` pixels are downscaled,
    keeping their aspect ratio, before OCR. The result is stripped and capped
    at 50000 characters.

    Args:
        image: Page image to recognise.
        page_num: Zero-based page index, used only in log messages.

    Returns:
        The recognised text, or ``""`` if any step raises.
    """
    try:
        width, height = image.size
        pixel_count = width * height
        limit = Config.MAX_IMAGE_PIXELS
        if pixel_count > limit:
            # Scale both sides by sqrt(limit / pixels) so the area fits.
            shrink = (limit / pixel_count) ** 0.5
            new_width = int(width * shrink)
            new_height = int(height * shrink)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            logger.info(f"Resized page {page_num + 1} from {width}x{height} to {new_width}x{new_height}")
        ocr_text = pytesseract.image_to_string(
            image,
            config=f'--oem 3 --psm 3 -l {Config.OCR_LANGUAGE}',
            timeout=30,
        )
        return truncate_content(ocr_text.strip(), max_length=50000)
    except Exception as e:
        logger.error(f"OCR error on page {page_num + 1}: {e}")
        return ""
def extract_image_ocr(content: bytes, filename: str) -> str:
    """Extract text from an image file using OCR.

    The image is decoded straight from memory, converted to RGB, downscaled
    when it exceeds ``Config.MAX_IMAGE_PIXELS`` pixels, and passed to
    Tesseract.

    Args:
        content: Raw image bytes.
        filename: Name shown in the report.

    Returns:
        An IMAGE FILE report block with the recognised text, a bracketed
        note when OCR is disabled or finds nothing, or a bracketed error
        message on failure.
    """
    if not Config.ENABLE_OCR:
        return f"\n[IMAGE FILE: {filename}]\n[Image extraction disabled]\n"
    try:
        # Decoding from memory avoids a temp file, and with it the
        # unlink-while-open failure that a temp file causes on Windows.
        with Image.open(io.BytesIO(content)) as source:
            # convert() returns a new image whose .format is None, so the
            # source format has to be captured first.
            source_format = source.format
            img = source.convert('RGB')  # Ensure RGB mode
        # Resize if too large
        width, height = img.size
        total_pixels = width * height
        if total_pixels > Config.MAX_IMAGE_PIXELS:
            scale_factor = (Config.MAX_IMAGE_PIXELS / total_pixels) ** 0.5
            new_size = (int(width * scale_factor), int(height * scale_factor))
            img = img.resize(new_size, Image.Resampling.LANCZOS)
        # Perform OCR
        custom_config = f'--oem 3 --psm 3 -l {Config.OCR_LANGUAGE}'
        text = pytesseract.image_to_string(img, config=custom_config, timeout=30)
        recognised = text.strip()
        if not recognised:
            return f"\n[IMAGE FILE: {filename}]\n[No text detected in image]\n"
        return (
            f"\n--- IMAGE FILE (OCR): {filename} ---\n"
            f"Size: {img.size[0]}x{img.size[1]} pixels\n"
            f"Format: {source_format}\n"
            "Extracted Text:\n"
            f"{recognised}\n"
            "--- END IMAGE ---\n"
        )
    except Exception as e:
        logger.error(f"Image OCR extraction error for {filename}: {e}")
        return f"\n[Error processing image {filename}: {str(e)}]\n"
| # ==================== ARCHIVE EXTRACTION ==================== | |
def process_zip_archive(zip_bytes: bytes, zip_name: str, depth: int = 0) -> Tuple[str, int]:
    """Extract every eligible member of a ZIP archive, recursing into nested zips.

    Skipped members: directories, names starting with ``.`` or ``__``, empty
    files, and files larger than ``Config.MAX_FILE_SIZE_MB``. Processing
    stops once ``Config.MAX_FILES_IN_ZIP`` files have been handled or the
    summed member sizes exceed ``Config.MAX_TOTAL_SIZE_MB``.

    Args:
        zip_bytes: Raw archive bytes.
        zip_name: Name shown in the report.
        depth: Current nesting level; 0 for a top-level archive.

    Returns:
        ``(report_text, files_processed)``. On a corrupt archive, or when
        *depth* exceeds ``Config.MAX_ZIP_DEPTH``, the text is a bracketed
        message and the count is 0.
    """
    if depth > Config.MAX_ZIP_DEPTH:
        return f"\n[ZIP Depth Limit Reached: {zip_name}]\n", 0

    per_file_cap = Config.MAX_FILE_SIZE_MB * 1024 * 1024
    total_cap = Config.MAX_TOTAL_SIZE_MB * 1024 * 1024
    report = [f"\n>>> ZIP ARCHIVE: {zip_name} (Depth {depth}) <<<\n"]
    processed = 0
    running_size = 0
    try:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
            for entry in archive.infolist():
                if entry.filename.startswith(('.', '__')) or entry.is_dir():
                    continue
                if processed >= Config.MAX_FILES_IN_ZIP:
                    report.append(f"\n[... File limit reached: {Config.MAX_FILES_IN_ZIP} files ...]\n")
                    break
                if entry.file_size == 0 or entry.file_size > per_file_cap:
                    continue
                running_size += entry.file_size
                if running_size > total_cap:
                    report.append(f"\n[... Total size limit reached: {Config.MAX_TOTAL_SIZE_MB}MB ...]\n")
                    break
                try:
                    with archive.open(entry) as member:
                        data = member.read()
                    if get_file_extension(entry.filename) in ['.zip']:
                        # A nested archive adds its own file count, not +1.
                        nested_text, nested_files = process_zip_archive(data, entry.filename, depth + 1)
                        report.append(nested_text)
                        processed += nested_files
                    else:
                        report.append(process_file_bytes(entry.filename, data))
                        processed += 1
                except Exception as e:
                    logger.error(f"Error processing nested file {entry.filename}: {e}")
                    report.append(f"\n[Error processing {entry.filename} inside {zip_name}]\n")
                    continue
    except zipfile.BadZipFile:
        return f"\n[Error: Corrupt Zip Archive - {zip_name}]\n", 0
    except Exception as e:
        logger.error(f"Zip processing error for {zip_name}: {e}")
        return f"\n[Zip Processing Error: {str(e)}]\n", 0

    report.append(f"\n>>> END ZIP: {zip_name} ({processed} files) <<<\n")
    return "".join(report), processed
def extract_tar_gz(content: bytes, filename: str) -> str:
    """Extract every eligible member of a tar archive (plain, gz, bz2 or xz).

    The compression is detected from the data, not from the file name.
    Skipped members: non-regular files, names starting with ``.`` or ``__``,
    and files larger than ``Config.MAX_FILE_SIZE_MB``. Processing stops once
    ``Config.MAX_FILES_IN_ZIP`` files have been handled or the summed member
    sizes exceed ``Config.MAX_TOTAL_SIZE_MB``, matching the ZIP limits.

    Args:
        content: Raw archive bytes.
        filename: Name shown in the report.

    Returns:
        The archive report, or a bracketed error message when the archive
        cannot be opened.
    """
    report = [f"\n>>> TAR.GZ ARCHIVE: {filename} <<<\n"]
    file_count = 0
    total_size = 0
    try:
        # 'r:*' lets tarfile detect the compression itself, so upper-case or
        # unusual extensions no longer select the wrong mode.
        with tarfile.open(fileobj=io.BytesIO(content), mode='r:*') as tar:
            for member in tar.getmembers():
                if not member.isfile() or member.name.startswith(('.', '__')):
                    continue
                if member.size > Config.MAX_FILE_SIZE_MB * 1024 * 1024:
                    continue
                if file_count >= Config.MAX_FILES_IN_ZIP:
                    report.append("\n[...Tar file limit reached...]\n")
                    break
                total_size += member.size
                if total_size > Config.MAX_TOTAL_SIZE_MB * 1024 * 1024:
                    report.append(f"\n[... Total size limit reached: {Config.MAX_TOTAL_SIZE_MB}MB ...]\n")
                    break
                try:
                    handle = tar.extractfile(member)
                    if handle:
                        # Own name for the member bytes: the `content`
                        # parameter is no longer overwritten inside the loop.
                        member_bytes = handle.read()
                        report.append(process_file_bytes(member.name, member_bytes))
                        file_count += 1
                except Exception as e:
                    logger.error(f"Error extracting {member.name}: {e}")
                    continue
    except Exception as e:
        logger.error(f"TAR extraction error for {filename}: {e}")
        return f"\n[Error processing TAR {filename}: {str(e)}]\n"
    report.append(f"\n>>> END TAR: {filename} ({file_count} files) <<<\n")
    return "".join(report)
| # ==================== STRUCTURED DATA EXTRACTION ==================== | |
def extract_json(content: bytes, filename: str) -> str:
    """Parse a JSON file and return it pretty-printed.

    The bytes are handed to ``json.loads`` directly, which detects UTF-8
    (with or without BOM), UTF-16 and UTF-32 on its own.

    Args:
        content: Raw file bytes.
        filename: Name shown in the report.

    Returns:
        A JSON FILE report block with 2-space indentation; if parsing fails,
        the plain-text rendering from ``decode_text_safe``.
    """
    try:
        # No manual .decode('utf-8'): that keeps a BOM in the string, which
        # json.loads then rejects.
        json_obj = json.loads(content)
        formatted = json.dumps(json_obj, indent=2, ensure_ascii=False)
        return (
            f"\n=== JSON FILE: {filename} ===\n"
            f"{formatted}\n"
            "=== END JSON ===\n"
        )
    except Exception as e:
        logger.error(f"JSON parsing error for {filename}: {e}")
        return decode_text_safe(content, filename)
def extract_xml(content: bytes, filename: str) -> str:
    """Render an XML document as an indented outline of tags and text.

    Each element contributes an opening tag, its own text (if any), its
    children, any text that follows a child inside the element ("tail"
    text), and a closing tag. Attributes are not included.

    Args:
        content: Raw XML bytes.
        filename: Name shown in the report.

    Returns:
        An XML FILE report block; if parsing fails, the plain-text rendering
        from ``decode_text_safe``.
    """
    try:
        # NOTE(review): ElementTree is not hardened against maliciously
        # constructed XML (e.g. entity expansion); confirm the deployed
        # expat version mitigates this, as the input is untrusted.
        root = ET.fromstring(content)

        def render(element, depth=0):
            """Return the outline lines for *element* and its subtree."""
            indent = " " * depth
            lines = [f"{indent}<{element.tag}>"]
            if element.text and element.text.strip():
                lines.append(f"{indent} {element.text.strip()}")
            for child in element:
                lines.extend(render(child, depth + 1))
                # Text after a child's closing tag belongs to this element;
                # without this, mixed content loses everything but the
                # leading text.
                if child.tail and child.tail.strip():
                    lines.append(f"{indent} {child.tail.strip()}")
            lines.append(f"{indent}</{element.tag}>")
            return lines

        outline = "\n".join(render(root))
        return (
            f"\n=== XML FILE: {filename} ===\n"
            f"{outline}\n"
            "=== END XML ===\n"
        )
    except Exception as e:
        logger.error(f"XML parsing error for {filename}: {e}")
        return decode_text_safe(content, filename)
def extract_csv(content: bytes, filename: str) -> str:
    """Summarise a CSV file: row count, column names and the first 100 rows.

    Args:
        content: Raw CSV bytes; undecodable bytes are replaced while reading.
        filename: Name shown in the report.

    Returns:
        A CSV FILE report block; if pandas cannot parse the data, the
        plain-text rendering from ``decode_text_safe``.
    """
    try:
        frame = pd.read_csv(io.BytesIO(content), encoding_errors='replace')
        preview = frame.head(100).to_string(index=False, max_rows=100, max_colwidth=50)
        row_count = len(frame)
        column_names = ', '.join(frame.columns.astype(str))
        pieces = [
            f"\n=== CSV FILE: {filename} ===\n"
            f"Total Rows: {row_count}\n"
            f"Columns: {column_names}\n"
            "First 100 Rows:\n"
            f"{preview}\n"
        ]
        if row_count > 100:
            pieces.append(f"\n[... {row_count - 100} more rows truncated ...]\n")
        pieces.append("\n=== END CSV ===\n")
        return "".join(pieces)
    except Exception as e:
        logger.error(f"CSV parsing error for {filename}: {e}")
        return decode_text_safe(content, filename)
| # ==================== MAIN ROUTING LOGIC ==================== | |
def process_file_bytes(filename: str, content: bytes) -> str:
    """Route one file to the matching extraction engine.

    The route is chosen from the sanitised file name: documents, archives,
    structured data, images (OCR), known text/code extensions, and
    placeholders for binary, media and database files. Unknown extensions
    are rendered as text when the bytes do not look binary. On success the
    module-level ``metrics`` counters are updated.

    Args:
        filename: Original (possibly path-qualified) file name.
        content: Raw file bytes.

    Returns:
        The extractor's report text, or a bracketed FATAL ERROR message if
        routing or extraction raises.
    """
    start_time = time.time()
    safe_name = sanitize_filename(filename)
    content_size = len(content)
    ext = get_file_extension(safe_name)
    # Tar archives are matched on the full name so that multi-part suffixes
    # such as ".tar.gz" are recognised whatever the single-extension lookup
    # returns for them.
    is_tar = safe_name.lower().endswith(('.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tar.xz'))
    try:
        result = ""
        # Document files
        if ext == '.pdf':
            result = extract_pdf(content, safe_name)
        elif ext == '.docx':
            result = extract_docx(content, safe_name)
        elif ext == '.pptx':
            result = extract_pptx(content, safe_name)
        elif ext in ('.xlsx', '.xls'):
            result = extract_excel(content, safe_name)
        # Archive files
        elif ext == '.zip':
            # The nested file count is not needed at this level.
            result, _ = process_zip_archive(content, safe_name)
        elif is_tar:
            result = extract_tar_gz(content, safe_name)
        # Structured data
        elif ext == '.json':
            result = extract_json(content, safe_name)
        elif ext == '.xml':
            result = extract_xml(content, safe_name)
        elif ext == '.csv':
            result = extract_csv(content, safe_name)
        # Image files with OCR
        elif ext in {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif'}:
            result = extract_image_ocr(content, safe_name)
        # Code and text files
        elif ext in {
            '.py', '.js', '.ts', '.tsx', '.jsx', '.vue', '.svelte',
            '.java', '.kt', '.scala', '.clj', '.cljs', '.cljc',
            '.c', '.cpp', '.h', '.hpp', '.cs', '.fs', '.vb',
            '.go', '.rs', '.swift', '.dart', '.php', '.rb', '.pl',
            '.lua', '.r', '.scm', '.hs', '.elm', '.ex', '.exs',
            '.html', '.htm', '.xhtml', '.css', '.scss', '.sass', '.less',
            '.yaml', '.yml', '.toml', '.ini', '.env', '.cfg',
            '.svg', '.sql', '.sh', '.bash', '.zsh', '.fish',
            '.ps1', '.bat', '.cmd', '.md', '.markdown', '.rst',
            '.txt', '.log', '.tsv',
        }:
            result = decode_text_safe(content, safe_name)
        # Binary files
        elif ext in {'.exe', '.dll', '.so', '.dylib', '.bin', '.dat'}:
            result = f"\n[BINARY FILE: {safe_name}]\nSize: {content_size} bytes\n[Binary content not extractable]\n"
        # Audio/Video files
        elif ext in {'.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac', '.mkv', '.webm'}:
            result = f"\n[MEDIA FILE: {safe_name}]\nSize: {content_size} bytes\n[Media content not extractable]\n"
        # Database files
        elif ext in {'.db', '.sqlite', '.sqlite3', '.mdb', '.accdb'}:
            result = f"\n[DATABASE FILE: {safe_name}]\n[Database content not extractable for security reasons]\n"
        # Unknown file type
        else:
            file_type = detect_file_type(content, safe_name)
            if not is_binary_file(content):
                result = decode_text_safe(content, safe_name)
            else:
                result = f"\n[UNKNOWN FILE TYPE: {safe_name}]\nType: {file_type}\nSize: {content_size} bytes\n[Binary content not extractable]\n"
        elapsed = time.time() - start_time
        metrics["files_processed"] += 1
        metrics["total_bytes"] += content_size
        logger.info(f"Extracted {safe_name} ({content_size} bytes) in {elapsed:.2f}s")
        return result
    except Exception as e:
        error_msg = f"Error processing {safe_name}: {str(e)}"
        logger.error(error_msg)
        metrics["errors"].append(error_msg)
        return f"\n[FATAL ERROR processing {safe_name}: {str(e)}]\n"
async def process_file_async(file: UploadFile) -> str:
    """Read one upload and extract it on the shared thread pool.

    Args:
        file: The uploaded file.

    Returns:
        The extraction report; a bracketed ERROR message when the upload
        exceeds ``Config.MAX_FILE_SIZE_MB`` or when reading or processing
        raises.
    """
    loop = asyncio.get_event_loop()
    size_limit = Config.MAX_FILE_SIZE_MB * 1024 * 1024
    try:
        payload = await file.read()
        safe_name = sanitize_filename(file.filename)
        if len(payload) > size_limit:
            return f"\n[ERROR: {safe_name} exceeds {Config.MAX_FILE_SIZE_MB}MB limit]\n"
        # Extraction is blocking work, so it runs on the module-level pool.
        return await loop.run_in_executor(executor, process_file_bytes, safe_name, payload)
    except Exception as e:
        error_msg = f"Async processing error for {file.filename}: {str(e)}"
        logger.error(error_msg)
        metrics["errors"].append(error_msg)
        return f"\n[ERROR processing {file.filename}: {str(e)}]\n"
| # ==================== API ENDPOINTS ==================== | |
async def ingest_files(files: List[UploadFile] = File(...)):
    """Extract text from a batch of uploaded files concurrently.

    Args:
        files: Uploaded files; at least one is required.

    Returns:
        A dict with the concatenated extraction text, the number of files
        processed, the elapsed time, the summed upload sizes and any
        per-file errors.

    Raises:
        HTTPException: 400 when no files are provided.
    """
    # NOTE(review): no route decorator is visible on this handler here;
    # confirm it is registered on `app`.
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")
    start_time = time.time()
    logger.info(f"Processing batch of {len(files)} files")
    results = await asyncio.gather(
        *(process_file_async(upload) for upload in files),
        return_exceptions=True,
    )
    text_parts = []
    errors = []
    files_processed = 0
    total_size = 0
    for upload, result in zip(files, results):
        # gather(return_exceptions=True) can also return BaseException
        # instances (e.g. cancellation); treat them as failures too.
        if isinstance(result, BaseException):
            error_msg = f"Error processing {upload.filename}: {str(result)}"
            logger.error(error_msg)
            errors.append(error_msg)
            text_parts.append(f"\n[ERROR: {error_msg}]\n")
            continue
        text_parts.append(result)
        files_processed += 1
        # The size attribute may be missing or None; count it only when it
        # is a real number instead of swallowing whatever goes wrong.
        size = getattr(upload, 'size', None)
        if isinstance(size, int):
            total_size += size
    elapsed = time.time() - start_time
    logger.info(f"Batch processed in {elapsed:.2f}s - {files_processed} files, {total_size} bytes")
    return {
        "status": "success",
        "extracted_text": "".join(text_parts),
        "files_processed": files_processed,
        "total_files": len(files),
        "processing_time": elapsed,
        "total_size_bytes": total_size,
        "errors": errors,
    }
| import re # Ensure this is imported at the top of app.py | |
async def interact_with_files(
    files: List[UploadFile] = File(...),
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID"),
    x_file_id: Optional[str] = Header(None, alias="X-File-ID")
):
    """
    Process files and store them in vector DB with user session isolation.

    The text returned to the caller keeps the report headers. The copy stored
    in the vector DB has the header/footer wrapper stripped when the wrapper
    pattern matches; otherwise the whitespace-trimmed report is stored.

    Args:
        files: Uploaded files; at least one is required.
        x_user_id: Value of the ``X-User-ID`` header.
        x_chat_id: Value of the ``X-Chat-ID`` header.
        x_file_id: Optional ``X-File-ID`` header, passed to storage for
            every file in the batch.

    Returns:
        A dict with the combined extraction text, counts, timing and any
        storage errors.

    Raises:
        HTTPException: 400 when no files are provided.
    """
    # NOTE(review): no route decorator is visible on this handler here;
    # confirm it is registered on `app`.
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")
    start_time = time.time()
    logger.info(f"Processing {len(files)} files for user {x_user_id[:8]}...")
    # 1. Extract text from files (Async processing)
    results = await asyncio.gather(
        *(process_file_async(upload) for upload in files),
        return_exceptions=True,
    )
    text_parts = []
    files_processed = 0
    storage_errors = []
    # Regex to strip the "Wrapper" headers (e.g., --- TEXT FILE: app.py ---)
    # Matches: Header -> Metadata Block -> Double Newline -> CONTENT -> Double Newline -> Footer
    wrapper_pattern = r"(?s)(?:---|===)\s+.*?(?:FILE|DOCUMENT).*?[-=]+\n.*?\n\n(.*?)\n\n(?:---|===) END"
    # 2. Process each file and store in vector DB
    for upload, result in zip(files, results):
        if isinstance(result, Exception):
            error_msg = f"Error processing {upload.filename}: {str(result)}"
            logger.error(error_msg)
            text_parts.append(f"\n[ERROR: {error_msg}]\n")
            continue
        # Keep the headers in what is returned to the user interface.
        text_parts.append(result)
        files_processed += 1
        # 3. Prepare clean content for the vector DB: the unwrapped body when
        # the pattern matches, otherwise the trimmed report as a fallback.
        filename = upload.filename
        match = re.search(wrapper_pattern, result)
        clean_text_for_db = match.group(1) if match else result.strip()
        try:
            # Imported here so an import failure is recorded as a storage
            # error for this file instead of failing the whole request.
            from vector import vdb
            # 4. Store the clean text under the real filename.
            # NOTE(review): this call runs on the event loop, while the
            # delete endpoints use asyncio.to_thread; confirm whether it
            # should be offloaded as well.
            storage_success = vdb.store_session_document(
                text=clean_text_for_db,
                filename=filename,
                user_id=x_user_id,
                chat_id=x_chat_id,
                file_id=x_file_id
            )
            if not storage_success:
                error_msg = f"Vector storage failed for {filename}"
                logger.error(error_msg)
                storage_errors.append(error_msg)
                text_parts.append(f"\n[WARNING: Vector storage failed for {filename}]\n")
            else:
                logger.info(f"Vector storage successful for {filename}")
        except Exception as e:
            error_msg = f"Vector DB error for {filename}: {str(e)}"
            logger.error(error_msg)
            storage_errors.append(error_msg)
            text_parts.append(f"\n[WARNING: {error_msg}]\n")
    elapsed = time.time() - start_time
    # 5. Return response
    response_data = {
        "status": "success",
        "extracted_text": "".join(text_parts),
        "files_processed": files_processed,
        "total_files": len(files),
        "processing_time": round(elapsed, 2),
        "vector_status": "stored_synchronously",
        "session_id": x_user_id,
        "storage_errors": storage_errors,
    }
    logger.info(f"Interaction completed in {elapsed:.2f}s for user {x_user_id[:8]}")
    return response_data
async def delete_specific_file_endpoint(
    file_id: str,  # Expects ?file_id=... in the URL
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """
    Surgical Deletion Endpoint:
    Removes ONLY the vector chunks associated with a specific file_id.

    Args:
        file_id: Identifier of the file whose chunks should be removed.
        x_user_id: Value of the ``X-User-ID`` header.
        x_chat_id: Value of the ``X-Chat-ID`` header.

    Returns:
        ``{"status": "deleted", "file_id": ...}`` on success, otherwise a
        404 JSON response with status ``"not_found"``.
    """
    # NOTE(review): no route decorator is visible on this handler here;
    # confirm it is registered on `app`.
    from vector import vdb
    # Run in thread to prevent blocking the main event loop
    success = await asyncio.to_thread(vdb.delete_file, x_user_id, x_chat_id, file_id)
    if not success:
        # 404 indicates the file wasn't found (maybe already deleted or never existed)
        return JSONResponse(
            status_code=404,
            content={"status": "not_found", "message": "File ID not found in current session"}
        )
    logger.info(f"Deleted file {file_id} for user {x_user_id[:8]}")
    return {"status": "deleted", "file_id": file_id}
| # Add debug endpoints for monitoring | |
async def debug_vector_status(x_user_id: str = Header(..., alias="X-User-ID")):
    """
    Debug endpoint: report vector DB state for the calling user.

    Returns the caller's id, whatever ``vdb.get_user_stats`` reports for it,
    and three index-wide figures (vector count, metadata length, index class).
    """
    from vector import vdb

    user_stats = vdb.get_user_stats(x_user_id)
    index_status = {
        "total_vectors": vdb.index.ntotal,
        "total_metadata": len(vdb.metadata),
        "index_type": vdb.index.__class__.__name__,
    }
    return {
        "user_id": x_user_id,
        "stats": user_stats,
        "index_status": index_status,
    }
async def cleanup_vector_db(
    max_age_hours: int = 24,
    x_user_id: str = Header(..., alias="X-User-ID")
):
    """
    Ask the vector DB to discard session data older than ``max_age_hours``.

    The user id is only echoed back in the response; the cleanup call itself
    receives nothing but the age limit. Any failure is logged and surfaced
    as HTTP 500 with the exception text as detail.
    """
    from vector import vdb

    try:
        removed = vdb.cleanup_old_sessions(max_age_hours)
    except Exception as e:
        logger.error(f"Cleanup failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "status": "success",
        "cleaned_vectors": removed,
        "max_age_hours": max_age_hours,
        "user_id": x_user_id
    }
async def delete_specific_session(
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """
    Drop one chat session from the vector DB (the UI's 'Delete Chat' action).

    Always answers with HTTP 200; the ``status`` field tells whether
    ``vdb.delete_session`` reported a deletion or found nothing to delete.
    """
    from vector import vdb

    # Executed on a worker thread so a slow delete does not stall other users.
    removed = await asyncio.to_thread(vdb.delete_session, x_user_id, x_chat_id)

    if not removed:
        return {"status": "not_found", "message": "Session was already empty"}
    return {"status": "deleted", "chat_id": x_chat_id}
async def search_vector_db(
    payload: SearchRequest,
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """
    Search the caller's session data and return JSON-safe results.

    The query and optional ``target`` filter from ``payload`` are passed to
    ``vdb.retrieve_session_context`` together with the user and chat ids.
    NumPy values in the result are converted to plain Python values before
    the response is built, because the JSON encoder cannot handle them.

    Returns:
        JSONResponse with body ``{"results": [...]}``.

    Raises:
        HTTPException: 500 when retrieval or serialization fails.
    """
    from vector import vdb

    def serialize(obj):
        """Recursively replace NumPy scalars/arrays with built-in equivalents."""
        # Each scalar family keeps its own type: integers stay integers
        # (ids, counters) instead of being widened to floats.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, dict):
            return {k: serialize(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            # Tuples are emitted as JSON arrays anyway; recurse so that
            # NumPy values nested inside them are converted too.
            return [serialize(item) for item in obj]
        return obj

    logger.info(f"π Search request from user {x_user_id[:8]}: '{payload.query[:50]}...'")
    try:
        results = vdb.retrieve_session_context(
            query=payload.query,
            user_id=x_user_id,
            chat_id=x_chat_id,
            filter_type=payload.target,
            top_k=50,
            final_k=2
        )
        logger.info(f"β Search completed: {len(results)} results for user {x_user_id[:8]}")

        return JSONResponse(
            content={"results": serialize(results)},
            media_type="application/json"
        )
    except Exception as e:
        logger.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") from e
async def sync_chat_history(
    background_tasks: BackgroundTasks,
    messages: List[Dict] = Body(...),
    x_user_id: str = Header(..., alias="X-User-ID"),  # identifies the user
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """
    Queue the given chat messages for storage under the caller's session.

    An empty message list is ignored. Otherwise ``vdb.store_chat_context``
    is scheduled as a background task and the endpoint returns at once.
    """
    if messages:
        # Storage happens after the response has been sent.
        background_tasks.add_task(
            vdb.store_chat_context,
            messages=messages,
            user_id=x_user_id,
            chat_id=x_chat_id,
        )
        return {"status": "syncing_started"}

    return {"status": "ignored", "reason": "empty"}
async def ingest_single_file(file: UploadFile = File(...)):
    """
    Extract text from one uploaded file.

    Delegates to ``process_file_async`` and returns its result together with
    the filename, the reported upload size and the wall-clock duration.
    """
    started_at = time.time()
    extracted = await process_file_async(file)
    duration = time.time() - started_at

    logger.info(f"Single file processed in {duration:.2f}s")

    response = {
        "status": "success",
        "extracted_text": extracted,
        "filename": file.filename,
        "processing_time": duration,
        "file_size": file.size
    }
    return response
async def health_check():
    """
    Health check endpoint.

    Reports a fixed status/version banner, selected ``Config`` limits, a
    summary of the module-level ``metrics`` counters and a static list of
    supported file families.
    """
    config_summary = {
        "max_file_size_mb": Config.MAX_FILE_SIZE_MB,
        "max_zip_depth": Config.MAX_ZIP_DEPTH,
        "max_files_in_zip": Config.MAX_FILES_IN_ZIP,
        "worker_threads": Config.WORKER_THREADS,
        "enable_ocr": Config.ENABLE_OCR
    }
    metrics_summary = {
        "files_processed": metrics["files_processed"],
        "total_bytes_processed": metrics["total_bytes"],
        "error_count": len(metrics["errors"])
    }
    supported_types = [
        "Documents: .pdf, .docx, .pptx, .xlsx, .xls",
        "Code: 20+ programming languages",
        "Archives: .zip, .tar, .tar.gz, .tar.bz2",
        "Data: .json, .xml, .csv, .tsv",
        "Text: .txt, .md, .log, .ini, .yaml",
        "Images: .png, .jpg, .jpeg, .tiff (OCR)"
    ]
    return {
        "status": "active",
        "version": "1.0.0",
        "engine": "High-Performance Production Extractor",
        "config": config_summary,
        "metrics": metrics_summary,
        "supported_types": supported_types
    }
async def get_metrics():
    """
    Return the module-level ``metrics`` dict plus a few derived figures.

    Derived keys: ``average_bytes_per_file`` (0 when nothing was processed),
    ``uptime_seconds`` (a copy of ``metrics["processing_time"]``) and
    ``latest_errors`` (at most the ten most recent error entries).
    """
    processed = metrics["files_processed"]
    if processed > 0:
        avg_bytes = metrics["total_bytes"] / max(1, processed)
    else:
        avg_bytes = 0

    errors = metrics["errors"]
    recent_errors = errors[-10:] if len(errors) > 10 else errors

    detail = dict(metrics)
    detail["average_bytes_per_file"] = round(avg_bytes, 2)
    detail["uptime_seconds"] = metrics["processing_time"]
    detail["latest_errors"] = recent_errors

    return {"status": "ok", "metrics": detail}
| # ==================== STRUCTURED IMPORT ENDPOINTS ==================== | |
| def _compute_median_font_size(blocks: list) -> float: | |
| """Compute the median font size from all text spans β this is our 'body text' baseline.""" | |
| sizes = [] | |
| for block in blocks: | |
| if block.get("type") != 0: # type 0 = text block | |
| continue | |
| for line in block.get("lines", []): | |
| for span in line.get("spans", []): | |
| text = span.get("text", "").strip() | |
| if text: | |
| sizes.append(span.get("size", 12)) | |
| if not sizes: | |
| return 12.0 | |
| sizes.sort() | |
| mid = len(sizes) // 2 | |
| return sizes[mid] if len(sizes) % 2 == 1 else (sizes[mid - 1] + sizes[mid]) / 2 | |
| def _classify_heading(font_size: float, median: float, flags: int) -> str: | |
| """Classify a text block as heading or paragraph based on font size ratio to median.""" | |
| if median == 0: | |
| return "p" | |
| ratio = font_size / median | |
| is_bold = bool(flags & (1 << 4)) | |
| if ratio >= 1.6: | |
| return "h1" | |
| elif ratio >= 1.35: | |
| return "h2" | |
| elif ratio >= 1.15 or (ratio >= 1.08 and is_bold): | |
| return "h3" | |
| return "p" | |
| def _detect_list_prefix(text: str): | |
| """Detect list item prefixes. Returns (type, cleaned_text) or None.""" | |
| stripped = text.strip() | |
| # Bullet list: β’, β, β, β , β, -, * | |
| bullet_match = re.match(r'^[\u2022\u25cf\u25cb\u25a0\u2013\-\*]\s+(.+)', stripped) | |
| if bullet_match: | |
| return ("ul", bullet_match.group(1)) | |
| # Numbered list: 1., 2., (1), (a), i., etc. | |
| num_match = re.match(r'^(?:\d+[\.\)]\s+|[a-z][\.\)]\s+|[ivxlcdm]+[\.\)]\s+)(.+)', stripped, re.IGNORECASE) | |
| if num_match: | |
| return ("ol", num_match.group(1)) | |
| return None | |
| def _format_span_html(text: str, flags: int) -> str: | |
| """Wrap text in <strong>/<em> based on PyMuPDF span flags.""" | |
| if not text: | |
| return "" | |
| escaped = text.replace("&", "&").replace("<", "<").replace(">", ">") | |
| is_bold = bool(flags & (1 << 4)) | |
| is_italic = bool(flags & (1 << 1)) | |
| result = escaped | |
| if is_bold: | |
| result = f"<strong>{result}</strong>" | |
| if is_italic: | |
| result = f"<em>{result}</em>" | |
| return result | |
# -- Page number detection --------------------------------------------------
| _PAGE_NUM_RE = re.compile( | |
| r'^\s*' | |
| r'(?:' | |
| r'\d{1,4}' # standalone number: 1, 23, 456 | |
| r'|[Pp]age\s+\d{1,4}' # Page 3, page 12 | |
| r'|\d{1,4}\s+of\s+\d{1,4}' # 3 of 10 | |
| r'|[Pp]age\s+\d{1,4}\s+of\s+\d+' # Page 3 of 10 | |
| r'|[-ββ]\s*\d{1,4}\s*[-ββ]' # - 3 -, β 12 β | |
| r')' | |
| r'\s*$' | |
| ) | |
| def _is_page_number(block: dict, page_height: float) -> bool: | |
| """Detect if a text block is a page number (header/footer region + matching pattern).""" | |
| if block.get("type") != 0: | |
| return False | |
| bbox = block.get("bbox", (0, 0, 0, 0)) | |
| # Block must be in top 8% or bottom 8% of the page | |
| margin = page_height * 0.08 | |
| in_header = bbox[1] < margin # y0 near top | |
| in_footer = bbox[3] > page_height - margin # y1 near bottom | |
| if not (in_header or in_footer): | |
| return False | |
| # Extract all text from the block | |
| text = "" | |
| for line in block.get("lines", []): | |
| for span in line.get("spans", []): | |
| text += span.get("text", "") | |
| text = text.strip() | |
| if not text: | |
| return False | |
| return bool(_PAGE_NUM_RE.match(text)) | |
| def _extract_table_html(table, page=None, text_flags=0) -> str: | |
| """Extract table to HTML with direct cell-level text extraction for accuracy. | |
| Instead of relying on table.extract() (which uses default flags internally), | |
| we extract text from each cell rect ourselves using page.get_text() with | |
| our custom flags. This ensures Hindi ligatures, whitespace, and special | |
| characters are preserved exactly as they appear in the PDF. | |
| """ | |
| try: | |
| # ββ Primary path: direct extraction from page ββ | |
| if page is not None and hasattr(table, 'rows'): | |
| rows_data = [] | |
| for row_obj in table.rows: | |
| row_cells = [] | |
| for cell_rect in row_obj.cells: | |
| if cell_rect is None: | |
| row_cells.append("") # Merged cell placeholder | |
| else: | |
| rect = fitz.Rect(cell_rect) | |
| text = page.get_text("text", clip=rect, flags=text_flags, sort=True).strip() | |
| row_cells.append(text) | |
| rows_data.append(row_cells) | |
| else: | |
| # Fallback: use table.extract() if page not available | |
| raw = table.extract() | |
| if not raw: | |
| return "" | |
| rows_data = [[(c or "") for c in row] for row in raw] | |
| except Exception as e: | |
| logger.warning(f"Table extraction failed: {e}") | |
| return "" | |
| if not rows_data: | |
| return "" | |
| # Drop rows where every cell is empty | |
| rows_data = [r for r in rows_data if any(c.strip() for c in r)] | |
| if not rows_data: | |
| return "" | |
| html = '<table><tbody>\n' | |
| for i, row in enumerate(rows_data): | |
| tag = "th" if i == 0 else "td" | |
| html += " <tr>" | |
| for cell_text in row: | |
| escaped = cell_text.strip() | |
| escaped = escaped.replace("&", "&").replace("<", "<").replace(">", ">") | |
| escaped = escaped.replace("\n", "<br>") | |
| html += f"<{tag}>{escaped}</{tag}>" | |
| html += "</tr>\n" | |
| html += "</tbody></table>\n" | |
| return html | |
def extract_pdf_to_html(content: bytes) -> dict:
    """
    Convert a searchable PDF to structured HTML using PyMuPDF dict-mode extraction.

    Pipeline:
    1. Collect every page's text blocks (with font metadata) via page.get_text("dict")
    2. Detect tables via page.find_tables()
    3. Compute the document-wide median font size as the body-text baseline
    4. Classify blocks as headings (h1-h3) or paragraphs from the font-size ratio
    5. Apply bold/italic markup from span flags
    6. Turn blocks whose lines all carry the same kind of list prefix into <ul>/<ol>
    7. Join everything into one HTML string, with <hr> between pages

    Args:
        content: Raw bytes of the PDF file.

    Returns:
        dict with keys "html", "title", "pages" and "processing_time".
        A PDF that cannot be unlocked with an empty password yields a
        placeholder message with "pages" set to 0.
    """
    # Local import keeps this block self-contained.
    from html import escape as html_escape

    start_time = time.time()
    bold_bit = 1 << 4     # span flag treated as "bold" throughout this file
    italic_bit = 1 << 1   # span flag treated as "italic" throughout this file

    with fitz.open(stream=content, filetype="pdf") as doc:
        if doc.is_encrypted:
            # authenticate() reports failure through a falsy return value,
            # so the result has to be checked; an exception is treated the
            # same way.
            try:
                unlocked = bool(doc.authenticate(""))
            except Exception as e:
                logger.warning(f"PDF authentication raised: {e}")
                unlocked = False
            if not unlocked:
                return {
                    "html": "<p>This PDF is encrypted and cannot be imported.</p>",
                    "title": "Encrypted PDF",
                    "pages": 0,
                    "processing_time": round(time.time() - start_time, 2),
                }

        # Title from document metadata; the value may be missing or None.
        metadata = doc.metadata or {}
        title = (metadata.get("title") or "").strip() or "Imported PDF"
        total_pages = len(doc)

        # Preserve whitespace AND ligatures (critical for Hindi/Devanagari).
        # The same flags are handed to _extract_table_html for cell text.
        text_flags = fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_PRESERVE_LIGATURES

        # ---- First pass: gather blocks and tables of every page ----
        all_page_data = []
        all_blocks_flat = []
        for page in doc:
            try:
                blocks = page.get_text("dict", flags=text_flags).get("blocks", [])
            except Exception as e:
                logger.warning(f"Skipping corrupt page: {e}")
                blocks = []

            # Table detection is optional: older PyMuPDF versions lack
            # find_tables(), and detection may fail on unusual pages.
            page_tables = []
            try:
                found = page.find_tables()
                if found and found.tables:
                    page_tables = found.tables
            except Exception as e:
                logger.debug(f"Table detection skipped: {e}")

            # One entry per table, index-aligned with page_tables. A table
            # without a usable bbox gets None: it cannot claim text blocks
            # but is still emitted at the end of the page.
            table_rects = []
            for t in page_tables:
                try:
                    table_rects.append(fitz.Rect(t.bbox))
                except Exception:
                    table_rects.append(None)

            all_page_data.append({
                "blocks": blocks,
                "tables": page_tables,
                "table_rects": table_rects,
                "page_height": page.rect.height,
            })
            all_blocks_flat.extend(blocks)

        median_size = _compute_median_font_size(all_blocks_flat)

        # ---- Second pass: convert blocks to HTML ----
        html_parts = []
        last_page_idx = len(all_page_data) - 1
        for page_idx, page_data in enumerate(all_page_data):
            blocks = page_data["blocks"]
            tables = page_data["tables"]
            table_rects = page_data["table_rects"]
            page_height = page_data["page_height"]
            page_obj = doc[page_idx]  # needed for direct cell text extraction
            tables_inserted = set()   # indexes of tables already emitted

            for block in blocks:
                if block.get("type") != 0:  # skip image blocks
                    continue
                # Skip page numbers (headers/footers like "Page 3", "- 5 -").
                if _is_page_number(block, page_height):
                    continue

                # Text inside a table region is rendered via the table, not
                # as a free-standing block. The table is emitted where its
                # first block appears.
                block_bbox = fitz.Rect(block.get("bbox", (0, 0, 0, 0)))
                is_in_table = False
                for t_idx, t_rect in enumerate(table_rects):
                    if t_rect is not None and block_bbox.intersects(t_rect):
                        is_in_table = True
                        if t_idx not in tables_inserted:
                            tables_inserted.add(t_idx)
                            table_html = _extract_table_html(tables[t_idx], page_obj, text_flags)
                            if table_html:
                                html_parts.append(table_html)
                        break
                if is_in_table:
                    continue

                lines = block.get("lines", [])
                if not lines:
                    continue

                # The first non-blank span decides size/flags for the block.
                first_span = next(
                    (
                        span
                        for line in lines
                        for span in line.get("spans", [])
                        if span.get("text", "").strip()
                    ),
                    None,
                )
                if first_span is None:
                    dominant_size, dominant_flags = median_size, 0
                else:
                    dominant_size = first_span.get("size", median_size)
                    dominant_flags = first_span.get("flags", 0)

                tag = _classify_heading(dominant_size, median_size, dominant_flags)
                is_heading = tag.startswith("h")

                # Build one HTML fragment per line from its spans.
                block_html_parts = []
                for line in lines:
                    line_parts = []
                    for span in line.get("spans", []):
                        text = span.get("text", "")
                        if not text:
                            continue
                        flags = span.get("flags", 0)
                        if is_heading and flags & bold_bit:
                            # A heading is already emphasised: escape the
                            # text, skip <strong>, keep italics.
                            formatted = html_escape(text, quote=False)
                            if flags & italic_bit:
                                formatted = f"<em>{formatted}</em>"
                        else:
                            formatted = _format_span_html(text, flags)
                        line_parts.append(formatted)
                    if line_parts:
                        block_html_parts.append("".join(line_parts))
                if not block_html_parts:
                    continue

                full_text = " ".join(block_html_parts)
                if not re.sub(r'<[^>]+>', '', full_text).strip():
                    continue  # nothing but markup/whitespace

                # A paragraph becomes a list only when EVERY line carries a
                # list prefix of the same kind.
                if tag == "p":
                    list_items = []
                    list_type = None
                    is_list = True
                    for line_html in block_html_parts:
                        plain = re.sub(r'<[^>]+>', '', line_html).strip()
                        result = _detect_list_prefix(plain)
                        if not result:
                            is_list = False
                            break
                        lt, cleaned = result
                        if list_type is None:
                            list_type = lt
                        elif lt != list_type:
                            is_list = False
                            break
                        # Items use the tag-stripped text without its prefix.
                        list_items.append(f"<li>{cleaned}</li>")
                    if is_list and list_items and list_type:
                        html_parts.append(f"<{list_type}>{''.join(list_items)}</{list_type}>")
                        continue

                html_parts.append(f"<{tag}>{full_text}</{tag}>")

            # Emit tables that no text block led to.
            for t_idx, table in enumerate(tables):
                if t_idx not in tables_inserted:
                    table_html = _extract_table_html(table, page_obj, text_flags)
                    if table_html:
                        html_parts.append(table_html)

            # Page separator (not after the last page).
            if page_idx < last_page_idx:
                html_parts.append("<hr>")

    elapsed = time.time() - start_time
    final_html = "\n".join(html_parts)
    if not final_html.strip():
        final_html = "<p>No readable text found. The PDF may be scanned or image-only.</p>"
    logger.info(f"PDFβHTML conversion: {total_pages} pages in {elapsed:.2f}s, {len(final_html)} chars")
    return {
        "html": final_html,
        "title": title,
        "pages": total_pages,
        "processing_time": round(elapsed, 2),
    }
def extract_docx_to_html(content: bytes) -> dict:
    """
    Convert a DOCX file to structured HTML using python-docx.

    Paragraphs are mapped by style name: "Heading 1".."Heading 4" become
    <h1>..<h4>, list styles become a one-item <ul> (bullet styles) or <ol>
    (other list styles), everything else a <p>. Inline bold/italic/underline
    comes from ``_docx_runs_to_html``. All tables are appended after the
    paragraphs, with the first row rendered as header cells.

    Args:
        content: Raw bytes of the .docx file.

    Returns:
        dict with keys "html", "title" and "processing_time".
    """
    # Local import keeps this block self-contained.
    from html import escape as html_escape

    start_time = time.time()
    doc = docx.Document(io.BytesIO(content))
    title = doc.core_properties.title or "Imported Document"

    # Checked in order; the first marker contained in the style name wins.
    heading_tags = (
        ("heading 1", "h1"),
        ("heading 2", "h2"),
        ("heading 3", "h3"),
        ("heading 4", "h4"),
    )

    html_parts = []
    for para in doc.paragraphs:
        if not para.text.strip():
            continue

        style_name = (para.style.name or "").lower()
        run_html = _docx_runs_to_html(para.runs)

        tag = next((t for marker, t in heading_tags if marker in style_name), None)
        if tag is None and "list" in style_name:
            # Simplified: every list paragraph becomes its own one-item list.
            list_tag = "ul" if "bullet" in style_name else "ol"
            html_parts.append(f"<{list_tag}><li>{run_html}</li></{list_tag}>")
            continue

        tag = tag or "p"
        if run_html.strip():
            html_parts.append(f"<{tag}>{run_html}</{tag}>")

    # Tables follow the paragraphs; cell text is escaped before being
    # placed inside markup.
    for table in doc.tables:
        html_parts.append("<table><tbody>")
        for i, row in enumerate(table.rows):
            cell_tag = "th" if i == 0 else "td"
            html_parts.append(" <tr>")
            for cell in row.cells:
                cell_text = html_escape(cell.text.strip(), quote=False)
                html_parts.append(f" <{cell_tag}>{cell_text}</{cell_tag}>")
            html_parts.append(" </tr>")
        html_parts.append("</tbody></table>")

    elapsed = time.time() - start_time
    return {
        "html": "\n".join(html_parts),
        "title": title,
        "processing_time": round(elapsed, 2),
    }
| def _docx_runs_to_html(runs) -> str: | |
| """Convert DOCX paragraph runs to HTML with inline formatting.""" | |
| parts = [] | |
| for run in runs: | |
| text = run.text | |
| if not text: | |
| continue | |
| escaped = text.replace("&", "&").replace("<", "<").replace(">", ">") | |
| if run.bold: | |
| escaped = f"<strong>{escaped}</strong>" | |
| if run.italic: | |
| escaped = f"<em>{escaped}</em>" | |
| if run.underline: | |
| escaped = f"<u>{escaped}</u>" | |
| parts.append(escaped) | |
| return "".join(parts) | |
def extract_pptx_to_html(content: bytes) -> dict:
    """
    Convert a PPTX file to structured HTML, slide by slide.

    For every shape with a text frame, each non-blank paragraph becomes:
    an <h2> if it is the first output of the slide and has level 0, a <p>
    for other level-0 paragraphs, and a one-item <ul> for deeper levels.
    Bold/italic runs are wrapped in <strong>/<em>. Shapes holding a table
    are rendered as <table> with the first row as header cells. Slides are
    introduced by an HTML comment and separated by <hr>.

    Args:
        content: Raw bytes of the .pptx file.

    Returns:
        dict with keys "html", "title", "slides" and "processing_time".
    """
    # Local import keeps this block self-contained.
    from html import escape as html_escape

    start_time = time.time()
    prs = pptx.Presentation(io.BytesIO(content))
    slide_count = len(prs.slides)

    html_parts = []
    for i, slide in enumerate(prs.slides):
        slide_parts = []
        for shape in slide.shapes:
            if hasattr(shape, "text_frame"):
                for para in shape.text_frame.paragraphs:
                    # Build the paragraph from its runs to keep bold/italic.
                    run_parts = []
                    for run in para.runs:
                        t = run.text
                        if not t:
                            continue
                        # Escape before wrapping so slide text cannot
                        # inject markup.
                        t = html_escape(t, quote=False)
                        if run.font.bold:
                            t = f"<strong>{t}</strong>"
                        if run.font.italic:
                            t = f"<em>{t}</em>"
                        run_parts.append(t)
                    text = "".join(run_parts)
                    if not text.strip():
                        continue

                    level = para.level
                    if level == 0 and not slide_parts:
                        # First output of the slide is treated as its title.
                        slide_parts.append(f"<h2>{text}</h2>")
                    elif level == 0:
                        slide_parts.append(f"<p>{text}</p>")
                    else:
                        slide_parts.append(f"<ul><li>{text}</li></ul>")

            if shape.has_table:
                rows_html = []
                for r_idx, row in enumerate(shape.table.rows):
                    cell_tag = "th" if r_idx == 0 else "td"
                    cells_html = "".join(
                        f"<{cell_tag}>{html_escape(cell.text.strip(), quote=False)}</{cell_tag}>"
                        for cell in row.cells
                    )
                    rows_html.append(f"<tr>{cells_html}</tr>")
                slide_parts.append("<table><tbody>" + "".join(rows_html) + "</tbody></table>")

        if slide_parts:
            html_parts.append(f"<!-- Slide {i+1} -->")
            html_parts.extend(slide_parts)
        # Slide separator (not after the last slide).
        if i < slide_count - 1:
            html_parts.append("<hr>")

    elapsed = time.time() - start_time
    return {
        "html": "\n".join(html_parts),
        "title": "Imported Presentation",
        "slides": slide_count,
        "processing_time": round(elapsed, 2),
    }
# -- Import Endpoints -------------------------------------------------------
async def pdf_to_html_endpoint(file: UploadFile = File(...)):
    """
    Convert a searchable PDF to structured HTML with formatting preservation.

    The upload must carry a ``.pdf`` filename and stay within
    ``Config.MAX_FILE_SIZE_MB``. The conversion itself runs in the
    module-level ``executor`` so the event loop is not blocked.

    Returns:
        { html, title, pages, processing_time }

    Raises:
        HTTPException: 400 for a missing or non-PDF filename, 413 for an
            oversized file, 500 when the conversion fails.
    """
    # An upload may arrive without a filename; treat that like a wrong
    # file type instead of failing on None.
    if not (file.filename or "").lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Only PDF files are accepted")

    content = await file.read()
    if len(content) > Config.MAX_FILE_SIZE_MB * 1024 * 1024:
        raise HTTPException(status_code=413, detail=f"File exceeds {Config.MAX_FILE_SIZE_MB}MB limit")

    # Inside a coroutine the running loop is the one to schedule on.
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(executor, extract_pdf_to_html, content)
    except Exception as e:
        logger.error(f"PDF-to-HTML conversion failed: {e}")
        raise HTTPException(status_code=500, detail=f"Conversion failed: {str(e)}") from e
async def docx_to_html_endpoint(file: UploadFile = File(...)):
    """
    Convert a DOCX file to structured HTML with formatting preservation.

    The upload must carry a ``.docx`` filename and stay within
    ``Config.MAX_FILE_SIZE_MB``. The conversion itself runs in the
    module-level ``executor`` so the event loop is not blocked.

    Returns:
        { html, title, processing_time }

    Raises:
        HTTPException: 400 for a missing or non-DOCX filename, 413 for an
            oversized file, 500 when the conversion fails.
    """
    # An upload may arrive without a filename; treat that like a wrong
    # file type instead of failing on None.
    if not (file.filename or "").lower().endswith('.docx'):
        raise HTTPException(status_code=400, detail="Only DOCX files are accepted")

    content = await file.read()
    if len(content) > Config.MAX_FILE_SIZE_MB * 1024 * 1024:
        raise HTTPException(status_code=413, detail=f"File exceeds {Config.MAX_FILE_SIZE_MB}MB limit")

    # Inside a coroutine the running loop is the one to schedule on.
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(executor, extract_docx_to_html, content)
    except Exception as e:
        logger.error(f"DOCX-to-HTML conversion failed: {e}")
        raise HTTPException(status_code=500, detail=f"Conversion failed: {str(e)}") from e
async def pptx_to_html_endpoint(file: UploadFile = File(...)):
    """
    Convert a PPTX file to structured HTML.

    The upload must carry a ``.pptx`` filename and stay within
    ``Config.MAX_FILE_SIZE_MB``. The conversion itself runs in the
    module-level ``executor`` so the event loop is not blocked.

    Returns:
        { html, title, slides, processing_time }

    Raises:
        HTTPException: 400 for a missing or non-PPTX filename, 413 for an
            oversized file, 500 when the conversion fails.
    """
    # An upload may arrive without a filename; treat that like a wrong
    # file type instead of failing on None.
    if not (file.filename or "").lower().endswith('.pptx'):
        raise HTTPException(status_code=400, detail="Only PPTX files are accepted")

    content = await file.read()
    if len(content) > Config.MAX_FILE_SIZE_MB * 1024 * 1024:
        raise HTTPException(status_code=413, detail=f"File exceeds {Config.MAX_FILE_SIZE_MB}MB limit")

    # Inside a coroutine the running loop is the one to schedule on.
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(executor, extract_pptx_to_html, content)
    except Exception as e:
        logger.error(f"PPTX-to-HTML conversion failed: {e}")
        raise HTTPException(status_code=500, detail=f"Conversion failed: {str(e)}") from e
| # ==================== MAIN ==================== | |
if __name__ == "__main__":
    # Script entry point: read settings from the environment, log a startup
    # banner, then start uvicorn (reload mode when --dev is passed).
    import sys

    port = int(os.getenv("PORT", 7860))
    workers = int(os.getenv("WORKERS", 1))
    host = os.getenv("HOST", "0.0.0.0")

    startup_banner = (
        f"Starting NeuralStream Production Extractor on {host}:{port}",
        f"Worker processes: {workers}",
        f"File size limit: {Config.MAX_FILE_SIZE_MB}MB",
        f"ZIP processing depth: {Config.MAX_ZIP_DEPTH}",
        f"OCR Enabled: {Config.ENABLE_OCR}",
        f"OCR Language: {Config.OCR_LANGUAGE}",
        "Supported file types: 50+ formats",
    )
    for banner_line in startup_banner:
        logger.info(banner_line)

    dev_mode = '--dev' in sys.argv
    if not dev_mode:
        # Production: bind to the configured host with the configured workers.
        uvicorn.run(
            "app:app",
            host=host,
            port=port,
            workers=workers,
            log_level="info",
            access_log=True,
            loop="asyncio"
        )
    else:
        # Development: loopback only, with auto-reload.
        uvicorn.run("app:app", host="127.0.0.1", port=port, reload=True)