import os import chardet import gradio as gr import fitz # PyMuPDF import faiss import numpy as np from sentence_transformers import SentenceTransformer from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM # Chemins pour index persistants INDEX_PATH = "faiss_index.idx" CHUNKS_PATH = "chunks.txt" # Extraction du texte def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) return "\n".join(page.get_text() for page in doc) def load_documents(folder_path): texts, filenames = [], [] for filename in os.listdir(folder_path): path = os.path.join(folder_path, filename) if filename.endswith(".pdf"): texts.append(extract_text_from_pdf(path)) filenames.append(filename) elif filename.endswith(".txt"): with open(path, "rb") as f: raw_data = f.read() result = chardet.detect(raw_data) encoding = result["encoding"] or "utf-8" try: text = raw_data.decode(encoding) texts.append(text) filenames.append(filename) except Exception as e: print(f"Erreur lors du décodage de {filename} : {e}") return texts, filenames # Segmentation def chunk_text(text, max_len=500): paragraphs = text.split('\n\n') chunks, current_chunk = [], "" for p in paragraphs: if len(current_chunk) + len(p) < max_len: current_chunk += p + "\n\n" else: if current_chunk.strip(): chunks.append(current_chunk.strip()) current_chunk = p + "\n\n" if current_chunk.strip(): chunks.append(current_chunk.strip()) return chunks # Construction et sauvegarde de l’index def build_and_save_faiss_index(chunks, model): if not chunks: raise ValueError("Aucun chunk fourni pour l'indexation.") embeddings = model.encode(chunks, convert_to_numpy=True) if len(embeddings.shape) == 1: embeddings = embeddings.reshape(1, -1) embeddings = embeddings.astype("float32") dimension = embeddings.shape[1] index = faiss.IndexFlatL2(dimension) index.add(embeddings) faiss.write_index(index, INDEX_PATH) with open(CHUNKS_PATH, "w", encoding="utf-8") as f: f.write("\n<>\n".join(chunks)) return index, chunks # Chargement de l’index si déjà présent def load_faiss_index(): if not os.path.exists(INDEX_PATH) or not os.path.exists(CHUNKS_PATH): return None, None index = faiss.read_index(INDEX_PATH) with open(CHUNKS_PATH, "r", encoding="utf-8") as f: chunks = f.read().split("\n<>\n") return index, chunks # Modèles Hugging Face emotion_model_name = "astrosbd/french_emotion_camembert" emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name) emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name) emotion_pipe = pipeline("text-classification", model=emotion_model, tokenizer=emotion_tokenizer) gen_model_name = "plguillou/t5-base-fr-sum-cnndm" gen_tokenizer = AutoTokenizer.from_pretrained(gen_model_name) gen_model = AutoModelForSeq2SeqLM.from_pretrained(gen_model_name) emb_model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2') index, chunks = load_faiss_index() if index is None or chunks is None: print("Reconstruction de l'index FAISS...") folder_path = "./happython" raw_texts, filenames = load_documents(folder_path) all_chunks = [] for text in raw_texts: all_chunks.extend(chunk_text(text)) if not all_chunks: raise ValueError("Aucun texte trouvé dans les documents.") index, chunks = build_and_save_faiss_index(all_chunks, emb_model) # --- Logique du chatbot à menus --- def accueil(): return ( "Je suis NEOLA, le Médiateur virtuel. Que souhaitez-vous faire aujourd'hui ?", gr.update(visible=True), # boutons visibles gr.update(visible=False), # zone témoignage cachée gr.update(visible=False), # zone sentiment cachée gr.update(visible=False), # zone question cachée ) def choix_temoignage(): return ( "Merci ! Écris ici ton témoignage heureux puis valide.", gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) def choix_sentiment(): return ( "Décris-moi en quelques mots comment tu te sens aujourd'hui.", gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), ) def choix_question(): return ( "Quelle est ta question ?", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ) def handle_temoignage(temoignage): return f"Merci pour ton témoignage ! Il a bien été reçu. 😊", None, *accueil()[1:] def handle_sentiment(sentiment): emotion_result = emotion_pipe(sentiment) emotion = emotion_result[0]['label'] suggestion = { "joy": "Tu sembles joyeux·se ! Veux-tu partager cette joie dans un témoignage ou essayer un atelier créatif ?", "sad": "Je ressens de la tristesse. Tu peux écrire ce que tu ressens ou consulter la boîte à outils 'réconfort'.", "anger": "Exprimer sa colère est important. Souhaites-tu en parler ou découvrir des exercices de gestion ?" }.get(emotion, "Merci pour ce partage. Veux-tu explorer un atelier ou écrire un témoignage ?") return f"Émotion détectée : {emotion}. {suggestion}", emotion, *accueil()[1:] def handle_question(question): # Recherche documentaire (RAG) query_vec = emb_model.encode([question], convert_to_numpy=True).astype("float32") if query_vec.shape[1] != index.d: return "Désolé, problème technique avec la recherche.", None, *accueil()[1:] D, I = index.search(query_vec, k=3) relevant_chunks = [chunks[i] for i in I[0] if chunks[i].strip()] context = "\n".join(relevant_chunks) prompt = ( "Tu es NEOLA, le médiateur bienveillant du Happython Village.\n" "Réponds en français de façon claire, concise et bienveillante à la question suivante, en t'appuyant sur les souvenirs du village si utile.\n\n" f"Question du villageois : \"{question}\"\n" f"Contexte :\n{context}\n" "Réponse :" ) input_ids = gen_tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True).input_ids output_ids = gen_model.generate(input_ids, max_new_tokens=120) response = gen_tokenizer.decode(output_ids[0], skip_special_tokens=True) # Analyse émotionnelle sur la question (ou la réponse si tu préfères) emotion_result = emotion_pipe(question) emotion = emotion_result[0]['label'] return response, emotion, *accueil()[1:] # --- API pour intégration widget JS --- def neola_api(message, user_info=None): if message.lower().startswith("témoignage:"): rep, emotion, *_ = handle_temoignage(message[11:].strip()) return rep, emotion elif message.lower().startswith("sentiment:"): rep, emotion, *_ = handle_sentiment(message[10:].strip()) return rep, emotion elif message.lower().startswith("question:"): rep, emotion, *_ = handle_question(message[9:].strip()) return rep, emotion else: emotion_result = emotion_pipe(message) emotion = emotion_result[0]['label'] response = f"Bienvenue ! (Émotion détectée : {emotion}) Que veux-tu faire aujourd'hui ? (Déposer un témoignage, exprimer un sentiment, poser une question...)" return response, emotion # --- Gradio UI --- with gr.Blocks(theme="soft") as demo: gr.Markdown("## 👋 Bienvenue au Happython Village !") chatbot = gr.Textbox( label="NEOLA", value="Je suis NEOLA, le Médiateur virtuel. Que souhaitez-vous faire aujourd'hui ?", interactive=False ) with gr.Row(): btn_temoignage = gr.Button("Déposer un témoignage heureux") btn_sentiment = gr.Button("Je ne sais pas quoi faire") btn_question = gr.Button("J'ai des questions") zone_temoignage = gr.Textbox(label="Écris ton témoignage ici", visible=False) zone_sentiment = gr.Textbox(label="Décris tes sentiments", visible=False) zone_question = gr.Textbox(label="Pose ta question", visible=False) btn_temoignage.click(choix_temoignage, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question]) btn_sentiment.click(choix_sentiment, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question]) btn_question.click(choix_question, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question]) zone_temoignage.submit(handle_temoignage, zone_temoignage, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question]) zone_sentiment.submit(handle_sentiment, zone_sentiment, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question]) zone_question.submit(handle_question, zone_question, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question]) # API Gradio pour widget externe : 2 outputs (réponse, émotion) gr.Interface( fn=neola_api, inputs=[gr.Textbox(label="Message"), gr.JSON(label="Infos utilisateur")], outputs=[ gr.Textbox(label="Réponse NEOLA"), gr.Textbox(label="Émotion détectée") ], title="API NEOLA (pour widget JS)", description="Appelle ce Space en POST sur /predict/ avec {'data': [message, user_info]}. La réponse contient [réponse, émotion]." ) if __name__ == "__main__": demo.launch()