|
|
import os |
|
|
from langchain_core.tools import tool |
|
|
from typing import Annotated |
|
|
from typing_extensions import Annotated |
|
|
from langchain_core.tools.base import InjectedToolCallId |
|
|
from langchain_core.runnables import RunnableConfig |
|
|
from langgraph.types import Command |
|
|
from langchain_core.messages import ToolMessage |
|
|
from dotenv import load_dotenv |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
import yt_dlp |
|
|
from e2b_code_interpreter import Sandbox |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
@tool |
|
|
def execute_code(code: str, tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig) -> Command: |
|
|
""" |
|
|
Execute code in a e2b_code_interpreter sandbox and return the results. |
|
|
Args: |
|
|
code: The code to execute. Should be Python code without the triple backticks. |
|
|
""" |
|
|
result = _execute_code_in_sandbox(code, os.getenv("E2B_API_KEY")) |
|
|
|
|
|
formatted_result = f""" |
|
|
## Code |
|
|
```python |
|
|
{code} |
|
|
``` |
|
|
## Output |
|
|
``` |
|
|
{result['stdout']} |
|
|
``` |
|
|
## Errors |
|
|
``` |
|
|
{result['stderr']} |
|
|
``` |
|
|
""" |
|
|
|
|
|
external_information = f"{config.get('external_information', '')}\n---\n# Code Execution Results \n{formatted_result}" |
|
|
return Command( |
|
|
update={ |
|
|
"external_information": external_information, |
|
|
"messages": [ToolMessage(content=formatted_result, tool_call_id=tool_call_id)] |
|
|
} |
|
|
) |
|
|
|
|
|
def _execute_code_in_sandbox(code: str, api_key: str): |
|
|
"""Execute code in E2B sandbox and return the results.""" |
|
|
sbx = Sandbox() |
|
|
execution = sbx.run_code(code) |
|
|
|
|
|
files = sbx.files.list("/") |
|
|
|
|
|
return { |
|
|
"stdout": execution.logs.stdout, |
|
|
"stderr": execution.logs.stderr, |
|
|
"files": files |
|
|
} |
|
|
|
|
|
@tool |
|
|
def execute_python_file_url(file_url: str, tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig) -> Command: |
|
|
""" |
|
|
Download a python file from a given URL and get the result |
|
|
Args: |
|
|
file_url: The URL of the file to download. |
|
|
Returns: |
|
|
The content of the file as a string, or an error message if the file couldn't be downloaded |
|
|
""" |
|
|
sbx = Sandbox() |
|
|
file_name = "code.py" |
|
|
result = sbx.commands.run(f"wget -O {file_name} {file_url} && cat {file_name}") |
|
|
|
|
|
result_code = _execute_code_in_sandbox(result.stdout, os.getenv("E2B_API_KEY")) |
|
|
final_result = "" |
|
|
for value in result_code["stdout"]: |
|
|
final_result += value |
|
|
|
|
|
return Command( |
|
|
update={ |
|
|
"external_information": f"{config.get('external_information', '')}\n---\n# result {final_result}", |
|
|
"messages": [ToolMessage(content=final_result, tool_call_id=tool_call_id)] |
|
|
} |
|
|
) |
|
|
|
|
|
@tool |
|
|
def get_youtube_transcript(url: str, tool_call_id: Annotated[str, InjectedToolCallId] = None, config: RunnableConfig = None) -> Command | str: |
|
|
""" |
|
|
This tool extracts the transcript text from YouTube videos, returns the transcript as a string. |
|
|
Args: |
|
|
url: The YouTube video URL. |
|
|
Returns: |
|
|
The transcript as a string, or an error message if the transcript couldn't be obtained |
|
|
""" |
|
|
temp_dir = tempfile.mkdtemp() |
|
|
current_dir = os.getcwd() |
|
|
subtitle_content = "" |
|
|
|
|
|
try: |
|
|
os.chdir(temp_dir) |
|
|
|
|
|
ydl_opts = { |
|
|
'writesubtitles': True, |
|
|
'writeautomaticsub': True, |
|
|
'subtitleslangs': ['en'], |
|
|
'skip_download': True, |
|
|
'outtmpl': 'subtitle', |
|
|
'quiet': True, |
|
|
'no_warnings': False, |
|
|
'ignoreerrors': True, |
|
|
'geo_bypass': True, |
|
|
} |
|
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
|
ydl.extract_info(url, download=True) |
|
|
|
|
|
subtitle_files = list(Path(temp_dir).glob("*.vtt")) + list(Path(temp_dir).glob("*.srt")) |
|
|
|
|
|
if subtitle_files: |
|
|
with open(subtitle_files[0], 'r', encoding='utf-8') as f: |
|
|
subtitle_content = f.read() |
|
|
|
|
|
lines = subtitle_content.split('\n') |
|
|
cleaned_lines = [] |
|
|
for line in lines: |
|
|
if line.strip() and not line.strip().isdigit() and not '-->' in line and not line.startswith('WEBVTT'): |
|
|
cleaned_lines.append(line) |
|
|
subtitle_content = '\n '.join(cleaned_lines) |
|
|
else: |
|
|
subtitle_content = "Error: No subtitles found for this video." |
|
|
|
|
|
except Exception as e: |
|
|
subtitle_content = f"Error retrieving YouTube transcript: {str(e)}" |
|
|
finally: |
|
|
os.chdir(current_dir) |
|
|
|
|
|
try: |
|
|
for file in os.listdir(temp_dir): |
|
|
os.remove(os.path.join(temp_dir, file)) |
|
|
os.rmdir(temp_dir) |
|
|
except: |
|
|
pass |
|
|
|
|
|
external_information= f"{config.get('external_information', '')}\n---\n# Youtube transcript \n{subtitle_content}" |
|
|
return Command( |
|
|
update={ |
|
|
"external_information": external_information, |
|
|
"messages": [ToolMessage(content=subtitle_content, tool_call_id=tool_call_id)] |
|
|
} |
|
|
) |
|
|
|
|
|
""" if __name__ == "__main__": |
|
|
# Simple test: print "Hello World" |
|
|
url = "https://agents-course-unit4-scoring.hf.space/files/f918266a-b3e0-4914-865d-4faa564f1aef" |
|
|
|
|
|
# Build a minimal RunnableConfig with no external information |
|
|
config = RunnableConfig(**{"external_information": ""}) |
|
|
input = f"{url}" |
|
|
# Execute the test code |
|
|
# Call the underlying function to bypass the BaseTool wrapper |
|
|
cmd: Command = execute_python_file_url.func( |
|
|
input, |
|
|
"test-call", |
|
|
config, |
|
|
) """ |