File size: 4,710 Bytes
e885bfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Main document processor orchestrating the pipeline"""

import os
import json
from typing import List, Dict, Optional
from pathlib import Path
from src.rag.document_processing.models import Document, DocumentChunk
from src.rag.document_processing.chunker import SemanticChunker


class DocumentProcessor:
    """
    Orchestrates document loading, cleaning, and chunking.

    Typical usage:
        processor = DocumentProcessor()
        processor.load_documents("docs/")
        chunks = processor.process()
    """

    # File extensions accepted by load_documents(). Keep this in sync with
    # the formats _load_single_file() knows how to parse.
    SUPPORTED_SUFFIXES = frozenset({'.txt', '.md', '.json', '.csv'})

    def __init__(
        self,
        chunk_size: int = 400,
        chunk_overlap: int = 100,
        min_chunk_size: int = 50,
    ):
        """Initialize document processor.

        Args:
            chunk_size: Target chunk size, forwarded to SemanticChunker.
            chunk_overlap: Overlap between consecutive chunks, forwarded
                to SemanticChunker.
            min_chunk_size: Minimum chunk size, forwarded to SemanticChunker.
        """
        self.chunker = SemanticChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            min_chunk_size=min_chunk_size,
        )
        # doc_id -> Document; populated by load_documents().
        self.documents: Dict[str, "Document"] = {}
        # Flat list of chunks across all documents; rebuilt by process().
        self.chunks: List["DocumentChunk"] = []

    def load_documents(self, directory: str) -> List["Document"]:
        """
        Recursively load documents from a directory.

        Supports: .txt, .md, .json, .csv files (see SUPPORTED_SUFFIXES).
        Files that fail to load or are empty after cleaning are skipped;
        load failures are reported to stdout.

        Args:
            directory: Root directory to scan (searched recursively).

        Returns:
            The list of documents loaded by this call. Loaded documents
            are also registered in self.documents keyed by doc_id.
        """
        documents = []
        path = Path(directory)

        for file_path in path.glob('**/*'):
            if file_path.is_file() and file_path.suffix in self.SUPPORTED_SUFFIXES:
                try:
                    doc = self._load_single_file(file_path)
                    if doc:
                        documents.append(doc)
                        self.documents[doc.doc_id] = doc
                except Exception as e:
                    # Best-effort batch load: report and continue with the
                    # remaining files rather than aborting the whole scan.
                    print(f"Error loading {file_path}: {e}")

        return documents

    def _load_single_file(self, file_path: Path) -> "Optional[Document]":
        """Load and parse a single file.

        Returns None when the file is empty after cleaning or fails to load.

        NOTE(review): doc_id is the bare filename stem, so files with the
        same name in different subdirectories overwrite each other in
        self.documents — confirm this is acceptable for the corpus.
        """
        doc_id = file_path.stem
        filename = file_path.name

        try:
            if file_path.suffix == '.json':
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    # Prefer an explicit 'content' field; fall back to the
                    # whole payload stringified so no data is silently lost.
                    content = data.get('content', '') or str(data)
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

            content = self._clean_text(content)

            # Skip files that are empty once cleaned.
            if not content.strip():
                return None

            return Document(
                doc_id=doc_id,
                filename=filename,
                content=content,
                doc_type=self._infer_doc_type(filename),
            )
        except Exception as e:
            print(f"Failed to load {file_path}: {e}")
            return None

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text.

        Collapses all runs of whitespace (including newlines) to single
        spaces and strips NUL bytes.
        """
        # ' '.join(split()) collapses tabs/newlines/multiple spaces in one pass.
        text = ' '.join(text.split())
        # Strip NUL bytes; all other punctuation is preserved.
        text = text.replace('\x00', '')
        return text.strip()

    def _infer_doc_type(self, filename: str) -> str:
        """Infer document type from a filename via keyword matching.

        Checks are ordered, so a name containing both 'faq' and 'manual'
        classifies as 'faq'. Unmatched names fall back to 'general_document'.
        """
        lower_name = filename.lower()
        if 'faq' in lower_name:
            return 'faq'
        elif 'manual' in lower_name:
            return 'product_manual'
        elif 'api' in lower_name:
            return 'api_docs'
        elif 'guide' in lower_name:
            return 'user_guide'
        else:
            return 'general_document'

    def process(self) -> List["DocumentChunk"]:
        """
        Process all loaded documents into chunks.

        Rebuilds self.chunks from scratch each call, so calling it twice
        does not duplicate chunks.

        Returns:
            The full list of chunks across all loaded documents.
        """
        self.chunks = []

        for doc_id, doc in self.documents.items():
            # Per-document metadata wins over the defaults on key collision.
            # NOTE(review): assumes Document exposes a dict 'metadata'
            # attribute (not set by _load_single_file) — presumably a model
            # default; verify against the Document definition.
            metadata = {
                'doc_type': doc.doc_type,
                'filename': doc.filename,
                **doc.metadata
            }

            chunks = self.chunker.chunk(
                text=doc.content,
                doc_id=doc_id,
                source_doc=doc.filename,
                metadata=metadata,
            )
            self.chunks.extend(chunks)

        return self.chunks

    def get_chunks_for_doc(self, doc_id: str) -> List["DocumentChunk"]:
        """Get all chunks for a specific document.

        Returns an empty list for an unknown doc_id. (Fixes a bug where
        the dict default `{}` had no `.filename` attribute, raising
        AttributeError on any missing id.)
        """
        doc = self.documents.get(doc_id)
        if doc is None:
            return []
        return [c for c in self.chunks if c.source_doc == doc.filename]

    def export_chunks(self, output_path: str) -> None:
        """Export a chunk summary to JSON for inspection.

        Content is truncated to 100 characters per chunk; this is a
        human-readable digest, not a round-trippable serialization.
        """
        data = [
            {
                'chunk_id': c.chunk_id,
                'content': c.content[:100] + '...' if len(c.content) > 100 else c.content,
                'source_doc': c.source_doc,
                'token_count': c.token_count,
            }
            for c in self.chunks
        ]

        with open(output_path, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII text readable in the export.
            json.dump(data, f, indent=2, ensure_ascii=False)