cryogenic22 committed on
Commit
3514910
·
verified ·
1 Parent(s): 4904b73

Create core/document/processor.py

Browse files
Files changed (1) hide show
  1. core/document/processor.py +242 -0
core/document/processor.py ADDED
@@ -0,0 +1,242 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Any, List, Optional, BinaryIO
2
+ from ...core.base import LatticeComponent, LatticeError
3
+ from pydantic import BaseModel
4
+ import fitz # PyMuPDF
5
+ from docx import Document as DocxDocument
6
+ import pandas as pd
7
+ import hashlib
8
+ from pathlib import Path
9
+ import magic
10
+ import logging
11
+ from datetime import datetime
12
+
13
class DocumentConfig(BaseModel):
    """Configuration for document processing.

    Controls which artifacts are extracted and how the extracted text is
    split into chunks.
    """
    extract_text: bool = True      # pull plain text out of the document
    extract_metadata: bool = True  # collect title/author/etc. where the format provides them
    extract_images: bool = False   # NOTE(review): declared but unused by any visible code path
    chunk_size: int = 500          # target chunk length in characters
    chunk_overlap: int = 50        # characters shared between consecutive chunks
    encoding: str = 'utf-8'        # decoding used by the plain-text path
    ocr_enabled: bool = False      # when True, initialize() lazily imports pytesseract
22
+
23
class ProcessedChunk(BaseModel):
    """A contiguous slice of a processed document's text."""
    content: str              # the chunk text itself
    start_index: int          # offset of the chunk's first character in the full text
    end_index: int            # offset one past the chunk's last character
    metadata: Dict[str, Any]  # per-chunk info (chunk_size, position)
29
+
30
class ProcessedDocument(BaseModel):
    """Result of processing one document."""
    doc_id: str                   # first 16 hex chars of the file's SHA-256
    content: str                  # full extracted text
    chunks: List[ProcessedChunk]  # overlapping text chunks of `content`
    metadata: Dict[str, Any]      # format-specific metadata (author, page_count, ...)
    file_type: str                # one of DocumentProcessor.SUPPORTED_TYPES keys
    timestamp: datetime           # when processing finished (naive datetime.now())
38
+
39
class DocumentProcessor(LatticeComponent):
    """Extracts text, metadata and chunks from PDF/DOCX/TXT/CSV streams."""

    # Maps each internal file-type key to the MIME types that identify it
    # (as sniffed by python-magic in get_file_type).
    SUPPORTED_TYPES = {
        'pdf': ['application/pdf'],
        'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
        'txt': ['text/plain'],
        'csv': ['text/csv', 'application/csv']
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the raw config via the base class and parse it into a DocumentConfig.

        Raises pydantic's validation error if `config` contains invalid values.
        """
        super().__init__(config)
        self.doc_config = DocumentConfig(**(config or {}))
52
+
53
+ async def initialize(self) -> None:
54
+ """Initialize document processor"""
55
+ try:
56
+ # Initialize OCR if enabled
57
+ if self.doc_config.ocr_enabled:
58
+ import pytesseract
59
+ self.ocr = pytesseract
60
+
61
+ self._initialized = True
62
+
63
+ except Exception as e:
64
+ raise LatticeError(f"Failed to initialize document processor: {str(e)}")
65
+
66
+ async def validate_config(self) -> bool:
67
+ """Validate configuration"""
68
+ try:
69
+ DocumentConfig(**(self.config or {}))
70
+ return True
71
+ except Exception as e:
72
+ self.logger.error(f"Invalid configuration: {str(e)}")
73
+ return False
74
+
75
+ def get_file_type(self, file: BinaryIO) -> str:
76
+ """Determine file type using magic numbers"""
77
+ mime = magic.from_buffer(file.read(2048), mime=True)
78
+ file.seek(0)
79
+
80
+ for file_type, mime_types in self.SUPPORTED_TYPES.items():
81
+ if mime in mime_types:
82
+ return file_type
83
+
84
+ raise LatticeError(f"Unsupported file type: {mime}")
85
+
86
+ async def process_document(
87
+ self,
88
+ file: BinaryIO,
89
+ file_type: Optional[str] = None
90
+ ) -> ProcessedDocument:
91
+ """Process document"""
92
+ self.ensure_initialized()
93
+
94
+ try:
95
+ # Determine file type if not provided
96
+ if not file_type:
97
+ file_type = self.get_file_type(file)
98
+
99
+ # Generate document ID
100
+ doc_id = self._generate_doc_id(file)
101
+
102
+ # Extract content and metadata
103
+ if file_type == 'pdf':
104
+ content, metadata = self._process_pdf(file)
105
+ elif file_type == 'docx':
106
+ content, metadata = self._process_docx(file)
107
+ elif file_type == 'txt':
108
+ content, metadata = self._process_text(file)
109
+ elif file_type == 'csv':
110
+ content, metadata = self._process_csv(file)
111
+ else:
112
+ raise LatticeError(f"Unsupported file type: {file_type}")
113
+
114
+ # Create chunks
115
+ chunks = self._create_chunks(content)
116
+
117
+ return ProcessedDocument(
118
+ doc_id=doc_id,
119
+ content=content,
120
+ chunks=chunks,
121
+ metadata=metadata,
122
+ file_type=file_type,
123
+ timestamp=datetime.now()
124
+ )
125
+
126
+ except Exception as e:
127
+ self.logger.error(f"Error processing document: {str(e)}")
128
+ raise LatticeError(f"Document processing failed: {str(e)}")
129
+
130
+ def _generate_doc_id(self, file: BinaryIO) -> str:
131
+ """Generate unique document ID"""
132
+ file_hash = hashlib.sha256()
133
+ for chunk in iter(lambda: file.read(4096), b""):
134
+ file_hash.update(chunk)
135
+ file.seek(0)
136
+ return file_hash.hexdigest()[:16]
137
+
138
+ def _process_pdf(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
139
+ """Process PDF document"""
140
+ pdf = fitz.open(stream=file.read())
141
+
142
+ # Extract text
143
+ text = ""
144
+ if self.doc_config.extract_text:
145
+ for page in pdf:
146
+ text += page.get_text()
147
+
148
+ # Extract metadata
149
+ metadata = {}
150
+ if self.doc_config.extract_metadata:
151
+ metadata = {
152
+ 'title': pdf.metadata.get('title'),
153
+ 'author': pdf.metadata.get('author'),
154
+ 'subject': pdf.metadata.get('subject'),
155
+ 'keywords': pdf.metadata.get('keywords'),
156
+ 'page_count': len(pdf),
157
+ 'file_size': file.tell()
158
+ }
159
+
160
+ return text, metadata
161
+
162
+ def _process_docx(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
163
+ """Process DOCX document"""
164
+ doc = DocxDocument(file)
165
+
166
+ # Extract text
167
+ text = ""
168
+ if self.doc_config.extract_text:
169
+ for para in doc.paragraphs:
170
+ text += para.text + "\n"
171
+
172
+ # Extract metadata
173
+ metadata = {}
174
+ if self.doc_config.extract_metadata:
175
+ core_props = doc.core_properties
176
+ metadata = {
177
+ 'title': core_props.title,
178
+ 'author': core_props.author,
179
+ 'created': core_props.created.isoformat() if core_props.created else None,
180
+ 'modified': core_props.modified.isoformat() if core_props.modified else None,
181
+ 'paragraph_count': len(doc.paragraphs),
182
+ 'file_size': file.tell()
183
+ }
184
+
185
+ return text, metadata
186
+
187
+ def _process_text(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
188
+ """Process text document"""
189
+ content = file.read().decode(self.doc_config.encoding)
190
+
191
+ metadata = {
192
+ 'file_size': file.tell(),
193
+ 'encoding': self.doc_config.encoding,
194
+ 'line_count': content.count('\n') + 1
195
+ }
196
+
197
+ return content, metadata
198
+
199
+ def _process_csv(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
200
+ """Process CSV document"""
201
+ df = pd.read_csv(file)
202
+
203
+ # Convert to string representation
204
+ content = df.to_string()
205
+
206
+ metadata = {
207
+ 'file_size': file.tell(),
208
+ 'row_count': len(df),
209
+ 'column_count': len(df.columns),
210
+ 'columns': df.columns.tolist()
211
+ }
212
+
213
+ return content, metadata
214
+
215
+ def _create_chunks(self, content: str) -> List[ProcessedChunk]:
216
+ """Create document chunks"""
217
+ chunks = []
218
+ start = 0
219
+
220
+ while start < len(content):
221
+ end = start + self.doc_config.chunk_size
222
+
223
+ # Adjust end to prevent cutting words
224
+ if end < len(content):
225
+ end = content.rfind(' ', start, end) + 1
226
+
227
+ chunk_content = content[start:end]
228
+ chunks.append(
229
+ ProcessedChunk(
230
+ content=chunk_content,
231
+ start_index=start,
232
+ end_index=end,
233
+ metadata={
234
+ 'chunk_size': len(chunk_content),
235
+ 'position': len(chunks)
236
+ }
237
+ )
238
+ )
239
+
240
+ start = end - self.doc_config.chunk_overlap
241
+
242
+ return chunks