"""
Feature extraction module for skill classification.

This module provides functions to extract features from the SkillScope dataset,
starting with TF-IDF vectorization of textual data from pull request issues.

Dataset Information (from nlbse_tool_competition_data_by_issue):
- 7,154 issues from 11 Java repositories
- 226 total columns:
    - 2 text columns: 'issue text' (title) and 'issue description' (body)
    - metadata and other columns containing PR/file/context information
    - 217 label columns: domain/subdomain skill labels (142 active labels in this DB)

Label Characteristics:
- Multi-label classification problem
- Average 32.9 labels per issue (median: 31)
- Highly imbalanced: some labels appear in all issues, others in very few
- Top labels: Language, Data Structure, DevOps, Error Handling
"""

from contextlib import closing
from pathlib import Path
import re
import sqlite3
from typing import Optional, Tuple

import joblib

# Stemming support
from nltk.stem import PorterStemmer
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from sklearn.feature_extraction.text import TfidfVectorizer

from hopcroft_skill_classification_tool_competition.config import (
    MODELS_DIR,
    PROCESSED_DATA_DIR,
    RAW_DATA_DIR,
)

# Initialise the stemmer once; it is reused by every clean_github_text call.
stemmer = PorterStemmer()

# Patterns used by clean_github_text, compiled once at import time.
_URL_RE = re.compile(r"http\S+|www\.\S+")
_HTML_TAG_RE = re.compile(r"<[^>]+>")
_CODE_BLOCK_RE = re.compile(r"```[\s\S]*?```")
_INLINE_CODE_RE = re.compile(r"`[^`]*`")
_WHITESPACE_RE = re.compile(r"\s+")

# Feature types understood by create_feature_dataset.
_FEATURE_TYPES = ("tfidf", "embedding")


def clean_github_text(text: str, use_stemming: bool = True) -> str:
    """
    Clean GitHub issue text as per SkillScope paper (Aracena et al. process).

    Removes URLs, HTML tags, markdown code blocks, inline code and every
    non-ASCII character (which includes emojis), then collapses whitespace.
    Optionally applies Porter stemming to each whitespace-separated token.

    Args:
        text: Raw text from GitHub issue. ``None`` and scalar NaN/NA values
            yield an empty string; any other value is converted with ``str``.
        use_stemming: If True, apply Porter stemming (recommended for TF-IDF).
            If False, keep original words (recommended for Embeddings/LLMs).

    Returns:
        Cleaned text string (stemmed if use_stemming=True)
    """
    # pd.isna() on a list/array returns an array whose truth value is
    # ambiguous, so only apply the missing-value check to scalars.
    if text is None or (pd.api.types.is_scalar(text) and pd.isna(text)):
        return ""

    text = str(text)

    # Remove URLs (http/https/www)
    text = _URL_RE.sub("", text)
    # Remove HTML tags
    text = _HTML_TAG_RE.sub("", text)
    # Remove markdown code blocks
    text = _CODE_BLOCK_RE.sub("", text)
    # Remove inline code
    text = _INLINE_CODE_RE.sub("", text)
    # Remove emojis and non-ASCII characters
    text = text.encode("ascii", "ignore").decode("ascii")
    # Collapse runs of whitespace and trim the ends
    text = _WHITESPACE_RE.sub(" ", text).strip()

    # Conditional stemming: only wanted for TF-IDF, not for embeddings.
    if use_stemming:
        try:
            text = " ".join(stemmer.stem(token) for token in text.split())
        except Exception as e:
            # Deliberate best effort: a stemmer failure must not abort feature
            # extraction, so warn and fall back to the cleaned, unstemmed text.
            print(f"Warning: Stemming failed for text snippet '{text[:50]}...'. Error: {e}")

    return text


def get_dataset_info(df: pd.DataFrame) -> dict:
    """
    Get summary information about the dataset.

    Label columns are binarised (value > 0 means "present") before the
    per-issue and per-label counts are computed.

    Args:
        df: Input dataframe

    Returns:
        Dictionary containing dataset statistics
    """
    text_cols = get_text_columns(df)
    label_cols = get_label_columns(df)

    # Convert to binary labels
    binary_labels = (df[label_cols] > 0).astype(int)
    labels_per_issue = binary_labels.sum(axis=1)
    issues_per_label = binary_labels.sum(axis=0)

    info = {
        "total_issues": len(df),
        "total_columns": len(df.columns),
        "text_columns": text_cols,
        "num_text_columns": len(text_cols),
        "label_columns": label_cols,
        "num_labels": len(label_cols),
        "avg_labels_per_issue": labels_per_issue.mean(),
        "median_labels_per_issue": labels_per_issue.median(),
        "max_labels_per_issue": labels_per_issue.max(),
        "min_labels_per_issue": labels_per_issue.min(),
        "avg_issues_per_label": issues_per_label.mean(),
        "labels_with_no_issues": (issues_per_label == 0).sum(),
    }
    return info


def load_data_from_db(db_path: Optional[Path] = None) -> pd.DataFrame:
    """
    Load data from the SQLite database.

    Args:
        db_path: Path to the SQLite database file. If None, uses
            ``RAW_DATA_DIR / "skillscope_data.db"``.

    Returns:
        DataFrame containing the nlbse_tool_competition_data_by_issue table
    """
    if db_path is None:
        db_path = RAW_DATA_DIR / "skillscope_data.db"

    # Load the main table
    query = "SELECT * FROM nlbse_tool_competition_data_by_issue"

    # closing() guarantees the connection is released even when the query
    # raises (a sqlite3 connection used directly as a context manager only
    # manages the transaction, it does not close the connection).
    with closing(sqlite3.connect(db_path)) as conn:
        df = pd.read_sql_query(query, conn)

    print(f"Loaded {len(df)} records from database")
    return df


def get_text_columns(df: pd.DataFrame) -> list:
    """
    Identify text columns in the dataframe (typically issue title, body, etc.).

    Args:
        df: Input dataframe

    Returns:
        List of column names containing textual data, restricted to those
        actually present in ``df``
    """
    # Text columns from SkillScope database schema
    # Based on exploration: issue text (title) and issue description (body)
    text_cols = ["issue text", "issue description"]
    return [col for col in text_cols if col in df.columns]


def get_label_columns(df: pd.DataFrame) -> list:
    """
    Identify label columns (domains/subdomains with API counts).

    A column counts as a label when it has a numeric dtype and is not one of
    the known metadata/text columns.

    Args:
        df: Input dataframe

    Returns:
        List of column names containing labels, in dataframe column order
    """
    # Metadata columns to exclude from labels
    # Based on exploration: these are not skill labels
    exclude_cols = frozenset(
        {
            "Repo Name",
            "PR #",
            "issue text",
            "issue description",
            "created_at",
            "author_name",
        }
    )

    # Label columns are numeric but not metadata. is_numeric_dtype is used
    # to be robust to dtype representations.
    return [
        col for col in df.columns if col not in exclude_cols and is_numeric_dtype(df[col])
    ]


def combine_text_fields(
    df: pd.DataFrame, text_columns: list, use_stemming: bool = True
) -> pd.Series:
    """
    Combine multiple text fields into a single text representation.

    Each field is cleaned with ``clean_github_text`` and the cleaned fields of
    a row are joined with a single space.

    Args:
        df: Input dataframe
        text_columns: List of column names to combine
        use_stemming: If True, apply stemming (for TF-IDF). If False, keep
            original words (for Embeddings).

    Returns:
        Series (indexed like ``df``) containing cleaned and combined text for
        each row. An empty dataframe yields an empty Series.
    """
    if not text_columns:
        # Nothing to join: every row combines to the empty string.
        return pd.Series([""] * len(df), index=df.index, dtype=object)

    frame = df[text_columns].fillna("").astype(str)

    # Iterating rows directly (instead of DataFrame.apply(axis=1)) always
    # produces a Series, including for an empty dataframe where apply would
    # hand back a DataFrame.
    combined = [
        " ".join(clean_github_text(value, use_stemming=use_stemming) for value in row)
        for row in frame.itertuples(index=False, name=None)
    ]
    return pd.Series(combined, index=df.index, dtype=object)


def extract_tfidf_features(
    df: pd.DataFrame,
    text_columns: Optional[list] = None,
    max_features: Optional[int] = 2000,
    min_df: int = 2,
    max_df: float = 0.95,
    ngram_range: Tuple[int, int] = (1, 2),
) -> Tuple[np.ndarray, TfidfVectorizer]:
    """
    Extract TF-IDF features from textual data.

    Args:
        df: Input dataframe
        text_columns: List of text columns to use. If None, auto-detect.
        max_features: Maximum number of features to extract
            (default: 2000 for balanced sparsity)
        min_df: Minimum document frequency for a term to be included
        max_df: Maximum document frequency (ignore terms appearing in >max_df of docs)
        ngram_range: Range of n-grams to consider (e.g., (1,2) for unigrams and bigrams)

    Returns:
        Tuple of (dense feature matrix, fitted vectorizer)

    Raises:
        ValueError: If no text columns are available.
    """
    if text_columns is None:
        text_columns = get_text_columns(df)

    if not text_columns:
        raise ValueError("No text columns found in dataframe")

    # Combine text fields (with stemming for TF-IDF)
    print(f"Combining text from columns: {text_columns}")
    combined_text = combine_text_fields(df, text_columns, use_stemming=True)

    # Initialize TF-IDF vectorizer
    vectorizer = TfidfVectorizer(
        max_features=max_features,
        min_df=min_df,
        max_df=max_df,
        ngram_range=ngram_range,
        stop_words="english",
        lowercase=True,
        strip_accents="unicode",
    )

    # Fit and transform
    print(
        f"Extracting TF-IDF features with max_features={max_features if max_features else 'All'}, "
        f"ngram_range={ngram_range}"
    )
    tfidf_matrix = vectorizer.fit_transform(combined_text)

    print(
        f"Extracted {tfidf_matrix.shape[1]} TF-IDF features from {tfidf_matrix.shape[0]} samples"
    )

    # Densify the sparse matrix so callers receive a plain ndarray.
    return tfidf_matrix.toarray(), vectorizer


def extract_embedding_features(
    df: pd.DataFrame,
    text_columns: Optional[list] = None,
    model_name: str = "all-MiniLM-L6-v2",
    batch_size: int = 32,
) -> Tuple[np.ndarray, object]:
    """
    Extract LLM embeddings from textual data using Sentence Transformers.

    Args:
        df: Input dataframe
        text_columns: List of text columns to use. If None, auto-detect.
        model_name: Name of the pre-trained model to use
        batch_size: Batch size for encoding

    Returns:
        Tuple of (feature matrix, model object)

    Raises:
        ImportError: If sentence-transformers cannot be imported.
        ValueError: If no text columns are available.
    """
    # Imported lazily so the TF-IDF path works without sentence-transformers.
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as e:
        raise ImportError(
            f"sentence-transformers import failed: {e}. Try running: pip install sentence-transformers"
        ) from e

    if text_columns is None:
        text_columns = get_text_columns(df)

    if not text_columns:
        raise ValueError("No text columns found in dataframe")

    # Combine text fields (without stemming for embeddings - LLMs need full words)
    print(f"Combining text from columns: {text_columns}")
    combined_text = combine_text_fields(df, text_columns, use_stemming=False)

    # Load model
    print(f"Loading embedding model: {model_name}")
    model = SentenceTransformer(model_name)

    # Encode
    print(f"Extracting embeddings for {len(combined_text)} samples...")
    embeddings = model.encode(
        combined_text.tolist(),
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
    )

    print(f"Extracted embeddings shape: {embeddings.shape}")
    return embeddings, model


def prepare_labels(df: pd.DataFrame, label_columns: Optional[list] = None) -> pd.DataFrame:
    """
    Prepare multi-label binary matrix from label columns.

    Args:
        df: Input dataframe
        label_columns: List of label columns. If None, auto-detect.

    Returns:
        DataFrame with binary labels (1 if label present, 0 otherwise)
    """
    if label_columns is None:
        label_columns = get_label_columns(df)

    # Convert to binary: any value > 0 means label is present
    labels = (df[label_columns] > 0).astype(int)

    print(f"Prepared {len(label_columns)} labels")
    print(f"Label distribution:\n{labels.sum().describe()}")
    return labels


def create_feature_dataset(
    db_path: Optional[Path] = None,
    save_processed: bool = True,
    feature_type: str = "tfidf",  # 'tfidf' or 'embedding'
    model_name: str = "all-MiniLM-L6-v2",
) -> Tuple[np.ndarray, pd.DataFrame, list, list]:
    """
    Main function to create the complete feature dataset.

    Loads the issues table, extracts the requested features, binarises the
    labels and (optionally) persists the arrays under
    ``PROCESSED_DATA_DIR / feature_type`` plus the inference artefacts
    (TF-IDF vectorizer, label names) under ``MODELS_DIR``.

    Args:
        db_path: Path to SQLite database
        save_processed: Whether to save processed data to disk
        feature_type: Type of features to extract ('tfidf' or 'embedding')
        model_name: Model name for embeddings (ignored if feature_type='tfidf')

    Returns:
        Tuple of (features, labels, feature_names, label_names). For 'tfidf'
        ``feature_names`` is whatever ``get_feature_names_out()`` returns; for
        'embedding' it is a list of ``emb_<i>`` strings.

    Raises:
        ValueError: If ``feature_type`` is not 'tfidf' or 'embedding'.
    """
    # Fail fast: reject a bad feature_type before touching the database.
    if feature_type not in _FEATURE_TYPES:
        raise ValueError(f"Unknown feature_type: {feature_type}")

    # Load data
    df = load_data_from_db(db_path)

    # Get dataset info
    info = get_dataset_info(df)
    print("\n=== Dataset Information ===")
    print(f"Total issues: {info['total_issues']:,}")
    print(f"Text columns: {info['text_columns']}")
    print(f"Number of labels: {info['num_labels']}")
    print(f"Avg labels per issue: {info['avg_labels_per_issue']:.2f}")
    print(f"Labels with no issues: {info['labels_with_no_issues']}")

    # Extract features
    text_columns = get_text_columns(df)
    label_columns = get_label_columns(df)

    vectorizer = None
    if feature_type == "tfidf":
        features, vectorizer = extract_tfidf_features(df, text_columns=text_columns)
        feature_names = vectorizer.get_feature_names_out()
    else:  # "embedding" (validated above)
        features, _ = extract_embedding_features(
            df, text_columns=text_columns, model_name=model_name
        )
        feature_names = [f"emb_{i}" for i in range(features.shape[1])]

    # Prepare labels
    labels = prepare_labels(df, label_columns)

    # Save processed data
    if save_processed:
        # Path: processed/{feature_type}/
        output_dir = PROCESSED_DATA_DIR / feature_type
        output_dir.mkdir(parents=True, exist_ok=True)

        features_path = output_dir / f"features_{feature_type}.npy"
        labels_path = output_dir / f"labels_{feature_type}.npy"

        np.save(features_path, features)
        np.save(labels_path, labels.values)

        print(f"\nSaved processed data to {output_dir}")
        print(f" - {features_path.name}: {features.shape}")
        print(f" - {labels_path.name}: {labels.shape}")

        # Save vectorizer and label names to models/ directory for inference
        MODELS_DIR.mkdir(parents=True, exist_ok=True)

        if feature_type == "tfidf" and vectorizer is not None:
            vectorizer_path = MODELS_DIR / "tfidf_vectorizer.pkl"
            joblib.dump(vectorizer, vectorizer_path)
            print(f" - Saved TF-IDF vectorizer to: {vectorizer_path}")

        # Always save label names (needed for both tfidf and embedding inference)
        label_names_path = MODELS_DIR / "label_names.pkl"
        joblib.dump(label_columns, label_names_path)
        print(f" - Saved {len(label_columns)} label names to: {label_names_path}")

    return features, labels, feature_names, label_columns


def load_processed_data(
    feature_name: str = "tfidf", data_dir: Optional[Path] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Load processed features and labels from disk.

    Args:
        feature_name: Name of the features to load (e.g., 'tfidf', 'embedding').
            Files are expected to be named ``features_<feature_name>.npy`` and
            ``labels_<feature_name>.npy``.
        data_dir: Directory containing the two ``.npy`` files. If None, uses
            ``PROCESSED_DATA_DIR / feature_name`` (the location written by
            ``create_feature_dataset``), falling back to ``PROCESSED_DATA_DIR``
            itself when only the legacy flat layout exists.

    Returns:
        Tuple of (features, labels)
    """
    features_file = f"features_{feature_name}.npy"
    labels_file = f"labels_{feature_name}.npy"

    if data_dir is None:
        nested_dir = PROCESSED_DATA_DIR / feature_name
        legacy_dir = PROCESSED_DATA_DIR
        # Prefer the nested layout produced by create_feature_dataset; only use
        # the flat layout when it is the one that actually holds the file.
        use_legacy = (
            not (nested_dir / features_file).exists()
            and (legacy_dir / features_file).exists()
        )
        data_dir = legacy_dir if use_legacy else nested_dir
    else:
        data_dir = Path(data_dir)

    features_path = data_dir / features_file
    labels_path = data_dir / labels_file

    features = np.load(features_path)
    labels = np.load(labels_path)

    print(f"Loaded processed data from {data_dir}")
    print(f" - Feature type: {feature_name}")
    print(f" - Features shape: {features.shape}")
    print(f" - Labels shape: {labels.shape}")

    return features, labels


if __name__ == "__main__":
    features, labels, feature_names, label_names = create_feature_dataset(feature_type="embedding")

    print("\n=== Feature Extraction Summary ===")
    print(f"Features shape: {features.shape}")
    print(f"Labels shape: {labels.shape}")
    print(f"Number of feature names: {len(feature_names)}")
    print(f"Number of labels: {len(label_names)}")