Spaces:
Running
Running
| """ | |
| Enhanced Adaptive Knowledge Extraction System | |
| A Claude/Cursor-like system that discovers and extracts knowledge incrementally using tools | |
| for exploration, just like how Claude analyzes codebases piece by piece. | |
| Key Design Principles: | |
| 1. Incremental Discovery: Explore content section by section | |
| 2. Tool-Driven Exploration: Use tools to find interesting patterns | |
| 3. Contextual Building: Build understanding incrementally | |
| 4. Goal-Oriented: Work backwards from extraction goals | |
| 5. Token Optimization: Process small sections instead of entire content | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import logging | |
| import time | |
| import re | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from pathlib import Path | |
| # Add path for imports | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) | |
| from crewai import Agent, Task, Crew, Process | |
| from crewai.tools import tool | |
| from pydantic import BaseModel, Field | |
| # Import reference-based models | |
| from agentgraph.shared.models.reference_based import ( | |
| Entity, Relation, KnowledgeGraph, ContentReference, Failure | |
| ) | |
| # Import existing analysis tools | |
| from agentgraph.input.content_analysis import SemanticAnalyzer, LogTypeDetector | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Environment setup
# NOTE(review): this hard-codes the model for every process that imports this
# module, clobbering any value set by the caller — confirm "gpt-5-mini" is the
# intended default and whether an existing env var should take precedence.
os.environ["OPENAI_MODEL_NAME"] = "gpt-5-mini"
class ContentSection(BaseModel):
    """Represents a single slice of the input content selected for analysis.

    Sections are produced during the discovery phase so later phases can work
    on small chunks instead of the entire document at once.
    """
    section_id: str = Field(..., description="Unique identifier for this section")
    content: str = Field(..., description="The content of this section")
    # Line numbers appear to be 1-based and inclusive, matching
    # read_content_section() — TODO confirm against the discovery code.
    start_line: int = Field(..., description="Starting line number")
    end_line: int = Field(..., description="Ending line number")
    section_type: str = Field(..., description="Type of section (e.g., 'agent_interaction', 'tool_usage', 'json_block')")
    importance: str = Field(..., description="Importance level: 'high', 'medium', 'low'")
    patterns: List[str] = Field(default_factory=list, description="Key patterns identified in this section")
class DiscoveryResult(BaseModel):
    """Results from the content discovery phase.

    Buckets discovered sections by priority and records the high-level plan
    for the subsequent analysis phase.
    """
    total_sections: int = Field(..., description="Total number of sections discovered")
    high_priority_sections: List[ContentSection] = Field(default_factory=list, description="High priority sections")
    medium_priority_sections: List[ContentSection] = Field(default_factory=list, description="Medium priority sections")
    low_priority_sections: List[ContentSection] = Field(default_factory=list, description="Low priority sections")
    content_overview: str = Field(..., description="High-level overview of content structure")
    extraction_strategy: str = Field(..., description="Recommended extraction strategy")
class SectionAnalysis(BaseModel):
    """Analysis results for a specific section.

    Entities/relations are plain dicts here (not Entity/Relation models);
    conversion to the reference-based schema happens downstream.
    """
    section_id: str = Field(..., description="ID of the analyzed section")
    entities_found: List[Dict] = Field(default_factory=list, description="Entities discovered in this section")
    relations_found: List[Dict] = Field(default_factory=list, description="Relations discovered in this section")
    cross_references: List[str] = Field(default_factory=list, description="References to other sections")
    quality_score: float = Field(..., description="Quality score for this section's analysis")
    needs_expansion: bool = Field(False, description="Whether this section needs context expansion")
| # ============================================================================= | |
| # ENHANCED DISCOVERY TOOLS (Claude-like exploration) | |
| # ============================================================================= | |
def analyze_content_semantically(content: str) -> Dict[str, Any]:
    """
    Analyze content using semantic analysis to find natural boundaries and structure.
    Like how Claude analyzes code structure before diving into details.

    Returns a summary dict of the SemanticAnalyzer output, plus a suggested
    chunk size (one tenth of the content, clamped to the 500-2000 range).
    On failure returns {"error": ..., "total_segments": 0}.
    """
    try:
        raw = SemanticAnalyzer().analyze_semantic_structure(content)
        suggested_chunk = min(2000, max(500, len(content) // 10))
        # Flatten the analyzer's result into a small, tool-friendly summary.
        summary = {
            "total_segments": len(raw.get("segments", [])),
            "breakpoints": len(raw.get("breakpoints", [])),
            "coherence_score": raw.get("coherence_score", 0.0),
            "method_used": raw.get("method", "unknown"),
            "agent_trace_type": raw.get("agent_trace_type", "unknown"),
            "recommended_chunk_size": suggested_chunk,
        }
        return summary
    except Exception as e:
        logger.error(f"Semantic analysis failed: {e}")
        return {"error": str(e), "total_segments": 0}
def read_content_section(content: str, start_line: int, end_line: int) -> str:
    """
    Read a specific section of content, like how Claude reads specific parts of files.

    Line numbers are 1-based and inclusive; end_line is clamped to the last
    line. Errors are reported as strings (not raised) so tool-calling agents
    can see what went wrong.
    """
    try:
        lines = content.split('\n')
        total = len(lines)
        # Guard: start_line must land inside the document.
        if not 1 <= start_line <= total:
            return f"Error: start_line {start_line} out of range (1-{len(lines)})"
        clamped_end = min(end_line, total)
        return '\n'.join(lines[start_line - 1:clamped_end])
    except Exception as e:
        return f"Error reading section: {e}"
def search_content_patterns(content: str, pattern: str, max_matches: int = 20) -> List[Dict]:
    """
    Search for patterns in content, like how Claude searches for specific code patterns.

    The pattern is treated as a case-insensitive regex when it compiles;
    otherwise it falls back to a case-insensitive substring search.
    Returns at most max_matches hits, each with the 1-based line number.
    """
    try:
        # Prefer regex; an invalid pattern degrades to plain substring search.
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error:
            compiled = None
        needle = pattern.lower()
        hits: List[Dict] = []
        for line_no, line in enumerate(content.split('\n'), start=1):
            found = compiled.search(line) if compiled else needle in line.lower()
            if found:
                hits.append({
                    "line_number": line_no,
                    "content": line.strip(),
                    "pattern": pattern
                })
            if len(hits) >= max_matches:
                break
        return hits
    except Exception as e:
        logger.error(f"Pattern search failed: {e}")
        return []
def find_cross_references(content: str, entity_name: str) -> List[Dict]:
    """
    Find cross-references to entities across content, like how Claude traces usage across files.

    Tries several name variants (as-is, lower, upper, snake_case, kebab-case)
    per line; each variant match is case-sensitive and only the first matching
    variant per line is recorded. Results are capped at 10.
    """
    try:
        variants = [
            entity_name,
            entity_name.lower(),
            entity_name.upper(),
            entity_name.replace(' ', '_'),
            entity_name.replace(' ', '-'),
        ]
        references: List[Dict] = []
        for line_no, line in enumerate(content.split('\n'), start=1):
            matched_variant = next((v for v in variants if v in line), None)
            if matched_variant is not None:
                references.append({
                    "line_number": line_no,
                    "content": line.strip(),
                    "reference_type": "direct_mention",
                    "pattern_matched": matched_variant,
                })
        return references[:10]  # Limit to top 10 references
    except Exception as e:
        logger.error(f"Cross-reference search failed: {e}")
        return []
def expand_context_around_line(content: str, line_number: int, context_lines: int = 5) -> str:
    """
    Expand context around a specific line, like how Claude shows surrounding code.

    Returns up to context_lines lines on each side of the (1-based)
    line_number, clamped to the document boundaries.
    """
    try:
        all_lines = content.split('\n')
        window_start = max(0, line_number - context_lines - 1)
        window_end = min(len(all_lines), line_number + context_lines)
        return '\n'.join(all_lines[window_start:window_end])
    except Exception as e:
        return f"Error expanding context: {e}"
def discover_entities_in_section(section_content: str, section_type: str) -> List[Dict]:
    """
    Discover entities in a specific section, like how Claude identifies key components.

    Args:
        section_content: Raw text of the section to scan.
        section_type: One of 'agent_interaction', 'tool_usage', 'json_block';
            unknown types fall back to the 'agent_interaction' patterns.

    Returns:
        Up to 10 entity dicts with name, type, 1-based line_number and a
        fixed confidence of 0.8; empty list on failure.
    """
    try:
        # Pattern tables keyed by section type. Each entry is (regex, entity type);
        # every regex has exactly one capture group (the entity name).
        patterns = {
            'agent_interaction': [
                (r'(?:agent|assistant|system)[\s:]*([^,\n]+)', 'Agent'),
                (r'(?:task|instruction)[\s:]*([^,\n]+)', 'Task'),
                (r'(?:tool|function)[\s:]*([^,\n]+)', 'Tool')
            ],
            'tool_usage': [
                (r'@tool\s*\(\s*["\']([^"\']+)["\']', 'Tool'),
                (r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', 'Tool'),
                (r'class\s+([a-zA-Z_][a-zA-Z0-9_]*)', 'Tool')
            ],
            'json_block': [
                (r'"type"\s*:\s*"([^"]+)"', 'Entity'),
                (r'"name"\s*:\s*"([^"]+)"', 'Entity'),
                (r'"id"\s*:\s*"([^"]+)"', 'Entity')
            ]
        }
        # Compile once, outside the per-line loop, instead of recompiling
        # each pattern for every line of the section.
        section_patterns = [
            (re.compile(pattern, re.IGNORECASE), entity_type)
            for pattern, entity_type in patterns.get(section_type, patterns['agent_interaction'])
        ]
        entities = []
        for i, line in enumerate(section_content.split('\n'), 1):
            for regex, entity_type in section_patterns:
                for match in regex.findall(line):
                    entities.append({
                        "name": match.strip(),
                        "type": entity_type,
                        "line_number": i,
                        "confidence": 0.8
                    })
        return entities[:10]  # Limit to top 10 entities
    except Exception as e:
        logger.error(f"Entity discovery failed: {e}")
        return []
def map_content_structure(content: str) -> Dict[str, Any]:
    """
    Map the overall structure of content, like how Claude understands file organization.

    Splits the text into sections at boundary markers ('<l', markdown
    headings, code fences, '---') and tallies occurrences of agent mentions,
    tool definitions, JSON-looking lines and code-ish lines.
    Returns {"error": ...} on failure.
    """
    try:
        all_lines = content.split('\n')
        counters = {
            "agent_mentions": 0,
            "tool_definitions": 0,
            "json_blocks": 0,
            "code_blocks": 0
        }
        sections: List[Dict[str, Any]] = []
        boundary_markers = ('<l', '# ', '## ', '### ', '```', '---')
        open_type = None      # type of the section currently being accumulated
        open_start = 1        # 1-based first line of that section

        for idx, raw in enumerate(all_lines, 1):
            lowered = raw.lower().strip()

            if any(marker in lowered for marker in boundary_markers):
                # A new section starts here; close the previous one (if any).
                if open_type:
                    sections.append({
                        "type": open_type,
                        "start_line": open_start,
                        "end_line": idx - 1,
                        "line_count": idx - open_start
                    })
                # Classify the new section by its opening line.
                if '<l' in lowered:
                    open_type = "agent_interaction"
                elif lowered.startswith('#'):
                    open_type = "documentation"
                elif '```' in lowered:
                    open_type = "code_block"
                else:
                    open_type = "general"
                open_start = idx

            # Independent per-line pattern tallies (a line can bump several).
            if any(word in lowered for word in ('agent', 'assistant', 'system')):
                counters["agent_mentions"] += 1
            if any(word in lowered for word in ('@tool', 'def ', 'function')):
                counters["tool_definitions"] += 1
            if lowered.startswith(('{', '[')):
                counters["json_blocks"] += 1
            if any(word in lowered for word in ('```', 'def ', 'class ', 'import ')):
                counters["code_blocks"] += 1

        # Close the trailing section.
        if open_type:
            sections.append({
                "type": open_type,
                "start_line": open_start,
                "end_line": len(all_lines),
                "line_count": len(all_lines) - open_start + 1
            })

        return {
            "total_lines": len(all_lines),
            "sections": sections,
            "patterns": counters
        }
    except Exception as e:
        logger.error(f"Structure mapping failed: {e}")
        return {"error": str(e)}
| # ============================================================================= | |
| # ENHANCED AGENTS (Claude-like reasoning with proper Pydantic output) | |
| # ============================================================================= | |
def create_discovery_agent():
    """Create an agent that discovers interesting content sections like Claude explores codebases"""
    # Exploration toolkit: structure first, then semantics, then targeted reads.
    exploration_tools = [
        analyze_content_semantically,
        map_content_structure,
        search_content_patterns,
        read_content_section,
    ]
    return Agent(
        role="Content Discovery Specialist",
        goal="Explore and discover interesting sections of content for knowledge extraction, like Claude explores codebases",
        backstory="""You are a content discovery specialist who works like Claude/Cursor.
        You use tools to explore content systematically, identify patterns, and find the most
        interesting sections for knowledge extraction. You work incrementally, building
        understanding piece by piece rather than trying to process everything at once.
        Your output should be a KnowledgeGraph with initial entities and relations discovered
        during the exploration phase.""",
        tools=exploration_tools,
        verbose=True,
        max_iter=5,
        memory=True,
    )
def create_analysis_agent():
    """Create an agent that analyzes specific sections in detail"""
    # Deep-dive toolkit: entity probing, cross-references, context expansion.
    analysis_tools = [
        discover_entities_in_section,
        find_cross_references,
        expand_context_around_line,
        search_content_patterns,
    ]
    return Agent(
        role="Section Analysis Expert",
        goal="Analyze specific content sections to extract entities and relations with high precision",
        backstory="""You are a section analysis expert who focuses on extracting knowledge
        from specific sections of content. You work like Claude analyzing specific parts of
        code - you focus deeply on small sections to understand them completely before
        moving to the next section.
        Your output should be a KnowledgeGraph with detailed entities and relations found
        in the analyzed sections.""",
        tools=analysis_tools,
        verbose=True,
        max_iter=3,
        memory=True,
    )
def create_synthesis_agent():
    """Create an agent that synthesizes findings from multiple sections"""
    # Synthesis only needs validation tools, not exploration ones.
    validation_tools = [
        find_cross_references,
        search_content_patterns,
    ]
    return Agent(
        role="Knowledge Synthesis Coordinator",
        goal="Synthesize findings from multiple sections into a coherent knowledge graph",
        backstory="""You are a knowledge synthesis coordinator who combines findings from
        multiple content sections into a comprehensive knowledge graph. You work like Claude
        building understanding across multiple files - you find connections, resolve
        duplicates, and create a unified view.
        Your output should be a final KnowledgeGraph with all entities, relations, and
        system metadata properly organized.""",
        tools=validation_tools,
        verbose=True,
        max_iter=3,
        memory=True,
    )
| # ============================================================================= | |
| # ENHANCED ADAPTIVE SYSTEM (Main Implementation) | |
| # ============================================================================= | |
class EnhancedAdaptiveKnowledgeExtractor:
    """
    Enhanced adaptive system that works like Claude/Cursor with incremental discovery.

    Extraction runs in three phases, each driven by a dedicated one-task crew:

    1. Discovery  -- map the content and surface the most interesting sections.
    2. Analysis   -- extract detailed entities/relations from those sections.
    3. Synthesis  -- merge both phases into one coherent KnowledgeGraph.
    """

    def __init__(self, **kwargs):
        """Initialize the extractor and its three phase agents.

        Keyword Args:
            max_sections (int): Upper bound on sections explored during discovery (default 5).
            quality_threshold (float): Minimum acceptable quality score (default 0.7).
            token_limit_per_section (int): Soft cap on content fed per section (default 1500).
        """
        self.method_name = "enhanced_adaptive_experimental"
        self.config = kwargs
        self.max_sections = kwargs.get('max_sections', 5)  # Reduced for testing
        self.quality_threshold = kwargs.get('quality_threshold', 0.7)
        self.token_limit_per_section = kwargs.get('token_limit_per_section', 1500)
        # Create specialized agents, one per phase
        self.discovery_agent = create_discovery_agent()
        self.analysis_agent = create_analysis_agent()
        self.synthesis_agent = create_synthesis_agent()
        # Initialize tools shared across phases
        self.semantic_analyzer = SemanticAnalyzer()
        self.log_detector = LogTypeDetector()
        logger.info(f"Enhanced Adaptive System initialized with {self.max_sections} max sections")

    def process_text(self, text: str) -> Dict[str, Any]:
        """
        Process text using Claude-like incremental discovery approach.

        Returns a dict with "success", "kg_data" (the knowledge graph as a
        dict) and "metadata"; on failure "success" is False and "error"
        carries the message.
        """
        start_time = time.time()
        try:
            logger.info(f"Starting enhanced adaptive extraction for text length: {len(text)}")
            # Phase 1: Content Discovery (like Claude exploring codebase structure)
            logger.info("Phase 1: Content Discovery")
            discovery_kg = self._discover_content_sections(text)
            # Phase 2: Incremental Analysis (like Claude analyzing specific files)
            logger.info("Phase 2: Incremental Section Analysis")
            analysis_kg = self._analyze_sections_incrementally(discovery_kg, text)
            # Phase 3: Knowledge Synthesis (like Claude building comprehensive understanding)
            logger.info("Phase 3: Knowledge Synthesis")
            final_kg = self._synthesize_knowledge_graph(discovery_kg, analysis_kg, text)

            processing_time = time.time() - start_time
            # Add processing metadata (accepting either a pydantic model or a plain dict)
            final_kg_dict = final_kg.dict() if hasattr(final_kg, 'dict') else final_kg
            if "metadata" not in final_kg_dict:
                final_kg_dict["metadata"] = {}
            final_kg_dict["metadata"]["processing_info"] = {
                "method": "enhanced_adaptive_experimental",
                "processing_time_seconds": processing_time,
                "processed_at": datetime.now().isoformat(),
                "quality_threshold": self.quality_threshold,
                "discovery_strategy": "incremental_section_analysis"
            }
            return {
                "success": True,
                "kg_data": final_kg_dict,
                "metadata": final_kg_dict.get("metadata", {})
            }
        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"Error in enhanced adaptive extraction: {e}")
            return {
                "success": False,
                "error": str(e),
                "kg_data": {"entities": [], "relations": [], "failures": []},
                "metadata": {
                    "method": "enhanced_adaptive_experimental",
                    "processing_time_seconds": processing_time,
                    "error": str(e)
                }
            }

    def _run_single_agent_task(self, agent, description: str, expected_output: str,
                               inputs: Dict[str, Any]):
        """Build a sequential one-task crew for *agent*, run it, return the raw result."""
        task = Task(
            description=description,
            agent=agent,
            expected_output=expected_output,
            output_pydantic=KnowledgeGraph
        )
        crew = Crew(
            agents=[agent],
            tasks=[task],
            verbose=True,
            process=Process.sequential
        )
        return crew.kickoff(inputs=inputs)

    def _coerce_result_to_kg(self, result, phase: str,
                             fallback: Optional[KnowledgeGraph] = None) -> KnowledgeGraph:
        """Extract a KnowledgeGraph from a crew result, degrading gracefully.

        Order of preference: the structured pydantic output, then the raw
        output parsed as KnowledgeGraph JSON, then *fallback* (an empty
        KnowledgeGraph unless the caller supplies one).
        """
        if hasattr(result, 'pydantic') and result.pydantic:
            return result.pydantic
        if fallback is None:
            fallback = KnowledgeGraph(entities=[], relations=[], failures=[])
        if hasattr(result, 'raw'):
            # Try to parse as KnowledgeGraph JSON
            try:
                return KnowledgeGraph(**json.loads(result.raw))
            except Exception:
                # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
                # are no longer swallowed here.
                logger.warning(f"Failed to parse {phase} result as KnowledgeGraph")
                return fallback
        logger.warning(f"Unknown {phase} result format")
        return fallback

    def _discover_content_sections(self, content: str) -> KnowledgeGraph:
        """
        Phase 1: Discover interesting sections like Claude exploring codebase.
        """
        result = self._run_single_agent_task(
            agent=self.discovery_agent,
            description=f"""
            You are like Claude exploring a codebase. Analyze this content to discover the most
            interesting sections for knowledge extraction.
            DISCOVERY PROCESS:
            1. Use content_structure_mapper to understand overall structure
            2. Use analyze_content_semantically to find natural boundaries
            3. Use pattern_searcher to find key patterns (agents, tools, tasks)
            4. Identify the top {self.max_sections} most important sections
            SECTION PRIORITIZATION:
            - HIGH: Sections with agent interactions, tool definitions, task descriptions
            - MEDIUM: Sections with entity mentions, relationship indicators
            - LOW: Sections with simple text, logs, or noise
            Content length: {len(content)} characters
            Return a KnowledgeGraph with initial entities and relations discovered during exploration.
            The entities should represent the main components found (agents, tools, tasks).
            The relations should represent the high-level interactions discovered.
            """,
            expected_output="KnowledgeGraph with initial entities and relations discovered during content exploration",
            inputs={"input_data": content}
        )
        return self._coerce_result_to_kg(result, "discovery")

    def _analyze_sections_incrementally(self, discovery_kg: KnowledgeGraph, content: str) -> KnowledgeGraph:
        """
        Phase 2: Analyze each section incrementally like Claude analyzing specific files.
        """
        # Use discovered entities to guide section analysis
        discovered_entities = [entity.name for entity in discovery_kg.entities]
        result = self._run_single_agent_task(
            agent=self.analysis_agent,
            description=f"""
            Analyze the content in detail to extract comprehensive entities and relations.
            Work like Claude analyzing specific files - focus deeply on important sections.
            DISCOVERED ENTITIES: {discovered_entities}
            ANALYSIS REQUIREMENTS:
            1. Use entity_discovery_probe to find more entities in detail
            2. Use pattern_searcher to find relationship patterns
            3. Use cross_reference_finder to find references between entities
            4. Use context_expander to get more context around important findings
            CONTENT TO ANALYZE:
            {content[:self.token_limit_per_section * 2]} # Allow more content for analysis
            Return a KnowledgeGraph with detailed entities and relations found through analysis.
            Build upon the discovered entities and add more detailed relationships.
            """,
            expected_output="KnowledgeGraph with detailed entities and relations from section analysis",
            inputs={"input_data": content}
        )
        return self._coerce_result_to_kg(result, "analysis")

    def _synthesize_knowledge_graph(self, discovery_kg: KnowledgeGraph, analysis_kg: KnowledgeGraph, content: str) -> KnowledgeGraph:
        """
        Phase 3: Synthesize findings into final knowledge graph like Claude building comprehensive understanding.

        If the synthesis output cannot be parsed, falls back to a plain merge
        of the discovery and analysis graphs.
        """
        # Prepare input data combining both knowledge graphs
        input_data = {
            "discovery_kg": discovery_kg.dict(),
            "analysis_kg": analysis_kg.dict(),
            "content": content
        }
        result = self._run_single_agent_task(
            agent=self.synthesis_agent,
            description=f"""
            Synthesize the findings from discovery and analysis phases into a final knowledge graph.
            Work like Claude building comprehensive understanding across multiple files.
            SYNTHESIS PROCESS:
            1. Combine entities from discovery and analysis, removing duplicates
            2. Merge similar entities with different names
            3. Create relations between entities found in different phases
            4. Use cross_reference_finder to validate relationships
            5. Ensure reference-based schema compliance
            DISCOVERY FINDINGS:
            Entities: {len(discovery_kg.entities)}
            Relations: {len(discovery_kg.relations)}
            ANALYSIS FINDINGS:
            Entities: {len(analysis_kg.entities)}
            Relations: {len(analysis_kg.relations)}
            Build the final knowledge graph with entities, relations, and content references.
            Add system_name and system_summary based on the extracted knowledge.
            """,
            expected_output="Complete KnowledgeGraph with all entities, relations, and system metadata",
            inputs=input_data
        )
        return self._coerce_result_to_kg(
            result, "synthesis",
            fallback=self._merge_knowledge_graphs(discovery_kg, analysis_kg)
        )

    def _merge_knowledge_graphs(self, kg1: KnowledgeGraph, kg2: KnowledgeGraph) -> KnowledgeGraph:
        """Merge two knowledge graphs as fallback (simple concatenation, no dedup)."""
        return KnowledgeGraph(
            entities=kg1.entities + kg2.entities,
            relations=kg1.relations + kg2.relations,
            failures=kg1.failures + kg2.failures,
            system_name="Enhanced Adaptive System",
            system_summary="System extracted using enhanced adaptive approach with incremental discovery"
        )
| # ============================================================================= | |
| # MAIN INTERFACE FUNCTION | |
| # ============================================================================= | |
def extract_knowledge_graph_enhanced(trace_data: str, **kwargs) -> Dict[str, Any]:
    """
    Enhanced knowledge extraction using Claude-like incremental discovery.

    Thin convenience wrapper: builds a fresh extractor from **kwargs and runs
    it over trace_data.
    """
    return EnhancedAdaptiveKnowledgeExtractor(**kwargs).process_text(trace_data)
| # ============================================================================= | |
| # FACTORY FUNCTION | |
| # ============================================================================= | |
def create_enhanced_adaptive_crew() -> Crew:
    """Create a crew with enhanced adaptive capabilities"""
    all_agents = [
        create_discovery_agent(),
        create_analysis_agent(),
        create_synthesis_agent(),
    ]
    return Crew(
        agents=all_agents,
        tasks=[],  # Tasks are created dynamically
        verbose=True,
        process=Process.sequential,
    )