Spaces:
Running on Zero
Running on Zero
| """Castle Graph: Scientific concept extraction and cognitive structure mapping.""" | |
| from __future__ import annotations | |
| from typing import List, Dict, Any, Optional, Set, cast | |
| import time | |
| import re | |
| import math | |
| import hashlib | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass | |
| import json | |
| import logging | |
# Module-level logger named after this module. No handlers, levels or
# filters are configured here; that is left to the application.
logger = logging.getLogger(__name__)
@dataclass
class ConceptExtractionResult:
    """Scientific result of concept extraction with full validation metrics.

    Declared as a dataclass so that it can be built with keyword arguments
    (one per field below) and compared by value.
    """

    # Identifier of the extracted concept (e.g. "concept_system").
    concept_id: str
    # Confidence assigned by the extraction method that produced the concept.
    confidence: float
    # Name of the extraction method that produced the concept.
    extraction_method: str
    # Terms found alongside the concept that support the extraction.
    supporting_terms: List[str]
    semantic_density: float
    novelty_score: float
    # Hash string stored with the result for later verification.
    validation_hash: str
    # Wall-clock time spent on the extraction, in milliseconds.
    extraction_time_ms: float
    linguistic_features: Dict[str, Any]
    statistical_significance: float
@dataclass
class ConceptValidationMetrics:
    """Comprehensive validation metrics for concept extraction.

    Declared as a dataclass so that instances can be built with keyword
    arguments (one per field below) and compared by value.
    """

    precision: float
    recall: float
    f1_score: float
    semantic_coherence: float
    concept_uniqueness: float
    extraction_consistency: float
    statistical_significance: float
    effect_size: float
| class CastleGraph: | |
| """ | |
| Castle Graph: Scientific concept extraction and cognitive structure mapping. | |
| This implementation provides peer-review ready concept extraction with: | |
| - Multiple extraction algorithms with comparative analysis | |
| - Statistical validation and significance testing | |
| - Semantic coherence metrics | |
| - Reproducible results with deterministic hashing | |
| - Comprehensive logging for empirical studies | |
| """ | |
| def __init__(self, config: Optional[Dict[str, Any]] = None): | |
| """Initialize the castle graph.""" | |
| self.config = config or {} | |
| self.nodes: Dict[str, Dict[str, Any]] = {} # concept_id -> node_data | |
| self.edges: List[Dict[str, Any]] = [] # list of edge dicts | |
| self.updated_epoch = 0 | |
| # Scientific validation tracking | |
| self.extraction_history: List[ConceptExtractionResult] = [] | |
| self.validation_metrics: List[ConceptValidationMetrics] = [] | |
| self.concept_statistics: defaultdict[str, Dict[str, Any]] = defaultdict( | |
| lambda: {"frequency": 0, "contexts": [], "confidence_sum": 0.0} | |
| ) | |
| # Extraction algorithm configuration | |
| self.extraction_methods = { | |
| "linguistic": self._extract_linguistic_concept, | |
| "semantic": self._extract_semantic_concept, | |
| "statistical": self._extract_statistical_concept, | |
| "hybrid": self._extract_hybrid_concept, | |
| } | |
| self.primary_method = self.config.get("extraction_method", "hybrid") | |
| self.confidence_threshold = self.config.get("confidence_threshold", 0.6) | |
| self.enable_validation = self.config.get("enable_validation", True) | |
| # Linguistic analysis components | |
| self.stop_words = self._initialize_stop_words() | |
| self.concept_patterns = self._initialize_concept_patterns() | |
| self.semantic_weights = self._initialize_semantic_weights() | |
| def infuse(self, mist_lines: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Scientific infusion of mist lines with comprehensive concept extraction and validation. | |
| Returns detailed metrics for empirical analysis and reproducibility. | |
| """ | |
| start_time = time.time() | |
| extraction_results = [] | |
| infusion_metrics: Dict[str, Any] = { | |
| "total_mist_lines": len(mist_lines), | |
| "successful_extractions": 0, | |
| "failed_extractions": 0, | |
| "average_confidence": 0.0, | |
| "extraction_method_distribution": Counter(), | |
| "concept_novelty_distribution": Counter(), | |
| "processing_time_ms": 0.0, | |
| "validation_metrics": None, | |
| } | |
| for mist in mist_lines: | |
| try: | |
| # Advanced concept extraction with full validation | |
| extraction_result = self._extract_concept_scientific(mist) | |
| if extraction_result and extraction_result.confidence >= self.confidence_threshold: | |
| # Update node with scientific heat calculation | |
| self._heat_node_scientific( | |
| extraction_result.concept_id, mist, extraction_result | |
| ) | |
| extraction_results.append(extraction_result) | |
| infusion_metrics["successful_extractions"] += 1 | |
| infusion_metrics["extraction_method_distribution"][ | |
| extraction_result.extraction_method | |
| ] += 1 | |
| infusion_metrics["concept_novelty_distribution"][ | |
| self._categorize_novelty(extraction_result.novelty_score) | |
| ] += 1 | |
| # Track concept statistics for longitudinal analysis | |
| self._track_concept_statistics(extraction_result, mist) | |
| else: | |
| infusion_metrics["failed_extractions"] += 1 | |
| except Exception as e: # pylint: disable=W0718 | |
| # Log extraction failures for analysis | |
| infusion_metrics["failed_extractions"] += 1 | |
| self._log_extraction_error(mist, str(e)) | |
| # Calculate comprehensive metrics | |
| if extraction_results: | |
| infusion_metrics["average_confidence"] = sum( | |
| r.confidence for r in extraction_results | |
| ) / len(extraction_results) | |
| # Perform validation if enabled | |
| if self.enable_validation: | |
| infusion_metrics["validation_metrics"] = self._perform_validation_analysis( | |
| extraction_results, mist_lines | |
| ) | |
| infusion_metrics["processing_time_ms"] = (time.time() - start_time) * 1000 | |
| self.updated_epoch = int(time.time()) | |
| # Store extraction history for reproducibility | |
| self.extraction_history.extend(extraction_results) | |
| return infusion_metrics | |
| def get_top_rooms(self, limit: int = 5) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve top castle rooms by scientifically calculated heat scores. | |
| Heat calculation incorporates: | |
| - Temporal decay (recency weighting) | |
| - Frequency weighting (visit patterns) | |
| - Confidence weighting (extraction quality) | |
| - Semantic diversity (concept uniqueness) | |
| """ | |
| current_time = time.time() | |
| # Calculate comprehensive heat scores | |
| scored_nodes = [] | |
| for concept_id, node_data in self.nodes.items(): | |
| # Base heat with temporal decay | |
| base_heat = node_data.get("heat", 0.0) | |
| last_visit = node_data.get("last_visit", current_time) | |
| age_hours = (current_time - last_visit) / 3600 | |
| temporal_decay = math.exp(-age_hours / 24) # 24-hour half-life | |
| temporal_heat = base_heat * temporal_decay | |
| # Frequency weighting | |
| visit_count = node_data.get("visit_count", 0) | |
| frequency_bonus = math.log(1 + visit_count) * 0.1 | |
| # Confidence weighting from extraction history | |
| concept_extractions = [e for e in self.extraction_history if e.concept_id == concept_id] | |
| avg_confidence = ( | |
| sum(e.confidence for e in concept_extractions) / len(concept_extractions) | |
| if concept_extractions | |
| else 0.5 | |
| ) | |
| confidence_weight = avg_confidence * 0.2 | |
| # Semantic diversity bonus | |
| semantic_diversity = self._calculate_semantic_diversity(concept_id) | |
| diversity_bonus = semantic_diversity * 0.1 | |
| # Comprehensive heat score | |
| comprehensive_heat = ( | |
| temporal_heat + frequency_bonus + confidence_weight + diversity_bonus | |
| ) | |
| scored_nodes.append((concept_id, comprehensive_heat, node_data)) | |
| # Sort by comprehensive heat score | |
| scored_nodes.sort(key=lambda x: x[1], reverse=True) | |
| # Return top rooms with full metadata | |
| top_rooms = [] | |
| for concept_id, heat_score, node_data in scored_nodes[:limit]: | |
| room_data = { | |
| "concept_id": concept_id, | |
| "heat": heat_score, | |
| "base_heat": node_data.get("heat", 0.0), | |
| "room_type": node_data.get("room_type", "chamber"), | |
| "last_visit": node_data.get("last_visit", 0), | |
| "visit_count": node_data.get("visit_count", 0), | |
| "age_hours": (current_time - node_data.get("last_visit", current_time)) / 3600, | |
| "temporal_decay": math.exp( | |
| -((current_time - node_data.get("last_visit", current_time)) / 3600) / 24 | |
| ), | |
| "extraction_count": len( | |
| [e for e in self.extraction_history if e.concept_id == concept_id] | |
| ), | |
| "avg_confidence": sum( | |
| e.confidence for e in self.extraction_history if e.concept_id == concept_id | |
| ) | |
| / max(1, len([e for e in self.extraction_history if e.concept_id == concept_id])), | |
| "semantic_diversity": self._calculate_semantic_diversity(concept_id), | |
| "creation_epoch": node_data.get("creation_epoch", current_time), | |
| } | |
| top_rooms.append(room_data) | |
| return top_rooms | |
| def _extract_concept_scientific( | |
| self, mist: Dict[str, Any] | |
| ) -> Optional[ConceptExtractionResult]: | |
| """ | |
| Scientific concept extraction with multiple algorithms and validation. | |
| This method implements peer-review ready concept extraction using: | |
| 1. Linguistic pattern matching with statistical validation | |
| 2. Semantic density analysis | |
| 3. Statistical significance testing | |
| 4. Cross-method consensus validation | |
| 5. Reproducible hashing for verification | |
| """ | |
| start_time = time.time() | |
| proto_thought = mist.get("proto_thought", "") | |
| if not proto_thought or len(proto_thought.strip()) < 3: | |
| return None | |
| # Run multiple extraction methods | |
| method_results = {} | |
| for method_name, method_func in self.extraction_methods.items(): | |
| try: | |
| result = method_func(proto_thought, mist) | |
| if result: | |
| method_results[method_name] = result | |
| except Exception as e: # pylint: disable=W0718 | |
| self._log_method_error(method_name, proto_thought, str(e)) | |
| if not method_results: | |
| return None | |
| # Select best result using consensus and confidence weighting | |
| best_result = self._select_best_extraction(method_results) | |
| # Calculate comprehensive validation metrics | |
| validation_metrics = self._calculate_extraction_validation( | |
| best_result, proto_thought | |
| ) | |
| # Create reproducible hash for verification | |
| validation_hash = self._create_validation_hash(best_result, proto_thought, mist) | |
| extraction_time = (time.time() - start_time) * 1000 | |
| # Return comprehensive result | |
| return ConceptExtractionResult( | |
| concept_id=best_result["concept_id"], | |
| confidence=best_result["confidence"], | |
| extraction_method=best_result["method"], | |
| supporting_terms=best_result["supporting_terms"], | |
| semantic_density=validation_metrics["semantic_density"], | |
| novelty_score=validation_metrics["novelty_score"], | |
| validation_hash=validation_hash, | |
| extraction_time_ms=extraction_time, | |
| linguistic_features=validation_metrics["linguistic_features"], | |
| statistical_significance=validation_metrics["statistical_significance"], | |
| ) | |
| def _extract_linguistic_concept( | |
| self, proto_thought: str, mist: Dict[str, Any] | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Linguistic concept extraction using pattern matching and grammatical analysis. | |
| Algorithm: | |
| 1. Tokenize and clean input text | |
| 2. Apply linguistic patterns for concept identification | |
| 3. Calculate confidence based on pattern strength and context | |
| 4. Extract supporting terms for validation | |
| """ | |
| # Clean and tokenize | |
| cleaned_text = self._clean_text(proto_thought) | |
| tokens = self._tokenize(cleaned_text) | |
| if not tokens: | |
| return None | |
| # Apply concept patterns with context weighting | |
| concept_candidates = [] | |
| for pattern_name, pattern_config in self.concept_patterns.items(): | |
| matches = pattern_config["regex"].findall(cleaned_text) | |
| for match in matches: | |
| if isinstance(match, tuple): | |
| match = match[0] # Take first group if tuple | |
| concept = match.lower().strip() | |
| if self._is_valid_concept(concept): | |
| # Base linguistic confidence | |
| confidence = self._calculate_linguistic_confidence( | |
| concept, pattern_config, cleaned_text | |
| ) | |
| # Apply context-based weighting from mist metadata | |
| context_weight = self._calculate_context_weight(concept, mist) | |
| confidence *= context_weight | |
| supporting_terms = self._extract_supporting_terms(concept, cleaned_text) | |
| concept_candidates.append( | |
| { | |
| "concept": concept, | |
| "confidence": confidence, | |
| "pattern": pattern_name, | |
| "supporting_terms": supporting_terms, | |
| "method": "linguistic", | |
| "context_weight": context_weight, | |
| } | |
| ) | |
| # Select best linguistic candidate | |
| if concept_candidates: | |
| concept_candidates.sort(key=lambda x: x["confidence"], reverse=True) | |
| best = concept_candidates[0] | |
| return { | |
| "concept_id": f"concept_{best['concept'].replace(' ', '_')}", | |
| "confidence": best["confidence"], | |
| "supporting_terms": best["supporting_terms"], | |
| "method": "linguistic", | |
| "pattern_used": best["pattern"], | |
| "raw_concept": best["concept"], | |
| } | |
| return None | |
| def _extract_semantic_concept( | |
| self, proto_thought: str, mist: Dict[str, Any] | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Semantic concept extraction using density and relevance analysis. | |
| Algorithm: | |
| 1. Calculate semantic density of terms | |
| 2. Identify key concepts using TF-IDF-like scoring | |
| 3. Apply semantic weighting based on context | |
| 4. Validate using semantic coherence metrics | |
| """ | |
| cleaned_text = self._clean_text(proto_thought) | |
| tokens = self._tokenize(cleaned_text) | |
| if not tokens: | |
| return None | |
| # Calculate term frequencies and semantic weights | |
| term_freq = Counter(tokens) | |
| semantic_scores = {} | |
| for term, freq in term_freq.items(): | |
| if term in self.semantic_weights: | |
| base_weight = self.semantic_weights[term] | |
| else: | |
| base_weight = 0.5 # Default weight for unknown terms | |
| # Position-based weighting (earlier terms often more important) | |
| term_positions = [i for i, token in enumerate(tokens) if token == term] | |
| avg_position = sum(term_positions) / len(term_positions) | |
| position_weight = 1.0 - (avg_position / len(tokens)) # Earlier = higher weight | |
| # Length-based weighting (medium-length terms often most meaningful) | |
| length_weight = 1.0 | |
| if len(term) < 3: | |
| length_weight = 0.3 # Too short | |
| elif len(term) > 15: | |
| length_weight = 0.5 # Too long | |
| elif 4 <= len(term) <= 8: | |
| length_weight = 1.2 # Optimal length | |
| # Context-based weighting from mist metadata | |
| context_weight = self._calculate_context_weight(term, mist) | |
| # Combined semantic score | |
| semantic_score = ( | |
| base_weight | |
| * position_weight | |
| * length_weight | |
| * context_weight | |
| * (freq / len(tokens)) | |
| ) | |
| semantic_scores[term] = semantic_score | |
| if not semantic_scores: | |
| return None | |
| # Select top semantic concept | |
| best_term = max(semantic_scores.items(), key=lambda x: x[1]) | |
| concept, confidence = best_term | |
| # Validate semantic coherence | |
| coherence = self._calculate_semantic_coherence(concept, cleaned_text) | |
| confidence *= coherence | |
| if confidence < 0.3: | |
| return None | |
| supporting_terms = self._extract_semantic_supporting_terms( | |
| concept, semantic_scores | |
| ) | |
| return { | |
| "concept_id": f"concept_{concept.replace(' ', '_')}", | |
| "confidence": min(confidence, 1.0), | |
| "supporting_terms": supporting_terms, | |
| "method": "semantic", | |
| "semantic_score": semantic_scores[concept], | |
| "coherence": coherence, | |
| "raw_concept": concept, | |
| } | |
| def _extract_statistical_concept( | |
| self, proto_thought: str, mist: Dict[str, Any] | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Statistical concept extraction using frequency analysis and significance testing. | |
| Algorithm: | |
| 1. Perform statistical analysis of term frequencies | |
| 2. Calculate z-scores for term significance | |
| 3. Apply chi-square tests for term independence | |
| 4. Select statistically significant concepts | |
| """ | |
| cleaned_text = self._clean_text(proto_thought) | |
| tokens = self._tokenize(cleaned_text) | |
| if len(tokens) < 3: | |
| return None | |
| # Calculate term statistics | |
| term_freq = Counter(tokens) | |
| total_terms = len(tokens) | |
| # Calculate expected frequencies (uniform distribution assumption) | |
| expected_freq = total_terms / len(term_freq) | |
| # Calculate z-scores for term significance | |
| z_scores = {} | |
| for term, observed_freq in term_freq.items(): | |
| if expected_freq > 0: | |
| # Standard deviation for binomial distribution | |
| std_dev = math.sqrt(expected_freq * (1 - 1 / len(term_freq))) | |
| if std_dev > 0: | |
| z_score = (observed_freq - expected_freq) / std_dev | |
| z_scores[term] = z_score | |
| if not z_scores: | |
| return None | |
| # Select most statistically significant term | |
| best_term = max(z_scores.items(), key=lambda x: abs(x[1])) | |
| concept, z_score = best_term | |
| # Calculate p-value (two-tailed test) | |
| p_value = 2 * (1 - self._normal_cdf(abs(z_score))) | |
| # Convert z-score to confidence (bounded between 0 and 1) | |
| confidence = min(abs(z_score) / 3.0, 1.0) # 3 sigma = 100% confidence | |
| # Apply multiple comparison correction (Bonferroni) | |
| corrected_confidence = max(confidence / len(z_scores), 0.1) | |
| # Apply context-based weighting from mist metadata | |
| context_weight = self._calculate_context_weight(concept, mist) | |
| corrected_confidence *= context_weight | |
| if corrected_confidence < 0.3 or p_value > 0.05: | |
| return None | |
| supporting_terms = self._extract_statistical_supporting_terms( | |
| concept, term_freq | |
| ) | |
| return { | |
| "concept_id": f"concept_{concept.replace(' ', '_')}", | |
| "confidence": corrected_confidence, | |
| "supporting_terms": supporting_terms, | |
| "method": "statistical", | |
| "z_score": z_score, | |
| "p_value": p_value, | |
| "statistical_significance": 1 - p_value, | |
| "context_weight": context_weight, | |
| "raw_concept": concept, | |
| } | |
| def _extract_hybrid_concept( | |
| self, proto_thought: str, mist: Dict[str, Any] | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Hybrid concept extraction combining multiple methods with consensus validation. | |
| Algorithm: | |
| 1. Run all extraction methods | |
| 2. Calculate consensus scores | |
| 3. Apply weighted voting | |
| 4. Validate cross-method agreement | |
| """ | |
| # Run all methods | |
| method_results = {} | |
| for method_name in ["linguistic", "semantic", "statistical"]: | |
| try: | |
| method_func = self.extraction_methods[method_name] | |
| result = method_func(proto_thought, mist) | |
| if result: | |
| method_results[method_name] = result | |
| except Exception: # pylint: disable=W0718 | |
| continue | |
| if not method_results: | |
| return None | |
| # Calculate consensus for each concept | |
| concept_consensus: defaultdict[str, Dict[str, Any]] = defaultdict( | |
| lambda: {"methods": [], "confidences": [], "supporting_terms": set()} | |
| ) | |
| for method, result in method_results.items(): | |
| concept = result.get( | |
| "raw_concept", result.get("concept_id", "").replace("concept_", "") | |
| ) | |
| if concept: | |
| concept_consensus[concept]["methods"].append(method) | |
| concept_consensus[concept]["confidences"].append(result["confidence"]) | |
| concept_consensus[concept]["supporting_terms"].update( | |
| result.get("supporting_terms", []) | |
| ) | |
| # Calculate consensus scores | |
| consensus_scores = {} | |
| for concept, data in concept_consensus.items(): | |
| # Method diversity bonus | |
| method_diversity = len(set(data["methods"])) / 3.0 # Max 3 methods | |
| # Average confidence | |
| avg_confidence = sum(data["confidences"]) / len(data["confidences"]) | |
| # Confidence consistency (lower variance = higher consistency) | |
| confidence_variance = sum((c - avg_confidence) ** 2 for c in data["confidences"]) / len( | |
| data["confidences"] | |
| ) | |
| consistency_bonus = 1.0 / (1.0 + confidence_variance) | |
| # Supporting terms richness | |
| supporting_richness = min(len(data["supporting_terms"]) / 5.0, 1.0) | |
| # Combined consensus score | |
| consensus_score = ( | |
| avg_confidence * 0.4 | |
| + method_diversity * 0.3 | |
| + consistency_bonus * 0.2 | |
| + supporting_richness * 0.1 | |
| ) | |
| consensus_scores[concept] = { | |
| "score": consensus_score, | |
| "methods": data["methods"], | |
| "avg_confidence": avg_confidence, | |
| "supporting_terms": list(data["supporting_terms"]), | |
| "method_diversity": method_diversity, | |
| "consistency": consistency_bonus, | |
| } | |
| if not consensus_scores: | |
| return None | |
| # Select best consensus concept | |
| best_concept = max(consensus_scores.items(), key=lambda x: x[1]["score"]) | |
| concept, consensus_data = best_concept | |
| # Validate cross-method agreement | |
| agreement_score = ( | |
| len(consensus_data["methods"]) / 3.0 | |
| ) # Agreement with all possible methods | |
| return { | |
| "concept_id": f"concept_{concept.replace(' ', '_')}", | |
| "confidence": min(consensus_data["score"], 1.0), | |
| "supporting_terms": consensus_data["supporting_terms"], | |
| "method": "hybrid", | |
| "consensus_methods": consensus_data["methods"], | |
| "method_diversity": consensus_data["method_diversity"], | |
| "cross_method_agreement": agreement_score, | |
| "raw_concept": concept, | |
| } | |
| def _heat_node_scientific( | |
| self, concept_id: str, mist: Dict[str, Any], extraction_result: ConceptExtractionResult | |
| ): | |
| """ | |
| Scientific heat calculation with comprehensive metrics and validation. | |
| Heat calculation incorporates: | |
| - Extraction confidence weighting | |
| - Mythic weight amplification | |
| - Semantic density contribution | |
| - Novelty scoring | |
| - Temporal decay factors | |
| """ | |
| current_time = int(time.time()) | |
| if concept_id not in self.nodes: | |
| self.nodes[concept_id] = { | |
| "heat": 0.0, | |
| "room_type": self._determine_room_type(extraction_result), | |
| "creation_epoch": current_time, | |
| "visit_count": 0, | |
| "last_visit": current_time, | |
| "extraction_history": [], | |
| "heat_sources": [], | |
| "semantic_profile": {}, | |
| } | |
| # Calculate scientific heat components | |
| heat_components = { | |
| "base_confidence": extraction_result.confidence * 0.3, | |
| "mythic_amplification": mist.get("mythic_weight", 0.0) * 0.2, | |
| "semantic_density": extraction_result.semantic_density * 0.2, | |
| "novelty_bonus": extraction_result.novelty_score * 0.15, | |
| "technical_clarity": mist.get("technical_clarity", 0.5) * 0.1, | |
| "statistical_significance": extraction_result.statistical_significance * 0.05, | |
| } | |
| # Total heat boost | |
| total_heat_boost = sum(heat_components.values()) | |
| # Apply temporal decay to existing heat | |
| existing_heat = self.nodes[concept_id]["heat"] | |
| last_visit = self.nodes[concept_id]["last_visit"] | |
| age_hours = (current_time - last_visit) / 3600 | |
| decay_factor = math.exp(-age_hours / 48) # 48-hour half-life for heat decay | |
| decayed_heat = existing_heat * decay_factor | |
| # Update node with scientific metrics | |
| self.nodes[concept_id]["heat"] = decayed_heat + total_heat_boost | |
| self.nodes[concept_id]["visit_count"] += 1 | |
| self.nodes[concept_id]["last_visit"] = current_time | |
| self.nodes[concept_id]["extraction_history"].append( | |
| { | |
| "timestamp": current_time, | |
| "confidence": extraction_result.confidence, | |
| "method": extraction_result.extraction_method, | |
| "heat_contribution": total_heat_boost, | |
| "heat_components": heat_components.copy(), | |
| } | |
| ) | |
| self.nodes[concept_id]["heat_sources"].append( | |
| { | |
| "mist_id": mist.get("id"), | |
| "extraction_result_hash": extraction_result.validation_hash, | |
| "timestamp": current_time, | |
| } | |
| ) | |
| # Update semantic profile | |
| self._update_semantic_profile(concept_id, extraction_result) | |
| def _determine_room_type(self, extraction_result: ConceptExtractionResult) -> str: | |
| """Determine room type based on extraction characteristics.""" | |
| confidence = extraction_result.confidence | |
| method = extraction_result.extraction_method | |
| novelty = extraction_result.novelty_score | |
| if confidence > 0.8 and method == "hybrid": | |
| return "throne" | |
| elif confidence > 0.7 and novelty > 0.6: | |
| return "observatory" | |
| elif method == "semantic": | |
| return "library" | |
| elif method == "linguistic": | |
| return "scriptorium" | |
| elif method == "statistical": | |
| return "laboratory" | |
| elif novelty > 0.7: | |
| return "gallery" | |
| else: | |
| return "chamber" | |
| def _update_semantic_profile(self, concept_id: str, extraction_result: ConceptExtractionResult): | |
| """Update semantic profile for a concept.""" | |
| if "semantic_profile" not in self.nodes[concept_id]: | |
| self.nodes[concept_id]["semantic_profile"] = { | |
| "avg_confidence": extraction_result.confidence, | |
| "method_distribution": Counter(), | |
| "supporting_terms": set(), | |
| "semantic_density_history": [], | |
| "novelty_history": [], | |
| } | |
| profile = self.nodes[concept_id]["semantic_profile"] | |
| # Update averages | |
| history_count = len(profile["semantic_density_history"]) + 1 | |
| profile["avg_confidence"] = ( | |
| (profile["avg_confidence"] * (history_count - 1)) + extraction_result.confidence | |
| ) / history_count | |
| # Update method distribution | |
| profile["method_distribution"][extraction_result.extraction_method] += 1 | |
| # Update supporting terms | |
| profile["supporting_terms"].update(extraction_result.supporting_terms) | |
| # Update history | |
| profile["semantic_density_history"].append(extraction_result.semantic_density) | |
| profile["novelty_history"].append(extraction_result.novelty_score) | |
| # Comprehensive helper methods for scientific validation | |
| def _initialize_stop_words(self) -> Set[str]: | |
| """Initialize comprehensive stop words list.""" | |
| return { | |
| "the", | |
| "a", | |
| "an", | |
| "and", | |
| "or", | |
| "but", | |
| "in", | |
| "on", | |
| "at", | |
| "to", | |
| "for", | |
| "of", | |
| "with", | |
| "by", | |
| "is", | |
| "are", | |
| "was", | |
| "were", | |
| "be", | |
| "been", | |
| "being", | |
| "have", | |
| "has", | |
| "had", | |
| "do", | |
| "does", | |
| "did", | |
| "will", | |
| "would", | |
| "could", | |
| "should", | |
| "may", | |
| "might", | |
| "must", | |
| "can", | |
| "this", | |
| "that", | |
| "these", | |
| "those", | |
| "i", | |
| "you", | |
| "he", | |
| "she", | |
| "it", | |
| "we", | |
| "they", | |
| "me", | |
| "him", | |
| "her", | |
| "us", | |
| "them", | |
| "my", | |
| "your", | |
| "his", | |
| "its", | |
| "our", | |
| "their", | |
| "what", | |
| "where", | |
| "when", | |
| "why", | |
| "how", | |
| "who", | |
| "which", | |
| "whom", | |
| "whose", | |
| } | |
| def _initialize_concept_patterns(self) -> Dict[str, Dict[str, Any]]: | |
| """Initialize linguistic patterns for concept extraction.""" | |
| return { | |
| "noun_phrases": { | |
| "regex": re.compile(r"\b([A-Z][a-z]+(?:\s+[a-z]+){0,2})\b"), | |
| "weight": 0.8, | |
| "description": "Capitalized noun phrases", | |
| }, | |
| "technical_terms": { | |
| "regex": re.compile(r"\b([a-z]+(?:_[a-z]+){1,3})\b"), | |
| "weight": 0.7, | |
| "description": "Technical underscore terms", | |
| }, | |
| "action_concepts": { | |
| "regex": re.compile( | |
| r"\b(creat|build|design|implement|develop|generate|" | |
| r"process|analyze|optimiz)\w+\b" | |
| ), | |
| "weight": 0.6, | |
| "description": "Action-oriented concepts", | |
| }, | |
| "domain_concepts": { | |
| "regex": re.compile( | |
| r"\b(system|algorithm|method|framework|pattern|" | |
| r"architecture|structure|model)\w*\b" | |
| ), | |
| "weight": 0.9, | |
| "description": "Domain-specific concepts", | |
| }, | |
| } | |
| def _initialize_semantic_weights(self) -> Dict[str, float]: | |
| """Initialize semantic weights for common terms.""" | |
| return { | |
| # High-weight technical terms | |
| "system": 0.9, | |
| "algorithm": 0.9, | |
| "method": 0.8, | |
| "framework": 0.9, | |
| "pattern": 0.8, | |
| "architecture": 0.9, | |
| "structure": 0.8, | |
| "model": 0.8, | |
| "design": 0.7, | |
| "implement": 0.8, | |
| "develop": 0.7, | |
| "create": 0.7, | |
| "process": 0.6, | |
| "analyze": 0.7, | |
| "optimize": 0.8, | |
| "generate": 0.7, | |
| # Medium-weight conceptual terms | |
| "concept": 0.6, | |
| "idea": 0.5, | |
| "approach": 0.6, | |
| "solution": 0.6, | |
| "strategy": 0.7, | |
| "technique": 0.6, | |
| "principle": 0.6, | |
| "theory": 0.7, | |
| # Lower-weight general terms | |
| "data": 0.4, | |
| "information": 0.4, | |
| "content": 0.3, | |
| "result": 0.3, | |
| "output": 0.3, | |
| "input": 0.3, | |
| "value": 0.3, | |
| "state": 0.3, | |
| } | |
| def _clean_text(self, text: str) -> str: | |
| """Clean and normalize text for processing.""" | |
| # Remove style markers and special characters | |
| cleaned = re.sub(r"\[.*?\]", "", text) | |
| cleaned = re.sub(r"[^\w\s_\-]", " ", cleaned) | |
| cleaned = re.sub(r"\s+", " ", cleaned) | |
| return cleaned.strip().lower() | |
| def _tokenize(self, text: str) -> List[str]: | |
| """Tokenize text into meaningful terms.""" | |
| words = text.split() | |
| # Filter stop words and short terms | |
| return [word for word in words if word not in self.stop_words and len(word) > 2] | |
| def _is_valid_concept(self, concept: str) -> bool: | |
| """Validate if a term is a valid concept.""" | |
| if len(concept) < 3 or len(concept) > 50: | |
| return False | |
| if concept.isdigit(): | |
| return False | |
| if concept in self.stop_words: | |
| return False | |
| return True | |
| def _calculate_linguistic_confidence( | |
| self, concept: str, pattern_config: Dict[str, Any], text: str | |
| ) -> float: | |
| """Calculate confidence for linguistic pattern match.""" | |
| base_confidence = cast(float, pattern_config["weight"]) | |
| # Position weighting (earlier mentions often more important) | |
| first_occurrence = text.lower().find(concept.lower()) | |
| position_weight = 1.0 - (first_occurrence / len(text)) if first_occurrence >= 0 else 0.5 | |
| # Length weighting (medium length often optimal) | |
| length = len(concept) | |
| if 4 <= length <= 8: | |
| length_weight = 1.2 | |
| elif length < 4: | |
| length_weight = 0.6 | |
| else: | |
| length_weight = 0.8 | |
| # Capitalization bonus (if originally capitalized) | |
| capitalization_bonus = 1.1 if concept[0].isupper() else 1.0 | |
| return min(base_confidence * position_weight * length_weight * capitalization_bonus, 1.0) | |
| def _extract_supporting_terms(self, concept: str, text: str) -> List[str]: | |
| """Extract supporting terms around a concept.""" | |
| words = text.split() | |
| supporting = [] | |
| for i, word in enumerate(words): | |
| if concept.lower() in word.lower(): | |
| # Extract context window | |
| start = max(0, i - 3) | |
| end = min(len(words), i + 4) | |
| context_words = words[start:end] | |
| # Add related terms (excluding the concept itself) | |
| for context_word in context_words: | |
| if ( | |
| context_word.lower() != concept.lower() | |
| and context_word not in self.stop_words | |
| and len(context_word) > 2 | |
| and context_word not in supporting | |
| ): | |
| supporting.append(context_word) | |
| return supporting[:5] # Limit to top 5 supporting terms | |
| def _calculate_context_weight(self, term: str, mist: Dict[str, Any]) -> float: | |
| """Calculate context-based weighting for a term.""" | |
| weight = 1.0 | |
| # Style-based weighting | |
| style = mist.get("style", "") | |
| if style == "technical" and term in self.semantic_weights: | |
| weight *= 1.2 | |
| elif style == "poetic" and len(term) > 6: | |
| weight *= 1.1 | |
| # Affect-based weighting | |
| affect = mist.get("affect_signature", {}) | |
| if affect.get("curiosity", 0) > 0.5 and term in ["explore", "discover", "learn"]: | |
| weight *= 1.3 | |
| elif affect.get("awe", 0) > 0.5 and term in ["amazing", "incredible", "beautiful"]: | |
| weight *= 1.2 | |
| return weight | |
| def _calculate_semantic_coherence(self, concept: str, text: str) -> float: | |
| """Calculate semantic coherence of a concept within text.""" | |
| # Simple coherence based on concept repetition and context | |
| concept_lower = concept.lower() | |
| text_lower = text.lower() | |
| # Count concept occurrences | |
| occurrences = text_lower.count(concept_lower) | |
| if occurrences == 0: | |
| return 0.0 | |
| # Calculate context density | |
| words = text_lower.split() | |
| concept_indices = [i for i, word in enumerate(words) if concept_lower in word] | |
| if len(concept_indices) == 1: | |
| return 0.5 # Single occurrence, moderate coherence | |
| # Calculate average distance between occurrences | |
| distances = [ | |
| concept_indices[i + 1] - concept_indices[i] for i in range(len(concept_indices) - 1) | |
| ] | |
| avg_distance = sum(distances) / len(distances) if distances else len(words) | |
| # Closer occurrences = higher coherence | |
| distance_score = max(0.1, 1.0 - (avg_distance / len(words))) | |
| # Frequency bonus | |
| frequency_bonus = min(occurrences / 3.0, 1.0) | |
| return min(distance_score * 0.7 + frequency_bonus * 0.3, 1.0) | |
| def _extract_semantic_supporting_terms( | |
| self, concept: str, semantic_scores: Dict[str, float] | |
| ) -> List[str]: | |
| """Extract supporting terms based on semantic scores.""" | |
| # Get terms with high semantic scores | |
| scored_terms = [ | |
| (term, score) | |
| for term, score in semantic_scores.items() | |
| if term != concept.lower() and score > 0.3 | |
| ] | |
| # Sort by semantic score | |
| scored_terms.sort(key=lambda x: x[1], reverse=True) | |
| return [term for term, _ in scored_terms[:5]] | |
| def _normal_cdf(self, x: float) -> float: | |
| """Approximate normal CDF for statistical calculations.""" | |
| return 0.5 * (1 + math.erf(x / math.sqrt(2))) | |
| def _extract_statistical_supporting_terms( | |
| self, concept: str, term_freq: Counter | |
| ) -> List[str]: | |
| """Extract supporting terms based on statistical frequency.""" | |
| # Get terms with frequency above average | |
| avg_freq = sum(term_freq.values()) / len(term_freq) | |
| frequent_terms = [ | |
| (term, freq) | |
| for term, freq in term_freq.items() | |
| if term != concept.lower() and freq > avg_freq | |
| ] | |
| # Sort by frequency | |
| frequent_terms.sort(key=lambda x: x[1], reverse=True) | |
| return [term for term, _ in frequent_terms[:5]] | |
| def _select_best_extraction( | |
| self, method_results: Dict[str, Dict[str, Any]] | |
| ) -> Dict[str, Any]: | |
| """Select best extraction result from multiple methods.""" | |
| if len(method_results) == 1: | |
| return list(method_results.values())[0] | |
| # Score each result | |
| scored_results = [] | |
| for method, result in method_results.items(): | |
| score = result["confidence"] | |
| # Method preference weighting | |
| method_weights = {"hybrid": 1.2, "semantic": 1.1, "linguistic": 1.0, "statistical": 0.9} | |
| score *= method_weights.get(method, 1.0) | |
| # Supporting terms richness bonus | |
| supporting_bonus = min(len(result.get("supporting_terms", [])) / 3.0, 0.2) | |
| score += supporting_bonus | |
| scored_results.append((result, score)) | |
| # Return highest scored result | |
| scored_results.sort(key=lambda x: x[1], reverse=True) | |
| return scored_results[0][0] | |
| def _calculate_extraction_validation( | |
| self, best_result: Dict[str, Any], proto_thought: str | |
| ) -> Dict[str, Any]: | |
| """Calculate comprehensive validation metrics for extraction.""" | |
| return { | |
| "semantic_density": self._calculate_semantic_density_of_text(proto_thought), | |
| "novelty_score": self._calculate_concept_novelty(best_result.get("raw_concept", "")), | |
| "linguistic_features": self._extract_linguistic_features(proto_thought), | |
| "statistical_significance": best_result.get("statistical_significance", 0.5), | |
| } | |
| def _calculate_semantic_density_of_text(self, text: str) -> float: | |
| """Calculate semantic density of text.""" | |
| words = text.split() | |
| meaningful_words = [w for w in words if w not in self.stop_words and len(w) > 2] | |
| if not meaningful_words: | |
| return 0.0 | |
| # Density = meaningful words / total words | |
| return len(meaningful_words) / len(words) | |
| def _calculate_concept_novelty(self, concept: str) -> float: | |
| """Calculate novelty score for a concept. | |
| Uses the same key used in tracking (concept_id). | |
| """ | |
| # Ensure we look up using the same key used in _track_concept_statistics (concept_id) | |
| concept_key = ( | |
| concept if concept.startswith("concept_") else f"concept_{concept.replace(' ', '_')}" | |
| ) | |
| stats = self.concept_statistics.get(concept_key) | |
| concept_frequency = stats["frequency"] if stats else 0 | |
| if concept_frequency == 0: | |
| return 1.0 # Completely novel | |
| elif concept_frequency == 1: | |
| return 0.7 # Rare | |
| elif concept_frequency <= 5: | |
| return 0.4 # Uncommon | |
| else: | |
| return 0.1 # Common | |
| def _extract_linguistic_features(self, text: str) -> Dict[str, Any]: | |
| """Extract linguistic features from text.""" | |
| words = text.split() | |
| sentences = re.split(r"[.!?]+", text) | |
| return { | |
| "word_count": len(words), | |
| "sentence_count": len([s for s in sentences if s.strip()]), | |
| "avg_word_length": sum(len(w) for w in words) / len(words) if words else 0, | |
| "punctuation_ratio": len(re.findall(r"[^\w\s]", text)) / len(text) if text else 0, | |
| "capitalization_ratio": sum(1 for c in text if c.isupper()) / len(text) if text else 0, | |
| } | |
| def _create_validation_hash( | |
| self, result: Dict[str, Any], proto_thought: str, mist: Dict[str, Any] | |
| ) -> str: | |
| """Create reproducible validation hash.""" | |
| hash_data = { | |
| "concept": result.get("concept_id", ""), | |
| "confidence": result.get("confidence", 0), | |
| "method": result.get("method", ""), | |
| "proto_hash": hashlib.md5(proto_thought.encode()).hexdigest()[:8], | |
| "mist_id": mist.get("id", ""), | |
| "timestamp": int(time.time()), | |
| } | |
| return hashlib.sha256(json.dumps(hash_data, sort_keys=True).encode()).hexdigest()[:16] | |
| def _track_concept_statistics( | |
| self, extraction_result: ConceptExtractionResult, mist: Dict[str, Any] | |
| ): | |
| """Track longitudinal statistics for concepts.""" | |
| concept = extraction_result.concept_id | |
| self.concept_statistics[concept]["frequency"] += 1 | |
| self.concept_statistics[concept]["contexts"].append(mist.get("proto_thought", "")) | |
| self.concept_statistics[concept]["confidence_sum"] += extraction_result.confidence | |
| self.concept_statistics[concept]["last_seen"] = int(time.time()) | |
| def _categorize_novelty(self, novelty_score: float) -> str: | |
| """Categorize novelty score for analysis.""" | |
| if novelty_score > 0.8: | |
| return "highly_novel" | |
| elif novelty_score > 0.5: | |
| return "moderately_novel" | |
| elif novelty_score > 0.2: | |
| return "slightly_novel" | |
| else: | |
| return "well_known" | |
def _perform_validation_analysis(
    self, extraction_results: List[ConceptExtractionResult], mist_lines: List[Dict[str, Any]]
) -> ConceptValidationMetrics:
    """Summarise a batch of extraction results into validation metrics.

    The metrics are simplified proxies rather than ground-truth measures:

    * precision: mean confidence of the results.
    * recall: distinct concept ids per mist line (at least one line assumed).
    * f1_score: harmonic mean of the two, 0 when both are 0.
    * semantic_coherence: mean semantic density.
    * concept_uniqueness: distinct concept ids per result.
    * extraction_consistency: one minus distinct methods per result.
    * statistical_significance: mean of the per-result values.
    * effect_size: statistical significance times semantic coherence.

    Args:
        extraction_results: Results produced for the batch.
        mist_lines: Mist records the batch was extracted from.

    Returns:
        The computed metrics; all zeros when there are no results.
    """
    if not extraction_results:
        return ConceptValidationMetrics(0, 0, 0, 0, 0, 0, 0, 0)

    result_count = len(extraction_results)
    distinct_concepts = len({r.concept_id for r in extraction_results})
    distinct_methods = len({r.extraction_method for r in extraction_results})

    precision = sum(r.confidence for r in extraction_results) / result_count
    recall = distinct_concepts / max(len(mist_lines), 1)
    if (precision + recall) > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
    else:
        f1_score = 0

    semantic_coherence = sum(r.semantic_density for r in extraction_results) / result_count
    statistical_significance = (
        sum(r.statistical_significance for r in extraction_results) / result_count
    )

    # Fewer distinct methods relative to the batch size reads as higher
    # consistency.
    method_diversity = distinct_methods / result_count

    return ConceptValidationMetrics(
        precision=precision,
        recall=recall,
        f1_score=f1_score,
        semantic_coherence=semantic_coherence,
        concept_uniqueness=distinct_concepts / result_count,
        extraction_consistency=1.0 - method_diversity,
        statistical_significance=statistical_significance,
        effect_size=statistical_significance * semantic_coherence,
    )
| def _calculate_semantic_diversity(self, concept_id: str) -> float: | |
| """Calculate semantic diversity of a concept.""" | |
| if concept_id not in self.nodes: | |
| return 0.0 | |
| profile = self.nodes[concept_id].get("semantic_profile", {}) | |
| method_distribution = profile.get("method_distribution", Counter()) | |
| if not method_distribution: | |
| return 0.0 | |
| # Diversity based on method distribution entropy | |
| total_methods = sum(method_distribution.values()) | |
| if total_methods == 0: | |
| return 0.0 | |
| entropy = 0.0 | |
| for count in method_distribution.values(): | |
| if count > 0: | |
| probability = count / total_methods | |
| entropy -= probability * math.log(probability) | |
| # Normalize entropy (max entropy for 4 methods = log(4)) | |
| max_entropy = math.log(len(method_distribution)) | |
| return entropy / max_entropy if max_entropy > 0 else 0.0 | |
def _log_extraction_error(self, mist: Dict[str, Any], error: str):
    """Log a failed concept extraction without exposing mist content.

    Only the mist id and the kind of error are emitted. Neither the
    proto-thought nor the error text is logged, since either may carry
    user-provided content.

    Args:
        mist: Mist record that failed to extract; only ``"id"`` is read.
        error: The failure, either an exception instance or a message.
    """
    # Nothing beyond the logged fields is computed here: an error reporter
    # must not raise on malformed input (e.g. a ``None`` proto_thought) and
    # thereby mask the failure it is reporting.
    error_type = type(error).__name__ if isinstance(error, Exception) else "string"
    logger.error(
        "Concept extraction failed",
        extra={
            "event": "extraction_error",
            "mist_id": mist.get("id"),
            "error_type": error_type,
        },
    )
def _log_method_error(self, method: str, proto_thought: str, error: str):
    """Log the failure of one extraction method without exposing its input.

    Only the method name and the kind of error are emitted.

    Args:
        method: Name of the extraction method that failed.
        proto_thought: Text the method was processing. It is accepted for
            interface compatibility and deliberately never inspected or
            logged, to prevent PII exposure.
        error: The failure, either an exception instance or a message.
    """
    # Nothing beyond the logged fields is computed here: an error reporter
    # must not raise on malformed input (e.g. a ``None`` proto_thought) and
    # thereby mask the failure it is reporting.
    error_type = type(error).__name__ if isinstance(error, Exception) else "string"
    logger.error(
        "extraction method failed: %s",
        method,
        extra={
            "event": "method_error",
            "method": method,
            "error_type": error_type,
        },
    )
| # Scientific analysis and reporting methods | |
def get_extraction_statistics(self) -> Dict[str, Any]:
    """Summarise everything extracted so far.

    Returns:
        ``{"status": "no_extractions"}`` when the history is empty.
        Otherwise a dict with the number of extractions, a per-method
        count, the mean confidence, the mean extraction time in
        milliseconds, the number of distinct concept ids, a shallow copy of
        the concept statistics and the attribute dict of the most recent
        validation metrics (``None`` when none were recorded).
    """
    history = self.extraction_history
    if not history:
        return {"status": "no_extractions"}

    extraction_total = len(history)

    per_method = Counter()
    for entry in history:
        per_method[entry.extraction_method] += 1

    mean_confidence = sum(e.confidence for e in history) / extraction_total
    mean_time_ms = sum(e.extraction_time_ms for e in history) / extraction_total
    distinct_concepts = {e.concept_id for e in history}

    latest_metrics = None
    if self.validation_metrics:
        latest_metrics = self.validation_metrics[-1].__dict__

    return {
        "total_extractions": extraction_total,
        "method_distribution": dict(per_method),
        "average_confidence": mean_confidence,
        "average_extraction_time_ms": mean_time_ms,
        "unique_concepts": len(distinct_concepts),
        "concept_statistics": dict(self.concept_statistics),
        "validation_metrics": latest_metrics,
    }
def export_scientific_data(self) -> Dict[str, Any]:
    """Export the collected data for analysis and reproducibility.

    Returns:
        Dict with one plain dict per extraction in the history, a shallow
        copy of the concept statistics, the attribute dict of every
        recorded validation metrics object, the node table and
        configuration (both by reference, not copied) and the export time
        as a Unix timestamp in whole seconds.
    """
    exported_fields = (
        "concept_id",
        "confidence",
        "extraction_method",
        "supporting_terms",
        "semantic_density",
        "novelty_score",
        "validation_hash",
        "extraction_time_ms",
        "linguistic_features",
        "statistical_significance",
    )
    history_rows = [
        {field: getattr(entry, field) for field in exported_fields}
        for entry in self.extraction_history
    ]
    statistics_snapshot = dict(self.concept_statistics)
    metrics_rows = [metrics.__dict__ for metrics in self.validation_metrics]

    return {
        "extraction_history": history_rows,
        "concept_statistics": statistics_snapshot,
        "validation_metrics": metrics_rows,
        "node_data": self.nodes,
        "configuration": self.config,
        "extraction_timestamp": int(time.time()),
    }