File size: 4,710 Bytes
e885bfa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
"""Main document processor orchestrating the pipeline"""
import os
import json
from typing import List, Dict, Optional
from pathlib import Path
from src.rag.document_processing.models import Document, DocumentChunk
from src.rag.document_processing.chunker import SemanticChunker
class DocumentProcessor:
    """Orchestrates document loading, cleaning, and chunking.

    Files are loaded from disk, normalized with :meth:`_clean_text`, and
    split into chunks by the configured ``SemanticChunker``.
    """

    # Extensions accepted by load_documents(). Everything except .json is
    # read as plain UTF-8 text (.csv is treated as raw text, not parsed).
    SUPPORTED_EXTENSIONS = ('.txt', '.md', '.json', '.csv')

    def __init__(
        self,
        chunk_size: int = 400,
        chunk_overlap: int = 100,
        min_chunk_size: int = 50,
    ):
        """Initialize the document processor.

        Args:
            chunk_size: Target chunk size forwarded to SemanticChunker.
            chunk_overlap: Overlap between consecutive chunks.
            min_chunk_size: Minimum size of an emitted chunk.
        """
        self.chunker = SemanticChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            min_chunk_size=min_chunk_size,
        )
        # Loaded documents keyed by doc_id (the file stem).
        self.documents: "Dict[str, Document]" = {}
        # Flat list of chunks produced by the most recent process() call.
        self.chunks: "List[DocumentChunk]" = []

    def load_documents(self, directory: str) -> "List[Document]":
        """Recursively load all supported files under ``directory``.

        Supports: .txt, .md, .json, .csv files. Files that fail to load,
        or are empty after cleaning, are skipped with a printed warning.

        Args:
            directory: Root directory to scan recursively.

        Returns:
            The successfully loaded documents. They are also registered
            in ``self.documents`` keyed by ``doc_id``.
        """
        documents = []
        for file_path in Path(directory).glob('**/*'):
            if not file_path.is_file():
                continue
            if file_path.suffix not in self.SUPPORTED_EXTENSIONS:
                continue
            try:
                doc = self._load_single_file(file_path)
            except Exception as e:
                # Best-effort loading: report and move on to the next file.
                print(f"Error loading {file_path}: {e}")
                continue
            if doc:
                documents.append(doc)
                self.documents[doc.doc_id] = doc
        return documents

    def _load_single_file(self, file_path: Path) -> "Optional[Document]":
        """Load and parse a single file into a Document.

        JSON files use their top-level ``content`` key when the payload is
        a dict (falling back to the stringified payload); everything else
        is read as UTF-8 text.

        Returns:
            A Document, or None if the file is empty after cleaning or
            could not be read.
        """
        try:
            if file_path.suffix == '.json':
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    content = data.get('content', '') or str(data)
                else:
                    # Top-level arrays/scalars have no 'content' key.
                    content = str(data)
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            content = self._clean_text(content)
            if not content.strip():
                return None
            # NOTE(review): doc_id is the bare stem, so files with the same
            # name in different subdirectories overwrite each other.
            return Document(
                doc_id=file_path.stem,
                filename=file_path.name,
                content=content,
                doc_type=self._infer_doc_type(file_path.name),
            )
        except Exception as e:
            print(f"Failed to load {file_path}: {e}")
            return None

    def _clean_text(self, text: str) -> str:
        """Collapse all runs of whitespace to single spaces and drop NULs."""
        # split()/join collapses newlines, tabs and repeated spaces.
        text = ' '.join(text.split())
        # NUL bytes survive split() (not whitespace); strip them explicitly.
        text = text.replace('\x00', '')
        return text.strip()

    def _infer_doc_type(self, filename: str) -> str:
        """Infer a document type label from keywords in the filename."""
        lower_name = filename.lower()
        if 'faq' in lower_name:
            return 'faq'
        elif 'manual' in lower_name:
            return 'product_manual'
        elif 'api' in lower_name:
            return 'api_docs'
        elif 'guide' in lower_name:
            return 'user_guide'
        else:
            return 'general_document'

    def process(self) -> "List[DocumentChunk]":
        """Chunk every loaded document.

        Resets ``self.chunks`` and repopulates it from ``self.documents``.

        Returns:
            The full list of chunks across all loaded documents.
        """
        self.chunks = []
        for doc_id, doc in self.documents.items():
            # Per-chunk metadata: document-level fields plus any custom
            # metadata already on the Document (which takes precedence).
            metadata = {
                'doc_type': doc.doc_type,
                'filename': doc.filename,
                **doc.metadata,
            }
            chunks = self.chunker.chunk(
                text=doc.content,
                doc_id=doc_id,
                source_doc=doc.filename,
                metadata=metadata,
            )
            self.chunks.extend(chunks)
        return self.chunks

    def get_chunks_for_doc(self, doc_id: str) -> "List[DocumentChunk]":
        """Get all chunks for a specific document.

        Returns an empty list when ``doc_id`` is unknown (previously this
        raised AttributeError on the ``{}`` fallback).
        """
        doc = self.documents.get(doc_id)
        if doc is None:
            return []
        return [c for c in self.chunks if c.source_doc == doc.filename]

    def export_chunks(self, output_path: str) -> None:
        """Export chunk summaries to a JSON file for inspection.

        Content is truncated to a 100-character preview per chunk.
        """
        data = [
            {
                'chunk_id': c.chunk_id,
                'content': (c.content[:100] + '...') if len(c.content) > 100 else c.content,
                'source_doc': c.source_doc,
                'token_count': c.token_count,
            }
            for c in self.chunks
        ]
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
|