Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from pathlib import Path | |
| from agent.parser import LogParser | |
| from agent.memory import MemoryStore | |
| from agent.file_patcher import CodePatcher | |
| from agent.modular_patcher import ModularPatcher | |
| from agent.project_patcher import ProjectPatcher | |
| from agent.analyzer import CodeAnalyzer | |
| from config.debug_config import DEBUG_MODULES | |
| from config.paths_config import PROJECTS_PATH, MEMORY_PATH | |
| #print("🔥🔥🔥 core.py WURDE GELADEN", flush=True) | |
| #print(f"🔥 __name__ in core.py = {__name__}", flush=True) | |
class CodeAgent:
    """Chat-driven agent that classifies problems, looks up known fixes and patches files.

    ``handle_message`` is the entry point used by the UI. It dispatches
    ``!memory`` / ``!patch`` / ``!modular`` / ``!analyse`` commands, handles the
    yes/no answer to a pending validation question, and otherwise runs the
    normal flow (classify -> memory lookup -> autonomous / validation /
    collaborative answer).

    ``dummy_mode`` and ``api_key`` are only stored on the instance; nothing in
    this class reads them.
    """

    # Ordered (category, regex) rules for analyze_input(); the first match wins.
    # "ui" is matched as a whole word only: as a plain substring it also hit
    # "requirements", "build", "guide", ... and shadowed the dependency rule.
    _PROBLEM_PATTERNS = (
        ("code_error", r"\.py|import|syntax"),
        ("runtime_error", r"log|traceback|exception"),
        ("ui_issue", r"gradio|\bui\b|button"),
        ("dependency", r"install|pip|requirements"),
    )

    # Canned (dummy) suggestions per problem type, used by generate_options().
    _OPTIONS = {
        "code_error": [
            "1. Fehlende Imports prüfen und hinzufügen",
            "2. Syntax-Fehler in Zeile X korrigieren",
            "3. Try/Except-Block einbauen",
        ],
        "runtime_error": [
            "1. Log-Datei auf Nullpointer prüfen",
            "2. Umgebungsvariablen checken",
            "3. Fallback-Wert definieren",
        ],
        "ui_issue": [
            "1. Gradio-Version aktualisieren",
            "2. Event-Handler korrigieren",
            "3. Output-Komponente austauschen",
        ],
        "dependency": [
            "1. requirements.txt aktualisieren",
            "2. Python-Version prüfen",
            "3. Virtualenv neu erstellen",
        ],
        "general": [
            "1. Code-Struktur überprüfen",
            "2. Dokumentation konsultieren",
            "3. StackOverflow suchen",
        ],
    }

    # Marker that identifies a pending "shall I apply this?" question.
    _VALIDATION_MARKER = "[VALIDIERUNG]"

    def __init__(self, api_key=None, dummy_mode=True, project="default"):
        """Set up project directory, memory store, parser and the three patchers.

        Args:
            api_key: Stored on the instance; not used inside this class.
            dummy_mode: Stored on the instance; not used inside this class.
            project: Project name; files are resolved below PROJECTS_PATH / project.
        """
        self.dummy_mode = dummy_mode
        self.api_key = api_key
        self.project = project
        self.project_path = PROJECTS_PATH / project
        # Make sure the project directory exists.
        self.project_path.mkdir(parents=True, exist_ok=True)

        # Memory lives in its own file below MEMORY_PATH.
        memory_file = MEMORY_PATH / "agent_memory.json"
        self.memory = MemoryStore(memory_file=str(memory_file))
        self.parser = LogParser()

        # NOTE: the file patcher runs with dry_run=False, i.e. it really
        # modifies files; the other two patchers are created in dry-run mode.
        self.file_patcher = CodePatcher(dry_run=False)
        self.modular_patcher = ModularPatcher(dry_run=True)
        self.project_patcher = ProjectPatcher(dry_run=True)

        # Pending suggestion awaiting a yes/no answer (kept in RAM only).
        self.last_suggestion = None
        # Only ever reset by _handle_patch_command; initialised here so the
        # attribute always exists.
        self.last_response = None
        self._log(__name__, "✅ CodeAgent initialisiert")

    # ------------------------------------------------------------------ helpers

    def _resolve_project_path(self, filename):
        """Return ``filename`` joined onto this agent's project directory."""
        full_path = self.project_path / filename
        self._log(__name__, f"🔍 _resolve_project_path: {filename} → {full_path}")
        self._log(__name__, f" Existiert? {full_path.exists()}")
        return full_path

    def _log(self, module, message):
        """Print ``message`` if logging is enabled for ``module`` in DEBUG_MODULES."""
        if DEBUG_MODULES.get(module, False):
            print(f"📝 [{module}] {message}")

    @staticmethod
    def _command_args(message):
        """Return everything after the first whitespace-delimited word of ``message``.

        Replaces the fixed-offset slicing (``message[8:]`` etc.), which only
        worked when the command had no leading whitespace and was followed by
        exactly one separator character.
        """
        pieces = message.strip().split(None, 1)
        return pieces[1] if len(pieces) > 1 else ""

    @classmethod
    def _classify_problem(cls, text):
        """Map free text to a problem category using ``_PROBLEM_PATTERNS``."""
        import re

        lowered = text.lower()
        for category, pattern in cls._PROBLEM_PATTERNS:
            if re.search(pattern, lowered):
                return category
        return "general"

    @staticmethod
    def _last_turn_text(history):
        """Return the text of the last history entry as one string.

        Accepts plain strings, (user, bot) pairs given as tuple or list, and
        ``{"role": ..., "content": ...}`` dicts. All string parts of the entry
        are joined, so a marker is found whichever side of a pair holds it.
        Non-string parts are ignored; an empty/None history yields "".
        """
        if not history:
            return ""
        last = history[-1]
        if isinstance(last, dict):
            parts = [last.get("content")]
        elif isinstance(last, (list, tuple)):
            parts = list(last)
        else:
            parts = [last]
        return "\n".join(part for part in parts if isinstance(part, str))

    @staticmethod
    def _parse_patch_params(raw):
        """Parse ``key:value`` tokens of a ``!patch`` command into a dict.

        ``content:`` and ``insert_text:`` consume the rest of the line verbatim
        (inner whitespace is preserved, which matters for code indentation);
        for ``insert_text`` every literal ``\\n`` is turned into a newline.
        Tokens without a colon are ignored.
        """
        import re

        params = {}
        for token in re.finditer(r"\S+", raw):
            key, sep, value = token.group().partition(":")
            if not sep:
                continue
            if key in ("insert_text", "content"):
                # Take the raw remainder instead of re-joining split tokens.
                value = raw[token.start() + len(key) + 1:].rstrip()
                if key == "insert_text":
                    value = value.replace("\\n", "\n")
                params[key] = value
                break
            params[key] = value
        return params

    # ------------------------------------------------------------ normal flow

    def analyze_input(self, user_input):
        """Return the problem type for ``user_input``.

        One of "code_error", "runtime_error", "ui_issue", "dependency" or
        "general"; categories are tested in that order.
        """
        self._log(__name__, f"analyze_input: {user_input[:50]}...")
        return self._classify_problem(user_input)

    def try_autonomous_fix(self, message, problem_type):
        """Look up ``message`` in memory and return ``(solution, confidence)``.

        Returns a fixed "not found" text with confidence 0.0 when memory has
        no similar entry. ``problem_type`` is accepted but not used.
        """
        self._log(__name__, f"try_autonomous_fix: {message[:50]}...")
        matches = self.memory.find_similar(message)
        if not matches:
            self._log(__name__, "❌ Kein Treffer in Memory")
            return "Kein bekanntes Muster gefunden.", 0.0

        best_match = matches[0]
        confidence = best_match["confidence"]
        self._log(__name__, f"✅ Treffer! Confidence={confidence:.3f}")
        return best_match["solution"], confidence

    def generate_options(self, message, problem_type):
        """Return three canned suggestions for ``problem_type`` as one string.

        Unknown problem types fall back to the "general" suggestions.
        ``message`` is accepted but not used.
        """
        self._log(__name__, f"generate_options: {problem_type}")
        opts = self._OPTIONS.get(problem_type, self._OPTIONS["general"])
        return "\n".join(opts)

    def handle_message(self, message, history):
        """Process one chat message and return the agent's reply text.

        Args:
            message: The user's message.
            history: Previous chat turns; see ``_last_turn_text`` for the
                accepted entry shapes. May be empty or None.
        """
        message = "" if message is None else str(message)
        self._log(__name__, f"handle_message: '{message[:50]}...'")

        # --- 1. Explicit commands (both spellings of analyse/analyze) ---
        handlers = {
            "!memory": ("🧠 Memory-Befehl erkannt", self._handle_memory_command),
            "!patch": ("✅ Patch-Befehl erkannt", self._handle_patch_command),
            "!modular": ("🧩 Modular-Befehl erkannt", self._handle_modular_command),
            "!analyse": ("🔍 Analyze-Befehl erkannt", self._handle_analyze_command),
            "!analyze": ("🔍 Analyze-Befehl erkannt", self._handle_analyze_command),
        }
        words = message.split(None, 1)
        command = words[0].lower() if words else ""
        if command in handlers:
            note, handler = handlers[command]
            self._log(__name__, note)
            return handler(message)

        # --- 2. Answer to a pending validation question? ---
        last_text = self._last_turn_text(history)
        if last_text:
            self._log(__name__, f"Letzte Antwort: '{last_text[:50]}...'")
        if self._VALIDATION_MARKER in last_text:
            import re

            # Whole-word match: a substring test would treat e.g. "Jahr" as
            # "ja" and patch a file by accident.
            answer_words = set(re.findall(r"\w+", message.lower()))
            if "ja" in answer_words and "nein" not in answer_words:
                self._log(__name__, "✅ 'ja' auf Validierung erkannt")
                return self._execute_last_suggestion()
            if "nein" in answer_words:
                self._log(__name__, "⏭️ 'nein' auf Validierung erkannt")
                self.last_suggestion = None
                return "⏭️ OK, nichts geändert."

        # --- 3. Normal mode ---
        self._log(__name__, "Normaler Modus")
        problem_type = self.analyze_input(message)
        solution, confidence = self.try_autonomous_fix(message, problem_type)

        if confidence > 0.95:
            self._log(__name__, f"🤖 Autonom (confidence={confidence:.3f})")
            return f"🤖 [AUTONOM] {solution}"

        if confidence > 0.70:
            self._log(__name__, f"✅ Validierung (confidence={confidence:.3f})")
            # Remember the proposal for the later yes/no answer (RAM only).
            # NOTE(review): file/line/content are hard-coded placeholders;
            # confirming with "ja" rewrites line 42 of "app.py".
            self.last_suggestion = {
                "problem": message,
                "solution": solution,
                "file": "app.py",
                "line": 42,
                "content": "# Fixed by Agent",
            }
            return f"✅ [VALIDIERUNG] Ich würde vorschlagen: {solution}\n\nSoll ich das umsetzen? (ja/nein)"

        self._log(__name__, f"❓ Kollaborativ (confidence={confidence:.3f})")
        options = self.generate_options(message, problem_type)
        return f"❓ [KOLLABORATIV] Ich bin unsicher. Hier sind 3 Optionen:\n\n{options}\n\nWelche soll ich nehmen?"

    # --------------------------------------------------------------- commands

    def _handle_memory_command(self, message):
        """Handle ``!memory query <text>``, ``!memory stats`` and ``!memory add "err" "fix"``.

        This method only does the control flow; similarity search and storage
        are delegated to ``self.memory``.
        """
        self._log(__name__, f"_handle_memory_command: '{message}'")
        parts = self._command_args(message).split()
        if not parts:
            return "❌ Befehl: !memory query <text> oder !memory stats"

        command = parts[0].lower()

        if command == "query":
            query = " ".join(parts[1:])
            if not query:
                return "❌ Bitte Suchtext angeben: !memory query <text>"
            self._log(__name__, f"🔍 Suche nach: '{query}'")
            results = self.memory.find_similar(query, threshold=0.3, top_k=3)
            if not results:
                return f"❌ Keine Treffer für: {query}"

            chunks = [f"🧠 **Memory-Treffer für:** '{query}'\n\n"]
            for rank, hit in enumerate(results, start=1):
                chunks.append(
                    f"**#{rank}** (Ähnlichkeit: {hit['similarity']:.3f}, Confidence: {hit['confidence']:.3f})\n"
                )
                chunks.append(f"📌 Fehler: {hit['error'][:100]}...\n")
                chunks.append(f"✅ Lösung: {hit['solution'][:100]}...\n")
                if hit.get('patch_command'):
                    chunks.append(f"🛠️ Patch: `{hit['patch_command']}`\n")
                chunks.append("\n")
            return "".join(chunks)

        if command == "stats":
            stats = self.memory.get_stats()
            return (
                f"🧠 **Memory-Statistik**\n\n"
                f"📊 Einträge: {stats['total_entries']}\n"
                f"⭐ Durchschnittl. Confidence: {stats['avg_confidence']:.3f}\n"
                f"✅ Erfolge gesamt: {stats['total_success']}\n"
                f"🛠️ Mit Patch: {stats['with_patch']}\n"
                f"📝 Mit Kontext: {stats['with_context']}"
            )

        if command == "add":
            import re

            # The first two double-quoted, non-empty strings are error and solution.
            quoted = re.findall(r'"([^"]+)"', message)
            if not quoted:
                return "❌ FEHLER: Du musst den Fehlertext in Anführungszeichen angeben!"
            if len(quoted) < 2:
                # Previously this case wrongly complained about the error text.
                return "❌ FEHLER: Du musst den Lösungstext in Anführungszeichen angeben!"
            error, solution = quoted[0], quoted[1]
            self.memory.add_memory(error, solution, confidence=0.8)
            return f"✅ Neuer Memory-Eintrag hinzugefügt:\nFehler: {error}\nLösung: {solution}"

        return (
            "❌ Unbekannter Memory-Befehl.\n\n"
            "Verfügbar:\n"
            " `!memory query <text>` - Suche ähnliche Fehler\n"
            " `!memory stats` - Memory-Statistik\n"
            " `!memory add \"Fehler\" \"Lösung\"` - Neuen Eintrag"
        )

    def _handle_patch_command(self, message):
        """Handle ``!patch key:value ...`` and return the result of ``apply_fix``.

        ``scope`` (default "file") and ``type`` (default "replace_line") select
        patcher and method; all remaining keys are passed on as keyword
        arguments.
        """
        self._log(__name__, f"_handle_patch_command: '{message}'")
        params = self._parse_patch_params(self._command_args(message))
        self._log(__name__, f"Alle params: {params}")
        if not params:
            return "❌ Befehl: !patch filepath:<datei> line:<nr> content:<text>"

        scope = params.get("scope", "file")
        fix_type = params.get("type", "replace_line")
        self._log(__name__, f"scope={scope}, fix_type={fix_type}")
        clean_params = {k: v for k, v in params.items() if k not in ("scope", "type")}
        self._log(__name__, f"clean_params: {clean_params}")

        # An explicit patch supersedes any pending suggestion.
        self.last_suggestion = None
        self.last_response = None
        self._log(__name__, "last_suggestion zurückgesetzt")

        result = self.apply_fix(scope, fix_type, **clean_params)
        self._log(__name__, f"apply_fix Ergebnis: {str(result)[:100]}...")
        return result

    def _handle_modular_command(self, message):
        """Handle ``!modular extract <source_file> <function_name> [target_module] [--remove]``.

        Uses a fresh ``ModularPatcher(dry_run=False)`` rather than the dry-run
        instance created in ``__init__``.
        """
        self._log(__name__, f"🧩 _handle_modular_command: '{message}'")
        parts = self._command_args(message).split()
        if len(parts) < 3 or parts[0] != "extract":
            return ("❌ Befehl: !modular extract <source_file> <function_name> [target_module] [--remove]\n"
                    "Beispiel: !modular extract agent/core.py apply_fix\n"
                    " !modular extract agent/core.py apply_fix mymodule --remove")

        source_file, function_name = parts[1], parts[2]
        extras = parts[3:]
        # Default is to keep the original function in place.
        remove_original = "--remove" in extras
        # First non-flag extra argument is the target module, if any.
        target_module = next((p for p in extras if not p.startswith("--")), None)
        self._log(__name__, f" remove_original={remove_original}, target_module={target_module}")

        patcher = ModularPatcher(dry_run=False)
        try:
            result = patcher.extract_function(
                source_file=source_file,
                function_name=function_name,
                target_module=target_module,
                remove_original=remove_original,
            )
        except Exception as e:  # boundary: report any patcher failure to the chat
            self._log(__name__, f"❌ Fehler in modular: {e}")
            return f"❌ Fehler bei Extraktion: {str(e)}"

        self._log(__name__, f"🔥 RESULT TYPE: {type(result)}")
        self._log(__name__, f"🔥 RESULT: {result}")
        return str(result)

    def _handle_analyze_command(self, message):
        """Handle ``!analyse`` / ``!analyze <file> [function]``.

        Relative paths are resolved below the project directory; absolute
        paths are used as given. With a function name the analyzer's
        refactoring suggestion is returned, otherwise the file analysis.
        """
        parts = self._command_args(message).split()
        if not parts:
            return "❌ Befehl: !analyze <datei> [funktion]\nBeispiel: !analyze agent/core.py"

        filepath = parts[0]
        function_name = parts[1] if len(parts) > 1 else None
        analyzer = CodeAnalyzer(dry_run=False)

        candidate = Path(filepath)
        full_path = candidate if candidate.is_absolute() else self._resolve_project_path(filepath)

        if function_name:
            return analyzer.suggest_refactoring(str(full_path), function_name)
        return analyzer.analyze_file(str(full_path))

    # ---------------------------------------------------------------- patching

    def _execute_last_suggestion(self):
        """Apply the remembered suggestion and learn from it on success.

        Success is detected by a "✅" in the patcher's result text. The pending
        suggestion is cleared in every case.
        """
        # Keep a local reference: apply_fix() clears self.last_suggestion, so
        # reading the attribute afterwards would only yield None.
        suggestion = self.last_suggestion
        self._log(__name__, f"_execute_last_suggestion: {suggestion}")
        if not suggestion:
            self._log(__name__, "❌ Kein Vorschlag")
            return "❌ Kein Vorschlag zum Ausführen."

        result = self.apply_fix(
            scope="file",
            fix_type="replace_line",
            filepath=suggestion["file"],
            line=suggestion["line"],
            content=suggestion["content"],
        )
        self._log(__name__, f"apply_fix Ergebnis: {str(result)[:100]}...")

        succeeded = "✅" in str(result)
        if succeeded:
            self._log(__name__, "✅ Erfolg, lerne dazu")
            self.learn_from_feedback(
                str(suggestion.get("problem") or suggestion),
                str(suggestion.get("solution") or "Automatischer Fix"),
                success=True,
            )

        self.last_suggestion = None
        self._log(__name__, "last_suggestion zurückgesetzt")
        if succeeded:
            return f"✅ Fix angewendet:\n{result}"
        return f"❌ Fix fehlgeschlagen:\n{result}"

    def handle_patch(self, project, file, line, content):
        """API entry point: replace ``line`` of ``file`` in ``project`` without asking back."""
        return self.apply_fix(
            scope="file",
            fix_type="replace_line",
            filepath=str(PROJECTS_PATH / project / file),
            line=line,
            content=content,
        )

    def apply_fix(self, scope, fix_type, **kwargs):
        """Run patcher method ``fix_type`` on the patcher selected by ``scope``.

        Args:
            scope: "file", "module" or "project".
            fix_type: Name of a public method on the selected patcher.
            **kwargs: Arguments for that method. ``read_file``, ``replace_line``,
                ``find_and_replace`` and ``insert_after`` are called
                positionally with the keys they need; any other method
                receives ``**kwargs`` unchanged.

        Returns:
            The patcher's result, or an error text starting with "❌". Errors
            are reported as text, never raised.
        """
        self._log(__name__, f"apply_fix: scope={scope}, fix_type={fix_type}, kwargs={kwargs}")

        patcher_attr = {
            "file": "file_patcher",
            "module": "modular_patcher",
            "project": "project_patcher",
        }.get(scope)
        if patcher_attr is None:
            self._log(__name__, "Unbekannter Scope")
            return "❌ Unbekannter Scope"
        patcher = getattr(self, patcher_attr)
        self._log(__name__, f"Verwende {patcher_attr}")

        # fix_type can come straight from chat input: only allow public,
        # callable attributes so "__class__", "_private" etc. are rejected.
        fix_name = str(fix_type)
        method = None if fix_name.startswith("_") else getattr(patcher, fix_name, None)
        if not callable(method):
            self._log(__name__, f"AttributeError: {fix_type} nicht gefunden")
            return f"❌ Fix-Typ '{fix_type}' nicht gefunden"
        self._log(__name__, f"Methode gefunden: {getattr(method, '__name__', fix_name)}")

        filepath = kwargs.get("filepath")
        try:
            if fix_name == "read_file":
                result = method(filepath)
            elif fix_name == "replace_line":
                self._log(__name__, "replace_line Spezialbehandlung")
                try:
                    line_number = int(kwargs.get("line"))
                except (TypeError, ValueError) as ve:
                    # Missing (None) and non-numeric line values get the same message.
                    self._log(__name__, f"ValueError: {ve} bei Zeile: {kwargs.get('line')}")
                    return f"❌ Zeile muss eine Zahl sein, bekam: {kwargs.get('line')}"
                result = method(filepath, line_number, kwargs.get("content"))
            elif fix_name == "find_and_replace":
                self._log(__name__, "find_and_replace Spezialbehandlung")
                # str() so that a real bool passed via the API also works.
                use_regex = str(kwargs.get("regex", "false")).lower() == "true"
                result = method(filepath, kwargs.get("find"), kwargs.get("replace"), use_regex)
            elif fix_name == "insert_after":
                self._log(__name__, "insert_after Spezialbehandlung")
                result = method(filepath, kwargs.get("search_text"), kwargs.get("insert_text"))
            else:
                self._log(__name__, "Standard-Behandlung")
                result = method(**kwargs)
        except Exception as e:  # boundary: patcher errors become chat text
            self._log(__name__, f"Exception: {str(e)}")
            return f"❌ Fehler bei Ausführung: {str(e)}"

        self._log(__name__, f"Ergebnis: {str(result)[:100]}...")
        # The patcher ran without raising: any pending suggestion is obsolete.
        self.last_suggestion = None
        self._log(__name__, "last_suggestion zurückgesetzt")
        return result

    def learn_from_feedback(self, problem, solution, success):
        """Store ``problem`` -> ``solution`` in memory when ``success`` is true.

        On failure nothing is written to memory; only a log line is emitted.
        """
        if success:
            self._log(__name__, f"Lerne: {problem[:50]}... → {solution[:50]}...")
            self.memory.add_memory(problem, solution, confidence=0.8)
            return "✅ Gelernt! Wird nächstes Mal autonom angewendet."

        self._log(__name__, "Feedback gespeichert (kein Erfolg)")
        return "📝 Feedback gespeichert."