Spaces:
Sleeping
Sleeping
| """ | |
| Feature extraction module for skill classification. | |
| This module provides functions to extract features from the SkillScope dataset, | |
| starting with TF-IDF vectorization of textual data from pull request issues. | |
| Dataset Information (from nlbse_tool_competition_data_by_issue): | |
| - 7,154 issues from 11 Java repositories | |
| - 226 total columns: | |
| - 2 text columns: 'issue text' (title) and 'issue description' (body) | |
| - metadata and other columns containing PR/file/context information | |
| - 217 label columns: domain/subdomain skill labels (142 active labels in this DB) | |
| Label Characteristics: | |
| - Multi-label classification problem | |
| - Average 32.9 labels per issue (median: 31) | |
| - Highly imbalanced: some labels appear in all issues, others in very few | |
| - Top labels: Language, Data Structure, DevOps, Error Handling | |
| """ | |
# Standard library
from pathlib import Path
import re
import sqlite3
from typing import Optional, Tuple

# Third-party
import joblib
import numpy as np
import pandas as pd
# Porter stemmer used by the TF-IDF preprocessing path
from nltk.stem import PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer

# Project configuration (data/model directories)
from hopcroft_skill_classification_tool_competition.config import (
    MODELS_DIR,
    PROCESSED_DATA_DIR,
    RAW_DATA_DIR,
)

# Instantiate the stemmer once at import time so every call to
# clean_github_text reuses the same object.
stemmer = PorterStemmer()
def clean_github_text(text: str, use_stemming: bool = True) -> str:
    """
    Clean GitHub issue text as per the SkillScope paper (Aracena et al. process).

    Strips URLs, HTML tags, markdown code blocks/inline code, and non-ASCII
    characters (emojis), then collapses whitespace. Optionally stems tokens.

    Args:
        text: Raw text from a GitHub issue (may be None/NaN).
        use_stemming: If True, apply Porter stemming (recommended for TF-IDF).
            If False, keep original words (recommended for Embeddings/LLMs).

    Returns:
        Cleaned text string (stemmed if use_stemming=True); "" for missing input.
    """
    if pd.isna(text) or text is None:
        return ""

    cleaned = str(text)
    # Noise-removal passes, applied in the original pipeline order:
    # URLs (http/https/www), HTML tags, fenced code blocks, inline code.
    for pattern in (
        r"http\S+|www\.\S+",
        r"<[^>]+>",
        r"```[\s\S]*?```",
        r"`[^`]*`",
    ):
        cleaned = re.sub(pattern, "", cleaned)

    # Drop emojis and any other non-ASCII characters.
    cleaned = cleaned.encode("ascii", "ignore").decode("ascii")
    # Normalize runs of whitespace to single spaces.
    cleaned = re.sub(r"\s+", " ", cleaned).strip()

    # Stemming only for TF-IDF; embedding models need full words.
    if not use_stemming:
        return cleaned
    try:
        return " ".join(stemmer.stem(token) for token in cleaned.split())
    except Exception as e:
        print(f"Warning: Stemming failed for text snippet '{cleaned[:50]}...'. Error: {e}")
        # Fall back to the cleaned (but unstemmed) text on stemmer failure.
        return cleaned.strip()
def get_dataset_info(df: pd.DataFrame) -> dict:
    """
    Summarize the dataset: column breakdown and per-issue/per-label statistics.

    Args:
        df: Input dataframe

    Returns:
        Dictionary of dataset statistics (counts, label distribution summaries).
    """
    text_cols = get_text_columns(df)
    label_cols = get_label_columns(df)

    # Binarize the label counts: any positive value means the label applies.
    binary = (df[label_cols] > 0).astype(int)
    per_issue = binary.sum(axis=1)
    per_label = binary.sum(axis=0)

    return {
        "total_issues": len(df),
        "total_columns": len(df.columns),
        "text_columns": text_cols,
        "num_text_columns": len(text_cols),
        "label_columns": label_cols,
        "num_labels": len(label_cols),
        "avg_labels_per_issue": per_issue.mean(),
        "median_labels_per_issue": per_issue.median(),
        "max_labels_per_issue": per_issue.max(),
        "min_labels_per_issue": per_issue.min(),
        "avg_issues_per_label": per_label.mean(),
        "labels_with_no_issues": (per_label == 0).sum(),
    }
def load_data_from_db(db_path: Optional[Path] = None) -> pd.DataFrame:
    """
    Load data from the SQLite database.

    Args:
        db_path: Path to the SQLite database file.
            If None, uses default path in data/raw/skillscope_data.db

    Returns:
        DataFrame containing the nlbse_tool_competition_data_by_issue table
    """
    if db_path is None:
        db_path = RAW_DATA_DIR / "skillscope_data.db"

    conn = sqlite3.connect(db_path)
    try:
        # Load the main table
        query = "SELECT * FROM nlbse_tool_competition_data_by_issue"
        df = pd.read_sql_query(query, conn)
    finally:
        # Ensure the connection is released even if the query fails
        # (the original leaked the handle on a read error).
        conn.close()
    print(f"Loaded {len(df)} records from database")
    return df
def get_text_columns(df: pd.DataFrame) -> list:
    """
    Identify text columns in the dataframe (typically issue title, body, etc.).

    Args:
        df: Input dataframe

    Returns:
        List of column names containing textual data, in canonical order.
    """
    # Known text columns from the SkillScope database schema:
    # 'issue text' (title) and 'issue description' (body).
    candidates = ("issue text", "issue description")
    present = set(df.columns)
    return [name for name in candidates if name in present]
def get_label_columns(df: pd.DataFrame) -> list:
    """
    Identify label columns (domains/subdomains with API counts).

    A label column is any numeric column that is not a known metadata column.

    Args:
        df: Input dataframe

    Returns:
        List of column names containing labels, in dataframe column order.
    """
    # Robust numeric check across dtype representations.
    from pandas.api.types import is_numeric_dtype

    # Known metadata columns that must never be treated as skill labels.
    non_labels = {
        "Repo Name",
        "PR #",
        "issue text",
        "issue description",
        "created_at",
        "author_name",
    }
    return [
        name
        for name in df.columns
        if name not in non_labels and is_numeric_dtype(df[name])
    ]
def combine_text_fields(
    df: pd.DataFrame, text_columns: list, use_stemming: bool = True
) -> pd.Series:
    """
    Merge the given text columns into one cleaned string per row.

    Each cell is cleaned via clean_github_text (SkillScope-style cleaning)
    before the cells of a row are joined with single spaces.

    Args:
        df: Input dataframe
        text_columns: Column names to merge
        use_stemming: Forwarded to clean_github_text — True for TF-IDF,
            False when the text feeds an embedding model.

    Returns:
        Series of cleaned, space-joined text, one entry per row.
    """
    prepared = df[text_columns].fillna("").astype(str)

    def _merge_row(row: pd.Series) -> str:
        # Row values iterate in column order, matching text_columns.
        return " ".join(
            clean_github_text(cell, use_stemming=use_stemming) for cell in row
        )

    return prepared.apply(_merge_row, axis=1)
def extract_tfidf_features(
    df: pd.DataFrame,
    text_columns: Optional[list] = None,
    max_features: Optional[int] = 2000,
    min_df: int = 2,
    max_df: float = 0.95,
    ngram_range: Tuple[int, int] = (1, 2),
) -> Tuple[np.ndarray, TfidfVectorizer]:
    """
    Extract TF-IDF features from textual data.

    Args:
        df: Input dataframe
        text_columns: List of text columns to use. If None, auto-detect.
        max_features: Maximum number of features (default 2000 for balanced sparsity).
        min_df: Minimum document frequency for a term to be included.
        max_df: Maximum document frequency (ignore terms in > max_df of docs).
        ngram_range: N-gram range, e.g. (1, 2) for unigrams and bigrams.

    Returns:
        Tuple of (dense feature matrix, fitted vectorizer).

    Raises:
        ValueError: If no text columns are available.
    """
    if text_columns is None:
        text_columns = get_text_columns(df)
    if not text_columns:
        raise ValueError("No text columns found in dataframe")

    # Stemming is enabled here: TF-IDF benefits from vocabulary collapsing.
    print(f"Combining text from columns: {text_columns}")
    corpus = combine_text_fields(df, text_columns, use_stemming=True)

    vectorizer = TfidfVectorizer(
        max_features=max_features,
        min_df=min_df,
        max_df=max_df,
        ngram_range=ngram_range,
        stop_words="english",
        lowercase=True,
        strip_accents="unicode",
    )

    print(
        f"Extracting TF-IDF features with max_features={max_features if max_features else 'All'}, "
        f"ngram_range={ngram_range}"
    )
    matrix = vectorizer.fit_transform(corpus)
    print(
        f"Extracted {matrix.shape[1]} TF-IDF features from {matrix.shape[0]} samples"
    )
    # NOTE: densified on return; fine at this dataset size (~7k docs x 2k terms).
    return matrix.toarray(), vectorizer
def extract_embedding_features(
    df: pd.DataFrame,
    text_columns: Optional[list] = None,
    model_name: str = "all-MiniLM-L6-v2",
    batch_size: int = 32,
) -> Tuple[np.ndarray, object]:
    """
    Extract LLM embeddings from textual data using Sentence Transformers.

    Args:
        df: Input dataframe
        text_columns: List of text columns to use. If None, auto-detect.
        model_name: Name of the pre-trained model to use
        batch_size: Batch size for encoding

    Returns:
        Tuple of (embedding matrix, loaded model object).

    Raises:
        ImportError: If sentence-transformers is not installed.
        ValueError: If no text columns are available.
    """
    # Imported lazily so the rest of the module works without the package.
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as e:
        raise ImportError(
            f"sentence-transformers import failed: {e}. Try running: pip install sentence-transformers"
        ) from e

    if text_columns is None:
        text_columns = get_text_columns(df)
    if not text_columns:
        raise ValueError("No text columns found in dataframe")

    # No stemming here: transformer tokenizers expect full words.
    print(f"Combining text from columns: {text_columns}")
    corpus = combine_text_fields(df, text_columns, use_stemming=False)

    print(f"Loading embedding model: {model_name}")
    encoder = SentenceTransformer(model_name)

    print(f"Extracting embeddings for {len(corpus)} samples...")
    vectors = encoder.encode(
        corpus.tolist(),
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
    )
    print(f"Extracted embeddings shape: {vectors.shape}")
    return vectors, encoder
def prepare_labels(df: pd.DataFrame, label_columns: Optional[list] = None) -> pd.DataFrame:
    """
    Prepare multi-label binary matrix from label columns.

    Args:
        df: Input dataframe
        label_columns: List of label columns. If None, auto-detect.

    Returns:
        DataFrame with binary labels (1 if label present, 0 otherwise).
    """
    if label_columns is None:
        label_columns = get_label_columns(df)

    # Any count greater than zero marks the label as present.
    labels = df[label_columns].gt(0).astype(int)

    print(f"Prepared {len(label_columns)} labels")
    print(f"Label distribution:\n{labels.sum().describe()}")
    return labels
def create_feature_dataset(
    db_path: Optional[Path] = None,
    save_processed: bool = True,
    feature_type: str = "tfidf",  # 'tfidf' or 'embedding'
    model_name: str = "all-MiniLM-L6-v2",
) -> Tuple[np.ndarray, pd.DataFrame, list, list]:
    """
    Main function to create the complete feature dataset.

    Pipeline: load from SQLite -> print dataset summary -> extract features
    (TF-IDF or sentence embeddings) -> binarize labels -> optionally persist
    features/labels under PROCESSED_DATA_DIR/{feature_type}/ and inference
    artifacts (vectorizer, label names) under MODELS_DIR.

    Args:
        db_path: Path to SQLite database
        save_processed: Whether to save processed data to disk
        feature_type: Type of features to extract ('tfidf' or 'embedding')
        model_name: Model name for embeddings (ignored if feature_type='tfidf')

    Returns:
        Tuple of (features, labels, feature_names, label_names)

    Raises:
        ValueError: If feature_type is neither 'tfidf' nor 'embedding'.
    """
    # Load data
    df = load_data_from_db(db_path)
    # Get dataset info
    info = get_dataset_info(df)
    print("\n=== Dataset Information ===")
    print(f"Total issues: {info['total_issues']:,}")
    print(f"Text columns: {info['text_columns']}")
    print(f"Number of labels: {info['num_labels']}")
    print(f"Avg labels per issue: {info['avg_labels_per_issue']:.2f}")
    print(f"Labels with no issues: {info['labels_with_no_issues']}")
    # Extract features
    text_columns = get_text_columns(df)
    label_columns = get_label_columns(df)
    feature_names = []
    # Only the TF-IDF path produces a vectorizer to persist; None otherwise.
    vectorizer = None
    if feature_type == "tfidf":
        features, vectorizer = extract_tfidf_features(df, text_columns=text_columns)
        feature_names = vectorizer.get_feature_names_out()
    elif feature_type == "embedding":
        # The model object is discarded; only the vectors are kept here.
        features, _ = extract_embedding_features(
            df, text_columns=text_columns, model_name=model_name
        )
        # Embedding dimensions have no vocabulary, so use synthetic names.
        feature_names = [f"emb_{i}" for i in range(features.shape[1])]
    else:
        raise ValueError(f"Unknown feature_type: {feature_type}")
    # Prepare labels
    labels = prepare_labels(df, label_columns)
    # Save processed data
    if save_processed:
        # Path: processed/{feature_type}/
        output_dir = PROCESSED_DATA_DIR / feature_type
        output_dir.mkdir(parents=True, exist_ok=True)
        features_path = output_dir / f"features_{feature_type}.npy"
        labels_path = output_dir / f"labels_{feature_type}.npy"
        np.save(features_path, features)
        np.save(labels_path, labels.values)
        print(f"\nSaved processed data to {output_dir}")
        print(f" - {features_path.name}: {features.shape}")
        print(f" - {labels_path.name}: {labels.shape}")
        # Save vectorizer and label names to models/ directory for inference
        MODELS_DIR.mkdir(parents=True, exist_ok=True)
        if feature_type == "tfidf" and vectorizer is not None:
            vectorizer_path = MODELS_DIR / "tfidf_vectorizer.pkl"
            joblib.dump(vectorizer, vectorizer_path)
            print(f" - Saved TF-IDF vectorizer to: {vectorizer_path}")
        # Always save label names (needed for both tfidf and embedding inference)
        label_names_path = MODELS_DIR / "label_names.pkl"
        joblib.dump(label_columns, label_names_path)
        print(f" - Saved {len(label_columns)} label names to: {label_names_path}")
    return features, labels, feature_names, label_columns
def load_processed_data(
    feature_name: str = "tfidf", data_dir: Optional[Path] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Load processed features and labels from disk.

    Args:
        feature_name: Name prefix of the features to load (e.g., 'tfidf', 'bow', 'embeddings')
        data_dir: Path to processed data directory. If None, uses the default
            layout written by create_feature_dataset:
            PROCESSED_DATA_DIR/{feature_name}/

    Returns:
        Tuple of (features, labels)
    """
    if data_dir is None:
        # Fix: create_feature_dataset saves under a per-feature subdirectory
        # (PROCESSED_DATA_DIR / feature_type), so the default must match or
        # the round trip fails with FileNotFoundError.
        data_dir = PROCESSED_DATA_DIR / feature_name
    features_path = data_dir / f"features_{feature_name}.npy"
    labels_path = data_dir / f"labels_{feature_name}.npy"
    features = np.load(features_path)
    labels = np.load(labels_path)
    print(f"Loaded processed data from {data_dir}")
    print(f" - Feature type: {feature_name}")
    print(f" - Features shape: {features.shape}")
    print(f" - Labels shape: {labels.shape}")
    return features, labels
| if __name__ == "__main__": | |
| features, labels, feature_names, label_names = create_feature_dataset(feature_type="embedding") | |
| print("\n=== Feature Extraction Summary ===") | |
| print(f"Features shape: {features.shape}") | |
| print(f"Labels shape: {labels.shape}") | |
| print(f"Number of feature names: {len(feature_names)}") | |
| print(f"Number of labels: {len(label_names)}") | |