File size: 7,898 Bytes
2c5ae19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""Code context utilities for BLUX-cA.

This module gives the Clarity Agent a structured view of a codebase:

- Resolves a project root.
- Reads files safely with byte limits.
- Extracts line ranges (for focused context windows).
- Detects anchor regions (e.g. ``# >>> MAIN_MENU`` / ``# <<< MAIN_MENU``).
- Iterates over source files by extension.

It is intentionally self-contained so it can be used from both the CLI and
higher-level orchestration layers.
"""

from __future__ import annotations

import codecs
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple


# Anchor markers are ordinary ``#`` comments: ``# >>> NAME`` opens a region and
# ``# <<< NAME`` closes it. Both patterns are unanchored, so callers that use
# ``.search`` will find a marker anywhere on a line (including after code).
# Group 1 captures the name: letters, digits, underscores, hyphens and spaces.
# Because spaces are allowed, the captured text can carry trailing whitespace.
ANCHOR_OPEN_PATTERN = re.compile(r"#\s*>>>\s*([A-Za-z0-9_\- ]+)")
ANCHOR_CLOSE_PATTERN = re.compile(r"#\s*<<<\s*([A-Za-z0-9_\- ]+)")


@dataclass(frozen=True)
class AnchorRegion:
    """Represents a logical region in a file delimited by anchors.

    Example:

        # >>> MAIN_MENU
        ...
        # <<< MAIN_MENU

    Instances are immutable (``frozen=True``). The field notes below describe
    the values as ``CodeContext.find_anchors`` fills them in.
    """

    # Anchor name captured from the marker line, surrounding whitespace stripped.
    name: str
    # 1-based line number of the opening ``# >>> NAME`` marker itself.
    start_line: int
    # 1-based line number of the closing ``# <<< NAME`` marker, or the last line
    # of the file when the region is never closed.
    end_line: int


@dataclass(frozen=True)
class FileSnippet:
    """A slice of a file with line number metadata.

    Instances are immutable (``frozen=True``). The field notes below describe
    the values as ``CodeContext.read_lines`` fills them in.
    """

    # Resolved path of the file the text was read from.
    path: Path
    # 1-based number of the first requested line.
    start_line: int
    # 1-based number of the last line actually read (inclusive); this is lower
    # than the requested end when the file is shorter than the request.
    end_line: int
    # The selected lines joined together, line endings included.
    text: str


class CodeContext:
    """Provides a project-rooted view of source files.

    Parameters
    ----------
    root:
        Optional project root, given as a ``Path`` or a string. Defaults to
        the current working directory. It is resolved to an absolute path once,
        at construction time.
    max_bytes:
        Default maximum number of bytes to read from a file. Can be overridden
        per call.
    encoding:
        Text encoding used when reading files.
    """

    def __init__(
        self,
        root: Optional[Path | str] = None,
        *,
        max_bytes: int = 128_000,
        encoding: str = "utf-8",
    ) -> None:
        # Wrap in Path() so that string roots work just like Path roots.
        base = Path(root) if root is not None else Path.cwd()
        self._root = base.resolve()
        self._max_bytes = max_bytes
        self._encoding = encoding

    @property
    def root(self) -> Path:
        """The resolved, absolute project root."""
        return self._root

    def resolve(self, path: Path | str) -> Path:
        """Resolve a path against the project root.

        Relative paths are joined onto the root; absolute paths are used as
        given. The result is not checked to lie inside the root.
        """
        p = Path(path)
        if not p.is_absolute():
            p = self._root / p
        return p.resolve()

    def _resolve_existing(self, path: Path | str) -> Path:
        """Resolve ``path`` and raise ``FileNotFoundError`` if nothing exists there."""
        full_path = self.resolve(path)
        if not full_path.exists():
            raise FileNotFoundError(str(full_path))
        return full_path

    # --------------------------------------------------------------------- #
    # Basic file reading
    # --------------------------------------------------------------------- #

    def read_file(
        self,
        path: Path | str,
        *,
        max_bytes: Optional[int] = None,
    ) -> str:
        """Read up to ``max_bytes`` from a file, decoding as text.

        Undecodable bytes are replaced with U+FFFD. When the file is longer
        than the limit, a multi-byte character that straddles the cut-off is
        dropped rather than turned into a spurious replacement character.

        Parameters
        ----------
        path:
            File to read, relative to the project root or absolute.
        max_bytes:
            Byte limit for this call. ``None`` uses the instance default; a
            negative limit reads the whole file.

        Raises
        ------
        FileNotFoundError
            If the file does not exist.
        """

        full_path = self._resolve_existing(path)

        limit = self._max_bytes if max_bytes is None else max_bytes
        truncated = False
        with full_path.open("rb") as handle:
            if limit is None or limit < 0:
                data = handle.read()
            else:
                # Read one extra byte purely to learn whether the file goes on
                # past the limit; that byte is never returned.
                data = handle.read(limit + 1)
                truncated = len(data) > limit
                data = data[:limit]

        if truncated:
            try:
                decoder_cls = codecs.getincrementaldecoder(self._encoding)
            except LookupError:
                # Unknown codec or no incremental support: use the plain
                # decode below, which raises the same error as before.
                pass
            else:
                # final=False makes the decoder hold back an incomplete
                # trailing sequence instead of reporting it as an error.
                return decoder_cls(errors="replace").decode(data, final=False)

        return data.decode(self._encoding, errors="replace")

    def read_lines(
        self,
        path: Path | str,
        start_line: int,
        end_line: int,
    ) -> FileSnippet:
        """Return a specific line range from a file (1-based, inclusive).

        If ``end_line`` exceeds the file length, it is clamped to the last line.
        If ``start_line`` itself lies past the end of the file, the snippet text
        is empty and its ``end_line`` equals ``start_line - 1``.

        Raises
        ------
        ValueError
            If ``start_line < 1`` or ``end_line < start_line``.
        FileNotFoundError
            If the file does not exist.
        """

        if start_line < 1:
            raise ValueError("start_line must be >= 1")
        if end_line < start_line:
            raise ValueError("end_line must be >= start_line")

        full_path = self._resolve_existing(path)

        lines: List[str] = []
        with full_path.open("r", encoding=self._encoding, errors="replace") as handle:
            for idx, line in enumerate(handle, start=1):
                if idx > end_line:
                    # Stop early; the rest of the file is never read.
                    break
                if idx >= start_line:
                    lines.append(line)

        return FileSnippet(
            path=full_path,
            start_line=start_line,
            end_line=start_line + len(lines) - 1,
            text="".join(lines),
        )

    # --------------------------------------------------------------------- #
    # Anchor detection
    # --------------------------------------------------------------------- #

    def find_anchors(self, path: Path | str) -> Dict[str, AnchorRegion]:
        """Detect anchor regions in a file.

        Anchors are defined using the BLUX-style convention:

            # >>> NAME
            # body
            # <<< NAME

        If a region has an opening anchor but no explicit closing anchor,
        the end line defaults to the last line in the file. A closing anchor
        with no matching open anchor is ignored. A line containing both an
        opening and a closing marker is treated as an opening marker only.

        Returns
        -------
        Dict[str, AnchorRegion]
            Mapping of anchor name to region (first occurrence wins).

        Raises
        ------
        FileNotFoundError
            If the file does not exist.
        """

        full_path = self._resolve_existing(path)

        anchors: Dict[str, AnchorRegion] = {}
        # name -> line number of an opening marker still waiting for its close.
        # Invariant: a name is never in ``pending`` and ``anchors`` at once.
        pending: Dict[str, int] = {}
        last_line_number = 0

        with full_path.open("r", encoding=self._encoding, errors="replace") as handle:
            for line_no, line in enumerate(handle, start=1):
                last_line_number = line_no

                open_match = ANCHOR_OPEN_PATTERN.search(line)
                if open_match:
                    name = open_match.group(1).strip()
                    # Only track the first occurrence of each anchor.
                    if name not in anchors and name not in pending:
                        pending[name] = line_no
                    continue

                close_match = ANCHOR_CLOSE_PATTERN.search(line)
                if close_match:
                    name = close_match.group(1).strip()
                    start = pending.pop(name, None)
                    if start is not None:
                        anchors[name] = AnchorRegion(
                            name=name,
                            start_line=start,
                            end_line=line_no,
                        )

        # Any unclosed anchors extend to end of file. ``pending`` can only be
        # non-empty if at least one line was read, so last_line_number >= start.
        for name, start in pending.items():
            anchors[name] = AnchorRegion(
                name=name,
                start_line=start,
                end_line=last_line_number,
            )

        return anchors

    # --------------------------------------------------------------------- #
    # Repo scanning
    # --------------------------------------------------------------------- #

    @staticmethod
    def _normalize_ext(ext: str) -> str:
        """Lower-case ``ext`` and add a leading dot if it is missing.

        The empty string is left alone so it still selects suffix-less files.
        """
        ext = ext.lower()
        if ext and not ext.startswith("."):
            ext = "." + ext
        return ext

    def iter_source_files(
        self,
        exts: Sequence[str] = (".py", ".js", ".ts"),
        *,
        include_hidden: bool = False,
    ) -> Iterator[Path]:
        """Yield source files under the project root matching given extensions.

        Paths are absolute. Within each directory, files are yielded in sorted
        name order before its subdirectories are visited (also in sorted
        order). Symlinked directories are not descended into.

        Parameters
        ----------
        exts:
            File extensions to include, matched case-insensitively. The
            leading dot is optional, and a single string is accepted as a
            one-element sequence.
        include_hidden:
            If ``False`` (default), skip dot-dirs like ``.git`` and files whose
            name starts with a dot.
        """

        if isinstance(exts, str):
            # A bare string would otherwise be iterated character by character.
            exts = (exts,)
        ext_set = {self._normalize_ext(e) for e in exts}

        for dirpath, dirnames, filenames in os.walk(self._root):
            if not include_hidden:
                # Prune in place so os.walk never descends into hidden
                # directories instead of listing and then discarding them.
                dirnames[:] = [d for d in dirnames if not d.startswith(".")]
            dirnames.sort()

            for filename in sorted(filenames):
                if not include_hidden and filename.startswith("."):
                    continue

                path = Path(dirpath) / filename
                if path.suffix.lower() not in ext_set:
                    continue
                # Filters out broken symlinks and other non-regular entries.
                if not path.is_file():
                    continue

                yield path

    def snapshot(
        self,
        exts: Sequence[str] = (".py", ".js", ".ts"),
        *,
        include_hidden: bool = False,
    ) -> List[Path]:
        """Return a materialized list of source files for quick inspection.

        Takes the same arguments as :meth:`iter_source_files` and returns its
        results as a list, in the same order.
        """
        return list(self.iter_source_files(exts=exts, include_hidden=include_hidden))