Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Prompt Reconstructor for Agent Monitoring | |
| This module analyzes knowledge graphs to reconstruct the prompts used between components. | |
| It's used to prepare knowledge graphs for perturbation testing. | |
| """ | |
| import json | |
| import re | |
| import uuid | |
| from typing import Dict, List, Any, Optional, Union | |
| from datetime import datetime | |
| import logging | |
| import itertools | |
| from collections import defaultdict | |
| import copy | |
| import traceback | |
| # Configure logging for this module | |
| logger = logging.getLogger(__name__) | |
| class PromptReconstructor: | |
| def __init__(self, knowledge_graph: Dict[str, Any]): | |
| """ | |
| Initialize a PromptReconstructor with knowledge graph data. | |
| Args: | |
| knowledge_graph (Dict[str, Any]): Knowledge graph data with entities and relations | |
| """ | |
| if not knowledge_graph or 'entities' not in knowledge_graph or 'relations' not in knowledge_graph: | |
| raise ValueError("Invalid knowledge graph data - must contain 'entities' and 'relations'") | |
| self.kg = knowledge_graph | |
| # Create lookup dictionaries for efficient access | |
| self.entities = {entity["id"]: entity for entity in self.kg["entities"]} | |
| self.relations = {} | |
| self.relations_by_source = {} | |
| self.relations_by_target = {} | |
| # Organize relations for lookup | |
| for relation in self.kg["relations"]: | |
| self.relations[relation["id"]] = relation | |
| # Group relations by source and target | |
| if relation["source"] not in self.relations_by_source: | |
| self.relations_by_source[relation["source"]] = [] | |
| self.relations_by_source[relation["source"]].append(relation) | |
| if relation["target"] not in self.relations_by_target: | |
| self.relations_by_target[relation["target"]] = [] | |
| self.relations_by_target[relation["target"]].append(relation) | |
| logger.info(f"Successfully initialized PromptReconstructor with {len(self.entities)} entities and {len(self.relations)} relations") | |
| def get_tool_definitions(self, agent_id: str) -> List[str]: | |
| """Get tool definitions for tools used by an agent.""" | |
| tool_definitions = [] | |
| dependencies = {"entities": set(), "relations": set()} | |
| # No relations for this agent | |
| if agent_id not in self.relations_by_source: | |
| return tool_definitions, dependencies | |
| # Find USES relations for this agent | |
| for relation in self.relations_by_source[agent_id]: | |
| if relation["type"] == "USES": | |
| tool_id = relation["target"] | |
| if tool_id in self.entities and self.entities[tool_id]["type"] == "Tool": | |
| tool = self.entities[tool_id] | |
| # Add the raw prompt of the tool which contains its definition | |
| if tool.get("raw_prompt"): | |
| tool_definitions.append(tool["raw_prompt"]) | |
| # Track dependencies | |
| dependencies["entities"].add(tool_id) | |
| dependencies["relations"].add(relation["id"]) | |
| return tool_definitions, dependencies | |
| def enhance_with_required_tools(self, task_id: str) -> tuple: | |
| """Get information about tools required by this task to enhance prompt reconstruction.""" | |
| if task_id not in self.entities or self.entities[task_id]["type"] != "Task": | |
| return "", {"entities": set(), "relations": set()} | |
| required_tools_info = "" | |
| dependencies = {"entities": set(), "relations": set()} | |
| # Find all REQUIRES_TOOL relations for this task | |
| if task_id in self.relations_by_source: | |
| required_tools = [] | |
| for relation in self.relations_by_source[task_id]: | |
| if relation["type"] == "REQUIRES_TOOL": | |
| tool_id = relation["target"] | |
| if tool_id in self.entities and self.entities[tool_id]["type"] == "Tool": | |
| tool = self.entities[tool_id] | |
| # Add tool name and usage instruction if available | |
| tool_desc = f"- {tool['name']}" | |
| if relation.get("interaction_prompt"): | |
| tool_desc += f" (Use as directed: {relation['interaction_prompt']})" | |
| required_tools.append(tool_desc) | |
| # Track dependencies | |
| dependencies["entities"].add(tool_id) | |
| dependencies["relations"].add(relation["id"]) | |
| if required_tools: | |
| required_tools_info = "\nRequired tools for this task:\n" + "\n".join(required_tools) | |
| return required_tools_info, dependencies | |
| def get_task_sequence_info(self, task_id: str) -> tuple: | |
| """Get information about task sequencing (previous and next tasks).""" | |
| sequence_info = { | |
| "previous_task": "", | |
| "next_task": "" | |
| } | |
| dependencies = {"entities": set(), "relations": set()} | |
| # Find previous task (task that has NEXT relation to this task) | |
| if task_id in self.relations_by_target: | |
| for relation in self.relations_by_target[task_id]: | |
| if relation["type"] == "NEXT": | |
| prev_task_id = relation["source"] | |
| if prev_task_id in self.entities and self.entities[prev_task_id]["type"] == "Task": | |
| prev_task = self.entities[prev_task_id] | |
| prev_info = f"This task follows: {prev_task['name']}" | |
| if relation.get("interaction_prompt"): | |
| prev_info += f" ({relation['interaction_prompt']})" | |
| sequence_info["previous_task"] = prev_info | |
| # Track dependencies | |
| dependencies["entities"].add(prev_task_id) | |
| dependencies["relations"].add(relation["id"]) | |
| break | |
| # Find next task (task that this task has NEXT relation to) | |
| if task_id in self.relations_by_source: | |
| for relation in self.relations_by_source[task_id]: | |
| if relation["type"] == "NEXT": | |
| next_task_id = relation["target"] | |
| if next_task_id in self.entities and self.entities[next_task_id]["type"] == "Task": | |
| next_task = self.entities[next_task_id] | |
| next_info = f"After this task: {next_task['name']}" | |
| if relation.get("interaction_prompt"): | |
| next_info += f" ({relation['interaction_prompt']})" | |
| sequence_info["next_task"] = next_info | |
| # Track dependencies | |
| dependencies["entities"].add(next_task_id) | |
| dependencies["relations"].add(relation["id"]) | |
| break | |
| return sequence_info, dependencies | |
| def reconstruct_relation_prompt(self, relation_id: str) -> Dict[str, Any]: | |
| """Reconstruct the actual prompt that would have been sent during system execution for a specific relation.""" | |
| if relation_id not in self.relations: | |
| return {"error": f"Relation {relation_id} not found in knowledge graph"} | |
| relation = self.relations[relation_id] | |
| source_id = relation["source"] | |
| target_id = relation["target"] | |
| relation_type = relation["type"] | |
| # Initialize dependency tracking | |
| dependencies = { | |
| "entities": {source_id, target_id}, # Always include source and target entities | |
| "relations": {relation_id} # Always include the current relation | |
| } | |
| # Check if source and target entities exist | |
| if source_id not in self.entities or target_id not in self.entities: | |
| return {"error": f"Source or target entity for relation {relation_id} not found"} | |
| source = self.entities[source_id] | |
| target = self.entities[target_id] | |
| # Basic information about the relation | |
| result = { | |
| "relation_id": relation_id, | |
| "relation_type": relation_type, | |
| "source": { | |
| "id": source_id, | |
| "name": source["name"], | |
| "type": source["type"] | |
| }, | |
| "target": { | |
| "id": target_id, | |
| "name": target["name"], | |
| "type": target["type"] | |
| }, | |
| "description": relation.get("description", ""), | |
| "status": relation.get("status", None), | |
| "interaction_prompt": relation.get("interaction_prompt", ""), | |
| "timestamp": relation.get("start_time", "Unknown") | |
| } | |
| # Construct the prompt that would have been actually sent based on relation type | |
| if relation_type == "PERFORMS": | |
| # Agent performs Task - This represents when an agent receives a task to execute | |
| if source["type"] == "Agent" and target["type"] == "Task": | |
| # Start with the agent's system prompt | |
| agent_prompt = source.get("raw_prompt", f"You are {source['name']}.") | |
| # Get tool definitions this agent has access to | |
| tool_definitions, tool_deps = self.get_tool_definitions(source_id) | |
| dependencies["entities"].update(tool_deps["entities"]) | |
| dependencies["relations"].update(tool_deps["relations"]) | |
| tool_section = "" | |
| if tool_definitions: | |
| tool_section = "\n\nYou have access to the following tools:\n" + "\n\n".join(tool_definitions) | |
| # Get the task description the agent would receive | |
| task_prompt = target.get("raw_prompt", f"{target['name']}") | |
| # Get specific interaction prompt if available, which represents the actual task message | |
| interaction = relation.get("interaction_prompt", "") | |
| # Enhance with required tools information | |
| required_tools_info, req_tools_deps = self.enhance_with_required_tools(target_id) | |
| dependencies["entities"].update(req_tools_deps["entities"]) | |
| dependencies["relations"].update(req_tools_deps["relations"]) | |
| # Add sequence context to help agent understand workflow | |
| sequence_info, seq_deps = self.get_task_sequence_info(target_id) | |
| dependencies["entities"].update(seq_deps["entities"]) | |
| dependencies["relations"].update(seq_deps["relations"]) | |
| sequence_context = "" | |
| if sequence_info["previous_task"] or sequence_info["next_task"]: | |
| sequence_context = "\n" | |
| if sequence_info["previous_task"]: | |
| sequence_context += sequence_info["previous_task"] + ". " | |
| if sequence_info["next_task"]: | |
| sequence_context += sequence_info["next_task"] + "." | |
| # Construct what the agent would actually receive during execution | |
| # Format: Agent system prompt + tools + task as user message + additional context | |
| task_message = interaction if interaction else task_prompt | |
| # Start with enhanced system prompt - IMPROVED FORMATTING | |
| system_role = f"system: You are {source['name']}. " | |
| system_description = source.get("description", "") | |
| if system_description: | |
| system_role += f"You're an expert in {system_description.split(' responsible for ')[0].lower() if ' responsible for ' in system_description else system_description.lower()}.\n" | |
| system_role += f"Your personal goal is: {system_description}\n" | |
| else: | |
| system_role += "\n" | |
| # Add emphatic instruction about tools | |
| system_role += "You ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n" | |
| # Place system prompt first | |
| complete_prompt = system_role | |
| # Format tool definitions with better structure to match example | |
| if tool_definitions: | |
| for tool_def in tool_definitions: | |
| # Extract tool name, args, and description to reformat | |
| tool_name = "" | |
| tool_args = "{}" | |
| tool_desc = "" | |
| if "Tool Name:" in tool_def: | |
| name_start = tool_def.find("Tool Name:") + len("Tool Name:") | |
| name_end = tool_def.find("\n", name_start) if "\n" in tool_def[name_start:] else len(tool_def) | |
| tool_name = tool_def[name_start:name_end].strip() | |
| if "Tool Arguments:" in tool_def: | |
| args_start = tool_def.find("Tool Arguments:") + len("Tool Arguments:") | |
| args_end = tool_def.find("\n", args_start) if "\n" in tool_def[args_start:] else len(tool_def) | |
| tool_args = tool_def[args_start:args_end].strip() | |
| if "Tool Description:" in tool_def: | |
| desc_start = tool_def.find("Tool Description:") + len("Tool Description:") | |
| desc_end = tool_def.find("\n", desc_start) if "\n" in tool_def[desc_start:] else len(tool_def) | |
| tool_desc = tool_def[desc_start:desc_end].strip() | |
| # Format the tool entry more closely to the example format | |
| complete_prompt += f"Tool Name: {tool_name}\n" | |
| complete_prompt += f"Tool Arguments: {tool_args}\n" | |
| complete_prompt += f"Tool Description: {tool_desc}\n\n" | |
| # Add response format instructions with explicit Copy code markers | |
| complete_prompt += "IMPORTANT: Use the following format in your response:\n\n" | |
| complete_prompt += "Copy code\n" | |
| complete_prompt += "```\n" | |
| complete_prompt += "Thought: you should always think about what to do\n" | |
| complete_prompt += "Action: the action to take, only one name of [" + ", ".join([t.split("Tool Name:")[1].strip().split("\n")[0] for t in tool_definitions if "Tool Name:" in t]) + "], just the name, exactly as it's written.\n" | |
| complete_prompt += "Action Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\n" | |
| complete_prompt += "Observation: the result of the action\n" | |
| complete_prompt += "```\n\n" | |
| complete_prompt += "Once all necessary information is gathered, return the following format:\n\n" | |
| complete_prompt += "Copy code\n" | |
| complete_prompt += "```\n" | |
| complete_prompt += "Thought: I now know the final answer\n" | |
| complete_prompt += "Final Answer: the final answer to the original input question\n" | |
| complete_prompt += "```\n\n" | |
| # Add required tools and sequence context information | |
| context_info = "" | |
| if required_tools_info or sequence_context: | |
| context_info = f"{required_tools_info}{sequence_context}\n" | |
| # Format the user message with improved structure matching the example | |
| formatted_task_message = f"user:\nCurrent Task: {task_message}\n" | |
| # Add expected criteria as shown in example | |
| if target["type"] == "Task" and target.get("description"): | |
| formatted_task_message += f"\nThis is the expected criteria for your final answer: {target.get('description')}\n" | |
| # Add standard completion instructions | |
| formatted_task_message += "you MUST return the actual complete content as the final answer, not a summary.\n\n" | |
| # Add context section if there's additional context available | |
| if context_info: | |
| formatted_task_message += f"This is the context you're working with:\n{context_info}\n" | |
| # Add motivation closing like in the example | |
| formatted_task_message += "Begin! This is VERY important to you, use the tools available and give your best Final Answer, your job depends on it!" | |
| # Add user message at the end | |
| complete_prompt += formatted_task_message | |
| result["reconstructed_prompt"] = self._remove_line_numbers(complete_prompt) | |
| elif relation_type == "USES": | |
| # Agent uses Tool - This represents when an agent uses a tool during task execution | |
| if source["type"] == "Agent" and target["type"] == "Tool": | |
| # An agent would already have its system prompt and tools list | |
| # Here we focus on the actual tool invocation | |
| tool_name = target["name"] | |
| tool_prompt = target.get("raw_prompt", "") | |
| # Extract tool arguments from raw prompt if available | |
| tool_args = {} | |
| if "Tool Arguments:" in tool_prompt: | |
| try: | |
| args_start = tool_prompt.find("Tool Arguments:") + len("Tool Arguments:") | |
| args_end = tool_prompt.find("\n", args_start) if "\n" in tool_prompt[args_start:] else len(tool_prompt) | |
| args_str = tool_prompt[args_start:args_end].strip() | |
| if args_str.startswith("{") and args_str.endswith("}"): | |
| args_str = args_str.replace("'", "\"") # Convert single quotes to double for JSON parsing | |
| tool_args = json.loads(args_str) | |
| except: | |
| pass # If parsing fails, use empty args | |
| # Get the interaction prompt which may contain example usage | |
| interaction = relation.get("interaction_prompt", "") | |
| # For tool usage, show the actual tool invocation format | |
| complete_prompt = f"Agent {source['name']} uses tool: {tool_name}\n" | |
| complete_prompt += f"Tool Definition: {tool_prompt}\n\n" | |
| # Add example invocation if available in interaction prompt | |
| if interaction: | |
| complete_prompt += f"Tool Invocation: {interaction}\n" | |
| else: | |
| # Construct a sample invocation based on tool arguments | |
| args_display = ", ".join([f"{k}=[{v} value]" for k, v in tool_args.items()]) if tool_args else "" | |
| complete_prompt += f"Tool Invocation: {tool_name}({args_display})\n" | |
| # Add context about which tasks typically require this tool | |
| related_tasks = [] | |
| if target_id in self.relations_by_target: | |
| for rel in self.relations_by_target[target_id]: | |
| if rel["type"] == "REQUIRES_TOOL": | |
| task_id = rel["source"] | |
| if task_id in self.entities and self.entities[task_id]["type"] == "Task": | |
| related_tasks.append(self.entities[task_id]["name"]) | |
| # Track dependencies | |
| dependencies["entities"].add(task_id) | |
| dependencies["relations"].add(rel["id"]) | |
| if related_tasks: | |
| complete_prompt += f"\nThis tool is typically used for: {', '.join(related_tasks)}" | |
| result["reconstructed_prompt"] = self._remove_line_numbers(complete_prompt) | |
| elif relation_type == "ASSIGNED_TO": | |
| # Task assigned to Agent - This represents the assignment message | |
| if source["type"] == "Task" and target["type"] == "Agent": | |
| task_prompt = source.get("raw_prompt", "") | |
| agent_prompt = target.get("raw_prompt", "") | |
| interaction = relation.get("interaction_prompt", "") | |
| # This would typically be a system or orchestrator message to the agent | |
| complete_prompt = f"System → {target['name']}: You are assigned the following task:\n" | |
| complete_prompt += f"{task_prompt}\n" | |
| if interaction: | |
| complete_prompt += f"\nSpecific instructions: {interaction}\n" | |
| # Add information about required tools | |
| required_tools_info, req_tools_deps = self.enhance_with_required_tools(source_id) | |
| dependencies["entities"].update(req_tools_deps["entities"]) | |
| dependencies["relations"].update(req_tools_deps["relations"]) | |
| if required_tools_info: | |
| complete_prompt += required_tools_info | |
| # Add sequence context | |
| sequence_info, seq_deps = self.get_task_sequence_info(source_id) | |
| dependencies["entities"].update(seq_deps["entities"]) | |
| dependencies["relations"].update(seq_deps["relations"]) | |
| if sequence_info["previous_task"] or sequence_info["next_task"]: | |
| complete_prompt += "\n:" | |
| if sequence_info["previous_task"]: | |
| complete_prompt += f"\n{sequence_info['previous_task']}" | |
| if sequence_info["next_task"]: | |
| complete_prompt += f"\n{sequence_info['next_task']}" | |
| result["reconstructed_prompt"] = self._remove_line_numbers(complete_prompt) | |
| elif relation_type == "REQUIRES_TOOL" or relation_type == "NEXT": | |
| # These relations don't typically correspond to actual prompts in the execution | |
| # They are metadata that help establish dependencies and flow | |
| # We'll include them for completeness, but mark them as "context relations" | |
| source_type = source["type"] | |
| target_type = target["type"] | |
| source_name = source["name"] | |
| target_name = target["name"] | |
| interaction = relation.get("interaction_prompt", "") | |
| if relation_type == "REQUIRES_TOOL": | |
| context_note = ( | |
| f"Note: This is a dependency relation showing that task '{source_name}' " | |
| f"requires tool '{target_name}'. It doesn't represent an actual prompt " | |
| f"exchange but provides context for task execution." | |
| ) | |
| metadata_prompt = f"METADATA: Task-Tool Dependency\n" | |
| metadata_prompt += f"Task: {source_name}\n" | |
| metadata_prompt += f"Requires tool: {target_name}\n" | |
| if interaction: | |
| metadata_prompt += f"Usage pattern: {interaction}\n" | |
| metadata_prompt += f"\n{context_note}" | |
| elif relation_type == "NEXT": | |
| context_note = ( | |
| f"Note: This is a sequencing relation showing that task '{target_name}' " | |
| f"follows task '{source_name}'. It doesn't represent an actual prompt " | |
| f"exchange but provides context for the execution flow." | |
| ) | |
| metadata_prompt = f"METADATA: Task Sequencing\n" | |
| metadata_prompt += f"Previous task: {source_name}\n" | |
| metadata_prompt += f"Next task: {target_name}\n" | |
| if interaction: | |
| metadata_prompt += f"Transition: {interaction}\n" | |
| metadata_prompt += f"\n{context_note}" | |
| result["reconstructed_prompt"] = metadata_prompt | |
| result["is_context_relation"] = True | |
| # If no specific reconstruction was created, provide a generic one using raw prompts | |
| if "reconstructed_prompt" not in result: | |
| source_prompt = source.get("raw_prompt", "") | |
| target_prompt = target.get("raw_prompt", "") | |
| interaction = relation.get("interaction_prompt", "") | |
| result["reconstructed_prompt"] = f"{source['type']}: {source['name']}\n" | |
| if source_prompt: | |
| result["reconstructed_prompt"] += f"{source_prompt}\n\n" | |
| result["reconstructed_prompt"] += f"{target['type']}: {target['name']}\n" | |
| if target_prompt: | |
| result["reconstructed_prompt"] += f"{target_prompt}\n\n" | |
| if interaction: | |
| result["reconstructed_prompt"] += f"Interaction: {interaction}\n" | |
| # FINAL CLEANUP: Remove any remaining line numbers from the reconstructed prompt | |
| if "reconstructed_prompt" in result: | |
| result["reconstructed_prompt"] = self._remove_line_numbers(result["reconstructed_prompt"]) | |
| # Convert sets to lists for JSON serialization and include in result | |
| result["dependencies"] = { | |
| "entities": list(dependencies["entities"]), | |
| "relations": list(dependencies["relations"]) | |
| } | |
| return result | |
| def _remove_line_numbers(self, content: str) -> str: | |
| """ | |
| Replace line number prefixes with appropriate newlines to restore text structure. | |
| Line numbers like <L42>, <L43> represent where line breaks should be in the original text. | |
| """ | |
| import re | |
| # First handle the case where content is already split by newlines | |
| # Remove line numbers at the start of existing lines | |
| clean_content = re.sub(r'^<L\d+>\s*', '', content, flags=re.MULTILINE) | |
| # Now handle embedded line numbers - these should become line breaks | |
| # Pattern like "text<L42>more text" or "text <L42> more text" should become "text\nmore text" | |
| clean_content = re.sub(r'<L\d+>\s*', '\n', clean_content) | |
| # Handle special cases where line numbers have prefixes | |
| # "=<L471> content" should become "=\ncontent" | |
| clean_content = re.sub(r'=<L\d+>\s*', '=\n', clean_content) | |
| # Clean up multiple consecutive newlines (but keep intentional spacing) | |
| clean_content = re.sub(r'\n\s*\n\s*\n', '\n\n', clean_content) # Max 2 consecutive newlines | |
| # Clean up any trailing/leading whitespace on lines | |
| lines = clean_content.split('\n') | |
| cleaned_lines = [line.strip() for line in lines if line.strip() or not line.strip()] # Keep empty lines that were intentionally empty | |
| # Remove empty lines at start and end, but preserve internal structure | |
| while cleaned_lines and not cleaned_lines[0]: | |
| cleaned_lines.pop(0) | |
| while cleaned_lines and not cleaned_lines[-1]: | |
| cleaned_lines.pop() | |
| clean_content = '\n'.join(cleaned_lines) | |
| return clean_content | |
| def reconstruct_relations(self) -> List[Dict[str, Any]]: | |
| """Reconstruct all relations, mapping each to its reconstructed prompt.""" | |
| reconstructed_relations = [] | |
| # Process all relations regardless of timestamp | |
| for relation_id, relation in self.relations.items(): | |
| relation_type = relation.get("type") | |
| # Skip context relations that don't represent actual prompts | |
| # unless you want to include them for completeness | |
| if relation_type in ["REQUIRES_TOOL", "NEXT"]: | |
| continue | |
| # Get source and target entities | |
| source_id = relation["source"] | |
| target_id = relation["target"] | |
| if source_id in self.entities and target_id in self.entities: | |
| source = self.entities[source_id] | |
| target = self.entities[target_id] | |
| # Reconstruct the prompt for this relation | |
| reconstructed = self.reconstruct_relation_prompt(relation_id) | |
| # Skip context relations in the execution sequence | |
| if reconstructed.get("is_context_relation", False): | |
| continue | |
| # Create a deep copy of the original relation to preserve all fields | |
| relation_entry = copy.deepcopy(relation) | |
| # Add reconstructed prompt information | |
| relation_entry["prompt"] = reconstructed.get("reconstructed_prompt", "Error reconstructing prompt") | |
| # Add dependencies information | |
| relation_entry["dependencies"] = reconstructed.get("dependencies", {"entities": [], "relations": []}) | |
| # Add basic source and target entity information for convenience | |
| relation_entry["source_entity"] = { | |
| "id": source_id, | |
| "name": source["name"], | |
| "type": source["type"] | |
| } | |
| relation_entry["target_entity"] = { | |
| "id": target_id, | |
| "name": target["name"], | |
| "type": target["type"] | |
| } | |
| reconstructed_relations.append(relation_entry) | |
| return reconstructed_relations | |
| # Pure function for reconstructing prompts from knowledge graph data | |
| def reconstruct_prompts_from_knowledge_graph(knowledge_graph: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """ | |
| Pure function to reconstruct prompts from knowledge graph data. | |
| Args: | |
| knowledge_graph: Knowledge graph data with entities and relations | |
| Returns: | |
| List of dictionaries containing reconstructed prompts for each relation | |
| """ | |
| reconstructor = PromptReconstructor(knowledge_graph) | |
| return reconstructor.reconstruct_relations() | |
| def enrich_knowledge_graph_with_prompts(knowledge_graph: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Pure function to enrich a knowledge graph with reconstructed prompts. | |
| Args: | |
| knowledge_graph: Knowledge graph data with entities and relations | |
| Returns: | |
| Enhanced knowledge graph with prompt_reconstructions field containing | |
| the reconstructed prompts for each relation | |
| """ | |
| reconstructor = PromptReconstructor(knowledge_graph) | |
| reconstructed_relations = reconstructor.reconstruct_relations() | |
| # Create enhanced knowledge graph | |
| enhanced_kg = copy.deepcopy(knowledge_graph) | |
| enhanced_kg["prompt_reconstructions"] = reconstructed_relations | |
| return enhanced_kg |