"""LangGraph Agent"""
import os
import time  # For timestamp in code agent wrapper
from dataclasses import dataclass

import requests  # for HTTP requests to scoring API
from dotenv import load_dotenv
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import ArxivLoader
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_core.messages import AIMessage, SystemMessage, HumanMessage
from langchain_core.tools import tool
from langchain.tools.retriever import create_retriever_tool
from supabase.client import Client, create_client
from langfuse.langchain import CallbackHandler

from code_agent import run_agent  # Compiled code-interpreter graph helper

# Load environment variables - try multiple files.
# This runs BEFORE the Langfuse handler is created so that credentials kept
# in .env / env.local are already in os.environ when the handler is built.
load_dotenv()  # Try .env first
load_dotenv("env.local")  # Try env.local as backup

# Initialize Langfuse CallbackHandler for LangGraph/Langchain (tracing).
# Tracing is optional: on failure we continue with `langfuse_handler = None`.
try:
    langfuse_handler = CallbackHandler()
except Exception as e:
    print(f"Warning: Could not initialize Langfuse handler: {e}")
    langfuse_handler = None

print(f"SUPABASE_URL loaded: {bool(os.environ.get('SUPABASE_URL'))}")
print(f"GROQ_API_KEY loaded: {bool(os.environ.get('GROQ_API_KEY'))}")

# Base URL of the scoring API (duplicated here to avoid circular import with basic_agent)
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Upper bound on the number of attachment characters injected into the chat.
_MAX_ATTACHMENT_CHARS = 8000

# Substrings that make `_needs_code` route a request to the code interpreter.
_CODE_TRIGGERS = (
    "```python",
    "write python",
    "run this code",
    "file manipulation",
    "csv",
    "pandas",
    "json",
    "plot",
    "fibonacci",
)


# NOTE: the tool docstrings below are kept as written; the @tool decorator
# presumably exposes them to the LLM as the tool description.
@tool
def wiki_search(input: str) -> dict[str, str]:
    """Search Wikipedia for a query and return maximum 2 results.

    Args:
        input: The search query."""
    try:
        search_docs = WikipediaLoader(query=input, load_max_docs=2).load()
        if not search_docs:
            return {"wiki_results": "No Wikipedia results found for the query."}
        formatted_search_docs = "\n\n---\n\n".join(
            [f'\n{doc.page_content}\n' for doc in search_docs]
        )
        return {"wiki_results": formatted_search_docs}
    except Exception as e:
        # Best-effort tool: report the failure to the model instead of raising.
        print(f"Error in wiki_search: {e}")
        return {"wiki_results": f"Error searching Wikipedia: {e}"}


@tool
def web_search(input: str) -> dict[str, str]:
    """Search Tavily for a query and return maximum 3 results.

    Args:
        input: The search query."""
    try:
        # TavilySearchResults.invoke expects the query string as its first ("input") argument.
        # Passing it positionally avoids the earlier `missing 1 required positional argument: 'input'` error.
        search_docs = TavilySearchResults(max_results=3).invoke(input)
        if not search_docs:
            return {"web_results": "No web search results found for the query."}
        formatted_search_docs = "\n\n---\n\n".join(
            [f'\n{doc.get("content", "No content")}\n' for doc in search_docs]
        )
        return {"web_results": formatted_search_docs}
    except Exception as e:
        # Best-effort tool: report the failure to the model instead of raising.
        print(f"Error in web_search: {e}")
        return {"web_results": f"Error searching web: {e}"}


@tool
def arvix_search(input: str) -> dict[str, str]:
    """Search Arxiv for a query and return maximum 3 result.

    Args:
        input: The search query."""
    try:
        search_docs = ArxivLoader(query=input, load_max_docs=3).load()
        if not search_docs:
            return {"arvix_results": "No Arxiv results found for the query."}
        # Only the first 1000 characters of each paper are returned.
        formatted_search_docs = "\n\n---\n\n".join(
            [f'\n{doc.page_content[:1000]}\n' for doc in search_docs]
        )
        return {"arvix_results": formatted_search_docs}
    except Exception as e:
        # Best-effort tool: report the failure to the model instead of raising.
        print(f"Error in arvix_search: {e}")
        return {"arvix_results": f"Error searching Arxiv: {e}"}


@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).

    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python.

    The entire standard library is NOT available; heavy networking is disabled.
    Suitable for: math, data-frames, small file parsing, algorithmic questions.
    """
    return run_agent(input)


# load the system prompt from the file
with open("system_prompt.txt", "r", encoding="utf-8") as f:
    system_prompt = f.read()

# System message
sys_msg = SystemMessage(content=system_prompt)

# build a retriever
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")  # dim=768

# Try to create Supabase client with error handling.
# The resulting tool is stored in `retriever_tool` so the imported
# `create_retriever_tool` factory is no longer overwritten by its own result.
vector_store = None
retriever_tool = None
try:
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_SERVICE_KEY")

    if not supabase_url or not supabase_key:
        print("Warning: Supabase credentials not found, vector store will be disabled")
    else:
        supabase: Client = create_client(supabase_url, supabase_key)
        vector_store = SupabaseVectorStore(
            client=supabase,
            embedding=embeddings,
            table_name="documents",
            query_name="match_documents_langchain",
        )
        retriever_tool = create_retriever_tool(
            retriever=vector_store.as_retriever(),
            # Function-calling APIs restrict tool names to letters, digits,
            # underscores and dashes, so the name must not contain a space.
            name="question_search",
            description="A tool to retrieve similar questions from a vector store.",
        )
except Exception as e:
    print(f"Warning: Could not initialize Supabase vector store: {e}")
    vector_store = None
    retriever_tool = None

tools = [
    wiki_search,
    web_search,
    arvix_search,
    run_python,
]
if retriever_tool:
    tools.append(retriever_tool)


# ---------------------------------------------------------------------------
# Code-interpreter integration helpers
# ---------------------------------------------------------------------------
class AgentState(MessagesState):
    """Graph state: the chat messages plus the code-interpreter output."""

    # Output of `run_agent`, written by the "code_exec" node and read by the
    # "code_to_message" node. It has to be declared in the state schema,
    # otherwise the value written by one node is not visible to the next.
    code_result: str


def _needs_code(state: dict) -> bool:
    """Heuristic: does *state* look like a coding request?

    Returns True when the content of the last message contains one of
    `_CODE_TRIGGERS` (case-insensitive), False otherwise or when there are
    no messages.
    """
    messages = state.get("messages", [])
    if not messages:
        return False
    content = messages[-1].content
    if not isinstance(content, str):
        # Message content is not guaranteed to be a plain string here;
        # fall back to its textual representation instead of crashing.
        content = str(content)
    last_content = content.lower()
    return any(trigger in last_content for trigger in _CODE_TRIGGERS)


def _code_exec_wrapper(state: dict):
    """Delegate the user query to the sandboxed Python interpreter.

    All human messages in the state (question + attachments) are joined with
    blank lines and passed to `run_agent`; its return value is stored under
    the "code_result" state key.
    """
    human_msgs = [m.content for m in state.get("messages", []) if m.type == "human"]
    query = "\n\n".join(human_msgs)

    # Execute code-interpreter with full context (question + attachments)
    result = run_agent(query)

    # Persist the raw result so we can convert it to an AI message downstream
    return {"code_result": result}


def _code_to_message(state: dict):
    """Turn the interpreter's output into an AIMessage so the LLM can see it."""
    code_result = state.get("code_result")
    if not code_result:
        # Nothing to add. An explicit empty message list is returned rather
        # than `{}` because some LangGraph releases may reject an update
        # that writes no state key at all (TODO confirm for pinned version).
        return {"messages": []}
    return {"messages": [AIMessage(content=code_result)]}


def _fetch_attachment_message(query_content):
    """Download the file attached to the current question, if there is one.

    The question list of the scoring API is fetched and searched for an entry
    whose text equals *query_content* (after stripping whitespace). When a
    match is found, its file is downloaded, decoded as UTF-8 and truncated to
    `_MAX_ATTACHMENT_CHARS` characters.

    Returns:
        A HumanMessage carrying the file content, or None when no matching
        task / attachment exists or any error occurs (errors are printed,
        never raised, so the agent can continue without the attachment).
    """
    try:
        resp = requests.get(f"{DEFAULT_API_URL}/questions", timeout=30)
        resp.raise_for_status()
        questions = resp.json()

        matched_task_id = None
        for q in questions:
            if str(q.get("question")).strip() == str(query_content).strip():
                matched_task_id = str(q.get("task_id"))
                break

        if not matched_task_id:
            return None

        print(f"Retriever node: Found task_id {matched_task_id} for current question, attempting to download attachment…")
        file_resp = requests.get(f"{DEFAULT_API_URL}/files/{matched_task_id}", timeout=60)
        if file_resp.status_code != 200 or not file_resp.content:
            print(f"Retriever node: No attachment found for task {matched_task_id} (status {file_resp.status_code})")
            return None

        # errors="replace" substitutes undecodable bytes, so decoding cannot
        # fail here; non-text files come through as replacement characters.
        file_text = file_resp.content.decode("utf-8", errors="replace")
        if len(file_text) > _MAX_ATTACHMENT_CHARS:
            print(f"Retriever node: Attachment length {len(file_text)} > {_MAX_ATTACHMENT_CHARS}, truncating…")
            file_text = file_text[:_MAX_ATTACHMENT_CHARS] + "\n… (truncated)"

        attachment_msg = HumanMessage(
            content=f"Attached file content for task {matched_task_id}:\n```python\n{file_text}\n```"
        )
        print("Retriever node: Prepared attachment message")
        return attachment_msg
    except Exception as api_e:
        print(f"Retriever node: Error while fetching attachment – {api_e}")
        return None


# Build graph function
def build_graph(provider: str = "groq"):
    """Build the graph.

    Args:
        provider: Which chat model backend to use: "google", "groq" or
            "huggingface".

    Returns:
        The compiled graph: retriever -> (code_exec -> code_to_message ->)
        assistant <-> tools.

    Raises:
        ValueError: If *provider* is not one of the supported names.
    """
    if provider == "google":
        # Google Gemini
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    elif provider == "groq":
        # Groq https://console.groq.com/docs/models
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0.6)  # optional : qwen-qwq-32b gemma2-9b-it
    elif provider == "huggingface":
        # TODO: Add huggingface endpoint
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
                temperature=0,
            ),
        )
    else:
        raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.")

    # Bind tools to LLM
    llm_with_tools = llm.bind_tools(tools)

    # Node
    def assistant(state: MessagesState):
        """Assistant node: call the tool-enabled LLM on the current messages."""
        try:
            print(f"Assistant node: Processing {len(state['messages'])} messages")
            result = llm_with_tools.invoke(state["messages"])
            print(f"Assistant node: LLM returned result type: {type(result)}")
            return {"messages": [result]}
        except Exception as e:
            # Surface the failure as an assistant message rather than
            # aborting the whole graph run.
            print(f"Error in assistant node: {e}")
            error_msg = AIMessage(content=f"I encountered an error: {e}")
            return {"messages": [error_msg]}

    def retriever(state: MessagesState):
        """Retriever node: add system prompt, similar example and attachment."""
        try:
            print(f"Retriever node: Processing {len(state['messages'])} messages")

            if not state["messages"]:
                print("Retriever node: No messages in state")
                return {"messages": [sys_msg]}

            # Extract the user query content early for downstream steps
            query_content = state["messages"][0].content

            # Fetch attachment if available (None when there is none)
            attachment_msg = _fetch_attachment_message(query_content)

            # If vector store unavailable, simply return sys_msg + user message (+ attachment if any)
            if not vector_store:
                msgs = [sys_msg] + state["messages"]
                if attachment_msg:
                    msgs.append(attachment_msg)
                print("Retriever node: Vector store not available, skipping retrieval")
                return {"messages": msgs}

            # Perform similarity search when vector store is available
            print(f"Retriever node: Searching for similar questions with query: {query_content[:100]}…")
            similar_question = vector_store.similarity_search(query_content)
            print(f"Retriever node: Found {len(similar_question)} similar questions")

            msgs = [sys_msg] + state["messages"]

            if similar_question:
                example_msg = HumanMessage(
                    content=f"Here I provide a similar question and answer for reference: \n\n{similar_question[0].page_content}"
                )
                msgs.append(example_msg)
                print("Retriever node: Added example message from similar question")
            else:
                print("Retriever node: No similar questions found, proceeding without example")

            # Attach the file content if we have it
            if attachment_msg:
                msgs.append(attachment_msg)
                print("Retriever node: Added attachment content to messages")

            return {"messages": msgs}
        except Exception as e:
            # Fall back to the plain system prompt + conversation.
            print(f"Error in retriever node: {e}")
            return {"messages": [sys_msg] + state["messages"]}

    # AgentState (not bare MessagesState) so that "code_result" is a real
    # state key shared between the code_exec and code_to_message nodes.
    builder = StateGraph(AgentState)
    builder.add_node("retriever", retriever)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_node("code_exec", _code_exec_wrapper)
    builder.add_node("code_to_message", _code_to_message)

    builder.add_edge(START, "retriever")
    # Conditional branch: decide whether to run code interpreter
    builder.add_conditional_edges(
        "retriever",
        _needs_code,
        {True: "code_exec", False: "assistant"},
    )

    # Flow after code execution: inject result then resume chat
    builder.add_edge("code_exec", "code_to_message")
    builder.add_edge("code_to_message", "assistant")

    builder.add_conditional_edges(
        "assistant",
        tools_condition,
    )
    builder.add_edge("tools", "assistant")

    # Compile graph
    return builder.compile()


# test
if __name__ == "__main__":
    question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?"
    # Build the graph
    graph = build_graph(provider="groq")
    # Run the graph
    messages = [HumanMessage(content=question)]
    # Only pass the tracing callback when it was initialised successfully.
    callbacks = [langfuse_handler] if langfuse_handler is not None else []
    messages = graph.invoke({"messages": messages}, config={"callbacks": callbacks})
    for m in messages["messages"]:
        m.pretty_print()