| import os |
| import io |
| import contextlib |
| import pandas as pd |
| from typing import Dict, List, Union |
| import re |
|
|
| from PIL import Image as PILImage |
| from huggingface_hub import InferenceClient |
|
|
| from langgraph.graph import START, StateGraph, MessagesState |
| from langgraph.prebuilt import tools_condition, ToolNode |
| from langchain_openai import ChatOpenAI |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader |
| from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_core.tools import tool |
| from langchain_google_community import GoogleSearchAPIWrapper |
|
|
# Arithmetic tool: product of two integers.
# The docstring is left verbatim because it is part of the registered tool.
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    product = a * b
    return product
|
|
# Arithmetic tool: sum of two integers.
# The docstring is left verbatim because it is part of the registered tool.
@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    total = a + b
    return total
|
|
# Arithmetic tool: difference ``a - b`` (order matters).
# The docstring is left verbatim because it is part of the registered tool.
@tool
def subtract(a: int, b: int) -> int:
    """Subtract the second integer from the first."""
    difference = a - b
    return difference
|
|
# Arithmetic tool: true division ``a / b``.
# Raises ValueError("Cannot divide by zero.") when ``b`` is zero.
@tool
def divide(a: int, b: int) -> float:
    """Divide first integer by second; error if divisor is zero."""
    if b != 0:
        return a / b
    # Only reached for a zero divisor.
    raise ValueError("Cannot divide by zero.")
|
|
# Arithmetic tool: remainder ``a % b``.
# A zero divisor is rejected explicitly with ValueError, mirroring ``divide``,
# instead of leaking Python's bare ZeroDivisionError.
@tool
def modulus(a: int, b: int) -> int:
    """Return the remainder of dividing first integer by second; error if divisor is zero."""
    if b == 0:
        raise ValueError("Cannot take modulus by zero.")
    return a % b
|
|
# Wikipedia lookup tool.
# Returns a dict with the single key "wiki_results" whose value is either the
# concatenated documents, a "no documents" notice, or an error description.
# NOTE(review): the docstring is what the tool exposes as its description, so
# it now states the same limit the loader is configured with (5).
@tool
def wiki_search(query: str) -> dict:
    """Search Wikipedia for a query and return up to 5 documents."""
    try:
        loader = WikipediaLoader(
            query=query,
            load_max_docs=5,
            lang="en",
            doc_content_chars_max=7000,
        )
        docs = loader.load()
        if not docs:
            return {"wiki_results": f"No documents found on Wikipedia for '{query}'."}
        # Each document is prefixed with a pseudo-XML header naming its source;
        # documents are separated by a horizontal-rule marker.
        formatted = "\n\n---\n\n".join(
            f'<Document source="{d.metadata.get("source", "N/A")}"/>\n{d.page_content}'
            for d in docs
        )
        return {"wiki_results": formatted}
    except Exception as e:
        # Best-effort tool: report the failure back to the caller as text
        # rather than aborting the agent run.
        print(f"Error in wiki_search tool: {e}")
        return {"wiki_results": f"Error occurred while searching Wikipedia for '{query}'. Details: {str(e)}"}
|
|
# Shared Google Custom Search client, created once at import time.
# NOTE(review): presumably picks up its credentials from the environment - confirm.
search = GoogleSearchAPIWrapper()


# Web search tool backed by the module-level ``search`` client.
# Returns the raw result string, or an error description if the search fails.
@tool
def google_web_search(query: str) -> str:
    """Perform a web search (via Google Custom Search) and return results."""
    try:
        results = search.run(query)
    except Exception as e:
        print(f"Error in google_web_search tool: {e}")
        return f"Error occurred while searching the web for '{query}'. Details: {str(e)}"
    return results
|
|
# arXiv lookup tool.
# Returns a dict with the single key "arvix_results" whose value is either the
# concatenated excerpts (first 1000 characters of each paper), a "no documents"
# notice, or an error description.
@tool
def arvix_search(query: str) -> dict:
    """Search arXiv for a query and return up to 3 paper excerpts."""
    try:
        docs = ArxivLoader(query=query, load_max_docs=3).load()
        if not docs:
            return {"arvix_results": f"No documents found on arXiv for '{query}'."}
        # Use .get() so a document without a "source" metadata entry does not
        # raise KeyError and take the whole tool call down with it.
        formatted = "\n\n---\n\n".join(
            f'<Document source="{d.metadata.get("source", "N/A")}"/>\n{d.page_content[:1000]}'
            for d in docs
        )
        return {"arvix_results": formatted}
    except Exception as e:
        # Same best-effort policy as wiki_search: surface the failure as text.
        print(f"Error in arvix_search tool: {e}")
        return {"arvix_results": f"Error occurred while searching arXiv for '{query}'. Details: {str(e)}"}
|
|
| |
| |
# Hugging Face / model configuration, read from the environment at import time.
HF_API_TOKEN = os.getenv("HF_API_TOKEN")
MODEL = os.getenv("MODEL")

# The inference client is only created when a token is available; otherwise it
# stays None and a warning is printed.
if not HF_API_TOKEN:
    HF_INFERENCE_CLIENT = None
    print("WARNING: HF_API_TOKEN not set. If any other HF tools are used, they might not function.")
else:
    HF_INFERENCE_CLIENT = InferenceClient(token=HF_API_TOKEN)
|
|
# File inspection tool.
# Dispatches on the lower-cased file extension:
#   * video / audio / image -> a notice only; the file itself is not opened here
#   * .txt / .py            -> the file's text, decoded as UTF-8
#   * .xlsx                 -> the first sheet rendered via DataFrame.to_string()
#   * anything else         -> an "unsupported" notice
# Failures are reported under the "file_error" key instead of being raised.
# (The decorator previously read ``@tooldef``, an undefined name.)
@tool
def read_file_content(file_path: str) -> Dict[str, str]:
    """Reads the content of a file and returns its primary information. For text/code/excel, returns content. For media, indicates it's a blob for LLM processing."""
    try:
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()

        # Media files: only signal the type; no bytes are read.
        if file_extension in (".mp4", ".avi", ".mov", ".mkv", ".webm"):
            return {"file_type": "video", "file_name": file_path, "file_content": f"Video file '{file_path}' detected. The LLM (Gemini 2.5 Pro) can process this video content directly as a blob."}
        elif file_extension == ".mp3":
            return {"file_type": "audio", "file_name": file_path, "file_content": f"Audio file '{file_path}' detected. The LLM (Gemini 2.5 Pro) can process this audio content directly as a blob."}
        elif file_extension in (".jpeg", ".jpg", ".png"):
            return {"file_type": "image", "file_name": file_path, "file_content": f"Image file '{file_path}' detected. The LLM (Gemini 2.5 Pro) can process this image content directly as a blob."}

        # Plain text and Python source are returned verbatim.
        elif file_extension in (".txt", ".py"):
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
            return {"file_type": "text/code", "file_name": file_path, "file_content": content}

        # Excel workbooks are flattened to a string table.
        elif file_extension == ".xlsx":
            df = pd.read_excel(file_path)
            content = df.to_string()
            return {"file_type": "excel", "file_name": file_path, "file_content": content}

        else:
            return {"file_type": "unsupported", "file_name": file_path, "file_content": f"Unsupported file type: {file_extension}. Only .txt, .py, .xlsx, .jpeg, .jpg, .png, .mp3, .mp4, .avi, .mov, .mkv, .webm files are recognized."}

    except FileNotFoundError:
        return {"file_error": f"File not found: {file_path}. Please ensure the file exists in the environment."}
    except Exception as e:
        # Any other read/parse failure is reported as text, not raised.
        return {"file_error": f"Error reading file {file_path}: {e}"}
|
|
|
|
# Code execution tool.
# Runs ``code`` with exec() and returns what it printed to stdout under
# "execution_result", or the failure under "execution_error".
#
# SECURITY: exec() runs arbitrary code with the full privileges of this
# process. The input here comes from the model, i.e. it is untrusted; run the
# agent in a sandbox if that matters for your deployment.
@tool
def python_interpreter(code: str) -> Dict[str, str]:
    """Executes Python code and returns its standard output. If there's an error during execution, it returns the error message."""
    captured_stdout = io.StringIO()
    # A single dict serves as both globals and locals. With two separate dicts
    # exec() treats the snippet like a class body: top-level names (imports,
    # helper functions) land in the locals dict and are invisible from inside
    # any function the snippet defines, producing spurious NameErrors.
    # __name__ is provided so the usual `if __name__ == "__main__":` guard works.
    namespace = {"__name__": "__main__"}
    with contextlib.redirect_stdout(captured_stdout):
        try:
            exec(code, namespace)
        except Exception as e:
            # Include the exception type: str(KeyError("x")) alone is just "'x'".
            return {"execution_error": f"{type(e).__name__}: {e}"}
    return {"execution_result": captured_stdout.getvalue().strip()}
|
|
| |
# Simulated YouTube tool.
# Only one video (id 1htKBjuUWec) has canned data. Earlier the same canned
# summary was returned for *every* YouTube URL, which fed fabricated content to
# the model for unrelated videos; other videos now get an explicit error.
# Returns:
#   * known video           -> video_url, question_asked, video_summary, details
#   * other YouTube video   -> {"error": ..., "url": ...}
#   * not a YouTube URL     -> {"error": ..., "url": ...}
@tool
def Youtube(url: str, question: str) -> Dict[str, Union[str, Dict[str, str]]]:
    """
    Tells about the YouTube video identified by the given URL, answering a question about it.
    Note: This is a simulated response. In a real application, this would interact with a YouTube API
    or a video analysis service to get actual video information and transcripts.
    """
    print(f"Youtube called with URL: {url}, Question: {question}")

    # Accept both the long (youtube.com/watch?v=ID) and short (youtu.be/ID)
    # forms and capture the video id that follows.
    match = re.search(r'(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]*)', url)
    if match is None:
        return {"error": "Invalid or unrecognized YouTube URL.", "url": url}

    if match.group(1) != "1htKBjuUWec":
        # A real YouTube link, but there is no canned data for it.
        return {"error": "No simulated analysis is available for this YouTube video.", "url": url}

    return {
        "video_url": url,
        "question_asked": question,
        "video_summary": "The video titled 'Teal'c coffee first time' shows a scene where several individuals are reacting to a beverage, presumably coffee, that Teal'c is trying for the first time. Key moments include: A person off-screen remarking, 'Wow this coffee's great'; another asking if it's 'cinnamon chicory tea oak'; and Teal'c reacting strongly to the taste or temperature, stating 'isn't that hot' indicating he finds it very warm.",
        "details": {
            "00:00:00": "Someone remarks, 'Wow this coffee's great I was just thinking that yeah is that cinnamon chicory tea oak'",
            "00:00:11": "Teal'c takes a large gulp from a black mug",
            "00:00:24": "Teal'c reacts strongly, someone asks 'isn't that hot'",
            "00:00:26": "Someone agrees, 'extremely'",
        },
    }
|
|
| |
|
|
# Gemini credentials. The environment is read once; API_KEY is kept as an
# alias of GEMINI_API_KEY so both module-level names stay available.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
API_KEY = GEMINI_API_KEY

# HF_API_TOKEN is rebound here to prefer HF_SPACE_TOKEN. Fall back to the
# HF_API_TOKEN environment variable so that an unset HF_SPACE_TOKEN does not
# silently replace a valid token with None.
HF_API_TOKEN = os.getenv("HF_SPACE_TOKEN") or os.getenv("HF_API_TOKEN")
|
|
| |
# Every tool made available to the model, in registration order.
tools = [
    # Arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Retrieval
    wiki_search,
    google_web_search,
    arvix_search,
    # Local files and code execution
    read_file_content,
    python_interpreter,
    # Video
    Youtube,
]
|
|
# Load the system prompt once at import time. The path is relative to the
# current working directory, so "prompt.txt" must exist there.
with open("prompt.txt", encoding="utf-8") as f:  # mode defaults to "r"
    system_prompt = f.read()

# Wrapped as a SystemMessage so it can be prepended to every conversation.
sys_msg = SystemMessage(content=system_prompt)
|
|
def build_graph(provider: str = "gemini"):
    """Build and compile the tool-calling agent graph.

    The graph has two nodes: ``assistant`` (calls the LLM with the system
    prompt prepended) and ``tools`` (executes requested tool calls). Control
    starts at ``assistant``; ``tools_condition`` decides whether to route to
    ``tools``, and ``tools`` always hands back to ``assistant``.

    Args:
        provider: Which chat model backend to use: ``"gemini"`` (default) or
            ``"huggingface"``.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.

    Raises:
        ValueError: If ``provider`` is not recognised, or if ``"gemini"`` is
            selected while the ``MODEL`` environment variable is unset.
    """
    if provider == "gemini":
        # MODEL comes from the environment; fail early with a clear message
        # instead of a less obvious error from inside the client constructor.
        if not MODEL:
            raise ValueError("The MODEL environment variable must be set to use the 'gemini' provider.")
        llm = ChatGoogleGenerativeAI(
            model=MODEL,
            temperature=1.0,
            max_retries=2,
            api_key=GEMINI_API_KEY,
            max_tokens=5000,
        )
    elif provider == "huggingface":
        # NOTE(review): endpoint URL and keyword are carried over unchanged -
        # confirm they match the installed langchain_huggingface version.
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
            ),
            temperature=0,
        )
    else:
        raise ValueError("Invalid provider. Choose 'gemini' or 'huggingface'.")

    llm_with_tools = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        """Call the tool-bound LLM on the conversation, system prompt first."""
        messages_to_send = [sys_msg] + state["messages"]
        llm_response = llm_with_tools.invoke(messages_to_send)
        print(f"LLM Raw Response: {llm_response}")
        return {"messages": [llm_response]}

    builder = StateGraph(MessagesState)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge(START, "assistant")
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")

    return builder.compile()
|
|
if __name__ == "__main__":
    # Placeholder entry point: running this module as a script does nothing
    # beyond the import-time setup above.
    pass