# Author: Vivek kumar
# Project: RAG based QNA
# Commit: e885bfa
"""Main document processor orchestrating the pipeline"""
import os
import json
from typing import List, Dict, Optional
from pathlib import Path
from src.rag.document_processing.models import Document, DocumentChunk
from src.rag.document_processing.chunker import SemanticChunker
class DocumentProcessor:
    """
    Orchestrates document loading, cleaning, and chunking.

    Typical usage:
        processor = DocumentProcessor()
        processor.load_documents("docs/")
        chunks = processor.process()
    """

    # File extensions load_documents() will pick up.
    SUPPORTED_EXTENSIONS = ('.txt', '.md', '.json', '.csv')

    def __init__(
        self,
        chunk_size: int = 400,
        chunk_overlap: int = 100,
        min_chunk_size: int = 50,
    ):
        """Initialize the document processor.

        Args:
            chunk_size: Target chunk size forwarded to the chunker.
            chunk_overlap: Overlap between consecutive chunks.
            min_chunk_size: Minimum acceptable chunk size.
        """
        self.chunker = SemanticChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            min_chunk_size=min_chunk_size,
        )
        # doc_id -> Document for every successfully loaded file.
        self.documents: Dict[str, Document] = {}
        # Flat list of chunks produced by the last process() call.
        self.chunks: List[DocumentChunk] = []

    def load_documents(self, directory: str) -> List[Document]:
        """
        Recursively load documents from a directory.

        Supports: .txt, .md, .json, .csv files

        Args:
            directory: Root directory to scan.

        Returns:
            The list of documents loaded by this call. Loaded documents are
            also registered in ``self.documents`` keyed by doc_id.
        """
        documents = []
        path = Path(directory)
        for file_path in path.glob('**/*'):
            if file_path.is_file() and file_path.suffix in self.SUPPORTED_EXTENSIONS:
                try:
                    doc = self._load_single_file(file_path)
                    if doc:
                        documents.append(doc)
                        self.documents[doc.doc_id] = doc
                except Exception as e:
                    # Best-effort loading: report and keep going so one bad
                    # file does not abort the whole directory scan.
                    print(f"Error loading {file_path}: {e}")
        return documents

    def _load_single_file(self, file_path: Path) -> Optional[Document]:
        """Load and parse a single file.

        Returns:
            A Document, or None when the file is empty or unreadable.
        """
        doc_id = file_path.stem
        filename = file_path.name
        try:
            if file_path.suffix == '.json':
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # JSON payloads may be any type; only dicts support .get().
                if isinstance(data, dict):
                    content = data.get('content', '') or str(data)
                else:
                    content = str(data)
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            # Clean content
            content = self._clean_text(content)
            if not content.strip():
                return None
            return Document(
                doc_id=doc_id,
                filename=filename,
                content=content,
                doc_type=self._infer_doc_type(filename),
            )
        except Exception as e:
            print(f"Failed to load {file_path}: {e}")
            return None

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text.

        Removes NUL bytes first, then collapses every run of whitespace to a
        single space. (Removing NULs after normalization — as this method
        previously did — could leave double spaces behind.)
        """
        text = text.replace('\x00', '')
        # join(split()) also strips leading/trailing whitespace.
        return ' '.join(text.split())

    def _infer_doc_type(self, filename: str) -> str:
        """Infer a coarse document type from keywords in the filename."""
        lower_name = filename.lower()
        if 'faq' in lower_name:
            return 'faq'
        elif 'manual' in lower_name:
            return 'product_manual'
        elif 'api' in lower_name:
            return 'api_docs'
        elif 'guide' in lower_name:
            return 'user_guide'
        else:
            return 'general_document'

    def process(self) -> List[DocumentChunk]:
        """
        Process all loaded documents into chunks.

        Returns:
            All chunks produced; also stored in ``self.chunks`` (any previous
            chunks are discarded).
        """
        self.chunks = []
        for doc_id, doc in self.documents.items():
            # Per-document metadata travels with every chunk of that document.
            metadata = {
                'doc_type': doc.doc_type,
                'filename': doc.filename,
                **doc.metadata
            }
            chunks = self.chunker.chunk(
                text=doc.content,
                doc_id=doc_id,
                source_doc=doc.filename,
                metadata=metadata,
            )
            self.chunks.extend(chunks)
        return self.chunks

    def get_chunks_for_doc(self, doc_id: str) -> List[DocumentChunk]:
        """Get all chunks for a specific document.

        Returns an empty list for an unknown doc_id. (Previously raised
        AttributeError because the ``{}`` default has no ``.filename``.)
        """
        doc = self.documents.get(doc_id)
        if doc is None:
            return []
        return [c for c in self.chunks if c.source_doc == doc.filename]

    def export_chunks(self, output_path: str) -> None:
        """Export a truncated preview of each chunk to JSON for inspection."""
        data = [
            {
                'chunk_id': c.chunk_id,
                # Truncate long content so the export stays skimmable.
                'content': (c.content[:100] + '...') if len(c.content) > 100 else c.content,
                'source_doc': c.source_doc,
                'token_count': c.token_count,
            }
            for c in self.chunks
        ]
        # ensure_ascii=False keeps non-ASCII text readable in the export.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)