| | """code_lookup node — lightweight snippet reviewer (no multi-turn tool loop).
|
| |
|
| | Flow:
|
| | 1. discover_code_locations() — ChromaDB semantic search (~1-2 sec per query)
|
| | 2. GPT reviews the raw snippets in a SINGLE call — flags relevant ones with
|
| | a relevance tag and brief note
|
| | 3. Raw flagged snippets + GPT notes go to the compliance analyst
|
| |
|
| | No fetch_full_chapter in the initial pass. The compliance analyst can request
|
| | targeted chapter fetches via additional_code_queries if it needs more context.
|
| | """
|
| | from __future__ import annotations
|
| |
|
| | import json
|
| | import logging
|
| | from concurrent.futures import ThreadPoolExecutor, as_completed
|
| | from datetime import datetime
|
| |
|
| | from openai import OpenAI
|
| |
|
| | from config import CODE_LOOKUP_MODEL, OPENAI_API_KEY
|
| | from prompts.code_lookup import CODE_REVIEWER_SYSTEM_PROMPT
|
| | from state import AgentMessage, CodeQuery, CodeSection, ComplianceState
|
| | from tools.chroma_tools import QueryCache, discover_code_locations
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def _review_snippets(
    research_goal: str,
    discover_report: str,
) -> tuple[str, list[CodeSection]]:
    """GPT reviews discover results in ONE call.

    Sends the raw discover report to the reviewer model, which is expected
    to answer with a JSON object of the shape
    ``{"summary": str, "relevant_sections": [{...}, ...]}``.

    Args:
        research_goal: The query text plus planner context, used to steer
            the review toward the compliance question at hand.
        discover_report: Raw snippet report from discover_code_locations().

    Returns:
        (brief_review, flagged_sections). On unusable model output (invalid
        JSON, or JSON whose top level is not an object) the raw text is
        returned as the review with no flagged sections.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)

    response = client.chat.completions.create(
        model=CODE_LOOKUP_MODEL,
        messages=[
            {"role": "system", "content": CODE_REVIEWER_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"## Research Goal\n{research_goal}\n\n"
                    f"## Code Snippets from Database\n{discover_report}"
                ),
            },
        ],
        response_format={"type": "json_object"},
    )

    raw = response.choices[0].message.content or "{}"
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("GPT snippet review returned invalid JSON, using raw text.")
        return raw, []

    # json_object mode guarantees syntactically valid JSON, but not that the
    # top level is an object — a stray list/scalar would crash .get().
    if not isinstance(parsed, dict):
        logger.warning("GPT snippet review returned non-object JSON, using raw text.")
        return raw, []

    sections = parsed.get("relevant_sections") or []
    if not isinstance(sections, list):
        logger.warning("GPT snippet review: relevant_sections is not a list, ignoring.")
        sections = []

    flagged_sections: list[CodeSection] = []
    for item in sections:
        if not isinstance(item, dict):
            # Skip malformed entries rather than failing the whole review.
            logger.warning("GPT snippet review: skipping non-object section entry.")
            continue
        flagged_sections.append(
            CodeSection(
                section_full=item.get("section_id", "?"),
                code_type=item.get("code_type", "?"),
                parent_major=item.get("chapter", "?"),
                # Cap snippet size to keep downstream analyst prompts bounded.
                text=item.get("snippet", "")[:1500],
                relevance=item.get("relevance_note", ""),
            )
        )

    summary = parsed.get("summary", "No summary provided.")
    return summary, flagged_sections
|
| |
|
| |
|
def _run_single_lookup(
    cq: CodeQuery,
    query_cache: QueryCache | None = None,
) -> tuple[str, list[CodeSection], str]:
    """Execute one code query end-to-end: ChromaDB discover, then GPT review.

    Returns (summary, flagged_sections, discover_report_raw).
    """
    # Fold the planner's context into the goal so both the semantic search
    # and the reviewer see the same framing.
    goal = f"{cq['query']} (Context: {cq['context']})"
    logger.info("ChromaDB query: %s", goal)

    report = discover_code_locations(goal, cache=query_cache)
    review, sections = _review_snippets(goal, report)
    return review, sections, report
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def _now() -> str:
    """Wall-clock timestamp (HH:MM:SS) for discussion-log entries."""
    return datetime.now().strftime("%H:%M:%S")


def _log_message(summary: str, detail: str, evidence_refs: list[str]) -> AgentMessage:
    """Build a code_analyst discussion-log entry with a fresh timestamp.

    Centralizes the AgentMessage boilerplate repeated at every log site.
    """
    return AgentMessage(
        timestamp=_now(),
        agent="code_analyst",
        action="search_code",
        summary=summary,
        detail=detail,
        evidence_refs=evidence_refs,
    )


def initial_code_lookup(state: ComplianceState) -> dict:
    """Run the initial code lookup based on planner's code_queries.

    All queries run in PARALLEL via ThreadPoolExecutor.
    Each query = 1 discover + 1 GPT review call (no multi-turn loop).

    Args:
        state: Graph state; reads ``code_queries``.

    Returns:
        Partial state update with the combined code report, flagged
        sections, discussion-log entries, and a status message.
    """
    code_queries = state.get("code_queries", [])
    if not code_queries:
        return {
            "code_report": "No code queries were planned.",
            "code_sections": [],
            "discussion_log": [
                _log_message(
                    "No code queries to execute.",
                    "The planner did not generate any code queries.",
                    [],
                )
            ],
            "status_message": ["No code queries to execute."],
        }

    # Shared cache so parallel lookups can reuse identical ChromaDB queries.
    query_cache = QueryCache()
    discussion_messages: list[AgentMessage] = []

    # Announce every search up front so the log shows intent even if a
    # worker later fails.
    for cq in code_queries:
        discussion_messages.append(
            _log_message(
                f"Searching: {cq['query'][:80]}...",
                f"Focus area: {cq['focus_area']}\nContext: {cq['context']}",
                [],
            )
        )

    # Keyed by query index so results can be reassembled in planner order.
    results: dict[int, tuple[str, list[CodeSection], str]] = {}

    with ThreadPoolExecutor(max_workers=min(len(code_queries), 4)) as pool:
        futures = {
            pool.submit(_run_single_lookup, cq, query_cache): i
            for i, cq in enumerate(code_queries)
        }
        for future in as_completed(futures):
            i = futures[future]
            # Keep the try body to the one call that can raise; log-message
            # construction below must not be misreported as a query failure.
            try:
                summary, flagged, raw = future.result()
            except Exception as e:
                logger.error("Code query %d failed: %s", i, e)
                results[i] = (f"Error: {e}", [], "")
                discussion_messages.append(
                    _log_message(f"Query {i + 1} failed: {e}", str(e), [])
                )
                continue

            results[i] = (summary, flagged, raw)
            cq = code_queries[i]
            section_ids = ", ".join(s["section_full"] for s in flagged)
            discussion_messages.append(
                _log_message(
                    f"Flagged {len(flagged)} sections for '{cq['query']}'",
                    (
                        f"**Query:** {cq['query']}\n"
                        f"**Focus:** {cq['focus_area']}\n\n"
                        f"**Sections:** {section_ids}\n\n"
                        f"{summary[:800]}"
                    ),
                    [s["section_full"] for s in flagged],
                )
            )

    # Reassemble the report in planner order regardless of completion order.
    report_parts: list[str] = []
    all_sections: list[CodeSection] = []
    for i, cq in enumerate(code_queries):
        summary, flagged, _raw = results.get(i, ("No result", [], ""))
        report_parts.append(f"### Query {i + 1}: {cq['focus_area']}\n{summary}")
        all_sections.extend(flagged)

    combined_report = "\n\n---\n\n".join(report_parts)

    return {
        "code_report": combined_report,
        "code_sections": all_sections,
        "discussion_log": discussion_messages,
        "status_message": [
            f"Code lookup complete. {len(all_sections)} relevant sections "
            f"flagged across {len(code_queries)} queries."
        ],
    }
|
| |
|
| |
|
def targeted_code_lookup(state: ComplianceState) -> dict:
    """Run additional code lookups requested by the compliance analyst.

    These may use fetch_full_chapter for deeper context when the analyst
    needs full exception text or cross-reference detail.

    Queries run sequentially. A failure in one query is logged and recorded
    in the report, but does not abort the remaining queries (matching the
    per-query error capture of initial_code_lookup).

    Args:
        state: Graph state; reads ``additional_code_queries`` and the
            existing ``code_report``.

    Returns:
        Partial state update appending the follow-up research to the code
        report and clearing ``additional_code_queries``.
    """
    additional_queries = state.get("additional_code_queries", [])
    if not additional_queries:
        return {
            "status_message": ["No additional code queries."],
        }

    query_cache = QueryCache()
    all_sections: list[CodeSection] = []
    report_parts: list[str] = []
    discussion_messages: list[AgentMessage] = []

    for cq in additional_queries:
        discussion_messages.append(
            AgentMessage(
                timestamp=datetime.now().strftime("%H:%M:%S"),
                agent="code_analyst",
                action="search_code",
                summary=f"Follow-up search: {cq['query'][:80]}...",
                detail=f"Requested by compliance analyst.\nFocus: {cq['focus_area']}",
                evidence_refs=[],
            )
        )

        try:
            summary, flagged, _raw = _run_single_lookup(cq, query_cache)
        except Exception as e:
            # Parity with initial_code_lookup: one failed follow-up query
            # must not sink the whole node.
            logger.error("Follow-up code query failed: %s", e)
            summary, flagged = f"Error: {e}", []
        report_parts.append(summary)
        all_sections.extend(flagged)

        section_ids = ", ".join(s["section_full"] for s in flagged)
        discussion_messages.append(
            AgentMessage(
                timestamp=datetime.now().strftime("%H:%M:%S"),
                agent="code_analyst",
                action="search_code",
                summary=(
                    f"Follow-up: flagged {len(flagged)} sections "
                    f"for '{cq['query']}'"
                ),
                detail=(
                    f"**Query:** {cq['query']}\n"
                    f"**Focus:** {cq['focus_area']}\n\n"
                    f"**Sections:** {section_ids}\n\n"
                    f"{summary[:800]}"
                ),
                evidence_refs=[s["section_full"] for s in flagged],
            )
        )

    existing_report = state.get("code_report", "")
    new_report = "\n\n---\n\n".join(report_parts)
    combined_report = f"{existing_report}\n\n## FOLLOW-UP CODE RESEARCH\n\n{new_report}"

    return {
        "code_report": combined_report,
        "code_sections": all_sections,
        # Clear the queue so the analyst's request is not re-executed.
        "additional_code_queries": [],
        "discussion_log": discussion_messages,
        "status_message": [
            f"Targeted code lookup complete. {len(all_sections)} additional sections "
            f"from {len(additional_queries)} follow-up queries."
        ],
    }
|
| |
|