"""Main document processor orchestrating the pipeline."""

import json
from pathlib import Path
from typing import Dict, List, Optional

from src.rag.document_processing.chunker import SemanticChunker
from src.rag.document_processing.models import Document, DocumentChunk


class DocumentProcessor:
    """
    Orchestrates document loading, cleaning, and chunking.
    """

    def __init__(
        self,
        chunk_size: int = 400,
        chunk_overlap: int = 100,
        min_chunk_size: int = 50,
    ):
        """Initialize document processor."""
        self.chunker = SemanticChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            min_chunk_size=min_chunk_size,
        )
        self.documents: Dict[str, Document] = {}
        self.chunks: List[DocumentChunk] = []

    def load_documents(self, directory: str) -> List[Document]:
        """
        Load documents from a directory.

        Supports: .txt, .md, .json, and .csv files.
        """
        documents = []
        path = Path(directory)

        for file_path in path.glob('**/*'):
            if file_path.is_file() and file_path.suffix in ['.txt', '.md', '.json', '.csv']:
                try:
                    doc = self._load_single_file(file_path)
                    if doc:
                        documents.append(doc)
                        self.documents[doc.doc_id] = doc
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")

        return documents

    def _load_single_file(self, file_path: Path) -> Optional[Document]:
        """Load and parse a single file."""
        doc_id = file_path.stem
        filename = file_path.name

        try:
            if file_path.suffix == '.json':
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # JSON payloads may be a dict with a 'content' field or some
                # other structure; fall back to the string representation.
                if isinstance(data, dict):
                    content = data.get('content', '') or str(data)
                else:
                    content = str(data)
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

            content = self._clean_text(content)

            if not content.strip():
                return None

            return Document(
                doc_id=doc_id,
                filename=filename,
                content=content,
                doc_type=self._infer_doc_type(filename),
            )
        except Exception as e:
            print(f"Failed to load {file_path}: {e}")
            return None

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Collapse all runs of whitespace (including newlines) to single spaces.
        text = ' '.join(text.split())
        # Remove null bytes.
        text = text.replace('\x00', '')
        return text.strip()

    def _infer_doc_type(self, filename: str) -> str:
        """Infer document type from filename."""
        lower_name = filename.lower()
        if 'faq' in lower_name:
            return 'faq'
        elif 'manual' in lower_name:
            return 'product_manual'
        elif 'api' in lower_name:
            return 'api_docs'
        elif 'guide' in lower_name:
            return 'user_guide'
        else:
            return 'general_document'

    def process(self) -> List[DocumentChunk]:
        """
        Process all loaded documents into chunks.
        """
        self.chunks = []

        for doc_id, doc in self.documents.items():
            # Merge per-document metadata into every chunk produced from this document.
            metadata = {
                'doc_type': doc.doc_type,
                'filename': doc.filename,
                **doc.metadata,
            }

            chunks = self.chunker.chunk(
                text=doc.content,
                doc_id=doc_id,
                source_doc=doc.filename,
                metadata=metadata,
            )
            self.chunks.extend(chunks)

        return self.chunks

    def get_chunks_for_doc(self, doc_id: str) -> List[DocumentChunk]:
        """Get all chunks for a specific document."""
        doc = self.documents.get(doc_id)
        if doc is None:
            return []
        return [c for c in self.chunks if c.source_doc == doc.filename]

    def export_chunks(self, output_path: str) -> None:
        """Export chunks to JSON for inspection."""
        data = [
            {
                'chunk_id': c.chunk_id,
                # Truncate long chunk content so the export stays readable.
                'content': (c.content[:100] + '...') if len(c.content) > 100 else c.content,
                'source_doc': c.source_doc,
                'token_count': c.token_count,
            }
            for c in self.chunks
        ]

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
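

# Minimal usage sketch of the pipeline above (illustrative only): the
# 'data/docs' input directory and 'chunks_preview.json' output path are
# assumed placeholders, not paths defined elsewhere in this module.
if __name__ == "__main__":
    processor = DocumentProcessor(chunk_size=400, chunk_overlap=100)
    docs = processor.load_documents("data/docs")
    chunks = processor.process()
    print(f"Loaded {len(docs)} documents, produced {len(chunks)} chunks")
    processor.export_chunks("chunks_preview.json")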