| """ | |
| Hybrid Knowledge Extraction Method (2-Task Approach) | |
| A hybrid approach that combines the efficiency of the unified method with the | |
| thoroughness of the original method. Uses 2 tasks: one for entity extraction | |
| and relationship analysis combined, and another for knowledge graph validation | |
| and enhancement. | |
| """ | |
# Import the LiteLLM fix FIRST, before any other imports that might use LiteLLM
import os
import sys

# Add the parent directory to the path to ensure imports work correctly.
# NOTE(review): four levels up from this file — assumes this module lives at
# evaluation/knowledge_extraction/baselines/<file>.py under the repo root; confirm.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))

import json
import logging
import time
from datetime import datetime
from typing import Any, Dict

from crewai import Agent, Crew, Process, Task

from evaluation.knowledge_extraction.baselines.base_method import BaseKnowledgeExtractionMethod
from evaluation.knowledge_extraction.baselines.unified_method import KnowledgeGraph

# Import shared prompt templates
from evaluation.knowledge_extraction.utils.prompts import (
    ENTITY_EXTRACTION_INSTRUCTION_PROMPT,
    ENTITY_EXTRACTION_SYSTEM_PROMPT,
    RELATION_EXTRACTION_INSTRUCTION_PROMPT,
    RELATION_EXTRACTION_SYSTEM_PROMPT,
)

# Star import is intentional: importing the module applies monkey-patches to
# LiteLLM's stop-parameter handling as a side effect.
from utils.fix_litellm_stop_param import *  # This applies the patches # noqa: F403

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set higher log levels for noisy libraries
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("litellm").setLevel(logging.WARNING)
logging.getLogger("chromadb").setLevel(logging.WARNING)

# Set default verbosity level (0 = quiet; any nonzero value enables CrewAI verbose mode)
verbose_level = 0

# Set environment variables
os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini"
class HybridKnowledgeExtractionMethod(BaseKnowledgeExtractionMethod):
    """Hybrid 2-task knowledge extraction method using CrewAI.

    Task 1 combines entity and relationship extraction; task 2 validates and
    enhances the result into a structured ``KnowledgeGraph`` (Pydantic model).
    """

    def __init__(self, **kwargs):
        super().__init__("hybrid_method", **kwargs)
        self._setup_agents_and_tasks()

    def _setup_agents_and_tasks(self):
        """Set up the CrewAI agents and tasks."""
        # Create extraction agent (combines entity and relationship extraction)
        self.extraction_agent = Agent(
            role="Knowledge Extraction Specialist",
            goal="Extract comprehensive entities and relationships from agent system data efficiently",
            backstory=f"""{ENTITY_EXTRACTION_SYSTEM_PROMPT}
{RELATION_EXTRACTION_SYSTEM_PROMPT}""",
            verbose=bool(verbose_level),
            llm=os.environ["OPENAI_MODEL_NAME"],
        )

        # Create validation and enhancement agent
        self.validation_agent = Agent(
            role="Knowledge Graph Validator and Enhancer",
            goal="Validate, enhance, and structure extracted knowledge into a comprehensive knowledge graph",
            backstory="""You are a knowledge graph validation and enhancement specialist who ensures
the quality, completeness, and coherence of extracted knowledge graphs. You take raw
extracted entities and relationships and transform them into polished, well-structured
knowledge graphs.
Your expertise includes:
- Validating entity and relationship consistency
- Identifying and filling gaps in knowledge extraction
- Ensuring proper connectivity and graph coherence
- Creating meaningful system summaries and assessments
- Optimizing knowledge graph structure for clarity and usability
You serve as the quality assurance layer that transforms good extractions into
excellent knowledge graphs.""",
            verbose=bool(verbose_level),
            llm=os.environ["OPENAI_MODEL_NAME"],
        )

        # Create extraction task.
        # NOTE(review): kickoff() is called with inputs={"input_data": ...}; CrewAI
        # interpolates that only if "{input_data}" appears in a task description —
        # presumably it is embedded in the shared instruction prompts; confirm.
        self.extraction_task = Task(
            description=f"""
{ENTITY_EXTRACTION_INSTRUCTION_PROMPT}
{RELATION_EXTRACTION_INSTRUCTION_PROMPT}
""",
            agent=self.extraction_agent,
            expected_output="Structured extraction with entities, relations, and preliminary analysis",
        )

        # Create validation and enhancement task
        self.validation_task = Task(
            description="""
Validate, enhance, and structure the extracted knowledge into a comprehensive knowledge graph.
Take the extracted entities and relationships from the previous task and:
1. VALIDATION AND ENHANCEMENT:
- Verify all entities have proper IDs, types, names, and descriptions
- Ensure all relationships use correct predefined types
- Check that every entity connects to at least one other entity
- Fill any gaps in entity descriptions or relationship mappings
- Validate that relationship directions and types are correct
2. CONNECTIVITY OPTIMIZATION:
- Ensure no isolated entities (all must be connected)
- Verify logical flow from inputs through processing to outputs
- Add missing relationships if entities should be connected
- Optimize relationship network for clarity and completeness
3. KNOWLEDGE GRAPH CONSTRUCTION:
- Create descriptive system name (3-7 words)
- Write comprehensive 2-3 sentence system summary explaining purpose, coordination, and value
- Include metadata with timestamp, statistics, and processing information
- Ensure all components are reachable (no isolated subgraphs)
- Validate connectivity: inputs consumed, outputs produced, agents have roles
4. QUALITY ASSURANCE:
- Double-check entity uniqueness and proper categorization
- Verify relationship consistency and logical flow
- Ensure system summary accurately reflects the extracted knowledge
- Validate that the knowledge graph tells a coherent story
Output a complete, validated KnowledgeGraph object with entities, relations, system_name,
system_summary, and metadata. Ensure the knowledge graph is comprehensive, accurate,
well-connected, and represents the system effectively.
""",
            agent=self.validation_agent,
            expected_output="A complete, validated knowledge graph with entities, relations, and metadata",
            context=[self.extraction_task],
            output_pydantic=KnowledgeGraph,
        )

        # Create crew: sequential pipeline, no memory/planning overhead
        self.hybrid_crew = Crew(
            agents=[self.extraction_agent, self.validation_agent],
            tasks=[self.extraction_task, self.validation_task],
            verbose=bool(verbose_level),
            memory=False,
            planning=False,
            process=Process.sequential,
        )

    def process_text(self, text: str) -> Dict[str, Any]:
        """
        Process input text using the hybrid 2-task CrewAI approach.

        Args:
            text: Input text to process

        Returns:
            Dictionary with kg_data, metadata, success, and optional error
        """
        start_time = time.time()
        try:
            # Lazy %-style args so formatting is skipped when INFO is disabled
            logger.info("process_text called with text length: %d", len(text))
            logger.info("text first 200 chars: %r", text[:200])
            logger.info("Starting hybrid crew execution with input_data...")

            # Run the crew with proper input mechanism
            result = self.hybrid_crew.kickoff(inputs={"input_data": text})
            logger.info("Crew execution completed, result type: %s", type(result))
            processing_time = time.time() - start_time

            # Extract the knowledge graph from the result
            kg_data = self._extract_kg_data(result)

            # Validate kg_data structure
            if not isinstance(kg_data, dict):
                raise ValueError("kg_data is not a dict after parsing")
            if not ("entities" in kg_data and "relations" in kg_data):
                raise ValueError("kg_data missing 'entities' or 'relations'")

            # Add metadata
            kg_data.setdefault("metadata", {})
            kg_data["metadata"]["processing_info"] = {
                "method": "hybrid_2_task",
                "processing_time_seconds": processing_time,
                "processed_at": datetime.now().isoformat(),
                "agent_count": 2,
                "task_count": 2,
                "api_calls": 2,
            }

            # Calculate statistics
            entity_count = len(kg_data.get("entities", []))
            relation_count = len(kg_data.get("relations", []))

            return {
                "success": True,
                "kg_data": kg_data,
                "metadata": {
                    "approach": "hybrid_2_task",
                    "tasks_executed": 2,
                    "agents_used": 2,
                    "method": self.method_name,
                    "processing_time_seconds": processing_time,
                    "entity_count": entity_count,
                    "relation_count": relation_count,
                    "entities_per_second": entity_count / processing_time if processing_time > 0 else 0,
                    "relations_per_second": relation_count / processing_time if processing_time > 0 else 0,
                    "api_calls": 2,
                },
            }
        except Exception as e:
            processing_time = time.time() - start_time
            # logger.exception logs the message AND the full traceback in one call
            logger.exception("Error in hybrid knowledge extraction method (%s)", type(e).__name__)
            return {
                "success": False,
                "error": str(e),
                "kg_data": {"entities": [], "relations": []},
                "metadata": {
                    "approach": "hybrid_2_task",
                    "tasks_executed": 0,
                    "agents_used": 0,
                    "method": self.method_name,
                    "processing_time_seconds": processing_time,
                    # NOTE(review): reported as 2 even on failure (calls were
                    # attempted); tasks_executed is 0 — confirm intent.
                    "api_calls": 2,
                },
            }

    @staticmethod
    def _extract_kg_data(result) -> Dict[str, Any]:
        """Pull a plain-dict knowledge graph out of a CrewAI kickoff result.

        Prefers the structured Pydantic output; falls back to parsing the raw
        string as JSON, then to an empty-graph placeholder with an error note.
        """
        if getattr(result, "pydantic", None):
            model = result.pydantic
            # Pydantic v2 renamed .dict() to .model_dump(); support both so the
            # method works regardless of the installed Pydantic major version.
            if hasattr(model, "model_dump"):
                return model.model_dump()
            return model.dict()
        if hasattr(result, "raw"):
            try:
                # json.loads raises ValueError (JSONDecodeError) on bad JSON and
                # TypeError when raw is not str/bytes — catch exactly those
                # instead of a bare except that would swallow KeyboardInterrupt.
                return json.loads(result.raw)
            except (ValueError, TypeError):
                return {"entities": [], "relations": [], "error": "Failed to parse result"}
        return {"entities": [], "relations": [], "error": "Unknown result format"}

    def extract_knowledge_graph(self, trace_data: str) -> Dict[str, Any]:
        """
        Extract knowledge graph from trace data.

        Args:
            trace_data: Agent trace data as JSON string

        Returns:
            Dictionary with entities and relations
        """
        try:
            # Debug logging
            logger.info("extract_knowledge_graph called with trace_data type: %s", type(trace_data))
            if isinstance(trace_data, str):
                logger.info("trace_data length: %d", len(trace_data))
                logger.info("trace_data first 200 chars: %r", trace_data[:200])

            # Pass the JSON string directly to process_text without re-encoding
            result = self.process_text(trace_data)

            # Return just the knowledge graph data
            if result.get("success", False):
                return result.get("kg_data", {"entities": [], "relations": []})
            # Return empty knowledge graph on failure
            return {"entities": [], "relations": []}
        except Exception as e:
            logger.error("Error in extract_knowledge_graph: %s", e)
            logger.error("trace_data type: %s", type(trace_data))
            if isinstance(trace_data, str):
                logger.error("trace_data content (first 200 chars): %r", trace_data[:200])
            return {"entities": [], "relations": []}