Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import uuid | |
| import traceback | |
| import logging | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, Any | |
# Import the base intelligent agent.
# The importable module name for the file app.py is "app": the ".py" suffix
# is not part of it, and "from app.py import ..." would make Python look for
# a submodule named "py" inside a package named "app".
# Adjust this import as necessary if you move IntelligentAgent to a separate module.
# NOTE(review): if app.py itself imports this module, confirm that
# IntelligentAgent is defined there before that import runs (circular import).
from app import IntelligentAgent

logger = logging.getLogger(__name__)
class FileManagementAgent(IntelligentAgent):
    """Agent that turns a free-text request into a simple file operation.

    The request is classified as ``create``, ``read``, ``list`` or ``delete``
    by keyword spotting, a target filename (or directory) is extracted from
    the text, and the operation is carried out relative to the current
    working directory.  Every operation returns a dict with at least a
    human-readable ``"text"`` entry; failures additionally carry ``"error"``.

    NOTE(review): filenames and directories are taken verbatim from the task
    text and are not confined to a sandbox directory.  If ``task`` can come
    from an untrusted user, path validation should be added by the caller or
    here before files are written or deleted.
    """

    # Keyword groups are tested in this order and the first group with a hit
    # wins.  Matching is by substring of the lowercased task.
    _OPERATION_KEYWORDS = (
        ("create", ("create", "make", "generate", "write")),
        ("read", ("read", "open", "show", "display", "content")),
        ("list", ("list", "find", "directory", "folder", "files in")),
        ("delete", ("delete", "remove")),
    )
    # Extensions that mark a word of the task as a filename.
    _FILE_EXTENSIONS = ('.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css')
    # Words after which the next word is taken as the file's name.
    _FILE_KEYWORDS = ("file", "named", "called", "filename")
    # Words after which the next word may name a directory to list.
    _DIRECTORY_KEYWORDS = ("directory", "folder", "in")
    # Punctuation trimmed from both ends of a word taken from the task.
    _PUNCTUATION = ':"\'.,;'

    def __init__(self, hub):
        """Initialise the base agent with the name ``"file_management"`` and ``hub``."""
        super().__init__("file_management", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        """Classify ``task`` and run the matching file operation.

        Args:
            task: Free-text request, e.g. ``"create notes.txt"``.

        Returns:
            A result dict.  It always contains ``"text"``; successful
            operations add ``"operation"`` plus operation-specific keys
            (``"filename"``, ``"size"``, ``"preview"``, ``"content"``,
            ``"directory"``, ``"file_count"``, ``"files"``); failures add
            ``"error"``.
        """
        logger.info("FileManagementAgent processing: %s", task)
        task_lower = task.lower()
        operation = self._detect_file_operation(task_lower)

        if operation == "unknown":
            return {"text": f"Unknown operation requested in task: {task}"}
        if operation == "list":
            return self._list_files(task, task_lower)

        filename = self._extract_filename(task, task_lower)
        if operation == "create":
            # Only creation may invent a name; read/delete must be told one,
            # otherwise they would look for a random file that cannot exist.
            return self._create_file(filename or self._default_filename(task_lower), task)
        if operation == "read":
            return self._read_file(filename)
        return self._delete_file(filename)

    def _detect_file_operation(self, task_lower: str) -> str:
        """Return the first operation whose keywords occur in ``task_lower``, else ``"unknown"``."""
        for operation, keywords in self._OPERATION_KEYWORDS:
            if any(keyword in task_lower for keyword in keywords):
                return operation
        return "unknown"

    def _infer_extension(self, task_lower: str) -> str:
        """Guess a file extension from format names mentioned in the task (default ``.txt``)."""
        if "json" in task_lower:
            return ".json"
        if "csv" in task_lower:
            return ".csv"
        # "py" must be a whole word: a substring test also fires on "copy".
        tokens = {word.strip(self._PUNCTUATION) for word in task_lower.split()}
        if "python" in task_lower or "py" in tokens:
            return ".py"
        return ".txt"

    def _extract_filename(self, task: str, task_lower: str):
        """Return the filename named in ``task``, or ``None`` if none is found.

        A word containing a known extension wins.  Otherwise the word that
        follows "file", "named", "called" or "filename" is used, with an
        extension inferred from the task when the word has none.
        """
        words = task.split()

        for word in words:
            if any(ext in word.lower() for ext in self._FILE_EXTENSIONS):
                return word.strip(self._PUNCTUATION)

        for keyword, following in zip(words, words[1:]):
            if keyword.lower() not in self._FILE_KEYWORDS:
                continue
            candidate = following.strip(self._PUNCTUATION)
            # Skip keyword chains such as "file named report" (which would
            # otherwise yield "named") and words that are only punctuation.
            if not candidate or candidate.lower() in self._FILE_KEYWORDS:
                continue
            if '.' not in candidate:
                candidate += self._infer_extension(task_lower)
            return candidate

        return None

    def _default_filename(self, task_lower: str) -> str:
        """Build a unique fallback filename whose prefix/extension reflect the task."""
        unique = uuid.uuid4().hex[:6]
        extension = self._infer_extension(task_lower)
        if extension in (".json", ".csv"):
            return f"data_{unique}{extension}"
        if extension == ".py":
            return f"script_{unique}.py"
        if "log" in task_lower:
            return f"log_{unique}.txt"
        return f"file_{unique}.txt"

    def _sample_content(self, filename: str, task: str) -> str:
        """Generate placeholder content appropriate for ``filename``'s extension."""
        if filename.endswith('.json'):
            return json.dumps({
                "name": "Sample Data",
                "description": task,
                "created": pd.Timestamp.now().isoformat(),
                "values": [1, 2, 3, 4, 5],
                "metadata": {"source": "FileManagementAgent", "version": "1.0"}
            }, indent=2)

        if filename.endswith('.csv'):
            rows = ["id,name,value,timestamp\n"]
            for i in range(5):
                rows.append(
                    f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
                )
            return "".join(rows)

        if filename.endswith('.py'):
            # Collapse whitespace so a multi-line task cannot spill out of the
            # "#" comment and corrupt the generated script.
            description = " ".join(task.split())
            script_lines = [
                f"# Generated Python Script: {filename}",
                f"# Created: {pd.Timestamp.now().isoformat()}",
                f"# Description: {description}",
                "def main():",
                '    print("Hello from the FileManagementAgent!")',
                "    data = [1, 2, 3, 4, 5]",
                "    result = sum(data)",
                '    print(f"Sample calculation: sum(data) = {result}")',
                "    return result",
                'if __name__ == "__main__":',
                "    main()",
            ]
            return "\n".join(script_lines) + "\n"

        return (
            f"File created by FileManagementAgent\n"
            f"Created: {pd.Timestamp.now().isoformat()}\n"
            f"Based on request: {task}\n\nThis is sample content."
        )

    def _create_file(self, filename: str, task: str) -> Dict[str, Any]:
        """Write sample content to ``filename`` and record the action in memory."""
        content = self._sample_content(filename, task)
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(content)
            preview = content[:200] + "..." if len(content) > 200 else content
            result = {
                "text": f"Successfully created file: {filename}",
                "operation": "create",
                "filename": filename,
                "size": len(content),
                "preview": preview,
            }
            self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
            self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error creating file {filename}: {str(e)}"
            logger.exception(error_msg)
            return {"text": error_msg, "error": str(e)}

    def _read_file(self, filename) -> Dict[str, Any]:
        """Return the UTF-8 text of ``filename``, or a message if it is missing/unspecified."""
        if not filename:
            return {"text": "Please specify a filename to read."}
        if not Path(filename).exists():
            return {"text": f"File '{filename}' not found."}
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()
            result = {"text": f"Content of {filename}:\n\n{content}", "operation": "read", "filename": filename, "content": content, "size": len(content)}
            self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error reading file {filename}: {str(e)}"
            logger.exception(error_msg)
            return {"text": error_msg, "error": str(e)}

    def _resolve_directory(self, task: str) -> str:
        """Return the existing directory named after "directory"/"folder"/"in", else ``"."``.

        Keywords are matched as whole words and the candidate keeps its
        original case, so "find" is not mistaken for "in" and "Data" is not
        looked up as "data".
        """
        words = task.split()
        for keyword, following in zip(words, words[1:]):
            if keyword.lower().strip(self._PUNCTUATION) not in self._DIRECTORY_KEYWORDS:
                continue
            # Try the word with only quotes removed first so paths such as
            # "./data" or ".." survive, then with all punctuation trimmed.
            for candidate in (following.strip('"\''), following.strip(self._PUNCTUATION)):
                # An empty string would resolve to the current directory.
                if candidate and Path(candidate).is_dir():
                    return candidate
        return "."

    def _list_files(self, task: str, task_lower: str) -> Dict[str, Any]:
        """List directory entries, optionally filtered by an extension named in the task."""
        try:
            directory = self._resolve_directory(task)
            extension_filter = next(
                (ext for ext in self._FILE_EXTENSIONS if ext in task_lower), None
            )
            files = list(Path(directory).glob('*' + (extension_filter or '')))

            # Group entries by suffix for the report.
            file_groups: Dict[str, Any] = {}
            for entry in files:
                stats = entry.stat()
                file_groups.setdefault(entry.suffix, []).append({
                    "name": entry.name,
                    "size": stats.st_size,
                    "modified": pd.Timestamp(stats.st_mtime, unit='s').isoformat()
                })

            header = f"Found {len(files)} files"
            if extension_filter:
                header += f" with extension {extension_filter}"
            header += f" in {directory}:\n\n"

            chunks = [header]
            for ext, group in file_groups.items():
                chunks.append(f"{ext} files ({len(group)}):\n")
                for file_info in sorted(group, key=lambda x: x["name"]):
                    size_kb = file_info["size"] / 1024
                    chunks.append(f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n")
                chunks.append("\n")
            response_text = "".join(chunks)

            result = {"text": response_text, "operation": "list", "directory": directory, "file_count": len(files), "files": file_groups}
            self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error listing files: {str(e)}"
            logger.exception(error_msg)
            return {"text": error_msg, "error": str(e)}

    def _delete_file(self, filename) -> Dict[str, Any]:
        """Delete ``filename``, or return a message if it is missing/unspecified."""
        if not filename:
            return {"text": "Please specify a filename to delete."}
        if not Path(filename).exists():
            return {"text": f"File '{filename}' not found."}
        try:
            os.remove(filename)
            result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
            self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
            self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error deleting file {filename}: {str(e)}"
            logger.exception(error_msg)
            return {"text": error_msg, "error": str(e)}