"""code_lookup node — lightweight snippet reviewer (no multi-turn tool loop).
Flow:
1. discover_code_locations() — ChromaDB semantic search (~1-2 sec per query)
2. GPT reviews the raw snippets in a SINGLE call — flags relevant ones with
a relevance tag and brief note
3. Raw flagged snippets + GPT notes go to the compliance analyst
No fetch_full_chapter in the initial pass. The compliance analyst can request
targeted chapter fetches via additional_code_queries if it needs more context.
"""
from __future__ import annotations
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from openai import OpenAI
from config import CODE_LOOKUP_MODEL, OPENAI_API_KEY
from prompts.code_lookup import CODE_REVIEWER_SYSTEM_PROMPT
from state import AgentMessage, CodeQuery, CodeSection, ComplianceState
from tools.chroma_tools import QueryCache, discover_code_locations
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Single-call snippet reviewer (replaces the multi-turn tool loop)
# ---------------------------------------------------------------------------
def _review_snippets(
    research_goal: str,
    discover_report: str,
) -> tuple[str, list[CodeSection]]:
    """Review discover results with GPT in ONE call.

    Sends the research goal plus the raw ChromaDB snippets to the model and
    parses its JSON reply into flagged ``CodeSection`` records.

    Args:
        research_goal: The query (with context) the snippets were fetched for.
        discover_report: Raw text report from ``discover_code_locations``.

    Returns:
        Tuple of (brief_review, flagged_sections). If the model's output is
        not usable JSON, the raw text is returned with an empty section list.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    response = client.chat.completions.create(
        model=CODE_LOOKUP_MODEL,
        messages=[
            {"role": "system", "content": CODE_REVIEWER_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"## Research Goal\n{research_goal}\n\n"
                    f"## Code Snippets from Database\n{discover_report}"
                ),
            },
        ],
        response_format={"type": "json_object"},
    )
    raw = response.choices[0].message.content or "{}"
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("GPT snippet review returned invalid JSON, using raw text.")
        return raw, []
    # json_object mode guarantees syntactically valid JSON, but not a
    # top-level object — guard against a stray array/scalar before .get().
    if not isinstance(parsed, dict):
        logger.warning("GPT snippet review returned non-object JSON, using raw text.")
        return raw, []
    flagged_sections: list[CodeSection] = []
    for item in parsed.get("relevant_sections", []):
        # Skip malformed (non-dict) entries instead of raising AttributeError.
        if not isinstance(item, dict):
            continue
        flagged_sections.append(
            CodeSection(
                section_full=item.get("section_id", "?"),
                code_type=item.get("code_type", "?"),
                parent_major=item.get("chapter", "?"),
                # Cap snippet length to keep downstream prompts small.
                text=item.get("snippet", "")[:1500],
                relevance=item.get("relevance_note", ""),
            )
        )
    summary = parsed.get("summary", "No summary provided.")
    return summary, flagged_sections
def _run_single_lookup(
    cq: CodeQuery,
    query_cache: QueryCache | None = None,
) -> tuple[str, list[CodeSection], str]:
    """Execute discover + single-call GPT review for ONE code query.

    Returns (summary, flagged_sections, discover_report_raw).
    """
    goal = f"{cq['query']} (Context: {cq['context']})"
    logger.info("ChromaDB query: %s", goal)
    # Fast semantic search first (~1-2s), then a single review pass over
    # the returned snippets — no multi-turn tool loop.
    report = discover_code_locations(goal, cache=query_cache)
    review_summary, flagged_sections = _review_snippets(goal, report)
    return review_summary, flagged_sections, report
# ---------------------------------------------------------------------------
# LangGraph node functions
# ---------------------------------------------------------------------------
def initial_code_lookup(state: ComplianceState) -> dict:
    """Run the initial code lookup based on planner's code_queries.

    All queries run in PARALLEL via ThreadPoolExecutor.
    Each query = 1 discover + 1 GPT review call (no multi-turn loop).

    Returns:
        Partial state update: combined code report, flagged code sections,
        discussion-log messages, and a one-line status message.
    """
    code_queries = state.get("code_queries", [])
    if not code_queries:
        return {
            "code_report": "No code queries were planned.",
            "code_sections": [],
            "discussion_log": [
                AgentMessage(
                    timestamp=datetime.now().strftime("%H:%M:%S"),
                    agent="code_analyst",
                    action="search_code",
                    summary="No code queries to execute.",
                    detail="The planner did not generate any code queries.",
                    evidence_refs=[],
                )
            ],
            "status_message": ["No code queries to execute."],
        }
    query_cache = QueryCache()
    discussion_messages: list[AgentMessage] = []
    # Add "searching" messages for all queries upfront
    for cq in code_queries:
        discussion_messages.append(
            AgentMessage(
                timestamp=datetime.now().strftime("%H:%M:%S"),
                agent="code_analyst",
                action="search_code",
                summary=f"Searching: {cq['query'][:80]}...",
                detail=f"Focus area: {cq['focus_area']}\nContext: {cq['context']}",
                evidence_refs=[],
            )
        )
    # Execute ALL queries concurrently (worker count capped at 4).
    results: dict[int, tuple[str, list[CodeSection], str]] = {}
    with ThreadPoolExecutor(max_workers=min(len(code_queries), 4)) as pool:
        futures = {
            pool.submit(_run_single_lookup, cq, query_cache): i
            for i, cq in enumerate(code_queries)
        }
        for future in as_completed(futures):
            i = futures[future]
            # Keep the try body minimal: only future.result() raises a
            # lookup error. Previously the message construction also sat
            # inside the try, so a formatting bug after a successful query
            # could overwrite results[i] with an error tuple and report a
            # spurious failure.
            try:
                summary, flagged, _raw = future.result()
            except Exception as e:
                logger.error("Code query %d failed: %s", i, e)
                results[i] = (f"Error: {e}", [], "")
                discussion_messages.append(
                    AgentMessage(
                        timestamp=datetime.now().strftime("%H:%M:%S"),
                        agent="code_analyst",
                        action="search_code",
                        summary=f"Query {i + 1} failed: {e}",
                        detail=str(e),
                        evidence_refs=[],
                    )
                )
                continue
            results[i] = (summary, flagged, _raw)
            cq = code_queries[i]
            section_ids = ", ".join(
                s["section_full"] for s in flagged
            )
            discussion_messages.append(
                AgentMessage(
                    timestamp=datetime.now().strftime("%H:%M:%S"),
                    agent="code_analyst",
                    action="search_code",
                    summary=(
                        f"Flagged {len(flagged)} sections "
                        f"for '{cq['query']}'"
                    ),
                    detail=(
                        f"**Query:** {cq['query']}\n"
                        f"**Focus:** {cq['focus_area']}\n\n"
                        f"**Sections:** {section_ids}\n\n"
                        f"{summary[:800]}"
                    ),
                    evidence_refs=[s["section_full"] for s in flagged],
                )
            )
    # Reassemble in the planner's original query order (as_completed
    # yields results out of order).
    report_parts: list[str] = []
    all_sections: list[CodeSection] = []
    for i in range(len(code_queries)):
        summary, flagged, _raw = results.get(i, ("No result", [], ""))
        cq = code_queries[i]
        report_parts.append(
            f"### Query {i + 1}: {cq['focus_area']}\n{summary}"
        )
        all_sections.extend(flagged)
    combined_report = "\n\n---\n\n".join(report_parts)
    return {
        "code_report": combined_report,
        "code_sections": all_sections,
        "discussion_log": discussion_messages,
        "status_message": [
            f"Code lookup complete. {len(all_sections)} relevant sections "
            f"flagged across {len(code_queries)} queries."
        ],
    }
def targeted_code_lookup(state: ComplianceState) -> dict:
    """Run additional code lookups requested by the compliance analyst.

    These may use fetch_full_chapter for deeper context when the analyst
    needs full exception text or cross-reference detail.

    Returns:
        Partial state update appending the follow-up research to the
        existing code report and clearing ``additional_code_queries``.
    """
    additional_queries = state.get("additional_code_queries", [])
    if not additional_queries:
        return {
            "status_message": ["No additional code queries."],
        }
    query_cache = QueryCache()
    all_sections: list[CodeSection] = []
    report_parts: list[str] = []
    discussion_messages: list[AgentMessage] = []
    for i, cq in enumerate(additional_queries):
        discussion_messages.append(
            AgentMessage(
                timestamp=datetime.now().strftime("%H:%M:%S"),
                agent="code_analyst",
                action="search_code",
                summary=f"Follow-up search: {cq['query'][:80]}...",
                detail=f"Requested by compliance analyst.\nFocus: {cq['focus_area']}",
                evidence_refs=[],
            )
        )
        # Mirror initial_code_lookup's per-query error handling: one bad
        # follow-up query must not abort the entire node.
        try:
            summary, flagged, _raw = _run_single_lookup(cq, query_cache)
        except Exception as e:
            logger.error("Follow-up code query %d failed: %s", i, e)
            report_parts.append(f"Error: {e}")
            discussion_messages.append(
                AgentMessage(
                    timestamp=datetime.now().strftime("%H:%M:%S"),
                    agent="code_analyst",
                    action="search_code",
                    summary=f"Follow-up query {i + 1} failed: {e}",
                    detail=str(e),
                    evidence_refs=[],
                )
            )
            continue
        report_parts.append(summary)
        all_sections.extend(flagged)
        section_ids = ", ".join(s["section_full"] for s in flagged)
        discussion_messages.append(
            AgentMessage(
                timestamp=datetime.now().strftime("%H:%M:%S"),
                agent="code_analyst",
                action="search_code",
                summary=(
                    f"Follow-up: flagged {len(flagged)} sections "
                    f"for '{cq['query']}'"
                ),
                detail=(
                    f"**Query:** {cq['query']}\n"
                    f"**Focus:** {cq['focus_area']}\n\n"
                    f"**Sections:** {section_ids}\n\n"
                    f"{summary[:800]}"
                ),
                evidence_refs=[s["section_full"] for s in flagged],
            )
        )
    # Append to existing report
    existing_report = state.get("code_report", "")
    new_report = "\n\n---\n\n".join(report_parts)
    combined_report = f"{existing_report}\n\n## FOLLOW-UP CODE RESEARCH\n\n{new_report}"
    return {
        "code_report": combined_report,
        "code_sections": all_sections,
        "additional_code_queries": [],  # Clear after processing
        "discussion_log": discussion_messages,
        "status_message": [
            f"Targeted code lookup complete. {len(all_sections)} additional sections "
            f"from {len(additional_queries)} follow-up queries."
        ],
    }