File size: 6,628 Bytes
899a7c7
 
 
902cd29
899a7c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
902cd29
899a7c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1432cf4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
899a7c7
 
 
 
 
 
 
 
bc02c39
899a7c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
902cd29
899a7c7
 
902cd29
899a7c7
 
 
 
 
 
 
 
 
902cd29
899a7c7
 
902cd29
899a7c7
 
 
 
 
bc02c39
899a7c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b196357
 
 
 
 
 
 
 
 
 
 
 
899a7c7
 
 
1432cf4
899a7c7
b196357
 
 
899a7c7
 
 
 
 
 
 
b196357
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from __future__ import annotations

import json
import os
from pathlib import Path

from sqlmodel import Session, select

from db.schema import ModuleNode
from env.observation import CodeObservation, NeighborSummary, RequestedContext
from graph.graph_manager import GraphManager
from graph.token_budget import TokenBudget


# Action names offered to the reviewer when ``ObservationBuilder.build`` is
# called without an explicit (non-empty) ``available_actions`` list.
DEFAULT_ACTIONS = [
	"FLAG_STYLE",
	"FLAG_BUG",
	"FLAG_SECURITY",
	"FLAG_DEPENDENCY_ISSUE",
	"ADD_COMMENT",
	"REQUEST_CONTEXT",
	"REQUEST_CHANGES",
	"APPROVE",
	"AMEND_REVIEW",
]


class ObservationBuilder:
	"""Assemble token-budgeted ``CodeObservation`` objects for one module.

	An observation combines the module's code and AST summary, summaries of its
	highest-centrality graph neighbors, optional neighbor review notes, and an
	optional extra module requested as context. All variable-size parts are
	passed through ``TokenBudget.enforce`` before the observation is created.
	"""

	def __init__(self, source_root: str | Path, db_path: str | Path | None = None) -> None:
		"""Create the graph manager and token budget used by ``build``.

		Args:
			source_root: Forwarded to ``GraphManager`` as ``source_root``.
			db_path: Forwarded to ``GraphManager`` as ``db_path``.
		"""
		self.graph_manager = GraphManager(source_root=source_root, db_path=db_path)
		self.token_budget = TokenBudget()
		# Neighbor review text is only exposed when the variable is exactly
		# "true" (case-insensitive); unset or any other value keeps it hidden.
		self._expose_neighbor_reviews = os.getenv("GRAPHREVIEW_EXPOSE_NEIGHBOR_REVIEWS", "false").lower() == "true"

	def _fetch_node(self, module_id: str) -> ModuleNode:
		"""Load the ``ModuleNode`` row for *module_id* under the configured source root.

		Raises:
			ValueError: If no matching row exists.
		"""
		with Session(self.graph_manager.store.engine) as session:
			node = session.exec(
				select(ModuleNode).where(
					ModuleNode.source_root == self.graph_manager.store.config.source_root,
					ModuleNode.module_id == module_id,
				)
			).first()
		if node is None:
			raise ValueError(f"Unknown module_id: {module_id}")
		return node

	@staticmethod
	def _ast_summary_payload(ast_summary: str) -> dict[str, object]:
		"""Turn a stored AST summary string into a dict payload.

		Returns the decoded value when it is a JSON object, ``{"items": value}``
		for any other valid JSON value, and ``{"text": ast_summary}`` when the
		string is not valid JSON (for example after budget truncation).
		"""
		try:
			loaded = json.loads(ast_summary)
		except json.JSONDecodeError:
			return {"text": ast_summary}
		return loaded if isinstance(loaded, dict) else {"items": loaded}

	def _module_context_summary(
		self,
		node: ModuleNode,
		dependencies: list[str],
		dependents: list[str],
	) -> str:
		"""Build a one-line, ``" | "``-separated context summary for *node*.

		The line holds the node's summary (falling back to its AST summary), up
		to five dependency ids, up to three dependent ids, the number of
		findings whose tool is ``"bandit"``, and the number of findings whose
		severity value is ``"high"``.
		"""
		findings = self.graph_manager.store.get_findings(node.module_id)
		security_count = sum(1 for finding in findings if finding.tool == "bandit")
		high_severity = sum(1 for finding in findings if finding.severity.value == "high")
		parts = [
			# Fall back to "" so the join below cannot fail on a missing summary.
			node.summary or node.ast_summary or "",
			f"related_dependencies={', '.join(dependencies[:5]) or 'none'}",
			f"related_dependents={', '.join(dependents[:3]) or 'none'}",
			f"security_findings={security_count}",
			f"high_severity_findings={high_severity}",
		]
		return " | ".join(parts)

	def _summarize_neighbors(
		self,
		neighbor_ids: list[str],
		relation: str,
	) -> tuple[list[NeighborSummary], list[str]]:
		"""Fetch each neighbor and return its summaries plus exposed review lines.

		Args:
			neighbor_ids: Module ids to summarize, in output order.
			relation: Value stored in each ``NeighborSummary.relation``.

		Returns:
			``(summaries, reviews)``. ``reviews`` holds ``"<id>: <review>"``
			strings and stays empty unless neighbor reviews are exposed.

		Raises:
			ValueError: If a neighbor id has no stored node.
		"""
		summaries: list[NeighborSummary] = []
		reviews: list[str] = []
		for neighbor_id in neighbor_ids:
			neighbor = self._fetch_node(neighbor_id)
			review = neighbor.review_summary if self._expose_neighbor_reviews else None
			summaries.append(
				NeighborSummary(
					module_id=neighbor_id,
					relation=relation,
					summary=neighbor.summary or neighbor.ast_summary,
					review_snippet=review,
				)
			)
			if review:
				reviews.append(f"{neighbor_id}: {review}")
		return summaries, reviews

	def build(
		self,
		module_id: str,
		task_description: str,
		available_actions: list[str] | None = None,
		context_request: str | None = None,
	) -> CodeObservation:
		"""Build the observation for *module_id*.

		Args:
			module_id: Module to observe; resolved via ``resolve_module_id``.
			task_description: Text copied into the observation and the budget.
			available_actions: Actions to offer; ``None`` or an empty list
				selects a copy of ``DEFAULT_ACTIONS``.
			context_request: Optional id of another module whose raw code is
				attached as ``requested_context``.

		Returns:
			A ``CodeObservation`` populated from the budgeted payload.

		Raises:
			ValueError: If the module, one of its ranked neighbors, or the
				requested context module has no stored node.
		"""
		graph = self.graph_manager.load_graph()
		module_id = self.graph_manager.resolve_module_id(module_id)

		node = self._fetch_node(module_id)
		centrality = self.graph_manager.centrality()

		dependencies = list(graph.successors(module_id))
		dependents = list(graph.predecessors(module_id))

		# Highest centrality first, ties broken by id; keep 5 dependencies and 3 dependents.
		dep_ranked = sorted(dependencies, key=lambda n: (-float(centrality.get(n, 0.0)), n))[:5]
		dependent_ranked = sorted(dependents, key=lambda n: (-float(centrality.get(n, 0.0)), n))[:3]

		dependency_summaries, dependency_reviews = self._summarize_neighbors(dep_ranked, "dependency")
		dependent_summaries, dependent_reviews = self._summarize_neighbors(dependent_ranked, "dependent")
		# Dependency reviews come before dependent reviews.
		neighbor_reviews = dependency_reviews + dependent_reviews

		requested_context: RequestedContext | None = None
		requested_context_code = ""
		if context_request:
			context_request = self.graph_manager.resolve_module_id(context_request)
			context_node = self._fetch_node(context_request)
			requested_context_code = context_node.raw_code

		# Copy so later mutation of the observation's action list cannot alter
		# the caller's list or the module-level defaults.
		actions = list(available_actions or DEFAULT_ACTIONS)
		budgeted = self.token_budget.enforce(
			{
				"code": node.raw_code,
				"ast_summary_text": node.ast_summary,
				"dependency_summaries": [item.model_dump_json() for item in dependency_summaries],
				"dependent_summaries": [item.model_dump_json() for item in dependent_summaries],
				"neighbor_reviews": neighbor_reviews[:4],
				"task_description": task_description,
				"available_actions": actions,
				"requested_context_code": requested_context_code,
			}
		)

		if context_request:
			context_trimmed = budgeted.payload.get("requested_context_code", "")
			requested_context = RequestedContext(
				module_id=context_request,
				code=str(context_trimmed),
				was_truncated=str(context_trimmed) != requested_context_code,
			)

		# NOTE(review): list inputs are read back under "*_text" keys, presumably
		# serialized one item per line by TokenBudget.enforce - confirm there.
		dependency_summaries_bounded = self._trim_neighbor_summaries(
			dependency_summaries,
			str(budgeted.payload.get("dependency_summaries_text", "")),
		)
		dependent_summaries_bounded = self._trim_neighbor_summaries(
			dependent_summaries,
			str(budgeted.payload.get("dependent_summaries_text", "")),
		)
		neighbor_reviews_bounded = [
			line for line in str(budgeted.payload.get("neighbor_reviews_text", "")).splitlines() if line.strip()
		]

		return CodeObservation(
			module_id=module_id,
			code=str(budgeted.payload.get("code", "")),
			module_summary=self._module_context_summary(node, dep_ranked, dependent_ranked),
			ast_summary=self._ast_summary_payload(str(budgeted.payload.get("ast_summary_text", ""))),
			dependency_summaries=dependency_summaries_bounded,
			dependent_summaries=dependent_summaries_bounded,
			neighbor_reviews=neighbor_reviews_bounded,
			task_description=task_description,
			available_actions=actions,
			requested_context=requested_context,
			token_usage=budgeted.token_usage,
			total_tokens=budgeted.total_tokens,
			within_budget=budgeted.total_tokens <= self.token_budget.max_total_tokens,
		)

	@staticmethod
	def _trim_neighbor_summaries(
		summaries: list[NeighborSummary],
		serialized_text: str,
	) -> list[NeighborSummary]:
		"""Keep only as many summaries as the budgeted text still has lines for.

		Args:
			summaries: Summaries in the order they were serialized.
			serialized_text: Budgeted text for those summaries, treated as one
				entry per ``"\\n"``-separated line.

		Returns:
			A new list with at most one summary per non-blank line. If the text
			contains ``"[TRUNCATED]"``, the last kept summary is replaced by a
			copy whose summary text ends with a truncation marker. Empty when
			there are no summaries or the text is blank.
		"""
		if not summaries or not serialized_text.strip():
			return []

		# Count non-blank lines so a trailing newline or blank separator line
		# does not let an extra summary through. Split on "\n" only: other line
		# boundary characters could occur inside a serialized entry.
		max_count = sum(1 for line in serialized_text.split("\n") if line.strip())
		bounded = summaries[:max_count]
		if "[TRUNCATED]" in serialized_text and bounded:
			last = bounded[-1]
			bounded[-1] = NeighborSummary(
				module_id=last.module_id,
				relation=last.relation,
				summary=f"{last.summary}\n... [TRUNCATED]",
				review_snippet=last.review_snippet,
			)
		return bounded