openai-chatbot-mcp / backend /document_reader.py
Julian Vanecek
init
3151bfa
"""
Document Reader for page-level document access.
"""
import os
import json
from typing import List, Optional, Dict, Union
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class DocumentReader:
    """Page-level reader for documents stored as plain-text files.

    Files are looked up inside ``pages_dir`` using these naming conventions:

    * ``<name>_TOC.txt`` -- table of contents of a document
    * ``<name>_page_NNN.txt`` -- one page, ``NNN`` zero-padded to 3 digits
    * ``document_index.json`` -- optional index whose top-level keys are
      treated as document names

    A document name is tried in several spellings (see
    ``_candidate_stems``); the first spelling for which a file exists wins.
    Candidate paths that would leave ``pages_dir`` (``..`` components or
    absolute names) are never opened.
    """

    _TOC_SUFFIX = "_TOC"

    # Names already ending in one of these are considered normalized.
    _KNOWN_SUFFIXES = ("UserGuide", "InstallationGuide", "QuickStartGuide")

    # (lower-case keywords that must all be present,
    #  spelled-out form to replace, canonical form)
    # Only the first rule whose keywords match is applied.
    _GUIDE_RULES = (
        (("user", "guide"), "User_Guide", "UserGuide"),
        (("installation", "guide"), "Installation_Guide", "InstallationGuide"),
        (("quick", "start"), "Quick_Start_Guide", "QuickStartGuide"),
    )

    def __init__(self, pages_dir: Optional[Path] = None):
        """Initialize the document reader.

        Args:
            pages_dir: Directory holding the page/TOC files. A ``str`` is
                accepted as well as a ``Path``. When omitted (or falsy), the
                ``pages`` directory two levels above this file is used.
        """
        if pages_dir:
            self.pages_dir = Path(pages_dir)
        else:
            self.pages_dir = Path(__file__).parent.parent / "pages"
        self.document_index = self._load_document_index()

    def _load_document_index(self) -> Dict:
        """Load ``document_index.json`` from ``pages_dir`` if available.

        Returns:
            The parsed index, or an empty dict when the file is missing,
            unreadable, not valid JSON, or not a JSON object.
        """
        index_path = self.pages_dir / "document_index.json"
        if not index_path.exists():
            return {}
        try:
            # Explicit encoding: the platform default is not UTF-8 everywhere.
            with open(index_path, 'r', encoding='utf-8') as f:
                index = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.error("Error loading document index: %s", e)
            return {}
        if not isinstance(index, dict):
            # The rest of the class reads the index keys as document names.
            logger.error(
                "Error loading document index: %s",
                "top-level JSON value is not an object",
            )
            return {}
        return index

    def _normalize_document_name(self, document_name: str) -> str:
        """Normalize a document name for consistent file matching.

        Surrounding whitespace is stripped, then spaces and dots become
        underscores. If the result does not already end in a known guide
        suffix, the first matching spelled-out guide type (e.g.
        ``User_Guide``) is collapsed to its canonical form (``UserGuide``).
        Keyword detection is case-insensitive; the replacement itself is
        case-sensitive.
        """
        name = document_name.strip().replace(" ", "_").replace(".", "_")
        if name.endswith(self._KNOWN_SUFFIXES):
            return name
        lowered = name.lower()
        for keywords, spelled_out, canonical in self._GUIDE_RULES:
            if all(keyword in lowered for keyword in keywords):
                return name.replace(spelled_out, canonical)
        return name

    def _candidate_stems(self, document_name: str) -> List[str]:
        """Return the file-name stems to try for a document, in priority order.

        The normalized name comes first, followed by the raw name and the raw
        name with spaces or dots replaced by underscores. Duplicates are
        dropped while keeping the order.
        """
        stems = [
            self._normalize_document_name(document_name),
            document_name,
            document_name.replace(" ", "_"),
            document_name.replace(".", "_"),
        ]
        return list(dict.fromkeys(stems))

    def _is_within_pages_dir(self, path: Path) -> bool:
        """Return True if ``path`` lexically stays inside ``pages_dir``.

        The check collapses ``..`` components without touching the file
        system, so symlinks inside ``pages_dir`` keep working while names
        such as ``../secret`` or ``/etc/x`` are rejected.
        """
        root = os.path.abspath(self.pages_dir)
        target = os.path.abspath(path)
        try:
            return os.path.commonpath([root, target]) == root
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            return False

    def _first_existing(self, stems: List[str], suffix: str) -> Optional[Path]:
        """Return the first existing ``<stem><suffix>`` file inside ``pages_dir``.

        Returns None when no candidate exists or all candidates fall outside
        ``pages_dir``.
        """
        for stem in stems:
            candidate = self.pages_dir / f"{stem}{suffix}"
            if not self._is_within_pages_dir(candidate):
                continue
            try:
                if candidate.exists():
                    return candidate
            except (OSError, ValueError):
                # Over-long names or embedded NUL bytes: treat as not found.
                continue
        return None

    def _page_numbers(self, stem: str) -> List[int]:
        """Return the page numbers of all ``<stem>_page_<n>.txt`` files.

        The stem is matched literally (no glob metacharacters), and only the
        text between ``_page_`` and ``.txt`` is parsed, so files whose page
        part is not an integer are skipped.
        """
        prefix_path = self.pages_dir / f"{stem}_page_"
        folder = prefix_path.parent
        prefix = prefix_path.name
        if not self._is_within_pages_dir(folder):
            return []
        try:
            entries = list(folder.iterdir())
        except (OSError, ValueError):
            # Missing directory, not a directory, permission problems, ...
            return []
        extension = ".txt"
        numbers = []
        for entry in entries:
            file_name = entry.name
            if not (file_name.startswith(prefix) and file_name.endswith(extension)):
                continue
            try:
                numbers.append(int(file_name[len(prefix):-len(extension)]))
            except ValueError:
                continue
        return numbers

    def get_table_of_contents(self, document_name: str) -> Optional[str]:
        """Get the table of contents for a document.

        Returns:
            The TOC file content, or None (after logging) when no TOC file is
            found or it cannot be read.
        """
        stems = self._candidate_stems(document_name)
        toc_path = self._first_existing(stems, f"{self._TOC_SUFFIX}.txt")
        if toc_path is None:
            logger.warning("TOC file not found for document: %s", document_name)
            return None
        try:
            with open(toc_path, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, ValueError) as e:
            logger.error("Error reading TOC file %s: %s", toc_path, e)
            return None

    def read_pages(self, document_name: str, page_numbers: Optional[List[int]] = None) -> str:
        """Read specific pages from a document.

        Args:
            document_name: Name of the document, in any supported spelling.
            page_numbers: Pages to read. When None or empty, the table of
                contents is returned instead.

        Returns:
            Always a formatted string: the TOC (or a not-found message), a
            single page, or several pages sorted by page number. Pages that
            are missing or unreadable are reported inline rather than raised.
        """
        if not page_numbers:
            toc = self.get_table_of_contents(document_name)
            if toc:
                return f"Table of Contents for {document_name}:\n\n{toc}"
            return f"Table of contents not found for document: {document_name}"

        # The candidate spellings do not depend on the page, so build them once.
        stems = self._candidate_stems(document_name)
        pages_content = {}
        for page_num in page_numbers:
            page_path = self._first_existing(stems, f"_page_{page_num:03d}.txt")
            if page_path is None:
                pages_content[page_num] = f"Page {page_num} not found"
                continue
            try:
                with open(page_path, 'r', encoding='utf-8') as f:
                    pages_content[page_num] = f.read()
            except (OSError, ValueError) as e:
                logger.error(
                    "Error reading page %s from %s: %s", page_num, document_name, e
                )
                pages_content[page_num] = f"Error reading page {page_num}"

        # A single (distinct) page gets a compact header.
        if len(pages_content) == 1:
            page_num, content = next(iter(pages_content.items()))
            return f"Page {page_num} of {document_name}:\n\n{content}"
        formatted_pages = [
            f"=== Page {page_num} ===\n{pages_content[page_num]}"
            for page_num in sorted(pages_content)
        ]
        return f"Pages from {document_name}:\n\n" + "\n\n".join(formatted_pages)

    def list_available_documents(self) -> List[str]:
        """List all available documents, sorted by name.

        A document is available if it has a ``*_TOC.txt`` file directly in
        ``pages_dir`` or appears as a key of the document index.
        """
        documents = set()
        for toc_file in self.pages_dir.glob("*_TOC.txt"):
            # Strip only the trailing marker; the name itself may contain "_TOC".
            documents.add(toc_file.stem[:-len(self._TOC_SUFFIX)])
        if self.document_index:
            documents.update(self.document_index.keys())
        return sorted(documents)

    def get_document_info(self, document_name: str) -> Dict[str, object]:
        """Get information about a document (number of pages, etc.).

        Returns:
            A dict with the keys ``name``, ``normalized_name``, ``has_toc``,
            ``page_count`` and ``available_pages`` (sorted page numbers).
            The same name spellings as in ``read_pages`` and
            ``get_table_of_contents`` are tried.
        """
        stems = self._candidate_stems(document_name)

        page_numbers: List[int] = []
        for stem in stems:
            page_numbers = self._page_numbers(stem)
            if page_numbers:
                break

        return {
            "name": document_name,
            "normalized_name": self._normalize_document_name(document_name),
            "has_toc": self._first_existing(stems, f"{self._TOC_SUFFIX}.txt") is not None,
            "page_count": len(page_numbers),
            "available_pages": sorted(page_numbers),
        }