| """ | |
| Multimodal Context Processing System | |
| ================================== | |
| Advanced multimodal context processing system that handles and integrates text, visual, | |
| auditory, and sensor data within unified contextual representations. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import base64 | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Any, Optional, Tuple, Union, Set | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import numpy as np | |
| from collections import defaultdict, deque | |
| from ai_agent_framework.core.context_engineering_agent import ( | |
| ContextElement, ContextModality, ContextDimension | |
| ) | |
| logger = logging.getLogger(__name__) | |
class DataModality(Enum):
    """Supported data modalities."""
    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    SENSOR = "sensor"
    TABLE = "table"
    CODE = "code"
    STRUCTURED = "structured"


class FusionStrategy(Enum):
    """Strategies for multimodal fusion."""
    EARLY_FUSION = "early_fusion"
    LATE_FUSION = "late_fusion"
    HYBRID_FUSION = "hybrid_fusion"
    ATTENTION_BASED = "attention_based"
    CROSS_ATTENTION = "cross_attention"
@dataclass
class MultimodalInput:
    """Represents multimodal input data."""
    id: str
    modality: DataModality
    content: Any
    metadata: Dict[str, Any]
    timestamp: datetime
    quality_score: float
    confidence: float
    processing_status: str = "pending"

    def __post_init__(self):
        if not self.id:
            self.id = f"mm_input_{int(time.time())}_{hash(str(self.content))}"
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.metadata:
            self.metadata = {}


@dataclass
class UnifiedContext:
    """Unified contextual representation from multimodal inputs."""
    id: str
    source_inputs: List[str]
    fused_representation: Dict[str, Any]
    modality_contributions: Dict[str, float]
    temporal_alignment: Dict[str, Any]
    semantic_consistency: float
    fusion_strategy: FusionStrategy
    confidence_aggregate: float

    def __post_init__(self):
        if not self.id:
            self.id = f"unified_context_{int(time.time())}"
class MultimodalProcessor:
    """Core multimodal processing engine."""

    def __init__(self):
        self.modal_processors = {
            DataModality.TEXT: TextProcessor(),
            DataModality.IMAGE: ImageProcessor(),
            DataModality.AUDIO: AudioProcessor(),
            DataModality.VIDEO: VideoProcessor(),
            DataModality.SENSOR: SensorProcessor(),
            DataModality.TABLE: TableProcessor(),
            DataModality.CODE: CodeProcessor(),
            DataModality.STRUCTURED: StructuredProcessor()
        }
        self.fusion_strategies = {
            FusionStrategy.EARLY_FUSION: self._early_fusion,
            FusionStrategy.LATE_FUSION: self._late_fusion,
            FusionStrategy.HYBRID_FUSION: self._hybrid_fusion,
            FusionStrategy.ATTENTION_BASED: self._attention_based_fusion,
            FusionStrategy.CROSS_ATTENTION: self._cross_attention_fusion
        }
        self.alignment_algorithms = {
            "temporal": self._temporal_alignment,
            "semantic": self._semantic_alignment,
            "structural": self._structural_alignment
        }

    async def process_multimodal_input(
        self,
        inputs: List[MultimodalInput],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> UnifiedContext:
        """Process multimodal inputs and create unified context."""
        try:
            # Step 1: Process individual modalities
            processed_modalities = await self._process_individual_modalities(inputs)

            # Step 2: Align modalities
            aligned_modalities = await self._align_modalities(processed_modalities)

            # Step 3: Fuse modalities using selected strategy
            fusion_func = self.fusion_strategies.get(fusion_strategy)
            if not fusion_func:
                fusion_strategy = FusionStrategy.HYBRID_FUSION
                fusion_func = self.fusion_strategies[fusion_strategy]
            unified_context = await fusion_func(aligned_modalities)

            # Step 4: Validate and enhance unified context
            validated_context = await self._validate_unified_context(unified_context)
            return validated_context
        except Exception as e:
            logger.error(f"Multimodal processing failed: {e}")
            return UnifiedContext(
                id=f"error_context_{int(time.time())}",
                source_inputs=[inp.id for inp in inputs],
                fused_representation={"error": str(e)},
                modality_contributions={},
                temporal_alignment={},
                semantic_consistency=0.0,
                fusion_strategy=fusion_strategy,
                confidence_aggregate=0.0
            )

    async def _process_individual_modalities(
        self,
        inputs: List[MultimodalInput]
    ) -> Dict[DataModality, Dict[str, Any]]:
        """Process each modality individually."""
        processed_modalities = {}

        # Group inputs by modality
        modality_groups = defaultdict(list)
        for input_data in inputs:
            modality_groups[input_data.modality].append(input_data)

        # Process each modality
        for modality, modality_inputs in modality_groups.items():
            processor = self.modal_processors.get(modality)
            if processor:
                try:
                    processed_result = await processor.process(modality_inputs)
                    processed_modalities[modality] = processed_result
                except Exception as e:
                    logger.error(f"Failed to process {modality.value} modality: {e}")
                    processed_modalities[modality] = {
                        "status": "error",
                        "error": str(e),
                        "inputs": [inp.id for inp in modality_inputs]
                    }
        return processed_modalities

    async def _align_modalities(
        self,
        processed_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities for fusion."""
        aligned_modalities = {}

        # Temporal alignment
        temporal_alignment = await self.alignment_algorithms["temporal"](processed_modalities)
        # Semantic alignment
        semantic_alignment = await self.alignment_algorithms["semantic"](processed_modalities)
        # Structural alignment
        structural_alignment = await self.alignment_algorithms["structural"](processed_modalities)

        # Apply alignments to each modality
        for modality, processed_data in processed_modalities.items():
            if processed_data.get("status") == "success":
                aligned_data = processed_data.copy()
                aligned_data["alignment"] = {
                    "temporal": temporal_alignment.get(modality, {}),
                    "semantic": semantic_alignment.get(modality, {}),
                    "structural": structural_alignment.get(modality, {})
                }
                aligned_modalities[modality] = aligned_data
        return aligned_modalities
    async def _early_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform early fusion of modalities."""
        # Combine features at input level
        fused_features = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                # Extract features from each modality
                features = data.get("features", {})
                fused_features[modality.value] = features
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        # Create unified representation
        unified_representation = {
            "fusion_type": "early_fusion",
            "modality_features": fused_features,
            "combined_embedding": await self._combine_embeddings(fused_features),
            "cross_modal_patterns": await self._detect_cross_modal_patterns(fused_features)
        }

        return UnifiedContext(
            id=f"early_fusion_{int(time.time())}",
            source_inputs=list(fused_features.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_features),
            fusion_strategy=FusionStrategy.EARLY_FUSION,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _late_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform late fusion of modalities."""
        # Process each modality to high-level representations
        high_level_representations = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                # Extract semantic representations
                representation = data.get("semantic_representation", {})
                high_level_representations[modality.value] = representation
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        # Fuse at semantic level
        unified_representation = {
            "fusion_type": "late_fusion",
            "semantic_representations": high_level_representations,
            "fused_semantics": await self._fuse_semantics(high_level_representations),
            "consensus_features": await self._extract_consensus_features(high_level_representations)
        }

        return UnifiedContext(
            id=f"late_fusion_{int(time.time())}",
            source_inputs=list(high_level_representations.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(high_level_representations),
            fusion_strategy=FusionStrategy.LATE_FUSION,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _hybrid_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform hybrid fusion combining early and late fusion."""
        # Early fusion for complementary features
        early_fused = await self._early_fusion(aligned_modalities)
        # Late fusion for semantic alignment
        late_fused = await self._late_fusion(aligned_modalities)

        # Combine both approaches
        hybrid_representation = {
            "fusion_type": "hybrid_fusion",
            "early_fusion": early_fused.fused_representation,
            "late_fusion": late_fused.fused_representation,
            "combined_features": await self._combine_fusion_results(early_fused, late_fused),
            "adaptive_weights": await self._calculate_adaptive_weights(aligned_modalities)
        }

        # Merge contributions and confidence
        combined_contributions = {}
        for modality in aligned_modalities.keys():
            early_contrib = early_fused.modality_contributions.get(modality.value, 0)
            late_contrib = late_fused.modality_contributions.get(modality.value, 0)
            combined_contributions[modality.value] = (early_contrib + late_contrib) / 2

        return UnifiedContext(
            id=f"hybrid_fusion_{int(time.time())}",
            source_inputs=list(combined_contributions.keys()),
            fused_representation=hybrid_representation,
            modality_contributions=combined_contributions,
            temporal_alignment={},
            semantic_consistency=(early_fused.semantic_consistency + late_fused.semantic_consistency) / 2,
            fusion_strategy=FusionStrategy.HYBRID_FUSION,
            confidence_aggregate=(early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2
        )
    async def _attention_based_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform attention-based fusion."""
        # Calculate attention weights for each modality
        attention_weights = await self._calculate_attention_weights(aligned_modalities)

        # Apply attention-based fusion
        fused_features = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                modality_weight = attention_weights.get(modality, 0.5)
                features = data.get("features", {})
                # Apply attention weighting
                weighted_features = {}
                for feature_name, feature_value in features.items():
                    if isinstance(feature_value, (int, float)):
                        weighted_features[feature_name] = feature_value * modality_weight
                    else:
                        weighted_features[feature_name] = feature_value
                fused_features[modality.value] = weighted_features
                modality_contributions[modality.value] = modality_weight
                confidence_scores.append(data.get("confidence", 0.5) * modality_weight)

        unified_representation = {
            "fusion_type": "attention_based",
            "attention_weights": attention_weights,
            "weighted_features": fused_features,
            "attention_mechanism": "dynamic_modality_weighting"
        }

        return UnifiedContext(
            id=f"attention_fusion_{int(time.time())}",
            source_inputs=list(fused_features.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_features),
            fusion_strategy=FusionStrategy.ATTENTION_BASED,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _cross_attention_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform cross-attention fusion."""
        # Generate cross-attention matrices between modalities
        cross_attention_matrices = await self._calculate_cross_attention(aligned_modalities)

        # Apply cross-attention fusion
        fused_representations = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                # Get cross-attention with other modalities
                cross_attention = cross_attention_matrices.get(modality, {})
                features = data.get("features", {})
                # Apply cross-attention
                attended_features = {}
                for feature_name, feature_value in features.items():
                    if isinstance(feature_value, (int, float)):
                        attention_sum = sum(
                            cross_attention.get(other_mod, 0)
                            for other_mod in aligned_modalities.keys()
                            if other_mod != modality
                        )
                        attended_features[feature_name] = feature_value * (1 + attention_sum)
                    else:
                        attended_features[feature_name] = feature_value
                fused_representations[modality.value] = attended_features
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "cross_attention",
            "cross_attention_matrices": cross_attention_matrices,
            "attended_features": fused_representations,
            "inter_modal_relationships": await self._analyze_inter_modal_relationships(aligned_modalities)
        }

        return UnifiedContext(
            id=f"cross_attention_{int(time.time())}",
            source_inputs=list(fused_representations.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_representations),
            fusion_strategy=FusionStrategy.CROSS_ATTENTION,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )
    async def _validate_unified_context(self, context: UnifiedContext) -> UnifiedContext:
        """Validate and enhance unified context."""
        # Check for consistency issues
        issues = []
        if context.semantic_consistency < 0.3:
            issues.append("Low semantic consistency detected")
        if context.confidence_aggregate < 0.4:
            issues.append("Low aggregate confidence")
        if len(context.source_inputs) < 2:
            issues.append("Insufficient modalities for robust fusion")

        # Enhance context if issues are found
        if issues:
            context.fused_representation["validation_issues"] = issues
            context.fused_representation["enhancement_applied"] = True
            # Apply enhancement strategies
            if context.semantic_consistency < 0.5:
                context.semantic_consistency = min(0.8, context.semantic_consistency * 1.2)
            if context.confidence_aggregate < 0.5:
                context.confidence_aggregate = min(0.8, context.confidence_aggregate * 1.1)
        return context

    # Helper methods for fusion strategies
    async def _combine_embeddings(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Combine embeddings from different modalities."""
        combined = {}
        for modality, modality_features in features.items():
            for feature_name, feature_value in modality_features.items():
                combined_key = f"{modality}_{feature_name}"
                combined[combined_key] = feature_value
        return combined

    async def _detect_cross_modal_patterns(self, features: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Detect patterns across modalities."""
        patterns = []
        modalities = list(features.keys())

        # Simple pattern detection
        for i, mod1 in enumerate(modalities):
            for mod2 in modalities[i + 1:]:
                # Check for correlated features
                mod1_features = features[mod1]
                mod2_features = features[mod2]
                common_features = set(mod1_features.keys()) & set(mod2_features.keys())
                if common_features:
                    patterns.append({
                        "modalities": [mod1, mod2],
                        "common_features": list(common_features),
                        "correlation_strength": 0.7  # Simplified
                    })
        return patterns

    async def _fuse_semantics(self, representations: Dict[str, Any]) -> Dict[str, Any]:
        """Fuse semantic representations."""
        # Simple semantic fusion
        fused_semantics = {}

        # Extract common semantic elements
        all_semantics = []
        for modality, representation in representations.items():
            if isinstance(representation, dict):
                all_semantics.extend(representation.keys())
        common_semantics = list(set(all_semantics))

        for semantic in common_semantics:
            values = []
            for modality, representation in representations.items():
                if semantic in representation:
                    values.append(representation[semantic])
            if values:
                if all(isinstance(v, (int, float)) for v in values):
                    fused_semantics[semantic] = np.mean(values)
                else:
                    fused_semantics[semantic] = values[0]  # Take first non-numeric value
        return fused_semantics

    async def _extract_consensus_features(self, representations: Dict[str, Any]) -> Dict[str, Any]:
        """Extract features with high consensus across modalities."""
        # Find features present in multiple modalities
        feature_counts = defaultdict(int)
        for modality, representation in representations.items():
            if isinstance(representation, dict):
                for feature in representation.keys():
                    feature_counts[feature] += 1

        # Select features with high consensus
        threshold = len(representations) * 0.5
        consensus_features = {
            feature: self._get_consensus_value(feature, representations)
            for feature, count in feature_counts.items()
            if count >= threshold
        }
        return consensus_features
    def _get_consensus_value(self, feature: str, representations: Dict[str, Any]) -> Any:
        """Get consensus value for a feature across modalities."""
        values = []
        for modality, representation in representations.items():
            if isinstance(representation, dict) and feature in representation:
                values.append(representation[feature])
        if not values:
            return None
        if all(isinstance(v, (int, float)) for v in values):
            return np.mean(values)
        # For non-numeric values, return the most common
        from collections import Counter
        try:
            value_counts = Counter(values)
            return value_counts.most_common(1)[0][0]
        except TypeError:
            # Unhashable values (e.g. lists) cannot be counted; fall back to the first
            return values[0]
    async def _combine_fusion_results(self, early_fused: UnifiedContext, late_fused: UnifiedContext) -> Dict[str, Any]:
        """Combine early and late fusion results."""
        return {
            "early_features": early_fused.fused_representation.get("combined_embedding", {}),
            "late_semantics": late_fused.fused_representation.get("fused_semantics", {}),
            "combined_score": (early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2
        }

    async def _calculate_adaptive_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[str, float]:
        """Calculate adaptive weights for modalities."""
        weights = {}
        for modality, data in modalities.items():
            if data.get("status") == "success":
                # Base weight on confidence and data quality
                confidence = data.get("confidence", 0.5)
                quality = data.get("quality_score", 0.5)
                weights[modality.value] = (confidence + quality) / 2
            else:
                weights[modality.value] = 0.1  # Low weight for failed processing

        # Normalize weights so they sum to 1
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}
        return weights

    async def _calculate_attention_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, float]:
        """Calculate attention weights for modalities."""
        weights = {}
        for modality, data in modalities.items():
            if data.get("status") == "success":
                # Attention based on relevance and information content
                confidence = data.get("confidence", 0.5)
                info_content = data.get("information_content", 0.5)
                weights[modality] = confidence * info_content
            else:
                weights[modality] = 0.1

        # Normalize weights so they sum to 1
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}
        return weights
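    # Illustrative example (assumed numbers): raw attention weights of 0.45 for
    # text and 0.20 for image normalize to roughly 0.69 and 0.31 respectively.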
    async def _calculate_cross_attention(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[DataModality, float]]:
        """Calculate cross-attention between modalities."""
        cross_attention = {}
        modalities_list = list(modalities.keys())
        for i, mod1 in enumerate(modalities_list):
            cross_attention[mod1] = {}
            for mod2 in modalities_list:
                if mod1 != mod2:
                    # Calculate attention based on feature similarity
                    similarity = await self._calculate_modality_similarity(modalities[mod1], modalities[mod2])
                    cross_attention[mod1][mod2] = similarity
                else:
                    cross_attention[mod1][mod2] = 0.0
        return cross_attention

    async def _calculate_modality_similarity(self, mod1_data: Dict[str, Any], mod2_data: Dict[str, Any]) -> float:
        """Calculate similarity between two modalities."""
        if mod1_data.get("status") != "success" or mod2_data.get("status") != "success":
            return 0.0
        # Simple similarity based on confidence correlation
        conf1 = mod1_data.get("confidence", 0.5)
        conf2 = mod2_data.get("confidence", 0.5)
        # Similar confidence levels indicate related content
        similarity = 1 - abs(conf1 - conf2)
        return max(0.0, similarity)

    async def _analyze_inter_modal_relationships(self, modalities: Dict[DataModality, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyze relationships between modalities."""
        relationships = []
        modalities_list = list(modalities.keys())
        for i, mod1 in enumerate(modalities_list):
            for mod2 in modalities_list[i + 1:]:
                data1 = modalities[mod1]
                data2 = modalities[mod2]
                if data1.get("status") == "success" and data2.get("status") == "success":
                    similarity = await self._calculate_modality_similarity(data1, data2)
                    relationships.append({
                        "modalities": [mod1.value, mod2.value],
                        "relationship_type": "complementary" if similarity > 0.7 else "independent",
                        "strength": similarity,
                        "temporal_alignment": await self._check_temporal_alignment(data1, data2)
                    })
        return relationships

    async def _check_temporal_alignment(self, data1: Dict[str, Any], data2: Dict[str, Any]) -> float:
        """Check temporal alignment between modalities."""
        # Simplified temporal alignment check
        timestamp1 = data1.get("timestamp", datetime.utcnow())
        timestamp2 = data2.get("timestamp", datetime.utcnow())
        time_diff = abs((timestamp1 - timestamp2).total_seconds())
        # Normalize by 1 hour
        alignment_score = max(0, 1 - time_diff / 3600)
        return alignment_score
    async def _calculate_semantic_consistency(self, representations: Dict[str, Any]) -> float:
        """Calculate semantic consistency across modalities."""
        if not representations:
            return 0.0
        # Simple consistency calculation
        consistency_scores = []

        # Check for semantic overlap
        all_semantics = []
        for modality, representation in representations.items():
            if isinstance(representation, dict):
                all_semantics.append(set(representation.keys()))

        if len(all_semantics) > 1:
            # Calculate Jaccard similarity between semantic sets
            for i in range(len(all_semantics)):
                for j in range(i + 1, len(all_semantics)):
                    intersection = len(all_semantics[i] & all_semantics[j])
                    union = len(all_semantics[i] | all_semantics[j])
                    if union > 0:
                        consistency_scores.append(intersection / union)
        return np.mean(consistency_scores) if consistency_scores else 0.5
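    # Illustrative example (assumed values): if the text modality exposes the
    # semantic keys {"topics", "sentiment"} and the image modality exposes
    # {"topics", "objects"}, their Jaccard similarity is 1 / 3, so the
    # consistency over just these two modalities would be about 0.33.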
    # Alignment algorithms
    async def _temporal_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities temporally."""
        alignment_results = {}
        for modality, data in modalities.items():
            if data.get("status") == "success":
                timestamp = data.get("timestamp", datetime.utcnow())
                alignment_results[modality] = {
                    "timestamp": timestamp.isoformat(),
                    "time_category": self._categorize_time(timestamp),
                    "temporal_priority": self._calculate_temporal_priority(timestamp)
                }
        return alignment_results

    def _categorize_time(self, timestamp: datetime) -> str:
        """Categorize timestamp into time categories."""
        now = datetime.utcnow()
        age_seconds = (now - timestamp).total_seconds()
        if age_seconds < 60:
            return "immediate"
        elif age_seconds < 3600:
            return "recent"
        elif age_seconds < 86400:
            return "today"
        else:
            return "historical"

    def _calculate_temporal_priority(self, timestamp: datetime) -> float:
        """Calculate temporal priority (recent = high priority)."""
        now = datetime.utcnow()
        age_seconds = (now - timestamp).total_seconds()
        return max(0, 1 - age_seconds / 86400)  # Decay over 24 hours

    async def _semantic_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities semantically."""
        alignment_results = {}
        for modality, data in modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})
                semantic_tags = data.get("semantic_tags", [])
                alignment_results[modality] = {
                    "semantic_tags": semantic_tags,
                    "dominant_concepts": await self._extract_dominant_concepts(features),
                    "semantic_density": len(semantic_tags) / max(len(features), 1)
                }
        return alignment_results

    async def _extract_dominant_concepts(self, features: Dict[str, Any]) -> List[str]:
        """Extract dominant concepts from features."""
        # Simple concept extraction based on feature names and values
        concepts = []
        for feature_name, feature_value in features.items():
            if isinstance(feature_value, str) and len(feature_value) > 3:
                concepts.append(feature_value.lower())
            elif isinstance(feature_name, str) and len(feature_name) > 3:
                concepts.append(feature_name.lower())
        return list(set(concepts))[:5]  # Top 5 concepts

    async def _structural_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities structurally."""
        alignment_results = {}
        for modality, data in modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})
                alignment_results[modality] = {
                    "structure_type": self._determine_structure_type(features),
                    "complexity_score": self._calculate_complexity_score(features),
                    "organization_pattern": self._identify_organization_pattern(features)
                }
        return alignment_results

    def _determine_structure_type(self, features: Dict[str, Any]) -> str:
        """Determine the structural type of the data."""
        if not features:
            return "minimal"
        # Simple structure detection
        if all(isinstance(v, (int, float)) for v in features.values()):
            return "numerical"
        elif all(isinstance(v, str) for v in features.values()):
            return "textual"
        elif len(features) > 10:
            return "complex"
        else:
            return "simple"

    def _calculate_complexity_score(self, features: Dict[str, Any]) -> float:
        """Calculate complexity score of the data structure."""
        if not features:
            return 0.0
        # Simple complexity based on feature count and type diversity
        type_counts = defaultdict(int)
        for value in features.values():
            type_counts[type(value).__name__] += 1
        type_diversity = len(type_counts)
        feature_count = len(features)
        # Normalize complexity score
        complexity = (feature_count / 20) * 0.6 + (type_diversity / 4) * 0.4
        return min(1.0, complexity)

    def _identify_organization_pattern(self, features: Dict[str, Any]) -> str:
        """Identify the organization pattern of the data."""
        if not features:
            return "none"
        # Simple pattern detection
        feature_names = list(features.keys())
        if any("time" in name.lower() or "date" in name.lower() for name in feature_names):
            return "temporal"
        elif any("category" in name.lower() or "type" in name.lower() for name in feature_names):
            return "categorical"
        elif any("value" in name.lower() or "amount" in name.lower() for name in feature_names):
            return "quantitative"
        else:
            return "mixed"
# Individual modality processors
class TextProcessor:
    """Processor for text modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process text inputs."""
        if not inputs:
            return {"status": "error", "error": "No text inputs"}

        # Combine all text inputs
        combined_text = " ".join([inp.content for inp in inputs if isinstance(inp.content, str)])

        # Extract features
        features = {
            "text_length": len(combined_text),
            "word_count": len(combined_text.split()),
            "sentence_count": combined_text.count('.') + combined_text.count('!') + combined_text.count('?'),
            "complexity_score": self._calculate_text_complexity(combined_text)
        }

        # Generate semantic representation
        semantic_representation = await self._extract_text_semantics(combined_text)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": np.mean([inp.confidence for inp in inputs]),
            "quality_score": np.mean([inp.quality_score for inp in inputs]),
            "timestamp": max(inp.timestamp for inp in inputs)
        }

    def _calculate_text_complexity(self, text: str) -> float:
        """Calculate text complexity score."""
        if not text:
            return 0.0
        words = text.split()
        avg_word_length = np.mean([len(word) for word in words]) if words else 0
        sentence_count = max(1, text.count('.') + text.count('!') + text.count('?'))
        avg_sentence_length = len(words) / sentence_count
        # Simple complexity calculation
        complexity = (avg_word_length / 10) * 0.4 + (avg_sentence_length / 20) * 0.6
        return min(1.0, complexity)
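    # Worked example (assumed values): an average word length of 5 characters
    # and an average sentence length of 10 words give
    # (5 / 10) * 0.4 + (10 / 20) * 0.6 = 0.5.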
    async def _extract_text_semantics(self, text: str) -> Dict[str, Any]:
        """Extract semantic representation from text."""
        # Simple semantic extraction
        words = text.lower().split()

        # Extract key concepts (simplified)
        concepts = []
        for word in words:
            if len(word) > 4:  # Skip short words
                concepts.append(word)

        # Extract topics (simplified)
        topics = []
        if any(word in text.lower() for word in ["business", "company", "revenue"]):
            topics.append("business")
        if any(word in text.lower() for word in ["technology", "system", "software"]):
            topics.append("technology")
        if any(word in text.lower() for word in ["data", "information", "analysis"]):
            topics.append("data")

        return {
            "concepts": list(set(concepts))[:10],
            "topics": topics,
            "sentiment": self._analyze_sentiment(text),
            "entities": []  # Would use NER in production
        }

    def _analyze_sentiment(self, text: str) -> str:
        """Simple sentiment analysis."""
        positive_words = ["good", "great", "excellent", "positive", "happy", "success"]
        negative_words = ["bad", "terrible", "negative", "sad", "failure", "problem"]
        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)
        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"
class ImageProcessor:
    """Processor for image modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process image inputs."""
        if not inputs:
            return {"status": "error", "error": "No image inputs"}

        # Process first image (simplified)
        image_input = inputs[0]

        # Extract features (simplified)
        features = {
            "image_size": image_input.metadata.get("size", 0),
            "format": image_input.metadata.get("format", "unknown"),
            "color_diversity": image_input.metadata.get("color_diversity", 0.5),
            "complexity_score": image_input.metadata.get("complexity", 0.5)
        }

        # Generate semantic representation
        semantic_representation = await self._extract_image_semantics(image_input)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": image_input.confidence,
            "quality_score": image_input.quality_score,
            "timestamp": image_input.timestamp
        }

    async def _extract_image_semantics(self, image_input: MultimodalInput) -> Dict[str, Any]:
        """Extract semantic representation from image."""
        # Simplified image semantic extraction
        metadata = image_input.metadata
        return {
            "objects": metadata.get("objects", []),
            "colors": metadata.get("dominant_colors", []),
            "scenes": metadata.get("scene_types", []),
            "text_content": metadata.get("extracted_text", ""),
            "visual_concepts": metadata.get("visual_concepts", [])
        }


class AudioProcessor:
    """Processor for audio modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process audio inputs."""
        if not inputs:
            return {"status": "error", "error": "No audio inputs"}

        # Process first audio (simplified)
        audio_input = inputs[0]

        # Extract features
        features = {
            "duration": audio_input.metadata.get("duration", 0),
            "sample_rate": audio_input.metadata.get("sample_rate", 44100),
            "channels": audio_input.metadata.get("channels", 1),
            "frequency_content": audio_input.metadata.get("frequency_profile", {})
        }

        # Generate semantic representation
        semantic_representation = await self._extract_audio_semantics(audio_input)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": audio_input.confidence,
            "quality_score": audio_input.quality_score,
            "timestamp": audio_input.timestamp
        }

    async def _extract_audio_semantics(self, audio_input: MultimodalInput) -> Dict[str, Any]:
        """Extract semantic representation from audio."""
        metadata = audio_input.metadata
        return {
            "speech_content": metadata.get("transcribed_text", ""),
            "speaker_count": metadata.get("speaker_count", 1),
            "emotion": metadata.get("emotion", "neutral"),
            "language": metadata.get("language", "unknown"),
            "audio_quality": metadata.get("quality_score", 0.5)
        }
# Additional processors would be implemented similarly...
class VideoProcessor:
    """Processor for video modality."""
    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class SensorProcessor:
    """Processor for sensor modality."""
    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class TableProcessor:
    """Processor for table modality."""
    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class CodeProcessor:
    """Processor for code modality."""
    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class StructuredProcessor:
    """Processor for structured data modality."""
    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}
# Integration with main system
class MultimodalContextProcessor:
    """Integrated multimodal context processing system."""

    def __init__(self):
        self.multimodal_processor = MultimodalProcessor()

    async def process_multimodal_input(
        self,
        input_data: Dict[str, Any],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> Dict[str, Any]:
        """Process multimodal input and return unified context."""
        # Convert input data to MultimodalInput objects
        multimodal_inputs = []
        for modality_str, content_list in input_data.items():
            try:
                modality = DataModality(modality_str)
            except ValueError:
                logger.warning(f"Unknown modality: {modality_str}")
                continue

            # Accept either a list of entries or a single entry per modality
            entries = content_list if isinstance(content_list, list) else [content_list]
            for content in entries:
                # Entries may be plain values or dicts carrying content/metadata/scores
                is_dict = isinstance(content, dict)
                multimodal_input = MultimodalInput(
                    id=f"{modality_str}_{len(multimodal_inputs)}",
                    modality=modality,
                    content=content.get("content", content) if is_dict else content,
                    metadata=content.get("metadata", {}) if is_dict else {},
                    timestamp=datetime.utcnow(),
                    quality_score=content.get("quality_score", 0.8) if is_dict else 0.8,
                    confidence=content.get("confidence", 0.8) if is_dict else 0.8
                )
                multimodal_inputs.append(multimodal_input)

        # Process multimodal inputs
        unified_context = await self.multimodal_processor.process_multimodal_input(
            multimodal_inputs, fusion_strategy
        )

        return {
            "unified_context": {
                "id": unified_context.id,
                "fusion_strategy": unified_context.fusion_strategy.value,
                "modality_contributions": unified_context.modality_contributions,
                "semantic_consistency": unified_context.semantic_consistency,
                "confidence_aggregate": unified_context.confidence_aggregate,
                "fused_representation": unified_context.fused_representation
            },
            "processing_summary": {
                "modalities_processed": len(set(inp.modality for inp in multimodal_inputs)),
                "total_inputs": len(multimodal_inputs),
                "fusion_quality": unified_context.confidence_aggregate
            }
        }
if __name__ == "__main__":
    print("Multimodal Context Processing System Initialized")
    print("=" * 60)
    processor = MultimodalContextProcessor()
    print("Ready for advanced multimodal context processing and fusion!")