bhoomika19 commited on
Commit
2e345cc
·
verified ·
1 Parent(s): 1fab090

Upload json_context_extractor.py

Browse files
src/preprocessor/json_context_extractor.py ADDED
@@ -0,0 +1,1209 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import re
3
+ import os
4
+ import sys
5
+ from typing import Dict, List, Any
6
+ from collections import defaultdict
7
+ from fuzzywuzzy import fuzz # For fuzzy deduplication; pip install fuzzywuzzy python-Levenshtein
8
+ import anthropic
9
+ from dotenv import load_dotenv
10
+
11
+ # Add the project root to the path
12
+ sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
13
+ from database.supabase_manager import supabase_manager
14
+
15
+ # Load environment variables
16
+ load_dotenv()
17
+
18
+ class JSONContextExtractor:
19
def __init__(self, chunk_size: int = 100, overlap_size: int = 20):
    """Configure chunking parameters, the Claude client, and the DB flag.

    Args:
        chunk_size: Number of messages per LLM extraction chunk.
        overlap_size: Messages shared between consecutive chunks.
    """
    self.chunk_size = chunk_size
    self.overlap_size = overlap_size

    # Claude client; the API key comes from the environment (.env loaded above).
    self.anthropic_client = anthropic.Anthropic(
        api_key=os.getenv("ANTHROPIC_API_KEY")
    )

    # Default model used for all extraction/deduplication calls.
    self.model_name = "claude-3-5-sonnet-20240620"

    # Probe Supabase once; when unreachable, downstream methods fall back
    # to log-only behavior instead of writing to the database.
    self.db_enabled = bool(supabase_manager.is_connected())
    if self.db_enabled:
        print("✅ Supabase connection successful.")
    else:
        print("⚠️ Supabase connection failed. Operations will be logged only.")
38
+
39
def chunk_messages(self, messages: List[Dict[str, str]]) -> List[List[Dict[str, str]]]:
    """Split messages into overlapping chunks for LLM processing.

    Args:
        messages: Conversation messages in chronological order.

    Returns:
        List of chunks; consecutive chunks share ``self.overlap_size``
        messages so context is not lost at chunk boundaries.
    """
    if not messages:
        return []

    # Bug fix: if overlap_size >= chunk_size the original step
    # (chunk_size - overlap_size) is <= 0 and range() raises ValueError.
    # Advance by at least one message per iteration.
    step = max(1, self.chunk_size - self.overlap_size)

    chunks = []
    for start in range(0, len(messages), step):
        chunk = messages[start:start + self.chunk_size]
        if chunk:
            chunks.append(chunk)

        # Stop once the current chunk reaches the end of the list.
        if start + self.chunk_size >= len(messages):
            break

    return chunks
55
+
56
def deduplicate_messages(self, messages: List[Dict]) -> List[Dict]:
    """Drop near-duplicate messages (fuzzy similarity > 95) from a chunk.

    Messages with empty text are discarded as well; order of the surviving
    messages is preserved.
    """
    kept_texts = set()
    unique_messages = []
    for message in messages:
        normalized = message.get("message", "").strip().lower()
        if not normalized:
            continue
        # Keep the message only if it is not ~identical to one already kept.
        if all(fuzz.ratio(normalized, earlier) <= 95 for earlier in kept_texts):
            kept_texts.add(normalized)
            unique_messages.append(message)
    return unique_messages
68
+
69
def prepare_comprehensive_llm_prompt(self, chunk: List[Dict[str, str]], user_id: str) -> str:
    """Prepare prompt for comprehensive LLM extraction from message chunk with ownership validation.

    Args:
        chunk: Messages for one chunk; each may carry 'id', 'sender', 'message'.
        user_id: The target user whose facts (and only whose facts) should be extracted.

    Returns:
        A single prompt string instructing the LLM to emit the 4-layer JSON format.
    """
    # Fuzzy-dedupe first so the LLM does not see repeated messages.
    deduped_chunk = self.deduplicate_messages(chunk)
    # One line per message: "[id] sender: text"; the list index is the
    # fallback id when a message has no explicit 'id'.
    context_text = "\n".join([
        f"[{msg.get('id', i)}] {msg.get('sender', 'Unknown')}: {msg.get('message', '')}"
        for i, msg in enumerate(deduped_chunk)
    ])

    # The prompt enforces strict ownership validation and the Layer1-4
    # response schema consumed by _validate_extractions / store_in_db.
    prompt = f"""
You are an expert information extractor for a personal assistant memory system. Extract ONLY personal information that specifically belongs to or is claimed by the target user "{user_id}".

TARGET USER: {user_id}

CONVERSATION CHUNK:
{context_text}

CRITICAL OWNERSHIP VALIDATION RULES:
1. ONLY extract information that is specifically ABOUT {user_id} or claimed BY {user_id}
2. REJECT information about other people, temporary locations, or general mentions
3. REQUIRE clear ownership indicators like "my", "I am", "I live", "my name is", etc.
4. REJECT casual mentions that don't indicate personal ownership
5. If unclear who the information belongs to, DO NOT extract it

VALID OWNERSHIP PATTERNS:
✅ "{user_id} says: My address is..."
✅ "I live at..." (when {user_id} is speaking)
✅ "My phone number is..." (when {user_id} is speaking)
✅ "I was born in..." (when {user_id} is speaking)
✅ "My wife/husband is..." (when {user_id} is speaking)

INVALID PATTERNS TO REJECT:
❌ "Let's meet at Mumbai" (temporary location, not personal address)
❌ "The office is in Bandra" (not personal address)
❌ "She lives in..." (someone else's information)
❌ Any location mentioned without clear personal ownership
❌ Business addresses, meeting locations, casual place mentions

MEMORY LAYERS TO EXTRACT (ONLY IF CLEARLY OWNED BY {user_id}):

LAYER 1 - Basic Personal Information (HIGHEST PRIORITY):
- Full names, ages, date of birth, nationality, gender, blood group (ONLY if about {user_id})
- Phone numbers, email addresses (ONLY if stated as THEIR contact info)
- Home addresses (ONLY if clearly stated as THEIR home/personal address)
- Relationship status, occupation, company details (ONLY if {user_id}'s info)
- Work location, car model, Work Address comes under LAYER 1

LAYER 2 - Document Information (HIGH PRIORITY):
- DONT EXTRACT INFORMATION IF IT IS NOT CLEARLY DESCRIBED. For example, user mentions about the aadhar card, but doesn't have any aadhar card ID/details, Then DONT EXTRACT
- Government IDs, certificates (ONLY if {user_id}'s documents)
- Document numbers, policies, licenses (ONLY if belonging to {user_id})
- Credit cards, bank details (ONLY if {user_id}'s financial info)

LAYER 3 - Relationships & Contacts (MEDIUM PRIORITY):
- Family members, friends, colleagues (ONLY if {user_id}'s relationships AND names are mentioned)
- CONTACT DETAILS OF FAMILY/FRIENDS (ONLY if {user_id} is sharing about THEIR contacts)
- IMPORTANT: Only extract relationship if SPECIFIC NAMES are mentioned
- REJECT: Generic "my wife", "my husband" without names
- ACCEPT: "my wife Sarah", "my husband John"

LAYER 4 - Preferences & Instructions (LOWER PRIORITY):
- Food preferences, allergies (ONLY if {user_id}'s preferences)
- Favorite places, vendors (ONLY if {user_id}'s preferences)
- Habits, routines (ONLY if {user_id}'s personal habits)
- DO NOT put travel plans here unless they are long-term preferences
- DO NOT CONSIDER EVERYTHING AS A PREFERENCE, UNTIL {user_id} SAYS OR LIKES IT.

STRICT LAYER 3 RELATIONSHIP RULES:
- ONLY extract relationships if SPECIFIC NAMES are mentioned.
✅ "My wife Sarah" → Extract: relationship="spouse", name="Sarah"
✅ "My husband John works at..." → Extract: relationship="spouse", name="John"
✅ "My brother Mike lives in..." → Extract: relationship="brother", name="Mike"
❌ "My wife will come" → REJECT (no name mentioned)
❌ "Thanks guys" → REJECT (not a relationship statement)
❌ "I want to carry something for my 9 year old nephew" → REJECT (no name mentioned)


OWNERSHIP VALIDATION EXAMPLES:
✅ "My address is 402, Pinnacle Gold, Bandra" → Extract as {user_id}'s address
❌ "Let's meet at Pinnacle Gold, Bandra" → REJECT (not personal address)
✅ "I live in Mumbai" → Extract as {user_id}'s address
❌ "Mumbai has good restaurants" → REJECT (general comment, not personal info)
✅ "My wife Sarah" → Extract as {user_id}'s spouse with name "Sarah"
❌ "My wife will come" → REJECT (no specific name mentioned)
❌ "Priya is coming" → REJECT (unclear relationship/ownership)

CRITICAL EXTRACTION RULES:
1. NEVER extract relationship information without specific names.
2. NEVER extract information from casual conversation phrases
3. ALWAYS ensure the information directly belongs to {user_id}
4. FOCUS ON Layer 1, 2, 3 - avoid putting travel plans in Layer 1
5. For relationships: ONLY extract if both relationship type AND name are clear
6. IF THE USER MENTIONS THE CONTACT OR DETAILS OF A FAMILY MEMBER/FRIEND, ONLY EXTRACT IF IT IS CLEAR THAT THE DETAILS BELONG TO {user_id}(For e.g., "My wife's number is..." then save it as spouse phone number)

CONFIDENCE SCORING BASED ON OWNERSHIP:
- 0.9-1.0: Direct personal claims with clear ownership ("My address is...")
- 0.8-0.9: Clear attribution to {user_id} ("I live at...")
- Below 0.8: REJECT (unclear ownership)

RESPONSE FORMAT (JSON only):
{{
"Layer1": [
{{
"detail": {{"type": "field_type", "value": "extracted_value"}},
"confidence": 0.95,
"evidence": [
{{"message_id": "id", "message_snippet": "relevant_part"}}
],
"timestamp": "2025-09-13 00:00:00",
"ownership_reason": "Clear personal claim by {user_id}"
}}
],
"Layer2": [...],
"Layer3": [...],
"Layer4": [...]
}}

FIELD VALUE SPECIFICATIONS:
- relationship_status: Use "married" or "single". If mentions "wife"/"husband" use "married"
- gender: Use "Male" or "Female"
- address: ONLY extract if clearly stated as personal/home address, NOT business/meeting locations or anything other than personal.
- spouse/spouse_name: Use ONLY if specific name is mentioned (e.g., "My wife Sarah" → spouse_name: "Sarah")
- family_member: Use ONLY if specific name is mentioned (e.g., "My brother Mike" → family_member: "Mike")
- phone_number: Use exact format from message,
- email: Use exact email address as written.

PROHIBITED EXTRACTIONS:
❌ Do NOT extract relationships (e.g., "wife", "husband", "nephew") as standalone values without names
❌ Do NOT extract meeting locations as personal addresses
❌ Do NOT extract third-party information as user's information

REMEMBER: When in doubt about ownership or clarity, DO NOT extract. It's better to miss information than to incorrectly attribute someone else's details to {user_id}.

Only return valid JSON. If no clearly owned information found for a layer, return empty array for that layer.
"""
    return prompt
204
+
205
def call_llm_for_extraction(self, prompt: str) -> Dict:
    """Call Claude LLM for information extraction.

    Sends the extraction prompt, parses the reply as JSON and runs it
    through _validate_extractions; returns {} on any API or parse failure.
    """
    try:
        api_response = self.anthropic_client.messages.create(
            model=self.model_name,
            max_tokens=3000,  # generous budget for comprehensive extraction
            temperature=0.1,
            messages=[{"role": "user", "content": prompt}],
        )

        # The reply text lives in the first content block.
        response_text = api_response.content[0].text

        try:
            raw_data = json.loads(response_text)
        except json.JSONDecodeError:
            print(f"Failed to parse LLM response as JSON: {response_text[:200]}...")
            return {}

        # Filter out extractions that fail ownership/sanity validation.
        return self._validate_extractions(raw_data)

    except Exception as e:
        print(f"LLM API error: {e}")
        return {}
231
+
232
+ def _validate_extractions(self, raw_data: Dict) -> Dict:
233
+ """Validate and filter out bad extractions"""
234
+ validated_data = {}
235
+
236
+ # Invalid values that should never be extracted
237
+ invalid_relationship_values = {
238
+ 'wife', 'husband', 'nephew', 'niece', 'son', 'daughter', 'brother', 'sister',
239
+ 'mother', 'father', 'aunt', 'uncle', 'cousin', 'friend', 'colleague'
240
+ }
241
+
242
+ # Invalid evidence snippets that indicate bad extraction
243
+ invalid_evidence_patterns = [
244
+ 'thanks guys', 'thank you', 'thanks', 'ok', 'okay', 'yes', 'no',
245
+ 'sure', 'great', 'perfect', 'sounds good', 'alright'
246
+ ]
247
+
248
+ for layer, nodes in raw_data.items():
249
+ validated_nodes = []
250
+
251
+ for node in nodes:
252
+ if not isinstance(node, dict) or 'detail' not in node:
253
+ continue
254
+
255
+ detail = node['detail']
256
+ fact_type = detail.get('type', '')
257
+ fact_value = detail.get('value', '').lower().strip()
258
+ evidence = node.get('evidence', [])
259
+
260
+ # Skip invalid extractions
261
+ skip_node = False
262
+
263
+ # Check for invalid relationship extractions
264
+ if fact_type in ['relationship', 'family_member', 'spouse'] and fact_value in invalid_relationship_values:
265
+ print(f"⚠️ Skipping invalid {fact_type}: '{fact_value}' (no specific name)")
266
+ skip_node = True
267
+
268
+ # Check for invalid evidence snippets
269
+ for ev in evidence:
270
+ snippet = ev.get('message_snippet', '').lower().strip()
271
+ if snippet in invalid_evidence_patterns:
272
+ print(f"⚠️ Skipping extraction from invalid evidence: '{snippet}'")
273
+ skip_node = True
274
+ break
275
+
276
+ # Check for travel plans in Layer 1
277
+ if layer == 'Layer1' and fact_type in ['travel_plan', 'flight_number', 'travel_date']:
278
+ print(f"⚠️ Moving {fact_type} from Layer1 to Layer4 (travel plans don't belong in basic info)")
279
+ # Move to Layer4 instead of rejecting
280
+ if 'Layer4' not in validated_data:
281
+ validated_data['Layer4'] = []
282
+ validated_data['Layer4'].append(node)
283
+ continue
284
+
285
+ # Check confidence threshold
286
+ confidence = node.get('confidence', 0)
287
+ if confidence < 0.75:
288
+ print(f"⚠️ Skipping low confidence ({confidence}) extraction: {fact_type} = {fact_value}")
289
+ skip_node = True
290
+
291
+ if not skip_node:
292
+ validated_nodes.append(node)
293
+
294
+ if validated_nodes:
295
+ validated_data[layer] = validated_nodes
296
+
297
+ return validated_data
298
+
299
async def store_in_db(self, extracted_nodes: Dict, user_id: str):
    """Store extracted nodes in Supabase DB using SupabaseManager.

    Args:
        extracted_nodes: Mapping "LayerN" -> list of validated fact nodes.
        user_id: Identifier (phone) of the user who owns the facts.

    Returns:
        Dict with "newly_stored_nodes" containing only facts actually added
        in this iteration (empty layers filtered out) and a "stats" dict
        with stored/duplicate/low_confidence_rejected counters.
    """
    if not self.db_enabled:
        # DB unavailable: log the extraction and report every non-empty layer
        # as "newly stored" so callers (e.g. the UI) can still display it.
        print(f"\n=== EXTRACTED DATA FOR USER: {user_id} (DB DISABLED) ===")
        for layer, nodes in extracted_nodes.items():
            print(f"\n{layer}:")
            for i, node in enumerate(nodes, 1):
                print(f" {i}. Type: {node['detail']['type']}")
                print(f" Value: {node['detail']['value']}")
                print(f" Confidence: {node['confidence']}")
                print(f" Evidence: {len(node['evidence'])} message(s)")
        print("=" * 50)
        filtered_extracted = {layer: nodes for layer, nodes in extracted_nodes.items() if nodes}
        return {"newly_stored_nodes": filtered_extracted, "stats": {"stored": 0, "duplicate": 0, "low_confidence_rejected": 0}}

    try:
        stored_nodes = 0
        duplicate_nodes = 0
        low_confidence_rejected = 0
        newly_stored_nodes = {}  # Only nodes actually stored in this iteration

        # Minimum confidence threshold for ownership validation.
        CONFIDENCE_THRESHOLD = 0.75

        for layer, nodes in extracted_nodes.items():
            # "Layer3" -> 3 for the DB's integer layer column.
            layer_number = int(layer.replace("Layer", ""))
            newly_stored_nodes[layer] = []

            for node in nodes:
                detail = node["detail"]
                confidence = node["confidence"]
                evidence = node["evidence"]

                # Reject low-confidence extractions before touching the DB.
                if confidence < CONFIDENCE_THRESHOLD:
                    low_confidence_rejected += 1
                    print(f" ⚠️ Rejected low confidence {detail['type']}: {detail['value']} (confidence: {confidence:.2f})")
                    continue

                try:
                    fact_type = detail.get("type", "unknown")
                    fact_value = detail.get("value", "N/A")

                    if fact_value == "N/A":
                        print(f"⚠️ Warning: Missing 'value' field in detail: {detail}")

                    # Human-readable sentence stored alongside the raw value.
                    concluded_fact = self._format_concluded_fact(user_id, fact_type, fact_value)

                    node_id = await supabase_manager.store_memory_node(
                        user_phone=user_id,
                        layer=layer_number,
                        fact_type=fact_type,
                        content=fact_value,
                        concluded_fact=concluded_fact,
                        confidence=confidence,
                        evidence=evidence
                    )

                    if node_id:
                        stored_nodes += 1
                        ownership_reason = node.get('ownership_reason', 'Ownership validated')
                        print(f" ✅ Stored {fact_type}: {fact_value} (confidence: {confidence:.2f}) - {ownership_reason}")
                        # Track for this iteration's return value.
                        newly_stored_nodes[layer].append(node)
                    else:
                        # A falsy node_id means the manager refused the write
                        # (duplicate or failure).
                        duplicate_nodes += 1
                        print(f" ⚠️ Failed to store {fact_type}: {fact_value}")

                except Exception as detail_error:
                    print(f"❌ Error processing detail: {str(detail_error)}")
                    print(f"❌ Detail structure: {detail}")
                    continue  # Skip this detail and continue with the next one

        # Per-call summary.
        print(f"\n📊 Database Summary for {user_id}:")
        print(f" Stored: {stored_nodes} new nodes")
        print(f" Failed: {duplicate_nodes} nodes")
        print(f" Rejected (low confidence): {low_confidence_rejected} nodes")

        # Aggregate user stats from the database. (Removed the original's
        # unused `import asyncio` here — nothing in this block used it.)
        summary = await supabase_manager.get_user_summary(user_id)
        if summary:
            print(f" Total nodes in DB: {summary.get('total_nodes', 0)}")
            print(f" By status: Approved={summary.get('approved_nodes', 0)}, Pending={summary.get('pending_nodes', 0)}, Rejected={summary.get('rejected_nodes', 0)}")
            if 'layers' in summary:
                print(f" By layers: {summary['layers']}")

        # Only return nodes stored in this call; drop empty layers for the UI.
        filtered_newly_stored = {layer: nodes for layer, nodes in newly_stored_nodes.items() if nodes}

        return {
            "newly_stored_nodes": filtered_newly_stored,
            "stats": {
                "stored": stored_nodes,
                "duplicate": duplicate_nodes,
                "low_confidence_rejected": low_confidence_rejected
            }
        }

    except Exception as e:
        print(f"❌ Database error: {e}")
        # Fall back to log-only behavior so the pipeline keeps running.
        print(f"\n=== EXTRACTED DATA FOR USER: {user_id} (DB ERROR FALLBACK) ===")
        for layer, nodes in extracted_nodes.items():
            print(f"\n{layer}:")
            for i, node in enumerate(nodes, 1):
                print(f" {i}. Type: {node['detail']['type']}")
                print(f" Value: {node['detail']['value']}")
                print(f" Confidence: {node['confidence']}")
                print(f" Evidence: {len(node['evidence'])} message(s)")
        print("=" * 50)
        filtered_extracted = {layer: nodes for layer, nodes in extracted_nodes.items() if nodes}
        return {"newly_stored_nodes": filtered_extracted, "stats": {"stored": 0, "duplicate": 0, "low_confidence_rejected": 0}}
420
+
421
+ def _format_concluded_fact(self, user_id: str, fact_type: str, value: str) -> str:
422
+ """Format a concluded fact in human-readable form"""
423
+
424
+ # Common fact type mappings with ownership emphasis
425
+ fact_formatters = {
426
+ 'phone': lambda u, v: f"📱 Phone number of {u} is {v}",
427
+ 'phone_number': lambda u, v: f"📱 Phone number of {u} is {v}",
428
+ 'email': lambda u, v: f"📧 Email address of {u} is {v}",
429
+ 'address': lambda u, v: f"🏠 Home address of {u} is {v}",
430
+ 'dob': lambda u, v: f"🎂 Date of birth of {u} is {v}",
431
+ 'name': lambda u, v: f"👤 Name of {u} is {v}",
432
+ 'age': lambda u, v: f"📅 Age of {u} is {v}",
433
+ 'occupation': lambda u, v: f"💼 Occupation of {u} is {v}",
434
+ 'company': lambda u, v: f"🏢 Company of {u} is {v}",
435
+ 'relationship_status': lambda u, v: f"💑 Relationship status of {u} is {v}",
436
+ 'gender': lambda u, v: f"⚧ Gender of {u} is {v}",
437
+ 'nationality': lambda u, v: f"🌍 Nationality of {u} is {v}",
438
+ 'document_type': lambda u, v: f"📄 {u} shared a document of type {v}",
439
+ 'aadhaar_number': lambda u, v: f"🆔 Aadhaar number of {u} is {v}",
440
+ 'pan_number': lambda u, v: f"📇 PAN number of {u} is {v}",
441
+ 'family_member': lambda u, v: f"👨‍👩‍👧‍👦 Family member of {u}: {v}",
442
+ 'friend': lambda u, v: f"👫 Friend of {u}: {v}",
443
+ 'colleague': lambda u, v: f"🤝 Colleague of {u}: {v}",
444
+ 'contact_name': lambda u, v: f"📞 Contact of {u}: {v}",
445
+ 'relationship_type': lambda u, v: f"💕 {v} of {u}",
446
+ 'food_preference': lambda u, v: f"🍽️ Food preference of {u}: {v}",
447
+ 'restaurant_preference': lambda u, v: f"🏪 Preferred restaurant of {u}: {v}",
448
+ 'allergy': lambda u, v: f"⚠️ {u} is allergic to {v}",
449
+ 'service_provider': lambda u, v: f"🔧 Service provider of {u}: {v}",
450
+ 'vendor_name': lambda u, v: f"🛒 Vendor used by {u}: {v}",
451
+ 'habit': lambda u, v: f"🔄 Personal habit of {u}: {v}",
452
+ 'routine': lambda u, v: f"📋 Routine of {u}: {v}",
453
+ }
454
+
455
+ # Use specific formatter or fallback to generic
456
+ formatter = fact_formatters.get(fact_type, lambda u, v: f"{fact_type.replace('_', ' ').title()} of {u} is {v}")
457
+ return formatter(user_id, value)
458
+
459
def merge_extracted_data(self, all_extracted: List[Dict]) -> Dict:
    """Merge extracted data from multiple chunks, removing duplicates.

    Concatenates each chunk's facts per layer, then fuzzy-deduplicates
    every layer via _deduplicate_facts.
    """
    layer_names = ("Layer1", "Layer2", "Layer3", "Layer4")
    combined = {name: [] for name in layer_names}

    # Gather facts from every chunk, layer by layer.
    for chunk_result in all_extracted:
        for name in layer_names:
            combined[name].extend(chunk_result.get(name, []))

    # Deduplicate each layer independently.
    return {name: self._deduplicate_facts(facts) for name, facts in combined.items()}
473
+
474
def _deduplicate_facts(self, facts: List[Dict]) -> List[Dict]:
    """Remove duplicate facts based on type and value similarity.

    Two facts are duplicates when they share a type and their values are
    > 85% similar (fuzzy ratio); the higher-confidence copy wins.
    """
    if not facts:
        return facts

    unique = []
    for candidate in facts:
        cand_type = candidate["detail"]["type"]
        cand_value = candidate["detail"]["value"].lower().strip()

        # Find the first already-kept fact this candidate duplicates.
        match = None
        for kept in unique:
            if kept["detail"]["type"] != cand_type:
                continue
            kept_value = kept["detail"]["value"].lower().strip()
            if fuzz.ratio(cand_value, kept_value) > 85:
                match = kept
                break

        if match is None:
            unique.append(candidate)
        elif candidate["confidence"] > match["confidence"]:
            # Same fact seen again with higher confidence: replace the kept copy.
            unique.remove(match)
            unique.append(candidate)

    return unique
503
+
504
def call_llm_for_deduplication(self, prompt: str) -> Dict:
    """Call LLM specifically for deduplication tasks.

    Sends *prompt* with a system message instructing Claude to deduplicate
    and sanity-check memory facts, then tries three strategies in order to
    recover a JSON object from the reply (code block, brace matching, whole
    response).

    Returns:
        Parsed dict of deduplicated facts, or {} on any failure.
    """
    try:
        response = self.anthropic_client.messages.create(
            model=self.model_name,
            max_tokens=4000,
            temperature=0.1,  # Low temperature for consistency
            messages=[
                {"role": "user", "content": prompt}
            ],
            system="""You are a memory system deduplication and validation expert. Your job is to return a valid JSON with deduplicated facts AND validate that all facts make sense.

DEDUPLICATION RULES:
- REMOVE ANY DUPLICATE VALUES THAT ARE PRESENT, OR ANYTHING THAT LOOKS REPEATED.
- IF THERE ARE 2 SAME KEYS, WITH DIFFERENT VALUES, MAKE SURE YOU ONLY ALLOW ONE INSTANCE WITH THE HIGHEST CONFIDENCE SCORE.
For example, if the same phone number/address is present multiple times, keep only one instance with the highest confidence.

VALIDATION & SENSE-CHECKING RULES:
- REMOVE any facts that don't make logical sense (e.g., "age": "blue", "phone": "happy", "name": "12345")
- REMOVE any facts with impossible values (e.g., age over 150, phone numbers with letters, invalid email formats)
- REMOVE any facts that are clearly nonsensical or corrupted data
- REMOVE any facts that contain only special characters, random symbols, or gibberish
- REMOVE any facts where the value doesn't match the fact type (e.g., an address in a phone number field)
- REMOVE any facts with extremely low confidence scores (below 0.3) that also seem questionable
- KEEP facts that make sense even if they seem unusual (e.g., uncommon names, foreign addresses)

FORMATTING RULES:
1. Return ONLY the JSON data with NO explanatory text, NO markdown formatting, and NO code blocks
2. Your entire response must be parseable as JSON
3. Maintain the exact same structure as the input JSON
4. CRITICAL: Preserve ALL evidence snippets exactly as given in the input. DO NOT change snippets to "No snippet available" or any placeholder text.
5. Never add new facts or modify existing valid data beyond merging duplicates and removing nonsensical entries
6. If duplicates have different confidence scores, keep the one with higher confidence
7. If duplicates have the same confidence, merge their evidence lists completely with no loss of information"""
        )

        # Guard against an empty API reply before indexing into content.
        if not response or not response.content:
            print("⚠️ Empty response from LLM")
            return {}

        content = response.content[0].text
        print(f"Raw LLM response (first 100 chars): {content[:100]}...")

        # Try multiple methods to extract JSON

        # Method 1: Try to find JSON in code blocks
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', content)
        if json_match:
            json_str = json_match.group(1)
            print("Found JSON in code block")
        else:
            # Method 2: Find first { and last }
            first_brace = content.find('{')
            last_brace = content.rfind('}')

            if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
                json_str = content[first_brace:last_brace+1]
                print(f"Extracted JSON using brace matching: chars {first_brace} to {last_brace}")
            else:
                # Method 3: Use the whole response
                json_str = content
                print("Using full response content for JSON parsing")

        # Clean up the JSON string
        json_str = re.sub(r'(?m)^//.*$', '', json_str)  # Remove comment lines
        json_str = json_str.strip()

        try:
            print(f"Attempting to parse JSON (length: {len(json_str)})")
            result = json.loads(json_str)
            print("✅ Successfully parsed JSON")
            return result
        except json.JSONDecodeError as e:
            print(f"⚠️ Failed to decode JSON from LLM response: {e}")
            print(f"JSON string start: {json_str[:200]}...")
            print(f"JSON string end: ...{json_str[-200:]}")

            # Fallback to raw extraction
            try:
                # Attempt to rescue the response using regex
                layer_pattern = r'"(Layer\d+)":\s*\[([\s\S]*?)\]'
                layers = re.findall(layer_pattern, json_str)

                if layers:
                    print("🛠️ Attempting fallback regex extraction")
                    result = {}
                    for layer_name, layer_content in layers:
                        # NOTE(review): layer_content is discarded here — the
                        # rescued result keeps the layer keys but all lists
                        # come back empty. Confirm this lossy fallback is
                        # intentional.
                        result[layer_name] = []
                    return result
            except Exception as regex_error:
                print(f"Regex fallback failed: {regex_error}")

            return {}

    except Exception as e:
        print(f"⚠️ Error calling LLM for deduplication: {e}")
        return {}
602
+
603
+ async def deduplicate_with_llm(self, extracted_data: Dict, user_id: str) -> Dict:
604
+ """Use LLM to deduplicate facts and merge evidence when appropriate
605
+
606
+ This takes the raw extraction results and has the LLM analyze them to:
607
+ 1. Remove exact duplicates, keeping the highest confidence version
608
+ 2. Merge duplicates with same confidence by combining their evidence
609
+ 3. Maintain the 4-layer memory structure
610
+ """
611
+ if not extracted_data:
612
+ print(f"No data to deduplicate for user {user_id}")
613
+ return {}
614
+
615
+ # Convert extracted data to a format suitable for LLM processing
616
+ facts_by_layer = {}
617
+ for layer, nodes in extracted_data.items():
618
+ facts_by_layer[layer] = []
619
+ for node in nodes:
620
+ # Create a simplified representation for LLM
621
+ evidence_snippets = []
622
+ for ev in node['evidence']:
623
+ evidence_snippets.append({
624
+ 'message_id': ev.get('message_id', 'unknown'),
625
+ 'snippet': ev.get('message_snippet', ev.get('snippet', '')) # Handle both formats
626
+ })
627
+
628
+ fact = {
629
+ 'type': node['detail']['type'],
630
+ 'value': node['detail']['value'],
631
+ 'confidence': node['confidence'],
632
+ 'evidence': evidence_snippets
633
+ }
634
+ facts_by_layer[layer].append(fact)
635
+
636
+ # Create prompt for LLM deduplication and validation
637
+ prompt = f"""
638
+ Review, validate, and deduplicate the following memory facts for user {user_id}.
639
+
640
+ DEDUPLICATION INSTRUCTIONS:
641
+ 1. PHONE NUMBER DEDUPLICATION: If there are multiple phone numbers for the same person, keep ONLY the one with the highest confidence score. Different formats of the same number (+91 prefix vs without) should be considered duplicates.
642
+ 2. ADDRESS DEDUPLICATION: If there are multiple addresses for the same person, keep ONLY the one with the highest confidence score. Similar addresses should be considered duplicates.
643
+ 3. EMAIL DEDUPLICATION: If there are multiple emails, keep all distinct email addresses as people often have multiple emails.
644
+ 4. RELATIONSHIP DEDUPLICATION: Remove any relationship entries that are just generic terms like "wife", "husband", "nephew" without specific names. Only keep relationships with actual names.
645
+ 5. GENERAL RULE: For all other fact types, remove duplicates keeping only the highest confidence version for each unique fact.
646
+
647
+ VALIDATION & SENSE-CHECKING INSTRUCTIONS:
648
+ 6. DATA TYPE VALIDATION: Remove facts where the value doesn't match the expected data type:
649
+ - Names should be text, not numbers or symbols
650
+ - Phone numbers should contain only digits, spaces, +, -, (, )
651
+ - Ages should be reasonable numbers (0-150)
652
+ - Emails should have valid email format
653
+ - Addresses should be coherent location text
654
+ 7. LOGICAL SENSE CHECK: Remove facts that are clearly nonsensical:
655
+ - Random character combinations (e.g., "asdfgh", "123!@#")
656
+ - Values that are clearly system errors or corrupted data
657
+ - Facts with impossible combinations (e.g., age: "red", name: "999999")
658
+ 8. CONFIDENCE FILTERING: Remove facts with very low confidence (<0.3) that also seem questionable or corrupted
659
+ 9. PRESERVE UNUSUAL BUT VALID: Keep facts that seem unusual but are logically valid (foreign names, uncommon addresses, etc.)
660
+
661
+ STRUCTURAL RULES:
662
+ 10. If duplicates have identical confidence scores, merge their evidence lists into one comprehensive fact.
663
+ 11. Maintain the 4-layer structure (Layer1: Basic Personal Info, Layer2: Documents, Layer3: Relationships, Layer4: Preferences).
664
+ 12. Do not add, remove, or modify any actual data values that are valid - simply reorganize, deduplicate, and remove nonsensical entries.
665
+ 13. CRITICAL: Preserve ALL evidence snippets exactly as given in the input. DO NOT replace snippets with "No snippet available" or any placeholder text.
666
+ 14. Return ONLY a valid JSON with no additional text or explanation.
667
+
668
+ IMPORTANT: Your response must be ONLY the cleaned and deduplicated JSON data with nothing else. Do not include any explanations, markdown formatting, or text outside the JSON. Return the JSON directly with all evidence snippets preserved exactly as provided.
669
+
670
+ When processing evidence, maintain the exact field names as provided in input (message_id and either 'snippet' or 'message_snippet').
671
+
672
+ RELATIONSHIP FILTERING RULES:
673
+ - REMOVE: "relationship": "wife" (no name specified)
674
+ - REMOVE: "family_member": "nephew" (no name specified)
675
+ - KEEP: "spouse": "Sarah" (specific name provided)
676
+ - KEEP: "family_member": "Mike Johnson" (specific name provided)
677
+
678
+ PHONE NUMBER DEDUPLICATION EXAMPLE:
679
+ These are 3 different phone numbers for the same person. Keep ONLY the highest confidence one: "9870781578" (confidence: 0.95)
680
+
681
+ Input facts: {json.dumps(facts_by_layer, indent=2)}
682
+ """
683
+
684
+ try:
685
+ # Call LLM for deduplication
686
+ deduplicated_data = self.call_llm_for_deduplication(prompt)
687
+
688
+ # Verify we got a valid response
689
+ if not deduplicated_data:
690
+ print("⚠️ No valid data from LLM deduplication. Applying fallback deduplication.")
691
+ return self._apply_fallback_deduplication(extracted_data, user_id)
692
+
693
+ # Verify that we have all the expected layers
694
+ missing_layers = set(extracted_data.keys()) - set(deduplicated_data.keys())
695
+ if missing_layers:
696
+ print(f"⚠️ LLM response is missing layers: {missing_layers}. Applying fallback deduplication.")
697
+ return self._apply_fallback_deduplication(extracted_data, user_id)
698
+
699
+ print(f"\n🔍 LLM deduplication complete for {user_id}")
700
+
701
+ # Count before/after
702
+ before_count = sum(len(nodes) for nodes in extracted_data.values())
703
+ after_count = sum(len(nodes) for nodes in deduplicated_data.values())
704
+
705
+ # Safety check - if LLM removed all facts, something went wrong
706
+ if after_count == 0 and before_count > 0:
707
+ print("⚠️ LLM deduplication removed ALL facts! Applying fallback deduplication.")
708
+ return self._apply_fallback_deduplication(extracted_data, user_id)
709
+
710
+ print(f" Facts before: {before_count}, after: {after_count}, removed: {before_count - after_count}")
711
+
712
+ # Convert back to the original format structure
713
+ result = {}
714
+ for layer, facts in deduplicated_data.items():
715
+ result[layer] = []
716
+ for fact in facts:
717
+ # Handle potential missing keys with defaults
718
+ if 'type' not in fact or 'value' not in fact:
719
+ print(f"⚠️ Skipping malformed fact (missing type/value): {fact}")
720
+ continue
721
+
722
+ # Reconstruct evidence in the expected format - strictly preserve original snippets
723
+ evidence = []
724
+ if 'evidence' in fact and isinstance(fact['evidence'], list):
725
+ for ev in fact['evidence']:
726
+ message_id = ev.get('message_id', 'unknown')
727
+ # Handle both 'snippet' and 'message_snippet' from LLM responses
728
+ snippet = ev.get('snippet', ev.get('message_snippet', ''))
729
+
730
+ # Ensure snippet is preserved exactly as in the original
731
+ if not snippet or snippet == "No snippet available":
732
+ print(f"⚠️ Warning: Missing or placeholder snippet for message {message_id}")
733
+
734
+ evidence.append({
735
+ 'message_id': message_id,
736
+ 'snippet': snippet
737
+ })
738
+ else:
739
+ print(f"⚠️ Warning: Missing evidence list in fact: {fact['type']}: {fact['value']}")
740
+ # Create minimal evidence to avoid UI errors
741
+ evidence = [{'message_id': 'unknown', 'snippet': 'Evidence missing during deduplication'}]
742
+
743
+ # Reconstruct node in the expected format
744
+ node = {
745
+ 'detail': {
746
+ 'type': fact['type'],
747
+ 'value': fact['value']
748
+ },
749
+ 'confidence': fact.get('confidence', 0.8), # Default if missing
750
+ 'evidence': evidence
751
+ }
752
+ result[layer].append(node)
753
+
754
+ return result
755
+ except Exception as e:
756
+ print(f"⚠️ Error during LLM deduplication: {e}")
757
+ print("Applying fallback deduplication to ensure duplicates are removed")
758
+ return self._apply_fallback_deduplication(extracted_data, user_id)
759
+
760
+ def _apply_fallback_deduplication(self, extracted_data: Dict, user_id: str) -> Dict:
761
+ """Apply rule-based deduplication when LLM deduplication fails
762
+
763
+ This ensures that deduplication ALWAYS happens, even if LLM fails.
764
+ """
765
+ print(f"\n🔧 Applying fallback deduplication for {user_id}")
766
+
767
+ result = {}
768
+ total_removed = 0
769
+
770
+ for layer, nodes in extracted_data.items():
771
+ if not nodes:
772
+ result[layer] = []
773
+ continue
774
+
775
+ # Apply rule-based deduplication with special handling for phone numbers
776
+ deduplicated_nodes = self._deduplicate_facts_enhanced(nodes, layer)
777
+ result[layer] = deduplicated_nodes
778
+
779
+ removed_count = len(nodes) - len(deduplicated_nodes)
780
+ total_removed += removed_count
781
+
782
+ if removed_count > 0:
783
+ print(f" {layer}: {len(nodes)} → {len(deduplicated_nodes)} facts (removed {removed_count} duplicates)")
784
+
785
+ before_count = sum(len(nodes) for nodes in extracted_data.values())
786
+ after_count = sum(len(nodes) for nodes in result.values())
787
+
788
+ print(f" Facts before: {before_count}, after: {after_count}, removed: {total_removed}")
789
+ print(f"✅ Fallback deduplication complete for {user_id}")
790
+
791
+ return result
792
+
793
+ def _deduplicate_facts_enhanced(self, facts: List[Dict], layer: str) -> List[Dict]:
794
+ """Enhanced rule-based deduplication with special handling for different fact types"""
795
+ if not facts:
796
+ return []
797
+
798
+ deduped = []
799
+
800
+ # Group facts by type for specialized deduplication
801
+ facts_by_type = {}
802
+ for fact in facts:
803
+ fact_type = fact['detail']['type']
804
+ if fact_type not in facts_by_type:
805
+ facts_by_type[fact_type] = []
806
+ facts_by_type[fact_type].append(fact)
807
+
808
+ # Apply type-specific deduplication rules
809
+ for fact_type, type_facts in facts_by_type.items():
810
+ if fact_type == 'phone_number':
811
+ # For phone numbers: keep only the highest confidence one
812
+ deduplicated = self._deduplicate_phone_numbers(type_facts)
813
+ elif fact_type in ['address', 'email']:
814
+ # For addresses and emails: more sophisticated deduplication
815
+ deduplicated = self._deduplicate_contact_info(type_facts, fact_type)
816
+ else:
817
+ # For other types: use standard fuzzy matching
818
+ deduplicated = self._deduplicate_facts(type_facts)
819
+
820
+ deduped.extend(deduplicated)
821
+
822
+ return deduped
823
+
824
+ def _deduplicate_phone_numbers(self, phone_facts: List[Dict]) -> List[Dict]:
825
+ """Keep only the highest confidence phone number"""
826
+ if not phone_facts:
827
+ return []
828
+
829
+ # Sort by confidence (highest first)
830
+ sorted_phones = sorted(phone_facts, key=lambda x: x['confidence'], reverse=True)
831
+
832
+ # Keep only the highest confidence phone number
833
+ best_phone = sorted_phones[0]
834
+
835
+ print(f" Phone deduplication: {len(phone_facts)} → 1 (kept highest confidence: {best_phone['detail']['value']})")
836
+
837
+ return [best_phone]
838
+
839
+ def _deduplicate_contact_info(self, contact_facts: List[Dict], fact_type: str) -> List[Dict]:
840
+ """Deduplicate addresses/emails with fuzzy matching"""
841
+ if not contact_facts:
842
+ return []
843
+
844
+ deduped = []
845
+
846
+ for fact in contact_facts:
847
+ fact_value = fact['detail']['value'].lower().strip()
848
+ is_duplicate = False
849
+
850
+ for existing in deduped:
851
+ existing_value = existing['detail']['value'].lower().strip()
852
+
853
+ # For emails: exact match after normalization
854
+ if fact_type == 'email':
855
+ if fact_value == existing_value:
856
+ is_duplicate = True
857
+ break
858
+
859
+ # For addresses: fuzzy matching
860
+ elif fact_type == 'address':
861
+ if (fact_value == existing_value or
862
+ fuzz.ratio(fact_value, existing_value) > 85):
863
+ is_duplicate = True
864
+ # Keep the one with higher confidence
865
+ if fact["confidence"] > existing["confidence"]:
866
+ deduped.remove(existing)
867
+ deduped.append(fact)
868
+ break
869
+
870
+ if not is_duplicate:
871
+ deduped.append(fact)
872
+
873
+ if len(contact_facts) != len(deduped):
874
+ print(f" {fact_type} deduplication: {len(contact_facts)} → {len(deduped)}")
875
+
876
+ return deduped
877
+
878
    async def process_json(self, input_json: List[Dict], user_id: str, force_reprocess: bool = False) -> Dict:
        """Run the full extraction pipeline for one user's conversation export.

        Flattens conversations into an ordered message list, chunks them,
        extracts facts per chunk via the LLM, merges and deduplicates the
        results, persists them, and returns only the facts newly stored in
        this run.

        Args:
            input_json: List of conversation dicts, each with optional
                ``user_queries`` and ``team_replies`` lists.
            user_id: Identifier used for logging, DB storage, and the
                processed-file bookkeeping.
            force_reprocess: When True, skip the "already processed" check.

        Returns:
            Mapping of layer -> newly stored fact nodes, a skip marker dict
            when the file was already processed, or ``{}`` if nothing was
            extracted.

        Raises:
            Re-raises any exception from deduplication, storage, or the
            processed-file marking steps.
        """

        # Check if file has already been processed (unless force reprocess)
        if not force_reprocess and self.db_enabled:
            already_processed = await supabase_manager.is_file_processed(user_id)
            if already_processed:
                print(f"⏭️ {user_id} already processed. Skipping. Use force_reprocess=True to reprocess.")
                return {"message": "Already processed", "skipped": True}

        # Flatten messages, preserving order and adding metadata.
        # Each message is copied before mutation so the caller's input
        # structure is left untouched.
        all_messages = []
        for conv_idx, conv in enumerate(input_json):
            for query in conv.get("user_queries", []):
                query = query.copy()
                query["sender"] = "User"
                query["conversation_id"] = conv_idx
                # Preserve original message_id, use as id for processing
                query["id"] = query.get("message_id", f"unknown_user_{conv_idx}")
                all_messages.append(query)
            for reply in conv.get("team_replies", []):
                reply = reply.copy()
                reply["sender"] = "Team"
                reply["conversation_id"] = conv_idx
                # Preserve original message_id, use as id for processing
                reply["id"] = reply.get("message_id", f"unknown_team_{conv_idx}")
                all_messages.append(reply)

        print(f"Processing {len(all_messages)} messages for user {user_id}")

        # Create overlapping chunks sized by the extractor's configuration.
        chunks = self.chunk_messages(all_messages)
        print(f"Split into {len(chunks)} chunks (chunk_size={self.chunk_size}, overlap={self.overlap_size})")

        # Process each chunk independently through the LLM extractor.
        all_extracted_data = []
        for i, chunk in enumerate(chunks):
            print(f"\nProcessing chunk {i+1}/{len(chunks)} ({len(chunk)} messages)")

            prompt = self.prepare_comprehensive_llm_prompt(chunk, user_id)
            extracted_data = self.call_llm_for_extraction(prompt)

            if extracted_data:
                # Count nodes in this chunk (guard against non-list values
                # in a malformed LLM response).
                total_nodes = sum(len(nodes) for nodes in extracted_data.values() if isinstance(nodes, list))
                print(f" Extracted {total_nodes} nodes from chunk {i+1}")
                all_extracted_data.append(extracted_data)
            else:
                print(f" No data extracted from chunk {i+1}")

        # Merge all extracted data and remove duplicates
        if all_extracted_data:
            merged_data = self.merge_extracted_data(all_extracted_data)

            # Print summary
            total_nodes = sum(len(nodes) for nodes in merged_data.values())
            print(f"\nMerged Results for user {user_id}: {total_nodes} total nodes")
            for layer, nodes in merged_data.items():
                if nodes:
                    print(f" {layer}: {len(nodes)} nodes")

            # Apply LLM deduplication to get a refined version with no duplicates.
            # deduplicate_with_llm falls back internally to rule-based
            # deduplication; failures here are re-raised to the caller.
            print("\n🧹 Applying LLM deduplication to remove duplicates and merge evidence...")
            try:
                deduplicated_data = await self.deduplicate_with_llm(merged_data, user_id)
                print("✅ LLM deduplication completed successfully")
            except Exception as dedup_error:
                print(f"❌ Error during LLM deduplication: {str(dedup_error)}")
                raise dedup_error

            # Print deduplication summary
            dedup_total_nodes = sum(len(nodes) for nodes in deduplicated_data.values())
            print(f"\nDeduplicated Results for user {user_id}: {dedup_total_nodes} total nodes")
            for layer, nodes in deduplicated_data.items():
                if nodes:
                    print(f" {layer}: {len(nodes)} nodes")

            # Store deduplicated data in database
            print(f"\n💾 Storing {dedup_total_nodes} deduplicated nodes in database...")
            try:
                storage_result = await self.store_in_db(deduplicated_data, user_id)
                print("✅ Database storage completed successfully")
            except Exception as storage_error:
                print(f"❌ Error during database storage: {str(storage_error)}")
                raise storage_error

            # Mark file as processed so later runs can skip it
            # (see the is_file_processed check above).
            if self.db_enabled:
                print(f"\n📝 Marking file as processed for user {user_id}...")
                try:
                    await supabase_manager.mark_file_processed(user_id, dedup_total_nodes)
                    print("✅ File marked as processed successfully")
                except Exception as mark_error:
                    print(f"❌ Error marking file as processed: {str(mark_error)}")
                    raise mark_error

            # Return only the newly stored facts from this iteration.
            # NOTE(review): assumes store_in_db returns a dict containing
            # "newly_stored_nodes" — confirm against its implementation.
            newly_stored_facts = storage_result.get("newly_stored_nodes", {})
            print(f"\n🎯 Returning {sum(len(nodes) for nodes in newly_stored_facts.values())} newly stored facts for this iteration")

            return newly_stored_facts
        else:
            print(f"No data extracted for user {user_id}")
            return {}
982
+
983
    async def reprocess_rejected_fact(self, user_id: str, fact_type: str, layer: str,
                                      excluded_message_ids: List[str], original_node_id: str) -> Dict:
        """Re-extract a rejected fact type from messages not used the first time.

        Reloads the user's raw JSON export, removes the messages that backed
        the rejected extraction, and re-runs a focused LLM extraction over the
        remaining messages. Successful results are deduplicated and stored
        linked to the rejected node.

        Args:
            user_id: User whose ``input_jsons/<user_id>.json`` file is reloaded.
            fact_type: The specific fact type to search for (e.g. "address").
            layer: Layer identifier; both "layer_N"/"LayerN" style names are
                normalized to the "LayerN" key used in extraction output.
            excluded_message_ids: Message ids that supported the rejected
                extraction; these messages are filtered out before searching.
            original_node_id: Node id of the rejected fact, used to link any
                newly stored replacements.

        Returns:
            Mapping of the layer key to the (possibly empty) list of
            re-extracted nodes; ``{}`` if the input file is missing.
        """

        # Load the user's JSON file
        input_path = os.path.join("input_jsons", f"{user_id}.json")
        if not os.path.exists(input_path):
            print(f"Warning: {input_path} not found for reprocessing.")
            return {}

        with open(input_path, 'r', encoding='utf-8') as f:
            input_data = json.load(f)

        # Flatten messages, preserving order (copies avoid mutating input_data)
        all_messages = []
        for conv in input_data:
            for query in conv.get("user_queries", []):
                query = query.copy()
                query["sender"] = "User"
                # Preserve original message_id, use as id for processing
                query["id"] = query.get("message_id", "unknown_user")
                all_messages.append(query)
            for reply in conv.get("team_replies", []):
                reply = reply.copy()
                reply["sender"] = "Team"
                # Preserve original message_id, use as id for processing
                reply["id"] = reply.get("message_id", "unknown_team")
                all_messages.append(reply)

        # Filter out messages that were used in the original (rejected) extraction
        filtered_messages = [
            msg for msg in all_messages
            if msg.get("id") not in excluded_message_ids
        ]

        print(f"Reprocessing {fact_type} for {user_id}")
        print(f"Original context had {len(excluded_message_ids)} messages, searching in {len(filtered_messages)} remaining messages")

        # Create chunks from filtered messages
        chunks = self.chunk_messages(filtered_messages)

        # Normalize the layer identifier to the "LayerN" key format used by
        # the extraction output (handles "layer_3", "layer3", "Layer3", ...).
        layer_number = layer.split("_")[-1] if "_" in layer else layer.replace("layer", "").replace("Layer", "")
        layer_key = f"Layer{layer_number}"
        extracted_data = {layer_key: []}

        print(f"Processing {len(chunks)} chunks for reprocessing")

        for i, chunk in enumerate(chunks):
            print(f" Reprocessing chunk {i+1}: {len(chunk)} messages")

            # Create a specialized, higher-precision prompt for reprocessing
            prompt = self.prepare_reprocessing_prompt(chunk, layer_key, fact_type)
            llm_output = self.call_llm_for_extraction(prompt)
            nodes = llm_output.get(layer_key, [])

            if nodes:
                print(f" Extracted {len(nodes)} nodes on reprocessing")
                extracted_data[layer_key].extend(nodes)
            else:
                print(f" No nodes extracted on reprocessing")

        # Deduplicate the reprocessed results and persist them linked
        # back to the original rejected node.
        if extracted_data[layer_key]:
            extracted_data[layer_key] = self._deduplicate_facts(extracted_data[layer_key])
            await self.store_reprocessed_nodes(extracted_data, user_id, original_node_id)
        else:
            print(f"Reprocessing complete: No {fact_type} found in alternative contexts")

        return extracted_data
1052
+
1053
    def prepare_reprocessing_prompt(self, chunk: List[Dict[str, str]], layer: str, fact_type: str) -> str:
        """Build a focused LLM prompt for re-extracting a single rejected fact type.

        Unlike the general extraction prompt, this one targets exactly one
        ``fact_type`` within one layer and instructs the model to be stricter
        (confidence 0.8+ or return nothing).

        Args:
            chunk: Messages to include as context; deduplicated before use.
            layer: Layer key in "LayerN" form; "Layer" is stripped to look up
                the human-readable layer description.
            fact_type: The single fact type to search for.

        Returns:
            The complete prompt string to send to the LLM.
        """
        deduped_chunk = self.deduplicate_messages(chunk)
        # One line per message: "[id] Sender: text"; enumerate index is the
        # fallback id when a message carries none.
        context_text = "\n".join([
            f"[{msg.get('id', i)}] {msg.get('sender', 'Unknown')}: {msg.get('message', '')}"
            for i, msg in enumerate(deduped_chunk)
        ])
        layer_number = layer.replace("Layer", "")

        # Human-readable layer summaries embedded in the prompt.
        # NOTE(review): layer_descriptions[layer_number] raises KeyError for a
        # layer outside "1"-"4" — callers are assumed to pass valid layers.
        layer_descriptions = {
            "1": "Basic Personal Information (name, age, address, phone, email, DOB, nationality, gender, blood group, relationship status)",
            "2": "Information from Documents Shared (Aadhaar card, PAN card, driving license, voter ID, birth certificate, insurance, rent agreement, utility bills)",
            "3": "Loved Ones & Relations (family members, friends, colleagues, roommates, partners with their names, relationships, and contact details)",
            "4": "Preferences, Vendors, Standing Instructions (food preferences, favorite restaurants, service providers, vendors, habits, routines, standing orders)"
        }

        # Doubled braces ({{ }}) below are literal braces in the JSON example.
        prompt = f"""
You are reprocessing a REJECTED extraction for a personal assistant memory system. A previous extraction of "{fact_type}" was rejected by reviewers.

LAYER {layer_number}: {layer_descriptions[layer_number]}

SPECIFIC TASK: Find "{fact_type}" information in this conversation chunk with extra care and precision.

CONVERSATION CHUNK:
{context_text}

INSTRUCTIONS:
1. Focus ONLY on finding clear, unambiguous "{fact_type}" information
2. This is a REPROCESSING attempt - be more careful and precise than usual
3. Look for casual conversational patterns, not just formal statements
4. Only extract if you find very clear evidence with high confidence (0.8+)
5. If not found clearly, return empty results
6. Reference specific message IDs as evidence

RESPONSE FORMAT (JSON only):
{{
"{layer}": [
{{
"detail": {{"type": "{fact_type}", "value": "extracted_value"}},
"confidence": 0.85,
"evidence": [
{{"message_id": "id", "message_snippet": "relevant_part_of_message"}}
],
"timestamp": "2025-09-13 00:00:00"
}}
]
}}

Only return JSON. If no clear "{fact_type}" information is found, return empty array.
"""
        return prompt
1104
+
1105
    async def store_reprocessed_nodes(self, extracted_nodes: Dict, user_id: str, original_node_id: str):
        """Persist reprocessed fact nodes, linking each to the rejected original.

        When the database is disabled, the nodes are printed instead of
        stored. On success, the original node is marked as reprocessed.
        Per-node failures are logged and skipped; database-level failures are
        logged and swallowed (best-effort storage).

        Args:
            extracted_nodes: Mapping of "LayerN" -> list of fact nodes.
            user_id: Stored as the user's phone identifier.
            original_node_id: Rejected node id recorded as the parent of each
                newly stored node.
        """
        if not self.db_enabled:
            # Dry-run mode: dump the would-be stored nodes to stdout.
            print(f"\n=== REPROCESSED DATA FOR USER: {user_id} (DB DISABLED) ===")
            for layer, nodes in extracted_nodes.items():
                print(f"\n{layer}:")
                for i, node in enumerate(nodes, 1):
                    print(f" {i}. Type: {node['detail']['type']}")
                    print(f" Value: {node['detail']['value']}")
                    print(f" Confidence: {node['confidence']}")
                    print(f" Evidence: {len(node['evidence'])} message(s)")
                    print(f" Original Node ID: {original_node_id}")
            print("=" * 50)
            return

        try:
            stored_nodes = 0

            for layer, nodes in extracted_nodes.items():
                # "LayerN" -> N; assumes keys follow the LayerN convention.
                layer_number = int(layer.replace("Layer", ""))

                for node in nodes:
                    detail = node["detail"]
                    confidence = node["confidence"]
                    evidence = node["evidence"]

                    # Safely extract required fields; a malformed node is
                    # logged and skipped rather than aborting the batch.
                    try:
                        fact_type = detail.get("type", "unknown")
                        fact_value = detail.get("value", "N/A")

                        if fact_value == "N/A":
                            print(f"⚠️ Warning: Missing 'value' field in detail: {detail}")

                        # Format concluded fact for human readability
                        concluded_fact = self._format_concluded_fact(user_id, fact_type, fact_value)

                        # Store reprocessed memory node with link to original
                        node_id = await supabase_manager.store_memory_node(
                            user_phone=user_id,
                            layer=layer_number,
                            fact_type=fact_type,
                            content=fact_value,
                            concluded_fact=concluded_fact,
                            confidence=confidence,
                            evidence=evidence,
                            extraction_method='reprocess',
                            parent_update_id=original_node_id
                        )

                        if node_id:
                            stored_nodes += 1
                            print(f" ✅ Stored reprocessed {fact_type}: {fact_value} (Node ID: {node_id})")
                        else:
                            print(f" ⚠️ Failed to store reprocessed {fact_type}: {fact_value}")

                    except Exception as detail_error:
                        print(f"❌ Error processing detail: {str(detail_error)}")
                        print(f"❌ Detail structure: {detail}")
                        continue  # Skip this detail and continue with next one

            # Mark original node as reprocessed only if something was stored.
            if stored_nodes > 0:
                await supabase_manager.mark_reprocessing_complete(original_node_id)
                print(f"\n📊 Reprocessing Summary for {user_id}:")
                print(f" Stored: {stored_nodes} reprocessed nodes")
                print(f" Linked to original node: {original_node_id}")

        except Exception as e:
            # Best-effort: database failures are reported but not re-raised.
            print(f"❌ Database error during reprocessing: {e}")
1175
+
1176
+ def main(self):
1177
+ """Standalone batch processing for all JSON files"""
1178
+ input_folder = "input_jsons"
1179
+ input_files = [
1180
+ "RahulSingh.json",
1181
+ "MonarkMoolchandani.json",
1182
+ "ManuJain.json",
1183
+ "Gaurav-Sherlocksai.json",
1184
+ "Anurag.json",
1185
+ "AdityaShetty.json"
1186
+ ]
1187
+
1188
+ for filename in input_files:
1189
+ input_path = os.path.join(input_folder, filename)
1190
+ if not os.path.exists(input_path):
1191
+ print(f"Warning: {input_path} not found, skipping.")
1192
+ continue
1193
+
1194
+ print(f"\n{'='*50}")
1195
+ print(f"Processing {input_path}")
1196
+ print(f"{'='*50}")
1197
+
1198
+ with open(input_path, 'r', encoding='utf-8') as f:
1199
+ input_data = json.load(f)
1200
+
1201
+ user_id = filename.split(".")[0]
1202
+
1203
+ import asyncio
1204
+ asyncio.run(self.process_json(input_data, user_id))
1205
+ print(f"Finished processing {input_path}")
1206
+
1207
# Script entry point: batch-process every configured input file.
if __name__ == "__main__":
    extractor = JSONContextExtractor(chunk_size=100, overlap_size=20)  # Configurable chunk sizes
    extractor.main()
+ extractor.main()