"""
Original Knowledge Extraction Method (3-Task Approach)

Copied from core/agent_monitoring.py and adapted for evaluation framework.
Uses the original 3-task CrewAI approach with separate agents for entity extraction,
relationship analysis, and knowledge graph building.
"""

# Import the LiteLLM fix FIRST, before any other imports that might use LiteLLM
# NOTE(review): the fix module is actually imported *after* crewai below
# (presumably moved there by an import sorter). Confirm the patch still takes
# effect in that order before relying on the statement above.
import os
import sys

# Add the parent directory to the path to ensure imports work correctly
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))

import json
import logging
from typing import Any, Dict, Optional

from crewai import Agent, Crew, Process, Task
from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod
from evaluation.knowledge_extraction.utils.models import Entity, KnowledgeGraph, Relation

# Import shared prompt templates
from evaluation.knowledge_extraction.utils.prompts import (
    ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
    ENTITY_EXTRACTION_SYSTEM_PROMPT,
    GRAPH_BUILDER_INSTRUCTION_PROMPT,
    GRAPH_BUILDER_SYSTEM_PROMPT,
    RELATION_EXTRACTION_INSTRUCTION_PROMPT,
    RELATION_EXTRACTION_SYSTEM_PROMPT,
)
from utils.fix_litellm_stop_param import *  # This applies the patches # noqa: F403

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set higher log levels for noisy libraries
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("litellm").setLevel(logging.WARNING)
logging.getLogger("chromadb").setLevel(logging.WARNING)

# Set default verbosity level
verbose_level = 0

# Set environment variables
os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini"


class OriginalKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod):
    """Original 3-task knowledge extraction method using CrewAI.

    Three agents (entity extractor, relationship analyzer, knowledge graph
    builder) each own one task; the tasks are run sequentially by a single
    crew built once at construction time.
    """

    def __init__(self, **kwargs):
        """Register the method as ``"original_method"`` and build the crew.

        Args:
            **kwargs: Forwarded unchanged to ``BaseKnowledgeExtractionMethod``.
        """
        super().__init__("original_method", **kwargs)
        self._setup_agents_and_tasks()

    def _setup_agents_and_tasks(self):
        """Set up the CrewAI agents and tasks."""
        # Read once so all three agents are guaranteed to use the same model.
        model_name = os.environ["OPENAI_MODEL_NAME"]
        verbose = bool(verbose_level)

        # Create agents
        self.entity_extractor_agent = Agent(
            role="Entity Extractor",
            goal="Identify and categorize entities from agent system data sources with clear descriptions",
            backstory=ENTITY_EXTRACTION_SYSTEM_PROMPT,
            verbose=verbose,
            llm=model_name,
        )

        self.relationship_analyzer_agent = Agent(
            role="Relationship Analyzer",
            goal="Discover standard relationships between entities in the system using only predefined relationship types",
            backstory=RELATION_EXTRACTION_SYSTEM_PROMPT,
            verbose=verbose,
            llm=model_name,
        )

        self.knowledge_graph_builder_agent = Agent(
            role="Knowledge Graph Builder",
            goal="Structure entities and relationships into a comprehensive knowledge graph with overall system assessment",
            backstory=GRAPH_BUILDER_SYSTEM_PROMPT,
            verbose=verbose,
            llm=model_name,
        )

        # Create tasks; each later task receives the earlier ones as context.
        self.entity_extraction_task = Task(
            description=ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
            agent=self.entity_extractor_agent,
            expected_output="A structured list of entities with their properties",
            output_pydantic=Entity,
        )

        self.relationship_analysis_task = Task(
            description=RELATION_EXTRACTION_INSTRUCTION_PROMPT,
            agent=self.relationship_analyzer_agent,
            expected_output="A structured list of relationships between entities",
            context=[self.entity_extraction_task],
            output_pydantic=Relation,
        )

        self.knowledge_graph_creation_task = Task(
            description=GRAPH_BUILDER_INSTRUCTION_PROMPT,
            agent=self.knowledge_graph_builder_agent,
            expected_output="A complete knowledge graph saved to JSON",
            context=[self.entity_extraction_task, self.relationship_analysis_task],
            output_pydantic=KnowledgeGraph,
        )

        # Create crew
        self.agent_monitoring_crew = Crew(
            agents=[
                self.entity_extractor_agent,
                self.relationship_analyzer_agent,
                self.knowledge_graph_builder_agent,
            ],
            tasks=[
                self.entity_extraction_task,
                self.relationship_analysis_task,
                self.knowledge_graph_creation_task,
            ],
            verbose=verbose,
            memory=False,
            planning=False,
            process=Process.sequential,
        )

    @staticmethod
    def _empty_kg(error: Optional[str] = None) -> Dict[str, Any]:
        """Return an empty knowledge graph dict, with an ``error`` key if given."""
        empty: Dict[str, Any] = {"entities": [], "relations": []}
        if error is not None:
            empty["error"] = error
        return empty

    @classmethod
    def _kg_data_from_result(cls, result: Any) -> Dict[str, Any]:
        """Convert a crew result object into a plain knowledge graph dict.

        Preference order: the structured ``pydantic`` output when present and
        truthy, then the ``raw`` attribute parsed as JSON. Raw output that is
        not valid JSON, or that parses to something other than a JSON object,
        produces an empty graph carrying an ``"error"`` message.
        """
        structured = getattr(result, "pydantic", None)
        if structured:
            # Pydantic v2 deprecates ``.dict()`` in favour of ``.model_dump()``;
            # fall back to ``.dict()`` for models that only offer the v1 API.
            dump = getattr(structured, "model_dump", None)
            return dump() if callable(dump) else structured.dict()

        if hasattr(result, "raw"):
            # Try to parse as JSON
            try:
                parsed = json.loads(result.raw)
            except (TypeError, ValueError):
                # ValueError covers json.JSONDecodeError; TypeError covers a
                # ``raw`` value that is not str/bytes.
                return cls._empty_kg("Failed to parse result")
            if not isinstance(parsed, dict):
                # Valid JSON but not an object: unusable as a knowledge graph.
                return cls._empty_kg("Failed to parse result")
            return parsed

        return cls._empty_kg("Unknown result format")

    def process_text(self, text: str) -> Dict[str, Any]:
        """
        Process input text using the original 3-task CrewAI approach.

        Args:
            text: Input text to process

        Returns:
            Dictionary with kg_data, metadata, success, and optional error.
            ``success`` is False only when running the crew or converting its
            result raised; an unparseable result still yields ``success`` True
            with an ``"error"`` entry inside ``kg_data``.
        """
        try:
            # Run the crew with proper input mechanism
            result = self.agent_monitoring_crew.kickoff(inputs={"input_data": text})

            # Extract the knowledge graph from the result
            kg_data = self._kg_data_from_result(result)

            return {
                "success": True,
                "kg_data": kg_data,
                "metadata": {
                    "approach": "original_3_task",
                    "tasks_executed": 3,
                    "agents_used": 3,
                    "method": self.method_name,
                },
            }

        except Exception as e:
            # Top-level boundary: never propagate, but keep the traceback in the log.
            logger.exception("Error in original knowledge extraction method: %s", e)
            return {
                "success": False,
                "error": str(e),
                "kg_data": self._empty_kg(),
                "metadata": {
                    "approach": "original_3_task",
                    "tasks_executed": 0,
                    "agents_used": 0,
                    "method": self.method_name,
                },
            }

    def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]:
        """
        Extract knowledge graph from trace data.

        Args:
            trace_data: Agent trace data as JSON string

        Returns:
            Dictionary with entities and relations; an empty graph when
            processing failed.
        """
        # Pass the JSON string directly to process_text without re-encoding
        result = self.process_text(trace_data)

        # Return just the knowledge graph data
        if result.get("success", False):
            return result.get("kg_data", self._empty_kg())

        # Return empty knowledge graph on failure
        return self._empty_kg()