FerrellSyntheticIntelligence commited on
Commit
8f6d026
·
1 Parent(s): 9db2e7e

feat: complete robust multi-format ingestion module with structural fallback tracking

Browse files
Files changed (1) hide show
  1. src/core/memory_engine.py +208 -4
src/core/memory_engine.py CHANGED
@@ -1,8 +1,27 @@
1
  import os
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  class MemoryEngine:
 
 
 
 
 
 
 
 
4
  def ingest_knowledge(self, directory):
5
- # Anchor the path to the current working directory
6
  base_path = os.path.join(os.getcwd(), directory)
7
 
8
  if not os.path.exists(base_path):
@@ -10,6 +29,191 @@ class MemoryEngine:
10
  os.makedirs(base_path, exist_ok=True)
11
  return
12
 
13
- for filename in os.listdir(base_path):
14
- print(f"Ingesting: {filename}")
15
- # Your existing ingestion logic here...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
+ import json
3
+ import csv
4
+ import math
5
+ from dataclasses import dataclass, asdict
6
+
7
+ @dataclass
8
+ class KnowledgeChunk:
9
+ chunk_id: str
10
+ source_path: str
11
+ text: str
12
+ metadata: dict
13
 
14
  class MemoryEngine:
15
+ def __init__(self, model_name="all-MiniLM-L6-v2"):
16
+ # Lazy import of heavy ML libraries to keep initial startup fast
17
+ from sentence_transformers import SentenceTransformer
18
+ self.embedder = SentenceTransformer(model_name)
19
+ self.max_seq_length = self.embedder.get_max_seq_length() or 256
20
+ self.chunks_manifest = []
21
+ self.embeddings_cache = None
22
+
23
  def ingest_knowledge(self, directory):
24
+ """Scans directory, parses supported files, generates semantic chunks, and vectorizes them."""
25
  base_path = os.path.join(os.getcwd(), directory)
26
 
27
  if not os.path.exists(base_path):
 
29
  os.makedirs(base_path, exist_ok=True)
30
  return
31
 
32
+ all_chunks = []
33
+ print(f"[*] Beginning execution sweep over data matrix: {base_path}")
34
+
35
+ for root, _, files in os.walk(base_path):
36
+ for filename in files:
37
+ file_path = os.path.join(root, filename)
38
+ rel_path = os.path.relpath(file_path, base_path)
39
+
40
+ # Format Dispatcher Matrix
41
+ if filename.endswith(('.txt', '.md', '.rst')):
42
+ chunks = self._parse_txt(file_path, rel_path)
43
+ elif filename.endswith(('.json', '.jsonl')):
44
+ chunks = self._parse_json(file_path, rel_path)
45
+ elif filename.endswith(('.csv', '.tsv')):
46
+ chunks = self._parse_csv(file_path, rel_path)
47
+ else:
48
+ chunks = self._parse_fallback(file_path, rel_path)
49
+
50
+ if chunks:
51
+ all_chunks.extend(chunks)
52
+ print(f"[+] Extracted {len(chunks)} chunks from: {rel_path}")
53
+
54
+ if not all_chunks:
55
+ print("[!] Operation complete: No valid text blocks extracted for vectorization.")
56
+ return
57
+
58
+ self._generate_embeddings(all_chunks)
59
+ self._save_manifest(base_path)
60
+
61
+ def _parse_txt(self, file_path, rel_path):
62
+ chunks = []
63
+ try:
64
+ with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
65
+ lines = f.readlines()
66
+
67
+ # Group rows sequentially into basic paragraph-sized contextual windows
68
+ buffer = []
69
+ buffer_chars = 0
70
+ start_line = 1
71
+
72
+ for idx, line in enumerate(lines, start=1):
73
+ clean_line = line.strip()
74
+ if not clean_line:
75
+ continue
76
+ buffer.append(clean_line)
77
+ buffer_chars += len(clean_line)
78
+
79
+ if buffer_chars >= 1000: # ~200-250 words architectural chunk threshold
80
+ text_content = " ".join(buffer)
81
+ chunk_id = f"txt_{rel_path.replace('/', '_')}_L{start_line}"
82
+ chunks.append(KnowledgeChunk(
83
+ chunk_id=chunk_id,
84
+ source_path=rel_path,
85
+ text=text_content,
86
+ metadata={"type": "plain", "start_line": start_line, "end_line": idx}
87
+ ))
88
+ buffer = []
89
+ buffer_chars = 0
90
+ start_line = idx + 1
91
+
92
+ if buffer: # Clean up remaining trailing lines
93
+ text_content = " ".join(buffer)
94
+ chunks.append(KnowledgeChunk(
95
+ chunk_id=f"txt_{rel_path.replace('/', '_')}_L{start_line}",
96
+ source_path=rel_path,
97
+ text=text_content,
98
+ metadata={"type": "plain", "start_line": start_line, "end_line": len(lines)}
99
+ ))
100
+ except Exception as e:
101
+ print(f"[!] Error processing text file {rel_path}: {str(e)}")
102
+ return chunks
103
+
104
+ def _parse_json(self, file_path, rel_path):
105
+ chunks = []
106
+ try:
107
+ with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
108
+ if file_path.endswith('.jsonl'):
109
+ records = [json.loads(line) for line in f if line.strip()]
110
+ else:
111
+ data = json.load(f)
112
+ records = data if isinstance(data, list) else [data]
113
+
114
+ for idx, record in enumerate(records):
115
+ # Isolate string values matching natural language heuristic properties
116
+ for key, val in record.items():
117
+ if isinstance(val, str) and len(val) >= 20:
118
+ chunk_id = f"json_{rel_path.replace('/', '_')}_R{idx}_{key}"
119
+ chunks.append(KnowledgeChunk(
120
+ chunk_id=chunk_id,
121
+ source_path=rel_path,
122
+ text=val,
123
+ metadata={"type": "json", "record_index": idx, "key_path": key}
124
+ ))
125
+ except Exception as e:
126
+ print(f"[!] Error processing structured JSON {rel_path}: {str(e)}")
127
+ return self._parse_fallback(file_path, rel_path)
128
+ return chunks
129
+
130
+ def _parse_csv(self, file_path, rel_path):
131
+ chunks = []
132
+ try:
133
+ delimiter = '\t' if file_path.endswith('.tsv') else ','
134
+ with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
135
+ reader = csv.DictReader(f, delimiter=delimiter)
136
+ for idx, row in enumerate(reader, start=1):
137
+ # Combine dense text cells while keeping numeric structural attributes tied as metadata
138
+ text_parts = []
139
+ meta_fields = {}
140
+ for k, v in row.items():
141
+ if not k or not v:
142
+ continue
143
+ # Heuristic check for natural language strings vs identifiers/counters
144
+ if len(v) > 30 or any(x in k.lower() for x in ['desc', 'note', 'text', 'body', 'message', 'data']):
145
+ text_parts.append(f"{k}: {v}")
146
+ else:
147
+ meta_fields[k] = v
148
+
149
+ if text_parts:
150
+ combined_text = " | ".join(text_parts)
151
+ chunk_id = f"csv_{rel_path.replace('/', '_')}_R{idx}"
152
+ meta_fields.update({"type": "csv", "row_index": idx})
153
+ chunks.append(KnowledgeChunk(
154
+ chunk_id=chunk_id,
155
+ source_path=rel_path,
156
+ text=combined_text,
157
+ metadata=meta_fields
158
+ ))
159
+ except Exception as e:
160
+ print(f"[!] Error processing spreadsheet matrix {rel_path}: {str(e)}")
161
+ return self._parse_fallback(file_path, rel_path)
162
+ return chunks
163
+
164
+ def _parse_fallback(self, file_path, rel_path):
165
+ """Emergency safe-mode logic path to pull text securely from unidentified binary fragments."""
166
+ try:
167
+ with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
168
+ content = f.read(10000).strip() # Extract first 10k characters safely
169
+ if len(content) > 50:
170
+ return [KnowledgeChunk(
171
+ chunk_id=f"fallback_{rel_path.replace('/', '_')}",
172
+ source_path=rel_path,
173
+ text=content,
174
+ metadata={"type": "fallback_stream"}
175
+ )]
176
+ except Exception:
177
+ pass
178
+ return []
179
+
180
+ def _generate_embeddings(self, chunks):
181
+ """Batches and vectorizes parsed objects with the active Transformer context matrix."""
182
+ import torch
183
+ print(f"[*] Encoding {len(chunks)} text chunks into unified coordinate vector space...")
184
+
185
+ texts = [c.text for c in chunks]
186
+
187
+ # Execution execution batch slice for low-overhead ARM64 memory profiles
188
+ batch_size = 32
189
+ embeddings_list = []
190
+
191
+ for i in range(0, len(texts), batch_size):
192
+ batch_texts = texts[i:i+batch_size]
193
+ # Convert text streams into normalized vector spaces
194
+ batch_embeds = self.embedder.encode(batch_texts, convert_to_tensor=True, show_progress_bar=False)
195
+ embeddings_list.append(batch_embeds.cpu())
196
+
197
+ self.embeddings_cache = torch.cat(embeddings_list, dim=0)
198
+ self.chunks_manifest = [asdict(c) for c in chunks]
199
+ print(f"[+] Multi-dimensional vector calculation sequence resolved: {self.embeddings_cache.shape}")
200
+
201
+ def _save_manifest(self, base_path):
202
+ """Serializes the engine metadata matrix and tensor index to local scratch storage."""
203
+ import torch
204
+ manifest_path = os.path.join(base_path, "chunks_manifest.json")
205
+ vectors_path = os.path.join(base_path, "vectors_cache.pt")
206
+
207
+ with open(manifest_path, 'w', encoding='utf-8') as f:
208
+ json.dump(self.chunks_manifest, f, indent=4)
209
+
210
+ if self.embeddings_cache is not None:
211
+ torch.save(self.embeddings_cache, vectors_path)
212
+
213
+ print(f"[+] Storage sync finalized. Manifest recorded at: {manifest_path}")
214
+ print(f"[+] Vector tensor cache secured at: {vectors_path}")
215
+
216
+ if __name__ == "__main__":
217
+ # Internal execution harness verification loop
218
+ engine = MemoryEngine()
219
+ engine.ingest_knowledge('storage/knowledge')