"""Flask API that serves a sequence-classification model scoring code
snippets for efficiency (CPU time and memory usage) on a 0-1 scale,
where higher means more efficient."""

from flask import Flask, request, jsonify
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import os
import gc

app = Flask(__name__)

# Populated once by load_model_and_tokenizer() at import time; treated as
# read-only afterwards.
model = None
tokenizer = None
device = None


def setup_device():
    """Return the best available torch device: CUDA > MPS > CPU."""
    if torch.cuda.is_available():
        return torch.device('cuda')
    # hasattr guard: torch.backends.mps only exists on newer torch builds.
    if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        return torch.device('mps')
    return torch.device('cpu')


def load_model_and_tokenizer():
    """Load the model and tokenizer from ./best_model_final into globals.

    Re-raises any loading failure so the process never starts serving
    with a half-initialized model.
    """
    global model, tokenizer, device
    device = setup_device()
    print(f"Using device: {device}")
    try:
        model_path = "./best_model_final"
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        model = AutoModelForSequenceClassification.from_pretrained(model_path)
        model.to(device)
        model.eval()
        if device.type == 'cuda':
            # fp16 halves GPU memory; inference also runs under autocast.
            model.half()
        print("Model and tokenizer loaded successfully!")
    except Exception as e:
        print(f"Error loading model/tokenizer: {e}")
        # Bare raise preserves the original traceback (raise e re-chains it).
        raise


def cleanup_gpu_memory():
    """Release cached CUDA memory and run a GC pass (no-op off-CUDA)."""
    if device is not None and device.type == 'cuda':
        torch.cuda.empty_cache()
        gc.collect()


def predict_single(code):
    """Score one code snippet with a single forward pass.

    Args:
        code: Source code as a string; tokenization truncates at 512 tokens.

    Returns:
        Dict with 'cpu_time' and 'memory_usage' floats in [0, 1], inverted
        so that higher = more efficient. On any error returns zeros rather
        than propagating, so one bad item cannot fail a whole batch.
    """
    try:
        inputs = tokenizer(
            code,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt",
        )
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            if device.type == 'cuda':
                with torch.cuda.amp.autocast():
                    outputs = model(**inputs)
            else:
                outputs = model(**inputs)
        preds = torch.sigmoid(outputs.logits).cpu().numpy()
        # Assumes the model emits exactly two labels in this order —
        # TODO(review): confirm against the training label config.
        cpu_time, memory_usage = preds[0]
        # Invert values so higher = better
        return {
            "cpu_time": round(1.0 - float(cpu_time), 4),
            "memory_usage": round(1.0 - float(memory_usage), 4),
        }
    except Exception as e:
        print(f"Single prediction error: {e}")
        return {"cpu_time": 0.0, "memory_usage": 0.0}


def predict_with_chunking(code, chunk_size=400, overlap=50):
    """Score arbitrarily long code by scoring overlapping token chunks.

    Inputs of <= 450 tokens are delegated straight to predict_single().
    Longer inputs are split into windows of `chunk_size` tokens that
    overlap by `overlap` tokens; the best (max) score per metric across
    all chunks is returned.

    Args:
        code: Source code string (may be empty).
        chunk_size: Window length in tokens.
        overlap: Tokens shared between consecutive windows.

    Returns:
        Dict with 'cpu_time' and 'memory_usage' floats; zeros on empty
        input or any error.
    """
    try:
        if not code or not code.strip():
            return {"cpu_time": 0.0, "memory_usage": 0.0}
        tokens = tokenizer.encode(code, add_special_tokens=False)
        if len(tokens) <= 450:
            return predict_single(code)
        # Guard the stride: overlap >= chunk_size would make range() step
        # non-positive and raise / never advance.
        step = max(1, chunk_size - overlap)
        max_cpu_efficiency = 0.0
        max_memory_efficiency = 0.0
        for start in range(0, len(tokens), step):
            end = min(start + chunk_size, len(tokens))
            chunk_code = tokenizer.decode(
                tokens[start:end], skip_special_tokens=True
            )
            if chunk_code.strip():
                result = predict_single(chunk_code)
                max_cpu_efficiency = max(max_cpu_efficiency, result["cpu_time"])
                max_memory_efficiency = max(
                    max_memory_efficiency, result["memory_usage"]
                )
            if end >= len(tokens):
                break
        return {
            "cpu_time": round(max_cpu_efficiency, 4),
            "memory_usage": round(max_memory_efficiency, 4),
        }
    except Exception as e:
        print(f"Chunking prediction error: {e}")
        return {"cpu_time": 0.0, "memory_usage": 0.0}


@app.route("/", methods=['GET'])
def home():
    """GET / — service banner, load status, and endpoint listing."""
    return jsonify({
        "message": "Code Efficiency Prediction API",
        "status": "Model loaded" if model is not None else "Model not loaded",
        "device": str(device) if device else "unknown",
        "endpoints": {
            "/predict": "POST with JSON body containing 'codes' array",
            "/health": "GET server health status",
        },
    })


@app.route("/predict", methods=['POST'])
def predict_batch():
    """POST /predict with JSON body {"codes": [str, ...]} (max 100 items,
    max 50000 chars each).

    Returns {"results": [{"cpu_time": f, "memory_usage": f}, ...]} in the
    same order as the input, or an {"error": ...} payload with 400/500.
    """
    try:
        if model is None or tokenizer is None:
            return jsonify({"error": "Model not loaded properly"}), 500
        data = request.get_json()
        if not data or 'codes' not in data:
            return jsonify({"error": "Missing 'codes' field in JSON body"}), 400
        codes = data['codes']
        if not isinstance(codes, list) or len(codes) == 0:
            return jsonify({"error": "'codes' must be a non-empty array"}), 400
        if len(codes) > 100:
            return jsonify({"error": "Too many codes. Maximum 100 allowed."}), 400
        validated_codes = []
        for i, code in enumerate(codes):
            if not isinstance(code, str):
                return jsonify({"error": f"Code at index {i} must be a string"}), 400
            if len(code.strip()) == 0:
                # Placeholder keeps results index-aligned with the input.
                validated_codes.append("# empty code")
            elif len(code) > 50000:
                return jsonify({"error": f"Code at index {i} too long. Maximum 50000 characters."}), 400
            else:
                validated_codes.append(code.strip())
        # Each snippet is scored with its own forward pass; the previous
        # "batch_size" outer loop never performed batched inference, so
        # iterate the validated list directly — results are identical.
        results = []
        for code in validated_codes:
            tokens = tokenizer.encode(code, add_special_tokens=False)
            if len(tokens) > 450:
                results.append(predict_with_chunking(code))
            else:
                results.append(predict_single(code))
        cleanup_gpu_memory()
        return jsonify({"results": results})
    except Exception as e:
        cleanup_gpu_memory()
        return jsonify({"error": f"Batch prediction error: {str(e)}"}), 500


@app.route("/health", methods=['GET'])
def health_check():
    """GET /health — liveness probe reporting load state and device."""
    return jsonify({
        "status": "healthy",
        "model_loaded": model is not None,
        "tokenizer_loaded": tokenizer is not None,
        "device": str(device) if device else "unknown",
    })


# Load at import time so WSGI servers (e.g. gunicorn) start with a ready
# model, not just when run as a script.
load_model_and_tokenizer()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)