Spaces:
Sleeping
Sleeping
| import ast | |
| import sys | |
| import datetime | |
| from pathlib import Path | |
| from collections import Counter | |
| from config.debug_config import DEBUG_MODULES | |
| from config.paths_config import PROJECTS_PATH | |
| #print("🔥🔥🔥 analyzer.py WURDE GELADEN", flush=True) | |
| #print(f"🔥 __name__ in analyzer.py = {__name__}", flush=True) | |
| class CodeAnalyzer: | |
| def __init__(self, dry_run=True): | |
| self.dry_run = dry_run | |
| self._log(__name__, f"🔍 CodeAnalyzer initialisiert (dry_run={dry_run})") | |
| def _log(self, module, message): | |
| """Debug-Ausgabe mit Timestamp und Modulsteuerung""" | |
| if DEBUG_MODULES.get(module, False): | |
| timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] | |
| log_line = f"📝 [{module} {timestamp}] {message}" | |
| print(log_line, file=sys.stderr, flush=True) | |
| def analyze_file(self, filepath): | |
| """ | |
| Analysiert eine Python-Datei und liefert Metriken und Optimierungsvorschläge. | |
| Args: | |
| filepath: Pfad zur Datei (z.B. 'agent/core.py') | |
| Returns: | |
| str: Formatierter Analysebericht | |
| """ | |
| self._log(__name__, f"🔍 Analysiere: {filepath}") | |
| source_path = Path(filepath) | |
| if not source_path.exists(): | |
| return f"❌ Datei nicht gefunden: {source_path}" | |
| with open(source_path, 'r') as f: | |
| content = f.read() | |
| try: | |
| tree = ast.parse(content) | |
| except SyntaxError as e: | |
| return f"❌ Syntax-Fehler in {filepath}: {e}" | |
| # Metriken sammeln | |
| metrics = { | |
| "file": str(source_path), | |
| "lines": len(content.split('\n')), | |
| "functions": [], | |
| "classes": [], | |
| "imports": [], | |
| "function_calls": [], | |
| "complexity": {}, | |
| "dangerous": [] # NEU: für fehleranfällige Konstrukte | |
| } | |
| # AST durchlaufen | |
| for node in ast.walk(tree): | |
| # Funktionen | |
| if isinstance(node, ast.FunctionDef): | |
| func_info = self._analyze_function(node) | |
| metrics["functions"].append(func_info) | |
| metrics["complexity"][node.name] = func_info["complexity"] | |
| # Klassen | |
| elif isinstance(node, ast.ClassDef): | |
| class_info = self._analyze_class(node) | |
| metrics["classes"].append(class_info) | |
| # Importe | |
| elif isinstance(node, (ast.Import, ast.ImportFrom)): | |
| metrics["imports"].append(ast.unparse(node)) | |
| # Funktionsaufrufe | |
| elif isinstance(node, ast.Call) and hasattr(node.func, 'id'): | |
| metrics["function_calls"].append(node.func.id) | |
| # 1.except: (ohne Fehlertyp)- das ist ein exception-Block:isinstance(node, ast.ExceptHandler) | |
| if isinstance(node, ast.ExceptHandler) and node.type is None: | |
| metrics["dangerous"].append({ | |
| "type": "bare_except", | |
| "line": node.lineno, | |
| "msg": "Nacktes except: fängt ALLE Fehler (auch KeyboardInterrupt!)" | |
| }) | |
| # 2. eval() / exec() Aufrufe | |
| elif isinstance(node, ast.Call) and hasattr(node.func, 'id'): | |
| if node.func.id in ['eval', 'exec']: | |
| metrics["dangerous"].append({ | |
| "type": "eval_exec", | |
| "line": node.lineno, | |
| "msg": f"{node.func.id}() ist ein Sicherheitsrisiko" | |
| }) | |
| # 3. Zu tiefe Verschachtelung (in Funktionen)- ist ein empfohlener Richtwert! | |
| elif isinstance(node, ast.FunctionDef): | |
| nesting_depth = self._get_nesting_depth(node) | |
| if nesting_depth > 4: | |
| metrics["dangerous"].append({ | |
| "type": "deep_nesting", | |
| "line": node.lineno, | |
| "msg": f"Funktion '{node.name}' hat {nesting_depth} Verschachtelungsebenen (>4)" | |
| }) | |
| # 4. Rekursion ohne Abbruch erkennen - Rekursion=eine Funktion ruft sich immer wieder selbst auf! -> Unendlich-Schleife! | |
| elif isinstance(node, ast.FunctionDef): | |
| if self._has_uncontrolled_recursion(node): | |
| metrics["dangerous"].append({ | |
| "type": "uncontrolled_recursion", | |
| "line": node.lineno, | |
| "msg": f"Funktion '{node.name}' ruft sich selbst auf – fehlt ein Abbruch?" | |
| }) | |
| # 5. Global-Variablen in Funktionen | |
| elif isinstance(node, ast.Global): | |
| metrics["dangerous"].append({ | |
| "type": "global_var", | |
| "line": node.lineno, | |
| "msg": f"Global-Variable '{node.names}' – Seiteneffekte möglich" | |
| }) | |
| # Duplikate finden | |
| duplicates = self._find_duplicates(metrics["functions"]) | |
| # Bericht erstellen | |
| return self._generate_report(metrics, duplicates) | |
| def _get_nesting_depth(self, node, current_depth=0): | |
| """Berechnet die maximale Verschachtelungstiefe in einer Funktion""" | |
| max_depth = current_depth | |
| for child in ast.iter_child_nodes(node): | |
| if isinstance(child, (ast.If, ast.For, ast.While, ast.With, ast.Try)): | |
| child_depth = self._get_nesting_depth(child, current_depth + 1) | |
| max_depth = max(max_depth, child_depth) | |
| else: | |
| child_depth = self._get_nesting_depth(child, current_depth) | |
| max_depth = max(max_depth, child_depth) | |
| return max_depth | |
| def _has_uncontrolled_recursion(self, node): | |
| """Prüft grob, ob eine Funktion sich selbst aufruft ohne offensichtlichen Abbruch""" | |
| has_self_call = False | |
| has_return = False | |
| for child in ast.walk(node): | |
| if isinstance(child, ast.Call) and hasattr(child.func, 'id'): | |
| if child.func.id == node.name: | |
| has_self_call = True | |
| elif isinstance(child, ast.Return): | |
| has_return = True | |
| # Wenn Selbstaufruf, aber kein Return? Vereinfachte Warnung | |
| return has_self_call and not has_return | |
| def _analyze_function(self, node): | |
| """Analysiert eine einzelne Funktion""" | |
| # Zeilen zählen | |
| start_line = node.lineno | |
| end_line = max(getattr(node, 'end_lineno', start_line), start_line) | |
| line_count = end_line - start_line + 1 | |
| # Zyklomatische Komplexität berechnen (Anzahl der Entscheidungspfade) | |
| complexity = 1 # Basis | |
| for child in ast.walk(node): | |
| if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)): | |
| complexity += 1 | |
| elif isinstance(child, ast.BoolOp): | |
| complexity += len(child.values) - 1 # and/or erhöhen Komplexität | |
| # Parameter zählen | |
| arg_count = len(node.args.args) | |
| return { | |
| "name": node.name, | |
| "line": start_line, | |
| "lines": line_count, | |
| "args": arg_count, | |
| "complexity": complexity, | |
| "docstring": ast.get_docstring(node) is not None | |
| } | |
| def _analyze_class(self, node): | |
| """Analysiert eine einzelne Klasse""" | |
| methods = [] | |
| for item in node.body: | |
| if isinstance(item, ast.FunctionDef): | |
| methods.append(self._analyze_function(item)) | |
| return { | |
| "name": node.name, | |
| "line": node.lineno, | |
| "methods": len(methods), | |
| "method_list": methods | |
| } | |
| # Erkennung von Funktionen mit gleicher Zeilenanzahl | |
| def _find_duplicates(self, functions): | |
| """Findet ähnliche Code-Strukturen (einfache Version)""" | |
| # Gruppiere nach Zeilenanzahl (grobe Duplikatserkennung) | |
| by_lines = {} | |
| for func in functions: | |
| line_key = func["lines"] | |
| if line_key not in by_lines: | |
| by_lines[line_key] = [] | |
| by_lines[line_key].append(func["name"]) | |
| duplicates = [] | |
| for line_count, names in by_lines.items(): | |
| if len(names) > 1: | |
| duplicates.append({ | |
| "lines": line_count, | |
| "functions": names, | |
| "reason": f"{len(names)} Funktionen mit gleicher Zeilenanzahl ({line_count})" | |
| }) | |
| return duplicates | |
| def _generate_report(self, metrics, duplicates): | |
| """Erstellt einen lesbaren Analysebericht""" | |
| # Prioritäten berechnen | |
| critical = [] | |
| medium = [] | |
| low = [] | |
| evaluated_funcs = set() #merkt sich welche Funktionen bereits bewertet wurden | |
| # 1. Zuerst: Fehleranfällige Konstrukte (immer kritisch!) | |
| for danger in metrics["dangerous"]: | |
| critical.append(f"⚫ {danger['msg']} (Zeile {danger['line']})") | |
| # 2. Funktionen bewerten | |
| for func in metrics["functions"]: | |
| # Kritisch: hohe Komplexität oder sehr lang | |
| if func["complexity"] > 10: | |
| critical.append(f"🔴 Funktion '{func['name']}' (Zeile {func['line']}): Komplexität {func['complexity']} > 10") | |
| evaluated_funcs.add(func['name']) | |
| elif func["lines"] > 50: | |
| critical.append(f"🔴 Funktion '{func['name']}' (Zeile {func['line']}): {func['lines']} Zeilen > 50") | |
| evaluated_funcs.add(func['name']) | |
| # Mittel: mittlere Komplexität oder viele Parameter | |
| elif func["complexity"] > 5: | |
| medium.append(f"🟡 Funktion '{func['name']}': Komplexität {func['complexity']}") | |
| evaluated_funcs.add(func['name']) | |
| elif func["args"] > 5: | |
| medium.append(f"🟡 Funktion '{func['name']}': {func['args']} Parameter > 5") | |
| evaluated_funcs.add(func['name']) | |
| elif not func["docstring"]: | |
| medium.append(f"🟡 Funktion '{func['name']}': Fehlender Docstring") | |
| evaluated_funcs.add(func['name']) | |
| # Niedrig: kleine Optimierungen | |
| else: | |
| if func["lines"] > 30: | |
| low.append(f"🟢 Funktion '{func['name']}': {func['lines']} Zeilen (könnte kürzer sein)") | |
| evaluated_funcs.add(func['name']) | |
| # 3. Klassen bewerten | |
| for cls in metrics["classes"]: | |
| if cls["methods"] > 10: | |
| medium.append(f"🟡 Klasse '{cls['name']}': {cls['methods']} Methoden (vielleicht zu groß)") | |
| # 4. Duplikate bewerten (auch kritisch) | |
| for dup in duplicates: | |
| critical.append(f"🔴 Duplikat: {dup['reason']}") | |
| self._log(__name__, f"🔍 GEFUNDENE FUNKTIONEN: {[f['name'] for f in metrics['functions']]}") | |
| neutral_funcs = [] | |
| for func in metrics["functions"]: | |
| if func['name'] not in evaluated_funcs: | |
| neutral_funcs.append(func['name']) | |
| # Bericht zusammenbauen | |
| report = [ | |
| f"\n📊 **Code-Analyse für: {metrics['file']}**", | |
| f" 📄 Zeilen: {metrics['lines']}", | |
| f" 📦 Funktionen: {len(metrics['functions'])}", | |
| f" 🏛️ Klassen: {len(metrics['classes'])}", | |
| f" 📥 Importe: {len(metrics['imports'])}", | |
| f" 📞 Funktionsaufrufe: {len(set(metrics['function_calls']))} verschiedene\n" | |
| ] | |
| if critical: | |
| report.append("⚫🔴 **KRITISCHE OPTIMIERUNGEN (höchste Priorität):**") | |
| report.extend(critical) | |
| report.append("") | |
| if medium: | |
| report.append("🟡 **MITTELPRIORITÄT:**") | |
| report.extend(medium) | |
| report.append("") | |
| if low: | |
| report.append("🟢 **NIEDRIGE PRIORITÄT:**") | |
| report.extend(low) | |
| report.append("") | |
| # Neutrale Funktionen mit grauem Punkt --- | |
| if neutral_funcs: | |
| report.append("⚪ **WEITERE ENTHALTENE FUNKTIONEN (neutral):**") | |
| for func_name in neutral_funcs: | |
| report.append(f"⚪ `{func_name}`") | |
| report.append("") | |
| if not critical and not medium and not low: | |
| report.append("✅ **Keine Optimierungen notwendig – sauberer Code!**") | |
| return "\n".join(report) | |
| def suggest_refactoring(self, filepath, function_name=None): | |
| """ | |
| Gibt konkrete Refactoring-Vorschläge für eine Datei oder Funktion. | |
| """ | |
| report = self.analyze_file(filepath) | |
| self._log(__name__, f"🔍 suggest_refactoring: function_name={function_name}") | |
| # Wenn nur eine Funktion interessiert, filtern | |
| if function_name: | |
| lines = report.split('\n') | |
| filtered = [report.split('\n')[0]] # Header behalten | |
| for line in lines: | |
| if function_name in line: | |
| filtered.append(line) | |
| return "\n".join(filtered) | |
| return report |