notebooklm-fast / services /document_processor.py
jashdoshi77
feat: Add AI-powered query understanding with DeepSeek parsing
64deb3c
"""
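Document Processor Service
Handles text extraction from various document types:
- PDF (vision OCR via the OCR service; a hybrid text + OCR helper covers scanned pages)
- DOCX (Word documents)
- PPTX (PowerPoint slide text, tables, and speaker notes)
- Excel (XLS, XLSX)
- Images (via OCR)
- Plain text (TXT, MD)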
"""
import os
import io
from pathlib import Path
from typing import Optional
import fitz # PyMuPDF
from docx import Document
from pptx import Presentation
from pptx.util import Inches
import pandas as pd
from PIL import Image
from services.ocr_service import ocr_service
from config import Config
class DocumentProcessor:
    """Extract text from uploaded documents, dispatching on file extension.

    Every ``_process_*`` method returns a result dict. On success it contains
    ``"success": True``, ``"text"`` and ``"method"`` (plus an optional count
    such as ``"page_count"``); on failure it contains ``"success": False`` and
    ``"error"``.
    """

    # Extension (lower-case, without the dot) -> logical file type.
    _TYPE_MAP = {
        'pdf': 'pdf',
        'doc': 'word',
        'docx': 'word',
        'ppt': 'powerpoint',
        'pptx': 'powerpoint',
        'xls': 'excel',
        'xlsx': 'excel',
        'txt': 'text',
        'md': 'text',
        'png': 'image',
        'jpg': 'image',
        'jpeg': 'image',
        'gif': 'image',
        'webp': 'image',
    }

    # Encodings tried in order for plain-text files. cp1252 must come before
    # latin-1: latin-1 accepts every byte sequence, so anything listed after
    # it can never be reached.
    _TEXT_ENCODINGS = ('utf-8', 'cp1252', 'latin-1')

    def __init__(self):
        """Read the set of allowed extensions from the application config."""
        self.supported_extensions = Config.ALLOWED_EXTENSIONS

    def get_file_type(self, filename: str) -> str:
        """Determine the logical file type from the filename's extension.

        Returns one of ``'pdf'``, ``'word'``, ``'powerpoint'``, ``'excel'``,
        ``'text'``, ``'image'``, or ``'unknown'`` when the extension is not
        recognised. Matching is case-insensitive.
        """
        ext = Path(filename).suffix.lower().lstrip('.')
        return self._TYPE_MAP.get(ext, 'unknown')

    def is_supported(self, filename: str) -> bool:
        """Check whether the filename's extension is in ``supported_extensions``."""
        ext = Path(filename).suffix.lower().lstrip('.')
        return ext in self.supported_extensions

    def process(self, file_path: str, filename: str) -> dict:
        """
        Process a document and extract text.

        ``filename`` selects the handler (by extension); ``file_path`` is the
        location that is actually read.

        Returns: {"success": bool, "text": str, "method": str, "error": str}
        ("text"/"method" are present on success, "error" on failure).
        Exceptions raised by a handler are converted into a failure result.
        """
        file_type = self.get_file_type(filename)
        handlers = {
            'pdf': self._process_pdf,
            'word': self._process_word,
            'powerpoint': self._process_pptx,
            'excel': self._process_excel,
            'image': self._process_image,
            'text': self._process_text,
        }
        handler = handlers.get(file_type)
        if handler is None:
            return {
                "success": False,
                "error": f"Unsupported file type: {file_type}"
            }
        try:
            return handler(file_path)
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _process_pdf(self, file_path: str) -> dict:
        """
        Process PDF - always run the whole file through the OCR service.

        The document is opened locally only to count its pages; the text
        itself comes from ``ocr_service.extract_text_from_pdf``.
        """
        try:
            doc = fitz.open(file_path)
            try:
                total_pages = len(doc)
            finally:
                # Release the handle even if counting the pages fails.
                doc.close()
            print(f"Processing {total_pages} page PDF with OpenRouter vision OCR...")
            # Use OpenRouter vision models for OCR
            ocr_result = ocr_service.extract_text_from_pdf(file_path)
            if not ocr_result['success']:
                return {
                    "success": False,
                    "error": f"OCR failed: {ocr_result['error']}"
                }
            print("PDF OCR successful")
            return {
                "success": True,
                "text": ocr_result['text'],
                "method": ocr_result.get('model', 'OpenRouter Vision OCR'),
                "page_count": total_pages
            }
        except Exception as e:
            return {"success": False, "error": f"PDF processing error: {str(e)}"}

    def _process_pdf_hybrid(self, file_path: str, text_pages: list, ocr_needed_pages: list) -> dict:
        """
        Hybrid PDF processing: combine already-extracted text with OCR for
        the listed pages only.

        ``text_pages`` is iterated as ``(page_num, text)`` pairs and
        ``ocr_needed_pages`` as page numbers; both use 0-based page numbers
        (headers print ``page_num + 1``). Each OCR page is rendered to a
        temporary PNG next to ``file_path`` and removed afterwards.
        """
        try:
            doc = fitz.open(file_path)
            try:
                total_pages = len(doc)
                # Add already extracted text pages
                all_pages = {
                    page_num: f"--- Page {page_num + 1} ---\n{text}"
                    for page_num, text in text_pages
                }
                # OCR the scanned pages one by one
                print(f"OCR processing {len(ocr_needed_pages)} scanned pages...")
                zoom = fitz.Matrix(2, 2)  # 2x zoom for better OCR
                for i, page_num in enumerate(ocr_needed_pages):
                    # Render page to image
                    pix = doc[page_num].get_pixmap(matrix=zoom)
                    temp_path = f"{file_path}_page_{page_num}.png"
                    try:
                        pix.save(temp_path)
                        ocr_result = ocr_service.extract_text(temp_path)
                    finally:
                        # Clean up the temp file even when rendering/OCR raises
                        if os.path.exists(temp_path):
                            os.remove(temp_path)
                    if ocr_result['success']:
                        all_pages[page_num] = f"--- Page {page_num + 1} (OCR) ---\n{ocr_result['text']}"
                    else:
                        all_pages[page_num] = f"--- Page {page_num + 1} ---\n[OCR failed: {ocr_result['error']}]"
                    # Progress logging every 10 pages
                    if (i + 1) % 10 == 0:
                        print(f"OCR progress: {i + 1}/{len(ocr_needed_pages)} pages")
            finally:
                # Always release the document, including on the error path.
                doc.close()
            # Combine all pages in page order
            text_parts = [all_pages[num] for num in sorted(all_pages)]
            return {
                "success": True,
                "text": "\n\n".join(text_parts),
                "method": "hybrid (text + OCR)",
                "page_count": total_pages
            }
        except Exception as e:
            return {"success": False, "error": f"Hybrid PDF processing error: {str(e)}"}

    def _process_word(self, file_path: str) -> dict:
        """Process Word documents (DOCX): non-blank paragraphs, then tables."""
        try:
            doc = Document(file_path)
            # Extract paragraphs, skipping whitespace-only ones
            text_parts = [para.text for para in doc.paragraphs if para.text.strip()]
            # Extract tables; cells of a row are joined with " | "
            for table in doc.tables:
                table_text = [
                    " | ".join(cell.text.strip() for cell in row.cells)
                    for row in table.rows
                ]
                if table_text:
                    text_parts.append("\n[Table]\n" + "\n".join(table_text))
            return {
                "success": True,
                "text": "\n\n".join(text_parts),
                "method": "docx extraction"
            }
        except Exception as e:
            return {"success": False, "error": f"Word processing error: {str(e)}"}

    def _process_pptx(self, file_path: str) -> dict:
        """Process PowerPoint files (PPTX): shape text, tables and speaker notes.

        Returns a failure result when no slide yields any text.
        """
        try:
            prs = Presentation(file_path)
            text_parts = []
            slide_count = 0
            for slide_num, slide in enumerate(prs.slides, 1):
                slide_count += 1
                slide_text_parts = []
                # Extract text from all shapes
                for shape in slide.shapes:
                    # Text frames (text boxes, titles, etc.)
                    if shape.has_text_frame:
                        for paragraph in shape.text_frame.paragraphs:
                            para_text = "".join(run.text for run in paragraph.runs)
                            if para_text.strip():
                                slide_text_parts.append(para_text.strip())
                    # Tables in slides
                    if shape.has_table:
                        table_rows = []
                        for row in shape.table.rows:
                            row_cells = [
                                "".join(
                                    run.text
                                    for paragraph in cell.text_frame.paragraphs
                                    for run in paragraph.runs
                                ).strip()
                                for cell in row.cells
                            ]
                            table_rows.append(" | ".join(row_cells))
                        if table_rows:
                            slide_text_parts.append("[Table]\n" + "\n".join(table_rows))
                # Speaker notes
                if slide.has_notes_slide:
                    notes_frame = slide.notes_slide.notes_text_frame
                    if notes_frame:
                        notes_text = "".join(
                            run.text
                            for paragraph in notes_frame.paragraphs
                            for run in paragraph.runs
                        )
                        if notes_text.strip():
                            slide_text_parts.append(f"[Speaker Notes]\n{notes_text.strip()}")
                if slide_text_parts:
                    text_parts.append(f"--- Slide {slide_num} ---\n" + "\n".join(slide_text_parts))
            if not text_parts:
                return {
                    "success": False,
                    "error": "No text content found in PowerPoint file"
                }
            return {
                "success": True,
                "text": "\n\n".join(text_parts),
                "method": "pptx extraction",
                "slide_count": slide_count
            }
        except Exception as e:
            return {"success": False, "error": f"PowerPoint processing error: {str(e)}"}

    def _process_excel(self, file_path: str) -> dict:
        """Process Excel files: render every non-empty sheet as plain text."""
        try:
            text_parts = []
            # The context manager closes the workbook handle when done, so the
            # uploaded file is not kept open after extraction.
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = excel_file.sheet_names
                for sheet_name in sheet_names:
                    df = pd.read_excel(excel_file, sheet_name=sheet_name)
                    if not df.empty:
                        # Convert to string representation
                        sheet_text = f"=== Sheet: {sheet_name} ===\n"
                        sheet_text += df.to_string(index=False)
                        text_parts.append(sheet_text)
            return {
                "success": True,
                "text": "\n\n".join(text_parts),
                "method": "excel extraction",
                "sheet_count": len(sheet_names)
            }
        except Exception as e:
            return {"success": False, "error": f"Excel processing error: {str(e)}"}

    def _process_image(self, file_path: str) -> dict:
        """Process images using OCR via ``ocr_service.extract_text``."""
        result = ocr_service.extract_text(file_path)
        if not result['success']:
            return {"success": False, "error": result['error']}
        return {
            "success": True,
            "text": result['text'],
            "method": f"OCR ({result.get('model', 'unknown')})"
        }

    def _process_text(self, file_path: str) -> dict:
        """Process plain text files, trying several encodings in order.

        UTF-8 is tried first, then cp1252, then latin-1. latin-1 is last
        because it decodes any byte sequence and therefore acts as the
        catch-all; the reported ``"method"`` names the encoding that worked.
        """
        try:
            for encoding in self._TEXT_ENCODINGS:
                try:
                    with open(file_path, 'r', encoding=encoding) as f:
                        text = f.read()
                except UnicodeDecodeError:
                    continue
                return {
                    "success": True,
                    "text": text,
                    "method": f"text read ({encoding})"
                }
            return {"success": False, "error": "Could not decode text file"}
        except Exception as e:
            return {"success": False, "error": f"Text processing error: {str(e)}"}
# Singleton instance: a module-level DocumentProcessor, created once when this
# module is imported (its __init__ reads Config.ALLOWED_EXTENSIONS at that time).
document_processor = DocumentProcessor()