# tools.py
import subprocess
import requests
import base64
from pathlib import Path
import sys
import json
import pandas as pd
from typing import List,Dict
import io
from sqlalchemy import create_engine, text
import openai
import wikipedia
import numpy as np
import tabula
from PIL import Image
import base64
from geopy.geocoders import Nominatim
from playwright.async_api import async_playwright
import asyncio
from bs4 import BeautifulSoup
import requests
# OpenAI function-calling schemas for the agent's tools. Each entry mirrors
# one of the functions implemented below; the LLM uses the "description"
# text to decide when to call each tool.
TOOL_DEFINITIONS: List[Dict] = [
    # HTTP fetch (proxied) -- see fetch_url() below.
    {
        "type": "function",
        "function": {
            "name": "fetch_url",
            "description": "Fetches the text content from a given URL. Use this for scraping websites or getting data from online sources.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The complete URL to fetch content from.",
                    },
                },
                "required": ["url"],
            },
        },
    },
    # Sandboxed code execution -- see python_interpreter() below.
    {
        "type": "function",
        "function": {
            "name": "python_interpreter",
            "description": (
                "Executes Python code in an isolated environment for data analysis, manipulation, and visualization. "
                "The environment has pandas, matplotlib, numpy, and scikit-learn available. "
                "The code can access user-uploaded files directly by their filename (e.g., pd.read_csv('data.csv')). "
                "To return a plot, save it as 'output.png'. All print() output is captured as the result."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "The Python code to execute.",
                    },
                },
                "required": ["code"],
            },
        },
    },
    # Dataset overview (df.info + df.describe) -- see get_dataframe_info() below.
    {
        "type": "function",
        "function": {
            "name": "get_dataframe_info",
            "description": "Reads a data file (like a .csv or .parquet) and returns a JSON summary including column names, data types, non-null counts, and descriptive statistics (mean, std, min, max, etc.). This is the best first step for understanding any dataset.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The filename of the data file to analyze (e.g., 'data.csv').",
                    },
                },
                "required": ["file_path"],
            },
        },
    },
    # Pearson correlation between two columns -- see calculate_correlation() below.
    {
        "type": "function",
        "function": {
            "name": "calculate_correlation",
            "description": "Computes the Pearson correlation coefficient between two specific numerical columns in a given data file. The name of this function is `calculate_correlation`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The filename of the data file (e.g., 'data.csv')."},
                    "column1": {"type": "string", "description": "The name of the first column."},
                    "column2": {"type": "string", "description": "The name of the second column."},
                },
                "required": ["file_path", "column1", "column2"],
            },
        },
    },
    # Pivot-table summarization -- see create_pivot_table() below.
    {
        "type": "function",
        "function": {
            "name": "create_pivot_table",
            "description": "Generates a pivot table to summarize data. This function takes a file and the names of the columns to use for the index, columns, and values of the pivot table. The name of this function is `create_pivot_table`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The filename of the data file (e.g., 'data.csv')."},
                    "index": {"type": "string", "description": "The name of the column to use as the pivot table's index (rows)."},
                    "columns": {"type": "string", "description": "The name of the column to use as the pivot table's columns."},
                    "values": {"type": "string", "description": "The name of the column to aggregate as the values in the pivot table."},
                },
                "required": ["file_path", "index", "columns", "values"],
            },
        },
    },
    # SQL against a file-based DB via SQLAlchemy -- see run_sql_query() below.
    {
        "type": "function",
        "function": {
            "name": "run_sql_query",
            "description": "Executes a SQL query against a database (like SQLite or DuckDB) and returns the result as JSON. The name of this function is `run_sql_query`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The SQL query to execute.",
                    },
                    "db_connection_string": {
                        "type": "string",
                        "description": "The SQLAlchemy connection string for the database. For an uploaded SQLite file named 'my_db.db', use 'sqlite:///my_db.db'. For a DuckDB file, use 'duckdb:///my_db.duckdb'.",
                    },
                },
                "required": ["query", "db_connection_string"],
            },
        },
    },
    # LLM-backed sentiment classifier -- see get_sentiment() below.
    {
        "type": "function",
        "function": {
            "name": "get_sentiment",
            "description": "Analyzes a piece of text (like a movie review) to determine if its sentiment is positive, negative, or neutral. The name of this function is `get_sentiment`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "text_to_analyze": {
                        "type": "string",
                        "description": "The text content to be analyzed.",
                    },
                },
                "required": ["text_to_analyze"],
            },
        },
    },
    # Wikipedia lead-summary lookup -- see scrape_wikipedia_summary() below.
    {
        "type": "function",
        "function": {
            "name": "scrape_wikipedia_summary",
            "description": "Fetches the clean text summary from a Wikipedia page. Use this tool specifically for getting information from Wikipedia. The name of this function is `scrape_wikipedia_summary`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The title or search query for the Wikipedia page (e.g., 'Python (programming language)').",
                    },
                },
                "required": ["query"],
            },
        },
    },
    # PDF table extraction via tabula -- see scrape_pdf_tables() below.
    {
        "type": "function",
        "function": {
            "name": "scrape_pdf_tables",
            "description": "Extracts all tabular data from a PDF document and returns it as a list of JSON objects. Use this for any PDF that contains tables. The name of this function is `scrape_pdf_tables`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The filename of the PDF file to process (e.g., 'report.pdf').",
                    },
                },
                "required": ["file_path"],
            },
        },
    },
    # Multimodal image Q&A -- see analyze_image_content() below.
    {
        "type": "function",
        "function": {
            "name": "analyze_image_content",
            "description": "Analyzes an uploaded image file (e.g., a PNG or JPG) and answers a specific question about its contents. Use this to identify objects, read text, or describe scenes in an image. The name of this function is `analyze_image_content`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "image_path": {"type": "string", "description": "The filename of the image to analyze (e.g., 'chart.png')."},
                    "prompt": {"type": "string", "description": "The specific question to ask about the image (e.g., 'What is the title of this chart?', 'Is there a cat in this picture?')."},
                },
                "required": ["image_path", "prompt"],
            },
        },
    },
    # Address -> lat/lon via Nominatim -- see geocode_address() below.
    {
        "type": "function",
        "function": {
            "name": "geocode_address",
            "description": "Finds the geographic coordinates (latitude and longitude) for a given street address, city, or landmark. Uses the Nominatim service. The name of this function is `geocode_address`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "address": {
                        "type": "string",
                        "description": "The address or place name to geocode (e.g., '1600 Amphitheatre Parkway, Mountain View, CA' or 'Tokyo Tower').",
                    },
                },
                "required": ["address"],
            },
        },
    },
    # Headless-browser render step (pairs with parse_html) -- see scrape_dynamic_site() below.
    {
        "type": "function",
        "function": {
            "name": "scrape_dynamic_site",
            "description": "Renders a JavaScript-heavy website and saves the complete HTML to a file named 'scraped_page.html'. This is the first step in a two-step process. After calling this, use the 'parse_html' tool to extract specific data from the saved file. The name of this function is `scrape_dynamic_site`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "The URL of the dynamic website to scrape."},
                },
                "required": ["url"],
            },
        },
    },
    # CSS-selector extraction step -- see parse_html() below.
    {
        "type": "function",
        "function": {
            "name": "parse_html",
            "description": "Extracts specific data from an HTML file (like one saved by 'scrape_dynamic_site') using CSS selectors. Provide a dictionary where keys are desired data names and values are the CSS selectors to find that data. The name of this function is `parse_html`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The local filename of the HTML file to parse (e.g., 'scraped_page.html')."},
                    "selectors": {
                        "type": "object",
                        "description": "A JSON object of 'data_name': 'css_selector' pairs. For example: {\"titles\": \"h2.product-title\", \"prices\": \".price-tag\"}",
                    },
                },
                "required": ["file_path", "selectors"],
            },
        },
    },
    # BBC Weather forecast lookup -- see get_bbc_weather() below.
    {
        "type": "function",
        "function": {
            "name": "get_bbc_weather",
            "description": "Fetches the weather forecast for a location using its BBC Weather ID. Can provide a 3-day summary or a detailed hour-by-hour forecast. The name of this function is `get_bbc_weather`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location_id": {
                        "type": "string",
                        "description": "The numerical ID for the location (e.g., '2643743' for London).",
                    },
                    "report_type": {
                        "type": "string",
                        "description": "The type of report to generate. Use 'summary' for a 3-day overview or 'detailed' for an hour-by-hour forecast.",
                        "enum": ["summary", "detailed"],  # 'enum' helps the LLM choose a valid option
                    },
                },
                "required": ["location_id"],
            },
        },
    },
]
def get_bbc_weather(location_id: str, report_type: str = 'summary') -> str:
    """
    Retrieves a BBC Weather forecast for the given location ID.

    Depending on `report_type`, returns either a compact daily 'summary'
    (the default) or a 'detailed' hour-by-hour breakdown, serialized as a
    JSON string. Any failure is reported as a plain error string.
    """
    print(f"Executing Tool 'get_bbc_weather' for ID: {location_id}, Type: {report_type}")
    url = f"https://weather-broker-cdn.api.bbci.co.uk/en/forecast/aggregated/{location_id}"
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        payload = response.json()
        forecasts = payload.get("forecasts", [])
        if not forecasts:
            return "Error: Forecast data not found in the API response."
        first = forecasts[0]
        place_name = first.get("location", {}).get("name")
        if report_type == 'detailed':
            # Hour-by-hour timeseries report.
            # NOTE(review): "localDate" appears to carry only the date,
            # not the hour -- confirm against the live API payload.
            hourly = [
                {
                    "timestamp": slot.get("localDate"),
                    "temperature_c": slot.get("temperatureC"),
                    "feels_like_temp_c": slot.get("feelsLikeTempC"),
                    "wind_speed_mph": slot.get("windSpeedMph"),
                    "wind_direction": slot.get("windDirectionAbbreviation"),
                    "precipitation_probability_percent": slot.get("precipitationProbabilityInPercent"),
                    "weather_type": slot.get("weatherType"),
                }
                for slot in first.get("detailed", {}).get("reports", [])
            ]
            return json.dumps(
                {
                    "location_name": place_name,
                    "issued_at": first.get("issuedAt"),
                    "detailed_forecast": hourly,
                },
                indent=2,
            )
        # Any other report_type falls back to the daily summary.
        daily = [
            {
                "date": day.get("localDate"),
                "condition": day.get("weatherType"),
                "max_temp_c": day.get("maxTempC"),
                "min_temp_c": day.get("minTempC"),
            }
            for day in first.get("summary", {}).get("reports", [])
        ]
        return json.dumps(
            {
                "location_name": place_name,
                "issued_at": first.get("issuedAt"),
                "daily_summary": daily,
            },
            indent=2,
        )
    except Exception as e:
        return f"An error occurred while processing weather data. Error: {e}"
def parse_html(file_path: str, selectors: Dict[str, str], work_dir: str) -> str:
    """
    Extracts text from a local HTML file using CSS selectors.

    Each entry in `selectors` maps an output key to a CSS selector; the
    result maps that key to the list of stripped text contents of every
    matching element, serialized as an indented JSON string.
    """
    print(f"Executing Tool 'parse_html' for file: {file_path}")
    target = Path(work_dir) / file_path
    if not target.exists():
        return f"Error: HTML file not found at {target}"
    try:
        soup = BeautifulSoup(target.read_text(encoding="utf-8"), "lxml")
        # One selector query per requested key; text is whitespace-stripped.
        results = {
            key: [node.get_text(strip=True) for node in soup.select(css)]
            for key, css in selectors.items()
        }
        return json.dumps(results, indent=2)
    except Exception as e:
        return f"Failed to parse HTML file {file_path}. Error: {e}"
async def scrape_dynamic_site(url: str, work_dir: str) -> str:
    """
    Loads a JavaScript-driven page in headless Chromium and writes the
    fully rendered HTML to 'scraped_page.html' inside `work_dir`.

    Returns a JSON status object on success, or an error string.
    """
    print(f"Executing Tool 'scrape_dynamic_site' for url: {url}")
    output_file = Path(work_dir) / "scraped_page.html"
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            page = await browser.new_page()
            # Wait for network activity to settle so client-side rendering finishes.
            await page.goto(url, wait_until='networkidle', timeout=30000)
            rendered_html = await page.content()
            await browser.close()
        # Persist the rendered HTML for the follow-up 'parse_html' step.
        output_file.write_text(rendered_html, encoding="utf-8")
        return json.dumps({
            "status": "success",
            "url": url,
            "saved_to": str(output_file.name),  # expose only the filename
        })
    except Exception as e:
        return f"Failed to scrape dynamic site {url}. Error: {e}"
def geocode_address(address: str) -> str:
    """
    Resolves a street address or landmark to latitude/longitude using the
    Nominatim (OpenStreetMap) service.

    Returns a JSON string on success, or an error string when the lookup
    fails or finds no match.
    """
    print(f"Executing Tool 'geocode_address' for address: {address}")
    try:
        # Nominatim asks for a distinctive user agent in its usage policy.
        match = Nominatim(user_agent="data_analyst_agent_v1").geocode(address)
        if match is None:
            return f"Error: Could not find coordinates for the address '{address}'."
        return json.dumps(
            {
                "address": address,
                "latitude": match.latitude,
                "longitude": match.longitude,
                "full_address_found": match.address,
            },
            indent=2,
        )
    except Exception as e:
        return f"Failed to geocode address. Error: {e}"
def analyze_image_content(image_path: str, prompt: str, work_dir: str, client: openai.Client) -> str:
    """
    Answers a question about an image using a multimodal LLM.

    Args:
        image_path: Filename of the image, relative to `work_dir`.
        prompt: The question to ask about the image.
        work_dir: Directory containing the uploaded files.
        client: An OpenAI-compatible client used for the vision call.

    Returns:
        A JSON string {"image": ..., "analysis": ...} on success, or a
        plain error string on failure.
    """
    print(f"Executing Tool 'analyze_image_content' for file: {image_path}")
    full_path = Path(work_dir) / image_path
    if not full_path.exists():
        return f"Error: Image file not found at {full_path}"
    try:
        # Validate the file and detect its real format so the data URI
        # carries the correct MIME type (previously hardcoded to jpeg even
        # for PNG uploads, which some vision APIs reject or misread).
        with Image.open(full_path) as img:
            detected = (img.format or "PNG").lower()
        mime_type = "image/jpeg" if detected in ("jpg", "jpeg") else f"image/{detected}"
        base64_image = base64.b64encode(full_path.read_bytes()).decode('utf-8')
        response = client.chat.completions.create(
            model="openai/gpt-4.1-nano",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                    ],
                }
            ],
            max_tokens=500,  # allow a reasonably detailed description
        )
        description = response.choices[0].message.content
        return json.dumps({"image": image_path, "analysis": description})
    except Exception as e:
        return f"Failed to analyze image. Error: {e}"
def scrape_wikipedia_summary(query: str) -> str:
    """
    Looks up a Wikipedia article and returns its lead summary as JSON.

    Missing pages and ambiguous queries are reported as error strings
    rather than raised.
    """
    print(f"Executing Tool 'scrape_wikipedia_summary' for query: {query}")
    try:
        page_summary = wikipedia.summary(query, auto_suggest=True)
        return json.dumps({"query": query, "summary": page_summary}, indent=2)
    except wikipedia.exceptions.PageError:
        return f"Error: Could not find a Wikipedia page for the query '{query}'."
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Error: The query '{query}' is ambiguous. It could refer to any of the following: {e.options}"
    except Exception as e:
        return f"Failed to scrape Wikipedia. Error: {e}"
def scrape_pdf_tables(file_path: str, work_dir: str) -> str:
    """
    Pulls every table out of a PDF (all pages) via tabula.

    Returns a JSON object whose 'extracted_tables' entry is a list of
    split-oriented JSON table strings, or an error string on failure.
    """
    print(f"Executing Tool 'scrape_pdf_tables' for file: {file_path}")
    pdf_path = Path(work_dir) / file_path
    if not pdf_path.exists():
        return f"Error: PDF file not found at {pdf_path}"
    try:
        # tabula yields one DataFrame per table it detects.
        frames = tabula.read_pdf(pdf_path, pages='all', multiple_tables=True)
        if not frames:
            return "No tables were found in the PDF file."
        serialized = [frame.to_json(orient='split') for frame in frames]
        return json.dumps({"file_name": file_path, "extracted_tables": serialized})
    except Exception as e:
        return f"Failed to scrape tables from PDF. Make sure Java is installed on the system. Error: {e}"
def get_sentiment(text_to_analyze: str, client: openai.Client) -> str:
    """
    Classifies a text's sentiment as positive, negative, or neutral by
    prompting a small LLM as a constrained single-word classifier.
    """
    print(f"Executing Tool 'get_sentiment'")
    try:
        completion = client.chat.completions.create(
            model="openai/gpt-5-nano",  # small, cheap model suffices here
            messages=[
                {"role": "system", "content": "You are a sentiment analysis tool. Classify the user's text as 'positive', 'negative', or 'neutral'. Respond with only one of these three words and nothing else."},
                {"role": "user", "content": text_to_analyze},
            ],
            max_tokens=5,  # a single word is all we need
            temperature=0.0,  # deterministic classification
        )
        label = completion.choices[0].message.content.lower().strip()
        # Reject anything outside the allowed label set.
        if label in ("positive", "negative", "neutral"):
            return json.dumps({"text": text_to_analyze, "sentiment": label})
        return "Error: Could not determine a valid sentiment."
    except Exception as e:
        return f"Failed to get sentiment. Error: {e}"
def run_sql_query(query: str, db_connection_string: str) -> str:
    """
    Executes a SQL query and returns the rows as a JSON records string.

    Supports file-based databases like SQLite and DuckDB. For SQLite, use
    a connection string of the form 'sqlite:///path/to/database.db'; the
    path should be relative to the agent's working directory.

    Args:
        query: The SQL statement to run.
        db_connection_string: SQLAlchemy URL for the target database.

    Returns:
        JSON (orient='records') of the result set, or an error string.
    """
    print(f"Executing Tool 'run_sql_query'")
    engine = None
    try:
        engine = create_engine(db_connection_string)
        with engine.connect() as connection:
            result_df = pd.read_sql_query(sql=text(query), con=connection)
        return result_df.to_json(orient="records")
    except Exception as e:
        return f"Failed to execute SQL query. Error: {e}"
    finally:
        # Release pooled connections / file handles; previously a new engine
        # was leaked on every call.
        if engine is not None:
            engine.dispose()
def calculate_correlation(file_path: str, column1: str, column2: str, work_dir: str) -> str:
    """
    Computes the Pearson correlation between two columns of a data file.

    Args:
        file_path: CSV or Parquet filename relative to `work_dir`.
        column1: Name of the first column.
        column2: Name of the second column.
        work_dir: Directory containing the uploaded files.

    Returns:
        A JSON string including 'pearson_correlation' (null when the
        correlation is undefined, e.g. a constant column), or an error
        string.
    """
    print(f"Executing Tool 'calculate_correlation' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        if file_path.lower().endswith('.csv'):
            df = pd.read_csv(full_path)
        elif file_path.lower().endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type."
        if column1 not in df.columns or column2 not in df.columns:
            return f"Error: One or both columns ('{column1}', '{column2}') not found in the file."
        correlation = df[column1].corr(df[column2])
        # corr() yields a numpy scalar; a NaN result (undefined correlation)
        # would previously serialize as the invalid JSON token 'NaN', so
        # coerce to a plain float and map NaN to null.
        value = None if pd.isna(correlation) else float(correlation)
        result = {
            "file_name": file_path,
            "column_1": column1,
            "column_2": column2,
            "pearson_correlation": value
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Failed to calculate correlation. Error: {e}"
def create_pivot_table(file_path: str, index: str, columns: str, values: str, work_dir: str) -> str:
    """
    Builds a sum-aggregated pivot table from a data file.

    Args:
        file_path: CSV or Parquet filename relative to `work_dir`.
        index: Column used for the pivot table rows.
        columns: Column used for the pivot table columns.
        values: Column whose values are summed in each cell.
        work_dir: Directory containing the uploaded files.

    Returns:
        The pivot table as JSON (orient='split'), or an error string.
    """
    print(f"Executing Tool 'create_pivot_table' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        if file_path.lower().endswith('.csv'):
            df = pd.read_csv(full_path)
        elif file_path.lower().endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type."
        # Use the string alias "sum": passing np.sum as aggfunc is deprecated
        # in modern pandas and emits a FutureWarning.
        pivot_table = pd.pivot_table(df, values=values, index=index, columns=columns, aggfunc="sum")
        return pivot_table.to_json(orient="split")
    except Exception as e:
        return f"Failed to create pivot table. Error: {e}"
def get_dataframe_info(file_path: str, work_dir: str) -> str:
    """
    Summarizes a CSV or Parquet file as JSON.

    The summary bundles the df.info() dump (columns, dtypes, non-null
    counts) with df.describe() statistics for every column.
    """
    print(f"Executing Tool 'get_dataframe_info' for file: {file_path}")
    data_path = Path(work_dir) / file_path
    if not data_path.exists():
        return f"Error: Data file not found at {data_path}"
    try:
        lowered = file_path.lower()
        if lowered.endswith('.csv'):
            df = pd.read_csv(data_path)
        elif lowered.endswith('.parquet'):
            df = pd.read_parquet(data_path)
        else:
            return f"Error: Unsupported file type. Only .csv and .parquet are supported."
        # df.info() writes to a stream rather than returning a string,
        # so capture it via an in-memory buffer.
        buffer = io.StringIO()
        df.info(buf=buffer)
        return json.dumps(
            {
                "file_name": file_path,
                "info": buffer.getvalue(),
                "statistical_summary": df.describe(include='all').to_json(orient="split"),
            },
            indent=2,
        )
    except Exception as e:
        return f"Failed to get DataFrame info. Error: {e}"
def fetch_url(url: str) -> str:
    """Retrieves the raw text of `url`, routed through the AI Pipe proxy."""
    print(f"Executing Tool 'fetch_url' with URL: {url}")
    try:
        resp = requests.get(f"https://aipipe.org/proxy/{url}", timeout=30)
        resp.raise_for_status()
        return resp.text
    except requests.RequestException as e:
        return f"Error: Failed to fetch URL {url}. Reason: {e}"
def python_interpreter(code: str, work_dir: str) -> str:
    """
    Runs arbitrary Python code in a subprocess rooted at `work_dir`.

    The code may read and write files in `work_dir`. If it writes
    'output.png', the image is returned as a base64 data URI ahead of the
    captured stdout.

    Args:
        code: Python source to execute.
        work_dir: Working directory for the subprocess and its file access.

    Returns:
        The script's stdout on success (prefixed with the plot data URI
        when 'output.png' was produced), or an error string on failure or
        timeout.
    """
    print(f"Executing Tool 'python_interpreter' in directory: {work_dir}")
    work_path = Path(work_dir)
    script_path = work_path / "agent_script.py"
    plot_path = work_path / "output.png"
    # Write with explicit UTF-8: Python source defaults to UTF-8, so a
    # locale-encoded write could corrupt non-ASCII code on some systems.
    script_path.write_text(code, encoding="utf-8")
    print("\n\n--- 📜 DECODING: SCRIPT TO EXECUTE 📜 ---")
    print(code)
    print("------------------------------------------\n")
    try:
        # Use the same interpreter that runs the agent so the subprocess
        # sees the same installed packages. (The original assigned this
        # twice; once is enough.)
        python_executable = sys.executable
        process = subprocess.run(
            [python_executable, str(script_path)],
            cwd=work_path,  # run the script from within the temp directory
            capture_output=True,
            text=True,
            timeout=1000,
            check=False,  # non-zero exit is reported below, never raised
        )
        print("\n\n--- 📤 DECODING: SCRIPT RAW OUTPUT 📤 ---")
        print(f"Return Code: {process.returncode}")
        print("--- STDOUT ---")
        print(process.stdout)
        print("--- STDERR ---")
        print(process.stderr)
        print("------------------------------------------\n")
        if process.returncode != 0:
            return f"SCRIPT FAILED with return code {process.returncode}:\nSTDOUT:\n{process.stdout}\nSTDERR:\n{process.stderr}"
        stdout = process.stdout
        # A plot saved as output.png is returned as a data URI ahead of stdout.
        if plot_path.exists():
            img_base64 = base64.b64encode(plot_path.read_bytes()).decode('utf-8')
            plot_uri = f"data:image/png;base64,{img_base64}"
            return f"image_output:\n{plot_uri}\n\ntext_output:\n{stdout}"
        return stdout
    except subprocess.TimeoutExpired:
        return "Error: The Python script took too long to execute."
    except Exception as e:
        return f"An unexpected error occurred: {e}"