| | import os |
| | import chardet |
| | import gradio as gr |
| | import fitz |
| | import faiss |
| | import numpy as np |
| | from sentence_transformers import SentenceTransformer |
| | from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM |
| |
|
| | |
# On-disk cache locations for the FAISS vector index and its parallel chunk
# store; chunks are serialized as one text file joined with the "\n<>\n"
# sentinel (see build_and_save_faiss_index / load_faiss_index).
INDEX_PATH = "faiss_index.idx"
CHUNKS_PATH = "chunks.txt"
| |
|
| | |
def extract_text_from_pdf(pdf_path):
    """Extract the plain text of every page of a PDF, pages joined by newlines.

    Args:
        pdf_path: Filesystem path to the PDF file.

    Returns:
        A single string with one page's text per segment, separated by "\\n".
    """
    # Bug fix: the original never closed the document, leaking the file
    # handle. fitz.Document supports the context-manager protocol, which
    # releases it even if text extraction raises.
    with fitz.open(pdf_path) as doc:
        return "\n".join(page.get_text() for page in doc)
| |
|
def load_documents(folder_path):
    """Load all PDF and TXT documents found directly inside *folder_path*.

    TXT files have their encoding sniffed with chardet (falling back to
    UTF-8); files that still fail to decode are skipped with a warning.

    Args:
        folder_path: Directory to scan (non-recursive).

    Returns:
        A pair ``(texts, filenames)`` of parallel lists: the extracted text
        of each successfully loaded file and its filename.
    """
    texts, filenames = [], []
    for filename in os.listdir(folder_path):
        path = os.path.join(folder_path, filename)
        # Case-insensitive extension check so ".PDF"/".TXT" are not skipped.
        suffix = filename.lower()
        if suffix.endswith(".pdf"):
            texts.append(extract_text_from_pdf(path))
            filenames.append(filename)
        elif suffix.endswith(".txt"):
            with open(path, "rb") as f:
                raw_data = f.read()
            result = chardet.detect(raw_data)
            encoding = result["encoding"] or "utf-8"
            try:
                text = raw_data.decode(encoding)
            except (UnicodeDecodeError, LookupError) as e:
                # Bug fix: the original message printed the literal
                # placeholder "(unknown)" instead of the failing file's name.
                # LookupError covers chardet guessing an unknown codec name.
                print(f"Erreur lors du décodage de {filename} : {e}")
            else:
                texts.append(text)
                filenames.append(filename)
    return texts, filenames
| |
|
| | |
def chunk_text(text, max_len=500):
    """Greedily pack paragraphs (split on blank lines) into text chunks.

    Paragraphs are appended to the current chunk while the running length
    stays under *max_len*; otherwise the chunk is flushed and a new one
    starts. A single paragraph longer than *max_len* becomes its own chunk.

    Args:
        text: Raw document text with paragraphs separated by "\\n\\n".
        max_len: Soft upper bound on chunk length in characters.

    Returns:
        List of non-empty, stripped chunk strings.
    """
    pieces = []
    buffer = ""
    for paragraph in text.split("\n\n"):
        if len(buffer) + len(paragraph) < max_len:
            buffer = f"{buffer}{paragraph}\n\n"
        else:
            flushed = buffer.strip()
            if flushed:
                pieces.append(flushed)
            buffer = f"{paragraph}\n\n"
    tail = buffer.strip()
    if tail:
        pieces.append(tail)
    return pieces
| |
|
| | |
def build_and_save_faiss_index(chunks, model):
    """Embed *chunks*, build an exact L2 FAISS index, and persist both.

    The index is written to INDEX_PATH and the chunks to CHUNKS_PATH,
    joined with the "\\n<>\\n" sentinel expected by load_faiss_index.

    Args:
        chunks: Non-empty list of text chunks to index.
        model: SentenceTransformer-like object exposing ``encode``.

    Returns:
        The pair ``(index, chunks)``.

    Raises:
        ValueError: If *chunks* is empty.
    """
    if not chunks:
        raise ValueError("Aucun chunk fourni pour l'indexation.")
    vectors = model.encode(chunks, convert_to_numpy=True)
    # A single chunk yields a 1-D vector; FAISS needs a 2-D float32 matrix.
    if vectors.ndim == 1:
        vectors = vectors.reshape(1, -1)
    vectors = vectors.astype("float32")
    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)
    faiss.write_index(index, INDEX_PATH)
    with open(CHUNKS_PATH, "w", encoding="utf-8") as f:
        f.write("\n<>\n".join(chunks))
    return index, chunks
| |
|
| | |
def load_faiss_index():
    """Load the persisted FAISS index and its chunk list from disk.

    Returns:
        ``(index, chunks)`` when both cache files exist, else ``(None, None)``
        so the caller knows a rebuild is required.
    """
    have_index = os.path.exists(INDEX_PATH)
    have_chunks = os.path.exists(CHUNKS_PATH)
    if not (have_index and have_chunks):
        return None, None
    index = faiss.read_index(INDEX_PATH)
    with open(CHUNKS_PATH, "r", encoding="utf-8") as f:
        # Chunks were serialized with the "\n<>\n" sentinel as separator.
        chunks = f.read().split("\n<>\n")
    return index, chunks
| |
|
| | |
# --- Module-level model loading (runs once at import time) ---

# French emotion classifier used by handle_sentiment / handle_question /
# neola_api to tag user messages with a single emotion label.
emotion_model_name = "astrosbd/french_emotion_camembert"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)
emotion_pipe = pipeline("text-classification", model=emotion_model, tokenizer=emotion_tokenizer)

# Seq2seq model used to generate answers in handle_question.
# NOTE(review): this checkpoint name suggests a French summarization model
# (CNN/DM) repurposed for Q&A — confirm it behaves acceptably on prompts.
gen_model_name = "plguillou/t5-base-fr-sum-cnndm"
gen_tokenizer = AutoTokenizer.from_pretrained(gen_model_name)
gen_model = AutoModelForSeq2SeqLM.from_pretrained(gen_model_name)

# Multilingual sentence embedder for FAISS retrieval; must be the same model
# used both when building the index and when encoding queries.
emb_model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
index, chunks = load_faiss_index()
if index is None or chunks is None:
    # Cache miss: (re)build the index from the documents folder.
    print("Reconstruction de l'index FAISS...")
    folder_path = "./happython"
    raw_texts, filenames = load_documents(folder_path)
    all_chunks = []
    for text in raw_texts:
        all_chunks.extend(chunk_text(text))
    if not all_chunks:
        raise ValueError("Aucun texte trouvé dans les documents.")
    index, chunks = build_and_save_faiss_index(all_chunks, emb_model)
| |
|
| | |
| |
|
def accueil():
    """Return the home-screen state: greeting plus visibility updates.

    The four gr.update values map, in order, to the menu button row and the
    three input boxes (témoignage, sentiment, question) — only the menu is
    shown on the home screen.
    """
    greeting = "Je suis NEOLA, le Médiateur virtuel. Que souhaitez-vous faire aujourd'hui ?"
    visible_flags = (True, False, False, False)
    return (greeting, *(gr.update(visible=flag) for flag in visible_flags))
| |
|
def choix_temoignage():
    """Switch the UI to testimony mode: hide the menu, show only that box."""
    prompt = "Merci ! Écris ici ton témoignage heureux puis valide."
    visible_flags = (False, True, False, False)
    return (prompt, *(gr.update(visible=flag) for flag in visible_flags))
| |
|
def choix_sentiment():
    """Switch the UI to feeling mode: hide the menu, show only that box."""
    prompt = "Décris-moi en quelques mots comment tu te sens aujourd'hui."
    visible_flags = (False, False, True, False)
    return (prompt, *(gr.update(visible=flag) for flag in visible_flags))
| |
|
def choix_question():
    """Switch the UI to question mode: hide the menu, show only that box."""
    prompt = "Quelle est ta question ?"
    visible_flags = (False, False, False, True)
    return (prompt, *(gr.update(visible=flag) for flag in visible_flags))
| |
|
def handle_temoignage(temoignage):
    """Acknowledge a submitted testimony and return to the home screen.

    NOTE(review): the testimony text itself is not stored anywhere here —
    confirm whether persistence is intended.
    """
    acknowledgement = "Merci pour ton témoignage ! Il a bien été reçu. 😊"
    return (acknowledgement, None, *accueil()[1:])
| |
|
def handle_sentiment(sentiment):
    """Classify the user's feeling text and suggest a next activity.

    Returns the reply string, the detected emotion label, and the home-screen
    visibility updates from accueil().
    """
    emotion = emotion_pipe(sentiment)[0]['label']
    suggestions = {
        "joy": "Tu sembles joyeux·se ! Veux-tu partager cette joie dans un témoignage ou essayer un atelier créatif ?",
        "sad": "Je ressens de la tristesse. Tu peux écrire ce que tu ressens ou consulter la boîte à outils 'réconfort'.",
        "anger": "Exprimer sa colère est important. Souhaites-tu en parler ou découvrir des exercices de gestion ?",
    }
    fallback = "Merci pour ce partage. Veux-tu explorer un atelier ou écrire un témoignage ?"
    suggestion = suggestions.get(emotion, fallback)
    return (f"Émotion détectée : {emotion}. {suggestion}", emotion, *accueil()[1:])
| |
|
def handle_question(question):
    """Answer a question with retrieval-augmented generation.

    Encodes the question, retrieves the top-3 nearest chunks from the FAISS
    index, builds a French prompt with that context, and generates a reply
    with the seq2seq model. Also tags the question's emotion.

    Returns:
        ``(response, emotion_label, *home_screen_updates)``.
    """
    query_vec = emb_model.encode([question], convert_to_numpy=True).astype("float32")
    # Guard against an embedding model / index dimensionality mismatch
    # (e.g. a stale index built with a different model).
    if query_vec.shape[1] != index.d:
        return "Désolé, problème technique avec la recherche.", None, *accueil()[1:]
    D, I = index.search(query_vec, k=3)
    # Bug fix: FAISS pads the label array with -1 when the index holds fewer
    # than k vectors; the original let -1 index chunks[-1], silently injecting
    # the wrong chunk into the context. Skip those padding entries.
    relevant_chunks = [chunks[i] for i in I[0] if i != -1 and chunks[i].strip()]
    context = "\n".join(relevant_chunks)
    prompt = (
        "Tu es NEOLA, le médiateur bienveillant du Happython Village.\n"
        "Réponds en français de façon claire, concise et bienveillante à la question suivante, en t'appuyant sur les souvenirs du village si utile.\n\n"
        f"Question du villageois : \"{question}\"\n"
        f"Contexte :\n{context}\n"
        "Réponse :"
    )
    # Truncate the prompt to the model's input budget; cap generation length.
    input_ids = gen_tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True).input_ids
    output_ids = gen_model.generate(input_ids, max_new_tokens=120)
    response = gen_tokenizer.decode(output_ids[0], skip_special_tokens=True)
    emotion = emotion_pipe(question)[0]['label']
    return response, emotion, *accueil()[1:]
| |
|
| | |
def neola_api(message, user_info=None):
    """Single-endpoint dispatcher for the external JS widget.

    Messages prefixed (case-insensitively) with "témoignage:", "sentiment:"
    or "question:" are routed to the matching handler; anything else gets a
    welcome reply. *user_info* is accepted for API compatibility but unused.

    Returns:
        ``(response_text, emotion_label)``.
    """
    # Prefix dispatch table replaces the original hard-coded slice offsets
    # (11/10/9), which silently broke if a prefix string changed.
    handlers = (
        ("témoignage:", handle_temoignage),
        ("sentiment:", handle_sentiment),
        ("question:", handle_question),
    )
    lowered = message.lower()
    for prefix, handler in handlers:
        if lowered.startswith(prefix):
            rep, emotion, *_ = handler(message[len(prefix):].strip())
            return rep, emotion
    emotion = emotion_pipe(message)[0]['label']
    response = f"Bienvenue ! (Émotion détectée : {emotion}) Que veux-tu faire aujourd'hui ? (Déposer un témoignage, exprimer un sentiment, poser une question...)"
    return response, emotion
| |
|
| | |
| |
|
# --- Gradio UI: one read-only "NEOLA says" box, a three-button menu, and
# three mutually exclusive input boxes whose visibility the handlers toggle.
with gr.Blocks(theme="soft") as demo:
    gr.Markdown("## 👋 Bienvenue au Happython Village !")
    chatbot = gr.Textbox(
        label="NEOLA",
        value="Je suis NEOLA, le Médiateur virtuel. Que souhaitez-vous faire aujourd'hui ?",
        interactive=False
    )
    with gr.Row():
        btn_temoignage = gr.Button("Déposer un témoignage heureux")
        btn_sentiment = gr.Button("Je ne sais pas quoi faire")
        btn_question = gr.Button("J'ai des questions")
    zone_temoignage = gr.Textbox(label="Écris ton témoignage ici", visible=False)
    zone_sentiment = gr.Textbox(label="Décris tes sentiments", visible=False)
    zone_question = gr.Textbox(label="Pose ta question", visible=False)

    # Menu clicks: each choix_* returns (message, 4 visibility updates) that
    # map positionally onto [chatbot, btn_temoignage, zone_*, zone_*, zone_*].
    btn_temoignage.click(choix_temoignage, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    btn_sentiment.click(choix_sentiment, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    btn_question.click(choix_question, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])

    # Submitting a box runs its handler; handlers return an extra emotion
    # value that is absorbed by the 5-output mapping via accueil()[1:].
    zone_temoignage.submit(handle_temoignage, zone_temoignage, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    zone_sentiment.submit(handle_sentiment, zone_sentiment, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    zone_question.submit(handle_question, zone_question, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])

# NOTE(review): this Interface is constructed but never assigned, mounted,
# or launched — only `demo` is launched below, so as written it appears to
# have no effect. Confirm whether it should be combined with `demo` (e.g.
# via gr.TabbedInterface) or launched separately to expose the /predict API.
gr.Interface(
    fn=neola_api,
    inputs=[gr.Textbox(label="Message"), gr.JSON(label="Infos utilisateur")],
    outputs=[
        gr.Textbox(label="Réponse NEOLA"),
        gr.Textbox(label="Émotion détectée")
    ],
    title="API NEOLA (pour widget JS)",
    description="Appelle ce Space en POST sur /predict/ avec {'data': [message, user_info]}. La réponse contient [réponse, émotion]."
)

# Standard script entry guard: only launch the UI when run directly.
if __name__ == "__main__":
    demo.launch()