# (removed non-code export residue: "Spaces: / Sleeping / Sleeping")
import json
import os
import re
import sys

import networkx as nx
import spacy
import torch
import torch.nn as nn
from sentence_transformers.cross_encoder import CrossEncoder
from transformers import AutoTokenizer, AutoModel
# =========================================================
# SKELETON CONSTANTS
# =========================================================
# Integer ids for the five query-skeleton classes predicted by the
# skeleton classifier, plus a reverse lookup from id to class name.
SIMPLE = 0
PURE_AGG = 1
GROUPED_AGG = 2
SUPERLATIVE = 3
EXCEPT_QUERY = 4
LABEL_NAMES = {
    SIMPLE: 'SIMPLE',
    PURE_AGG: 'PURE_AGG',
    GROUPED_AGG: 'GROUPED_AGG',
    SUPERLATIVE: 'SUPERLATIVE',
    EXCEPT_QUERY: 'EXCEPT_QUERY',
}
| # ========================================================= | |
| # SKELETON CLASSIFIER (inlined β no separate file needed) | |
| # ========================================================= | |
| class _SkeletonModel(nn.Module): | |
| def __init__(self, backbone, n_classes): | |
| super().__init__() | |
| self.backbone = backbone | |
| self.classifier = nn.Linear(384, n_classes) | |
| self.dropout = nn.Dropout(0.1) | |
| def forward(self, input_ids, attention_mask): | |
| out = self.backbone(input_ids=input_ids, attention_mask=attention_mask) | |
| pooled = out.last_hidden_state[:, 0, :] | |
| return self.classifier(self.dropout(pooled)) | |
class SkeletonClassifier:
    """Wraps the trained skeleton-label head for single-question inference.

    Loads the label map, tokenizer and fine-tuned weights from
    ``model_path`` and exposes :meth:`predict`.
    """

    def __init__(self, model_path):
        # Label map is stored as {str(index): name} alongside the weights.
        with open(f"{model_path}/labels.json") as f:
            self.label_names = json.load(f)
        n_classes = len(self.label_names)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        backbone = AutoModel.from_pretrained('cross-encoder/ms-marco-MiniLM-L-6-v2')
        self.model = _SkeletonModel(backbone, n_classes).to(self.device)
        state = torch.load(f"{model_path}/model.pt", map_location=self.device)
        self.model.load_state_dict(state)
        self.model.eval()

    def predict(self, question):
        """Return ``(label_id, label_name)`` for a natural-language question."""
        enc = self.tokenizer([question], padding=True, truncation=True,
                             max_length=128, return_tensors='pt')
        with torch.no_grad():
            logits = self.model(enc['input_ids'].to(self.device),
                                enc['attention_mask'].to(self.device))
        label = int(logits.argmax(dim=1).item())
        return label, self.label_names[str(label)]
| #============================================================ | |
| # DB Class | |
| #============================================================ | |
| from sentence_transformers import SentenceTransformer, util | |
| import sqlite3 | |
class DBValueLookup:
    """Fetches (and caches) distinct column values from SQLite databases and
    grounds free-text values to the closest stored value.

    ``semantic_match`` first tries a case-insensitive exact match, then a
    sentence-embedding cosine-similarity search over the cached values.
    """

    def __init__(self, db_dir, model_name='all-MiniLM-L6-v2'):
        self.db_dir = db_dir
        self._cache = {}  # (db_id, table, col) -> list[str] of distinct values
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Loading Semantic DB Lookup model ({model_name})...")
        self.model = SentenceTransformer(model_name).to(self.device)

    def get_values(self, db_id, table, col):
        """Return up to 200 distinct non-null values of ``table.col`` (cached).

        Returns [] when the database/table/column is missing.
        """
        key = (db_id, table, col)
        if key not in self._cache:
            db_path = f"{self.db_dir}/{db_id}/{db_id}.sqlite"
            try:
                conn = sqlite3.connect(db_path)
                # Identifiers cannot be bound as SQL parameters; quote them so
                # unusual table/column names cannot break (or alter) the query.
                rows = conn.execute(
                    f'SELECT DISTINCT "{col}" FROM "{table}" '
                    f'WHERE "{col}" IS NOT NULL LIMIT 200'
                ).fetchall()
                conn.close()
                # NOTE(review): `if r[0]` also drops falsy values such as 0
                # and "" — confirm that is intended.
                self._cache[key] = [str(r[0]) for r in rows if r[0]]
            except Exception:
                # Missing db/table/column -> treat as "no known values".
                self._cache[key] = []
        return self._cache[key]

    def semantic_match(self, value_text, db_id, table, col, threshold=0.5):
        """Map ``value_text`` onto the closest stored value of ``table.col``.

        Falls back to the original text when the column has no values or no
        candidate reaches ``threshold`` cosine similarity.
        """
        candidates = self.get_values(db_id, table, col)
        if not candidates:
            return value_text
        # Fast path: a case-insensitive exact match avoids the neural model.
        v_lower = value_text.lower()
        for c in candidates:
            if v_lower == c.lower():
                return c
        # Semantic matching via sentence embeddings.
        query_embedding = self.model.encode(value_text, convert_to_tensor=True,
                                            show_progress_bar=False, device=self.device)
        db_embeddings = self.model.encode(candidates, convert_to_tensor=True,
                                          show_progress_bar=False, device=self.device)
        cosine_scores = util.cos_sim(query_embedding, db_embeddings)
        best_score_val, best_idx = torch.max(cosine_scores, dim=1)
        # .item() converts the 1-element tensors to plain Python numbers;
        # indexing a Python list with a Tensor is fragile across torch versions.
        if best_score_val.item() >= threshold:
            return candidates[int(best_idx.item())]
        return value_text  # no good match: keep the original text
| #========================================================== | |
| # OPERATORCLISSIFIER CLASSS | |
| # ========================================================= | |
class OperatorClassifier:
    """Predicts a SQL comparison operator (from ``operators.json``) for the
    text window surrounding a bound value.
    """

    def __init__(self, model_path):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        with open(f"{model_path}/operators.json") as f:
            self.operators = json.load(f)
        backbone = AutoModel.from_pretrained('cross-encoder/ms-marco-MiniLM-L-6-v2')
        # Reuse _SkeletonModel: the previous inline OperatorModel duplicated
        # it exactly (same submodule names -> identical state-dict keys).
        self.model = _SkeletonModel(backbone, len(self.operators)).to(self.device)
        self.model.load_state_dict(
            torch.load(f"{model_path}/model.pt", map_location=self.device)
        )
        self.model.eval()

    def predict(self, context):
        """Return the operator string predicted for ``context``."""
        enc = self.tokenizer([context], padding=True, truncation=True,
                             max_length=64, return_tensors='pt')
        with torch.no_grad():
            logits = self.model(
                enc['input_ids'].to(self.device),
                enc['attention_mask'].to(self.device)
            )
        return self.operators[int(logits.argmax(dim=1).item())]
| #========================================================= | |
| # 1. SCHEMA GRAPH | |
| # ========================================================= | |
class SchemaGraph:
    """Schema container: tables/columns, primary keys, and a join graph.

    Nodes of ``graph`` are table names; edges carry the textual join
    condition in the edge attribute ``on``.
    """

    def __init__(self, schema_text=None, spider_schema=None):
        self.graph = nx.Graph()
        # table -> {column_name: 'NUM' | 'TEXT'}
        self.tables = {}
        # table -> heuristic primary-key column name
        self.primary_keys = {}
        if spider_schema:
            self._parse_spider(spider_schema)
        elif schema_text:
            self._parse_text(schema_text)
        # Heuristic FK inference runs for both formats; it only adds edges
        # that do not already exist.
        self._build_graph()

    def _parse_text(self, text):
        """Parse CREATE TABLE statements from raw DDL text."""
        table_pattern = re.compile(r'CREATE\s+TABLE\s+(\w+)\s*\((.*?)\);', re.I | re.S)
        for match in table_pattern.finditer(text):
            table = match.group(1).lower()
            body = match.group(2)
            cols = {}
            # NOTE(review): splitting on ',' breaks multi-column constraints
            # such as PRIMARY KEY (a, b) — acceptable only for simple DDL.
            for line in body.split(','):
                line = line.strip()
                if not line:
                    continue
                parts = line.split()
                col_name = parts[0].lower()
                col_type = "TEXT"
                if len(parts) > 1 and any(t in parts[1].upper() for t in ["INT", "FLOAT", "NUMBER"]):
                    col_type = "NUM"
                cols[col_name] = col_type
                # Heuristic PK detection: explicit constraint, conventional
                # names, or any column name ending in 'id'.
                if "PRIMARY KEY" in line.upper() or col_name in ('id', f"{table}_id") or col_name.endswith('id'):
                    self.primary_keys[table] = col_name
            self.tables[table] = cols
            self.graph.add_node(table)

    def _parse_spider(self, schema_json):
        """Parse a Spider-format schema dict (one entry of tables.json)."""
        table_names_orig = schema_json['table_names_original']
        table_names_norm = schema_json['table_names']  # natural-language table names
        col_names_orig = schema_json['column_names_original']
        col_names_norm = schema_json['column_names']  # natural-language column names
        col_types = schema_json['column_types']
        # Mappings from original identifiers to normalized English names.
        self.norm_table_names = {}
        self.norm_column_names = {}
        for i, tbl in enumerate(table_names_orig):
            t_name = tbl.lower()
            self.tables[t_name] = {}
            self.graph.add_node(t_name)
            # Map original table name -> normalized table name.
            self.norm_table_names[t_name] = table_names_norm[i].lower()
        for i, (tbl_idx, col_name) in enumerate(col_names_orig):
            if tbl_idx == -1:
                # The global '*' pseudo-column has table index -1.
                continue
            t_name = table_names_orig[tbl_idx].lower()
            c_name = col_name.lower()
            # Spider stores the normalized name at index 1 of the inner list.
            norm_c_name = col_names_norm[i][1].lower()
            c_type = "NUM" if col_types[i] == "number" else "TEXT"
            self.tables[t_name][c_name] = c_type
            # Map (table, original_column) -> normalized column name.
            self.norm_column_names[(t_name, c_name)] = norm_c_name
            if c_name in ('id', f"{t_name}_id"):
                self.primary_keys[t_name] = c_name
        for (col_idx_1, col_idx_2) in schema_json['foreign_keys']:
            t1_idx, c1_name = col_names_orig[col_idx_1]
            t2_idx, c2_name = col_names_orig[col_idx_2]
            t1 = table_names_orig[t1_idx].lower()
            t2 = table_names_orig[t2_idx].lower()
            self.graph.add_edge(t1, t2, on=f"{t1}.{c1_name.lower()} = {t2}.{c2_name.lower()}")

    def _build_graph(self):
        """Infer FK joins by naming convention: a column named '<table>_id'
        (with or without the table's trailing 's') joins to that table's PK."""
        tables = list(self.tables.keys())
        for t1 in tables:
            for col in self.tables[t1]:
                for t2 in tables:
                    if col in (f"{t2}_id", f"{t2.rstrip('s')}_id"):
                        if t1 == t2:
                            continue
                        pk = self.primary_keys.get(t2)
                        if pk and not self.graph.has_edge(t1, t2):
                            self.graph.add_edge(t1, t2, on=f"{t1}.{col} = {t2}.{pk}")
| # ========================================================= | |
| # 2. LINGUISTIC ENGINE | |
| # ========================================================= | |
class LinguisticEngine:
    """End-to-end schema linking for one database.

    Combines spaCy parsing with fine-tuned neural components (table/column/
    value cross-encoder rankers, an operator classifier, a skeleton
    classifier) plus direct DB value lookups to map a natural-language
    question to (active_tables, column hits, value filters).
    """

    def __init__(self, schema_graph, table_model_path, column_model_path,
                 value_model_path, skeleton_model_path, db_id=None):
        self.schema = schema_graph
        self.db_id = db_id
        print("Loading spaCy...")
        self.nlp = spacy.load("en_core_web_sm")
        print("Loading DB value lookup...")
        # NOTE(review): hard-coded Kaggle path — verify for other environments.
        self.db_lookup = DBValueLookup('/kaggle/working/spider_data/database')
        print("Loading table linker...")
        self.table_linker = CrossEncoder(table_model_path)
        print("Loading column linker...")
        self.column_linker = CrossEncoder(column_model_path)
        print("Loading value linker...")
        self.value_linker = CrossEncoder(value_model_path)
        print("Loading skeleton classifier...")
        self.skeleton = SkeletonClassifier(skeleton_model_path)
        print("Ready.")
        print("Loading operator classifier...")
        # NOTE(review): operator model path is hard-coded, unlike the other
        # model paths which are constructor parameters — confirm intent.
        self.operator_clf = OperatorClassifier('schema_linking_data/model_operator')
        self.boolean_cols = self._build_boolean_column_registry()

    def _serialize_table(self, table_name):
        """Render a table as 'name | col1, col2, ...' for the table linker,
        using Spider's normalized (natural-language) names when available."""
        cols = list(self.schema.tables.get(table_name, {}).keys())
        # Use mapped table name and mapped column names.
        norm_t_name = getattr(self.schema, 'norm_table_names', {}).get(table_name, table_name)
        norm_cols = [
            getattr(self.schema, 'norm_column_names', {}).get((table_name, c), c)
            for c in cols
        ]
        return f"{norm_t_name} | {', '.join(norm_cols)}"

    def _serialize_column(self, table_name, col_name):
        """Render a column as 'table | column | type | context: ...' for the
        column/value cross-encoders."""
        all_cols = list(self.schema.tables.get(table_name, {}).keys())
        col_type = self.schema.tables[table_name].get(col_name, 'text')
        # Safely fetch normalized names (falling back to raw names when this
        # is not a Spider schema).
        norm_t_name = getattr(self.schema, 'norm_table_names', {}).get(table_name, table_name)
        norm_c_name = getattr(self.schema, 'norm_column_names', {}).get((table_name, col_name), col_name)
        # Normalize the sibling-column context as well.
        norm_context_list = [
            getattr(self.schema, 'norm_column_names', {}).get((table_name, c), c)
            for c in all_cols if c != col_name
        ]
        context = ', '.join(norm_context_list)
        return f"{norm_t_name} | {norm_c_name} | {col_type} | context: {context}"

    def resolve_ordering(self, question, active_tables):
        """Detect superlative/ordering intent.

        Returns (order_expr, 'ASC'|'DESC') where order_expr is 'COUNT(*)' or
        'table.column', or (None, None) when no ordering intent is found.
        """
        q_lower = question.lower()
        doc = self.nlp(q_lower)
        target_spans = []
        direction = "DESC"
        dir_map = {
            'highest': 'DESC', 'largest': 'DESC', 'maximum': 'DESC', 'best': 'DESC', 'oldest': 'DESC', 'most': 'DESC',
            'lowest': 'ASC', 'smallest': 'ASC', 'minimum': 'ASC', 'worst': 'ASC', 'youngest': 'ASC', 'least': 'ASC'
        }
        for token in doc:
            # JJS/RBS are the superlative adjective/adverb POS tags.
            if token.tag_ in ['JJS', 'RBS'] or token.text in dir_map:
                # Safely set direction based on the first superlative encountered.
                direction = dir_map.get(token.text, "ASC" if any(x in token.text for x in ['least','fewest','smallest','lowest','worst']) else "DESC")
                # Hard-target known abstract concepts to prevent noun pollution.
                if token.text in ['youngest', 'oldest']:
                    target_spans.append('age')
                elif token.text in ['most', 'least']:
                    return 'COUNT(*)', direction
                else:
                    head = token.head
                    if head.pos_ in ['NOUN', 'PROPN'] and head.text != token.text:
                        # Include compound/adjective modifiers of the head noun.
                        modifiers = [child.text for child in head.children if child.dep_ in ['amod', 'compound'] and child.text != token.text]
                        modifiers.append(head.text)
                        target_spans.append(" ".join(modifiers))
                    else:
                        target_spans.append(token.text)
                break
        if not target_spans:
            # Explicit "order by"-style phrasing without a superlative token:
            # fall back to the last noun chunk as the ordering target.
            if any(k in q_lower for k in ["descending order","ascending order","order by","sorted by","ordered by"]):
                chunks = list(doc.noun_chunks)
                target_span = chunks[-1].text if chunks else question.split()[-1]
                target_spans.append(target_span)
                if "asc" in q_lower: direction = "ASC"
        if not target_spans: return None, None
        # Structural domain override for time-series superlatives: anomalies
        # are ranked by data_value_double — but not for aggregate questions.
        if getattr(self, 'db_id', None) == 'influx_system' and 'sys_target_alerts' in active_tables:
            # Protect aggregate queries: a count must not force the raw value.
            is_aggregate = any(w in question.lower() for w in ['count', 'total', 'how many', 'average', 'sum'])
            if not is_aggregate:
                if any(w in question.lower() for w in ['anomaly', 'bottleneck', 'spike']):
                    return 'data_value_double', direction
        candidates = [(t, c) for t in active_tables for c in self.schema.tables[t] if c not in ('id', f"{t}_id")]
        if not candidates: return None, None
        # Rank every candidate column against every extracted span; keep the
        # single best-scoring (table, column) pair overall.
        best_overall_score = -float('inf')
        best_overall_match = None
        for span in target_spans:
            pairs = [(span, self._serialize_column(t, c)) for t, c in candidates]
            scores = self.column_linker.predict(pairs, show_progress_bar=False)
            best_idx = int(scores.argmax())
            best_score = float(scores[best_idx])
            if best_score > best_overall_score:
                best_overall_score = best_score
                best_t, best_c = candidates[best_idx]
                best_overall_match = f"{best_t}.{best_c}"
        return best_overall_match, direction

    def detect_aggregation(self, question):
        """Return the aggregation functions implied by the question, ordered
        by where their trigger keywords appear (e.g. ['COUNT', 'AVG'])."""
        q = question.lower()
        found_aggs = []
        # Aggregation should only trigger on mathematical intent.
        checks = [
            (["average", "mean", "avg"], "AVG"),
            (["maximum", "max", "highest"], "MAX"),
            (["minimum", "min", "lowest"], "MIN"),
            (["how many", "count the number"], "COUNT"),  # deliberately narrow: no bare "count"/"all"
            (["total sum", "sum of"], "SUM")
        ]
        # A bare "count" only counts when combined with a scoping word.
        if "count" in q and any(w in q for w in ["per", "total", "of", "for"]):
            found_aggs.append((q.find("count"), "COUNT"))
        for keywords, agg_type in checks:
            for kw in keywords:
                idx = q.find(kw)
                if idx != -1:
                    # "total number" means COUNT, not SUM.
                    if agg_type == "SUM" and "total number" in q:
                        continue
                    found_aggs.append((idx, agg_type))
                    break
        found_aggs.sort(key=lambda x: x[0])
        # When both SUM and COUNT fired but the phrasing clearly means
        # counting, drop the SUM.
        if 'SUM' in [a for _, a in found_aggs] and 'COUNT' in [a for _, a in found_aggs]:
            if 'total number' in q or 'total count' in q or 'count per' in q:
                found_aggs = [(i, a) for i, a in found_aggs if a == 'COUNT']
        return [agg for _, agg in found_aggs]

    def bind_values(self, question, active_tables, window_size=5, debug=False):
        """Extract literal values from the question and bind each one to a
        (table, column, op, value, intent) filter tuple.

        Value candidates come from quoted strings, numbers, spaCy entities,
        known DB values and complex system identifiers; each is matched to a
        column by the value linker and given an operator by the operator
        classifier.
        """
        # Normalize curly quotes so the quote regex below matches.
        question = question.replace('\u2018', "'").replace('\u2019', "'").replace('\u201c', '"').replace('\u201d', '"')
        # Each candidate is a (text, is_numeric, is_quoted) triple.
        quoted = [(m.group(1), False, True) for m in re.finditer(r"['\"](.+?)['\"]", question)]
        # Numbers after top/bottom/best/worst are LIMIT counts, not values.
        top_n_positions = {m.start(1) for m in re.finditer(
            r'\b(?:top|bottom|worst|best)\s+(\d+)\b', question.lower())}
        numbered = []
        for m in re.finditer(r'\b(\d+(?:\.\d+)?)\b', question):
            if m.start() in top_n_positions:
                continue
            num_str = m.group(1)
            if not any(num_str in q[0] for q in quoted):
                numbered.append((num_str, True, False))
        doc = self.nlp(question)
        entity_texts = set(q[0].lower() for q in quoted)
        valid_ent_types = {'PERSON', 'ORG', 'GPE', 'LOC', 'FAC', 'PRODUCT', 'NORP', 'EVENT', 'WORK_OF_ART'}
        entities = []
        for ent in doc.ents:
            if ent.text.lower() in entity_texts or re.search(r'\d+', ent.text) or ent.label_ not in valid_ent_types:
                continue
            entities.append((ent.text, False, False))
        # Chatbot entity safety net: users rarely quote names, so explicitly
        # scan the question for known target names pulled from the DB.
        if self.db_id == 'derby_system' or self.db_id == 'influx_system':
            # Fast-fetch all known target names from Derby.
            known_targets = self.db_lookup.get_values('derby_system', 'target', 'name')
            for kt in known_targets:
                if str(kt).lower() in question.lower() and len(str(kt)) > 3:
                    # Unquoted string exists in the DB: force it into the list.
                    # The boolean flags are (is_numeric=False, is_quoted=False).
                    entities.append((str(kt), False, False))
        # Also search columns of tables one join away.
        search_tables = set(active_tables)
        for t in active_tables:
            search_tables.update(self.schema.graph.neighbors(t))
        if self.db_id:
            seen_texts = entity_texts | set(v[0].lower() for v in numbered)
            # Force extraction of system identifiers (e.g. MySQL_QUICK_1711_1):
            # words mixing alphanumerics with underscores or hyphens.
            complex_names = re.findall(r'\b[A-Za-z0-9]+(?:_[A-Za-z0-9_]+|-[\w\-]+)+\b', question)
            for name in complex_names:
                # Prevent extracting substrings of already-quoted values.
                if name.lower() not in seen_texts and not any(name.lower() in st for st in seen_texts):
                    # Always trust complex names here and skip the DB existence
                    # check; the structural trust rule downstream re-evaluates.
                    entities.append((name, False, False))
                    seen_texts.add(name.lower())
            # Standard spaCy noun extraction, validated against DB contents.
            for token in doc:
                if token.pos_ not in ['NOUN', 'PROPN', 'X']: continue
                for txt in [token.text, token.lemma_]:
                    if txt.lower() in seen_texts or len(txt) < 3: continue
                    for table in search_tables:
                        for col, col_type in self.schema.tables.get(table, {}).items():
                            if col_type != 'TEXT': continue
                            db_vals = self.db_lookup.get_values(self.db_id, table, col)
                            if any(txt.lower() == str(v).lower() for v in db_vals):
                                entities.append((txt, False, False))
                                seen_texts.add(txt.lower())
                                seen_texts.add(token.text.lower())
                                break
                        else: continue
                        break
        # Deduplicate all extracted values (case-insensitive).
        unique_vals = []
        seen = set()
        for v in quoted + numbered + entities:
            if v[0].lower() not in seen:
                seen.add(v[0].lower())
                unique_vals.append(v)
        all_values = unique_vals
        if not all_values: return []
        candidates = [(table, col, col_type) for table in search_tables for col, col_type in self.schema.tables.get(table, {}).items() if col != 'id' and not col.endswith('_id')]
        if not candidates: return []
        filters = []
        skip_vals = set()  # second operands of BETWEEN, already consumed
        for val_text, is_numeric, is_quoted in all_values:
            if val_text in skip_vals: continue
            val_pos = question.lower().find(val_text.lower())
            if val_pos == -1: continue
            # Context window: `window_size` words on each side of the value.
            before = question[:val_pos].split()[-window_size:]
            after = question[val_pos + len(val_text):].split()[:window_size]
            context = ' '.join(before + [val_text] + after)
            valid_candidates = candidates
            # Structural trust: identifiers containing underscores or hyphens
            # are trusted even when the DB lookup cannot confirm them.
            is_complex_identifier = bool(re.match(r'^[A-Za-z0-9]+(?:_[A-Za-z0-9_]+|-[\w\-]+)+$', val_text))
            if self.db_id and not is_numeric and not is_quoted and not is_complex_identifier:
                exact_matches = []
                for t, c, ct in candidates:
                    if ct == 'TEXT':
                        db_vals = self.db_lookup.get_values(self.db_id, t, c)
                        if val_text.lower() in [str(v).lower() for v in db_vals]:
                            exact_matches.append((t, c, ct))
                # Discard unquoted conversational nouns that do not physically
                # exist in the DB.
                if not exact_matches:
                    continue
                valid_candidates = exact_matches
            pairs = [(context, self._serialize_column(t, c)) for t, c, ct in valid_candidates]
            scores = self.value_linker.predict(pairs, show_progress_bar=False)
            sorted_indices = scores.argsort()[::-1]
            for idx in sorted_indices:
                best_score = float(scores[idx])
                t, c, ct = valid_candidates[idx]
                # Lexical override: does the column name occur in the question
                # when spaces and underscores are ignored?
                col_clean = c.lower().replace("_", "")
                q_clean = question.lower().replace("_", "").replace(" ", "")
                is_lexical = col_clean in q_clean
                threshold = -2.0 if is_numeric else -5.0
                # If the model dislikes the pairing but the column name is
                # literally in the sentence, trust the text.
                if best_score <= threshold and not is_lexical:
                    continue  # keep trying lower-ranked candidates
                if ct == 'NUM' and not is_numeric: continue
                op = self.operator_clf.predict(context)
                like_triggers = ['have', 'having', 'contain', 'containing', 'start', 'end', 'like', 'starts with', 'ends with']
                if is_quoted and any(k in context.lower() for k in like_triggers):
                    op = 'LIKE'
                    val = f"'%{val_text}%'"
                else:
                    if not is_numeric and not is_quoted and ct == 'TEXT' and self.db_id:
                        # Ground free-text values to the closest stored value.
                        grounded_val = self.db_lookup.semantic_match(val_text, self.db_id, t, c)
                        val = f"'{grounded_val}'"
                    else:
                        val = val_text if is_numeric else f"'{val_text}'"
                if op == 'BETWEEN':
                    # BETWEEN needs a second number later in the question.
                    remaining = question[question.lower().find(val_text.lower()) + len(val_text):]
                    next_num = re.search(r'\b(\d+(?:\.\d+)?)\b', remaining)
                    if next_num:
                        next_val = next_num.group(1)
                        val = f"{val_text} AND {next_val}"
                        skip_vals.add(next_val)
                    else:
                        op = '='  # no second operand: degrade to equality
                intent = 'EXCLUDE' if op == '!=' and ct == 'TEXT' else ('INCLUDE' if op in ('=', 'LIKE') and ct == 'TEXT' else 'COMPARE')
                filters.append((t, c, op, val, intent))
                break
        return filters

    def _build_boolean_column_registry(self):
        """Collect (table, column) pairs that behave like boolean/enum flags
        (these should filter rows rather than be displayed)."""
        boolean_cols = set()
        for table_name, columns in self.schema.tables.items():
            for col_name, col_info in columns.items():
                # NOTE(review): SchemaGraph stores column info as the plain
                # strings 'NUM'/'TEXT', so the isinstance(dict) branch never
                # fires and Strategy 1 is currently dead — confirm intent.
                col_type = col_info.get('type', '').upper() if isinstance(col_info, dict) else ''
                # Strategy 1: explicit BOOLEAN type.
                if 'BOOLEAN' in col_type or 'BOOL' in col_type:
                    boolean_cols.add((table_name, col_name))
                # Strategy 2: naming-convention prefixes.
                elif re.match(r'^is_|^has_|^hlc_is_', col_name):
                    boolean_cols.add((table_name, col_name))
                # Strategy 3: enum-like columns — low-cardinality
                # status/type/platform suffixes that should filter, not display.
                elif re.match(r'.+_(status|type|platform|period|provider)$', col_name):
                    boolean_cols.add((table_name, col_name))
                # Strategy 4: well-known flag column names.
                elif col_name in ('enabled', 'status', 'platform'):
                    boolean_cols.add((table_name, col_name))
        return boolean_cols

    def extract_intent(self, question, top_k_tables=3, top_k_cols=6, table_margin=2.0, col_margin=2.0, debug=False):
        """Main entry point: return (active_tables, col_hits, filters).

        col_hits is a list of (table, column, score) tuples; filters come
        from bind_values().
        """
        # PATCH: Derby execution-plan queries must stay in performance_schema
        # + sql_plan only. With 12 tables in derby_system, report/template
        # tables score within margin for phrasing like "get the execution
        # plan for digest X" because words like "execution", "plan", "time"
        # match scheduler/report column descriptions. The neural table linker
        # is short-circuited entirely for this intent.
        q_lower_pre = question.lower()
        _force_tables = None
        _injected_digest_filter = None
        if getattr(self, 'db_id', None) == 'derby_system':
            plan_kws = ['execution plan', 'sql plan', 'explain plan', 'query plan',
                        'plain text plan', 'plan for digest', 'plan for the digest']
            has_plan = any(kw in q_lower_pre for kw in plan_kws)
            has_digest_ref = bool(re.search(r'\bdigest\b', q_lower_pre)) and 'plan' in q_lower_pre
            if has_plan or has_digest_ref:
                _force_tables = ['performance_schema', 'sql_plan']
                # Also extract the digest value if present but unquoted, so
                # the value binder does not miss it (bind_values only catches
                # quoted or numeric tokens).
                digest_val_match = re.search(
                    r"\bdigest\s+['\"]?([\w\-]+--[\w\-]+)['\"]?", q_lower_pre
                )
                if digest_val_match:
                    _injected_digest_filter = digest_val_match.group(1)
        all_tables = list(self.schema.tables.keys())
        table_pairs = [(question, self._serialize_table(t)) for t in all_tables]
        all_tables = [t for t in self.schema.tables.keys()]
        # Force table selection based on which DB the router chose.
        if self.db_id == 'influx_system':
            # In Influx, the ONLY valid tables are the metrics tables.
            valid_tables = ['sys_target_alerts', 'target_based_time_series_data']
        else:
            # In Derby, exclude the Influx-specific tables.
            valid_tables = [t for t in self.schema.tables.keys() if t not in ['sys_target_alerts', 'target_based_time_series_data']]
        # Filter the neural network's search space. (This rebuild from
        # valid_tables discards the earlier table_pairs built over all_tables.)
        table_pairs = [(question, self._serialize_table(t)) for t in valid_tables]
        table_scores = self.table_linker.predict(table_pairs, show_progress_bar=False)
        sorted_table_indices = table_scores.argsort()[::-1]
        best_table_score = table_scores[sorted_table_indices[0]]
        active_tables = []
        for idx in sorted_table_indices[:top_k_tables]:
            score = table_scores[idx]
            # Keep tables scoring within `table_margin` of the best table.
            if score >= (best_table_score - table_margin):
                active_tables.append(valid_tables[idx])
        # Apply force-lock after neural scoring (preserves scores for col linker).
        if _force_tables is not None:
            active_tables = _force_tables
        # Store injected digest for generate_sql to consume via engine attribute.
        # NOTE(review): the conditional assignment below is immediately
        # overwritten by the unconditional one — the first statement is dead.
        self._injected_digest_filter = getattr(self, '_injected_digest_filter', None) \
            if not hasattr(self, '_injected_digest_filter') else _injected_digest_filter
        self._injected_digest_filter = _injected_digest_filter
        # -- 1. Lexical anchor extraction --
        doc = self.nlp(question.lower())
        # Base lemmas ONLY for meaningful parts of speech.
        meaningful_lemmas = {
            token.lemma_ for token in doc
            if token.pos_ in ['NOUN', 'PROPN', 'ADJ']
        }
        # Noun chunks capture multi-word concepts.
        noun_chunks = {chunk.text for chunk in doc.noun_chunks}
        # -- 2. Table rescue --
        # Force-include tables whose clean lemma is explicitly a noun in the
        # question.
        for t in all_tables:
            # Check the raw table name AND Spider's normalized English name.
            norm_t = getattr(self.schema, 'norm_table_names', {}).get(t, t).lower()
            if t in meaningful_lemmas or norm_t in meaningful_lemmas:
                if t not in active_tables:
                    active_tables.append(t)
        if debug:
            print(f"[extract_intent] active_tables={active_tables}")
        col_hits = []
        for table in active_tables:
            cols = [c for c in self.schema.tables[table] if c not in ('id', f"{table}_id")]
            if not cols:
                continue
            col_pairs = [(question, self._serialize_column(table, c)) for c in cols]
            col_scores = self.column_linker.predict(col_pairs, show_progress_bar=False)
            sorted_col_indices = col_scores.argsort()[::-1]
            best_col_score = col_scores[sorted_col_indices[0]]
            for idx in sorted_col_indices:
                score = col_scores[idx]
                # Keep columns scoring within `col_margin` of the table's best.
                if score >= (best_col_score - col_margin):
                    col_hits.append((table, cols[idx], float(score)))
        # -- 3. Column rescue --
        already_included = {(t, c) for t, c, s in col_hits}
        for table in active_tables:
            for col in self.schema.tables[table]:
                if col in ('id',) or col.endswith('_id'):
                    continue
                # Spider's normalized name (e.g. "fname" -> "first name").
                norm_c = getattr(self.schema, 'norm_column_names', {}).get((table, col), col).lower()
                # Rescue the column if its normalized name is a direct lemma
                # OR is explicitly contained inside a noun chunk.
                is_lemma_match = norm_c in meaningful_lemmas
                is_chunk_match = any(norm_c in chunk for chunk in noun_chunks)
                if is_lemma_match or is_chunk_match:
                    if (table, col) not in already_included:
                        # Score 0.0: survives pruning without overriding the
                        # neural top picks.
                        col_hits.append((table, col, 0.0))
        # -- Surgical boolean rescue --
        # If antonyms are used, the neural net often misses the flag column
        # entirely. Force it in.
        rescue_triggers = {
            'enabled': ['disabled', 'inactive', 'off', 'disable'],
            'status': ['running', 'stopped', 'failed', 'error', 'active', 'inactive']
        }
        for col_name, triggers in rescue_triggers.items():
            if any(w in question.lower() for w in triggers):
                for t in active_tables:
                    if col_name in self.schema.tables.get(t, {}):
                        # Skip if the column is already among the hits.
                        if not any(c == col_name for _, c, _ in col_hits):
                            col_hits.append((t, col_name, 0.0))
        if debug:
            print(f"[extract_intent] col_hits={col_hits}")
        filters = self.bind_values(question, active_tables, debug=debug)
        if filters:
            filter_tables = set(t for t, c, op, val, intent in filters)
            if len(filter_tables) == 1:
                sole = list(filter_tables)[0]
                # Only keep tables join-reachable from the sole filtered table.
                reachable = set(nx.single_source_shortest_path(self.schema.graph, sole).keys())
                active_tables = [t for t in active_tables if t in reachable]
            # Rescue tables discovered by the DB value matcher back into
            # active_tables.
            for t, c, op, val, intent in filters:
                if t not in active_tables:
                    active_tables.append(t)
        if debug:
            print(f"[extract_intent] filters={filters}")
        return active_tables, col_hits, filters
| # ========================================================= | |
| # 3. SQL COMPOSER | |
| # ========================================================= | |
| def generate_sql(question, engine, debug=False): | |
| active_tables, col_hits, filters = engine.extract_intent(question, debug=debug) | |
| injected = getattr(engine, '_injected_digest_filter', None) | |
| if injected and not any('digest' in w for _, c, _, _, _ in filters for w in [c]): | |
| filters = list(filters) + [('performance_schema', 'digest', '=', f"'{injected}'", 'INCLUDE')] | |
| engine._injected_digest_filter = None | |
| for t, c, op, val, intent in filters: | |
| if t not in active_tables: | |
| active_tables.append(t) | |
| # ββ 1. BOOLEAN COLUMN PROMOTION (POLARITY FIX) ββ | |
| promoted_filter_cols = set() | |
| new_filters = list(filters) | |
| q_lower = question.lower() | |
| # FATAL FLAW FIX: We must only promote booleans belonging to the absolute core tables. | |
| core_table = active_tables[0] if active_tables else None | |
| for t, c, score in col_hits: | |
| # Strict enforcement: If this column doesn't belong to the main subject table, ignore it. | |
| if t != core_table: | |
| continue | |
| col_type_raw = engine.schema.tables.get(t, {}).get(c, 'TEXT') | |
| is_boolean_col = ( | |
| 'BOOL' in str(col_type_raw).upper() | |
| or c.startswith('is_') | |
| or c.startswith('has_') | |
| or c.startswith('hlc_is_') | |
| or c in ('enabled', 'status', 'platform') | |
| or c.endswith('_status') | |
| or c.endswith('_period') | |
| or c.endswith('_type') | |
| or c.endswith('_provider') | |
| or c.endswith('_day_of_week') # FIX 2: Add day_of_week to the enum list | |
| ) | |
| if not is_boolean_col: continue | |
| if any(fc == c and ft == t for ft, fc, _, _, _ in filters): continue | |
| # 1. THE NEURAL GUARDRAIL (Relaxed for antonyms) | |
| col_words = [w for w in c.lower().split('_') if w not in ('is', 'has', 'hlc')] | |
| lexical_extensions = set(col_words) | |
| if 'enabled' in col_words: lexical_extensions.update(['disabled', 'active', 'inactive']) | |
| if 'status' in col_words: lexical_extensions.update(['running', 'stopped', 'failed', 'error', 'active', 'inactive']) | |
| if 'dynamic' in col_words: lexical_extensions.update(['static']) | |
| has_lexical_match = any(w in q_lower for w in lexical_extensions) | |
| if score < -5.0 and not has_lexical_match: # Relaxed slightly more to let template_type through | |
| continue | |
| candidate_values = engine.db_lookup.get_values(engine.db_id, t, c) | |
| val_set = set(str(v).lower() for v in candidate_values) if candidate_values else set() | |
| universal_negation = [ | |
| 'not', 'no', 'false', 'without', 'excluding', 'exclude', | |
| 'disabled', 'inactive', 'stopped', 'off', 'never', 'fails', | |
| 'local', 'on-prem', 'on-premise' | |
| ] | |
| has_negation = any(re.search(rf'\b{w}\b', q_lower) for w in universal_negation) | |
| best_value = None | |
| # 2. BULLETPROOF POLARITY ROUTING | |
| # FIX 1: We no longer rely on val_set for columns starting with is/has. We force the structure. | |
| is_true_false_col = c.startswith('is_') or c.startswith('has_') or c.startswith('hlc_is_') | |
| if is_true_false_col: | |
| if 'demo' in c.lower(): | |
| if has_negation: | |
| best_value = 'false' | |
| elif 'demo' in q_lower: | |
| best_value = 'true' | |
| else: | |
| best_value = 'false' | |
| elif has_negation: | |
| best_value = 'false' | |
| else: | |
| best_value = 'true' | |
| # 2. STATUS ENUM ROUTING | |
| elif 'status' in c.lower() or c == 'enabled': | |
| if has_negation or 'inactive' in q_lower or 'disabled' in q_lower: | |
| best_value = 'Inactive' if 'active' in val_set or 'Inactive' in val_set else 'false' | |
| elif any(w in q_lower for w in ['active', 'running', 'enabled', 'working']): | |
| best_value = 'Active' if 'active' in val_set or 'Active' in val_set else 'true' | |
| else: | |
| continue | |
| # 3. DERBY TARGET STATUS ROUTING | |
| elif 'running' in val_set or 'stopped' in val_set: | |
| if any(w in q_lower for w in ['error', 'failed', 'failing']): | |
| best_value = 'Error' | |
| elif any(w in q_lower for w in ['stopped', 'down', 'inactive', 'not running']): | |
| best_value = 'Stopped' | |
| elif any(w in q_lower for w in ['running', 'active', 'up', 'working']): | |
| best_value = 'Running' | |
| else: | |
| continue | |
| # 4. FALLBACK FOR OTHER ENUMS (e.g. DAILY, Dashboard,R ENUMS MONDAY) | |
| if best_value is None and candidate_values: | |
| # BYPASS NEURAL NET: If the exact DB value is in the question, just use it! | |
| for val in candidate_values: | |
| # We check lower() to match, but we use the original case 'val' for the SQL | |
| if str(val).lower() in q_lower: | |
| best_value = val | |
| break | |
| # If the pure string match failed, ONLY THEN ask the neural net to guess | |
| if best_value is None: | |
| pairs = [(question, f"{engine._serialize_column(t, c)} | value: {val}") for val in candidate_values] | |
| scores = engine.value_linker.predict(pairs, show_progress_bar=False) | |
| best_idx = int(scores.argmax()) | |
| best_score = float(scores[best_idx]) | |
| # Require a positive score to prevent wild hallucinations like "DASHBOARD" for "FinOps" | |
| if best_score > 0.0: | |
| best_value = candidate_values[best_idx] | |
| if best_value is not None: | |
| new_filters.append((t, c, '=', f"'{best_value}'", 'INCLUDE')) | |
| promoted_filter_cols.add((t, c)) | |
| # 3. THE "CLOUD" DOUBLE-BIND CLEANUP | |
| cleaned_filters = [] | |
| for ft, fc, fop, fval, fintent in new_filters: | |
| if (ft, fc) in promoted_filter_cols: | |
| cleaned_filters.append((ft, fc, fop, fval, fintent)) | |
| continue | |
| val_clean = fval.replace("'", "").replace("%", "").lower() | |
| is_redundant = False | |
| for pt, pc in promoted_filter_cols: | |
| if pt == ft and val_clean in pc.lower() and len(val_clean) > 3: | |
| is_redundant = True | |
| break | |
| if not is_redundant: | |
| cleaned_filters.append((ft, fc, fop, fval, fintent)) | |
| filters = cleaned_filters | |
| col_hits = [(t, c, s) for t, c, s in col_hits if (t, c) not in promoted_filter_cols] | |
| sorted_hits = sorted(col_hits, key=lambda x: -x[2]) | |
| # ββ 2. PREPARE THE FROM SLOT ββ | |
| # ββ 2. PREPARE THE FROM SLOT (GENERALIZED GRAPH PRUNING) ββ | |
| path_nodes = set(active_tables) | |
| if len(active_tables) > 1: | |
| root = active_tables[0] | |
| valid_tables = [root] | |
| path_nodes = {root} | |
| for tgt in active_tables[1:]: | |
| try: | |
| path = nx.shortest_path(engine.schema.graph, root, tgt) | |
| path_nodes.update(path) | |
| valid_tables.append(tgt) | |
| except nx.NetworkXNoPath: | |
| # Silently drop hallucinated tables that can't be joined | |
| pass | |
| active_tables = valid_tables | |
| ordered_nodes = list(path_nodes) | |
| else: | |
| ordered_nodes = active_tables | |
| start_node = ordered_nodes[0] if ordered_nodes else active_tables[0] | |
| table_aliases = {tbl: f"T{i+1}" for i, tbl in enumerate(ordered_nodes)} | |
| subgraph = engine.schema.graph.subgraph(ordered_nodes) | |
| use_aliases = len(ordered_nodes) > 1 | |
| if use_aliases: | |
| join_clauses = [f"{start_node} AS {table_aliases[start_node]}"] | |
| try: | |
| for u, v in nx.bfs_edges(subgraph, source=start_node): | |
| edge_data = engine.schema.graph.get_edge_data(u, v) | |
| if edge_data: | |
| on_cond = edge_data['on'] | |
| left_part, right_part = on_cond.split(' = ') | |
| t1, c1 = left_part.strip().split('.') | |
| t2, c2 = right_part.strip().split('.') | |
| u_col, v_col = (c1, c2) if t1 == u else (c2, c1) | |
| join_clauses.append(f"JOIN {v} AS {table_aliases[v]} ON {table_aliases[u]}.{u_col} = {table_aliases[v]}.{v_col}") | |
| except Exception: pass | |
| joins_sql = ' ' + ' '.join(join_clauses) | |
| else: | |
| joins_sql = f" {start_node}" | |
| def apply_alias(col_str): | |
| if "." in col_str: | |
| t_name, c_name = col_str.split('.') | |
| if not use_aliases: return c_name | |
| if t_name in table_aliases: return f"{table_aliases[t_name]}.{c_name}" | |
| return col_str | |
| # ββ 3. AST SLOTS INITIALIZATION ββ | |
| slots = {"DISTINCT": False, "SELECT": [], "FROM": joins_sql, "WHERE": [], "GROUP BY": [], "HAVING": "", "ORDER BY": "", "LIMIT": ""} | |
| skeleton_label, skeleton_name = engine.skeleton.predict(question) | |
| aggs = engine.detect_aggregation(question) | |
| if re.search(r'\bper\s+\w+\b', q_lower) and aggs: skeleton_label = GROUPED_AGG | |
| if skeleton_label == EXCEPT_QUERY and aggs: | |
| skeleton_label = PURE_AGG | |
| t_main = next((t for t in active_tables), None) | |
| t_sub = next((t for t in active_tables if t != t_main), None) | |
| if t_main and t_sub: | |
| main_pk = engine.schema.primary_keys.get(t_main) or next((c for c in engine.schema.tables.get(t_main, {}) if 'id' in c.lower()), None) | |
| fk_in_sub = next((c for c in engine.schema.tables.get(t_sub, {}) if c == main_pk or (t_main.rstrip('s') in c and 'id' in c)), main_pk) | |
| if main_pk and fk_in_sub: | |
| sub_filters = [f"{c} {op} {val}" for t, c, op, val, intent in filters if t == t_sub] | |
| sub_where = f" WHERE {' AND '.join(sub_filters)}" if sub_filters else "" | |
| slots["WHERE"].append(f"{main_pk} NOT IN (SELECT {fk_in_sub} FROM {t_sub}{sub_where})") | |
| slots["FROM"] = f" {t_main}" | |
| use_aliases = False | |
| table_aliases = {t_main: t_main} | |
| # ββ 4. FILL THE WHERE SLOT ββ | |
| filter_dict = {} | |
| for t, c, op, val, intent in filters: | |
| col_str = apply_alias(f"{t}.{c}") | |
| if col_str not in filter_dict: filter_dict[col_str] = [] | |
| filter_dict[col_str].append(f"{col_str} {op} {val}") | |
| intersect_vals = [] | |
| except_outer_table = next((t for t in active_tables), None) if skeleton_label == EXCEPT_QUERY else None | |
| for col_str, conditions in filter_dict.items(): | |
| if except_outer_table is not None: | |
| filter_table = next((t for t, c, op, val, intent in filters if apply_alias(f"{t}.{c}") == col_str), None) | |
| if filter_table and filter_table != except_outer_table: continue | |
| if len(conditions) > 1 and ("both" in q_lower or "and" in q_lower): intersect_vals = conditions | |
| elif len(conditions) > 1: slots["WHERE"].append("(" + " OR ".join(conditions) + ")") | |
| else: slots["WHERE"].append(conditions[0]) | |
| if "average" in q_lower and any(kw in q_lower for kw in ["above", "older", "greater", "more", "higher"]): | |
| num_col_raw = next((f"{t}.{c}" for t, c, s in sorted_hits if engine.schema.tables[t].get(c) == 'NUM'), None) | |
| if num_col_raw: | |
| tbl, col = num_col_raw.split('.') | |
| slots["WHERE"].append(f"{apply_alias(num_col_raw)} > (SELECT avg({col}) FROM {tbl})") | |
| skeleton_label = SIMPLE | |
| elif "average" in q_lower and any(kw in q_lower for kw in ["below", "smaller", "less", "under", "lower"]): | |
| num_col_raw = next((f"{t}.{c}" for t, c, s in sorted_hits if engine.schema.tables[t].get(c) == 'NUM'), None) | |
| if num_col_raw: | |
| tbl, col = num_col_raw.split('.') | |
| slots["WHERE"].append(f"{apply_alias(num_col_raw)} < (SELECT avg({col}) FROM {tbl})") | |
| skeleton_label = SIMPLE | |
| # ββ 5. PREPARE COLUMNS & SHOW ALL FIX ββ | |
| structural_cols = set() | |
| for u, v, data in engine.schema.graph.edges(data=True): | |
| for part in data.get('on', '').split(' = '): | |
| if '.' in part.strip(): structural_cols.add(tuple(part.strip().split('.'))) | |
| for tbl, pk in engine.schema.primary_keys.items(): structural_cols.add((tbl, pk)) | |
| q_tokens = set(re.findall(r'\w+', q_lower)) | |
| candidates_with_scores = [(t, c, s) for t, c, s in sorted_hits if (t, c) not in structural_cols or len(active_tables) == 1 or c.lower() in q_tokens] | |
| if candidates_with_scores: | |
| best_score = candidates_with_scores[0][2] | |
| if best_score < 0: candidates_with_scores = [candidates_with_scores[0]] | |
| else: | |
| safe_candidates = [] | |
| for t, c, s in candidates_with_scores: | |
| if s >= best_score - 2.0: safe_candidates.append((t, c, s)) | |
| elif s == 0.0 and (c.lower() in q_tokens or c.lower() + 's' in q_tokens or c.lower().rstrip('s') in q_tokens): safe_candidates.append((t, c, s)) | |
| candidates_with_scores = safe_candidates | |
| else: candidates_with_scores = sorted_hits[:1] | |
| # candidates = [(t, c) for t, c, s in candidates_with_scores] | |
| # ORPHANED COLUMN KILL SWITCH: Only allow columns if their table survived the JOIN graph | |
| candidates = [(t, c) for t, c, s in candidates_with_scores if t in ordered_nodes] | |
| # Fallback if the killer switch wipes everything out | |
| if not candidates: | |
| candidates = [(start_node, 'name')] if 'name' in engine.schema.tables.get(start_node, {}) else [(start_node, list(engine.schema.tables.get(start_node, {}).keys())[0])] | |
| math_words = {'average', 'max', 'min', 'sum', 'highest', 'lowest', 'maximum', 'minimum'} | |
| for t, c in list(candidates): | |
| if c.lower() in math_words: | |
| other_nums = [col for col, ctype in engine.schema.tables.get(t, {}).items() if ctype == 'NUM' and col.lower() not in math_words] | |
| if other_nums and aggs: candidates.remove((t, c)) | |
| has_name_col = any('name' in c.lower() for t, c in candidates) | |
| for t in active_tables: | |
| for c in engine.schema.tables[t]: | |
| if (t, c) in candidates or c == 'id' or c.endswith('_id'): continue | |
| c_clean = c.lower() | |
| if c_clean in ['name', 'pettype', 'type', 'country'] and c_clean in q_tokens: | |
| if c_clean == 'name' and has_name_col: continue | |
| candidates.append((t, c)) | |
| filter_cols = set((t, c) for t, c, op, val, intent in filters) | |
| if len(candidates) > 1: | |
| pure = [x for x in candidates if x not in filter_cols] | |
| if pure: candidates = pure | |
| else: | |
| for t in active_tables: | |
| for fallback in ['name', 'title', 'integration_name']: | |
| if fallback in engine.schema.tables.get(t, {}): | |
| candidates = [(t, fallback)] | |
| break | |
| else: continue | |
| break | |
| elif len(candidates) == 1 and candidates[0] in filter_cols: | |
| t = candidates[0][0] | |
| display_fallback = next((c for c in engine.schema.tables.get(t, {}) if c not in ('id',) and not c.endswith('_id') and c not in {fc for _, fc, _, _, _ in filters}), None) | |
| if display_fallback: candidates = [(t, display_fallback)] | |
| if re.search(r'\bid\b', q_lower): | |
| for t in active_tables: | |
| pk = engine.schema.primary_keys.get(t) | |
| if pk and (t, pk) not in filter_cols and t not in [ft for ft, fc, fo, fv, fi in filters]: | |
| candidates = [(t, pk)] + [x for x in candidates if x != (t, pk)] | |
| break | |
| # --- π€ CHATBOT CONVERSATIONAL BYPASS --- | |
| # Phrases your coworker uses that imply they want the "whole row" | |
| chatbot_phrases = [ | |
| "give me", "show me", "tell me about", "get me", "profile for", | |
| "information on", "info for", "details of", "all target", "complete profile" | |
| ] | |
| # Standard list phrases | |
| list_phrases = ["list all", "show all", "get all", "list demo", "show demo", "list reports", "list targets"] | |
| is_asking_for_everything = any(p in q_lower for p in chatbot_phrases + list_phrases) | |
| # Keywords that suggest the user wants a SPECIFIC calculation (which would break SELECT *) | |
| # We add "status" and "platform" here so "What is the status" doesn't trigger SELECT * | |
| explicit_math_or_col = any(re.search(rf'\b{w}\b', q_lower) for w in [ | |
| "names", "ids", "types", "platforms", "status", "text", "plan", | |
| "average", "count", "how many", "total", "avg", "sum" | |
| ]) | |
| # If they use chatbot phrasing and didn't ask for a specific math/column, give them SELECT * | |
| if is_asking_for_everything and not explicit_math_or_col and not aggs: | |
| top_cols = ["*"] | |
| display_col = "*" | |
| else: | |
| top_cols = [apply_alias(f"{t}.{c}") for t, c in candidates] | |
| top_cols.sort(key=lambda x: 0 if 'name' in x.lower() else 1) | |
| display_col = top_cols[0] if top_cols else "*" | |
| def get_group_col(disp_col): | |
| if disp_col == "*": return disp_col | |
| if "." in disp_col: | |
| alias = disp_col.split('.')[0] | |
| t_name = next((k for k, v in table_aliases.items() if v == alias), None) if use_aliases else alias | |
| else: | |
| t_name = start_node | |
| alias = start_node if not use_aliases else table_aliases[start_node] | |
| if t_name and 'name' in disp_col.lower(): | |
| pk = f"{t_name}_id" if f"{t_name}_id" in engine.schema.tables[t_name] else "id" | |
| if pk in engine.schema.tables[t_name]: return f"{alias}.{pk}" if use_aliases else pk | |
| return disp_col | |
| sort_col_raw, sort_dir = engine.resolve_ordering(question, active_tables) | |
| sort_col = apply_alias(sort_col_raw) if sort_col_raw else None | |
| is_explicit_order = any(k in q_lower for k in ["ordered by", "order by", "sorted by", "descending", "ascending"]) | |
| top_n_match = re.search(r'\b(?:top|bottom|worst|best|highest|lowest)\s+(\d+)\b', q_lower) | |
| forced_limit = int(top_n_match.group(1)) if top_n_match else None | |
| # ---> ROBUST SKELETON DOWNGRADE <--- | |
| # If the AI predicts SUPERLATIVE but there is nothing to sort by, no limit requested, | |
| # and no explicit count requested, then the classification is a false positive. | |
| if skeleton_label == SUPERLATIVE and not sort_col_raw and not top_n_match: | |
| is_count_select = any(q_lower.startswith(x) for x in ['how many', 'find the number', 'what is the number', 'number of']) | |
| if not is_count_select: | |
| skeleton_label = SIMPLE | |
| if sort_col_raw and skeleton_label not in [GROUPED_AGG, SUPERLATIVE, EXCEPT_QUERY] and not top_n_match: | |
| if not aggs and not is_explicit_order: | |
| if any(w in q_lower for w in ['youngest', 'oldest', 'most', 'least', 'highest', 'lowest', 'maximum', 'minimum', 'best', 'worst', 'largest', 'smallest']): | |
| skeleton_label = SUPERLATIVE | |
| # Down-grade false-positive PURE_AGG intents | |
| if skeleton_label == PURE_AGG and not aggs: | |
| is_count_word = any(w in q_lower for w in ['how many', 'count', 'total', 'number of']) | |
| if not is_count_word: | |
| skeleton_label = SIMPLE | |
| # ββ 6. POPULATE AST BY NEURAL SKELETON ββ | |
| if skeleton_label == EXCEPT_QUERY or (any(kw in q_lower.split() for kw in ["without"]) or "did not" in q_lower): | |
| t_main = active_tables[0] | |
| for col_str in top_cols: | |
| if col_str == "*": continue | |
| prefix = col_str.split('.')[0] if '.' in col_str else start_node | |
| cand_t = next((k for k, v in table_aliases.items() if v == prefix), prefix) | |
| if cand_t in active_tables: | |
| t_main = cand_t | |
| break | |
| t_sub = next((t for t in active_tables if t != t_main and engine.schema.primary_keys.get(t_main) in engine.schema.tables.get(t, {})), next((t for t in active_tables if t != t_main), None)) | |
| slots["FROM"] = f" {t_main} AS {table_aliases[t_main]}" if use_aliases else f" {t_main}" | |
| main_pk = engine.schema.primary_keys.get(t_main) or next((c for c in engine.schema.tables.get(t_main, {}) if 'id' in c.lower()), list(engine.schema.tables.get(t_main, {}).keys())[0]) | |
| if t_sub: | |
| fk_in_sub = next((c for c in engine.schema.tables.get(t_sub, {}) if c == main_pk or t_main in c), main_pk) | |
| target_alias = table_aliases[t_main] if use_aliases else t_main | |
| inner_wheres = [] | |
| outer_wheres = [] | |
| bridge_filter_tables = set(t for t, c, op, val, intent in filters if t != t_main and t != t_sub) | |
| has_explicit_exclude = any(i == 'EXCLUDE' for _, _, _, _, i in filters) | |
| for t, c, op, val, intent in filters: | |
| if t == t_sub or t in bridge_filter_tables: | |
| if intent == 'EXCLUDE': inner_wheres.append(f"{c} = {val}") | |
| elif intent == 'INCLUDE' and (has_explicit_exclude or t == t_main): outer_wheres.append((t, c, op, val)) | |
| else: inner_wheres.append(f"{c} {op} {val}") | |
| if outer_wheres: | |
| slots["FROM"] = joins_sql | |
| use_aliases = len(ordered_nodes) > 1 | |
| table_aliases = {tbl: f"T{i+1}" for i, tbl in enumerate(ordered_nodes)} | |
| for t, c, op, val in outer_wheres: | |
| slots["WHERE"].append(f"{apply_alias(f'{t}.{c}')} {op} {val}") | |
| inner_where_str = f" WHERE {' AND '.join(inner_wheres)}" if inner_wheres else "" | |
| if bridge_filter_tables: | |
| bridge_t = list(bridge_filter_tables)[0] | |
| edge = engine.schema.graph.get_edge_data(t_sub, bridge_t) | |
| if edge: | |
| sub_join = f"{t_sub} JOIN {bridge_t} ON {edge['on']}" | |
| slots["WHERE"].append(f"{target_alias}.{main_pk} NOT IN (SELECT {fk_in_sub} FROM {sub_join}{inner_where_str})") | |
| else: slots["WHERE"].append(f"{target_alias}.{main_pk} NOT IN (SELECT {fk_in_sub} FROM {t_sub}{inner_where_str})") | |
| else: slots["WHERE"].append(f"{target_alias}.{main_pk} NOT IN (SELECT {fk_in_sub} FROM {t_sub}{inner_where_str})") | |
| slots["SELECT"] = top_cols if top_cols else ["*"] | |
| elif skeleton_label == SUPERLATIVE: | |
| order_dir_sql = " DESC" if sort_dir == 'DESC' else "" | |
| is_count_select = any(q_lower.startswith(x) for x in ['how many', 'find the number', 'what is the number', 'number of']) | |
| if is_count_select and len(active_tables) >= 2 and sort_col_raw: | |
| sort_t = sort_col_raw.split('.')[0] if '.' in sort_col_raw else start_node | |
| t_inner = next((k for k, v in table_aliases.items() if v == sort_t), sort_t) if use_aliases else sort_t | |
| t_outer = next((t for t in active_tables if t != t_inner), active_tables[0]) | |
| sort_fk = next((c for c in engine.schema.tables.get(t_outer, {}) if t_inner in c or c == f"{t_inner}_id"), None) | |
| inner_pk = next((c for c in engine.schema.tables.get(t_inner, {}) if c == 'id' or c.endswith('_id')), 'id') | |
| if sort_fk: | |
| slots["SELECT"] = ["count(*)"] | |
| slots["FROM"] = f" {t_outer}" if not use_aliases else f" {t_outer} AS {table_aliases[t_outer]}" | |
| target_alias = table_aliases[t_outer] if use_aliases else t_outer | |
| inner_col = sort_col_raw.split('.')[-1] | |
| slots["WHERE"].append(f"{target_alias}.{sort_fk} = (SELECT {inner_pk} FROM {t_inner} ORDER BY {inner_col}{order_dir_sql} LIMIT 1)") | |
| else: | |
| slots["ORDER BY"] = f"ORDER BY {sort_col}{order_dir_sql}" | |
| if not is_explicit_order: slots["LIMIT"] = "LIMIT 1" | |
| slots["SELECT"].append("count(*)") | |
| else: | |
| slots["ORDER BY"] = f"ORDER BY {sort_col}{order_dir_sql}" if sort_col else "ORDER BY count(*) DESC" | |
| if not is_explicit_order: slots["LIMIT"] = "LIMIT 1" | |
| if is_count_select: select_cols = ["count(*)"] | |
| elif top_cols: | |
| select_cols = [c for c in top_cols if c != sort_col or is_explicit_order] | |
| if not select_cols: select_cols = top_cols | |
| else: select_cols = ["*"] | |
| for sc in select_cols: slots["SELECT"].append(sc) | |
| d_col = select_cols[0] if select_cols else "*" | |
| if "count(*)" in slots["ORDER BY"].lower() and d_col != "count(*)": | |
| slots["GROUP BY"].append(get_group_col(d_col if d_col != "*" and "count" not in d_col else top_cols[0])) | |
| elif skeleton_label == GROUPED_AGG: | |
| group_candidates = [col for col in top_cols if 'id' not in col.lower() and col != '*'] | |
| if not group_candidates: group_candidates = [col for col in top_cols if col != '*'] | |
| if not group_candidates and display_col != '*': group_candidates = [display_col] | |
| elif not group_candidates: group_candidates = [apply_alias(f"{start_node}.name")] | |
| def get_col_type(c): | |
| prefix = c.split('.')[0] if '.' in c else start_node | |
| t = next((k for k, v in table_aliases.items() if v == prefix), prefix) | |
| return engine.schema.tables.get(t, {}).get(c.split('.')[-1], 'TEXT') | |
| numeric_cols = [c for c in group_candidates if get_col_type(c) == 'NUM' and c.split('.')[-1] not in [x[1] for x in structural_cols]] | |
| categ_cols = [c for c in group_candidates if c not in numeric_cols] | |
| if aggs and numeric_cols: | |
| for agg in aggs: | |
| for nc in numeric_cols: slots["SELECT"].append(f"{agg.lower()}({nc})") | |
| group_by_col = categ_cols[0] if categ_cols else group_candidates[0] | |
| slots["SELECT"].extend(categ_cols if categ_cols else group_candidates) | |
| else: | |
| slots["SELECT"].extend(group_candidates) | |
| group_by_col = group_candidates[0] | |
| slots["GROUP BY"].append(get_group_col(group_by_col)) | |
| if any(k in q_lower for k in ['how many', 'number of', 'count']): | |
| slots["SELECT"].append("count(*)") | |
| elif skeleton_label == PURE_AGG: | |
| use_count_distinct = any(k in q_lower for k in ["distinct", "different", "unique", "various"]) | |
| count_str = f"count(DISTINCT {display_col})" if use_count_distinct and display_col != '*' else "count(*)" | |
| if not aggs: | |
| slots["SELECT"].append(count_str) | |
| else: | |
| target_col = sort_col if sort_col and sort_col != 'count(*)' else display_col | |
| has_literal_avg = any('average' in engine.schema.tables[t] for t in active_tables) | |
| for agg in aggs: | |
| if agg == "COUNT": slots["SELECT"].append(count_str) | |
| else: | |
| if agg == "AVG" and has_literal_avg and "average of" not in q_lower: | |
| avg_raw = next(f"{t}.average" for t in active_tables if 'average' in engine.schema.tables[t]) | |
| slots["SELECT"].append(apply_alias(avg_raw)) | |
| else: slots["SELECT"].append(f"{agg.lower()}({target_col})") | |
| for col in top_cols: | |
| if col != target_col and col not in slots["SELECT"] and col != '*' and col.split('.')[-1] in q_lower: | |
| slots["SELECT"].append(col) | |
| else: | |
| if any(k in q_lower for k in ["distinct", "unique", "different"]): slots["DISTINCT"] = True | |
| for c in top_cols: slots["SELECT"].append(c) | |
| if sort_col and any(k in q_lower for k in ["order by", "sorted by", "descending", "ascending"]): | |
| order_dir_sql = " DESC" if sort_dir == 'DESC' else "" | |
| slots["ORDER BY"] = f"ORDER BY {sort_col}{order_dir_sql}" | |
| if top_n_match: | |
| slots["ORDER BY"] = f"ORDER BY {sort_col if sort_col else 'data_value_double'} DESC" | |
| slots["LIMIT"] = f"LIMIT {forced_limit}" | |
| having_match = re.search(r'(more than|at least|fewer than|less than|greater than|over|under)\s+(\d+)', q_lower) | |
| if having_match and slots["GROUP BY"]: | |
| op_map = {'more than': '>', 'over': '>', 'greater than': '>', 'at least': '>=', 'fewer than': '<', 'less than': '<', 'under': '<'} | |
| op = op_map.get(having_match.group(1), '>') | |
| num = having_match.group(2) | |
| slots["HAVING"] = f"HAVING count(*) {op} {num}" | |
| # ββ 7. DIALECT ENFORCEMENT ENGINE ββ | |
| if getattr(engine, 'db_id', None) == 'influx_system': | |
| active_tables = ['sys_target_alerts'] | |
| slots["FROM"] = " sys_target_alerts" | |
| if not any('is_demo_alert' in w for w in slots["WHERE"]): | |
| slots["WHERE"].append("is_demo_alert != 'true'") | |
| is_bottom = any(w in q_lower for w in ['bottom', 'least', 'smallest', 'lowest']) | |
| order_dir = "ASC" if is_bottom else "DESC" | |
| if aggs: | |
| # If it's just asking for top/bottom anomalies (Superlative), DO NOT group. | |
| # We must return the raw 4 columns so the orchestrator can extract the digest! | |
| if skeleton_label == SUPERLATIVE or top_n_match or any(w in q_lower for w in ['top', 'bottom', 'worst']): | |
| slots["SELECT"] = ['target', 'metric', 'data_value_double', 'time'] | |
| slots["GROUP BY"] = [] | |
| slots["ORDER BY"] = f"ORDER BY data_value_double {order_dir}" | |
| slots["LIMIT"] = f"LIMIT {forced_limit}" if forced_limit else "LIMIT 5" | |
| else: | |
| # True aggregations (like "average by target") that don't feed into a multi-step digest lookup | |
| new_select = ['target'] | |
| for agg in aggs: | |
| if agg in ('MAX', 'MIN', 'AVG', 'SUM'): new_select.append(f"{agg.lower()}(data_value_double)") | |
| elif agg == 'COUNT': new_select.append("count(*)") | |
| slots["SELECT"] = new_select | |
| slots["GROUP BY"] = ['target'] | |
| if top_n_match: | |
| slots["ORDER BY"] = f"ORDER BY {new_select[1]} {order_dir}" | |
| slots["LIMIT"] = f"LIMIT {forced_limit}" if forced_limit else "LIMIT 5" | |
| else: | |
| slots["SELECT"] = [re.sub(r'\bT\d+\.', '', c) for c in slots["SELECT"]] | |
| if "*" in slots["SELECT"] and len(slots["SELECT"]) > 1: slots["SELECT"].remove("*") | |
| INFLUX_FIXED_COLS = {'target', 'metric', 'data_value_double', 'time', 'is_demo_alert', 'owner_id', 'templatename', 'confidencescore', 'measurement_value', 'data_value_long'} | |
| neural_pick = [c for c in slots["SELECT"] if c.lower() not in INFLUX_FIXED_COLS and c != '*'] | |
| where_cols = [c.split('.')[-1] for c in filter_dict.keys()] | |
| # ---> ORCHESTRATOR SAFETY LOCK <--- | |
| # The python orchestrator hardcodes indices 0, 1, 2, 3. We MUST guarantee they exist in this exact order. | |
| final_select = ['target', 'metric', 'data_value_double', 'time'] | |
| # Append anything else the user explicitly asked for to the end | |
| for col in neural_pick + where_cols: | |
| if col not in final_select and col != '*': | |
| final_select.append(col) | |
| slots["SELECT"] = final_select | |
| slots["GROUP BY"] = [] | |
| if top_n_match or skeleton_label == SUPERLATIVE or any(w in q_lower for w in ['top', 'bottom', 'worst']): | |
| slots["ORDER BY"] = f"ORDER BY data_value_double {order_dir}" | |
| slots["LIMIT"] = f"LIMIT {forced_limit}" if forced_limit else "LIMIT 5" | |
| elif getattr(engine, 'db_id', None) == 'derby_system': | |
| is_plan = any('plan' in c.lower() for c in slots["SELECT"]) | |
| has_digest = any('digest' in w.lower() for w in slots["WHERE"]) or 'digest' in q_lower | |
| if is_plan and has_digest: | |
| slots["FROM"] = " performance_schema AS T1 JOIN sql_plan AS T2 ON T1.sql_plan_id = T2.id" | |
| plan_cols = {'sql_plan', 'sql_plain_text_plan'} | |
| new_select = [] | |
| for c in slots["SELECT"]: | |
| if c == '*': | |
| new_select.append('*') | |
| continue | |
| bare = re.sub(r'\bT\d+\.', '', c) | |
| if bare in plan_cols: new_select.append(f"T2.{bare}") | |
| else: new_select.append(f"T1.{bare}") | |
| slots["SELECT"] = new_select | |
| # new_where = [] | |
| # for w in slots["WHERE"]: | |
| # parts = w.split(' ', 2) | |
| # if len(parts) == 3 and not w.startswith('('): new_where.append(f"T1.digest {parts[1]} {parts[2]}") | |
| # else: new_where.append(w) | |
| # slots["WHERE"] = new_where | |
| new_where = [] | |
| for w in slots["WHERE"]: | |
| # Wipe out any hallucinated table prefixes and strictly enforce T1.digest | |
| w_clean = re.sub(r'\b(?:T\d+|performance_schema|sql_plan)\.digest\b', 'T1.digest', w) | |
| w_clean = re.sub(r'(?<!\.)\bdigest\b', 'T1.digest', w_clean) | |
| new_where.append(w_clean) | |
| slots["WHERE"] = new_where | |
| bad_cols = {'measurement_value', 'confidencescore', 'templatename', 'is_demo_alert'} | |
| slots["SELECT"] = [c for c in slots["SELECT"] if c.lower() not in bad_cols] | |
| slots["SELECT"] = list(dict.fromkeys(slots["SELECT"])) | |
| is_join_query = 'JOIN' in slots["FROM"].upper() | |
| if not is_join_query: | |
| slots["SELECT"] = [ | |
| c.split('.')[-1] if '.' in c and not c.startswith('(') | |
| and not any(fn in c for fn in ('max(', 'min(', 'avg(', 'sum(', 'count(')) | |
| else c | |
| for c in slots["SELECT"] | |
| ] | |
| # Enforce identifier context for status queries | |
| if getattr(engine, 'db_id', None) == 'derby_system': | |
| if any('status' in c for c in slots["SELECT"]) and not any('name' in c for c in slots["SELECT"]): | |
| if 'name' in engine.schema.tables.get(start_node, {}): | |
| slots["SELECT"] = ['name'] + slots["SELECT"] | |
| # ββ 8. ASSEMBLE AST SLOTS INTO SQL STRING ββ | |
def assemble(ast_slots):
    """Render the slot dict (SELECT/FROM/WHERE/GROUP BY/HAVING/ORDER BY/LIMIT) into one SQL string."""
    dist = "DISTINCT " if ast_slots["DISTINCT"] else ""
    # NOTE: `a + b if cond else c` parses as `(a + b) if cond else c`, so an empty
    # SELECT list falls back to `SELECT *`. dict.fromkeys dedupes columns while
    # preserving first-seen order.
    sel = f"SELECT {dist}" + ", ".join(list(dict.fromkeys(ast_slots["SELECT"]))) if ast_slots["SELECT"] else f"SELECT {dist}*"
    # assumes ast_slots["FROM"] carries its own leading space (e.g. " target AS T1 ...") — TODO confirm against slot builders
    frm = " FROM" + ast_slots["FROM"]
    whr = " WHERE " + " AND ".join(ast_slots["WHERE"]) if ast_slots["WHERE"] else ""
    grp = " GROUP BY " + ", ".join(ast_slots["GROUP BY"]) if ast_slots["GROUP BY"] else ""
    # HAVING / ORDER BY / LIMIT slots already contain their SQL keyword, so only a space is prepended.
    hav = " " + ast_slots["HAVING"] if ast_slots.get("HAVING") else ""
    ordr = " " + ast_slots["ORDER BY"] if ast_slots["ORDER BY"] else ""
    lmt = " " + ast_slots["LIMIT"] if ast_slots["LIMIT"] else ""
    return f"{sel}{frm}{whr}{grp}{hav}{ordr}{lmt}".strip()
| if intersect_vals and len(intersect_vals) == 2: | |
| s1, s2 = slots.copy(), slots.copy() | |
| s1["WHERE"] = slots["WHERE"] + [intersect_vals[0]] | |
| s2["WHERE"] = slots["WHERE"] + [intersect_vals[1]] | |
| final_sql = f"{assemble(s1)} INTERSECT {assemble(s2)}" | |
| else: | |
| final_sql = assemble(slots) | |
| return final_sql | |
| # # ========================================================= | |
| # # ROUTING & ORCHESTRATION (Formerly Cell 2) | |
| # # ========================================================= | |
| # print("\nInitializing Semantic Router...") | |
| # router_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| # faq_intents = { | |
| # "LIST_DERBY_TABLES": {"anchors": ["What tables are in the database?", "Show me the Derby database schema.", "List all the tables you have.", "What configuration tables exist?", "Show me the metadata tables."], "clarification": "Did you mean to ask for a list of all configuration and metadata tables in the Derby system?", "target_db": "derby_system", "sql": "SELECT name FROM sqlite_master WHERE type='table';"}, | |
| # "LIST_INFLUX_TABLES": {"anchors": ["What tables are in InfluxDB?", "Show me the time series measurements.", "Where are the anomalies stored?", "What does the Influx database contain?"], "clarification": "Did you mean to ask for a list of the time-series metric tables in InfluxDB?", "target_db": "influx_system", "sql": "SELECT name FROM sqlite_master WHERE type='table';"}, | |
| # "LIST_SUPPORTED_DATABASES": {"anchors": ["What are the different databases you have?", "Which database engines are supported?", "List the target database types.", "Do you support Oracle and MySQL?"], "clarification": "Did you mean to ask what types of database engines (like MySQL, Oracle) are currently monitored?", "target_db": "derby_system", "sql": "SELECT DISTINCT hlc_database_type FROM target WHERE hlc_database_type IS NOT NULL;"}, | |
| # "LIST_CLOUD_PROVIDERS": {"anchors": ["What cloud platforms do you monitor?", "Are there any targets on AWS or Azure?", "Show me the cloud hosting providers.", "Which targets are on the cloud?"], "clarification": "Did you mean to ask which cloud platforms your databases are hosted on?", "target_db": "derby_system", "sql": "SELECT DISTINCT cloud_region, is_on_cloud FROM target WHERE is_on_cloud = 'true';"}, | |
| # "CLOUD_PRICING_INFO": {"anchors": ["How does cloud pricing work?", "Show me the cost of cloud databases.", "What are the pricing lookup codes?", "Show me billing and credit costs for AWS."], "clarification": "Did you mean to view the cloud database pricing and credit cost charts?", "target_db": "derby_system", "sql": "SELECT DISTINCT cloud_database_provider, region, credit_cost FROM cloud_database_pricing;"}, | |
| # "COLLECTOR_INFO": {"anchors": ["What are collector agents?", "Show me the dbactdc instances.", "List all the monitoring guards.", "Where are the collectors installed?", "What are the SSH hosts for the collectors?"], "clarification": "Did you mean to ask for a list of the active collector agents?", "target_db": "derby_system", "sql": "SELECT name, status, platform, ssh_host FROM dbactdc_instance;"}, | |
| # "EXTERNAL_INTEGRATIONS": {"anchors": ["What external integrations are supported?", "Do you connect to Datadog or AppDynamics?", "Show me the third party tools.", "What alerts can be sent to New Relic?"], "clarification": "Did you mean to view the configured external integrations?", "target_db": "derby_system", "sql": "SELECT integration_name, enabled FROM integration;"}, | |
| # "REPORTING_CAPABILITIES": {"anchors": ["What types of reports exist?", "Show me the report scheduling options.", "Can I get PDF emails?", "List the automated reports."], "clarification": "Did you mean to ask about the automated report schedules and formats?", "target_db": "derby_system", "sql": "SELECT name, scheduler_params_period, is_email_pdf_report_enabled FROM report;"}, | |
| # "DASHBOARD_TEMPLATES": {"anchors": ["What dashboard templates do you have?", "Show me the UI dashlets.", "What is a FinOps template?", "How are charts configured?"], "clarification": "Did you mean to list the available dashboard and chart templates?", "target_db": "derby_system", "sql": "SELECT name, template_type, is_dynamic FROM template;"}, | |
| # "PERFORMANCE_SCHEMA_INFO": {"anchors": ["Where are the execution plans stored?", "How do you track SQL digests?", "Show me how SQL text is mapped.", "What is the performance schema?"], "clarification": "Did you mean to ask how SQL queries and execution plans are stored?", "target_db": "derby_system", "sql": "SELECT digest, substr(digest_text, 1, 50) as sql_preview FROM performance_schema LIMIT 5;"} | |
| # } | |
| # anchor_texts = [] | |
| # anchor_intents = [] | |
| # for intent_id, data in faq_intents.items(): | |
| # for anchor in data["anchors"]: | |
| # anchor_texts.append(anchor) | |
| # anchor_intents.append(intent_id) | |
| # anchor_embeddings = router_model.encode(anchor_texts, convert_to_tensor=True) | |
| # def semantic_metadata_router(user_question, embedding_model, anchor_embeddings, threshold_high=0.75, threshold_mid=0.68): | |
| # q_emb = embedding_model.encode(user_question, convert_to_tensor=True) | |
| # cosine_scores = util.cos_sim(q_emb, anchor_embeddings)[0] | |
| # best_score_val, best_idx = torch.max(cosine_scores, dim=0) | |
| # best_score = best_score_val.item() | |
| # winning_intent_id = anchor_intents[best_idx] | |
| # intent_data = faq_intents[winning_intent_id] | |
| # if best_score >= threshold_high: | |
| # return {"status": "MATCH", "sql": intent_data["sql"], "target_db": intent_data["target_db"], "message": "Direct Match"} | |
| # elif best_score >= threshold_mid: | |
| # return {"status": "CLARIFY", "message": intent_data["clarification"], "sql": intent_data["sql"], "target_db": intent_data["target_db"]} | |
| # else: | |
| # return {"status": "PASS", "sql": None, "target_db": None, "message": None} | |
| # def route_query(question): | |
| # q_lower = question.lower() | |
| # if any(k in q_lower for k in ['sql query text', 'actual sql', 'execution plan']): return 'derby_system' | |
| # influx_triggers = [ | |
| # 'anomaly', 'anomalies', 'spike', 'bottleneck', 'bottlenecks', | |
| # 'alert', 'alerts', 'time series', 'worst performing', | |
| # 'top sql issue', 'top sql issues', 'sql problem', 'sql problems', 'capture source', 'capture method', | |
| # 'performance issue', 'performance bottleneck', 'detected for' | |
| # ] | |
| # if any(trigger in q_lower for trigger in influx_triggers): | |
| # return 'influx_system' | |
| # return 'derby_system' | |
| # def word_to_num(word): | |
| # word_map = {'single': 1, 'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5, 'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10, 'dozen': 12} | |
| # return word_map.get(word.lower(), None) | |
| # ========================================================= | |
| # ROUTING & ORCHESTRATION | |
| # ========================================================= | |
# Embedding model used purely for intent routing (anchor-phrase cosine similarity);
# separate from the cross-encoder loaded elsewhere in this file.
print("\nInitializing Semantic Router...")
router_model = SentenceTransformer('all-MiniLM-L6-v2')
# Canned FAQ intents for the semantic router. Each intent carries:
#   anchors       — example phrasings embedded at startup and matched by cosine similarity
#   clarification — prompt shown when the match is mid-confidence
#   target_db     — which backing sqlite DB the canned SQL runs against
#   sql           — canned query to execute on a confident match (None for OUT_OF_SCOPE)
# Entries commented out below were disabled deliberately; kept for reference.
faq_intents = {
    # ── SCHEMA META ──
    "LIST_DERBY_TABLES": {
        "anchors": [
            "What tables are in the database?", "Show me the Derby database schema.",
            "List all the tables you have.", "What configuration tables exist?",
            "Show me the metadata tables.", "what tables do you have",
            "show me your schema", "what data do you store", "list tables",
            "what's in the derby database", "database structure"
        ],
        "clarification": "Did you mean to list all configuration and metadata tables in the Derby system?",
        "target_db": "derby_system",
        "sql": "SELECT name FROM sqlite_master WHERE type='table';"
    },
    "LIST_INFLUX_TABLES": {
        "anchors": [
            "What tables are in InfluxDB?", "Show me the time series measurements.",
            "Where are the anomalies stored?", "What does the Influx database contain?",
            "what metrics do you store", "show influx schema", "what time series data exists",
            "where is performance data stored", "influx measurements"
        ],
        "clarification": "Did you mean to list the time-series metric tables in InfluxDB?",
        "target_db": "influx_system",
        "sql": "SELECT name FROM sqlite_master WHERE type='table';"
    },
    # ── TARGETS ──
    "LIST_TARGETS": {
        "anchors": [
            "show me all targets", "list my targets", "what targets do I have",
            "how many targets", "all my databases", "what am I monitoring",
            "list all monitored databases", "show targets", "get all targets",
            "what databases are being monitored", "give me the target list",
            "show me everything you monitor"
        ],
        "clarification": "Did you mean to list all monitored database targets?",
        "target_db": "derby_system",
        "sql": "SELECT name, hlc_database_type, db_target_status FROM target;"
    },
    # "TARGET_STATUS": {
    #     "anchors": [
    #         "which targets are running", "show me stopped targets", "any targets with errors",
    #         "what targets are down", "target health", "are all my targets up",
    #         "which databases are failing", "show target status", "what is the status of my targets",
    #         "which targets have errors", "any targets offline", "targets not running",
    #         "check target health", "what targets are active", "database status overview",
    #         "which targets are stopped", "show me failing targets", "target availability"
    #     ],
    #     "clarification": "Did you mean to check the current status of all monitored targets?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, db_target_status, hlc_database_type FROM target;"
    # },
    # "DEMO_TARGETS": {
    #     "anchors": [
    #         "show demo targets", "which targets are demo", "list demo databases",
    #         "what are the sample targets", "show me demo data", "which ones are test targets",
    #         "list sample databases", "show demo instances"
    #     ],
    #     "clarification": "Did you mean to list demo or sample targets?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, hlc_database_type, db_target_status FROM target WHERE is_demo_target = 'true';"
    # },
    "LIST_SUPPORTED_DATABASES": {
        "anchors": [
            "What are the different databases you have?", "Which database engines are supported?",
            "List the target database types.", "Do you support Oracle and MySQL?",
            "what database types do you monitor", "which engines are monitored",
            "do you support postgres", "what kind of databases", "supported db types",
            "show me database vendors", "which database flavors"
        ],
        "clarification": "Did you mean to ask what database engine types are currently monitored?",
        "target_db": "derby_system",
        "sql": "SELECT DISTINCT hlc_database_type FROM target WHERE hlc_database_type IS NOT NULL;"
    },
    # "CLOUD_TARGETS": {
    #     "anchors": [
    #         "What cloud platforms do you monitor?", "Are there any targets on AWS or Azure?",
    #         "Show me the cloud hosting providers.", "Which targets are on the cloud?",
    #         "list cloud targets", "show cloud databases", "which targets are cloud hosted",
    #         "what is on AWS", "show me azure targets", "cloud based databases",
    #         "which databases are on GCP", "cloud target list"
    #     ],
    #     "clarification": "Did you mean to list cloud-hosted database targets?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, cloud_region, hlc_database_type FROM target WHERE is_on_cloud = 'true';"
    # },
    # "TARGET_PLATFORM": {
    #     "anchors": [
    #         "what platform are targets on", "show windows targets", "list linux targets",
    #         "which targets run on windows", "which targets are linux", "target operating systems",
    #         "show me targets by platform", "what os are the targets"
    #     ],
    #     "clarification": "Did you mean to see which OS platform each target runs on?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, hlc_server_platform, hlc_server_type FROM target;"
    # },
    # "TARGET_JAVA_COLLECTORS": {
    #     "anchors": [
    #         "which targets use java collector", "show java based targets",
    #         "list java collector targets", "targets with java agent"
    #     ],
    #     "clarification": "Did you mean to list targets that use a Java-based data collector?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, hlc_database_type FROM target WHERE is_java_collector = 'true';"
    # },
    "TARGET_COST": {
        "anchors": [
            "how much do targets cost", "show target credit costs", "what is the monitoring cost",
            "show me credit usage per target", "target billing", "how much is each target costing"
        ],
        "clarification": "Did you mean to view the credit cost assigned to each monitored target?",
        "target_db": "derby_system",
        "sql": "SELECT name, hlc_credit_cost, cloud_region FROM target WHERE hlc_credit_cost > 0;"
    },
    # ── CLOUD PRICING ──
    "CLOUD_PRICING_INFO": {
        "anchors": [
            "How does cloud pricing work?", "Show me the cost of cloud databases.",
            "What are the pricing lookup codes?", "Show me billing and credit costs for AWS.",
            "cloud pricing table", "show pricing info", "what does cloud monitoring cost",
            "AWS pricing", "Azure pricing", "GCP pricing", "credit cost table",
            "show me the pricing chart", "cloud cost breakdown"
        ],
        "clarification": "Did you mean to view the cloud database pricing chart?",
        "target_db": "derby_system",
        "sql": "SELECT DISTINCT cloud_database_provider, region, credit_cost FROM cloud_database_pricing;"
    },
    "CLOUD_PRICING_BY_PROVIDER": {
        "anchors": [
            "show AWS prices", "what does Azure cost", "GCP credit cost",
            "pricing for Amazon RDS", "show me costs by cloud provider"
        ],
        "clarification": "Did you mean to see pricing broken down by cloud provider?",
        "target_db": "derby_system",
        "sql": "SELECT cloud_database_provider, region, credit_cost, lookup_code FROM cloud_database_pricing ORDER BY cloud_database_provider;"
    },
    # ── COLLECTORS (DBACTDC) ──
    "COLLECTOR_INFO": {
        "anchors": [
            "What are collector agents?", "Show me the dbactdc instances.",
            "List all the monitoring guards.", "Where are the collectors installed?",
            "What are the SSH hosts for the collectors?", "show me collectors",
            "list my collectors", "which collectors are running", "are my collectors active",
            "collector status", "dbactdc status", "is the collector up",
            "show collector agents", "list monitoring agents", "show dbactdc",
            "get collector list", "collector health"
        ],
        "clarification": "Did you mean to list the active collector agents (DBACTDC instances)?",
        "target_db": "derby_system",
        "sql": "SELECT name, status, platform, ssh_host FROM dbactdc_instance;"
    },
    # "COLLECTOR_STATUS": {
    #     "anchors": [
    #         "which collectors are inactive", "show stopped collectors",
    #         "any collectors with errors", "collector availability",
    #         "are all collectors up", "which collectors are down",
    #         "show me failed collectors", "active collectors only"
    #     ],
    #     "clarification": "Did you mean to check which collectors are currently active or inactive?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, status, platform, ssh_host FROM dbactdc_instance;"
    # },
    "COLLECTOR_PLATFORM": {
        "anchors": [
            "which collectors are on windows", "linux collectors", "show collector platforms",
            "what OS are collectors on", "remote collectors", "local collectors"
        ],
        "clarification": "Did you mean to see what platform each collector runs on?",
        "target_db": "derby_system",
        "sql": "SELECT name, platform, is_remote, ssh_host FROM dbactdc_instance;"
    },
    # ── INTEGRATIONS ──
    "EXTERNAL_INTEGRATIONS": {
        "anchors": [
            "What external integrations are supported?", "Do you connect to Datadog or AppDynamics?",
            "Show me the third party tools.", "What alerts can be sent to New Relic?",
            "show integrations", "list integrations", "what tools are connected",
            "show third party connections", "what monitoring tools are linked",
            "do you integrate with anything", "show me all integrations",
            "which integrations are enabled", "external connections"
        ],
        "clarification": "Did you mean to view the configured external integrations?",
        "target_db": "derby_system",
        "sql": "SELECT integration_name, title, enabled FROM integration;"
    },
    # "ACTIVE_INTEGRATIONS": {
    #     "anchors": [
    #         "which integrations are active", "show enabled integrations",
    #         "what tools are currently active", "active third party connections",
    #         "which integrations are turned on"
    #     ],
    #     "clarification": "Did you mean to list only the currently enabled integrations?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT integration_name, title FROM integration WHERE enabled = 'true';"
    # },
    # ── REPORTS ──
    "REPORTING_CAPABILITIES": {
        "anchors": [
            "What types of reports exist?", "Show me the report scheduling options.",
            "Can I get PDF emails?", "List the automated reports.",
            "show me reports", "list reports", "what reports do I have",
            "show scheduled reports", "report list", "what is being reported",
            "show me all reports", "get report list", "reporting schedule"
        ],
        "clarification": "Did you mean to ask about the automated report schedules and formats?",
        "target_db": "derby_system",
        "sql": "SELECT name, title, scheduler_params_period, is_email_pdf_report_enabled FROM report;"
    },
    # "SCHEDULED_REPORTS": {
    #     "anchors": [
    #         "which reports run automatically", "show daily reports", "show weekly reports",
    #         "show monthly reports", "scheduled report list", "reports that run on schedule",
    #         "what reports are automated", "show hourly reports"
    #     ],
    #     "clarification": "Did you mean to see which reports run on an automated schedule?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, title, scheduler_params_period, scheduler_params_day_of_week FROM report WHERE is_scheduled = 'true';"
    # },
    "PDF_REPORTS": {
        "anchors": [
            "which reports send PDF emails", "show PDF report list",
            "what reports email PDFs", "PDF email reports"
        ],
        "clarification": "Did you mean to list reports that send PDF emails?",
        "target_db": "derby_system",
        "sql": "SELECT name, title FROM report WHERE is_email_pdf_report_enabled = 'true';"
    },
    # ── TEMPLATES & DASHBOARDS ──
    "DASHBOARD_TEMPLATES": {
        "anchors": [
            "What dashboard templates do you have?", "Show me the UI dashlets.",
            "What is a FinOps template?", "How are charts configured?",
            "show templates", "list templates", "what templates exist",
            "show me dashboards", "chart templates", "list dashboards",
            "what dashboards are available", "show all templates",
            "show me FinOps templates", "show dynamic templates"
        ],
        "clarification": "Did you mean to list the available dashboard and chart templates?",
        "target_db": "derby_system",
        "sql": "SELECT name, template_type, is_dynamic FROM template;"
    },
    # "FINOPS_TEMPLATES": {
    #     "anchors": [
    #         "show finops templates", "list cost templates", "FinOps dashboards",
    #         "which templates are for cost", "cloud cost templates", "finance templates"
    #     ],
    #     "clarification": "Did you mean to list FinOps cost-analysis templates?",
    #     "target_db": "derby_system",
    #     "sql": "SELECT name, template_type, finance_targets_count FROM template WHERE template_type = 'FinOps';"
    # },
    "DYNAMIC_TEMPLATES": {
        "anchors": [
            "which templates are dynamic", "show dynamic dashboards",
            "list adaptive templates", "templates that auto update"
        ],
        "clarification": "Did you mean to list templates that dynamically adapt to available metrics?",
        "target_db": "derby_system",
        "sql": "SELECT name, template_type FROM template WHERE is_dynamic = 'true';"
    },
    "SYSTEM_DEFAULT_TEMPLATES": {
        "anchors": [
            "which templates are system defaults", "show default templates",
            "built in templates", "out of the box templates"
        ],
        "clarification": "Did you mean to list system-provided default templates?",
        "target_db": "derby_system",
        "sql": "SELECT name, template_type FROM template WHERE is_system_default = 'true';"
    },
    # ── PERFORMANCE SCHEMA / SQL DIGESTS ──
    "PERFORMANCE_SCHEMA_INFO": {
        "anchors": [
            "Where are the execution plans stored?", "How do you track SQL digests?",
            "Show me how SQL text is mapped.", "What is the performance schema?",
            "show sql digests", "list recent sql queries", "show me captured sql",
            "what sql has been captured", "show performance schema",
            "show digest table", "list sql digests", "recent query digests"
        ],
        "clarification": "Did you mean to ask how SQL queries and execution plans are stored?",
        "target_db": "derby_system",
        "sql": "SELECT digest, substr(digest_text, 1, 80) as sql_preview FROM performance_schema LIMIT 10;"
    },
    "SQL_PLANS": {
        "anchors": [
            "show sql execution plans", "list query plans", "show me query plans",
            "what execution plans are stored", "show all plans", "sql plan table"
        ],
        "clarification": "Did you mean to view stored SQL execution plans?",
        "target_db": "derby_system",
        "sql": "SELECT id, substr(sql_plain_text_plan, 1, 100) as plan_preview FROM sql_plan LIMIT 10;"
    },
    # ── ANOMALIES / INFLUX ──
    "RECENT_ANOMALIES": {
        "anchors": [
            "show recent anomalies", "what anomalies were detected", "list alerts",
            "show me spikes", "any bottlenecks detected", "what performance issues exist",
            "show me recent alerts", "latest anomalies", "current performance problems",
            "what is wrong with my databases", "show performance issues",
            "any current problems", "show spikes", "performance alerts",
            "what bottlenecks exist", "show me the worst issues"
        ],
        "clarification": "Did you mean to view recent performance anomalies and alerts?",
        "target_db": "influx_system",
        "sql": "SELECT target, metric, data_value_double, time FROM sys_target_alerts WHERE is_demo_alert != 'true' ORDER BY data_value_double DESC LIMIT 20;"
    },
    # "ANOMALIES_BY_TARGET": {
    #     "anchors": [
    #         "which target has the most anomalies", "show anomalies per target",
    #         "target anomaly summary", "which database has the most issues",
    #         "anomaly count by target", "show me problems by target"
    #     ],
    #     "clarification": "Did you mean to see anomaly counts grouped by target?",
    #     "target_db": "influx_system",
    #     "sql": "SELECT target, count(*) FROM sys_target_alerts WHERE is_demo_alert != 'true' GROUP BY target ORDER BY count(*) DESC;"
    # },
    # ── OUT OF SCOPE / REJECT ──
    # Sacrificial intent: high similarity here means the question is off-topic,
    # so the router rejects instead of guessing SQL.
    "OUT_OF_SCOPE": {
        "anchors": [
            "what is the weather today", "tell me a joke", "who won the football game",
            "write me a poem", "what is 2 plus 2", "how do I cook pasta",
            "what is the capital of France", "who is the president",
            "recommend a movie", "what time is it", "tell me something funny",
            "what is the meaning of life", "help me write an email",
            "translate this to spanish", "what stocks should I buy"
        ],
        "clarification": "I can only answer questions about your monitored database targets, performance metrics, collectors, reports, and SQL analysis. Could you rephrase?",
        "target_db": None,
        "sql": None
    }
}
| # --------------------------------------------------------- | |
| # TARGET DIALECT ROUTING & EXTRACTION | |
| # --------------------------------------------------------- | |
# Per-dialect SQL templates for DBA-style operations against a *monitored target*
# (not the local Derby/Influx metadata stores). Outer key is the intent key used
# by dba_intents; inner keys are the normalized engine names produced by
# normalize_db_type().
TARGET_DIALECTS = {
    "SHOW_TABLES": {
        "mysql": "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE();",
        "postgres": "SELECT tablename FROM pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema');",
        "oracle": "SELECT table_name FROM all_tables;",
        "sqlserver": "SELECT name AS table_name FROM sys.tables;",
        "snowflake": "SHOW TABLES;",
        "mongodb": "db.getCollectionNames();",
        "redshift": "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema');",
        "db2": "SELECT tabname FROM syscat.tables WHERE tabschema NOT LIKE 'SYS%';",
        "cassandra": "SELECT table_name FROM system_schema.tables;"
    },
    # NOTE: intentionally covers fewer engines than SHOW_TABLES; engines missing
    # here simply have no index-listing template.
    "SHOW_INDEXES": {
        "mysql": "SELECT index_name, table_name FROM information_schema.statistics;",
        "postgres": "SELECT indexname, tablename FROM pg_indexes;",
        "oracle": "SELECT index_name, table_name FROM all_indexes;",
        "snowflake": "SHOW INDEXES;",
        "sqlserver": "SELECT name AS index_name FROM sys.indexes;"
    }
}
# DBA intents routed against a live target rather than answered by canned SQL.
# Their anchors are embedded alongside faq_intents' anchors; "intent_key" points
# into TARGET_DIALECTS to pick the dialect-specific statement.
dba_intents = {
    "TARGET_SHOW_TABLES": {
        "anchors": ["show me the tables for", "list tables in target", "what tables exist in", "show all tables"],
        "intent_key": "SHOW_TABLES"
    },
    "TARGET_SHOW_INDEXES": {
        "anchors": ["show indexes for", "list the indexes in", "what indexes are on", "show index statistics"],
        "intent_key": "SHOW_INDEXES"
    }
}
def normalize_db_type(raw_type):
    """Map Derby's free-form hlc_database_type string to a canonical dialect key.

    Returns None for empty input, a TARGET_DIALECTS key when a known engine
    substring is found, otherwise the lowercased input unchanged.
    """
    if not raw_type:
        return None
    raw = raw_type.lower()
    # Ordered (substrings, canonical key) table; first hit wins, mirroring the
    # original if-chain precedence.
    alias_table = (
        (('mysql',), 'mysql'),
        (('postgres', 'psql'), 'postgres'),
        (('oracle',), 'oracle'),
        (('sqlserver', 'mssql'), 'sqlserver'),
        (('snowflake',), 'snowflake'),
        (('mongo',), 'mongodb'),
        (('cassandra',), 'cassandra'),
        (('redshift',), 'redshift'),
        (('db2',), 'db2'),
    )
    for needles, canonical in alias_table:
        if any(needle in raw for needle in needles):
            return canonical
    return raw
def extract_target_context(question, derby_db_path="./spider_data/database/derby_system/derby_system.sqlite"):
    """Resolve which monitored target a question refers to.

    Loads (name, engine type) pairs from the Derby `target` table, then tries
    two passes: an exact match against any quoted substring in the question,
    followed by a whole-word match of each target name anywhere in the text.
    Returns (target_name, normalized_dialect) or (None, None) on failure.
    """
    try:
        conn = sqlite3.connect(derby_db_path)
        # hlc_database_type is the engine type; hlc_server_type would be cloud/on-prem.
        targets = conn.execute(
            "SELECT name, hlc_database_type FROM target WHERE name IS NOT NULL"
        ).fetchall()
        conn.close()
    except Exception as e:
        # Best-effort: a missing/unreadable Derby DB degrades to "no target found".
        print(f"[WARNING] Could not connect to Derby to resolve targets: {e}")
        return None, None
    q_lower = question.lower()
    # Pass 1: anything the user put in quotes is treated as an exact target name.
    for quoted in re.findall(r"['\"]([^'\"]+)['\"]", question):
        for name, db_type in targets:
            if name.lower() == quoted.lower():
                return name, normalize_db_type(db_type)
    # Pass 2: whole-word occurrence of a target name anywhere in the question.
    for name, db_type in targets:
        if re.search(rf'\b{re.escape(name.lower())}\b', q_lower):
            return name, normalize_db_type(db_type)
    return None, None
| # ---------------------------------------------------------# --------------------------------------------------------- | |
| # anchor_texts = [] | |
| # anchor_intents = [] | |
| # for intent_id, data in faq_intents.items(): | |
| # for anchor in data["anchors"]: | |
| # anchor_texts.append(anchor) | |
| # anchor_intents.append(intent_id) | |
| # anchor_embeddings = router_model.encode(anchor_texts, convert_to_tensor=True) | |
# Flatten every intent's anchor phrases (FAQ intents first, then DBA target
# intents) into parallel lists, so one embedding matrix can be searched and
# each row mapped back to its originating intent id.
anchor_texts = []
anchor_intents = []
for intent_table in (faq_intents, dba_intents):
    for intent_id, data in intent_table.items():
        for anchor in data["anchors"]:
            anchor_texts.append(anchor)
            anchor_intents.append(intent_id)
anchor_embeddings = router_model.encode(anchor_texts, convert_to_tensor=True)
| # def semantic_metadata_router(user_question, embedding_model, anchor_embeddings, | |
| # threshold_high=0.85, threshold_mid=0.75): | |
| # q_emb = embedding_model.encode(user_question, convert_to_tensor=True) | |
| # cosine_scores = util.cos_sim(q_emb, anchor_embeddings)[0] | |
| # best_score_val, best_idx = torch.max(cosine_scores, dim=0) | |
| # best_score = best_score_val.item() | |
| # winning_intent_id = anchor_intents[best_idx] | |
| # intent_data = faq_intents[winning_intent_id] | |
| # print(f" [ROUTER] Intent={winning_intent_id} Score={best_score:.2f}") | |
| # # Hard reject nonsense queries | |
| # if winning_intent_id == "OUT_OF_SCOPE" and best_score >= threshold_mid: | |
| # return {"status": "REJECT", "message": intent_data["clarification"], | |
| # "sql": None, "target_db": None} | |
| # if best_score >= threshold_high: | |
| # return {"status": "MATCH", "sql": intent_data["sql"], | |
| # "target_db": intent_data["target_db"], "message": "Direct Match"} | |
| # elif best_score >= threshold_mid: | |
| # return {"status": "CLARIFY", "message": intent_data["clarification"], | |
| # "sql": intent_data["sql"], "target_db": intent_data["target_db"]} | |
| # else: | |
| # return {"status": "PASS", "sql": None, "target_db": None, "message": None} | |
def semantic_metadata_router(user_question, embedding_model, anchor_embeddings,
                             threshold_high=0.85, threshold_mid=0.75):
    """Route a question to a canned FAQ answer via cosine similarity.

    Compares the question embedding against the precomputed anchor embeddings
    and returns a dict with one of four statuses:
      REJECT  — best anchor is OUT_OF_SCOPE above the mid threshold
      MATCH   — confident hit; caller can run the canned SQL directly
      CLARIFY — mid-confidence hit; caller should confirm with the user
      PASS    — no FAQ match; fall through to the full NL2SQL pipeline
    """
    q_emb = embedding_model.encode(user_question, convert_to_tensor=True)
    cosine_scores = util.cos_sim(q_emb, anchor_embeddings)[0]
    best_score_val, best_idx = torch.max(cosine_scores, dim=0)
    best_score = best_score_val.item()
    winning_intent_id = anchor_intents[best_idx]
    print(f" [ROUTER] Intent={winning_intent_id} Score={best_score:.2f}")
    pass_result = {"status": "PASS", "sql": None, "target_db": None, "message": None}
    # Anchors from dba_intents share the embedding matrix but have no entry in
    # faq_intents; .get avoids the KeyError and lets them fall through to PASS.
    intent_data = faq_intents.get(winning_intent_id)
    if intent_data is None:
        return pass_result
    if winning_intent_id == "OUT_OF_SCOPE" and best_score >= threshold_mid:
        return {"status": "REJECT", "message": intent_data["clarification"],
                "sql": None, "target_db": None}
    if best_score >= threshold_high:
        return {"status": "MATCH", "sql": intent_data["sql"],
                "target_db": intent_data["target_db"], "message": "Direct Match"}
    if best_score >= threshold_mid:
        return {"status": "CLARIFY", "message": intent_data["clarification"],
                "sql": intent_data["sql"], "target_db": intent_data["target_db"]}
    return pass_result
def route_query(question):
    """Pick the target database for a question via keyword heuristics.

    Derby-specific phrases win first; otherwise any time-series /
    anomaly vocabulary sends the question to InfluxDB. Derby is the
    fallback for everything else.

    Returns either 'derby_system' or 'influx_system'.
    """
    text = question.lower()
    derby_markers = ('sql query text', 'actual sql', 'execution plan')
    if any(marker in text for marker in derby_markers):
        return 'derby_system'
    influx_markers = (
        'anomaly', 'anomalies', 'spike', 'bottleneck', 'bottlenecks',
        'alert', 'alerts', 'time series', 'worst performing',
        'top sql issue', 'top sql issues', 'sql problem', 'sql problems',
        'capture source', 'capture method', 'performance issue',
        'performance bottleneck', 'detected for',
    )
    for marker in influx_markers:
        if marker in text:
            return 'influx_system'
    return 'derby_system'
def word_to_num(word):
    """Translate a small English number word to its integer value.

    Lookup is case-insensitive; unrecognised words yield ``None``.
    """
    lookup = dict(
        single=1, one=1, two=2, three=3, four=4, five=5,
        six=6, seven=7, eight=8, nine=9, ten=10, dozen=12,
    )
    return lookup.get(word.lower())
| import json | |
| import sqlite3 | |
| import re | |
def format_results_as_md_table(cursor, results):
    """Render SQL query results as a Markdown table (or a detail view).

    Args:
        cursor: DB-API cursor that just executed the query; its
            ``description`` attribute supplies the column names.
        results: list of row tuples, as returned by ``fetchall()``.

    Returns:
        Markdown string. A single wide row (>5 columns) is shown as a
        vertical "Property / Value" detail view; everything else is a
        horizontal table wrapped in a horizontally scrollable <div>.

    FIX: the original ``str(val).replace('|', '|')`` was a no-op — pipe
    characters in cell values would break the Markdown table layout.
    They are now escaped as ``\\|`` (computed outside the f-string for
    pre-3.12 compatibility).
    """
    if not results:
        return "_No data found for this query._"
    col_names = [desc[0] for desc in cursor.description]
    # A single wide record reads better as a vertical key/value listing.
    if len(results) == 1 and len(col_names) > 5:
        detail_view = "### π Record Detail View\n\n| Property | Value |\n| :--- | :--- |\n"
        row = results[0]
        for col, val in zip(col_names, row):
            safe_val = str(val).replace('|', '\\|')
            detail_view += f"| **{col}** | {safe_val} |\n"
        return detail_view
    header = "| " + " | ".join(col_names) + " |"
    separator = "|" + "|".join(["---" for _ in col_names]) + "|"
    rows = ["| " + " | ".join([str(i).replace('|', '\\|') for i in r]) + " |" for r in results]
    return '<div style="overflow-x: auto;">\n\n' + "\n".join([header, separator] + rows) + "\n\n</div>"
def run_multi_step_workflow(question: str, engine_derby, engine_influx, debug: bool = True) -> str:
    """Orchestrate a natural-language question across the Derby/Influx stores.

    The question is tried against four intents, in priority order:
      0. Target interceptor  - a named target DB plus a strong TARGET_* intent
         yields a ready-to-dispatch, dialect-specific SQL plan (no execution).
      1. Semantic FAQ router - a confident anchor match executes the routed
         canned SQL against the matched database.
      2. Multi-step analysis - "worst queries + show me the SQL/plan" style
         questions correlate Influx anomalies with Derby SQL text and
         execution plans into one JSON payload for the LLM.
      3. Standard NL2SQL     - everything else is keyword-routed to a single
         database and answered with generated SQL.

    Every branch returns a Markdown-formatted report string.
    Relies on module-level globals: extract_target_context, router_model,
    util, anchor_embeddings, anchor_intents, dba_intents, TARGET_DIALECTS,
    semantic_metadata_router, generate_sql, route_query, word_to_num.
    """
    # =========================================================================
    # INTENT 0: TARGET DATABASE QUERY INTERCEPTOR
    # =========================================================================
    target_name, db_type = extract_target_context(question)
    if target_name and db_type:
        if debug: print(f"[ORCHESTRATOR] π― Target Detected: {target_name} (Engine: {db_type})")
        # Check what the user wants to do with this target.
        q_emb = router_model.encode(question, convert_to_tensor=True)
        scores = util.cos_sim(q_emb, anchor_embeddings)[0]
        best_idx = int(torch.argmax(scores).item())
        best_intent = anchor_intents[best_idx]
        # Only intercept when the best anchor is a TARGET_* intent and the
        # similarity clears a fixed 0.60 floor.
        if "TARGET_" in best_intent and float(scores[best_idx]) > 0.60:
            intent_action = dba_intents[best_intent]["intent_key"]
            # Fetch the SQL dialect variant for this specific engine.
            dialect_sql = TARGET_DIALECTS.get(intent_action, {}).get(db_type)
            if dialect_sql:
                exec_plan = (
                    f"### π οΈ Generated Execution Plan\n"
                    f"* **Target System:** `{target_name}`\n"
                    f"* **Engine Detected:** `{db_type.upper()}`\n"
                    f"* **Action:** `{intent_action}`\n"
                    f" ```sql\n {dialect_sql}\n ```\n"
                )
                return f"{exec_plan}\n\n*Note: This query is ready to be dispatched to your secure execution pipeline. Local execution skipped.*"
            else:
                return f"β οΈ Detected target `{target_name}` as `{db_type.upper()}`, but I don't have the dialect mapping for `{intent_action}` on this engine yet."
    # =========================================================================
    # INTENT 1: SEMANTIC ROUTER (FAQ & Metadata Queries)
    # =========================================================================
    router_result = semantic_metadata_router(question, router_model, anchor_embeddings)
    if router_result["status"] in ["MATCH", "CLARIFY"]:
        sql_query = router_result["sql"]
        target_db = router_result.get("target_db", "derby_system")
        # Record the execution plan for the user-facing report.
        exec_plan = f"### π οΈ Generated Execution Plan\n* **Step 1** (Target: `{target_db}`):\n ```sql\n {sql_query}\n ```\n"
        db_path = f"./spider_data/database/{target_db}/{target_db}.sqlite"
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(sql_query)
            results = cursor.fetchall()
            if "sqlite_master" in sql_query:
                # Table listings render as bullets, hiding sqlite_sequence.
                formatted_results = "\n".join([f"- **{row[0]}**" for row in results if row[0] != 'sqlite_sequence'])
            else:
                formatted_results = format_results_as_md_table(cursor, results)
        except Exception as e:
            formatted_results = f"π¨ **SQL EXECUTION ERROR:**\n```\n{e}\n```"
        finally:
            conn.close()
        final_output = f"{exec_plan}\n### π Execution Results:\n{formatted_results}"
        if router_result["status"] == "CLARIFY":
            # CLARIFY still executes, but surfaces the clarification question.
            return f"π€ **SYSTEM QUESTION:**\n{router_result['message']}\n*(Assuming YES...)*\n\n{final_output}"
        return final_output
    # =========================================================================
    # INTENT 2: MULTI-STEP PERFORMANCE ANALYSIS (The JSON Builder)
    # =========================================================================
    q_lower = question.lower()
    # Two regex gates: the question must ask for SQL text/plans AND mention
    # anomalies/bottlenecks to trigger the cross-database correlation path.
    needs_sql_text = bool(re.search(r'\b(sql quer(?:y|ies)|sql texts?|actual sql|actual quer(?:y|ies)|execution plans?|sql plans?|quer(?:y|ies) (?:behind|causing|responsible|running)|what sql|what quer(?:y|ies)|show.{0,20}quer(?:y|ies)|look like|what do they look)\b', q_lower))
    needs_anomalies = bool(re.search(r'\b(worst|bottlenecks?|anomal(?:y|ies)|spikes?|issues?|problems?|most anomalous|performing|severe)\b', q_lower))
    if needs_sql_text and needs_anomalies:
        if debug: print(f"[ORCHESTRATOR] β‘ Multi-Step Query Detected!")
        execution_steps = []  # Track queries for the user-facing report
        # --- 1. DETERMINE LIMIT & GET ANOMALIES FROM INFLUX ---
        # Parse "top N" as a digit, a number word, or "single"-style phrasing;
        # default to 5 when nothing matches.
        influx_limit = 5
        digit_match = re.search(r'\b(?:top|worst|highest|bottom|least)\s+(\d+)\b', q_lower)
        word_match = re.search(r'\b(?:top|worst|highest|bottom|least)\s+(one|two|three|four|five|six|seven|eight|nine|ten)\b', q_lower)
        if digit_match: influx_limit = int(digit_match.group(1))
        elif word_match:
            parsed_num = word_to_num(word_match.group(1))
            if parsed_num: influx_limit = parsed_num
        elif any(w in q_lower for w in ['single', 'most anomalous', 'worst target', 'worst query']):
            influx_limit = 1
        # Generate the Influx-side SQL from a normalized prompt, not the raw
        # question, so the limit is honoured deterministically.
        influx_prompt = f"Find the top {influx_limit} worst performing queries."
        influx_sql = generate_sql(influx_prompt, engine_influx, debug=debug)
        execution_steps.append(f"* **Step 1** (Target: `influx_system`):\n ```sql\n {influx_sql}\n ```")
        conn_in = sqlite3.connect("./spider_data/database/influx_system/influx_system.sqlite")
        try:
            influx_results = conn_in.execute(influx_sql).fetchall()
        except Exception as e:
            return f"π¨ InfluxDB Execution Error: {e}"
        finally:
            conn_in.close()
        # NOTE(review): assumes column 1 of the generated Influx result is the
        # query digest — confirm this matches generate_sql's output schema.
        digests = [row[1] for row in influx_results if row[1]]
        if not digests:
            return "\n".join(execution_steps) + "\n\n**Result:** No performance bottlenecks found in InfluxDB."
        digest_list_str = ", ".join([f"'{d}'" for d in digests])
        # --- 2. GET SQL TEXT & PLAN IDs FROM DERBY ---
        # NOTE(review): digests are interpolated into the SQL string; safe only
        # if digests are trusted internal values — parameterize otherwise.
        derby_text_sql = f"SELECT digest, digest_text, sql_plan_id FROM performance_schema WHERE digest IN ({digest_list_str})"
        execution_steps.append(f"* **Step 2** (Target: `derby_system`):\n *(Extracts digests from Step 1)*\n ```sql\n {derby_text_sql}\n ```")
        conn_derby = sqlite3.connect("./spider_data/database/derby_system/derby_system.sqlite")
        try:
            derby_text_results = conn_derby.execute(derby_text_sql).fetchall()
        except Exception as e:
            conn_derby.close()
            return f"π¨ Derby Query Error: {e}"
        # Map digest -> {text, plan_id}; collect plan ids for the next hop.
        sql_metadata = {}
        plan_ids_to_fetch = set()
        for row in derby_text_results:
            d_id, d_text, p_id = row[0], row[1], row[2]
            sql_metadata[d_id] = {"text": d_text, "plan_id": p_id}
            if p_id is not None:
                plan_ids_to_fetch.add(str(p_id))
        # --- 3. GET EXECUTION PLANS FROM DERBY ---
        plan_metadata = {}
        if plan_ids_to_fetch:
            plan_id_str = ", ".join(plan_ids_to_fetch)
            derby_plan_sql = f"SELECT id, sql_plan, sql_plain_text_plan FROM sql_plan WHERE id IN ({plan_id_str})"
            execution_steps.append(f"* **Step 3** (Target: `derby_system`):\n *(Extracts plan IDs from Step 2)*\n ```sql\n {derby_plan_sql}\n ```")
            try:
                derby_plan_results = conn_derby.execute(derby_plan_sql).fetchall()
                for row in derby_plan_results:
                    p_id, json_plan, plain_plan = row[0], row[1], row[2]
                    # Prefer the JSON plan; fall back to the plain-text one.
                    plan_metadata[p_id] = json_plan if json_plan else plain_plan
            except Exception as e:
                # Plans are best-effort enrichment; a failure here is non-fatal.
                if debug: print(f"[WARNING] Failed to fetch execution plans: {e}")
        conn_derby.close()
        # --- 4. PACKAGE DATA ---
        llm_payload = []
        for row in influx_results:
            target = row[0]
            metric_digest = row[1]
            severity = row[2]
            timestamp = row[3]
            meta = sql_metadata.get(metric_digest, {})
            actual_sql = meta.get("text", "-- Metadata Not Found in Derby --")
            plan_id = meta.get("plan_id")
            execution_plan = "-- No Plan Linked --"
            # NOTE(review): plan_metadata is keyed by the raw id from sql_plan
            # (likely an int), while this lookup uses str(plan_id). If the id
            # column is INTEGER this never matches — verify the key types.
            if plan_id and str(plan_id) in plan_metadata:
                execution_plan = plan_metadata[str(plan_id)]
            llm_payload.append({
                "target_server": target,
                "digest": metric_digest,
                "anomaly_score": severity,
                "timestamp": timestamp,
                "sql_query": actual_sql,
                "execution_plan": execution_plan
            })
        formatted_payload = json.dumps(llm_payload, indent=2)
        exec_plan_md = "### π οΈ Generated Execution Plan\n" + "\n".join(execution_steps)
        return f"{exec_plan_md}\n\n### π¦ Final JSON Payload (For LLM)\n```json\n{formatted_payload}\n```"
    # =========================================================================
    # INTENT 3: STANDARD DB QUERY
    # =========================================================================
    else:
        if debug: print(f"[ORCHESTRATOR] β‘οΈ Single-DB Query Detected.")
        target_db = route_query(question)
        engine = engine_derby if target_db == 'derby_system' else engine_influx
        sql_query = generate_sql(question, engine, debug=debug)
        # Record the execution plan for the user-facing report.
        exec_plan = f"### π οΈ Generated Execution Plan\n* **Step 1** (Target: `{target_db}`):\n ```sql\n {sql_query}\n ```\n"
        db_path = f"./spider_data/database/{target_db}/{target_db}.sqlite"
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(sql_query)
            results = cursor.fetchall()
            formatted_results = format_results_as_md_table(cursor, results)
        except Exception as e:
            formatted_results = f"π¨ **SQL EXECUTION ERROR:**\n```\n{e}\n```"
        finally:
            conn.close()
        return f"{exec_plan}\n### π Execution Results:\n{formatted_results}"
| # def format_results_as_md_table(cursor, results): | |
| # if not results: | |
| # return "_No data found for this query._" | |
| # col_names = [desc[0] for desc in cursor.description] | |
| # # If there's only 1 row and many columns, show a "Detail View" instead of a table | |
| # if len(results) == 1 and len(col_names) > 5: | |
| # detail_view = "### π Record Detail View\n\n| Property | Value |\n| :--- | :--- |\n" | |
| # row = results[0] | |
| # for col, val in zip(col_names, row): | |
| # detail_view += f"| **{col}** | {str(val).replace('|', '|')} |\n" | |
| # return detail_view | |
| # # Otherwise, return the horizontal table with the scroll fix from Step 1... | |
| # header = "| " + " | ".join(col_names) + " |" | |
| # separator = "|" + "|".join(["---" for _ in col_names]) + "|" | |
| # rows = ["| " + " | ".join([str(i).replace('|', '|') for i in r]) + " |" for r in results] | |
| # return f'<div style="overflow-x: auto;">\n\n' + "\n".join([header, separator] + rows) + "\n\n</div>" | |
| # def run_multi_step_workflow(question, engine_derby, engine_influx, debug=True): | |
| # # Intent 1: Semantic Router | |
| # router_result = semantic_metadata_router(question, router_model, anchor_embeddings) | |
| # if router_result["status"] in ["MATCH", "CLARIFY"]: | |
| # sql_query = router_result["sql"] | |
| # target_db = router_result.get("target_db", "derby_system") | |
| # # PATH FIX 3 | |
| # db_path = f"./spider_data/database/{target_db}/{target_db}.sqlite" | |
| # conn = sqlite3.connect(db_path) | |
| # try: | |
| # cursor = conn.cursor() | |
| # cursor.execute(sql_query) | |
| # results = cursor.fetchall() | |
| # if "sqlite_master" in sql_query: | |
| # # Use bullet points for table listings | |
| # formatted_results = "\n".join([f"- **{row[0]}**" for row in results if row[0] != 'sqlite_sequence']) | |
| # else: | |
| # formatted_results = format_results_as_md_table(cursor, results) | |
| # except Exception as e: | |
| # formatted_results = f"π¨ **SQL EXECUTION ERROR:**\n```\n{e}\n```" | |
| # finally: | |
| # conn.close() | |
| # final_output = f"**GENERATED SQL (FAQ ROUTE):**\n```sql\n{sql_query}\n```\n\n**EXECUTION RESULTS:**\n{formatted_results}" | |
| # if router_result["status"] == "CLARIFY": | |
| # return f"π€ **SYSTEM QUESTION:**\n{router_result['message']}\n*(Assuming YES, executing query...)*\n\n{final_output}" | |
| # return final_output | |
| # # Intent 2: Multi-Step Performance Analysis | |
| # q_lower = question.lower() | |
| # needs_sql_text = bool(re.search(r'\b(sql quer(?:y|ies)|sql texts?|actual sql|actual quer(?:y|ies)|execution plans?|sql plans?|quer(?:y|ies) (?:behind|causing|responsible|running)|what sql|what quer(?:y|ies)|show.{0,20}quer(?:y|ies)|look like|what do they look)\b', q_lower)) | |
| # needs_anomalies = bool(re.search(r'\b(worst|bottlenecks?|anomal(?:y|ies)|spikes?|issues?|problems?|most anomalous|performing|severe)\b', q_lower)) | |
| # if needs_sql_text and needs_anomalies: | |
| # print(f"[ORCHESTRATOR] β‘ Multi-Step Query Detected!") | |
| # influx_limit = 5 | |
| # digit_match = re.search(r'\b(?:top|worst|highest|bottom|least)\s+(\d+)\b', q_lower) | |
| # word_match = re.search(r'\b(?:top|worst|highest|bottom|least)\s+(one|two|three|four|five|six|seven|eight|nine|ten)\b', q_lower) | |
| # if digit_match: | |
| # influx_limit = int(digit_match.group(1)) | |
| # elif word_match: | |
| # parsed_num = word_to_num(word_match.group(1)) | |
| # if parsed_num: influx_limit = parsed_num | |
| # elif any(w in q_lower for w in ['single', 'most anomalous', 'worst target', 'worst query']): | |
| # influx_limit = 1 | |
| # influx_prompt = f"Find the top {influx_limit} worst performing queries." | |
| # influx_sql = generate_sql(influx_prompt, engine_influx, debug=True) | |
| # # PATH FIX 4 | |
| # conn_in = sqlite3.connect("./spider_data/database/influx_system/influx_system.sqlite") | |
| # try: | |
| # influx_results = conn_in.execute(influx_sql).fetchall() | |
| # except Exception as e: | |
| # influx_results = [] | |
| # finally: | |
| # conn_in.close() | |
| # digests = [row[1] for row in influx_results] if influx_results else [] | |
| # if not digests: return "No performance bottlenecks found in InfluxDB." | |
| # digest_list_str = ", ".join([f"'{d}'" for d in digests]) | |
| # if 'plan' in q_lower: | |
| # derby_sql = f"SELECT T1.digest, T2.sql_plain_text_plan FROM performance_schema AS T1 JOIN sql_plan AS T2 ON T1.sql_plan_id = T2.id WHERE T1.digest IN ({digest_list_str})" | |
| # else: | |
| # derby_sql = f"SELECT digest, digest_text FROM performance_schema WHERE digest IN ({digest_list_str})" | |
| # # PATH FIX 5 | |
| # conn_derby = sqlite3.connect("./spider_data/database/derby_system/derby_system.sqlite") | |
| # try: | |
| # derby_results = conn_derby.execute(derby_sql).fetchall() | |
| # except Exception as e: | |
| # derby_results = [] | |
| # finally: | |
| # conn_derby.close() | |
| # sql_lookup = {row[0]: row[1] for row in derby_results} | |
| # report_type = "EXECUTION PLAN" if 'plan' in q_lower else "SQL TEXT" | |
| # report = ["\n---\n", f"### π¨ MULTI-SYSTEM CORRELATION REPORT: {report_type} π¨", "\n---\n"] | |
| # for row in influx_results: | |
| # target, metric_digest, severity, timestamp = row[0], row[1], row[2], row[3] | |
| # actual_sql = sql_lookup.get(metric_digest, "-- Metadata Not Found in Derby (Query may have aged out) --") | |
| # # Format the output cleanly using Markdown | |
| # report.append(f"**TARGET SERVER:** `{target}`\n**ALERT TIME:** `{timestamp}`\n**ANOMALY SCORE:** `{severity}` *(Digest: {metric_digest})*\n\n**{report_type}:**\n```sql\n{actual_sql}\n```\n---") | |
| # return "\n".join(report) | |
| # # Intent 3: Standard DB Query | |
| # else: | |
| # print(f"[ORCHESTRATOR] β‘οΈ Single-DB Query Detected.") | |
| # target_db = route_query(question) | |
| # engine = engine_derby if target_db == 'derby_system' else engine_influx | |
| # sql_query = generate_sql(question, engine, debug=debug) | |
| # # PATH FIX 6 | |
| # db_path = f"./spider_data/database/{target_db}/{target_db}.sqlite" | |
| # conn = sqlite3.connect(db_path) | |
| # try: | |
| # cursor = conn.cursor() | |
| # cursor.execute(sql_query) | |
| # results = cursor.fetchall() | |
| # formatted_results = format_results_as_md_table(cursor, results) | |
| # except Exception as e: | |
| # formatted_results = f"π¨ **SQL EXECUTION ERROR:**\n```\n{e}\n```" | |
| # finally: | |
| # conn.close() | |
| # return f"**GENERATED SQL:**\n```sql\n{sql_query}\n```\n\n**EXECUTION RESULTS:**\n{formatted_results}" |