Spaces:
Sleeping
Sleeping
File size: 4,460 Bytes
f511b0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from prompt import user_goal_prompt
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_google_genai import ChatGoogleGenerativeAI
from typing import Optional, Tuple, Any, Callable
import asyncio
# Shared run configuration with a recursion limit of 100; passed as the
# `config` argument to `agent.ainvoke` so long multi-tool runs are allowed.
cfg = RunnableConfig(recursion_limit=100)
def initialize_model(google_api_key: str) -> ChatGoogleGenerativeAI:
    """Build the Gemini 2.5 Flash chat model using the supplied API key."""
    model_kwargs = {
        "model": "gemini-2.5-flash",
        "google_api_key": google_api_key,
    }
    return ChatGoogleGenerativeAI(**model_kwargs)
async def setup_agent_with_tools(
    google_api_key: str,
    youtube_pipedream_url: str,
    drive_pipedream_url: Optional[str] = None,
    notion_pipedream_url: Optional[str] = None,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Any:
    """
    Set up a ReAct agent wired to MCP tool servers.

    YouTube is mandatory; Google Drive and Notion servers are attached only
    when their Pipedream URLs are provided.

    Args:
        google_api_key: API key used to initialize the Gemini chat model.
        youtube_pipedream_url: Pipedream MCP endpoint for YouTube (required).
        drive_pipedream_url: Optional Pipedream MCP endpoint for Google Drive.
        notion_pipedream_url: Optional Pipedream MCP endpoint for Notion.
        progress_callback: Optional callable invoked with human-readable
            status messages as setup progresses.

    Returns:
        The LangGraph ReAct agent created by ``create_react_agent``.

    Raises:
        Exception: any setup failure is logged and re-raised.
    """
    def _notify(message: str) -> None:
        # Centralize the "only call if a callback was supplied" guard.
        if progress_callback:
            progress_callback(message)

    try:
        # NOTE(review): the original status strings ended in a garbled
        # character that split each literal across two lines (a syntax
        # error); they are restored here as single-line literals.
        _notify("Setting up agent with tools...")

        # Mandatory YouTube server; optional servers are added below.
        tools_config = {
            "youtube": {
                "url": youtube_pipedream_url,
                "transport": "streamable_http"
            }
        }

        # Add Drive if URL provided
        if drive_pipedream_url:
            tools_config["drive"] = {
                "url": drive_pipedream_url,
                "transport": "streamable_http"
            }
            _notify("Added Google Drive integration...")

        # Add Notion if URL provided
        if notion_pipedream_url:
            tools_config["notion"] = {
                "url": notion_pipedream_url,
                "transport": "streamable_http"
            }
            _notify("Added Notion integration...")

        _notify("Initializing MCP client...")
        # Initialize MCP client with the configured tool servers.
        mcp_client = MultiServerMCPClient(tools_config)

        _notify("Getting available tools...")
        tools = await mcp_client.get_tools()

        _notify("Creating AI agent...")
        mcp_orch_model = initialize_model(google_api_key)
        agent = create_react_agent(mcp_orch_model, tools)

        _notify("Setup complete! Starting to generate learning path...")
        return agent
    except Exception as e:
        # Log for visibility, then re-raise so callers can handle it.
        print(f"Error in setup_agent_with_tools: {str(e)}")
        raise
def run_agent_sync(
    google_api_key: str,
    youtube_pipedream_url: str,
    drive_pipedream_url: Optional[str] = None,
    notion_pipedream_url: Optional[str] = None,
    user_goal: str = "",
    progress_callback: Optional[Callable[[str], None]] = None
) -> dict:
    """
    Synchronous wrapper: set up the agent and run one learning-path request.

    Args:
        google_api_key: API key for the Gemini chat model.
        youtube_pipedream_url: Pipedream MCP endpoint for YouTube (required).
        drive_pipedream_url: Optional Pipedream MCP endpoint for Google Drive.
        notion_pipedream_url: Optional Pipedream MCP endpoint for Notion.
        user_goal: Free-text goal prepended to the prompt template.
        progress_callback: Optional callable for status messages.

    Returns:
        The dict result of ``agent.ainvoke`` (LangGraph message state).

    Raises:
        Exception: any failure during setup or invocation is logged and
            re-raised.
    """
    async def _run() -> dict:
        try:
            agent = await setup_agent_with_tools(
                google_api_key=google_api_key,
                youtube_pipedream_url=youtube_pipedream_url,
                drive_pipedream_url=drive_pipedream_url,
                notion_pipedream_url=notion_pipedream_url,
                progress_callback=progress_callback
            )
            # Combine the user's goal with the shared prompt template.
            learning_path_prompt = "User Goal: " + user_goal + "\n" + user_goal_prompt
            if progress_callback:
                progress_callback("Generating your learning path...")
            # Run the agent with the module-level recursion-limit config.
            result = await agent.ainvoke(
                {"messages": [HumanMessage(content=learning_path_prompt)]},
                config=cfg
            )
            if progress_callback:
                progress_callback("Learning path generation complete!")
            return result
        except Exception as e:
            print(f"Error in _run: {str(e)}")
            raise
    # asyncio.run() creates a fresh event loop and, unlike the manual
    # new_event_loop()/run_until_complete()/close() sequence, also shuts
    # down async generators and the default executor, and does not leave
    # a closed loop installed as the current event loop.
    return asyncio.run(_run())
|