File size: 3,571 Bytes
b8e5043
a7c4301
 
 
 
b8e5043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a7c4301
 
b8e5043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""File tools - read, write, edit files with path safety."""

import aiofiles
from pathlib import Path


from utils.path_utils import safe_path


async def run_read(
    path: str, workdir: Path | None = None, limit: int | None = None
) -> str:
    """Read a file.

    Args:
        path: Relative or absolute file path.
        workdir: Base directory for path safety.
        limit: Maximum number of lines to read.

    Returns:
        File content or error message.
    """
    try:
        if workdir:
            file_path = safe_path(workdir, path)
        else:
            file_path = Path(path).resolve()

        if not file_path.exists():
            return f"Error: File not found: {path}"

        async with aiofiles.open(file_path, "r", encoding="utf-8", errors="replace") as f:
            if limit:
                lines = []
                for i, line in enumerate(await f.readlines()):
                    if i >= limit:
                        break
                    lines.append(line)
                content = "".join(lines)
            else:
                content = await f.read()

        # Truncate very long files
        if len(content) > 100000:
            content = content[:100000] + "\n\n... (file truncated at 100000 chars)"

        return content
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error reading file: {e}"


async def run_write(path: str, content: str, workdir: Path | None = None) -> str:
    """Write content to a file, creating parent directories as needed.

    Args:
        path: Relative or absolute file path.
        content: Content to write.
        workdir: Base directory for path safety.

    Returns:
        Success message or error.
    """
    try:
        if workdir:
            file_path = safe_path(workdir, path)
        else:
            file_path = Path(path).resolve()

        file_path.parent.mkdir(parents=True, exist_ok=True)

        async with aiofiles.open(file_path, "w", encoding="utf-8") as f:
            await f.write(content)

        return f"File written: {file_path} ({len(content)} chars)"
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error writing file: {e}"


async def run_edit(
    path: str, old_text: str, new_text: str, workdir: Path | None = None
) -> str:
    """Replace exact text in a file.

    Args:
        path: Relative or absolute file path.
        old_text: Exact text to find.
        new_text: Text to replace with.
        workdir: Base directory for path safety.

    Returns:
        Success message or error.
    """
    try:
        if workdir:
            file_path = safe_path(workdir, path)
        else:
            file_path = Path(path).resolve()

        if not file_path.exists():
            return f"Error: File not found: {path}"

        async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
            content = await f.read()

        if old_text not in content:
            return f"Error: old_text not found in {path}"

        count = content.count(old_text)
        new_content = content.replace(old_text, new_text, 1)

        async with aiofiles.open(file_path, "w", encoding="utf-8") as f:
            await f.write(new_content)

        msg = f"File edited: {file_path}"
        if count > 1:
            msg += f" (replaced first of {count} occurrences)"
        return msg
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error editing file: {e}"