# Provenance (scrape residue, commented out so the module parses):
# wu981526092's picture / add / 7bc750c
"""
Original Knowledge Extraction Method (3-Task Approach)
Copied from core/agent_monitoring.py and adapted for evaluation framework.
Uses the original 3-task CrewAI approach with separate agents for entity extraction,
relationship analysis, and knowledge graph building.
"""
# Import the LiteLLM fix FIRST, before any other imports that might use LiteLLM
import os
import sys

# Add the parent directory to the path to ensure imports work correctly
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))

# Apply the LiteLLM patches BEFORE importing crewai (which imports LiteLLM)
from utils.fix_litellm_stop_param import *  # This applies the patches # noqa: F403

import json
import logging
from typing import Any, Dict

from crewai import Agent, Crew, Process, Task

from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod
from evaluation.knowledge_extraction.utils.models import Entity, KnowledgeGraph, Relation
# Import shared prompt templates
from evaluation.knowledge_extraction.utils.prompts import (
    ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
    ENTITY_EXTRACTION_SYSTEM_PROMPT,
    GRAPH_BUILDER_INSTRUCTION_PROMPT,
    GRAPH_BUILDER_SYSTEM_PROMPT,
    RELATION_EXTRACTION_INSTRUCTION_PROMPT,
    RELATION_EXTRACTION_SYSTEM_PROMPT,
)
# Configure module-level logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Silence chatty third-party libraries; keep only warnings and above.
for _noisy_lib in ("openai", "httpx", "litellm", "chromadb"):
    logging.getLogger(_noisy_lib).setLevel(logging.WARNING)

# Default verbosity for agents/crew (0 = quiet)
verbose_level = 0

# Model used by every CrewAI agent in this module
os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini"
class OriginalKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod):
    """Original 3-task knowledge extraction method using CrewAI.

    Runs a sequential crew of three agents — entity extraction,
    relationship analysis, and knowledge graph building — each backed by
    a shared prompt template and the model named in the
    OPENAI_MODEL_NAME environment variable.
    """

    def __init__(self, **kwargs):
        """Register the method name with the base class and build the crew."""
        super().__init__("original_method", **kwargs)
        self._setup_agents_and_tasks()

    def _setup_agents_and_tasks(self):
        """Set up the CrewAI agents, tasks, and the sequential crew."""
        # Hoist shared settings so all three agents are configured identically.
        llm_name = os.environ["OPENAI_MODEL_NAME"]
        verbose = bool(verbose_level)

        # --- Agents: one per pipeline stage ---
        self.entity_extractor_agent = Agent(
            role="Entity Extractor",
            goal="Identify and categorize entities from agent system data sources with clear descriptions",
            backstory=ENTITY_EXTRACTION_SYSTEM_PROMPT,
            verbose=verbose,
            llm=llm_name,
        )
        self.relationship_analyzer_agent = Agent(
            role="Relationship Analyzer",
            goal="Discover standard relationships between entities in the system using only predefined relationship types",
            backstory=RELATION_EXTRACTION_SYSTEM_PROMPT,
            verbose=verbose,
            llm=llm_name,
        )
        self.knowledge_graph_builder_agent = Agent(
            role="Knowledge Graph Builder",
            goal="Structure entities and relationships into a comprehensive knowledge graph with overall system assessment",
            backstory=GRAPH_BUILDER_SYSTEM_PROMPT,
            verbose=verbose,
            llm=llm_name,
        )

        # --- Tasks: later tasks receive earlier outputs via `context` ---
        self.entity_extraction_task = Task(
            description=ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
            agent=self.entity_extractor_agent,
            expected_output="A structured list of entities with their properties",
            output_pydantic=Entity,
        )
        self.relationship_analysis_task = Task(
            description=RELATION_EXTRACTION_INSTRUCTION_PROMPT,
            agent=self.relationship_analyzer_agent,
            expected_output="A structured list of relationships between entities",
            context=[self.entity_extraction_task],
            output_pydantic=Relation,
        )
        self.knowledge_graph_creation_task = Task(
            description=GRAPH_BUILDER_INSTRUCTION_PROMPT,
            agent=self.knowledge_graph_builder_agent,
            expected_output="A complete knowledge graph saved to JSON",
            context=[self.entity_extraction_task, self.relationship_analysis_task],
            output_pydantic=KnowledgeGraph,
        )

        # --- Crew: execute the three tasks strictly in order ---
        self.agent_monitoring_crew = Crew(
            agents=[self.entity_extractor_agent, self.relationship_analyzer_agent, self.knowledge_graph_builder_agent],
            tasks=[self.entity_extraction_task, self.relationship_analysis_task, self.knowledge_graph_creation_task],
            verbose=verbose,
            memory=False,
            planning=False,
            process=Process.sequential,
        )

    def process_text(self, text: str) -> Dict[str, Any]:
        """
        Process input text using the original 3-task CrewAI approach.

        Args:
            text: Input text to process

        Returns:
            Dictionary with kg_data, metadata, success, and optional error
        """
        try:
            # Run the crew with proper input mechanism
            result = self.agent_monitoring_crew.kickoff(inputs={"input_data": text})

            # Extract the knowledge graph from the result
            if hasattr(result, 'pydantic') and result.pydantic:
                model = result.pydantic
                # Pydantic v2 renamed dict() to model_dump(); support both versions.
                kg_data = model.model_dump() if hasattr(model, "model_dump") else model.dict()
            elif hasattr(result, 'raw'):
                # Fall back to parsing the raw string output as JSON.
                try:
                    kg_data = json.loads(result.raw)
                except (json.JSONDecodeError, TypeError):
                    # Narrowed from a bare except: only parse failures are
                    # best-effort; anything else should propagate.
                    kg_data = {"entities": [], "relations": [], "error": "Failed to parse result"}
            else:
                kg_data = {"entities": [], "relations": [], "error": "Unknown result format"}

            return {
                "success": True,
                "kg_data": kg_data,
                "metadata": {
                    "approach": "original_3_task",
                    "tasks_executed": 3,
                    "agents_used": 3,
                    "method": self.method_name
                }
            }
        except Exception as e:
            # Lazy %-formatting avoids building the message when the level is off.
            logger.error("Error in original knowledge extraction method: %s", e)
            return {
                "success": False,
                "error": str(e),
                "kg_data": {"entities": [], "relations": []},
                "metadata": {
                    "approach": "original_3_task",
                    "tasks_executed": 0,
                    "agents_used": 0,
                    "method": self.method_name
                }
            }

    def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]:
        """
        Extract knowledge graph from trace data.

        Args:
            trace_data: Agent trace data as JSON string

        Returns:
            Dictionary with entities and relations (empty lists on failure)
        """
        # Pass the JSON string directly to process_text without re-encoding
        result = self.process_text(trace_data)
        if result.get("success", False):
            return result.get("kg_data", {"entities": [], "relations": []})
        # Return an empty knowledge graph on failure
        return {"entities": [], "relations": []}