efficiency-eval / main.py
yousefabdallah031's picture
Update main.py
0ede02a verified
from flask import Flask, request, jsonify
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import os
import gc
app = Flask(__name__)
model = None
tokenizer = None
device = None
def setup_device():
if torch.cuda.is_available():
return torch.device('cuda')
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
return torch.device('mps')
else:
return torch.device('cpu')
def load_model_and_tokenizer():
global model, tokenizer, device
device = setup_device()
print(f"Using device: {device}")
try:
model_path = "./best_model_final"
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSequenceClassification.from_pretrained(model_path)
model.to(device)
model.eval()
if device.type == 'cuda':
model.half()
print("Model and tokenizer loaded successfully!")
except Exception as e:
print(f"Error loading model/tokenizer: {e}")
raise e
def cleanup_gpu_memory():
if device and device.type == 'cuda':
torch.cuda.empty_cache()
gc.collect()
def predict_single(code):
try:
inputs = tokenizer(
code,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
if device.type == 'cuda':
with torch.cuda.amp.autocast():
outputs = model(**inputs)
else:
outputs = model(**inputs)
preds = torch.sigmoid(outputs.logits).cpu().numpy()
cpu_time, memory_usage = preds[0]
# Invert values so higher = better
return {
"cpu_time": round(1.0 - float(cpu_time), 4),
"memory_usage": round(1.0 - float(memory_usage), 4)
}
except Exception as e:
print(f"Single prediction error: {e}")
return {"cpu_time": 0.0, "memory_usage": 0.0}
def predict_with_chunking(code, chunk_size=400, overlap=50):
try:
if not code or len(code.strip()) == 0:
return {"cpu_time": 0.0, "memory_usage": 0.0}
tokens = tokenizer.encode(code, add_special_tokens=False)
if len(tokens) <= 450:
return predict_single(code)
max_cpu_efficiency = 0.0
max_memory_efficiency = 0.0
for start in range(0, len(tokens), chunk_size - overlap):
end = min(start + chunk_size, len(tokens))
chunk_tokens = tokens[start:end]
chunk_code = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
if chunk_code.strip():
result = predict_single(chunk_code)
max_cpu_efficiency = max(max_cpu_efficiency, result["cpu_time"])
max_memory_efficiency = max(max_memory_efficiency, result["memory_usage"])
if end >= len(tokens):
break
return {
"cpu_time": round(max_cpu_efficiency, 4),
"memory_usage": round(max_memory_efficiency, 4)
}
except Exception as e:
print(f"Chunking prediction error: {e}")
return {"cpu_time": 0.0, "memory_usage": 0.0}
@app.route("/", methods=['GET'])
def home():
return jsonify({
"message": "Code Efficiency Prediction API",
"status": "Model loaded" if model is not None else "Model not loaded",
"device": str(device) if device else "unknown",
"endpoints": {
"/predict": "POST with JSON body containing 'codes' array",
"/health": "GET server health status"
}
})
@app.route("/predict", methods=['POST'])
def predict_batch():
try:
if model is None or tokenizer is None:
return jsonify({"error": "Model not loaded properly"}), 500
data = request.get_json()
if not data or 'codes' not in data:
return jsonify({"error": "Missing 'codes' field in JSON body"}), 400
codes = data['codes']
if not isinstance(codes, list) or len(codes) == 0:
return jsonify({"error": "'codes' must be a non-empty array"}), 400
if len(codes) > 100:
return jsonify({"error": "Too many codes. Maximum 100 allowed."}), 400
validated_codes = []
for i, code in enumerate(codes):
if not isinstance(code, str):
return jsonify({"error": f"Code at index {i} must be a string"}), 400
if len(code.strip()) == 0:
validated_codes.append("# empty code")
elif len(code) > 50000:
return jsonify({"error": f"Code at index {i} too long. Maximum 50000 characters."}), 400
else:
validated_codes.append(code.strip())
batch_size = min(len(validated_codes), 16)
results = []
for i in range(0, len(validated_codes), batch_size):
batch = validated_codes[i:i+batch_size]
for code in batch:
tokens = tokenizer.encode(code, add_special_tokens=False)
if len(tokens) > 450:
result = predict_with_chunking(code)
else:
result = predict_single(code)
results.append(result)
cleanup_gpu_memory()
return jsonify({"results": results})
except Exception as e:
cleanup_gpu_memory()
return jsonify({"error": f"Batch prediction error: {str(e)}"}), 500
@app.route("/health", methods=['GET'])
def health_check():
return jsonify({
"status": "healthy",
"model_loaded": model is not None,
"tokenizer_loaded": tokenizer is not None,
"device": str(device) if device else "unknown"
})
load_model_and_tokenizer()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)