""" Validation module for the Coding Expert model """ import os import json from pathlib import Path import hashlib import datetime from typing import Dict, Any, List, Optional import subprocess import ast import sys import psutil class CodeValidator: def __init__(self, checkpoint_dir: str = "checkpoints"): self.checkpoint_dir = Path(checkpoint_dir) self.checkpoint_dir.mkdir(exist_ok=True) self.validation_dir = self.checkpoint_dir / "validation" self.validation_dir.mkdir(exist_ok=True) # Initialize validation metrics self.metrics = { "code_quality": [], "performance": [], "memory_usage": [], "error_count": [] } def validate_code(self, code: str, language: str = "python") -> Dict[str, Any]: """Validate code quality and performance""" try: # Parse the code to check syntax tree = ast.parse(code) # Calculate code metrics metrics = self._calculate_code_metrics(tree) # Run static analysis static_analysis = self._run_static_analysis(code, language) # Check for common issues issues = self._check_common_issues(tree) return { "is_valid": not issues, "metrics": metrics, "static_analysis": static_analysis, "issues": issues, "validation_score": self._calculate_validation_score(metrics, issues) } except Exception as e: return { "is_valid": False, "error": str(e), "validation_score": 0.0 } def _calculate_code_metrics(self, tree: ast.AST) -> Dict[str, Any]: """Calculate various code metrics""" return { "complexity": self._calculate_complexity(tree), "num_functions": len([node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)]), "num_classes": len([node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]), "num_imports": len([node for node in ast.walk(tree) if isinstance(node, ast.Import)]), "num_statements": len([node for node in ast.walk(tree) if isinstance(node, ast.stmt)]) } def _calculate_complexity(self, tree: ast.AST) -> int: """Calculate cyclomatic complexity""" complexity = 1 # Start with 1 for the main program for node in ast.walk(tree): if isinstance(node, (ast.If, ast.For, ast.While, ast.Try, ast.ExceptHandler)): complexity += 1 return complexity def _run_static_analysis(self, code: str, language: str) -> Dict[str, Any]: """Run static analysis tools""" if language == "python": try: # Run pylint process = subprocess.run( ["pylint", "-"], input=code, capture_output=True, text=True, timeout=5 ) score = float(process.stdout.split("Your code has been rated at")[1].split()[0]) return { "pylint_score": score, "issues": process.stdout.count("error") } except Exception as e: return { "pylint_score": 0.0, "error": str(e) } return {} def _check_common_issues(self, tree: ast.AST) -> List[str]: """Check for common code issues""" issues = [] # Check for global variables for node in ast.walk(tree): if isinstance(node, ast.Global): issues.append("Global variables detected") # Check for long functions for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): if len(node.body) > 50: issues.append(f"Function {node.name} is too long") # Check for complex if statements for node in ast.walk(tree): if isinstance(node, ast.If): if len(node.body) > 20: issues.append("Complex if statement detected") return issues def _calculate_validation_score(self, metrics: Dict[str, Any], issues: List[str]) -> float: """Calculate overall validation score""" score = 1.0 # Penalize for code complexity score *= 0.9 if metrics["complexity"] > 10 else 1.0 # Penalize for issues score *= 0.9 ** len(issues) return max(0.0, min(1.0, score)) def create_checkpoint(self, data: Dict[str, Any], name: str = None) -> str: """Create a checkpoint of validation data""" if name is None: name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") checkpoint_path = self.validation_dir / f"checkpoint_{name}.json" # Add timestamp and hash data["timestamp"] = str(datetime.datetime.now()) data["hash"] = hashlib.sha256(str(data).encode()).hexdigest() with open(checkpoint_path, 'w') as f: json.dump(data, f, indent=2) return str(checkpoint_path) def load_checkpoint(self, name: str) -> Optional[Dict[str, Any]]: """Load a validation checkpoint""" checkpoint_path = self.validation_dir / f"checkpoint_{name}.json" if not checkpoint_path.exists(): return None with open(checkpoint_path, 'r') as f: return json.load(f) def validate_dataset(self, dataset: List[Dict[str, Any]]) -> Dict[str, Any]: """Validate a complete dataset""" results = [] error_count = 0 for idx, example in enumerate(dataset): try: # Validate code if "code" in example: code_result = self.validate_code( example["code"], example.get("language", "python") ) results.append(code_result) # Validate code review if "review" in example: review_result = self._validate_code_review( example["code"], example["review"] ) results.append(review_result) except Exception as e: error_count += 1 results.append({ "error": str(e), "validation_score": 0.0 }) # Calculate overall metrics scores = [r["validation_score"] for r in results if "validation_score" in r] if scores: avg_score = np.mean(scores) else: avg_score = 0.0 return { "total_examples": len(dataset), "processed_examples": len(results), "error_count": error_count, "average_score": float(avg_score), "detailed_results": results } def save_validation_report(self, report: Dict[str, Any], name: str = None) -> str: """Save a validation report""" if name is None: name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") report_path = self.validation_dir / f"report_{name}.json" # Add timestamp and summary metrics report["timestamp"] = str(datetime.datetime.now()) report["summary"] = { "accuracy": report.get("average_score", 0.0), "error_rate": report.get("error_count", 0) / report.get("total_examples", 1) } with open(report_path, 'w') as f: json.dump(report, f, indent=2) return str(report_path) def _validate_code_review(self, code: str, review: str) -> Dict[str, Any]: """Validate code review comments""" try: # Validate code code_result = self.validate_code(code) # Check if review addresses key issues issues = self._check_common_issues(ast.parse(code)) review_issues = [issue for issue in issues if issue.lower() in review.lower()] return { "is_valid": len(review_issues) > 0, "review_issues_covered": len(review_issues), "total_issues": len(issues), "validation_score": code_result["validation_score"] * (len(review_issues) / len(issues) if issues else 1.0) } except Exception as e: return { "is_valid": False, "error": str(e), "validation_score": 0.0 }