MetaCortex-Dynamics committed on
Commit
65511e4
·
verified ·
1 Parent(s): a798fca

Create pipeline/stages/s1_segment.py

Browse files
Files changed (1) hide show
  1. pipeline/stages/s1_segment.py +131 -0
pipeline/stages/s1_segment.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ S1: SEGMENT — Split input into atomic units.
3
+
4
+ Per SPEC-PIPELINE-001 Part B.2:
5
+ Verilog/SystemVerilog: split by module boundary.
6
+ Natural language docs: split by section/paragraph.
7
+ SVA: split by assertion.
8
+ CSV/JSON: split by record.
9
+
10
+ Invariant: Union of all segment byte-ranges = source file minus whitespace-only gaps.
11
+ """
12
+
13
+ from __future__ import annotations
14
+
15
+ from dataclasses import dataclass
16
+ from typing import Iterator
17
+
18
+ from pipeline.types import SourceProvenance, Tier
19
+
20
+
21
@dataclass
class Segment:
    """One atomic slice of a source document, traceable back to its origin."""

    # Provenance record of the document this segment was cut from.
    source: SourceProvenance
    # Inclusive (start, end) offsets locating the segment within the source.
    byte_range: tuple[int, int]
    # Textual content of the segment.
    text: str
    # Kind of unit: "module", "section", "assertion", or "record".
    segment_type: str
28
+
29
+
30
def segment(source: SourceProvenance, content: bytes) -> Iterator[Segment]:
    """Decode *content* and yield its segments using the splitter for the source tier.

    The raw bytes are decoded as UTF-8; undecodable bytes are replaced rather
    than raising. The decoded text is then handed to the splitter matching
    ``source.tier`` (T1, T2 or T3). Any other tier produces no segments.
    """
    decoded = content.decode("utf-8", errors="replace")

    # Pick the tier-specific splitter first, then delegate in one place.
    tier = source.tier
    if tier == Tier.T1:
        splitter = _segment_t1
    elif tier == Tier.T2:
        splitter = _segment_t2
    elif tier == Tier.T3:
        splitter = _segment_t3
    else:
        # Unrecognised tier: nothing to emit.
        return

    yield from splitter(source, decoded)
40
+
41
+
42
def _segment_t1(source: SourceProvenance, text: str) -> Iterator[Segment]:
    """T1: EDA output — emit one "record" segment per non-blank line.

    Splitting is purely line-based (``str.splitlines``), so a record that spans
    several lines is emitted as several segments. Whitespace-only lines are
    skipped.

    Args:
        source: Provenance attached unchanged to every emitted segment.
        text: Decoded source text.

    Yields:
        Segment: ``text`` is the line with surrounding whitespace stripped;
        ``byte_range`` is the inclusive UTF-8 byte span of the whole line
        (excluding its line terminator).

    Note:
        Offsets are measured in UTF-8 bytes of ``text``, not in characters, so
        they line up with the original file whenever that file was valid
        UTF-8. If undecodable bytes were replaced during decoding, offsets
        after the replacement can no longer be recovered exactly from ``text``.
    """
    # Pair each line with its terminator-inclusive form so the running offset
    # can step over the line ending, whatever its length in bytes.
    bare_lines = text.splitlines()
    full_lines = text.splitlines(keepends=True)

    byte_pos = 0  # UTF-8 byte offset of the start of the current line
    for line, full in zip(bare_lines, full_lines):
        record = line.strip()
        if record:
            width = len(line.encode("utf-8"))
            yield Segment(
                source=source,
                byte_range=(byte_pos, byte_pos + width - 1),
                text=record,
                segment_type="record",
            )
        byte_pos += len(full.encode("utf-8"))
57
+
58
+
59
def _segment_t2(source: SourceProvenance, text: str) -> Iterator[Segment]:
    """T2: RTL docs — split into Verilog modules and the text between them.

    Each ``module … endmodule`` (or ``macromodule … endmodule``) span becomes a
    "module" segment. Any non-whitespace text before, between, or after modules
    becomes a single "section" segment per gap.

    Args:
        source: Provenance attached unchanged to every emitted segment.
        text: Decoded source text.

    Yields:
        Segment: in source order. "module" segments carry the exact matched
        text; "section" segments carry the gap text with surrounding
        whitespace stripped, while their ``byte_range`` covers the whole gap.

    Note:
        Offsets are inclusive and measured in UTF-8 bytes of ``text``, not in
        characters, so they line up with the original file whenever that file
        was valid UTF-8. If undecodable bytes were replaced during decoding,
        offsets after the replacement can no longer be recovered exactly.
    """
    import re

    # Word boundaries keep prose such as "submodule x" or "endmodule foo" from
    # being mistaken for the start of a module declaration.
    module_pattern = re.compile(
        r"\b(?:macro)?module\s+\w+[\s\S]*?\bendmodule\b"
    )

    char_pos = 0  # character index just past the previously emitted span
    byte_pos = 0  # UTF-8 byte offset corresponding to char_pos
    for match in module_pattern.finditer(text):
        start, end = match.span()

        # Text between the previous module (or file start) and this one.
        gap = text[char_pos:start]
        gap_bytes = len(gap.encode("utf-8"))
        gap_text = gap.strip()
        if gap_text:
            yield Segment(
                source=source,
                byte_range=(byte_pos, byte_pos + gap_bytes - 1),
                text=gap_text,
                segment_type="section",
            )

        module_text = match.group(0)
        module_start = byte_pos + gap_bytes
        module_bytes = len(module_text.encode("utf-8"))
        yield Segment(
            source=source,
            byte_range=(module_start, module_start + module_bytes - 1),
            text=module_text,
            segment_type="module",
        )

        char_pos = end
        byte_pos = module_start + module_bytes

    # Trailing content after the last module (or the whole text if none matched).
    tail = text[char_pos:]
    tail_text = tail.strip()
    if tail_text:
        tail_bytes = len(tail.encode("utf-8"))
        yield Segment(
            source=source,
            byte_range=(byte_pos, byte_pos + tail_bytes - 1),
            text=tail_text,
            segment_type="section",
        )
96
+
97
+
98
def _segment_t3(source: SourceProvenance, text: str) -> Iterator[Segment]:
    """T3: Formal properties — split into assertions and the text between them.

    Each ``assert|assume|cover property ( … );`` statement becomes an
    "assertion" segment. Any non-whitespace text before, between, or after
    those statements becomes a single "section" segment per gap.

    Args:
        source: Provenance attached unchanged to every emitted segment.
        text: Decoded source text.

    Yields:
        Segment: in source order. "assertion" segments carry the exact matched
        text; "section" segments carry the gap text with surrounding
        whitespace stripped, while their ``byte_range`` covers the whole gap.

    Note:
        Offsets are inclusive and measured in UTF-8 bytes of ``text``, not in
        characters, so they line up with the original file whenever that file
        was valid UTF-8. If undecodable bytes were replaced during decoding,
        offsets after the replacement can no longer be recovered exactly.
        The statement end is found heuristically as the first ``)`` followed
        by ``;`` — parentheses are not balanced.
    """
    import re

    # Leading word boundary so the keyword is not matched inside a longer
    # identifier.
    assertion_pattern = re.compile(
        r"\b(?:assert|assume|cover)\s+property\s*\([\s\S]*?\)\s*;"
    )

    char_pos = 0  # character index just past the previously emitted span
    byte_pos = 0  # UTF-8 byte offset corresponding to char_pos
    for match in assertion_pattern.finditer(text):
        start, end = match.span()

        # Text between the previous assertion (or file start) and this one.
        gap = text[char_pos:start]
        gap_bytes = len(gap.encode("utf-8"))
        gap_text = gap.strip()
        if gap_text:
            yield Segment(
                source=source,
                byte_range=(byte_pos, byte_pos + gap_bytes - 1),
                text=gap_text,
                segment_type="section",
            )

        assertion_text = match.group(0)
        assertion_start = byte_pos + gap_bytes
        assertion_bytes = len(assertion_text.encode("utf-8"))
        yield Segment(
            source=source,
            byte_range=(assertion_start, assertion_start + assertion_bytes - 1),
            text=assertion_text,
            segment_type="assertion",
        )

        char_pos = end
        byte_pos = assertion_start + assertion_bytes

    # Trailing content after the last assertion (or the whole text if none matched).
    tail = text[char_pos:]
    tail_text = tail.strip()
    if tail_text:
        tail_bytes = len(tail.encode("utf-8"))
        yield Segment(
            source=source,
            byte_range=(byte_pos, byte_pos + tail_bytes - 1),
            text=tail_text,
            segment_type="section",
        )