GAIA / tools.py
hapda12's picture
Upload 12 files
358eb7e verified
"""
核心工具模块 - GAIA Agent 基础工具
包含:web_search, fetch_task_files, read_file, calc, run_python
"""
import os
import re
import json
import tempfile
import zipfile
from typing import Optional
import requests
from langchain_core.tools import tool
from config import (
SCORING_API_URL,
SEARCH_MAX_RESULTS,
MAX_FILE_SIZE,
TOOL_TIMEOUT,
TEMP_DIR,
TAVILY_API_KEY,
WIKIPEDIA_MAX_RESULTS,
ARXIV_MAX_RESULTS,
TAVILY_MAX_RESULTS,
)
# DuckDuckGo 搜索 (新包名: ddgs)
try:
from ddgs import DDGS
except ImportError:
try:
from duckduckgo_search import DDGS
except ImportError:
DDGS = None
# Wikipedia 搜索
try:
import wikipedia
wikipedia.set_lang("en")
except ImportError:
wikipedia = None
# Tavily 搜索
try:
from tavily import TavilyClient
except ImportError:
TavilyClient = None
# arXiv 搜索
try:
import arxiv
except ImportError:
arxiv = None
# ========================================
# 信息获取工具
# ========================================
@tool
def web_search(query: str, max_results: int = SEARCH_MAX_RESULTS) -> str:
"""
使用 DuckDuckGo 搜索网络信息。
适用场景:
- 查找人物信息(生卒年、职业、成就等)
- 查找事件详情(时间、地点、参与者等)
- 查找组织/公司信息
- 获取最新资讯
Args:
query: 搜索关键词,建议使用英文
max_results: 返回结果数量,默认5条
Returns:
搜索结果摘要(标题+内容+URL)
"""
if DDGS is None:
return "搜索服务不可用:请安装 ddgs 库 (pip install ddgs)"
try:
ddgs = DDGS()
results = list(ddgs.text(query, max_results=max_results))
if not results:
return f"没有找到与 '{query}' 相关的搜索结果。"
output = []
for i, r in enumerate(results, 1):
title = r.get('title', 'N/A')
body = r.get('body', 'N/A')
url = r.get('href', 'N/A')
output.append(f"{i}. {title}")
output.append(f" {body}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"搜索出错: {type(e).__name__}: {str(e)}"
@tool
def wikipedia_search(query: str, max_results: int = WIKIPEDIA_MAX_RESULTS) -> str:
"""
在维基百科中搜索信息。
适用场景:
- 查找人物传记、历史事件
- 获取概念定义和详细解释
- 查找地理、科学、文化等百科知识
Args:
query: 搜索关键词,建议使用英文
max_results: 返回结果数量,默认2条
Returns:
维基百科文章摘要
"""
if wikipedia is None:
return "Wikipedia 搜索不可用:请安装 wikipedia 库 (pip install wikipedia)"
try:
# 搜索相关页面
search_results = wikipedia.search(query, results=max_results)
if not search_results:
return f"没有找到与 '{query}' 相关的维基百科文章。"
output = []
for i, title in enumerate(search_results, 1):
try:
# 获取页面摘要
page = wikipedia.page(title, auto_suggest=False)
summary = wikipedia.summary(title, sentences=3, auto_suggest=False)
output.append(f"{i}. {page.title}")
output.append(f" {summary}")
output.append(f" URL: {page.url}")
output.append("")
except wikipedia.exceptions.DisambiguationError as e:
# 处理歧义页面,取第一个选项
if e.options:
try:
page = wikipedia.page(e.options[0], auto_suggest=False)
summary = wikipedia.summary(e.options[0], sentences=3, auto_suggest=False)
output.append(f"{i}. {page.title}")
output.append(f" {summary}")
output.append(f" URL: {page.url}")
output.append("")
except:
output.append(f"{i}. {title} (歧义页面,可选: {', '.join(e.options[:3])})")
output.append("")
except wikipedia.exceptions.PageError:
continue
return "\n".join(output) if output else f"没有找到与 '{query}' 相关的详细信息。"
except Exception as e:
return f"Wikipedia 搜索出错: {type(e).__name__}: {str(e)}"
@tool
def wikipedia_page(title: str, section: str = None) -> str:
"""
获取维基百科页面的完整内容。
当 wikipedia_search 返回的摘要不够详细时使用此工具。
特别适用于需要获取列表、表格、详细数据的场景(如专辑列表、获奖记录等)。
Args:
title: 页面标题(从 wikipedia_search 结果中获取)
section: 可选,指定要获取的章节名(如 "Discography", "Awards")
Returns:
页面完整内容或指定章节内容
"""
if wikipedia is None:
return "Wikipedia 不可用:请安装 wikipedia 库 (pip install wikipedia)"
try:
page = wikipedia.page(title, auto_suggest=False)
content = page.content
# 如果指定了章节,尝试提取该章节
if section:
# 查找章节(支持 == Section == 和 === Subsection === 格式)
section_name = section.strip()
heading_re = re.compile(r'^(=+)\s*(.+?)\s*\1\s*$', re.MULTILINE)
headings = list(heading_re.finditer(content))
# 精确匹配
target_idx = None
for i, m in enumerate(headings):
if m.group(2).strip().lower() == section_name.lower():
target_idx = i
break
# 模糊匹配(包含)
matched_label = ""
if target_idx is None:
for i, m in enumerate(headings):
if section_name.lower() in m.group(2).strip().lower():
target_idx = i
matched_label = " (matched)"
break
if target_idx is not None:
level = len(headings[target_idx].group(1))
start = headings[target_idx].end()
end = len(content)
for m in headings[target_idx + 1:]:
if len(m.group(1)) <= level:
end = m.start()
break
section_text = content[start:end].strip()
content = f"{headings[target_idx].group(0)}{matched_label}\n{section_text}"
else:
available = [m.group(2).strip() for m in headings][:20]
content = (
f"未找到 '{section_name}' 章节。\n\n可用章节:\n"
+ "\n".join(available)
+ f"\n\n完整内容:\n{content[:3000]}"
)
# 组装输出并限制长度(保留标题与 URL)
output = f"Wikipedia 页面: {page.title}\nURL: {page.url}\n\n{content}"
if len(output) > MAX_FILE_SIZE:
return output[:MAX_FILE_SIZE] + f"\n\n... [内容已截断,共 {len(output)} 字符]"
return output
except wikipedia.exceptions.DisambiguationError as e:
options = e.options[:10]
return f"'{title}' 是一个歧义页面,请指定更具体的标题:\n" + "\n".join(f" - {opt}" for opt in options)
except wikipedia.exceptions.PageError:
return f"找不到标题为 '{title}' 的维基百科页面。请检查标题拼写或使用 wikipedia_search 搜索。"
except Exception as e:
return f"Wikipedia 页面获取出错: {type(e).__name__}: {str(e)}"
@tool
def tavily_search(query: str, max_results: int = TAVILY_MAX_RESULTS) -> str:
"""
使用 Tavily 进行高质量网络搜索(需要 API Key)。
适用场景:
- 需要高质量、准确的搜索结果
- 查找最新新闻和实时信息
- 需要更智能的搜索结果排序
Args:
query: 搜索关键词
max_results: 返回结果数量,默认3条
Returns:
搜索结果摘要
"""
if TavilyClient is None:
return "Tavily 搜索不可用:请安装 tavily-python 库 (pip install tavily-python)"
if not TAVILY_API_KEY:
return "Tavily 搜索不可用:请在 .env 文件中设置 TAVILY_API_KEY"
try:
client = TavilyClient(api_key=TAVILY_API_KEY)
response = client.search(query, max_results=max_results)
results = response.get('results', [])
if not results:
return f"没有找到与 '{query}' 相关的搜索结果。"
output = []
for i, r in enumerate(results, 1):
title = r.get('title', 'N/A')
content = r.get('content', 'N/A')
url = r.get('url', 'N/A')
output.append(f"{i}. {title}")
output.append(f" {content[:300]}..." if len(content) > 300 else f" {content}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"Tavily 搜索出错: {type(e).__name__}: {str(e)}"
@tool
def arxiv_search(query: str, max_results: int = ARXIV_MAX_RESULTS) -> str:
"""
在 arXiv 上搜索学术论文。
适用场景:
- 查找最新学术研究论文
- 搜索特定领域的科学文献
- 获取论文摘要和作者信息
Args:
query: 搜索关键词(建议使用英文学术术语)
max_results: 返回结果数量,默认3条
Returns:
论文信息(标题、作者、摘要、链接)
"""
if arxiv is None:
return "arXiv 搜索不可用:请安装 arxiv 库 (pip install arxiv)"
try:
client = arxiv.Client()
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.Relevance
)
results = list(client.results(search))
if not results:
return f"没有找到与 '{query}' 相关的 arXiv 论文。"
output = []
for i, paper in enumerate(results, 1):
title = paper.title
authors = ", ".join([a.name for a in paper.authors[:3]])
if len(paper.authors) > 3:
authors += f" 等 ({len(paper.authors)} 位作者)"
summary = paper.summary[:400] + "..." if len(paper.summary) > 400 else paper.summary
published = paper.published.strftime("%Y-%m-%d")
url = paper.entry_id
output.append(f"{i}. {title}")
output.append(f" 作者: {authors}")
output.append(f" 发布日期: {published}")
output.append(f" 摘要: {summary}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"arXiv 搜索出错: {type(e).__name__}: {str(e)}"
# ========================================
# YouTube 搜索工具
# ========================================
@tool
def youtube_search(query: str, max_results: int = 3) -> str:
"""
搜索 YouTube 视频信息。
适用场景:
- 查找教程视频
- 搜索特定主题的视频内容
- 获取视频标题、频道和描述
Args:
query: 搜索关键词
max_results: 返回结果数量,默认3条
Returns:
视频信息(标题、频道、链接)
"""
try:
from youtube_search import YoutubeSearch
except ImportError:
# 备选方案:使用 DuckDuckGo 搜索 YouTube
if DDGS is None:
return "YouTube 搜索不可用:请安装 youtube-search-python 库 (pip install youtube-search-python)"
try:
ddgs = DDGS()
results = list(ddgs.text(f"site:youtube.com {query}", max_results=max_results))
if not results:
return f"没有找到与 '{query}' 相关的 YouTube 视频。"
output = []
for i, r in enumerate(results, 1):
title = r.get('title', 'N/A')
url = r.get('href', 'N/A')
output.append(f"{i}. {title}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"YouTube 搜索出错: {type(e).__name__}: {str(e)}"
try:
results = YoutubeSearch(query, max_results=max_results).to_dict()
if not results:
return f"没有找到与 '{query}' 相关的 YouTube 视频。"
output = []
for i, video in enumerate(results, 1):
title = video.get('title', 'N/A')
channel = video.get('channel', 'N/A')
duration = video.get('duration', 'N/A')
views = video.get('views', 'N/A')
url_suffix = video.get('url_suffix', '')
url = f"https://youtube.com{url_suffix}" if url_suffix else 'N/A'
output.append(f"{i}. {title}")
output.append(f" 频道: {channel}")
output.append(f" 时长: {duration} | 播放量: {views}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"YouTube 搜索出错: {type(e).__name__}: {str(e)}"
# ========================================
# 新闻搜索工具
# ========================================
@tool
def news_search(query: str, max_results: int = 5) -> str:
"""
搜索最新新闻资讯。
适用场景:
- 查找最新新闻事件
- 获取时事热点信息
- 搜索特定主题的新闻报道
Args:
query: 搜索关键词
max_results: 返回结果数量,默认5条
Returns:
新闻标题、来源和摘要
"""
if DDGS is None:
return "新闻搜索不可用:请安装 ddgs 库 (pip install ddgs)"
try:
ddgs = DDGS()
results = list(ddgs.news(query, max_results=max_results))
if not results:
return f"没有找到与 '{query}' 相关的新闻。"
output = []
for i, r in enumerate(results, 1):
title = r.get('title', 'N/A')
body = r.get('body', 'N/A')
source = r.get('source', 'N/A')
date = r.get('date', 'N/A')
url = r.get('url', 'N/A')
output.append(f"{i}. {title}")
output.append(f" 来源: {source} | 日期: {date}")
output.append(f" {body[:200]}..." if len(body) > 200 else f" {body}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"新闻搜索出错: {type(e).__name__}: {str(e)}"
# ========================================
# StackOverflow 搜索工具
# ========================================
@tool
def stackoverflow_search(query: str, max_results: int = 3) -> str:
"""
在 StackOverflow 上搜索编程问题和解答。
适用场景:
- 查找编程问题的解决方案
- 搜索代码错误的修复方法
- 获取技术问题的讨论
Args:
query: 搜索关键词(建议包含编程语言或技术栈)
max_results: 返回结果数量,默认3条
Returns:
问题标题、回答数和链接
"""
try:
import requests
# 使用 StackExchange API
api_url = "https://api.stackexchange.com/2.3/search/advanced"
params = {
"order": "desc",
"sort": "relevance",
"q": query,
"site": "stackoverflow",
"pagesize": max_results,
"filter": "withbody"
}
response = requests.get(api_url, params=params, timeout=TOOL_TIMEOUT)
response.raise_for_status()
data = response.json()
items = data.get('items', [])
if not items:
return f"没有找到与 '{query}' 相关的 StackOverflow 问题。"
output = []
for i, item in enumerate(items, 1):
title = item.get('title', 'N/A')
score = item.get('score', 0)
answer_count = item.get('answer_count', 0)
is_answered = "✓ 已解答" if item.get('is_answered') else "○ 待解答"
tags = ", ".join(item.get('tags', [])[:5])
url = item.get('link', 'N/A')
output.append(f"{i}. {title}")
output.append(f" {is_answered} | 得分: {score} | 回答数: {answer_count}")
output.append(f" 标签: {tags}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"StackOverflow 搜索出错: {type(e).__name__}: {str(e)}"
# ========================================
# Google 搜索工具
# ========================================
@tool
def google_search(query: str, max_results: int = 5) -> str:
"""
使用 Google 搜索网络信息(通过 DuckDuckGo 代理)。
适用场景:
- 综合网络搜索
- 查找官方网站和权威来源
- 获取多样化的搜索结果
Args:
query: 搜索关键词
max_results: 返回结果数量,默认5条
Returns:
搜索结果(标题+摘要+URL)
注意:
由于 Google API 限制,此工具通过 DuckDuckGo 实现类似功能
"""
# 复用 DuckDuckGo 搜索,但添加 Google 特定的搜索词
if DDGS is None:
return "Google 搜索不可用:请安装 ddgs 库 (pip install ddgs)"
try:
ddgs = DDGS()
results = list(ddgs.text(query, max_results=max_results))
if not results:
return f"没有找到与 '{query}' 相关的搜索结果。"
output = []
for i, r in enumerate(results, 1):
title = r.get('title', 'N/A')
body = r.get('body', 'N/A')
url = r.get('href', 'N/A')
output.append(f"{i}. {title}")
output.append(f" {body}")
output.append(f" URL: {url}")
output.append("")
return "\n".join(output)
except Exception as e:
return f"Google 搜索出错: {type(e).__name__}: {str(e)}"
# ========================================
# 文件处理工具
# ========================================
@tool
def fetch_task_files(task_id: str) -> str:
"""
从评分服务器下载任务相关的附件文件。
当问题涉及附件时必须先调用此工具下载文件,然后使用 read_file 或其他工具读取。
Args:
task_id: 任务 ID(从问题中获取)
Returns:
下载文件的本地路径,或错误信息
"""
try:
url = f"{SCORING_API_URL}/files/{task_id}"
response = requests.get(url, timeout=TOOL_TIMEOUT)
if response.status_code == 404:
return "该任务没有附件文件。"
response.raise_for_status()
# 从 Content-Disposition 获取文件名
content_disp = response.headers.get("Content-Disposition", "")
filename_match = re.search(r'filename="?([^";\n]+)"?', content_disp)
filename = filename_match.group(1) if filename_match else f"task_{task_id}_file"
# 保存到临时目录
file_path = TEMP_DIR / filename
with open(file_path, "wb") as f:
f.write(response.content)
# 返回文件信息和使用建议
file_size = len(response.content)
file_ext = os.path.splitext(filename)[1].lower()
# 根据文件类型给出下一步建议
next_step_hint = ""
if file_ext in ['.xlsx', '.xls']:
next_step_hint = "\n\n⚠️ 下一步:请立即使用 parse_excel(file_path) 工具读取此 Excel 文件内容,不要搜索网络。"
elif file_ext == '.pdf':
next_step_hint = "\n\n⚠️ 下一步:请立即使用 parse_pdf(file_path) 工具读取此 PDF 文件,不要搜索网络。"
elif file_ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp']:
next_step_hint = "\n\n⚠️ 下一步:请使用 image_ocr(file_path) 或 analyze_image(file_path, question) 工具处理此图片。"
elif file_ext in ['.mp3', '.wav', '.m4a', '.ogg']:
next_step_hint = "\n\n⚠️ 下一步:请使用 transcribe_audio(file_path) 工具转写此音频文件。"
elif file_ext in ['.txt', '.csv', '.json', '.md', '.py', '.html', '.xml']:
next_step_hint = "\n\n⚠️ 下一步:请立即使用 read_file(file_path) 工具读取此文件内容。"
elif file_ext == '.zip':
next_step_hint = "\n\n⚠️ 下一步:请使用 read_file(file_path) 工具解压此 ZIP 文件。"
return f"文件已下载到: {file_path}\n文件大小: {file_size} 字节\n文件名: {filename}{next_step_hint}"
except requests.Timeout:
return f"下载超时({TOOL_TIMEOUT}秒),请稍后重试。"
except Exception as e:
return f"下载文件出错: {type(e).__name__}: {str(e)}"
@tool
def read_file(file_path: str, encoding: str = "utf-8") -> str:
"""
读取本地文件内容。
支持格式:txt, csv, json, py, html, xml, zip, md
Args:
file_path: 文件完整路径
encoding: 编码格式,默认 utf-8
Returns:
文件内容(超过指定字符数会截断)
注意:
- ZIP 文件会自动解压并列出内容
- JSON 文件会自动美化输出
- PDF/Excel 需使用专门的扩展工具
"""
try:
if not os.path.exists(file_path):
return f"文件不存在: {file_path}"
file_ext = os.path.splitext(file_path)[1].lower()
# 处理 ZIP 文件
if file_ext == '.zip':
extract_dir = file_path.replace('.zip', '_extracted')
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
files = os.listdir(extract_dir)
file_list = "\n".join(f" - {f}" for f in files)
return f"ZIP 已解压到: {extract_dir}\n包含文件:\n{file_list}"
# 读取文本文件
with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
content = f.read()
# JSON 美化
if file_ext == '.json':
try:
data = json.loads(content)
content = json.dumps(data, indent=2, ensure_ascii=False)
except json.JSONDecodeError:
pass # 保持原始内容
# 限制返回长度
if len(content) > MAX_FILE_SIZE:
return content[:MAX_FILE_SIZE] + f"\n\n... [内容已截断,共 {len(content)} 字符]"
return content
except Exception as e:
return f"读取文件出错: {type(e).__name__}: {str(e)}"
# ========================================
# 计算和代码工具
# ========================================
@tool
def calc(expression: str) -> str:
"""
执行安全的数学计算。
支持:
- 基础运算:+, -, *, /, **, %
- 数学函数:sqrt, sin, cos, tan, log, log10, exp, floor, ceil
- 常量:pi, e
Args:
expression: 数学表达式,如 "2+3*4" 或 "sqrt(16)"
Returns:
计算结果
"""
import math
# 允许的数学函数和常量
safe_dict = {
# 内置函数
'abs': abs, 'round': round, 'min': min, 'max': max,
'sum': sum, 'pow': pow, 'len': len,
# math 函数
'sqrt': math.sqrt, 'sin': math.sin, 'cos': math.cos,
'tan': math.tan, 'log': math.log, 'log10': math.log10,
'exp': math.exp, 'floor': math.floor, 'ceil': math.ceil,
'asin': math.asin, 'acos': math.acos, 'atan': math.atan,
'sinh': math.sinh, 'cosh': math.cosh, 'tanh': math.tanh,
'degrees': math.degrees, 'radians': math.radians,
'factorial': math.factorial, 'gcd': math.gcd,
# 常量
'pi': math.pi, 'e': math.e,
}
try:
# 清理表达式
expression = expression.strip()
# 安全执行
result = eval(expression, {"__builtins__": {}}, safe_dict)
# 格式化结果
if isinstance(result, float):
# 避免浮点数精度问题
if result.is_integer():
return str(int(result))
return str(round(result, 10))
return str(result)
except ZeroDivisionError:
return "计算出错: 除数不能为零"
except ValueError as e:
return f"计算出错: 无效的数学操作 - {str(e)}"
except Exception as e:
return f"计算出错: {type(e).__name__}: {str(e)}"
@tool
def run_python(code: str) -> str:
"""
在沙箱环境中执行 Python 代码。
支持 import 以下模块:
- math: 数学模块
- re: 正则表达式模块
- json: JSON 处理模块
- datetime: 日期时间模块
- collections: 集合工具模块
- random: 随机数模块
- string: 字符串常量模块
- itertools: 迭代器工具模块
- functools: 函数工具模块
可用内置函数:
- 类型: list, dict, set, tuple, str, int, float, bool, bytes
- 函数: print, len, range, enumerate, zip, map, filter, sorted, reversed
- 数值: sum, min, max, abs, round, pow, divmod, all, any
- 转换: ord, chr, hex, bin, oct, isinstance, type, format, repr
适用场景:
- 复杂数学计算
- 数据排序和过滤
- 字符串处理
- 日期计算
Args:
code: Python 代码,需使用 print() 输出结果
Returns:
代码的标准输出
示例:
from datetime import datetime, timedelta
today = datetime(2024, 1, 15)
print(today + timedelta(days=30))
"""
import io
import sys
import math
import re as re_module
import json as json_module
import datetime as datetime_module
import collections as collections_module
import random as random_module
import string as string_module
import itertools as itertools_module
import functools as functools_module
# 允许导入的模块白名单
ALLOWED_MODULES = {
'math': math,
're': re_module,
'json': json_module,
'datetime': datetime_module,
'collections': collections_module,
'random': random_module,
'string': string_module,
'itertools': itertools_module,
'functools': functools_module,
}
def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):
"""受限的 import 函数,只允许导入白名单中的模块"""
if name not in ALLOWED_MODULES:
raise ImportError(f"不允许导入模块 '{name}',可用模块: {', '.join(ALLOWED_MODULES.keys())}")
return ALLOWED_MODULES[name]
# 受限的内置函数
safe_builtins = {
# 类型
'list': list, 'dict': dict, 'set': set, 'tuple': tuple,
'str': str, 'int': int, 'float': float, 'bool': bool,
'bytes': bytes, 'bytearray': bytearray,
# 函数
'print': print, 'len': len, 'range': range, 'enumerate': enumerate,
'zip': zip, 'map': map, 'filter': filter, 'sorted': sorted,
'reversed': reversed, 'iter': iter, 'next': next,
'sum': sum, 'min': min, 'max': max, 'abs': abs, 'round': round,
'pow': pow, 'divmod': divmod,
'all': all, 'any': any,
'isinstance': isinstance, 'type': type,
'ord': ord, 'chr': chr,
'hex': hex, 'bin': bin, 'oct': oct,
'format': format, 'repr': repr,
'hasattr': hasattr, 'getattr': getattr, 'setattr': setattr,
'slice': slice, 'object': object,
# 支持 import
'__import__': restricted_import,
# 常量
'True': True, 'False': False, 'None': None,
}
# 预注入的模块(可以直接使用,也可以 import)
preloaded = {
'math': math,
're': re_module,
'json': json_module,
'datetime': datetime_module.datetime,
'date': datetime_module.date,
'timedelta': datetime_module.timedelta,
'Counter': collections_module.Counter,
'defaultdict': collections_module.defaultdict,
'OrderedDict': collections_module.OrderedDict,
'random': random_module,
}
# 合并命名空间
namespace = {"__builtins__": safe_builtins}
namespace.update(preloaded)
# 捕获 stdout
old_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
exec(code, namespace)
output = sys.stdout.getvalue()
if not output:
return "代码执行成功,无输出。请使用 print() 输出结果。"
# 限制输出长度
if len(output) > MAX_FILE_SIZE:
return output[:MAX_FILE_SIZE] + f"\n\n... [输出已截断,共 {len(output)} 字符]"
return output.strip()
except SyntaxError as e:
return f"语法错误: 第 {e.lineno} 行 - {e.msg}"
except NameError as e:
return f"名称错误: {str(e)}(该函数或变量在沙箱中不可用)"
except Exception as e:
return f"执行出错: {type(e).__name__}: {str(e)}"
finally:
sys.stdout = old_stdout
# ========================================
# 导出工具列表
# ========================================
BASE_TOOLS = [
# 搜索工具
web_search,
wikipedia_search,
wikipedia_page, # 获取完整 Wikipedia 页面内容
tavily_search,
arxiv_search,
youtube_search,
news_search,
stackoverflow_search,
google_search,
# 文件工具
fetch_task_files,
read_file,
# 计算工具
calc,
run_python,
]
# 尝试导入扩展工具
try:
from extension_tools import EXTENSION_TOOLS
ALL_TOOLS = BASE_TOOLS + EXTENSION_TOOLS
except ImportError:
ALL_TOOLS = BASE_TOOLS
# 尝试导入 RAG 工具
try:
from rag import RAG_TOOLS
ALL_TOOLS = ALL_TOOLS + RAG_TOOLS
except ImportError:
pass