# Provenance (extraction artifact): Hugging Face upload by wu981526092, commit 7bc750c ("add")
"""
Hybrid Knowledge Extraction Method (2-Task Approach)
A hybrid approach that combines the efficiency of the unified method with the
thoroughness of the original method. Uses 2 tasks: one for entity extraction
and relationship analysis combined, and another for knowledge graph validation
and enhancement.
"""
# Import the LiteLLM fix FIRST, before any other imports that might use LiteLLM
import os
import sys
# Add the parent directory to the path to ensure imports work correctly
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict
from crewai import Agent, Crew, Process, Task
from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod
from evaluation.knowledge_extraction.baselines.unified_method import KnowledgeGraph
# Import shared prompt templates
from evaluation.knowledge_extraction.utils.prompts import (
ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
ENTITY_EXTRACTION_SYSTEM_PROMPT,
RELATION_EXTRACTION_INSTRUCTION_PROMPT,
RELATION_EXTRACTION_SYSTEM_PROMPT,
)
from utils.fix_litellm_stop_param import * # This applies the patches # noqa: F403
# --- Logging setup -----------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Silence chatty third-party libraries down to WARNING.
for _noisy_lib in ("openai", "httpx", "litellm", "chromadb"):
    logging.getLogger(_noisy_lib).setLevel(logging.WARNING)

# 0 disables verbose agent/crew output; any non-zero value enables it.
verbose_level = 0

# Model name picked up by every CrewAI agent in this module.
os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini"
class HybridKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod):
    """Hybrid 2-task knowledge extraction method using CrewAI.

    Task 1 extracts entities and relationships in a single pass; task 2
    validates and enhances the result into a structured ``KnowledgeGraph``.
    This keeps the API-call count at 2 while still providing a dedicated
    quality-assurance step.
    """

    def __init__(self, **kwargs):
        """Register the method name and build the agents, tasks, and crew.

        Args:
            **kwargs: Forwarded unchanged to ``BaseKnowledgeExtractionMethod``.
        """
        super().__init__("hybrid_method", **kwargs)
        self._setup_agents_and_tasks()

    def _setup_agents_and_tasks(self):
        """Set up the CrewAI agents, tasks, and the sequential crew."""
        # Agent 1: combined entity + relationship extraction. Reuses the shared
        # system prompts so extraction stays consistent with the other baselines.
        self.extraction_agent = Agent(
            role="Knowledge Extraction Specialist",
            goal="Extract comprehensive entities and relationships from agent system data efficiently",
            backstory=f"""{ENTITY_EXTRACTION_SYSTEM_PROMPT}

{RELATION_EXTRACTION_SYSTEM_PROMPT}""",
            verbose=bool(verbose_level),
            llm=os.environ["OPENAI_MODEL_NAME"],
        )

        # Agent 2: validation and enhancement of the raw extraction output.
        self.validation_agent = Agent(
            role="Knowledge Graph Validator and Enhancer",
            goal="Validate, enhance, and structure extracted knowledge into a comprehensive knowledge graph",
            backstory="""You are a knowledge graph validation and enhancement specialist who ensures
the quality, completeness, and coherence of extracted knowledge graphs. You take raw
extracted entities and relationships and transform them into polished, well-structured
knowledge graphs.

Your expertise includes:
- Validating entity and relationship consistency
- Identifying and filling gaps in knowledge extraction
- Ensuring proper connectivity and graph coherence
- Creating meaningful system summaries and assessments
- Optimizing knowledge graph structure for clarity and usability

You serve as the quality assurance layer that transforms good extractions into
excellent knowledge graphs.""",
            verbose=bool(verbose_level),
            llm=os.environ["OPENAI_MODEL_NAME"],
        )

        # Task 1: combined extraction, built from the shared instruction prompts.
        self.extraction_task = Task(
            description=f"""
{ENTITY_EXTRACTION_INSTRUCTION_PROMPT}

{RELATION_EXTRACTION_INSTRUCTION_PROMPT}
""",
            agent=self.extraction_agent,
            expected_output="Structured extraction with entities, relations, and preliminary analysis",
        )

        # Task 2: validation/enhancement. Consumes task 1's output via `context`
        # and forces structured output through the KnowledgeGraph pydantic model.
        self.validation_task = Task(
            description="""
Validate, enhance, and structure the extracted knowledge into a comprehensive knowledge graph.

Take the extracted entities and relationships from the previous task and:

1. VALIDATION AND ENHANCEMENT:
   - Verify all entities have proper IDs, types, names, and descriptions
   - Ensure all relationships use correct predefined types
   - Check that every entity connects to at least one other entity
   - Fill any gaps in entity descriptions or relationship mappings
   - Validate that relationship directions and types are correct

2. CONNECTIVITY OPTIMIZATION:
   - Ensure no isolated entities (all must be connected)
   - Verify logical flow from inputs through processing to outputs
   - Add missing relationships if entities should be connected
   - Optimize relationship network for clarity and completeness

3. KNOWLEDGE GRAPH CONSTRUCTION:
   - Create descriptive system name (3-7 words)
   - Write comprehensive 2-3 sentence system summary explaining purpose, coordination, and value
   - Include metadata with timestamp, statistics, and processing information
   - Ensure all components are reachable (no isolated subgraphs)
   - Validate connectivity: inputs consumed, outputs produced, agents have roles

4. QUALITY ASSURANCE:
   - Double-check entity uniqueness and proper categorization
   - Verify relationship consistency and logical flow
   - Ensure system summary accurately reflects the extracted knowledge
   - Validate that the knowledge graph tells a coherent story

Output a complete, validated KnowledgeGraph object with entities, relations, system_name,
system_summary, and metadata. Ensure the knowledge graph is comprehensive, accurate,
well-connected, and represents the system effectively.
""",
            agent=self.validation_agent,
            expected_output="A complete, validated knowledge graph with entities, relations, and metadata",
            context=[self.extraction_task],
            output_pydantic=KnowledgeGraph,
        )

        # Sequential crew: extraction feeds validation; memory/planning disabled
        # to keep the call count at exactly 2.
        self.hybrid_crew = Crew(
            agents=[self.extraction_agent, self.validation_agent],
            tasks=[self.extraction_task, self.validation_task],
            verbose=bool(verbose_level),
            memory=False,
            planning=False,
            process=Process.sequential,
        )

    def _parse_crew_result(self, result: Any) -> Dict[str, Any]:
        """Convert a CrewAI kickoff result into a plain kg_data dict.

        Prefers the structured pydantic output; falls back to parsing the raw
        string output as JSON, and finally to an empty-graph placeholder that
        carries an ``error`` key.
        """
        pydantic_output = getattr(result, "pydantic", None)
        if pydantic_output:
            # Pydantic v2 renamed .dict() to .model_dump(); support both.
            if hasattr(pydantic_output, "model_dump"):
                return pydantic_output.model_dump()
            return pydantic_output.dict()
        if hasattr(result, "raw"):
            try:
                return json.loads(result.raw)
            except (json.JSONDecodeError, TypeError):
                # Raw output was not valid JSON (or not a string at all).
                return {"entities": [], "relations": [], "error": "Failed to parse result"}
        return {"entities": [], "relations": [], "error": "Unknown result format"}

    def process_text(self, text: str) -> Dict[str, Any]:
        """Process input text using the hybrid 2-task CrewAI approach.

        Args:
            text: Input text to process.

        Returns:
            Dictionary with ``kg_data``, ``metadata``, ``success``, and an
            ``error`` key when ``success`` is False.

        Raises:
            Nothing: all exceptions are caught and reported via the returned
            ``success``/``error`` fields.
        """
        start_time = time.time()
        try:
            logger.info("process_text called with text length: %d", len(text))
            logger.info("text first 200 chars: %r", text[:200])
            logger.info("Starting hybrid crew execution with input_data...")

            # Run the crew; `input_data` is the placeholder the prompts expect.
            result = self.hybrid_crew.kickoff(inputs={"input_data": text})
            logger.info("Crew execution completed, result type: %s", type(result))
            processing_time = time.time() - start_time

            kg_data = self._parse_crew_result(result)

            # Validate kg_data structure before decorating it with metadata.
            if not isinstance(kg_data, dict):
                raise ValueError("kg_data is not a dict after parsing")
            if not ("entities" in kg_data and "relations" in kg_data):
                raise ValueError("kg_data missing 'entities' or 'relations'")

            kg_data.setdefault("metadata", {})
            kg_data["metadata"]["processing_info"] = {
                "method": "hybrid_2_task",
                "processing_time_seconds": processing_time,
                "processed_at": datetime.now().isoformat(),
                "agent_count": 2,
                "task_count": 2,
                "api_calls": 2,
            }

            entity_count = len(kg_data.get("entities", []))
            relation_count = len(kg_data.get("relations", []))

            return {
                "success": True,
                "kg_data": kg_data,
                "metadata": {
                    "approach": "hybrid_2_task",
                    "tasks_executed": 2,
                    "agents_used": 2,
                    "method": self.method_name,
                    "processing_time_seconds": processing_time,
                    "entity_count": entity_count,
                    "relation_count": relation_count,
                    # Guard against a zero elapsed time on very fast failures.
                    "entities_per_second": entity_count / processing_time if processing_time > 0 else 0,
                    "relations_per_second": relation_count / processing_time if processing_time > 0 else 0,
                    "api_calls": 2,
                },
            }
        except Exception as e:
            processing_time = time.time() - start_time
            # logger.exception records the message plus the full traceback.
            logger.exception("Error in hybrid knowledge extraction method: %s", e)
            return {
                "success": False,
                "error": str(e),
                "kg_data": {"entities": [], "relations": []},
                "metadata": {
                    "approach": "hybrid_2_task",
                    "tasks_executed": 0,
                    "agents_used": 0,
                    "method": self.method_name,
                    "processing_time_seconds": processing_time,
                    # Nominal count; the actual calls made before the failure
                    # may be fewer.
                    "api_calls": 2,
                },
            }

    def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]:
        """Extract a knowledge graph from agent trace data.

        Args:
            trace_data: Agent trace data as a JSON string.

        Returns:
            Dictionary with at least ``entities`` and ``relations`` keys;
            both lists are empty on any failure.
        """
        try:
            logger.info("extract_knowledge_graph called with trace_data type: %s", type(trace_data))
            if isinstance(trace_data, str):
                logger.info("trace_data length: %d", len(trace_data))
                logger.info("trace_data first 200 chars: %r", trace_data[:200])

            # Pass the JSON string straight through; re-encoding it here would
            # double-escape the payload.
            result = self.process_text(trace_data)

            if result.get("success", False):
                return result.get("kg_data", {"entities": [], "relations": []})
            # Degrade to an empty graph rather than propagating the failure.
            return {"entities": [], "relations": []}
        except Exception as e:
            logger.exception("Error in extract_knowledge_graph: %s", e)
            if isinstance(trace_data, str):
                logger.error("trace_data content (first 200 chars): %r", trace_data[:200])
            return {"entities": [], "relations": []}