"""
Multimodal Context Processing System
====================================

Advanced multimodal context processing system that handles and integrates text,
visual, auditory, and sensor data within unified contextual representations.
"""

import asyncio
import json
import logging
import base64
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Union, Set
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
from collections import defaultdict, deque, Counter

from ai_agent_framework.core.context_engineering_agent import (
    ContextElement, ContextModality, ContextDimension
)

logger = logging.getLogger(__name__)


class DataModality(Enum):
    """Supported data modalities."""
    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    SENSOR = "sensor"
    TABLE = "table"
    CODE = "code"
    STRUCTURED = "structured"


class FusionStrategy(Enum):
    """Strategies for multimodal fusion."""
    EARLY_FUSION = "early_fusion"
    LATE_FUSION = "late_fusion"
    HYBRID_FUSION = "hybrid_fusion"
    ATTENTION_BASED = "attention_based"
    CROSS_ATTENTION = "cross_attention"


@dataclass
class MultimodalInput:
    """Represents multimodal input data."""
    id: str
    modality: DataModality
    content: Any
    metadata: Dict[str, Any]
    timestamp: datetime
    quality_score: float
    confidence: float
    processing_status: str = "pending"

    def __post_init__(self):
        if not self.id:
            self.id = f"mm_input_{int(time.time())}_{hash(str(self.content))}"
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.metadata:
            self.metadata = {}
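

# A minimal construction sketch (illustrative only; the field values and the
# "source" metadata key below are hypothetical, not part of the API):
#
#     text_input = MultimodalInput(
#         id="",                       # auto-generated in __post_init__
#         modality=DataModality.TEXT,
#         content="Quarterly revenue grew 12%.",
#         metadata={"source": "report"},
#         timestamp=datetime.utcnow(),
#         quality_score=0.9,
#         confidence=0.85,
#     )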


@dataclass
class UnifiedContext:
    """Unified contextual representation from multimodal inputs."""
    id: str
    source_inputs: List[str]
    fused_representation: Dict[str, Any]
    modality_contributions: Dict[str, float]
    temporal_alignment: Dict[str, Any]
    semantic_consistency: float
    fusion_strategy: FusionStrategy
    confidence_aggregate: float

    def __post_init__(self):
        if not self.id:
            self.id = f"unified_context_{int(time.time())}"


class MultimodalProcessor:
    """Core multimodal processing engine."""

    def __init__(self):
        self.modal_processors = {
            DataModality.TEXT: TextProcessor(),
            DataModality.IMAGE: ImageProcessor(),
            DataModality.AUDIO: AudioProcessor(),
            DataModality.VIDEO: VideoProcessor(),
            DataModality.SENSOR: SensorProcessor(),
            DataModality.TABLE: TableProcessor(),
            DataModality.CODE: CodeProcessor(),
            DataModality.STRUCTURED: StructuredProcessor(),
        }

        self.fusion_strategies = {
            FusionStrategy.EARLY_FUSION: self._early_fusion,
            FusionStrategy.LATE_FUSION: self._late_fusion,
            FusionStrategy.HYBRID_FUSION: self._hybrid_fusion,
            FusionStrategy.ATTENTION_BASED: self._attention_based_fusion,
            FusionStrategy.CROSS_ATTENTION: self._cross_attention_fusion,
        }

        self.alignment_algorithms = {
            "temporal": self._temporal_alignment,
            "semantic": self._semantic_alignment,
            "structural": self._structural_alignment,
        }

    async def process_multimodal_input(
        self,
        inputs: List[MultimodalInput],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> UnifiedContext:
        """Process multimodal inputs and create a unified context."""
        try:
            # Stage 1: process each modality independently.
            processed_modalities = await self._process_individual_modalities(inputs)

            # Stage 2: align modalities temporally, semantically, and structurally.
            aligned_modalities = await self._align_modalities(processed_modalities)

            # Stage 3: fuse, falling back to hybrid fusion for unknown strategies.
            fusion_func = self.fusion_strategies.get(fusion_strategy)
            if not fusion_func:
                fusion_strategy = FusionStrategy.HYBRID_FUSION
                fusion_func = self.fusion_strategies[fusion_strategy]

            unified_context = await fusion_func(aligned_modalities)

            # Stage 4: validate and enhance the fused result.
            validated_context = await self._validate_unified_context(unified_context)

            return validated_context

        except Exception as e:
            logger.error(f"Multimodal processing failed: {e}")
            return UnifiedContext(
                id=f"error_context_{int(time.time())}",
                source_inputs=[inp.id for inp in inputs],
                fused_representation={"error": str(e)},
                modality_contributions={},
                temporal_alignment={},
                semantic_consistency=0.0,
                fusion_strategy=fusion_strategy,
                confidence_aggregate=0.0
            )

    async def _process_individual_modalities(
        self,
        inputs: List[MultimodalInput]
    ) -> Dict[DataModality, Dict[str, Any]]:
        """Process each modality individually."""
        processed_modalities = {}

        modality_groups = defaultdict(list)
        for input_data in inputs:
            modality_groups[input_data.modality].append(input_data)

        for modality, modality_inputs in modality_groups.items():
            processor = self.modal_processors.get(modality)
            if processor:
                try:
                    processed_result = await processor.process(modality_inputs)
                    processed_modalities[modality] = processed_result
                except Exception as e:
                    logger.error(f"Failed to process {modality.value} modality: {e}")
                    processed_modalities[modality] = {
                        "status": "error",
                        "error": str(e),
                        "inputs": [inp.id for inp in modality_inputs]
                    }

        return processed_modalities

    async def _align_modalities(
        self,
        processed_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities for fusion."""
        aligned_modalities = {}

        temporal_alignment = await self.alignment_algorithms["temporal"](processed_modalities)
        semantic_alignment = await self.alignment_algorithms["semantic"](processed_modalities)
        structural_alignment = await self.alignment_algorithms["structural"](processed_modalities)

        for modality, processed_data in processed_modalities.items():
            if processed_data.get("status") == "success":
                aligned_data = processed_data.copy()
                aligned_data["alignment"] = {
                    "temporal": temporal_alignment.get(modality, {}),
                    "semantic": semantic_alignment.get(modality, {}),
                    "structural": structural_alignment.get(modality, {})
                }
                aligned_modalities[modality] = aligned_data

        return aligned_modalities

    async def _early_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform early fusion: combine low-level features across modalities."""
        fused_features = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})
                fused_features[modality.value] = features
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "early_fusion",
            "modality_features": fused_features,
            "combined_embedding": await self._combine_embeddings(fused_features),
            "cross_modal_patterns": await self._detect_cross_modal_patterns(fused_features)
        }

        return UnifiedContext(
            id=f"early_fusion_{int(time.time())}",
            # Modality names stand in for input ids here; the original input
            # ids are not available at this stage of the pipeline.
            source_inputs=list(fused_features.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_features),
            fusion_strategy=FusionStrategy.EARLY_FUSION,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _late_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform late fusion of modalities."""
        high_level_representations = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                representation = data.get("semantic_representation", {})
                high_level_representations[modality.value] = representation
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "late_fusion",
            "semantic_representations": high_level_representations,
            "fused_semantics": await self._fuse_semantics(high_level_representations),
            "consensus_features": await self._extract_consensus_features(high_level_representations)
        }

        return UnifiedContext(
            id=f"late_fusion_{int(time.time())}",
            source_inputs=list(high_level_representations.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(high_level_representations),
            fusion_strategy=FusionStrategy.LATE_FUSION,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _hybrid_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform hybrid fusion combining early and late fusion."""
        early_fused = await self._early_fusion(aligned_modalities)
        late_fused = await self._late_fusion(aligned_modalities)

        hybrid_representation = {
            "fusion_type": "hybrid_fusion",
            "early_fusion": early_fused.fused_representation,
            "late_fusion": late_fused.fused_representation,
            "combined_features": await self._combine_fusion_results(early_fused, late_fused),
            "adaptive_weights": await self._calculate_adaptive_weights(aligned_modalities)
        }

        combined_contributions = {}
        for modality in aligned_modalities.keys():
            early_contrib = early_fused.modality_contributions.get(modality.value, 0)
            late_contrib = late_fused.modality_contributions.get(modality.value, 0)
            combined_contributions[modality.value] = (early_contrib + late_contrib) / 2

        return UnifiedContext(
            id=f"hybrid_fusion_{int(time.time())}",
            source_inputs=list(combined_contributions.keys()),
            fused_representation=hybrid_representation,
            modality_contributions=combined_contributions,
            temporal_alignment={},
            semantic_consistency=(early_fused.semantic_consistency + late_fused.semantic_consistency) / 2,
            fusion_strategy=FusionStrategy.HYBRID_FUSION,
            confidence_aggregate=(early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2
        )

    async def _attention_based_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform attention-based fusion."""
        attention_weights = await self._calculate_attention_weights(aligned_modalities)

        fused_features = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                modality_weight = attention_weights.get(modality, 0.5)
                features = data.get("features", {})

                weighted_features = {}
                for feature_name, feature_value in features.items():
                    if isinstance(feature_value, (int, float)):
                        weighted_features[feature_name] = feature_value * modality_weight
                    else:
                        weighted_features[feature_name] = feature_value

                fused_features[modality.value] = weighted_features
                modality_contributions[modality.value] = modality_weight
                confidence_scores.append(data.get("confidence", 0.5) * modality_weight)

        unified_representation = {
            "fusion_type": "attention_based",
            "attention_weights": attention_weights,
            "weighted_features": fused_features,
            "attention_mechanism": "dynamic_modality_weighting"
        }

        return UnifiedContext(
            id=f"attention_fusion_{int(time.time())}",
            source_inputs=list(fused_features.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_features),
            fusion_strategy=FusionStrategy.ATTENTION_BASED,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _cross_attention_fusion(
        self,
        aligned_modalities: Dict[DataModality, Dict[str, Any]]
    ) -> UnifiedContext:
        """Perform cross-attention fusion."""
        cross_attention_matrices = await self._calculate_cross_attention(aligned_modalities)

        fused_representations = {}
        modality_contributions = {}
        confidence_scores = []

        for modality, data in aligned_modalities.items():
            if data.get("status") == "success":
                cross_attention = cross_attention_matrices.get(modality, {})
                features = data.get("features", {})

                attended_features = {}
                for feature_name, feature_value in features.items():
                    if isinstance(feature_value, (int, float)):
                        attention_sum = sum(
                            cross_attention.get(other_mod, 0)
                            for other_mod in aligned_modalities.keys()
                            if other_mod != modality
                        )
                        attended_features[feature_name] = feature_value * (1 + attention_sum)
                    else:
                        attended_features[feature_name] = feature_value

                fused_representations[modality.value] = attended_features
                modality_contributions[modality.value] = data.get("confidence", 0.5)
                confidence_scores.append(data.get("confidence", 0.5))

        unified_representation = {
            "fusion_type": "cross_attention",
            "cross_attention_matrices": cross_attention_matrices,
            "attended_features": fused_representations,
            "inter_modal_relationships": await self._analyze_inter_modal_relationships(aligned_modalities)
        }

        return UnifiedContext(
            id=f"cross_attention_{int(time.time())}",
            source_inputs=list(fused_representations.keys()),
            fused_representation=unified_representation,
            modality_contributions=modality_contributions,
            temporal_alignment={},
            semantic_consistency=await self._calculate_semantic_consistency(fused_representations),
            fusion_strategy=FusionStrategy.CROSS_ATTENTION,
            confidence_aggregate=np.mean(confidence_scores) if confidence_scores else 0.0
        )

    async def _validate_unified_context(self, context: UnifiedContext) -> UnifiedContext:
        """Validate and enhance unified context."""
        issues = []

        if context.semantic_consistency < 0.3:
            issues.append("Low semantic consistency detected")

        if context.confidence_aggregate < 0.4:
            issues.append("Low aggregate confidence")

        if len(context.source_inputs) < 2:
            issues.append("Insufficient modalities for robust fusion")

        if issues:
            context.fused_representation["validation_issues"] = issues
            context.fused_representation["enhancement_applied"] = True

        # Heuristic enhancement: modestly boost low scores, capped at 0.8.
        if context.semantic_consistency < 0.5:
            context.semantic_consistency = min(0.8, context.semantic_consistency * 1.2)

        if context.confidence_aggregate < 0.5:
            context.confidence_aggregate = min(0.8, context.confidence_aggregate * 1.1)

        return context

    async def _combine_embeddings(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Combine embeddings from different modalities."""
        combined = {}
        for modality, modality_features in features.items():
            for feature_name, feature_value in modality_features.items():
                combined_key = f"{modality}_{feature_name}"
                combined[combined_key] = feature_value
        return combined

    async def _detect_cross_modal_patterns(self, features: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Detect patterns across modalities."""
        patterns = []
        modalities = list(features.keys())

        for i, mod1 in enumerate(modalities):
            for mod2 in modalities[i + 1:]:
                mod1_features = features[mod1]
                mod2_features = features[mod2]

                common_features = set(mod1_features.keys()) & set(mod2_features.keys())
                if common_features:
                    patterns.append({
                        "modalities": [mod1, mod2],
                        "common_features": list(common_features),
                        # Placeholder strength; a real implementation would
                        # correlate the shared feature values.
                        "correlation_strength": 0.7
                    })

        return patterns

    async def _fuse_semantics(self, representations: Dict[str, Any]) -> Dict[str, Any]:
        """Fuse semantic representations."""
        fused_semantics = {}

        # Collect the union of semantic keys across modalities.
        all_semantics = []
        for modality, representation in representations.items():
            if isinstance(representation, dict):
                all_semantics.extend(representation.keys())

        unique_semantics = set(all_semantics)

        for semantic in unique_semantics:
            values = []
            for modality, representation in representations.items():
                if isinstance(representation, dict) and semantic in representation:
                    values.append(representation[semantic])

            if values:
                if all(isinstance(v, (int, float)) for v in values):
                    fused_semantics[semantic] = np.mean(values)
                else:
                    fused_semantics[semantic] = values[0]

        return fused_semantics

    async def _extract_consensus_features(self, representations: Dict[str, Any]) -> Dict[str, Any]:
        """Extract features with high consensus across modalities."""
        feature_counts = defaultdict(int)
        for modality, representation in representations.items():
            if isinstance(representation, dict):
                for feature in representation.keys():
                    feature_counts[feature] += 1

        # Keep features that appear in at least half of the modalities.
        threshold = len(representations) * 0.5
        consensus_features = {
            feature: self._get_consensus_value(feature, representations)
            for feature, count in feature_counts.items()
            if count >= threshold
        }

        return consensus_features

    def _get_consensus_value(self, feature: str, representations: Dict[str, Any]) -> Any:
        """Get consensus value for a feature across modalities."""
        values = []
        for modality, representation in representations.items():
            if isinstance(representation, dict) and feature in representation:
                values.append(representation[feature])

        if not values:
            return None

        if all(isinstance(v, (int, float)) for v in values):
            return np.mean(values)

        # Majority vote for non-numeric values; fall back to the first value
        # when values are unhashable (e.g. lists).
        try:
            return Counter(values).most_common(1)[0][0]
        except TypeError:
            return values[0]

    async def _combine_fusion_results(self, early_fused: UnifiedContext, late_fused: UnifiedContext) -> Dict[str, Any]:
        """Combine early and late fusion results."""
        return {
            "early_features": early_fused.fused_representation.get("combined_embedding", {}),
            "late_semantics": late_fused.fused_representation.get("fused_semantics", {}),
            "combined_score": (early_fused.confidence_aggregate + late_fused.confidence_aggregate) / 2
        }

    async def _calculate_adaptive_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, float]:
        """Calculate adaptive weights for modalities."""
        weights = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                confidence = data.get("confidence", 0.5)
                quality = data.get("quality_score", 0.5)
                weights[modality] = (confidence + quality) / 2
            else:
                weights[modality] = 0.1

        # Normalize weights so they sum to 1.
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}

        return weights
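
    # Example: two modalities with (confidence, quality) of (0.8, 0.9) and
    # (0.5, 0.5) get raw weights 0.85 and 0.50, which normalize to roughly
    # 0.63 and 0.37.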

    async def _calculate_attention_weights(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, float]:
        """Calculate attention weights for modalities."""
        weights = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                confidence = data.get("confidence", 0.5)
                info_content = data.get("information_content", 0.5)
                weights[modality] = confidence * info_content
            else:
                weights[modality] = 0.1

        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v / total_weight for k, v in weights.items()}

        return weights

    async def _calculate_cross_attention(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[DataModality, float]]:
        """Calculate cross-attention between modalities."""
        cross_attention = {}
        modalities_list = list(modalities.keys())

        for mod1 in modalities_list:
            cross_attention[mod1] = {}
            for mod2 in modalities_list:
                if mod1 != mod2:
                    similarity = await self._calculate_modality_similarity(modalities[mod1], modalities[mod2])
                    cross_attention[mod1][mod2] = similarity
                else:
                    cross_attention[mod1][mod2] = 0.0

        return cross_attention

    async def _calculate_modality_similarity(self, mod1_data: Dict[str, Any], mod2_data: Dict[str, Any]) -> float:
        """Calculate similarity between two modalities."""
        if mod1_data.get("status") != "success" or mod2_data.get("status") != "success":
            return 0.0

        conf1 = mod1_data.get("confidence", 0.5)
        conf2 = mod2_data.get("confidence", 0.5)

        # Proxy similarity: modalities with close confidence scores are
        # treated as similar.
        similarity = 1 - abs(conf1 - conf2)
        return max(0.0, similarity)

    async def _analyze_inter_modal_relationships(self, modalities: Dict[DataModality, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyze relationships between modalities."""
        relationships = []
        modalities_list = list(modalities.keys())

        for i, mod1 in enumerate(modalities_list):
            for mod2 in modalities_list[i + 1:]:
                data1 = modalities[mod1]
                data2 = modalities[mod2]

                if data1.get("status") == "success" and data2.get("status") == "success":
                    similarity = await self._calculate_modality_similarity(data1, data2)

                    relationships.append({
                        "modalities": [mod1.value, mod2.value],
                        "relationship_type": "complementary" if similarity > 0.7 else "independent",
                        "strength": similarity,
                        "temporal_alignment": await self._check_temporal_alignment(data1, data2)
                    })

        return relationships

    async def _check_temporal_alignment(self, data1: Dict[str, Any], data2: Dict[str, Any]) -> float:
        """Check temporal alignment between modalities."""
        timestamp1 = data1.get("timestamp", datetime.utcnow())
        timestamp2 = data2.get("timestamp", datetime.utcnow())

        time_diff = abs((timestamp1 - timestamp2).total_seconds())

        # Full alignment at zero offset, decaying to zero over one hour.
        alignment_score = max(0, 1 - time_diff / 3600)
        return alignment_score

    async def _calculate_semantic_consistency(self, representations: Dict[str, Any]) -> float:
        """Calculate semantic consistency across modalities."""
        if not representations:
            return 0.0

        consistency_scores = []

        # Collect the semantic key set of each modality.
        all_semantics = []
        for modality, representation in representations.items():
            if isinstance(representation, dict):
                all_semantics.append(set(representation.keys()))

        if len(all_semantics) > 1:
            # Pairwise Jaccard similarity between key sets.
            for i in range(len(all_semantics)):
                for j in range(i + 1, len(all_semantics)):
                    intersection = len(all_semantics[i] & all_semantics[j])
                    union = len(all_semantics[i] | all_semantics[j])
                    if union > 0:
                        consistency_scores.append(intersection / union)

        return np.mean(consistency_scores) if consistency_scores else 0.5
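
    # Worked example (hypothetical key sets): text -> {"topic", "sentiment",
    # "length"}, image -> {"topic", "colors"}. Jaccard = |{"topic"}| /
    # |{"topic", "sentiment", "length", "colors"}| = 1/4 = 0.25.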

    async def _temporal_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities temporally."""
        alignment_results = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                timestamp = data.get("timestamp", datetime.utcnow())
                alignment_results[modality] = {
                    "timestamp": timestamp.isoformat(),
                    "time_category": self._categorize_time(timestamp),
                    "temporal_priority": self._calculate_temporal_priority(timestamp)
                }

        return alignment_results

    def _categorize_time(self, timestamp: datetime) -> str:
        """Categorize timestamp into time categories."""
        now = datetime.utcnow()
        age_seconds = (now - timestamp).total_seconds()

        if age_seconds < 60:
            return "immediate"
        elif age_seconds < 3600:
            return "recent"
        elif age_seconds < 86400:
            return "today"
        else:
            return "historical"

    def _calculate_temporal_priority(self, timestamp: datetime) -> float:
        """Calculate temporal priority (recent = high priority)."""
        now = datetime.utcnow()
        age_seconds = (now - timestamp).total_seconds()
        # Linear decay over 24 hours.
        return max(0, 1 - age_seconds / 86400)
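
    # Example: a six-hour-old input has age_seconds = 21600, so its priority
    # is 1 - 21600 / 86400 = 0.75.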

    async def _semantic_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities semantically."""
        alignment_results = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})
                semantic_tags = data.get("semantic_tags", [])

                alignment_results[modality] = {
                    "semantic_tags": semantic_tags,
                    "dominant_concepts": await self._extract_dominant_concepts(features),
                    "semantic_density": len(semantic_tags) / max(len(features), 1)
                }

        return alignment_results

    async def _extract_dominant_concepts(self, features: Dict[str, Any]) -> List[str]:
        """Extract dominant concepts from features."""
        concepts = []

        for feature_name, feature_value in features.items():
            if isinstance(feature_value, str) and len(feature_value) > 3:
                concepts.append(feature_value.lower())
            elif isinstance(feature_name, str) and len(feature_name) > 3:
                concepts.append(feature_name.lower())

        return list(set(concepts))[:5]

    async def _structural_alignment(self, modalities: Dict[DataModality, Dict[str, Any]]) -> Dict[DataModality, Dict[str, Any]]:
        """Align modalities structurally."""
        alignment_results = {}

        for modality, data in modalities.items():
            if data.get("status") == "success":
                features = data.get("features", {})

                alignment_results[modality] = {
                    "structure_type": self._determine_structure_type(features),
                    "complexity_score": self._calculate_complexity_score(features),
                    "organization_pattern": self._identify_organization_pattern(features)
                }

        return alignment_results

    def _determine_structure_type(self, features: Dict[str, Any]) -> str:
        """Determine the structural type of the data."""
        if not features:
            return "minimal"

        if all(isinstance(v, (int, float)) for v in features.values()):
            return "numerical"
        elif all(isinstance(v, str) for v in features.values()):
            return "textual"
        elif len(features) > 10:
            return "complex"
        else:
            return "simple"

    def _calculate_complexity_score(self, features: Dict[str, Any]) -> float:
        """Calculate complexity score of the data structure."""
        if not features:
            return 0.0

        type_counts = defaultdict(int)
        for value in features.values():
            type_counts[type(value).__name__] += 1

        type_diversity = len(type_counts)
        feature_count = len(features)

        # Weighted blend of feature count (60%) and type diversity (40%).
        complexity = (feature_count / 20) * 0.6 + (type_diversity / 4) * 0.4
        return min(1.0, complexity)

    def _identify_organization_pattern(self, features: Dict[str, Any]) -> str:
        """Identify the organization pattern of the data."""
        if not features:
            return "none"

        feature_names = list(features.keys())

        if any("time" in name.lower() or "date" in name.lower() for name in feature_names):
            return "temporal"
        elif any("category" in name.lower() or "type" in name.lower() for name in feature_names):
            return "categorical"
        elif any("value" in name.lower() or "amount" in name.lower() for name in feature_names):
            return "quantitative"
        else:
            return "mixed"


class TextProcessor:
    """Processor for text modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process text inputs."""
        if not inputs:
            return {"status": "error", "error": "No text inputs"}

        combined_text = " ".join([inp.content for inp in inputs if isinstance(inp.content, str)])

        features = {
            "text_length": len(combined_text),
            "word_count": len(combined_text.split()),
            "sentence_count": combined_text.count('.') + combined_text.count('!') + combined_text.count('?'),
            "complexity_score": self._calculate_text_complexity(combined_text)
        }

        semantic_representation = await self._extract_text_semantics(combined_text)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": np.mean([inp.confidence for inp in inputs]),
            "quality_score": np.mean([inp.quality_score for inp in inputs]),
            "timestamp": max(inp.timestamp for inp in inputs)
        }

    def _calculate_text_complexity(self, text: str) -> float:
        """Calculate text complexity score."""
        if not text:
            return 0.0

        words = text.split()
        avg_word_length = np.mean([len(word) for word in words]) if words else 0
        sentence_count = max(1, text.count('.') + text.count('!') + text.count('?'))
        avg_sentence_length = len(words) / sentence_count

        # Weighted blend of word length (40%) and sentence length (60%).
        complexity = (avg_word_length / 10) * 0.4 + (avg_sentence_length / 20) * 0.6
        return min(1.0, complexity)
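
    # Example: text with an average word length of 5 characters and an average
    # sentence length of 10 words scores (5/10) * 0.4 + (10/20) * 0.6 = 0.5.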

    async def _extract_text_semantics(self, text: str) -> Dict[str, Any]:
        """Extract semantic representation from text."""
        words = text.lower().split()

        # Treat longer words as candidate concepts.
        concepts = [word for word in words if len(word) > 4]

        # Simple keyword-based topic detection.
        topics = []
        if any(word in text.lower() for word in ["business", "company", "revenue"]):
            topics.append("business")
        if any(word in text.lower() for word in ["technology", "system", "software"]):
            topics.append("technology")
        if any(word in text.lower() for word in ["data", "information", "analysis"]):
            topics.append("data")

        return {
            "concepts": list(set(concepts))[:10],
            "topics": topics,
            "sentiment": self._analyze_sentiment(text),
            "entities": []
        }

    def _analyze_sentiment(self, text: str) -> str:
        """Simple keyword-based sentiment analysis."""
        positive_words = ["good", "great", "excellent", "positive", "happy", "success"]
        negative_words = ["bad", "terrible", "negative", "sad", "failure", "problem"]

        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)

        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"


class ImageProcessor:
    """Processor for image modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process image inputs."""
        if not inputs:
            return {"status": "error", "error": "No image inputs"}

        # Only the first image input is analyzed.
        image_input = inputs[0]

        features = {
            "image_size": image_input.metadata.get("size", 0),
            "format": image_input.metadata.get("format", "unknown"),
            "color_diversity": image_input.metadata.get("color_diversity", 0.5),
            "complexity_score": image_input.metadata.get("complexity", 0.5)
        }

        semantic_representation = await self._extract_image_semantics(image_input)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": image_input.confidence,
            "quality_score": image_input.quality_score,
            "timestamp": image_input.timestamp
        }

    async def _extract_image_semantics(self, image_input: MultimodalInput) -> Dict[str, Any]:
        """Extract semantic representation from image."""
        metadata = image_input.metadata

        return {
            "objects": metadata.get("objects", []),
            "colors": metadata.get("dominant_colors", []),
            "scenes": metadata.get("scene_types", []),
            "text_content": metadata.get("extracted_text", ""),
            "visual_concepts": metadata.get("visual_concepts", [])
        }


class AudioProcessor:
    """Processor for audio modality."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        """Process audio inputs."""
        if not inputs:
            return {"status": "error", "error": "No audio inputs"}

        # Only the first audio input is analyzed.
        audio_input = inputs[0]

        features = {
            "duration": audio_input.metadata.get("duration", 0),
            "sample_rate": audio_input.metadata.get("sample_rate", 44100),
            "channels": audio_input.metadata.get("channels", 1),
            "frequency_content": audio_input.metadata.get("frequency_profile", {})
        }

        semantic_representation = await self._extract_audio_semantics(audio_input)

        return {
            "status": "success",
            "features": features,
            "semantic_representation": semantic_representation,
            "confidence": audio_input.confidence,
            "quality_score": audio_input.quality_score,
            "timestamp": audio_input.timestamp
        }

    async def _extract_audio_semantics(self, audio_input: MultimodalInput) -> Dict[str, Any]:
        """Extract semantic representation from audio."""
        metadata = audio_input.metadata

        return {
            "speech_content": metadata.get("transcribed_text", ""),
            "speaker_count": metadata.get("speaker_count", 1),
            "emotion": metadata.get("emotion", "neutral"),
            "language": metadata.get("language", "unknown"),
            "audio_quality": metadata.get("quality_score", 0.5)
        }


class VideoProcessor:
    """Processor for video modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class SensorProcessor:
    """Processor for sensor modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class TableProcessor:
    """Processor for table modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class CodeProcessor:
    """Processor for code modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class StructuredProcessor:
    """Processor for structured data modality (placeholder implementation)."""

    async def process(self, inputs: List[MultimodalInput]) -> Dict[str, Any]:
        return {"status": "success", "features": {}, "semantic_representation": {}}


class MultimodalContextProcessor:
    """Integrated multimodal context processing system."""

    def __init__(self):
        self.multimodal_processor = MultimodalProcessor()

    async def process_multimodal_input(
        self,
        input_data: Dict[str, Any],
        fusion_strategy: FusionStrategy = FusionStrategy.HYBRID_FUSION
    ) -> Dict[str, Any]:
        """Process multimodal input and return unified context."""
        multimodal_inputs = []

        for modality_str, content_list in input_data.items():
            try:
                modality = DataModality(modality_str)
                if isinstance(content_list, list):
                    for content in content_list:
                        # Entries may be plain values or dicts with optional
                        # "content", "metadata", "quality_score", and
                        # "confidence" keys.
                        entry = content if isinstance(content, dict) else {}
                        multimodal_input = MultimodalInput(
                            id=f"{modality_str}_{len(multimodal_inputs)}",
                            modality=modality,
                            content=entry.get("content", content),
                            metadata=entry.get("metadata", {}),
                            timestamp=datetime.utcnow(),
                            quality_score=entry.get("quality_score", 0.8),
                            confidence=entry.get("confidence", 0.8)
                        )
                        multimodal_inputs.append(multimodal_input)
                else:
                    entry = content_list if isinstance(content_list, dict) else {}
                    multimodal_input = MultimodalInput(
                        id=f"{modality_str}_0",
                        modality=modality,
                        content=entry.get("content", content_list),
                        metadata=entry.get("metadata", {}),
                        timestamp=datetime.utcnow(),
                        quality_score=entry.get("quality_score", 0.8),
                        confidence=entry.get("confidence", 0.8)
                    )
                    multimodal_inputs.append(multimodal_input)
            except ValueError:
                logger.warning(f"Unknown modality: {modality_str}")

        unified_context = await self.multimodal_processor.process_multimodal_input(
            multimodal_inputs, fusion_strategy
        )

        return {
            "unified_context": {
                "id": unified_context.id,
                "fusion_strategy": unified_context.fusion_strategy.value,
                "modality_contributions": unified_context.modality_contributions,
                "semantic_consistency": unified_context.semantic_consistency,
                "confidence_aggregate": unified_context.confidence_aggregate,
                "fused_representation": unified_context.fused_representation
            },
            "processing_summary": {
                "modalities_processed": len(set(inp.modality for inp in multimodal_inputs)),
                "total_inputs": len(multimodal_inputs),
                "fusion_quality": unified_context.confidence_aggregate
            }
        }


if __name__ == "__main__":
    print("Multimodal Context Processing System Initialized")
    print("=" * 60)
    processor = MultimodalContextProcessor()
    print("Ready for advanced multimodal context processing and fusion!")