""" Hybrid Knowledge Extraction Method (2-Task Approach) A hybrid approach that combines the efficiency of the unified method with the thoroughness of the original method. Uses 2 tasks: one for entity extraction and relationship analysis combined, and another for knowledge graph validation and enhancement. """ # Import the LiteLLM fix FIRST, before any other imports that might use LiteLLM import os import sys # Add the parent directory to the path to ensure imports work correctly sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) import json import logging import time from datetime import datetime from typing import Any, Dict from crewai import Agent, Crew, Process, Task from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod from evaluation.knowledge_extraction.baselines.unified_method import KnowledgeGraph # Import shared prompt templates from evaluation.knowledge_extraction.utils.prompts import ( ENTITY_EXTRACTION_INSTRUCTION_PROMPT, ENTITY_EXTRACTION_SYSTEM_PROMPT, RELATION_EXTRACTION_INSTRUCTION_PROMPT, RELATION_EXTRACTION_SYSTEM_PROMPT, ) from utils.fix_litellm_stop_param import * # This applies the patches # noqa: F403 # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Set higher log levels for noisy libraries logging.getLogger("openai").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("litellm").setLevel(logging.WARNING) logging.getLogger("chromadb").setLevel(logging.WARNING) # Set default verbosity level verbose_level = 0 # Set environment variables os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini" class HybridKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod): """Hybrid 2-task knowledge extraction method using CrewAI.""" def __init__(self, **kwargs): super().__init__("hybrid_method", **kwargs) self._setup_agents_and_tasks() def _setup_agents_and_tasks(self): """Set up the CrewAI agents and tasks.""" # Create extraction agent (combines entity and relationship extraction) self.extraction_agent = Agent( role="Knowledge Extraction Specialist", goal="Extract comprehensive entities and relationships from agent system data efficiently", backstory=f"""{ENTITY_EXTRACTION_SYSTEM_PROMPT} {RELATION_EXTRACTION_SYSTEM_PROMPT}""", verbose=bool(verbose_level), llm=os.environ["OPENAI_MODEL_NAME"] ) # Create validation and enhancement agent self.validation_agent = Agent( role="Knowledge Graph Validator and Enhancer", goal="Validate, enhance, and structure extracted knowledge into a comprehensive knowledge graph", backstory="""You are a knowledge graph validation and enhancement specialist who ensures the quality, completeness, and coherence of extracted knowledge graphs. You take raw extracted entities and relationships and transform them into polished, well-structured knowledge graphs. Your expertise includes: - Validating entity and relationship consistency - Identifying and filling gaps in knowledge extraction - Ensuring proper connectivity and graph coherence - Creating meaningful system summaries and assessments - Optimizing knowledge graph structure for clarity and usability You serve as the quality assurance layer that transforms good extractions into excellent knowledge graphs.""", verbose=bool(verbose_level), llm=os.environ["OPENAI_MODEL_NAME"] ) # Create extraction task self.extraction_task = Task( description=f""" {ENTITY_EXTRACTION_INSTRUCTION_PROMPT} {RELATION_EXTRACTION_INSTRUCTION_PROMPT} """, agent=self.extraction_agent, expected_output="Structured extraction with entities, relations, and preliminary analysis", ) # Create validation and enhancement task self.validation_task = Task( description=""" Validate, enhance, and structure the extracted knowledge into a comprehensive knowledge graph. Take the extracted entities and relationships from the previous task and: 1. VALIDATION AND ENHANCEMENT: - Verify all entities have proper IDs, types, names, and descriptions - Ensure all relationships use correct predefined types - Check that every entity connects to at least one other entity - Fill any gaps in entity descriptions or relationship mappings - Validate that relationship directions and types are correct 2. CONNECTIVITY OPTIMIZATION: - Ensure no isolated entities (all must be connected) - Verify logical flow from inputs through processing to outputs - Add missing relationships if entities should be connected - Optimize relationship network for clarity and completeness 3. KNOWLEDGE GRAPH CONSTRUCTION: - Create descriptive system name (3-7 words) - Write comprehensive 2-3 sentence system summary explaining purpose, coordination, and value - Include metadata with timestamp, statistics, and processing information - Ensure all components are reachable (no isolated subgraphs) - Validate connectivity: inputs consumed, outputs produced, agents have roles 4. QUALITY ASSURANCE: - Double-check entity uniqueness and proper categorization - Verify relationship consistency and logical flow - Ensure system summary accurately reflects the extracted knowledge - Validate that the knowledge graph tells a coherent story Output a complete, validated KnowledgeGraph object with entities, relations, system_name, system_summary, and metadata. Ensure the knowledge graph is comprehensive, accurate, well-connected, and represents the system effectively. """, agent=self.validation_agent, expected_output="A complete, validated knowledge graph with entities, relations, and metadata", context=[self.extraction_task], output_pydantic=KnowledgeGraph, ) # Create crew self.hybrid_crew = Crew( agents=[self.extraction_agent, self.validation_agent], tasks=[self.extraction_task, self.validation_task], verbose=bool(verbose_level), memory=False, planning=False, process=Process.sequential, ) def process_text(self, text: str) -> Dict[str, Any]: """ Process input text using the hybrid 2-task CrewAI approach. Args: text: Input text to process Returns: Dictionary with kg_data, metadata, success, and optional error """ start_time = time.time() try: logger.info(f"process_text called with text length: {len(text)}") logger.info(f"text first 200 chars: {repr(text[:200])}") logger.info("Starting hybrid crew execution with input_data...") # Run the crew with proper input mechanism result = self.hybrid_crew.kickoff(inputs={"input_data": text}) logger.info(f"Crew execution completed, result type: {type(result)}") processing_time = time.time() - start_time # Extract the knowledge graph from the result if hasattr(result, 'pydantic') and result.pydantic: kg_data = result.pydantic.dict() elif hasattr(result, 'raw'): # Try to parse as JSON try: kg_data = json.loads(result.raw) except: # noqa: E722 kg_data = {"entities": [], "relations": [], "error": "Failed to parse result"} else: kg_data = {"entities": [], "relations": [], "error": "Unknown result format"} # Validate kg_data structure if not isinstance(kg_data, dict): raise ValueError("kg_data is not a dict after parsing") if not ("entities" in kg_data and "relations" in kg_data): raise ValueError("kg_data missing 'entities' or 'relations'") # Add metadata if "metadata" not in kg_data: kg_data["metadata"] = {} kg_data["metadata"]["processing_info"] = { "method": "hybrid_2_task", "processing_time_seconds": processing_time, "processed_at": datetime.now().isoformat(), "agent_count": 2, "task_count": 2, "api_calls": 2 } # Calculate statistics entity_count = len(kg_data.get("entities", [])) relation_count = len(kg_data.get("relations", [])) return { "success": True, "kg_data": kg_data, "metadata": { "approach": "hybrid_2_task", "tasks_executed": 2, "agents_used": 2, "method": self.method_name, "processing_time_seconds": processing_time, "entity_count": entity_count, "relation_count": relation_count, "entities_per_second": entity_count / processing_time if processing_time > 0 else 0, "relations_per_second": relation_count / processing_time if processing_time > 0 else 0, "api_calls": 2 } } except Exception as e: processing_time = time.time() - start_time logger.error(f"Error in hybrid knowledge extraction method: {e}") logger.error(f"Error type: {type(e).__name__}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return { "success": False, "error": str(e), "kg_data": {"entities": [], "relations": []}, "metadata": { "approach": "hybrid_2_task", "tasks_executed": 0, "agents_used": 0, "method": self.method_name, "processing_time_seconds": processing_time, "api_calls": 2 } } def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]: """ Extract knowledge graph from trace data. Args: trace_data: Agent trace data as JSON string Returns: Dictionary with entities and relations """ try: # Debug logging logger.info(f"extract_knowledge_graph called with trace_data type: {type(trace_data)}") if isinstance(trace_data, str): logger.info(f"trace_data length: {len(trace_data)}") logger.info(f"trace_data first 200 chars: {repr(trace_data[:200])}") # Pass the JSON string directly to process_text without re-encoding result = self.process_text(trace_data) # Return just the knowledge graph data if result.get("success", False): return result.get("kg_data", {"entities": [], "relations": []}) else: # Return empty knowledge graph on failure return {"entities": [], "relations": []} except Exception as e: logger.error(f"Error in extract_knowledge_graph: {e}") logger.error(f"trace_data type: {type(trace_data)}") if isinstance(trace_data, str): logger.error(f"trace_data content (first 200 chars): {repr(trace_data[:200])}") return {"entities": [], "relations": []}