Spaces:
Sleeping
Sleeping
| """ | |
| Research Agent — Autonomous Migration Context Gatherer | |
| ====================================================== | |
| A separate LLM-driven agent that runs BEFORE the fix agent. It gets up to | |
| 12 steps to autonomously investigate what broke and why, using real tools | |
| that fetch live documentation from PyPI, GitHub, Python docs, and the web. | |
| The agent is NOT limited to a hardcoded database. It can: | |
| - Parse test errors to identify the exact exception and involved packages | |
| - Look up package info on PyPI (versions, homepage, changelog URL) | |
| - Fetch actual documentation pages, changelogs, and migration guides | |
| - Read Python "What's New" pages for specific versions | |
| - Search GitHub release notes for breaking change details | |
| - Compare old vs new dependency versions from the task metadata | |
| - Synthesize all findings into a focused context for the fix agent | |
| The key insight: the model drives the research strategy. We give it good | |
| tools and let it figure out what to look up based on the actual error. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import re | |
| import time | |
| from typing import Any, Dict, List, Optional | |
| from urllib.request import urlopen, Request | |
| from urllib.error import URLError | |
| log = logging.getLogger("research_agent") | |
| # --------------------------------------------------------------------------- | |
| # Model family detection | |
| # --------------------------------------------------------------------------- | |
| def _detect_model_family(model_name: str) -> str: | |
| name_lower = model_name.lower() | |
| if "gemma" in name_lower: | |
| return "gemma" | |
| if "qwen3" in name_lower: | |
| return "qwen3" | |
| if "qwen" in name_lower: | |
| return "qwen2" | |
| return "unknown" | |
| def _strip_model_artifacts(raw_text: str, family: str) -> str: | |
| clean = raw_text | |
| if family == "gemma": | |
| clean = re.sub(r"<\|channel>thought\n.*?<channel\|>", "", clean, flags=re.DOTALL) | |
| for tok in ["<turn|>", "<|turn>", "<eos>", "</s>"]: | |
| clean = clean.replace(tok, "") | |
| elif family == "qwen3": | |
| clean = re.sub(r"<think>.*?</think>", "", clean, flags=re.DOTALL) | |
| # Truncate at first <|im_end|> — everything after is hallucinated | |
| im_end = clean.find("<|im_end|>") | |
| if im_end != -1: | |
| clean = clean[:im_end] | |
| for tok in ["<|im_end|>", "<|endoftext|>", "<|im_start|>"]: | |
| clean = clean.replace(tok, "") | |
| elif family == "qwen2": | |
| im_end = clean.find("<|im_end|>") | |
| if im_end != -1: | |
| clean = clean[:im_end] | |
| for tok in ["<|im_end|>", "<|endoftext|>", "<|im_start|>"]: | |
| clean = clean.replace(tok, "") | |
| else: | |
| for tok in ["<eos>", "</s>", "<|im_end|>", "<|endoftext|>", "<turn|>", "<|turn>"]: | |
| clean = clean.replace(tok, "") | |
| return clean.strip() | |
| # ─────────────────────────────────────────────────────────────── | |
| # Research Tools — real, live, no hardcoded DB | |
| # ─────────────────────────────────────────────────────────────── | |
| def tool_parse_error(test_output: str) -> str: | |
| """Parse test output to extract every error, traceback file, and package.""" | |
| errors = [] | |
| packages = set() | |
| for m in re.finditer( | |
| r"(ImportError|ModuleNotFoundError|AttributeError|TypeError|NameError|" | |
| r"SyntaxError|ValueError|DeprecationWarning|RuntimeError|KeyError): (.+)", | |
| test_output, | |
| ): | |
| errors.append(f"{m.group(1)}: {m.group(2).strip()}") | |
| for pkg in re.finditer(r"'(\w+(?:\.\w+)*)'", m.group(2)): | |
| top = pkg.group(1).split(".")[0] | |
| if top not in ("builtins", "importlib", "usr", "work"): | |
| packages.add(top) | |
| files = re.findall(r'File "(/work/\S+\.py)", line (\d+)', test_output) | |
| if not files: | |
| files = re.findall(r"(/work/\S+\.py):(\d+)", test_output) | |
| failed = re.search(r"(\d+) (?:failed|error)", test_output) | |
| passed = re.search(r"(\d+) passed", test_output) | |
| out = "=== ERROR ANALYSIS ===\n" | |
| if errors: | |
| out += f"Errors found ({len(errors)}):\n" | |
| for e in errors[:15]: | |
| out += f" - {e}\n" | |
| else: | |
| out += "Could not parse specific errors. Check the raw test output.\n" | |
| if files: | |
| unique = list(dict.fromkeys(f"{f}:{ln}" for f, ln in files)) | |
| out += f"\nSource files referenced ({len(unique)}):\n" | |
| for f in unique[:15]: | |
| out += f" - {f}\n" | |
| if packages: | |
| out += f"\nPackages involved: {', '.join(sorted(packages))}\n" | |
| if failed: | |
| out += f"\nTest summary: {failed.group(0)}" | |
| if passed: | |
| out += f", {passed.group(0)}" | |
| out += "\n" | |
| return out | |
| def tool_search_pypi(package_name: str) -> str: | |
| """Fetch package metadata from PyPI: version, summary, homepage, changelog URL.""" | |
| try: | |
| url = f"https://pypi.org/pypi/{package_name}/json" | |
| req = Request(url, headers={"User-Agent": "code-migration-research/1.0"}) | |
| with urlopen(req, timeout=10) as resp: | |
| data = json.loads(resp.read()) | |
| info = data.get("info", {}) | |
| version = info.get("version", "unknown") | |
| summary = info.get("summary", "") | |
| home_page = info.get("home_page", "") or "" | |
| project_urls = info.get("project_urls") or {} | |
| # Try to find changelog/release notes URL | |
| changelog_url = "" | |
| for key in ["Changelog", "Changes", "Release Notes", "History", | |
| "Release notes", "CHANGELOG", "What's New"]: | |
| if key in project_urls: | |
| changelog_url = project_urls[key] | |
| break | |
| # Also grab the GitHub URL if available | |
| github_url = "" | |
| for key in ["Source", "Source Code", "Repository", "Homepage", "GitHub"]: | |
| if key in project_urls: | |
| val = project_urls[key] | |
| if "github.com" in val: | |
| github_url = val | |
| break | |
| if not github_url and home_page and "github.com" in home_page: | |
| github_url = home_page | |
| result = f"Package: {package_name}\n" | |
| result += f"Latest version: {version}\n" | |
| result += f"Summary: {summary}\n" | |
| if home_page: | |
| result += f"Homepage: {home_page}\n" | |
| if changelog_url: | |
| result += f"Changelog URL: {changelog_url}\n" | |
| if github_url: | |
| result += f"GitHub: {github_url}\n" | |
| # List all project URLs for the agent to explore | |
| if project_urls: | |
| result += "All project URLs:\n" | |
| for k, v in project_urls.items(): | |
| result += f" {k}: {v}\n" | |
| return result | |
| except Exception as e: | |
| return f"PyPI lookup failed for '{package_name}': {e}" | |
| def tool_fetch_url(url: str) -> str: | |
| """Fetch and return clean text content from any URL. All HTML is stripped. | |
| Returns up to 5000 chars of readable text content. | |
| """ | |
| try: | |
| req = Request(url, headers={ | |
| "User-Agent": "code-migration-research/1.0", | |
| "Accept": "text/html,application/xhtml+xml,text/plain,*/*", | |
| }) | |
| with urlopen(req, timeout=15) as resp: | |
| raw = resp.read().decode("utf-8", errors="replace") | |
| # Remove script and style blocks entirely | |
| text = re.sub(r"<script[^>]*>.*?</script>", "", raw, flags=re.DOTALL | re.IGNORECASE) | |
| text = re.sub(r"<style[^>]*>.*?</style>", "", text, flags=re.DOTALL | re.IGNORECASE) | |
| # Remove HTML comments | |
| text = re.sub(r"<!--.*?-->", "", text, flags=re.DOTALL) | |
| # Replace block-level tags with newlines for readability | |
| text = re.sub(r"<(?:br|p|div|li|tr|h\d|dt|dd|section|article)[^>]*>", "\n", text, flags=re.IGNORECASE) | |
| # Strip all remaining HTML tags | |
| text = re.sub(r"<[^>]+>", " ", text) | |
| # Decode common HTML entities | |
| text = text.replace("&", "&").replace("<", "<").replace(">", ">") | |
| text = text.replace(""", '"').replace("'", "'").replace(" ", " ") | |
| text = re.sub(r"&#\d+;", " ", text) | |
| text = re.sub(r"&\w+;", " ", text) | |
| # Collapse inline whitespace but keep newlines | |
| text = re.sub(r"[^\S\n]+", " ", text) | |
| # Collapse multiple blank lines into one | |
| text = re.sub(r"\n\s*\n+", "\n\n", text) | |
| text = text.strip() | |
| if not text: | |
| return f"Page at {url} returned empty content after HTML stripping." | |
| return text[:5000] | |
| except Exception as e: | |
| return f"Failed to fetch {url}: {e}" | |
| def tool_fetch_python_docs(version: str) -> str: | |
| """Fetch the official Python 'What's New' page for a specific version. | |
| Example: version="3.12" fetches https://docs.python.org/3.12/whatsnew/3.12.html | |
| Returns the text content focused on breaking changes and removals. | |
| """ | |
| minor = version.split(".") | |
| if len(minor) >= 2: | |
| ver = f"{minor[0]}.{minor[1]}" | |
| else: | |
| ver = version | |
| url = f"https://docs.python.org/{ver}/whatsnew/{ver}.html" | |
| try: | |
| req = Request(url, headers={"User-Agent": "code-migration-research/1.0"}) | |
| with urlopen(req, timeout=15) as resp: | |
| raw = resp.read().decode("utf-8", errors="replace") | |
| # Strip HTML | |
| text = re.sub(r"<script[^>]*>.*?</script>", " ", raw, flags=re.DOTALL) | |
| text = re.sub(r"<style[^>]*>.*?</style>", " ", text, flags=re.DOTALL) | |
| text = re.sub(r"<[^>]+>", " ", text) | |
| text = re.sub(r" ", " ", text) | |
| text = re.sub(r"&[a-z]+;", " ", text) | |
| text = re.sub(r"\s+", " ", text).strip() | |
| # Try to extract just the "Removed" and "Deprecated" sections | |
| # by looking for those keywords and grabbing surrounding context | |
| sections = [] | |
| for keyword in ["Removed", "removed", "Deprecated", "deprecated", | |
| "Breaking", "breaking", "Porting"]: | |
| for m in re.finditer(keyword, text): | |
| start = max(0, m.start() - 100) | |
| end = min(len(text), m.end() + 800) | |
| chunk = text[start:end].strip() | |
| if chunk and chunk not in sections: | |
| sections.append(chunk) | |
| if sections: | |
| result = f"Python {ver} What's New — breaking changes:\n\n" | |
| result += "\n---\n".join(sections[:8]) | |
| return result[:5000] | |
| # Fallback: return first 5000 chars | |
| return f"Python {ver} What's New (full page excerpt):\n{text[:5000]}" | |
| except Exception as e: | |
| return f"Failed to fetch Python {ver} What's New: {e}" | |
| def tool_search_github_releases(owner: str, repo: str) -> str: | |
| """Fetch the latest 5 release notes from a GitHub repository. | |
| Use this to find breaking changes, migration notes, and changelog entries. | |
| Pass owner and repo separately, e.g. owner="numpy", repo="numpy". | |
| """ | |
| try: | |
| url = f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=5" | |
| req = Request(url, headers={ | |
| "User-Agent": "code-migration-research/1.0", | |
| "Accept": "application/vnd.github.v3+json", | |
| }) | |
| with urlopen(req, timeout=10) as resp: | |
| releases = json.loads(resp.read()) | |
| if not releases: | |
| return f"No releases found for {owner}/{repo}." | |
| result = [] | |
| for rel in releases[:5]: | |
| tag = rel.get("tag_name", "unknown") | |
| name = rel.get("name", tag) | |
| body = rel.get("body", "")[:1200] | |
| result.append(f"=== {name} ({tag}) ===\n{body}") | |
| return "\n\n".join(result)[:5000] | |
| except Exception as e: | |
| return f"GitHub releases lookup failed for {owner}/{repo}: {e}" | |
| def tool_compare_versions(dependency_versions: str, package_name: str) -> str: | |
| """Extract the installed version of a specific package from the dependency list. | |
| This helps you know exactly which version is installed so you can look up | |
| the right migration guide (e.g., numpy 1.x → 2.x has different breaks than 1.24 → 1.26). | |
| """ | |
| if not dependency_versions: | |
| return f"No dependency version info available for {package_name}." | |
| for line in dependency_versions.strip().splitlines(): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| # Parse "package==version" or "package>=version" | |
| parts = re.split(r"[=><~!]+", line, maxsplit=1) | |
| if len(parts) >= 2: | |
| pkg = parts[0].strip().lower().replace("-", "_") | |
| ver = parts[1].strip() | |
| if pkg == package_name.lower().replace("-", "_"): | |
| return f"{package_name} installed version: {ver}" | |
| return f"Package '{package_name}' not found in dependency list." | |
| def tool_find_changelog(github_url: str) -> str: | |
| """Given a GitHub repo URL, try to find and fetch the CHANGELOG/CHANGES/HISTORY file. | |
| Automatically tries common changelog filenames on both main and master branches. | |
| This is the most reliable way to find breaking changes for a package. | |
| Example: github_url="https://github.com/numpy/numpy" | |
| """ | |
| # Extract owner/repo from URL | |
| m = re.search(r"github\.com/([^/]+)/([^/]+)", github_url) | |
| if not m: | |
| return f"Could not parse GitHub owner/repo from: {github_url}" | |
| owner = m.group(1) | |
| repo = m.group(2).rstrip("/").replace(".git", "") | |
| # Try common changelog filenames on common branches | |
| filenames = [ | |
| "CHANGELOG.md", "CHANGELOG.rst", "CHANGELOG", | |
| "CHANGES.md", "CHANGES.rst", "CHANGES", | |
| "HISTORY.md", "HISTORY.rst", | |
| "NEWS.md", "NEWS.rst", | |
| "RELEASE_NOTES.md", | |
| ] | |
| branches = ["main", "master"] | |
| for branch in branches: | |
| for fname in filenames: | |
| raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{fname}" | |
| try: | |
| req = Request(raw_url, headers={"User-Agent": "code-migration-research/1.0"}) | |
| with urlopen(req, timeout=5) as resp: | |
| content = resp.read().decode("utf-8", errors="replace") | |
| if len(content) < 20: | |
| continue | |
| # Found it — return a useful chunk | |
| # Try to find breaking/removed/deprecated sections | |
| sections = [] | |
| for keyword in ["breaking", "removed", "deprecated", "incompatible", | |
| "migration", "upgrade", "BREAKING", "Removed", "Deprecated"]: | |
| for km in re.finditer(keyword, content, re.IGNORECASE): | |
| start = max(0, km.start() - 200) | |
| end = min(len(content), km.end() + 1500) | |
| chunk = content[start:end].strip() | |
| if chunk not in sections: | |
| sections.append(chunk) | |
| if sections: | |
| result = f"Found {fname} in {owner}/{repo} ({branch}):\n\n" | |
| result += "\n---\n".join(sections[:6]) | |
| return result[:5000] | |
| # No keyword matches — return the first 5000 chars | |
| return f"Found {fname} in {owner}/{repo} ({branch}):\n\n{content[:5000]}" | |
| except Exception: | |
| continue | |
| return f"No changelog file found in {owner}/{repo}. Try search_github_releases or fetch_url with a specific docs URL." | |
| # ─────────────────────────────────────────────────────────────── | |
| # Tool registry | |
| # ─────────────────────────────────────────────────────────────── | |
| RESEARCH_TOOLS = { | |
| "parse_error": tool_parse_error, | |
| "search_pypi": tool_search_pypi, | |
| "fetch_url": tool_fetch_url, | |
| "fetch_python_docs": tool_fetch_python_docs, | |
| "search_github_releases": tool_search_github_releases, | |
| "compare_versions": tool_compare_versions, | |
| "find_changelog": tool_find_changelog, | |
| } | |
| # ─────────────────────────────────────────────────────────────── | |
| # System prompt — the agent drives the research strategy | |
| # ─────────────────────────────────────────────────────────────── | |
| RESEARCH_SYSTEM_PROMPT = """You are a research agent. Your job is to investigate WHY tests are failing | |
| after a Python project's dependencies were upgraded, and find the EXACT fix patterns. | |
| You have these tools (output ONE JSON tool call per turn): | |
| 1. parse_error(test_output) | |
| - Extracts all errors, tracebacks, file paths, and package names from test output | |
| - ALWAYS call this first to understand what broke | |
| 2. compare_versions(dependency_versions, package_name) | |
| - Shows the exact installed version of a package | |
| - Use this to know if it's a major version jump (e.g., numpy 1.x → 2.x) | |
| 3. search_pypi(package_name) | |
| - Gets package info from PyPI: latest version, homepage, changelog URL, GitHub URL | |
| - Use this to find where the docs/changelog live | |
| 4. find_changelog(github_url) | |
| - Given a GitHub URL, finds and fetches the CHANGELOG/CHANGES/HISTORY file | |
| - Automatically searches common filenames and branches | |
| - Returns sections mentioning breaking changes, removals, deprecations | |
| - THIS IS YOUR BEST TOOL for finding what changed between versions | |
| 5. fetch_url(url) | |
| - Fetches any URL and returns text content (HTML stripped) | |
| - Use this to read specific docs pages, migration guides, or changelog URLs from PyPI | |
| 6. fetch_python_docs(version) | |
| - Fetches the official Python "What's New" page for a version (e.g., "3.12") | |
| - Shows removed modules, deprecated APIs, breaking changes | |
| 7. search_github_releases(owner, repo) | |
| - Gets recent release notes from GitHub | |
| - Good for finding breaking changes when there's no changelog file | |
| Output format: {"name": "tool_name", "arguments": {"arg": "value"}} | |
| STRATEGY: | |
| 1. parse_error — understand exactly what errors occurred and which packages are involved | |
| 2. compare_versions — check what version of the failing package is installed (is it a major bump?) | |
| 3. search_pypi — find the package's GitHub URL | |
| 4. find_changelog — fetch the changelog from GitHub and look for breaking changes | |
| 5. If changelog doesn't have what you need, try search_github_releases or fetch_url on the docs | |
| 6. For Python stdlib issues, use fetch_python_docs | |
| 7. IMPORTANT: Also check the dependency versions list for ANY major version bumps | |
| (e.g., numpy 1.x→2.x, pandas 1.x→2.x, Django 3.x→5.x). These often have | |
| breaking changes that won't show up in the CURRENT test errors but will appear | |
| after you fix the first error. Research ALL major-bumped packages, not just | |
| the one currently failing. | |
| 8. When you have enough info: {"name": "done", "arguments": {"summary": "YOUR FINDINGS"}} | |
| Your summary MUST include for EACH error: | |
| - The exact error message | |
| - The root cause (what was removed/renamed/moved in which version) | |
| - The EXACT fix: old code → new code replacement | |
| - Which file(s) need to be changed | |
| Also include any POTENTIAL breaking changes from major version bumps in the | |
| dependency list, even if they don't show up in the current test errors yet. | |
| For example, if pandas went from 1.x to 2.x, mention that DataFrame.append() | |
| was removed and the fix is pd.concat(). | |
| RULES: | |
| - NEVER call the same tool with the same arguments twice | |
| - Don't just look things up — READ the actual changelog. Use find_changelog. | |
| - Be specific: "replace np.math.factorial with math.factorial" not "check numpy docs" | |
| - You have limited steps — be efficient. parse_error → search_pypi → find_changelog → done. | |
| - If the dependency list shows multiple major version bumps, research ALL of them. | |
| """ | |
| # ─────────────────────────────────────────────────────────────── | |
| # Research Agent class | |
| # ─────────────────────────────────────────────────────────────── | |
| class ResearchAgent: | |
| """LLM-driven agent that autonomously researches migration breaking changes.""" | |
| def __init__(self, model, tokenizer, max_steps: int = 12, model_name: str = ""): | |
| self.model = model | |
| self.tokenizer = tokenizer | |
| self.max_steps = max_steps | |
| self.model_family = _detect_model_family(model_name) if model_name else "unknown" | |
| self.last_research_steps: List[Dict] = [] | |
| def research( | |
| self, | |
| repo_name: str, | |
| old_python: str, | |
| new_python: str, | |
| related_modules: str, | |
| test_output: str, | |
| dependency_versions: str = "", | |
| on_step: Any = None, | |
| ) -> str: | |
| """Run the research loop and return migration context for the fix agent.""" | |
| import torch | |
| log.info(" [RESEARCH] Starting for %s (%s → %s, modules=%s)", | |
| repo_name, old_python, new_python, related_modules) | |
| # Build the initial context with everything the agent needs | |
| dep_summary = "" | |
| if dependency_versions: | |
| dep_summary = f"\nInstalled dependency versions:\n{dependency_versions}\n" | |
| # Highlight which modules are involved so the agent researches all of them | |
| modules_note = "" | |
| if related_modules and "," in related_modules: | |
| mods = [m.strip() for m in related_modules.split(",")] | |
| modules_note = ( | |
| f"\nIMPORTANT: This project uses multiple upgraded packages: {', '.join(mods)}. " | |
| f"Research breaking changes for ALL of them, not just the one in the current error. " | |
| f"After fixing one error, another package's breaking change may surface.\n" | |
| ) | |
| initial_context = ( | |
| f"Repository: {repo_name}\n" | |
| f"Python version migration: {old_python} → {new_python}\n" | |
| f"Related modules: {related_modules}\n" | |
| f"{modules_note}" | |
| f"{dep_summary}\n" | |
| f"Failing test output (last 3000 chars):\n" | |
| f"{test_output[-3000:]}\n" | |
| ) | |
| messages = [ | |
| {"role": "system", "content": RESEARCH_SYSTEM_PROMPT}, | |
| {"role": "user", "content": initial_context}, | |
| ] | |
| gathered_info: List[str] = [] | |
| research_steps: List[Dict] = [] | |
| seen_calls: set = set() # track ALL previous calls, not just last | |
| for step in range(1, self.max_steps + 1): | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| # Generate | |
| try: | |
| text = self.tokenizer.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True, | |
| **({"enable_thinking": False} if self.model_family == "qwen3" else {}), | |
| ) | |
| if self.model_family == "gemma": | |
| text = text.replace("<|think|>", "") | |
| inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device) | |
| input_len = inputs["input_ids"].shape[-1] | |
| # Build gen kwargs with stop tokens for Qwen | |
| gen_kwargs = dict( | |
| max_new_tokens=400, | |
| temperature=0.3, | |
| top_p=0.95, | |
| do_sample=True, | |
| pad_token_id=self.tokenizer.pad_token_id, | |
| ) | |
| if self.model_family in ("qwen3", "qwen2"): | |
| stop_ids = [] | |
| for tok_str in ["<|im_end|>", "<|endoftext|>"]: | |
| tid = self.tokenizer.convert_tokens_to_ids(tok_str) | |
| if tid is not None and tid != self.tokenizer.unk_token_id: | |
| stop_ids.append(tid) | |
| if stop_ids: | |
| eos = self.tokenizer.eos_token_id | |
| if isinstance(eos, int): | |
| stop_ids.append(eos) | |
| gen_kwargs["eos_token_id"] = list(set(stop_ids)) | |
| with torch.no_grad(): | |
| outputs = self.model.generate(**inputs, **gen_kwargs) | |
| raw = self.tokenizer.decode(outputs[0][input_len:], skip_special_tokens=False) | |
| del inputs, outputs | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| clean = _strip_model_artifacts(raw, self.model_family) | |
| except Exception as e: | |
| log.warning(" [RESEARCH] Step %d generation failed: %s", step, e) | |
| break | |
| # Parse tool call | |
| parsed = _parse_research_call(clean) | |
| tool_name = parsed["tool_name"] | |
| tool_args = parsed["tool_args"] | |
| # Done signal | |
| if tool_name == "done": | |
| summary = tool_args.get("summary", "") | |
| if summary: | |
| gathered_info.append(f"=== RESEARCH SUMMARY ===\n{summary}") | |
| log.info(" [RESEARCH] Step %d: done (summary=%d chars)", step, len(summary)) | |
| break | |
| # Anti-repeat: block any call we've already made | |
| curr_key = f"{tool_name}:{json.dumps(tool_args, sort_keys=True)}" | |
| if curr_key in seen_calls: | |
| log.info(" [RESEARCH] Step %d: already called %s with same args → forcing done", step, tool_name) | |
| break | |
| seen_calls.add(curr_key) | |
| # Execute tool | |
| result = self._execute_tool( | |
| tool_name, tool_args, | |
| test_output=test_output, | |
| dependency_versions=dependency_versions, | |
| new_python=new_python, | |
| related_modules=related_modules, | |
| ) | |
| gathered_info.append(result) | |
| args_short = json.dumps(tool_args, default=str)[:150] | |
| log.info(" [RESEARCH] Step %d: %s(%s) → %d chars", | |
| step, tool_name, args_short, len(result)) | |
| research_steps.append({ | |
| "step": step, | |
| "tool": tool_name, | |
| "args": tool_args, | |
| "result": result, | |
| "model_output": clean, | |
| }) | |
| # Live callback for streaming steps to the frontend | |
| if on_step: | |
| on_step(research_steps[-1]) | |
| # Feed result back to the agent | |
| messages.append({"role": "assistant", "content": clean}) | |
| messages.append({"role": "user", "content": f"Tool result:\n{result}"}) | |
| self.last_research_steps = research_steps | |
| if not gathered_info: | |
| log.warning(" [RESEARCH] No info gathered, returning empty context") | |
| return "Research agent could not gather migration context. Use general debugging." | |
| return "\n\n".join(gathered_info) | |
| def _execute_tool( | |
| self, | |
| tool_name: str, | |
| tool_args: Dict[str, Any], | |
| *, | |
| test_output: str, | |
| dependency_versions: str, | |
| new_python: str, | |
| related_modules: str, | |
| ) -> str: | |
| """Dispatch a tool call with proper argument handling.""" | |
| try: | |
| if tool_name == "parse_error": | |
| return tool_parse_error(test_output) | |
| elif tool_name == "compare_versions": | |
| return tool_compare_versions( | |
| dependency_versions, | |
| tool_args.get("package_name", related_modules), | |
| ) | |
| elif tool_name == "search_pypi": | |
| return tool_search_pypi( | |
| tool_args.get("package_name", related_modules), | |
| ) | |
| elif tool_name == "fetch_url": | |
| url = tool_args.get("url", "") | |
| if not url: | |
| return "Error: fetch_url requires a 'url' argument." | |
| return tool_fetch_url(url) | |
| elif tool_name == "fetch_python_docs": | |
| return tool_fetch_python_docs( | |
| tool_args.get("version", new_python), | |
| ) | |
| elif tool_name == "search_github_releases": | |
| owner = tool_args.get("owner", "") | |
| repo = tool_args.get("repo", "") | |
| if not owner or not repo: | |
| return "Error: search_github_releases requires 'owner' and 'repo'." | |
| return tool_search_github_releases(owner, repo) | |
| elif tool_name == "find_changelog": | |
| github_url = tool_args.get("github_url", "") | |
| if not github_url: | |
| return "Error: find_changelog requires a 'github_url' argument." | |
| return tool_find_changelog(github_url) | |
| else: | |
| return f"Unknown tool: {tool_name}. Available: {', '.join(RESEARCH_TOOLS.keys())}" | |
| except Exception as e: | |
| return f"Tool '{tool_name}' error: {e}" | |
| def _parse_research_call(text: str) -> Dict[str, Any]: | |
| """Parse a JSON tool call from model output. | |
| Finds the FIRST complete JSON object to avoid hallucinated multi-turn content. | |
| """ | |
| text = text.strip() | |
| # Strip markdown fences | |
| if text.startswith("```"): | |
| lines = text.split("\n") | |
| lines = [l for l in lines if not l.strip().startswith("```")] | |
| text = "\n".join(lines).strip() | |
| # Find first { and match its closing } | |
| start = text.find("{") | |
| if start != -1: | |
| depth = 0 | |
| for i in range(start, len(text)): | |
| if text[i] == "{": | |
| depth += 1 | |
| elif text[i] == "}": | |
| depth -= 1 | |
| if depth == 0: | |
| try: | |
| data = json.loads(text[start:i + 1]) | |
| name = data.get("name", data.get("tool_name", "")) | |
| args = data.get("arguments", data.get("tool_args", data.get("parameters", {}))) | |
| if name: | |
| return {"tool_name": name, "tool_args": args} | |
| except json.JSONDecodeError: | |
| pass | |
| break | |
| # Fallback: start with parse_error | |
| return {"tool_name": "parse_error", "tool_args": {}} | |