File size: 10,377 Bytes
44c3a8f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""

Length-aware gating ensemble predictor: fine-tuned CodeBERT + XGBoost.

"""

import torch
import joblib
import numpy as np
import re
import warnings
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import hstack
from pathlib import Path

# Install a blanket "ignore" filter: no warning of any category is shown
# once this module has been imported (the warnings filter list is global).
warnings.filterwarnings("ignore")


# Paradigm class name -> integer class id. The ids are used as positions in
# the probability vectors that the predictor indexes into.
LABELS = {
    'Functional': 0,
    'Non-Paradigm': 1,
    'Oop': 2,
    'Procedural': 3,
}
# Reverse lookup: integer class id -> paradigm class name.
LABEL_TO_NAME = {class_id: name for name, class_id in LABELS.items()}


class FeatureExtractor:
    """Handcrafted keyword/shape features (must stay identical to the training-time extractor).

    Keyword scores are plain substring counts on the lower-cased text, so a
    keyword also matches inside longer words (e.g. 'int' inside 'printf').
    """

    def __init__(self):
        # Keyword lists hinting at object-oriented, functional and procedural code.
        self.oop_kw = ['class', 'object', 'this', 'self', 'extends', 'implements', 'interface',
                       'public', 'private', 'protected', 'static', 'virtual', 'override']
        self.fp_kw = ['map', 'filter', 'reduce', 'fold', 'lambda', 'closure', '=>',
                      'monad', 'functor', 'pure', 'immutable', 'const', 'let']
        self.proc_kw = ['void', 'int', 'char', 'float', 'struct', 'malloc', 'free',
                        'pointer', 'goto', 'scanf', 'printf']

    def extract(self, text):
        """Return a dict of 10 numeric features for ``text``.

        The insertion order of the keys matters: callers may turn the values
        into a feature vector positionally.
        """
        lowered = text.lower()
        count_in_lowered = lowered.count

        # Build the dict key by key, in the same order as the training pipeline.
        features = {}
        features['oop_score'] = sum(map(count_in_lowered, self.oop_kw))
        features['fp_score'] = sum(map(count_in_lowered, self.fp_kw))
        features['proc_score'] = sum(map(count_in_lowered, self.proc_kw))
        features['length'] = len(text)
        features['num_lines'] = 1 + text.count('\n')

        class_decl = re.search(r'\bclass\s+\w+', lowered)
        features['has_class'] = 0 if class_decl is None else 1

        mentions_lambda = ('lambda' in lowered) or ('=>' in text)
        features['has_lambda'] = 1 if mentions_lambda else 0

        features['num_dots'] = text.count('.')
        features['num_arrows'] = text.count('->') + text.count('=>')
        features['num_braces'] = text.count('{') + text.count('}')
        return features


class EnsemblePredictor:
    """Length-aware gating ensemble of a fine-tuned CodeBERT classifier and an XGBoost model.

    Each model yields a probability vector over the classes in ``LABELS``;
    ``predict`` blends the two vectors with weights chosen from the token
    count of the input.
    """

    # Gating thresholds on the token count: fewer than SHORT_TOKEN_LIMIT tokens
    # is "short", more than LONG_TOKEN_LIMIT tokens is "long", otherwise "medium".
    SHORT_TOKEN_LIMIT = 60
    LONG_TOKEN_LIMIT = 150
    # Maximum sequence length passed to the tokenizer (longer inputs are truncated).
    CODEBERT_MAX_LENGTH = 256

    def __init__(self, codebert_path, xgb_model_path, tfidf_path=None):
        """
        Load all model artifacts.

        Args:
            codebert_path: Path to the CodeBERT model directory (passed to
                ``from_pretrained`` for both tokenizer and model).
            xgb_model_path: Path to the saved XGBoost model (required).
            tfidf_path: Path to the saved TF-IDF vectorizer. When ``None``,
                ``tfidf_vectorizer.pkl`` next to this module is used.

        Raises:
            FileNotFoundError: If the TF-IDF vectorizer or the XGBoost model
                file does not exist.
            RuntimeError: If the XGBoost model file exists but cannot be loaded.
        """
        self.feature_extractor = FeatureExtractor()

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        print("Loading CodeBERT model...")
        self.tokenizer = AutoTokenizer.from_pretrained(codebert_path)
        self.codebert = AutoModelForSequenceClassification.from_pretrained(codebert_path)
        self.codebert.eval()  # inference mode
        self.codebert.to(self.device)
        print("CodeBERT fine-tuned model loaded successfully\n")

        if tfidf_path is None:
            tfidf_path = Path(__file__).parent / "tfidf_vectorizer.pkl"

        tfidf_path = Path(tfidf_path)
        if not tfidf_path.exists():
            raise FileNotFoundError(f"TF-IDF vectorizer NOT FOUND: {tfidf_path}\n")

        # NOTE(security): joblib.load unpickles the file, which can execute
        # arbitrary code. Only point these paths at trusted artifacts.
        print(f"Loading TF-IDF vectorizer from {tfidf_path}...")
        self.tfidf = joblib.load(str(tfidf_path))
        print("TF-IDF vectorizer loaded successfully\n")

        # Load XGBoost
        xgb_path = Path(xgb_model_path)
        if not xgb_path.exists():
            raise FileNotFoundError(f"XGBoost model NOT FOUND: {xgb_model_path}")

        print(f"Loading XGBoost model from {xgb_model_path}...")
        try:
            self.xgb_model = joblib.load(str(xgb_path))
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback.
            raise RuntimeError(f"Failed to load XGBoost model: {e}") from e
        print("XGBoost model loaded successfully\n")

    def get_codebert_proba(self, text):
        """Return CodeBERT's softmax class probabilities for ``text`` as a NumPy array.

        Only the first row of the model output is returned, i.e. the
        probabilities for a single input text.
        """
        with torch.no_grad():
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=self.CODEBERT_MAX_LENGTH,
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            logits = self.codebert(**inputs).logits
            proba = torch.softmax(logits, dim=-1).cpu().numpy()[0]

        return proba

    def get_xgb_proba(self, text):
        """Return the XGBoost model's class probabilities for ``text`` (first row only)."""
        features = self._extract_features(text)
        return self.xgb_model.predict_proba(features)[0]

    def _extract_features(self, text):
        """
        Build the XGBoost input row for ``text``.

        The row is the TF-IDF vector of ``text`` followed by the handcrafted
        features from ``FeatureExtractor`` (in the extractor's key order).
        No CodeBERT embeddings are included. The original author notes the
        training pipeline used 1000 TF-IDF features + 10 handcrafted features
        (1010 total); the actual TF-IDF width depends on the loaded vectorizer.

        Returns:
            A scipy sparse matrix with a single row.
        """
        tfidf_vec = self.tfidf.transform([text])  # sparse, one row

        handcrafted_feats = self.feature_extractor.extract(text)
        handcrafted_vec = np.array(
            list(handcrafted_feats.values()), dtype=np.float32
        ).reshape(1, -1)

        # Stack: TF-IDF + handcrafted
        return hstack([tfidf_vec, handcrafted_vec])

    @classmethod
    def _gating_weights(cls, length):
        """Return ``(weight_info, cb_weight, xgb_weight)`` for an input of ``length`` tokens."""
        if length < cls.SHORT_TOKEN_LIMIT:
            return "Short (CodeBERT 80% + XGB 20%)", 0.8, 0.2
        if length > cls.LONG_TOKEN_LIMIT:
            return "Long (CodeBERT 50% + XGB 50%)", 0.5, 0.5
        return "Medium (CodeBERT 65% + XGB 35%)", 0.65, 0.35

    @staticmethod
    def _label_probas(probas):
        """Map a probability vector to ``{label name: probability rounded to 4 places}``."""
        return {LABEL_TO_NAME[i]: round(float(p), 4) for i, p in enumerate(probas)}

    @staticmethod
    def _print_debug(length, cb_weight, xgb_weight, codebert_probas, cb_pred_class,
                     xgb_probas, xgb_pred_class, ensemble_probas, predicted_class):
        """Print the per-model and ensemble probabilities to stdout."""
        print("\n" + "="*60)
        print("DEBUG: Model Outputs")
        print("="*60)
        print(f"Token length: {length}")
        print(f"Weights: CB={cb_weight:.2f}, XGB={xgb_weight:.2f}\n")

        print("CodeBERT class probabilities:")
        for i, prob in enumerate(codebert_probas):
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f"  → Predicted: {LABEL_TO_NAME[cb_pred_class]}\n")

        print("XGBoost class probabilities:")
        for i, prob in enumerate(xgb_probas):
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f"  → Predicted: {LABEL_TO_NAME[xgb_pred_class]}\n")

        print("Ensemble class probabilities:")
        for i, prob in enumerate(ensemble_probas):
            marker = " ← FINAL" if i == predicted_class else ""
            print(f"  {LABEL_TO_NAME[i]:15s}: {prob:.4f}{marker}")
        print("="*60 + "\n")

    def predict(self, text, *, verbose=True):
        """
        Predict using length-aware gating ensemble.

        Args:
            text: Input code/text string.
            verbose: When true (the default), print the per-model and
                ensemble probabilities to stdout.

        Returns:
            Dictionary with the token length, the weighting description, the
            per-model and ensemble class probabilities (rounded to 4 places),
            each model's predicted label, the ensemble's predicted class id
            (a plain ``int``), its label, and its confidence.

        Raises:
            TypeError: If ``text`` is not a string.
        """
        if not isinstance(text, str):
            raise TypeError(f"text must be a str, got {type(text).__name__}")

        # Gating uses the full (untruncated) token count of the input.
        length = len(self.tokenizer.tokenize(text))

        # Get CodeBERT probabilities
        codebert_probas = self.get_codebert_proba(text)
        cb_pred_class = int(np.argmax(codebert_probas))

        # Get XGBoost probabilities
        xgb_probas = self.get_xgb_proba(text)
        xgb_pred_class = int(np.argmax(xgb_probas))

        # Length-aware gating
        weight_info, cb_weight, xgb_weight = self._gating_weights(length)

        # Weighted average of two probability distributions
        ensemble_probas = cb_weight * codebert_probas + xgb_weight * xgb_probas
        ensemble_probas = ensemble_probas / ensemble_probas.sum()  # Normalize

        # Convert to a built-in int so the result dict holds only plain Python
        # types (np.int64 is not JSON-serializable).
        predicted_class = int(np.argmax(ensemble_probas))
        predicted_label = LABEL_TO_NAME[predicted_class]

        if verbose:
            self._print_debug(
                length, cb_weight, xgb_weight,
                codebert_probas, cb_pred_class,
                xgb_probas, xgb_pred_class,
                ensemble_probas, predicted_class,
            )

        return {
            "length": length,
            "weight_info": weight_info,
            "codebert_class_probas": self._label_probas(codebert_probas),
            "codebert_pred_class": LABEL_TO_NAME[cb_pred_class],
            "xgb_class_probas": self._label_probas(xgb_probas),
            "xgb_pred_class": LABEL_TO_NAME[xgb_pred_class],
            "ensemble_class_probas": self._label_probas(ensemble_probas),
            "predicted_class": predicted_class,
            "predicted_label": predicted_label,
            "confidence": round(float(ensemble_probas[predicted_class]), 4)
        }


# ============================================================================
# USAGE EXAMPLE
# ============================================================================

if __name__ == "__main__":
    # Demo entry point: classify one hard-coded sample with the ensemble and
    # print the result. Exits with a non-zero status if anything fails.

    # ===== INPUT: Modify this variable for querying =====
    text_input = """Increment the value of a pointer when sent as a parameter	I am stuck in the following pointer problem: Say you have a function: void Function (unsigned char *ubPointer) { ubPointer++; } int main (void) { unsigned char *PointerX; Function( PointerX ); } What I want is that the ++ is reflected in PointerX, without declaring it as a global variable. Thank you very much.



"""
    # =======================================

    # Paths: all model artifacts are expected next to this script.
    base_dir = Path(__file__).parent
    codebert_path = base_dir / "codebert_model"
    xgb_model_path = base_dir / "xgboost_model.pkl"
    tfidf_path = base_dir / "tfidf_vectorizer.pkl"

    try:
        print("="*60)
        print("ENSEMBLE PREDICTOR - CodeBERT + XGBoost")
        print("(Training Pipeline: TF-IDF + Handcrafted Features)")
        print("="*60 + "\n")

        predictor = EnsemblePredictor(
            codebert_path=str(codebert_path),
            xgb_model_path=str(xgb_model_path),
            tfidf_path=str(tfidf_path)
        )

        print(f"Input text: {repr(text_input)}\n")

        result = predictor.predict(text_input)

        print("="*60)
        print("FINAL RESULTS")
        print("="*60)
        print(f"Weighting: {result['weight_info']}")
        print(f"\nFINAL PREDICTION: {result['predicted_label'].upper()}")
        print(f"Confidence: {result['confidence']:.4f}")
        print("="*60)

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        # Report the failure to the calling shell instead of exiting with 0.
        raise SystemExit(1)