# NOTE: removed non-Python page-scrape residue ("Spaces: Sleeping Sleeping")
# that preceded this file's content.
| # tools.py | |
| import subprocess | |
| import requests | |
| import base64 | |
| from pathlib import Path | |
| import sys | |
| import json | |
| import pandas as pd | |
| from typing import List,Dict | |
| import io | |
| from sqlalchemy import create_engine, text | |
| import openai | |
| import wikipedia | |
| import numpy as np | |
| import tabula | |
| from PIL import Image | |
| import base64 | |
| from geopy.geocoders import Nominatim | |
| from playwright.async_api import async_playwright | |
| import asyncio | |
| from bs4 import BeautifulSoup | |
| import requests | |
# OpenAI function-calling schema for every tool the agent can invoke.
# Each entry follows the Chat Completions "tools" format: a "function" object
# with a name, a natural-language description, and a JSON Schema describing
# its parameters. The description strings are prompts consumed by the LLM,
# so their exact wording is part of the agent's behavior — edit with care.
# The Python implementations for these tools are defined below in this file.
TOOL_DEFINITIONS = [
    # Generic HTTP fetch for static pages / raw online data.
    {
        "type": "function",
        "function": {
            "name": "fetch_url",
            "description": "Fetches the text content from a given URL. Use this for scraping websites or getting data from online sources.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The complete URL to fetch content from.",
                    },
                },
                "required": ["url"],
            },
        },
    },
    # Sandboxed code execution for ad-hoc analysis and plotting.
    {
        "type": "function",
        "function": {
            "name": "python_interpreter",
            "description": (
                "Executes Python code in an isolated environment for data analysis, manipulation, and visualization. "
                "The environment has pandas, matplotlib, numpy, and scikit-learn available. "
                "The code can access user-uploaded files directly by their filename (e.g., pd.read_csv('data.csv')). "
                "To return a plot, save it as 'output.png'. All print() output is captured as the result."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "The Python code to execute.",
                    },
                },
                "required": ["code"],
            },
        },
    },
    # Quick structural/statistical overview of an uploaded dataset.
    {
        "type": "function",
        "function": {
            "name": "get_dataframe_info",
            "description": "Reads a data file (like a .csv or .parquet) and returns a JSON summary including column names, data types, non-null counts, and descriptive statistics (mean, std, min, max, etc.). This is the best first step for understanding any dataset.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The filename of the data file to analyze (e.g., 'data.csv').",
                    },
                },
                "required": ["file_path"],
            },
        },
    },
    # Pearson correlation between two numeric columns of a file.
    {
        "type": "function",
        "function": {
            "name": "calculate_correlation",
            "description": "Computes the Pearson correlation coefficient between two specific numerical columns in a given data file. The name of this function is `calculate_correlation`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The filename of the data file (e.g., 'data.csv')."},
                    "column1": {"type": "string", "description": "The name of the first column."},
                    "column2": {"type": "string", "description": "The name of the second column."},
                },
                "required": ["file_path", "column1", "column2"],
            },
        },
    },
    # Summarize a dataset as a pivot table (index x columns -> aggregated values).
    {
        "type": "function",
        "function": {
            "name": "create_pivot_table",
            "description": "Generates a pivot table to summarize data. This function takes a file and the names of the columns to use for the index, columns, and values of the pivot table. The name of this function is `create_pivot_table`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The filename of the data file (e.g., 'data.csv')."},
                    "index": {"type": "string", "description": "The name of the column to use as the pivot table's index (rows)."},
                    "columns": {"type": "string", "description": "The name of the column to use as the pivot table's columns."},
                    "values": {"type": "string", "description": "The name of the column to aggregate as the values in the pivot table."},
                },
                "required": ["file_path", "index", "columns", "values"],
            },
        },
    },
    # SQL against file-based databases via SQLAlchemy connection strings.
    {
        "type": "function",
        "function": {
            "name": "run_sql_query",
            "description": "Executes a SQL query against a database (like SQLite or DuckDB) and returns the result as JSON. The name of this function is `run_sql_query`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The SQL query to execute.",
                    },
                    "db_connection_string": {
                        "type": "string",
                        "description": "The SQLAlchemy connection string for the database. For an uploaded SQLite file named 'my_db.db', use 'sqlite:///my_db.db'. For a DuckDB file, use 'duckdb:///my_db.duckdb'.",
                    },
                },
                "required": ["query", "db_connection_string"],
            },
        },
    },
    # Three-way sentiment classification of a piece of text.
    {
        "type": "function",
        "function": {
            "name": "get_sentiment",
            "description": "Analyzes a piece of text (like a movie review) to determine if its sentiment is positive, negative, or neutral. The name of this function is `get_sentiment`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "text_to_analyze": {
                        "type": "string",
                        "description": "The text content to be analyzed.",
                    },
                },
                "required": ["text_to_analyze"],
            },
        },
    },
    # Wikipedia lead-section summary lookup.
    {
        "type": "function",
        "function": {
            "name": "scrape_wikipedia_summary",
            "description": "Fetches the clean text summary from a Wikipedia page. Use this tool specifically for getting information from Wikipedia. The name of this function is `scrape_wikipedia_summary`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The title or search query for the Wikipedia page (e.g., 'Python (programming language)').",
                    },
                },
                "required": ["query"],
            },
        },
    },
    # Table extraction from PDFs (backed by tabula, which requires Java).
    {
        "type": "function",
        "function": {
            "name": "scrape_pdf_tables",
            "description": "Extracts all tabular data from a PDF document and returns it as a list of JSON objects. Use this for any PDF that contains tables. The name of this function is `scrape_pdf_tables`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The filename of the PDF file to process (e.g., 'report.pdf').",
                    },
                },
                "required": ["file_path"],
            },
        },
    },
    # Vision Q&A over an uploaded image via a multimodal model.
    {
        "type": "function",
        "function": {
            "name": "analyze_image_content",
            "description": "Analyzes an uploaded image file (e.g., a PNG or JPG) and answers a specific question about its contents. Use this to identify objects, read text, or describe scenes in an image. The name of this function is `analyze_image_content`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "image_path": {"type": "string", "description": "The filename of the image to analyze (e.g., 'chart.png')."},
                    "prompt": {"type": "string", "description": "The specific question to ask about the image (e.g., 'What is the title of this chart?', 'Is there a cat in this picture?')."},
                },
                "required": ["image_path", "prompt"],
            },
        },
    },
    # Address/landmark -> latitude/longitude via Nominatim.
    {
        "type": "function",
        "function": {
            "name": "geocode_address",
            "description": "Finds the geographic coordinates (latitude and longitude) for a given street address, city, or landmark. Uses the Nominatim service. The name of this function is `geocode_address`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "address": {
                        "type": "string",
                        "description": "The address or place name to geocode (e.g., '1600 Amphitheatre Parkway, Mountain View, CA' or 'Tokyo Tower').",
                    },
                },
                "required": ["address"],
            },
        },
    },
    # Step 1 of dynamic scraping: render JS and dump HTML to a local file.
    {
        "type": "function",
        "function": {
            "name": "scrape_dynamic_site",
            "description":
                "Renders a JavaScript-heavy website and saves the complete HTML to a file named 'scraped_page.html'. This is the first step in a two-step process. After calling this, use the 'parse_html' tool to extract specific data from the saved file. The name of this function is `scrape_dynamic_site`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "The URL of the dynamic website to scrape."},
                },
                "required": ["url"],
            },
        },
    },
    # Step 2 of dynamic scraping: CSS-selector extraction from saved HTML.
    {
        "type": "function",
        "function": {
            "name": "parse_html",
            "description": "Extracts specific data from an HTML file (like one saved by 'scrape_dynamic_site') using CSS selectors. Provide a dictionary where keys are desired data names and values are the CSS selectors to find that data. The name of this function is `parse_html`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The local filename of the HTML file to parse (e.g., 'scraped_page.html')."},
                    "selectors": {
                        "type": "object",
                        "description": "A JSON object of 'data_name': 'css_selector' pairs. For example: {\"titles\": \"h2.product-title\", \"prices\": \".price-tag\"}",
                    },
                },
                "required": ["file_path", "selectors"],
            },
        },
    },
    # BBC Weather forecast lookup by numeric location ID.
    {
        "type": "function",
        "function": {
            "name": "get_bbc_weather",
            "description": "Fetches the weather forecast for a location using its BBC Weather ID. Can provide a 3-day summary or a detailed hour-by-hour forecast. The name of this function is `get_bbc_weather`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location_id": {
                        "type": "string",
                        "description": "The numerical ID for the location (e.g., '2643743' for London).",
                    },
                    "report_type": {
                        "type": "string",
                        "description": "The type of report to generate. Use 'summary' for a 3-day overview or 'detailed' for an hour-by-hour forecast.",
                        "enum": ["summary", "detailed"],  # 'enum' helps the LLM choose a valid option
                    },
                },
                "required": ["location_id"],
            },
        },
    },
]
def get_bbc_weather(location_id: str, report_type: str = 'summary') -> str:
    """Fetch the weather forecast for a BBC Weather location ID.

    ``report_type='summary'`` (the default) returns a per-day overview;
    ``report_type='detailed'`` returns the hour-by-hour timeseries.
    Returns a JSON string on success, or an error message string on failure.
    """
    print(f"Executing Tool 'get_bbc_weather' for ID: {location_id}, Type: {report_type}")
    endpoint = f"https://weather-broker-cdn.api.bbci.co.uk/en/forecast/aggregated/{location_id}"
    try:
        resp = requests.get(endpoint, timeout=15)
        resp.raise_for_status()
        forecasts = resp.json().get("forecasts", [])
        if not forecasts:
            return "Error: Forecast data not found in the API response."
        report = forecasts[0]
        place = report.get("location", {}).get("name")
        if report_type == 'detailed':
            # Hour-by-hour timeseries pulled from the "detailed" section.
            hours = [
                {
                    "timestamp": slot.get("localDate"),
                    "temperature_c": slot.get("temperatureC"),
                    "feels_like_temp_c": slot.get("feelsLikeTempC"),
                    "wind_speed_mph": slot.get("windSpeedMph"),
                    "wind_direction": slot.get("windDirectionAbbreviation"),
                    "precipitation_probability_percent": slot.get("precipitationProbabilityInPercent"),
                    "weather_type": slot.get("weatherType"),
                }
                for slot in report.get("detailed", {}).get("reports", [])
            ]
            payload = {
                "location_name": place,
                "issued_at": report.get("issuedAt"),
                "detailed_forecast": hours,
            }
            return json.dumps(payload, indent=2)
        # Any other value falls back to the per-day summary report.
        days = [
            {
                "date": day.get("localDate"),
                "condition": day.get("weatherType"),
                "max_temp_c": day.get("maxTempC"),
                "min_temp_c": day.get("minTempC"),
            }
            for day in report.get("summary", {}).get("reports", [])
        ]
        payload = {
            "location_name": place,
            "issued_at": report.get("issuedAt"),
            "daily_summary": days,
        }
        return json.dumps(payload, indent=2)
    except Exception as e:
        return f"An error occurred while processing weather data. Error: {e}"
def parse_html(file_path: str, selectors: Dict[str, str], work_dir: str) -> str:
    """Extract text from a local HTML file using CSS selectors.

    For each ``name: selector`` pair in ``selectors``, collects the
    whitespace-stripped text of every matching element under ``name``.
    Returns the collected data as a pretty-printed JSON string, or an
    error message string on failure.
    """
    print(f"Executing Tool 'parse_html' for file: {file_path}")
    target = Path(work_dir) / file_path
    if not target.exists():
        return f"Error: HTML file not found at {target}"
    try:
        markup = target.read_text(encoding="utf-8")
        soup = BeautifulSoup(markup, "lxml")
        # One list of matched-element texts per requested data name.
        results = {
            name: [node.get_text(strip=True) for node in soup.select(css)]
            for name, css in selectors.items()
        }
        return json.dumps(results, indent=2)
    except Exception as e:
        return f"Failed to parse HTML file {file_path}. Error: {e}"
async def scrape_dynamic_site(url: str, work_dir: str) -> str:
    """Render a JavaScript-heavy page in a headless browser and save its HTML.

    The fully rendered document is written to 'scraped_page.html' inside
    ``work_dir``. Returns a JSON status message containing the saved
    filename, or an error message string on failure.
    """
    print(f"Executing Tool 'scrape_dynamic_site' for url: {url}")
    out_file = Path(work_dir) / "scraped_page.html"
    try:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch()
            page = await browser.new_page()
            # Wait for the network to go idle so JS-injected content exists
            # in the DOM before we snapshot it (30s cap).
            await page.goto(url, wait_until='networkidle', timeout=30000)
            html = await page.content()
            await browser.close()
        out_file.write_text(html, encoding="utf-8")
        # Report only the filename; the agent resolves it against work_dir.
        status = {"status": "success", "url": url, "saved_to": out_file.name}
        return json.dumps(status)
    except Exception as e:
        return f"Failed to scrape dynamic site {url}. Error: {e}"
def geocode_address(address: str) -> str:
    """Resolve an address or place name to latitude/longitude coordinates.

    Uses the free Nominatim (OpenStreetMap) geocoder. Returns a JSON string
    with the coordinates and the full matched address, or an error message
    string when the lookup fails or finds nothing.
    """
    print(f"Executing Tool 'geocode_address' for address: {address}")
    try:
        # Nominatim's usage policy asks for a distinctive user agent.
        locator = Nominatim(user_agent="data_analyst_agent_v1")
        match = locator.geocode(address)
        if match is None:
            return f"Error: Could not find coordinates for the address '{address}'."
        payload = {
            "address": address,
            "latitude": match.latitude,
            "longitude": match.longitude,
            "full_address_found": match.address,
        }
        return json.dumps(payload, indent=2)
    except Exception as e:
        return f"Failed to geocode address. Error: {e}"
def analyze_image_content(image_path: str, prompt: str, work_dir: str, client: openai.Client) -> str:
    """
    Analyzes the content of an image file using a multimodal LLM and answers
    a question about it.

    Args:
        image_path: Filename of the image, relative to ``work_dir``.
        prompt: The question to ask about the image.
        work_dir: Directory where uploaded files live.
        client: An initialized OpenAI-compatible client.

    Returns:
        A JSON string with the image name and the model's analysis, or an
        error message string on failure.
    """
    print(f"Executing Tool 'analyze_image_content' for file: {image_path}")
    full_path = Path(work_dir) / image_path
    if not full_path.exists():
        return f"Error: Image file not found at {full_path}"
    try:
        # Open the image to verify it's a valid image file before uploading.
        Image.open(full_path)
        # Fix: the data URI previously always claimed 'image/jpeg' even for
        # PNGs. Pick the MIME type from the file extension instead, falling
        # back to JPEG for unknown suffixes (the previous behavior).
        mime_by_suffix = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
            ".bmp": "image/bmp",
        }
        mime_type = mime_by_suffix.get(full_path.suffix.lower(), "image/jpeg")
        # Encode the image to base64 for inline transmission.
        with open(full_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode('utf-8')
        # Call the multimodal model with the question and the inline image.
        response = client.chat.completions.create(
            model="openai/gpt-4.1-nano",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                    ],
                }
            ],
            max_tokens=500,  # Allow for a reasonably detailed description
        )
        description = response.choices[0].message.content
        return json.dumps({"image": image_path, "analysis": description})
    except Exception as e:
        return f"Failed to analyze image. Error: {e}"
def scrape_wikipedia_summary(query: str) -> str:
    """Fetch the lead-section summary of a Wikipedia page.

    Returns a JSON string with the query and the summary text, or an error
    message string for missing/ambiguous pages or other failures.
    """
    print(f"Executing Tool 'scrape_wikipedia_summary' for query: {query}")
    try:
        # auto_suggest lets the library correct near-miss titles.
        summary_text = wikipedia.summary(query, auto_suggest=True)
        payload = {"query": query, "summary": summary_text}
        return json.dumps(payload, indent=2)
    except wikipedia.exceptions.PageError:
        return f"Error: Could not find a Wikipedia page for the query '{query}'."
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Error: The query '{query}' is ambiguous. It could refer to any of the following: {e.options}"
    except Exception as e:
        return f"Failed to scrape Wikipedia. Error: {e}"
def scrape_pdf_tables(file_path: str, work_dir: str) -> str:
    """Extract every table from a PDF file as JSON.

    Uses tabula (which requires a Java runtime) to pull tables from all
    pages. Returns a JSON string containing one 'split'-oriented JSON table
    per extracted DataFrame, or an error message string on failure.
    """
    print(f"Executing Tool 'scrape_pdf_tables' for file: {file_path}")
    pdf_path = Path(work_dir) / file_path
    if not pdf_path.exists():
        return f"Error: PDF file not found at {pdf_path}"
    try:
        # tabula returns one DataFrame per detected table.
        frames = tabula.read_pdf(pdf_path, pages='all', multiple_tables=True)
        if not frames:
            return "No tables were found in the PDF file."
        serialized = [frame.to_json(orient='split') for frame in frames]
        return json.dumps({"file_name": file_path, "extracted_tables": serialized})
    except Exception as e:
        return f"Failed to scrape tables from PDF. Make sure Java is installed on the system. Error: {e}"
def get_sentiment(text_to_analyze: str, client: openai.Client) -> str:
    """Classify a piece of text as positive, negative or neutral.

    Uses a tightly constrained LLM prompt so the model behaves like a
    three-way classifier. Returns a JSON string with the text and its
    sentiment label, or an error message string on failure.
    """
    print(f"Executing Tool 'get_sentiment'")
    try:
        # The system prompt forces a single-word classification answer.
        completion = client.chat.completions.create(
            model="openai/gpt-5-nano",  # Use a fast and cheap model for this simple task
            messages=[
                {"role": "system", "content": "You are a sentiment analysis tool. Classify the user's text as 'positive', 'negative', or 'neutral'. Respond with only one of these three words and nothing else."},
                {"role": "user", "content": text_to_analyze},
            ],
            max_tokens=5,     # Limit the output to a single word
            temperature=0.0,  # Make the output deterministic
        )
        label = completion.choices[0].message.content.lower().strip()
        # Reject anything outside the three allowed labels.
        if label in ("positive", "negative", "neutral"):
            return json.dumps({"text": text_to_analyze, "sentiment": label})
        return "Error: Could not determine a valid sentiment."
    except Exception as e:
        return f"Failed to get sentiment. Error: {e}"
def run_sql_query(query: str, db_connection_string: str) -> str:
    """Execute a SQL query via SQLAlchemy and return the rows as JSON records.

    Supports file-based databases such as SQLite ('sqlite:///db.db') and
    DuckDB; file paths in the connection string are relative to the agent's
    working directory. Returns a JSON array of row objects, or an error
    message string on failure.
    """
    print(f"Executing Tool 'run_sql_query'")
    try:
        engine = create_engine(db_connection_string)
        # Let pandas handle cursor iteration and type conversion.
        with engine.connect() as conn:
            rows = pd.read_sql_query(sql=text(query), con=conn)
        return rows.to_json(orient="records")
    except Exception as e:
        return f"Failed to execute SQL query. Error: {e}"
def calculate_correlation(file_path: str, column1: str, column2: str, work_dir: str) -> str:
    """
    Calculates the Pearson correlation coefficient between two specified
    columns in a data file (.csv or .parquet).

    Args:
        file_path: Filename of the data file, relative to ``work_dir``.
        column1: Name of the first column.
        column2: Name of the second column.
        work_dir: Directory where uploaded files live.

    Returns:
        A JSON string with the correlation (null when it is undefined, e.g.
        a constant column), or an error message string on failure.
    """
    print(f"Executing Tool 'calculate_correlation' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        if file_path.lower().endswith('.csv'):
            df = pd.read_csv(full_path)
        elif file_path.lower().endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type."
        # Ensure both columns exist before computing anything.
        if column1 not in df.columns or column2 not in df.columns:
            return f"Error: One or both columns ('{column1}', '{column2}') not found in the file."
        correlation = df[column1].corr(df[column2])
        # Fix: NaN is not valid JSON (json.dumps would emit the bare token
        # 'NaN'); report an undefined correlation as null instead.
        result = {
            "file_name": file_path,
            "column_1": column1,
            "column_2": column2,
            "pearson_correlation": None if pd.isna(correlation) else float(correlation),
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Failed to calculate correlation. Error: {e}"
def create_pivot_table(file_path: str, index: str, columns: str, values: str, work_dir: str) -> str:
    """
    Creates a pivot table (sum aggregation) from the data in the given file.

    Args:
        file_path: Filename of the data file (.csv or .parquet), relative
            to ``work_dir``.
        index: Column used for the pivot table's rows.
        columns: Column used for the pivot table's columns.
        values: Column whose values are summed in each cell.
        work_dir: Directory where uploaded files live.

    Returns:
        The pivot table as a 'split'-oriented JSON string, or an error
        message string on failure.
    """
    print(f"Executing Tool 'create_pivot_table' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        if file_path.lower().endswith('.csv'):
            df = pd.read_csv(full_path)
        elif file_path.lower().endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type."
        # Fix: passing np.sum as aggfunc is deprecated in modern pandas;
        # the string alias "sum" is the supported, equivalent spelling.
        pivot_table = pd.pivot_table(df, values=values, index=index, columns=columns, aggfunc="sum")
        return pivot_table.to_json(orient="split")
    except Exception as e:
        return f"Failed to create pivot table. Error: {e}"
def get_dataframe_info(file_path: str, work_dir: str) -> str:
    """Summarize a tabular data file (CSV or Parquet).

    Produces a JSON string containing the filename, the text output of
    ``DataFrame.info()`` (dtypes and non-null counts) and a
    ``describe(include='all')`` statistical summary serialized with
    ``orient='split'``. Returns an error message string on failure.
    """
    print(f"Executing Tool 'get_dataframe_info' for file: {file_path}")
    target = Path(work_dir) / file_path
    if not target.exists():
        return f"Error: Data file not found at {target}"
    try:
        lowered = file_path.lower()
        if lowered.endswith('.csv'):
            frame = pd.read_csv(target)
        elif lowered.endswith('.parquet'):
            frame = pd.read_parquet(target)
        else:
            return "Error: Unsupported file type. Only .csv and .parquet are supported."
        # df.info() writes to a stream rather than returning a string,
        # so capture it through a StringIO buffer.
        buffer = io.StringIO()
        frame.info(buf=buffer)
        stats = frame.describe(include='all')
        payload = {
            "file_name": file_path,
            "info": buffer.getvalue(),
            "statistical_summary": stats.to_json(orient="split"),
        }
        return json.dumps(payload, indent=2)
    except Exception as e:
        return f"Failed to get DataFrame info. Error: {e}"
def fetch_url(url: str) -> str:
    """Fetch the text content of a URL through the AI Pipe proxy.

    Returns the response body on success, or an error message string when
    the request fails or returns a non-2xx status.
    """
    print(f"Executing Tool 'fetch_url' with URL: {url}")
    # All outbound requests are routed via the proxy endpoint.
    proxied = f"https://aipipe.org/proxy/{url}"
    try:
        resp = requests.get(proxied, timeout=30)
        resp.raise_for_status()
        return resp.text
    except requests.RequestException as e:
        return f"Error: Failed to fetch URL {url}. Reason: {e}"
def python_interpreter(code: str, work_dir: str) -> str:
    """
    Executes Python code in a sandboxed subprocess within a working directory.

    The code is written to 'agent_script.py' inside ``work_dir`` and run with
    the same interpreter as this process (so the script sees the same
    installed packages). The script can access any files in ``work_dir``.
    If it creates 'output.png', the image is base64 encoded and prepended to
    the returned text as a data URI.

    Args:
        code: The Python source code to execute.
        work_dir: Directory the script runs in and resolves files against.

    Returns:
        The script's stdout on success (optionally prefixed with an
        'image_output:' data URI), or an error description string on failure.
    """
    print(f"Executing Tool 'python_interpreter' in directory: {work_dir}")
    work_path = Path(work_dir)
    script_path = work_path / "agent_script.py"
    plot_path = work_path / "output.png"
    with open(script_path, "w") as f:
        f.write(code)
    print("\n\n--- 📜 DECODING: SCRIPT TO EXECUTE 📜 ---")
    print(code)
    print("------------------------------------------\n")
    try:
        # Fix: sys.executable was assigned twice; resolve it once here.
        python_executable = sys.executable
        print(f"--- [DEBUG] EXECUTING SUBPROCESS WITH PYTHON FROM: {python_executable} ---")
        process = subprocess.run(
            [python_executable, str(script_path)],
            cwd=work_path,  # Run the script from within the temp directory
            capture_output=True,
            text=True,
            timeout=1000,
            check=False,  # Non-zero exit codes are handled manually below
        )
        print("\n\n--- 📤 DECODING: SCRIPT RAW OUTPUT 📤 ---")
        print(f"Return Code: {process.returncode}")
        print("--- STDOUT ---")
        print(process.stdout)
        print("--- STDERR ---")
        print(process.stderr)
        print("------------------------------------------\n")
        if process.returncode != 0:
            return f"SCRIPT FAILED with return code {process.returncode}:\nSTDOUT:\n{process.stdout}\nSTDERR:\n{process.stderr}"
        stdout = process.stdout
        # If the script saved a plot, return it inline as a data URI
        # together with the captured text output.
        if plot_path.exists():
            with open(plot_path, "rb") as img_file:
                img_base64 = base64.b64encode(img_file.read()).decode('utf-8')
            plot_uri = f"data:image/png;base64,{img_base64}"
            return f"image_output:\n{plot_uri}\n\ntext_output:\n{stdout}"
        return stdout
    # Fix: removed the unreachable subprocess.CalledProcessError handler —
    # run(check=False) never raises it.
    except subprocess.TimeoutExpired:
        return "Error: The Python script took too long to execute."
    except Exception as e:
        return f"An unexpected error occurred: {e}"