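# Flask inference server: scores code snippets for predicted CPU-time and
# memory efficiency using a local sequence-classification checkpoint.
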
from flask import Flask, request, jsonify
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import gc

app = Flask(__name__)

# Globals populated once at startup by load_model_and_tokenizer().
model = None
tokenizer = None
device = None

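# Pick the best available accelerator: CUDA, then Apple MPS, then CPU.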
def setup_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        return torch.device('mps')
    else:
        return torch.device('cpu')

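# Load the checkpoint from ./best_model_final once and keep it resident.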
def load_model_and_tokenizer():
    global model, tokenizer, device
    device = setup_device()
    print(f"Using device: {device}")
    try:
        model_path = "./best_model_final"
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        model = AutoModelForSequenceClassification.from_pretrained(model_path)
        model.to(device)
        model.eval()
        if device.type == 'cuda':
            # Half precision on GPU cuts memory use roughly in half.
            model.half()
        print("Model and tokenizer loaded successfully!")
    except Exception as e:
        print(f"Error loading model/tokenizer: {e}")
        raise

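# Free cached GPU memory; called between prediction batches.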
def cleanup_gpu_memory():
    if device and device.type == 'cuda':
        torch.cuda.empty_cache()
        gc.collect()

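# Score one snippet that fits within the model's 512-token window.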
def predict_single(code):
    try:
        inputs = tokenizer(
            code,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            if device.type == 'cuda':
                # Mixed precision on GPU for faster inference.
                with torch.cuda.amp.autocast():
                    outputs = model(**inputs)
            else:
                outputs = model(**inputs)
        # The head emits two values per snippet: cpu_time and memory_usage.
        preds = torch.sigmoid(outputs.logits).cpu().numpy()
        cpu_time, memory_usage = preds[0]
        # Invert values so higher = better
        return {
            "cpu_time": round(1.0 - float(cpu_time), 4),
            "memory_usage": round(1.0 - float(memory_usage), 4)
        }
    except Exception as e:
        print(f"Single prediction error: {e}")
        return {"cpu_time": 0.0, "memory_usage": 0.0}

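# For inputs longer than ~450 tokens, score overlapping chunks and keep the
# maximum efficiency score seen for each metric.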
def predict_with_chunking(code, chunk_size=400, overlap=50):
    try:
        if not code or len(code.strip()) == 0:
            return {"cpu_time": 0.0, "memory_usage": 0.0}
        tokens = tokenizer.encode(code, add_special_tokens=False)
        if len(tokens) <= 450:
            return predict_single(code)
        max_cpu_efficiency = 0.0
        max_memory_efficiency = 0.0
        # Slide a window of chunk_size tokens, overlapping consecutive chunks
        # by `overlap` tokens so no construct is split without context.
        for start in range(0, len(tokens), chunk_size - overlap):
            end = min(start + chunk_size, len(tokens))
            chunk_tokens = tokens[start:end]
            chunk_code = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
            if chunk_code.strip():
                result = predict_single(chunk_code)
                max_cpu_efficiency = max(max_cpu_efficiency, result["cpu_time"])
                max_memory_efficiency = max(max_memory_efficiency, result["memory_usage"])
            if end >= len(tokens):
                break
        return {
            "cpu_time": round(max_cpu_efficiency, 4),
            "memory_usage": round(max_memory_efficiency, 4)
        }
    except Exception as e:
        print(f"Chunking prediction error: {e}")
        return {"cpu_time": 0.0, "memory_usage": 0.0}

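# Root endpoint: service status and a map of the available endpoints.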
@app.route("/")
def home():
    return jsonify({
        "message": "Code Efficiency Prediction API",
        "status": "Model loaded" if model is not None else "Model not loaded",
        "device": str(device) if device else "unknown",
        "endpoints": {
            "/predict": "POST with JSON body containing 'codes' array",
            "/health": "GET server health status"
        }
    })

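# Batch prediction endpoint: up to 100 snippets per request, one
# cpu_time/memory_usage score pair returned per snippet.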
@app.route("/predict", methods=["POST"])
def predict_batch():
    try:
        if model is None or tokenizer is None:
            return jsonify({"error": "Model not loaded properly"}), 500
        data = request.get_json()
        if not data or 'codes' not in data:
            return jsonify({"error": "Missing 'codes' field in JSON body"}), 400
        codes = data['codes']
        if not isinstance(codes, list) or len(codes) == 0:
            return jsonify({"error": "'codes' must be a non-empty array"}), 400
        if len(codes) > 100:
            return jsonify({"error": "Too many codes. Maximum 100 allowed."}), 400
        # Validate every entry before running any inference.
        validated_codes = []
        for i, code in enumerate(codes):
            if not isinstance(code, str):
                return jsonify({"error": f"Code at index {i} must be a string"}), 400
            if len(code.strip()) == 0:
                validated_codes.append("# empty code")
            elif len(code) > 50000:
                return jsonify({"error": f"Code at index {i} too long. Maximum 50000 characters."}), 400
            else:
                validated_codes.append(code.strip())
        # Process in groups of at most 16, freeing GPU memory after each group
        # (each snippet is still scored individually).
        batch_size = min(len(validated_codes), 16)
        results = []
        for i in range(0, len(validated_codes), batch_size):
            batch = validated_codes[i:i+batch_size]
            for code in batch:
                tokens = tokenizer.encode(code, add_special_tokens=False)
                if len(tokens) > 450:
                    result = predict_with_chunking(code)
                else:
                    result = predict_single(code)
                results.append(result)
            cleanup_gpu_memory()
        return jsonify({"results": results})
    except Exception as e:
        cleanup_gpu_memory()
        return jsonify({"error": f"Batch prediction error: {str(e)}"}), 500

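# Health probe: reports whether the model and tokenizer are loaded.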
@app.route("/health", methods=["GET"])
def health_check():
    return jsonify({
        "status": "healthy",
        "model_loaded": model is not None,
        "tokenizer_loaded": tokenizer is not None,
        "device": str(device) if device else "unknown"
    })

# Load the model at import time so endpoints are ready under any WSGI server.
load_model_and_tokenizer()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)
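
# Example client (hypothetical snippet; assumes the server is running locally
# on port 7860 as configured above):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/predict",
#       json={"codes": ["def add(a, b):\n    return a + b"]},
#   )
#   print(resp.json())  # -> {"results": [{"cpu_time": ..., "memory_usage": ...}]}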