"""Length-aware gating ensemble: CodeBERT fine-tune + XGBoost predictor."""
import torch
import joblib
import numpy as np
import re
import warnings
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import hstack
from pathlib import Path

warnings.filterwarnings("ignore")

# Class-id mapping fixed at training time; the inverse map turns argmax ids
# back into human-readable paradigm names.
LABELS = {'Functional': 0, 'Non-Paradigm': 1, 'Oop': 2, 'Procedural': 3}
LABEL_TO_NAME = {v: k for k, v in LABELS.items()}


class FeatureExtractor:
    """Same feature extractor used during training"""

    def __init__(self):
        # Keyword lists for the three paradigm-signal scores. Counting is done
        # with substring `str.count`, matching the training pipeline exactly,
        # so e.g. 'this' inside another word also counts — do not "fix" this
        # without retraining the XGBoost model.
        self.oop_kw = ['class', 'object', 'this', 'self', 'extends',
                       'implements', 'interface', 'public', 'private',
                       'protected', 'static', 'virtual', 'override']
        self.fp_kw = ['map', 'filter', 'reduce', 'fold', 'lambda', 'closure',
                      '=>', 'monad', 'functor', 'pure', 'immutable', 'const',
                      'let']
        self.proc_kw = ['void', 'int', 'char', 'float', 'struct', 'malloc',
                        'free', 'pointer', 'goto', 'scanf', 'printf']

    def extract(self, text):
        """Return the 10 handcrafted features as an (insertion-ordered) dict.

        The dict order defines the feature-vector column order downstream, so
        it must stay identical to the order used at training time.
        """
        t = text.lower()
        return {
            'oop_score': sum(t.count(k) for k in self.oop_kw),
            'fp_score': sum(t.count(k) for k in self.fp_kw),
            'proc_score': sum(t.count(k) for k in self.proc_kw),
            'length': len(text),
            'num_lines': text.count('\n') + 1,
            'has_class': 1 if re.search(r'\bclass\s+\w+', t) else 0,
            # '=>' is checked on the raw text (case-irrelevant for symbols).
            'has_lambda': 1 if 'lambda' in t or '=>' in text else 0,
            'num_dots': text.count('.'),
            'num_arrows': text.count('->') + text.count('=>'),
            'num_braces': text.count('{') + text.count('}')
        }


class EnsemblePredictor:
    def __init__(self, codebert_path, xgb_model_path, tfidf_path=None):
        """
        Initialize the ensemble predictor.

        Args:
            codebert_path: Path to CodeBERT model directory
            xgb_model_path: Path to saved XGBoost model (REQUIRED)
            tfidf_path: Path to saved TF-IDF vectorizer (REQUIRED for XGBoost
                features). Defaults to tfidf_vectorizer.pkl next to this file.

        Raises:
            FileNotFoundError: if the TF-IDF vectorizer or XGBoost model file
                is missing.
            RuntimeError: if the XGBoost model file exists but fails to load.
        """
        self.feature_extractor = FeatureExtractor()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        print("Loading CodeBERT model...")
        self.tokenizer = AutoTokenizer.from_pretrained(codebert_path)
        self.codebert = AutoModelForSequenceClassification.from_pretrained(codebert_path)
        self.codebert.eval()  # inference only — disables dropout etc.
        self.codebert.to(self.device)
        print("CodeBERT fine-tuned model loaded successfully\n")

        # TF-IDF vectorizer: fall back to the file shipped next to this script.
        if tfidf_path is None:
            tfidf_path = Path(__file__).parent / "tfidf_vectorizer.pkl"
        tfidf_path = Path(tfidf_path)
        if not tfidf_path.exists():
            raise FileNotFoundError(f"TF-IDF vectorizer NOT FOUND: {tfidf_path}\n")
        print(f"Loading TF-IDF vectorizer from {tfidf_path}...")
        self.tfidf = joblib.load(str(tfidf_path))
        print("TF-IDF vectorizer loaded successfully\n")

        # Load XGBoost
        xgb_path = Path(xgb_model_path)
        if not xgb_path.exists():
            raise FileNotFoundError(f"XGBoost model NOT FOUND: {xgb_model_path}")
        print(f"Loading XGBoost model from {xgb_model_path}...")
        try:
            self.xgb_model = joblib.load(str(xgb_path))
            print("XGBoost model loaded successfully\n")
        except Exception as e:
            # Chain the cause so the original unpickling error is preserved.
            raise RuntimeError(f"Failed to load XGBoost model: {e}") from e

    def get_codebert_proba(self, text):
        """Get probability predictions from CodeBERT"""
        with torch.no_grad():
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=256
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            outputs = self.codebert(**inputs)
            logits = outputs.logits
            # Softmax over the class dimension; [0] drops the batch dim.
            proba = torch.softmax(logits, dim=-1).cpu().numpy()[0]
        return proba

    def get_xgb_proba(self, text):
        """Get probability predictions from XGBoost."""
        features = self._extract_features(text)
        proba = self.xgb_model.predict_proba(features)[0]
        return proba

    def _extract_features(self, text):
        """
        Extract features using same pipeline as training:
        TF-IDF (1000 features) + Handcrafted Features (10 features) = 1010 total
        NO CodeBERT embeddings
        """
        tfidf_vec = self.tfidf.transform([text])  # Returns sparse matrix [1, 1000]
        handcrafted_feats = self.feature_extractor.extract(text)
        handcrafted_vec = np.array(
            list(handcrafted_feats.values()), dtype=np.float32
        ).reshape(1, -1)
        # Stack: TF-IDF + handcrafted
        combined = hstack([tfidf_vec, handcrafted_vec])
        return combined

    def predict(self, text):
        """
        Predict using length-aware gating ensemble.

        Shorter inputs lean on CodeBERT (better at sparse context); longer
        inputs give the keyword/TF-IDF model equal weight.

        Args:
            text: Input code/text string

        Returns:
            Dictionary with probabilities, ensembled prediction, and paradigm
            label. All values are plain Python types (JSON-serializable).
        """
        tokens = self.tokenizer.tokenize(text)
        length = len(tokens)

        # Get CodeBERT probabilities
        codebert_probas = self.get_codebert_proba(text)
        cb_pred_class = np.argmax(codebert_probas)

        # Get XGBoost probabilities
        xgb_probas = self.get_xgb_proba(text)
        xgb_pred_class = np.argmax(xgb_probas)

        # Length-aware gating: weights chosen by subword-token count.
        if length < 60:
            weight_info = "Short (CodeBERT 80% + XGB 20%)"
            cb_weight = 0.8
            xgb_weight = 0.2
        elif length > 150:
            weight_info = "Long (CodeBERT 50% + XGB 50%)"
            cb_weight = 0.5
            xgb_weight = 0.5
        else:
            weight_info = "Medium (CodeBERT 65% + XGB 35%)"
            cb_weight = 0.65
            xgb_weight = 0.35

        # Weighted average of two probability distributions
        ensemble_probas = cb_weight * codebert_probas + xgb_weight * xgb_probas
        ensemble_probas = ensemble_probas / ensemble_probas.sum()  # Normalize

        predicted_class = np.argmax(ensemble_probas)
        predicted_label = LABEL_TO_NAME[predicted_class]

        # Debug prints
        print("\n" + "="*60)
        print("DEBUG: Model Outputs")
        print("="*60)
        print(f"Token length: {length}")
        print(f"Weights: CB={cb_weight:.2f}, XGB={xgb_weight:.2f}\n")
        print("CodeBERT class probabilities:")
        for i, prob in enumerate(codebert_probas):
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f"  → Predicted: {LABEL_TO_NAME[cb_pred_class]}\n")
        print("XGBoost class probabilities:")
        for i, prob in enumerate(xgb_probas):
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f"  → Predicted: {LABEL_TO_NAME[xgb_pred_class]}\n")
        print("Ensemble class probabilities:")
        for i, prob in enumerate(ensemble_probas):
            marker = " ← FINAL" if i == predicted_class else ""
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}{marker}")
        print("="*60 + "\n")

        return {
            "length": length,
            "weight_info": weight_info,
            "codebert_class_probas": {LABEL_TO_NAME[i]: round(float(codebert_probas[i]), 4)
                                      for i in range(len(codebert_probas))},
            "codebert_pred_class": LABEL_TO_NAME[cb_pred_class],
            "xgb_class_probas": {LABEL_TO_NAME[i]: round(float(xgb_probas[i]), 4)
                                 for i in range(len(xgb_probas))},
            "xgb_pred_class": LABEL_TO_NAME[xgb_pred_class],
            "ensemble_class_probas": {LABEL_TO_NAME[i]: round(float(ensemble_probas[i]), 4)
                                      for i in range(len(ensemble_probas))},
            # Cast np.argmax's numpy.int64 to a plain int so the dict stays
            # JSON-serializable, consistent with the float casts above.
            "predicted_class": int(predicted_class),
            "predicted_label": predicted_label,
            "confidence": round(float(ensemble_probas[predicted_class]), 4)
        }


# ============================================================================
# USAGE EXAMPLE
# ============================================================================
if __name__ == "__main__":
    # ===== INPUT: Modify this variable for querying =====
    text_input = """Increment the value of a pointer when sent as a parameter I am stuck in the following pointer problem: Say you have a function: void Function (unsigned char *ubPointer) { ubPointer++; } int main (void) { unsigned char *PointerX; Function( PointerX ); } What I want is that the ++ is reflected in PointerX, without declaring it as a global variable. Thank you very much.
"""
    # =======================================

    # Paths: all artifacts are expected next to this script.
    codebert_path = Path(__file__).parent / "codebert_model"
    xgb_model_path = Path(__file__).parent / "xgboost_model.pkl"
    tfidf_path = Path(__file__).parent / "tfidf_vectorizer.pkl"

    try:
        print("="*60)
        print("ENSEMBLE PREDICTOR - CodeBERT + XGBoost")
        print("(Training Pipeline: TF-IDF + Handcrafted Features)")
        print("="*60 + "\n")

        predictor = EnsemblePredictor(
            codebert_path=str(codebert_path),
            xgb_model_path=str(xgb_model_path),
            tfidf_path=str(tfidf_path)
        )

        print(f"Input text: {repr(text_input)}\n")
        result = predictor.predict(text_input)

        print("="*60)
        print("FINAL RESULTS")
        print("="*60)
        print(f"Weighting: {result['weight_info']}")
        print(f"\nFINAL PREDICTION: {result['predicted_label'].upper()}")
        print(f"Confidence: {result['confidence']:.4f}")
        print("="*60)
    except Exception as e:
        # Top-level script boundary: report and dump the traceback.
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()