File size: 11,406 Bytes
7d4338a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
"""
Pure file operations for the text_editor plugin.

No agent/tool dependencies — only stdlib + tokens helper.
"""

import os
import shutil
import tempfile
from typing import TypedDict

from helpers import tokens

# Number of leading bytes is_binary() reads when scanning for NUL bytes.
_BINARY_PEEK = 8192


# ------------------------------------------------------------------
# Binary detection
# ------------------------------------------------------------------

def is_binary(path: str) -> bool:
    """
    Report whether the file at *path* looks binary.

    Only the first _BINARY_PEEK bytes are inspected; the file counts as
    binary when that prefix contains a NUL byte. A file that cannot be
    opened or read is reported as not binary.
    """
    try:
        with open(path, "rb") as handle:
            head = handle.read(_BINARY_PEEK)
    except OSError:
        # Unreadable files are treated as text; callers surface the
        # real I/O error when they try to open the file themselves.
        return False
    return b"\x00" in head


# ------------------------------------------------------------------
# File metadata
# ------------------------------------------------------------------

class FileInfo(TypedDict):
    """Metadata snapshot for a path, as produced by file_info()."""

    # True when os.path.exists() reports the expanded path as present.
    exists: bool
    # True when os.path.isfile() reports the expanded path as a regular file.
    is_file: bool
    # os.path.realpath() of the expanded path.
    realpath: str
    # The input path after os.path.expanduser().
    expanded: str
    # os.path.getmtime() of the expanded path; None when the path does not
    # exist or its mtime could not be read.
    mtime: float | None


def file_info(path: str) -> FileInfo:
    """
    Collect path-resolution and mtime metadata for *path*.

    The path is user-expanded first; every other field is derived from
    the expanded form. ``mtime`` stays None when the path does not exist
    or its modification time cannot be read.
    """
    expanded = os.path.expanduser(path)
    resolved = os.path.realpath(expanded)
    present = os.path.exists(expanded)
    regular = os.path.isfile(expanded)

    modified: float | None = None
    if present:
        try:
            modified = os.path.getmtime(expanded)
        except OSError:
            # The path vanished or became unreadable between the two
            # calls; report "no mtime" rather than failing.
            modified = None

    return {
        "exists": present,
        "is_file": regular,
        "realpath": resolved,
        "expanded": expanded,
        "mtime": modified,
    }


# ------------------------------------------------------------------
# Read
# ------------------------------------------------------------------

class ReadResult(TypedDict):
    """Outcome of read_file()."""

    # Selected lines, each prefixed with its right-aligned 1-based line
    # number, joined with "\n". Empty on error.
    content: str
    # Number of lines in the whole file (not just the selection); 0 on error.
    total_lines: int
    # "" when there is nothing to report, otherwise a string starting with
    # "\nwarning: " listing cropped lines and/or token-limit trimming.
    warnings: str
    # "" on success, otherwise a human-readable failure reason.
    error: str


def read_file(
    path: str,
    line_from: int = 1,
    line_to: int | None = None,
    max_line_tokens: int = 500,
    default_line_count: int = 100,
    max_total_read_tokens: int = 4000,
) -> ReadResult:
    """
    Return a numbered, token-budgeted slice of a text file.

    Line numbers are 1-based and the requested range is inclusive on
    both ends. When line_to is None the range covers default_line_count
    lines starting at line_from. The range is clamped to the file.

    Lines costing more than max_line_tokens are cropped (and reported in
    the warnings); output stops early once adding another line would
    exceed max_total_read_tokens.

    Failures (missing file, binary file, OS error while reading) are
    reported through the ``error`` field instead of being raised.
    """

    def _failure(message: str) -> ReadResult:
        # All error results share the same empty payload.
        return ReadResult(
            content="", total_lines=0, warnings="", error=message,
        )

    path = os.path.expanduser(path)
    if not os.path.isfile(path):
        return _failure("file not found")
    if is_binary(path):
        return _failure("file appears binary, use terminal instead")

    try:
        with open(path, "r", encoding="utf-8", errors="replace") as handle:
            file_lines = handle.readlines()
    except OSError as exc:
        return _failure(str(exc))

    line_count = len(file_lines)
    first = max(line_from, 1)
    if line_to is None:
        last = first + default_line_count - 1
    else:
        last = line_to
    last = min(last, line_count)

    # 1-based inclusive [first, last] maps to the 0-based slice
    # [first - 1, last) because the slice end is exclusive.
    window = file_lines[first - 1:last]
    gutter = len(str(last))

    rendered: list[str] = []
    cropped: list[int] = []
    budget_used = 0
    hit_budget = False

    for line_no, raw in enumerate(window, start=first):
        text = raw.rstrip("\n").rstrip("\r")
        cost = tokens.count_tokens(text)

        if cost > max_line_tokens:
            # Estimate how many characters fit in the per-line budget
            # from this line's own chars-per-token ratio.
            ratio = max(len(text) / cost, 1)
            limit = int(max_line_tokens * ratio * tokens.TRIM_BUFFER)
            text = text[:limit] + "..."
            cropped.append(line_no)
            cost = max_line_tokens

        if budget_used + cost > max_total_read_tokens:
            hit_budget = True
            break

        budget_used += cost
        rendered.append(f"{line_no:>{gutter}} {text}")

    notes: list[str] = []
    if cropped:
        nums = " ".join(map(str, cropped))
        notes.append(
            f"long lines {nums} cropped - use terminal for precise manipulation"
        )
    if hit_budget:
        # First line that was NOT emitted, i.e. where a follow-up read
        # should resume.
        actual_end = first + len(rendered)
        notes.append(
            f"output trimmed at line {actual_end} due to token limit"
            " - use line_from/line_to for remaining"
        )

    warning_text = "\nwarning: " + "; ".join(notes) if notes else ""

    return ReadResult(
        content="\n".join(rendered),
        total_lines=line_count,
        warnings=warning_text,
        error="",
    )


# ------------------------------------------------------------------
# Write
# ------------------------------------------------------------------

class WriteResult(TypedDict):
    """Outcome of write_file()."""

    # Number of lines in the content that was written; 0 on error.
    total_lines: int
    # "" on success, otherwise the text of the OSError that occurred.
    error: str


def write_file(path: str, content: str | None) -> WriteResult:
    """
    Create *path* (and any missing parent directories) or overwrite it.

    A None content is written as an empty file. The result carries the
    number of lines written, counting a final line without a trailing
    newline as a line. OS errors are returned in ``error``, not raised.
    """
    text = "" if content is None else content
    target = os.path.expanduser(path)

    try:
        parent = os.path.dirname(target) or "."
        os.makedirs(parent, exist_ok=True)
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(text)
    except OSError as exc:
        return WriteResult(total_lines=0, error=str(exc))

    line_total = text.count("\n")
    if text and not text.endswith("\n"):
        # Unterminated last line still counts as a line.
        line_total += 1
    return WriteResult(total_lines=line_total, error="")


# ------------------------------------------------------------------
# Patch
# ------------------------------------------------------------------

class PatchResult(TypedDict):
    """Outcome of patch_file()."""

    # Line count of the file after patching, as returned by apply_patch();
    # 0 on error.
    total_lines: int
    # Number of validated edits that were applied; 0 on error.
    edit_count: int
    # "" on success, otherwise a validation or I/O error message.
    error: str


def validate_edits(edits: list | None) -> tuple[list[dict], str]:
    """
    Normalise and validate an edits array.

    Line numbers are 1-based (matching grep, sed, editors).
    Semantics (to is inclusive):
      {from:2, to:2, content:"x\\n"} - replace line 2
      {from:1, to:3, content:"x\\n"} - replace lines 1-3
      {from:2, to:2}                 - delete line 2
      {from:5}  or {from:5, to:-1}   - insert before line 5 (no deletion)

    Returns (parsed_edits, error_string). error_string is empty on success.
    """
    if not edits or not isinstance(edits, list):
        return [], "edits array is required"

    parsed: list[dict] = []
    for e in edits:
        if not isinstance(e, dict):
            return [], f"invalid edit entry: {e}"
        frm = int(e.get("from", 0))
        if frm < 1:
            return [], f"edit missing or invalid from (must be >= 1): {e}"
        # to == -1 or absent means pure insert (no lines removed)
        to = int(e.get("to", -1))
        is_insert = to < 0 or to < frm
        if is_insert:
            to = frm - 1  # normalise: marks zero-width range
        parsed.append({
            "from": frm,
            "to": to,
            "content": e.get("content", ""),
            "insert": is_insert,
        })

    parsed.sort(key=lambda x: (x["from"], 0 if x["insert"] else 1))
    for i in range(1, len(parsed)):
        prev, cur = parsed[i - 1], parsed[i]
        # Inserts at the same line don't overlap with each other or
        # with a replace that starts at the same line.
        if prev["insert"]:
            continue
        # prev is a replace/delete: its range is [from..to] inclusive
        if cur["from"] <= prev["to"]:
            return [], (
                f"overlapping edits: edit at {prev['from']}"
                f" (to {prev['to']}) and {cur['from']}"
                f" (to {cur['to']})"
            )

    return parsed, ""


def apply_patch(path: str, edits: list[dict]) -> int:
    """
    Apply sorted, validated edits by streaming to a temp file.

    Line numbers are 1-based. Edits use inclusive 'to'.
    Inserts have 'insert': True.

    The patched text is written to a temporary file in the same
    directory as *path*, which then replaces *path*. The original
    file's permission bits are copied onto the replacement. Edits that
    start past the end of the file are appended in order. The caller's
    edit dicts are not modified.

    Returns total line count after patching.

    Raises whatever the underlying file operations raise (e.g. OSError);
    the temporary file is removed before the exception propagates.
    """
    # Ensure content always ends with newline to prevent line merging.
    # Normalised copies are used so the caller's dicts stay untouched.
    texts: list[str] = []
    for e in edits:
        text = e["content"] or ""
        if text and not text.endswith("\n"):
            text += "\n"
        texts.append(text)
    edit_total = len(edits)

    dir_name = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        # Wrap the raw descriptor first: if opening the source fails,
        # the descriptor is still closed instead of leaking.
        with (
            os.fdopen(fd, "w", encoding="utf-8") as dst,
            open(path, "r", encoding="utf-8", errors="replace") as src,
        ):
            edit_idx = 0
            line_no = 1  # 1-based
            total_written = 0
            # Whether the output currently ends on a line boundary; only
            # the last source line can leave this False.
            ends_with_newline = True

            for raw_line in src:
                # Process all inserts targeting this line first
                while (
                    edit_idx < edit_total
                    and edits[edit_idx]["insert"]
                    and edits[edit_idx]["from"] == line_no
                ):
                    text = texts[edit_idx]
                    if text:
                        dst.write(text)
                        # text ends with "\n", so newlines == lines
                        total_written += text.count("\n")
                    edit_idx += 1

                # Check if current line falls in a replace/delete range
                if edit_idx < edit_total and not edits[edit_idx]["insert"]:
                    edit = edits[edit_idx]
                    if edit["from"] <= line_no <= edit["to"]:
                        # Write replacement content once at range start
                        text = texts[edit_idx]
                        if line_no == edit["from"] and text:
                            dst.write(text)
                            total_written += text.count("\n")
                        # Skip original line; advance edit at range end
                        if line_no == edit["to"]:
                            edit_idx += 1
                        line_no += 1
                        continue

                dst.write(raw_line)
                ends_with_newline = raw_line.endswith("\n")
                total_written += 1
                line_no += 1

            # A replace/delete whose range runs past EOF has already
            # emitted its content at range start; don't emit it again.
            if (
                edit_idx < edit_total
                and not edits[edit_idx]["insert"]
                and edits[edit_idx]["from"] < line_no
            ):
                edit_idx += 1

            # Remaining edits past end of file
            for text in texts[edit_idx:]:
                if not text:
                    continue
                if not ends_with_newline:
                    # Terminate an unterminated last line so appended
                    # content does not merge into it.
                    dst.write("\n")
                    ends_with_newline = True
                dst.write(text)
                total_written += text.count("\n")

        # mkstemp creates the file owner-only; carry over the original
        # permission bits so patching does not change the file's mode.
        try:
            shutil.copymode(path, tmp_path)
        except OSError:
            pass  # best effort: mode preservation must not block the edit

        shutil.move(tmp_path, path)
        return total_written
    except Exception:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def patch_file(path: str, edits: list | None) -> PatchResult:
    """
    Validate *edits* and apply them to the file at *path*.

    The path is user-expanded and must refer to an existing regular
    file. Validation problems and any exception raised while applying
    the patch are reported through ``error`` rather than raised.
    """

    def _failure(message: str) -> PatchResult:
        # Every failure reports zero lines and zero applied edits.
        return PatchResult(total_lines=0, edit_count=0, error=message)

    target = os.path.expanduser(path)
    if not os.path.isfile(target):
        return _failure("file not found")

    normalised, problem = validate_edits(edits)
    if problem:
        return _failure(problem)

    try:
        line_total = apply_patch(target, normalised)
    except Exception as exc:
        return _failure(str(exc))

    return PatchResult(
        total_lines=line_total,
        edit_count=len(normalised),
        error="",
    )


# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------

def _count_content_lines(content: str) -> int:
    return content.count("\n") + (
        1 if content and not content.endswith("\n") else 0
    )