ai-codelens / agent.py
nnsohamnn's picture
feat: upgrade to full Agentic CodeLens with dynamic Plan-and-Execute, anti-hallucination guards, and alias-aware tool resolution
1c6f444
from utils_llm import get_llm
from planner import create_plan
from tools import search_code, open_file, list_files
from langchain_core.messages import SystemMessage, HumanMessage
import json
import re
DECIDE_TOOL_PROMPT = """
You are an execution agent. Given a user query, a current step in the plan, and previous context, decide which tool to use next.
## ANTI-HALLUCINATION RULES
- If you don't know where a file is, use `list_files` or `search_code` to find it.
- NEVER assume the path or content of a file you have not seen in the 'Previous Context'.
- If the current results are insufficient to complete the step, continue using tools.
- NEVER guess function names, file structures, or logic.
## Available Tools:
1. `search_code(input_query)`: Use for semantic/keyword search across the whole codebase.
2. `open_file(input_path)`: Use to read the FULL content of a SPECIFIC file you have seen in the context.
3. `list_files(input_directory)`: Use to see files in a directory (defaults to "." for root).
4. `DONE(none)`: Use if you have already gathered enough information to fully answer the user's ORIGINAL query.
## Format
Return ONLY a JSON object:
{
"tool": "tool_name",
"input": "input_value",
"reason": "why you chose this based ONLY on evidence"
}
"""
FINAL_ANSWER_PROMPT = """
You are CodeLens AI. Based on the provided investigation context, answer the user's query.
## STRICT GROUNDING RULES
1. **NO HALLUCINATION**: Base your entire answer ONLY on the provided investigation context.
2. **ABSENCE OF EVIDENCE**: If the context does not contain enough information, state clearly: "I could not find [X] in the codebase."
3. **NO GUESSING**: Do not make "educated guesses" about how the code might work. Only state what you have actually seen.
4. **CITE SOURCES**: Always reference exact file names and line contexts.
5. **VERIFY BEFORE CLAIMING**: If you claim a file does [X], ensure you saw that logic in the observation.
## User Query
{query}
## Investigation Context
{context}
"""
BUG_DETECTION_PROMPT = """
You are a Senior Security Engineer. Analyze the provided investigation context for BUGS, SECURITY VULNERABILITIES, or PERFORMANCE BOTTLE NECKS.
## STRICT GROUNDING RULES
1. **REAL BUGS ONLY**: Only report bugs you have actually observed in the context.
2. **NO FALSE POSITIVES**: Do not assume a bug exists just because a function name sounds relevant.
3. **EVIDENCE-BASED**: Cite file paths and specific code snippets from the context for every single bug reported.
4. **FALLBACK**: If no bugs are found in the observed code, state: "No bugs were identified in the explored code."
## User Query
{query}
## Investigation Context
{context}
Format your answer as a structured Bug Report.
"""
def decide_tool_for_step(query, step, context, openai_api_key):
llm = get_llm(openai_api_key)
messages = [
SystemMessage(content=DECIDE_TOOL_PROMPT),
HumanMessage(content=f"Original User Query: {query}\n\nPrevious Context: {context[-4000:]}\nCurrent Step: {step}")
]
response = llm.invoke(messages)
content = response.content.strip()
if "```" in content:
match = re.search(r"```(?:json)?\s*({.*})\s*```", content, re.DOTALL)
if match:
content = match.group(1)
else:
content = content.replace("```json", "").replace("```", "").strip()
try:
data = json.loads(content)
return data
except Exception as e:
print(f"Error parsing tool decision: {e}")
# Default fallback
return {"tool": "search_code", "input": step, "reason": "Fallback due to parse error"}
def generate_final_answer(query, context, openai_api_key):
llm = get_llm(openai_api_key)
# Bug Detection Mode Detection
is_bug_query = any(word in query.lower() for word in ["bug", "error", "issue", "vulnerability", "security", "fail", "broken"])
prompt_template = BUG_DETECTION_PROMPT if is_bug_query else FINAL_ANSWER_PROMPT
messages = [
HumanMessage(content=prompt_template.format(query=query, context=context))
]
response = llm.invoke(messages)
return response.content
def run_agent(query, vectorstore, repo_path, openai_api_key):
# Step 1: Create Plan
plan = create_plan(query, openai_api_key)
actions_log = []
context_for_llm = ""
accessed_sources = set()
# Step 2: Execute Loop
# Allow up to 8 steps, but the LLM can stop early using the 'DONE' tool.
max_steps = min(len(plan), 8)
for i in range(max_steps):
step = plan[i]
# Decide Action
action = decide_tool_for_step(query, step, context_for_llm, openai_api_key)
# Execute Action
tool_name = action.get("tool", "search_code")
tool_input = action.get("input", step)
if tool_name.upper() == "DONE":
actions_log.append({
"step": step,
"tool": "DONE",
"input": "None",
"observation": "Agent realized it has enough context to answer the user query fully. Stopping investigation.",
"reason": action.get("reason", "Finished early")
})
break
result = ""
if tool_name == "search_code":
result = search_code(tool_input, vectorstore, repo_path)
# Find file paths in search results: --- Result X [path] ---
import re
found = re.findall(r"--- Result \d+ \[(.*?)\] ---", result)
for f in found: accessed_sources.add(f)
elif tool_name == "open_file":
result = open_file(tool_input, repo_path)
if not result.startswith("Error:"):
accessed_sources.add(tool_input)
elif tool_name == "list_files":
result = list_files(repo_path, tool_input)
else:
result = f"Tool {tool_name} not found."
actions_log.append({
"step": step,
"tool": tool_name,
"input": tool_input,
"observation": result[:500] + "..." if len(result) > 500 else result,
"reason": action.get("reason", "")
})
# Don't overload the context with too much redundant search output
clean_result = result[:1500] if tool_name == "search_code" else result[:3000]
context_for_llm += f"\n--- Step {i+1}: {step} ---\nAction: {tool_name}({tool_input})\nObservation: {clean_result}\n"
# Step 3: Final Answer
final_answer = generate_final_answer(query, context_for_llm, openai_api_key)
return final_answer, plan, actions_log, list(accessed_sources)