|
|
""" |
|
|
Validation module for the Coding Expert model |
|
|
""" |
|
|
import os |
|
|
import json |
|
|
from pathlib import Path |
|
|
import hashlib |
|
|
import datetime |
|
|
from typing import Dict, Any, List, Optional |
|
|
import subprocess |
|
|
import ast |
|
|
import sys |
|
|
import psutil |
|
|
|
|
|
class CodeValidator: |
|
|
def __init__(self, checkpoint_dir: str = "checkpoints"): |
|
|
self.checkpoint_dir = Path(checkpoint_dir) |
|
|
self.checkpoint_dir.mkdir(exist_ok=True) |
|
|
self.validation_dir = self.checkpoint_dir / "validation" |
|
|
self.validation_dir.mkdir(exist_ok=True) |
|
|
|
|
|
|
|
|
self.metrics = { |
|
|
"code_quality": [], |
|
|
"performance": [], |
|
|
"memory_usage": [], |
|
|
"error_count": [] |
|
|
} |
|
|
|
|
|
def validate_code(self, code: str, language: str = "python") -> Dict[str, Any]: |
|
|
"""Validate code quality and performance""" |
|
|
try: |
|
|
|
|
|
tree = ast.parse(code) |
|
|
|
|
|
|
|
|
metrics = self._calculate_code_metrics(tree) |
|
|
|
|
|
|
|
|
static_analysis = self._run_static_analysis(code, language) |
|
|
|
|
|
|
|
|
issues = self._check_common_issues(tree) |
|
|
|
|
|
return { |
|
|
"is_valid": not issues, |
|
|
"metrics": metrics, |
|
|
"static_analysis": static_analysis, |
|
|
"issues": issues, |
|
|
"validation_score": self._calculate_validation_score(metrics, issues) |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"is_valid": False, |
|
|
"error": str(e), |
|
|
"validation_score": 0.0 |
|
|
} |
|
|
|
|
|
def _calculate_code_metrics(self, tree: ast.AST) -> Dict[str, Any]: |
|
|
"""Calculate various code metrics""" |
|
|
return { |
|
|
"complexity": self._calculate_complexity(tree), |
|
|
"num_functions": len([node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)]), |
|
|
"num_classes": len([node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]), |
|
|
"num_imports": len([node for node in ast.walk(tree) if isinstance(node, ast.Import)]), |
|
|
"num_statements": len([node for node in ast.walk(tree) if isinstance(node, ast.stmt)]) |
|
|
} |
|
|
|
|
|
def _calculate_complexity(self, tree: ast.AST) -> int: |
|
|
"""Calculate cyclomatic complexity""" |
|
|
complexity = 1 |
|
|
for node in ast.walk(tree): |
|
|
if isinstance(node, (ast.If, ast.For, ast.While, ast.Try, ast.ExceptHandler)): |
|
|
complexity += 1 |
|
|
return complexity |
|
|
|
|
|
def _run_static_analysis(self, code: str, language: str) -> Dict[str, Any]: |
|
|
"""Run static analysis tools""" |
|
|
if language == "python": |
|
|
try: |
|
|
|
|
|
process = subprocess.run( |
|
|
["pylint", "-"], |
|
|
input=code, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=5 |
|
|
) |
|
|
score = float(process.stdout.split("Your code has been rated at")[1].split()[0]) |
|
|
return { |
|
|
"pylint_score": score, |
|
|
"issues": process.stdout.count("error") |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"pylint_score": 0.0, |
|
|
"error": str(e) |
|
|
} |
|
|
return {} |
|
|
|
|
|
def _check_common_issues(self, tree: ast.AST) -> List[str]: |
|
|
"""Check for common code issues""" |
|
|
issues = [] |
|
|
|
|
|
|
|
|
for node in ast.walk(tree): |
|
|
if isinstance(node, ast.Global): |
|
|
issues.append("Global variables detected") |
|
|
|
|
|
|
|
|
for node in ast.walk(tree): |
|
|
if isinstance(node, ast.FunctionDef): |
|
|
if len(node.body) > 50: |
|
|
issues.append(f"Function {node.name} is too long") |
|
|
|
|
|
|
|
|
for node in ast.walk(tree): |
|
|
if isinstance(node, ast.If): |
|
|
if len(node.body) > 20: |
|
|
issues.append("Complex if statement detected") |
|
|
|
|
|
return issues |
|
|
|
|
|
def _calculate_validation_score(self, metrics: Dict[str, Any], issues: List[str]) -> float: |
|
|
"""Calculate overall validation score""" |
|
|
score = 1.0 |
|
|
|
|
|
|
|
|
score *= 0.9 if metrics["complexity"] > 10 else 1.0 |
|
|
|
|
|
|
|
|
score *= 0.9 ** len(issues) |
|
|
|
|
|
return max(0.0, min(1.0, score)) |
|
|
|
|
|
def create_checkpoint(self, data: Dict[str, Any], name: str = None) -> str: |
|
|
"""Create a checkpoint of validation data""" |
|
|
if name is None: |
|
|
name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
|
|
|
checkpoint_path = self.validation_dir / f"checkpoint_{name}.json" |
|
|
|
|
|
|
|
|
data["timestamp"] = str(datetime.datetime.now()) |
|
|
data["hash"] = hashlib.sha256(str(data).encode()).hexdigest() |
|
|
|
|
|
with open(checkpoint_path, 'w') as f: |
|
|
json.dump(data, f, indent=2) |
|
|
|
|
|
return str(checkpoint_path) |
|
|
|
|
|
def load_checkpoint(self, name: str) -> Optional[Dict[str, Any]]: |
|
|
"""Load a validation checkpoint""" |
|
|
checkpoint_path = self.validation_dir / f"checkpoint_{name}.json" |
|
|
if not checkpoint_path.exists(): |
|
|
return None |
|
|
|
|
|
with open(checkpoint_path, 'r') as f: |
|
|
return json.load(f) |
|
|
|
|
|
def validate_dataset(self, dataset: List[Dict[str, Any]]) -> Dict[str, Any]: |
|
|
"""Validate a complete dataset""" |
|
|
results = [] |
|
|
error_count = 0 |
|
|
|
|
|
for idx, example in enumerate(dataset): |
|
|
try: |
|
|
|
|
|
if "code" in example: |
|
|
code_result = self.validate_code( |
|
|
example["code"], |
|
|
example.get("language", "python") |
|
|
) |
|
|
results.append(code_result) |
|
|
|
|
|
|
|
|
if "review" in example: |
|
|
review_result = self._validate_code_review( |
|
|
example["code"], |
|
|
example["review"] |
|
|
) |
|
|
results.append(review_result) |
|
|
except Exception as e: |
|
|
error_count += 1 |
|
|
results.append({ |
|
|
"error": str(e), |
|
|
"validation_score": 0.0 |
|
|
}) |
|
|
|
|
|
|
|
|
scores = [r["validation_score"] for r in results if "validation_score" in r] |
|
|
if scores: |
|
|
avg_score = np.mean(scores) |
|
|
else: |
|
|
avg_score = 0.0 |
|
|
|
|
|
return { |
|
|
"total_examples": len(dataset), |
|
|
"processed_examples": len(results), |
|
|
"error_count": error_count, |
|
|
"average_score": float(avg_score), |
|
|
"detailed_results": results |
|
|
} |
|
|
|
|
|
def save_validation_report(self, report: Dict[str, Any], name: str = None) -> str: |
|
|
"""Save a validation report""" |
|
|
if name is None: |
|
|
name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
|
|
|
report_path = self.validation_dir / f"report_{name}.json" |
|
|
|
|
|
|
|
|
report["timestamp"] = str(datetime.datetime.now()) |
|
|
report["summary"] = { |
|
|
"accuracy": report.get("average_score", 0.0), |
|
|
"error_rate": report.get("error_count", 0) / report.get("total_examples", 1) |
|
|
} |
|
|
|
|
|
with open(report_path, 'w') as f: |
|
|
json.dump(report, f, indent=2) |
|
|
|
|
|
return str(report_path) |
|
|
|
|
|
def _validate_code_review(self, code: str, review: str) -> Dict[str, Any]: |
|
|
"""Validate code review comments""" |
|
|
try: |
|
|
|
|
|
code_result = self.validate_code(code) |
|
|
|
|
|
|
|
|
issues = self._check_common_issues(ast.parse(code)) |
|
|
review_issues = [issue for issue in issues if issue.lower() in review.lower()] |
|
|
|
|
|
return { |
|
|
"is_valid": len(review_issues) > 0, |
|
|
"review_issues_covered": len(review_issues), |
|
|
"total_issues": len(issues), |
|
|
"validation_score": code_result["validation_score"] * (len(review_issues) / len(issues) if issues else 1.0) |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"is_valid": False, |
|
|
"error": str(e), |
|
|
"validation_score": 0.0 |
|
|
} |
|
|
|