| """File operations for AutoGPT""" |
| from __future__ import annotations |
|
|
| import os |
| import os.path |
| from typing import Generator |
|
|
| import requests |
| from colorama import Back, Fore |
| from requests.adapters import HTTPAdapter, Retry |
|
|
| from autogpt.spinner import Spinner |
| from autogpt.utils import readable_file_size |
| from autogpt.workspace import WORKSPACE_PATH, path_in_workspace |
|
|
# File name of the operation log. It is passed to read_file() and
# append_to_file(), which resolve it through path_in_workspace().
LOG_FILE = "file_logger.txt"
# Location of the operation log, built by joining LOG_FILE onto WORKSPACE_PATH.
LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE
|
|
|
|
def check_duplicate_operation(operation: str, filename: str) -> bool:
    """Report whether an operation on a file is already recorded in the log

    Args:
        operation (str): The operation name to look for
        filename (str): The name of the file the operation was applied to

    Returns:
        bool: True if an "<operation>: <filename>" line is found in the
            text returned by read_file(LOG_FILE), False otherwise
    """
    # The entry format mirrors the one log_operation writes.
    expected_entry = f"{operation}: {filename}\n"
    logged_operations = read_file(LOG_FILE)
    return expected_entry in logged_operations
|
|
|
|
def log_operation(operation: str, filename: str) -> None:
    """Log the file operation to the file_logger.txt

    Each entry is written as "<operation>: <filename>" followed by a newline,
    which is the format check_duplicate_operation searches for.

    Args:
        operation (str): The operation to log
        filename (str): The name of the file the operation was performed on
    """
    log_entry = f"{operation}: {filename}\n"

    # Create the log with a header line on first use. The header ends with a
    # newline so the first entry starts on its own line rather than being
    # glued onto the header.
    if not os.path.exists(LOG_FILE_PATH):
        with open(LOG_FILE_PATH, "w", encoding="utf-8") as f:
            f.write("File Operation Logger\n")

    # shouldLog=False keeps append_to_file from calling back into this
    # function for the log file itself.
    append_to_file(LOG_FILE, log_entry, shouldLog=False)
|
|
|
|
def split_file(
    content: str, max_length: int = 4000, overlap: int = 0
) -> Generator[str, None, None]:
    """Split text into chunks of a maximum length with a fixed overlap

    Every chunk holds at most ``max_length`` characters, and each chunk after
    the first starts with the last ``overlap`` characters of the previous
    one. Joining the chunks with the overlaps removed reproduces ``content``
    exactly: no character is skipped.

    Args:
        content (str): The input text to be split into chunks
        max_length (int): The maximum length of each chunk,
            default is 4000 (about 1k token)
        overlap (int): The number of overlapping characters between
            consecutive chunks, default is no overlap

    Yields:
        str: The next chunk of text. Empty content yields nothing.

    Raises:
        ValueError: If max_length is not positive, or overlap is negative or
            not smaller than max_length (the window could never advance).
            Because this is a generator, the error surfaces on first
            iteration.
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")
    if not 0 <= overlap < max_length:
        raise ValueError("overlap must be non-negative and smaller than max_length")

    content_length = len(content)
    # Distance between the starts of consecutive chunks; always >= 1 here.
    step = max_length - overlap
    start = 0

    while start < content_length:
        end = start + max_length
        yield content[start:end]

        # Once a chunk reaches the end of the text, anything a further chunk
        # could contain has already been emitted as part of this one.
        if end >= content_length:
            break
        start += step
|
|
|
|
def read_file(filename: str) -> str:
    """Return the text content of a file

    Args:
        filename (str): The name of the file to read; it is resolved with
            path_in_workspace before being opened

    Returns:
        str: The file content decoded as UTF-8, or a string of the form
            "Error: <message>" if resolving, opening or reading fails
    """
    try:
        with open(path_in_workspace(filename), "r", encoding="utf-8") as handle:
            return handle.read()
    except Exception as err:
        # Failures are reported to the caller as text, never raised.
        return "Error: " + str(err)
|
|
|
|
def ingest_file(
    filename: str, memory, max_length: int = 4000, overlap: int = 200
) -> None:
    """Read a file, split it into chunks and add each chunk to memory

    Progress and errors are reported with print(); nothing is returned and
    no exception is propagated to the caller.

    Args:
        filename (str): The name of the file to ingest
        memory: An object with an add() method to store the chunks in memory
        max_length (int): The maximum length of each chunk, default is 4000
        overlap (int): The number of overlapping characters between chunks,
            default is 200
    """
    try:
        print(f"Working with file {filename}")

        # Read the file directly instead of going through read_file(): that
        # helper reports failures by returning an "Error: ..." string, which
        # cannot be told apart from real content and would be stored in
        # memory as if it were the file's text.
        with open(path_in_workspace(filename), "r", encoding="utf-8") as f:
            content = f.read()
        print(f"File length: {len(content)} characters")

        chunks = list(split_file(content, max_length=max_length, overlap=overlap))
        num_chunks = len(chunks)

        for i, chunk in enumerate(chunks, start=1):
            print(f"Ingesting chunk {i} / {num_chunks} into memory")
            memory_to_add = (
                f"Filename: {filename}\n" f"Content part#{i}/{num_chunks}: {chunk}"
            )
            memory.add(memory_to_add)

        print(f"Done ingesting {num_chunks} chunks from {filename}.")
    except Exception as e:
        print(f"Error while ingesting file '{filename}': {str(e)}")
|
|
|
|
def write_to_file(filename: str, text: str) -> str:
    """Write text to a file, creating parent directories as needed

    The write is refused when the operation log already contains a "write"
    entry for the same file name.

    Args:
        filename (str): The name of the file to write to
        text (str): The text to write to the file

    Returns:
        str: A message indicating success or failure
    """
    if check_duplicate_operation("write", filename):
        return "Error: File has already been updated."
    try:
        filepath = path_in_workspace(filename)
        directory = os.path.dirname(filepath)
        # exist_ok=True removes the check-then-create race: the directory may
        # appear between an existence test and the makedirs call. A path with
        # no directory component has nothing to create.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(text)
        log_operation("write", filename)
        return "File written to successfully."
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
def append_to_file(filename: str, text: str, shouldLog: bool = True) -> str:
    """Append text to a file

    Args:
        filename (str): The name of the file to append to
        text (str): The text to append to the file
        shouldLog (bool): Whether to record the operation with log_operation.
            log_operation itself passes False when appending to the log file.

    Returns:
        str: A message indicating success or failure
    """
    try:
        filepath = path_in_workspace(filename)
        # Explicit UTF-8, matching the other reads and writes in this module
        # (including the one that creates the log file this function appends
        # to). Without it the platform's locale encoding would be used.
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(text)

        if shouldLog:
            log_operation("append", filename)

        return "Text appended successfully."
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
def delete_file(filename: str) -> str:
    """Remove a file and record the deletion in the operation log

    Args:
        filename (str): The name of the file to delete

    Returns:
        str: A message indicating success or failure
    """
    already_deleted = check_duplicate_operation("delete", filename)
    if already_deleted:
        return "Error: File has already been deleted."

    try:
        os.remove(path_in_workspace(filename))
        log_operation("delete", filename)
    except Exception as err:
        # Failures while resolving, removing or logging are reported as text.
        return "Error: " + str(err)
    return "File deleted successfully."
|
|
|
|
def search_files(directory: str) -> list[str]:
    """List the non-hidden files found under a directory

    Args:
        directory (str): The directory to search in; "" and "/" both select
            WORKSPACE_PATH itself

    Returns:
        list[str]: Paths of the files found, each relative to WORKSPACE_PATH.
            Files whose name starts with "." are left out.
    """
    search_directory = (
        WORKSPACE_PATH if directory in {"", "/"} else path_in_workspace(directory)
    )

    # os.walk descends into every subdirectory; only file names are filtered.
    return [
        os.path.relpath(os.path.join(parent, name), WORKSPACE_PATH)
        for parent, _, names in os.walk(search_directory)
        for name in names
        if not name.startswith(".")
    ]
|
|
|
|
def download_file(url: str, filename: str) -> str:
    """Download a file from a URL and store it under the given file name

    Responses with status 502, 503 or 504 are retried up to three times with
    backoff. The body is streamed to disk in 8 KiB chunks while the spinner
    message is updated with the progress.

    Args:
        url (str): URL of the file to download
        filename (str): Filename to save the file as

    Returns:
        str: A message indicating success (including the number of bytes
            stored) or failure
    """
    try:
        # Resolved inside the try block so that any exception it raises is
        # reported as an error string, like the other functions in this module.
        safe_filename = path_in_workspace(filename)
        message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}"
        # The session is used as a context manager so it is closed even when
        # the download fails part-way.
        # NOTE(review): no timeout is passed to session.get, so a stalled
        # server can block indefinitely; consider adding one.
        with Spinner(message) as spinner, requests.Session() as session:
            retry = Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
            adapter = HTTPAdapter(max_retries=retry)
            session.mount("http://", adapter)
            session.mount("https://", adapter)

            with session.get(url, allow_redirects=True, stream=True) as r:
                r.raise_for_status()
                # 0 when the server sends no Content-Length header.
                total_size = int(r.headers.get("Content-Length", 0))
                downloaded_size = 0

                with open(safe_filename, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                        downloaded_size += len(chunk)

                        progress = f"{readable_file_size(downloaded_size)} / {readable_file_size(total_size)}"
                        spinner.update_message(f"{message} {progress}")

            # Report the bytes actually written: the Content-Length header may
            # be missing, which would otherwise show a size of zero.
            return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(downloaded_size)})'
    except requests.HTTPError as e:
        return f"Got an HTTP Error whilst trying to download file: {e}"
    except Exception as e:
        return "Error: " + str(e)
|
|