Spaces:
Running
Running
| """ | |
| Original Knowledge Extraction Method (3-Task Approach) | |
| Copied from core/agent_monitoring.py and adapted for evaluation framework. | |
| Uses the original 3-task CrewAI approach with separate agents for entity extraction, | |
| relationship analysis, and knowledge graph building. | |
| """ | |
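| # NOTE: the LiteLLM fix (utils.fix_litellm_stop_param) is meant to be imported before any other import that might use LiteLLM, but it is currently imported last, after crewai, at the end of the import block below - TODO confirm the patch still takes effect | |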
| import os | |
| import sys | |
| # Add the directory three levels above this file's own directory to the path to ensure imports work correctly | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) | |
| import json | |
| import logging | |
| from typing import Any, Dict | |
| from crewai import Agent, Crew, Process, Task | |
| from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod | |
| from evaluation.knowledge_extraction.utils.models import Entity, KnowledgeGraph, Relation | |
| # Import shared prompt templates | |
| from evaluation.knowledge_extraction.utils.prompts import ( | |
| ENTITY_EXTRACTION_INSTRUCTION_PROMPT, | |
| ENTITY_EXTRACTION_SYSTEM_PROMPT, | |
| GRAPH_BUILDER_INSTRUCTION_PROMPT, | |
| GRAPH_BUILDER_SYSTEM_PROMPT, | |
| RELATION_EXTRACTION_INSTRUCTION_PROMPT, | |
| RELATION_EXTRACTION_SYSTEM_PROMPT, | |
| ) | |
| from utils.fix_litellm_stop_param import * # This applies the patches # noqa: F403 | |
# Logging setup: root logger at INFO, plus a logger scoped to this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cap chatty third-party loggers at WARNING so their INFO output is hidden.
for _noisy_library in ("openai", "httpx", "litellm", "chromadb"):
    logging.getLogger(_noisy_library).setLevel(logging.WARNING)
del _noisy_library  # keep the loop variable out of the module namespace

# Default verbosity; the agents and crew in this module receive bool(verbose_level).
verbose_level = 0

# Model name the agents in this module read back from the environment.
os.environ.update({"OPENAI_MODEL_NAME": "gpt-5-mini"})
class OriginalKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod):
    """Original 3-task knowledge extraction method using CrewAI.

    Three agents (entity extractor, relationship analyzer, knowledge graph
    builder) are run as a sequential crew. Each later task receives the
    earlier task(s) as context.
    """

    def __init__(self, **kwargs):
        """Register the method as ``"original_method"`` and build the crew.

        Args:
            **kwargs: Forwarded unchanged to ``BaseKnowledgeExtractionMethod``.
        """
        super().__init__("original_method", **kwargs)
        self._setup_agents_and_tasks()

    def _setup_agents_and_tasks(self):
        """Set up the CrewAI agents, their tasks, and the crew that runs them."""
        # Resolve the shared settings once instead of once per agent.
        model_name = os.environ["OPENAI_MODEL_NAME"]
        verbose = bool(verbose_level)

        # Create agents
        self.entity_extractor_agent = Agent(
            role="Entity Extractor",
            goal="Identify and categorize entities from agent system data sources with clear descriptions",
            backstory=ENTITY_EXTRACTION_SYSTEM_PROMPT,
            verbose=verbose,
            llm=model_name,
        )
        self.relationship_analyzer_agent = Agent(
            role="Relationship Analyzer",
            goal="Discover standard relationships between entities in the system using only predefined relationship types",
            backstory=RELATION_EXTRACTION_SYSTEM_PROMPT,
            verbose=verbose,
            llm=model_name,
        )
        self.knowledge_graph_builder_agent = Agent(
            role="Knowledge Graph Builder",
            goal="Structure entities and relationships into a comprehensive knowledge graph with overall system assessment",
            backstory=GRAPH_BUILDER_SYSTEM_PROMPT,
            verbose=verbose,
            llm=model_name,
        )

        # Create tasks
        # NOTE(review): the first two tasks describe a *list* of entities /
        # relations but validate against the single-item models Entity and
        # Relation - confirm this is intended.
        self.entity_extraction_task = Task(
            description=ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
            agent=self.entity_extractor_agent,
            expected_output="A structured list of entities with their properties",
            output_pydantic=Entity,
        )
        self.relationship_analysis_task = Task(
            description=RELATION_EXTRACTION_INSTRUCTION_PROMPT,
            agent=self.relationship_analyzer_agent,
            expected_output="A structured list of relationships between entities",
            context=[self.entity_extraction_task],
            output_pydantic=Relation,
        )
        self.knowledge_graph_creation_task = Task(
            description=GRAPH_BUILDER_INSTRUCTION_PROMPT,
            agent=self.knowledge_graph_builder_agent,
            expected_output="A complete knowledge graph saved to JSON",
            context=[self.entity_extraction_task, self.relationship_analysis_task],
            output_pydantic=KnowledgeGraph,
        )

        # Create crew: tasks run in the listed order, without memory or planning.
        self.agent_monitoring_crew = Crew(
            agents=[
                self.entity_extractor_agent,
                self.relationship_analyzer_agent,
                self.knowledge_graph_builder_agent,
            ],
            tasks=[
                self.entity_extraction_task,
                self.relationship_analysis_task,
                self.knowledge_graph_creation_task,
            ],
            verbose=verbose,
            memory=False,
            planning=False,
            process=Process.sequential,
        )

    @staticmethod
    def _result_to_kg_data(result: Any) -> Dict[str, Any]:
        """Convert a crew result into a plain knowledge-graph dictionary.

        Args:
            result: Object returned by ``Crew.kickoff``.

        Returns:
            The dumped pydantic output when one is present; otherwise the
            ``raw`` output parsed as a JSON object. When neither yields a
            dictionary, an empty graph carrying an ``"error"`` message.
        """
        model = getattr(result, "pydantic", None)
        if model:
            # Pydantic v2 renamed .dict() to .model_dump(); prefer the new
            # name and fall back so older models keep working.
            dump = getattr(model, "model_dump", None) or model.dict
            return dump()

        if hasattr(result, "raw"):
            try:
                parsed = json.loads(result.raw)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # non-string ``raw``. Anything else propagates to the caller.
                parsed = None
            # Valid JSON that is not an object (list, number, ...) cannot be
            # used as a knowledge graph, so it is reported like a parse failure.
            if isinstance(parsed, dict):
                return parsed
            return {"entities": [], "relations": [], "error": "Failed to parse result"}

        return {"entities": [], "relations": [], "error": "Unknown result format"}

    def process_text(self, text: str) -> Dict[str, Any]:
        """
        Process input text using the original 3-task CrewAI approach.

        Args:
            text: Input text to process

        Returns:
            Dictionary with kg_data, metadata, success, and optional error.
            ``success`` is True whenever the crew ran without raising, even if
            its output could not be parsed (``kg_data`` then carries an
            ``"error"`` entry). If the crew raises, ``success`` is False and
            the top-level ``error`` holds the exception message.
        """
        try:
            # Run the crew with proper input mechanism
            result = self.agent_monitoring_crew.kickoff(inputs={"input_data": text})
            kg_data = self._result_to_kg_data(result)
            return {
                "success": True,
                "kg_data": kg_data,
                "metadata": {
                    "approach": "original_3_task",
                    "tasks_executed": 3,
                    "agents_used": 3,
                    "method": self.method_name,
                },
            }
        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of
            # raising, and keep the traceback in the log.
            logger.exception("Error in original knowledge extraction method: %s", e)
            return {
                "success": False,
                "error": str(e),
                "kg_data": {"entities": [], "relations": []},
                "metadata": {
                    "approach": "original_3_task",
                    "tasks_executed": 0,
                    "agents_used": 0,
                    "method": self.method_name,
                },
            }

    def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]:
        """
        Extract knowledge graph from trace data.

        Args:
            trace_data: Agent trace data as JSON string

        Returns:
            Dictionary with entities and relations; an empty graph when
            processing failed.
        """
        # Pass the JSON string directly to process_text without re-encoding
        result = self.process_text(trace_data)

        # Return just the knowledge graph data
        if result.get("success", False):
            return result.get("kg_data", {"entities": [], "relations": []})

        # Return empty knowledge graph on failure
        return {"entities": [], "relations": []}