Spaces:
Runtime error
Runtime error
| import os | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Any, Optional | |
| import tempfile | |
| import re | |
| import json | |
| import requests | |
| from urllib.parse import urlparse | |
| import pytesseract | |
| from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter | |
| import cmath | |
| import pandas as pd | |
| import uuid | |
| import numpy as np | |
| from code_interpreter import CodeInterpreter | |
| # bring in your image processing helpers | |
| from image_processing import encode_image, decode_image, save_image | |
| # LangGraph and tooling imports | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import ToolNode, tools_condition | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| from langchain_community.vectorstores import SupabaseVectorStore | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from supabase.client import create_client | |
| from langchain_openai import ChatOpenAI | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
# Pull configuration (API keys, Supabase settings) into the process
# environment via python-dotenv before anything below reads os.getenv.
load_dotenv()

# Single shared interpreter backing the execute_code_multilang tool.
interpreter_instance = CodeInterpreter()
| # === TOOL DEFINITIONS === | |
def wiki_search(query: str) -> Dict[str, str]:
    """
    Search Wikipedia for a query and return up to 2 formatted results.

    Args:
        query: Free-text search string passed to WikipediaLoader.

    Returns:
        A dict with the single key "wiki_results" whose value is the
        loaded documents rendered as <Document> blocks joined by a
        "---" separator (an empty string when nothing was found).
    """
    docs = WikipediaLoader(query=query, load_max_docs=2).load()
    rendered = []
    for d in docs:
        # Look the source up defensively so one document with unexpected
        # metadata cannot abort the whole search with a KeyError.
        source = d.metadata.get("source", "")
        page = d.metadata.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{d.page_content}\n</Document>'
        )
    return {"wiki_results": "\n\n---\n\n".join(rendered)}
def web_search(query: str) -> Dict[str, str]:
    """
    Search the web via Tavily for a query and return up to 3 formatted results.

    Args:
        query: Free-text search string.

    Returns:
        A dict with the single key "web_results". The value is the hits
        rendered as <Document> blocks joined by a "---" separator, or the
        raw string handed back by the Tavily tool when it did not return
        a list of hits.
    """
    # invoke() takes its input positionally; passing it as `query=` leaves
    # the required `input` argument unset and raises TypeError.
    results = TavilySearchResults(max_results=3).invoke(query)
    if isinstance(results, str):
        # NOTE(review): the Tavily tool appears to report failures as a
        # plain string instead of raising; pass that through unchanged.
        return {"web_results": results}

    rendered = []
    for hit in results or []:
        if not isinstance(hit, dict):
            # Unknown result shape: keep it visible rather than crash.
            rendered.append(str(hit))
            continue
        # Tavily hits are plain dicts ("url", "content"), not Document
        # objects, so there is no .metadata / .page_content to read.
        source = hit.get("url", "")
        content = hit.get("content", "")
        rendered.append(
            f'<Document source="{source}" page=""/>\n{content}\n</Document>'
        )
    return {"web_results": "\n\n---\n\n".join(rendered)}
def arxiv_search(query: str) -> Dict[str, str]:
    """
    Search arXiv for a query and return up to 3 formatted results.

    Args:
        query: Free-text search string passed to ArxivLoader.

    Returns:
        A dict with the single key "arxiv_results" whose value is the
        loaded papers rendered as <Document> blocks (content truncated to
        the first 1000 characters each) joined by a "---" separator.
    """
    docs = ArxivLoader(query=query, load_max_docs=3).load()
    rendered = []
    for d in docs:
        meta = d.metadata
        # ArxivLoader metadata carries "Title"/"Authors"/"Published"/
        # "Summary" rather than "source", so indexing ["source"] raises
        # KeyError. Fall back to whatever identifier is available.
        source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
        page = meta.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{d.page_content[:1000]}\n</Document>'
        )
    return {"arxiv_results": "\n\n---\n\n".join(rendered)}
def execute_code_multilang(code: str, language: str = "python") -> str:
    """
    Run a code snippet through the shared interpreter and return its output.

    Args:
        code: Source text to execute.
        language: Language name forwarded to the interpreter; defaults to
            "python".

    Returns:
        Whatever the interpreter's execute_code call produces.
    """
    outcome = interpreter_instance.execute_code(code, language=language)
    return outcome
| # example numeric tools | |
def multiply(a: float, b: float) -> float:
    """
    Compute the product of two numbers.

    Args:
        a: First factor.
        b: Second factor.

    Returns:
        The value of a times b.
    """
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """
    Compute the sum of two numbers.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        The value of a plus b.
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """
    Compute the difference of two numbers.

    Args:
        a: Value to subtract from.
        b: Value to subtract.

    Returns:
        The value of a minus b.
    """
    difference = a - b
    return difference
def divide(a: float, b: float) -> float:
    """
    Compute the quotient of two numbers.

    Args:
        a: Dividend.
        b: Divisor.

    Returns:
        The value of a divided by b.

    Raises:
        ValueError: If b equals zero.
    """
    divisor_is_zero = b == 0
    if divisor_is_zero:
        raise ValueError("Cannot divide by zero.")
    quotient = a / b
    return quotient
def modulus(a: int, b: int) -> int:
    """
    Compute the remainder left after dividing a by b.

    Args:
        a: Dividend.
        b: Divisor.

    Returns:
        The result of Python's % operator, so the sign follows b.
    """
    remainder = a % b
    return remainder
def power(a: float, b: float) -> float:
    """
    Compute a raised to the exponent b.

    Args:
        a: Base.
        b: Exponent.

    Returns:
        The value of a to the power of b.
    """
    # Two-argument pow() is the function form of the ** operator.
    return pow(a, b)
| def square_root(a: float) -> float | complex: | |
| """ | |
| Return the square root of a number; returns complex for negative inputs. | |
| """ | |
| if a >= 0: | |
| return a ** 0.5 | |
| return cmath.sqrt(a) | |
| # file and document tools (save/read, download, OCR, CSV/Excel) | |
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save text content to a file in the system temp directory.

    Args:
        content: Text to write.
        filename: Optional file name. Only its final path component is
            used, so the file always lands directly inside the temp
            directory. When omitted or empty, a random "file_<hex>.txt"
            name is generated.

    Returns:
        A confirmation message of the form "Saved to <path>".
    """
    temp_dir = tempfile.gettempdir()
    # Strip any directory part so a caller-supplied name such as
    # "../x" or "/etc/x" cannot escape the temp directory.
    safe_name = os.path.basename(filename) if filename else ""
    if not safe_name:
        safe_name = f"file_{uuid.uuid4().hex[:8]}.txt"
    filepath = os.path.join(temp_dir, safe_name)
    # Explicit UTF-8 so non-ASCII content does not depend on the
    # platform's default encoding.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    return f"Saved to {filepath}"
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL into the system temp directory.

    Args:
        url: Address to fetch.
        filename: Optional name for the saved file. Only its final path
            component is used. When omitted, the name is taken from the
            URL path, falling back to a random "file_<hex>" name.

    Returns:
        "Downloaded to <path>" on success; on any failure, the text of
        the exception (this tool reports errors as strings, not raises).
    """
    try:
        candidate = filename or urlparse(url).path
        # basename() keeps the target inside the temp directory even if
        # the supplied name contains directory components.
        fname = os.path.basename(candidate) or f"file_{uuid.uuid4().hex[:8]}"
        path = os.path.join(tempfile.gettempdir(), fname)
        # A timeout stops a stalled server from hanging the agent, and
        # the context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=30) as resp:
            resp.raise_for_status()
            with open(path, "wb") as f:
                for chunk in resp.iter_content(8192):
                    if chunk:
                        f.write(chunk)
        return f"Downloaded to {path}"
    except Exception as e:
        # Tool boundary: surface the failure text to the caller/LLM.
        return str(e)
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image file using Tesseract OCR.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The recognized text; on any failure, the text of the exception
        (this tool reports errors as strings rather than raising).
    """
    try:
        # Open via a context manager so the underlying file handle is
        # closed once OCR finishes instead of lingering until GC.
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img)
    except Exception as e:
        # Tool boundary: surface the failure text to the caller/LLM.
        return str(e)
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Summarize a CSV file.

    Args:
        file_path: Path of the CSV file to load with pandas.
        query: Accepted for interface compatibility; not used.

    Returns:
        A string giving the row count and the list of column names,
        followed on the next line by the DataFrame.describe() table.
    """
    frame = pd.read_csv(file_path)
    row_count = len(frame)
    column_names = list(frame.columns)
    summary = frame.describe()
    return f"Rows: {row_count}, Columns: {column_names}\n{summary}"
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Summarize an Excel workbook.

    Args:
        file_path: Path of the Excel file to load with pandas.
        query: Accepted for interface compatibility; not used.

    Returns:
        A string giving the row count and the list of column names,
        followed on the next line by the DataFrame.describe() table.
    """
    frame = pd.read_excel(file_path)
    row_count = len(frame)
    column_names = list(frame.columns)
    summary = frame.describe()
    return f"Rows: {row_count}, Columns: {column_names}\n{summary}"
| # image analysis/transforms | |
def analyze_image(image_base64: str) -> Dict[str, Any]:
    """
    Report basic properties of a base64-encoded image.

    Args:
        image_base64: Image data handed to decode_image.

    Returns:
        A dict with "dimensions" as a (width, height) tuple and "mode"
        as the decoded image's mode attribute.
    """
    image = decode_image(image_base64)
    width, height = image.size
    report = {"dimensions": (width, height)}
    report["mode"] = image.mode
    return report
def transform_image(image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Placeholder for image transformations on a base64-encoded image.

    Args:
        image_base64: Image data handed to decode_image.
        operation: Name of the requested transformation; currently unused.
        params: Optional settings for the transformation; currently unused.

    Returns:
        Always {"error": "placeholder"}; no transformation is implemented.
    """
    # Decode anyway so undecodable input still fails here, as before.
    decode_image(image_base64)
    # operations logic here
    return dict(error="placeholder")
| # combine all tools into list | |
tools = [
    # search
    wiki_search,
    web_search,
    arxiv_search,
    # code execution
    execute_code_multilang,
    # arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    power,
    square_root,
    # files and documents
    save_and_read_file,
    download_file_from_url,
    extract_text_from_image,
    analyze_csv_file,
    analyze_excel_file,
    # images
    analyze_image,
    transform_image,
]
| # system prompt loader | |
# Read the agent's system prompt from system_prompt.txt (resolved against
# the current working directory) and wrap it in a SystemMessage.
with open("system_prompt.txt", mode="r", encoding="utf-8") as f:
    _system_prompt_text = f.read()
sys_msg = SystemMessage(content=_system_prompt_text)
| # vectorstore setup (Supabase) | |
# Embedding model used for vector-store queries.
emb = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

# Supabase client; URL and service-role key come from the environment.
sup = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_ROLE_KEY"))

# Vector store over the Supabase table; table and query-function names
# may be overridden through environment variables.
vector_store = SupabaseVectorStore(
    client=sup,
    embedding=emb,
    table_name=os.environ.get("VECTORTABLE_NAME", "documents2"),
    query_name=os.environ.get("VECTOR_QUERY_NAME", "match_documents_langchain"),
)
def build_graph(model: str = "gpt-3.5-turbo"):
    """
    Build the LangGraph agent using an OpenAI chat model.

    The graph runs START -> retriever -> assistant, then loops between
    assistant and tools for as long as the model keeps requesting tool
    calls.

    Args:
        model: OpenAI chat model name. Defaults to "gpt-3.5-turbo", the
            value that used to be hard-coded.

    Returns:
        The compiled graph, ready for .invoke({"messages": [...]}).
    """
    # Initialize the OpenAI LLM
    llm = ChatOpenAI(
        model=model,
        temperature=0,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )
    llm_with_tools = llm.bind_tools(tools)

    def retriever(state: MessagesState):
        """Seed the conversation with the system prompt and any vector hit."""
        query = state["messages"][0].content
        hits = vector_store.similarity_search(query, k=1)
        seeded = [sys_msg]
        if hits:
            seeded.append(HumanMessage(content=hits[0].page_content))
        # The retriever never calls the LLM itself. Previously the no-hit
        # path did, and the assistant node then called the model again
        # without the system prompt, possibly right after an AI message
        # whose tool calls had not been answered, which the OpenAI chat
        # API rejects. The assistant node is the only place that talks to
        # the model.
        return {"messages": seeded}

    def assistant(state: MessagesState):
        """Ask the tool-enabled LLM for the next message."""
        resp = llm_with_tools.invoke(state["messages"])
        return {"messages": [resp]}

    # Wire up the graph
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge(START, "retriever")
    builder.add_edge("retriever", "assistant")
    # tools_condition routes to "tools" when the last AI message carries
    # tool calls and ends the run otherwise.
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")
    return builder.compile()
| # Optional test | |
if __name__ == "__main__":
    # Smoke test: run one greeting through the agent and print every
    # message left in the final state.
    demo_input = {"messages": [HumanMessage(content="Hello world")]}
    final_state = build_graph().invoke(demo_input)
    for message in final_state["messages"]:
        print(message.content)