Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Prompt Reconstructor for Agent Monitoring

This module analyzes knowledge graphs to reconstruct the prompts exchanged between components.
It is used to prepare knowledge graphs for perturbation testing.
"""
| import json | |
| import re | |
| import uuid | |
| from typing import Dict, List, Any, Optional, Union | |
| from datetime import datetime | |
| import logging | |
| import itertools | |
| from collections import defaultdict | |
| import copy | |
| import traceback | |
# Configure logging for this module
logger = logging.getLogger(__name__)


class PromptReconstructor:
    """Reconstruct the prompts exchanged between components of a knowledge graph.

    The knowledge graph is a dict with an ``entities`` list and a ``relations``
    list. Entities are dicts read through the keys ``id``, ``name``, ``type``
    and, optionally, ``raw_prompt`` / ``description``. Relations are dicts read
    through ``id``, ``source``, ``target``, ``type`` and, optionally,
    ``interaction_prompt`` / ``description`` / ``status`` / ``start_time``.
    """

    # Relation types that describe workflow metadata rather than a prompt exchange.
    _CONTEXT_RELATION_TYPES = ("REQUIRES_TOOL", "NEXT")

    # Relation type -> name of the private method that renders its prompt.
    # Types not listed here fall back to ``_build_generic_prompt``.
    _PROMPT_BUILDERS = {
        "PERFORMS": "_build_performs_prompt",
        "USES": "_build_uses_prompt",
        "ASSIGNED_TO": "_build_assigned_to_prompt",
        "CONSUMED_BY": "_build_consumed_by_prompt",
        "PRODUCES": "_build_produces_prompt",
        "DELIVERS_TO": "_build_delivers_to_prompt",
        "REQUIRED_BY": "_build_required_by_prompt",
        "SUBTASK_OF": "_build_subtask_of_prompt",
        "INTERVENES": "_build_intervenes_prompt",
        "REQUIRES_TOOL": "_build_requires_tool_prompt",
        "NEXT": "_build_next_prompt",
    }

    # Patterns used by ``_remove_line_numbers``; compiled once for all calls.
    _LEADING_LINE_MARKER = re.compile(r'^<L\d+>\s*', re.MULTILINE)
    _EMBEDDED_LINE_MARKER = re.compile(r'<L\d+>\s*')
    _EXCESS_BLANK_LINES = re.compile(r'\n\s*\n\s*\n')

    def __init__(self, knowledge_graph: Dict[str, Any]):
        """
        Initialize a PromptReconstructor with knowledge graph data.

        Args:
            knowledge_graph (Dict[str, Any]): Knowledge graph data with entities and relations

        Raises:
            ValueError: If the graph is empty or lacks 'entities' or 'relations'.
        """
        if not knowledge_graph or 'entities' not in knowledge_graph or 'relations' not in knowledge_graph:
            raise ValueError("Invalid knowledge graph data - must contain 'entities' and 'relations'")

        self.kg = knowledge_graph

        # Lookup tables: entities and relations by id, relations grouped by endpoint.
        self.entities: Dict[Any, Dict[str, Any]] = {
            entity["id"]: entity for entity in self.kg["entities"]
        }
        self.relations: Dict[Any, Dict[str, Any]] = {}
        self.relations_by_source: Dict[Any, List[Dict[str, Any]]] = {}
        self.relations_by_target: Dict[Any, List[Dict[str, Any]]] = {}

        for relation in self.kg["relations"]:
            self.relations[relation["id"]] = relation
            self.relations_by_source.setdefault(relation["source"], []).append(relation)
            self.relations_by_target.setdefault(relation["target"], []).append(relation)

        logger.info(
            "Successfully initialized PromptReconstructor with %d entities and %d relations",
            len(self.entities),
            len(self.relations),
        )

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_dependencies() -> Dict[str, set]:
        """Return a fresh, empty dependency record."""
        return {"entities": set(), "relations": set()}

    @staticmethod
    def _merge_dependencies(into: Dict[str, set], extra: Dict[str, set]) -> None:
        """Add the entity and relation ids of ``extra`` to ``into`` in place."""
        into["entities"].update(extra["entities"])
        into["relations"].update(extra["relations"])

    def _entity_of_type(self, entity_id: Any, expected_type: str) -> Optional[Dict[str, Any]]:
        """Return the entity with ``entity_id`` if it exists and has ``expected_type``, else None."""
        entity = self.entities.get(entity_id)
        if entity is not None and entity["type"] == expected_type:
            return entity
        return None

    @staticmethod
    def _extract_line_field(text: str, label: str) -> str:
        """Return the stripped text following the first ``label`` up to the end of that line."""
        start = text.find(label) + len(label)
        end = text.find("\n", start)
        if end == -1:
            end = len(text)
        return text[start:end].strip()

    # ------------------------------------------------------------------
    # Context lookups
    # ------------------------------------------------------------------

    def get_tool_definitions(self, agent_id: str) -> tuple:
        """Get tool definitions for tools used by an agent.

        Follows the agent's outgoing ``USES`` relations to ``Tool`` entities.

        Returns:
            A ``(tool_definitions, dependencies)`` tuple. ``tool_definitions`` is
            the list of non-empty ``raw_prompt`` values of the tools found;
            ``dependencies`` is a dict with ``entities`` and ``relations`` sets
            holding the ids of every tool and ``USES`` relation visited.
        """
        tool_definitions: List[str] = []
        dependencies = self._empty_dependencies()

        for relation in self.relations_by_source.get(agent_id, []):
            if relation["type"] != "USES":
                continue
            tool_id = relation["target"]
            tool = self._entity_of_type(tool_id, "Tool")
            if tool is None:
                continue

            # The raw prompt of a tool holds its definition; tools without one
            # are still recorded as dependencies.
            if tool.get("raw_prompt"):
                tool_definitions.append(tool["raw_prompt"])
            dependencies["entities"].add(tool_id)
            dependencies["relations"].add(relation["id"])

        return tool_definitions, dependencies

    def enhance_with_required_tools(self, task_id: str) -> tuple:
        """Get information about tools required by this task to enhance prompt reconstruction.

        Returns:
            A ``(required_tools_info, dependencies)`` tuple. The info string is
            empty when ``task_id`` is not a ``Task`` entity or has no
            ``REQUIRES_TOOL`` relation to a ``Tool`` entity.
        """
        dependencies = self._empty_dependencies()
        if self._entity_of_type(task_id, "Task") is None:
            return "", dependencies

        required_tools = []
        for relation in self.relations_by_source.get(task_id, []):
            if relation["type"] != "REQUIRES_TOOL":
                continue
            tool_id = relation["target"]
            tool = self._entity_of_type(tool_id, "Tool")
            if tool is None:
                continue

            # Tool name, plus the usage instruction when the relation carries one.
            tool_desc = f"- {tool['name']}"
            if relation.get("interaction_prompt"):
                tool_desc += f" (Use as directed: {relation['interaction_prompt']})"
            required_tools.append(tool_desc)

            dependencies["entities"].add(tool_id)
            dependencies["relations"].add(relation["id"])

        required_tools_info = ""
        if required_tools:
            required_tools_info = "\nRequired tools for this task:\n" + "\n".join(required_tools)
        return required_tools_info, dependencies

    def get_task_sequence_info(self, task_id: str) -> tuple:
        """Get information about task sequencing (previous and next tasks).

        Only the first valid ``NEXT`` neighbour in each direction is reported.

        Returns:
            A ``(sequence_info, dependencies)`` tuple where ``sequence_info`` has
            the string keys ``previous_task`` and ``next_task`` (empty when absent).
        """
        sequence_info = {"previous_task": "", "next_task": ""}
        dependencies = self._empty_dependencies()

        # (result key, relations to scan, relation end naming the neighbour, sentence prefix)
        directions = (
            ("previous_task", self.relations_by_target, "source", "This task follows"),
            ("next_task", self.relations_by_source, "target", "After this task"),
        )
        for key, index, neighbour_end, prefix in directions:
            for relation in index.get(task_id, []):
                if relation["type"] != "NEXT":
                    continue
                neighbour_id = relation[neighbour_end]
                neighbour = self._entity_of_type(neighbour_id, "Task")
                if neighbour is None:
                    continue

                info = f"{prefix}: {neighbour['name']}"
                if relation.get("interaction_prompt"):
                    info += f" ({relation['interaction_prompt']})"
                sequence_info[key] = info

                dependencies["entities"].add(neighbour_id)
                dependencies["relations"].add(relation["id"])
                break

        return sequence_info, dependencies

    # ------------------------------------------------------------------
    # Prompt reconstruction
    # ------------------------------------------------------------------

    def reconstruct_relation_prompt(self, relation_id: str) -> Dict[str, Any]:
        """Reconstruct the actual prompt that would have been sent during system execution for a specific relation.

        Returns:
            ``{"error": ...}`` when the relation or one of its endpoint entities
            is unknown. Otherwise a dict with the relation metadata, the cleaned
            ``reconstructed_prompt``, ``is_context_relation`` (only for
            ``REQUIRES_TOOL`` / ``NEXT``) and ``dependencies`` (lists of the
            entity and relation ids the prompt was built from).
        """
        if relation_id not in self.relations:
            return {"error": f"Relation {relation_id} not found in knowledge graph"}

        relation = self.relations[relation_id]
        source_id = relation["source"]
        target_id = relation["target"]
        relation_type = relation["type"]

        if source_id not in self.entities or target_id not in self.entities:
            return {"error": f"Source or target entity for relation {relation_id} not found"}

        source = self.entities[source_id]
        target = self.entities[target_id]

        # The relation itself and both endpoints are always dependencies.
        dependencies = {
            "entities": {source_id, target_id},
            "relations": {relation_id},
        }

        result = {
            "relation_id": relation_id,
            "relation_type": relation_type,
            "source": {
                "id": source_id,
                "name": source["name"],
                "type": source["type"],
            },
            "target": {
                "id": target_id,
                "name": target["name"],
                "type": target["type"],
            },
            "description": relation.get("description", ""),
            "status": relation.get("status", None),
            "interaction_prompt": relation.get("interaction_prompt", ""),
            "timestamp": relation.get("start_time", "Unknown"),
        }

        # A builder returns None when the endpoint types do not fit its relation
        # type; such relations, and unknown types, get the generic rendering.
        builder_name = (
            self._PROMPT_BUILDERS.get(relation_type) if isinstance(relation_type, str) else None
        )
        prompt = None
        if builder_name is not None:
            prompt = getattr(self, builder_name)(relation, source, target, dependencies)
        if prompt is None:
            prompt = self._build_generic_prompt(relation, source, target)

        # Single cleanup pass: strips any line-number markers from the prompt.
        result["reconstructed_prompt"] = self._remove_line_numbers(prompt)
        if relation_type in self._CONTEXT_RELATION_TYPES:
            result["is_context_relation"] = True

        # Sets are converted to lists so the result is JSON serializable.
        result["dependencies"] = {
            "entities": list(dependencies["entities"]),
            "relations": list(dependencies["relations"]),
        }
        return result

    def _parse_tool_definition(self, tool_def: str) -> tuple:
        """Return ``(name, arguments, description)`` for one tool definition string."""
        tool_name = ""
        tool_args = "{}"
        tool_desc = ""

        labels = ("Tool Name:", "Tool Arguments:", "Tool Description:")
        if all(label in tool_def for label in labels):
            # Structured format: name and arguments are single-line fields, the
            # description runs to the end of the definition.
            tool_name = self._extract_line_field(tool_def, "Tool Name:")
            tool_args = self._extract_line_field(tool_def, "Tool Arguments:")
            desc_start = tool_def.find("Tool Description:") + len("Tool Description:")
            tool_desc = tool_def[desc_start:].strip()
        else:
            # Fallback: take the name from the Tool entity owning this
            # definition and pick placeholder arguments from keywords in it.
            for entity in self.entities.values():
                if entity["type"] == "Tool" and entity.get("raw_prompt") == tool_def:
                    tool_name = entity["name"]
                    tool_desc = tool_def
                    tool_args = self._placeholder_tool_arguments(tool_name)
                    break

        if not tool_name:
            tool_name = "UnknownTool"
            tool_desc = tool_def if tool_def else "Tool description not available"
            tool_args = '{"input": "data"}'

        return tool_name, tool_args, tool_desc

    @staticmethod
    def _placeholder_tool_arguments(tool_name: str) -> str:
        """Return a placeholder JSON argument string chosen from keywords in ``tool_name``."""
        lowered = tool_name.lower()
        if "search" in lowered:
            return '{"query": "search_term", "max_results": 10}'
        if "load" in lowered or "read" in lowered:
            return '{"file_path": "path/to/file", "format": "auto"}'
        if "calculat" in lowered or "analyz" in lowered:
            return '{"data": "input_data", "method": "default"}'
        if "validat" in lowered or "check" in lowered:
            return '{"data": "target_data", "criteria": "standard"}'
        return '{"input": "data"}'

    def _build_performs_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the system + user prompt an Agent receives when it performs a Task."""
        if source["type"] != "Agent" or target["type"] != "Task":
            return None

        source_id = relation["source"]
        target_id = relation["target"]

        tool_definitions, tool_deps = self.get_tool_definitions(source_id)
        self._merge_dependencies(dependencies, tool_deps)

        # The interaction prompt, when present, is the actual task message;
        # otherwise the task's raw prompt (or its name) is used.
        task_prompt = target.get("raw_prompt", f"{target['name']}")
        task_message = relation.get("interaction_prompt", "") or task_prompt

        required_tools_info, req_tools_deps = self.enhance_with_required_tools(target_id)
        self._merge_dependencies(dependencies, req_tools_deps)

        sequence_info, seq_deps = self.get_task_sequence_info(target_id)
        self._merge_dependencies(dependencies, seq_deps)

        sequence_context = ""
        if sequence_info["previous_task"] or sequence_info["next_task"]:
            sequence_context = "\n"
            if sequence_info["previous_task"]:
                sequence_context += sequence_info["previous_task"] + ". "
            if sequence_info["next_task"]:
                sequence_context += sequence_info["next_task"] + "."

        # --- system message -------------------------------------------------
        parts = [f"system: You are {source['name']}. "]
        system_description = source.get("description", "")
        if system_description:
            # Expertise is the description up to " responsible for " (the whole
            # description when that phrase is absent), lower-cased.
            expertise = system_description.split(' responsible for ')[0].lower()
            parts.append(f"You're an expert in {expertise}.\n")
            parts.append(f"Your personal goal is: {system_description}\n\n")
        else:
            parts.append("\n\n")

        parts.append(
            "CRITICAL INSTRUCTIONS:\n"
            "1. You must analyze the task step by step before taking action\n"
            "2. Always reference the user's exact request in your reasoning\n"
            "3. Break down complex tasks into sequential steps\n"
            "4. Choose appropriate tools for each step and explain your reasoning\n"
            "5. Be aware of system constraints and adapt accordingly\n\n"
            "You ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n"
        )

        tool_names_for_action = []
        if tool_definitions:
            for tool_def in tool_definitions:
                tool_name, tool_args, tool_desc = self._parse_tool_definition(tool_def)
                if tool_name != "UnknownTool":
                    tool_names_for_action.append(tool_name)
                parts.append(f"Tool Name: {tool_name}\n")
                parts.append(f"Tool Arguments: {tool_args}\n")
                parts.append(f"Tool Description: {tool_desc}\n\n")
        else:
            parts.append("No tools are currently available for this task.\n\n")

        parts.append(
            "RESPONSE FORMAT - Follow this structure for each step:\n\n"
            "```\n"
            "Reasoning: [Analyze the current situation and explain your thinking process]\n"
            "Task Analysis: [Break down what needs to be done and identify requirements]\n"
            "Tool Selection: [Choose appropriate tool and justify why]\n"
        )
        parts.append("Action: [Tool name from: " + ", ".join(tool_names_for_action) + "]\n")
        parts.append(
            "Action Input: [JSON object with parameters, using \" for keys and values]\n"
            "Observation: [Result of the action]\n"
            "```\n\n"
            "For your final response:\n\n"
            "```\n"
            "Final Reasoning: [Summarize your complete analysis and decision process]\n"
            "Final Answer: [Complete answer addressing all requirements]\n"
            "```\n\n"
        )

        # --- user message ---------------------------------------------------
        parts.append(f"user:\nCurrent Task: {task_message}\n")
        parts.append(
            "\nYour reasoning process should follow this pattern:\n"
            "1. Analyze the user's request and identify key requirements\n"
            "2. Break down the task into sequential steps\n"
            "3. For each step, determine which tools to use and why\n"
            "4. Consider system constraints and potential issues\n"
            "5. Execute the plan while monitoring for problems\n\n"
        )
        if target.get("description"):
            parts.append(f"Expected criteria for your final answer: {target.get('description')}\n")
        parts.append(
            "IMPORTANT: You MUST show your step-by-step reasoning process and return the actual complete content as the final answer, not a summary.\n\n"
        )
        if required_tools_info or sequence_context:
            context_info = f"{required_tools_info}{sequence_context}\n"
            parts.append(f"Context you're working with:\n{context_info}\n")
        parts.append(
            "Begin! Remember to think through each step carefully, use the available tools appropriately, and provide your best Final Answer. Your systematic approach is crucial for success!"
        )

        return "".join(parts)

    def _build_uses_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the tool-invocation view of an Agent using a Tool."""
        if source["type"] != "Agent" or target["type"] != "Tool":
            return None

        target_id = relation["target"]
        tool_name = target["name"]
        tool_prompt = target.get("raw_prompt", "")

        # Best effort: read the argument schema from the raw prompt. Anything
        # that cannot be parsed leaves the arguments empty.
        tool_args = {}
        if "Tool Arguments:" in tool_prompt:
            try:
                args_str = self._extract_line_field(tool_prompt, "Tool Arguments:")
                if args_str.startswith("{") and args_str.endswith("}"):
                    # Single quotes are swapped for double quotes so that
                    # Python-style dict text can be read as JSON.
                    tool_args = json.loads(args_str.replace("'", "\""))
            except (ValueError, TypeError, AttributeError):
                logger.debug("Could not parse arguments of tool %r; using none", tool_name)

        interaction = relation.get("interaction_prompt", "")

        complete_prompt = f"Agent {source['name']} uses tool: {tool_name}\n"
        complete_prompt += f"Tool Definition: {tool_prompt}\n\n"
        if interaction:
            complete_prompt += f"Tool Invocation: {interaction}\n"
        else:
            # No recorded invocation: synthesize one from the argument names.
            args_display = ", ".join(f"{k}=[{v} value]" for k, v in tool_args.items())
            complete_prompt += f"Tool Invocation: {tool_name}({args_display})\n"

        # Tasks that declare this tool as required give usage context.
        related_tasks = []
        for rel in self.relations_by_target.get(target_id, []):
            if rel["type"] != "REQUIRES_TOOL":
                continue
            task_id = rel["source"]
            task = self._entity_of_type(task_id, "Task")
            if task is None:
                continue
            related_tasks.append(task["name"])
            dependencies["entities"].add(task_id)
            dependencies["relations"].add(rel["id"])

        if related_tasks:
            complete_prompt += f"\nThis tool is typically used for: {', '.join(related_tasks)}"
        return complete_prompt

    def _build_assigned_to_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the assignment message for a Task assigned to an Agent."""
        if source["type"] != "Task" or target["type"] != "Agent":
            return None

        source_id = relation["source"]
        task_prompt = source.get("raw_prompt", "")
        interaction = relation.get("interaction_prompt", "")

        complete_prompt = f"System → {target['name']}: You are assigned the following task:\n"
        complete_prompt += f"{task_prompt}\n"
        if interaction:
            complete_prompt += f"\nSpecific instructions: {interaction}\n"

        required_tools_info, req_tools_deps = self.enhance_with_required_tools(source_id)
        self._merge_dependencies(dependencies, req_tools_deps)
        if required_tools_info:
            complete_prompt += required_tools_info

        sequence_info, seq_deps = self.get_task_sequence_info(source_id)
        self._merge_dependencies(dependencies, seq_deps)
        if sequence_info["previous_task"] or sequence_info["next_task"]:
            # NOTE(review): the separator is a bare ":" - a section label may be
            # missing here; the emitted text is kept unchanged.
            complete_prompt += "\n:"
            if sequence_info["previous_task"]:
                complete_prompt += f"\n{sequence_info['previous_task']}"
            if sequence_info["next_task"]:
                complete_prompt += f"\n{sequence_info['next_task']}"
        return complete_prompt

    def _build_consumed_by_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the routing message for an Input consumed by an Agent."""
        if source["type"] != "Input" or target["type"] != "Agent":
            return None

        input_content = source.get("raw_prompt", source["name"])
        agent_name = target["name"]
        agent_description = target.get("description", "")
        interaction = relation.get("interaction_prompt", "")

        parts = [
            "SYSTEM ROUTING: Input Dispatch\n\n",
            f"User Input: {input_content}\n\n",
            "🎯 ROUTING DECISION:\n",
            f"Selected Agent: {agent_name}\n",
        ]
        if agent_description:
            parts.append(f"Agent Expertise: {agent_description}\n")
        parts.append("\n📋 ROUTING RATIONALE:\n")
        if interaction:
            parts.append(f"{interaction}\n")
        else:
            parts.append(
                f"Input '{source['name']}' has been routed to {agent_name} based on the agent's specialized capabilities.\n"
            )
        parts.append("\n🔄 NEXT STEP:\n")
        parts.append(f"The system will now pass this input to {agent_name} for processing.\n")
        return "".join(parts)

    def _build_produces_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the completion message for a Task producing an Output."""
        if source["type"] != "Task" or target["type"] != "Output":
            return None

        output_content = target.get("raw_prompt", target["name"])
        interaction = relation.get("interaction_prompt", "")

        parts = [
            "TASK COMPLETION: Output Generation\n\n",
            f"Completed Task: {source['name']}\n",
            f"Generated Output: {target['name']}\n\n",
            "📤 OUTPUT DETAILS:\n",
            f"{output_content}\n\n",
        ]
        if interaction:
            parts.append(f"📋 GENERATION NOTES:\n{interaction}\n\n")
        parts.append("✅ STATUS: Task successfully completed and output ready for delivery.\n")
        return "".join(parts)

    def _build_delivers_to_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the delivery message for an Output delivered to a Human."""
        if source["type"] != "Output" or target["type"] != "Human":
            return None

        output_content = source.get("raw_prompt", source["name"])
        interaction = relation.get("interaction_prompt", "")

        parts = [
            "FINAL DELIVERY: Output to User\n\n",
            f"📬 DELIVERING TO: {target['name']}\n",
            f"📦 OUTPUT: {source['name']}\n\n",
            f"📄 CONTENT:\n{output_content}\n\n",
        ]
        if interaction:
            parts.append(f"📋 DELIVERY NOTES:\n{interaction}\n\n")
        parts.append("✅ DELIVERY STATUS: Output successfully delivered to user.\n")
        return "".join(parts)

    def _build_required_by_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the dependency-check message for a Tool required by a Task."""
        if source["type"] != "Tool" or target["type"] != "Task":
            return None

        tool_desc = source.get("raw_prompt", "")
        interaction = relation.get("interaction_prompt", "")

        parts = [
            "DEPENDENCY CHECK: Tool Requirement\n\n",
            f"🔧 REQUIRED TOOL: {source['name']}\n",
            f"📋 FOR TASK: {target['name']}\n\n",
        ]
        if tool_desc:
            parts.append(f"🛠️ TOOL DESCRIPTION:\n{tool_desc}\n\n")
        if interaction:
            parts.append(f"📋 REQUIREMENT DETAILS:\n{interaction}\n\n")
        parts.append("✅ STATUS: Tool dependency verified and available for task execution.\n")
        return "".join(parts)

    def _build_subtask_of_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the hierarchy message for a Task that is a subtask of another Task."""
        if source["type"] != "Task" or target["type"] != "Task":
            return None

        subtask_name = source["name"]
        parent_task_name = target["name"]
        interaction = relation.get("interaction_prompt", "")

        parts = [
            "TASK HIERARCHY: Subtask Relationship\n\n",
            f"🎯 PARENT TASK: {parent_task_name}\n",
            f"📋 SUBTASK: {subtask_name}\n\n",
        ]
        if interaction:
            parts.append(f"📋 HIERARCHY DETAILS:\n{interaction}\n\n")
        parts.append(
            f"🔄 WORKFLOW: Subtask '{subtask_name}' is part of larger task '{parent_task_name}'.\n"
        )
        return "".join(parts)

    def _build_intervenes_prompt(self, relation, source, target, dependencies) -> Optional[str]:
        """Render the oversight message for an intervention; accepts any endpoint types."""
        # Whichever end is the Agent intervenes; when the source is not an
        # Agent the target is taken to be the intervening party.
        if source["type"] == "Agent":
            agent_name, process_name = source["name"], target["name"]
        else:
            agent_name, process_name = target["name"], source["name"]
        interaction = relation.get("interaction_prompt", "")

        parts = [
            "PROCESS INTERVENTION: Agent Oversight\n\n",
            f"👤 INTERVENING AGENT: {agent_name}\n",
            f"⚙️ TARGET PROCESS: {process_name}\n\n",
        ]
        if interaction:
            parts.append(f"📋 INTERVENTION DETAILS:\n{interaction}\n\n")
        parts.append(
            f"🚨 ACTION: Agent '{agent_name}' is intervening in '{process_name}' for quality control or course correction.\n"
        )
        return "".join(parts)

    def _build_requires_tool_prompt(self, relation, source, target, dependencies) -> str:
        """Render the metadata note for a REQUIRES_TOOL (task -> tool) relation."""
        source_name = source["name"]
        target_name = target["name"]
        interaction = relation.get("interaction_prompt", "")

        context_note = (
            f"Note: This is a dependency relation showing that task '{source_name}' "
            f"requires tool '{target_name}'. It doesn't represent an actual prompt "
            f"exchange but provides context for task execution."
        )
        metadata_prompt = "METADATA: Task-Tool Dependency\n"
        metadata_prompt += f"Task: {source_name}\n"
        metadata_prompt += f"Requires tool: {target_name}\n"
        if interaction:
            metadata_prompt += f"Usage pattern: {interaction}\n"
        metadata_prompt += f"\n{context_note}"
        return metadata_prompt

    def _build_next_prompt(self, relation, source, target, dependencies) -> str:
        """Render the metadata note for a NEXT (task -> following task) relation."""
        source_name = source["name"]
        target_name = target["name"]
        interaction = relation.get("interaction_prompt", "")

        context_note = (
            f"Note: This is a sequencing relation showing that task '{target_name}' "
            f"follows task '{source_name}'. It doesn't represent an actual prompt "
            f"exchange but provides context for the execution flow."
        )
        metadata_prompt = "METADATA: Task Sequencing\n"
        metadata_prompt += f"Previous task: {source_name}\n"
        metadata_prompt += f"Next task: {target_name}\n"
        if interaction:
            metadata_prompt += f"Transition: {interaction}\n"
        metadata_prompt += f"\n{context_note}"
        return metadata_prompt

    @staticmethod
    def _build_generic_prompt(relation, source, target) -> str:
        """Render a generic prompt from the raw prompts of both endpoints."""
        source_prompt = source.get("raw_prompt", "")
        target_prompt = target.get("raw_prompt", "")
        interaction = relation.get("interaction_prompt", "")

        prompt = f"{source['type']}: {source['name']}\n"
        if source_prompt:
            prompt += f"{source_prompt}\n\n"
        prompt += f"{target['type']}: {target['name']}\n"
        if target_prompt:
            prompt += f"{target_prompt}\n\n"
        if interaction:
            prompt += f"Interaction: {interaction}\n"
        return prompt

    def _remove_line_numbers(self, content: str) -> str:
        """
        Replace line number prefixes with appropriate newlines to restore text structure.

        Markers such as ``<L42>`` are dropped at the start of a line and turned
        into a line break elsewhere. Runs of blank lines are collapsed to a
        single blank line, every line is stripped of surrounding whitespace, and
        empty lines at the start and end of the text are removed.
        """
        # Markers at the start of a line already sit on a line break: drop them.
        clean_content = self._LEADING_LINE_MARKER.sub('', content)
        # Any other marker stands for a line break ("a<L42>b" -> "a\nb").
        clean_content = self._EMBEDDED_LINE_MARKER.sub('\n', clean_content)
        # Keep at most one blank line between paragraphs.
        clean_content = self._EXCESS_BLANK_LINES.sub('\n\n', clean_content)

        cleaned_lines = [line.strip() for line in clean_content.split('\n')]
        while cleaned_lines and not cleaned_lines[0]:
            cleaned_lines.pop(0)
        while cleaned_lines and not cleaned_lines[-1]:
            cleaned_lines.pop()
        return '\n'.join(cleaned_lines)

    def reconstruct_relations(self) -> List[Dict[str, Any]]:
        """Reconstruct all relations, mapping each to its reconstructed prompt.

        ``REQUIRES_TOOL`` and ``NEXT`` relations, and relations whose source or
        target entity is missing, are skipped.

        Returns:
            One deep copy per remaining relation, extended with ``prompt``,
            ``dependencies``, ``source_entity`` and ``target_entity``.
        """
        reconstructed_relations = []

        for relation_id, relation in self.relations.items():
            # Context relations do not represent an actual prompt.
            if relation.get("type") in self._CONTEXT_RELATION_TYPES:
                continue

            source_id = relation["source"]
            target_id = relation["target"]
            if source_id not in self.entities or target_id not in self.entities:
                continue
            source = self.entities[source_id]
            target = self.entities[target_id]

            reconstructed = self.reconstruct_relation_prompt(relation_id)
            if reconstructed.get("is_context_relation", False):
                continue

            # Deep copy so the knowledge graph itself is left untouched.
            relation_entry = copy.deepcopy(relation)
            relation_entry["prompt"] = reconstructed.get(
                "reconstructed_prompt", "Error reconstructing prompt"
            )
            relation_entry["dependencies"] = reconstructed.get(
                "dependencies", {"entities": [], "relations": []}
            )
            relation_entry["source_entity"] = {
                "id": source_id,
                "name": source["name"],
                "type": source["type"],
            }
            relation_entry["target_entity"] = {
                "id": target_id,
                "name": target["name"],
                "type": target["type"],
            }
            reconstructed_relations.append(relation_entry)

        return reconstructed_relations
# Functional entry point: knowledge graph in, reconstructed prompts out
def reconstruct_prompts_from_knowledge_graph(knowledge_graph: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Reconstruct the prompts of a knowledge graph without keeping any state.

    A throwaway PromptReconstructor is built for the given graph and its
    reconstructed relations are handed back directly.

    Args:
        knowledge_graph: Knowledge graph data with entities and relations

    Returns:
        List of dictionaries containing reconstructed prompts for each relation
    """
    return PromptReconstructor(knowledge_graph).reconstruct_relations()
def enrich_knowledge_graph_with_prompts(knowledge_graph: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a copy of a knowledge graph extended with its reconstructed prompts.

    The prompts are reconstructed first; the input graph is then deep-copied and
    the copy receives them, so the graph passed in is not modified here.

    Args:
        knowledge_graph: Knowledge graph data with entities and relations

    Returns:
        Enhanced knowledge graph with prompt_reconstructions field containing
        the reconstructed prompts for each relation
    """
    prompt_reconstructions = PromptReconstructor(knowledge_graph).reconstruct_relations()

    enriched = copy.deepcopy(knowledge_graph)
    enriched["prompt_reconstructions"] = prompt_reconstructions
    return enriched