KI-Agent / agent /analyzer.py
Astridkraft's picture
Update agent/analyzer.py
74142ea verified
import ast
import sys
import datetime
from pathlib import Path
from collections import Counter
from config.debug_config import DEBUG_MODULES
from config.paths_config import PROJECTS_PATH
#print("🔥🔥🔥 analyzer.py WURDE GELADEN", flush=True)
#print(f"🔥 __name__ in analyzer.py = {__name__}", flush=True)
class CodeAnalyzer:
def __init__(self, dry_run=True):
self.dry_run = dry_run
self._log(__name__, f"🔍 CodeAnalyzer initialisiert (dry_run={dry_run})")
def _log(self, module, message):
"""Debug-Ausgabe mit Timestamp und Modulsteuerung"""
if DEBUG_MODULES.get(module, False):
timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
log_line = f"📝 [{module} {timestamp}] {message}"
print(log_line, file=sys.stderr, flush=True)
def analyze_file(self, filepath):
"""
Analysiert eine Python-Datei und liefert Metriken und Optimierungsvorschläge.
Args:
filepath: Pfad zur Datei (z.B. 'agent/core.py')
Returns:
str: Formatierter Analysebericht
"""
self._log(__name__, f"🔍 Analysiere: {filepath}")
source_path = Path(filepath)
if not source_path.exists():
return f"❌ Datei nicht gefunden: {source_path}"
with open(source_path, 'r') as f:
content = f.read()
try:
tree = ast.parse(content)
except SyntaxError as e:
return f"❌ Syntax-Fehler in {filepath}: {e}"
# Metriken sammeln
metrics = {
"file": str(source_path),
"lines": len(content.split('\n')),
"functions": [],
"classes": [],
"imports": [],
"function_calls": [],
"complexity": {},
"dangerous": [] # NEU: für fehleranfällige Konstrukte
}
# AST durchlaufen
for node in ast.walk(tree):
# Funktionen
if isinstance(node, ast.FunctionDef):
func_info = self._analyze_function(node)
metrics["functions"].append(func_info)
metrics["complexity"][node.name] = func_info["complexity"]
# Klassen
elif isinstance(node, ast.ClassDef):
class_info = self._analyze_class(node)
metrics["classes"].append(class_info)
# Importe
elif isinstance(node, (ast.Import, ast.ImportFrom)):
metrics["imports"].append(ast.unparse(node))
# Funktionsaufrufe
elif isinstance(node, ast.Call) and hasattr(node.func, 'id'):
metrics["function_calls"].append(node.func.id)
# 1.except: (ohne Fehlertyp)- das ist ein exception-Block:isinstance(node, ast.ExceptHandler)
if isinstance(node, ast.ExceptHandler) and node.type is None:
metrics["dangerous"].append({
"type": "bare_except",
"line": node.lineno,
"msg": "Nacktes except: fängt ALLE Fehler (auch KeyboardInterrupt!)"
})
# 2. eval() / exec() Aufrufe
elif isinstance(node, ast.Call) and hasattr(node.func, 'id'):
if node.func.id in ['eval', 'exec']:
metrics["dangerous"].append({
"type": "eval_exec",
"line": node.lineno,
"msg": f"{node.func.id}() ist ein Sicherheitsrisiko"
})
# 3. Zu tiefe Verschachtelung (in Funktionen)- ist ein empfohlener Richtwert!
elif isinstance(node, ast.FunctionDef):
nesting_depth = self._get_nesting_depth(node)
if nesting_depth > 4:
metrics["dangerous"].append({
"type": "deep_nesting",
"line": node.lineno,
"msg": f"Funktion '{node.name}' hat {nesting_depth} Verschachtelungsebenen (>4)"
})
# 4. Rekursion ohne Abbruch erkennen - Rekursion=eine Funktion ruft sich immer wieder selbst auf! -> Unendlich-Schleife!
elif isinstance(node, ast.FunctionDef):
if self._has_uncontrolled_recursion(node):
metrics["dangerous"].append({
"type": "uncontrolled_recursion",
"line": node.lineno,
"msg": f"Funktion '{node.name}' ruft sich selbst auf – fehlt ein Abbruch?"
})
# 5. Global-Variablen in Funktionen
elif isinstance(node, ast.Global):
metrics["dangerous"].append({
"type": "global_var",
"line": node.lineno,
"msg": f"Global-Variable '{node.names}' – Seiteneffekte möglich"
})
# Duplikate finden
duplicates = self._find_duplicates(metrics["functions"])
# Bericht erstellen
return self._generate_report(metrics, duplicates)
def _get_nesting_depth(self, node, current_depth=0):
"""Berechnet die maximale Verschachtelungstiefe in einer Funktion"""
max_depth = current_depth
for child in ast.iter_child_nodes(node):
if isinstance(child, (ast.If, ast.For, ast.While, ast.With, ast.Try)):
child_depth = self._get_nesting_depth(child, current_depth + 1)
max_depth = max(max_depth, child_depth)
else:
child_depth = self._get_nesting_depth(child, current_depth)
max_depth = max(max_depth, child_depth)
return max_depth
def _has_uncontrolled_recursion(self, node):
"""Prüft grob, ob eine Funktion sich selbst aufruft ohne offensichtlichen Abbruch"""
has_self_call = False
has_return = False
for child in ast.walk(node):
if isinstance(child, ast.Call) and hasattr(child.func, 'id'):
if child.func.id == node.name:
has_self_call = True
elif isinstance(child, ast.Return):
has_return = True
# Wenn Selbstaufruf, aber kein Return? Vereinfachte Warnung
return has_self_call and not has_return
def _analyze_function(self, node):
"""Analysiert eine einzelne Funktion"""
# Zeilen zählen
start_line = node.lineno
end_line = max(getattr(node, 'end_lineno', start_line), start_line)
line_count = end_line - start_line + 1
# Zyklomatische Komplexität berechnen (Anzahl der Entscheidungspfade)
complexity = 1 # Basis
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1 # and/or erhöhen Komplexität
# Parameter zählen
arg_count = len(node.args.args)
return {
"name": node.name,
"line": start_line,
"lines": line_count,
"args": arg_count,
"complexity": complexity,
"docstring": ast.get_docstring(node) is not None
}
def _analyze_class(self, node):
"""Analysiert eine einzelne Klasse"""
methods = []
for item in node.body:
if isinstance(item, ast.FunctionDef):
methods.append(self._analyze_function(item))
return {
"name": node.name,
"line": node.lineno,
"methods": len(methods),
"method_list": methods
}
# Erkennung von Funktionen mit gleicher Zeilenanzahl
def _find_duplicates(self, functions):
"""Findet ähnliche Code-Strukturen (einfache Version)"""
# Gruppiere nach Zeilenanzahl (grobe Duplikatserkennung)
by_lines = {}
for func in functions:
line_key = func["lines"]
if line_key not in by_lines:
by_lines[line_key] = []
by_lines[line_key].append(func["name"])
duplicates = []
for line_count, names in by_lines.items():
if len(names) > 1:
duplicates.append({
"lines": line_count,
"functions": names,
"reason": f"{len(names)} Funktionen mit gleicher Zeilenanzahl ({line_count})"
})
return duplicates
def _generate_report(self, metrics, duplicates):
"""Erstellt einen lesbaren Analysebericht"""
# Prioritäten berechnen
critical = []
medium = []
low = []
evaluated_funcs = set() #merkt sich welche Funktionen bereits bewertet wurden
# 1. Zuerst: Fehleranfällige Konstrukte (immer kritisch!)
for danger in metrics["dangerous"]:
critical.append(f"⚫ {danger['msg']} (Zeile {danger['line']})")
# 2. Funktionen bewerten
for func in metrics["functions"]:
# Kritisch: hohe Komplexität oder sehr lang
if func["complexity"] > 10:
critical.append(f"🔴 Funktion '{func['name']}' (Zeile {func['line']}): Komplexität {func['complexity']} > 10")
evaluated_funcs.add(func['name'])
elif func["lines"] > 50:
critical.append(f"🔴 Funktion '{func['name']}' (Zeile {func['line']}): {func['lines']} Zeilen > 50")
evaluated_funcs.add(func['name'])
# Mittel: mittlere Komplexität oder viele Parameter
elif func["complexity"] > 5:
medium.append(f"🟡 Funktion '{func['name']}': Komplexität {func['complexity']}")
evaluated_funcs.add(func['name'])
elif func["args"] > 5:
medium.append(f"🟡 Funktion '{func['name']}': {func['args']} Parameter > 5")
evaluated_funcs.add(func['name'])
elif not func["docstring"]:
medium.append(f"🟡 Funktion '{func['name']}': Fehlender Docstring")
evaluated_funcs.add(func['name'])
# Niedrig: kleine Optimierungen
else:
if func["lines"] > 30:
low.append(f"🟢 Funktion '{func['name']}': {func['lines']} Zeilen (könnte kürzer sein)")
evaluated_funcs.add(func['name'])
# 3. Klassen bewerten
for cls in metrics["classes"]:
if cls["methods"] > 10:
medium.append(f"🟡 Klasse '{cls['name']}': {cls['methods']} Methoden (vielleicht zu groß)")
# 4. Duplikate bewerten (auch kritisch)
for dup in duplicates:
critical.append(f"🔴 Duplikat: {dup['reason']}")
self._log(__name__, f"🔍 GEFUNDENE FUNKTIONEN: {[f['name'] for f in metrics['functions']]}")
neutral_funcs = []
for func in metrics["functions"]:
if func['name'] not in evaluated_funcs:
neutral_funcs.append(func['name'])
# Bericht zusammenbauen
report = [
f"\n📊 **Code-Analyse für: {metrics['file']}**",
f" 📄 Zeilen: {metrics['lines']}",
f" 📦 Funktionen: {len(metrics['functions'])}",
f" 🏛️ Klassen: {len(metrics['classes'])}",
f" 📥 Importe: {len(metrics['imports'])}",
f" 📞 Funktionsaufrufe: {len(set(metrics['function_calls']))} verschiedene\n"
]
if critical:
report.append("⚫🔴 **KRITISCHE OPTIMIERUNGEN (höchste Priorität):**")
report.extend(critical)
report.append("")
if medium:
report.append("🟡 **MITTELPRIORITÄT:**")
report.extend(medium)
report.append("")
if low:
report.append("🟢 **NIEDRIGE PRIORITÄT:**")
report.extend(low)
report.append("")
# Neutrale Funktionen mit grauem Punkt ---
if neutral_funcs:
report.append("⚪ **WEITERE ENTHALTENE FUNKTIONEN (neutral):**")
for func_name in neutral_funcs:
report.append(f"⚪ `{func_name}`")
report.append("")
if not critical and not medium and not low:
report.append("✅ **Keine Optimierungen notwendig – sauberer Code!**")
return "\n".join(report)
def suggest_refactoring(self, filepath, function_name=None):
"""
Gibt konkrete Refactoring-Vorschläge für eine Datei oder Funktion.
"""
report = self.analyze_file(filepath)
self._log(__name__, f"🔍 suggest_refactoring: function_name={function_name}")
# Wenn nur eine Funktion interessiert, filtern
if function_name:
lines = report.split('\n')
filtered = [report.split('\n')[0]] # Header behalten
for line in lines:
if function_name in line:
filtered.append(line)
return "\n".join(filtered)
return report