import os
import json
import uuid
import time
import logging
import gc
from typing import List, Dict, Any, Optional
from pathlib import Path

import uvicorn
from fastapi import FastAPI, UploadFile, File, HTTPException

# Document parsing libraries
import fitz  # PyMuPDF
import pdfplumber
import mammoth
import email
import email.policy
from bs4 import BeautifulSoup

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DocumentChunk:
    """Simple data class for document chunks."""

    def __init__(self, content: str, metadata: Dict[str, Any], chunk_id: str):
        self.content = content
        self.metadata = metadata
        self.chunk_id = chunk_id

    def to_dict(self) -> Dict[str, Any]:
        return {
            "content": self.content,
            "metadata": self.metadata,
            "chunk_id": self.chunk_id,
        }

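# Example of the dict shape produced by DocumentChunk.to_dict(); the values shown
# here are illustrative, not taken from a real document:
#
#   {
#       "content": "--- Page 1 ---\n...",
#       "metadata": {"source": "report.pdf", "chunk_index": 0, "document_type": "pdf_ultrafast"},
#       "chunk_id": "d3f1a2b4-....",
#   }

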
class FastDocumentParserService:
    """Ultra-fast document parsing service."""

    def __init__(self):
        self.chunk_size = 2000      # target characters per chunk
        self.chunk_overlap = 200    # characters shared between consecutive chunks
        self.max_chunks = 500       # hard ceiling on chunks per document
        self.table_row_limit = 20   # not referenced by the extraction paths below
        logger.info("FastDocumentParserService initialized with speed optimizations")

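    # With these defaults the splitter below advances roughly
    # chunk_size - chunk_overlap = 1,800 characters per chunk, so the 500-chunk cap
    # is reached at roughly 900,000 characters of extracted text.
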
    def fast_text_split(self, text: str, source: str) -> List[str]:
        """Super fast text splitting with hard limits."""
        if not text or len(text) < 100:
            return [text] if text else []

        if len(text) <= self.chunk_size:
            return [text]

        chunks = []
        start = 0
        chunk_count = 0

        while start < len(text) and chunk_count < self.max_chunks:
            end = min(start + self.chunk_size, len(text))

            # Prefer to break at a sentence boundary within the last 200 characters.
            if end < len(text):
                search_start = max(start, end - 200)
                period_pos = text.rfind('.', search_start, end)
                if period_pos > search_start:
                    end = period_pos + 1

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
                chunk_count += 1

            # Step forward with overlap, but always make progress so the tail of the
            # document cannot be re-emitted over and over until the chunk cap is hit.
            next_start = end - self.chunk_overlap
            start = next_start if next_start > start else end

        logger.info(f"Split {source} into {len(chunks)} chunks (limit: {self.max_chunks})")
        return chunks[:self.max_chunks]

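    # Rough behaviour of fast_text_split with the defaults above: windows advance by
    # about 1,800 characters (2,000 minus the 200-character overlap), shifting slightly
    # when a sentence-ending period is found near the cut point, so a 100,000-character
    # document yields on the order of 55 chunks.
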
    def extract_tables_fast(self, file_path: str) -> str:
        """Fast table extraction with smart limits."""
        table_text = ""
        table_count = 0
        max_tables = 25

        try:
            with pdfplumber.open(file_path) as pdf:
                total_pages = len(pdf.pages)

                # Sample pages more sparsely as the document grows.
                if total_pages <= 20:
                    step = 1
                elif total_pages <= 40:
                    step = 2
                else:
                    step = 3

                pages_to_process = list(range(0, min(total_pages, 50), step))

                logger.info(f"📊 Smart table scan: processing {len(pages_to_process)} of {total_pages} pages (step={step})")

                for page_num in pages_to_process:
                    if table_count >= max_tables:
                        break

                    page = pdf.pages[page_num]
                    tables = page.find_tables()

                    for table_idx, table in enumerate(tables):
                        if table_count >= max_tables:
                            break

                        try:
                            table_data = table.extract()
                            if table_data and len(table_data) >= 2:
                                # Cap each table at 30 rows.
                                limited_data = table_data[:min(30, len(table_data))]

                                # Skip very wide tables; they rarely render usefully as Markdown.
                                if len(limited_data[0]) <= 6:
                                    header = " | ".join(str(cell or "").strip()[:60] for cell in limited_data[0])
                                    separator = " | ".join(["---"] * len(limited_data[0]))

                                    rows = []
                                    for row in limited_data[1:]:
                                        # Pad short rows so every row matches the header width.
                                        padded_row = list(row) + [None] * (len(limited_data[0]) - len(row))
                                        row_str = " | ".join(str(cell or "").strip()[:60] for cell in padded_row)
                                        rows.append(row_str)

                                    table_md = f"\n**TABLE {table_count + 1} - Page {page_num + 1}**\n"
                                    table_md += f"*{len(limited_data)} rows × {len(limited_data[0])} columns*\n\n"
                                    table_md += f"| {header} |\n| {separator} |\n"
                                    for row in rows:
                                        table_md += f"| {row} |\n"
                                    table_md += "\n"

                                    table_text += table_md
                                    table_count += 1
                                    logger.info(f"⚡ Table {table_count}: {len(limited_data)}×{len(limited_data[0])} from page {page_num + 1}")
                                else:
                                    logger.info(f"⚠️ Skipped wide table ({len(limited_data[0])} cols) on page {page_num + 1}")

                        except Exception as e:
                            logger.warning(f"⚠️ Skip table on page {page_num + 1}: {e}")

                logger.info(f"🎯 Extracted {table_count} tables in fast mode")

        except Exception as e:
            logger.error(f"❌ Fast table extraction failed: {e}")

        return table_text

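    # Shape of the Markdown emitted above for one extracted table (cell values here
    # are illustrative only):
    #
    #   **TABLE 1 - Page 3**
    #   *3 rows × 3 columns*
    #
    #   | Name | Qty | Price |
    #   | --- | --- | --- |
    #   | Widget | 2 | 9.99 |
    #   | Bolt | 10 | 0.25 |
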
    def process_pdf_ultrafast(self, file_path: str) -> List[DocumentChunk]:
        """Ultra-fast PDF processing - under 1 minute target."""
        logger.info(f"🚀 ULTRA-FAST PDF processing: {os.path.basename(file_path)}")
        start_time = time.time()

        chunks = []

        try:
            # Step 1: extract tables with pdfplumber (sampled pages, capped count).
            logger.info("📊 Fast table extraction...")
            table_content = self.extract_tables_fast(file_path)

            # Step 2: extract page text with PyMuPDF.
            logger.info("📄 Fast text extraction with PyMuPDF...")
            doc = fitz.open(file_path)

            full_text = ""
            total_pages = len(doc)

            # For large documents, read every other page up to page 60.
            if total_pages > 40:
                pages_to_process = list(range(0, min(total_pages, 60), 2))
                logger.info(f"📑 Large document: processing {len(pages_to_process)} of {total_pages} pages")
            else:
                pages_to_process = list(range(total_pages))

            for page_num in pages_to_process:
                try:
                    page = doc[page_num]
                    page_text = page.get_text()

                    # Truncate unusually long pages to keep chunking fast.
                    page_text = page_text.strip()
                    if len(page_text) > 10000:
                        page_text = page_text[:10000] + f"\n[Page {page_num + 1} truncated for speed]"

                    full_text += f"\n\n--- Page {page_num + 1} ---\n{page_text}"

                except Exception as e:
                    logger.warning(f"⚠️ Error processing page {page_num + 1}: {e}")

            doc.close()

            # Step 3: append the extracted tables after the page text.
            if table_content:
                full_text += f"\n\n{'='*50}\nEXTRACTED TABLES\n{'='*50}\n{table_content}"

            # Step 4: split into overlapping chunks.
            logger.info("📦 Creating chunks...")
            text_chunks = self.fast_text_split(full_text, os.path.basename(file_path))

            for idx, chunk_text in enumerate(text_chunks):
                has_tables = "**TABLE" in chunk_text or "EXTRACTED TABLES" in chunk_text

                chunks.append(DocumentChunk(
                    content=chunk_text,
                    metadata={
                        "source": os.path.basename(file_path),
                        "chunk_index": idx,
                        "document_type": "pdf_ultrafast",
                        "has_tables": has_tables,
                        "total_pages": total_pages,
                        "pages_processed": len(pages_to_process),
                        "processing_method": "ultrafast_pymupdf"
                    },
                    chunk_id=str(uuid.uuid4())
                ))

            elapsed = time.time() - start_time
            logger.info(f"✅ ULTRA-FAST processing complete in {elapsed:.2f}s: {len(chunks)} chunks")

            if elapsed > 90:
                logger.warning(f"⚠️ Processing took {elapsed:.2f}s - consider reducing document size")

            return chunks

        except Exception as e:
            logger.error(f"❌ Ultra-fast processing failed: {e}")
            return self._emergency_fallback(file_path)

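    # Example of the page-sampling rule above: a 100-page PDF is read as pages
    # 0, 2, 4, ..., 58 (30 pages), while a 40-page PDF is read in full.
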
    def _emergency_fallback(self, file_path: str) -> List[DocumentChunk]:
        """Emergency fallback - text only, no tables."""
        logger.info("🆘 Emergency fallback: text-only extraction")

        try:
            doc = fitz.open(file_path)

            # Only the first 10 pages, heavily truncated.
            max_pages = min(10, len(doc))
            text_parts = []

            for page_num in range(max_pages):
                page = doc[page_num]
                page_text = page.get_text()
                if len(page_text) > 5000:
                    page_text = page_text[:5000] + f"\n[Page {page_num + 1} truncated]"
                text_parts.append(f"Page {page_num + 1}:\n{page_text}")

            doc.close()

            full_text = "\n\n".join(text_parts)
            chunks = []

            # Split the recovered text into roughly 10 fixed-size chunks.
            chunk_size = len(full_text) // 10 + 1
            for i in range(0, len(full_text), chunk_size):
                chunk_text = full_text[i:i + chunk_size]
                chunks.append(DocumentChunk(
                    content=chunk_text,
                    metadata={
                        "source": os.path.basename(file_path),
                        "chunk_index": len(chunks),
                        "document_type": "pdf_emergency_fallback",
                        "has_tables": False,
                        "pages_processed": max_pages
                    },
                    chunk_id=str(uuid.uuid4())
                ))

            return chunks

        except Exception as e:
            logger.error(f"Emergency fallback failed: {e}")
            raise Exception("All processing methods failed") from e

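    # The fallback aims for about 10 chunks: e.g. 42,000 characters of recovered text
    # gives chunk_size = 4,201 and exactly 10 chunks.
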
    def process_word_doc_fast(self, file_path: str) -> List[DocumentChunk]:
        """Fast Word document processing."""
        chunks = []

        try:
            with open(file_path, "rb") as docx_file:
                result = mammoth.convert_to_html(docx_file)

            # mammoth returns the generated HTML on result.value (result.messages holds warnings).
            soup = BeautifulSoup(result.value, 'html.parser')

            # Convert the first 10 tables (up to 15 rows each) to Markdown in place.
            tables = soup.find_all('table')
            for idx, table in enumerate(tables[:10]):
                rows = table.find_all('tr')[:15]
                table_md = f"\n**TABLE {idx + 1}**\n"

                for row in rows:
                    cells = [cell.get_text(strip=True)[:30] for cell in row.find_all(['td', 'th'])]
                    table_md += "| " + " | ".join(cells) + " |\n"

                table.replace_with(table_md)

            text_content = soup.get_text()
            text_chunks = self.fast_text_split(text_content, os.path.basename(file_path))

            for idx, chunk in enumerate(text_chunks):
                chunks.append(DocumentChunk(
                    content=chunk,
                    metadata={
                        "source": os.path.basename(file_path),
                        "chunk_index": idx,
                        "document_type": "docx_fast",
                        "has_tables": "**TABLE" in chunk
                    },
                    chunk_id=str(uuid.uuid4())
                ))

        except Exception as e:
            logger.error(f"Fast Word processing failed: {e}")
            raise Exception(f"Word processing failed: {e}")

        return chunks

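    # Note: mammoth converts .docx files; legacy binary .doc files routed here by the
    # /parse endpoint below are likely to fail and surface as a processing error.
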
    def process_email_fast(self, file_path: str) -> List[DocumentChunk]:
        """Fast email processing."""
        chunks = []

        try:
            with open(file_path, 'rb') as email_file:
                msg = email.message_from_bytes(email_file.read(), policy=email.policy.default)

            subject = msg.get('Subject', 'No Subject')
            sender = msg.get('From', 'Unknown Sender')
            date = msg.get('Date', 'Unknown Date')

            # Take the first text/plain part (or the single body), capped at 5,000 characters.
            body_content = ""
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain":
                        content = part.get_content()[:5000]
                        body_content += content
                        break
            else:
                body_content = msg.get_content()[:5000]

            email_content = f"EMAIL: {subject}\nFrom: {sender}\nDate: {date}\n\n{body_content}"
            text_chunks = self.fast_text_split(email_content, os.path.basename(file_path))

            for idx, chunk in enumerate(text_chunks):
                chunks.append(DocumentChunk(
                    content=chunk,
                    metadata={
                        "source": os.path.basename(file_path),
                        "chunk_index": idx,
                        "document_type": "email_fast",
                        "subject": subject
                    },
                    chunk_id=str(uuid.uuid4())
                ))

        except Exception as e:
            logger.error(f"Fast email processing failed: {e}")
            raise Exception(f"Email processing failed: {e}")

        return chunks

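    # Note: email.message_from_bytes expects RFC 822 text (.eml). Outlook .msg files are
    # a binary OLE format, so .msg uploads routed here will likely not parse meaningfully.

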
parser_service = FastDocumentParserService()

app = FastAPI(title="Ultra-Fast Document Parser", version="3.0.0")


@app.get("/health")
async def health_check():
    return {"status": "healthy", "message": "Ultra-fast document parser running"}

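# Quick liveness check once the server is running (see the uvicorn settings at the
# bottom of this file for host/port):
#
#   curl http://localhost:8001/health

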
@app.post("/parse") |
|
|
async def parse_file(file: UploadFile = File(...)): |
|
|
"""Ultra-fast file parsing - target < 60 seconds""" |
|
|
temp_file_path = None |
|
|
start_time = __import__('time').time() |
|
|
|
|
|
try: |
|
|
gc.collect() |
|
|
|
|
|
temp_file_path = f"./temp_{uuid.uuid4()}_{file.filename}" |
|
|
|
|
|
|
|
|
with open(temp_file_path, "wb") as buffer: |
|
|
content = await file.read() |
|
|
buffer.write(content) |
|
|
|
|
|
file_extension = Path(file.filename).suffix.lower() |
|
|
logger.info(f"⚡ FAST processing: {file.filename} ({file_extension})") |
|
|
|
|
|
|
|
|
if file_extension == '.pdf': |
|
|
chunks = parser_service.process_pdf_ultrafast(temp_file_path) |
|
|
elif file_extension in ['.docx', '.doc']: |
|
|
chunks = parser_service.process_word_doc_fast(temp_file_path) |
|
|
elif file_extension in ['.eml', '.msg']: |
|
|
chunks = parser_service.process_email_fast(temp_file_path) |
|
|
else: |
|
|
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}") |
|
|
|
|
|
|
|
|
chunk_dicts = [chunk.to_dict() for chunk in chunks] |
|
|
|
|
|
elapsed = __import__('time').time() - start_time |
|
|
|
|
|
|
|
|
try: |
|
|
with open("./_fast_parsed_output.json", "w") as f: |
|
|
json.dump({ |
|
|
"filename": file.filename, |
|
|
"total_chunks": len(chunks), |
|
|
"processing_time_seconds": elapsed, |
|
|
"first_chunk_preview": chunks[0].content[:200] if chunks else "No chunks" |
|
|
}, f, indent=2) |
|
|
except: |
|
|
pass |
|
|
|
|
|
logger.info(f"🎯 COMPLETED {file.filename} in {elapsed:.2f}s: {len(chunks)} chunks") |
|
|
|
|
|
return { |
|
|
"filename": file.filename, |
|
|
"status": "success", |
|
|
"chunks": chunk_dicts, |
|
|
"total_chunks": len(chunks), |
|
|
"processing_time_seconds": round(elapsed, 2), |
|
|
"processing_method": "ultrafast" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
elapsed = __import__('time').time() - start_time |
|
|
logger.error(f"❌ Processing failed after {elapsed:.2f}s: {e}") |
|
|
raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") |
|
|
|
|
|
finally: |
|
|
if temp_file_path and os.path.exists(temp_file_path): |
|
|
try: |
|
|
os.remove(temp_file_path) |
|
|
except: |
|
|
pass |
|
|
gc.collect() |
|
|
|
|
|
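# Example request against the endpoint above (the filename is illustrative):
#
#   curl -F "file=@example.pdf" http://localhost:8001/parse
#
# The JSON response includes "chunks", "total_chunks", and "processing_time_seconds".

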
if __name__ == "__main__":
    logger.info("🚀 Starting Ultra-Fast Document Parser...")
    uvicorn.run(app, host="0.0.0.0", port=8001, log_level="info")