|
|
""" |
|
|
Feature extraction module for skill classification. |
|
|
|
|
|
This module provides functions to extract features from the SkillScope dataset, |
|
|
starting with TF-IDF vectorization of textual data from pull request issues. |
|
|
|
|
|
Dataset Information (from nlbse_tool_competition_data_by_issue): |
|
|
- 7,154 issues from 11 Java repositories |
|
|
- 226 total columns: |
|
|
- 2 text columns: 'issue text' (title) and 'issue description' (body) |
|
|
- metadata and other columns containing PR/file/context information |
|
|
- 217 label columns: domain/subdomain skill labels (142 active labels in this DB) |
|
|
|
|
|
Label Characteristics: |
|
|
- Multi-label classification problem |
|
|
- Average 32.9 labels per issue (median: 31) |
|
|
- Highly imbalanced: some labels appear in all issues, others in very few |
|
|
- Top labels: Language, Data Structure, DevOps, Error Handling |
|
|
""" |
|
|
|
|
|
from pathlib import Path |
|
|
import re |
|
|
import sqlite3 |
|
|
from typing import Optional, Tuple |
|
|
|
|
|
import joblib |
|
|
|
|
|
|
|
|
from nltk.stem import PorterStemmer |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
|
|
|
|
from hopcroft_skill_classification_tool_competition.config import ( |
|
|
MODELS_DIR, |
|
|
PROCESSED_DATA_DIR, |
|
|
RAW_DATA_DIR, |
|
|
) |
|
|
|
|
|
|
|
|
# Module-level Porter stemmer shared by clean_github_text, so it is
# instantiated once per process rather than once per call.
stemmer = PorterStemmer()
|
|
|
|
|
|
|
|
def clean_github_text(text: str, use_stemming: bool = True) -> str:
    """
    Clean GitHub issue text as per SkillScope paper (Aracena et al. process).

    Removes URLs, HTML tags, fenced/inline code, and non-ASCII characters
    (emojis, smart quotes) commonly found in GitHub text, then collapses
    whitespace. Optionally applies Porter stemming.

    Args:
        text: Raw text from a GitHub issue (may be None or NaN).
        use_stemming: If True, apply Porter stemming (recommended for TF-IDF).
            If False, keep original words (recommended for Embeddings/LLMs).

    Returns:
        Cleaned text string (stemmed if use_stemming=True); "" for missing input.
    """
    # pd.isna(None) is True, so this single check also covers a literal None.
    if pd.isna(text):
        return ""

    text = str(text)

    # Strip URLs (http(s)://... and bare www....).
    text = re.sub(r"http\S+|www\.\S+", "", text)

    # Strip HTML tags such as <p> or <img ...>.
    text = re.sub(r"<[^>]+>", "", text)

    # Strip fenced code blocks first (``` ... ```), then inline code spans.
    text = re.sub(r"```[\s\S]*?```", "", text)
    text = re.sub(r"`[^`]*`", "", text)

    # Drop any non-ASCII characters (emojis, accented letters, etc.).
    text = text.encode("ascii", "ignore").decode("ascii")

    # Collapse runs of whitespace and trim both ends.
    text = re.sub(r"\s+", " ", text).strip()

    if use_stemming:
        try:
            text = " ".join(stemmer.stem(token) for token in text.split())
        except Exception as e:
            # Best-effort: keep the unstemmed text rather than failing the
            # whole pipeline on one bad record.
            print(f"Warning: Stemming failed for text snippet '{text[:50]}...'. Error: {e}")

    return text.strip()
|
|
|
|
|
|
|
|
def get_dataset_info(df: pd.DataFrame) -> dict:
    """
    Summarize the dataset: size, text/label columns, and label statistics.

    Args:
        df: Input dataframe

    Returns:
        Dictionary containing dataset statistics
    """
    text_cols = get_text_columns(df)
    label_cols = get_label_columns(df)

    # Binarize the label counts, then tally per-issue and per-label totals.
    binary = df[label_cols].gt(0).astype(int)
    per_issue = binary.sum(axis=1)
    per_label = binary.sum(axis=0)

    return {
        "total_issues": len(df),
        "total_columns": len(df.columns),
        "text_columns": text_cols,
        "num_text_columns": len(text_cols),
        "label_columns": label_cols,
        "num_labels": len(label_cols),
        "avg_labels_per_issue": per_issue.mean(),
        "median_labels_per_issue": per_issue.median(),
        "max_labels_per_issue": per_issue.max(),
        "min_labels_per_issue": per_issue.min(),
        "avg_issues_per_label": per_label.mean(),
        "labels_with_no_issues": (per_label == 0).sum(),
    }
|
|
|
|
|
|
|
|
def load_data_from_db(db_path: Optional[Path] = None) -> pd.DataFrame:
    """
    Load data from the SQLite database.

    Args:
        db_path: Path to the SQLite database file.
            If None, uses default path in data/raw/skillscope_data.db

    Returns:
        DataFrame containing the nlbse_tool_competition_data_by_issue table

    Raises:
        sqlite3.OperationalError: If the table is missing (note sqlite3
            silently creates an empty DB file for a nonexistent path).
    """
    if db_path is None:
        db_path = RAW_DATA_DIR / "skillscope_data.db"

    conn = sqlite3.connect(db_path)
    try:
        query = "SELECT * FROM nlbse_tool_competition_data_by_issue"
        df = pd.read_sql_query(query, conn)
    finally:
        # Always release the connection, even when the query raises
        # (the original leaked it on error).
        conn.close()

    print(f"Loaded {len(df)} records from database")
    return df
|
|
|
|
|
|
|
|
def get_text_columns(df: pd.DataFrame) -> list:
    """
    Identify text columns in the dataframe (typically issue title, body, etc.).

    Args:
        df: Input dataframe

    Returns:
        List of column names containing textual data
    """
    # The SkillScope schema stores issue title/body under these two names;
    # keep only those actually present in this dataframe, in this order.
    candidates = ("issue text", "issue description")
    return [name for name in candidates if name in df.columns]
|
|
|
|
|
|
|
|
def get_label_columns(df: pd.DataFrame) -> list:
    """
    Identify label columns (domains/subdomains with API counts).

    Every numeric column that is not a known metadata/text column is
    treated as a skill label.

    Args:
        df: Input dataframe

    Returns:
        List of column names containing labels
    """
    from pandas.api.types import is_numeric_dtype

    # Known non-label columns in the by-issue table.
    non_label = {
        "Repo Name",
        "PR #",
        "issue text",
        "issue description",
        "created_at",
        "author_name",
    }

    return [
        name
        for name in df.columns
        if name not in non_label and is_numeric_dtype(df[name])
    ]
|
|
|
|
|
|
|
|
def combine_text_fields(
    df: pd.DataFrame, text_columns: list, use_stemming: bool = True
) -> pd.Series:
    """
    Combine multiple text fields into a single text representation.

    Each field is cleaned with clean_github_text (per the SkillScope paper)
    and the cleaned pieces are joined with single spaces, row by row.

    Args:
        df: Input dataframe
        text_columns: List of column names to combine
        use_stemming: If True, apply stemming (for TF-IDF). If False, keep
            original words (for Embeddings).

    Returns:
        Series containing cleaned and combined text for each row
    """
    # Normalize missing values to "" and force string dtype before cleaning.
    texts = df[text_columns].fillna("").astype(str)

    def _clean_and_join(row: pd.Series) -> str:
        # Clean every field of the row, then glue them into one document.
        cleaned = row.map(
            lambda value: clean_github_text(value, use_stemming=use_stemming)
        )
        return " ".join(cleaned)

    return texts.apply(_clean_and_join, axis=1)
|
|
|
|
|
|
|
|
def extract_tfidf_features(
    df: pd.DataFrame,
    text_columns: Optional[list] = None,
    max_features: Optional[int] = 2000,
    min_df: int = 2,
    max_df: float = 0.95,
    ngram_range: Tuple[int, int] = (1, 2),
) -> Tuple[np.ndarray, TfidfVectorizer]:
    """
    Extract TF-IDF features from textual data.

    Args:
        df: Input dataframe
        text_columns: List of text columns to use. If None, auto-detect.
        max_features: Maximum number of features to extract (default: 2000
            for balanced sparsity); None keeps all terms.
        min_df: Minimum document frequency for a term to be included
        max_df: Maximum document frequency (ignore terms appearing in >max_df of docs)
        ngram_range: Range of n-grams to consider (e.g., (1,2) for unigrams and bigrams)

    Returns:
        Tuple of (dense feature matrix, fitted vectorizer)

    Raises:
        ValueError: If no text columns can be found.
    """
    if text_columns is None:
        text_columns = get_text_columns(df)
    if not text_columns:
        raise ValueError("No text columns found in dataframe")

    print(f"Combining text from columns: {text_columns}")
    # Stemming is always applied on the TF-IDF path, per the SkillScope setup.
    corpus = combine_text_fields(df, text_columns, use_stemming=True)

    tfidf = TfidfVectorizer(
        max_features=max_features,
        min_df=min_df,
        max_df=max_df,
        ngram_range=ngram_range,
        stop_words="english",
        lowercase=True,
        strip_accents="unicode",
    )

    print(
        f"Extracting TF-IDF features with max_features={max_features if max_features else 'All'}, "
        f"ngram_range={ngram_range}"
    )
    matrix = tfidf.fit_transform(corpus)

    print(
        f"Extracted {matrix.shape[1]} TF-IDF features from {matrix.shape[0]} samples"
    )

    # Densify for downstream estimators that expect a plain ndarray.
    return matrix.toarray(), tfidf
|
|
|
|
|
|
|
|
def extract_embedding_features(
    df: pd.DataFrame,
    text_columns: Optional[list] = None,
    model_name: str = "all-MiniLM-L6-v2",
    batch_size: int = 32,
) -> Tuple[np.ndarray, object]:
    """
    Extract LLM embeddings from textual data using Sentence Transformers.

    Args:
        df: Input dataframe
        text_columns: List of text columns to use. If None, auto-detect.
        model_name: Name of the pre-trained model to use
        batch_size: Batch size for encoding

    Returns:
        Tuple of (feature matrix, model object)

    Raises:
        ImportError: If sentence-transformers is not installed.
        ValueError: If no text columns can be found.
    """
    # Imported lazily so the TF-IDF path works without this optional dependency.
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as e:
        raise ImportError(
            f"sentence-transformers import failed: {e}. Try running: pip install sentence-transformers"
        ) from e

    if text_columns is None:
        text_columns = get_text_columns(df)
    if not text_columns:
        raise ValueError("No text columns found in dataframe")

    print(f"Combining text from columns: {text_columns}")
    # No stemming here: transformer models expect natural, unstemmed words.
    corpus = combine_text_fields(df, text_columns, use_stemming=False)

    print(f"Loading embedding model: {model_name}")
    encoder = SentenceTransformer(model_name)

    print(f"Extracting embeddings for {len(corpus)} samples...")
    embeddings = encoder.encode(
        corpus.tolist(),
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
    )

    print(f"Extracted embeddings shape: {embeddings.shape}")

    return embeddings, encoder
|
|
|
|
|
|
|
|
def prepare_labels(df: pd.DataFrame, label_columns: Optional[list] = None) -> pd.DataFrame:
    """
    Prepare multi-label binary matrix from label columns.

    Args:
        df: Input dataframe
        label_columns: List of label columns. If None, auto-detect.

    Returns:
        DataFrame with binary labels (1 if label present, 0 otherwise)
    """
    cols = label_columns if label_columns is not None else get_label_columns(df)

    # Any positive API count means the skill label applies to the issue.
    labels = df[cols].gt(0).astype(int)

    print(f"Prepared {len(cols)} labels")
    print(f"Label distribution:\n{labels.sum().describe()}")

    return labels
|
|
|
|
|
|
|
|
def create_feature_dataset(
    db_path: Optional[Path] = None,
    save_processed: bool = True,
    feature_type: str = "tfidf",
    model_name: str = "all-MiniLM-L6-v2",
) -> Tuple[np.ndarray, pd.DataFrame, list, list]:
    """
    Main function to create the complete feature dataset.

    Loads the raw issues from SQLite, prints summary statistics, extracts
    either TF-IDF or embedding features, binarizes the labels, and
    optionally persists the arrays plus fitted artifacts to disk.

    Args:
        db_path: Path to SQLite database
        save_processed: Whether to save processed data to disk
        feature_type: Type of features to extract ('tfidf' or 'embedding')
        model_name: Model name for embeddings (ignored if feature_type='tfidf')

    Returns:
        Tuple of (features, labels, feature_names, label_names)

    Raises:
        ValueError: If feature_type is neither 'tfidf' nor 'embedding'.
    """
    df = load_data_from_db(db_path)

    # Report dataset shape and label statistics before feature extraction.
    info = get_dataset_info(df)
    print("\n=== Dataset Information ===")
    print(f"Total issues: {info['total_issues']:,}")
    print(f"Text columns: {info['text_columns']}")
    print(f"Number of labels: {info['num_labels']}")
    print(f"Avg labels per issue: {info['avg_labels_per_issue']:.2f}")
    print(f"Labels with no issues: {info['labels_with_no_issues']}")

    text_columns = get_text_columns(df)
    label_columns = get_label_columns(df)

    feature_names = []

    # Kept around so the fitted TF-IDF vectorizer can be saved below.
    vectorizer = None

    if feature_type == "tfidf":
        features, vectorizer = extract_tfidf_features(df, text_columns=text_columns)
        feature_names = vectorizer.get_feature_names_out()
    elif feature_type == "embedding":
        features, _ = extract_embedding_features(
            df, text_columns=text_columns, model_name=model_name
        )
        # Embedding dimensions have no vocabulary, so use synthetic names.
        feature_names = [f"emb_{i}" for i in range(features.shape[1])]
    else:
        raise ValueError(f"Unknown feature_type: {feature_type}")

    labels = prepare_labels(df, label_columns)

    if save_processed:
        # Feature/label arrays go under a per-feature-type subdirectory.
        output_dir = PROCESSED_DATA_DIR / feature_type
        output_dir.mkdir(parents=True, exist_ok=True)

        features_path = output_dir / f"features_{feature_type}.npy"
        labels_path = output_dir / f"labels_{feature_type}.npy"

        np.save(features_path, features)
        np.save(labels_path, labels.values)

        print(f"\nSaved processed data to {output_dir}")
        print(f" - {features_path.name}: {features.shape}")
        print(f" - {labels_path.name}: {labels.shape}")

        MODELS_DIR.mkdir(parents=True, exist_ok=True)

        if feature_type == "tfidf" and vectorizer is not None:
            # The fitted vectorizer is needed to transform unseen text later.
            vectorizer_path = MODELS_DIR / "tfidf_vectorizer.pkl"
            joblib.dump(vectorizer, vectorizer_path)
            print(f" - Saved TF-IDF vectorizer to: {vectorizer_path}")

        # Label names are needed at inference time to decode predictions.
        label_names_path = MODELS_DIR / "label_names.pkl"
        joblib.dump(label_columns, label_names_path)
        print(f" - Saved {len(label_columns)} label names to: {label_names_path}")

    return features, labels, feature_names, label_columns
|
|
|
|
|
|
|
|
def load_processed_data(
    feature_name: str = "tfidf", data_dir: Optional[Path] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Load processed features and labels from disk.

    Args:
        feature_name: Name prefix of the features to load (e.g., 'tfidf', 'embedding')
        data_dir: Path to processed data directory. If None, uses the default
            per-feature-type subdirectory written by create_feature_dataset.

    Returns:
        Tuple of (features, labels)

    Raises:
        FileNotFoundError: If the .npy files are missing from data_dir.
    """
    if data_dir is None:
        # create_feature_dataset saves under PROCESSED_DATA_DIR / <feature_type>;
        # mirror that layout here (the old default pointed one level too high,
        # so the files were never found without an explicit data_dir).
        data_dir = PROCESSED_DATA_DIR / feature_name

    features_path = data_dir / f"features_{feature_name}.npy"
    labels_path = data_dir / f"labels_{feature_name}.npy"

    features = np.load(features_path)
    labels = np.load(labels_path)

    print(f"Loaded processed data from {data_dir}")
    print(f" - Feature type: {feature_name}")
    print(f" - Features shape: {features.shape}")
    print(f" - Labels shape: {labels.shape}")

    return features, labels
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the full pipeline end-to-end with embedding features.
    # NOTE(review): requires the raw SQLite DB under data/raw and the
    # sentence-transformers package to be installed.
    features, labels, feature_names, label_names = create_feature_dataset(feature_type="embedding")

    print("\n=== Feature Extraction Summary ===")
    print(f"Features shape: {features.shape}")
    print(f"Labels shape: {labels.shape}")
    print(f"Number of feature names: {len(feature_names)}")
    print(f"Number of labels: {len(label_names)}")
|
|
|