Spaces:
Build error
Build error
| import builtins | |
| import contextlib | |
| import io | |
| from typing import Any | |
| from langchain.chat_models import init_chat_model | |
| from langgraph_codeact import create_codeact | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| from langchain_core.tools import tool | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| from openai import OpenAI # audio | |
| import os | |
| import requests | |
| import subprocess | |
| from typing import Optional, Any | |
| import tempfile | |
| from urllib.parse import urlparse | |
| import uuid | |
| from langchain_tavily import TavilySearch | |
| from pathlib import Path # new import | |
| def eval(code: str, _locals: dict[str, Any]) -> tuple[str, dict[str, Any]]: | |
| # Store original keys before execution | |
| original_keys = set(_locals.keys()) | |
| try: | |
| with contextlib.redirect_stdout(io.StringIO()) as f: | |
| exec(code, builtins.__dict__, _locals) | |
| result = f.getvalue() or "<code ran, no output printed to stdout>" | |
| except Exception as e: | |
| result = f"Error during execution: {repr(e)}" | |
| # Determine new variables created during execution | |
| new_keys = set(_locals.keys()) - original_keys | |
| new_vars = {key: _locals[key] for key in new_keys} | |
| return result, new_vars | |
| # Tools | |
| def execute_python_code(code_path: str) -> str: | |
| """ | |
| Execute a Python script and return the final output or error. | |
| Args: | |
| code_path (str): the path to the Python file to be executed | |
| """ | |
| try: | |
| if not os.path.exists(code_path): | |
| return f"Error: file not found at {code_path}" | |
| # Execute the Python file and capture output | |
| result = subprocess.run( | |
| ['python', code_path], | |
| capture_output=True, | |
| text=True, | |
| check=True | |
| ) | |
| return result.stdout | |
| except subprocess.CalledProcessError as e: | |
| # Capture any error that occurs during execution | |
| return f"Execution error: {e.stderr}" | |
| except Exception as e: | |
| return f"Unexpected error: {str(e)}" | |
| #@tool | |
| #def speech_to_text(file_path: str) -> str: | |
| # """ | |
| # Transcribe an audio file from a local path to text. | |
| # Args: | |
| # file_path (str): Local path of the audio file to be transcribed. | |
| # """ | |
| # client = OpenAI() | |
| # try: | |
| # Check if the file exists | |
| # if not os.path.exists(file_path): | |
| # return f"Error: file not found at {file_path}" | |
| # Step 2: Transcribe the audio | |
| # with open(file_path, "rb") as file: | |
| # transcription = client.audio.transcriptions.create( | |
| # model="gpt-4o-mini-transcribe", | |
| # file=file | |
| # ) | |
| # print(f"Transcription result: {transcription['text']}") | |
| # return transcription["text"] | |
| # except Exception as e: | |
| # return f"Error during transcription: {str(e)}" | |
| def speech_to_text(file_path: str) -> str: | |
| """ | |
| Transcribe an audio file from a local path to text. | |
| Args: | |
| file_path (str): Local path of the audio file to be transcribed. | |
| """ | |
| client = OpenAI() | |
| try: | |
| # Check if the file exists | |
| if not os.path.exists(file_path): | |
| return f"Error: file not found at {file_path}" | |
| # Transcribe the audio | |
| with open(file_path, "rb") as file: | |
| transcription = client.audio.transcriptions.create( | |
| model="gpt-4o-mini-transcribe", | |
| file=file | |
| ) | |
| print(f"Transcription result: {transcription['text']}") | |
| return transcription["text"] | |
| except Exception as e: | |
| return f"Error during transcription: {str(e)}" | |
| def web_search(query: str) -> str: | |
| """ | |
| Search Tavily for a query and return formatted results. | |
| Args: | |
| query (str): The search query. | |
| Returns: | |
| str: A formatted string with the search results. | |
| """ | |
| try: | |
| search_tool = TavilySearch(max_results=3, topic="general") | |
| search_response = search_tool.invoke(input=query) | |
| # Check if the response contains results | |
| if search_response and "results" in search_response: | |
| results = search_response["results"] | |
| return "\n\n---\n\n".join( | |
| [ | |
| f"Title: {result['title']}\nURL: {result['url']}\nContent: {result['content']}" | |
| for result in results | |
| ] | |
| ) | |
| else: | |
| return "No results found." | |
| except Exception as e: | |
| print(f"Error during web search: {str(e)}") | |
| return f"Error during web search: {str(e)}" | |
| def arvix_search(query: str) -> str: | |
| """ | |
| Search Arxiv for a query and return maximum 3 results. | |
| Args: | |
| query: The search query. | |
| """ | |
| try: | |
| search_docs = ArxivLoader(query=query, load_max_docs=3).load() | |
| return "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata.get("source", "Unknown")}" page="{doc.metadata.get("page", "N/A")}"/>\n{doc.page_content[:1000]}\n</Document>' | |
| for doc in search_docs | |
| ] | |
| ) | |
| except Exception as e: | |
| return f"Error during Arxiv search: {str(e)}" | |
| def save_and_read_file(content: str, filename: Optional[str] = None) -> str: | |
| """ | |
| Save content to a file and return the path. | |
| Args: | |
| content (str): the content to save to the file | |
| filename (str, optional): the name of the file. If not provided, a random name file will be created. | |
| """ | |
| temp_dir = tempfile.gettempdir() | |
| if filename is None: | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) | |
| filepath = temp_file.name | |
| else: | |
| filepath = os.path.join(temp_dir, filename) | |
| with open(filepath, "w") as f: | |
| f.write(content) | |
| return f"File saved to {filepath}. You can read this file to process its contents." | |
| def read_file(file_path: str) -> str: | |
| """ | |
| Return the raw text of a local file. | |
| Args: | |
| file_path (str): Local path of the file to be read. | |
| """ | |
| try: | |
| with open(file_path, "r", encoding="utf‑8", errors="ignore") as f: | |
| return f.read() | |
| except Exception as e: | |
| return f"Error reading {file_path}: {e}" | |
| def download_file_from_url(url: str, filename: Optional[str] = None) -> str: | |
| """ | |
| Download a file from a URL and save it to a temporary location. | |
| Args: | |
| url (str): the URL of the file to download. | |
| filename (str, optional): the name of the file. If not provided, a random name file will be created. | |
| """ | |
| try: | |
| # Parse URL to get filename if not provided | |
| if not filename: | |
| path = urlparse(url).path | |
| filename = os.path.basename(path) | |
| if not filename: | |
| filename = f"downloaded_{uuid.uuid4().hex[:8]}" | |
| # Create temporary file | |
| temp_dir = tempfile.gettempdir() | |
| filepath = os.path.join(temp_dir, filename) | |
| # Download the file | |
| response = requests.get(url, stream=True) | |
| response.raise_for_status() | |
| # Save the file | |
| with open(filepath, "wb") as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return f"File downloaded to {filepath}. You can read this file to process its contents." | |
| except Exception as e: | |
| return f"Error downloading file: {str(e)}" | |
| tools = [ | |
| #execute_python_code, | |
| #speech_to_text, | |
| web_search, | |
| #arvix_search, | |
| #wiki_search | |
| #read_file, | |
| #save_and_read_file, | |
| #download_file_from_url | |
| ] | |
| class CodeActAgent: | |
| def __init__( | |
| self, | |
| system_prompt_path: str = "prompts/system_prompt.txt", | |
| model: str = "gpt-4o" | |
| ) -> None: | |
| self.system_prompt = Path(system_prompt_path).read_text(encoding="utf-8")#self.read_system_prompt(system_prompt_path) | |
| self.llm = init_chat_model(model, model_provider="openai") | |
| self.compiled_agent = create_codeact( | |
| self.llm, | |
| tools, | |
| eval | |
| ).compile() | |
| def read_system_prompt(self, path:str) -> str: | |
| with open(path, "r") as file: | |
| return file.read() | |
| def extract_final_answer(self, response: str) -> str: | |
| if "FINAL ANSWER:" in response: | |
| answer = response.split("FINAL ANSWER:")[1].strip().replace(".", "") | |
| else: | |
| # fallback if model did not follow instruction perfectly | |
| answer = response.strip() | |
| # Remove trailing period, but only at the end | |
| if answer.endswith("."): | |
| answer = answer[:-1].strip() | |
| return answer | |
| def __call__( | |
| self, question: str, | |
| #file_path: str=None | |
| ) -> str: | |
| inputs = { | |
| "messages": [ | |
| SystemMessage(content=self.system_prompt), | |
| HumanMessage(content=question) | |
| ] | |
| } | |
| # Add file path if available | |
| #inputs["file_path"] = file_path or None # type: ignore | |
| # Run the graph with the inputs | |
| result = self.compiled_agent.invoke( | |
| inputs, | |
| config={ | |
| "configurable": {"thread_id": "benchmark-test"} | |
| } | |
| ) | |
| final_msg = result["messages"][-1].content | |
| return self.extract_final_answer(final_msg) | |