|
|
|
|
|
|
|
|
""" |
|
|
File System Tools Examples for EvoAgentX |
|
|
|
|
|
This module provides comprehensive examples for: |
|
|
- StorageToolkit: Comprehensive file storage operations with flexible storage backends |
|
|
- CMDToolkit: Command-line execution capabilities with timeout handling |
|
|
|
|
|
The examples demonstrate various file operations and system interactions including: |
|
|
- File save, read, append, list, delete, and existence checks |
|
|
- Command execution with working directory and timeout support |
|
|
- Cross-platform command handling |
|
|
- Storage handler management |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
sys.path.append(str(Path(__file__).parent.parent)) |
|
|
|
|
|
from evoagentx.tools import ( |
|
|
StorageToolkit, |
|
|
CMDToolkit |
|
|
) |
|
|
|
|
|
|
|
|
def run_file_tool_example():
    """
    Run an example using the StorageToolkit to read and write files with the new storage handler system.

    The walkthrough saves a text file and a JSON file, reads both back,
    appends to the text file, saves an extended JSON payload to the same
    JSON path, lists the current directory, checks that the text file exists
    and asks the toolkit for its supported formats. Every raw tool result is
    printed between separator lines.

    The closing success summary is printed only when no step returned an
    explicit ``success: False``; otherwise the failed steps are named.

    Returns:
        None. Any exception is caught and reported on stdout so that this
        example never aborts the calling script.
    """
    print("\n===== STORAGE TOOL EXAMPLE =====\n")

    separator = "-" * 30
    failed_steps = []

    def report(title, result):
        """Print *result* framed by separators and remember explicit failures."""
        print(f"{title}:")
        print(separator)
        print(result)
        print(separator)
        # Only an explicit ``success: False`` counts as a failure: the result
        # schema of some tools (append, exists, list_supported_formats) is
        # not visible from this file, so a missing key is not treated as one.
        if isinstance(result, dict) and result.get("success") is False:
            failed_steps.append(title)

    try:
        storage_toolkit = StorageToolkit(name="DemoStorageToolkit")

        save_tool = storage_toolkit.get_tool("save")
        read_tool = storage_toolkit.get_tool("read")
        append_tool = storage_toolkit.get_tool("append")
        list_tool = storage_toolkit.get_tool("list_files")
        exists_tool = storage_toolkit.get_tool("exists")

        sample_text = (
            "This is a sample text document created using the StorageToolkit.\n"
            "This tool provides comprehensive file operations with automatic format detection.\n"
            "It supports various file types including text, JSON, CSV, YAML, XML, Excel, and more."
        )

        sample_json = {
            "name": "Sample Document",
            "type": "test",
            "content": "This is a JSON document for testing",
            "metadata": {
                "created": "2024-01-01",
                "version": "1.0",
            },
        }

        print("1. Testing file save operations...")
        report(
            "Text file save result",
            save_tool(file_path="sample_document.txt", content=sample_text),
        )
        report(
            "JSON file save result",
            save_tool(file_path="sample_data.json", content=sample_json),
        )

        print("\n2. Testing file read operations...")
        report("Text file read result", read_tool(file_path="sample_document.txt"))
        report("JSON file read result", read_tool(file_path="sample_data.json"))

        print("\n3. Testing file append operations...")
        report(
            "Text file append result",
            append_tool(
                file_path="sample_document.txt",
                content="\n\nThis content was appended to the text file.",
            ),
        )

        # The JSON file is "updated" by saving a merged payload to the same
        # path rather than by appending to it.
        updated_json_data = {
            **sample_json,
            "additional": "data",
            "timestamp": "2024-01-01T12:00:00Z",
        }
        report(
            "JSON file update result",
            save_tool(file_path="sample_data.json", content=updated_json_data),
        )

        print("\n4. Testing file listing...")
        report(
            "File listing result",
            list_tool(path=".", max_depth=2, include_hidden=False),
        )

        print("\n5. Testing file existence...")
        report(
            "File existence check result",
            exists_tool(path="sample_document.txt"),
        )

        print("\n6. Testing supported formats...")
        formats_tool = storage_toolkit.get_tool("list_supported_formats")
        report("Supported formats result", formats_tool())

        if failed_steps:
            # Do not claim success when a tool explicitly reported failure.
            print(f"\n❌ StorageToolkit test finished with failures: {', '.join(failed_steps)}")
        else:
            print("\n✓ StorageToolkit test completed successfully!")
            print("✓ All file operations working with default storage handler")
            print("✓ Automatic format detection working")
            print("✓ File operations working (including JSON updates)")
            print("✓ File listing and existence checks working")

    except Exception as e:
        print(f"Error running storage tool example: {str(e)}")
|
|
|
|
|
|
|
|
def run_cmd_tool_example():
    """Simple example using CMDToolkit for command line operations.

    Runs four checks through the toolkit's "execute_command" tool: a basic
    ``echo``, system information (``pwd`` plus ``uname -a`` on POSIX or
    ``ver`` otherwise), a directory listing (``ls -la`` / ``dir``) and a
    timeout check in which a 5-second ``sleep`` is given a 2-second limit.

    Returns:
        None. Any exception is caught and reported on stdout.
    """
    print("\n===== CMD TOOL EXAMPLE =====\n")

    is_posix = os.name == 'posix'

    try:
        cmd_toolkit = CMDToolkit(name="DemoCMDToolkit")
        execute_tool = cmd_toolkit.get_tool("execute_command")

        print("✓ CMDToolkit initialized")

        print("1. Testing basic command execution...")
        result = execute_tool(command="echo 'Hello from CMD toolkit'")

        if result.get("success"):
            print("✓ Command executed successfully")
            print(f"Output: {result.get('stdout', 'No output')}")
        else:
            print(f"❌ Command failed: {result.get('error', 'Unknown error')}")

        print("\n2. Testing system information commands...")

        # NOTE(review): `pwd` is a POSIX command; on a Windows cmd shell this
        # step presumably just reports nothing - confirm.
        pwd_result = execute_tool(command="pwd")
        if pwd_result.get("success"):
            print(f"✓ Current directory: {pwd_result.get('stdout', '').strip()}")

        info_result = execute_tool(command="uname -a" if is_posix else "ver")
        if info_result.get("success"):
            print(f"✓ System info: {info_result.get('stdout', '').strip()}")

        print("\n3. Testing file listing...")
        ls_result = execute_tool(
            command="ls -la" if is_posix else "dir",
            working_directory=".",
        )

        if ls_result.get("success"):
            print("✓ File listing successful")
            print(f"Output length: {len(ls_result.get('stdout', ''))} characters")
        else:
            print(f"❌ File listing failed: {ls_result.get('error', 'Unknown error')}")

        print("\n4. Testing command timeout...")
        # The limit must be shorter than the sleep, otherwise the command
        # simply finishes and the timeout path is never exercised.
        # NOTE(review): `sleep` is a POSIX command; elsewhere the command
        # presumably fails for a different reason than the timeout - confirm.
        timeout_result = execute_tool(command="sleep 5", timeout=2)
        if not timeout_result.get("success"):
            print("✓ Timeout working correctly (command was interrupted)")
        else:
            print("❌ Timeout may not be working as expected")

        print("\n✓ CMDToolkit test completed")

    except Exception as e:
        print(f"Error: {str(e)}")
|
|
|
|
|
|
|
|
def run_storage_handler_examples():
    """
    Run examples demonstrating different storage handlers and configurations.

    Currently one configuration is shown: a StorageToolkit created with
    ``storage_handler=None`` and ``base_path="./custom_storage"``. A text file
    is saved, read back and the storage location is listed; each step prints
    whether the tool reported success.

    Returns:
        None. Any exception is caught and reported on stdout.
    """
    print("\n===== STORAGE HANDLER EXAMPLES =====\n")

    try:
        print("1. Testing custom base path storage...")
        custom_storage_toolkit = StorageToolkit(
            name="CustomPathStorageToolkit",
            storage_handler=None,
            base_path="./custom_storage",
        )

        custom_save_tool = custom_storage_toolkit.get_tool("save")
        custom_result = custom_save_tool(
            file_path="custom_test.txt",
            content="This file is stored in a custom location",
        )

        if custom_result.get("success"):
            print("✓ Custom path storage working")
            print(f"File saved to: {custom_result.get('file_path')}")
        else:
            print(f"❌ Custom path storage failed: {custom_result.get('error')}")

        custom_read_tool = custom_storage_toolkit.get_tool("read")
        custom_read_result = custom_read_tool(file_path="custom_test.txt")

        if custom_read_result.get("success"):
            print("✓ Custom path file reading working")
            # str() keeps the preview safe if the tool returns non-string content.
            preview = str(custom_read_result.get('content', ''))[:50]
            print(f"Content: {preview}...")
        else:
            # Report the failure instead of skipping it silently, matching
            # the handling of the save step above.
            print(f"❌ Custom path file reading failed: {custom_read_result.get('error')}")

        custom_list_tool = custom_storage_toolkit.get_tool("list_files")
        custom_list_result = custom_list_tool(path=".", max_depth=1, include_hidden=False)

        if custom_list_result.get("success"):
            print("✓ Custom path file listing working")
            files = custom_list_result.get("files") or []
            print(f"Found {len(files)} files in custom location")
        else:
            print(f"❌ Custom path file listing failed: {custom_list_result.get('error')}")

        print("\n✓ Storage handler examples completed")

    except Exception as e:
        print(f"Error running storage handler examples: {str(e)}")
|
|
|
|
|
|
|
|
def _save_then_read(save_tool, read_tool, label, file_path, content,
                    save_verb="save", saved_verb="saved"):
    """Save *content* to *file_path*, read it back on success, and print the outcome of each step."""
    save_result = save_tool(file_path=file_path, content=content)
    if not save_result.get("success"):
        print(f"❌ Failed to {save_verb} {label} file: {save_result.get('error')}")
        return
    print(f"✓ {label} file {saved_verb} successfully")

    read_result = read_tool(file_path=file_path)
    if not read_result.get("success"):
        print(f"❌ Failed to read {label} file: {read_result.get('error')}")
        return
    print(f"✓ {label} file read successfully")
    # str() keeps the preview safe when the tool returns non-string content
    # (e.g. a parsed structure) that cannot be sliced directly.
    print(f"Content: {str(read_result.get('content', ''))[:100]}...")


def run_advanced_file_operations():
    """
    Run examples demonstrating advanced file operations and format handling.

    Saves and reads back a CSV file, a YAML file and a PDF file through a
    default StorageToolkit, then deletes a fixed list of leftover example
    files via the toolkit's "delete" tool. A file is read back only when its
    save step reported success.

    Returns:
        None. Any exception is caught and reported on stdout.
    """
    print("\n===== ADVANCED FILE OPERATIONS =====\n")

    try:
        storage_toolkit = StorageToolkit()
        save_tool = storage_toolkit.get_tool("save")
        read_tool = storage_toolkit.get_tool("read")

        print("1. Testing CSV file operations...")
        csv_content = (
            "name,age,city\n"
            "John Doe,30,New York\n"
            "Jane Smith,25,Los Angeles\n"
            "Bob Johnson,35,Chicago"
        )
        _save_then_read(save_tool, read_tool, "CSV", "sample_data.csv", csv_content)

        print("\n2. Testing YAML file operations...")
        yaml_content = (
            "name: Sample YAML\n"
            "version: 1.0\n"
            "features:\n"
            "  - feature1\n"
            "  - feature2\n"
            "metadata:\n"
            "  author: Test User\n"
            "  date: 2024-01-01"
        )
        _save_then_read(save_tool, read_tool, "YAML", "sample_config.yaml", yaml_content)

        print("\n3. Testing PDF file operations...")
        pdf_content = (
            "Test PDF Document\n"
            "\n"
            "This is a test PDF created by EvoAgentX.\n"
            "\n"
            "Features:\n"
            "• PDF creation from text\n"
            "• Automatic formatting\n"
            "• Professional layout\n"
            "\n"
            "This demonstrates the storage system's PDF capabilities."
        )
        _save_then_read(
            save_tool, read_tool, "PDF", "test_pdf.pdf", pdf_content,
            save_verb="create", saved_verb="created",
        )

        print("\n4. Testing file deletion...")
        delete_tool = storage_toolkit.get_tool("delete")

        # NOTE(review): existence is checked relative to the current working
        # directory. "custom_test.txt" is saved under ./custom_storage by
        # run_storage_handler_examples, so it presumably never matches here -
        # confirm whether it should be cleaned up through that toolkit.
        test_files = ["sample_document.txt", "sample_data.json", "custom_test.txt"]
        for test_file in test_files:
            if not os.path.exists(test_file):
                continue
            delete_result = delete_tool(path=test_file)
            if delete_result.get("success"):
                print(f"✓ Deleted {test_file}")
            else:
                print(f"❌ Failed to delete {test_file}: {delete_result.get('error')}")

        print("\n✓ Advanced file operations completed")

    except Exception as e:
        print(f"Error running advanced file operations: {str(e)}")
|
|
|
|
|
|
|
|
def main():
    """Main function to run all file system examples

    Runs the storage, command-line, storage-handler and advanced examples in
    that order, framed by a start and an end banner. Each example reports its
    own errors on stdout.
    """
    print("===== FILE SYSTEM TOOLS EXAMPLES =====")

    # The advanced example goes last: its deletion step targets files that
    # the storage example creates (sample_document.txt, sample_data.json).
    run_file_tool_example()
    run_cmd_tool_example()
    run_storage_handler_examples()
    run_advanced_file_operations()

    print("\n===== ALL FILE SYSTEM EXAMPLES COMPLETED =====")


if __name__ == "__main__":
    main()
|
|
|