shaliz-kong committed on
Commit
ee959f2
Β·
1 Parent(s): 0ddcd64

created schema fallback

Browse files
app/schemas/org_schema.py CHANGED
@@ -2,10 +2,11 @@
2
  from typing import Dict, Optional, List, Tuple
3
  import json
4
  import logging
 
5
  from app.core.event_hub import event_hub
6
  from app.service.llm_service import LocalLLMService
7
- from app.service.vector_service import VectorService # Your existing vector service
8
- import duckdb
9
 
10
  logger = logging.getLogger(__name__)
11
 
@@ -31,21 +32,22 @@ class OrgSchema:
31
  "trans_date", "sale_time", "order_date"],
32
  }
33
 
34
- def __init__(self, org_id: str):
35
  self.org_id = org_id
36
- self.cache_key = f"schema:{org_id}:ai:v3"
 
37
  self.stats_key = f"schema:stats:{org_id}"
38
  self.llm = LocalLLMService()
39
- self.vector = VectorService(org_id)
40
 
41
  def get_mapping(self) -> Dict[str, str]:
42
  """Autonomous mapping with AI fallback for unmatched columns"""
43
  try:
44
  if cached := event_hub.get_key(self.cache_key):
45
- logger.info(f"[Schema] Cache hit for org {self.org_id}")
46
  return json.loads(cached)
47
 
48
- logger.info(f"[Schema] Starting AI discovery for org {self.org_id}")
49
  mapping = self._discover_schema()
50
  self.save_mapping(mapping)
51
  return mapping
@@ -56,15 +58,19 @@ class OrgSchema:
56
 
57
  def _discover_schema(self) -> Dict[str, str]:
58
  """Three-tier discovery: Rule-based β†’ Vector similarity β†’ LLM reasoning"""
59
- conn = duckdb.connect("md:?motherduck_token=")
60
 
61
- # Get column metadata
62
  columns_info = conn.execute(f"""
63
  SELECT column_name, data_type, is_nullable
64
  FROM information_schema.columns
65
- WHERE table_name = 'transactions_{self.org_id}'
 
66
  """).fetchall()
67
 
 
 
 
68
  columns = {row[0]: row[1] for row in columns_info}
69
  mapping = {}
70
 
@@ -84,6 +90,7 @@ class OrgSchema:
84
  mapping[semantic] = match
85
  continue
86
 
 
87
  return mapping
88
 
89
  def _exact_match(self, semantic: str, columns: Dict[str, str]) -> Optional[str]:
@@ -91,17 +98,16 @@ class OrgSchema:
91
  patterns = self.PATTERN_VECTORS.get(semantic, [])
92
  for col in columns.keys():
93
  if any(pattern in col.lower().replace("_", "") for pattern in patterns):
 
94
  return col
95
  return None
96
 
97
  def _vector_match(self, semantic: str, column_names: List[str]) -> Optional[str]:
98
  """Semantic similarity via embeddings"""
99
  try:
100
- # Embed semantic field and candidate columns
101
  semantic_emb = self.vector.embed(semantic)
102
  column_embs = [self.vector.embed(name) for name in column_names]
103
 
104
- # Find best match above threshold
105
  best_match, score = self.vector.find_best_match(semantic_emb, column_embs, column_names)
106
 
107
  if score > 0.85: # High confidence threshold
@@ -115,23 +121,54 @@ class OrgSchema:
115
  def _llm_match(self, semantic: str, columns: Dict[str, str]) -> Optional[str]:
116
  """LLM reasoning with schema context"""
117
  try:
118
- prompt = f"""
119
- You are a data schema expert. Map this semantic field to the most likely column.
120
 
121
  Semantic Field: `{semantic}`
122
  Available Columns: {list(columns.keys())}
123
  Data Types: {columns}
124
 
125
  Return ONLY the matching column name or "NONE" if no match.
126
- Consider: naming conventions, business context, data types.
127
- """
128
 
129
  response = self.llm.generate(prompt, max_tokens=20).strip()
130
- return response if response != "NONE" else None
 
 
 
131
  except Exception as e:
132
  logger.warning(f"[LLM] Matching failed: {e}")
133
  return None
134
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  def get_column(self, semantic: str) -> Optional[str]:
136
  """Safely get column name with audit logging"""
137
  mapping = self.get_mapping()
@@ -139,8 +176,6 @@ class OrgSchema:
139
 
140
  if not actual:
141
  logger.warning(f"[Schema] Missing semantic field: {semantic}")
142
- self._log_missing_field(semantic)
143
-
144
  return actual
145
 
146
  def build_dynamic_query(self, required_fields: List[str]) -> Tuple[str, List[str]]:
@@ -150,9 +185,12 @@ class OrgSchema:
150
 
151
  for field in required_fields:
152
  if actual := mapping.get(field):
153
- available.append(f"{actual} AS {field}") # Alias to semantic name
154
 
155
  if not available:
156
- raise ValueError(f"No required fields available: {required_fields}")
 
 
 
157
 
158
- return f"SELECT {', '.join(available)} FROM transactions_{self.org_id}", available
 
2
  from typing import Dict, Optional, List, Tuple
3
  import json
4
  import logging
5
+ from datetime import datetime
6
  from app.core.event_hub import event_hub
7
  from app.service.llm_service import LocalLLMService
8
+ from app.service.vector_service import VectorService
9
+ from app.db import get_conn
10
 
11
  logger = logging.getLogger(__name__)
12
 
 
32
  "trans_date", "sale_time", "order_date"],
33
  }
34
 
35
+ def __init__(self, org_id: str, entity_type: str):
36
  self.org_id = org_id
37
+ self._entity_type = entity_type
38
+ self.cache_key = f"schema:{org_id}:{entity_type}:v3"
39
  self.stats_key = f"schema:stats:{org_id}"
40
  self.llm = LocalLLMService()
41
+ self.vector = VectorService(org_id)
42
 
43
  def get_mapping(self) -> Dict[str, str]:
44
  """Autonomous mapping with AI fallback for unmatched columns"""
45
  try:
46
  if cached := event_hub.get_key(self.cache_key):
47
+ logger.info(f"[Schema] Cache hit for org {self.org_id}/{self._entity_type}")
48
  return json.loads(cached)
49
 
50
+ logger.info(f"[Schema] Starting AI discovery for org {self.org_id}/{self._entity_type}")
51
  mapping = self._discover_schema()
52
  self.save_mapping(mapping)
53
  return mapping
 
58
 
59
  def _discover_schema(self) -> Dict[str, str]:
60
  """Three-tier discovery: Rule-based β†’ Vector similarity β†’ LLM reasoning"""
61
+ conn = get_conn(self.org_id)
62
 
63
+ # Get columns from actual canonical table
64
  columns_info = conn.execute(f"""
65
  SELECT column_name, data_type, is_nullable
66
  FROM information_schema.columns
67
+ WHERE table_schema = 'main'
68
+ AND table_name = '{self._entity_type}_canonical'
69
  """).fetchall()
70
 
71
+ if not columns_info:
72
+ raise ValueError(f"No schema found for {self._entity_type}_canonical")
73
+
74
  columns = {row[0]: row[1] for row in columns_info}
75
  mapping = {}
76
 
 
90
  mapping[semantic] = match
91
  continue
92
 
93
+ logger.info(f"[Schema] AI discovery complete: {len(mapping)} fields mapped")
94
  return mapping
95
 
96
  def _exact_match(self, semantic: str, columns: Dict[str, str]) -> Optional[str]:
 
98
  patterns = self.PATTERN_VECTORS.get(semantic, [])
99
  for col in columns.keys():
100
  if any(pattern in col.lower().replace("_", "") for pattern in patterns):
101
+ logger.info(f"[Rule] Matched '{semantic}' β†’ '{col}' (pattern)")
102
  return col
103
  return None
104
 
105
  def _vector_match(self, semantic: str, column_names: List[str]) -> Optional[str]:
106
  """Semantic similarity via embeddings"""
107
  try:
 
108
  semantic_emb = self.vector.embed(semantic)
109
  column_embs = [self.vector.embed(name) for name in column_names]
110
 
 
111
  best_match, score = self.vector.find_best_match(semantic_emb, column_embs, column_names)
112
 
113
  if score > 0.85: # High confidence threshold
 
121
  def _llm_match(self, semantic: str, columns: Dict[str, str]) -> Optional[str]:
122
  """LLM reasoning with schema context"""
123
  try:
124
+ prompt = f"""You are a data schema expert. Map this semantic field to the most likely column.
 
125
 
126
  Semantic Field: `{semantic}`
127
  Available Columns: {list(columns.keys())}
128
  Data Types: {columns}
129
 
130
  Return ONLY the matching column name or "NONE" if no match.
131
+ Consider: naming conventions, business context, data types."""
 
132
 
133
  response = self.llm.generate(prompt, max_tokens=20).strip()
134
+ if response != "NONE":
135
+ logger.info(f"[LLM] Matched '{semantic}' β†’ '{response}'")
136
+ return response
137
+ return None
138
  except Exception as e:
139
  logger.warning(f"[LLM] Matching failed: {e}")
140
  return None
141
 
142
+ def save_mapping(self, mapping: Dict[str, str]) -> None:
143
+ """Persist mapping with TTL and stats"""
144
+ try:
145
+ event_hub.redis.setex(self.cache_key, 3600, json.dumps(mapping))
146
+
147
+ stats = {
148
+ "timestamp": datetime.now().isoformat(),
149
+ "fields_mapped": len(mapping),
150
+ "entity_type": self._entity_type
151
+ }
152
+ event_hub.redis.setex(self.stats_key, 3600, json.dumps(stats))
153
+ except Exception as e:
154
+ logger.warning(f"[Schema] Failed to save mapping: {e}")
155
+
156
+ def _get_fallback_mapping(self) -> Dict[str, str]:
157
+ """
158
+ πŸš€ EMERGENCY FALLBACK: Map columns to themselves
159
+ Ensures SaaS flexibility for any schema
160
+ """
161
+ logger.warning(f"[Schema] 🚨 EMERGENCY FALLBACK for {self.org_id}/{self._entity_type}")
162
+
163
+ conn = get_conn(self.org_id)
164
+ columns_info = conn.execute(f"""
165
+ SELECT column_name FROM information_schema.columns
166
+ WHERE table_schema = 'main' AND table_name = '{self._entity_type}_canonical'
167
+ """).fetchall()
168
+
169
+ # Map every column to itself - works for ANY schema
170
+ return {row[0]: row[0] for row in columns_info}
171
+
172
  def get_column(self, semantic: str) -> Optional[str]:
173
  """Safely get column name with audit logging"""
174
  mapping = self.get_mapping()
 
176
 
177
  if not actual:
178
  logger.warning(f"[Schema] Missing semantic field: {semantic}")
 
 
179
  return actual
180
 
181
  def build_dynamic_query(self, required_fields: List[str]) -> Tuple[str, List[str]]:
 
185
 
186
  for field in required_fields:
187
  if actual := mapping.get(field):
188
+ available.append(f"{actual} AS {field}")
189
 
190
  if not available:
191
+ # Return all columns if no semantic matches
192
+ conn = get_conn(self.org_id)
193
+ columns = conn.execute(f"PRAGMA table_info('{self._entity_type}_canonical')").fetchall()
194
+ available = [f"{c[1]} AS {c[1]}" for c in columns]
195
 
196
+ return f"SELECT {', '.join(available)} FROM {self._entity_type}_canonical", available
app/tasks/analytics_worker.py CHANGED
@@ -360,31 +360,46 @@ class AnalyticsWorker:
360
 
361
  # ==================== SCHEMA & EMBEDDING ====================
362
 
363
- async def _discover_schema(self, df: pd.DataFrame) -> Dict[str, str]:
364
- """🧠 Einstein's discovery engine with caching"""
 
 
365
  try:
366
- cache_key = f"schema:mapping:{self.org_id}"
367
-
368
- if cached := event_hub.get_key(cache_key):
369
- logger.info("[SCHEMA] πŸ’Ύ Cache hit")
370
- return json.loads(cached)
371
-
372
  logger.info("[SCHEMA] 🧠 Cache miss, discovering...")
 
 
 
 
 
373
  mapping = self.schema.get_mapping()
374
-
375
  if not mapping:
376
- logger.error("[SCHEMA] Discovery returned empty")
377
- return {}
378
 
379
- # Cache for 24h
 
 
 
380
  event_hub.setex(cache_key, 86400, json.dumps(mapping))
381
- logger.info(f"[SCHEMA] βœ… Discovered {len(mapping)} mappings")
382
-
 
 
383
  return mapping
384
-
385
  except Exception as e:
386
- logger.error(f"[SCHEMA] ❌ Discovery failed: {e}", exc_info=True)
387
- return {}
 
 
 
 
 
 
 
 
 
 
388
 
389
  def _alias_columns(self, df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
390
  """πŸ”€ Renames columns to semantic names"""
 
360
 
361
  # ==================== SCHEMA & EMBEDDING ====================
362
 
363
+ # app/tasks/analytics_worker.py - Replace your _discover_schema method
364
+
365
+ def _discover_schema(self):
366
+ """Schema discovery with proper caching and error handling"""
367
  try:
 
 
 
 
 
 
368
  logger.info("[SCHEMA] 🧠 Cache miss, discovering...")
369
+
370
+ from app.schemas.org_schema import OrgSchema
371
+
372
+ # Initialize schema discoverer with entity context
373
+ self.schema = OrgSchema(self.org_id, self._entity_type)
374
  mapping = self.schema.get_mapping()
375
+
376
  if not mapping:
377
+ raise ValueError("Empty mapping returned")
 
378
 
379
+ # βœ… FIX: Define cache_key BEFORE using it
380
+ cache_key = f"schema:{self.org_id}:{self._entity_type}:worker_cache"
381
+
382
+ # βœ… FIX: Save to Redis with proper TTL
383
  event_hub.setex(cache_key, 86400, json.dumps(mapping))
384
+ logger.info(f"[SCHEMA] πŸ’Ύ Cached mapping for 24h: {cache_key}")
385
+
386
+ self._schema_cache = mapping
387
+ logger.info(f"[SCHEMA] βœ… Discovery complete: {len(mapping)} columns")
388
  return mapping
389
+
390
  except Exception as e:
391
+ logger.error(f"[SCHEMA] ❌ Discovery failed: {e}")
392
+
393
+ # πŸš€ EMERGENCY FALLBACK: Map columns to themselves (SaaS-ready)
394
+ logger.warning("[SCHEMA] 🚨 Using fallback - mapping columns as-is")
395
+ stealth_mapping = {col: col for col in self.df.columns}
396
+
397
+ # βœ… Cache the fallback too
398
+ cache_key = f"schema:{self.org_id}:{self._entity_type}:worker_cache:fallback"
399
+ event_hub.setex(cache_key, 3600, json.dumps(stealth_mapping))
400
+
401
+ self._schema_cache = stealth_mapping
402
+ return stealth_mapping
403
 
404
  def _alias_columns(self, df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
405
  """πŸ”€ Renames columns to semantic names"""