# core/mvi_ai_full.py
# MVI AI engine: full multimodal inference pipeline
import os
import tempfile
import shutil
import torch
import torch.nn.functional as F
import contextlib
import io as sysio
from typing import Optional, Dict, Any
from PIL import Image
import torchvision.transforms as T
import torchvision.io as tvio
import torchaudio
# ================= LANGUAGE =================
from language.tokenizer import SimpleTokenizer
from language.embeddings import EmbeddingLayer
from language.encoder import SentenceEncoder
from language.intent import IntentClassifier
from language.expertise_encoder import ExpertiseEncoder
from language.programming_encoder import ProgrammingEncoder
from language.expertise_embedding import ExpertiseEmbedding
from language.expertise_classifier import ExpertiseClassifier
from language.programming_embedding import ProgrammingEmbedding
from language.programming_classifier import ProgrammingClassifier
from language.models import SoilModel, VisionModel
from language.multi_model import MultiTheologyModel
# ================= EMOTION =================
from emotion.sentiment_model import SentimentRegressor
from emotion.emotion_mapper import map_sentiment_to_emotion
from emotion.tracker import EmotionTracker
# ================= MEDIA =================
from vision.image_encoder import ImageEncoder
from vision.video_encoder import VideoEncoder
from core.voice_encoder import VoiceEncoder
from core.humanizer_encoder import HumanizerEncoder
from core.music_generator import MusicGenerator
# ================= MEMORY =================
from memory.short_term import ShortTermMemory
from memory.long_term import LongTermMemory
# ================= REASONING =================
from reasoning.planner import ReasoningPlanner
from reasoning.adaptive_strategy import AdaptiveStrategy
from reasoning.persona import PersonaController
from core.model_registry import ModelRegistry
from reasoning.system_prompt import system_prompt
from core.response_engine import ResponseEngine
from models.global_brain_adapter import GlobalBrainAdapter
from core.business_model import BusinessModel
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
ARTIFACTS_DIR = "artifacts"
HIDDEN_DIM = 128
class MVI_AI:
def __init__(self):
# ===== MEMORY =====
self.stm = ShortTermMemory(max_len=10)
self.ltm = LongTermMemory(dim=HIDDEN_DIM)
self.emotion_tracker = EmotionTracker(window_size=6)
# ===== REASONING =====
self.persona = PersonaController()
self.planner = ReasoningPlanner()
self.strategy = AdaptiveStrategy()
self.registry = ModelRegistry()
self.response_engine = ResponseEngine(
ltm=self.ltm,
registry=self.registry
)
self.intent_classifier = None
self.expertise_classifier = None
self.programming_classifier = None
# ===== TOKENIZER =====
self.tokenizer = SimpleTokenizer()
self.tokenizer.load_vocab(os.path.join(ARTIFACTS_DIR, "vocab.json"))
# ===== LANGUAGE =====
self.embedder = EmbeddingLayer(
self.tokenizer.vocab_size,
pad_index=self.tokenizer.vocab.get(self.tokenizer.PAD_TOKEN, self.tokenizer.vocab["<pad>"])
).to(DEVICE)
self.encoder = SentenceEncoder(
vocab_size=self.tokenizer.vocab_size
).to(DEVICE)
encoder_dim = self.encoder.projection.out_features
# Cognitive fusion projection head
self.cognitive_projection = torch.nn.Linear(
encoder_dim,
encoder_dim
).to(DEVICE)
self.intent_classifier = IntentClassifier(
input_dim=encoder_dim,
intent_labels=[
"question", "advice", "statement", "greeting",
"farewell", "opinion", "command",
"complaint", "praise", "emotion", "other"
]
).to(DEVICE)
self.sentiment_model = SentimentRegressor(
input_dim=encoder_dim,
output_dim=6
).to(DEVICE)
self.expertise_encoder = ExpertiseEncoder(
input_dim=encoder_dim,
hidden_dim=HIDDEN_DIM,
output_dim=HIDDEN_DIM
).to(DEVICE)
self.programming_encoder = ProgrammingEncoder(
input_dim=encoder_dim,
hidden_dim=HIDDEN_DIM,
output_dim=HIDDEN_DIM
).to(DEVICE)
self.business_model = BusinessModel().to(DEVICE)
self.expertise_embedding = ExpertiseEmbedding().to(DEVICE)
self.expertise_classifier = ExpertiseClassifier().to(DEVICE)
self.programming_embedding = ProgrammingEmbedding().to(DEVICE)
self.programming_classifier = ProgrammingClassifier().to(DEVICE)
self.agriculture_soil_model = SoilModel().to(DEVICE)
self.agriculture_vision_model = VisionModel().to(DEVICE)
self.music_generator = MusicGenerator().to(DEVICE)
self.multi_theology_model = MultiTheologyModel(
vocab_size=self.tokenizer.vocab_size
).to(DEVICE)
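        # Inline adapter, presumably mirroring the imported GlobalBrainAdapter
        # (128 -> 256 -> 512); its trained weights are loaded just below.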
self.global_brain_adapter = torch.nn.Sequential(
torch.nn.Linear(128, 256),
torch.nn.ReLU(),
torch.nn.Linear(256, 512)
).to(DEVICE)
# Load trained weights
adapter_path = os.path.join(ARTIFACTS_DIR, "global_brain_adapter.pt")
self._safe_load(self.global_brain_adapter, adapter_path)
self.registry_projection = torch.nn.Linear(
4096, # large enough buffer
encoder_dim
).to(DEVICE)
# ===== MEDIA =====
self.image_encoder = ImageEncoder(embed_dim=HIDDEN_DIM).to(DEVICE)
self.video_encoder = VideoEncoder(embed_dim=HIDDEN_DIM).to(DEVICE)
self.voice_encoder = VoiceEncoder(embed_dim=80).to(DEVICE)
self.humanizer = HumanizerEncoder(
os.path.join(ARTIFACTS_DIR, "humanizer_model.pt"),
DEVICE
)
self.extra_artifacts = {}
self._load_all_artifacts()
self._set_eval()
# ===== ARTIFACT LOADING =====
    def _safe_load(self, model, path):
        """Load only the checkpoint tensors whose names and shapes match."""
        if not os.path.exists(path):
            return
        checkpoint = torch.load(path, map_location=DEVICE)
        model_dict = model.state_dict()
        filtered = {k: v for k, v in checkpoint.items()
                    if k in model_dict and v.shape == model_dict[k].shape}
        model_dict.update(filtered)
        model.load_state_dict(model_dict)
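    # Example (hedged): if a checkpoint was saved with a different vocab size,
    # its (V_old, D) embedding weight fails the shape check above and the
    # freshly initialized weight is kept, so loading degrades gracefully
    # instead of raising on architecture drift.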
def _load_all_artifacts(self):
expected_files = {
"embedding_layer.pt": self.embedder,
"sentence_encoder.pt": self.encoder,
"intent_classifier.pt": self.intent_classifier,
"sentiment_regressor.pt": self.sentiment_model,
"expertise_encoder.pt": self.expertise_encoder,
"programming_encoder.pt": self.programming_encoder,
"image_encoder.pt": self.image_encoder,
"video_encoder.pt": self.video_encoder,
"voice_encoder_best.pt": self.voice_encoder,
"expertise_embedding.pt": self.expertise_embedding,
"expertise_classifier.pt": self.expertise_classifier,
"programming_embedding.pt": self.programming_embedding,
"programming_classifier.pt": self.programming_classifier,
"agriculture_soil_model.pt": self.agriculture_soil_model,
"agriculture_vision_model.pt": self.agriculture_vision_model,
"multi_theology_model_final.pt": self.multi_theology_model,
"global_brain_adapter.pt": self.global_brain_adapter,
"business_model.pt": self.business_model,
"music_generator.pt": self.music_generator,
}
        # Register the global brain adapter once, outside the per-file loop
        self.registry.register(
            "global_brain_adapter",
            self.global_brain_adapter,
            task="external"
        )
        for file in os.listdir(ARTIFACTS_DIR):
            if file == "vocab.json":
                continue
            path = os.path.join(ARTIFACTS_DIR, file)
            name = file.replace(".pt", "")
            # ---------- Known artifacts: load weights into their models ----------
            if file in expected_files:
                self._safe_load(expected_files[file], path)
                continue
# ---------- Special case: responder.pt ----------
if file == "responder.pt":
from language.responder import Responder
import torch.nn as nn
checkpoint = torch.load(path, map_location=DEVICE)
# Create model and apply dynamic quantization
model = Responder(
vocab_size=len(self.tokenizer.vocab),
embed_dim=128,
hidden_dim=512
).to(DEVICE)
model = torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)
# load quantized state_dict
model.load_state_dict(checkpoint)
self.extra_artifacts["responder"] = model
self.registry.register("responder", model, task="generation")
continue
# ---------- Humanizer ----------
elif file == "humanizer_model.pt":
self.registry.register(
"humanizer",
self.humanizer.model,
tokenizer=self.humanizer.tokenizer,
task="generation"
)
# ---------- Unknown artifacts ----------
else:
try:
artifact = torch.load(path, map_location=DEVICE)
self.extra_artifacts[name] = artifact
self.registry.register(
name,
artifact,
task="external"
)
                except Exception as e:
                    print(f"[REGISTRY] skipped artifact {file}: {e}")
def _set_eval(self):
models = [
self.embedder, self.encoder, self.intent_classifier,
self.sentiment_model, self.expertise_encoder,
self.programming_encoder, self.expertise_embedding,
self.expertise_classifier, self.programming_embedding,
self.programming_classifier, self.image_encoder,
self.video_encoder, self.voice_encoder
]
for m in models:
m.eval()
def _run_registry_models(self, core_vec, result):
"""
Dynamically run models stored in the registry.
Each model can contribute to the reasoning state.
"""
outputs = {}
for name in self.registry.list_models():
model = self.registry.get(name)
# Skip models that aren't callable
if not hasattr(model, "forward"):
continue
try:
out = model(core_vec)
if isinstance(out, torch.Tensor):
outputs[name] = out
except Exception:
# Some models may require different inputs
continue
result["registry_outputs"] = outputs
return outputs
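    # Example (hedged): after a predict() call, result["registry_outputs"]
    # maps registry names to whatever tensor each model produced for core_vec;
    # models whose forward() rejects that input are silently absent.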
# ===== MULTI-MODAL FUSION =====
    def _multimodal_fusion(self, embeddings):
        """Attention-style fusion: weight each modality by the softmax of its
        mean activation, then return the normalized weighted sum."""
        cleaned = []
        for e in embeddings:
            if e is None:
                continue
            if not isinstance(e, torch.Tensor):
                e = torch.tensor(e, dtype=torch.float32)
            # force shape (1, D)
            if e.ndim == 1:
                e = e.unsqueeze(0)
            cleaned.append(e)
        if not cleaned:
            return None
        # torch.stack needs a common width, but the modalities differ
        # (e.g. the 80-dim voice embedding); zero-pad up to the widest
        max_dim = max(e.shape[-1] for e in cleaned)
        cleaned = [F.pad(e, (0, max_dim - e.shape[-1])) for e in cleaned]
        stacked = torch.stack(cleaned, dim=1)   # (B, N, D)
        scores = torch.mean(stacked, dim=-1)    # (B, N)
        weights = torch.softmax(scores, dim=1).unsqueeze(-1)
        fused = torch.sum(stacked * weights, dim=1)
        return F.normalize(fused, dim=-1)
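    # Worked sketch of the fusion above (hedged, not called anywhere):
    # two (1, 4) embeddings a = [[1, 0, 0, 0]] and b = [[0, 1, 1, 0]] have
    # mean activations 0.25 and 0.5, so softmax gives weights ~[0.44, 0.56];
    # the output is 0.44*a + 0.56*b, renormalized to unit length, i.e. the
    # "louder" modality dominates but never fully masks the other.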
# ===== MEMORY ATTENTION =====
def _attention_fusion(self, current_vec):
# ensure tensor
if not isinstance(current_vec, torch.Tensor):
current_vec = torch.tensor(current_vec, dtype=torch.float32)
# force shape (1, dim)
if current_vec.ndim == 1:
current_vec = current_vec.unsqueeze(0)
memory_vecs = self.ltm.retrieve_vectors(current_vec, k=5)
if memory_vecs is None:
return current_vec.squeeze(0)
# convert to tensor
memory_vecs = memory_vecs.clone().detach().float().to(current_vec.device)
dim = current_vec.shape[1]
# force correct reshape
memory_vecs = memory_vecs.reshape(-1, dim)
# attention
scores = torch.matmul(current_vec, memory_vecs.T)
weights = torch.softmax(scores, dim=-1)
fused = torch.matmul(weights, memory_vecs)
        output = (current_vec + fused) / 2
        return F.normalize(output, dim=-1).squeeze(0)
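    # Retrieval sketch (hedged): with a (1, D) query q and k memories M of
    # shape (k, D), scores = q @ M.T is (1, k); softmax turns the scores into
    # recall weights, and averaging the recalled vector with q lets memory
    # shade, but never replace, the current input.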
# ===== SAFE EXEC =====
    def _safe_exec(self, code: str) -> Dict[str, Any]:
        """Execute snippets against a minimal builtins allowlist. This limits
        convenience, not security; do not treat it as a real sandbox."""
        allowed = {
            "print": print, "range": range, "len": len,
            "int": int, "float": float, "str": str,
            "list": list, "dict": dict
        }
buffer = sysio.StringIO()
try:
with contextlib.redirect_stdout(buffer):
exec(code, {"__builtins__": allowed}, {})
except Exception as e:
return {"error": str(e)}
return {"output": buffer.getvalue()}
# ===== MULTI-MODAL PREDICT =====
def predict(
self,
text: Optional[str] = None,
image: Optional[str] = None,
audio: Optional[str] = None,
video: Optional[str] = None,
system_prompt: Optional[str] = None
) -> Dict[str, Any]:
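        """
        Full multimodal inference: encode each provided modality, fuse the
        embeddings with memory attention, run the decision heads, and write
        a humanized reply into result["response"].
        """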
result = {}
with torch.no_grad():
sentence_vec = None
expertise_vec = None
programming_vec = None
img_emb = None
audio_emb = None
video_emb = None
# ----- IMAGE -----
if image:
img = Image.open(image).convert("RGB")
img = T.Resize((224, 224))(img)
img_tensor = T.ToTensor()(img).unsqueeze(0).to(DEVICE)
img_emb = F.normalize(self.image_encoder(img_tensor), dim=-1)
result["image_embedding"] = img_emb
# ----- AUDIO -----
if audio:
waveform, sr = torchaudio.load(audio)
waveform = waveform.mean(dim=0, keepdim=True).to(DEVICE)
audio_emb = F.normalize(self.voice_encoder(waveform), dim=-1)
result["audio_embedding"] = audio_emb
# ----- VIDEO -----
if video:
video_tensor = tvio.read_video(video, pts_unit="sec")[0]
                video_tensor = (video_tensor.permute(0, 3, 1, 2).float() / 255.0).to(DEVICE)
video_emb = F.normalize(self.video_encoder(video_tensor), dim=-1)
result["video_embedding"] = video_emb
# ----- TEXT -----
if text:
tokens = torch.tensor([self.tokenizer.encode(text)], dtype=torch.long).to(DEVICE)
mask = (tokens != self.tokenizer.vocab[self.tokenizer.PAD_TOKEN]).long()
sentence_vec = F.normalize(self.encoder(tokens, mask), dim=-1)
programming_encoded = self.programming_encoder(sentence_vec)
programming_vec = F.normalize(self.programming_embedding(programming_encoded), dim=-1)
expertise_encoded = self.expertise_encoder(sentence_vec)
expertise_vec = F.normalize(self.expertise_embedding(expertise_encoded), dim=-1)
# ----- EXPERTISE INFERENCE -----
expertise_logits = self.expertise_classifier(expertise_vec)
expertise_idx = torch.argmax(expertise_logits, dim=-1)
                # ----- DOMAIN MODEL SELECTION -----
                domain_models = {
                    0: self.agriculture_soil_model,
                    1: self.agriculture_vision_model,
                    2: self.multi_theology_model,
                    3: self.programming_encoder,
                    4: self.business_model,
                    5: self.music_generator,
                }
                domain_model = domain_models.get(expertise_idx.item())
                # Compute domain output if applicable
                domain_output = None
                if domain_model is not None:
                    try:
                        # the raw sentence vector is used here; the fused
                        # vector would be an equally valid input
                        domain_output = domain_model(sentence_vec)
                    except Exception as e:
                        print(f"[DOMAIN MODEL] Error: {e}")
                result["domain_output"] = domain_output
sentence_vec = self._attention_fusion(sentence_vec)
# ----- TRUE MULTIMODAL FUSION -----
fused_vector = self._multimodal_fusion([
sentence_vec,
expertise_vec,
programming_vec,
img_emb,
audio_emb,
video_emb
])
if fused_vector is not None:
result["fused_embedding"] = fused_vector
# Check if memory already has similar knowledge
similar = self.ltm.retrieve_vectors(fused_vector, k=1)
store_memory = True
if similar is not None:
sim_score = torch.cosine_similarity(
fused_vector,
similar,
dim=-1
).item()
# if too similar, don't store
if sim_score > 0.92:
store_memory = False
if store_memory:
self.ltm.store(
fused_vector.detach().cpu(),
None # don't store raw text
)
# ===== COGNITIVE STATE CONSTRUCTION =====
if text and fused_vector is not None:
# Incorporate system_prompt embedding if available
if system_prompt:
tokens_prompt = torch.tensor(
[self.tokenizer.encode(system_prompt)],
dtype=torch.long
).to(DEVICE)
mask_prompt = (tokens_prompt != self.tokenizer.vocab[self.tokenizer.PAD_TOKEN]).long()
prompt_vec = F.normalize(self.encoder(tokens_prompt, mask_prompt), dim=-1)
                    # fold the prompt embedding into the fused state so the
                    # system prompt actually influences downstream reasoning
                    fused_vector = self._multimodal_fusion([fused_vector, prompt_vec])
core_vec = self.cognitive_projection(fused_vector)
registry_outputs = self._run_registry_models(core_vec, result)
# Intent
intent_logits = self.intent_classifier(core_vec)
intent_idx = torch.argmax(intent_logits, dim=-1)
# Sentiment
sentiment_raw = self.sentiment_model(core_vec)
emotion = map_sentiment_to_emotion(sentiment_raw)
# Expertise
expertise_logits = self.expertise_classifier(expertise_vec)
expertise_idx = torch.argmax(expertise_logits, dim=-1)
# Programming
programming_logits = self.programming_classifier(programming_vec)
programming_idx = torch.argmax(programming_logits, dim=-1)
# Convert decisions to embeddings (so ALL models influence generation)
intent_onehot = F.one_hot(
intent_idx,
num_classes=len(self.intent_classifier.intent_labels)
).float()
expertise_onehot = F.one_hot(
expertise_idx,
num_classes=expertise_logits.shape[-1]
).float()
programming_onehot = F.one_hot(
programming_idx,
num_classes=programming_logits.shape[-1]
).float()
# ===== FINAL UNIFIED COGNITIVE VECTOR =====
extra_embeddings = []
for v in registry_outputs.values():
if not isinstance(v, torch.Tensor):
continue
# convert (D) -> (1,D)
if v.ndim == 1:
v = v.unsqueeze(0)
# convert (B,T,D) -> (B,D)
if v.ndim == 3:
v = v.mean(dim=1)
# ensure same batch dimension
if v.ndim != 2:
continue
# project to same dimension if needed
if v.shape[-1] != core_vec.shape[-1]:
v = F.adaptive_avg_pool1d(
v.unsqueeze(1),
core_vec.shape[-1]
).squeeze(1)
extra_embeddings.append(v)
cognitive_state = torch.cat([
core_vec,
sentiment_raw,
expertise_vec,
programming_vec,
intent_onehot,
expertise_onehot,
programming_onehot,
*extra_embeddings
], dim=-1)
# ===== GENERATE FINAL RESPONSE =====
if cognitive_state.shape[-1] > self.registry_projection.in_features:
cognitive_state = cognitive_state[:, :self.registry_projection.in_features]
pad_size = max(0, self.registry_projection.in_features - cognitive_state.shape[-1])
compressed_state = self.registry_projection(
F.pad(cognitive_state, (0, pad_size))
)
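                # Example (hedged): with encoder_dim around 128 plus one-hots
                # and any registry outputs, cognitive_state can grow to a few
                # thousand dims; the 4096-dim buffer truncates the overflow
                # and zero-pads any shortfall to a fixed projection width.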
brain_input = self.global_brain_adapter(compressed_state)
intent_label = self.intent_classifier.intent_labels[intent_idx.item()]
# ================= 🧠 RESPONSE ENGINE INTEGRATION =================
engine_output = self.response_engine.generate(
text=text,
intent=intent_label,
emotion=emotion["label"],
model_outputs=registry_outputs,
system_prompt=system_prompt
)
            prompt_text = engine_output.get("context", text)  # NOTE: not yet consumed downstream
# ================= 🤖 RESPONDER =================
ai_answer = None
responder = self.registry.get("responder")
if responder:
try:
ai_answer = responder.generate_from_brain(
brain_input=brain_input,
tokenizer=self.tokenizer,
max_len=128
)
except Exception as e:
print(f"[RESPONDER ERROR] {e}")
if not ai_answer:
ai_answer = "[Responder failed to produce output]"
# ================= 🎨 HUMANIZER =================
generated_response = self.humanizer.generate(
text=ai_answer,
max_length=128
)
result["response"] = generated_response
return result
def stream_generate(
self,
text: Optional[str] = None,
image: Optional[str] = None,
audio: Optional[str] = None,
video: Optional[str] = None
):
"""
True multimodal streaming generator.
"""
# Run full multimodal inference first (same pipeline as predict)
inference = self.predict(
text=text,
image=image,
audio=audio,
video=video
)
fused_embedding = inference.get("fused_embedding", None)
if fused_embedding is None:
return
        with torch.no_grad():
            # Cognitive reasoning state
            core_vec = self.cognitive_projection(fused_embedding)
            # Decision heads
            intent_logits = self.intent_classifier(core_vec)
            sentiment_raw = self.sentiment_model(core_vec)
            emotion = map_sentiment_to_emotion(sentiment_raw)
            intent_idx = torch.argmax(intent_logits, dim=-1)
            # Build cognitive fusion state
            intent_onehot = F.one_hot(
                intent_idx,
                num_classes=len(self.intent_classifier.intent_labels)
            ).float()
            cognitive_state = torch.cat([
                core_vec,
                sentiment_raw,
                intent_onehot
            ], dim=-1)
            # As in predict(), fit the state to the projection's fixed input
            # width before the 128-dim brain adapter; feeding the raw concat
            # into the adapter would fail its Linear(128, 256) input check
            if cognitive_state.shape[-1] > self.registry_projection.in_features:
                cognitive_state = cognitive_state[:, :self.registry_projection.in_features]
            pad_size = self.registry_projection.in_features - cognitive_state.shape[-1]
            compressed_state = self.registry_projection(
                F.pad(cognitive_state, (0, pad_size))
            )
            brain_input = self.global_brain_adapter(compressed_state)
# Generate base response string
response = self.humanizer.generate(
embedding=brain_input,
context=text if text else "",
emotion=emotion["label"],
intent=self.intent_classifier.intent_labels[
intent_idx.item()
]
)
# ✅ Streaming yield
buffer = ""
for token in response.split():
buffer += token + " "
yield buffer.strip()
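    # Consumption sketch (hedged): each yield is the cumulative text so far,
    # so a caller can re-render the growing reply in place, e.g.
    #   for partial in ai.stream_generate(text="hi"):
    #       print(partial, end="\r")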
# ===== ASYNC WRAPPER =====
async def ask(
self,
text: Optional[str] = None,
image=None,
audio=None,
video=None,
system_prompt: Optional[str] = None
) -> Dict[str, Any]:
image_path = None
audio_path = None
video_path = None
# Save UploadFile to temp file if needed
async def save_temp(upload_file):
suffix = os.path.splitext(upload_file.filename)[-1]
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
content = await upload_file.read()
tmp.write(content)
tmp.close()
return tmp.name
if image:
image_path = await save_temp(image)
if audio:
audio_path = await save_temp(audio)
if video:
video_path = await save_temp(video)
        # Merge system prompt into text
        final_text = text
        if system_prompt:
            final_text = f"{system_prompt}\n\n{text or ''}"
        try:
            return self.predict(
                text=final_text,
                image=image_path,
                audio=audio_path,
                video=video_path
            )
        finally:
            # remove the temp media files created above
            for path in (image_path, audio_path, video_path):
                if path and os.path.exists(path):
                    os.remove(path)
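

# Minimal usage sketch (hedged): assumes the artifacts/ directory and the
# trained weights referenced above are present; the prompt is illustrative.
if __name__ == "__main__":
    ai = MVI_AI()
    out = ai.predict(text="How do I improve soil drainage on a small farm?")
    print(out.get("response"))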