AgentGraph / agentgraph /methods /experimental /enhanced_adaptive_extractor.py
wu981526092's picture
add
7bc750c
"""
Enhanced Adaptive Knowledge Extraction System
A Claude/Cursor-like system that discovers and extracts knowledge incrementally using tools
for exploration, just like how Claude analyzes codebases piece by piece.
Key Design Principles:
1. Incremental Discovery: Explore content section by section
2. Tool-Driven Exploration: Use tools to find interesting patterns
3. Contextual Building: Build understanding incrementally
4. Goal-Oriented: Work backwards from extraction goals
5. Token Optimization: Process small sections instead of entire content
"""
import os
import sys
import json
import logging
import time
import re
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
# Add path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from crewai import Agent, Task, Crew, Process
from crewai.tools import tool
from pydantic import BaseModel, Field
# Import reference-based models
from agentgraph.shared.models.reference_based import (
Entity, Relation, KnowledgeGraph, ContentReference, Failure
)
# Import existing analysis tools
from agentgraph.input.content_analysis import SemanticAnalyzer, LogTypeDetector
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Environment setup
os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini"
class ContentSection(BaseModel):
"""Represents a section of content for analysis"""
section_id: str = Field(..., description="Unique identifier for this section")
content: str = Field(..., description="The content of this section")
start_line: int = Field(..., description="Starting line number")
end_line: int = Field(..., description="Ending line number")
section_type: str = Field(..., description="Type of section (e.g., 'agent_interaction', 'tool_usage', 'json_block')")
importance: str = Field(..., description="Importance level: 'high', 'medium', 'low'")
patterns: List[str] = Field(default_factory=list, description="Key patterns identified in this section")
class DiscoveryResult(BaseModel):
"""Results from content discovery phase"""
total_sections: int = Field(..., description="Total number of sections discovered")
high_priority_sections: List[ContentSection] = Field(default_factory=list, description="High priority sections")
medium_priority_sections: List[ContentSection] = Field(default_factory=list, description="Medium priority sections")
low_priority_sections: List[ContentSection] = Field(default_factory=list, description="Low priority sections")
content_overview: str = Field(..., description="High-level overview of content structure")
extraction_strategy: str = Field(..., description="Recommended extraction strategy")
class SectionAnalysis(BaseModel):
"""Analysis results for a specific section"""
section_id: str = Field(..., description="ID of the analyzed section")
entities_found: List[Dict] = Field(default_factory=list, description="Entities discovered in this section")
relations_found: List[Dict] = Field(default_factory=list, description="Relations discovered in this section")
cross_references: List[str] = Field(default_factory=list, description="References to other sections")
quality_score: float = Field(..., description="Quality score for this section's analysis")
needs_expansion: bool = Field(False, description="Whether this section needs context expansion")
# =============================================================================
# ENHANCED DISCOVERY TOOLS (Claude-like exploration)
# =============================================================================
@tool("semantic_content_analyzer")
def analyze_content_semantically(content: str) -> Dict[str, Any]:
"""
Analyze content using semantic analysis to find natural boundaries and structure.
Like how Claude analyzes code structure before diving into details.
"""
try:
analyzer = SemanticAnalyzer()
result = analyzer.analyze_semantic_structure(content)
# Convert to simplified format
return {
"total_segments": len(result.get("segments", [])),
"breakpoints": len(result.get("breakpoints", [])),
"coherence_score": result.get("coherence_score", 0.0),
"method_used": result.get("method", "unknown"),
"agent_trace_type": result.get("agent_trace_type", "unknown"),
"recommended_chunk_size": min(2000, max(500, len(content) // 10))
}
except Exception as e:
logger.error(f"Semantic analysis failed: {e}")
return {"error": str(e), "total_segments": 0}
@tool("content_section_reader")
def read_content_section(content: str, start_line: int, end_line: int) -> str:
"""
Read a specific section of content, like how Claude reads specific parts of files.
"""
try:
lines = content.split('\n')
if start_line < 1 or start_line > len(lines):
return f"Error: start_line {start_line} out of range (1-{len(lines)})"
actual_end = min(end_line, len(lines))
section_lines = lines[start_line-1:actual_end]
return '\n'.join(section_lines)
except Exception as e:
return f"Error reading section: {e}"
@tool("pattern_searcher")
def search_content_patterns(content: str, pattern: str, max_matches: int = 20) -> List[Dict]:
"""
Search for patterns in content, like how Claude searches for specific code patterns.
"""
try:
matches = []
lines = content.split('\n')
# Try regex first, fall back to simple text search
try:
regex = re.compile(pattern, re.IGNORECASE)
use_regex = True
except re.error:
use_regex = False
for i, line in enumerate(lines, 1):
if use_regex:
if regex.search(line):
matches.append({
"line_number": i,
"content": line.strip(),
"pattern": pattern
})
elif pattern.lower() in line.lower():
matches.append({
"line_number": i,
"content": line.strip(),
"pattern": pattern
})
if len(matches) >= max_matches:
break
return matches
except Exception as e:
logger.error(f"Pattern search failed: {e}")
return []
@tool("cross_reference_finder")
def find_cross_references(content: str, entity_name: str) -> List[Dict]:
"""
Find cross-references to entities across content, like how Claude traces usage across files.
"""
try:
references = []
lines = content.split('\n')
# Search for various reference patterns
patterns = [
entity_name,
entity_name.lower(),
entity_name.upper(),
entity_name.replace(' ', '_'),
entity_name.replace(' ', '-')
]
for i, line in enumerate(lines, 1):
for pattern in patterns:
if pattern in line:
references.append({
"line_number": i,
"content": line.strip(),
"reference_type": "direct_mention",
"pattern_matched": pattern
})
break
return references[:10] # Limit to top 10 references
except Exception as e:
logger.error(f"Cross-reference search failed: {e}")
return []
@tool("context_expander")
def expand_context_around_line(content: str, line_number: int, context_lines: int = 5) -> str:
"""
Expand context around a specific line, like how Claude shows surrounding code.
"""
try:
lines = content.split('\n')
start = max(0, line_number - context_lines - 1)
end = min(len(lines), line_number + context_lines)
context_lines_list = lines[start:end]
return '\n'.join(context_lines_list)
except Exception as e:
return f"Error expanding context: {e}"
@tool("entity_discovery_probe")
def discover_entities_in_section(section_content: str, section_type: str) -> List[Dict]:
"""
Discover entities in a specific section, like how Claude identifies key components.
"""
try:
entities = []
lines = section_content.split('\n')
# Entity patterns based on section type
patterns = {
'agent_interaction': [
(r'(?:agent|assistant|system)[\s:]*([^,\n]+)', 'Agent'),
(r'(?:task|instruction)[\s:]*([^,\n]+)', 'Task'),
(r'(?:tool|function)[\s:]*([^,\n]+)', 'Tool')
],
'tool_usage': [
(r'@tool\s*\(\s*["\']([^"\']+)["\']', 'Tool'),
(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', 'Tool'),
(r'class\s+([a-zA-Z_][a-zA-Z0-9_]*)', 'Tool')
],
'json_block': [
(r'"type"\s*:\s*"([^"]+)"', 'Entity'),
(r'"name"\s*:\s*"([^"]+)"', 'Entity'),
(r'"id"\s*:\s*"([^"]+)"', 'Entity')
]
}
section_patterns = patterns.get(section_type, patterns['agent_interaction'])
for i, line in enumerate(lines, 1):
for pattern, entity_type in section_patterns:
matches = re.findall(pattern, line, re.IGNORECASE)
for match in matches:
entities.append({
"name": match.strip(),
"type": entity_type,
"line_number": i,
"confidence": 0.8
})
return entities[:10] # Limit to top 10 entities
except Exception as e:
logger.error(f"Entity discovery failed: {e}")
return []
@tool("content_structure_mapper")
def map_content_structure(content: str) -> Dict[str, Any]:
"""
Map the overall structure of content, like how Claude understands file organization.
"""
try:
lines = content.split('\n')
structure = {
"total_lines": len(lines),
"sections": [],
"patterns": {
"agent_mentions": 0,
"tool_definitions": 0,
"json_blocks": 0,
"code_blocks": 0
}
}
current_section = None
section_start = 1
for i, line in enumerate(lines, 1):
line_lower = line.lower().strip()
# Detect section boundaries
if any(marker in line_lower for marker in ['<l', '# ', '## ', '### ', '```', '---']):
if current_section:
structure["sections"].append({
"type": current_section,
"start_line": section_start,
"end_line": i-1,
"line_count": i - section_start
})
# Determine section type
if '<l' in line_lower:
current_section = "agent_interaction"
elif line_lower.startswith('#'):
current_section = "documentation"
elif '```' in line_lower:
current_section = "code_block"
else:
current_section = "general"
section_start = i
# Count patterns
if any(word in line_lower for word in ['agent', 'assistant', 'system']):
structure["patterns"]["agent_mentions"] += 1
if any(word in line_lower for word in ['@tool', 'def ', 'function']):
structure["patterns"]["tool_definitions"] += 1
if line_lower.strip().startswith('{') or line_lower.strip().startswith('['):
structure["patterns"]["json_blocks"] += 1
if any(word in line_lower for word in ['```', 'def ', 'class ', 'import ']):
structure["patterns"]["code_blocks"] += 1
# Close last section
if current_section:
structure["sections"].append({
"type": current_section,
"start_line": section_start,
"end_line": len(lines),
"line_count": len(lines) - section_start + 1
})
return structure
except Exception as e:
logger.error(f"Structure mapping failed: {e}")
return {"error": str(e)}
# =============================================================================
# ENHANCED AGENTS (Claude-like reasoning with proper Pydantic output)
# =============================================================================
def create_discovery_agent():
"""Create an agent that discovers interesting content sections like Claude explores codebases"""
return Agent(
role="Content Discovery Specialist",
goal="Explore and discover interesting sections of content for knowledge extraction, like Claude explores codebases",
backstory="""You are a content discovery specialist who works like Claude/Cursor.
You use tools to explore content systematically, identify patterns, and find the most
interesting sections for knowledge extraction. You work incrementally, building
understanding piece by piece rather than trying to process everything at once.
Your output should be a KnowledgeGraph with initial entities and relations discovered
during the exploration phase.""",
verbose=True,
tools=[
analyze_content_semantically,
map_content_structure,
search_content_patterns,
read_content_section
],
max_iter=5,
memory=True
)
def create_analysis_agent():
"""Create an agent that analyzes specific sections in detail"""
return Agent(
role="Section Analysis Expert",
goal="Analyze specific content sections to extract entities and relations with high precision",
backstory="""You are a section analysis expert who focuses on extracting knowledge
from specific sections of content. You work like Claude analyzing specific parts of
code - you focus deeply on small sections to understand them completely before
moving to the next section.
Your output should be a KnowledgeGraph with detailed entities and relations found
in the analyzed sections.""",
verbose=True,
tools=[
discover_entities_in_section,
find_cross_references,
expand_context_around_line,
search_content_patterns
],
max_iter=3,
memory=True
)
def create_synthesis_agent():
"""Create an agent that synthesizes findings from multiple sections"""
return Agent(
role="Knowledge Synthesis Coordinator",
goal="Synthesize findings from multiple sections into a coherent knowledge graph",
backstory="""You are a knowledge synthesis coordinator who combines findings from
multiple content sections into a comprehensive knowledge graph. You work like Claude
building understanding across multiple files - you find connections, resolve
duplicates, and create a unified view.
Your output should be a final KnowledgeGraph with all entities, relations, and
system metadata properly organized.""",
verbose=True,
tools=[
find_cross_references,
search_content_patterns
],
max_iter=3,
memory=True
)
# =============================================================================
# ENHANCED ADAPTIVE SYSTEM (Main Implementation)
# =============================================================================
class EnhancedAdaptiveKnowledgeExtractor:
"""
Enhanced adaptive system that works like Claude/Cursor with incremental discovery
"""
def __init__(self, **kwargs):
self.method_name = "enhanced_adaptive_experimental"
self.config = kwargs
self.max_sections = kwargs.get('max_sections', 5) # Reduced for testing
self.quality_threshold = kwargs.get('quality_threshold', 0.7)
self.token_limit_per_section = kwargs.get('token_limit_per_section', 1500)
# Create specialized agents
self.discovery_agent = create_discovery_agent()
self.analysis_agent = create_analysis_agent()
self.synthesis_agent = create_synthesis_agent()
# Initialize tools
self.semantic_analyzer = SemanticAnalyzer()
self.log_detector = LogTypeDetector()
logger.info(f"Enhanced Adaptive System initialized with {self.max_sections} max sections")
def process_text(self, text: str) -> Dict[str, Any]:
"""
Process text using Claude-like incremental discovery approach
"""
start_time = time.time()
try:
logger.info(f"Starting enhanced adaptive extraction for text length: {len(text)}")
# Phase 1: Content Discovery (like Claude exploring codebase structure)
logger.info("Phase 1: Content Discovery")
discovery_kg = self._discover_content_sections(text)
# Phase 2: Incremental Analysis (like Claude analyzing specific files)
logger.info("Phase 2: Incremental Section Analysis")
analysis_kg = self._analyze_sections_incrementally(discovery_kg, text)
# Phase 3: Knowledge Synthesis (like Claude building comprehensive understanding)
logger.info("Phase 3: Knowledge Synthesis")
final_kg = self._synthesize_knowledge_graph(discovery_kg, analysis_kg, text)
processing_time = time.time() - start_time
# Add processing metadata
final_kg_dict = final_kg.dict() if hasattr(final_kg, 'dict') else final_kg
if "metadata" not in final_kg_dict:
final_kg_dict["metadata"] = {}
final_kg_dict["metadata"]["processing_info"] = {
"method": "enhanced_adaptive_experimental",
"processing_time_seconds": processing_time,
"processed_at": datetime.now().isoformat(),
"quality_threshold": self.quality_threshold,
"discovery_strategy": "incremental_section_analysis"
}
return {
"success": True,
"kg_data": final_kg_dict,
"metadata": final_kg_dict.get("metadata", {})
}
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Error in enhanced adaptive extraction: {e}")
return {
"success": False,
"error": str(e),
"kg_data": {"entities": [], "relations": [], "failures": []},
"metadata": {
"method": "enhanced_adaptive_experimental",
"processing_time_seconds": processing_time,
"error": str(e)
}
}
def _discover_content_sections(self, content: str) -> KnowledgeGraph:
"""
Phase 1: Discover interesting sections like Claude exploring codebase
"""
# Create discovery task
discovery_task = Task(
description=f"""
You are like Claude exploring a codebase. Analyze this content to discover the most
interesting sections for knowledge extraction.
DISCOVERY PROCESS:
1. Use content_structure_mapper to understand overall structure
2. Use analyze_content_semantically to find natural boundaries
3. Use pattern_searcher to find key patterns (agents, tools, tasks)
4. Identify the top {self.max_sections} most important sections
SECTION PRIORITIZATION:
- HIGH: Sections with agent interactions, tool definitions, task descriptions
- MEDIUM: Sections with entity mentions, relationship indicators
- LOW: Sections with simple text, logs, or noise
Content length: {len(content)} characters
Return a KnowledgeGraph with initial entities and relations discovered during exploration.
The entities should represent the main components found (agents, tools, tasks).
The relations should represent the high-level interactions discovered.
""",
agent=self.discovery_agent,
expected_output="KnowledgeGraph with initial entities and relations discovered during content exploration",
output_pydantic=KnowledgeGraph
)
# Execute discovery
discovery_crew = Crew(
agents=[self.discovery_agent],
tasks=[discovery_task],
verbose=True,
process=Process.sequential
)
result = discovery_crew.kickoff(inputs={"input_data": content})
# Extract KnowledgeGraph from result
if hasattr(result, 'pydantic') and result.pydantic:
return result.pydantic
elif hasattr(result, 'raw'):
# Try to parse as KnowledgeGraph JSON
try:
kg_dict = json.loads(result.raw)
return KnowledgeGraph(**kg_dict)
except:
logger.warning("Failed to parse discovery result as KnowledgeGraph")
return KnowledgeGraph(entities=[], relations=[], failures=[])
else:
logger.warning("Unknown discovery result format")
return KnowledgeGraph(entities=[], relations=[], failures=[])
def _analyze_sections_incrementally(self, discovery_kg: KnowledgeGraph, content: str) -> KnowledgeGraph:
"""
Phase 2: Analyze each section incrementally like Claude analyzing specific files
"""
# Use discovered entities to guide section analysis
discovered_entities = [entity.name for entity in discovery_kg.entities]
# Create focused analysis task
analysis_task = Task(
description=f"""
Analyze the content in detail to extract comprehensive entities and relations.
Work like Claude analyzing specific files - focus deeply on important sections.
DISCOVERED ENTITIES: {discovered_entities}
ANALYSIS REQUIREMENTS:
1. Use entity_discovery_probe to find more entities in detail
2. Use pattern_searcher to find relationship patterns
3. Use cross_reference_finder to find references between entities
4. Use context_expander to get more context around important findings
CONTENT TO ANALYZE:
{content[:self.token_limit_per_section * 2]} # Allow more content for analysis
Return a KnowledgeGraph with detailed entities and relations found through analysis.
Build upon the discovered entities and add more detailed relationships.
""",
agent=self.analysis_agent,
expected_output="KnowledgeGraph with detailed entities and relations from section analysis",
output_pydantic=KnowledgeGraph
)
# Execute analysis
analysis_crew = Crew(
agents=[self.analysis_agent],
tasks=[analysis_task],
verbose=True,
process=Process.sequential
)
result = analysis_crew.kickoff(inputs={"input_data": content})
# Extract KnowledgeGraph from result
if hasattr(result, 'pydantic') and result.pydantic:
return result.pydantic
elif hasattr(result, 'raw'):
# Try to parse as KnowledgeGraph JSON
try:
kg_dict = json.loads(result.raw)
return KnowledgeGraph(**kg_dict)
except:
logger.warning("Failed to parse analysis result as KnowledgeGraph")
return KnowledgeGraph(entities=[], relations=[], failures=[])
else:
logger.warning("Unknown analysis result format")
return KnowledgeGraph(entities=[], relations=[], failures=[])
def _synthesize_knowledge_graph(self, discovery_kg: KnowledgeGraph, analysis_kg: KnowledgeGraph, content: str) -> KnowledgeGraph:
"""
Phase 3: Synthesize findings into final knowledge graph like Claude building comprehensive understanding
"""
# Create synthesis task
synthesis_task = Task(
description=f"""
Synthesize the findings from discovery and analysis phases into a final knowledge graph.
Work like Claude building comprehensive understanding across multiple files.
SYNTHESIS PROCESS:
1. Combine entities from discovery and analysis, removing duplicates
2. Merge similar entities with different names
3. Create relations between entities found in different phases
4. Use cross_reference_finder to validate relationships
5. Ensure reference-based schema compliance
DISCOVERY FINDINGS:
Entities: {len(discovery_kg.entities)}
Relations: {len(discovery_kg.relations)}
ANALYSIS FINDINGS:
Entities: {len(analysis_kg.entities)}
Relations: {len(analysis_kg.relations)}
Build the final knowledge graph with entities, relations, and content references.
Add system_name and system_summary based on the extracted knowledge.
""",
agent=self.synthesis_agent,
expected_output="Complete KnowledgeGraph with all entities, relations, and system metadata",
output_pydantic=KnowledgeGraph
)
# Execute synthesis
synthesis_crew = Crew(
agents=[self.synthesis_agent],
tasks=[synthesis_task],
verbose=True,
process=Process.sequential
)
# Prepare input data combining both knowledge graphs
input_data = {
"discovery_kg": discovery_kg.dict(),
"analysis_kg": analysis_kg.dict(),
"content": content
}
result = synthesis_crew.kickoff(inputs=input_data)
# Extract KnowledgeGraph from result
if hasattr(result, 'pydantic') and result.pydantic:
return result.pydantic
elif hasattr(result, 'raw'):
# Try to parse as KnowledgeGraph JSON
try:
kg_dict = json.loads(result.raw)
return KnowledgeGraph(**kg_dict)
except:
logger.warning("Failed to parse synthesis result as KnowledgeGraph")
# Return merged KnowledgeGraph as fallback
return self._merge_knowledge_graphs(discovery_kg, analysis_kg)
else:
logger.warning("Unknown synthesis result format")
return self._merge_knowledge_graphs(discovery_kg, analysis_kg)
def _merge_knowledge_graphs(self, kg1: KnowledgeGraph, kg2: KnowledgeGraph) -> KnowledgeGraph:
"""Merge two knowledge graphs as fallback"""
return KnowledgeGraph(
entities=kg1.entities + kg2.entities,
relations=kg1.relations + kg2.relations,
failures=kg1.failures + kg2.failures,
system_name="Enhanced Adaptive System",
system_summary="System extracted using enhanced adaptive approach with incremental discovery"
)
# =============================================================================
# MAIN INTERFACE FUNCTION
# =============================================================================
def extract_knowledge_graph_enhanced(trace_data: str, **kwargs) -> Dict[str, Any]:
"""
Enhanced knowledge extraction using Claude-like incremental discovery
"""
extractor = EnhancedAdaptiveKnowledgeExtractor(**kwargs)
return extractor.process_text(trace_data)
# =============================================================================
# FACTORY FUNCTION
# =============================================================================
def create_enhanced_adaptive_crew() -> Crew:
"""Create a crew with enhanced adaptive capabilities"""
discovery_agent = create_discovery_agent()
analysis_agent = create_analysis_agent()
synthesis_agent = create_synthesis_agent()
return Crew(
agents=[discovery_agent, analysis_agent, synthesis_agent],
tasks=[], # Tasks are created dynamically
verbose=True,
process=Process.sequential
)