from utils_llm import get_llm from planner import create_plan from tools import search_code, open_file, list_files from langchain_core.messages import SystemMessage, HumanMessage import json import re DECIDE_TOOL_PROMPT = """ You are an execution agent. Given a user query, a current step in the plan, and previous context, decide which tool to use next. ## ANTI-HALLUCINATION RULES - If you don't know where a file is, use `list_files` or `search_code` to find it. - NEVER assume the path or content of a file you have not seen in the 'Previous Context'. - If the current results are insufficient to complete the step, continue using tools. - NEVER guess function names, file structures, or logic. ## Available Tools: 1. `search_code(input_query)`: Use for semantic/keyword search across the whole codebase. 2. `open_file(input_path)`: Use to read the FULL content of a SPECIFIC file you have seen in the context. 3. `list_files(input_directory)`: Use to see files in a directory (defaults to "." for root). 4. `DONE(none)`: Use if you have already gathered enough information to fully answer the user's ORIGINAL query. ## Format Return ONLY a JSON object: { "tool": "tool_name", "input": "input_value", "reason": "why you chose this based ONLY on evidence" } """ FINAL_ANSWER_PROMPT = """ You are CodeLens AI. Based on the provided investigation context, answer the user's query. ## STRICT GROUNDING RULES 1. **NO HALLUCINATION**: Base your entire answer ONLY on the provided investigation context. 2. **ABSENCE OF EVIDENCE**: If the context does not contain enough information, state clearly: "I could not find [X] in the codebase." 3. **NO GUESSING**: Do not make "educated guesses" about how the code might work. Only state what you have actually seen. 4. **CITE SOURCES**: Always reference exact file names and line contexts. 5. **VERIFY BEFORE CLAIMING**: If you claim a file does [X], ensure you saw that logic in the observation. ## User Query {query} ## Investigation Context {context} """ BUG_DETECTION_PROMPT = """ You are a Senior Security Engineer. Analyze the provided investigation context for BUGS, SECURITY VULNERABILITIES, or PERFORMANCE BOTTLE NECKS. ## STRICT GROUNDING RULES 1. **REAL BUGS ONLY**: Only report bugs you have actually observed in the context. 2. **NO FALSE POSITIVES**: Do not assume a bug exists just because a function name sounds relevant. 3. **EVIDENCE-BASED**: Cite file paths and specific code snippets from the context for every single bug reported. 4. **FALLBACK**: If no bugs are found in the observed code, state: "No bugs were identified in the explored code." ## User Query {query} ## Investigation Context {context} Format your answer as a structured Bug Report. """ def decide_tool_for_step(query, step, context, openai_api_key): llm = get_llm(openai_api_key) messages = [ SystemMessage(content=DECIDE_TOOL_PROMPT), HumanMessage(content=f"Original User Query: {query}\n\nPrevious Context: {context[-4000:]}\nCurrent Step: {step}") ] response = llm.invoke(messages) content = response.content.strip() if "```" in content: match = re.search(r"```(?:json)?\s*({.*})\s*```", content, re.DOTALL) if match: content = match.group(1) else: content = content.replace("```json", "").replace("```", "").strip() try: data = json.loads(content) return data except Exception as e: print(f"Error parsing tool decision: {e}") # Default fallback return {"tool": "search_code", "input": step, "reason": "Fallback due to parse error"} def generate_final_answer(query, context, openai_api_key): llm = get_llm(openai_api_key) # Bug Detection Mode Detection is_bug_query = any(word in query.lower() for word in ["bug", "error", "issue", "vulnerability", "security", "fail", "broken"]) prompt_template = BUG_DETECTION_PROMPT if is_bug_query else FINAL_ANSWER_PROMPT messages = [ HumanMessage(content=prompt_template.format(query=query, context=context)) ] response = llm.invoke(messages) return response.content def run_agent(query, vectorstore, repo_path, openai_api_key): # Step 1: Create Plan plan = create_plan(query, openai_api_key) actions_log = [] context_for_llm = "" accessed_sources = set() # Step 2: Execute Loop # Allow up to 8 steps, but the LLM can stop early using the 'DONE' tool. max_steps = min(len(plan), 8) for i in range(max_steps): step = plan[i] # Decide Action action = decide_tool_for_step(query, step, context_for_llm, openai_api_key) # Execute Action tool_name = action.get("tool", "search_code") tool_input = action.get("input", step) if tool_name.upper() == "DONE": actions_log.append({ "step": step, "tool": "DONE", "input": "None", "observation": "Agent realized it has enough context to answer the user query fully. Stopping investigation.", "reason": action.get("reason", "Finished early") }) break result = "" if tool_name == "search_code": result = search_code(tool_input, vectorstore, repo_path) # Find file paths in search results: --- Result X [path] --- import re found = re.findall(r"--- Result \d+ \[(.*?)\] ---", result) for f in found: accessed_sources.add(f) elif tool_name == "open_file": result = open_file(tool_input, repo_path) if not result.startswith("Error:"): accessed_sources.add(tool_input) elif tool_name == "list_files": result = list_files(repo_path, tool_input) else: result = f"Tool {tool_name} not found." actions_log.append({ "step": step, "tool": tool_name, "input": tool_input, "observation": result[:500] + "..." if len(result) > 500 else result, "reason": action.get("reason", "") }) # Don't overload the context with too much redundant search output clean_result = result[:1500] if tool_name == "search_code" else result[:3000] context_for_llm += f"\n--- Step {i+1}: {step} ---\nAction: {tool_name}({tool_input})\nObservation: {clean_result}\n" # Step 3: Final Answer final_answer = generate_final_answer(query, context_for_llm, openai_api_key) return final_answer, plan, actions_log, list(accessed_sources)