Spaces:
Sleeping
Sleeping
| from googlesearch import search as google_search | |
| from smolagents import tool, Tool | |
| from curl_scraper import BasicScraper | |
| from huggingface_hub import hf_hub_download, upload_folder | |
| import os | |
class FileReader(Tool):
    """Tool that returns the text content of a file under the working folder.

    When the requested file is not present locally it is first downloaded
    from the Hugging Face Space identified by ``space_id``.
    """

    name = "read_file"
    description = "Reads a file and returns the contents as a string."
    inputs = {
        "file": {
            "type": "string",
            "description": "The absolute path to the file to be read.",
        },
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.folder_path = folder_path
        self.space_id = space_id

    def forward(self, file: str) -> str:
        """Return the UTF-8 decoded content of ``file``.

        Args:
            file: Path of the file, joined onto ``folder_path``; it is also
                used as the filename requested from the Space when the file
                has to be downloaded.

        Returns:
            The full content of the file as a string.
        """
        local_path = os.path.join(self.folder_path, file)
        already_present = os.path.exists(local_path)
        if not already_present:
            # Fetch the missing file from the Space into the working folder.
            hf_hub_download(
                repo_id=self.space_id,
                filename=file,
                repo_type="space",
                local_dir=self.folder_path,
                local_dir_use_symlinks=False,
            )
        with open(local_path, mode="r", encoding="utf-8") as handle:
            return handle.read()
class FileWriter(Tool):
    """Tool that writes or appends text to a file under the working folder.

    Overwrites store the content with leading/trailing whitespace stripped;
    appends write the content verbatim.
    """

    name = "write_file"
    description = "Overwrite or append content to a file. Use for creating new files, appending content, or modifying existing files."
    inputs = {
        "file": {"type": "string", "description": "The absolute path to the file to be written."},
        "content": {"type": "string", "description": "The content to be written to the file."},
        "append": {"type": "boolean", "description": "Whether to append to the file or overwrite it.", "nullable": True},
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.space_id = space_id
        self.folder_path = folder_path

    def forward(self, file: str, content: str, append: bool = False) -> str:
        """Write ``content`` to ``file`` and return a confirmation message.

        Args:
            file: Path of the target file, joined onto ``folder_path``.
            content: Text to write.
            append: When truthy, append to the file; otherwise (including
                ``None``) overwrite it.

        Returns:
            A message naming the file that was written or appended to.

        Raises:
            OSError: If the directory or file cannot be created or written.
        """
        file_path = os.path.join(self.folder_path, file)

        # The tool is advertised for creating new files, so make sure the
        # directory that will hold the file exists before opening it.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        mode = 'a' if append else 'w'
        with open(file_path, mode, encoding='utf-8') as f:
            f.write(content if append else content.strip())
        return f"Successfully {'appended to' if append else 'wrote to'} file at {file_path}"
def search(query: str) -> list:
    """
    Run a web search for the query and collect the results.

    Args:
        query (str): The search query.

    Returns:
        list: List of top web results including url, title, description from the search results.
    """
    # Materialize every entry yielded by the search backend (advanced mode,
    # unique results only).
    hits = google_search(query, advanced=True, unique=True)
    return list(hits)
def scraper(url: str, response_type: str = 'plain_text') -> str:
    """
    Scrapes the content of a web page. Important: Don't use regex to extract information from the output of this scraper tool, Think more and find answer yourself.

    Args:
        url (str): The URL of the web page to scrape.
        response_type (str): The type of response to return. Valid options are 'markdown' and 'plain_text'. Default is 'plain_text'.

    Returns:
        str: The content of the web page in the specified format.

    Raises:
        ValueError: If response_type is neither 'markdown' nor 'plain_text'.
    """
    # Validate the requested format first so a bad argument fails immediately
    # instead of after (or hidden behind) a network round trip.
    if response_type not in ('markdown', 'plain_text'):
        raise ValueError("Invalid response_type. Use 'markdown' or 'plain_text'.")

    response = BasicScraper(timeout=5).get(url)
    if response_type == 'markdown':
        return response.markdown
    return response.plain_text
class CommitChanges(Tool):
    """Tool that uploads the working folder to the Space as a pull request."""

    name = "commit_changes"
    description = "Commits changes to a repository."
    inputs = {
        "commit_message": {
            "type": "string",
            "description": "The commit message.",
        },
        "commit_description": {
            "type": "string",
            "description": "The commit description.",
        },
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.folder_path = folder_path
        self.space_id = space_id

    def forward(self, commit_message: str, commit_description: str) -> str:
        """Upload ``folder_path`` to the Space with ``create_pr=True``.

        Args:
            commit_message: Message passed to the upload call.
            commit_description: Description passed to the upload call.

        Returns:
            A success message, or an error message containing the exception
            text when the upload raises.
        """
        try:
            upload_folder(
                repo_type="space",
                repo_id=self.space_id,
                folder_path=self.folder_path,
                commit_message=commit_message,
                commit_description=commit_description,
                create_pr=True,
            )
        except Exception as err:
            # Failures are reported back as text rather than raised.
            outcome = f"Error committing changes: {err}"
        else:
            outcome = "Changes committed successfully."
        return outcome
| from huggingface_hub import HfApi | |
| import shutil | |
| import os | |
| import fnmatch | |
| from typing import List, Dict, Any, Optional, Union | |
| from pathlib import Path | |
class FileMover(Tool):
    """Tool that moves or renames a file or directory in the working folder."""

    name = "move_file"
    description = "Move or rename a file or directory."
    inputs = {
        "source": {
            "type": "string",
            "description": "Source file or directory path",
        },
        "destination": {
            "type": "string",
            "description": "Destination path",
        },
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.folder_path = folder_path
        self.space_id = space_id

    def forward(self, source: str, destination: str) -> str:
        """Move ``source`` to ``destination`` and report the outcome as text.

        Both paths are joined onto ``folder_path``. A missing source or a
        failing move yields an error message instead of an exception.
        """
        origin = os.path.join(self.folder_path, source)
        target = os.path.join(self.folder_path, destination)

        if os.path.exists(origin):
            try:
                shutil.move(origin, target)
            except Exception as err:
                return f"Error moving {origin}: {err}"
            return f"Successfully moved {origin} to {target}"

        return f"Error: Source path does not exist: {origin}"
class FileCopier(Tool):
    """Tool that copies a file or a directory tree inside the working folder."""

    name = "copy_file"
    description = "Copy a file or directory."
    inputs = {
        "source": {
            "type": "string",
            "description": "Source file or directory path",
        },
        "destination": {
            "type": "string",
            "description": "Destination path",
        },
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.folder_path = folder_path
        self.space_id = space_id

    def forward(self, source: str, destination: str) -> str:
        """Copy ``source`` to ``destination`` and report the outcome as text.

        Both paths are joined onto ``folder_path``. Directories are copied
        with ``shutil.copytree`` and files with ``shutil.copy2``; a missing
        source or a failing copy yields an error message.
        """
        origin = os.path.join(self.folder_path, source)
        target = os.path.join(self.folder_path, destination)

        if not os.path.exists(origin):
            return f"Error: Source path does not exist: {origin}"

        try:
            # Pick the copy routine matching the kind of source.
            duplicate = shutil.copytree if os.path.isdir(origin) else shutil.copy2
            duplicate(origin, target)
        except Exception as err:
            return f"Error copying {origin}: {err}"
        return f"Successfully copied {origin} to {target}"
class FileDeleter(Tool):
    """Tool that deletes a file or directory tree inside the working folder."""

    name = "delete_file"
    description = "Delete a file or directory."
    inputs = {
        "path": {"type": "string", "description": "Path to file or directory to delete"}
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.space_id = space_id
        self.folder_path = folder_path

    def forward(self, path: str) -> str:
        """Delete ``path`` (joined onto ``folder_path``) and report the outcome.

        Args:
            path: File or directory to delete.

        Returns:
            A success message, or an error message when the path does not
            exist, when it designates the working folder itself (or a
            directory containing it), or when the deletion raises.
        """
        full_path = os.path.join(self.folder_path, path)
        if not os.path.exists(full_path):
            return f"Error: Path does not exist: {full_path}"

        # Inputs such as "", "." or ".." resolve to the working folder or one
        # of its ancestors; removing those would wipe the whole workspace.
        root = os.path.normcase(os.path.abspath(self.folder_path))
        target = os.path.normcase(os.path.abspath(full_path))
        if root == target or root.startswith(os.path.join(target, "")):
            return f"Error: Refusing to delete the working folder or one of its parents: {full_path}"

        try:
            if os.path.isdir(full_path):
                shutil.rmtree(full_path)
            else:
                os.remove(full_path)
        except Exception as e:
            return f"Error deleting {full_path}: {str(e)}"
        return f"Successfully deleted: {full_path}"
class FileSearcher(Tool):
    """Tool that searches for a text fragment in files under the working folder."""

    name = "search_files"
    description = "Search for text within files in the repository."
    inputs = {
        "search_term": {"type": "string", "description": "Text to search for"},
        "file_pattern": {"type": "string", "description": "File pattern to search in (e.g., '*.txt' for all .txt files) default is '*'", "nullable": True},
        "case_sensitive": {"type": "boolean", "description": "Whether the search should be case sensitive default is False", "nullable": True}
    }
    output_type = "array"

    def __init__(self, space_id, folder_path, **kwargs):
        # Extra keyword arguments are accepted but not used.
        super().__init__()
        self.space_id = space_id
        self.folder_path = folder_path

    def _find_files(self, pattern: str) -> List[str]:
        """Find all files matching the pattern in the repository."""
        matches = []
        for root, _, filenames in os.walk(self.folder_path):
            matches.extend(
                os.path.join(root, filename)
                for filename in fnmatch.filter(filenames, pattern)
            )
        return matches

    def _search_in_file(self, file_path: str, search_term: str, case_sensitive: bool) -> List[Dict[str, Any]]:
        """Search for the term in a single file.

        Returns one dict per matching line with the file path relative to
        ``folder_path``, the 1-based line number and the stripped line, or a
        single error dict when the file cannot be read.
        """
        matches = []
        # Normalize the needle once instead of on every line.
        needle = search_term if case_sensitive else search_term.lower()
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line_num, line in enumerate(f, 1):
                    haystack = line if case_sensitive else line.lower()
                    if needle in haystack:
                        matches.append({
                            'file': os.path.relpath(file_path, self.folder_path),
                            'line': line_num,
                            'content': line.strip()
                        })
        except Exception as e:
            return [{'file': file_path, 'error': f"Error reading file: {str(e)}"}]
        return matches

    def forward(self, search_term: str, file_pattern: str = "*", case_sensitive: bool = False) -> List[Dict[str, Any]]:
        """Search every file matching ``file_pattern`` for ``search_term``.

        Args:
            search_term: Text to look for; must not be empty or blank.
            file_pattern: fnmatch-style file name pattern; ``None`` is
                treated as ``"*"``.
            case_sensitive: Whether matching is case sensitive; ``None`` is
                treated as ``False``.

        Returns:
            A list of match dicts, or a single-element list carrying a
            ``message`` or ``error`` entry.
        """
        if not search_term or not search_term.strip():
            return [{"error": "Search term cannot be empty"}]

        # Both optional inputs are declared nullable, so an explicit None
        # must behave like the documented default.
        if file_pattern is None:
            file_pattern = "*"
        case_sensitive = bool(case_sensitive)

        try:
            matching_files = self._find_files(file_pattern)
            if not matching_files:
                return [{"message": f"No files found matching pattern: {file_pattern}"}]
            results = []
            for file_path in matching_files:
                results.extend(self._search_in_file(file_path, search_term, case_sensitive))
            return results if results else [{"message": "No matches found"}]
        except Exception as e:
            return [{"error": f"Error during search: {str(e)}"}]
def _render_repo_tree(filenames):
    """Render sorted, '/'-separated file names as the lines of a tree listing."""
    lines = []
    printed_dirs = set()
    last_index = len(filenames) - 1
    for idx, rfilename in enumerate(filenames):
        *dir_parts, file_name = rfilename.split('/')

        # Emit each ancestor directory once, the first time it is met.
        for depth, dir_name in enumerate(dir_parts):
            dir_path = '/'.join(dir_parts[:depth + 1])
            if dir_path not in printed_dirs:
                printed_dirs.add(dir_path)
                lines.append(f"{' ' * depth}{dir_name}")

        if not dir_parts:
            # Top-level files are listed without any connector.
            lines.append(file_name)
            continue

        # The names are sorted, so everything under the same directory is
        # contiguous: looking at the next entry is enough to know whether
        # this is the last one below that directory.
        dir_prefix = '/'.join(dir_parts) + '/'
        is_last_file = idx == last_index or not filenames[idx + 1].startswith(dir_prefix)
        # "\u2514\u2500 " is the closing-corner connector and "\u251c\u2500 " the tee
        # connector; written as escapes so they survive re-encoding.
        file_prefix = '\u2514\u2500 ' if is_last_file else '\u251c\u2500 '
        lines.append(f"{' ' * len(dir_parts)}{file_prefix}{file_name}")
    return lines


def get_repository_structure(space_id: str) -> str:
    """Return a text tree of the files stored in a Hugging Face Space.

    Args:
        space_id: Identifier of the Space passed to ``HfApi.space_info``.

    Returns:
        A newline-joined listing: directories appear once, indented by their
        depth, and files inside a directory are prefixed with a tree
        connector. An empty string is returned when the Space lists no files.
    """
    api = HfApi()
    space_info = api.space_info(space_id)
    # ``siblings`` may be missing on the returned info object; treat that as
    # an empty repository instead of failing in ``sorted``.
    siblings = space_info.siblings or []
    filenames = sorted(sibling.rfilename for sibling in siblings)
    return '\n'.join(_render_repo_tree(filenames))