""" Multi-Stage Clustering Knowledge Extraction Method Implements a multi-stage clustering approach inspired by KGGen research. This method performs initial extraction followed by iterative clustering of entities and relationships to improve semantic consistency and reduce redundancy. """ # Import the LiteLLM fix FIRST, before any other imports that might use LiteLLM import os import sys # Add the parent directory to the path to ensure imports work correctly sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) import logging import time from datetime import datetime from typing import Any, Dict from crewai import Agent, Crew, Process, Task from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod from evaluation.knowledge_extraction.utils.models import KnowledgeGraph # Import shared prompt templates from evaluation.knowledge_extraction.utils.prompts import ( ENTITY_EXTRACTION_INSTRUCTION_PROMPT, ENTITY_EXTRACTION_SYSTEM_PROMPT, GRAPH_BUILDER_SYSTEM_PROMPT, RELATION_EXTRACTION_INSTRUCTION_PROMPT, RELATION_EXTRACTION_SYSTEM_PROMPT, ) from utils.fix_litellm_stop_param import * # This applies the patches # noqa: F403 # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Set higher log levels for noisy libraries logging.getLogger("openai").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("litellm").setLevel(logging.WARNING) logging.getLogger("chromadb").setLevel(logging.WARNING) # Import models (copied from core) # Set default verbosity level verbose_level = 0 # Set environment variables os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini" class ClusteringKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod): """Multi-stage clustering knowledge extraction method using CrewAI.""" def __init__(self, **kwargs): super().__init__("clustering_method", **kwargs) self._setup_agents_and_tasks() # Test comment for code change detection def _setup_agents_and_tasks(self): """Set up the CrewAI agents and tasks.""" # Create extraction agent (similar to unified approach) self.extraction_agent = Agent( role="Knowledge Graph Extractor", goal="Extract comprehensive entities and relationships from agent system data", backstory=f"{ENTITY_EXTRACTION_SYSTEM_PROMPT}\n\n{RELATION_EXTRACTION_SYSTEM_PROMPT}", verbose=bool(verbose_level), llm=os.environ["OPENAI_MODEL_NAME"], ) # Create clustering agent for entity deduplication self.entity_clustering_agent = Agent( role="Entity Clustering Specialist", goal="Identify and merge duplicate or similar entities to improve graph consistency", backstory="""You are an expert in entity resolution and clustering. You can identify when different entity mentions refer to the same real-world entity, even when they have slight variations in naming, description, or representation. You excel at: - Identifying semantic equivalence between entities - Merging entities with different tenses, plurality, or capitalization - Consolidating entities that represent the same concept - Maintaining entity relationships during clustering You ensure the final entity set is clean, consistent, and free of redundancy.""", verbose=bool(verbose_level), llm=os.environ["OPENAI_MODEL_NAME"], ) # Create relationship clustering agent self.relationship_clustering_agent = Agent( role="Relationship Clustering Specialist", goal="Identify and merge duplicate or similar relationships to improve graph coherence", backstory="""You are an expert in relationship analysis and clustering. You can identify when different relationship expressions refer to the same underlying connection between entities. You excel at: - Identifying semantically equivalent relationships - Merging relationships with different phrasings but same meaning - Consolidating relationships that represent the same connection type - Ensuring relationship consistency across the knowledge graph You maintain the integrity of entity connections while improving relationship clarity.""", verbose=bool(verbose_level), llm=os.environ["OPENAI_MODEL_NAME"], ) # Create validation agent self.validation_agent = Agent( role="Knowledge Graph Validator", goal="Validate and finalize the clustered knowledge graph for quality and completeness", backstory=GRAPH_BUILDER_SYSTEM_PROMPT, verbose=bool(verbose_level), llm=os.environ["OPENAI_MODEL_NAME"], ) # Create extraction task self.extraction_task = Task( description=f""" Extract comprehensive entities and relationships from the provided agent system data. {ENTITY_EXTRACTION_INSTRUCTION_PROMPT} Also extract relationships: {RELATION_EXTRACTION_INSTRUCTION_PROMPT} Output a complete initial knowledge graph with all extracted entities and relationships. Focus on thoroughness and accuracy - clustering will happen in subsequent steps. """, agent=self.extraction_agent, expected_output="A complete initial knowledge graph with comprehensive entities and relationships", output_pydantic=KnowledgeGraph, ) # Create entity clustering task self.entity_clustering_task = Task( description=""" Analyze the extracted entities and identify clusters of entities that represent the same concept. You will receive the knowledge graph from the previous extraction task. Your task is to: 1. ENTITY ANALYSIS - Group similar entities: - Identify entities with same meaning but different expressions - Look for variations in tense, plurality, capitalization - Find entities that represent the same real-world concept - Consider semantic similarity and contextual equivalence 2. CLUSTERING DECISIONS - For each cluster: - Select the most representative entity as the canonical form - Merge descriptions and properties from all cluster members - Preserve all relevant information from clustered entities - Maintain entity type consistency 3. RELATIONSHIP UPDATES - Update relationships: - Replace clustered entity IDs with canonical entity IDs - Ensure all relationships remain valid after clustering - Remove duplicate relationships that may result from clustering Output an updated knowledge graph with clustered entities and updated relationships. Ensure no information is lost during the clustering process. """, agent=self.entity_clustering_agent, expected_output="Knowledge graph with clustered entities and updated relationships", context=[self.extraction_task], output_pydantic=KnowledgeGraph, ) # Create relationship clustering task self.relationship_clustering_task = Task( description=""" Analyze the relationships and identify clusters of relationships that represent the same connection type. You will receive the knowledge graph from the previous entity clustering task. Your task is to: 1. RELATIONSHIP ANALYSIS - Group similar relationships: - Identify relationships with same meaning but different expressions - Look for variations in phrasing, tense, or description - Find relationships that represent the same connection type - Consider semantic equivalence between relationship descriptions 2. CLUSTERING DECISIONS - For each relationship cluster: - Select the most clear and representative relationship type - Merge descriptions from all cluster members - Preserve the most informative relationship description - Maintain relationship directionality and constraints 3. GRAPH OPTIMIZATION - Optimize the relationship structure: - Remove redundant relationships between same entity pairs - Ensure relationship consistency across the graph - Maintain logical coherence in relationship types Output an optimized knowledge graph with clustered relationships and improved consistency. """, agent=self.relationship_clustering_agent, expected_output="Knowledge graph with clustered relationships and improved consistency", context=[self.entity_clustering_task], output_pydantic=KnowledgeGraph, ) # Create validation task self.validation_task = Task( description=""" Validate and finalize the clustered knowledge graph for quality and completeness. You will receive the knowledge graph from the previous relationship clustering task. Your task is to: 1. QUALITY VALIDATION - Check graph quality: - Ensure all entities are properly connected - Validate relationship consistency and logic - Check for orphaned entities or broken connections - Verify entity-relationship type compatibility 2. COMPLETENESS CHECK - Ensure completeness: - Verify all important entities are captured - Check that key relationships are represented - Ensure system functionality is properly modeled - Validate that the graph tells a complete story 3. FINALIZATION - Create final knowledge graph: - Generate descriptive system name (3-7 words) - Write comprehensive 2-3 sentence system summary - Include metadata with processing statistics - Ensure all components are reachable and connected Output the final, validated knowledge graph ready for use. """, agent=self.validation_agent, expected_output="Final validated knowledge graph with system summary and metadata", context=[self.relationship_clustering_task], output_pydantic=KnowledgeGraph, ) # Create crew self.clustering_crew = Crew( agents=[self.extraction_agent, self.entity_clustering_agent, self.relationship_clustering_agent, self.validation_agent], tasks=[self.extraction_task, self.entity_clustering_task, self.relationship_clustering_task, self.validation_task], verbose=bool(verbose_level), memory=False, planning=False, process=Process.sequential, ) def _calculate_token_cost(self, total_tokens: int, prompt_tokens: int, completion_tokens: int, model_name: str) -> float: """ Calculate token cost based on model pricing. Args: total_tokens: Total number of tokens prompt_tokens: Number of input/prompt tokens completion_tokens: Number of output/completion tokens model_name: Name of the model used Returns: Total cost in USD """ # Model pricing per 1k tokens (as of 2024) pricing = { "gpt-4o-mini": {"input": 0.00015, "output": 0.0006}, "gpt-4o": {"input": 0.005, "output": 0.015}, "gpt-4": {"input": 0.03, "output": 0.06}, "gpt-4-turbo": {"input": 0.01, "output": 0.03}, "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002}, "claude-3-opus": {"input": 0.015, "output": 0.075}, "claude-3-sonnet": {"input": 0.003, "output": 0.015}, "claude-3-haiku": {"input": 0.00025, "output": 0.00125}, "claude-3.5-sonnet": {"input": 0.003, "output": 0.015}, "claude-3.5-haiku": {"input": 0.0008, "output": 0.004}, } # Normalize model name to match pricing keys model_key = model_name.lower() if "gpt-5-mini" in model_key: model_key = "gpt-5-mini" elif "gpt-4o-mini" in model_key: model_key = "gpt-4o-mini" elif "gpt-4o" in model_key: model_key = "gpt-4o" elif "gpt-4-turbo" in model_key or "gpt-4-1106" in model_key: model_key = "gpt-4-turbo" elif "gpt-4" in model_key: model_key = "gpt-4" elif "gpt-3.5" in model_key: model_key = "gpt-3.5-turbo" elif "claude-3.5-sonnet" in model_key: model_key = "claude-3.5-sonnet" elif "claude-3.5-haiku" in model_key: model_key = "claude-3.5-haiku" elif "claude-3-opus" in model_key: model_key = "claude-3-opus" elif "claude-3-sonnet" in model_key: model_key = "claude-3-sonnet" elif "claude-3-haiku" in model_key: model_key = "claude-3-haiku" if model_key not in pricing: # Default to gpt-5-mini pricing if model not found model_key = "gpt-5-mini" rates = pricing[model_key] # Calculate cost: (tokens / 1000) * rate_per_1k_tokens input_cost = (prompt_tokens / 1000) * rates["input"] output_cost = (completion_tokens / 1000) * rates["output"] return input_cost + output_cost def process_text(self, text: str) -> Dict[str, Any]: """ Process input text using the multi-stage clustering approach. Args: text: Input text to process Returns: Dictionary with kg_data, metadata, success, and optional error """ start_time = time.time() try: logger.info(f"process_text called with text length: {len(text)}") logger.info(f"text first 200 chars: {repr(text[:200])}") logger.info("Starting clustering crew execution with input_data...") # Run the crew with proper input mechanism result = self.clustering_crew.kickoff(inputs={"input_data": text}) logger.info(f"Clustering crew execution completed, result type: {type(result)}") processing_time = time.time() - start_time # Extract token usage from crew token_usage = { "total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0, "total_cost_usd": 0.0, "model_used": "gpt-4o-mini", "usage_available": False, } try: if hasattr(self.clustering_crew, "usage_metrics") and self.clustering_crew.usage_metrics: usage_metrics = self.clustering_crew.usage_metrics logger.info(f"Found usage metrics: {usage_metrics}") if isinstance(usage_metrics, dict): token_usage.update( { "total_tokens": usage_metrics.get("total_tokens", 0), "prompt_tokens": usage_metrics.get("prompt_tokens", 0), "completion_tokens": usage_metrics.get("completion_tokens", 0), "total_cost_usd": float(usage_metrics.get("total_cost", 0.0)), "model_used": usage_metrics.get("model", "gpt-4o-mini"), "usage_available": True, } ) # If cost is 0.0, calculate it manually if token_usage["total_cost_usd"] == 0.0 and token_usage["total_tokens"] > 0: calculated_cost = self._calculate_token_cost( token_usage["total_tokens"], token_usage["prompt_tokens"], token_usage["completion_tokens"], token_usage["model_used"] ) token_usage["total_cost_usd"] = calculated_cost logger.info( f"💰 Calculated cost: ${calculated_cost:.4f} for {token_usage['total_tokens']} tokens ({token_usage['model_used']})" ) else: # Handle object-style usage metrics token_usage.update( { "total_tokens": getattr(usage_metrics, "total_tokens", 0), "prompt_tokens": getattr(usage_metrics, "prompt_tokens", 0), "completion_tokens": getattr(usage_metrics, "completion_tokens", 0), "total_cost_usd": float(getattr(usage_metrics, "total_cost", 0.0)), "model_used": getattr(usage_metrics, "model", "gpt-4o-mini"), "usage_available": True, } ) # If cost is 0.0, calculate it manually if token_usage["total_cost_usd"] == 0.0 and token_usage["total_tokens"] > 0: calculated_cost = self._calculate_token_cost( token_usage["total_tokens"], token_usage["prompt_tokens"], token_usage["completion_tokens"], token_usage["model_used"] ) token_usage["total_cost_usd"] = calculated_cost logger.info( f"💰 Calculated cost: ${calculated_cost:.4f} for {token_usage['total_tokens']} tokens ({token_usage['model_used']})" ) else: logger.warning("No usage metrics found in crew") except Exception as e: logger.error(f"Error extracting token usage: {e}") # Extract the knowledge graph from the result if hasattr(result, "pydantic") and result.pydantic: kg_data = result.pydantic.dict() logger.info( f"Successfully extracted KG with {len(kg_data.get('entities', []))} entities and {len(kg_data.get('relations', []))} relations" ) # Add processing metadata if "metadata" not in kg_data: kg_data["metadata"] = {} kg_data["metadata"].update( { "timestamp": datetime.now().isoformat(), "processing_info": { "method": "multi_stage_clustering", "processing_time_seconds": processing_time, "processed_at": datetime.now().isoformat(), "agent_count": 4, "task_count": 4, "stages": ["extraction", "entity_clustering", "relationship_clustering", "validation"], }, "token_usage": token_usage, } ) return {"kg_data": kg_data, "metadata": kg_data["metadata"], "token_usage": token_usage, "success": True} else: # Handle case where result doesn't have pydantic attribute logger.warning(f"Result doesn't have pydantic attribute, result type: {type(result)}") if hasattr(result, "raw"): logger.info(f"Raw result: {result.raw[:500]}...") return { "kg_data": None, "metadata": { "timestamp": datetime.now().isoformat(), "processing_time_seconds": processing_time, "method": "multi_stage_clustering", "token_usage": token_usage, }, "token_usage": token_usage, "success": False, "error": f"Failed to extract pydantic result from crew output: {type(result)}", } except Exception as e: processing_time = time.time() - start_time logger.error(f"Error in clustering method processing: {e}") # Try to extract token usage even on error token_usage = { "total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0, "total_cost_usd": 0.0, "model_used": "gpt-4o-mini", "usage_available": False, } try: if hasattr(self.clustering_crew, "usage_metrics") and self.clustering_crew.usage_metrics: usage_metrics = self.clustering_crew.usage_metrics if isinstance(usage_metrics, dict): token_usage.update( { "total_tokens": usage_metrics.get("total_tokens", 0), "prompt_tokens": usage_metrics.get("prompt_tokens", 0), "completion_tokens": usage_metrics.get("completion_tokens", 0), "total_cost_usd": float(usage_metrics.get("total_cost", 0.0)), "model_used": usage_metrics.get("model", "gpt-4o-mini"), "usage_available": True, } ) # If cost is 0.0, calculate it manually if token_usage["total_cost_usd"] == 0.0 and token_usage["total_tokens"] > 0: calculated_cost = self._calculate_token_cost( token_usage["total_tokens"], token_usage["prompt_tokens"], token_usage["completion_tokens"], token_usage["model_used"] ) token_usage["total_cost_usd"] = calculated_cost logger.info( f"💰 Calculated cost: ${calculated_cost:.4f} for {token_usage['total_tokens']} tokens ({token_usage['model_used']})" ) else: # Handle object-style usage metrics token_usage.update( { "total_tokens": getattr(usage_metrics, "total_tokens", 0), "prompt_tokens": getattr(usage_metrics, "prompt_tokens", 0), "completion_tokens": getattr(usage_metrics, "completion_tokens", 0), "total_cost_usd": float(getattr(usage_metrics, "total_cost", 0.0)), "model_used": getattr(usage_metrics, "model", "gpt-4o-mini"), "usage_available": True, } ) # If cost is 0.0, calculate it manually if token_usage["total_cost_usd"] == 0.0 and token_usage["total_tokens"] > 0: calculated_cost = self._calculate_token_cost( token_usage["total_tokens"], token_usage["prompt_tokens"], token_usage["completion_tokens"], token_usage["model_used"] ) token_usage["total_cost_usd"] = calculated_cost logger.info( f"💰 Calculated cost: ${calculated_cost:.4f} for {token_usage['total_tokens']} tokens ({token_usage['model_used']})" ) except Exception as e: logger.error(f"Error extracting token usage: {e}") pass return { "kg_data": None, "metadata": { "timestamp": datetime.now().isoformat(), "processing_time_seconds": processing_time, "method": "multi_stage_clustering", "token_usage": token_usage, }, "token_usage": token_usage, "success": False, "error": str(e), } def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]: """ Extract knowledge graph from trace data using multi-stage clustering. Args: trace_data: Input trace data as string Returns: Dictionary containing the extracted knowledge graph """ logger.info(f"extract_knowledge_graph called with trace_data type: {type(trace_data)}") logger.info(f"trace_data length: {len(trace_data)}") logger.info(f"trace_data first 200 chars: {repr(trace_data[:200])}") # Process the text using our clustering approach result = self.process_text(trace_data) if result["success"] and result["kg_data"]: logger.info("Successfully processed trace data") return result["kg_data"] else: logger.error(f"Failed to process trace data: {result.get('error', 'Unknown error')}") # Return a minimal structure to avoid breaking the evaluation return { "entities": [], "relations": [], "system_name": "Failed Extraction", "system_summary": "Knowledge graph extraction failed", "metadata": result.get("metadata", {}), }