| from importlib import util |
| import os |
| import re |
|
|
| from config import TOOLS_DIR, FUNCTIONS_DIR |
|
|
|
|
def extract_frontmatter(file_path):
    """
    Extract frontmatter as a dictionary from the specified file path.

    The frontmatter is the run of ``key: value`` lines that follows an
    opening line consisting solely of three double quotes (which must be the
    very first line of the file) and ends at the next line containing three
    double quotes.

    Args:
        file_path: Path of the file to read.

    Returns:
        A dict mapping each key to its whitespace-stripped string value.
        An empty dict is returned when the first line is not the opening
        delimiter, or when the file is missing or cannot be read (a message
        is printed in the latter two cases).
    """
    delimiter = '"""'
    frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
    frontmatter = {}

    try:
        # "utf-8-sig" decodes plain UTF-8 unchanged but drops a leading BOM,
        # which would otherwise make the first-line check below fail.
        with open(file_path, "r", encoding="utf-8-sig") as file:
            if file.readline().strip() != delimiter:
                # The file does not open with a bare triple-quote line.
                return {}

            for line in file:
                # Only the text before a closing delimiter can hold a pair;
                # when no delimiter is present, `content` is the whole line.
                content, closing, _ = line.partition(delimiter)
                match = frontmatter_pattern.match(content)
                if match:
                    key, value = match.groups()
                    frontmatter[key.strip()] = value.strip()
                if closing:
                    break

    except FileNotFoundError:
        print(f"Error: The file {file_path} does not exist.")
        return {}
    except Exception as e:
        # Deliberately best-effort: unreadable metadata must not stop callers.
        print(f"An error occurred: {e}")
        return {}

    return frontmatter
|
|
|
|
def load_toolkit_module_by_id(toolkit_id):
    """
    Load ``<toolkit_id>.py`` from TOOLS_DIR and instantiate its ``Tools`` class.

    Args:
        toolkit_id: Name given to the loaded module; also the base name of
            the source file inside TOOLS_DIR.

    Returns:
        A tuple ``(tools_instance, frontmatter)`` where ``frontmatter`` is the
        dict produced by ``extract_frontmatter`` for the same file.

    Raises:
        Exception: Whatever executing the module or constructing ``Tools``
            raised, or a plain ``Exception("No Tools class found")`` when the
            module defines no ``Tools`` attribute. Before re-raising, the
            source file is renamed to ``<path>.error`` on a best-effort basis.
    """
    toolkit_path = os.path.join(TOOLS_DIR, f"{toolkit_id}.py")
    spec = util.spec_from_file_location(toolkit_id, toolkit_path)
    module = util.module_from_spec(spec)
    frontmatter = extract_frontmatter(toolkit_path)

    try:
        spec.loader.exec_module(module)
        print(f"Loaded module: {module.__name__}")
        if hasattr(module, "Tools"):
            return module.Tools(), frontmatter
        raise Exception("No Tools class found")
    except Exception:
        print(f"Error loading module: {toolkit_id}")
        # Rename the broken file so it is marked as faulty. The rename itself
        # may fail (file missing, permissions, target already exists); that
        # must not replace the real load error the caller needs to see.
        try:
            os.rename(toolkit_path, f"{toolkit_path}.error")
        except OSError as rename_error:
            print(f"Could not rename {toolkit_path}: {rename_error}")
        raise
|
|
|
|
def load_function_module_by_id(function_id):
    """
    Load ``<function_id>.py`` from FUNCTIONS_DIR and instantiate its function class.

    The module is searched for a ``Pipe``, ``Filter`` or ``Action`` attribute,
    in that order; the first one present is instantiated.

    Args:
        function_id: Name given to the loaded module; also the base name of
            the source file inside FUNCTIONS_DIR.

    Returns:
        A tuple ``(instance, kind, frontmatter)`` where ``kind`` is ``"pipe"``,
        ``"filter"`` or ``"action"`` and ``frontmatter`` is the dict produced
        by ``extract_frontmatter`` for the same file.

    Raises:
        Exception: Whatever executing the module or constructing the class
            raised, or a plain ``Exception("No Function class found")`` when
            none of the three attributes exists. Before re-raising, the
            source file is renamed to ``<path>.error`` on a best-effort basis.
    """
    function_path = os.path.join(FUNCTIONS_DIR, f"{function_id}.py")

    spec = util.spec_from_file_location(function_id, function_path)
    module = util.module_from_spec(spec)
    frontmatter = extract_frontmatter(function_path)

    # Checked in order: the first attribute found on the module wins.
    supported_kinds = (("Pipe", "pipe"), ("Filter", "filter"), ("Action", "action"))

    try:
        spec.loader.exec_module(module)
        print(f"Loaded module: {module.__name__}")
        for class_name, kind in supported_kinds:
            if hasattr(module, class_name):
                return getattr(module, class_name)(), kind, frontmatter
        raise Exception("No Function class found")
    except Exception:
        print(f"Error loading module: {function_id}")
        # Rename the broken file so it is marked as faulty. The rename itself
        # may fail (file missing, permissions, target already exists); that
        # must not replace the real load error the caller needs to see.
        try:
            os.rename(function_path, f"{function_path}.error")
        except OSError as rename_error:
            print(f"Could not rename {function_path}: {rename_error}")
        raise
|
|