| """ |
| Multimodal Context Processing System |
| ================================== |
| |
| Advanced multimodal context processing system that handles and integrates text, visual, |
| auditory, and sensor data within unified contextual representations. |
| """ |
|
|
import asyncio
import base64
import json
import logging
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple, Union, Set

import numpy as np

from ai_agent_framework.core.context_engineering_agent import (
    ContextElement, ContextModality, ContextDimension
)
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class DataModality(Enum): |
| """Supported data modalities.""" |
| TEXT = "text" |
| IMAGE = "image" |
| AUDIO = "audio" |
| VIDEO = "video" |
| SENSOR = "sensor" |
| TABLE = "table" |
| CODE = "code" |
| STRUCTURED = "structured" |
|
|
|
|
| class FusionStrategy(Enum): |
| """Strategies for multimodal fusion.""" |
| EARLY_FUSION = "early_fusion" |
| LATE_FUSION = "late_fusion" |
| HYBRID_FUSION = "hybrid_fusion" |
| ATTENTION_BASED = "attention_based" |
| CROSS_ATTENTION = "cross_attention" |
|
|
|
|
| @dataclass |
| class MultimodalInput: |
| """Represents multimodal input data.""" |
| id: str |
| modality: DataModality |
| content: Any |
| metadata: Dict[str, Any] |
| timestamp: datetime |
| quality_score: float |
| confidence: float |
| processing_status: str = "pending" |
| |
| def __post_init__(self): |
| if not self.id: |
| self.id = f"mm_input_{int(time.time())}_{hash(str(self.content))}" |
| if not self.timestamp: |
| self.timestamp = datetime.utcnow() |
| if not self.metadata: |
| self.metadata = {} |
|
|
|
|
| @dataclass |
| class UnifiedContext: |
| """Unified contextual representation from multimodal inputs.""" |
| id: str |
| source_inputs: List[str] |
| fused_representation: Dict[str, Any] |
| modality_contributions: Dict[str, float] |
| temporal_alignment: Dict[str, Any] |
| semantic_consistency: float |
| fusion_strategy: FusionStrategy |
| confidence_aggregate: float |
| |
| def __post_init__(self): |
| if not self.id: |
| self.id = f"unified_context_{int(time.time())}" |
|
|
|
|
| class MultimodalProcessor: |
| """Core multimodal processing engine.""" |
| |
| def __init__(self): |
| self.modal_processors = { |
| DataModality.TEXT: TextProcessor(), |
| DataModality.IMAGE: ImageProcessor(), |
| DataModality.AUDIO: AudioProcessor(), |
| DataModality.VIDEO: VideoProcessor(), |
| DataModality.SENSOR: SensorProcessor(), |
| DataModality.TABLE: TableProcessor(), |
| DataModality.CODE: CodeProcessor(), |
| DataModality.STRUCTURED: StructuredProcessor() |
| } |
| |
| self.fusion_strategies = { |
| FusionStrategy.EARLY_FUSION: self._early_fusion, |
| FusionStrategy.LATE_FUSION: self._late_fusion, |
| FusionStrategy.HYBRID_FUSION: self._hybrid_fusion, |
| FusionStrategy.ATTENTION_BASED: self._attention_based_fusion, |
| FusionStrategy.CROSS_ATTENTION: self._cross_attention_fusion |
| } |
| |
| self.alignment_algorithms = { |
| "temporal": self._temporal_alignment, |
| "semantic": self._semantic_alignment, |
| "structural": self._structural_alignment |
| } |
| |
| async def process_multimodal_input( |
| self, |
| inputs: List[MultimodalInput], |
| fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION |
| ) -> UnifiedContext: |
| """Process multimodal inputs and create unified context.""" |
| |
| try: |
| |
| processed_modalities = await self._process_individual_modalities(inputs) |
| |
| |
| aligned_modalities = await self._align_modalities(processed_modalities) |
| |
| |
| fusion_func = self.fusion_strategies.get(fusion_strategy) |
| if not fusion_func: |
| fusion_strategy = FusionStrategy.HYBRID_FUSION |
| fusion_func = self.fusion_strategies[fusion_strategy] |
| |
| unified_context = await fusion_func(aligned_modalities) |
| |
| |
| validated_context = await self._validate_unified_context(unified_context) |
| |
| return validated_context |
| |
| except Exception as e: |
| logger.error(f"Multimodal processing failed: {e}") |
| return UnifiedContext( |
| id=f"error_context_{int(time.time())}", |
| source_inputs=[inp.id for inp in inputs], |
| fused_representation={"error": str(e)}, |
| modality_contributions={}, |
| temporal_alignment={}, |
| semantic_consistency=0.0, |
| fusion_strategy=fusion_strategy, |
| confidence_aggregate=0.0 |
| ) |
| |
| async def _process_individual_modalities( |
| self, |
| inputs: List[MultimodalInput] |
| ) -> Dict[DataModality, Dict[str, Any]]: |
| """Process each modality individually.""" |
| |
| processed_modalities = {} |
| |
| |
| modality_groups = defaultdict(list) |
| for input_data in inputs: |
| modality_groups[input_data.modality].append(input_data) |
| |
| |
| for modality, modality_inputs in modality_groups.items(): |
| processor = self.modal_processors.get(modality) |
| if processor: |
| try: |
| processed_result = await processor.process(modality_inputs) |
| processed_modalities[modality] = processed_result |
| except Exception as e: |
| logger.error(f"Failed to process {modality.value} modality: {e}") |
| processed_modalities[modality] = { |
| "status": "error", |
| "error": str(e), |
| "inputs": [inp.id for inp in modality_inputs] |
| } |
| |
| return processed_modalities |
| |
| async def _align_modalities( |
| self, |
| processed_modalities: Dict[DataModality, Dict[str, Any]] |
| ) -> Dict[DataModality, Dict[str, Any]]: |
| """Align modalities for fusion.""" |
| |
| aligned_modalities = {} |
| |
| |
| temporal_alignment = await self.alignment_algorithms["temporal"](processed_modalities) |
| |
| |
| semantic_alignment = await self.alignment_algorithms["semantic"](processed_modalities) |
| |
| |
| structural_alignment = await self.alignment_algorithms["structural"](processed_modalities) |
| |
| |
| for modality, processed_data in processed_modalities.items(): |
| if processed_data.get("status") == "success": |
| aligned_data = processed_data.copy() |
| aligned_data["alignment"] = { |
| "temporal": temporal_alignment.get(modality, {}), |
| "semantic": semantic_alignment.get(modality, {}), |
| "structural": structural_alignment.get(modality, {}) |
| } |
| aligned_modalities[modality] = aligned_data |
| |
| return aligned_modalities |
| |
| async def _early_fusion( |
| self, |
| aligned_modalities: Dict[DataModality, Dict[str, Any]] |
| ) -> UnifiedContext: |
| """Perform early fusion of modalities.""" |
| |
| |
| fused_features = {} |
| modality_contributions = {} |
| confidence_scores = [] |
| |
| for modality, data in aligned_modalities.items(): |
| if data.get("status") == "success": |
| |
| features = data.get("features", {}) |
| fused_features[modality.value] = features |
| modality_contributions[modality.value] = data.get("confidence", 0.5) |
| confidence_scores.append(data.get("confidence", 0.5)) |
| |
| |
| unified_representation = { |
| "fusion_type": "early_fusion", |
| "modality_features": fused_features, |
| "combined_embedding": await self._combine_embeddings(fused_features), |
| "cross_modal_patterns": await self._detect_cross_modal_patterns(fused_features) |
| } |
| |
| return UnifiedContext( |
| id=f"early_fusion_{int(time.time())}", |
| source_inputs=list(fused_features.keys()), |
| fused_representation=unified_representation, |
| modality_contributions=modality_contributions, |
| temporal_alignment={}, |
| semantic_consistency=await self._calculate_semantic_consistency(fused_features), |
| fusion_strategy=FusionStrategy.EARLY_FUSION, |
| confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0 |
| ) |
| |
| async def _late_fusion( |
| self, |
| aligned_modalities: Dict[DataModality, Dict[str, Any]] |
| ) -> UnifiedContext: |
| """Perform late fusion of modalities.""" |
| |
| |
| high_level_representations = {} |
| modality_contributions = {} |
| confidence_scores = [] |
| |
| for modality, data in aligned_modalities.items(): |
| if data.get("status") == "success": |
| |
| representation = data.get("semantic_representation", {}) |
| high_level_representations[modality.value] = representation |
| modality_contributions[modality.value] = data.get("confidence", 0.5) |
| confidence_scores.append(data.get("confidence", 0.5)) |
| |
| |
| unified_representation = { |
| "fusion_type": "late_fusion", |
| "semantic_representations": high_level_representations, |
| "fused_semantics": await self._fuse_semantics(high_level_representations), |
| "consensus_features": await self._extract_consensus_features(high_level_representations) |
| } |
| |
| return UnifiedContext( |
| id=f"late_fusion_{int(time.time())}", |
| source_inputs=list(high_level_representations.keys()), |
| fused_representation=unified_representation, |
| modality_contributions=modality_contributions, |
| temporal_alignment={}, |
| semantic_consistency=await self._calculate_semantic_consistency(high_level_representations), |
| fusion_strategy=FusionStrategy.LATE_FUSION, |
| confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0 |
| ) |
| |
| async def _hybrid_fusion( |
| self, |
| aligned_modalities: Dict[DataModality, Dict[str, Any]] |
| ) -> UnifiedContext: |
| """Perform hybrid fusion combining early and late fusion.""" |
| |
| |
| early_fused = await self._early_fusion(aligned_modalities) |
| |
| |
| late_fused = await self._late_fusion(aligned_modalities) |
| |
| |
| hybrid_representation = { |
| "fusion_type": "hybrid_fusion", |
| "early_fusion": early_fused.fused_representation, |
| "late_fusion": late_fused.fused_representation, |
| "combined_features": await self._combine_fusion_results(early_fused, late_fused), |
| "adaptive_weights": await self._calculate_adaptive_weights(aligned_modalities) |
| } |
| |
| |
| combined_contributions = {} |
| for modality in aligned_modalities.keys(): |
| early_contrib = early_fused.modality_contributions.get(modality.value, 0) |
| late_contrib = late_fused.modality_contributions.get(modality.value, 0) |
| combined_contributions[modality.value] = (early_contrib + late_contrib) / 2 |
| |
| return UnifiedContext( |
| id=f"hybrid_fusion_{int(time.time())}", |
| source_inputs=list(combined_contributions.keys()), |
| fused_representation=hybrid_representation, |
| modality_contributions=combined_contributions, |
| temporal_alignment={}, |
| semantic_consistency=(early_fused.semantic_consistency + late_fused.semantic_consistency) / 2, |
| fusion_strategy=FusionStrategy.HYBRID_FUSION, |
| confidence_aggregate=(early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2 |
| ) |
| |
| async def _attention_based_fusion( |
| self, |
| aligned_modalities: Dict[DataModality, Dict[str, Any]] |
| ) -> UnifiedContext: |
| """Perform attention-based fusion.""" |
| |
| |
| attention_weights = await self._calculate_attention_weights(aligned_modalities) |
| |
| |
| fused_features = {} |
| modality_contributions = {} |
| confidence_scores = [] |
| |
| for modality, data in aligned_modalities.items(): |
| if data.get("status") == "success": |
| modality_weight = attention_weights.get(modality, 0.5) |
| features = data.get("features", {}) |
| |
| |
| weighted_features = {} |
| for feature_name, feature_value in features.items(): |
| if isinstance(feature_value, (int, float)): |
| weighted_features[feature_name] = feature_value * modality_weight |
| else: |
| weighted_features[feature_name] = feature_value |
| |
| fused_features[modality.value] = weighted_features |
| modality_contributions[modality.value] = modality_weight |
| confidence_scores.append(data.get("confidence", 0.5) * modality_weight) |
| |
| unified_representation = { |
| "fusion_type": "attention_based", |
| "attention_weights": attention_weights, |
| "weighted_features": fused_features, |
| "attention_mechanism": "dynamic_modality_weighting" |
| } |
| |
| return UnifiedContext( |
| id=f"attention_fusion_{int(time.time())}", |
| source_inputs=list(fused_features.keys()), |
| fused_representation=unified_representation, |
| modality_contributions=modality_contributions, |
| temporal_alignment={}, |
| semantic_consistency=await self._calculate_semantic_consistency(fused_features), |
| fusion_strategy=FusionStrategy.ATTENTION_BASED, |
| confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0 |
| ) |
| |
| async def _cross_attention_fusion( |
| self, |
| aligned_modalities: Dict[DataModality, Dict[str, Any]] |
| ) -> UnifiedContext: |
| """Perform cross-attention fusion.""" |
| |
| |
| cross_attention_matrices = await self._calculate_cross_attention(aligned_modalities) |
| |
| |
| fused_representations = {} |
| modality_contributions = {} |
| confidence_scores = [] |
| |
| for modality, data in aligned_modalities.items(): |
| if data.get("status") == "success": |
| |
| cross_attention = cross_attention_matrices.get(modality, {}) |
| features = data.get("features", {}) |
| |
| |
| attended_features = {} |
| for feature_name, feature_value in features.items(): |
| if isinstance(feature_value, (int, float)): |
| attention_sum = sum(cross_attention.get(other_mod, 0) |
| for other_mod in aligned_modalities.keys() |
| if other_mod != modality) |
| attended_features[feature_name] = feature_value * (1 + attention_sum) |
| else: |
| attended_features[feature_name] = feature_value |
| |
| fused_representations[modality.value] = attended_features |
| modality_contributions[modality.value] = data.get("confidence", 0.5) |
| confidence_scores.append(data.get("confidence", 0.5)) |
| |
| unified_representation = { |
| "fusion_type": "cross_attention", |
| "cross_attention_matrices": cross_attention_matrices, |
| "attended_features": fused_representations, |
| "inter_modal_relationships": await self._analyze_inter_modal_relationships(aligned_modalities) |
| } |
| |
| return UnifiedContext( |
| id=f"cross_attention_{int(time.time())}", |
| source_inputs=list(fused_representations.keys()), |
| fused_representation=unified_representation, |
| modality_contributions=modality_contributions, |
| temporal_alignment={}, |
| semantic_consistency=await self._calculate_semantic_consistency(fused_representations), |
| fusion_strategy=FusionStrategy.CROSS_ATTENTION, |
| confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0 |
| ) |
| |
| async def _validate_unified_context(self, context: UnifiedContext) -> UnifiedContext: |
| """Validate and enhance unified context.""" |
| |
| |
| issues = [] |
| |
| if context.semantic_consistency < 0.3: |
| issues.append("Low semantic consistency detected") |
| |
| if context.confidence_aggregate < 0.4: |
| issues.append("Low aggregate confidence") |
| |
| if len(context.source_inputs) < 2: |
| issues.append("Insufficient modalities for robust fusion") |
| |
| |
| if issues: |
| context.fused_representation["validation_issues"] = issues |
| context.fused_representation["enhancement_applied"] = True |
| |
| |
| if context.semantic_consistency < 0.5: |
| context.semantic_consistency = min(0.8, context.semantic_consistency * 1.2) |
| |
| if context.confidence_aggregate < 0.5: |
| context.confidence_aggregate = min(0.8, context.confidence_aggregate * 1.1) |
| |
| return context |
| |
| |
| |
| async def _combine_embeddings(self, features: Dict[str, Any]) -> Dict[str, Any]: |
| """Combine embeddings from different modalities.""" |
| combined = {} |
| for modality, modality_features in features.items(): |
| for feature_name, feature_value in modality_features.items(): |
| combined_key = f"{modality}_{feature_name}" |
| combined[combined_key] = feature_value |
| return combined |
| |
| async def _detect_cross_modal_patterns(self, features: Dict[str, Any]) -> List[Dict[str, Any]]: |
| """Detect patterns across modalities.""" |
| patterns = [] |
| modalities = list(features.keys()) |
| |
| |
| for i, mod1 in enumerate(modalities): |
| for mod2 in modalities[i+1:]: |
| |
| mod1_features = features[mod1] |
| mod2_features = features[mod2] |
| |
| common_features = set(mod1_features.keys()) & set(mod2_features.keys()) |
| if common_features: |
| patterns.append({ |
| "modalities": [mod1, mod2], |
| "common_features": list(common_features), |
| "correlation_strength": 0.7 |
| }) |
| |
| return patterns |
| |
| async def _fuse_semantics(self, representations: Dict[str, Any]) -> Dict[str, Any]: |
| """Fuse semantic representations.""" |
| |
| fused_semantics = {} |
| |
| |
| all_semantics = [] |
| for modality, representation in representations.items(): |
| if isinstance(representation, dict): |
| all_semantics.extend(representation.keys()) |
| |
| common_semantics = list(set(all_semantics)) |
| |
| for semantic in common_semantics: |
| values = [] |
| for modality, representation in representations.items(): |
| if semantic in representation: |
| values.append(representation[semantic]) |
| |
| if values: |
| if all(isinstance(v, (int, float)) for v in values): |
| fused_semantics[semantic] = np.mean(values) |
| else: |
| fused_semantics[semantic] = values[0] |
| |
| return fused_semantics |
| |
| async def _extract_consensus_features(self, representations: Dict[str, Any]) -> Dict[str, Any]: |
| """Extract features with high consensus across modalities.""" |
| consensus_features = {} |
| |
| |
| feature_counts = defaultdict(int) |
| for modality, representation in representations.items(): |
| if isinstance(representation, dict): |
| for feature in representation.keys(): |
| feature_counts[feature] += 1 |
| |
| |
| threshold = len(representations) * 0.5 |
| consensus_features = { |
| feature: self._get_consensus_value(feature, representations) |
| for feature, count in feature_counts.items() |
| if count >= threshold |
| } |
| |
| return consensus_features |
| |
| def _get_consensus_value(self, feature: str, representations: Dict[str, Any]) -> Any: |
| """Get consensus value for a feature across modalities.""" |
| values = [] |
| for modality, representation in representations.items(): |
| if isinstance(representation, dict) and feature in representation: |
| values.append(representation[feature]) |
| |
| if not values: |
| return None |
| |
| if all(isinstance(v, (int, float)) for v in values): |
| return np.mean(values) |
| else: |
| |
| from collections import Counter |
| value_counts = Counter(values) |
| return value_counts.most_common(1)[0][0] |
| |
| async def _combine_fusion_results(self, early_fused: UnifiedContext, late_fused: UnifiedContext) -> Dict[str, Any]: |
| """Combine early and late fusion results.""" |
| return { |
| "early_features": early_fused.fused_representation.get("combined_embedding", {}), |
| "late_semantics": late_fused.fused_representation.get("fused_semantics", {}), |
| "combined_score": (early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2 |
| } |
| |
| async def _calculate_adaptive_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[str, float]: |
| """Calculate adaptive weights for modalities.""" |
| weights = {} |
| |
| for modality, data in modalities.items(): |
| if data.get("status") == "success": |
| |
| confidence = data.get("confidence", 0.5) |
| quality = data.get("quality_score", 0.5) |
| weights[modality] = (confidence + quality) / 2 |
| else: |
| weights[modality] = 0.1 |
| |
| |
| total_weight = sum(weights.values()) |
| if total_weight > 0: |
| weights = {k: v / total_weight for k, v in weights.items()} |
| |
| return weights |
| |
| async def _calculate_attention_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, float]: |
| """Calculate attention weights for modalities.""" |
| weights = {} |
| |
| for modality, data in modalities.items(): |
| if data.get("status") == "success": |
| |
| confidence = data.get("confidence", 0.5) |
| info_content = data.get("information_content", 0.5) |
| weights[modality] = confidence * info_content |
| else: |
| weights[modality] = 0.1 |
| |
| |
| total_weight = sum(weights.values()) |
| if total_weight > 0: |
| weights = {k: v / total_weight for k, v in weights.items()} |
| |
| return weights |
| |
| async def _calculate_cross_attention(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[DataModality, float]]: |
| """Calculate cross-attention between modalities.""" |
| cross_attention = {} |
| |
| modalities_list = list(modalities.keys()) |
| |
| for i, mod1 in enumerate(modalities_list): |
| cross_attention[mod1] = {} |
| for mod2 in modalities_list: |
| if mod1 != mod2: |
| |
| similarity = await self._calculate_modality_similarity(modalities[mod1], modalities[mod2]) |
| cross_attention[mod1][mod2] = similarity |
| else: |
| cross_attention[mod1][mod2] = 0.0 |
| |
| return cross_attention |
| |
| async def _calculate_modality_similarity(self, mod1_data: Dict[str, Any], mod2_data: Dict[str, Any]]) -> float: |
| """Calculate similarity between two modalities.""" |
| if mod1_data.get("status") != "success" or mod2_data.get("status") != "success": |
| return 0.0 |
| |
| |
| conf1 = mod1_data.get("confidence", 0.5) |
| conf2 = mod2_data.get("confidence", 0.5) |
| |
| |
| similarity = 1 - abs(conf1 - conf2) |
| return max(0.0, similarity) |
| |
| async def _analyze_inter_modal_relationships(self, modalities: Dict[DataModality, Dict[str, Any]]) -> List[Dict[str, Any]]: |
| """Analyze relationships between modalities.""" |
| relationships = [] |
| |
| modalities_list = list(modalities.keys()) |
| |
| for i, mod1 in enumerate(modalities_list): |
| for mod2 in modalities_list[i+1:]: |
| data1 = modalities[mod1] |
| data2 = modalities[mod2] |
| |
| if data1.get("status") == "success" and data2.get("status") == "success": |
| similarity = await self._calculate_modality_similarity(data1, data2) |
| |
| relationships.append({ |
| "modalities": [mod1.value, mod2.value], |
| "relationship_type": "complementary" if similarity > 0.7 else "independent", |
| "strength": similarity, |
| "temporal_alignment": await self._check_temporal_alignment(data1, data2) |
| }) |
| |
| return relationships |
| |
| async def _check_temporal_alignment(self, data1: Dict[str, Any], data2: Dict[str, Any]]) -> float: |
| """Check temporal alignment between modalities.""" |
| |
| timestamp1 = data1.get("timestamp", datetime.utcnow()) |
| timestamp2 = data2.get("timestamp", datetime.utcnow()) |
| |
| time_diff = abs((timestamp1 - timestamp2).total_seconds()) |
| |
| alignment_score = max(0, 1 - time_diff / 3600) |
| return alignment_score |
| |
| async def _calculate_semantic_consistency(self, representations: Dict[str, Any]]) -> float: |
| """Calculate semantic consistency across modalities.""" |
| if not representations: |
| return 0.0 |
| |
| |
| consistency_scores = [] |
| |
| |
| all_semantics = [] |
| for modality, representation in representations.items(): |
| if isinstance(representation, dict): |
| all_semantics.append(set(representation.keys())) |
| |
| if len(all_semantics) > 1: |
| |
| for i in range(len(all_semantics)): |
| for j in range(i+1, len(all_semantics)): |
| intersection = len(all_semantics[i] & all_semantics[j]) |
| union = len(all_semantics[i] | all_semantics[j]) |
| if union > 0: |
| consistency_scores.append(intersection / union) |
| |
| return np.mean(consistency_scores) if consistency_scores else 0.5 |
| |
| |
| |
| async def _temporal_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]: |
| """Align modalities temporally.""" |
| alignment_results = {} |
| |
| for modality, data in modalities.items(): |
| if data.get("status") == "success": |
| timestamp = data.get("timestamp", datetime.utcnow()) |
| alignment_results[modality] = { |
| "timestamp": timestamp.isoformat(), |
| "time_category": self._categorize_time(timestamp), |
| "temporal_priority": self._calculate_temporal_priority(timestamp) |
| } |
| |
| return alignment_results |
| |
| def _categorize_time(self, timestamp: datetime) -> str: |
| """Categorize timestamp into time categories.""" |
| now = datetime.utcnow() |
| age_seconds = (now - timestamp).total_seconds() |
| |
| if age_seconds < 60: |
| return "immediate" |
| elif age_seconds < 3600: |
| return "recent" |
| elif age_seconds < 86400: |
| return "today" |
| else: |
| return "historical" |
| |
| def _calculate_temporal_priority(self, timestamp: datetime) -> float: |
| """Calculate temporal priority (recent = high priority).""" |
| now = datetime.utcnow() |
| age_seconds = (now - timestamp).total_seconds() |
| return max(0, 1 - age_seconds / 86400) |
| |
| async def _semantic_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]: |
| """Align modalities semantically.""" |
| alignment_results = {} |
| |
| for modality, data in modalities.items(): |
| if data.get("status") == "success": |
| features = data.get("features", {}) |
| semantic_tags = data.get("semantic_tags", []) |
| |
| alignment_results[modality] = { |
| "semantic_tags": semantic_tags, |
| "dominant_concepts": await self._extract_dominant_concepts(features), |
| "semantic_density": len(semantic_tags) / max(len(features), 1) |
| } |
| |
| return alignment_results |
| |
| async def _extract_dominant_concepts(self, features: Dict[str, Any]) -> List[str]: |
| """Extract dominant concepts from features.""" |
| |
| concepts = [] |
| |
| for feature_name, feature_value in features.items(): |
| if isinstance(feature_value, str) and len(feature_value) > 3: |
| concepts.append(feature_value.lower()) |
| elif isinstance(feature_name, str) and len(feature_name) > 3: |
| concepts.append(feature_name.lower()) |
| |
| return list(set(concepts))[:5] |
| |
| async def _structural_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]: |
| """Align modalities structurally.""" |
| alignment_results = {} |
| |
| for modality, data in modalities.items(): |
| if data.get("status") == "success": |
| features = data.get("features", {}) |
| |
| alignment_results[modality] = { |
| "structure_type": self._determine_structure_type(features), |
| "complexity_score": self._calculate_complexity_score(features), |
| "organization_pattern": self._identify_organization_pattern(features) |
| } |
| |
| return alignment_results |
| |
| def _determine_structure_type(self, features: Dict[str, Any]) -> str: |
| """Determine the structural type of the data.""" |
| if not features: |
| return "minimal" |
| |
| |
| if all(isinstance(v, (int, float)) for v in features.values()): |
| return "numerical" |
| elif all(isinstance(v, str) for v in features.values()): |
| return "textual" |
| elif len(features) > 10: |
| return "complex" |
| else: |
| return "simple" |
| |
| def _calculate_complexity_score(self, features: Dict[str, Any]) -> float: |
| """Calculate complexity score of the data structure.""" |
| if not features: |
| return 0.0 |
| |
| |
| type_counts = defaultdict(int) |
| for value in features.values(): |
| type_counts[type(value).__name__] += 1 |
| |
| type_diversity = len(type_counts) |
| feature_count = len(features) |
| |
| |
| complexity = (feature_count / 20) * 0.6 + (type_diversity / 4) * 0.4 |
| return min(1.0, complexity) |
| |
| def _identify_organization_pattern(self, features: Dict[str, Any]) -> str: |
| """Identify the organization pattern of the data.""" |
| if not features: |
| return "none" |
| |
| |
| feature_names = list(features.keys()) |
| |
| if any("time" in name.lower() or "date" in name.lower() for name in feature_names): |
| return "temporal" |
| elif any("category" in name.lower() or "type" in name.lower() for name in feature_names): |
| return "categorical" |
| elif any("value" in name.lower() or "amount" in name.lower() for name in feature_names): |
| return "quantitative" |
| else: |
| return "mixed" |
|
|
|
|
| |
|
|
| class TextProcessor: |
| """Processor for text modality.""" |
| |
| async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]: |
| """Process text inputs.""" |
| if not inputs: |
| return {"status": "error", "error": "No text inputs"} |
| |
| |
| combined_text = " ".join([inp.content for inp in inputs if isinstance(inp.content, str)]) |
| |
| |
| features = { |
| "text_length": len(combined_text), |
| "word_count": len(combined_text.split()), |
| "sentence_count": combined_text.count('.') + combined_text.count('!') + combined_text.count('?'), |
| "complexity_score": self._calculate_text_complexity(combined_text) |
| } |
| |
| |
| semantic_representation = await self._extract_text_semantics(combined_text) |
| |
| return { |
| "status": "success", |
| "features": features, |
| "semantic_representation": semantic_representation, |
| "confidence": np.mean([inp.confidence for inp in inputs]), |
| "quality_score": np.mean([inp.quality_score for inp in inputs]), |
| "timestamp": max(inp.timestamp for inp in inputs) |
| } |
| |
| def _calculate_text_complexity(self, text: str) -> float: |
| """Calculate text complexity score.""" |
| if not text: |
| return 0.0 |
| |
| words = text.split() |
| avg_word_length = np.mean([len(word) for word in words]) if words else 0 |
| sentence_count = max(1, text.count('.') + text.count('!') + text.count('?')) |
| avg_sentence_length = len(words) / sentence_count |
| |
| |
| complexity = (avg_word_length / 10) * 0.4 + (avg_sentence_length / 20) * 0.6 |
| return min(1.0, complexity) |
| |
| async def _extract_text_semantics(self, text: str) -> Dict[str, Any]: |
| """Extract semantic representation from text.""" |
| |
| words = text.lower().split() |
| |
| |
| concepts = [] |
| for word in words: |
| if len(word) > 4: |
| concepts.append(word) |
| |
| |
| topics = [] |
| if any(word in text.lower() for word in ["business", "company", "revenue"]): |
| topics.append("business") |
| if any(word in text.lower() for word in ["technology", "system", "software"]): |
| topics.append("technology") |
| if any(word in text.lower() for word in ["data", "information", "analysis"]): |
| topics.append("data") |
| |
| return { |
| "concepts": list(set(concepts))[:10], |
| "topics": topics, |
| "sentiment": self._analyze_sentiment(text), |
| "entities": [] |
| } |
| |
| def _analyze_sentiment(self, text: str) -> str: |
| """Simple sentiment analysis.""" |
| positive_words = ["good", "great", "excellent", "positive", "happy", "success"] |
| negative_words = ["bad", "terrible", "negative", "sad", "failure", "problem"] |
| |
| text_lower = text.lower() |
| positive_count = sum(1 for word in positive_words if word in text_lower) |
| negative_count = sum(1 for word in negative_words if word in text_lower) |
| |
| if positive_count > negative_count: |
| return "positive" |
| elif negative_count > positive_count: |
| return "negative" |
| else: |
| return "neutral" |
|
|
|
|
class ImageProcessor:
    """Processor for image modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Analyse image inputs and return features plus semantics.

        Only the first input is analysed; all feature values are read
        from that input's metadata, with neutral defaults when absent.
        """
        if not inputs:
            return {"status": "error", "error": "No image inputs"}

        primary = inputs[0]
        meta = primary.metadata

        # Low-level visual characteristics sourced from metadata.
        features = {
            "image_size": meta.get("size", 0),
            "format": meta.get("format", "unknown"),
            "color_diversity": meta.get("color_diversity", 0.5),
            "complexity_score": meta.get("complexity", 0.5),
        }

        semantics = await self._extract_image_semantics(primary)

        result = {
            "status": "success",
            "features": features,
            "semantic_representation": semantics,
            "confidence": primary.confidence,
            "quality_score": primary.quality_score,
            "timestamp": primary.timestamp,
        }
        return result

    async def _extract_image_semantics(self, image_input: MultimodalInput) -> Dict[str, Any]:
        """Map metadata fields onto the semantic image representation."""
        meta = image_input.metadata

        return {
            "objects": meta.get("objects", []),
            "colors": meta.get("dominant_colors", []),
            "scenes": meta.get("scene_types", []),
            "text_content": meta.get("extracted_text", ""),
            "visual_concepts": meta.get("visual_concepts", []),
        }
|
|
|
|
class AudioProcessor:
    """Processor for audio modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Analyse audio inputs and return features plus semantics.

        Only the first input is analysed; feature values come from that
        input's metadata with sensible fallbacks (e.g. 44.1 kHz, mono).
        """
        if not inputs:
            return {"status": "error", "error": "No audio inputs"}

        primary = inputs[0]
        meta = primary.metadata

        # Signal-level characteristics, read straight from metadata.
        features = {
            "duration": meta.get("duration", 0),
            "sample_rate": meta.get("sample_rate", 44100),
            "channels": meta.get("channels", 1),
            "frequency_content": meta.get("frequency_profile", {}),
        }

        semantics = await self._extract_audio_semantics(primary)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantics,
            "confidence": primary.confidence,
            "quality_score": primary.quality_score,
            "timestamp": primary.timestamp,
        }

    async def _extract_audio_semantics(self, audio_input: MultimodalInput) -> Dict[str, Any]:
        """Map metadata fields onto the semantic audio representation."""
        meta = audio_input.metadata

        return {
            "speech_content": meta.get("transcribed_text", ""),
            "speaker_count": meta.get("speaker_count", 1),
            "emotion": meta.get("emotion", "neutral"),
            "language": meta.get("language", "unknown"),
            "audio_quality": meta.get("quality_score", 0.5),
        }
|
|
|
|
| |
class VideoProcessor:
    """Processor for video modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        # Placeholder: ignores its inputs and reports an empty-but-successful
        # result until real video processing is implemented.
        placeholder: Dict[str, Any] = {
            "status": "success",
            "features": {},
            "semantic_representation": {},
        }
        return placeholder
|
|
|
|
class SensorProcessor:
    """Processor for sensor modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        # Placeholder: no sensor analysis yet; returns an empty successful
        # result regardless of input.
        stub_result: Dict[str, Any] = {
            "status": "success",
            "features": {},
            "semantic_representation": {},
        }
        return stub_result
|
|
|
|
class TableProcessor:
    """Processor for table modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        # Placeholder: table analysis is not implemented; the inputs are
        # ignored and an empty successful payload is returned.
        empty_result: Dict[str, Any] = {
            "status": "success",
            "features": {},
            "semantic_representation": {},
        }
        return empty_result
|
|
|
|
class CodeProcessor:
    """Processor for code modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        # Placeholder: code analysis is not implemented yet; always reports
        # success with empty features/semantics.
        stub: Dict[str, Any] = {
            "status": "success",
            "features": {},
            "semantic_representation": {},
        }
        return stub
|
|
|
|
class StructuredProcessor:
    """Processor for structured data modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        # Placeholder: structured-data analysis not implemented; inputs are
        # ignored and an empty successful payload is returned.
        noop_result: Dict[str, Any] = {
            "status": "success",
            "features": {},
            "semantic_representation": {},
        }
        return noop_result
|
|
|
|
| |
class MultimodalContextProcessor:
    """Integrated multimodal context processing system.

    Facade that normalizes raw per-modality payloads into
    ``MultimodalInput`` objects and delegates fusion to
    ``MultimodalProcessor``.
    """

    def __init__(self):
        self.multimodal_processor = MultimodalProcessor()

    @staticmethod
    def _build_input(
        modality: DataModality,
        modality_str: str,
        index: int,
        payload: Any,
    ) -> MultimodalInput:
        """Normalize one raw payload into a ``MultimodalInput``.

        Accepts either a dict (optional "content", "metadata",
        "quality_score", "confidence" keys) or any bare value, which is
        treated as the content itself. The previous code called
        ``payload.get`` unconditionally, so a plain string or number
        payload raised AttributeError.
        """
        if isinstance(payload, dict):
            content = payload.get("content", payload)
            metadata = payload.get("metadata", {})
            quality_score = payload.get("quality_score", 0.8)
            confidence = payload.get("confidence", 0.8)
        else:
            content, metadata = payload, {}
            quality_score = confidence = 0.8
        return MultimodalInput(
            id=f"{modality_str}_{index}",
            modality=modality,
            content=content,
            metadata=metadata,
            timestamp=datetime.utcnow(),
            quality_score=quality_score,
            confidence=confidence,
        )

    async def process_multimodal_input(
        self,
        input_data: Dict[str, Any],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> Dict[str, Any]:
        """Process multimodal input and return unified context.

        Args:
            input_data: Mapping of modality name (a ``DataModality`` value)
                to either a single payload or a list of payloads. Unknown
                modality names are logged and skipped.
            fusion_strategy: Strategy applied when fusing modalities.

        Returns:
            Dict with a "unified_context" summary and a
            "processing_summary" of counts and fusion quality.
        """
        multimodal_inputs: List[MultimodalInput] = []

        for modality_str, content_list in input_data.items():
            # Narrow try: only the enum lookup can raise ValueError here.
            try:
                modality = DataModality(modality_str)
            except ValueError:
                logger.warning(f"Unknown modality: {modality_str}")
                continue

            if isinstance(content_list, list):
                for payload in content_list:
                    # List payloads index ids by the global running count,
                    # matching the original id scheme.
                    multimodal_inputs.append(
                        self._build_input(
                            modality, modality_str, len(multimodal_inputs), payload
                        )
                    )
            else:
                # A bare (non-list) payload is a single input; its id index
                # has always been 0.
                multimodal_inputs.append(
                    self._build_input(modality, modality_str, 0, content_list)
                )

        unified_context = await self.multimodal_processor.process_multimodal_input(
            multimodal_inputs, fusion_strategy
        )

        return {
            "unified_context": {
                "id": unified_context.id,
                "fusion_strategy": unified_context.fusion_strategy.value,
                "modality_contributions": unified_context.modality_contributions,
                "semantic_consistency": unified_context.semantic_consistency,
                "confidence_aggregate": unified_context.confidence_aggregate,
                "fused_representation": unified_context.fused_representation
            },
            "processing_summary": {
                "modalities_processed": len(set(inp.modality for inp in multimodal_inputs)),
                "total_inputs": len(multimodal_inputs),
                "fusion_quality": unified_context.confidence_aggregate
            }
        }
|
|
|
|
if __name__ == "__main__":
    # Smoke demo: construct the processor and report readiness.
    print("Multimodal Context Processing System Initialized")
    print("=" * 60)
    demo_processor = MultimodalContextProcessor()
    print("Ready for advanced multimodal context processing and fusion!")