Spaces:
Sleeping
Sleeping
File size: 6,706 Bytes
9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 9b4bb17 1c6f444 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | from utils_llm import get_llm
from planner import create_plan
from tools import search_code, open_file, list_files
from langchain_core.messages import SystemMessage, HumanMessage
import json
import re
# System prompt for the tool-selection call made in decide_tool_for_step().
# It lists the tools the agent loop understands (search_code, open_file,
# list_files, DONE) and asks the model to reply with a single JSON object
# holding the keys "tool", "input" and "reason".
# NOTE: this text contains literal braces and is passed to the model as-is;
# it is not a str.format template.
DECIDE_TOOL_PROMPT = """
You are an execution agent. Given a user query, a current step in the plan, and previous context, decide which tool to use next.
## ANTI-HALLUCINATION RULES
- If you don't know where a file is, use `list_files` or `search_code` to find it.
- NEVER assume the path or content of a file you have not seen in the 'Previous Context'.
- If the current results are insufficient to complete the step, continue using tools.
- NEVER guess function names, file structures, or logic.
## Available Tools:
1. `search_code(input_query)`: Use for semantic/keyword search across the whole codebase.
2. `open_file(input_path)`: Use to read the FULL content of a SPECIFIC file you have seen in the context.
3. `list_files(input_directory)`: Use to see files in a directory (defaults to "." for root).
4. `DONE(none)`: Use if you have already gathered enough information to fully answer the user's ORIGINAL query.
## Format
Return ONLY a JSON object:
{
"tool": "tool_name",
"input": "input_value",
"reason": "why you chose this based ONLY on evidence"
}
"""
# Template for the default final-answer call in generate_final_answer().
# It is filled with str.format(query=..., context=...), so `{query}` and
# `{context}` are the only placeholders; any other literal brace added to
# this text would have to be doubled ({{ }}).
FINAL_ANSWER_PROMPT = """
You are CodeLens AI. Based on the provided investigation context, answer the user's query.
## STRICT GROUNDING RULES
1. **NO HALLUCINATION**: Base your entire answer ONLY on the provided investigation context.
2. **ABSENCE OF EVIDENCE**: If the context does not contain enough information, state clearly: "I could not find [X] in the codebase."
3. **NO GUESSING**: Do not make "educated guesses" about how the code might work. Only state what you have actually seen.
4. **CITE SOURCES**: Always reference exact file names and line contexts.
5. **VERIFY BEFORE CLAIMING**: If you claim a file does [X], ensure you saw that logic in the observation.
## User Query
{query}
## Investigation Context
{context}
"""
# Template used by generate_final_answer() instead of FINAL_ANSWER_PROMPT
# when the user's query contains one of its bug-related keywords.
# It is filled with str.format(query=..., context=...), so `{query}` and
# `{context}` are the only placeholders; any other literal brace added to
# this text would have to be doubled ({{ }}).
BUG_DETECTION_PROMPT = """
You are a Senior Security Engineer. Analyze the provided investigation context for BUGS, SECURITY VULNERABILITIES, or PERFORMANCE BOTTLE NECKS.
## STRICT GROUNDING RULES
1. **REAL BUGS ONLY**: Only report bugs you have actually observed in the context.
2. **NO FALSE POSITIVES**: Do not assume a bug exists just because a function name sounds relevant.
3. **EVIDENCE-BASED**: Cite file paths and specific code snippets from the context for every single bug reported.
4. **FALLBACK**: If no bugs are found in the observed code, state: "No bugs were identified in the explored code."
## User Query
{query}
## Investigation Context
{context}
Format your answer as a structured Bug Report.
"""
def decide_tool_for_step(query, step, context, openai_api_key):
    """Ask the LLM which tool to run next for one plan step.

    Sends DECIDE_TOOL_PROMPT as the system message, together with the
    original query, the trailing part of the accumulated context and the
    current step, then parses the reply as a JSON object.

    Args:
        query: The user's original question.
        step: The plan step currently being executed.
        context: Text accumulated from earlier steps; only the last 4000
            characters are forwarded to the model.
        openai_api_key: Key handed to ``get_llm``.

    Returns:
        dict: The parsed decision. When the reply cannot be parsed into a
        JSON object, a fallback decision that runs ``search_code`` on the
        step text is returned instead, so the result is always a dict.
    """
    llm = get_llm(openai_api_key)
    messages = [
        SystemMessage(content=DECIDE_TOOL_PROMPT),
        HumanMessage(content=f"Original User Query: {query}\n\nPrevious Context: {context[-4000:]}\nCurrent Step: {step}"),
    ]
    response = llm.invoke(messages)

    # NOTE(review): message content may not always be a plain string
    # (confirm against the LLM wrapper in use). Stringify anything else so
    # parsing falls back cleanly instead of raising on .strip().
    raw_content = response.content
    content = raw_content.strip() if isinstance(raw_content, str) else str(raw_content)

    # Models often wrap the JSON in a Markdown code fence; unwrap it first.
    if "```" in content:
        match = re.search(r"```(?:json)?\s*({.*})\s*```", content, re.DOTALL)
        if match:
            content = match.group(1)
        else:
            content = content.replace("```json", "").replace("```", "").strip()

    fallback = {"tool": "search_code", "input": step, "reason": "Fallback due to parse error"}

    try:
        data = json.loads(content)
    except ValueError as first_error:  # json.JSONDecodeError subclasses ValueError
        # The reply may be a JSON object surrounded by prose; retry on the
        # span from the first "{" to the last "}".
        start = content.find("{")
        end = content.rfind("}")
        if start == -1 or end <= start:
            print(f"Error parsing tool decision: {first_error}")
            return fallback
        try:
            data = json.loads(content[start:end + 1])
        except ValueError as e:
            print(f"Error parsing tool decision: {e}")
            return fallback

    # Valid JSON that is not an object (a list, a bare string, ...) would
    # break callers that use ``.get`` on the result.
    if not isinstance(data, dict):
        print(f"Error parsing tool decision: expected a JSON object, got {type(data).__name__}")
        return fallback
    return data
def generate_final_answer(query, context, openai_api_key):
    """Produce the final reply for the user from the gathered context.

    The query is checked, case-insensitively and by plain substring, for a
    bug-related keyword. If one is present the reply is generated from
    BUG_DETECTION_PROMPT, otherwise from FINAL_ANSWER_PROMPT. The chosen
    template is filled with the query and context and sent to the LLM as a
    single human message.

    Args:
        query: The user's original question.
        context: Investigation context collected by the agent loop.
        openai_api_key: Key handed to ``get_llm``.

    Returns:
        The ``content`` of the LLM response.
    """
    llm = get_llm(openai_api_key)

    # Bug-detection mode: substring match, so "fail" also covers "failure".
    bug_keywords = ["bug", "error", "issue", "vulnerability", "security", "fail", "broken"]
    lowered_query = query.lower()
    if any(keyword in lowered_query for keyword in bug_keywords):
        prompt_template = BUG_DETECTION_PROMPT
    else:
        prompt_template = FINAL_ANSWER_PROMPT

    prompt_text = prompt_template.format(query=query, context=context)
    reply = llm.invoke([HumanMessage(content=prompt_text)])
    return reply.content
def run_agent(query, vectorstore, repo_path, openai_api_key, max_steps=8):
    """Plan an investigation, execute it with tools, and answer the query.

    A plan is created for the query, then for each plan step the LLM picks
    a tool (``search_code``, ``open_file``, ``list_files`` or ``DONE``).
    Each tool's output is appended to a running context that is finally
    handed to ``generate_final_answer``.

    Args:
        query: The user's question.
        vectorstore: Passed through to ``search_code``.
        repo_path: Repository location passed through to every tool.
        openai_api_key: Key forwarded to the planner and the LLM calls.
        max_steps: Upper bound on the number of plan steps executed. The
            loop can also stop earlier when the LLM selects ``DONE``.

    Returns:
        tuple: ``(final_answer, plan, actions_log, accessed_sources)``.
        ``actions_log`` is a list of dicts with the keys ``step``,
        ``tool``, ``input``, ``observation`` and ``reason``.
        ``accessed_sources`` is a sorted list of the file paths found in
        search results or opened successfully.
    """
    # Step 1: Create Plan
    plan = create_plan(query, openai_api_key)
    actions_log = []
    context_for_llm = ""
    accessed_sources = set()

    # Step 2: Execute Loop
    step_limit = max(0, min(len(plan), max_steps))
    for i in range(step_limit):
        step = plan[i]

        # Decide Action
        action = decide_tool_for_step(query, step, context_for_llm, openai_api_key)
        if not isinstance(action, dict):
            action = {}

        # The decision comes from an LLM, so "tool" may be missing, null or
        # not a string; default to a code search in those cases.
        raw_tool = action.get("tool")
        if isinstance(raw_tool, str) and raw_tool.strip():
            tool_name = raw_tool.strip()
        else:
            tool_name = "search_code"
        # Match every known tool case-insensitively, not only DONE.
        canonical_name = tool_name.lower()
        if canonical_name in ("search_code", "open_file", "list_files", "done"):
            tool_name = canonical_name

        # A missing, null or empty "input" falls back to the step text,
        # except for list_files, whose documented default is the root ".".
        tool_input = action.get("input")
        if tool_input is None or tool_input == "":
            tool_input = "." if tool_name == "list_files" else step
        elif not isinstance(tool_input, str):
            tool_input = str(tool_input)

        if tool_name == "done":
            actions_log.append({
                "step": step,
                "tool": "DONE",
                "input": "None",
                "observation": "Agent realized it has enough context to answer the user query fully. Stopping investigation.",
                "reason": action.get("reason", "Finished early"),
            })
            break

        # Execute Action
        if tool_name == "search_code":
            result = search_code(tool_input, vectorstore, repo_path)
            # Find file paths in search results: --- Result X [path] ---
            accessed_sources.update(re.findall(r"--- Result \d+ \[(.*?)\] ---", result))
        elif tool_name == "open_file":
            result = open_file(tool_input, repo_path)
            if not result.startswith("Error:"):
                accessed_sources.add(tool_input)
        elif tool_name == "list_files":
            result = list_files(repo_path, tool_input)
        else:
            result = f"Tool {tool_name} not found."

        # The log keeps only a short preview of the observation.
        observation = (result[:500] + "...") if len(result) > 500 else result
        actions_log.append({
            "step": step,
            "tool": tool_name,
            "input": tool_input,
            "observation": observation,
            "reason": action.get("reason", ""),
        })

        # Don't overload the context with too much redundant search output
        clean_result = result[:1500] if tool_name == "search_code" else result[:3000]
        context_for_llm += f"\n--- Step {i+1}: {step} ---\nAction: {tool_name}({tool_input})\nObservation: {clean_result}\n"

    # Step 3: Final Answer
    final_answer = generate_final_answer(query, context_for_llm, openai_api_key)
    # Sorted so the returned source list is stable between runs.
    return final_answer, plan, actions_log, sorted(accessed_sources, key=str)
|