GraziePrego committed on
Commit
a4aade3
·
verified ·
1 Parent(s): 11ca46c

Upload helpers/files.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. helpers/files.py +753 -3
helpers/files.py CHANGED
@@ -1,3 +1,753 @@
1
- version https://git-lfs.github.com/spec/v1
2
- oid sha256:da0001d0762ce594b1d2c635baea06bcf692c0f85416b21f9d5e7e64fa0aa642
3
- size 24383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from abc import ABC, abstractmethod
2
+ from fnmatch import fnmatch
3
+ import json
4
+ import os
5
+ import re
6
+ import base64
7
+ import shutil
8
+ import tempfile
9
+ from typing import Any, Literal
10
+ import zipfile
11
+ import glob
12
+ import mimetypes
13
+ from simpleeval import simple_eval
14
+ from helpers import yaml
15
+
16
# Well-known directory names used by the helpers.
AGENTS_DIR = "agents"
PLUGINS_DIR = "plugins"
PROJECTS_DIR = "projects"
EXTENSIONS_DIR = "extensions"
USER_DIR = "usr"
TEMP_DIR = "tmp"
API_DIR = "api"

# Base directory: the parent of the directory that contains this module.
_base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
24
+
25
+ class VariablesPlugin(ABC):
26
+ @abstractmethod
27
+ def get_variables(self, file: str, backup_dirs: list[str] | None = None, **kwargs) -> dict[str, Any]: # type: ignore
28
+ pass
29
+
30
+
31
+ def load_plugin_variables(
32
+ file: str, backup_dirs: list[str] | None = None, **kwargs
33
+ ) -> dict[str, Any]:
34
+ if not file.endswith(".md"):
35
+ return {}
36
+
37
+ if backup_dirs is None:
38
+ backup_dirs = []
39
+
40
+ try:
41
+ # Create filename and directories list
42
+ plugin_filename = basename(file, ".md") + ".py"
43
+ directories = [dirname(file)] + backup_dirs
44
+ plugin_file = find_file_in_dirs(plugin_filename, directories)
45
+ except FileNotFoundError:
46
+ plugin_file = None
47
+
48
+ if plugin_file and exists(plugin_file):
49
+
50
+ from helpers import modules
51
+
52
+ classes = modules.load_classes_from_file(
53
+ plugin_file, VariablesPlugin, one_per_file=False
54
+ )
55
+ for cls in classes:
56
+ return cls().get_variables(file, backup_dirs, **kwargs) # type: ignore < abstract class here is ok, it is always a subclass
57
+
58
+ # load python code and extract variables variables from it
59
+ # module = None
60
+ # module_name = dirname(plugin_file).replace("/", ".") + "." + basename(plugin_file, '.py')
61
+
62
+ # try:
63
+ # spec = importlib.util.spec_from_file_location(module_name, plugin_file)
64
+ # if not spec:
65
+ # return {}
66
+ # module = importlib.util.module_from_spec(spec)
67
+ # sys.modules[spec.name] = module
68
+ # spec.loader.exec_module(module) # type: ignore
69
+ # except ImportError:
70
+ # return {}
71
+
72
+ # if module is None:
73
+ # return {}
74
+
75
+ # # Get all classes in the module
76
+ # class_list = inspect.getmembers(module, inspect.isclass)
77
+ # # Filter for classes that are subclasses of VariablesPlugin
78
+ # # iterate backwards to skip imported superclasses
79
+ # for cls in reversed(class_list):
80
+ # if cls[1] is not VariablesPlugin and issubclass(cls[1], VariablesPlugin):
81
+ # return cls[1]().get_variables() # type: ignore
82
+ return {}
83
+
84
+
85
+ from helpers.strings import sanitize_string
86
+
87
+
88
def parse_file(
    _filename: str, _directories: list[str] | None = None, _encoding="utf-8", **kwargs
):
    """Read a template file and render it with plugin variables and ``kwargs``.

    The file is located with ``find_file_in_dirs``. If its whole content is a
    fenced ``json`` block, placeholders are replaced with JSON-encoded values
    and the parsed object is returned. Otherwise placeholders are replaced as
    text, includes are processed, and the resulting string is returned.

    Raises:
        FileNotFoundError: if the file is not found in ``_directories``.
    """
    search_dirs = [] if _directories is None else _directories
    absolute_path = find_file_in_dirs(_filename, search_dirs)

    with open(absolute_path, "r", encoding=_encoding) as handle:
        raw = handle.read()

    # The JSON check needs the raw text because it relies on the outer fence.
    json_template = is_full_json_template(raw)
    body = remove_code_fences(raw)

    variables = load_plugin_variables(absolute_path, search_dirs, **kwargs) or {}  # type: ignore
    variables.update(kwargs)

    if json_template:
        return json.loads(replace_placeholders_json(body, **variables))

    rendered = replace_placeholders_text(body, **variables)
    # Includes receive only kwargs; plugin variables are not inherited.
    return process_includes(rendered, search_dirs, **kwargs)
121
+
122
+
123
def read_prompt_file(
    _file: str, _directories: list[str] | None = None, _encoding="utf-8", **kwargs
):
    """Read a prompt file and render conditions, placeholders and includes.

    If ``_file`` contains a folder component, that folder is searched first
    and only the file name is used for the lookup. Variables come from the
    accompanying plugin (if any), overridden by ``kwargs``.

    Raises:
        FileNotFoundError: if the file is not found in the search directories.
    """
    search_dirs = [] if _directories is None else _directories

    folder_path = os.path.dirname(_file)
    if folder_path:
        _file = os.path.basename(_file)
        search_dirs = [folder_path] + search_dirs

    absolute_path = find_file_in_dirs(_file, search_dirs)
    source_dir = os.path.dirname(absolute_path)

    with open(absolute_path, "r", encoding=_encoding) as handle:
        content = handle.read()

    variables = load_plugin_variables(_file, search_dirs, **kwargs) or {}  # type: ignore
    variables.update(kwargs)

    # Conditions are resolved before placeholders are substituted.
    content = evaluate_text_conditions(content, **variables)
    content = replace_placeholders_text(content, **variables)

    # Includes receive only kwargs; plugin variables are not inherited.
    # Source file/dir are passed so {{include original}} can be resolved.
    return process_includes(
        content,
        search_dirs,
        _source_file=_file,
        _source_dir=source_dir,
        **kwargs,
    )
163
+
164
+
165
def evaluate_text_conditions(_content: str, **kwargs):
    """Resolve ``{{if <condition>}} ... {{endif}}`` blocks in ``_content``.

    Conditions are evaluated with ``simple_eval`` using ``kwargs`` as names.
    A truthy condition keeps the inner text (processed recursively) and drops
    the markers; a falsy one drops the whole block. Nested blocks are supported.

    Text is left untouched from an ``if`` marker onward when that block is
    unterminated. When a condition fails to evaluate, that block is kept
    verbatim and processing continues with the text after it.
    """
    if_pattern = re.compile(r"{{\s*if\s+(.*?)}}", flags=re.DOTALL)
    token_pattern = re.compile(r"{{\s*(if\b.*?|endif)\s*}}", flags=re.DOTALL)

    def _process(text: str) -> str:
        m_if = if_pattern.search(text)
        if not m_if:
            return text

        # Walk the following if/endif markers to find the matching endif.
        depth = 1
        pos = m_if.end()
        while True:
            m = token_pattern.search(text, pos)
            if not m:
                # Unterminated if-block, do not modify text
                return text
            if m.group(1) == "endif":
                depth -= 1
            elif if_pattern.match(text, m.start()):
                # Count an opener exactly when the top-level search would also
                # treat this marker as one (any whitespace after "if").
                depth += 1
            # Any other marker (e.g. "{{if(x)}}") is neither opener nor closer.
            if depth == 0:
                break
            pos = m.end()

        before = text[: m_if.start()]
        condition = m_if.group(1).strip()
        inner = text[m_if.end() : m.start()]
        after = text[m.end() :]

        try:
            result = simple_eval(condition, names=kwargs)
        except Exception:
            # Keep this block verbatim, but still process what follows it.
            return text[: m.end()] + _process(after)

        if result:
            # Keep inner content (processed recursively), remove if/endif markers
            kept = before + _process(inner)
        else:
            # Skip entire block, including inner content and markers
            kept = before

        # Continue processing the remaining text after this block
        return kept + _process(after)

    return _process(_content)
210
+
211
+
212
def read_file(relative_path: str, encoding="utf-8"):
    """Return the text content of the file at ``relative_path``.

    The path is resolved with ``get_abs_path`` before opening.
    """
    with open(get_abs_path(relative_path), "r", encoding=encoding) as handle:
        return handle.read()
219
+
220
def read_file_json(relative_path: str, encoding="utf-8"):
    """Parse the file at ``relative_path`` as JSON and return the result.

    The path is resolved with ``get_abs_path`` before opening.
    """
    with open(get_abs_path(relative_path), "r", encoding=encoding) as handle:
        return json.load(handle)
227
+
228
def read_file_yaml(relative_path: str, encoding="utf-8"):
    """Read the file at ``relative_path`` and parse it with ``yaml.loads``.

    The path is resolved with ``get_abs_path`` before opening.
    """
    with open(get_abs_path(relative_path), "r", encoding=encoding) as handle:
        text = handle.read()
    return yaml.loads(text)
233
+
234
def read_file_bin(relative_path: str):
    """Return the raw bytes of the file at ``relative_path``.

    The path is resolved with ``get_abs_path`` before opening.
    """
    with open(get_abs_path(relative_path), "rb") as handle:
        return handle.read()
241
+
242
+
243
def read_file_base64(relative_path):
    """Return the content of the file at ``relative_path`` as a base64 string."""
    with open(get_abs_path(relative_path), "rb") as handle:
        raw = handle.read()
    return base64.b64encode(raw).decode("utf-8")
250
+
251
+
252
def is_probably_binary_bytes(data: bytes, threshold: float = 0.3) -> bool:
    """Heuristically decide whether ``data`` is binary.

    Empty input is not binary. Any NUL byte means binary. Otherwise the data
    is binary when the share of suspicious ASCII control bytes exceeds
    ``threshold``. Bytes >= 0x80 are deliberately not counted, to avoid false
    positives for UTF-8 text.
    """
    if not data:
        return False
    if b"\x00" in data:
        return True

    benign_controls = {8, 9, 10, 12, 13}  # \b \t \n \f \r
    suspicious = 0
    for byte in data:
        if byte == 127 or (byte < 32 and byte not in benign_controls):
            suspicious += 1
    return suspicious / len(data) > threshold
270
+
271
+
272
def is_probably_binary_file(
    file_path: str, sample_size: int = 10 * 1024, threshold: float = 0.3
) -> bool:
    """Binary detection by reading only the first ~sample_size bytes of a file.

    Args:
        file_path: Path passed to ``open`` as-is.
        sample_size: Maximum number of bytes read from the start of the file.
        threshold: Forwarded to ``is_probably_binary_bytes``.

    Raises:
        OSError: if the file cannot be opened or read; the original error is
            attached as the cause.
    """
    try:
        with open(file_path, "rb") as f:
            sample = f.read(sample_size)
    except OSError as err:
        # FileNotFoundError and PermissionError are OSError subclasses.
        raise OSError(
            f"Unable to read file for binary detection: {file_path}"
        ) from err
    return is_probably_binary_bytes(sample, threshold=threshold)
282
+
283
+
284
def replace_placeholders_text(_content: str, **kwargs):
    """Replace each ``{{name}}`` in ``_content`` with ``str(kwargs[name])``.

    Placeholders without a matching keyword argument are left unchanged.
    """
    rendered = _content
    for name, value in kwargs.items():
        rendered = rendered.replace("{{" + name + "}}", str(value))
    return rendered
291
+
292
+
293
def replace_placeholders_json(_content: str, **kwargs):
    """Replace each ``{{name}}`` in ``_content`` with the JSON encoding of its value.

    Values are only encoded when their placeholder actually occurs, so unused
    values do not need to be JSON-serializable.
    """
    rendered = _content
    for name, value in kwargs.items():
        token = "{{" + name + "}}"
        if token not in rendered:
            continue
        rendered = rendered.replace(token, json.dumps(value))
    return rendered
301
+
302
+
303
def replace_placeholders_dict(_content: dict, **kwargs):
    """Recursively replace ``{{name}}`` placeholders inside a nested structure.

    Dicts and lists are traversed; other non-string values are returned as-is.
    A string that consists solely of one known placeholder is replaced by the
    raw value (keeping its type). Placeholders embedded in longer strings are
    replaced by ``json.dumps(value)`` for dicts/lists and ``str(value)``
    otherwise. Unknown placeholders are left unchanged.
    """

    def _substitute(node):
        if isinstance(node, dict):
            return {key: _substitute(item) for key, item in node.items()}
        if isinstance(node, list):
            return [_substitute(item) for item in node]
        if not isinstance(node, str):
            return node

        for name in re.findall(r"{{(\w+)}}", node):
            if name not in kwargs:
                continue
            token = f"{{{{{name}}}}}"
            replacement = kwargs[name]
            if node == token:
                # Whole-string placeholder: hand back the value itself.
                return replacement
            if isinstance(replacement, (dict, list)):
                node = node.replace(token, json.dumps(replacement))
            else:
                node = node.replace(token, str(replacement))
        return node

    return _substitute(_content)
330
+
331
+
332
def process_includes(
    _content: str,
    _directories: list[str],
    _source_file: str = "",
    _source_dir: str = "",
    **kwargs,
):
    """Expand ``{{include original}}`` and ``{{ include 'path' }}`` markers.

    ``{{include original}}`` is replaced by the same file read from the
    directories that follow ``_source_dir`` in ``_directories``. It becomes an
    empty string if there are none or the file is missing, and is kept
    verbatim when no source file/dir is given.

    ``{{ include 'path' }}`` is replaced by the rendered content of ``path``.
    The marker is kept verbatim for absolute paths and for files that are
    not found.
    """

    def _include_original(match):
        if not (_source_file and _source_dir):
            return match.group(0)
        lower_dirs = _get_dirs_after(_directories, _source_dir)
        if not lower_dirs:
            return ""
        try:
            return read_prompt_file(_source_file, lower_dirs, **kwargs)
        except FileNotFoundError:
            return ""

    def _include_named(match):
        target = match.group(1)
        if os.path.isabs(target):
            return match.group(0)
        try:
            return read_prompt_file(target, _directories, **kwargs)
        except FileNotFoundError:
            return match.group(0)

    # "original" includes are expanded first, then named includes.
    with_original = re.sub(
        r"{{\s*include\s+original\s*}}", _include_original, _content
    )
    return re.sub(
        r"{{\s*include\s*['\"](.*?)['\"]\s*}}", _include_named, with_original
    )
368
+
369
+
370
def _get_dirs_after(_directories: list[str], _source_dir: str) -> list[str]:
    """Return the entries of ``_directories`` that follow ``_source_dir``.

    Entries are compared by normalized absolute path. An empty list is
    returned when ``_source_dir`` is not in the list or is its last entry.
    """
    source_abs = os.path.normpath(os.path.abspath(_source_dir))
    for index, candidate in enumerate(_directories):
        candidate_abs = os.path.normpath(os.path.abspath(get_abs_path(candidate)))
        if candidate_abs == source_abs:
            # Everything after the first match has lower priority.
            return list(_directories[index + 1 :])
    return []
382
+
383
+
384
def find_file_in_dirs(_filename: str, _directories: list[str]):
    """
    Search the given directories, in order, for a file name.

    Returns the resolved path of the first existing match.
    Raises FileNotFoundError when no directory contains the file.
    """
    candidates = (get_abs_path(directory, _filename) for directory in _directories)
    first_match = next((path for path in candidates if exists(path)), None)
    if first_match is None:
        raise FileNotFoundError(
            f"File '{_filename}' not found in any of the provided directories."
        )
    return first_match
400
+
401
+
402
def get_unique_filenames_in_dirs(
    dir_paths: list[str],
    pattern: str = "*",
    type: Literal["file", "dir", "any"] = "file",
):
    """Collect paths matching ``pattern`` with unique base names across directories.

    Directories are scanned in the given order; when a base name occurs in
    several directories, the first occurrence wins. ``type`` restricts the
    matches to files, directories, or anything. The result is sorted by base
    name.
    """

    def _matches_kind(path: str) -> bool:
        if type == "any":
            return True
        if type == "file":
            return os.path.isfile(path)
        if type == "dir":
            return os.path.isdir(path)
        return False

    seen_names = set()
    unique_paths = []
    for dir_path in dir_paths:
        search_glob = os.path.join(get_abs_path(dir_path), pattern)
        for found in glob.glob(search_glob):
            name = os.path.basename(found)
            if name in seen_names or not _matches_kind(found):
                continue
            seen_names.add(name)
            unique_paths.append(get_abs_path(found))

    # Order by file name, not by full path.
    return sorted(unique_paths, key=os.path.basename)
424
+
425
+
426
def find_existing_paths_by_pattern(pattern: str):
    """Return the sorted paths matching a glob ``pattern`` (recursive ``**`` enabled).

    The pattern is resolved with ``get_abs_path``; an empty pattern yields ``[]``.
    """
    if not pattern:
        return []
    return sorted(glob.glob(get_abs_path(pattern), recursive=True))
434
+
435
+
436
def remove_code_fences(text):
    """Strip ``` / ~~~ code fences from ``text``, keeping the fenced content.

    The opening fence line (including any language specifier) and the closing
    fence are removed; text outside fences is left unchanged.
    """
    # DOTALL lets the fenced body span multiple lines.
    fence = re.compile(r"(```|~~~)(.*?\n)(.*?)(\1)", flags=re.DOTALL)
    return fence.sub(lambda match: match.group(3), text)
448
+
449
+
450
def is_full_json_template(text):
    """Return True when the whole (stripped) text is one ```json / ~~~json fenced block."""
    fenced_json = r"^\s*(```|~~~)\s*json\s*\n(.*?)\n\1\s*$"
    # DOTALL lets the fenced body span multiple lines.
    return re.fullmatch(fenced_json, text.strip(), flags=re.DOTALL) is not None
456
+
457
+
458
def write_file(relative_path: str, content: str, encoding: str = "utf-8"):
    """Write text to ``relative_path``, creating parent directories as needed.

    The content is passed through ``sanitize_string`` before being written.
    """
    target = get_abs_path(relative_path)
    os.makedirs(os.path.dirname(target), exist_ok=True)
    sanitized = sanitize_string(content, encoding)
    with open(target, "w", encoding=encoding) as handle:
        handle.write(sanitized)
464
+
465
def delete_file(relative_path: str):
    """Remove the file at ``relative_path``; do nothing if the path does not exist."""
    target = get_abs_path(relative_path)
    if not exists(target):
        return
    os.remove(target)
469
+
470
def write_file_bin(relative_path: str, content: bytes):
    """Write raw bytes to ``relative_path``, creating parent directories as needed."""
    target = get_abs_path(relative_path)
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "wb") as handle:
        handle.write(content)
475
+
476
+
477
def write_file_base64(relative_path: str, content: str):
    """Decode a base64 string and write the bytes to ``relative_path``.

    Decoding happens first, so nothing is created on disk if it fails.
    Parent directories are created as needed.
    """
    payload = base64.b64decode(content)
    target = get_abs_path(relative_path)
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "wb") as handle:
        handle.write(payload)
484
+
485
+
486
def delete_dir(relative_path: str):
    """Delete a directory tree on a best-effort basis, without propagating errors.

    A first ``shutil.rmtree`` pass ignores errors. If the directory still
    exists, permissions of everything below it are opened up and removal is
    retried. Failures of the retry are suppressed as well.
    """
    abs_path = get_abs_path(relative_path)
    if not os.path.exists(abs_path):
        return

    # first try with ignore_errors=True which is the safest option
    shutil.rmtree(abs_path, ignore_errors=True)
    if not os.path.exists(abs_path):
        return

    try:
        # try to change permissions and delete again
        for root, dirs, files in os.walk(abs_path, topdown=False):
            for name in files + dirs:
                os.chmod(os.path.join(root, name), 0o777)

        # try again after changing permissions
        shutil.rmtree(abs_path, ignore_errors=True)
    except Exception:
        # Deliberately best-effort. Unlike a bare ``except``, this still lets
        # KeyboardInterrupt and SystemExit through.
        pass
510
+
511
+
512
def move_dir(old_path: str, new_path: str):
    """Move/rename a directory; both paths are resolved with ``get_abs_path``.

    Does nothing when the source is not an existing directory. The parent of
    the destination is created if needed.
    """
    source = get_abs_path(old_path)
    target = get_abs_path(new_path)
    if not os.path.isdir(source):
        return  # nothing to rename

    os.makedirs(os.path.dirname(target), exist_ok=True)

    try:
        os.rename(source, target)
    except OSError:
        # os.rename fails across Docker volume mount points
        shutil.move(source, target)
528
+
529
+
530
def move_dir_safe(src, dst, rename_format="{name}_{number}"):
    """Move ``src`` to ``dst``, picking a numbered name if ``dst`` already exists.

    Candidate names are built from ``rename_format`` with ``name=dst`` and
    ``number`` counting up from 2 until an unused path is found.
    Returns the destination that was actually used.
    """
    target = dst
    number = 2
    while exists(target):
        target = rename_format.format(name=dst, number=number)
        number += 1
    move_dir(src, target)
    return target
539
+
540
+
541
def create_dir_safe(dst, rename_format="{name}_{number}"):
    """Create directory ``dst``, picking a numbered name if ``dst`` already exists.

    Candidate names are built from ``rename_format`` with ``name=dst`` and
    ``number`` counting up from 2 until an unused path is found.
    Returns the path that was actually created.
    """
    target = dst
    number = 2
    while exists(target):
        target = rename_format.format(name=dst, number=number)
        number += 1
    create_dir(target)
    return target
550
+
551
+
552
def create_dir(relative_path: str):
    """Create the directory at ``relative_path`` (and parents); no error if it exists."""
    os.makedirs(get_abs_path(relative_path), exist_ok=True)
555
+
556
+
557
def list_files(relative_path: str, filter: str = "*"):
    """Return the entry names in a directory that match the ``filter`` pattern.

    Uses ``fnmatch`` on the names returned by ``os.listdir``. Returns ``[]``
    when the path does not exist.
    """
    directory = get_abs_path(relative_path)
    if os.path.exists(directory):
        return [name for name in os.listdir(directory) if fnmatch(name, filter)]
    return []
562
+
563
+
564
def make_dirs(relative_path: str):
    """Create the parent directory of ``relative_path`` (not the path itself)."""
    parent_dir = os.path.dirname(get_abs_path(relative_path))
    os.makedirs(parent_dir, exist_ok=True)
567
+
568
+
569
def _resolve_path(*relative_paths):
    """Join the given path parts onto the base directory.

    A single absolute path is returned unchanged.
    """
    is_single_absolute = len(relative_paths) == 1 and os.path.isabs(relative_paths[0])
    if is_single_absolute:
        return relative_paths[0]
    return os.path.join(_base_dir, *relative_paths)
573
+
574
+
575
def get_abs_path(*relative_paths):
    """Convert relative path parts to an absolute path under the base directory.

    Delegates to ``_resolve_path``.
    """
    resolved = _resolve_path(*relative_paths)
    return resolved
578
+
579
+
580
def get_abs_path_dockerized(*relative_paths):
    """Return the absolute path in its dockerized (``/a0/...``) form.

    When ``runtime.is_dockerized()`` is true the resolved path is returned
    as-is; otherwise it is converted with ``normalize_a0_path``.
    """
    resolved = get_abs_path(*relative_paths)
    from helpers import runtime

    return resolved if runtime.is_dockerized() else normalize_a0_path(resolved)
588
+
589
+
590
def get_abs_path_development(*relative_paths):
    """Resolve the path and pass it through ``fix_dev_path`` for dev environments."""
    return fix_dev_path(get_abs_path(*relative_paths))
594
+
595
+
596
def deabsolute_path(path: str):
    """Return ``path`` expressed relative to the base directory."""
    base_dir = get_base_dir()
    return os.path.relpath(path, start=base_dir)
599
+
600
+
601
def fix_dev_path(path: str):
    """On dev environment, convert /a0/... paths to local absolute paths.

    When ``is_development()`` is true, a leading ``/a0/`` is stripped from
    ``path``. The result is then resolved with ``get_abs_path`` in all cases.
    """
    from helpers.runtime import is_development

    if is_development():
        # Strip only the leading prefix; "/a0/" occurring later in the path
        # is part of the real path and must be preserved.
        path = path.removeprefix("/a0/")
    return get_abs_path(path)
609
+
610
+
611
def normalize_a0_path(path: str):
    """Convert a path inside the base directory into its ``/a0/...`` form.

    Paths outside the base directory are returned unchanged.
    """
    if not is_in_base_dir(path):
        return path
    return "/a0/" + deabsolute_path(path)
617
+
618
+
619
def exists(*relative_paths):
    """Return True if the resolved path exists."""
    return os.path.exists(_resolve_path(*relative_paths))
622
+
623
+
624
def is_file(*relative_paths):
    """Return True if the resolved path is an existing regular file."""
    return os.path.isfile(_resolve_path(*relative_paths))
627
+
628
+
629
def is_dir(*relative_paths):
    """Return True if the resolved path is an existing directory."""
    return os.path.isdir(_resolve_path(*relative_paths))
632
+
633
+
634
def get_base_dir():
    """Return the module-level base directory used to resolve relative paths."""
    base_dir = _base_dir
    return base_dir
636
+
637
+
638
+ def basename(path: str, suffix: str | None = None):
639
+ if suffix:
640
+ return os.path.basename(path).removesuffix(suffix)
641
+ return os.path.basename(path)
642
+
643
+
644
def dirname(path: str):
    """Return the directory component of ``path`` (``os.path.dirname``)."""
    parent = os.path.dirname(path)
    return parent
646
+
647
+
648
def is_in_base_dir(path: str):
    """Return True if ``path`` lies within the base directory."""
    base_dir = get_base_dir()
    return is_in_dir(path, base_dir)
650
+
651
+
652
def is_in_dir(path: str, dir: str):
    """Return True if ``path`` is ``dir`` itself or located inside it.

    Both arguments are made absolute first. The comparison is lexical
    (``os.path.commonpath``); symlinks are not resolved.
    """
    abs_path = os.path.abspath(path)
    abs_dir = os.path.abspath(dir)
    try:
        return os.path.commonpath([abs_path, abs_dir]) == abs_dir
    except ValueError:
        # commonpath raises when the paths cannot share a prefix (e.g. they
        # are on different drives); such a path is not inside ``dir``.
        return False
657
+
658
+
659
def get_subdirectories(
    relative_path: str,
    include: str | list[str] = "*",
    exclude: str | list[str] | None = None,
):
    """Return names of the direct subdirectories of ``relative_path``.

    A subdirectory is returned when its name matches at least one ``include``
    pattern and none of the ``exclude`` patterns (``fnmatch`` syntax). Both
    accept a single pattern or a list. Returns ``[]`` when the path does not
    exist.
    """
    root = get_abs_path(relative_path)
    if not os.path.exists(root):
        return []

    include_patterns = [include] if isinstance(include, str) else include
    exclude_patterns = [exclude] if isinstance(exclude, str) else exclude

    def _selected(name: str) -> bool:
        if not os.path.isdir(os.path.join(root, name)):
            return False
        if not any(fnmatch(name, pat) for pat in include_patterns):
            return False
        if exclude_patterns is None:
            return True
        return not any(fnmatch(name, pat) for pat in exclude_patterns)

    return [name for name in os.listdir(root) if _selected(name)]
678
+
679
+
680
def zip_dir(dir_path: str):
    """Zip the files under ``dir_path`` into a new temporary ``.zip`` file.

    Entries are stored under a top-level folder named after the directory.
    Only files are added (directories that contain no files produce no entry).
    The temporary file is not deleted automatically; the caller owns it.

    Returns:
        The path of the created zip file.
    """
    full_path = get_abs_path(dir_path)

    # Reserve a unique file name and close the handle right away, so the
    # archive below can reopen the path for writing.
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
        zip_file_path = tmp.name

    base_name = os.path.basename(full_path)
    with zipfile.ZipFile(
        zip_file_path, "w", compression=zipfile.ZIP_DEFLATED
    ) as archive:
        for root, _, files in os.walk(full_path):
            for file in files:
                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, full_path)
                archive.write(file_path, os.path.join(base_name, rel_path))
    return zip_file_path
691
+
692
+
693
def move_file(relative_path: str, new_path: str):
    """Move a file, creating the destination's parent directory if needed.

    Tries ``os.rename`` first. If that fails with ``OSError``, the file is
    copied with ``shutil.copy2`` and the source is then removed; a failure to
    remove the source is ignored.
    """
    source = get_abs_path(relative_path)
    target = get_abs_path(new_path)
    os.makedirs(os.path.dirname(target), exist_ok=True)
    try:
        os.rename(source, target)
        return
    except OSError:
        pass

    # fallback to copy and delete
    shutil.copy2(source, target)
    try:
        os.unlink(source)
    except OSError:
        pass
708
+
709
+
710
def safe_file_name(filename: str) -> str:
    """Replace every character other than ASCII letters, digits, ``-``, ``.``, ``_`` with ``_``."""
    unsafe_chars = re.compile(r"[^a-zA-Z0-9-._]")
    return unsafe_chars.sub("_", filename)
713
+
714
+
715
def read_text_files_in_dir(
    dir_path: str, max_size: int = 1024 * 1024, pattern: str = "*"
) -> dict[str, str]:
    """Read the text files directly inside ``dir_path``.

    A file is skipped when its name does not match ``pattern``, when it is
    larger than ``max_size`` bytes (``max_size <= 0`` disables the limit),
    when its guessed MIME type is known and not ``text/*``, when a sample of
    its content looks binary, or when reading it fails.

    Returns:
        Mapping of file name -> file content. Empty if the directory does not
        exist.
    """
    abs_path = get_abs_path(dir_path)
    if not os.path.exists(abs_path):
        return {}

    result: dict[str, str] = {}
    for name in os.listdir(abs_path):
        file_path = os.path.join(abs_path, name)
        try:
            if not os.path.isfile(file_path):
                continue
            if not fnmatch(name, pattern):
                continue
            if max_size > 0 and os.path.getsize(file_path) > max_size:
                continue
            mime, _ = mimetypes.guess_type(file_path)
            if mime is not None and not mime.startswith("text"):
                continue
            # Check if file is binary by reading a small chunk
            if is_probably_binary_file(file_path):
                continue
            result[name] = read_file(file_path)
        except Exception:
            # Best-effort: unreadable or undecodable files are skipped.
            continue
    return result
740
+
741
+
742
def list_files_in_dir_recursively(relative_path: str) -> list[str]:
    """List all files below ``relative_path``, recursively.

    Returned paths are relative to the resolved ``relative_path`` directory.
    Returns ``[]`` when the directory does not exist.
    """
    root_dir = get_abs_path(relative_path)
    if not os.path.exists(root_dir):
        return []
    return [
        os.path.relpath(os.path.join(current, name), root_dir)
        for current, _subdirs, names in os.walk(root_dir)
        for name in names
    ]