Spaces:
Runtime error
Runtime error
| import smtplib | |
| from typing import Annotated | |
| from typing_extensions import TypedDict | |
| from langgraph.graph import StateGraph,START,END | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.prebuilt import tools_condition | |
| from langgraph.graph.message import add_messages | |
| from langchain_core.tools import tool | |
| from dotenv import load_dotenv | |
| from sendgrid import SendGridAPIClient | |
| from sendgrid.helpers.mail import Mail | |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage | |
| from langchain_tavily import TavilySearch | |
| from langchain_groq import ChatGroq | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langgraph.types import Command, interrupt | |
| from drive_tools import search_and_download_doc_tool, AUTH_REQUIRED_PREFIX | |
| import os | |
| # ==================== LOAD ENV ======================= | |
| # memory = MemorySaver() | |
# Load variables from a local .env file (if any) before the lookups below.
load_dotenv()

# Model identifier passed to ChatGroq further down.
base_model = "openai/gpt-oss-120b"

# Credentials and sender settings read from the environment.
# Each lookup yields None when the variable is not set.
api = os.environ.get("GROQ_API_KEY")
tavily = os.environ.get("TAVILY_API")
SENDER_EMAIL = os.environ.get("SENDER_EMAIL")
SENDER_PASSWORD = os.environ.get("SENDER_PASSWORD")
SENDGRID_API_KEY = os.environ.get("SENDGRID_API_KEY")
| # ==================== LLM ======================= | |
# Shared chat-model client used by the chatbot node (via llm_with_tools).
llm = ChatGroq(model=base_model, api_key=api, temperature=0.3)
| # ==================== TOOL ======================= | |
def send_email_tool(to_email: str, subject: str, body: str) -> str:
    """
    Sends an email to a recipient.

    Args:
        to_email: Recipient email address
        subject: Email subject
        body: Email body content

    Returns:
        A confirmation string naming the recipient, or a string starting with
        "Failed to send email:" followed by the error text if anything raises.
    """
    # NOTE(review): this docstring is presumably what the model sees as the
    # tool description once the function is bound as a tool, so its wording
    # is kept close to the original.
    try:
        outgoing = Mail(
            from_email=SENDER_EMAIL,
            to_emails=to_email,
            subject=subject,
            plain_text_content=body,
        )
        client = SendGridAPIClient(SENDGRID_API_KEY)
        client.send(outgoing)
    except Exception as exc:
        # Deliberate best-effort: report the failure as text so the agent can
        # relay it instead of the graph run crashing.
        return f"Failed to send email: {exc}"
    return f"Email successfully sent to {to_email}"
# Tools exposed to the model and to the tool-execution nodes.
# send_email_tool is defined as a plain function, so it is wrapped with
# `tool(...)` here: handle_tools looks entries up by `.name` and runs them with
# `.invoke(...)`, neither of which a bare function provides. Wrapping at this
# point (rather than decorating the definition) leaves send_email_tool itself
# directly callable.
tools = [search_and_download_doc_tool, tool(send_email_tool)]
llm_with_tools = llm.bind_tools(tools)
| # ==================== STATE ======================= | |
class State(TypedDict):
    """Graph state passed between nodes.

    The single ``messages`` field is annotated with the ``add_messages``
    reducer, which LangGraph uses to merge the messages a node returns into
    the existing history rather than overwriting it.
    """

    messages: Annotated[list, add_messages]
| # ==================== NODES ======================= | |
def chatbot(state: State):
    """Run one model turn over the conversation.

    A fixed system prompt is placed ahead of the messages already stored in
    the state, and the tool-bound model is invoked on the combined list.

    Args:
        state: Current graph state; only its "messages" list is read.

    Returns:
        A dict whose "messages" entry is a one-element list holding the
        model's reply.
    """
    # The prompt previously contained mis-encoded characters where a dash
    # separated each tool name from its description; they are restored here.
    system_prompt = SystemMessage(content="""
You are an AI assistant with access to tools.
Available tools:
1. send_email_tool — Use when the user wants to send an email.
2. search_and_download_doc_tool — Use when the user wants to find or download a document from Google Drive.
Rules:
- Always call the appropriate tool when the request requires action.
- Do NOT respond with plain text if an action is required.
- After tool execution, summarize the result for the user.
""")
    response = llm_with_tools.invoke([system_prompt, *state["messages"]])
    return {"messages": [response]}
def handle_tools(state: State):
    """Execute the tool calls requested by the latest message.

    Custom tool-execution node that intercepts AUTH_REQUIRED:: sentinels
    from the Drive tool and surfaces them via LangGraph interrupt instead
    of letting the agent loop silently.

    Entries of the module-level ``tools`` list may be tool objects (matched by
    ``.name`` and run with ``.invoke``) or plain functions (matched by
    ``__name__`` and called with the arguments as keywords).

    Args:
        state: Current graph state; the last entry of "messages" must expose
            the ``tool_calls`` to run.

    Returns:
        A dict whose "messages" entry holds one ToolMessage per tool call, in
        call order.
    """
    last_message: AIMessage = state["messages"][-1]
    results = []
    for tool_call in last_message.tool_calls:
        requested_name = tool_call["name"]

        # -- Run the tool ------------------------------------------------------
        # A bare function has no `.name`, so fall back to `__name__`; without
        # this the lookup raises AttributeError on undecorated tools.
        matched_tool = next(
            (
                t
                for t in tools
                if getattr(t, "name", getattr(t, "__name__", None)) == requested_name
            ),
            None,
        )
        if matched_tool is None:
            result_content = f"Unknown tool: {requested_name}"
        elif hasattr(matched_tool, "invoke"):
            result_content = matched_tool.invoke(tool_call["args"])
        else:
            # Plain callable: the tool-call args dict maps parameter names to values.
            result_content = matched_tool(**tool_call["args"])

        # -- Auth-gate check ---------------------------------------------------
        if isinstance(result_content, str) and result_content.startswith(AUTH_REQUIRED_PREFIX):
            # Extract the OAuth URL (everything after the sentinel prefix, first line)
            first_line = result_content.split("\n")[0]
            auth_url = first_line.removeprefix(AUTH_REQUIRED_PREFIX).strip()
            # Interrupt the graph and surface the URL to the front-end.
            # The interrupt value is returned to whoever is streaming the graph.
            # NOTE(review): per the LangGraph interrupt docs a resumed node is
            # re-run from its start, so tool calls handled earlier in this loop
            # would execute again - confirm that is acceptable for email sends.
            interrupt({
                "type": "auth_required",
                "auth_url": auth_url,
                "message": (
                    "🔐 Google Drive access is required. "
                    "Please authenticate by visiting the link below, then retry your request.\n\n"
                    f"🔗 {auth_url}"
                ),
            })
            # After the user resumes (post-OAuth), return a helpful ToolMessage
            result_content = (
                "Authentication flow initiated. Once you have completed Google sign-in, "
                "please repeat your Drive request."
            )

        results.append(
            ToolMessage(
                content=result_content,
                tool_call_id=tool_call["id"],
            )
        )
    return {"messages": results}
| # ==================== GRAPH ======================= | |
| # Adding Node | |
# In-memory checkpointer handed to compile() below.
memory = MemorySaver()

# Prebuilt node that executes the tool calls emitted by the chatbot node.
# NOTE(review): handle_tools is defined in this file but is not registered
# here, so its AUTH_REQUIRED interrupt path is never reached - confirm which
# tool node is intended.
tool_node = ToolNode(tools=tools)

graph_builder = StateGraph(State)

# Nodes
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", tool_node)

# Edges: start at the chatbot; tools_condition routes to "tools" or to the
# end of the run; after the tools run, control returns to the chatbot.
graph_builder.add_edge(START, "chatbot")
graph_builder.add_conditional_edges(
    "chatbot",
    tools_condition,
    {"tools": "tools", "__end__": END},
)
graph_builder.add_edge("tools", "chatbot")

graph = graph_builder.compile(checkpointer=memory)
| # ==================== ENTRY FUNCTION ======================= | |
def run_agent(user_input: str, thread_id: "str | None" = None):
    """Send one user message through the agent graph and return the reply text.

    The graph is compiled with a checkpointer, and LangGraph requires a
    ``thread_id`` in the run config for checkpointed graphs; invoking without
    one fails before any node runs.

    Args:
        user_input: The user's message.
        thread_id: Conversation identifier for the checkpointer. Pass a stable
            value to continue an earlier conversation. When omitted, a fresh
            id is generated so separate calls do not share history.

    Returns:
        The ``content`` of the last message in the resulting state.
    """
    import uuid  # local import keeps this block self-contained

    config = {"configurable": {"thread_id": thread_id or uuid.uuid4().hex}}
    result = graph.invoke(
        {"messages": [HumanMessage(content=user_input)]},
        config=config,
    )
    return result["messages"][-1].content
| # Start | |
| # LLM + prompt -> Chatbot | |
| # | |
| # | |
| # | |
| # External Tools Tool Node | |
| # Make a tool Call || Tavily || -><- | |
| # | |
| # | |
| # End | |
| # How does the chatbot know when to use tools? | |
| # The LLM is bound to the tools: | |
| # a function (e.g. an addition function) is added as a tool to the LLM. | |
| # The docstring tells the LLM what the inputs and arguments are; if they match the request, the LLM makes a call to the tool | |
| # instead of relying on its own response. | |
| # | |
| # A ReAct agent splits the query into multiple statements and repeatedly solves the query part by part | |
| # 1. Act | |
| # 2. Observe | |
| # 3. Reason | |
| # # | |
| # Binding tools with LLMs # | |
| # Stategraph | |
| # tokenizer = AutoTokenizer.from_pretrained( | |
| # base_model, | |
| # use_auth_token=api, | |
| # cache_dir=local_dir, | |
| # ) | |
| # model = AutoModelForCausalLM.from_pretrained( | |
| # base_model, | |
| # use_auth_token=api, | |
| # torch_dtype=torch.float16, | |
| # cache_dir=local_dir, | |
| # device_map="auto", | |
| # ) | |
| # hf_pipeline = pipeline( | |
| # "text-generation", | |
| # model=model, | |
| # tokenizer=tokenizer, | |
| # max_new_tokens=512, | |
| # do_sample=True, | |
| # temperature=0.7 | |
| # ) | |
| # hf_llm = HuggingFacePipeline(pipeline=hf_pipeline) |