Spaces:
Sleeping
Sleeping
| from utils_llm import get_llm | |
| from planner import create_plan | |
| from tools import search_code, open_file, list_files | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| import json | |
| import re | |
# System prompt for the tool-selection call made in decide_tool_for_step.
# It is passed to SystemMessage as-is (it is never run through str.format),
# so the literal JSON braces in the "Format" section are safe as written.
| DECIDE_TOOL_PROMPT = """ | |
| You are an execution agent. Given a user query, a current step in the plan, and previous context, decide which tool to use next. | |
| ## ANTI-HALLUCINATION RULES | |
| - If you don't know where a file is, use `list_files` or `search_code` to find it. | |
| - NEVER assume the path or content of a file you have not seen in the 'Previous Context'. | |
| - If the current results are insufficient to complete the step, continue using tools. | |
| - NEVER guess function names, file structures, or logic. | |
| ## Available Tools: | |
| 1. `search_code(input_query)`: Use for semantic/keyword search across the whole codebase. | |
| 2. `open_file(input_path)`: Use to read the FULL content of a SPECIFIC file you have seen in the context. | |
| 3. `list_files(input_directory)`: Use to see files in a directory (defaults to "." for root). | |
| 4. `DONE(none)`: Use if you have already gathered enough information to fully answer the user's ORIGINAL query. | |
| ## Format | |
| Return ONLY a JSON object: | |
| { | |
| "tool": "tool_name", | |
| "input": "input_value", | |
| "reason": "why you chose this based ONLY on evidence" | |
| } | |
| """ | |
# Default answer template used by generate_final_answer when the query does
# not look bug-related. It is filled with str.format(query=..., context=...),
# so {query} and {context} are placeholders; any literal brace added to this
# text would have to be doubled ({{ / }}) to survive formatting.
| FINAL_ANSWER_PROMPT = """ | |
| You are CodeLens AI. Based on the provided investigation context, answer the user's query. | |
| ## STRICT GROUNDING RULES | |
| 1. **NO HALLUCINATION**: Base your entire answer ONLY on the provided investigation context. | |
| 2. **ABSENCE OF EVIDENCE**: If the context does not contain enough information, state clearly: "I could not find [X] in the codebase." | |
| 3. **NO GUESSING**: Do not make "educated guesses" about how the code might work. Only state what you have actually seen. | |
| 4. **CITE SOURCES**: Always reference exact file names and line contexts. | |
| 5. **VERIFY BEFORE CLAIMING**: If you claim a file does [X], ensure you saw that logic in the observation. | |
| ## User Query | |
| {query} | |
| ## Investigation Context | |
| {context} | |
| """ | |
# Answer template that generate_final_answer selects instead of
# FINAL_ANSWER_PROMPT when the query contains one of its bug-related keywords.
# Filled with str.format(query=..., context=...); {query} and {context} are
# the only placeholders, and literal braces would need doubling.
| BUG_DETECTION_PROMPT = """ | |
| You are a Senior Security Engineer. Analyze the provided investigation context for BUGS, SECURITY VULNERABILITIES, or PERFORMANCE BOTTLE NECKS. | |
| ## STRICT GROUNDING RULES | |
| 1. **REAL BUGS ONLY**: Only report bugs you have actually observed in the context. | |
| 2. **NO FALSE POSITIVES**: Do not assume a bug exists just because a function name sounds relevant. | |
| 3. **EVIDENCE-BASED**: Cite file paths and specific code snippets from the context for every single bug reported. | |
| 4. **FALLBACK**: If no bugs are found in the observed code, state: "No bugs were identified in the explored code." | |
| ## User Query | |
| {query} | |
| ## Investigation Context | |
| {context} | |
| Format your answer as a structured Bug Report. | |
| """ | |
def _parse_tool_decision(raw, step):
    """Turn the model's raw reply into a tool-decision dict, or a search fallback."""
    fallback = {"tool": "search_code", "input": step, "reason": "Fallback due to parse error"}

    # NOTE(review): chat-model content is normally a string, but some
    # providers return a list of content blocks - confirm against get_llm.
    if not isinstance(raw, str):
        print(f"Error parsing tool decision: unexpected content type {type(raw).__name__}")
        return fallback

    content = raw.strip()
    if "```" in content:
        # Prefer the JSON object inside a fenced block; otherwise just drop
        # the fence markers and try to parse what is left.
        match = re.search(r"```(?:json)?\s*({.*})\s*```", content, re.DOTALL)
        if match:
            content = match.group(1)
        else:
            content = content.replace("```json", "").replace("```", "").strip()

    # First try the text as-is, then the outermost {...} span so that a reply
    # such as 'Sure! {"tool": ...}' is still usable.
    candidates = [content]
    start, end = content.find("{"), content.rfind("}")
    if 0 <= start < end:
        inner = content[start:end + 1]
        if inner != content:
            candidates.append(inner)

    error = "empty response"
    for candidate in candidates:
        try:
            data = json.loads(candidate)
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            error = e
            continue
        if isinstance(data, dict):
            return data
        # Valid JSON but not an object: callers use .get(), so reject it.
        error = f"expected a JSON object, got {type(data).__name__}"

    print(f"Error parsing tool decision: {error}")
    return fallback


def decide_tool_for_step(query, step, context, openai_api_key):
    """Ask the LLM which tool to run for the current plan step.

    Args:
        query: The user's original question.
        step: The plan step currently being executed.
        context: Accumulated observations so far; only the last 4000
            characters are sent to the model.
        openai_api_key: Key forwarded to ``get_llm``.

    Returns:
        A dict parsed from the model's JSON reply (the prompt asks for the
        keys "tool", "input" and "reason"). If the reply cannot be parsed
        into a JSON object, a fallback dict selecting ``search_code`` with
        ``step`` as its input is returned instead.
    """
    llm = get_llm(openai_api_key)
    messages = [
        SystemMessage(content=DECIDE_TOOL_PROMPT),
        HumanMessage(content=f"Original User Query: {query}\n\nPrevious Context: {context[-4000:]}\nCurrent Step: {step}"),
    ]
    response = llm.invoke(messages)
    return _parse_tool_decision(response.content, step)
def generate_final_answer(query, context, openai_api_key):
    """Produce the final response for ``query`` from the gathered ``context``.

    Queries that mention a bug-related keyword (case-insensitive substring
    match) are answered with BUG_DETECTION_PROMPT; everything else uses
    FINAL_ANSWER_PROMPT. Returns the ``content`` of the model's reply.
    """
    llm = get_llm(openai_api_key)

    # Pick the template: bug-report mode vs. plain grounded answer.
    bug_keywords = ("bug", "error", "issue", "vulnerability", "security", "fail", "broken")
    lowered_query = query.lower()
    if any(keyword in lowered_query for keyword in bug_keywords):
        template = BUG_DETECTION_PROMPT
    else:
        template = FINAL_ANSWER_PROMPT

    filled_prompt = template.format(query=query, context=context)
    reply = llm.invoke([HumanMessage(content=filled_prompt)])
    return reply.content
# Upper bound on executed plan steps; the agent may stop sooner via DONE.
_MAX_AGENT_STEPS = 8

# Header that search results carry for each hit: "--- Result X [path] ---".
_SEARCH_RESULT_PATH = re.compile(r"--- Result \d+ \[(.*?)\] ---")


def _run_tool(tool_key, tool_name, tool_input, vectorstore, repo_path, accessed_sources):
    """Execute one tool call, record any files it touched, and return the observation."""
    if tool_key == "search_code":
        result = search_code(tool_input, vectorstore, repo_path)
        accessed_sources.update(_SEARCH_RESULT_PATH.findall(result))
        return result
    if tool_key == "open_file":
        result = open_file(tool_input, repo_path)
        # Only count the file as a source when it was actually read.
        if not result.startswith("Error:"):
            accessed_sources.add(tool_input)
        return result
    if tool_key == "list_files":
        return list_files(repo_path, tool_input)
    return f"Tool {tool_name} not found."


def run_agent(query, vectorstore, repo_path, openai_api_key):
    """Plan, investigate with tools, and answer ``query`` about a repository.

    Args:
        query: The user's question.
        vectorstore: Index handed to ``search_code``.
        repo_path: Repository root handed to the file tools.
        openai_api_key: Key forwarded to the planner and LLM helpers.

    Returns:
        A tuple ``(final_answer, plan, actions_log, sources)`` where
        ``actions_log`` is a list of per-step dicts (step, tool, input,
        truncated observation, reason) and ``sources`` is a sorted list of
        the file paths seen in search results or opened successfully.
    """
    # Step 1: Create Plan
    plan = create_plan(query, openai_api_key)
    actions_log = []
    context_for_llm = ""
    accessed_sources = set()

    # Step 2: Execute Loop - at most _MAX_AGENT_STEPS plan steps, one tool
    # call per step; the LLM can stop early using the 'DONE' tool.
    for i, step in zip(range(_MAX_AGENT_STEPS), plan):
        action = decide_tool_for_step(query, step, context_for_llm, openai_api_key)
        if not isinstance(action, dict):
            action = {}

        # The model may return null / non-string values; treat those like a
        # missing key instead of crashing on str methods.
        raw_tool = action.get("tool")
        if isinstance(raw_tool, str) and raw_tool.strip():
            tool_name = raw_tool.strip()
        else:
            tool_name = "search_code"
        # Match tools case-insensitively and tolerate the call form the
        # prompt itself shows, e.g. "DONE(none)".
        tool_key = tool_name.split("(", 1)[0].strip().lower()

        tool_input = action.get("input")
        if tool_input is None:
            tool_input = step

        if tool_key == "done":
            actions_log.append({
                "step": step,
                "tool": "DONE",
                "input": "None",
                "observation": "Agent realized it has enough context to answer the user query fully. Stopping investigation.",
                "reason": action.get("reason", "Finished early"),
            })
            break

        result = _run_tool(tool_key, tool_name, tool_input, vectorstore, repo_path, accessed_sources)

        actions_log.append({
            "step": step,
            "tool": tool_name,
            "input": tool_input,
            "observation": result[:500] + "..." if len(result) > 500 else result,
            "reason": action.get("reason", ""),
        })

        # Don't overload the context with too much redundant search output
        clean_result = result[:1500] if tool_key == "search_code" else result[:3000]
        context_for_llm += f"\n--- Step {i+1}: {step} ---\nAction: {tool_name}({tool_input})\nObservation: {clean_result}\n"

    # Step 3: Final Answer
    final_answer = generate_final_answer(query, context_for_llm, openai_api_key)
    # Sort so the reported sources are stable across runs (set order is not).
    return final_answer, plan, actions_log, sorted(accessed_sources, key=str)