Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| RAG-based Prompt Reconstructor for Agent Monitoring | |
| This module uses Retrieval-Augmented Generation (RAG) to reconstruct prompts from knowledge graphs. | |
| It leverages CrewAI's RAG capabilities to intelligently search through trace content and reconstruct | |
| the actual prompts that would have been sent to LLMs during system execution. | |
| """ | |
| import json | |
| import logging | |
| import tempfile | |
| import os | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from datetime import datetime | |
| import copy | |
| from crewai import Agent, Task, Crew, LLM | |
| from crewai_tools import RagTool | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| logger = logging.getLogger(__name__) | |
class RagPromptReconstructor:
    """
    RAG-based prompt reconstructor that uses CrewAI to intelligently reconstruct
    prompts by searching through vectorized trace content.

    Attributes:
        kg: The knowledge graph dict passed to the constructor.
        original_trace: The raw trace text that is loaded into the RAG tool.
        entities: Entity dicts keyed by their "id".
        relations: Relation dicts keyed by their "id".
        relations_by_source: Relation dicts grouped by their "source" entity id.
        relations_by_target: Relation dicts grouped by their "target" entity id.
        llm: The LLM instance shared by both agents.
        rag_tool: The RagTool loaded with the trace content.
        query_agent: Agent used to generate search queries.
        reconstruction_agent: Agent used to reconstruct prompts.
    """

    # Relation types for which reconstruct_relations() does not reconstruct a prompt.
    _SKIPPED_RELATION_TYPES = ("REQUIRES_TOOL", "NEXT")

    def __init__(self, knowledge_graph: Dict[str, Any], original_trace: str, llm_config: Optional[Dict] = None):
        """
        Initialize the RAG-based prompt reconstructor.

        Args:
            knowledge_graph: Dict that must contain 'entities' and 'relations'.
            original_trace: Non-blank trace text to index in the RAG tool.
            llm_config: Optional keyword arguments forwarded to ``LLM(...)``.

        Raises:
            ValueError: If the knowledge graph is empty / lacks the required
                keys, or if the trace is empty or whitespace only.
            RuntimeError: If the RAG tool cannot be initialized.
        """
        if not knowledge_graph or 'entities' not in knowledge_graph or 'relations' not in knowledge_graph:
            raise ValueError("Invalid knowledge graph data - must contain 'entities' and 'relations'")
        if not original_trace or not original_trace.strip():
            raise ValueError("Original trace content is required for RAG reconstruction")

        self.kg = knowledge_graph
        self.original_trace = original_trace

        # Create lookup dictionaries
        self.entities = {entity["id"]: entity for entity in self.kg["entities"]}
        self.relations = {}
        self.relations_by_source = {}
        self.relations_by_target = {}

        # Organize relations for lookup by id, by source and by target
        for relation in self.kg["relations"]:
            self.relations[relation["id"]] = relation
            self.relations_by_source.setdefault(relation["source"], []).append(relation)
            self.relations_by_target.setdefault(relation["target"], []).append(relation)

        # Initialize components
        self.llm = self._init_llm(llm_config)
        self.rag_tool = self._init_rag_tool()
        self.query_agent = self._create_query_agent()
        self.reconstruction_agent = self._create_reconstruction_agent()

        logger.info(
            "Initialized RagPromptReconstructor with %d entities and %d relations",
            len(self.entities),
            len(self.relations),
        )

    def _init_llm(self, llm_config: Optional[Dict]) -> "LLM":
        """Initialize LLM for CrewAI agents.

        Uses ``llm_config`` as keyword arguments when it is non-empty,
        otherwise falls back to the built-in default model settings.
        """
        if llm_config:
            return LLM(**llm_config)
        return LLM(
            model="gpt-5-mini",
            temperature=0.1,
        )

    def _init_rag_tool(self) -> "RagTool":
        """Initialize RAG tool with trace content.

        First tries to add the trace as in-memory text. If that raises, the
        trace is written to a temporary ``.txt`` file and added as a text
        file instead. The temporary file is always removed afterwards.

        Raises:
            RuntimeError: If both approaches fail.
        """
        try:
            # Add content directly as text instead of file;
            # this avoids file type detection issues.
            rag_tool = RagTool()
            rag_tool.add(source=self.original_trace, data_type="text")
            logger.info("Successfully initialized RAG tool with trace content")
            return rag_tool
        except Exception as exc:
            # Keep a reference: the name bound by "except ... as" is unbound
            # when the except clause ends.
            primary_error = exc
            logger.error("Failed to initialize RAG tool: %s", primary_error)

        # Alternative: write the trace to a temporary file and add it as a text file.
        tmp_file_path = None
        try:
            rag_tool = RagTool()
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.txt', delete=False, encoding='utf-8'
            ) as tmp_file:
                # Record the path before writing so cleanup also runs if write() fails.
                tmp_file_path = tmp_file.name
                tmp_file.write(self.original_trace)
            rag_tool.add(source=tmp_file_path, data_type="text_file")
            logger.info("Successfully initialized RAG tool with alternative method")
            return rag_tool
        except Exception as fallback_error:
            logger.error("Alternative RAG tool initialization also failed: %s", fallback_error)
            raise RuntimeError(
                f"RAG tool initialization failed: {primary_error}. Alternative also failed: {fallback_error}"
            ) from fallback_error
        finally:
            # delete=False means nothing else removes the file; clean up on
            # success and on failure alike.
            if tmp_file_path is not None:
                try:
                    os.unlink(tmp_file_path)
                except OSError as cleanup_error:
                    logger.warning("Could not remove temporary trace file %s: %s", tmp_file_path, cleanup_error)

    def _create_query_agent(self) -> "Agent":
        """Create agent specialized in generating semantic search queries."""
        return Agent(
            role="Query Generation Specialist",
            goal="Generate precise search queries to find relevant trace content for prompt reconstruction",
            backstory="""You are an expert at understanding conversation flows and generating
            semantic search queries. Your job is to analyze relationships between entities
            and create targeted queries that will retrieve the exact trace content needed
            to reconstruct original prompts.""",
            tools=[self.rag_tool],
            llm=self.llm,
            verbose=False,
        )

    def _create_reconstruction_agent(self) -> "Agent":
        """Create agent specialized in reconstructing prompts from retrieved content."""
        return Agent(
            role="Prompt Reconstruction Expert",
            goal="Reconstruct natural, accurate prompts from retrieved trace content",
            backstory="""You are an expert at understanding how AI systems communicate
            and reconstructing the exact prompts that would be sent between components.
            You can identify user inputs, agent responses, system prompts, tool calls,
            and conversation context.""",
            tools=[self.rag_tool],
            llm=self.llm,
            verbose=False,
        )

    @staticmethod
    def _parse_query_list(raw_text: str) -> List[str]:
        """Extract a list of query strings from raw model output.

        Takes the text between the first '[' and the last ']' (so surrounding
        prose or Markdown code fences are tolerated) and parses it as JSON.

        Returns:
            The non-blank items as stripped strings, or an empty list when no
            JSON list can be parsed.
        """
        text = (raw_text or "").strip()
        start = text.find("[")
        end = text.rfind("]")
        if start == -1 or end <= start:
            return []
        try:
            parsed = json.loads(text[start:end + 1])
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return []
        if not isinstance(parsed, list):
            return []
        cleaned = (str(item).strip() for item in parsed)
        return [query for query in cleaned if query]

    def _generate_search_queries(self, relation: Dict[str, Any], source_entity: Dict[str, Any],
                                 target_entity: Dict[str, Any]) -> List[str]:
        """Generate semantic search queries for a specific relationship.

        Asks the query agent for a JSON list of queries. Falls back to
        ``_fallback_queries`` when the crew run raises or when no usable
        query list can be extracted from its output.

        Args:
            relation: Relation dict; "type" is required, "interaction_prompt" optional.
            source_entity: Entity dict with "name" and "type".
            target_entity: Entity dict with "name" and "type".

        Returns:
            A non-empty list of query strings.
        """
        context = {
            "relation_type": relation["type"],
            "source_name": source_entity["name"],
            "source_type": source_entity["type"],
            "target_name": target_entity["name"],
            "target_type": target_entity["type"],
            "interaction_prompt": relation.get("interaction_prompt", ""),
        }

        # Define task for query generation
        query_task = Task(
            description=f"""
            Generate 3-5 semantic search queries to find trace content.
            Relationship: {context['relation_type']}
            Source: {context['source_name']} ({context['source_type']})
            Target: {context['target_name']} ({context['target_type']})
            Return as JSON list of strings.
            """,
            agent=self.query_agent,
            expected_output="JSON list of search query strings"
        )

        try:
            crew = Crew(agents=[self.query_agent], tasks=[query_task], verbose=False)
            result = crew.kickoff()
            if isinstance(result, list):
                queries = [str(q) for q in result]
            else:
                # kickoff() may return a result object rather than a plain
                # string; str() yields its text either way.
                queries = self._parse_query_list(str(result))
        except Exception as e:
            logger.warning("Query generation failed: %s", e)
            return self._fallback_queries(context)

        return queries or self._fallback_queries(context)

    def _fallback_queries(self, context: Dict[str, Any]) -> List[str]:
        """Generate fallback queries from the entity names and relation type in ``context``."""
        return [
            f"{context['source_name']} {context['target_name']}",
            f"{context['relation_type'].lower()} {context['target_name']}",
            f"interaction between {context['source_name']} and {context['target_name']}"
        ]

    def _retrieve_and_reconstruct(self, relation: Dict[str, Any], source_entity: Dict[str, Any],
                                  target_entity: Dict[str, Any], queries: List[str]) -> str:
        """Retrieve relevant content and reconstruct the prompt.

        Runs the reconstruction agent with the given search queries. When the
        crew run raises, or produces only whitespace, the result of
        ``_fallback_reconstruction`` is returned instead.

        Returns:
            The reconstructed prompt text, stripped of surrounding whitespace.
        """
        context = {
            "relation_type": relation["type"],
            "source": source_entity,
            "target": target_entity,
            "interaction_prompt": relation.get("interaction_prompt", ""),
            "queries": queries
        }

        # Define reconstruction task
        reconstruction_task = Task(
            description=f"""
            Use the RAG tool to search for trace content and reconstruct the original prompt.
            Relationship: {context['relation_type']}
            Source: {context['source']['name']} ({context['source']['type']})
            Target: {context['target']['name']} ({context['target']['type']})
            Search Queries: {', '.join(queries)}
            Use the RAG tool to search and reconstruct the exact prompt.
            Format as natural conversation. Remove line numbers or artifacts.
            Return ONLY the reconstructed prompt content.
            """,
            agent=self.reconstruction_agent,
            expected_output="The reconstructed prompt as it would appear in the actual system"
        )

        try:
            crew = Crew(agents=[self.reconstruction_agent], tasks=[reconstruction_task], verbose=False)
            reconstructed = str(crew.kickoff()).strip()
        except Exception as e:
            logger.error("Prompt reconstruction failed: %s", e)
            return self._fallback_reconstruction(context)

        # An empty answer carries no information; use the graph-derived fallback.
        return reconstructed or self._fallback_reconstruction(context)

    def _fallback_reconstruction(self, context: Dict[str, Any]) -> str:
        """Generate fallback reconstruction when agent-based reconstruction fails.

        Builds the text from the source entity's "raw_prompt" and the
        relation's interaction prompt only. Missing, ``None`` or empty values
        are treated as absent rather than rendered literally.
        """
        source = context["source"]
        target = context["target"]
        relation_type = context["relation_type"]
        interaction = context.get("interaction_prompt") or ""

        if relation_type == "PERFORMS" and source["type"] == "Input" and target["type"] == "Agent":
            # User input handed to an agent: prefer the raw prompt, else the interaction text.
            user_content = source.get("raw_prompt") or interaction
            return f"User: {user_content}"

        # Generic fallback
        source_content = source.get("raw_prompt") or ""
        result = f"{source['name']}: {source_content}"
        if interaction:
            result += f"\nInteraction: {interaction}"
        return result.strip()

    def reconstruct_relation_prompt(self, relation_id: str) -> Dict[str, Any]:
        """Reconstruct the actual prompt for a specific relation using RAG.

        Args:
            relation_id: Id of a relation in the knowledge graph.

        Returns:
            A dict with the relation id/type, source and target summaries, the
            reconstructed prompt, the search queries used and the dependency
            ids. If the relation or either of its endpoint entities is
            unknown, a dict with a single "error" key is returned instead.
        """
        relation = self.relations.get(relation_id)
        if relation is None:
            return {"error": f"Relation {relation_id} not found in knowledge graph"}

        source_id = relation["source"]
        target_id = relation["target"]
        source_entity = self.entities.get(source_id)
        target_entity = self.entities.get(target_id)
        if source_entity is None or target_entity is None:
            return {"error": f"Source or target entity for relation {relation_id} not found"}

        # Generate queries and reconstruct
        queries = self._generate_search_queries(relation, source_entity, target_entity)
        reconstructed_prompt = self._retrieve_and_reconstruct(relation, source_entity, target_entity, queries)

        return {
            "relation_id": relation_id,
            "relation_type": relation["type"],
            "source": {"id": source_id, "name": source_entity["name"], "type": source_entity["type"]},
            "target": {"id": target_id, "name": target_entity["name"], "type": target_entity["type"]},
            "reconstructed_prompt": reconstructed_prompt,
            "reconstruction_method": "rag_based",
            "search_queries_used": queries,
            "dependencies": {
                # dict.fromkeys de-duplicates self-loops while keeping a stable order.
                "entities": list(dict.fromkeys([source_id, target_id])),
                "relations": [relation_id],
            },
        }

    def _build_relation_entry(self, relation_id: str, relation: Dict[str, Any],
                              method_label: str) -> Optional[Dict[str, Any]]:
        """Reconstruct one relation and return an enriched deep copy of it.

        Returns:
            A copy of ``relation`` with "prompt", "reconstruction_method" and
            "dependencies" added, or ``None`` (after logging a warning) when
            reconstruction reported an error.
        """
        reconstructed = self.reconstruct_relation_prompt(relation_id)
        if "error" in reconstructed:
            logger.warning(
                "Failed to reconstruct relation %s: %s",
                relation_id,
                reconstructed.get('error', 'Unknown error'),
            )
            return None

        # Deep copy so the knowledge graph's own relation dict is left untouched.
        relation_entry = copy.deepcopy(relation)
        relation_entry["prompt"] = reconstructed.get("reconstructed_prompt", "")
        relation_entry["reconstruction_method"] = method_label
        relation_entry["dependencies"] = reconstructed.get("dependencies", {"entities": [], "relations": []})
        return relation_entry

    def reconstruct_relations(self, parallel: bool = True, max_workers: int = 4) -> List[Dict[str, Any]]:
        """
        Reconstruct all relations using RAG-based approach with optional parallel processing.

        Relations of type REQUIRES_TOOL or NEXT, and relations whose source or
        target entity is missing, are skipped. A relation whose reconstruction
        fails is logged and omitted; it does not abort the remaining ones.
        Results are returned in the order the relations appear in the graph,
        in both sequential and parallel mode.

        Args:
            parallel: Whether to process relations in parallel (default: True)
            max_workers: Maximum number of parallel workers (default: 4);
                values below 1 are treated as 1.

        Returns:
            List of dictionaries containing reconstructed prompts for each relation
        """
        # Filter valid relations first
        valid_relations = [
            (relation_id, relation)
            for relation_id, relation in self.relations.items()
            if relation.get("type") not in self._SKIPPED_RELATION_TYPES
            and relation["source"] in self.entities
            and relation["target"] in self.entities
        ]
        if not valid_relations:
            return []

        if not parallel or len(valid_relations) <= 1:
            # Sequential processing
            reconstructed_relations = []
            for relation_id, relation in valid_relations:
                try:
                    entry = self._build_relation_entry(relation_id, relation, "rag_based")
                except Exception as e:
                    logger.error("Failed to reconstruct relation %s: %s", relation_id, e)
                    continue
                if entry is not None:
                    reconstructed_relations.append(entry)
            return reconstructed_relations

        # Parallel processing. One slot per relation keeps the output in input
        # order regardless of which future completes first.
        # NOTE(review): worker threads share the same agents and RAG tool;
        # whether those objects are safe for concurrent use is not established
        # in this file - confirm.
        ordered_entries: List[Optional[Dict[str, Any]]] = [None] * len(valid_relations)
        worker_count = max(1, min(max_workers, len(valid_relations)))
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            # Submit all reconstruction tasks
            future_to_slot = {
                executor.submit(self._build_relation_entry, relation_id, relation, "rag_based_parallel"):
                    (slot, relation_id)
                for slot, (relation_id, relation) in enumerate(valid_relations)
            }
            # Collect results as they complete
            for future in as_completed(future_to_slot):
                slot, relation_id = future_to_slot[future]
                try:
                    entry = future.result()
                except Exception as e:
                    logger.error("Failed to reconstruct relation %s in parallel: %s", relation_id, e)
                    continue
                if entry is not None:
                    ordered_entries[slot] = entry
                    logger.info("Completed parallel reconstruction for relation %s", relation_id)

        return [entry for entry in ordered_entries if entry is not None]
| # Pure functions for external API compatibility | |
def reconstruct_prompts_from_knowledge_graph_rag(knowledge_graph: Dict[str, Any],
                                                 original_trace: str,
                                                 llm_config: Optional[Dict] = None,
                                                 parallel: bool = True,
                                                 max_workers: int = 4) -> List[Dict[str, Any]]:
    """
    Reconstruct prompts for a knowledge graph's relations using the RAG approach.

    Convenience wrapper: builds a RagPromptReconstructor from the arguments and
    returns the result of its reconstruct_relations() call.

    Args:
        knowledge_graph: Knowledge graph data with entities and relations
        original_trace: Original trace content for RAG vectorization
        llm_config: Optional LLM configuration for CrewAI agents
        parallel: Whether to process relations in parallel (default: True)
        max_workers: Maximum number of parallel workers (default: 4)

    Returns:
        List of dictionaries containing reconstructed prompts for each relation
    """
    return RagPromptReconstructor(
        knowledge_graph,
        original_trace,
        llm_config,
    ).reconstruct_relations(parallel=parallel, max_workers=max_workers)
def enrich_knowledge_graph_with_prompts_rag(knowledge_graph: Dict[str, Any],
                                            original_trace: str,
                                            llm_config: Optional[Dict] = None,
                                            parallel: bool = True,
                                            max_workers: int = 4) -> Dict[str, Any]:
    """
    Return a copy of the knowledge graph enriched with RAG-reconstructed prompts.

    The result is a deep copy of ``knowledge_graph``; the two added keys are
    written to that copy, not to the argument.

    Args:
        knowledge_graph: Knowledge graph data with entities and relations
        original_trace: Original trace content for RAG vectorization
        llm_config: Optional LLM configuration for CrewAI agents
        parallel: Whether to process relations in parallel (default: True)
        max_workers: Maximum number of parallel workers (default: 4)

    Returns:
        Deep copy of the knowledge graph with two extra keys:
        "prompt_reconstructions" (the reconstructed relation entries) and
        "reconstruction_metadata" (method, timestamp, number of entries,
        and length of the original trace).
    """
    rag_reconstructor = RagPromptReconstructor(knowledge_graph, original_trace, llm_config)
    prompt_entries = rag_reconstructor.reconstruct_relations(
        parallel=parallel,
        max_workers=max_workers,
    )

    # Work on a copy so the enrichment keys are not added to the caller's dict.
    result_graph = copy.deepcopy(knowledge_graph)
    result_graph["prompt_reconstructions"] = prompt_entries

    metadata = {
        "method": "rag_based",
        "reconstructed_at": datetime.now().isoformat(),
        "total_relations_processed": len(prompt_entries),
        "original_trace_length": len(original_trace)
    }
    result_graph["reconstruction_metadata"] = metadata
    return result_graph