Spaces:
Build error
Build error
File size: 2,684 Bytes
8a682b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
"""
Tool implementations with proper registration
"""
from typing import Any, Dict

from src.tools.base_tool import tool
from src.utils.logging import get_logger
# Module-level logger obtained from the project's get_logger helper, keyed by
# this module's name; the file tools below report their failures through it.
logger = get_logger(__name__)
# File operations
@tool
def file_reader(file_path: str) -> str:
    """Read a file as UTF-8 text and return its full contents.

    Any exception raised while opening or reading is logged through the
    module logger and reported to the caller as an
    ``"Error reading file: ..."`` string instead of being propagated.
    """
    try:
        with open(file_path, mode='r', encoding='utf-8') as handle:
            text = handle.read()
    except Exception as exc:
        # Tools report failures as text rather than raising.
        logger.error(f"Error reading file {file_path}: {exc}")
        return f"Error reading file: {exc!s}"
    return text
@tool
def file_writer(file_path: str, content: str) -> str:
    """Write ``content`` to ``file_path`` as UTF-8, replacing existing data.

    Returns a ``"Successfully wrote to ..."`` message on success. Any
    exception raised while opening or writing is logged through the module
    logger and returned as an ``"Error writing file: ..."`` string.
    """
    try:
        with open(file_path, mode='w', encoding='utf-8') as sink:
            sink.write(content)
    except Exception as exc:
        # Tools report failures as text rather than raising.
        logger.error(f"Error writing file {file_path}: {exc}")
        return f"Error writing file: {exc!s}"
    return f"Successfully wrote to {file_path}"
# Web operations
@tool
async def web_researcher(query: str) -> str:
    """Return research results for ``query`` (placeholder implementation).

    No web request is made here: the query is echoed back inside a fixed
    message.
    """
    # Stub: a call to a real web search API belongs here.
    placeholder = f"Research results for: {query}"
    return placeholder
# Code execution
@tool
def python_interpreter(code: str) -> str:
    """Execute Python source in a separate process and return its output.

    The source is written to a temporary UTF-8 ``.py`` file, run with a
    30-second timeout, and the temporary file is removed afterwards.

    NOTE(review): the child process is NOT sandboxed -- it runs with the
    same privileges as this process, so ``code`` must come from a trusted
    source.

    Args:
        code: Python source text to execute.

    Returns:
        The captured stdout if it is non-empty, otherwise the captured
        stderr. If anything fails (including the timeout expiring), a
        string of the form ``"Error executing code: ..."``.
    """
    import os
    import subprocess
    import sys
    import tempfile

    script_path = None
    try:
        # Python reads source files as UTF-8 by default, so write the script
        # in UTF-8 rather than the locale encoding.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        ) as f:
            # Record the name first so cleanup still happens if write() fails.
            script_path = f.name
            f.write(code)
        # The file is closed (and therefore flushed) before the child runs.
        result = subprocess.run(
            # Use the interpreter running this process; a bare 'python' may
            # be missing from PATH or resolve to a different installation.
            [sys.executable or 'python', script_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
        return result.stdout or result.stderr
    except Exception as e:
        return f"Error executing code: {str(e)}"
    finally:
        # delete=False means nothing removes the script automatically.
        if script_path is not None:
            try:
                os.unlink(script_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask
                # the execution result.
                pass
# Media processing
@tool
async def audio_transcriber(audio_path: str) -> str:
    """Return a transcription for ``audio_path`` (placeholder implementation).

    The audio file is not opened: the path is echoed back inside a fixed
    message.
    """
    # Stub: a call to a real transcription service belongs here.
    placeholder = f"Transcription of {audio_path}"
    return placeholder
@tool
async def video_analyzer(video_path: str) -> Dict[str, Any]:
    """Return analysis metadata for a video (placeholder implementation).

    The file at ``video_path`` is not opened or inspected.

    Args:
        video_path: Path of the video; only interpolated into the
            description text.

    Returns:
        A dict with the keys ``"duration"`` and ``"resolution"`` (both the
        literal ``"unknown"``) and ``"description"``.
    """
    # Stub: real video analysis belongs here.
    return {
        "duration": "unknown",
        "resolution": "unknown",
        "description": f"Analysis of {video_path}",
    }
@tool
async def image_analyzer(image_path: str) -> Dict[str, Any]:
    """Return analysis metadata for an image (placeholder implementation).

    The file at ``image_path`` is not opened or inspected.

    Args:
        image_path: Path of the image; only interpolated into the
            description text.

    Returns:
        A dict with the keys ``"format"`` and ``"dimensions"`` (both the
        literal ``"unknown"``) and ``"description"``.
    """
    # Stub: real image analysis belongs here.
    return {
        "format": "unknown",
        "dimensions": "unknown",
        "description": f"Analysis of {image_path}",
    }
# Export all tools
# Public API of this module: the names exported by a star-import.
__all__ = [
    'file_reader',
    'file_writer',
    'web_researcher',
    'python_interpreter',
    'audio_transcriber',
    'video_analyzer',
    'image_analyzer',
]