| | """ |
| | PlannerAgent for SPARKNET - LangChain Version |
| | Breaks down complex VISTA scenarios into executable workflows |
| | Uses LangChain chains for structured task decomposition |
| | """ |
| |
|
| | from typing import List, Dict, Optional, Any |
| | from dataclasses import dataclass, field |
| | from loguru import logger |
| | import json |
| | import networkx as nx |
| | from pydantic import BaseModel, Field |
| |
|
| | from langchain_core.prompts import ChatPromptTemplate |
| | from langchain_core.output_parsers import JsonOutputParser |
| | from langchain_core.messages import HumanMessage, SystemMessage |
| |
|
| | from .base_agent import BaseAgent, Task, Message |
| | from ..llm.langchain_ollama_client import LangChainOllamaClient |
| | from ..workflow.langgraph_state import SubTask as SubTaskModel, TaskStatus |
| |
|
| |
|
| | |
class TaskDecomposition(BaseModel):
    """Schema describing the JSON object the planning chains are asked to emit.

    All three fields are required (no defaults are declared).
    """

    # One dict per subtask; the planner reads the keys id, description,
    # agent_type, dependencies, estimated_duration, priority and parameters.
    subtasks: List[Dict[str, Any]] = Field(
        ...,
        description="List of subtasks with dependencies",
    )
    # Free-text justification produced alongside the plan.
    reasoning: str = Field(
        ...,
        description="Explanation of the planning strategy",
    )
    # Aggregate duration estimate for the whole plan.
    estimated_total_duration: float = Field(
        ...,
        description="Total estimated duration in seconds",
    )
| |
|
| |
|
@dataclass
class TaskGraph:
    """Directed graph of subtasks; edges point from a dependency to its dependent.

    Attributes:
        subtasks: Registered subtasks keyed by their ``id``.
        graph: ``networkx`` digraph with one node per subtask id. An edge
            ``a -> b`` means subtask ``b`` lists ``a`` in its ``dependencies``.
    """
    subtasks: Dict[str, SubTaskModel] = field(default_factory=dict)
    graph: nx.DiGraph = field(default_factory=nx.DiGraph)

    def add_subtask(self, subtask: SubTaskModel):
        """Register a subtask and add every dependency edge that can be resolved.

        Edges are created in both directions of registration order: towards
        dependencies that are already present, and from this subtask to
        previously registered subtasks that were waiting on it. Subtasks may
        therefore be added in any order without losing dependencies.
        Dependency ids that are never registered produce no edge.

        Args:
            subtask: Subtask to add; its ``id`` and ``dependencies`` are read.
        """
        self.subtasks[subtask.id] = subtask
        self.graph.add_node(subtask.id, task=subtask)

        # Dependencies that were registered before this subtask.
        for dep_id in subtask.dependencies:
            if dep_id in self.subtasks:
                self.graph.add_edge(dep_id, subtask.id)

        # Earlier subtasks that named this one as a dependency before it
        # existed; without this pass those edges would be silently dropped.
        for other_id, other in self.subtasks.items():
            if other_id != subtask.id and subtask.id in other.dependencies:
                self.graph.add_edge(subtask.id, other_id)

    def get_execution_order(self) -> List[List[str]]:
        """Return subtask ids grouped into topological generations.

        Returns:
            A list of lists of subtask ids. Ids within one inner list have no
            dependency on each other and can run in parallel; inner lists must
            run in order. An empty list is returned (and the error logged)
            when the graph cannot be ordered, e.g. because it has a cycle.
        """
        try:
            return list(nx.topological_generations(self.graph))
        except (nx.NetworkXError, nx.NetworkXUnfeasible) as e:
            # A cycle raises NetworkXUnfeasible, which is not a subclass of
            # NetworkXError, so both must be named for the fallback to apply.
            logger.error(f"Error in topological sort: {e}")
            return []

    def validate(self) -> bool:
        """Return True when the dependency graph contains no cycles."""
        return nx.is_directed_acyclic_graph(self.graph)
| |
|
| |
|
class PlannerAgent(BaseAgent):
    """
    Agent specialized in task decomposition and workflow planning.

    Known scenarios are expanded from ``SCENARIO_TEMPLATES``; anything else is
    planned by a LangChain chain built on the client's "complex" model tier
    (the original docstring names qwen2.5:14b; confirm against the client
    configuration).
    """

    # Canned workflows keyed by scenario name. Each stage names the agent that
    # runs it and the stage names it depends on.
    SCENARIO_TEMPLATES = {
        'patent_wakeup': {
            'description': 'Analyze dormant patent and create valorization roadmap',
            'stages': [
                {
                    'name': 'document_analysis',
                    'agent': 'DocumentAnalysisAgent',
                    'description': 'Extract and analyze patent content',
                    'dependencies': [],
                },
                {
                    'name': 'market_analysis',
                    'agent': 'MarketAnalysisAgent',
                    'description': 'Identify market opportunities for patent',
                    'dependencies': ['document_analysis'],
                },
                {
                    'name': 'matchmaking',
                    'agent': 'MatchmakingAgent',
                    'description': 'Match patent with potential licensees',
                    'dependencies': ['document_analysis', 'market_analysis'],
                },
                {
                    'name': 'outreach',
                    'agent': 'OutreachAgent',
                    'description': 'Generate valorization brief and outreach materials',
                    'dependencies': ['matchmaking'],
                },
            ],
        },
        'agreement_safety': {
            'description': 'Review legal agreement for risks and compliance',
            'stages': [
                {
                    'name': 'document_parsing',
                    'agent': 'LegalAnalysisAgent',
                    'description': 'Parse agreement and extract clauses',
                    'dependencies': [],
                },
                {
                    'name': 'compliance_check',
                    'agent': 'ComplianceAgent',
                    'description': 'Check GDPR and Law 25 compliance',
                    'dependencies': ['document_parsing'],
                },
                {
                    'name': 'risk_assessment',
                    'agent': 'RiskAssessmentAgent',
                    'description': 'Identify problematic clauses and risks',
                    'dependencies': ['document_parsing'],
                },
                {
                    'name': 'recommendations',
                    'agent': 'RecommendationAgent',
                    'description': 'Generate improvement suggestions',
                    'dependencies': ['compliance_check', 'risk_assessment'],
                },
            ],
        },
        'partner_matching': {
            'description': 'Match stakeholders based on complementary capabilities',
            'stages': [
                {
                    'name': 'profiling',
                    'agent': 'ProfilingAgent',
                    'description': 'Extract stakeholder capabilities and needs',
                    'dependencies': [],
                },
                {
                    'name': 'semantic_matching',
                    'agent': 'SemanticMatchingAgent',
                    'description': 'Find complementary partners using embeddings',
                    'dependencies': ['profiling'],
                },
                {
                    'name': 'network_analysis',
                    'agent': 'NetworkAnalysisAgent',
                    'description': 'Identify strategic network connections',
                    'dependencies': ['profiling'],
                },
                {
                    'name': 'facilitation',
                    'agent': 'ConnectionFacilitatorAgent',
                    'description': 'Generate introduction materials',
                    'dependencies': ['semantic_matching', 'network_analysis'],
                },
            ],
        },
    }

    # Metadata key through which decompose_task hands retrieved context to
    # process_task. A dedicated name avoids clashing with caller metadata.
    _CONTEXT_METADATA_KEY = 'planning_context'

    def __init__(
        self,
        llm_client: LangChainOllamaClient,
        memory_agent: Optional['MemoryAgent'] = None,
        temperature: float = 0.7,
    ):
        """
        Initialize PlannerAgent with LangChain client.

        Args:
            llm_client: LangChain Ollama client
            memory_agent: Optional memory agent for context (stored only; no
                method in this class reads it)
            temperature: LLM temperature for planning
        """
        # NOTE(review): BaseAgent.__init__ is not invoked here; name and
        # description are assigned by hand below. Confirm this is intentional.
        self.llm_client = llm_client
        self.memory_agent = memory_agent
        self.temperature = temperature

        # Both chains are built once and reused for every planning call.
        self.planning_chain = self._create_planning_chain()
        self.refinement_chain = self._create_refinement_chain()

        self.name = "PlannerAgent"
        self.description = "Task decomposition and workflow planning"

        logger.info("Initialized PlannerAgent with LangChain (complexity: complex)")

    def _build_chain(self, system_template: str, human_template: str):
        """Assemble a ``prompt | llm | parser`` chain from two message templates."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_template),
            ("human", human_template),
        ])
        llm = self.llm_client.get_llm(complexity="complex", temperature=self.temperature)
        parser = JsonOutputParser(pydantic_object=TaskDecomposition)
        return prompt | llm | parser

    def _create_planning_chain(self):
        """
        Create LangChain chain for task decomposition.

        The human template expects the variables ``task_description`` and
        ``context_section``.

        Returns:
            Runnable chain: prompt | llm | parser
        """
        system_template = """You are a strategic planning agent for research valorization tasks.

Your role is to:
1. Analyze complex tasks and break them into manageable subtasks
2. Identify dependencies between subtasks
3. Assign appropriate agents to each subtask
4. Estimate task complexity and duration
5. Create optimal execution plans

Available agent types:
- ExecutorAgent: General task execution
- DocumentAnalysisAgent: Analyze patents and documents
- MarketAnalysisAgent: Market research and opportunity identification
- MatchmakingAgent: Stakeholder matching and connections
- OutreachAgent: Generate outreach materials and briefs
- LegalAnalysisAgent: Legal document analysis
- ComplianceAgent: Compliance checking (GDPR, Law 25)
- RiskAssessmentAgent: Risk identification
- ProfilingAgent: Stakeholder profiling
- SemanticMatchingAgent: Semantic similarity matching
- NetworkAnalysisAgent: Network and relationship analysis

Output your plan as a structured JSON object with:
- subtasks: List of subtask objects with id, description, agent_type, dependencies, estimated_duration, priority
- reasoning: Your strategic reasoning for this decomposition
- estimated_total_duration: Total estimated time in seconds"""

        human_template = """Given the following task, create a detailed execution plan:

Task: {task_description}

{context_section}

Break this down into specific subtasks. For each subtask:
- Give it a unique ID (use snake_case)
- Describe what needs to be done
- Specify which agent type should handle it
- List any dependencies (IDs of tasks that must complete first)
- Estimate duration in seconds
- Set priority (1=highest)

Think step-by-step about:
- What is the ultimate goal?
- What information is needed?
- What are the logical stages?
- Which subtasks can run in parallel?
- What are the critical dependencies?

Output JSON only."""

        return self._build_chain(system_template, human_template)

    def _create_refinement_chain(self):
        """
        Create LangChain chain for replanning based on feedback.

        The human template expects the variables ``task_description``,
        ``original_plan``, ``feedback`` and ``issues``.

        Returns:
            Runnable chain for refinement
        """
        system_template = """You are refining an existing task plan based on feedback.

Your role is to:
1. Review the original plan and feedback
2. Identify what went wrong or could be improved
3. Create an improved plan that addresses the issues
4. Maintain successful elements from the original plan

Be thoughtful about what to change and what to keep."""

        human_template = """Refine the following plan based on feedback:

Original Task: {task_description}

Original Plan:
{original_plan}

Feedback from execution:
{feedback}

Issues encountered:
{issues}

Create an improved plan that addresses these issues while maintaining what worked well.
Output JSON in the same format as before."""

        return self._build_chain(system_template, human_template)

    @staticmethod
    def _subtask_from_llm(
        data: Dict[str, Any],
        fallback_index: int,
        id_prefix: str = "",
    ) -> SubTaskModel:
        """Build a pending subtask from one LLM-produced subtask dict.

        Args:
            data: Subtask dict as parsed from the chain output.
            fallback_index: Number used for the generated id when ``data``
                has no ``id`` key.
            id_prefix: Text prepended to the subtask id and to every
                dependency id, so both stay in the same namespace.

        Returns:
            A ``SubTaskModel`` with status ``TaskStatus.PENDING``.
        """
        raw_id = data.get('id', f'subtask_{fallback_index}')
        return SubTaskModel(
            id=f"{id_prefix}{raw_id}",
            description=data.get('description', ''),
            agent_type=data.get('agent_type', 'ExecutorAgent'),
            # `or []` tolerates an explicit null from the model.
            dependencies=[f"{id_prefix}{dep}" for dep in data.get('dependencies') or []],
            estimated_duration=data.get('estimated_duration', 30.0),
            priority=data.get('priority', 0),
            parameters=data.get('parameters', {}),
            status=TaskStatus.PENDING,
        )

    async def process_task(self, task: Task) -> Task:
        """
        Process planning task by decomposing into workflow.

        A task whose ``metadata['scenario']`` names an entry of
        ``SCENARIO_TEMPLATES`` is expanded from that template; any other task
        is planned by the LangChain planning chain. Failures are not raised:
        they are logged and recorded on the task.

        Args:
            task: High-level task to plan

        Returns:
            The same task object. On success ``status`` is "completed" and
            ``result`` holds ``task_graph``, ``execution_order`` and
            ``total_subtasks``; on failure ``status`` is "failed" and
            ``error`` holds the exception message.
        """
        logger.info(f"PlannerAgent planning task: {task.id}")
        task.status = "in_progress"

        try:
            metadata = task.metadata or {}
            scenario = metadata.get('scenario')

            if scenario and scenario in self.SCENARIO_TEMPLATES:
                logger.info(f"Using template for scenario: {scenario}")
                task_graph = await self._plan_from_template(task, scenario)
            else:
                logger.info("Using LangChain planning for custom task")
                # Context is only present when decompose_task supplied it.
                context = metadata.get(self._CONTEXT_METADATA_KEY)
                task_graph = await self._plan_with_langchain(task, context)

            if not task_graph.validate():
                raise ValueError("Generated task graph contains cycles!")

            task.result = {
                'task_graph': task_graph,
                'execution_order': task_graph.get_execution_order(),
                'total_subtasks': len(task_graph.subtasks),
            }
            task.status = "completed"

            logger.info(f"Planning completed: {len(task_graph.subtasks)} subtasks")

        except Exception as e:
            # Boundary handler: callers inspect task.status / task.error.
            logger.error(f"Planning failed: {e}")
            task.status = "failed"
            task.error = str(e)

        return task

    async def _plan_from_template(self, task: Task, scenario: str) -> TaskGraph:
        """
        Create task graph from scenario template.

        Subtask and dependency ids are the stage names prefixed with
        ``"{task.id}_"``. Priority follows stage order (first stage = 1) and
        every stage gets a fixed 30-second duration estimate.

        Args:
            task: Original task; ``metadata['parameters']`` is copied into
                each subtask when present
            scenario: Scenario identifier (must be a key of SCENARIO_TEMPLATES)

        Returns:
            TaskGraph based on template
        """
        template = self.SCENARIO_TEMPLATES[scenario]
        task_graph = TaskGraph()

        params = (task.metadata.get('parameters') if task.metadata else None) or {}

        for position, stage in enumerate(template['stages'], start=1):
            task_graph.add_subtask(SubTaskModel(
                id=f"{task.id}_{stage['name']}",
                description=stage['description'],
                agent_type=stage['agent'],
                dependencies=[f"{task.id}_{dep}" for dep in stage['dependencies']],
                estimated_duration=30.0,
                priority=position,
                # Copy so subtasks never share one mutable parameters dict.
                parameters=dict(params),
                status=TaskStatus.PENDING,
            ))

        logger.debug(f"Created task graph with {len(task_graph.subtasks)} subtasks from template")

        return task_graph

    async def _plan_with_langchain(self, task: Task, context: Optional[List[Any]] = None) -> TaskGraph:
        """
        Create task graph using LangChain planning chain.

        Args:
            task: Original task
            context: Optional context from memory. At most the first three
                items are used, each truncated to 200 characters. Items are
                read via ``page_content`` when they have it, otherwise via
                ``str()``.

        Returns:
            TaskGraph generated by LangChain

        Raises:
            ValueError: If the chain call fails or its output cannot be
                turned into subtasks (the original error is chained).
        """
        context_section = ""
        if context:
            lines = ["Relevant past experiences:"]
            for i, ctx in enumerate(context[:3], 1):
                text = str(getattr(ctx, "page_content", ctx))
                lines.append(f"{i}. {text[:200]}...")
            context_section = "\n".join(lines) + "\n"

        try:
            result = await self.planning_chain.ainvoke({
                "task_description": task.description,
                "context_section": context_section,
            })

            task_graph = TaskGraph()
            prefix = f"{task.id}_"

            for subtask_data in result.get('subtasks') or []:
                task_graph.add_subtask(
                    self._subtask_from_llm(subtask_data, len(task_graph.subtasks), prefix)
                )

            logger.debug(f"Created task graph with {len(task_graph.subtasks)} subtasks from LangChain")

            return task_graph

        except Exception as e:
            logger.error(f"Failed to parse LangChain planning response: {e}")
            raise ValueError(f"Failed to generate plan: {e}") from e

    async def decompose_task(
        self,
        task_description: str,
        scenario: Optional[str] = None,
        context: Optional[List[Any]] = None
    ) -> TaskGraph:
        """
        Decompose a high-level task into subtasks.

        Args:
            task_description: Natural language description
            scenario: Optional scenario identifier
            context: Optional context from memory; forwarded to the LangChain
                planner (template-based scenarios do not use it)

        Returns:
            TaskGraph with subtasks and dependencies

        Raises:
            RuntimeError: If planning did not complete successfully.
        """
        # Function-scope import keeps this change self-contained.
        import hashlib

        # The built-in hash() of a str is salted per process, so it would give
        # a different id on every run; a digest keeps the id reproducible.
        digest = hashlib.sha256(task_description.encode("utf-8")).hexdigest()

        metadata: Dict[str, Any] = {}
        if scenario:
            metadata['scenario'] = scenario
        if context:
            metadata[self._CONTEXT_METADATA_KEY] = context

        task = Task(
            id=f"plan_{int(digest, 16) % 10000}",
            description=task_description,
            metadata=metadata,
        )

        result_task = await self.process_task(task)

        if result_task.status == "completed" and result_task.result:
            return result_task.result['task_graph']
        raise RuntimeError(f"Planning failed: {result_task.error}")

    async def adapt_plan(
        self,
        task_graph: TaskGraph,
        feedback: str,
        issues: List[str]
    ) -> TaskGraph:
        """
        Adapt an existing plan based on execution feedback.

        The original graph is returned unchanged whenever the refinement call
        fails, yields no subtasks, or yields a graph with cycles.

        Args:
            task_graph: Original task graph
            feedback: Feedback from execution
            issues: List of issues encountered

        Returns:
            Updated task graph, or ``task_graph`` itself on any failure
        """
        logger.info("Adapting plan based on feedback")

        # Only the structural fields are shown to the model.
        original_plan = {
            "subtasks": [
                {
                    "id": st.id,
                    "description": st.description,
                    "agent_type": st.agent_type,
                    "dependencies": st.dependencies,
                }
                for st in task_graph.subtasks.values()
            ]
        }

        try:
            result = await self.refinement_chain.ainvoke({
                "task_description": "Refine task decomposition",
                "original_plan": json.dumps(original_plan, indent=2),
                "feedback": feedback,
                "issues": "\n".join(f"- {issue}" for issue in issues),
            })

            new_task_graph = TaskGraph()

            # Ids in original_plan already carry their prefix, so the
            # refined ids are taken as returned.
            for subtask_data in result.get('subtasks') or []:
                new_task_graph.add_subtask(
                    self._subtask_from_llm(subtask_data, len(new_task_graph.subtasks))
                )

        except Exception as e:
            logger.error(f"Plan adaptation failed: {e}, returning original plan")
            return task_graph

        # Never trade a usable plan for an empty or unschedulable one.
        if not new_task_graph.subtasks:
            logger.warning("Plan adaptation produced no subtasks, returning original plan")
            return task_graph
        if not new_task_graph.validate():
            logger.warning("Adapted plan contains cycles, returning original plan")
            return task_graph

        logger.info(f"Plan adapted: {len(new_task_graph.subtasks)} subtasks")
        return new_task_graph

    def get_parallel_tasks(self, task_graph: TaskGraph) -> List[List[SubTaskModel]]:
        """
        Get tasks that can be executed in parallel.

        Args:
            task_graph: Task graph

        Returns:
            List of parallel task groups, in execution order; each group
            holds the subtask objects of one topological generation
        """
        return [
            [task_graph.subtasks[task_id] for task_id in task_ids]
            for task_ids in task_graph.get_execution_order()
        ]
| |
|