# NOTE(review): the original paste began with "Spaces:" / "Runtime error" —
# Hugging Face Spaces UI status text, not source code; preserved as a comment.
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════════╗
║              MNEMOSYNE v4.3.3 — Web interface with voice support             ║
║              Created by Mike Amega (Logo) — Ame Web Studio                   ║
║                         https://amewebstudio.com                             ║
╚══════════════════════════════════════════════════════════════════════════════╝

Gradio chat front-end for the fine-tuned "mnemosyne-v4-finetuned" model:
streams token-by-token replies on CPU, accepts file uploads and microphone
audio, and can read answers aloud via gTTS.
"""
import os
import gc  # NOTE(review): imported but not used anywhere in this file
import torch
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

# ============================================================================
# CONFIGURATION
# ============================================================================
MODEL_ID = "amewebstudio/mnemosyne-v4-finetuned"  # Hugging Face model repo id
MAX_NEW_TOKENS = 512  # default generation cap (also the UI slider default)
TEMPERATURE = 0.7     # default sampling temperature (also the UI slider default)
TOP_P = 0.9           # nucleus-sampling cutoff (fixed; not exposed in the UI)

# Default system prompt (user-editable in the UI sidebar). This is a runtime
# string the model actually receives — intentionally kept in French.
SYSTEM_PROMPT = """Tu es Mnemosyne v4.3.3, une IA cognitive avancée créée par Mike Amega (Logo) de Ame Web Studio.
IDENTITÉ:
- Nom: Mnemosyne v4.3.3
- Créateur: Mike Amega (Logo)
- Studio: Ame Web Studio (amewebstudio.com)
STYLE:
- Utilise <thinking>...</thinking> pour ton raisonnement interne quand nécessaire
- Sois précise, structurée et pédagogue
- Réponds en français par défaut, sauf si l'utilisateur parle une autre langue"""

# ============================================================================
# MODEL LOADING (runs at import time — the app blocks until this finishes)
# ============================================================================
print("🧠 Chargement de Mnemosyne v4.3.3...")
print(" (Cela peut prendre quelques minutes sur CPU)")

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
# Some checkpoints ship without a pad token; generation needs one for padding.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Load for CPU inference.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.float32,  # float32: CPUs have no fast fp16/bf16 path
    device_map="cpu",
    low_cpu_mem_usage=True,
    trust_remote_code=True,
)
model.eval()  # inference mode (disables dropout etc.)
print("✅ Mnemosyne prête!")
| # ============================================================================ | |
| # FONCTIONS DE CHAT | |
| # ============================================================================ | |
def format_messages(history, message, system_prompt):
    """Build the chat-message list expected by the model.

    Args:
        history: Sequence of (user_text, assistant_text) turn pairs; either
            element of a pair may be falsy and is then skipped.
        message: The current user message, appended last.
        system_prompt: Optional system instruction placed first when non-empty.

    Returns:
        List of {"role": ..., "content": ...} dicts in conversation order.
    """
    messages = [{"role": "system", "content": system_prompt}] if system_prompt else []
    for user_turn, assistant_turn in history:
        if user_turn:
            messages.append({"role": "user", "content": user_turn})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})
    messages.append({"role": "user", "content": message})
    return messages
def generate_response(message, history, system_prompt, temperature, max_tokens):
    """Stream a model reply for *message* given the chat *history*.

    Yields the accumulated response text after each new token so Gradio can
    render it incrementally.

    Args:
        message: Current user message; blank/whitespace yields "" and stops.
        history: List of (user, assistant) turn pairs (see format_messages).
        system_prompt: Optional system instruction prepended to the chat.
        temperature: Sampling temperature (coerced to float).
        max_tokens: Maximum number of new tokens to generate (coerced to int).
    """
    if not message.strip():
        yield ""
        return

    messages = format_messages(history, message, system_prompt)

    # Prefer the tokenizer's own chat template; fall back to a hand-rolled
    # Llama-3-style prompt when the checkpoint ships without one.
    try:
        input_text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )
    except Exception:  # was a bare `except:` — don't swallow KeyboardInterrupt
        parts = [
            f"<|start_header_id|>{m['role']}<|end_header_id|>\n\n{m['content']}<|eot_id|>"
            for m in messages
        ]
        parts.append("<|start_header_id|>assistant<|end_header_id|>\n\n")
        input_text = "".join(parts)

    inputs = tokenizer(input_text, return_tensors="pt").to(model.device)

    # Generation runs in a background thread; the streamer feeds this generator.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(
        **inputs,
        max_new_tokens=int(max_tokens),
        temperature=float(temperature),
        top_p=TOP_P,
        do_sample=True,
        streamer=streamer,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    response = ""
    try:
        for new_text in streamer:
            response += new_text
            yield response
    finally:
        # Reap the worker even if the consumer abandons the stream early;
        # previously join() was skipped whenever the generator was closed.
        thread.join()
def process_file(file):
    """Read an uploaded file and return a markdown snippet describing it.

    Supports plain-text/code files (first 5000 chars), PDFs (via PyMuPDF when
    installed), and acknowledges images and other types without parsing them.

    Args:
        file: A Gradio file object (with a ``.name`` path attribute) or a
            plain path string; ``None`` returns "".

    Returns:
        A user-facing string; errors are reported in the returned text
        rather than raised.
    """
    if file is None:
        return ""
    try:
        # Gradio may hand us a tempfile wrapper or a plain path string.
        file_path = file.name if hasattr(file, 'name') else file
        if file_path.endswith(('.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.csv')):
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            return f"📄 Contenu du fichier:\n```\n{content[:5000]}\n```\n\n"
        elif file_path.endswith('.pdf'):
            try:
                import fitz  # PyMuPDF (optional dependency)
                doc = fitz.open(file_path)
                text = "".join(page.get_text() for page in doc)
                doc.close()
                return f"📄 Contenu du PDF:\n```\n{text[:5000]}\n```\n\n"
            except Exception:  # was a bare `except:`; covers missing PyMuPDF too
                return "⚠️ Impossible de lire le PDF. Installez PyMuPDF."
        elif file_path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            return f"🖼️ Image reçue: {os.path.basename(file_path)}\n(L'analyse d'image sera disponible dans une future version)\n\n"
        else:
            return f"📎 Fichier reçu: {os.path.basename(file_path)}\n\n"
    except Exception as e:
        return f"⚠️ Erreur lors de la lecture du fichier: {e}\n\n"
def chat_with_file(message, file, history, system_prompt, temperature, max_tokens):
    """Stream a chat answer, prepending any uploaded file's content to the prompt."""
    attachment_text = process_file(file) if file else ""
    # With no typed message, the file content alone becomes the prompt.
    prompt = attachment_text + message if message else attachment_text
    if not prompt.strip():
        yield ""
        return
    yield from generate_response(prompt, history, system_prompt, temperature, max_tokens)
def transcribe_audio(audio):
    """Return a hint string for a recorded clip (no real speech-to-text yet).

    Args:
        audio: Recorded-audio file path from Gradio, or ``None``.

    Returns:
        "" when no audio was provided, otherwise a fixed acknowledgement
        asking the user to type (real transcription is not feasible on a
        CPU-basic Space).
    """
    if audio is None:
        return ""
    return "🎤 [Audio reçu - tape ton message ou utilise un service de transcription externe]"
def text_to_speech(text):
    """Synthesize *text* to an MP3 via gTTS.

    Args:
        text: The reply to read aloud; empty/blank/None short-circuits.

    Returns:
        Path of a temporary .mp3 file, or ``None`` when the input is empty
        or synthesis fails (e.g. gTTS not installed, no network).
    """
    if not text or not text.strip():
        return None
    try:
        from gtts import gTTS
        import tempfile

        # Drop internal-reasoning markers and cap length before synthesis.
        speakable = text.replace("<thinking>", "").replace("</thinking>", "")[:500]
        tts = gTTS(text=speakable, lang='fr')
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
            tts.save(tmp.name)
            return tmp.name
    except Exception as e:
        print(f"TTS error: {e}")
        return None
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
# Custom CSS injected into the page: centered 900px layout, gradient header
# banner, dimmed footer, minimum chat height.
custom_css = """
.gradio-container {
    max-width: 900px !important;
    margin: auto !important;
}
.header {
    text-align: center;
    padding: 20px;
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    border-radius: 15px;
    margin-bottom: 20px;
    color: white;
}
.header h1 {
    margin: 0;
    font-size: 2.5em;
}
.header p {
    margin: 10px 0 0 0;
    opacity: 0.9;
}
.footer {
    text-align: center;
    padding: 15px;
    margin-top: 20px;
    opacity: 0.7;
    font-size: 0.9em;
}
.chatbot {
    min-height: 400px;
}
"""
# Interface layout + event wiring.
with gr.Blocks(css=custom_css, title="Mnemosyne v4.3.3", theme=gr.themes.Soft()) as demo:
    # Header banner
    gr.HTML("""
<div class="header">
<h1>🧠 Mnemosyne v4.3.3</h1>
<p>IA Cognitive par Mike Amega (Logo) — Ame Web Studio</p>
</div>
""")
    with gr.Row():
        with gr.Column(scale=3):
            # Conversation pane. Uses the tuple-style history format
            # ([[user, assistant], ...]); the handlers below depend on it.
            chatbot = gr.Chatbot(
                label="Conversation",
                height=450,
                show_copy_button=True,
                avatar_images=(None, "https://huggingface.co/spaces/amewebstudio/mnemosyne-chat/resolve/main/avatar.png"),
            )
            # Text input + send button
            with gr.Row():
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Écris ton message ici... (ou utilise le micro 🎤)",
                    scale=4,
                    lines=2,
                )
                submit_btn = gr.Button("Envoyer 📤", variant="primary", scale=1)
            # Voice input/output
            with gr.Row():
                audio_input = gr.Audio(
                    label="🎤 Entrée vocale",
                    sources=["microphone"],
                    type="filepath",
                    scale=2,
                )
                audio_output = gr.Audio(
                    label="🔊 Réponse vocale",
                    type="filepath",
                    scale=2,
                    autoplay=True,
                )
            # File attachment
            file_input = gr.File(
                label="📎 Joindre un fichier (PDF, TXT, code, image...)",
                file_types=[".txt", ".md", ".py", ".js", ".json", ".csv", ".pdf", ".png", ".jpg", ".jpeg"],
            )
            # Action buttons
            with gr.Row():
                clear_btn = gr.Button("🗑️ Effacer")
                speak_btn = gr.Button("🔊 Lire la réponse")
        with gr.Column(scale=1):
            # Generation-settings sidebar
            gr.Markdown("### ⚙️ Paramètres")
            system_prompt = gr.Textbox(
                label="System Prompt",
                value=SYSTEM_PROMPT,
                lines=6,
            )
            temperature = gr.Slider(
                label="Température",
                minimum=0.1,
                maximum=1.5,
                value=TEMPERATURE,
                step=0.1,
            )
            max_tokens = gr.Slider(
                label="Tokens max",
                minimum=64,
                maximum=2048,
                value=MAX_NEW_TOKENS,
                step=64,
            )
            gr.Markdown("### ℹ️ Info")
            gr.Markdown("""
**Mnemosyne v4.3.3**
- 🧠 Raisonnement avancé
- 💻 Code & Debug
- 📐 Mathématiques
- 🎤 Voix (entrée/sortie)
- 📎 Fichiers
*Créé par [Ame Web Studio](https://amewebstudio.com)*
""")
    # Footer
    gr.HTML("""
<div class="footer">
<p>🧠 Mnemosyne v4.3.3 — Créé par Mike Amega (Logo) — <a href="https://amewebstudio.com" target="_blank">Ame Web Studio</a></p>
</div>
""")

    # ========== EVENTS ==========
    # Audio recording -> textbox (placeholder transcription, see transcribe_audio).
    def transcribe_and_fill(audio):
        """Forward a new recording to transcribe_audio and fill the textbox."""
        if audio:
            text = transcribe_audio(audio)
            return text
        return ""

    audio_input.change(
        fn=transcribe_and_fill,
        inputs=[audio_input],
        outputs=[msg],
    )

    def user_message(message, file, history):
        """Append the user's turn to the history.

        Returns ("", None, new_history): clears the textbox and the file
        widget, and adds a [user_text, None] pair awaiting the bot's reply.
        A "📎 <filename>" line is prepended when a file was attached.
        """
        file_info = ""
        if file:
            file_info = f"📎 {os.path.basename(file.name)}\n" if hasattr(file, 'name') else "📎 Fichier\n"
        user_text = file_info + message
        return "", None, history + [[user_text, None]]

    def bot_response(history, system_prompt, temperature, max_tokens, file):
        """Stream the model's reply into the last history entry.

        Yields (history, None); the None keeps the audio player cleared
        while text streams in.
        """
        if not history:
            yield history, None
            return
        # NOTE(review): this local shadows the user_message() handler defined
        # above — harmless at runtime, but confusing to read.
        user_message = history[-1][0]
        # Strip the "📎 <filename>" line user_message() may have prepended.
        if user_message.startswith("📎"):
            lines = user_message.split("\n", 1)
            clean_message = lines[1] if len(lines) > 1 else ""
        else:
            clean_message = user_message
        # Stream: overwrite the pending assistant slot on every partial chunk.
        history[-1][1] = ""
        for response in chat_with_file(clean_message, file, history[:-1], system_prompt, temperature, max_tokens):
            history[-1][1] = response
            yield history, None

    # Send button: add user turn, then stream the bot reply.
    # NOTE(review): user_message() outputs None to file_input, so by the time
    # the chained bot_response reads file_input, the upload may already be
    # cleared and `file` arrive as None — verify the attachment actually
    # reaches the model.
    submit_btn.click(
        fn=user_message,
        inputs=[msg, file_input, chatbot],
        outputs=[msg, file_input, chatbot],
    ).then(
        fn=bot_response,
        inputs=[chatbot, system_prompt, temperature, max_tokens, file_input],
        outputs=[chatbot, audio_output],
    )
    # Enter key in the textbox: same pipeline as the send button.
    msg.submit(
        fn=user_message,
        inputs=[msg, file_input, chatbot],
        outputs=[msg, file_input, chatbot],
    ).then(
        fn=bot_response,
        inputs=[chatbot, system_prompt, temperature, max_tokens, file_input],
        outputs=[chatbot, audio_output],
    )
    # Clear: empty the conversation, audio player and file widget.
    clear_btn.click(
        fn=lambda: ([], None, None),
        outputs=[chatbot, audio_output, file_input],
    )

    def speak_last_response(history):
        """Synthesize the most recent assistant reply, if any, to audio."""
        if history and history[-1][1]:
            audio = text_to_speech(history[-1][1])
            return audio
        return None

    speak_btn.click(
        fn=speak_last_response,
        inputs=[chatbot],
        outputs=[audio_output],
    )
# ============================================================================
# LAUNCH
# ============================================================================
if __name__ == "__main__":
    # queue() is required so the streaming (generator) handlers work.
    launch_options = {
        "server_name": "0.0.0.0",  # listen on all interfaces (Spaces requirement)
        "server_port": 7860,
        "share": False,
    }
    demo.queue().launch(**launch_options)