#!/usr/bin/env python3
"""
================================================================================
🧠 MNEMOSYNE v4.3.3 - HuggingFace Space (CPU MODE)
================================================================================
Author: Mike Amega (Logo) - Ame Web Studio
Date: 2024

DUAL LICENSE:
- Open Source: Apache 2.0 (non-commercial use)
- Commercial: Contact amewebstudio@gmail.com for enterprise licensing

CPU MODE:
✅ Force CPU execution (no ZeroGPU quota issues)
✅ Auto-detect local CUDA if available
✅ No quota limitations
================================================================================
"""

# ==============================================================================
# 🚨 No ZeroGPU - CPU mode to avoid quota issues
# ==============================================================================
import os

# Must be set before transformers/tokenizers is imported.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# ==============================================================================
# Now safe to import torch and other CUDA packages
# ==============================================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
import json
import math
import re
import warnings
from pathlib import Path
from typing import Optional, Tuple, List

warnings.filterwarnings('ignore')


# ==============================================================================
# 🔧 RUNTIME CONFIGURATION
# ==============================================================================
class RuntimeConfig:
    """Automatic environment configuration - CPU mode (no ZeroGPU).

    Detects local CUDA once at startup and exposes the chosen device to the
    rest of the app via get_device()/to_device().
    """

    def __init__(self):
        # Probe CUDA a single time; everything else derives from this flag.
        self.cuda_available = torch.cuda.is_available()
        self.device = "cpu"
        self._configure_device()

    def _configure_device(self):
        """Select the device: local CUDA when present, otherwise CPU."""
        if self.cuda_available:
            self.device = "cuda"
            print(f"🖥️ Local CUDA detected: {torch.cuda.get_device_name(0)}")
            print(f" VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
        else:
            self.device = "cpu"
            print("💻 CPU mode (no GPU detected)")
        print(f" Device: {self.device}")

    def get_device(self) -> torch.device:
        """Return the selected device as a torch.device."""
        return torch.device(self.device)

    def to_device(self, tensor_or_model):
        """Move a tensor or model onto the selected device (pass-through otherwise)."""
        if hasattr(tensor_or_model, 'to'):
            return tensor_or_model.to(self.device)
        return tensor_or_model


# Initialize runtime config
runtime = RuntimeConfig()

MODEL_ID = "amewebstudio/mnemosyne-multimodal-v4"

print("=" * 60)
print("🧠 MNEMOSYNE v4.3.3 - LOADING")
print("=" * 60)

# ==============================================================================
# IMPORTS HUGGINGFACE
# ==============================================================================
from huggingface_hub import snapshot_download
from safetensors.torch import load_file
from transformers import AutoTokenizer, PreTrainedModel, PretrainedConfig
from transformers.modeling_outputs import CausalLMOutputWithPast

# ==============================================================================
# WHISPER FOR AUDIO (lazy loading)
# ==============================================================================
whisper_model = None
whisper_processor = None


def load_whisper():
    """Load Whisper lazily (on first use) to save memory.

    Returns (model, processor); both stay None if loading fails.
    """
    global whisper_model, whisper_processor
    if whisper_model is None:
        try:
            from transformers import WhisperProcessor, WhisperForConditionalGeneration
            print("🎤 Loading Whisper...")
            whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-small")
            whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
            whisper_model.eval()
            print(" ✅ Whisper loaded")
        except Exception as e:
            # Best effort: the app still works text-only without Whisper.
            print(f" ⚠️ Whisper failed: {e}")
    return whisper_model, whisper_processor


# ==============================================================================
# MODEL CLASSES
# ==============================================================================
class MnemosyneConfig(PretrainedConfig):
    """Hyper-parameters for a Llama-3-style decoder with grouped-query attention."""

    model_type = "mnemosyne"

    def __init__(
        self,
        vocab_size: int = 128256,
        hidden_size: int = 3072,
        intermediate_size: int = 8192,
        num_hidden_layers: int = 28,
        num_attention_heads: int = 24,
        num_key_value_heads: int = 8,
        max_position_embeddings: int = 131072,
        rms_norm_eps: float = 1e-5,
        rope_theta: float = 500000.0,
        **kwargs
    ):
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.max_position_embeddings = max_position_embeddings
        self.rms_norm_eps = rms_norm_eps
        self.rope_theta = rope_theta
        super().__init__(**kwargs)


class RMSNorm(nn.Module):
    """Root-mean-square layer norm; statistics computed in fp32, cast back."""

    def __init__(self, hidden_size: int, eps: float = 1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.eps = eps

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        xf = x.float()
        scale = torch.rsqrt(xf.pow(2).mean(-1, keepdim=True) + self.eps)
        return (self.weight * (xf * scale)).to(x.dtype)


class RotaryEmbedding(nn.Module):
    """Rotary position embedding: returns (cos, sin) with duplicated halves."""

    def __init__(self, dim: int, base: float = 500000.0):
        super().__init__()
        inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, x: torch.Tensor, position_ids: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        # NOTE(review): position_ids[0] is used for the whole batch — assumes
        # every row shares the same positions (true for this app's callers).
        freqs = torch.outer(position_ids[0].float(), self.inv_freq.to(x.device))
        angles = torch.cat((freqs, freqs), dim=-1).unsqueeze(0).unsqueeze(0)
        return angles.cos().to(x.dtype), angles.sin().to(x.dtype)


def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """Rotate the two halves of the last dimension: (x1, x2) -> (-x2, x1)."""
    half = x.shape[-1] // 2
    return torch.cat((-x[..., half:], x[..., :half]), dim=-1)


class Attention(nn.Module):
    """Grouped-query attention with rotary embeddings and an optional KV cache."""

    def __init__(self, config: MnemosyneConfig, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads
        # How many query heads share each KV head.
        self.num_groups = self.num_heads // self.num_kv_heads
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)
        self.rotary_emb = RotaryEmbedding(self.head_dim, config.rope_theta)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        position_ids: torch.Tensor,
        past_key_value: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
        use_cache: bool = False
    ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, torch.Tensor]]]:
        bsz, q_len, _ = hidden_states.size()

        # Project and reshape to (batch, heads, seq, head_dim).
        q = self.q_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(hidden_states).view(bsz, q_len, self.num_kv_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(hidden_states).view(bsz, q_len, self.num_kv_heads, self.head_dim).transpose(1, 2)

        # Rotary embedding is applied to the new positions only; cached K
        # entries were already rotated when they were produced.
        cos, sin = self.rotary_emb(q, position_ids)
        q = (q * cos) + (rotate_half(q) * sin)
        k = (k * cos) + (rotate_half(k) * sin)

        # Extend the KV cache along the sequence axis.
        if past_key_value is not None:
            k = torch.cat([past_key_value[0], k], dim=2)
            v = torch.cat([past_key_value[1], v], dim=2)
        new_kv = (k, v) if use_cache else None

        # Expand KV heads to match the query heads (GQA).
        k = k.repeat_interleave(self.num_groups, dim=1)
        v = v.repeat_interleave(self.num_groups, dim=1)

        # Scores in fp32 for stability; softmax result cast back to model dtype.
        scores = torch.matmul(q.float(), k.float().transpose(2, 3)) / math.sqrt(self.head_dim)
        scores = scores + attention_mask.float()
        probs = F.softmax(scores, dim=-1).to(hidden_states.dtype)

        out = torch.matmul(probs, v)
        out = out.transpose(1, 2).reshape(bsz, q_len, self.hidden_size)
        return self.o_proj(out), new_kv
class MLP(nn.Module):
    """SwiGLU feed-forward block: down(silu(gate(x)) * up(x))."""

    # String annotation (PEP 484 forward reference): avoids evaluating the
    # config class at import time, removing an import-order dependency.
    def __init__(self, config: "MnemosyneConfig"):
        super().__init__()
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.down_proj(F.silu(self.gate_proj(x)) * self.up_proj(x))


class DecoderLayer(nn.Module):
    """Pre-norm transformer block: attention then MLP, each with a residual."""

    def __init__(self, config: "MnemosyneConfig", layer_idx: int):
        super().__init__()
        self.self_attn = Attention(config, layer_idx)
        self.mlp = MLP(config)
        self.input_layernorm = RMSNorm(config.hidden_size, config.rms_norm_eps)
        self.post_attention_layernorm = RMSNorm(config.hidden_size, config.rms_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        position_ids: torch.Tensor,
        past_key_value: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
        use_cache: bool = False
    ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, torch.Tensor]]]:
        # Attention sub-block with residual.
        attn_in = self.input_layernorm(hidden_states)
        attn_out, new_kv = self.self_attn(attn_in, attention_mask, position_ids, past_key_value, use_cache)
        hidden_states = hidden_states + attn_out
        # MLP sub-block with residual.
        hidden_states = hidden_states + self.mlp(self.post_attention_layernorm(hidden_states))
        return hidden_states, new_kv


class MnemosyneModel(nn.Module):
    """Decoder stack: token embeddings -> N decoder layers -> final RMSNorm."""

    def __init__(self, config: "MnemosyneConfig"):
        super().__init__()
        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([DecoderLayer(config, i) for i in range(config.num_hidden_layers)])
        self.norm = RMSNorm(config.hidden_size, config.rms_norm_eps)

    def forward(
        self,
        input_ids: torch.Tensor,
        past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
        use_cache: bool = False
    ) -> Tuple[torch.Tensor, Optional[List[Tuple[torch.Tensor, torch.Tensor]]]]:
        """Run the decoder; returns (normed hidden states, new KV cache or None)."""
        hidden_states = self.embed_tokens(input_ids)
        _, seq_len = input_ids.shape  # batch size itself is not needed here
        # Offset positions by the cached length during incremental decoding.
        past_len = past_key_values[0][0].shape[2] if past_key_values else 0
        position_ids = torch.arange(past_len, past_len + seq_len, device=input_ids.device).unsqueeze(0)
        # Causal mask over the *new* tokens only. With a cache (seq_len == 1)
        # this broadcasts to all-zeros, i.e. attend to the whole past.
        attention_mask = torch.triu(
            torch.full((seq_len, seq_len), float("-inf"), device=input_ids.device),
            diagonal=1
        ).unsqueeze(0).unsqueeze(0)
        new_kvs = [] if use_cache else None
        for i, layer in enumerate(self.layers):
            past_kv = past_key_values[i] if past_key_values else None
            hidden_states, new_kv = layer(hidden_states, attention_mask, position_ids, past_kv, use_cache)
            if use_cache:
                new_kvs.append(new_kv)
        return self.norm(hidden_states), new_kvs
class MnemosyneLM(PreTrainedModel):
    """Causal language model: MnemosyneModel backbone plus an LM head,
    with a simple temperature/top-p sampling loop."""

    config_class = MnemosyneConfig

    def __init__(self, config: MnemosyneConfig):
        super().__init__(config)
        self.model = MnemosyneModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(
        self,
        input_ids: torch.Tensor,
        past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
        use_cache: bool = False,
        **kwargs
    ) -> CausalLMOutputWithPast:
        hidden_states, new_kvs = self.model(input_ids, past_key_values, use_cache)
        return CausalLMOutputWithPast(logits=self.lm_head(hidden_states), past_key_values=new_kvs)

    @torch.no_grad()
    def generate(
        self,
        input_ids: torch.Tensor,
        max_new_tokens: int = 512,
        temperature: float = 0.7,
        top_p: float = 0.9,
        eos_token_id: Optional[int] = None
    ) -> torch.Tensor:
        """Sample up to max_new_tokens with nucleus (top-p) filtering.

        Uses the KV cache: the full prompt is fed once, then one token per step.
        """
        cache = None
        generated = input_ids
        for _ in range(max_new_tokens):
            step_input = generated if cache is None else generated[:, -1:]
            out = self(step_input, past_key_values=cache, use_cache=True)
            cache = out.past_key_values
            logits = out.logits[:, -1, :] / temperature

            # Nucleus filtering: drop tokens past the cumulative top_p mass,
            # shifted by one so the first token over the threshold is kept.
            sorted_logits, sorted_idx = torch.sort(logits, descending=True)
            cum_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
            drop = cum_probs > top_p
            drop[..., 1:] = drop[..., :-1].clone()
            drop[..., 0] = 0
            logits[drop.scatter(1, sorted_idx, drop)] = float("-inf")

            next_token = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1)
            generated = torch.cat([generated, next_token], dim=-1)
            if eos_token_id is not None and (next_token == eos_token_id).all():
                break
        return generated
sorted_indices_to_remove = cumulative_probs > top_p sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() sorted_indices_to_remove[..., 0] = 0 indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove) logits[indices_to_remove] = float("-inf") next_token = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1) generated = torch.cat([generated, next_token], dim=-1) if eos_token_id is not None and (next_token == eos_token_id).all(): break return generated # ============================================================================== # SYMBOLIC CALCULATOR # ============================================================================== class SymbolicCalculator: """Calculatrice symbolique avec SymPy""" def __init__(self): self.available = False try: import sympy self.sympy = sympy self.available = True print(" ✅ SymPy loaded - symbolic math enabled") except ImportError: print(" ⚠️ SymPy not available") def solve(self, expression: str) -> str: if not self.available: return "" try: expression = expression.strip() # Simple arithmetic if re.match(r'^[\d\s\+\-\*\/\(\)\.\^]+$', expression): expr = expression.replace('^', '**') result = eval(expr) return f"{expression} = {result}" # Symbolic expr_clean = re.sub(r'[=\?].*', '', expression).strip() # Variables variables = set(re.findall(r'[a-zA-Z]', expr_clean)) if variables: symbols = {v: self.sympy.Symbol(v) for v in variables} expr_sympy = expr_clean.replace('^', '**') for var, sym in symbols.items(): expr_sympy = re.sub(rf'(? 
def transcribe_audio(audio_path) -> str:
    """Transcribe an audio file with Whisper; returns "" when there is no input.

    NOTE(review): the original `def` line was lost when this file's whitespace
    was mangled; the signature is reconstructed from the body (`audio_path`,
    `-> str`) and the single call site in respond_with_audio — confirm.
    """
    if audio_path is None:
        return ""
    try:
        import librosa

        wm, wp = load_whisper()
        if wm is None:
            return "[Whisper non disponible]"
        # Whisper expects 16 kHz mono audio.
        audio, sr = librosa.load(audio_path, sr=16000)
        inputs = wp(audio, sampling_rate=16000, return_tensors="pt")
        with torch.no_grad():
            predicted_ids = wm.generate(inputs.input_features, max_new_tokens=256)
        transcription = wp.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        return transcription.strip()
    except Exception as e:
        # Bracketed marker so callers can distinguish errors from real text.
        return f"[Erreur transcription: {e}]"


# ==============================================================================
# CHAT FUNCTION (CPU MODE - no ZeroGPU decorator)
# ==============================================================================
def generate_response(prompt: str, max_tokens: int = 512) -> str:
    """Generate a reply on the configured device (CPU or local CUDA)."""
    try:
        dev = runtime.get_device()
        model.to(dev)
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096)
        input_ids = inputs.input_ids.to(dev)
        output = model.generate(
            input_ids,
            max_new_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            eos_token_id=tokenizer.eos_token_id
        )
        # Decode only the newly generated suffix, not the prompt.
        response = tokenizer.decode(output[0][input_ids.shape[1]:], skip_special_tokens=True)
        return response.strip()
    except Exception as e:
        return f"Erreur: {e}"


def build_prompt(message: str, chat_history: List[Tuple[str, str]]) -> str:
    """Build a Llama-3-format chat prompt: system (with memorized facts),
    the last 5 history turns, then the new user message."""
    sys_prompt = "Tu es Mnemosyne, une IA cognitive avancée créée par Mike Amega (Ame Web Studio).\n"
    sys_prompt += "Tu réponds de manière intelligente, précise et naturelle.\n"
    # `facts` is a module-level memory dict defined in a part of the file lost
    # to the mangling; values may be plain or {"value": ...} dicts.
    if facts:
        facts_str = ", ".join(
            f"{k}={v['value'] if isinstance(v, dict) else v}"
            for k, v in list(facts.items())[:10]
        )
        sys_prompt += f"Faits mémorisés: {facts_str}\n"
    prompt = f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{sys_prompt}<|eot_id|>"
    # Last 5 turns
    for user_msg, bot_msg in chat_history[-5:]:
        if user_msg:
            prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{user_msg}<|eot_id|>"
        if bot_msg:
            prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{bot_msg}<|eot_id|>"
    prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{message}<|eot_id|>"
    prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n"
    return prompt


def process_message(message: str) -> str:
    """Detect math-looking messages and return a computed result ("" if none)."""
    math_patterns = [
        r'\d+\s*[\+\-\*\/\^]\s*\d+',
        r'[a-zA-Z]\s*[\+\-\*\/]\s*[a-zA-Z]',
        r'calcul',
        r'combien',
        r'\='
    ]
    for pattern in math_patterns:
        if re.search(pattern, message.lower()):
            expr_match = re.search(r'([\d\w\s\+\-\*\/\^\(\)=]+)', message)
            if expr_match:
                # `calculator` (SymbolicCalculator) is defined in a part of the
                # file lost to the mangling.
                result = calculator.solve(expr_match.group(1))
                if result:
                    return result
    return ""


def respond(message: str, chat_history: List[Tuple[str, str]], max_tokens: int = 512):
    """Main chat handler: math shortcut + LLM generation.

    Returns ("", updated_history) — the empty string clears the input box.
    """
    if not message or not message.strip():
        return "", chat_history
    message = message.strip()

    # Try the symbolic calculator first.
    math_result = process_message(message)

    prompt = build_prompt(message, chat_history)
    response = generate_response(prompt, max_tokens)

    # Prepend the exact math result when the model did not already include it.
    if math_result and math_result not in response:
        response = f"{math_result}\n\n{response}"

    chat_history.append((message, response))
    return "", chat_history


def respond_with_audio(
    message: str,
    audio: Optional[str],
    chat_history: List[Tuple[str, str]],
    max_tokens: int = 512
):
    """Handle text and/or microphone input; a valid transcription replaces the text.

    Returns ("", None, history) — clears both the textbox and the audio widget.
    """
    if audio:
        transcription = transcribe_audio(audio)
        # Bracketed strings are error markers from transcribe_audio.
        if transcription and not transcription.startswith("["):
            message = transcription
    if not message or not message.strip():
        return "", None, chat_history
    _, updated_history = respond(message, chat_history, max_tokens)
    return "", None, updated_history


# ==============================================================================
# GRADIO INTERFACE
# ==============================================================================
def get_status_message() -> str:
    """Status line describing the execution environment."""
    if runtime.cuda_available:
        gpu_name = torch.cuda.get_device_name(0)
        return f"🖥️ GPU: {gpu_name} | 🎤 Parlez ou tapez"
    return "💻 CPU mode (~30-60s) | 🎤 Parlez ou tapez"


css = """
.container { max-width: 900px; margin: auto; }
.chatbot { min-height: 400px; }
footer { visibility: hidden; }
"""

with gr.Blocks(title="Mnemosyne v4.3.3", css=css, theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"""
# 🧠 Mnemosyne v4.3.3
*IA cognitive par Mike Amega - Ame Web Studio*

**Features:** Audio input (auto-send) • Symbolic Math • Memory System

{get_status_message()}
""")

    chatbot = gr.Chatbot(
        label="Conversation",
        height=450,
        show_copy_button=True,
        elem_classes=["chatbot"]
    )

    with gr.Row():
        with gr.Column(scale=4):
            msg = gr.Textbox(
                label="Message",
                placeholder="Tapez votre message ici...",
                lines=2,
                show_label=False
            )
        with gr.Column(scale=1):
            audio_input = gr.Audio(
                sources=["microphone"],
                type="filepath",
                label="🎤 Audio",
                show_label=True
            )

    with gr.Row():
        with gr.Column(scale=1):
            max_tokens = gr.Slider(
                minimum=64,
                maximum=2048,
                value=512,
                step=64,
                label="Max tokens"
            )
        with gr.Column(scale=1):
            send_btn = gr.Button("📤 Envoyer", variant="primary", size="lg")
        with gr.Column(scale=1):
            clear_btn = gr.Button("🗑️ Effacer", size="lg")

    gr.Markdown("""
---
📜 **License:** Apache 2.0 (non-commercial) | Commercial: amewebstudio@gmail.com
""")

    # Event handlers

    # Text submit (Enter in the textbox)
    msg.submit(
        fn=respond,
        inputs=[msg, chatbot, max_tokens],
        outputs=[msg, chatbot]
    )

    # Send button: also consumes any recorded audio
    send_btn.click(
        fn=respond_with_audio,
        inputs=[msg, audio_input, chatbot, max_tokens],
        outputs=[msg, audio_input, chatbot]
    )

    # Auto-send when the microphone recording stops
    audio_input.stop_recording(
        fn=respond_with_audio,
        inputs=[msg, audio_input, chatbot, max_tokens],
        outputs=[msg, audio_input, chatbot]
    )

    # Clear the conversation, the textbox and the audio widget
    clear_btn.click(
        fn=lambda: ([], "", None),
        inputs=None,
        outputs=[chatbot, msg, audio_input]
    )

# Launch
if __name__ == "__main__":
    demo.queue()
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)