amine-yagoub committed on
Commit
7ddc81b
·
1 Parent(s): d973752

feat: implement AST-based code dependency graph and CrewAI knowledge integration

Browse files
src/code_tribunal/code_graph.py ADDED
@@ -0,0 +1,314 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """AST-based code dependency graph for structured code analysis."""
2
+
3
+ import ast
4
+ import re
5
+ from dataclasses import dataclass, field
6
+ from pathlib import Path
7
+
8
+
9
@dataclass
class CodeNode:
    """One vertex of the code graph.

    A vertex stands for a source file or for something declared inside a
    file.  Vertices are keyed by ``id`` in the graph's node table.
    """

    # Unique key of the node inside the graph.
    id: str
    # Category tag: "file" | "function" | "class" | "import".
    kind: str
    # Short display name (file name, function name, class name, ...).
    name: str
    # Path of the file the node belongs to.
    file: str
    # Line on which the element is declared; file nodes are created with 0.
    line: int
    # Free-form extras (for example decorator names); a fresh dict per instance.
    metadata: dict = field(default_factory=dict)
19
+
20
+
21
@dataclass
class CodeEdge:
    """One directed relationship between two graph entries.

    ``source`` and ``target`` are plain strings: usually node ids, but a
    target may also be a bare name (a callee, a module, a base class).
    """

    # Where the relationship starts.
    source: str
    # Where the relationship points to.
    target: str
    # Relationship tag: "calls" | "imports" | "contains" | "inherits".
    kind: str
28
+
29
+
30
# File suffixes picked up by CodeGraph.build_from_directory.  Only ".py" and
# the JavaScript/TypeScript suffixes are parsed for structure; files with the
# other suffixes contribute a bare "file" node.
_SOURCE_EXTENSIONS = {".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb"}


class CodeGraph:
    """Lightweight code dependency graph built from AST and regex analysis.

    Attributes:
        nodes: Graph vertices keyed by node id.  File ids are the file's
            relative path, top-level definitions use ``"<file>:<name>"`` and
            methods use ``"<file>:<Class>.<method>"``.
        edges: Directed relationships in insertion order.  The targets of
            "calls", "imports" and "inherits" edges are bare names, not
            node ids.
        _file_contents: Source lines of every file read, keyed like the
            file nodes.
    """

    def __init__(self) -> None:
        self.nodes: dict[str, CodeNode] = {}
        self.edges: list[CodeEdge] = []
        self._file_contents: dict[str, list[str]] = {}

    # ------------------------------------------------------------------
    # Building
    # ------------------------------------------------------------------

    def build_from_directory(self, target_dir: str) -> None:
        """Scan all source files in *target_dir* and populate the graph.

        Files are visited recursively in sorted path order; only regular
        files whose suffix is in ``_SOURCE_EXTENSIONS`` are read.
        """
        root = Path(target_dir)
        for p in sorted(root.rglob("*")):
            if p.suffix in _SOURCE_EXTENSIONS and p.is_file():
                self.build_from_file(str(p), str(root))

    def build_from_file(self, filepath: str, root: str = "") -> None:
        """Parse a single file and add its nodes/edges.

        Args:
            filepath: Path of the file to read.
            root: Optional directory the file id is made relative to.  When
                empty, or when *filepath* is not located under it, the bare
                file name is used as the id.

        Unreadable files are skipped silently.
        """
        path = Path(filepath)
        rel_path = path.name
        if root:
            try:
                rel_path = str(path.relative_to(root))
            except ValueError:
                # File lives outside root: fall back to the bare name
                # instead of aborting the whole scan.
                pass

        try:
            # Decode explicitly as UTF-8 so the result does not depend on the
            # machine's locale; undecodable bytes are replaced, not fatal.
            source = path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return

        self._file_contents[rel_path] = source.splitlines()

        # File-level node (always replaces a previous node with the same id).
        file_id = rel_path
        self.nodes[file_id] = CodeNode(
            id=file_id, kind="file", name=path.name, file=rel_path, line=0,
        )

        ext = path.suffix
        if ext == ".py":
            self._parse_python(source, rel_path, file_id)
        elif ext in {".js", ".ts", ".jsx", ".tsx"}:
            self._parse_javascript(source, rel_path, file_id)

    # ------------------------------------------------------------------
    # Query helpers (used by tools)
    # ------------------------------------------------------------------

    def get_file_summary(self, filepath: str) -> str:
        """Return a compact, line-ordered summary of what a file contains."""
        members = sorted(
            (n for n in self.nodes.values() if n.file == filepath and n.kind != "file"),
            key=lambda n: n.line,
        )
        if not members:
            return f"No structured elements found in {filepath}"
        lines = [f"L{node.line}: [{node.kind}] {node.name}" for node in members]
        return f"=== {filepath} ===\n" + "\n".join(lines)

    def get_function_source(self, filepath: str, function_name: str) -> str:
        """Return the numbered source lines of a function.

        The first function node in *filepath* named *function_name* is used.
        Lines are collected from its declaration line up to, but excluding,
        the first non-blank line indented no deeper than the declaration.
        That way a method stops at the next sibling method rather than at
        the end of its class.

        Returns an explanatory message when the file is not cached, the
        function is unknown, or its recorded line is outside the file.
        """
        raw_lines = self._file_contents.get(filepath, [])
        if not raw_lines:
            return f"File {filepath} not cached."

        for node in self.nodes.values():
            if node.file == filepath and node.name == function_name and node.kind == "function":
                start = node.line - 1  # 0-indexed
                if not 0 <= start < len(raw_lines):
                    return "Function body not available."

                base_indent = len(raw_lines[start]) - len(raw_lines[start].lstrip())
                collected = [f"{start + 1}: {raw_lines[start]}"]
                for i in range(start + 1, len(raw_lines)):
                    text = raw_lines[i]
                    stripped = text.strip()
                    indent = len(text) - len(text.lstrip())
                    # A line starting with ")" at the declaration's indent is
                    # the tail of a multi-line signature, not a new statement.
                    if stripped and indent <= base_indent and not stripped.startswith(")"):
                        break
                    collected.append(f"{i + 1}: {text}")
                return "\n".join(collected)

        return f"Function '{function_name}' not found in {filepath}."

    def trace_calls(self, function_name: str, depth: int = 3) -> str:
        """Trace the call chain from *function_name* up to *depth* hops.

        Each output line is one chain, starting at *function_name* and
        walking outwards through its callers, joined with " → ".  A chain
        holding only the start function is not a chain, so the "not found"
        message is returned in that case.
        """
        chains: list[list[str]] = []
        self._trace_recursive(function_name, depth, [function_name], chains)
        chains = [chain for chain in chains if len(chain) > 1]
        if not chains:
            return f"No call chain found for '{function_name}'."
        return "\n".join(" → ".join(chain) for chain in chains)

    def get_callers(self, function_name: str) -> list[str]:
        """Return the ids of all functions that call *function_name*.

        Call edges store the bare callee name, so the target is compared for
        equality; a qualified ``"<file>:<name>"`` target is accepted as well.
        Each caller is reported once, in first-seen order.
        """
        qualified_suffix = f":{function_name}"
        callers = (
            edge.source
            for edge in self.edges
            if edge.kind == "calls"
            and (edge.target == function_name or edge.target.endswith(qualified_suffix))
        )
        # dict.fromkeys removes repeats while keeping insertion order.
        return list(dict.fromkeys(callers))

    def get_imports(self, filepath: str) -> list[str]:
        """Return the targets of all import edges whose source is *filepath*."""
        return [
            edge.target
            for edge in self.edges
            if edge.kind == "imports" and edge.source == filepath
        ]

    def to_text(self) -> str:
        """Compact text representation of the graph for LLM context.

        Nodes are grouped per file and listed by line; after each file's
        nodes come at most 20 of the edges that start at that file or at
        one of its members.
        """
        lines: list[str] = ["=== CODE DEPENDENCY GRAPH ==="]
        lines.append(f"Nodes: {len(self.nodes)} | Edges: {len(self.edges)}")

        # Group nodes by file
        by_file: dict[str, list] = {}
        for node in self.nodes.values():
            by_file.setdefault(node.file, []).append(node)

        for filepath in sorted(by_file):
            lines.append(f"\n--- {filepath} ---")
            for node in sorted(by_file[filepath], key=lambda n: n.line):
                if node.kind == "file":
                    continue
                decorators = node.metadata.get("decorators", [])
                dec_str = f" @{','.join(decorators)}" if decorators else ""
                lines.append(f" L{node.line} [{node.kind}] {node.name}{dec_str}")

            # Edges for this file (the file itself or any "<file>:..." member)
            member_prefix = f"{filepath}:"
            file_edges = [
                e for e in self.edges
                if e.source == filepath or e.source.startswith(member_prefix)
            ]
            for edge in file_edges[:20]:
                # Show only the part after the last ":" of a qualified target.
                short = edge.target.rsplit(":", 1)[-1]
                lines.append(f" --{edge.kind}--> {short}")

        return "\n".join(lines)

    def get_statistics(self) -> dict:
        """Return counts by node kind and edge kind."""
        node_kinds: dict[str, int] = {}
        for node in self.nodes.values():
            node_kinds[node.kind] = node_kinds.get(node.kind, 0) + 1
        edge_kinds: dict[str, int] = {}
        for edge in self.edges:
            edge_kinds[edge.kind] = edge_kinds.get(edge.kind, 0) + 1
        return {"nodes": node_kinds, "edges": edge_kinds}

    # ------------------------------------------------------------------
    # Internal parsers
    # ------------------------------------------------------------------

    def _trace_recursive(
        self, name: str, depth: int, path: list[str], results: list[list[str]]
    ) -> None:
        """Extend *path* with the callers of *name*, depth-first.

        A copy of *path* is appended to *results* whenever the chain cannot
        grow any further: the depth budget is used up, *name* has no
        callers, or every caller is already on the path (cycle).
        """
        if depth <= 0:
            # Keep the partial chain instead of silently dropping it.
            results.append(path[:])
            return

        extended = False
        seen: set[str] = set()
        for caller_id in self.get_callers(name):
            _, sep, qualified = caller_id.rpartition(":")
            label = qualified if sep else caller_id
            if label in path or label in seen:
                continue  # avoid cycles and same-named duplicates
            seen.add(label)
            extended = True
            # Call targets are bare names, so a "Class.method" caller has to
            # be looked up by its method name alone.
            lookup = label.rpartition(".")[2] if sep else label
            path.append(label)
            self._trace_recursive(lookup, depth - 1, path, results)
            path.pop()

        if not extended:
            results.append(path[:])

    def _add_node(self, node: "CodeNode") -> None:
        """Register *node* unless a node with the same id already exists."""
        if node.id not in self.nodes:
            self.nodes[node.id] = node

    def _add_edge(self, edge: "CodeEdge") -> None:
        """Append *edge*; duplicates are kept."""
        self.edges.append(edge)

    # -- Python AST --

    @staticmethod
    def _expr_name(expr: ast.expr) -> str:
        """Return a short, stable name for a decorator expression.

        ``@name`` gives ``name``, ``@mod.name`` gives ``name``, and a called
        decorator is named after what is being called.  Anything else is
        rendered back to source text.
        """
        if isinstance(expr, ast.Call):
            expr = expr.func
        if isinstance(expr, ast.Attribute):
            return expr.attr
        if isinstance(expr, ast.Name):
            return expr.id
        return ast.unparse(expr)

    @classmethod
    def _decorator_names(cls, node: ast.AST) -> list[str]:
        """Return the decorator names of a function or class definition."""
        return [cls._expr_name(d) for d in node.decorator_list]

    def _parse_python(self, source: str, rel_path: str, file_id: str) -> None:
        """Add nodes/edges for the top-level definitions of a Python file.

        Only direct children of the module are inspected: functions,
        classes (with their direct methods and base classes) and imports.
        Source that does not parse is skipped.
        """
        try:
            tree = ast.parse(source, filename=rel_path)
        except (SyntaxError, ValueError):
            # ValueError: source containing NUL bytes on older interpreters.
            return

        function_types = (ast.FunctionDef, ast.AsyncFunctionDef)
        for node in ast.iter_child_nodes(tree):
            if isinstance(node, function_types):
                func_id = f"{file_id}:{node.name}"
                self._add_node(CodeNode(
                    id=func_id, kind="function", name=node.name,
                    file=rel_path, line=node.lineno,
                    metadata={"decorators": self._decorator_names(node)},
                ))
                self._add_edge(CodeEdge(source=file_id, target=func_id, kind="contains"))
                self._collect_calls(node, func_id, file_id)

            elif isinstance(node, ast.ClassDef):
                cls_id = f"{file_id}:{node.name}"
                self._add_node(CodeNode(
                    id=cls_id, kind="class", name=node.name,
                    file=rel_path, line=node.lineno,
                    metadata={"decorators": self._decorator_names(node)},
                ))
                self._add_edge(CodeEdge(source=file_id, target=cls_id, kind="contains"))
                # Inheritance: plain names and dotted names such as abc.ABC
                for base in node.bases:
                    if isinstance(base, ast.Name):
                        self._add_edge(CodeEdge(source=cls_id, target=base.id, kind="inherits"))
                    elif isinstance(base, ast.Attribute):
                        self._add_edge(CodeEdge(
                            source=cls_id, target=ast.unparse(base), kind="inherits",
                        ))
                # Methods
                for item in ast.iter_child_nodes(node):
                    if isinstance(item, function_types):
                        method_id = f"{cls_id}.{item.name}"
                        self._add_node(CodeNode(
                            id=method_id, kind="function", name=item.name,
                            file=rel_path, line=item.lineno,
                            metadata={"decorators": self._decorator_names(item)},
                        ))
                        self._add_edge(CodeEdge(source=cls_id, target=method_id, kind="contains"))
                        self._collect_calls(item, method_id, file_id)

            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                for alias in node.names:
                    self._add_edge(CodeEdge(source=file_id, target=alias.name, kind="imports"))

    def _collect_calls(self, func_node: ast.AST, caller_id: str, file_id: str) -> None:
        """Walk *func_node* and record function calls as edges.

        The edge target is the bare callee name: the identifier for
        ``f(...)`` and the final attribute for ``obj.f(...)``.  Other call
        forms are ignored.  *file_id* is accepted but not used.
        """
        for child in ast.walk(func_node):
            if not isinstance(child, ast.Call):
                continue
            callee = child.func
            if isinstance(callee, ast.Name):
                name = callee.id
            elif isinstance(callee, ast.Attribute):
                name = callee.attr
            else:
                name = None
            if name:
                self._add_edge(CodeEdge(source=caller_id, target=name, kind="calls"))

    # -- JavaScript / TypeScript regex --

    _JS_FUNC_RE = re.compile(
        r"(?:function\s+(\w+)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?(?:function|\([^)]*\)\s*=>)|"
        r"(?:async\s+)?(\w+)\s*\([^)]*\)\s*\{)",
        re.MULTILINE,
    )
    _JS_IMPORT_RE = re.compile(
        r"(?:import\s+.*?from\s+['\"]([^'\"]+)['\"]|require\s*\(\s*['\"]([^'\"]+)['\"]\s*\))",
        re.MULTILINE,
    )
    _JS_CALL_RE = re.compile(r"(\w+)\s*\(", re.MULTILINE)

    # Words that look like "name(" to the regexes above but are language
    # keywords, never function names or callees.
    _JS_KEYWORDS = frozenset({
        "if", "for", "while", "switch", "catch", "return", "throw",
        "function", "const", "let", "var",
    })

    @classmethod
    def _scan_javascript(cls, source: str) -> list[tuple[str, int, list[str]]]:
        """Return ``(name, line, callees)`` for each function found in *source*.

        A function's body is approximated as the text up to the next real
        function definition.  Keyword hits such as ``if (...) {`` do not end
        a body; otherwise every call after the first ``if`` would be lost.
        """
        definitions = []
        for m in cls._JS_FUNC_RE.finditer(source):
            name = m.group(1) or m.group(2) or m.group(3)
            if name and name not in cls._JS_KEYWORDS:
                definitions.append((m, name))

        found: list[tuple[str, int, list[str]]] = []
        for index, (m, name) in enumerate(definitions):
            if index + 1 < len(definitions):
                body_end = definitions[index + 1][0].start()
            else:
                body_end = len(source)
            body = source[m.end():body_end]
            callees = [
                callee for callee in cls._JS_CALL_RE.findall(body)
                if callee not in cls._JS_KEYWORDS
            ]
            line = source.count("\n", 0, m.start()) + 1
            found.append((name, line, callees))
        return found

    def _parse_javascript(self, source: str, rel_path: str, file_id: str) -> None:
        """Add nodes/edges for a JavaScript/TypeScript file using regexes.

        This is an approximation: functions, the calls attributed to them
        and ``import ... from`` / ``require(...)`` modules are matched
        textually, without parsing the language.
        """
        for name, line, callees in self._scan_javascript(source):
            func_id = f"{file_id}:{name}"
            self._add_node(CodeNode(
                id=func_id, kind="function", name=name,
                file=rel_path, line=line,
            ))
            self._add_edge(CodeEdge(source=file_id, target=func_id, kind="contains"))
            for callee in callees:
                self._add_edge(CodeEdge(source=func_id, target=callee, kind="calls"))

        for m in self._JS_IMPORT_RE.finditer(source):
            module = m.group(1) or m.group(2)
            if module:
                self._add_edge(CodeEdge(source=file_id, target=module, kind="imports"))
src/code_tribunal/knowledge.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """CrewAI knowledge sources for CodeTribunal."""
2
+
3
+ from pathlib import Path
4
+
5
+ from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
6
+
7
+ from code_tribunal.evidence import EvidenceReport, Finding
8
+ from code_tribunal.code_graph import CodeGraph
9
+
10
+
11
def build_evidence_knowledge(
    report: EvidenceReport, code_graph: CodeGraph
) -> list[StringKnowledgeSource]:
    """Turn an evidence report and a code graph into CrewAI knowledge sources.

    The result holds, in order: one source per entry of
    ``report.findings_by_domain``, then the code graph text and the report
    text, each of the last two only when it is not blank.
    """

    def _as_source(text: str, meta: dict, size: int, overlap: int) -> StringKnowledgeSource:
        # Single place where the knowledge-source constructor is called.
        return StringKnowledgeSource(
            content=text,
            metadata=meta,
            chunk_size=size,
            chunk_overlap=overlap,
        )

    knowledge: list[StringKnowledgeSource] = []

    # One source per domain: a heading followed by two lines per finding.
    for domain, findings in report.findings_by_domain.items():
        total = len(findings)
        entries = [f"## {domain.upper()} FINDINGS ({total} total)\n"]
        for finding in findings:
            short_name = Path(finding.file).name
            entries.append(
                f"- [{finding.severity_hint}] {short_name}:{finding.line} — {finding.code.strip()}"
            )
            entries.append(f" Category: {finding.category} | Domain: {finding.domain}")
        knowledge.append(
            _as_source(
                "\n".join(entries),
                {"domain": domain, "finding_count": total},
                2000,
                100,
            )
        )

    # Structural overview of the code base.
    graph_text = code_graph.to_text()
    if graph_text.strip():
        knowledge.append(_as_source(graph_text, {"type": "code_graph"}, 3000, 200))

    # Whole-report text, kept as a quick reference.
    summary = report.to_text()
    if summary.strip():
        knowledge.append(_as_source(summary, {"type": "evidence_summary"}, 4000, 200))

    return knowledge
61
+
62
+
63
def build_qa_knowledge(context: dict) -> list[StringKnowledgeSource]:
    """Build knowledge sources for the interactive Q&A agent.

    Every non-empty entry of *context* listed below becomes a headed
    section; the sections are joined into a single knowledge source.  An
    empty list is returned when none of the entries is present.
    """
    # (context key, section heading), in output order.
    sections = (
        ("evidence", "EVIDENCE REPORT"),
        ("investigation", "INVESTIGATION REPORTS"),
        ("transcript", "TRIAL TRANSCRIPT"),
        ("verdict", "VERDICT"),
        ("report", "FINAL REPORT"),
    )
    parts = [
        f"## {heading}\n{context[key]}"
        for key, heading in sections
        if context.get(key)
    ]
    if not parts:
        return []

    return [
        StringKnowledgeSource(
            content="\n\n".join(parts),
            metadata={"type": "full_trial_context"},
            chunk_size=4000,
            chunk_overlap=300,
        )
    ]