# Hugging Face Spaces app — FastAPI code evaluation & optimization API.
# (The original paste carried page residue here: "Spaces:" / "Build error".)
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| import autopep8 | |
| import subprocess | |
| import time | |
| import re | |
| import os | |
| from pathlib import Path | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import tempfile | |
app = FastAPI(title="Code Evaluation & Optimization API")

# CORS Configuration: the API is called from arbitrary browser front-ends.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)

# Environment Setup - Modified for Hugging Face Spaces: the default HF cache
# location is not writable there, so every cache variable points at /tmp.
CACHE_DIR = Path(os.getenv("HF_HOME", "/tmp/huggingface"))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
_cache_path = str(CACHE_DIR)
os.environ["TRANSFORMERS_CACHE"] = _cache_path
os.environ["HF_HOME"] = _cache_path
# Load AI Model - Using smaller model for Spaces compatibility
MODEL_NAME = "codellama/CodeLlama-7b-instruct-hf"  # More suitable for API use

# Both globals fall back to None when loading fails; callers must check.
model = None
tokenizer = None
try:
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        cache_dir=str(CACHE_DIR),
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        torch_dtype=torch.float16,
        cache_dir=str(CACHE_DIR),
    )
except Exception as e:
    print(f"Model loading warning: {str(e)}")
    model = None
    tokenizer = None
# Request Model
class CodeRequest(BaseModel):
    """Request payload shared by the /evaluate and /optimize endpoints."""
    code: str  # raw source code submitted by the client (untrusted)
    language: str = "python"  # expected: python, java, cpp, or javascript
def create_temp_file(code: str, extension: str) -> str:
    """Write *code* to a fresh temp file under /tmp/code_files and return its path.

    Args:
        code: Source text to write.
        extension: File extension without the leading dot (e.g. "py").

    Returns:
        Absolute path of the created file.
    """
    temp_dir = "/tmp/code_files"
    os.makedirs(temp_dir, exist_ok=True)
    fd, path = tempfile.mkstemp(suffix=f".{extension}", dir=temp_dir)
    with os.fdopen(fd, 'w') as tmp:
        tmp.write(code)
    # Fixed: the original used 0o777, leaving a world-writable executable
    # script in a shared /tmp directory (anyone on the host could rewrite it
    # before execution). Only the service user needs access, so 0o700.
    os.chmod(path, 0o700)
    return path
def cleanup_temp_files():
    """Delete every regular file in the /tmp/code_files scratch directory.

    Best-effort: a file that cannot be removed is reported and skipped so
    the remaining files are still cleaned up.
    """
    temp_dir = "/tmp/code_files"
    if not os.path.exists(temp_dir):
        return
    for entry in os.listdir(temp_dir):
        target = os.path.join(temp_dir, entry)
        try:
            if os.path.isfile(target):
                os.unlink(target)
        except Exception as e:
            print(f"Error deleting {target}: {e}")
# Helper Functions
def evaluate_code(user_code: str, lang: str) -> dict:
    """Evaluate code for correctness, performance, and security.

    Writes *user_code* to a temp file, runs it with the toolchain for *lang*,
    and scores it: correctness 50, readability 20, efficiency 30, security 20,
    clamped to 0..100.

    Args:
        user_code: Source code submitted by the client (untrusted!).
        lang: One of "python", "java", "cpp", "javascript".

    Returns:
        dict with status, execution_time, score, feedback, and error details
        (or a short error dict on timeout / unsupported language / failure).
    """
    file_ext = {"python": "py", "java": "java", "cpp": "cpp", "javascript": "js"}.get(lang, "txt")
    try:
        filename = create_temp_file(user_code, file_ext)
        # Fixed: `java` must receive a bare class name on a classpath, not a
        # full file path as the original passed.
        # NOTE(review): javac still requires any public class in the source to
        # match the random temp-file name — confirm how callers name classes.
        java_class = os.path.splitext(os.path.basename(filename))[0]
        commands = {
            "python": ["python3", filename],
            "java": ["javac", filename, "&&", "java", "-cp", os.path.dirname(filename), java_class],
            # Fixed: the compiled-binary path was a broken "(unknown).out"
            # placeholder; derive it from the temp file name instead.
            "cpp": ["g++", filename, "-o", f"{filename}.out", "&&", f"{filename}.out"],
            "javascript": ["node", filename]
        }
        if lang not in commands:
            return {"status": "error", "message": "Unsupported language", "score": 0}
        # shell=True is required for the "&&" chaining; the interpolated path
        # comes from tempfile.mkstemp, so it contains no shell metacharacters.
        # Timer starts here (not at function entry) so temp-file setup does
        # not count against the efficiency score.
        start_time = time.time()
        result = subprocess.run(
            " ".join(commands[lang]),
            capture_output=True,
            text=True,
            timeout=5,
            shell=True
        )
        exec_time = time.time() - start_time
        correctness = 1 if result.returncode == 0 else 0
        error_message = None if correctness else result.stderr.strip()
        # Scoring logic (simple heuristics on length, wall time, and banned calls)
        readability_score = 20 if len(user_code) < 200 else 10
        efficiency_score = 30 if exec_time < 1 else 10
        security_score = 20 if "eval(" not in user_code and "exec(" not in user_code else 0
        total_score = (correctness * 50) + readability_score + efficiency_score + security_score
        # NOTE(review): the feedback prefixes (β, π, …) look like mojibake of
        # emoji from a mis-decoded source; preserved byte-for-byte — confirm
        # the intended characters with the original author.
        feedback = []
        if correctness == 0:
            feedback.append("β Error in Code Execution! Check syntax or logic errors.")
            feedback.append(f"π Error Details: {error_message}")
        else:
            feedback.append("β Code executed successfully!")
        if efficiency_score < 30:
            feedback.append("β‘ Performance Issue: Code took longer to execute. Optimize loops or calculations.")
        if readability_score < 20:
            feedback.append("π Readability Issue: Code is lengthy. Break into smaller functions.")
        if security_score == 0:
            feedback.append("π Security Risk: Avoid using eval() or exec().")
        return {
            "status": "success" if correctness else "error",
            "execution_time": round(exec_time, 3) if correctness else None,
            "score": max(0, min(100, total_score)),
            "feedback": "\n".join(feedback),
            "error_details": error_message if not correctness else None
        }
    except subprocess.TimeoutExpired:
        return {"status": "error", "message": "Execution timed out", "score": 0}
    except Exception as e:
        return {"status": "error", "message": str(e), "score": 0}
    finally:
        cleanup_temp_files()
def optimize_code_ai(user_code: str, lang: str) -> str:
    """Generate optimized code using AI.

    For Python input, first normalizes the code with autopep8 and applies
    naive regex rewrites for ``eval(...)`` and ``/ 0``, then prompts the
    module-level CodeLlama model and returns the last fenced code block
    found in its output (or the full decoded output if none is found).

    Args:
        user_code: Source code to optimize.
        lang: Language tag used in the prompt and the pre-processing branch.

    Returns:
        The optimized source as a string; falls back to *user_code* when the
        model produces an empty result.

    Raises:
        HTTPException: 503 when the model/tokenizer failed to load,
            500 on any error during generation.
    """
    if model is None or tokenizer is None:
        raise HTTPException(status_code=503, detail="AI service temporarily unavailable")
    try:
        if lang == "python":
            user_code = autopep8.fix_code(user_code)
            # NOTE(review): the greedy ".*" matches up to the LAST ")" and the
            # appended "#" comment swallows any trailing code on the same
            # line — confirm this rewrite is acceptable for multi-call lines.
            user_code = re.sub(r"eval\((.*)\)", r"int(\1) # Removed eval for security", user_code)
            user_code = re.sub(r"/ 0", "/ 1 # Fixed division by zero", user_code)
        prompt = f"""Optimize this {lang} code for better performance and readability:
```{lang}
{user_code}
"""
        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        with torch.no_grad():
            # NOTE(review): temperature is passed without do_sample=True, so
            # decoding is greedy and the temperature has no effect — confirm
            # whether sampling was intended.
            outputs = model.generate(**inputs, max_length=1024, temperature=0.7)
        optimized_code = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Extract code between the last code block markers
        code_blocks = re.findall(r'```(?:python)?\n(.*?)\n```', optimized_code, re.DOTALL)
        if code_blocks:
            optimized_code = code_blocks[-1]  # Get the last code block
        return optimized_code.strip() if optimized_code else user_code
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"AI optimization failed: {str(e)}")
# API Endpoints
@app.post("/evaluate")  # Fixed: route decorator was missing, so the endpoint
# advertised by health_check ("POST /evaluate") was never registered.
async def evaluate_endpoint(request: CodeRequest):
    """Evaluate submitted code and return the scoring result.

    Raises:
        HTTPException: 400 wrapping any evaluation failure.
    """
    try:
        result = evaluate_code(request.code, request.language)
        return {"status": "success", "result": result}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.post("/optimize")  # Fixed: route decorator was missing, so the endpoint
# advertised by health_check ("POST /optimize") was never registered.
async def optimize_endpoint(request: CodeRequest):
    """Return an AI-optimized version of the submitted code.

    Raises:
        HTTPException: 400 wrapping any optimization failure.
    """
    try:
        optimized = optimize_code_ai(request.code, request.language)
        return {"status": "success", "optimized_code": optimized}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/")  # Fixed: without a route decorator this health check was unreachable.
def health_check():
    """Report service status, the loaded model, and the available endpoints."""
    return {
        "status": "API is running",
        "model": MODEL_NAME if model else "Not loaded",
        "endpoints": {
            "evaluate": "POST /evaluate",
            "optimize": "POST /optimize"
        }
    }
@app.on_event("shutdown")  # Fixed: without this hook the cleanup never ran on shutdown.
def shutdown_event():
    """Remove leftover temp files when the server stops."""
    cleanup_temp_files()
if __name__ == "__main__":
    import uvicorn
    # Port 7860 is the port Hugging Face Spaces exposes by default.
    # NOTE(review): "main:app" assumes this file is named main.py — confirm.
    uvicorn.run("main:app", host="0.0.0.0", port=7860)