#!/usr/bin/env python3
"""
================================================================================
🧠 MNEMOSYNE v4.3.3 - HuggingFace Space (HYBRID GPU/CPU)
================================================================================
Author: Mike Amega (Logo) - Ame Web Studio
Date: 2024
DUAL LICENSE:
  - Open Source: Apache 2.0 (non-commercial use)
  - Commercial: Contact amewebstudio@gmail.com for enterprise licensing
HYBRID MODE:
  ✅ Auto-detect ZeroGPU (HF Spaces with GPU)
  ✅ Auto-detect local CUDA
  ✅ Fallback to CPU seamlessly
  ✅ No code change needed when switching Space hardware
================================================================================
"""
# ==============================================================================
# 🚨 CRITICAL: Import spaces FIRST before torch/CUDA
# ==============================================================================
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Detect environment and import spaces if available (MUST be before torch!)
SPACES_AVAILABLE = False
ZEROGPU_AVAILABLE = False
gpu_decorator = None

try:
    import spaces
    SPACES_AVAILABLE = True
    if os.environ.get("SPACE_ID"):
        ZEROGPU_AVAILABLE = True
        gpu_decorator = spaces.GPU
except ImportError:
    pass

# ==============================================================================
# Now safe to import torch and other CUDA packages
# ==============================================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
import json
import math
import re
import warnings
from pathlib import Path
from typing import Optional, Tuple, List, Callable
from functools import wraps

warnings.filterwarnings('ignore')
# ==============================================================================
# 🔧 RUNTIME CONFIGURATION
# ==============================================================================
class RuntimeConfig:
    """Automatic environment configuration."""

    def __init__(self):
        self.spaces_available = SPACES_AVAILABLE
        self.zerogpu_available = ZEROGPU_AVAILABLE
        self.cuda_available = torch.cuda.is_available()
        self.device = "cpu"
        self.gpu_decorator = gpu_decorator
        self._configure_device()

    def _configure_device(self):
        """Select the appropriate device."""
        if self.zerogpu_available:
            self.device = "cuda"
            print("🚀 ZeroGPU detected (HuggingFace Spaces)")
            print(" Mode: ZeroGPU (GPU allocated on demand)")
        elif self.cuda_available:
            self.device = "cuda"
            print(f"🖥️ Local CUDA detected: {torch.cuda.get_device_name(0)}")
            print(f" VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
        else:
            self.device = "cpu"
            print("💻 CPU mode (no GPU detected)")
        print(f" Device: {self.device}")

    def gpu_task(self, duration: int = 120):
        """
        Hybrid decorator for GPU tasks.
        - On ZeroGPU: wraps the function with @spaces.GPU(duration=X)
        - On local CUDA: runs directly on the GPU
        - On CPU: runs on the CPU
        """
        def decorator(func: Callable) -> Callable:
            if self.zerogpu_available and self.gpu_decorator:
                # ZeroGPU mode - wrap with spaces.GPU
                return self.gpu_decorator(duration=duration)(func)
            # Local mode - return a transparent wrapper
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return decorator

    def get_device(self) -> torch.device:
        """Return the appropriate torch device."""
        return torch.device(self.device)

    def to_device(self, tensor_or_model):
        """Move a tensor or model to the configured device."""
        if hasattr(tensor_or_model, 'to'):
            return tensor_or_model.to(self.device)
        return tensor_or_model


# Initialize runtime config
runtime = RuntimeConfig()
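# Illustration (not part of the original app): how runtime.gpu_task is meant to
# be applied to a GPU-bound function. The function and model names below are
# hypothetical.
#
#   @runtime.gpu_task(duration=60)
#   def heavy_inference(batch):
#       device = runtime.get_device()  # "cuda" on ZeroGPU / local GPU, else "cpu"
#       return my_model(batch.to(device))
#
# On ZeroGPU the call is wrapped by spaces.GPU(duration=60); on local CUDA or
# CPU it runs through the transparent wrapper unchanged.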
MODEL_ID = "amewebstudio/mnemosyne-multimodal-v4"

print("=" * 60)
print("🧠 MNEMOSYNE v4.3.3 - LOADING")
print("=" * 60)

# ==============================================================================
# HUGGINGFACE IMPORTS
# ==============================================================================
from huggingface_hub import snapshot_download
from safetensors.torch import load_file
from transformers import AutoTokenizer, PreTrainedModel, PretrainedConfig
from transformers.modeling_outputs import CausalLMOutputWithPast

# ==============================================================================
# WHISPER FOR AUDIO (lazy loading)
# ==============================================================================
whisper_model = None
whisper_processor = None


def load_whisper():
    """Load Whisper lazily to save memory."""
    global whisper_model, whisper_processor
    if whisper_model is None:
        try:
            from transformers import WhisperProcessor, WhisperForConditionalGeneration
            print("🎤 Loading Whisper...")
            whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-small")
            whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
            whisper_model.eval()
            print(" ✅ Whisper loaded")
        except Exception as e:
            print(f" ⚠️ Whisper failed: {e}")
    return whisper_model, whisper_processor
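# Usage sketch (comment only, assumption rather than original text): the lazy
# loader is intended to be called right before transcription so the Whisper
# checkpoint is only downloaded on first audio use, e.g.:
#
#   wm, wp = load_whisper()
#   if wm is not None:
#       feats = wp(audio_array, sampling_rate=16000, return_tensors="pt").input_features
#       text = wp.batch_decode(wm.generate(feats), skip_special_tokens=True)[0]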
# ==============================================================================
# MODEL CLASSES
# ==============================================================================
class MnemosyneConfig(PretrainedConfig):
    model_type = "mnemosyne"

    def __init__(
        self,
        vocab_size: int = 128256,
        hidden_size: int = 3072,
        intermediate_size: int = 8192,
        num_hidden_layers: int = 28,
        num_attention_heads: int = 24,
        num_key_value_heads: int = 8,
        max_position_embeddings: int = 131072,
        rms_norm_eps: float = 1e-5,
        rope_theta: float = 500000.0,
        **kwargs
    ):
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.max_position_embeddings = max_position_embeddings
        self.rms_norm_eps = rms_norm_eps
        self.rope_theta = rope_theta
        super().__init__(**kwargs)


class RMSNorm(nn.Module):
    def __init__(self, hidden_size: int, eps: float = 1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.eps = eps

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        variance = x.float().pow(2).mean(-1, keepdim=True)
        x_normed = x.float() * torch.rsqrt(variance + self.eps)
        return (self.weight * x_normed).to(x.dtype)


class RotaryEmbedding(nn.Module):
    def __init__(self, dim: int, base: float = 500000.0):
        super().__init__()
        inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, x: torch.Tensor, position_ids: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        freqs = torch.outer(position_ids[0].float(), self.inv_freq.to(x.device))
        emb = torch.cat((freqs, freqs), dim=-1).unsqueeze(0).unsqueeze(0)
        return emb.cos().to(x.dtype), emb.sin().to(x.dtype)


def rotate_half(x: torch.Tensor) -> torch.Tensor:
    x1, x2 = x[..., :x.shape[-1]//2], x[..., x.shape[-1]//2:]
    return torch.cat((-x2, x1), dim=-1)
class Attention(nn.Module):
    def __init__(self, config: MnemosyneConfig, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads
        self.num_groups = self.num_heads // self.num_kv_heads
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)
        self.rotary_emb = RotaryEmbedding(self.head_dim, config.rope_theta)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        position_ids: torch.Tensor,
        past_key_value: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
        use_cache: bool = False
    ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, torch.Tensor]]]:
        batch_size, seq_len, _ = hidden_states.size()
        q = self.q_proj(hidden_states).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(hidden_states).view(batch_size, seq_len, self.num_kv_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(hidden_states).view(batch_size, seq_len, self.num_kv_heads, self.head_dim).transpose(1, 2)
        cos, sin = self.rotary_emb(q, position_ids)
        q = (q * cos) + (rotate_half(q) * sin)
        k = (k * cos) + (rotate_half(k) * sin)
        if past_key_value is not None:
            k = torch.cat([past_key_value[0], k], dim=2)
            v = torch.cat([past_key_value[1], v], dim=2)
        new_kv = (k, v) if use_cache else None
        k = k.repeat_interleave(self.num_groups, dim=1)
        v = v.repeat_interleave(self.num_groups, dim=1)
        attn_weights = torch.matmul(q.float(), k.float().transpose(2, 3)) / math.sqrt(self.head_dim)
        attn_weights = attn_weights + attention_mask.float()
        attn_weights = F.softmax(attn_weights, dim=-1).to(hidden_states.dtype)
        attn_output = torch.matmul(attn_weights, v)
        attn_output = attn_output.transpose(1, 2).reshape(batch_size, seq_len, self.hidden_size)
        return self.o_proj(attn_output), new_kv
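# Worked shape example for the grouped-query attention above (illustration, not
# original text): with the default config there are 24 query heads and 8 KV
# heads, so num_groups = 24 // 8 = 3 and repeat_interleave expands K/V from
# (batch, 8, seq, head_dim) to (batch, 24, seq, head_dim) before the standard
# scaled-dot-product attention.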
class MLP(nn.Module):
    def __init__(self, config: MnemosyneConfig):
        super().__init__()
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.down_proj(F.silu(self.gate_proj(x)) * self.up_proj(x))


class DecoderLayer(nn.Module):
    def __init__(self, config: MnemosyneConfig, layer_idx: int):
        super().__init__()
        self.self_attn = Attention(config, layer_idx)
        self.mlp = MLP(config)
        self.input_layernorm = RMSNorm(config.hidden_size, config.rms_norm_eps)
        self.post_attention_layernorm = RMSNorm(config.hidden_size, config.rms_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        position_ids: torch.Tensor,
        past_key_value: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
        use_cache: bool = False
    ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, torch.Tensor]]]:
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        hidden_states, new_kv = self.self_attn(hidden_states, attention_mask, position_ids, past_key_value, use_cache)
        hidden_states = residual + hidden_states
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = residual + self.mlp(hidden_states)
        return hidden_states, new_kv


class MnemosyneModel(nn.Module):
    def __init__(self, config: MnemosyneConfig):
        super().__init__()
        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([DecoderLayer(config, i) for i in range(config.num_hidden_layers)])
        self.norm = RMSNorm(config.hidden_size, config.rms_norm_eps)

    def forward(
        self,
        input_ids: torch.Tensor,
        past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
        use_cache: bool = False
    ) -> Tuple[torch.Tensor, Optional[List[Tuple[torch.Tensor, torch.Tensor]]]]:
        hidden_states = self.embed_tokens(input_ids)
        batch_size, seq_len = input_ids.shape
        past_len = past_key_values[0][0].shape[2] if past_key_values else 0
        position_ids = torch.arange(past_len, past_len + seq_len, device=input_ids.device).unsqueeze(0)
        attention_mask = torch.triu(
            torch.full((seq_len, seq_len), float("-inf"), device=input_ids.device),
            diagonal=1
        ).unsqueeze(0).unsqueeze(0)
        new_kvs = [] if use_cache else None
        for i, layer in enumerate(self.layers):
            past_kv = past_key_values[i] if past_key_values else None
            hidden_states, new_kv = layer(hidden_states, attention_mask, position_ids, past_kv, use_cache)
            if use_cache:
                new_kvs.append(new_kv)
        return self.norm(hidden_states), new_kvs


class MnemosyneLM(PreTrainedModel):
    config_class = MnemosyneConfig

    def __init__(self, config: MnemosyneConfig):
        super().__init__(config)
        self.model = MnemosyneModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(
        self,
        input_ids: torch.Tensor,
        past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
        use_cache: bool = False,
        **kwargs
    ) -> CausalLMOutputWithPast:
        hidden_states, new_kvs = self.model(input_ids, past_key_values, use_cache)
        logits = self.lm_head(hidden_states)
        return CausalLMOutputWithPast(logits=logits, past_key_values=new_kvs)

    def generate(
        self,
        input_ids: torch.Tensor,
        max_new_tokens: int = 512,
        temperature: float = 0.7,
        top_p: float = 0.9,
        eos_token_id: Optional[int] = None
    ) -> torch.Tensor:
        past_key_values = None
        generated = input_ids
        for _ in range(max_new_tokens):
            inp = generated if past_key_values is None else generated[:, -1:]
            outputs = self(inp, past_key_values=past_key_values, use_cache=True)
            logits = outputs.logits[:, -1, :] / temperature
            past_key_values = outputs.past_key_values
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
            sorted_indices_to_remove = cumulative_probs > top_p
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
            logits[indices_to_remove] = float("-inf")
            next_token = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1)
            generated = torch.cat([generated, next_token], dim=-1)
            if eos_token_id is not None and (next_token == eos_token_id).all():
                break
        return generated
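# Worked example of the top-p (nucleus) filtering step in generate() above
# (illustration only, not original text): with temperature-scaled probabilities
# [0.5, 0.3, 0.15, 0.05] and top_p=0.9, the cumulative sums are
# [0.5, 0.8, 0.95, 1.0]. The shift by one position keeps the first token that
# crosses the threshold, so sampling is restricted to the three most likely
# tokens and only the 0.05 tail is masked to -inf before multinomial sampling.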
# ==============================================================================
# SYMBOLIC CALCULATOR
# ==============================================================================
class SymbolicCalculator:
    """Symbolic calculator backed by SymPy."""

    def __init__(self):
        self.available = False
        try:
            import sympy
            self.sympy = sympy
            self.available = True
            print(" ✅ SymPy loaded - symbolic math enabled")
        except ImportError:
            print(" ⚠️ SymPy not available")

    def solve(self, expression: str) -> str:
        if not self.available:
            return ""
        try:
            expression = expression.strip()
            # Simple arithmetic
            if re.match(r'^[\d\s\+\-\*\/\(\)\.\^]+$', expression):
                expr = expression.replace('^', '**')
                result = eval(expr)
                return f"{expression} = {result}"
            # Symbolic
            expr_clean = re.sub(r'[=\?].*', '', expression).strip()
            # Variables
            variables = set(re.findall(r'[a-zA-Z]', expr_clean))
            if variables:
                symbols = {v: self.sympy.Symbol(v) for v in variables}
                expr_sympy = expr_clean.replace('^', '**')
                for var, sym in symbols.items():
                    expr_sympy = re.sub(rf'(?<![a-zA-Z]){var}(?![a-zA-Z])', f'symbols["{var}"]', expr_sympy)
                result = eval(expr_sympy)
                simplified = self.sympy.simplify(result)
                return f"{expr_clean} = {simplified}"
            return ""
        except Exception:
            return ""


calculator = SymbolicCalculator()
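# Expected behavior of the calculator (comment only, not executed at import):
#
#   calculator.solve("12 * (3 + 4)")   # -> "12 * (3 + 4) = 84"      (arithmetic branch)
#   calculator.solve("2*x + 3*x")      # -> "2*x + 3*x = 5*x"        (SymPy simplify branch)
#
# Both branches fall back to an empty string when SymPy is missing or parsing fails.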
# ==============================================================================
# LOAD MODEL
# ==============================================================================
print("📦 Loading model...")
model_path = Path(snapshot_download(MODEL_ID))

with open(model_path / "config.json") as f:
    cfg = json.load(f)

tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

config = MnemosyneConfig(
    vocab_size=cfg.get("vocab_size", 128256),
    hidden_size=cfg.get("hidden_size", 3072),
    intermediate_size=cfg.get("intermediate_size", 8192),
    num_hidden_layers=cfg.get("num_hidden_layers", 28),
    num_attention_heads=cfg.get("num_attention_heads", 24),
    num_key_value_heads=cfg.get("num_key_value_heads", 8),
    max_position_embeddings=cfg.get("max_position_embeddings", 131072),
    rms_norm_eps=cfg.get("rms_norm_eps", 1e-5),
    rope_theta=cfg.get("rope_theta", 500000.0),
)
model = MnemosyneLM(config)

# Load weights
idx_path = model_path / "model.safetensors.index.json"
if idx_path.exists():
    with open(idx_path) as f:
        index = json.load(f)
    weights = {}
    for sf in set(index["weight_map"].values()):
        print(f" Loading {sf}...")
        weights.update(load_file(model_path / sf))
    # Map weights
    state_dict = {}
    for k, v in weights.items():
        if "backbone" in k:
            new_key = k.replace("mnemosyne.backbone.", "")
            state_dict[new_key] = v
    model.load_state_dict(state_dict, strict=False)

# Keep model on CPU - ZeroGPU or gpu_task will handle device placement
model = model.float().eval()
print(f" Model loaded (will use {runtime.device} for inference)")
print("✅ Model ready!")

# Load facts
facts = {}
for p in ["cognitive_states.pt", "states.pt"]:
    if (model_path / p).exists():
        try:
            data = torch.load(model_path / p, map_location="cpu", weights_only=False)
            facts = data.get("facts", {})
            break
        except Exception:
            pass
print(f" {len(facts)} facts loaded")
# ==============================================================================
# AUDIO TRANSCRIPTION
# ==============================================================================
def transcribe_audio(audio_path: str) -> str:
    """Transcribe audio with Whisper."""
    if audio_path is None:
        return ""
    try:
        import librosa
        wm, wp = load_whisper()
        if wm is None:
            return "[Whisper non disponible]"
        audio, sr = librosa.load(audio_path, sr=16000)
        inputs = wp(audio, sampling_rate=16000, return_tensors="pt")
        with torch.no_grad():
            predicted_ids = wm.generate(inputs.input_features, max_new_tokens=256)
        transcription = wp.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        return transcription.strip()
    except Exception as e:
        return f"[Erreur transcription: {e}]"
# ==============================================================================
# CHAT FUNCTION WITH HYBRID GPU DECORATOR
# ==============================================================================
@runtime.gpu_task(duration=120)
def generate_response(prompt: str, max_tokens: int = 512) -> str:
    """Generate a response - on GPU or CPU depending on the environment."""
    try:
        # Move model to the appropriate device inside the function
        # (on ZeroGPU, CUDA is only available inside the decorated call)
        if runtime.zerogpu_available:
            model.to("cuda")
            dev = torch.device("cuda")
        elif runtime.cuda_available:
            model.to("cuda")
            dev = torch.device("cuda")
        else:
            dev = torch.device("cpu")
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096)
        input_ids = inputs.input_ids.to(dev)
        output = model.generate(
            input_ids,
            max_new_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            eos_token_id=tokenizer.eos_token_id
        )
        response = tokenizer.decode(output[0][input_ids.shape[1]:], skip_special_tokens=True)
        return response.strip()
    except Exception as e:
        return f"Erreur: {e}"


def build_prompt(message: str, chat_history: List[Tuple[str, str]]) -> str:
    """Build the prompt from the system message, memorized facts, and chat history."""
    sys_prompt = "Tu es Mnemosyne, une IA cognitive avancée créée par Mike Amega (Ame Web Studio).\n"
    sys_prompt += "Tu réponds de manière intelligente, précise et naturelle.\n"
    if facts:
        facts_str = ", ".join([f"{k}={v['value'] if isinstance(v, dict) else v}" for k, v in list(facts.items())[:10]])
        sys_prompt += f"Faits mémorisés: {facts_str}\n"
    prompt = f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{sys_prompt}<|eot_id|>"
    # Last 5 turns
    for user_msg, bot_msg in chat_history[-5:]:
        if user_msg:
            prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{user_msg}<|eot_id|>"
        if bot_msg:
            prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{bot_msg}<|eot_id|>"
    prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{message}<|eot_id|>"
    prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n"
    return prompt
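# For reference (illustration, not original text), build_prompt("Bonjour", [])
# produces a Llama-3-style template roughly like:
#
#   <|begin_of_text|><|start_header_id|>system<|end_header_id|>
#
#   Tu es Mnemosyne, ...<|eot_id|><|start_header_id|>user<|end_header_id|>
#
#   Bonjour<|eot_id|><|start_header_id|>assistant<|end_header_id|>
#
# The model is then expected to continue from the final assistant header.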
def process_message(message: str) -> str:
    """Pre-process the message (math expressions, etc.)."""
    math_patterns = [
        r'\d+\s*[\+\-\*\/\^]\s*\d+',
        r'[a-zA-Z]\s*[\+\-\*\/]\s*[a-zA-Z]',
        r'calcul',
        r'combien',
        r'\='
    ]
    for pattern in math_patterns:
        if re.search(pattern, message.lower()):
            expr_match = re.search(r'([\d\w\s\+\-\*\/\^\(\)=]+)', message)
            if expr_match:
                result = calculator.solve(expr_match.group(1))
                if result:
                    return result
    return ""


def respond(message: str, chat_history: List[Tuple[str, str]], max_tokens: int = 512):
    """Main response function."""
    if not message or not message.strip():
        return "", chat_history
    message = message.strip()
    # Process math
    math_result = process_message(message)
    # Build prompt
    prompt = build_prompt(message, chat_history)
    # Generate
    response = generate_response(prompt, max_tokens)
    # Add math result if available
    if math_result and math_result not in response:
        response = f"{math_result}\n\n{response}"
    chat_history.append((message, response))
    return "", chat_history


def respond_with_audio(
    message: str,
    audio: Optional[str],
    chat_history: List[Tuple[str, str]],
    max_tokens: int = 512
):
    """Respond to text or audio input."""
    # Transcribe audio if provided
    if audio:
        transcription = transcribe_audio(audio)
        if transcription and not transcription.startswith("["):
            message = transcription
    if not message or not message.strip():
        return "", None, chat_history
    _, updated_history = respond(message, chat_history, max_tokens)
    return "", None, updated_history
# ==============================================================================
# GRADIO INTERFACE
# ==============================================================================
def get_status_message() -> str:
    """Status message for the current environment."""
    if runtime.zerogpu_available:
        return "⚡ ZeroGPU: 120s max | 🎤 Parlez ou tapez"
    elif runtime.cuda_available:
        gpu_name = torch.cuda.get_device_name(0)
        return f"🖥️ GPU: {gpu_name} | 🎤 Parlez ou tapez"
    else:
        return "💻 CPU mode (~30-60s) | 🎤 Parlez ou tapez"


css = """
.container { max-width: 900px; margin: auto; }
.chatbot { min-height: 400px; }
footer { visibility: hidden; }
"""

with gr.Blocks(title="Mnemosyne v4.3.3", css=css, theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"""
    # 🧠 Mnemosyne v4.3.3
    *IA cognitive par Mike Amega - Ame Web Studio*

    **Features:** Audio input (auto-send) • Symbolic Math • Memory System

    {get_status_message()}
    """)
    chatbot = gr.Chatbot(
        label="Conversation",
        height=450,
        show_copy_button=True,
        elem_classes=["chatbot"]
    )
    with gr.Row():
        with gr.Column(scale=4):
            msg = gr.Textbox(
                label="Message",
                placeholder="Tapez votre message ici...",
                lines=2,
                show_label=False
            )
        with gr.Column(scale=1):
            audio_input = gr.Audio(
                sources=["microphone"],
                type="filepath",
                label="🎤 Audio",
                show_label=True
            )
    with gr.Row():
        with gr.Column(scale=1):
            max_tokens = gr.Slider(
                minimum=64,
                maximum=2048,
                value=512,
                step=64,
                label="Max tokens"
            )
        with gr.Column(scale=1):
            send_btn = gr.Button("📤 Envoyer", variant="primary", size="lg")
        with gr.Column(scale=1):
            clear_btn = gr.Button("🗑️ Effacer", size="lg")
    gr.Markdown("""
    ---
    📜 **License:** Apache 2.0 (non-commercial) | Commercial: amewebstudio@gmail.com
    """)

    # Event handlers
    # Text submit
    msg.submit(
        fn=respond,
        inputs=[msg, chatbot, max_tokens],
        outputs=[msg, chatbot]
    )
    # Button click
    send_btn.click(
        fn=respond_with_audio,
        inputs=[msg, audio_input, chatbot, max_tokens],
        outputs=[msg, audio_input, chatbot]
    )
    # Audio auto-send when recording stops
    audio_input.stop_recording(
        fn=respond_with_audio,
        inputs=[msg, audio_input, chatbot, max_tokens],
        outputs=[msg, audio_input, chatbot]
    )
    # Clear
    clear_btn.click(
        fn=lambda: ([], "", None),
        inputs=None,
        outputs=[chatbot, msg, audio_input]
    )

# Launch
if __name__ == "__main__":
    demo.queue()
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)