|
|
|
|
|
import tempfile |
|
|
|
|
|
from llama_index.core import SimpleDirectoryReader |
|
|
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec |
|
|
from tavily import AsyncTavilyClient |
|
|
from llama_index.core.tools import FunctionTool |
|
|
from llama_index.core.agent.workflow import AgentWorkflow |
|
|
import numexpr as ne |
|
|
from llama_index.llms.openai import OpenAI |
|
|
import base64 |
|
|
import openai |
|
|
import os |
|
|
from huggingface_hub import InferenceClient |
|
|
from dotenv import load_dotenv |
|
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
|
import re |
|
|
import requests |
|
|
load_dotenv()


# Read credentials from the environment (.env supported via load_dotenv above).
# FIX: os.getenv() returns None when the variable is missing; calling .strip()
# on it crashed at import time. Fail soft instead — downstream tools already
# check `if not OPEN_AI` and return a friendly error message.
_openai_key = os.getenv("OPENAI_API_KEY")
OPEN_AI = _openai_key.strip() if _openai_key else None

HF_TOKEN = os.environ.get("HF_TOKEN")

TAVILY = os.getenv("TAVILY_KEY")

# Hugging Face inference client used elsewhere in the project.
client = InferenceClient(HF_TOKEN)
|
|
|
|
|
def python_interpreter(code: str) -> str:
    """
    Execute Python code and return the output (stdout).

    Use this when you need to perform complex calculations, string manipulations,
    or when asked to determine the output of a python script.

    WARNING: The code is exec()'d in the current environment with no sandboxing —
    only feed it trusted input.

    Returns the captured stdout; if nothing was printed, a summary of the
    variables the code defined; on failure, an error string (never raises).
    """
    # FIX: `io` and `contextlib` were never imported anywhere in this file, so
    # every call raised NameError. Imported locally, matching the file's style.
    import io
    import contextlib

    buffer = io.StringIO()

    try:
        local_scope = {}
        # Capture anything the snippet prints.
        with contextlib.redirect_stdout(buffer):
            exec(code, {}, local_scope)

        output = buffer.getvalue()
        if not output and local_scope:
            # Nothing printed but variables were bound — report them instead.
            return f"Code executed successfully. Variables: {local_scope}"
        return output if output else "Code executed. No output produced."

    except Exception as e:
        return f"Error executing code: {str(e)}"
|
|
|
|
|
def transcribe_audio_openai(audio_path: str) -> str:
    """Transcribe the audio file at *audio_path* with OpenAI's Whisper API.

    Returns the transcript text, or an error string on any failure
    (missing file, missing API key, or API error). Never raises.
    """
    try:
        # Guard clauses: validate the file and the credentials up front.
        if not os.path.exists(audio_path):
            return "Error: Audio file not found."
        if not OPEN_AI:
            return "Error: OpenAI API key not configured"

        openai.api_key = OPEN_AI

        with open(audio_path, "rb") as fh:
            result = openai.audio.transcriptions.create(
                model="whisper-1",
                file=fh,
                response_format="text",
            )
        return result

    except Exception as err:
        return f"Error transcribing audio: {str(err)}"
|
|
|
|
|
def get_youtube_transcript(video_url: str) -> str:
    """Extract transcript from a YouTube video URL.

    Accepts youtu.be short links and youtube.com/watch URLs. Returns at most
    the first 3000 characters of the transcript, or an error string.
    """
    try:
        # Pull the video id out of either supported URL shape.
        if "youtu.be/" in video_url:
            video_id = video_url.split("youtu.be/")[1].split("?")[0]
        elif "youtube.com/watch" in video_url:
            video_id = video_url.split("v=")[1].split("&")[0]
        else:
            return "Invalid YouTube URL"

        api = YouTubeTranscriptApi()
        fetched = api.fetch(video_id)

        # Join the snippet texts, one per line, and cap the size for the agent.
        text = "\n".join(snippet.text for snippet in fetched)
        return text[:3000]

    except Exception as err:
        return f"Unavailable. Error: {str(err)}"
|
|
|
|
|
|
|
|
|
|
|
def read_document(file_name: str) -> str:
    """
    Downloads a file from the GAIA source if not present.

    - If it's a text file (.txt, .py, .csv, .json, .md), returns the content
      (truncated to 10000 characters).
    - If it's a binary file (.xlsx, .pdf, .png, .mp3), returns the file path
      and instructions to use other tools (like python_interpreter or analyze_image).

    Never raises: download and read failures are returned as error strings.
    """
    import os

    # Strip any directory components — files live in the working directory.
    file_name = os.path.basename(file_name)

    base_url = "https://huggingface.co/datasets/gaia-benchmark/GAIA/resolve/main/2023/validation"
    file_url = f"{base_url}/{file_name}"

    if not os.path.exists(file_name):
        try:
            # Lazy import: cached files can be served without requests/network.
            import requests

            print(f"📥 Downloading {file_name} from {file_url}...")
            # FIX: added a timeout — without one a stalled connection would
            # hang the agent indefinitely.
            response = requests.get(file_url, timeout=60)
            if response.status_code == 200:
                with open(file_name, "wb") as f:
                    f.write(response.content)
            else:
                return f"Error: File {file_name} not found in GAIA source (Status {response.status_code})."
        except Exception as e:
            return f"Error downloading file: {str(e)}"

    _, ext = os.path.splitext(file_name)
    ext = ext.lower()

    # Binary formats must be handled by dedicated tools, not read as text.
    binary_extensions = ['.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.mp3', '.wav', '.pdf', '.zip']

    if ext in binary_extensions:
        return (f"File '{file_name}' has been downloaded and saved locally. "
                f"It is a binary file ({ext}). DO NOT read it as text. "
                f"Use 'python_interpreter' (with pandas for excel) or 'analyze_image' to process it.")

    try:
        with open(file_name, "r", encoding='utf-8', errors='ignore') as file:
            content = file.read()

        # Cap the payload so huge files don't blow up the LLM context window.
        if len(content) > 10000:
            return content[:10000] + "\n...[Content Truncated]..."
        return content
    except Exception as e:
        return f"Error reading text file: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
def analyze_image(image_path: str, question = """ Describe what you see in this image""") -> str:
    """Analyze and extract information from images.

    Encodes the image as a base64 data URL and asks gpt-4o-mini the given
    *question* about it. Returns the model's answer, or an error string on
    any failure (missing file, API error). Never raises.
    """
    try:
        if not os.path.exists(image_path):
            return "Error: Image file not found at the specified path."

        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()
            image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Pick the MIME type from the extension; default to JPEG when unknown.
        ext = os.path.splitext(image_path)[1].lower()
        mime_type = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp'
        }.get(ext, 'image/jpeg')

        from llama_index.core.base.llms.types import ChatMessage, MessageRole

        message = ChatMessage(
            role = MessageRole.USER,
            content = [
                # FIX: the key was the *builtin* `type` (a class object), not the
                # string "type" — the multimodal payload was malformed.
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{image_base64}"
                    }
                }
            ]
        )

        llm = OpenAI(api_key=OPEN_AI, model="gpt-4o-mini", temperature=0.7)

        response = llm.chat([message])

        return response.message.content

    except Exception as e:
        return f"Error analyzing image: {str(e)}"
|
|
|
|
|
|
|
|
def calculator_numexpr(expression: str) -> str:
    """Evaluate a mathematical expression with numexpr.

    Returns "Result: <value>" on success, or an error string describing the
    failure. Never raises.
    """
    try:
        expression = expression.strip()

        value = ne.evaluate(expression)

        # numexpr returns a 0-d numpy array; unwrap it to a plain Python scalar.
        if hasattr(value, 'item'):
            value = value.item()

        return f"Result: {value}"

    except Exception as err:
        return f"Error calculating '{expression}': {str(err)}"
|
|
|
|
|
|
|
|
# NOTE(review): tool_spec is instantiated but never referenced below — the web
# search tool is backed by Tavily, not DuckDuckGo. Confirm before removing.
tool_spec = DuckDuckGoSearchToolSpec()


async def search_web(query: str) -> str:
    """Useful for using the web to answer questions."""
    # Fresh client per call; renamed locally to avoid shadowing the
    # module-level `client` (the Hugging Face InferenceClient).
    tavily = AsyncTavilyClient(api_key=TAVILY)
    results = await tavily.search(query)
    return str(results)
|
|
|
|
|
|
|
|
|
|
|
# --- Tool registrations -------------------------------------------------------
# Each FunctionTool wraps one of the functions above so an AgentWorkflow can
# call it; `name`/`description` are what the LLM sees when choosing a tool.

# Arbitrary Python execution (calculations, file post-processing, etc.).
python_tool = FunctionTool.from_defaults(
    fn=python_interpreter,
    name="python_interpreter",
    description="Executes Python code. Use this to run code found in files or to perform complex logic."
)

# Vision: describe images / extract text via gpt-4o-mini.
image_analyzer_tool = FunctionTool.from_defaults(
    fn = analyze_image,
    name = "analyze_image",
    description = "Analyze image to extract information, identify objects and read text"
)

# Math expression evaluation backed by numexpr.
calculator_tool = FunctionTool.from_defaults(
    fn=calculator_numexpr,
    name="calculator",
    description="Evaluate mathematical expressions including basic operations (+, -, *, /, **) and functions (sqrt, log, sin, cos, etc.)"
)

# Downloads GAIA task files on demand and returns text content or routing
# instructions for binary formats.
read_document_tool = FunctionTool.from_defaults(fn = read_document,
    name="read_document",
    description = "Read and extract text content from documents including PDF, DOCX, TXT, MD, CSV, and JSON files")

# YouTube subtitles/transcript retrieval.
youtube_transcript_tool = FunctionTool.from_defaults(
    fn=get_youtube_transcript,
    name="youtube_transcript",
    description="Get transcript/subtitles from YouTube videos. Use this when you need to know what is said in a YouTube video."
)

# Live web search (Tavily under the hood, despite the DuckDuckGo import above).
search_tool = FunctionTool.from_defaults(
    fn=search_web,
    name="web_search",
    description="Search the web for current information, facts, and answers to questions"
)