# Source: Data-Science-Agent / src/utils/semantic_layer.py
# Author: Pulastya B
# Last commit: "Fix: Better SBERT error handling + suppress invalid hand-off warnings" (4a3a3e8)
"""
Semantic Layer using SBERT for Column Understanding and Agent Routing
Provides semantic understanding of dataset columns and agent intent matching
using sentence-transformers embeddings.
"""
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import polars as pl
from pathlib import Path
import json
# SBERT for semantic embeddings — optional dependency; the layer degrades
# to keyword-style fallbacks when it is missing.
try:
    from sentence_transformers import SentenceTransformer
    import torch
    SBERT_AVAILABLE = True
except ImportError:
    SBERT_AVAILABLE = False
    print("⚠️ sentence-transformers not available. Install with: pip install sentence-transformers")
# Sklearn for similarity — also optional; required for cosine_similarity.
try:
    from sklearn.metrics.pairwise import cosine_similarity
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
class SemanticLayer:
    """
    Semantic understanding layer using SBERT embeddings.
    Features:
    - Column semantic embedding (name + sample values + dtype)
    - Semantic column matching (find similar columns)
    - Agent intent routing (semantic task → agent mapping)
    - Target column inference (semantic similarity to "target")
    """
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize semantic layer with SBERT model.

        Args:
            model_name: Sentence-transformer model name
                - all-MiniLM-L6-v2: Fast, 384 dims (recommended)
                - all-mpnet-base-v2: Better quality, 768 dims, slower
                - paraphrase-MiniLM-L6-v2: Good for short texts
        """
        self.model_name = model_name
        # Stays None unless the model loads successfully below.
        self.model = None
        # Every semantic feature needs both sentence-transformers and sklearn.
        self.enabled = SBERT_AVAILABLE and SKLEARN_AVAILABLE
        if self.enabled:
            try:
                print(f"🧠 Loading SBERT model: {model_name}...")
                # Try loading with trust_remote_code for better compatibility
                # NOTE(review): trust_remote_code=True executes code shipped in
                # the model repo — only safe for trusted checkpoints; confirm
                # model_name is never user-supplied.
                self.model = SentenceTransformer(model_name, trust_remote_code=True)
                # Use GPU if available
                if torch.cuda.is_available():
                    self.model = self.model.to('cuda')
                    print("✅ SBERT loaded on GPU")
                else:
                    print("✅ SBERT loaded on CPU")
            except Exception as e:
                # Any load failure (download error, incompatible weights, ...)
                # disables semantic features rather than crashing the caller.
                print(f"⚠️ Failed to load SBERT model: {e}")
                print(f"   Falling back to keyword-based routing (semantic features disabled)")
                self.enabled = False
        else:
            print("⚠️ SBERT semantic layer disabled (missing dependencies)")
def encode_column(self, column_name: str, dtype: str,
sample_values: Optional[List[Any]] = None,
stats: Optional[Dict[str, Any]] = None) -> np.ndarray:
"""
Create semantic embedding for a column.
Combines column name, data type, sample values, and stats into
a text description that captures the column's semantic meaning.
Args:
column_name: Name of the column
dtype: Data type (Int64, Float64, Utf8, etc.)
sample_values: Sample values from the column
stats: Optional statistics (mean, min, max, etc.)
Returns:
Embedding vector (numpy array)
Example:
>>> encode_column("annual_salary", "Float64", [50000, 75000], {"mean": 65000})
>>> # Returns embedding for "annual_salary (Float64 numeric): values like 50000, 75000, mean 65000"
"""
if not self.enabled:
return np.zeros(384) # Dummy embedding
# Build semantic description
description_parts = [f"Column name: {column_name}"]
# Add type information
type_desc = self._interpret_dtype(dtype)
description_parts.append(f"Type: {type_desc}")
# Add sample values
if sample_values:
# Format samples nicely
samples_str = ", ".join([str(v)[:50] for v in sample_values[:5] if v is not None])
description_parts.append(f"Example values: {samples_str}")
# Add statistics
if stats:
if 'mean' in stats and stats['mean'] is not None:
description_parts.append(f"Mean: {stats['mean']:.2f}")
if 'unique_count' in stats and stats['unique_count'] is not None:
description_parts.append(f"Unique values: {stats['unique_count']}")
if 'null_percentage' in stats and stats['null_percentage'] is not None:
description_parts.append(f"Missing: {stats['null_percentage']:.1f}%")
# Combine into single text
text = ". ".join(description_parts)
# Generate embedding
try:
embedding = self.model.encode(text, convert_to_numpy=True, show_progress_bar=False)
return embedding
except Exception as e:
print(f"⚠️ Error encoding column {column_name}: {e}")
return np.zeros(self.model.get_sentence_embedding_dimension())
def _interpret_dtype(self, dtype: str) -> str:
"""Convert polars dtype to human-readable description."""
dtype_lower = str(dtype).lower()
if 'int' in dtype_lower or 'float' in dtype_lower:
return "numeric continuous or count data"
elif 'bool' in dtype_lower:
return "boolean flag"
elif 'utf8' in dtype_lower or 'str' in dtype_lower:
return "text or categorical label"
elif 'date' in dtype_lower or 'time' in dtype_lower:
return "temporal timestamp"
else:
return "data values"
def find_similar_columns(self, query_column: str, column_embeddings: Dict[str, np.ndarray],
top_k: int = 3, threshold: float = 0.6) -> List[Tuple[str, float]]:
"""
Find columns semantically similar to query column.
Use case: Detect duplicates or related columns
Example: "Salary" → finds ["Annual_Income", "Compensation", "Pay"]
Args:
query_column: Column name to search for
column_embeddings: Dict mapping column names to their embeddings
top_k: Number of similar columns to return
threshold: Minimum similarity score (0-1)
Returns:
List of (column_name, similarity_score) tuples
"""
if not self.enabled or query_column not in column_embeddings:
return []
query_emb = column_embeddings[query_column].reshape(1, -1)
similarities = []
for col_name, col_emb in column_embeddings.items():
if col_name == query_column:
continue
sim = cosine_similarity(query_emb, col_emb.reshape(1, -1))[0][0]
if sim >= threshold:
similarities.append((col_name, float(sim)))
# Sort by similarity descending
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:top_k]
def infer_target_column(self, column_embeddings: Dict[str, np.ndarray],
task_description: str) -> Optional[Tuple[str, float]]:
"""
Infer which column is likely the target/label for prediction.
Uses semantic similarity between column descriptions and task description.
Args:
column_embeddings: Dict mapping column names to embeddings
task_description: User's task description
Returns:
(column_name, confidence_score) or None
Example:
>>> infer_target_column(embeddings, "predict house prices")
>>> ("Price", 0.85) # High confidence "Price" is target
"""
if not self.enabled:
return None
# Encode task description
task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
task_emb = task_emb.reshape(1, -1)
# Find column with highest similarity to task
best_col = None
best_score = 0.0
for col_name, col_emb in column_embeddings.items():
sim = cosine_similarity(task_emb, col_emb.reshape(1, -1))[0][0]
if sim > best_score:
best_score = sim
best_col = col_name
# Only return if confidence is reasonable
if best_score >= 0.4: # Threshold for target inference
return (best_col, float(best_score))
return None
def route_to_agent(self, task_description: str,
agent_descriptions: Dict[str, str]) -> Tuple[str, float]:
"""
Route task to appropriate specialist agent using semantic similarity.
Replaces keyword-based routing with semantic understanding.
Args:
task_description: User's task description
agent_descriptions: Dict mapping agent_key → agent description
Returns:
(agent_key, confidence_score)
Example:
>>> route_to_agent("build a predictive model", {
... "modeling_agent": "Expert in ML training and models",
... "viz_agent": "Expert in visualizations"
... })
>>> ("modeling_agent", 0.92)
"""
if not self.enabled:
# Fallback to first agent
return list(agent_descriptions.keys())[0], 0.5
# Encode task
task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
task_emb = task_emb.reshape(1, -1)
# Encode agent descriptions
best_agent = None
best_score = 0.0
for agent_key, agent_desc in agent_descriptions.items():
agent_emb = self.model.encode(agent_desc, convert_to_numpy=True, show_progress_bar=False)
agent_emb = agent_emb.reshape(1, -1)
sim = cosine_similarity(task_emb, agent_emb)[0][0]
if sim > best_score:
best_score = sim
best_agent = agent_key
return best_agent, float(best_score)
def semantic_column_match(self, target_name: str, available_columns: List[str],
threshold: float = 0.6) -> Optional[Tuple[str, float]]:
"""
Find best matching column for a target name using fuzzy semantic matching.
Better than string fuzzy matching because it understands synonyms:
- "salary" matches "annual_income", "compensation", "pay"
- "target" matches "label", "class", "outcome"
Args:
target_name: Column name to find (might not exist exactly)
available_columns: List of actual column names in dataset
threshold: Minimum similarity to consider a match
Returns:
(matched_column, confidence) or None
Example:
>>> semantic_column_match("salary", ["Annual_Income", "Name", "Age"])
>>> ("Annual_Income", 0.78)
"""
if not self.enabled:
# Fallback to exact match
if target_name in available_columns:
return (target_name, 1.0)
return None
# Encode target
target_emb = self.model.encode(target_name, convert_to_numpy=True, show_progress_bar=False)
target_emb = target_emb.reshape(1, -1)
# Find best match
best_col = None
best_score = 0.0
for col in available_columns:
col_emb = self.model.encode(col, convert_to_numpy=True, show_progress_bar=False)
col_emb = col_emb.reshape(1, -1)
sim = cosine_similarity(target_emb, col_emb)[0][0]
if sim > best_score:
best_score = sim
best_col = col
if best_score >= threshold:
return (best_col, float(best_score))
return None
def enrich_dataset_info(self, dataset_info: Dict[str, Any],
file_path: str, sample_size: int = 100) -> Dict[str, Any]:
"""
Enrich dataset_info with semantic column embeddings.
Adds 'column_embeddings' and 'semantic_insights' to dataset_info.
Args:
dataset_info: Dataset info from schema_extraction
file_path: Path to CSV file
sample_size: Number of rows to sample for encoding
Returns:
Enhanced dataset_info with semantic layer
"""
if not self.enabled:
return dataset_info
try:
# Load dataset
df = pl.read_csv(file_path, n_rows=sample_size)
column_embeddings = {}
for col_name, col_info in dataset_info['columns'].items():
# Get sample values
sample_values = df[col_name].head(5).to_list()
# Create embedding
embedding = self.encode_column(
column_name=col_name,
dtype=col_info['dtype'],
sample_values=sample_values,
stats={
'unique_count': col_info.get('unique_count'),
'missing_pct': col_info.get('missing_pct'),
'mean': col_info.get('mean')
}
)
column_embeddings[col_name] = embedding
# Add to dataset_info
dataset_info['column_embeddings'] = column_embeddings
# Detect similar columns (potential duplicates)
similar_pairs = []
cols = list(column_embeddings.keys())
for i, col1 in enumerate(cols):
similar = self.find_similar_columns(col1, column_embeddings, top_k=1, threshold=0.75)
if similar:
similar_pairs.append((col1, similar[0][0], similar[0][1]))
dataset_info['semantic_insights'] = {
'similar_columns': similar_pairs,
'total_columns_embedded': len(column_embeddings)
}
print(f"🧠 Semantic layer: Embedded {len(column_embeddings)} columns")
if similar_pairs:
print(f" Found {len(similar_pairs)} similar column pairs (potential duplicates)")
except Exception as e:
print(f"⚠️ Error enriching dataset with semantic layer: {e}")
return dataset_info
# Global semantic layer instance (lazy loaded)
_semantic_layer = None
def get_semantic_layer() -> SemanticLayer:
    """Get or create the global semantic layer instance.

    Lazily constructs the singleton on first call so the (slow) SBERT
    model load only happens when semantic features are actually used.
    """
    global _semantic_layer
    if _semantic_layer is None:
        _semantic_layer = SemanticLayer()
    return _semantic_layer