# insightgenai/modules/transformer_embeddings.py
# (Initial commit - InsightGenAI files, revision e478478, author: mohsinbhatti)
"""
Transformer Embeddings Module - InsightGenAI
============================================
Text embedding generation using SentenceTransformers.
Automatically detects text columns and generates embeddings.
Author: InsightGenAI Team
Version: 1.0.0
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Union
import streamlit as st
import warnings
warnings.filterwarnings('ignore')
# Try to import sentence_transformers
try:
from sentence_transformers import SentenceTransformer
ST_AVAILABLE = True
except ImportError:
ST_AVAILABLE = False
class TextEmbeddingEngine:
    """
    Text embedding generation engine using SentenceTransformers.

    Attributes:
        model_name: Name of the SentenceTransformer model in use.
        model: Loaded SentenceTransformer model (None until loaded).
        embedding_dim: Dimension of generated embeddings (None until loaded).
    """

    # Pre-configured models for different use cases
    MODEL_OPTIONS = {
        'lightweight': 'all-MiniLM-L6-v2',       # Fast, good quality
        'balanced': 'all-mpnet-base-v2',         # Better quality, slower
        'high_quality': 'all-roberta-large-v1',  # Best quality, slowest
        'multilingual': 'paraphrase-multilingual-MiniLM-L12-v2'
    }

    def __init__(self, model_type: str = 'lightweight'):
        """
        Initialize the Text Embedding Engine.

        Args:
            model_type: Type of model to use ('lightweight', 'balanced',
                'high_quality', 'multilingual'). Unknown values silently
                fall back to 'lightweight'.

        Raises:
            ImportError: If sentence-transformers is not installed.
            RuntimeError: If the model fails to load.
        """
        if not ST_AVAILABLE:
            raise ImportError(
                "sentence-transformers is not installed. "
                "Please install with: pip install sentence-transformers"
            )

        self.model_name = self.MODEL_OPTIONS.get(model_type, self.MODEL_OPTIONS['lightweight'])
        self.model = None
        self.embedding_dim = None

        # Load eagerly so failures surface at construction time
        self._load_model()

    def _load_model(self) -> None:
        """Load the SentenceTransformer model and record its output dimension.

        Raises:
            RuntimeError: Wrapping any failure from model download/load.
        """
        try:
            with st.spinner(f"Loading {self.model_name} model..."):
                self.model = SentenceTransformer(self.model_name)
                self.embedding_dim = self.model.get_sentence_embedding_dimension()
        except Exception as e:
            raise RuntimeError(f"Failed to load model {self.model_name}: {str(e)}")

    def generate_embeddings(self, texts: List[str],
                            batch_size: int = 32,
                            show_progress: bool = False) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        Args:
            texts: List of text strings. None/NaN entries are encoded as "".
            batch_size: Batch size for encoding.
            show_progress: Whether to show a progress bar.

        Returns:
            np.ndarray: Embedding matrix (n_texts x embedding_dim).

        Raises:
            ValueError: If the model has not been loaded.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Please initialize the engine first.")

        # Coerce missing values to empty strings so encode() never sees NaN
        texts = [str(t) if pd.notna(t) else "" for t in texts]

        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def process_dataframe(self, df: pd.DataFrame,
                          text_columns: Optional[List[str]] = None,
                          prefix: str = 'embed_',
                          drop_original: bool = False) -> pd.DataFrame:
        """
        Process a dataframe and add embedding columns for text columns.

        Args:
            df: Input dataframe.
            text_columns: List of text column names (auto-detected if None).
            prefix: Prefix for embedding column names ("{prefix}{col}_{i}").
            drop_original: Whether to drop original text columns.

        Returns:
            pd.DataFrame: Copy of the dataframe with added embedding columns.
        """
        df_processed = df.copy()

        # Auto-detect text columns if not provided
        if text_columns is None:
            text_columns = self._detect_text_columns(df_processed)

        if not text_columns:
            return df_processed

        for col in text_columns:
            st.info(f"Generating embeddings for column: {col}")
            embeddings = self.generate_embeddings(
                df_processed[col].tolist(),
                show_progress=True
            )

            # Build all embedding columns in one frame and concat once:
            # inserting hundreds of columns one-by-one fragments the
            # DataFrame and triggers pandas PerformanceWarning.
            emb_df = pd.DataFrame(
                embeddings,
                columns=[f"{prefix}{col}_{i}" for i in range(embeddings.shape[1])],
                index=df_processed.index
            )
            df_processed = pd.concat([df_processed, emb_df], axis=1)

            # Drop original column if requested
            if drop_original:
                df_processed = df_processed.drop(columns=[col])

        return df_processed

    def _detect_text_columns(self, df: pd.DataFrame,
                             min_avg_length: int = 20,
                             max_unique_ratio: float = 0.5) -> List[str]:
        """
        Automatically detect free-text columns in a dataframe.

        A column qualifies when it is object-dtype, its average string length
        is at least ``min_avg_length``, and its unique-value ratio EXCEEDS
        ``max_unique_ratio`` (high uniqueness separates free text from
        low-cardinality categorical codes). NOTE: despite its name, the
        parameter acts as a lower bound; the name is kept for backward
        compatibility with existing callers.

        Args:
            df: Input dataframe.
            min_avg_length: Minimum average string length to consider as text.
            max_unique_ratio: Unique-to-total ratio a column must exceed to
                count as free text.

        Returns:
            List of detected text column names.
        """
        text_columns = []

        for col in df.columns:
            if df[col].dtype == 'object':
                non_null = df[col].dropna()
                if len(non_null) == 0:
                    continue

                avg_length = non_null.astype(str).str.len().mean()
                unique_ratio = non_null.nunique() / len(non_null)

                # Long strings + mostly-unique values => likely free text
                if avg_length >= min_avg_length and unique_ratio > max_unique_ratio:
                    text_columns.append(col)

        return text_columns

    def get_embedding_info(self) -> Dict:
        """
        Get information about the embedding model.

        Returns:
            Dict with model name, embedding dimension, and max sequence
            length (the latter is None when no model is loaded).
        """
        return {
            'model_name': self.model_name,
            'embedding_dimension': self.embedding_dim,
            'max_sequence_length': self.model.get_max_seq_length() if self.model else None
        }

    def compute_similarity(self, text1: str, text2: str) -> float:
        """
        Compute cosine similarity between two texts.

        Args:
            text1: First text.
            text2: Second text.

        Returns:
            float: Cosine similarity score; 0.0 when either embedding has
            zero norm (previously this divided by zero and produced NaN).
        """
        embeddings = self.generate_embeddings([text1, text2])

        # Guard against zero-norm (degenerate) embeddings before dividing
        denom = np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1])
        if denom == 0.0:
            return 0.0
        return float(np.dot(embeddings[0], embeddings[1]) / denom)

    def find_similar_texts(self, query: str,
                           texts: List[str],
                           top_k: int = 5) -> List[Tuple[int, str, float]]:
        """
        Find most similar texts to a query.

        Args:
            query: Query text.
            texts: List of texts to search.
            top_k: Number of top results to return (clipped to len(texts)).

        Returns:
            List of tuples (index, text, similarity_score), best match first.
        """
        query_embedding = self.generate_embeddings([query])
        text_embeddings = self.generate_embeddings(texts)

        # Cosine similarity: dot products normalized by both vector norms
        similarities = np.dot(text_embeddings, query_embedding.T).flatten()
        similarities = similarities / (
            np.linalg.norm(text_embeddings, axis=1) * np.linalg.norm(query_embedding)
        )

        # argsort ascending, reversed for descending, then take top k
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [
            (int(idx), texts[idx], float(similarities[idx]))
            for idx in top_indices
        ]

    def reduce_dimensions(self, embeddings: np.ndarray,
                          method: str = 'pca',
                          n_components: int = 2) -> np.ndarray:
        """
        Reduce embedding dimensions for visualization.

        Args:
            embeddings: Embedding matrix (n_samples x embedding_dim).
            method: Dimensionality reduction method ('pca', 'tsne', 'umap').
                'umap' falls back to PCA with a warning when umap-learn is
                not installed.
            n_components: Number of output dimensions.

        Returns:
            np.ndarray: Reduced embeddings (n_samples x n_components).

        Raises:
            ValueError: If method is not one of the supported options.
        """
        if method == 'pca':
            from sklearn.decomposition import PCA
            reducer = PCA(n_components=n_components)
        elif method == 'tsne':
            from sklearn.manifold import TSNE
            reducer = TSNE(n_components=n_components, random_state=42)
        elif method == 'umap':
            try:
                import umap
                reducer = umap.UMAP(n_components=n_components, random_state=42)
            except ImportError:
                st.warning("UMAP not available, falling back to PCA")
                from sklearn.decomposition import PCA
                reducer = PCA(n_components=n_components)
        else:
            raise ValueError(f"Unknown method: {method}")

        return reducer.fit_transform(embeddings)
class SimpleTextProcessor:
    """
    Fallback text processor used when SentenceTransformers is unavailable.

    Produces lightweight numeric features derived from raw text (lengths,
    word counts, character-class ratios) instead of dense embeddings.
    """

    def __init__(self):
        """Initialize simple text processor (stateless)."""
        pass

    def extract_text_features(self, df: pd.DataFrame,
                              text_columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Extract basic text statistics from text columns.

        Args:
            df: Input dataframe.
            text_columns: Columns to process; defaults to every object-dtype
                column when None.

        Returns:
            pd.DataFrame: Copy of the dataframe with five extra feature
            columns per processed text column (length, word count, average
            word length, uppercase ratio, digit ratio).
        """
        result = df.copy()

        if text_columns is None:
            text_columns = [name for name in df.columns if df[name].dtype == 'object']

        for col in text_columns:
            # All features derive from the stringified column; NaN becomes "nan"
            as_text = result[col].astype(str)
            length = as_text.str.len()
            words = as_text.str.split().str.len()

            result[f'{col}_length'] = length
            result[f'{col}_word_count'] = words
            # Non-space character count over word count; zero counts are
            # replaced by one to avoid division by zero
            result[f'{col}_avg_word_length'] = (
                as_text.str.replace(' ', '').str.len() / words.replace(0, 1)
            )
            result[f'{col}_uppercase_ratio'] = (
                as_text.apply(lambda s: sum(ch.isupper() for ch in str(s)))
                / length.replace(0, 1)
            )
            result[f'{col}_digit_ratio'] = (
                as_text.apply(lambda s: sum(ch.isdigit() for ch in str(s)))
                / length.replace(0, 1)
            )

        return result

    def _detect_text_columns(self, df: pd.DataFrame) -> List[str]:
        """Return object-dtype columns whose mean string length exceeds 20."""
        detected = []

        for name in df.columns:
            if df[name].dtype != 'object':
                continue
            values = df[name].dropna()
            if len(values) == 0:
                continue
            if values.astype(str).str.len().mean() > 20:
                detected.append(name)

        return detected
def create_embedding_engine(model_type: str = 'lightweight'):
    """
    Factory function to create the appropriate embedding engine.

    Falls back to SimpleTextProcessor when sentence-transformers is not
    importable, or when constructing the transformer engine fails.

    Args:
        model_type: Type of model to use.

    Returns:
        TextEmbeddingEngine or SimpleTextProcessor instance.
    """
    # Guard clause: no library means no transformer engine at all
    if not ST_AVAILABLE:
        return SimpleTextProcessor()

    try:
        return TextEmbeddingEngine(model_type)
    except Exception as e:
        # Model download/load can fail even when the library is installed
        st.warning(f"SentenceTransformer failed, using simple text features: {str(e)}")
        return SimpleTextProcessor()
# Streamlit display functions
def display_text_embeddings_ui(df: pd.DataFrame):
    """Display text embedding UI in Streamlit.

    Shows the detected text columns and lets the user generate either
    SentenceTransformer embeddings or, when the library is missing, basic
    text-statistic features via SimpleTextProcessor.

    Args:
        df: Input dataframe to process.

    Returns:
        The processed dataframe when the user triggers processing and it
        succeeds; otherwise the original ``df`` unchanged.
    """
    st.subheader("📝 Text Embedding Generation")
    if not ST_AVAILABLE:
        # Fallback path: no embeddings, only simple statistical features
        st.warning(
            "SentenceTransformers is not installed. "
            "Using basic text features instead of embeddings."
        )
        processor = SimpleTextProcessor()
        text_cols = processor._detect_text_columns(df)
        if text_cols:
            st.write(f"Detected text columns: {', '.join(text_cols)}")
            if st.button("Extract Text Features"):
                df_processed = processor.extract_text_features(df, text_cols)
                st.success(f"Extracted features! New shape: {df_processed.shape}")
                st.dataframe(df_processed.head(), use_container_width=True)
                return df_processed
        else:
            st.info("No text columns detected.")
        # Button not clicked (or nothing to process): dataframe unchanged
        return df
    # SentenceTransformers available
    model_type = st.selectbox(
        "Select Model",
        options=list(TextEmbeddingEngine.MODEL_OPTIONS.keys()),
        format_func=lambda x: f"{x} ({TextEmbeddingEngine.MODEL_OPTIONS[x]})"
    )
    # Detect text columns without loading a model: __new__ bypasses
    # __init__, which is safe because _detect_text_columns reads no
    # instance state
    temp_engine = TextEmbeddingEngine.__new__(TextEmbeddingEngine)
    text_cols = temp_engine._detect_text_columns(df)
    if text_cols:
        st.write(f"Detected text columns: {', '.join(text_cols)}")
        selected_cols = st.multiselect(
            "Select columns to process",
            options=text_cols,
            default=text_cols
        )
        drop_original = st.checkbox("Drop original text columns", value=False)
        if st.button("Generate Embeddings") and selected_cols:
            try:
                # Model is loaded here (not earlier) so the download cost is
                # only paid once the user commits to generating embeddings
                engine = TextEmbeddingEngine(model_type)
                info = engine.get_embedding_info()
                st.write(f"Model: {info['model_name']}")
                st.write(f"Embedding dimension: {info['embedding_dimension']}")
                df_processed = engine.process_dataframe(
                    df, selected_cols, drop_original=drop_original
                )
                st.success(f"Embeddings generated! New shape: {df_processed.shape}")
                st.dataframe(df_processed.head(), use_container_width=True)
                return df_processed
            except Exception as e:
                # Error is displayed in the UI; fall through to return df
                st.error(f"Error generating embeddings: {str(e)}")
    else:
        st.info("No text columns detected in the dataset.")
    return df