| """
|
| Length-aware gating ensemble: CodeBERTfinetune + XGBoost predictor
|
| """
|
|
|
| import torch
|
| import joblib
|
| import numpy as np
|
| import re
|
| import warnings
|
| from transformers import AutoTokenizer, AutoModelForSequenceClassification
|
| from sklearn.feature_extraction.text import TfidfVectorizer
|
| from scipy.sparse import hstack
|
| from pathlib import Path
|
|
|
| warnings.filterwarnings("ignore")
|
|
|
|
|
# Paradigm label -> class index, as used at training time (alphabetical order).
LABELS = {'Functional': 0, 'Non-Paradigm': 1, 'Oop': 2, 'Procedural': 3}
# Inverse mapping: class index -> human-readable paradigm name.
LABEL_TO_NAME = dict((idx, name) for name, idx in LABELS.items())
|
|
|
|
|
class FeatureExtractor:
    """Hand-crafted lexical features mirroring the training-time extractor.

    NOTE: keywords are matched as raw substrings (so e.g. 'const' also hits
    'constant') — this is intentional, to stay bit-identical with the
    pipeline the XGBoost model was trained on. The dict returned by
    ``extract`` is consumed positionally downstream, so its key insertion
    order must not change.
    """

    def __init__(self):
        # Keyword inventories per paradigm (must match training exactly).
        self.oop_kw = ['class', 'object', 'this', 'self', 'extends', 'implements', 'interface',
                       'public', 'private', 'protected', 'static', 'virtual', 'override']
        self.fp_kw = ['map', 'filter', 'reduce', 'fold', 'lambda', 'closure', '=>',
                      'monad', 'functor', 'pure', 'immutable', 'const', 'let']
        self.proc_kw = ['void', 'int', 'char', 'float', 'struct', 'malloc', 'free',
                        'pointer', 'goto', 'scanf', 'printf']

    def extract(self, text):
        """Return the 10 handcrafted features for *text* (order-sensitive dict)."""
        lowered = text.lower()

        def substring_hits(keywords):
            # Total occurrence count of all keywords as plain substrings.
            return sum(lowered.count(kw) for kw in keywords)

        features = {}
        features['oop_score'] = substring_hits(self.oop_kw)
        features['fp_score'] = substring_hits(self.fp_kw)
        features['proc_score'] = substring_hits(self.proc_kw)
        features['length'] = len(text)
        features['num_lines'] = text.count('\n') + 1
        # Word-boundary match so 'subclass' alone does not count as a class def.
        features['has_class'] = int(bool(re.search(r'\bclass\s+\w+', lowered)))
        # '=>' is checked on the raw text (case-irrelevant for symbols).
        features['has_lambda'] = int('lambda' in lowered or '=>' in text)
        features['num_dots'] = text.count('.')
        features['num_arrows'] = text.count('->') + text.count('=>')
        features['num_braces'] = text.count('{') + text.count('}')
        return features
|
|
|
|
|
class EnsemblePredictor:
    """Length-aware gating ensemble of fine-tuned CodeBERT and XGBoost.

    Both models emit a 4-way probability distribution over paradigm labels
    (see ``LABELS``); the final prediction is a weighted average of the two
    distributions, with weights gated on the CodeBERT token length of the
    input.
    """

    def __init__(self, codebert_path, xgb_model_path, tfidf_path=None):
        """
        Initialize the ensemble predictor.

        Args:
            codebert_path: Path to CodeBERT model directory
            xgb_model_path: Path to saved XGBoost model (REQUIRED)
            tfidf_path: Path to saved TF-IDF vectorizer (REQUIRED for
                XGBoost features). When None, defaults to
                ``tfidf_vectorizer.pkl`` next to this file.

        Raises:
            FileNotFoundError: if the TF-IDF vectorizer or XGBoost model
                file does not exist.
            RuntimeError: if the XGBoost model file exists but cannot be
                loaded.
        """
        self.feature_extractor = FeatureExtractor()

        # CodeBERT inference dominates runtime; use the GPU when present.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        print("Loading CodeBERT model...")
        self.tokenizer = AutoTokenizer.from_pretrained(codebert_path)
        self.codebert = AutoModelForSequenceClassification.from_pretrained(codebert_path)
        self.codebert.eval()  # inference mode: disables dropout
        self.codebert.to(self.device)
        print("CodeBERT fine-tuned model loaded successfully\n")

        if tfidf_path is None:
            tfidf_path = Path(__file__).parent / "tfidf_vectorizer.pkl"

        tfidf_path = Path(tfidf_path)
        if not tfidf_path.exists():
            raise FileNotFoundError(f"TF-IDF vectorizer NOT FOUND: {tfidf_path}\n")

        print(f"Loading TF-IDF vectorizer from {tfidf_path}...")
        self.tfidf = joblib.load(str(tfidf_path))
        print("TF-IDF vectorizer loaded successfully\n")

        xgb_path = Path(xgb_model_path)
        if not xgb_path.exists():
            raise FileNotFoundError(f"XGBoost model NOT FOUND: {xgb_model_path}")

        print(f"Loading XGBoost model from {xgb_model_path}...")
        try:
            self.xgb_model = joblib.load(str(xgb_path))
            print("XGBoost model loaded successfully\n")
        except Exception as e:
            # FIX: chain the original exception (``from e``) so unpickling
            # failures keep their root-cause traceback.
            raise RuntimeError(f"Failed to load XGBoost model: {e}") from e

    def get_codebert_proba(self, text):
        """Return CodeBERT's softmax class probabilities for *text* (1-D array)."""
        with torch.no_grad():
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=256  # presumably the fine-tuning sequence length — confirm
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            outputs = self.codebert(**inputs)
            logits = outputs.logits
            # Single input -> take row 0 of the (1, num_labels) matrix.
            proba = torch.softmax(logits, dim=-1).cpu().numpy()[0]

        return proba

    def get_xgb_proba(self, text):
        """Return XGBoost's class probabilities for *text* (1-D array)."""
        features = self._extract_features(text)
        proba = self.xgb_model.predict_proba(features)[0]
        return proba

    def _extract_features(self, text):
        """
        Extract features using same pipeline as training:
        TF-IDF (1000 features) + Handcrafted Features (10 features) = 1010 total
        NO CodeBERT embeddings

        Returns:
            A 1-row scipy sparse matrix: TF-IDF columns first, then the
            handcrafted features in dict-insertion order.
        """
        tfidf_vec = self.tfidf.transform([text])

        # Order matters: XGBoost consumes columns positionally, so the
        # handcrafted dict must keep its training-time insertion order.
        handcrafted_feats = self.feature_extractor.extract(text)
        handcrafted_vec = np.array(list(handcrafted_feats.values()), dtype=np.float32).reshape(1, -1)

        combined = hstack([tfidf_vec, handcrafted_vec])

        return combined

    def predict(self, text):
        """
        Predict using length-aware gating ensemble.

        Args:
            text: Input code/text string

        Returns:
            Dictionary with probabilities, ensembled prediction, and paradigm label
        """
        # Gate on the CodeBERT token count (not raw characters) so the
        # weighting reflects what the transformer actually sees.
        tokens = self.tokenizer.tokenize(text)
        length = len(tokens)

        # FIX: cast numpy integer indices to plain int so the returned dict
        # is JSON-serializable and indexes behave like builtin ints.
        codebert_probas = self.get_codebert_proba(text)
        cb_pred_class = int(np.argmax(codebert_probas))

        xgb_probas = self.get_xgb_proba(text)
        xgb_pred_class = int(np.argmax(xgb_probas))

        # Length-gated weights: short inputs trust CodeBERT most; long
        # inputs (beyond the transformer's comfort zone) split evenly.
        if length < 60:
            weight_info = "Short (CodeBERT 80% + XGB 20%)"
            cb_weight = 0.8
            xgb_weight = 0.2
        elif length > 150:
            weight_info = "Long (CodeBERT 50% + XGB 50%)"
            cb_weight = 0.5
            xgb_weight = 0.5
        else:
            weight_info = "Medium (CodeBERT 65% + XGB 35%)"
            cb_weight = 0.65
            xgb_weight = 0.35

        ensemble_probas = cb_weight * codebert_probas + xgb_weight * xgb_probas
        ensemble_probas = ensemble_probas / ensemble_probas.sum()  # renormalize to sum to 1

        predicted_class = int(np.argmax(ensemble_probas))
        predicted_label = LABEL_TO_NAME[predicted_class]

        # --- Debug output: per-model and ensembled distributions ---
        print("\n" + "="*60)
        print("DEBUG: Model Outputs")
        print("="*60)
        print(f"Token length: {length}")
        print(f"Weights: CB={cb_weight:.2f}, XGB={xgb_weight:.2f}\n")

        print("CodeBERT class probabilities:")
        for i, prob in enumerate(codebert_probas):
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f"  → Predicted: {LABEL_TO_NAME[cb_pred_class]}\n")

        print("XGBoost class probabilities:")
        for i, prob in enumerate(xgb_probas):
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f"  → Predicted: {LABEL_TO_NAME[xgb_pred_class]}\n")

        print("Ensemble class probabilities:")
        for i, prob in enumerate(ensemble_probas):
            marker = " ← FINAL" if i == predicted_class else ""
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}{marker}")
        print("="*60 + "\n")

        return {
            "length": length,
            "weight_info": weight_info,
            "codebert_class_probas": {LABEL_TO_NAME[i]: round(float(codebert_probas[i]), 4) for i in range(len(codebert_probas))},
            "codebert_pred_class": LABEL_TO_NAME[cb_pred_class],
            "xgb_class_probas": {LABEL_TO_NAME[i]: round(float(xgb_probas[i]), 4) for i in range(len(xgb_probas))},
            "xgb_pred_class": LABEL_TO_NAME[xgb_pred_class],
            "ensemble_class_probas": {LABEL_TO_NAME[i]: round(float(ensemble_probas[i]), 4) for i in range(len(ensemble_probas))},
            "predicted_class": predicted_class,
            "predicted_label": predicted_label,
            "confidence": round(float(ensemble_probas[predicted_class]), 4)
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
|
|
| text_input = """Increment the value of a pointer when sent as a parameter I am stuck in the following pointer problem: Say you have a function: void Function (unsigned char *ubPointer) { ubPointer++; } int main (void) { unsigned char *PointerX; Function( PointerX ); } What I want is that the ++ is reflected in PointerX, without declaring it as a global variable. Thank you very much.
|
|
|
| """
|
|
|
|
|
|
|
| codebert_path = Path(__file__).parent / "codebert_model"
|
| xgb_model_path = Path(__file__).parent / "xgboost_model.pkl"
|
| tfidf_path = Path(__file__).parent / "tfidf_vectorizer.pkl"
|
|
|
| try:
|
| print("="*60)
|
| print("ENSEMBLE PREDICTOR - CodeBERT + XGBoost")
|
| print("(Training Pipeline: TF-IDF + Handcrafted Features)")
|
| print("="*60 + "\n")
|
|
|
| predictor = EnsemblePredictor(
|
| codebert_path=str(codebert_path),
|
| xgb_model_path=str(xgb_model_path),
|
| tfidf_path=str(tfidf_path)
|
| )
|
|
|
| print(f"Input text: {repr(text_input)}\n")
|
|
|
| result = predictor.predict(text_input)
|
|
|
| print("="*60)
|
| print("FINAL RESULTS")
|
| print("="*60)
|
| print(f"Weighting: {result['weight_info']}")
|
| print(f"\nFINAL PREDICTION: {result['predicted_label'].upper()}")
|
| print(f"Confidence: {result['confidence']:.4f}")
|
| print("="*60)
|
|
|
| except Exception as e:
|
| print(f"Error: {e}")
|
| import traceback
|
| traceback.print_exc()
|
|
|