""" Enhanced Adaptive Knowledge Extraction System A Claude/Cursor-like system that discovers and extracts knowledge incrementally using tools for exploration, just like how Claude analyzes codebases piece by piece. Key Design Principles: 1. Incremental Discovery: Explore content section by section 2. Tool-Driven Exploration: Use tools to find interesting patterns 3. Contextual Building: Build understanding incrementally 4. Goal-Oriented: Work backwards from extraction goals 5. Token Optimization: Process small sections instead of entire content """ import os import sys import json import logging import time import re from datetime import datetime from typing import Dict, List, Any, Optional, Tuple from pathlib import Path # Add path for imports sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from crewai import Agent, Task, Crew, Process from crewai.tools import tool from pydantic import BaseModel, Field # Import reference-based models from agentgraph.shared.models.reference_based import ( Entity, Relation, KnowledgeGraph, ContentReference, Failure ) # Import existing analysis tools from agentgraph.input.content_analysis import SemanticAnalyzer, LogTypeDetector # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Environment setup os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini" class ContentSection(BaseModel): """Represents a section of content for analysis""" section_id: str = Field(..., description="Unique identifier for this section") content: str = Field(..., description="The content of this section") start_line: int = Field(..., description="Starting line number") end_line: int = Field(..., description="Ending line number") section_type: str = Field(..., description="Type of section (e.g., 'agent_interaction', 'tool_usage', 'json_block')") importance: str = Field(..., description="Importance level: 'high', 'medium', 'low'") patterns: List[str] = Field(default_factory=list, description="Key patterns identified in this section") class DiscoveryResult(BaseModel): """Results from content discovery phase""" total_sections: int = Field(..., description="Total number of sections discovered") high_priority_sections: List[ContentSection] = Field(default_factory=list, description="High priority sections") medium_priority_sections: List[ContentSection] = Field(default_factory=list, description="Medium priority sections") low_priority_sections: List[ContentSection] = Field(default_factory=list, description="Low priority sections") content_overview: str = Field(..., description="High-level overview of content structure") extraction_strategy: str = Field(..., description="Recommended extraction strategy") class SectionAnalysis(BaseModel): """Analysis results for a specific section""" section_id: str = Field(..., description="ID of the analyzed section") entities_found: List[Dict] = Field(default_factory=list, description="Entities discovered in this section") relations_found: List[Dict] = Field(default_factory=list, description="Relations discovered in this section") cross_references: List[str] = Field(default_factory=list, description="References to other sections") quality_score: float = Field(..., description="Quality score for this section's analysis") needs_expansion: bool = Field(False, description="Whether this section needs context expansion") # ============================================================================= # ENHANCED DISCOVERY TOOLS (Claude-like exploration) # ============================================================================= @tool("semantic_content_analyzer") def analyze_content_semantically(content: str) -> Dict[str, Any]: """ Analyze content using semantic analysis to find natural boundaries and structure. Like how Claude analyzes code structure before diving into details. """ try: analyzer = SemanticAnalyzer() result = analyzer.analyze_semantic_structure(content) # Convert to simplified format return { "total_segments": len(result.get("segments", [])), "breakpoints": len(result.get("breakpoints", [])), "coherence_score": result.get("coherence_score", 0.0), "method_used": result.get("method", "unknown"), "agent_trace_type": result.get("agent_trace_type", "unknown"), "recommended_chunk_size": min(2000, max(500, len(content) // 10)) } except Exception as e: logger.error(f"Semantic analysis failed: {e}") return {"error": str(e), "total_segments": 0} @tool("content_section_reader") def read_content_section(content: str, start_line: int, end_line: int) -> str: """ Read a specific section of content, like how Claude reads specific parts of files. """ try: lines = content.split('\n') if start_line < 1 or start_line > len(lines): return f"Error: start_line {start_line} out of range (1-{len(lines)})" actual_end = min(end_line, len(lines)) section_lines = lines[start_line-1:actual_end] return '\n'.join(section_lines) except Exception as e: return f"Error reading section: {e}" @tool("pattern_searcher") def search_content_patterns(content: str, pattern: str, max_matches: int = 20) -> List[Dict]: """ Search for patterns in content, like how Claude searches for specific code patterns. """ try: matches = [] lines = content.split('\n') # Try regex first, fall back to simple text search try: regex = re.compile(pattern, re.IGNORECASE) use_regex = True except re.error: use_regex = False for i, line in enumerate(lines, 1): if use_regex: if regex.search(line): matches.append({ "line_number": i, "content": line.strip(), "pattern": pattern }) elif pattern.lower() in line.lower(): matches.append({ "line_number": i, "content": line.strip(), "pattern": pattern }) if len(matches) >= max_matches: break return matches except Exception as e: logger.error(f"Pattern search failed: {e}") return [] @tool("cross_reference_finder") def find_cross_references(content: str, entity_name: str) -> List[Dict]: """ Find cross-references to entities across content, like how Claude traces usage across files. """ try: references = [] lines = content.split('\n') # Search for various reference patterns patterns = [ entity_name, entity_name.lower(), entity_name.upper(), entity_name.replace(' ', '_'), entity_name.replace(' ', '-') ] for i, line in enumerate(lines, 1): for pattern in patterns: if pattern in line: references.append({ "line_number": i, "content": line.strip(), "reference_type": "direct_mention", "pattern_matched": pattern }) break return references[:10] # Limit to top 10 references except Exception as e: logger.error(f"Cross-reference search failed: {e}") return [] @tool("context_expander") def expand_context_around_line(content: str, line_number: int, context_lines: int = 5) -> str: """ Expand context around a specific line, like how Claude shows surrounding code. """ try: lines = content.split('\n') start = max(0, line_number - context_lines - 1) end = min(len(lines), line_number + context_lines) context_lines_list = lines[start:end] return '\n'.join(context_lines_list) except Exception as e: return f"Error expanding context: {e}" @tool("entity_discovery_probe") def discover_entities_in_section(section_content: str, section_type: str) -> List[Dict]: """ Discover entities in a specific section, like how Claude identifies key components. """ try: entities = [] lines = section_content.split('\n') # Entity patterns based on section type patterns = { 'agent_interaction': [ (r'(?:agent|assistant|system)[\s:]*([^,\n]+)', 'Agent'), (r'(?:task|instruction)[\s:]*([^,\n]+)', 'Task'), (r'(?:tool|function)[\s:]*([^,\n]+)', 'Tool') ], 'tool_usage': [ (r'@tool\s*\(\s*["\']([^"\']+)["\']', 'Tool'), (r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', 'Tool'), (r'class\s+([a-zA-Z_][a-zA-Z0-9_]*)', 'Tool') ], 'json_block': [ (r'"type"\s*:\s*"([^"]+)"', 'Entity'), (r'"name"\s*:\s*"([^"]+)"', 'Entity'), (r'"id"\s*:\s*"([^"]+)"', 'Entity') ] } section_patterns = patterns.get(section_type, patterns['agent_interaction']) for i, line in enumerate(lines, 1): for pattern, entity_type in section_patterns: matches = re.findall(pattern, line, re.IGNORECASE) for match in matches: entities.append({ "name": match.strip(), "type": entity_type, "line_number": i, "confidence": 0.8 }) return entities[:10] # Limit to top 10 entities except Exception as e: logger.error(f"Entity discovery failed: {e}") return [] @tool("content_structure_mapper") def map_content_structure(content: str) -> Dict[str, Any]: """ Map the overall structure of content, like how Claude understands file organization. """ try: lines = content.split('\n') structure = { "total_lines": len(lines), "sections": [], "patterns": { "agent_mentions": 0, "tool_definitions": 0, "json_blocks": 0, "code_blocks": 0 } } current_section = None section_start = 1 for i, line in enumerate(lines, 1): line_lower = line.lower().strip() # Detect section boundaries if any(marker in line_lower for marker in [' Dict[str, Any]: """ Process text using Claude-like incremental discovery approach """ start_time = time.time() try: logger.info(f"Starting enhanced adaptive extraction for text length: {len(text)}") # Phase 1: Content Discovery (like Claude exploring codebase structure) logger.info("Phase 1: Content Discovery") discovery_kg = self._discover_content_sections(text) # Phase 2: Incremental Analysis (like Claude analyzing specific files) logger.info("Phase 2: Incremental Section Analysis") analysis_kg = self._analyze_sections_incrementally(discovery_kg, text) # Phase 3: Knowledge Synthesis (like Claude building comprehensive understanding) logger.info("Phase 3: Knowledge Synthesis") final_kg = self._synthesize_knowledge_graph(discovery_kg, analysis_kg, text) processing_time = time.time() - start_time # Add processing metadata final_kg_dict = final_kg.dict() if hasattr(final_kg, 'dict') else final_kg if "metadata" not in final_kg_dict: final_kg_dict["metadata"] = {} final_kg_dict["metadata"]["processing_info"] = { "method": "enhanced_adaptive_experimental", "processing_time_seconds": processing_time, "processed_at": datetime.now().isoformat(), "quality_threshold": self.quality_threshold, "discovery_strategy": "incremental_section_analysis" } return { "success": True, "kg_data": final_kg_dict, "metadata": final_kg_dict.get("metadata", {}) } except Exception as e: processing_time = time.time() - start_time logger.error(f"Error in enhanced adaptive extraction: {e}") return { "success": False, "error": str(e), "kg_data": {"entities": [], "relations": [], "failures": []}, "metadata": { "method": "enhanced_adaptive_experimental", "processing_time_seconds": processing_time, "error": str(e) } } def _discover_content_sections(self, content: str) -> KnowledgeGraph: """ Phase 1: Discover interesting sections like Claude exploring codebase """ # Create discovery task discovery_task = Task( description=f""" You are like Claude exploring a codebase. Analyze this content to discover the most interesting sections for knowledge extraction. DISCOVERY PROCESS: 1. Use content_structure_mapper to understand overall structure 2. Use analyze_content_semantically to find natural boundaries 3. Use pattern_searcher to find key patterns (agents, tools, tasks) 4. Identify the top {self.max_sections} most important sections SECTION PRIORITIZATION: - HIGH: Sections with agent interactions, tool definitions, task descriptions - MEDIUM: Sections with entity mentions, relationship indicators - LOW: Sections with simple text, logs, or noise Content length: {len(content)} characters Return a KnowledgeGraph with initial entities and relations discovered during exploration. The entities should represent the main components found (agents, tools, tasks). The relations should represent the high-level interactions discovered. """, agent=self.discovery_agent, expected_output="KnowledgeGraph with initial entities and relations discovered during content exploration", output_pydantic=KnowledgeGraph ) # Execute discovery discovery_crew = Crew( agents=[self.discovery_agent], tasks=[discovery_task], verbose=True, process=Process.sequential ) result = discovery_crew.kickoff(inputs={"input_data": content}) # Extract KnowledgeGraph from result if hasattr(result, 'pydantic') and result.pydantic: return result.pydantic elif hasattr(result, 'raw'): # Try to parse as KnowledgeGraph JSON try: kg_dict = json.loads(result.raw) return KnowledgeGraph(**kg_dict) except: logger.warning("Failed to parse discovery result as KnowledgeGraph") return KnowledgeGraph(entities=[], relations=[], failures=[]) else: logger.warning("Unknown discovery result format") return KnowledgeGraph(entities=[], relations=[], failures=[]) def _analyze_sections_incrementally(self, discovery_kg: KnowledgeGraph, content: str) -> KnowledgeGraph: """ Phase 2: Analyze each section incrementally like Claude analyzing specific files """ # Use discovered entities to guide section analysis discovered_entities = [entity.name for entity in discovery_kg.entities] # Create focused analysis task analysis_task = Task( description=f""" Analyze the content in detail to extract comprehensive entities and relations. Work like Claude analyzing specific files - focus deeply on important sections. DISCOVERED ENTITIES: {discovered_entities} ANALYSIS REQUIREMENTS: 1. Use entity_discovery_probe to find more entities in detail 2. Use pattern_searcher to find relationship patterns 3. Use cross_reference_finder to find references between entities 4. Use context_expander to get more context around important findings CONTENT TO ANALYZE: {content[:self.token_limit_per_section * 2]} # Allow more content for analysis Return a KnowledgeGraph with detailed entities and relations found through analysis. Build upon the discovered entities and add more detailed relationships. """, agent=self.analysis_agent, expected_output="KnowledgeGraph with detailed entities and relations from section analysis", output_pydantic=KnowledgeGraph ) # Execute analysis analysis_crew = Crew( agents=[self.analysis_agent], tasks=[analysis_task], verbose=True, process=Process.sequential ) result = analysis_crew.kickoff(inputs={"input_data": content}) # Extract KnowledgeGraph from result if hasattr(result, 'pydantic') and result.pydantic: return result.pydantic elif hasattr(result, 'raw'): # Try to parse as KnowledgeGraph JSON try: kg_dict = json.loads(result.raw) return KnowledgeGraph(**kg_dict) except: logger.warning("Failed to parse analysis result as KnowledgeGraph") return KnowledgeGraph(entities=[], relations=[], failures=[]) else: logger.warning("Unknown analysis result format") return KnowledgeGraph(entities=[], relations=[], failures=[]) def _synthesize_knowledge_graph(self, discovery_kg: KnowledgeGraph, analysis_kg: KnowledgeGraph, content: str) -> KnowledgeGraph: """ Phase 3: Synthesize findings into final knowledge graph like Claude building comprehensive understanding """ # Create synthesis task synthesis_task = Task( description=f""" Synthesize the findings from discovery and analysis phases into a final knowledge graph. Work like Claude building comprehensive understanding across multiple files. SYNTHESIS PROCESS: 1. Combine entities from discovery and analysis, removing duplicates 2. Merge similar entities with different names 3. Create relations between entities found in different phases 4. Use cross_reference_finder to validate relationships 5. Ensure reference-based schema compliance DISCOVERY FINDINGS: Entities: {len(discovery_kg.entities)} Relations: {len(discovery_kg.relations)} ANALYSIS FINDINGS: Entities: {len(analysis_kg.entities)} Relations: {len(analysis_kg.relations)} Build the final knowledge graph with entities, relations, and content references. Add system_name and system_summary based on the extracted knowledge. """, agent=self.synthesis_agent, expected_output="Complete KnowledgeGraph with all entities, relations, and system metadata", output_pydantic=KnowledgeGraph ) # Execute synthesis synthesis_crew = Crew( agents=[self.synthesis_agent], tasks=[synthesis_task], verbose=True, process=Process.sequential ) # Prepare input data combining both knowledge graphs input_data = { "discovery_kg": discovery_kg.dict(), "analysis_kg": analysis_kg.dict(), "content": content } result = synthesis_crew.kickoff(inputs=input_data) # Extract KnowledgeGraph from result if hasattr(result, 'pydantic') and result.pydantic: return result.pydantic elif hasattr(result, 'raw'): # Try to parse as KnowledgeGraph JSON try: kg_dict = json.loads(result.raw) return KnowledgeGraph(**kg_dict) except: logger.warning("Failed to parse synthesis result as KnowledgeGraph") # Return merged KnowledgeGraph as fallback return self._merge_knowledge_graphs(discovery_kg, analysis_kg) else: logger.warning("Unknown synthesis result format") return self._merge_knowledge_graphs(discovery_kg, analysis_kg) def _merge_knowledge_graphs(self, kg1: KnowledgeGraph, kg2: KnowledgeGraph) -> KnowledgeGraph: """Merge two knowledge graphs as fallback""" return KnowledgeGraph( entities=kg1.entities + kg2.entities, relations=kg1.relations + kg2.relations, failures=kg1.failures + kg2.failures, system_name="Enhanced Adaptive System", system_summary="System extracted using enhanced adaptive approach with incremental discovery" ) # ============================================================================= # MAIN INTERFACE FUNCTION # ============================================================================= def extract_knowledge_graph_enhanced(trace_data: str, **kwargs) -> Dict[str, Any]: """ Enhanced knowledge extraction using Claude-like incremental discovery """ extractor = EnhancedAdaptiveKnowledgeExtractor(**kwargs) return extractor.process_text(trace_data) # ============================================================================= # FACTORY FUNCTION # ============================================================================= def create_enhanced_adaptive_crew() -> Crew: """Create a crew with enhanced adaptive capabilities""" discovery_agent = create_discovery_agent() analysis_agent = create_analysis_agent() synthesis_agent = create_synthesis_agent() return Crew( agents=[discovery_agent, analysis_agent, synthesis_agent], tasks=[], # Tasks are created dynamically verbose=True, process=Process.sequential )