"""
Transformer Embeddings Module - InsightGenAI
============================================

Text embedding generation using SentenceTransformers.
Automatically detects text columns and generates embeddings.

Author: InsightGenAI Team
Version: 1.0.0
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Union
import streamlit as st
import warnings

# NOTE(review): this silences *every* warning module-wide, including pandas
# and numpy deprecation warnings that would flag real problems -- consider
# narrowing the filter to specific categories.
warnings.filterwarnings('ignore')

# Optional dependency: sentence-transformers. ST_AVAILABLE is the module-wide
# feature flag; when False the module falls back to SimpleTextProcessor.
try:
    from sentence_transformers import SentenceTransformer
    ST_AVAILABLE = True
except ImportError:
    ST_AVAILABLE = False
class TextEmbeddingEngine:
    """
    Text embedding generation engine using SentenceTransformers.

    Attributes:
        model_name: Name of the SentenceTransformer checkpoint in use.
        model: Loaded SentenceTransformer model (None until loading succeeds).
        embedding_dim: Dimension of generated embeddings (set after loading).
    """

    # Pre-configured models for different use cases
    MODEL_OPTIONS = {
        'lightweight': 'all-MiniLM-L6-v2',       # Fast, good quality
        'balanced': 'all-mpnet-base-v2',         # Better quality, slower
        'high_quality': 'all-roberta-large-v1',  # Best quality, slowest
        'multilingual': 'paraphrase-multilingual-MiniLM-L12-v2'
    }

    def __init__(self, model_type: str = 'lightweight'):
        """
        Initialize the Text Embedding Engine.

        Args:
            model_type: One of 'lightweight', 'balanced', 'high_quality',
                'multilingual'. Unknown values silently fall back to
                'lightweight'.

        Raises:
            ImportError: If sentence-transformers is not installed.
            RuntimeError: If the model fails to load (from _load_model).
        """
        if not ST_AVAILABLE:
            raise ImportError(
                "sentence-transformers is not installed. "
                "Please install with: pip install sentence-transformers"
            )
        self.model_name = self.MODEL_OPTIONS.get(model_type, self.MODEL_OPTIONS['lightweight'])
        self.model = None
        self.embedding_dim = None
        # Load eagerly so a bad/unreachable checkpoint fails at construction
        # time rather than on first encode().
        self._load_model()

    def _load_model(self) -> None:
        """Load the SentenceTransformer model and record its output dimension."""
        try:
            with st.spinner(f"Loading {self.model_name} model..."):
                self.model = SentenceTransformer(self.model_name)
                self.embedding_dim = self.model.get_sentence_embedding_dimension()
        except Exception as e:
            # Chain the original exception so the download/IO cause is visible.
            raise RuntimeError(f"Failed to load model {self.model_name}: {str(e)}") from e

    def generate_embeddings(self, texts: List[str],
                            batch_size: int = 32,
                            show_progress: bool = False) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        Args:
            texts: List of text strings; None/NaN entries are encoded as "".
            batch_size: Batch size for encoding.
            show_progress: Whether to show a progress bar.

        Returns:
            np.ndarray: Embedding matrix (n_texts x embedding_dim).

        Raises:
            ValueError: If the model has not been loaded.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Please initialize the engine first.")
        # encode() cannot take None/NaN, so normalize everything to str first.
        texts = [str(t) if pd.notna(t) else "" for t in texts]
        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def process_dataframe(self, df: pd.DataFrame,
                          text_columns: Optional[List[str]] = None,
                          prefix: str = 'embed_',
                          drop_original: bool = False) -> pd.DataFrame:
        """
        Process a dataframe and add embedding columns for text columns.

        Args:
            df: Input dataframe (not mutated; a copy is processed).
            text_columns: List of text column names (auto-detected if None).
            prefix: Prefix for embedding column names (f"{prefix}{col}_{i}").
            drop_original: Whether to drop the original text columns.

        Returns:
            pd.DataFrame: Dataframe with added embedding columns.
        """
        df_processed = df.copy()
        if text_columns is None:
            text_columns = self._detect_text_columns(df_processed)
        if not text_columns:
            return df_processed
        for col in text_columns:
            st.info(f"Generating embeddings for column: {col}")
            embeddings = self.generate_embeddings(
                df_processed[col].tolist(),
                show_progress=True
            )
            # Fix: the original inserted one column per embedding dimension,
            # which fragments the DataFrame (pandas PerformanceWarning) and
            # performs embedding_dim copies. Build the whole embedding frame
            # and concatenate it once per text column instead.
            embed_cols = pd.DataFrame(
                embeddings,
                columns=[f"{prefix}{col}_{i}" for i in range(embeddings.shape[1])],
                index=df_processed.index,
            )
            df_processed = pd.concat([df_processed, embed_cols], axis=1)
            if drop_original:
                df_processed = df_processed.drop(columns=[col])
        return df_processed

    def _detect_text_columns(self, df: pd.DataFrame,
                             min_avg_length: int = 20,
                             max_unique_ratio: float = 0.5) -> List[str]:
        """
        Automatically detect free-text columns in a dataframe.

        A column is treated as text when it is object-dtype, its average
        string length is at least `min_avg_length`, and its unique-value
        ratio is strictly greater than `max_unique_ratio` -- high uniqueness
        separates free text from low-cardinality categorical codes.

        Note: despite its historical name, `max_unique_ratio` is a *lower*
        bound in the comparison; the name is kept for backward compatibility
        with keyword callers.

        Args:
            df: Input dataframe.
            min_avg_length: Minimum average string length to count as text.
            max_unique_ratio: Unique-ratio threshold a column must exceed.

        Returns:
            List of detected text column names.
        """
        text_columns = []
        for col in df.columns:
            if df[col].dtype != 'object':
                continue
            non_null = df[col].dropna()
            if len(non_null) == 0:
                # Entirely-null column: no signal to classify it.
                continue
            avg_length = non_null.astype(str).str.len().mean()
            unique_ratio = non_null.nunique() / len(non_null)
            if avg_length >= min_avg_length and unique_ratio > max_unique_ratio:
                text_columns.append(col)
        return text_columns

    def get_embedding_info(self) -> Dict:
        """
        Get information about the embedding model.

        Returns:
            Dict with 'model_name', 'embedding_dimension' and
            'max_sequence_length' (None when no model is loaded).
        """
        return {
            'model_name': self.model_name,
            'embedding_dimension': self.embedding_dim,
            'max_sequence_length': self.model.get_max_seq_length() if self.model else None
        }

    def compute_similarity(self, text1: str, text2: str) -> float:
        """
        Compute cosine similarity between two texts.

        Args:
            text1: First text.
            text2: Second text.

        Returns:
            float: Cosine similarity score in [-1, 1]. NOTE: NaN if either
            text embeds to a zero vector (division by zero norm).
        """
        embeddings = self.generate_embeddings([text1, text2])
        similarity = np.dot(embeddings[0], embeddings[1]) / (
            np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1])
        )
        return float(similarity)

    def find_similar_texts(self, query: str,
                           texts: List[str],
                           top_k: int = 5) -> List[Tuple[int, str, float]]:
        """
        Find the texts most similar to a query by cosine similarity.

        Re-encodes `texts` on every call; cache the embeddings externally if
        the corpus is searched repeatedly.

        Args:
            query: Query text.
            texts: List of texts to search.
            top_k: Number of top results to return (clamped to len(texts)).

        Returns:
            List of tuples (index, text, similarity_score), best first.
        """
        query_embedding = self.generate_embeddings([query])
        text_embeddings = self.generate_embeddings(texts)
        # Cosine similarity: dot product normalized by both norms.
        similarities = np.dot(text_embeddings, query_embedding.T).flatten()
        similarities = similarities / (
            np.linalg.norm(text_embeddings, axis=1) * np.linalg.norm(query_embedding)
        )
        # argsort ascending, reversed for descending; slicing clamps top_k.
        top_indices = np.argsort(similarities)[::-1][:top_k]
        results = [
            (int(idx), texts[idx], float(similarities[idx]))
            for idx in top_indices
        ]
        return results

    def reduce_dimensions(self, embeddings: np.ndarray,
                          method: str = 'pca',
                          n_components: int = 2) -> np.ndarray:
        """
        Reduce embedding dimensions for visualization.

        Args:
            embeddings: Embedding matrix (n_samples x embedding_dim).
            method: 'pca', 'tsne', or 'umap' (umap falls back to PCA when
                the package is missing).
            n_components: Number of output dimensions.
                NOTE(review): sklearn TSNE requires n_components < 4 with the
                default method -- confirm callers only request 2-3.

        Returns:
            np.ndarray: Reduced embeddings (n_samples x n_components).

        Raises:
            ValueError: For an unknown `method`.
        """
        if method == 'pca':
            from sklearn.decomposition import PCA
            reducer = PCA(n_components=n_components)
        elif method == 'tsne':
            from sklearn.manifold import TSNE
            reducer = TSNE(n_components=n_components, random_state=42)
        elif method == 'umap':
            try:
                import umap
                reducer = umap.UMAP(n_components=n_components, random_state=42)
            except ImportError:
                st.warning("UMAP not available, falling back to PCA")
                from sklearn.decomposition import PCA
                reducer = PCA(n_components=n_components)
        else:
            raise ValueError(f"Unknown method: {method}")
        return reducer.fit_transform(embeddings)
class SimpleTextProcessor:
    """
    Simple text processor used when SentenceTransformers is not available.

    Produces cheap per-column text statistics instead of dense embeddings.
    """

    def __init__(self):
        """Initialize simple text processor (stateless)."""
        pass

    def extract_text_features(self, df: pd.DataFrame,
                              text_columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Extract basic text features from text columns.

        For each column adds: `<col>_length`, `<col>_word_count`,
        `<col>_avg_word_length`, `<col>_uppercase_ratio`, `<col>_digit_ratio`.

        Args:
            df: Input dataframe (not mutated; a copy is processed).
            text_columns: Text column names; defaults to every object-dtype
                column when None.

        Returns:
            pd.DataFrame: Copy of `df` with the feature columns appended.
        """
        df_processed = df.copy()
        if text_columns is None:
            text_columns = [col for col in df.columns if df[col].dtype == 'object']
        for col in text_columns:
            # Fix: convert to str once per column -- the original re-ran
            # astype(str) five times per column. NaN becomes the literal
            # string "nan" here, matching the original astype(str) behavior.
            as_str = df_processed[col].astype(str)
            length = as_str.str.len()
            word_count = as_str.str.split().str.len()
            df_processed[f'{col}_length'] = length
            df_processed[f'{col}_word_count'] = word_count
            # .replace(0, 1) guards all divisions below against empty strings
            # and zero-word rows.
            df_processed[f'{col}_avg_word_length'] = (
                as_str.str.replace(' ', '').str.len() /
                word_count.replace(0, 1)
            )
            df_processed[f'{col}_uppercase_ratio'] = (
                as_str.apply(lambda x: sum(1 for c in str(x) if c.isupper())) /
                length.replace(0, 1)
            )
            df_processed[f'{col}_digit_ratio'] = (
                as_str.apply(lambda x: sum(1 for c in str(x) if c.isdigit())) /
                length.replace(0, 1)
            )
        return df_processed

    def _detect_text_columns(self, df: pd.DataFrame) -> List[str]:
        """
        Detect text columns: object dtype with average string length > 20.

        Simpler heuristic than TextEmbeddingEngine's (no uniqueness check).
        """
        text_columns = []
        for col in df.columns:
            if df[col].dtype == 'object':
                non_null = df[col].dropna()
                if len(non_null) > 0:
                    avg_length = non_null.astype(str).str.len().mean()
                    if avg_length > 20:
                        text_columns.append(col)
        return text_columns
def create_embedding_engine(model_type: str = 'lightweight'):
    """
    Build the best available text-processing engine.

    Falls back to SimpleTextProcessor when sentence-transformers is not
    importable, or when the transformer model fails to initialize.

    Args:
        model_type: Model preset forwarded to TextEmbeddingEngine.

    Returns:
        TextEmbeddingEngine or SimpleTextProcessor instance.
    """
    # Guard clause: no optional dependency, no point attempting a load.
    if not ST_AVAILABLE:
        return SimpleTextProcessor()
    try:
        return TextEmbeddingEngine(model_type)
    except Exception as e:
        st.warning(f"SentenceTransformer failed, using simple text features: {str(e)}")
        return SimpleTextProcessor()
# Streamlit display functions
def display_text_embeddings_ui(df: pd.DataFrame) -> pd.DataFrame:
    """
    Render the text-embedding UI in Streamlit.

    Returns the processed dataframe after a successful button-triggered run,
    otherwise the input `df` unchanged.

    NOTE(review): indentation reconstructed from a formatting-damaged source;
    verify the placement of the fall-through `return df` statements matches
    the original intent.
    """
    st.subheader("📝 Text Embedding Generation")
    # Fallback path: no sentence-transformers -> basic text statistics only.
    if not ST_AVAILABLE:
        st.warning(
            "SentenceTransformers is not installed. "
            "Using basic text features instead of embeddings."
        )
        processor = SimpleTextProcessor()
        text_cols = processor._detect_text_columns(df)
        if text_cols:
            st.write(f"Detected text columns: {', '.join(text_cols)}")
            if st.button("Extract Text Features"):
                df_processed = processor.extract_text_features(df, text_cols)
                st.success(f"Extracted features! New shape: {df_processed.shape}")
                st.dataframe(df_processed.head(), use_container_width=True)
                return df_processed
        else:
            st.info("No text columns detected.")
        # Button not pressed (or no text columns): hand back the input.
        return df
    # SentenceTransformers available
    model_type = st.selectbox(
        "Select Model",
        options=list(TextEmbeddingEngine.MODEL_OPTIONS.keys()),
        format_func=lambda x: f"{x} ({TextEmbeddingEngine.MODEL_OPTIONS[x]})"
    )
    # Detect text columns. __new__ deliberately skips __init__ so we can run
    # the (stateless) detection heuristic without downloading a model.
    temp_engine = TextEmbeddingEngine.__new__(TextEmbeddingEngine)
    text_cols = temp_engine._detect_text_columns(df)
    if text_cols:
        st.write(f"Detected text columns: {', '.join(text_cols)}")
        selected_cols = st.multiselect(
            "Select columns to process",
            options=text_cols,
            default=text_cols
        )
        drop_original = st.checkbox("Drop original text columns", value=False)
        if st.button("Generate Embeddings") and selected_cols:
            try:
                # Model is loaded only when the user actually triggers a run.
                engine = TextEmbeddingEngine(model_type)
                info = engine.get_embedding_info()
                st.write(f"Model: {info['model_name']}")
                st.write(f"Embedding dimension: {info['embedding_dimension']}")
                df_processed = engine.process_dataframe(
                    df, selected_cols, drop_original=drop_original
                )
                st.success(f"Embeddings generated! New shape: {df_processed.shape}")
                st.dataframe(df_processed.head(), use_container_width=True)
                return df_processed
            except Exception as e:
                # Surface the failure in the UI and fall through to return df.
                st.error(f"Error generating embeddings: {str(e)}")
    else:
        st.info("No text columns detected in the dataset.")
    return df