Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import hashlib | |
| import requests | |
| import os | |
| import pytz | |
| import time | |
| import unicodedata | |
| import importlib.metadata | |
| import re | |
| import tempfile | |
| import uuid | |
| import base64 | |
| import logging | |
| from pydantic import BaseModel, validator | |
| from dotenv import load_dotenv | |
| from sentence_transformers import SentenceTransformer | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http import models | |
| from typing import List, Optional, Union, Dict, Any, Literal, Set | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict | |
| from PyPDF2 import PdfReader | |
| from docx import Document | |
| from urllib.parse import urlparse, parse_qs | |
| try: | |
| import pdfplumber | |
| print("✅ pdfplumber est installé et fonctionnel.") | |
| except ImportError: | |
| print("❌ pdfplumber n'est pas installé. Exécutez : pip install pdfplumber") | |
| raise | |
| ################################################################# | |
| # 1. CLASSES, FONCTIONS UTILITAIRES, INITIALISATION DES VARIABLES | |
| ################################################################# | |
| # --- Persistance de l'historique via session_state --- | |
| def save_historique(): | |
| # Limite le nombre d'entrées pour éviter la surcharge (ex: 50 dernières) | |
| if len(st.session_state.full_historique) > 50: | |
| # Supprime les entrées les plus anciennes | |
| sorted_entries = sorted( | |
| st.session_state.full_historique.items(), | |
| key=lambda x: x[1]["metadata"].get("timestamp", "") | |
| ) | |
| st.session_state.full_historique = dict(sorted_entries[-50:]) | |
| st.session_state.historique_cache = st.session_state.full_historique.copy() | |
| def load_historique(): | |
| if "historique_cache" in st.session_state: | |
| st.session_state.full_historique = st.session_state.historique_cache.copy() | |
| # Pour permettre de renommer une collection | |
| if 'show_rename_modal' not in st.session_state: | |
| st.session_state.show_rename_modal = False | |
| if 'current_doc_to_rename' not in st.session_state: | |
| st.session_state.current_doc_to_rename = None | |
| # Initialisation des états de certaines variables si non existants | |
| if 'use_priority_docs' not in st.session_state: | |
| st.session_state.use_priority_docs = False | |
| if 'priority_docs' not in st.session_state: | |
| st.session_state.priority_docs = [] | |
| # Estimation du nombre de tokens d'un texte en français | |
| def estimate_tokens(text): | |
| """Estime le nombre de tokens pour Mistral Large (1 token ≈ 4 caractères en français).""" | |
| return len(text) // 4 | |
| # Fonction utilitaire pour tronquer le texte | |
| def truncate_text(text: str, max_tokens: int = 500) -> str: | |
| """Tronque un texte à un nombre maximal de tokens (1 token ≈ 4 caractères).""" | |
| max_chars = max_tokens * 4 | |
| return (text[:max_chars] + "...") if len(text) > max_chars else text | |
| # Fonction de conversion des dates | |
| def safe_parse_date(date_str: Optional[str]) -> datetime: | |
| """Convertit une date hétérogène en datetime, ou datetime.min si invalide.""" | |
| if not date_str: | |
| return datetime.min | |
| for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%Y/%m/%d"): | |
| try: | |
| return datetime.strptime(date_str, fmt) | |
| except ValueError: | |
| continue | |
| return datetime.min | |
| # Masquer les messages de warning de pdfminer | |
| logging.getLogger("pdfminer").setLevel(logging.WARNING) | |
| # Clé numérique | |
| def sort_key(article_num: str): | |
| """ | |
| Transforme un identifiant d'article (ex: 'D146-12-1') en tuple numérique | |
| pour un tri correct. | |
| """ | |
| if not article_num: | |
| return ("",) | |
| prefix = article_num[0] | |
| parts = re.findall(r'\d+', article_num) | |
| nums = [int(p) for p in parts] | |
| return (prefix, *nums) | |
| # Charger l'historique au démarrage | |
| load_historique() | |
| # --- Configuration de la page (doit être unique et en premier) --- | |
| st.set_page_config( | |
| page_title="Parlement RAG", | |
| page_icon="🗳️", | |
| layout="wide" # ← élargit toute la page | |
| ) | |
| # --- Classe CSS pour tous les messages de statut | |
| st.markdown(""" | |
| <style> | |
| /* ===== STYLES GLOBAUX ===== */ | |
| /* Messages de statut (utilisé par status_placeholder et prep_placeholder) */ | |
| .status-message, .prep-message { | |
| display: flex; | |
| justify-content: center; | |
| align-items: center; | |
| margin: 0.5rem auto !important; | |
| padding: 0.7rem 1rem; | |
| text-align: center; | |
| font-size: 14px; | |
| color: #555; | |
| background-color: #f0f2f6; | |
| border-radius: 6px; | |
| max-width: 600px; | |
| box-shadow: 0 1px 3px rgba(0,0,0,0.1); | |
| } | |
| /* Style spécifique pour les messages dans les onglets */ | |
| .prep-message { | |
| min-height: 40px; | |
| font-size: 15px; | |
| } | |
| /* Conteneur principal */ | |
| .stApp { | |
| display: flex; | |
| flex-direction: column; | |
| } | |
| .block-container { | |
| padding-top: 1rem; | |
| padding-bottom: 1rem; | |
| } | |
| /* Expanders et contenu */ | |
| div[data-testid="stExpander"] { | |
| margin: 0 auto 0.5rem auto !important; | |
| max-width: 1000px; | |
| width: 100%; | |
| } | |
| div.streamlit-expanderHeader { | |
| padding: 0.3rem 0.6rem !important; | |
| font-size: 13px !important; | |
| background-color: #f0f2f6 !important; | |
| border-radius: 4px !important; | |
| } | |
| div.streamlit-expanderContent { | |
| padding: 0.5rem 0.8rem !important; | |
| text-align: justify; | |
| } | |
| /* Titres et séparateurs */ | |
| h3 { | |
| text-align: center; | |
| margin: 0.4rem auto !important; | |
| color: #0066cc; | |
| } | |
| hr { | |
| margin: 0.3rem auto; | |
| width: 80%; | |
| border: none; | |
| border-top: 1px solid #ddd; | |
| } | |
| /* Suppression des marges inutiles */ | |
| .stMarkdown > div { | |
| margin: 0 !important; | |
| } | |
| p, ul, ol { | |
| margin: 0.2rem 0 !important; | |
| padding: 0 !important; | |
| } | |
| /* Style pour les expanders de l'historique */ | |
| div[data-testid="stExpander"] > details > summary { | |
| padding: 0.5rem 1rem !important; | |
| background-color: #f0f2f6 !important; | |
| border-radius: 6px !important; | |
| margin-bottom: 0.5rem !important; | |
| } | |
| /* Espacement entre les entrées */ | |
| .history-entry { | |
| margin-bottom: 1rem !important; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # --- Chargement des variables d'environnement --- | |
| load_dotenv() | |
| QDRANT_URL = os.getenv("QDRANT_URL", "").strip() | |
| QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "").strip() | |
| MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "").strip() | |
| TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "").strip() | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "").strip() | |
| GOOGLE_CX = os.getenv("GOOGLE_CX", "").strip() | |
| QDRANT_COLLECTION = "QuestionParlementaire" | |
| if not QDRANT_URL.startswith("https://"): | |
| raise RuntimeError("❌ QDRANT_URL doit commencer par https://") | |
| if not QDRANT_API_KEY: | |
| raise RuntimeError("❌ QDRANT_API_KEY manquant. Vérifiez votre fichier .env") | |
| if not MISTRAL_API_KEY: | |
| raise RuntimeError("❌ MISTRAL_API_KEY manquant. Vérifiez votre fichier .env") | |
| print("QDRANT_URL:", QDRANT_URL) | |
| print("QDRANT_API_KEY (début):", QDRANT_API_KEY[:10], "...") | |
| # --- Chargement du modèle --- | |
| LOCAL_MODEL_PATH = "./models/camembert_finetuned_progressive" | |
| HUB_MODEL_PATH = "Whisler/camembert_finetuned_progressive" | |
| def load_embedding_model(): | |
| try: | |
| if os.path.exists(LOCAL_MODEL_PATH): | |
| model = SentenceTransformer(LOCAL_MODEL_PATH) | |
| print("✅ Modèle chargé en local.") | |
| else: | |
| model = SentenceTransformer(HUB_MODEL_PATH) | |
| print("✅ Modèle téléchargé depuis HuggingFace Hub.") | |
| test_embedding = model.encode("Test de chargement du modèle.") | |
| VECTOR_SIZE = len(test_embedding) | |
| print("Dimension des embeddings:", VECTOR_SIZE) | |
| return model | |
| except Exception as e: | |
| st.error(f"❌ Erreur de chargement du modèle: {str(e)}") | |
| raise | |
| embedding_model = load_embedding_model() | |
| # --- Connexion à Qdrant --- | |
| try: | |
| qdrant_client = QdrantClient( | |
| url=QDRANT_URL, | |
| api_key=QDRANT_API_KEY, | |
| timeout=10.0, | |
| check_compatibility=False | |
| ) | |
| collections = qdrant_client.get_collections() | |
| print(f"✅ Connexion réussie. Collections disponibles: {[c.name for c in collections.collections]}") | |
| except Exception as e: | |
| print(f"❌ Erreur de connexion à Qdrant: {e}") | |
| raise | |
| # --- Modèles Pydantic --- | |
| # Modele Pydantic pour les articles juridiques | |
| class BaseLegislativeRef(BaseModel): | |
| uid: str | |
| collection: str | |
| class RetrievedLegalDocument(BaseModel): | |
| # Identifiants | |
| chunk_id: str | |
| num: str | |
| titre: str | |
| # Contenu | |
| contenu: str | |
| article_complet: str | |
| # Contexte | |
| contexte_hierarchique: str | |
| collection: str | |
| # Hiérarchie optionnelle | |
| partie: Optional[str] = None | |
| livre: Optional[str] = None | |
| titre_structure: Optional[str] = None | |
| chapitre: Optional[str] = None | |
| section: Optional[str] = None | |
| sous_section: Optional[str] = None | |
| paragraphe: Optional[str] = None | |
| sous_paragraphe: Optional[str] = None | |
| # Références législatives | |
| base_legislative: Optional[List[BaseLegislativeRef]] = None | |
| # Score du retrieval | |
| score: Optional[float] = None | |
| # Modele Pydantic pour les documents generiques | |
| class GenericDocument(BaseModel): | |
| # Identifiants | |
| uid: Optional[str] = None | |
| # Contenu | |
| text: str | |
| title: Optional[str] = None | |
| part: Optional[str] = None # ex. "Annexe 7", "partie 1" | |
| # Métadonnées | |
| source: Optional[str] = None | |
| date_document: Optional[str] = None | |
| type_document: Optional[str] = None | |
| # Score du retrieval | |
| score: Optional[float] = None | |
| # Modele Pydantic pour les reponses RAG | |
| class ResponseDocument(BaseModel): | |
| # Identifiants | |
| uid: str | |
| # Contenu | |
| question: str | |
| reponse: str | |
| # Métadonnées | |
| legislature: Optional[str] = None | |
| chambre: Optional[str] = None # Assemblée ou Sénat (à ajouter si tu l’as dans tes données) | |
| rubrique: Optional[str] = None | |
| analyse: Optional[str] = None | |
| ministeres_attribues: Optional[List[str]] = None | |
| # Dates | |
| date_question: Optional[str] = None | |
| date_reponse: Optional[str] = None | |
| # Références juridiques éventuelles | |
| textes_juridiques: Optional[List[str]] = None | |
| # Score du retrieval | |
| score: Optional[float] = None | |
| ################################################################# | |
| # -------------- 1. PRINCIPALES FONCTIONS ----------------------- | |
| ################################################################# | |
| # --- 1a. Fonctions d'upload, d'indexation et d'embedding | |
| # Fonction pour extraire le texte d'un document pdf | |
| def extract_text(pdf_path: str, max_pages: Optional[int] = None) -> str: | |
| """ | |
| Extrait le texte d'un PDF avec plusieurs stratégies : | |
| - pdfplumber pour le texte brut | |
| - fallback OCR si une page est vide | |
| - nettoyage des espaces et des sauts de ligne | |
| """ | |
| text_chunks = [] | |
| try: | |
| with pdfplumber.open(pdf_path) as pdf: | |
| pages = pdf.pages[:max_pages] if max_pages else pdf.pages | |
| for i, page in enumerate(pages, start=1): | |
| try: | |
| # Extraction brute | |
| page_text = page.extract_text() or "" | |
| word_count = len(page_text.split()) if page_text else 0 | |
| # # Si vide, tenter une extraction par OCR (optionnel) | |
| # if not page_text.strip(): | |
| # try: | |
| # from pdf2image import convert_from_path | |
| # import pytesseract | |
| # images = convert_from_path(pdf_path, first_page=i, last_page=i) | |
| # ocr_text = pytesseract.image_to_string(images[0], lang="fra") | |
| # page_text = ocr_text | |
| # st.write(f"Page {i} → OCR fallback → {len(page_text.split())} mots") | |
| # except Exception as e: | |
| # print(f"⚠️ OCR non disponible pour la page {i}: {e}") | |
| # pass | |
| # Nettoyage basique | |
| page_text = re.sub(r"\s+", " ", page_text).strip() | |
| if page_text: | |
| text_chunks.append(page_text) | |
| except Exception as e: | |
| print(f"⚠️ Erreur page {i}: {e}") | |
| continue | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'ouverture du PDF: {e}") | |
| return "" | |
| raw_text = "\n\n".join(text_chunks).strip() | |
| return raw_text | |
| # Fonction pour extraire le texte d'un document Word | |
| def extract_text_from_docx(file_path: str) -> str: | |
| """Extrait le texte d'un fichier Word.""" | |
| try: | |
| doc = Document(file_path) | |
| text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) | |
| return text.strip() | |
| except Exception as e: | |
| st.error(f"❌ Erreur lors de l'extraction du DOCX: {e}") | |
| return "" | |
| # Fonction pour nettoyer les textes extraits | |
| def clean_text(text: str) -> str: | |
| """Nettoie le texte extrait d'un PDF administratif en préservant la structure et les données utiles.""" | |
| lines = text.split('\n') | |
| cleaned_lines = [] | |
| for line in lines: | |
| line = line.strip() | |
| # Ignore les lignes vides ou presque vides | |
| if not line: | |
| continue | |
| # Ignore les numéros de page isolés (ex: "Page 57" ou "57") | |
| if re.match(r'^(?:Page\s*)?\d+\s*$', line, re.IGNORECASE): | |
| continue | |
| # Ignore les en-têtes/pieds de page répétitifs (ex: "PLFSS 2025 - Annexe 7") | |
| if re.match(r'^(?:PLFSS\s*\d{4}\s*-\s*Annexe\s*\d+|ANNEXE\s*DÉPENSES\s*DE\s*LA\s*BRANCHE\s*.*|securite-sociale\.fr|Source\s*:?\s*.*|Génération\s*X-Book)$', line, re.IGNORECASE): | |
| continue | |
| # Ignore les lignes avec seulement des caractères spéciaux ou des tirets | |
| if re.match(r'^[•○◘\-—~=]+$', line): | |
| continue | |
| # Conserve les lignes même courtes (titres, sous-titres, etc.) | |
| cleaned_lines.append(line) | |
| # Reconstitue le texte | |
| text = '\n'.join(cleaned_lines) | |
| # Nettoyage global | |
| text = re.sub(r'\s+', ' ', text) # Espaces multiples | |
| text = re.sub(r'(\w)\s+-\s+(\w)', r'\1\2', text) # Mots coupés par tiret | |
| text = re.sub(r'\bhttps?://\S+', '', text) # URLs | |
| text = re.sub(r'[•○◘]+|[-=~]{5,}', '', text) # Artefacts visuels | |
| # Nettoie les espaces résiduels | |
| text = text.strip() | |
| return text | |
| # Fonction de suppression du sommaire | |
| def remove_summary(text: str) -> str: | |
| """Supprime le sommaire et les tables des matières.""" | |
| cleaned = re.sub( | |
| r"(?i)(SOMMAIRE|TABLE DES MATIÈRES).*?(?=PARTIE\s+\d+|ANNEXE\s+\d+|Article\s+\d+|$)", | |
| "", | |
| text, | |
| flags=re.DOTALL | |
| ) | |
| return cleaned.strip() if cleaned.strip() else text | |
| # Fonction de pré-traitement des titres | |
| def preprocess_for_titles(text: str) -> str: | |
| """Insère des sauts de ligne avant chaque titre pour améliorer la segmentation.""" | |
| sentences = re.split(r'(?<=[.!?])\s+', text) | |
| processed_text = [] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if not sentence: | |
| continue | |
| if detect_titles(sentence): | |
| processed_text.append(f"\n{sentence}\n") | |
| else: | |
| processed_text.append(sentence) | |
| text = " ".join(processed_text) | |
| # Ajout de sauts de ligne autour des titres | |
| title_patterns = [ | |
| r"(Article\s*\d*\s*[-–]?)", r"(ANNEXE\s+\d+)", r"(PARTIE\s+\d+)", | |
| r"(Fiches?\s+d’?évaluation\s+préalable)", r"(\d+\.\s+)", r"([IVXLCDM]+\.\s+)" | |
| ] | |
| for pattern in title_patterns: | |
| text = re.sub(pattern, r"\n\1\n", text, flags=re.IGNORECASE) | |
| text = re.sub(r'\n\s*\n', '\n', text) | |
| return text.strip() | |
| # Fonction de détection des titres | |
| def detect_titles(line: str) -> bool: | |
| """Détecte les titres de manière robuste.""" | |
| line = line.strip() | |
| if not line: | |
| return False | |
| regex_patterns = [ | |
| r"^Article\s+\d+\s*[–—-]?", r"^ANNEXE\s+\d+", r"^PARTIE\s+\d+", | |
| r"^(TITRE|Chapitre|Section)\s+\d+", r"^[IVXLCDM]+\.\s+", r"^\d+(\.\d+)*\s+" | |
| ] | |
| if any(re.match(p, line, flags=re.IGNORECASE) for p in regex_patterns): | |
| return True | |
| title_keywords = [ | |
| "Article ", "ANNEXE ", "PARTIE ", "Fiches d’évaluation préalable", | |
| "Synthèse", "Conclusion", "TITRE ", "Chapitre ", "Section ", | |
| "I. ", "II. ", "III. ", "1. ", "2. " | |
| ] | |
| return any(keyword.lower() in line.lower() for keyword in title_keywords) | |
| # Fonction de segmentation du texte | |
| def segment_text(text: str, max_words: int = 300, min_words: int = 50) -> List[Dict[str, str]]: | |
| """Découpe le texte en segments avec titres pour Qdrant.""" | |
| blocks = re.split(r'\n\n|\.\s+', text) | |
| segments = [] | |
| current_title = "AUTRE" | |
| current_content = [] | |
| for block in blocks: | |
| block = block.strip() | |
| if not block: | |
| continue | |
| if detect_titles(block): | |
| if current_content: | |
| seg_text = ' '.join(current_content) | |
| if len(seg_text.split()) >= min_words: | |
| segments.append({"title": current_title, "text": seg_text}) | |
| current_content = [] | |
| current_title = block | |
| else: | |
| current_content.append(block) | |
| if current_content: | |
| seg_text = ' '.join(current_content) | |
| if len(seg_text.split()) >= min_words: | |
| segments.append({"title": current_title, "text": seg_text}) | |
| return segments | |
| # Fonction pour préparer les chunks | |
| def prepare_chunks_fixed(text: str, file_name: str, chunk_size=350, overlap=50) -> List[Dict]: | |
| """ | |
| Découpe le texte en chunks fixes (~350 mots ≈ 512 tokens) avec overlap, | |
| en conservant le dernier titre détecté comme métadonnée. | |
| """ | |
| words = text.split() | |
| chunks = [] | |
| start = 0 | |
| current_title = "AUTRE" | |
| while start < len(words): | |
| end = start + chunk_size | |
| chunk_words = words[start:end] | |
| # Met à jour le titre courant si un mot ressemble à un titre | |
| for w in chunk_words: | |
| if detect_titles(w): | |
| current_title = w | |
| if len(chunk_words) >= 50: # filtre chunks trop courts | |
| chunk_text = " ".join(chunk_words) | |
| chunks.append({ | |
| "id": str(uuid.uuid4()), | |
| "text": chunk_text, | |
| "metadata": { | |
| "source": file_name, | |
| "section": current_title, | |
| "position": start, | |
| "word_count": len(chunk_words), | |
| "upload_date": datetime.now().isoformat() | |
| } | |
| }) | |
| start += chunk_size - overlap | |
| return chunks | |
| # Fonction pour l'upload et l'indexation avec logs détaillés | |
| def process_and_index_document(file_path: str, file_type: str, collection_name: str, | |
| qdrant_client=None, embedding_model=None, | |
| progress_callback=None): | |
| """Traite et indexe un document dans SA PROPRE COLLECTION avec suivi de progression et logs détaillés.""" | |
| try: | |
| # 1. Extraction du texte | |
| try: | |
| if file_type == "pdf": | |
| raw_text = extract_text(file_path) | |
| else: | |
| raw_text = extract_text_from_docx(file_path) | |
| if not raw_text: | |
| if progress_callback: | |
| progress_callback(0, 100, "Échec : extraction du texte") | |
| return False | |
| if progress_callback: | |
| progress_callback(5, 100, "Extraction du texte terminée") | |
| except Exception as e: | |
| st.error(f"❌ Erreur extraction: {repr(e)}") | |
| return False | |
| # 2. Nettoyage et segmentation | |
| try: | |
| cleaned_text = clean_text(raw_text) | |
| segments = segment_text(cleaned_text) or [{"title": "Document", "text": cleaned_text}] | |
| if progress_callback: | |
| progress_callback(10, 100, "Nettoyage et segmentation terminés") | |
| except Exception as e: | |
| st.error(f"❌ Erreur nettoyage/segmentation: {repr(e)}") | |
| return False | |
| # 3. Pré-traitement titres | |
| try: | |
| preprocessed_text = preprocess_for_titles(cleaned_text) | |
| except Exception as e: | |
| st.error(f"❌ Erreur preprocess_for_titles: {repr(e)}") | |
| return False | |
| # 4. Chunking | |
| try: | |
| chunks = prepare_chunks_fixed( | |
| preprocessed_text, | |
| file_name=collection_name, | |
| chunk_size=350, # ≈ 512 tokens | |
| overlap=50 | |
| ) | |
| except Exception as e: | |
| st.error(f"❌ Erreur chunking: {repr(e)}") | |
| return False | |
| if progress_callback: | |
| progress_callback(30, 100, f"Préparation des chunks terminée ({len(chunks)} chunks)") | |
| # 5. Génération des embeddings (par petits lots) | |
| texts = [chunk["text"] for chunk in chunks] | |
| embeddings = [] | |
| total_chunks = len(chunks) | |
| for i in range(0, total_chunks, 5): | |
| batch_texts = texts[i:i+5] | |
| try: | |
| batch_embeddings = embedding_model.encode(batch_texts).tolist() | |
| embeddings.extend(batch_embeddings) | |
| except Exception as e: | |
| st.error(f"❌ Erreur génération embeddings: {repr(e)}") | |
| if progress_callback: | |
| progress_callback(0, 100, f"Erreur génération embeddings: {str(e)}") | |
| return False | |
| if progress_callback: | |
| current_chunk = min(i + 5, total_chunks) | |
| progress_callback(30 + int(30 * current_chunk / total_chunks), | |
| 100, | |
| f"Génération des embeddings : {current_chunk}/{total_chunks}") | |
| # 6. Indexation dans Qdrant (par petits lots avec réessais) | |
| points = [ | |
| models.PointStruct( | |
| id=chunk["id"], | |
| vector=embedding, | |
| payload={"text": chunk["text"], **chunk["metadata"]} | |
| ) | |
| for chunk, embedding in zip(chunks, embeddings) | |
| ] | |
| batch_size = 10 | |
| max_retries = 2 | |
| retry_delay = 2 | |
| for i in range(0, len(points), batch_size): | |
| batch = points[i:i + batch_size] | |
| success = False | |
| retry_count = 0 | |
| while retry_count < max_retries and not success: | |
| try: | |
| qdrant_client.upsert( | |
| collection_name=collection_name, | |
| points=batch, | |
| wait=True, | |
| ) | |
| success = True | |
| except Exception as e: | |
| retry_count += 1 | |
| st.error(f"⚠️ Erreur upsert batch {i//batch_size+1}: {repr(e)}") | |
| if progress_callback: | |
| progress_callback(0, 100, | |
| f"Échec batch {i//batch_size + 1}, tentative {retry_count}/{max_retries}") | |
| if retry_count < max_retries: | |
| time.sleep(retry_delay) | |
| if not success: | |
| if progress_callback: | |
| progress_callback(0, 100, f"Échec définitif batch {i//batch_size + 1}") | |
| return False | |
| if progress_callback: | |
| current_point = min(i + batch_size, len(points)) | |
| progress_callback(60 + int(40 * current_point / len(points)), | |
| 100, | |
| f"Indexation : {current_point}/{len(points)} chunks") | |
| if progress_callback: | |
| progress_callback(100, 100, "Indexation terminée avec succès") | |
| st.success("🎉 Document indexé avec succès") | |
| return True | |
| except Exception as e: | |
| if progress_callback: | |
| progress_callback(0, 100, f"Erreur : {str(e)}") | |
| st.error(f"Erreur dans process_and_index_document: {e}") | |
| return False | |
| # --- 1b. Fonctions de recherche --- | |
| # Fonction qui supprime les accents | |
| def normalize_query(query: str) -> str: | |
| # Supprime les accents et normalise en ASCII | |
| return ''.join( | |
| c for c in unicodedata.normalize('NFD', query) | |
| if unicodedata.category(c) != 'Mn' | |
| ) | |
| # Fonction qui extrait le sujet principal de la question | |
| def extract_subject(question: str) -> str: | |
| """ | |
| Extrait le sujet principal d'une question parlementaire sous forme de 3 à 5 mots-clés. | |
| - Supprime toute mention de députés, du Gouvernement ou de formulations inutiles. | |
| - Nettoie la ponctuation et tronque à quelques mots. | |
| """ | |
| url = "https://api.mistral.ai/v1/chat/completions" | |
| headers = { | |
| "Authorization": f"Bearer {MISTRAL_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model": "mistral-small-latest", # possibilité de remplacer "small" par "medium" | |
| "messages": [ | |
| { | |
| "role": "system", | |
| "content": ( | |
| "Tu es un assistant qui identifie uniquement le sujet principal " | |
| "d'une question parlementaire. " | |
| "Ne mentionne jamais le député ou le gouvernement. " | |
| "Donne une réponse sous forme de 3 à 5 mots-clés concis, " | |
| "centrés sur le thème (pas de phrase complète)." | |
| ) | |
| }, | |
| { | |
| "role": "user", | |
| "content": question | |
| } | |
| ], | |
| "temperature": 0.2, | |
| "max_tokens": 20 | |
| } | |
| response = requests.post(url, headers=headers, json=payload) | |
| response.raise_for_status() | |
| data = response.json() | |
| try: | |
| subject = data["choices"][0]["message"]["content"].strip() | |
| except (KeyError, IndexError): | |
| subject = "sujet non identifié" | |
| # Nettoyage supplémentaire côté Python | |
| subject = re.sub(r"\b(députée?|député|gouvernement|ministre|assemblée nationale|sénat)\b", "", subject, flags=re.IGNORECASE) | |
| subject = subject.replace("**Sujet principal**", "").strip() | |
| subject = re.sub(r"[^\w\s]", " ", subject) # supprime ponctuation | |
| subject = re.sub(r"\s+", " ", subject) | |
| # Tronquer à 5 mots max | |
| tokens = subject.split() | |
| subject = " ".join(tokens[:5]) | |
| print("=== Sujet extrait (compact) ===", subject) | |
| return subject | |
| # Fonction de recherche Tavily (et filtre pour les scores de pertinence <0.5) | |
| def search_tavily_government(subject: str, min_score: float = 0.5): | |
| """ | |
| Recherche les annonces gouvernementales récentes sur un sujet donné via Tavily. | |
| - Filtre par domaines autorisés | |
| - Filtre par date (moins d'un an si disponible) | |
| - Filtre par score de pertinence (>= min_score) | |
| """ | |
| url = "https://api.tavily.com/search" | |
| headers = {"Authorization": f"Bearer {TAVILY_API_KEY}"} | |
| allowed_domains = [ | |
| "gouvernement.fr", "education.gouv.fr", "vie-publique.fr", "elysee.fr", | |
| "solidarites.gouv.fr", "sante.gouv.fr", "travail-sante-solidarites.gouv.fr", | |
| "securite-sociale.fr", "ameli.fr", "lassuranceretraite.fr", "caf.fr", | |
| "msa.fr", "urssaf.fr", "legifrance.gouv.fr", "drees.solidarites-sante.gouv.fr", | |
| "ars.sante.fr", "cnsa.fr", "en3s.fr", "francetravail.fr" | |
| ] | |
| payload = { | |
| "query": f"dernières annonces gouvernement France {subject}", | |
| "max_results": 100, | |
| "include_answer": True, | |
| "include_domains": allowed_domains | |
| } | |
| response = requests.post(url, json=payload, headers=headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| raw_results = data.get("results", []) | |
| # Filtre par domaine | |
| by_domain = [r for r in raw_results if any(d in r.get("url", "") for d in allowed_domains)] | |
| # Filtre par date (moins d'un an si dispo) | |
| cutoff = datetime.now() - timedelta(days=365) | |
| recent = [] | |
| for r in by_domain: | |
| ds = r.get("published_date") or r.get("date") | |
| if ds: | |
| try: | |
| pub = datetime.fromisoformat(ds.replace("Z", "")) | |
| # ✅ Condition supplémentaire : année 2025 | |
| if pub.year == 2025 or pub >= cutoff: | |
| recent.append(r) | |
| continue | |
| else: | |
| continue | |
| except Exception: | |
| # date non exploitable → on garde | |
| recent.append(r) | |
| else: | |
| recent.append(r) | |
| # Filtre par score | |
| filtered = [r for r in recent if r.get("score", 0) >= min_score] | |
| filtered.sort(key=lambda x: x.get("score", 0), reverse=True) | |
| # ✅ Limiter à 10 après filtrage | |
| data["results"] = filtered[:10] | |
| return data | |
| # Fonction de recherche Google | |
| def search_google_government(subject: str, | |
| min_score: float = 0.5, | |
| max_results: int = 10): | |
| """ | |
| Recherche des informations via Google Custom Search API sur un sujet donné. | |
| - Même structure et format que search_tavily_government | |
| - Entrée: subject (str), min_score, max_results | |
| - Sortie: dict {"results": [{"title","url","content","score","published_date"}]} | |
| """ | |
| url = "https://www.googleapis.com/customsearch/v1" | |
| params = { | |
| "q": f"dernières annonces gouvernement France {subject}", | |
| "key": GOOGLE_API_KEY, | |
| "cx": GOOGLE_CX, | |
| "num": max_results | |
| } | |
| response = requests.get(url, params=params, timeout=20) | |
| response.raise_for_status() | |
| data = response.json() | |
| raw_results = data.get("items", []) | |
| allowed_domains = [ | |
| "gouvernement.fr", "info.gouv.fr", "elysee.fr", "vie-publique.fr", | |
| "education.gouv.fr", "solidarites.gouv.fr", "sante.gouv.fr", | |
| "travail-sante-solidarites.gouv.fr", "securite-sociale.fr", "ameli.fr", | |
| "lassuranceretraite.fr", "caf.fr", "msa.fr", "urssaf.fr", | |
| "legifrance.gouv.fr", "drees.solidarites-sante.gouv.fr", "ars.sante.fr", | |
| "cnsa.fr", "en3s.fr", "francetravail.fr" | |
| ] | |
| # Filtre par domaine | |
| by_domain = [r for r in raw_results if any(d in r.get("link", "") for d in allowed_domains)] | |
| # Filtre par date (moins d'un an si dispo) | |
| cutoff = datetime.now() - timedelta(days=365) | |
| recent = [] | |
| for r in by_domain: | |
| date_str = r.get("pagemap", {}).get("metatags", [{}])[0].get("article:published_time") | |
| if date_str: | |
| try: | |
| pub = datetime.fromisoformat(date_str.replace("Z", "")) | |
| if pub >= cutoff: | |
| recent.append(r) | |
| except Exception: | |
| recent.append(r) | |
| else: | |
| recent.append(r) | |
| # Filtre par score (Google ne fournit pas de score → fallback = 1.0 si snippet présent) | |
| filtered = [r for r in recent if r.get("snippet")] | |
| filtered = filtered[:max_results] | |
| # Format homogène comme Tavily | |
| results = [] | |
| for r in filtered: | |
| results.append({ | |
| "title": r.get("title", ""), | |
| "url": r.get("link", ""), | |
| "content": r.get("snippet", ""), # Tavily utilisait "content" | |
| "score": 1.0, # Valeur par défaut | |
| "published_date": date_str if r.get("pagemap", {}).get("metatags") else None | |
| }) | |
| return {"results": results} | |
| # --- Fonction de log pour le debug --- | |
| def log_debug(title: str, data: Any, max_length: int = 500): | |
| """Affiche un log de debug dans un fichier.""" | |
| import os | |
| from datetime import datetime | |
| # Chemin absolu vers le dossier de logs (dans le répertoire courant) | |
| log_dir = os.path.join(os.getcwd(), "logs") | |
| # Créer le dossier s'il n'existe pas | |
| if not os.path.exists(log_dir): | |
| os.makedirs(log_dir) | |
| print(f"📁 Dossier créé : {log_dir}") | |
| # Chemin absolu vers le fichier de log | |
| log_file = os.path.join(log_dir, "debug_logs.txt") | |
| try: | |
| with open(log_file, "a", encoding="utf-8") as f: | |
| f.write(f"\n--- {title} ---\n") | |
| f.write(f"Timestamp: {datetime.now().isoformat()}\n") | |
| if isinstance(data, dict): | |
| for k, v in data.items(): | |
| f.write(f"{k}: {str(v)[:max_length]}\n") | |
| elif isinstance(data, list): | |
| f.write(f"List of {len(data)} items:\n") | |
| for i, item in enumerate(data[:3]): | |
| f.write(f" Item {i}: {str(item)[:max_length]}\n") | |
| else: | |
| f.write(f"{str(data)[:max_length]}\n") | |
| f.write("---\n") | |
| # Confirmation dans le terminal | |
| print(f"✅ Log enregistré dans {log_file}: {title}") | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'écriture du log : {str(e)}") | |
| # Fonctions utilitaires car le champ base_legislative peut contenir soit des objets BaseLegislativeRef, soit des dicts (selon la provenance des données) | |
| def get_ref_uid(ref) -> str | None: | |
| return ref.uid if hasattr(ref, "uid") else ref.get("uid") | |
| def get_ref_collection(ref, default_collection: str) -> str: | |
| return ref.collection if hasattr(ref, "collection") else ref.get("collection", default_collection) | |
| # Préparer pour préparer le contenu en vue d'un export .txt | |
| def build_export_content(response_data: dict, mode: str, include_legal_articles: bool = False) -> str: | |
| lines = [] | |
| # Question | |
| question = response_data.get("question") | |
| if question: | |
| lines.append("❓ Question\n") | |
| lines.append(str(question)) | |
| lines.append("\n\n") | |
| # En-tête (Réponse ou Analyse) | |
| if mode == "analyse": | |
| lines.append("🔎 Analyse juridique\n") | |
| else: | |
| lines.append("📜 Réponse\n") | |
| lines.append(str(response_data.get("response", "Pas de texte généré"))) | |
| lines.append("\n\n") | |
| # Résumé (si présent) | |
| summary = response_data.get("summary") | |
| if summary: | |
| lines.append("📝 Résumé\n") | |
| lines.append(str(summary)) | |
| lines.append("\n\n") | |
| # Résultats de recherche (si présents) | |
| search_results = response_data.get("search_results", []) | |
| if search_results: | |
| lines.append("🌐 Résultats de recherche\n") | |
| for idx, item in enumerate(search_results, start=1): | |
| titre = item.get("title", "Sans titre") | |
| url = item.get("url", "") | |
| extrait = item.get("content") or item.get("snippet") or "" | |
| score = item.get("score", "N/A") | |
| date = item.get("published_date", "N/A") | |
| lines.append(f"{idx}. {titre}\n") | |
| if url: | |
| lines.append(f" Lien: {url}\n") | |
| if extrait: | |
| lines.append(f" Extrait: {extrait}\n") | |
| lines.append(f" Score: {score} | Date: {date}\n\n") | |
| # Anciennes QE (si présentes) | |
| lines.append("🏛️ Anciennes QE\n") | |
| for doc in response_data.get("similar_documents", []): | |
| score = f"{getattr(doc, 'score', 0):.2f}" if getattr(doc, "score", None) is not None else "N/A" | |
| chambre = getattr(doc, "chambre", "Inconnue") | |
| lines.append(f"- QE {doc.uid} ({chambre}) - Score : {score}\n") | |
| lines.append(f" Question: {doc.question}\n") | |
| lines.append(f" Réponse: {doc.reponse}\n\n") | |
| # Articles juridiques (pour les deux modes) | |
| if mode == "analyse" or include_legal_articles: | |
| lines.append("⚖️ Articles juridiques\n") | |
| # Mode "Analyse juridique" : utilise "sources" (avec relations parents/enfants) | |
| if mode == "analyse": | |
| sources = response_data.get("sources", []) | |
| if not sources: | |
| lines.append("Aucun article juridique enregistré.\n") | |
| else: | |
| for source in sources: | |
| art = source["article"] | |
| lines.append(f"--- Article {art.num} ({art.collection}) ---\n") | |
| lines.append(f"Titre: {art.titre}\n") | |
| lines.append(f"Texte: {art.article_complet}\n") | |
| # Relations parents (articles cités) | |
| if source.get("parents"): | |
| lines.append("Articles cités:\n") | |
| for parent in source["parents"]: | |
| lines.append(f"- {parent.num}: {parent.titre}\n") | |
| # Relations enfants (référencé par) | |
| if source.get("enfants"): | |
| lines.append("Référencé par:\n") | |
| for enfant in source["enfants"]: | |
| lines.append(f"- {enfant.num}: {enfant.titre}\n") | |
| lines.append("\n") | |
| # Mode "Réponse parlementaire" : utilise "legal_sources" (sans relations) | |
| else: | |
| legal_sources = response_data.get("legal_sources", []) | |
| if not legal_sources: | |
| lines.append("Aucun texte juridique cité.\n") | |
| else: | |
| for art in legal_sources: | |
| lines.append(f"- Article {getattr(art, 'num', 'N/A')} ({getattr(art, 'collection', 'N/A')})\n") | |
| if getattr(art, "titre", None): | |
| lines.append(f" Titre: {art.titre}\n") | |
| contenu = art.article_complet if hasattr(art, 'article_complet') else getattr(art, 'contenu', '') | |
| lines.append(" Texte:\n" + str(contenu) + "\n\n") | |
| return "\n".join(str(x) for x in lines) | |
| # Fonction qui construit un objet RetrievedLegalDocument à partir d'un payload Qdrant | |
| def make_retrieved_document(payload: dict, uid: str, collection: str, score: float = 0) -> RetrievedLegalDocument: | |
| """Construit un objet RetrievedLegalDocument à partir d'un payload Qdrant.""" | |
| return RetrievedLegalDocument( | |
| chunk_id=uid, | |
| num=payload.get("num", ""), | |
| titre=payload.get("titre", ""), | |
| contenu=payload.get("contenu", ""), | |
| article_complet=payload.get("article_complet", payload.get("contenu", "")), | |
| contexte_hierarchique=payload.get("contexte_hierarchique", ""), | |
| collection=collection, | |
| partie=payload.get("partie"), | |
| livre=payload.get("livre"), | |
| titre_structure=payload.get("titre_structure"), | |
| chapitre=payload.get("chapitre"), | |
| section=payload.get("section"), | |
| sous_section=payload.get("sous_section"), | |
| paragraphe=payload.get("paragraphe"), | |
| sous_paragraphe=payload.get("sous_paragraphe"), | |
| base_legislative=payload.get("base_legislative", []), | |
| score=score | |
| ) | |
| # Fonction qui recherche des articles juridiques dans les collections Qdrant en utilisant des embeddings | |
| def search_articles( | |
| query: str, | |
| partie: Optional[str] = None, | |
| limit: int = 5, | |
| must_contain: Optional[str] = None, | |
| debug: bool = False, | |
| threshold: float = 0.0 | |
| ) -> Dict[str, Any]: | |
| """Recherche optimisée d'articles juridiques dans Qdrant.""" | |
| target_collections = ["CASF", "Code du travail", "Code de la santé publique", "Code de la sécurité sociale"] | |
| collections = qdrant_client.get_collections() | |
| valid_collections = [c.name for c in collections.collections if c.name in target_collections] | |
| if not valid_collections: | |
| return {"sources": [], "total": 0, "limit": limit, "offset": 0} | |
| try: | |
| # Recherche vectorielle | |
| embedding = embedding_model.encode(query).tolist() | |
| query_filter = models.Filter( | |
| must=[models.FieldCondition(key="partie", match=models.MatchValue(value=partie))] | |
| ) if partie else None | |
| all_results = [] | |
| for collection in valid_collections: | |
| try: | |
| hits = qdrant_client.search( | |
| collection_name=collection, | |
| query_vector=embedding, | |
| query_filter=query_filter, | |
| limit=limit, | |
| with_payload=True, | |
| with_vectors=False | |
| ) | |
| all_results.extend(hits) | |
| except Exception as e: | |
| continue | |
| # Filtrage par score | |
| results = [r for r in all_results if r.score >= threshold] | |
| # Filtrage par mot-clé | |
| if must_contain: | |
| results = [ | |
| r for r in results | |
| if must_contain.lower() in r.payload.get("contenu", "").lower() | |
| ] | |
| if not results: | |
| return {"sources": [], "total": 0, "limit": limit, "offset": 0} | |
| # Mapping vers RetrievedLegalDocument | |
| documents: List[RetrievedLegalDocument] = [] | |
| for result in results: | |
| try: | |
| # Utilise payload["chunk_id"] (str) au lieu de result.id (int/UUID) | |
| chunk_id = result.payload.get("chunk_id") # Ex: "L111-1_chunk0" | |
| if not chunk_id: | |
| chunk_id = result.payload.get("uid", str(result.id)) # Fallback pour les collections sans chunk_id | |
| article = make_retrieved_document( | |
| payload=result.payload, | |
| uid=chunk_id, # ← Ici, chunk_id est TOUJOURS une chaîne | |
| collection=result.payload.get("collection", ""), | |
| score=result.score | |
| ) | |
| documents.append(article) | |
| except Exception as e: | |
| continue | |
| # Déduplication par numéro | |
| seen = set() | |
| unique_documents = [] | |
| for doc in documents: | |
| if doc.num not in seen: | |
| seen.add(doc.num) | |
| unique_documents.append(doc) | |
| # Tri par numéro | |
| unique_documents.sort(key=lambda d: d.num) | |
| return { | |
| "sources": unique_documents[:limit], | |
| "total": len(unique_documents), | |
| "limit": limit, | |
| "offset": 0 | |
| } | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return {"sources": [], "total": 0, "limit": limit, "offset": 0} | |
| # Fonction pour construire un arbre législatif | |
| def build_legislative_tree(sources: List[RetrievedLegalDocument]) -> Dict[str, Any]: | |
| """ | |
| Version corrigée qui: | |
| 1. Évite de modifier le dictionnaire pendant l'itération | |
| 2. Utilise une copie des clés pour l'itération | |
| 3. Conserve tous les logs et la structure uniforme | |
| """ | |
| # 1. Ajouter tous les articles sources (déjà des objets) | |
| enrichis = {} | |
| for art in sources: | |
| enrichis[art.num] = { | |
| "article": art, | |
| "parents": [], | |
| "enfants": [], | |
| "type": art.num[0] if art.num else "?" | |
| } | |
| # 2. Pour chaque article source, récupérer les articles de même groupe hiérarchique | |
| for art in sources: | |
| # Priorité des niveaux hiérarchiques (du plus précis au plus général) | |
| hierarchy_levels = [ | |
| ("sous_paragraphe", art.sous_paragraphe), | |
| ("paragraphe", art.paragraphe), | |
| ("sous_section", art.sous_section), | |
| ("section", art.section), | |
| ("chapitre", art.chapitre) | |
| ] | |
| for level_key, level_value in hierarchy_levels: | |
| if not level_value: | |
| continue | |
| try: | |
| flt = models.Filter( | |
| must=[ | |
| models.FieldCondition( | |
| key=level_key, | |
| match=models.MatchText(text=level_value) | |
| ), | |
| models.FieldCondition( | |
| key="collection", | |
| match=models.MatchValue(value=art.collection) | |
| ) | |
| ] | |
| ) | |
| points, _ = qdrant_client.scroll( | |
| collection_name=art.collection, | |
| scroll_filter=flt, | |
| limit=100, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| for point in points: | |
| payload = point.payload | |
| art_num = payload.get("num") | |
| if not art_num or art_num in enrichis: | |
| continue | |
| new_art = make_retrieved_document( | |
| payload=payload, | |
| uid=payload.get("chunk_id", str(point.id)), | |
| collection=payload.get("collection", art.collection), | |
| score=0 | |
| ) | |
| enrichis[art_num] = { | |
| "article": new_art, | |
| "parents": [], | |
| "enfants": [], | |
| "type": art_num[0] if art_num else "?" | |
| } | |
| break | |
| except Exception as e: | |
| continue | |
| # 3. Gestion des références (base_legislative) pour TOUS les articles | |
| # ✅ Solution: Itérer sur une COPIE des clés pour éviter de modifier le dict pendant l'itération | |
| for art_uid in list(enrichis.keys()): # ← COPIE des clés ! | |
| art_data = enrichis[art_uid] | |
| art = art_data["article"] | |
| for ref in art.base_legislative or []: | |
| ref_uid = ref.uid if hasattr(ref, 'uid') else ref.get('uid') | |
| if not ref_uid or ref_uid in enrichis: | |
| continue | |
| try: | |
| ref_collection = ref.collection if hasattr(ref, 'collection') else art.collection | |
| ref_points, _ = qdrant_client.scroll( | |
| collection_name=ref_collection, | |
| scroll_filter=models.Filter( | |
| must=[models.FieldCondition( | |
| key="num", | |
| match=models.MatchValue(value=ref_uid) | |
| )] | |
| ), | |
| limit=1, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| if ref_points: | |
| ref_payload = ref_points[0].payload | |
| ref_art = make_retrieved_document( | |
| payload=ref_payload, | |
| uid=ref_payload.get("chunk_id", str(ref_points[0].id)), | |
| collection=ref_payload.get("collection", ref_collection), | |
| score=0 | |
| ) | |
| enrichis[ref_uid] = { # ✅ Ajout d'un nouvel élément au dict | |
| "article": ref_art, | |
| "parents": [], | |
| "enfants": [], | |
| "type": ref_uid[0] if ref_uid else "?" | |
| } | |
| # Ajout des relations parent/enfant | |
| art_data["parents"].append(enrichis[ref_uid]["article"]) | |
| enrichis[ref_uid]["enfants"].append(art_data["article"]) | |
| except Exception as e: | |
| continue | |
| return enrichis | |
| # Fonction de tri des numéros | |
| def sort_key_num(article_data: Dict[str, Any]) -> tuple: | |
| """Clé de tri pour les numéros d'article (ex: L241-3).""" | |
| num = article_data["article"].num # Accès direct à l'attribut Pydantic | |
| m = re.match(r"[LRD](\d+)(?:-(\d+))?", num) | |
| if m: | |
| base = int(m.group(1)) | |
| suffix = int(m.group(2)) if m.group(2) else 0 | |
| return (base, suffix) | |
| return (float("inf"), float("inf")) | |
| # Fonction qui ajoute un article dans l'arbre partagé selon les niveaux hiérarchiques | |
| def add_to_tree(tree: dict, article: RetrievedLegalDocument, item: dict): | |
| """ | |
| Ajoute un article dans l'arbre hiérarchique avec ses relations. | |
| - article: Objet RetrievedLegalDocument (pour la hiérarchie) | |
| - item: Dictionnaire COMPLET avec parents/enfants (pour les relations) | |
| """ | |
| # Construire la hiérarchie depuis les champs de l'article | |
| levels = [ | |
| article.collection, | |
| article.partie, | |
| article.livre, | |
| article.titre_structure, | |
| article.chapitre, | |
| article.section, | |
| article.sous_section, | |
| article.paragraphe, | |
| article.sous_paragraphe, | |
| ] | |
| # Filtrer les niveaux vides | |
| levels = [lvl for lvl in levels if lvl] | |
| # Naviguer dans l'arbre | |
| node = tree | |
| for label in levels: | |
| node = node.setdefault(label, {}) | |
| # Ajouter l'article AVEC SES RELATIONS | |
| node.setdefault("_items", []).append(item) # ✅ item contient parents/enfants | |
| # Fonction qui affiche récursivement l'arbre sous forme d'expanders | |
| def render_tree(container, node: dict, level: int = 0): | |
| """ | |
| Version simplifiée qui affiche : | |
| 1. Tous les articles dans une arborescence unique | |
| 2. Pour chaque article : uniquement "Référencé par" et "Articles cités" | |
| """ | |
| # Styles CSS | |
| container.markdown(""" | |
| <style> | |
| .article-title { font-weight: bold; margin-bottom: 5px; } | |
| .article-body { font-size: 14px; line-height: 1.4; margin-bottom: 10px; } | |
| .relation-section { margin-top: 10px; margin-bottom: 10px; } | |
| .relation-title { font-weight: bold; color: #333; } | |
| .relation-item { margin-left: 15px; color: #555; } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # Affichage des articles avec tri numérique | |
| for data in sorted(node.get("_items", []), | |
| key=lambda x: [ | |
| int(part) if part.isdigit() else part | |
| for part in x["article"].num.split('-') | |
| ]): | |
| art = data["article"] | |
| with container.expander(f"📜 {art.num} - {art.titre}"): | |
| # Contenu de l'article | |
| container.markdown(f'<div class="article-body">{art.article_complet}</div>', unsafe_allow_html=True) | |
| # Section "Référencé par" (anciennement "Enfants") | |
| if data.get("enfants"): | |
| container.markdown('<div class="relation-section">' | |
| '<div class="relation-title">👶 Référencé par :</div>', unsafe_allow_html=True) | |
| for enfant in data["enfants"]: | |
| container.markdown(f'<div class="relation-item">- {enfant.num} : {enfant.titre}</div>', | |
| unsafe_allow_html=True) | |
| # Section "Articles cités" (anciennement "Parents") | |
| if data.get("parents"): | |
| container.markdown('<div class="relation-section">' | |
| '<div class="relation-title">📚 Articles cités :</div>', unsafe_allow_html=True) | |
| for parent in data["parents"]: | |
| container.markdown(f'<div class="relation-item">- {parent.num} : {parent.titre}</div>', | |
| unsafe_allow_html=True) | |
| # Navigation hiérarchique (uniquement pour l'organisation visuelle) | |
| for label, child in sorted(((k, v) for k, v in node.items() if k != "_items"), key=lambda x: x[0]): | |
| with container.expander(f"📁 {label}"): | |
| render_tree(container, child, level + 1) | |
| # Fonction de recherches de documents dans tout le RAG (hors codes et jeu de données QE) pour alimenter l'API | |
| def search_uploaded_documents( | |
| query: str, | |
| qdrant_client: Any, | |
| embedding_model: Any, | |
| selected_collections: List[str] = None, | |
| top_k: int = 5, | |
| top_k_selected = 10, | |
| ) -> List[Dict]: | |
| """ | |
| Recherche dans les collections-documents (1 collection = 1 document). | |
| Args: | |
| query: Requête de recherche. | |
| qdrant_client: Client Qdrant. | |
| embedding_model: Modèle d'embedding. | |
| selected_collections: Liste des collections à rechercher (optionnel). | |
| top_k: Nombre de résultats max si aucune collection sélectionnée. | |
| top_k_selected: Nombre de résultats max si collections sélectionnées. | |
| Returns: | |
| Liste de dicts avec les champs essentiels. | |
| """ | |
| query_embedding = embedding_model.encode(query).tolist() | |
| all_results = [] | |
| try: | |
| # 1. Récupère les collections à rechercher | |
| protected = { | |
| "QuestionParlementaire", | |
| "Code de la sécurité sociale", | |
| "Code du travail", | |
| "CASF", | |
| "Code de la santé publique", | |
| } | |
| collections = qdrant_client.get_collections() | |
| doc_collections = [col.name for col in collections.collections if col.name not in protected] | |
| # 2. Limite aux collections sélectionnées si spécifiées | |
| if selected_collections: | |
| doc_collections = [col for col in doc_collections if col in selected_collections] | |
| # le nombre de chunks retournés passe à 10 si la recherche est limité sur une ou plusieurs collections | |
| top_k = top_k_selected | |
| if not doc_collections: | |
| doc_collections = [col for col in doc_collections if col not in protected] | |
| # 3. Recherche dans chaque collection-document | |
| for collection in doc_collections: | |
| try: | |
| results = qdrant_client.search( | |
| collection_name=collection, | |
| query_vector=query_embedding, | |
| limit=top_k, | |
| with_payload=True, | |
| with_vectors=False, | |
| ) | |
| for result in results: | |
| payload = result.payload or {} | |
| all_results.append({ | |
| "collection": collection, | |
| "text": payload.get("text", ""), | |
| "score": float(result.score) if hasattr(result, "score") else None, | |
| "title": payload.get("section"), | |
| }) | |
| except Exception as e: | |
| st.warning(f"Erreur sur {collection}: {e}") | |
| # 4. Trie par score et limite les résultats | |
| all_results.sort(key=lambda x: (x["score"] is not None, x["score"]), reverse=True) | |
| return all_results[:top_k] | |
| except Exception as e: | |
| st.error(f"Erreur de recherche dans les documents uploadés: {e}") | |
| return [] | |
| # Fonction qui ajuste la taille du contexte issu des "autres collections" à la pertinence (score) du retrieval | |
| def format_uploaded_docs_by_relevance( | |
| uploaded_results: List[Dict], | |
| min_score: float = 0.7, | |
| max_docs: int = 5, | |
| max_length: int = 400 | |
| ) -> str: | |
| """ | |
| Trie les résultats par score et formate les extraits textuels pour enrichir le prompt. | |
| - uploaded_results : liste de dictionnaires renvoyés par la recherche vectorielle | |
| - min_score : seuil de pertinence minimum | |
| - max_docs : nombre maximum de documents à inclure | |
| - max_length : longueur maximale de l'extrait | |
| """ | |
| if not uploaded_results: | |
| return "" | |
| # Filtrer par score | |
| filtered = [doc for doc in uploaded_results if doc.get("score", 0) >= min_score] | |
| # Trier par score décroissant | |
| filtered.sort(key=lambda d: d.get("score", 0), reverse=True) | |
| # Limiter le nombre de docs | |
| filtered = filtered[:max_docs] | |
| formatted = [] | |
| for doc in filtered: | |
| passage = doc.get("text", "") | |
| source = doc.get("collection", "inconnu").replace("_", " ") | |
| title = doc.get("title", "") or "" | |
| score = doc.get("score", 0) | |
| formatted.append( | |
| f"Source: {source} (Section: {title})\n" | |
| f"Passage: {passage[:max_length]}...\n" | |
| f"(Score: {score:.2f})\n" | |
| ) | |
| return "\n---\n".join(formatted) | |
| # Fonction de recherches d'anciennes questions / réponses dans le RAG Qdrant | |
| def search_question_parlementaire(query: str, top_k: int = 5) -> List[ResponseDocument]: | |
| embedding = embedding_model.encode(query).tolist() | |
| hits = qdrant_client.search( | |
| collection_name="QuestionParlementaire", | |
| query_vector=embedding, | |
| limit=top_k, | |
| with_payload=True, | |
| with_vectors=False | |
| ) | |
| results: List[ResponseDocument] = [] | |
| for i, r in enumerate(hits): | |
| p = r.payload or {} | |
| try: | |
| # Normalisation sécurisée des champs | |
| uid = str(p.get("uid", "")) if p.get("uid") is not None else "" | |
| legislature = str(p.get("legislature")) if p.get("legislature") is not None else None | |
| ministeres = p.get("ministeres_attribues") | |
| if isinstance(ministeres, str): | |
| ministeres = [ministeres] | |
| elif not isinstance(ministeres, list): | |
| ministeres = [] | |
| textes_juridiques = p.get("textes_juridiques") | |
| if isinstance(textes_juridiques, str): | |
| textes_juridiques = [textes_juridiques] | |
| elif not isinstance(textes_juridiques, list): | |
| textes_juridiques = [] | |
| # Création du document | |
| doc = ResponseDocument( | |
| uid=uid, | |
| question=p.get("question", ""), | |
| reponse=p.get("reponse", ""), | |
| legislature=legislature, | |
| chambre=p.get("chambre"), | |
| rubrique=p.get("rubrique"), | |
| analyse=p.get("analyse"), | |
| ministeres_attribues=ministeres, | |
| date_question=p.get("date_question"), | |
| date_reponse=p.get("date_reponse"), | |
| textes_juridiques=textes_juridiques, | |
| score=r.score | |
| ) | |
| results.append(doc) | |
| except Exception as e: | |
| continue # Passe au résultat suivant | |
| return results | |
| # Fonction qui extrait un ordre numérique à partir d'un label en chiffres romains | |
| ROMAN_MAP = { | |
| "I":1,"II":2,"III":3,"IV":4,"V":5,"VI":6,"VII":7,"VIII":8,"IX":9,"X":10, | |
| "XI":11,"XII":12,"XIII":13,"XIV":14,"XV":15,"XVI":16,"XVII":17,"XVIII":18,"XIX":19,"XX":20 | |
| } | |
| def extract_order(label: str) -> int: | |
| if not label: | |
| return float("inf") | |
| m_roman = re.search(r"\b(Ier|[IVXLCDM]+)\b", label) | |
| if m_roman: | |
| return 1 if m_roman.group(1) == "Ier" else ROMAN_MAP.get(m_roman.group(1), float("inf")) | |
| m_num = re.search(r"\b(\d+)\b", label) | |
| if m_num: | |
| return int(m_num.group(1)) | |
| return float("inf") | |
| # Construit un prompt pour Mistral Large afin de générer une analyse juridique. | |
| def build_legal_analysis_prompt(question: str, articles: List[RetrievedLegalDocument], stats: dict) -> str: | |
| # Construit un prompt pour Mistral Large afin de générer une analyse juridique. | |
| # Préparation des articles sous forme de contexte avec troncature brute | |
| max_chars = 70000 # limite totale pour les articles (≈ 3500-4000 tokens) | |
| articles_context = [] | |
| total_chars = 0 | |
| for article in articles: | |
| uid = article.num | |
| titre = article.titre | |
| contenu = article.article_complet # ou article.contenu selon votre besoin | |
| if total_chars + len(contenu) > max_chars: | |
| # Tronquer le dernier article pour ne pas dépasser la limite | |
| allowed = max_chars - total_chars | |
| truncated = contenu[:allowed] | |
| articles_context.append(f"### Article {uid}: {titre}\n{truncated}\n[Texte tronqué]\n") | |
| break | |
| else: | |
| articles_context.append(f"### Article {uid}: {titre}\n{contenu}\n") | |
| total_chars += len(contenu) | |
| articles_str = "\n".join(articles_context) | |
| # Consignes pour Mistral | |
| prompt = f""" | |
| [INST] | |
| **Consignes strictes pour une analyse juridique complète :** | |
| 1. **Structure obligatoire** à respecter impérativement : | |
| - Introduction (50-100 mots) : rappel du contexte juridique de la question | |
| - Analyse détaillée (80-90% du contenu) : | |
| * L'analyse doit être faite exclusivement à partir des articles suivants : {articles_str} | |
| * Présentation des principes généraux avec citations précises d'un maximum d'articles | |
| * Présentation des enjeux | |
| - Conclusion synthétique (100-150 mots) | |
| - Toute réponse qui se termine par une phrase tronquée sera considérée comme irrecevable | |
| 2. **Exigences de complétude** : | |
| - Toute phrase doit être grammaticalement complète | |
| - La dernière phrase doit impérativement résumer un point clé | |
| - Si le développement dépasse la limite, prioriser : | |
| 1. Les principes fondamentaux | |
| 2. Les exceptions majeures | |
| 3. Un exemple concret d'application | |
| 3. **Style requis** : | |
| - Style très concis | |
| - Phrases courtes (20-25 mots max) | |
| - Un paragraphe = une idée juridique précise | |
| - Citations systématiques des articles (ex: "L'article R241-12 précise que...") | |
| - Éviter les formules vagues ("certains cas", "parfois") → préciser les conditions | |
| **Question à analyser** : | |
| {question} | |
| [/INST] | |
| """ | |
| return prompt | |
| # Appelle l'API Mistral Large pour générer une analyse juridique ##### 4000 TOKENS DEFINIS ICI ###### | |
| def call_mistral_legal_analysis( | |
| prompt: str, | |
| max_tokens: int = 4000, | |
| temperature: float = 0.3, | |
| model_size: str = "small" # "large", "medium", "small" | |
| ): | |
| # Appelle l'API Mistral avec chainage automatique pour les réponses tronquées. | |
| mistral_api_url = "https://api.mistral.ai/v1/chat/completions" | |
| headers = { | |
| "Authorization": f"Bearer {MISTRAL_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| # Construire le nom du modèle dynamiquement | |
| model_name = f"mistral-{model_size}-latest" | |
| def is_complete(response_text): | |
| # Vérifie si une réponse est grammaticalement complète | |
| if not response_text: | |
| return False | |
| last_char = response_text[-1] | |
| last_sentence = response_text.split('.')[-1].strip() | |
| return ( | |
| (last_char in ('.', '!', '?')) and | |
| (len(last_sentence.split()) > 3) and | |
| (not last_sentence.endswith((':', ';', ','))) and | |
| (not response_text.endswith(('...', '–', '—'))) | |
| ) | |
| def complete_response(truncated_response): | |
| # Termine une réponse tronquée | |
| completion_prompt = f""" | |
| [INST] | |
| Terminez cette analyse juridique de manière complète et professionnelle. | |
| Analyse en cours: "{truncated_response[-500:]}" # Derniers 500 caractères | |
| Consignes strictes: | |
| 1. Résumez en 1 phrase le point juridique en cours | |
| 2. Ajoutez 2-3 phrases de conclusion qui: | |
| - Synthétisent les points clés | |
| - Proposent une application pratique | |
| - Se terminent impérativement par un point | |
| 3. Utilisez un style formel: "En conséquence...", "Ainsi, il ressort que...", etc. | |
| [/INST] | |
| """ | |
| try: | |
| completion_response = requests.post( | |
| mistral_api_url, | |
| headers=headers, | |
| json={ | |
| "model": model_name, # <-- correction ici | |
| "messages": [{"role": "user", "content": completion_prompt}], | |
| "max_tokens": 500, | |
| "temperature": 0.2, | |
| "stop": ["."] | |
| }, | |
| timeout=30 | |
| ) | |
| completion_response.raise_for_status() | |
| completion = completion_response.json()["choices"][0]["message"]["content"].strip() | |
| # if is_complete(completion): | |
| return truncated_response + "\n\n" + completion | |
| # else: | |
| # return truncated_response + "\n\nCette analyse couvre succinctement les principaux aspects juridiques de la question posée." | |
| except Exception as e: | |
| return truncated_response + f"\n\n[Note: La conclusion de cette analyse a été synthétisée. Erreur technique: {str(e)}]" | |
| try: | |
| response = requests.post( | |
| mistral_api_url, | |
| headers=headers, | |
| json={ | |
| "model": model_name, # <-- correction ici aussi | |
| "messages": [{"role": "user", "content": prompt}], | |
| "temperature": temperature, | |
| "max_tokens": max_tokens, | |
| "top_p": 0.9, | |
| "frequency_penalty": 0.1, | |
| "presence_penalty": 0.1 | |
| }, | |
| timeout=120 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| analysis = data["choices"][0]["message"]["content"].strip() | |
| if not is_complete(analysis): | |
| analysis = complete_response(analysis) | |
| return analysis | |
| except requests.exceptions.RequestException as e: | |
| return f"Erreur lors de l'appel à l'API Mistral: {str(e)}" | |
| # Fonction de génération d'analyse juridique | |
| def generate_legal_analysis( | |
| question: str, | |
| must_contain: str = "", | |
| max_articles: int = 5, | |
| threshold: float = 0.5, | |
| model_size: str = "small", | |
| search_button: bool = False, | |
| generate_analysis_button: bool = False | |
| ) -> dict: | |
| try: | |
| import sys | |
| from io import StringIO | |
| old_stdout = sys.stdout | |
| sys.stdout = captured_output = StringIO() | |
| # Recherche des articles (avec cache) | |
| if "last_articles_search" not in st.session_state or search_button: | |
| st.session_state.last_articles_search = search_articles( | |
| query=question, | |
| limit=max_articles, | |
| must_contain=must_contain if must_contain else None, | |
| debug=True, | |
| threshold=threshold | |
| ) | |
| articles = st.session_state.last_articles_search | |
| # Préparation des sources enrichies | |
| enrichis = build_legislative_tree(articles["sources"]) | |
| stats = enrichis.pop('stats', {}) | |
| # Formatage des sources pour l'affichage et l'analyse | |
| sources = [] | |
| for uid, data in enrichis.items(): | |
| if isinstance(data, dict) and "article" in data: | |
| sources.append({ | |
| "uid": uid, | |
| "article": data["article"], | |
| "type": data.get("type"), | |
| "parents": data.get("parents", []), | |
| "enfants": data.get("enfants", []) | |
| }) | |
| # Fallback si aucun enrichissement | |
| if not sources: | |
| sources = [{ | |
| "uid": art.num, | |
| "article": art, | |
| "type": art.num[0] if art.num else "?", | |
| "parents": [], | |
| "enfants": [] | |
| } for art in articles["sources"]] | |
| # Gestion des cas d'erreur | |
| if not sources or len(sources) == 0: | |
| st.warning("Aucun article juridique valide après traitement.") | |
| return { | |
| "response": "Aucun article juridique valide après traitement.", | |
| "sources": [], | |
| "similar_documents": [], | |
| "debug_logs": captured_output.getvalue(), | |
| "stats": stats | |
| } | |
| # Mode "Recherche" (affichage des articles) | |
| if search_button: | |
| return { | |
| "response": "Voir les articles dans l'onglet sources.", | |
| "sources": sources, | |
| "similar_documents": [], | |
| "debug_logs": captured_output.getvalue(), | |
| "stats": stats | |
| } | |
| # Mode "Génération" (appel à Mistral) | |
| elif generate_analysis_button: | |
| prep_placeholder = st.empty() | |
| prep_placeholder.markdown( | |
| ''' | |
| <div class="prep-message"> | |
| 🔧 Production de l'analyse en cours<span id="dots">...</span> | |
| </div> | |
| <script> | |
| const dots = document.getElementById('dots'); | |
| let dotCount = 0; | |
| const interval = setInterval(() => { | |
| dotCount = (dotCount + 1) % 4; | |
| dots.textContent = '.'.repeat(dotCount); | |
| }, 500); | |
| </script> | |
| ''', | |
| unsafe_allow_html=True | |
| ) | |
| # Appel à Mistral avec les articles originaux (non enrichis) | |
| prompt = build_legal_analysis_prompt(question, articles["sources"], stats) | |
| legal_analysis = call_mistral_legal_analysis( | |
| prompt, | |
| max_tokens=4000, | |
| temperature=0.3, | |
| model_size=model_size | |
| ) | |
| # Efface le message de chargement | |
| prep_placeholder.empty() | |
| return { | |
| "response": legal_analysis, | |
| "sources": sources, # Sources enrichies pour l'affichage | |
| "similar_documents": [], | |
| "debug_logs": captured_output.getvalue(), | |
| "stats": stats | |
| } | |
| finally: | |
| sys.stdout = old_stdout | |
| # Construit le prompt d'appel à Mistral | |
| def build_parlementary_response_prompt( | |
| question: str, | |
| parliamentary_context: str, | |
| legal_context: str, | |
| uploaded_documents: str, | |
| detail_juridique: int, | |
| longueur: str, | |
| response_orientation: str, | |
| custom_instructions: str, | |
| search_context: str | |
| ) -> str: | |
| # Construit un prompt optimisé avec contexte parlementaire ET juridique. | |
| # Mapping des orientations de réponse | |
| orientation_mapping = { | |
| "Répondre de façon neutre": | |
| "Adoptez un ton neutre et factuel. " | |
| "Commencez par **souligner l'importance du sujet** pour le Gouvernement, sans reprendre les termes critiques du parlementaire. " | |
| "Utilisez des formulations comme : " | |
| "'Ce sujet est une priorité pour le Gouvernement, comme en témoignent [mesures existantes]', " | |
| "'Le Gouvernement est pleinement conscient des enjeux liés à [thème]', " | |
| "'Cette question, essentielle pour [public concerné], fait l'objet d'une attention constante de la part des services de l'État'. " | |
| "Évitez absolument les formulations du type : 'comme vous le soulignez à juste titre', 'vous avez raison de pointer', ou 'la situation est effectivement préoccupante'. " | |
| "Privilégiez les faits, les chiffres, et les actions en cours.", | |
| "Répondre négativement aux propositions du parlementaire": | |
| "Répondez de manière **polie mais ferme**, en **recentrant le débat sur les actions du Gouvernement** plutôt que sur les critiques. " | |
| "Structurez votre réponse ainsi : " | |
| "1. **Reconnaissez l'importance du sujet** (sans valider les critiques) : " | |
| "'La question que vous soulevez touche à un enjeu majeur pour [public concerné], auquel le Gouvernement apporte une réponse structurée.' " | |
| "2. **Rappelez le cadre existant** : " | |
| "'Conformément à [texte juridique ou politique publique], les actions menées visent à [objectif].' " | |
| "3. **Expliquez les contraintes** (si nécessaire) : " | |
| "'Les marges de manœuvre sont encadrées par [contrainte légale/budgétaire], mais le Gouvernement agit dans le respect de ces règles pour [objectif].' " | |
| "4. **Mettez en avant les alternatives ou mesures en cours** : " | |
| "'Plutôt que [proposition du parlementaire], le Gouvernement a choisi de [mesure alternative], qui permet de [bénéfice].' " | |
| "Exemple : 'Plutôt qu’une refonte complète du dispositif, nous avons renforcé [mesure X], qui a déjà permis [résultat].' " | |
| "Évitez les formulations défensives comme 'nous ne pouvons pas' – préférez 'notre approche privilégie [solution], car [raison].'", | |
| "Répondre positivement aux propositions du parlementaire": | |
| "Saluiez l’intérêt de la proposition **sans reprendre les critiques sous-jacentes**. " | |
| "Utilisez des formulations comme : " | |
| "'Votre proposition s’inscrit dans une dynamique que le Gouvernement partage, comme en attestent [mesures existantes].' " | |
| "'Nous partageons votre préoccupation pour [enjeu], et nos actions vont dans le sens de [objectif], comme le montre [exemple].' " | |
| "Évitez : 'Vous avez raison de souligner que...' → préférez : 'Votre attention à ce sujet rejoint nos priorités, illustrées par [action].'", | |
| "Répondre de manière technique et détaillée": | |
| "Fournissez une réponse **factuelle et technique**, en évitant tout commentaire sur les critiques du parlementaire. " | |
| "Structurez ainsi : " | |
| "1. **Cadre juridique** : 'Le dispositif actuel, défini par [article X], repose sur [principe].' " | |
| "2. **Données chiffrées** : 'Les derniers chiffres (source : [DREES/INSEE/...], [année]) montrent que [tendance].' " | |
| "3. **Mesures en cours** : 'Pour répondre à ces enjeux, [mesure A] et [mesure B] ont été mises en place, avec [résultat].' " | |
| "Utilisez un vocabulaire neutre et des verbes d’action : 'le Gouvernement a engagé', 'les services travaillent à', 'les résultats montrent que'." | |
| } | |
| # Longueur maximale selon le paramètre | |
| max_tokens = 500 if longueur.startswith("Courte") else 1000 if longueur.startswith("Moyenne") else 2200 | |
| # Construction du prompt | |
| prompt = f""" | |
| [INST] | |
| {orientation_mapping.get(response_orientation, "")} | |
| **Question parlementaire :** | |
| {question} | |
| **Contexte parlementaire (réponses similaires passées) :** | |
| {parliamentary_context} | |
| **Textes juridiques applicables (PRIORITAIRES) :** | |
| {legal_context} | |
| **Documents de référence uploadés (traités avec vos fonctions d'extraction) :** | |
| {uploaded_documents} | |
| **Résultats de recherche internet (actualités, positions du gouvernement) :** | |
| {search_context} | |
| **Consignes strictes :** | |
| 1. **Priorité juridique** : Votre réponse DOIT être cohérente avec les textes juridiques fournis. | |
| En cas de contradiction entre le contexte parlementaire et les textes juridiques, priorisez ces derniers. | |
| En cas de contradiction entre le contexte parlementaire et les résultats de recherche internet, priorisez ces derniers. | |
| Citez explicitement les articles pertinents (ex: "comme le précise l'article L124-5 du CASF..."). | |
| 2. **Structure** : | |
| - Reconnaître l'importance du sujet sans insister sur les difficultés soulevées par le parlementaire, surtout si elles sont critiques quant à l'action du Gouvernement | |
| - Rappelez éventuellement les chiffres et le cadre juridique | |
| - Poursuivez avec les éléments budgetaires | |
| - Intégrez les informations issues prioritairement des documents de référence uploadés puis de la recherche internet de manière fluide, sans mention explicite de la source ("recherche internet", "résultats de recherche") pour : | |
| - décrire les mesures prises par le Gouvernement et celles sur lesquelles le Gouvernement travaille | |
| - préciser la position du Gouvernement sur le sujet principal de la question parlementaire | |
| - Ne pas annoncer d'échéances à venir pour des dates antérieures à la date du jour (exemple : "Une concertation sera menée d’ici l’été 2024" alors que nous sommes en novembre 2025) | |
| - Concluez en réaffirmant l'engagement du Gouvernement. | |
| - Ne pas mélanger le sujet à d'autres sujets trop éloignés dans la conclusion. | |
| 3. **Niveau de détail** : {detail_juridique}/5 (adaptez la profondeur des explications juridiques). | |
| 4. **Longueur et ajustement dynamique** : | |
| - Limite absolue : {max_tokens} tokens. | |
| - Avant de finaliser, estimez le nombre de tokens de votre réponse. | |
| - Si vous dépassez {max_tokens} : | |
| - Supprimez les exemples, les répétitions ou les données secondaires. | |
| - Conservez impérativement : l’enjeu, le cadre juridique, et la conclusion. | |
| - Utilisez des formulations comme : "Pour respecter la limite, nous synthétisons les points clés :" | |
| - Si la réponse risque d’être trop courte, développez le cadre juridique ou les mesures en cours. | |
| 5. **Estimation préalable** : | |
| - Un paragraphe = ~100 tokens. Adaptez le nombre de paragraphes en conséquence. | |
| - Après chaque section, vérifiez que le total reste inférieur à {max_tokens}. | |
| 6. **Style** : | |
| - Utilisez un style administratif, formel et concis, comme dans les réponses ministérielles. | |
| - La réponse doit être rédigée en prose continue, sans titres, sans puces, sans numérotation. | |
| - Répondez précisément aux questions posées, par exemple sur les éléments budgétaires ou de calendrier. | |
| - La réponse doit être d'actualité et privilégier les informations les plus récentes. | |
| - Si les propositions faites par le parlementaire sont intéressantes, dites qu'elles seront étudiées. | |
| - Utilisez uniquement des paragraphes rédigés, comme dans les réponses ministérielles publiées au Journal Officiel. | |
| - Si vous avez plusieurs éléments à présenter, intégrez-les dans des phrases complètes reliées par des connecteurs ("par ailleurs", "en outre", "de plus"). | |
| - Les éléments de la réponse ne doivent pas être redondants. | |
| - Ne pas mettre de formule de politesse à la fin. | |
| - **Contrainte de longueur absolue** : La réponse ne doit pas dépasser {longueur}. | |
| - Toute réponse plus longue sera rejetée. | |
| - Si le sujet est trop complexe pour tenir dans cette limite, concentrez-vous sur les points les plus importants. | |
| - Toute réponse qui se termine par une phrase tronquée est incorrecte. | |
| - Toute réponse qui contient des listes ou des titres est incorrecte. | |
| {f"7. Instructions spécifiques strictes : {custom_instructions}" if custom_instructions else ""} | |
| [/INST] | |
| """ | |
| return prompt | |
| # Appelle l'API Mistral Large pour générer une réponse parlementaire | |
| def call_mistral_parlementary_response( | |
| prompt: str, | |
| longueur: str, | |
| question: str, | |
| max_retries: int = 2, | |
| model_size: str = "small" # "large", "medium", "small" | |
| ) -> str: | |
| # Appelle l'API Mistral pour générer une réponse parlementaire. Le modèle est choisi dynamiquement (small, medium, large). | |
| mistral_api_url = "https://api.mistral.ai/v1/chat/completions" | |
| headers = { | |
| "Authorization": f"Bearer {MISTRAL_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| # Construire le nom du modèle dynamiquement | |
| model_name = f"mistral-{model_size}-latest" | |
| # Déterminer la taille de sortie en fonction de la longueur souhaitée | |
| if longueur.startswith("Courte"): | |
| max_tokens = 500 | |
| elif longueur.startswith("Moyenne"): | |
| max_tokens = 1000 | |
| else: | |
| max_tokens = 2200 | |
| # Vérifier la taille du prompt | |
| prompt_tokens = estimate_tokens(prompt) | |
| if prompt_tokens > 30000: | |
| raise ValueError(f"Prompt trop long: {prompt_tokens} tokens (limite: 30000)") | |
| payload = { | |
| "model": model_name, | |
| "messages": [{"role": "user", "content": prompt}], | |
| "temperature": 0.3, | |
| "max_tokens": max_tokens | |
| } | |
| for attempt in range(max_retries): | |
| try: | |
| response = requests.post(mistral_api_url, headers=headers, json=payload, timeout=90) | |
| response.raise_for_status() | |
| data = response.json() | |
| mistral_response = data["choices"][0]["message"]["content"] | |
| is_truncated = data["choices"][0].get("finish_reason") == "length" | |
| if is_truncated: | |
| try: | |
| mistral_response = handle_truncated_response(mistral_response, question, longueur) | |
| except Exception as e: | |
| raise Exception(f"Erreur lors de la complétion de la réponse tronquée: {str(e)}") | |
| return mistral_response | |
| except requests.exceptions.HTTPError as e: | |
| if response.status_code == 429 and attempt < max_retries - 1: # Too Many Requests | |
| st.warning(f"⏳ API Mistral temporairement encombrée. Tentative {attempt + 1}/{max_retries}. Relance dans 10 secondes...") | |
| time.sleep(10) | |
| else: | |
| raise Exception(f"Erreur HTTP {response.status_code}: {str(e)}") | |
| except requests.exceptions.RequestException as e: | |
| if attempt < max_retries - 1: | |
| st.warning(f"⚠️ Erreur réseau. Tentative {attempt + 1}/{max_retries}. Relance dans 10 secondes...") | |
| time.sleep(10) | |
| else: | |
| raise Exception(f"Erreur réseau: {str(e)}") | |
| # Complète une réponse tronquée | |
| def handle_truncated_response(response: str, question: str, longueur: str) -> str: | |
| # Gère les réponses tronquées par Mistral. | |
| last_period = response.rfind('.') | |
| if last_period > 0 and last_period < len(response) - 100: | |
| incomplete_part = response[last_period+1:] | |
| else: | |
| incomplete_part = response[-100:] | |
| completion_prompt = f""" | |
| [INST] | |
| Complétez UNIQUEMENT la phrase ou le paragraphe suivant en cours, sans ajouter de titre ni d'introduction. | |
| Contexte original: {question[:200]}... | |
| Texte à compléter: "{incomplete_part}" | |
| Consignes strictes: | |
| - Continuez directement le texte existant, sans recommencer la réponse. | |
| - N'ajoutez pas de formule comme "Réponse du ministère...".- Terminez la phrase/paragraphe en cours de manière cohérente. | |
| - Ajoutez une conclusion sur le thème de la question en 1-2 phrases maximum. | |
| - Utilisez un style administratif et formel, comme dans les réponses ministérielles. | |
| - Respectez strictement la limite de 400 caractères. | |
| - Tout complément de réponse plus long sera rejeté. | |
| - Concluez en réaffirmant l'engagement du Gouvernement. | |
| - La réponse doit être rédigée en prose continue, sans titres, sans puces, sans numérotation. | |
| - Utilisez uniquement des paragraphes rédigés, comme dans les réponses ministérielles publiées au Journal Officiel. | |
| - Si vous avez plusieurs éléments à présenter, intégrez-les dans des phrases complètes reliées par des connecteurs ("par ailleurs", "en outre", "de plus"). | |
| - Ne pas mettre de formule de politesse à la fin. | |
| - Tout complément de réponse qui se termine par une phrase tronquée est incorrecte. | |
| - Toute complément de réponse qui contient des listes ou des titres est incorrecte. | |
| [/INST] | |
| """ | |
| try: | |
| completion_response = requests.post( | |
| "https://api.mistral.ai/v1/chat/completions", | |
| headers={"Authorization": f"Bearer {MISTRAL_API_KEY}", "Content-Type": "application/json"}, | |
| json={ | |
| "model": "mistral-large-latest", | |
| "messages": [{"role": "user", "content": completion_prompt}], | |
| "temperature": 0.1, | |
| "max_tokens": 60 | |
| }, | |
| timeout=30 | |
| ) | |
| completion_response.raise_for_status() | |
| completion = completion_response.json()["choices"][0]["message"]["content"] | |
| if response.endswith("..."): | |
| final_response = response[:-3] + completion | |
| elif response.endswith(" "): | |
| final_response = response + completion | |
| else: | |
| final_response = response + " " + completion | |
| if not final_response.endswith(('.', '!', '?')): | |
| final_response += "." | |
| return final_response | |
| except Exception as e: | |
| st.warning(f"⚠️ Impossible de compléter la réponse tronquée: {str(e)}") | |
| return response + " (réponse incomplète)" | |
| # --- 7. Génération de la réponse --- | |
| def generate_response( | |
| question: str, | |
| legislature: Optional[str] = None, | |
| rubrique: Optional[str] = None, | |
| detail_juridique: int = 3, | |
| longueur: str = "Moyenne (500 mots)", | |
| response_orientation: str = "Répondre de façon neutre", | |
| custom_instructions: str = "", | |
| include_legal_articles: bool = False, | |
| must_contain: str = "", | |
| max_legal_articles: int = 3, | |
| model_size: str = "small" # choix du modèle Mistral | |
| ): | |
| try: | |
| status_placeholder = st.empty() | |
| # Étape 1 : Recherche d'anciennes questions / réponses parlementaires (uniquement si pas de limitation) | |
| parliamentary_context = "Aucun contexte parlementaire trouvé." | |
| similar_documents = [] | |
| if not (use_priority_docs and selected_docs): | |
| status_placeholder.markdown( | |
| '<div class="status-message">🏛️ Recherche dans la base des anciennes questions / réponses...</div>', | |
| unsafe_allow_html=True | |
| ) | |
| similar_documents = search_question_parlementaire(question, top_k=5) | |
| if similar_documents: | |
| parliamentary_context = "\n\n".join( | |
| [ | |
| f"Contexte parlementaire {i+1} (source: {doc.uid}):\n" | |
| f"Question: {doc.question}\nRéponse: {doc.reponse}" | |
| for i, doc in enumerate(similar_documents) | |
| ] | |
| ) | |
| # Étape 2 : Recherche des articles juridiques (si activée) (uniquement si pas de limitation) | |
| legal_context = "Aucun texte juridique spécifique n'a été identifié." | |
| legal_sources = [] | |
| if not (use_priority_docs and selected_docs) and include_legal_articles: | |
| status_placeholder.markdown( | |
| '<div class="status-message">📚 Recherche dans les codes juridiques (Code du travail, Code de la sécurité sociale, Code de la santé publique, Code de l\'action sociale et des familles)...</div>', | |
| unsafe_allow_html=True | |
| ) | |
| legal_sources_result = search_articles( | |
| query=question, | |
| partie=None, | |
| limit=max_legal_articles, | |
| must_contain=must_contain if must_contain else None, | |
| debug=True, | |
| threshold=0.5 | |
| ) | |
| legal_sources = legal_sources_result["sources"] | |
| if legal_sources: | |
| legal_context = "\n\n".join( | |
| [f"Article {art.num}: {art.titre}\n{art.article_complet}" for art in legal_sources] | |
| ) | |
| # Étape 3 : Recherche dans les documents uploadés (si mode parlementaire) - limité à 5 résultats | |
| status_placeholder.markdown( | |
| '<div class="status-message">📄 Recherche dans la base documentaire...</div>', | |
| unsafe_allow_html=True | |
| ) | |
| uploaded_results = search_uploaded_documents(question, qdrant_client, embedding_model, top_k=5) | |
| # Formatage basé sur la pertinence (seuil = 0.7) | |
| uploaded_docs_context = format_uploaded_docs_by_relevance(uploaded_results, min_score=0.7) | |
| # Étape 4 : Recherche internet (si mode parlementaire) | |
| search_context = "Aucune recherche internet effectuée." | |
| search_results = [] | |
| if not (use_priority_docs and selected_docs) and st.session_state.get("search_engine"): | |
| if st.session_state.get("search_engine") == "Tavily": | |
| status_placeholder.markdown( | |
| '<div class="status-message">🌐 Recherche internet (Tavily)...</div>', | |
| unsafe_allow_html=True | |
| ) | |
| results = search_tavily_government(extract_subject(question)) | |
| search_context = results.get("answer", "") | |
| search_results = results.get("results", []) | |
| # Ajout du contenu des résultats pour enrichir le contexte | |
| if search_results: | |
| search_context += "\n\n" + "\n\n".join([item.get("content", "") for item in search_results]) | |
| elif st.session_state.get("search_engine") == "Google": | |
| status_placeholder.markdown( | |
| '<div class="status-message">🌐 Recherche internet (Google)...</div>', | |
| unsafe_allow_html=True | |
| ) | |
| results = search_google_government(extract_subject(question)) | |
| search_results = results.get("results", []) | |
| # Google renvoie "snippet" | |
| search_context = "\n\n".join([item.get("snippet", "") for item in search_results]) | |
| # Étape 5 : Génération de la réponse | |
| status_placeholder.markdown( | |
| '<div class="status-message">🤖 Génération de la réponse par Mistral...</div>', | |
| unsafe_allow_html=True | |
| ) | |
| prompt = build_parlementary_response_prompt( | |
| question=question, | |
| parliamentary_context=parliamentary_context, | |
| legal_context=legal_context, | |
| uploaded_documents=uploaded_docs_context, | |
| detail_juridique=detail_juridique, | |
| longueur=longueur, | |
| response_orientation=response_orientation, | |
| custom_instructions=custom_instructions, | |
| search_context=search_context | |
| ) | |
| mistral_response = call_mistral_parlementary_response( | |
| prompt, | |
| longueur, | |
| question, | |
| model_size=model_size | |
| ) | |
| # ➡️ Effacer le message | |
| status_placeholder.empty() | |
| return { | |
| "question": question, | |
| "context": [doc.reponse for doc in similar_documents[:6] if doc.reponse], | |
| "context_str": parliamentary_context, | |
| "legal_context": legal_context, | |
| "response": mistral_response, | |
| "legal_sources": legal_sources if include_legal_articles else [], | |
| "similar_documents": similar_documents, | |
| "uploaded_documents": uploaded_results, | |
| "search_results": search_results, | |
| "metadata": { | |
| "status": "success", | |
| "model_used": f"mistral-{model_size}-latest", | |
| "timestamp": datetime.now(pytz.timezone('Europe/Paris')).isoformat(), | |
| "legislature": legislature, | |
| "rubrique": rubrique | |
| } | |
| } | |
| except Exception as e: | |
| return { | |
| "question": question, | |
| "response": f"Erreur lors de la génération de la réponse : {str(e)}", | |
| "error": str(e), | |
| "legal_sources": [], | |
| "similar_documents": [], | |
| "metadata": { | |
| "status": "error", | |
| "timestamp": datetime.now(pytz.timezone('Europe/Paris')).isoformat(), | |
| "legislature": legislature, | |
| "rubrique": rubrique | |
| } | |
| } | |
| # --- NOUVEL ENDPOINT SIMPLIFIÉ POUR LISTER LES DOCUMENTS --- | |
| # Fonction qui retourne juste les deux catégories de documents - VERSION STATIQUE | |
| def get_simple_documents_list(): | |
| return { | |
| "documents": [ | |
| { | |
| "type": "Questions écrites (QE) de l'Assemblée nationale", | |
| "periode": "2017-2025", | |
| "description": "Questions ayant obtenu une réponse ministérielle (avant le 1er novembre 2025)." | |
| }, | |
| { | |
| "type": "Questions écrites (QE) du Sénat", | |
| "periode": "2017-2025", | |
| "description": "Collection complète des questions écrites ayant obtenu une réponse ministérielle (avant le 1er novembre 2025)." | |
| } | |
| ] | |
| } | |
| ################################################################# | |
| ### ------------- 2. AUTHENTIFICATION -----------------------### | |
| ################################################################# | |
| config = { | |
| "credentials": { | |
| "usernames": { | |
| "Whisler": { | |
| "name": "Francois-Mathieu", | |
| "password": os.getenv("USER_WHISLER_PASSWORD") | |
| }, | |
| "Delphine": { | |
| "name": "Caudilla Delphine", | |
| "password": os.getenv("USER_DELPHINE_PASSWORD") | |
| }, | |
| "Isabelle": { | |
| "name": "Caudilla Isabelle", | |
| "password": os.getenv("USER_ISABELLE_PASSWORD") | |
| }, | |
| "Arnaud": { | |
| "name": "Arnaud", | |
| "password": os.getenv("USER_ARNAUD_PASSWORD") | |
| }, | |
| "DGCS": { | |
| "name": "DGCS", | |
| "password": os.getenv("USER_DGCS_PASSWORD") | |
| }, | |
| "DSS": { | |
| "name": "DSS", | |
| "password": os.getenv("USER_DSS_PASSWORD") | |
| }, | |
| "Julien": { | |
| "name": "Special Guest", | |
| "password": os.getenv("USER_SPECIAL_PASSWORD") | |
| }, | |
| "Invité": { | |
| "name": "Invité", | |
| "password": os.getenv("USER_GUEST_PASSWORD") | |
| } | |
| } | |
| } | |
| } | |
| if 'authentication_status' not in st.session_state: | |
| st.session_state.authentication_status = None | |
| def check_password(): | |
| if st.session_state["username"] in config['credentials']['usernames']: | |
| stored_password = config['credentials']['usernames'][st.session_state["username"]]["password"] | |
| if hashlib.sha256(st.session_state["password"].encode()).hexdigest() == stored_password: | |
| st.session_state["authentication_status"] = True | |
| st.session_state["name"] = config['credentials']['usernames'][st.session_state["username"]]["name"] | |
| return | |
| st.session_state["authentication_status"] = False | |
| # --- Connexion ou contenu principal --- | |
| if st.session_state.authentication_status is not True: | |
| # Masquer la sidebar avant connexion | |
| st.markdown(""" | |
| <style> | |
| [data-testid="stSidebar"] {display: none;} | |
| /* Centrer le titre */ | |
| .auth-title {text-align: center; margin-top: 0.5rem;} | |
| /* Centrer le paragraphe d'intro */ | |
| .auth-intro {text-align: center; color: #5c5c5c;} | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # --- Page d'authentification --- | |
| st.markdown('<h1 class="auth-title">🔐 Authentification requise</h1>', unsafe_allow_html=True) | |
| st.markdown('<p class="auth-intro">Veuillez entrer vos identifiants pour accéder au générateur de réponses aux questions écrites.</p>', unsafe_allow_html=True) | |
| # Colonnes pour réduire la largeur et centrer les champs | |
| # Ajuste les ratios pour obtenir la largeur souhaitée (ici ~25% de la page) | |
| left, center, right = st.columns([3, 2, 3]) | |
| with center: | |
| st.text_input("Nom d'utilisateur", key="username") | |
| st.text_input("Mot de passe", type="password", key="password") | |
| if st.button("Se connecter"): | |
| check_password() | |
| st.rerun() | |
| if st.session_state.authentication_status is False: | |
| st.error("Identifiants incorrects. Veuillez réessayer.") | |
| else: | |
| # Réafficher la sidebar après connexion | |
| st.markdown(""" | |
| <style> | |
| [data-testid="stSidebar"] {display: block;} | |
| </style> | |
| """, unsafe_allow_html=True) | |
| ################################################################# | |
| ### ---- 3. AFFICHAGE DU SITE APRES CONNEXION --------------- ### | |
| ################################################################# | |
| # --- Initialisation de l'historique (UNIQUEMENT si non existant) --- | |
| if "full_historique" not in st.session_state: | |
| st.session_state.full_historique = {} | |
| save_historique() # Sauvegarde initiale | |
| # --- Chargement de l'historique depuis session_state (si vide, on vérifie le cache) --- | |
| if not st.session_state.full_historique and "historique_cache" in st.session_state: | |
| st.session_state.full_historique = st.session_state.historique_cache | |
| # --- Configuration de la page et CSS --- | |
| st.markdown(""" | |
| <style> | |
| /* Élargir le conteneur principal */ | |
| .block-container { | |
| max-width: 95%; | |
| padding-left: 2rem; | |
| padding-right: 2rem; | |
| } | |
| /* Élargir les zones de saisie */ | |
| textarea, .stTextArea textarea { | |
| width: 100% !important; | |
| } | |
| /* Élargir les selectbox et sliders */ | |
| .stSelectbox, .stSlider { | |
| width: 100% !important; | |
| } | |
| /* Votre CSS existant */ | |
| .stApp { background-color: #f8f9fa; } | |
| .stTabs [data-baseweb="tab-list"] { gap: 0; background-color: #e9ecef; border-radius: 6px 6px 0 0; padding: 4px; } | |
| .stTabs [data-baseweb="tab"] { height: 36px; white-space: pre-wrap; background-color: #f8f9fa; border: none; border-radius: 4px 4px 0 0; padding: 0 12px; } | |
| .stTabs [aria-selected="true"] { background-color: #ffffff; font-weight: bold; color: #3d3d3d; } | |
| .stButton>button { background-color: #4a8bfc; color: white; border: none; border-radius: 4px; padding: 8px 16px; font-weight: 500; } | |
| .stButton>button:hover { background-color: #3a7bfc; } | |
| .stExpander { background-color: #ffffff; border: 1px solid #e9ecef; border-radius: 6px; margin-bottom: 0px; } /* ← réduit l’espace */ | |
| .stTextArea textarea { font-family: 'Segoe UI', sans-serif; font-size: 16px; line-height: 1.5; } | |
| .stAlert { border-radius: 6px; } | |
| .source-text { font-family: monospace; font-size: 14px; background-color: #f8f9fa; padding: 8px; border-radius: 4px; border-left: 3px solid #4a8bfc; } | |
| .response-text { font-family: 'Segoe UI', sans-serif; font-size: 16px; line-height: 1.6; white-space: pre-wrap; background-color: white; padding: 16px; border-radius: 6px; border: 1px solid #e9ecef; } | |
| /* Nouveau : titre de l'historique */ | |
| .history-title { | |
| margin-bottom: 15px; | |
| } | |
| /* Style carte avec ombre légère pour les expanders */ | |
| div[data-testid="stExpander"] { | |
| background-color: #ffffff; | |
| border: 1px solid #e9ecef; | |
| border-radius: 8px; | |
| margin-bottom: 5px !important; | |
| box-shadow: 0 2px 6px rgba(0,0,0,0.08); /* ombre douce */ | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| st.title("🏛️ Générateur de réponses aux questions écrites parlementaires") | |
| st.markdown(""" | |
| Application (version Beta) de réponse aux questions parlementaires, | |
| appuyée sur une base documentaire (embedding avec **camemBERT**), un moteur de recherche (**Tavily** ou **Google**) et le modèle **Mistral**. | |
| """) | |
| ################################################################# | |
| #### -------------------- 3a. SIDEBAR --------------------- #### | |
| ################################################################# | |
| with st.sidebar: | |
| # Bouton Déconnexion | |
| if st.button('Déconnexion', key="logout"): | |
| st.session_state.authentication_status = None | |
| st.rerun() | |
| # Message de bienvenue | |
| st.write(f'Bienvenue *{st.session_state["name"]}*') | |
| # --- Version du site --- | |
| st.markdown( | |
| """ | |
| <style> | |
| .version-text { | |
| position: absolute; | |
| top: 5px; | |
| left: 0px; | |
| color: #555; | |
| font-size: 14px; | |
| font-style: italic; | |
| } | |
| </style> | |
| <div class="version-text">Version v0.1.12 (Beta)</div> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| # Séparateur visuel | |
| st.markdown("---") | |
| # Paramètres de mode | |
| mode = st.radio( | |
| "Choix du module", | |
| ["Réponse parlementaire", "Base documentaire"], # Ajouter , "Analyse juridique" pour avoir le second mode - Permet de moduler les modes accessibles sur le site | |
| index=0 | |
| ) | |
| # Conteneur pour contrôler la largeur du bouton | |
| button_container = st.container() | |
| # Initialisation des boutons à False | |
| generate_parliamentary_button = False | |
| generate_analysis_button = False | |
| with button_container: | |
| if mode == "Réponse parlementaire": | |
| generate_parliamentary_button = st.button( | |
| "Générer la réponse", | |
| type="primary", | |
| key="generate_parliamentary_button" | |
| # Sans use_container_width pour une largeur automatique | |
| ) | |
| elif mode == "Analyse juridique": | |
| generate_analysis_button = st.button( | |
| "Générer l'analyse", | |
| type="primary", | |
| key="generate_analysis_button" | |
| # Sans use_container_width pour une largeur automatique | |
| ) | |
| # Ajoutez une séparation visuelle supplémentaire | |
| st.markdown("---") | |
| # Choix du modèle Mistral | |
| model_size = st.radio( | |
| "Choix du modèle Mistral", | |
| ["Small", "Medium", "Large (recommandé)"], | |
| index=0 | |
| ) | |
| # Normaliser la valeur pour l'appel API | |
| model_size = model_size.lower() | |
| # Choix du moteur de recherche | |
| search_engine = st.radio( | |
| "Moteur de recherche internet", | |
| ["Google", "Tavily"], | |
| index=0 | |
| ) | |
| st.session_state["search_engine"] = search_engine | |
| # Séparateur | |
| st.markdown("---") | |
| ########################################################################## | |
| ### ------ 3b. INTERFACE DE GESTION DE LA BASE DOCUMENTAIRE ---------- ### | |
| ########################################################################## | |
| # Initialisation du bouton search | |
| search_button = False | |
| if mode == "Base documentaire": | |
| st.markdown("---") | |
| st.markdown("#### 📚 Gestion de la base documentaire") | |
| # 1. Liste des documents (collections) | |
| try: | |
| collections = qdrant_client.get_collections() | |
| collection_names = [col.name for col in collections.collections] | |
| protected_collections = { | |
| "Code de la sécurité sociale", | |
| "Code du travail", | |
| "CASF", | |
| "QuestionParlementaire", | |
| "Code de la santé publique" | |
| } | |
| # Filtre pour ne garder que les collections "documents" (ex: "NomDuDocument_2023") | |
| doc_collections = [ | |
| name for name in collection_names | |
| if name not in protected_collections and "_" in name # Ex: "MonDocument_2023" | |
| ] | |
| if not doc_collections: | |
| st.info("Aucun document trouvé.") | |
| else: | |
| st.markdown("**Documents disponibles :**") | |
| # Tri alphabétique sur le nom nettoyé | |
| sorted_collections = sorted( | |
| doc_collections, | |
| key=lambda name: name.split('__')[0].replace('_', ' ').lower() | |
| ) | |
| for doc_name in sorted_collections: | |
| clean_name = doc_name.split('__')[0].replace('_', ' ') | |
| col1, col2, col3 = st.columns([18, 1, 1]) # plus de place pour le nom | |
| with col1: | |
| st.markdown(f"📄 {clean_name}") | |
| with col2: | |
| if st.button("✏️", key=f"rename_{doc_name}", help="Renommer le document"): | |
| st.session_state.show_rename_modal = True | |
| st.session_state.current_doc_to_rename = doc_name | |
| st.rerun() | |
| with col3: | |
| if st.button("🗑️", key=f"del_{doc_name}", help="Supprimer le document"): | |
| qdrant_client.delete_collection(collection_name=doc_name) | |
| st.success(f"Document '{clean_name}' supprimé.") | |
| st.rerun() | |
| # Fenêtre modale de renommage | |
| if st.session_state.show_rename_modal: | |
| doc_name = st.session_state.current_doc_to_rename | |
| clean_name = doc_name.split('__')[0].replace('_', ' ') | |
| with st.container(): | |
| st.markdown("---") | |
| st.subheader(f"Renommer le document ''{clean_name}''") | |
| new_name = st.text_input( | |
| "Nouveau nom:", | |
| value=clean_name.replace(" ", "_"), | |
| key=f"new_name_{doc_name}" | |
| ) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("Valider", key=f"validate_rename_{doc_name}"): | |
| if new_name.strip(): | |
| try: | |
| # 1. Vérification si le nouveau nom existe déjà | |
| new_display_name = new_name.strip().replace(" ", "_") | |
| new_collection_name = f"{new_display_name}__{doc_name.split('__')[1]}" | |
| # Récupère toutes les collections existantes | |
| existing_collections = qdrant_client.get_collections() | |
| existing_names = [col.name for col in existing_collections.collections] | |
| # Vérifie si le nouveau nom existe déjà | |
| if new_collection_name in existing_names: | |
| st.error(f"❌ Le nom '{new_name}' existe déjà. Veuillez choisir un autre nom.") | |
| else: | |
| # Initialisation de la progression | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| estimated_total = 1000 # Estimation conservatrice du nombre total de points | |
| # 2. Crée la nouvelle collection (0-10%) | |
| status_text.text("Création de la nouvelle collection...") | |
| qdrant_client.create_collection( | |
| collection_name=new_collection_name, | |
| vectors_config=models.VectorParams( | |
| size=1024, | |
| distance=models.Distance.COSINE | |
| ) | |
| ) | |
| progress_bar.progress(10) | |
| # 3. Récupère les points (10-40%) | |
| status_text.text("Récupération des données...") | |
| offset = None | |
| all_points = [] | |
| while True: | |
| records, offset = qdrant_client.scroll( | |
| collection_name=doc_name, | |
| limit=100, | |
| offset=offset, | |
| with_payload=True, | |
| with_vectors=True | |
| ) | |
| for record in records: | |
| point_dict = { | |
| "id": str(record.id), | |
| "vector": record.vector, | |
| "payload": record.payload | |
| } | |
| all_points.append(models.PointStruct(**point_dict)) | |
| if offset is None: | |
| break | |
| # Calcul sécurisé de la progression (max 40%) | |
| current_progress = min(40, 10 + int(30 * len(all_points) / max(1, estimated_total))) | |
| progress_bar.progress(current_progress) | |
| # 4. Copie les points (40-90%) | |
| status_text.text(f"Copie des {len(all_points)} chunks...") | |
| batch_size = 50 | |
| for i in range(0, len(all_points), batch_size): | |
| batch = all_points[i:i + batch_size] | |
| qdrant_client.upsert( | |
| collection_name=new_collection_name, | |
| points=batch, | |
| wait=True | |
| ) | |
| # Calcul SÉCURISÉ de la progression (max 90%) | |
| batch_progress = min(90, 40 + int(50 * (i + len(batch)) / max(1, len(all_points)))) | |
| progress_bar.progress(batch_progress) | |
| # 5. Supprime l'ancienne collection (100%) | |
| status_text.text("Finalisation...") | |
| qdrant_client.delete_collection(collection_name=doc_name) | |
| progress_bar.progress(100) | |
| st.success(f"✅ Document renommé en '{new_name}' !") | |
| st.session_state.show_rename_modal = False | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erreur: {e}") | |
| else: | |
| st.warning("Veuillez entrer un nom valide.") | |
| with col2: | |
| if st.button("Annuler", key=f"cancel_rename_{doc_name}"): | |
| st.session_state.show_rename_modal = False | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erreur lors de la récupération des collections : {e}") | |
| # --- Section de téléchargement depuis un lien public --- | |
| st.markdown("---") | |
| st.markdown("**Ajouter un document (max 50 Mo) depuis un lien public**") | |
| def make_direct_link(file_url: str, prefer_format: str = "docx") -> str: | |
| """ | |
| Convertit les liens Google Docs / Google Drive / Dropbox / OneDrive en liens directs téléchargeables. | |
| - prefer_format: "docx" ou "pdf" pour Google Docs. | |
| Retourne le lien original si aucun cas particulier n'est détecté. | |
| """ | |
| u = file_url.strip() | |
| # --- Google Docs (document, pas un fichier Drive) --- | |
| if "docs.google.com/document" in u: | |
| m = re.search(r"/document/d/([^/]+)/", u) | |
| doc_id = m.group(1) if m else None | |
| if doc_id: | |
| fmt = "docx" if prefer_format == "docx" else "pdf" | |
| return f"https://docs.google.com/document/d/{doc_id}/export?format={fmt}" | |
| return u | |
| # --- Google Drive (fichiers) --- | |
| if "drive.google.com" in u: | |
| file_id = None | |
| if "/d/" in u: | |
| file_id = u.split("/d/")[1].split("/")[0] | |
| else: | |
| qs_id = parse_qs(urlparse(u).query).get("id", [None])[0] | |
| file_id = qs_id or file_id | |
| if file_id: | |
| return f"https://drive.google.com/uc?export=download&id={file_id}" | |
| return u | |
| # --- Dropbox --- | |
| if "dropbox.com" in u: | |
| parsed = urlparse(u) | |
| # Force le download binaire | |
| if "dl=" in parsed.query: | |
| return re.sub(r"dl=\d", "dl=1", u) | |
| # sinon ajouter dl=1 | |
| sep = "&" if parsed.query else "?" | |
| return u + f"{sep}dl=1" | |
| # --- OneDrive --- | |
| if "1drv.ms" in u or "onedrive.live.com" in u: | |
| if "download=" not in u: | |
| parsed = urlparse(u) | |
| sep = "&" if parsed.query else "?" | |
| return u + f"{sep}download=1" | |
| return u | |
| return u | |
| file_url = st.text_input( | |
| "🔗 Collez ici le lien public vers votre document (Google Drive, Dropbox, OneDrive, etc.) :", | |
| key="file_url_input", | |
| placeholder="Ex: https://drive.google.com/uc?export=download&id=..." | |
| ) | |
| if file_url: | |
| direct_url = make_direct_link(file_url) | |
| default_name = os.path.splitext(urlparse(direct_url).path.split('/')[-1])[0] | |
| custom_name = st.text_input( | |
| "Nom du document (sera aussi le nom de la collection) :", | |
| value=default_name, | |
| key="doc_name_input" | |
| ) | |
| # Bouton de validation du lien | |
| if st.button("✅ Valider le lien"): | |
| try: | |
| response = requests.get(direct_url, stream=True, timeout=10) | |
| if response.status_code == 200: | |
| st.success("Lien valide et accessible ✅") | |
| else: | |
| st.error(f"❌ Lien inaccessible (status {response.status_code})") | |
| except Exception as e: | |
| st.error(f"❌ Erreur lors de la validation : {e}") | |
| # Bouton de téléchargement | |
| if st.button("📥 Télécharger et traiter le document"): | |
| try: | |
| response = requests.get(direct_url, stream=True, timeout=30) | |
| response.raise_for_status() | |
| file_size = int(response.headers.get('content-length', 0)) | |
| if file_size > 50 * 1024 * 1024: | |
| st.error("❌ Fichier trop gros (max 50 Mo).") | |
| else: | |
| with st.spinner("Téléchargement en cours..."): | |
| downloaded = 0 | |
| total_size = file_size | |
| progress_bar = st.progress(0) | |
| # Déterminer l’extension via content-type ou fallback | |
| content_type = response.headers.get("content-type", "").lower() | |
| if "pdf" in content_type or direct_url.lower().endswith(".pdf"): | |
| file_extension = "pdf" | |
| elif "docx" in content_type or "wordprocessingml" in content_type or direct_url.lower().endswith(".docx"): | |
| file_extension = "docx" | |
| else: | |
| # fallback par défaut : PDF | |
| file_extension = "pdf" | |
| # Créer un fichier temporaire | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| if chunk: | |
| f.write(chunk) | |
| tmp_path = f.name | |
| # Vérification rapide du contenu | |
| with open(tmp_path, "rb") as f: | |
| header = f.read(5) | |
| if file_extension == "pdf" and not header.startswith(b"%PDF"): | |
| st.error("❌ Le fichier téléchargé n'est pas un PDF valide.") | |
| os.unlink(tmp_path) | |
| st.stop() | |
| if file_extension == "docx": | |
| import zipfile | |
| try: | |
| with zipfile.ZipFile(tmp_path, 'r') as z: | |
| if '[Content_Types].xml' not in z.namelist(): | |
| st.error("❌ Le fichier téléchargé n'est pas un DOCX valide.") | |
| os.unlink(tmp_path) | |
| st.stop() | |
| except Exception: | |
| st.error("❌ Le fichier téléchargé n'est pas un DOCX valide.") | |
| os.unlink(tmp_path) | |
| st.stop() | |
| # Créer une collection Qdrant avec suivi d'avancement | |
| collection_name = f"{custom_name.replace(' ', '_')}__{int(datetime.now().timestamp())}" | |
| progress = st.progress(0) | |
| status = st.empty() | |
| status.info("📂 Initialisation de la collection...") | |
| progress.progress(10) | |
| try: | |
| qdrant_client.create_collection( | |
| collection_name=collection_name, | |
| vectors_config=models.VectorParams(size=1024, distance=models.Distance.COSINE) | |
| ) | |
| status.success(f"✅ Collection '{collection_name}' créée.") | |
| progress.progress(30) | |
| # Traiter et indexer le document | |
| status.info("📑 Extraction et préparation du texte...") | |
| success = process_and_index_document( | |
| file_path=tmp_path, | |
| file_type=file_extension, | |
| collection_name=collection_name, | |
| qdrant_client=qdrant_client, | |
| embedding_model=embedding_model, | |
| progress_callback=lambda current, total, message: ( | |
| progress.progress(min(current/total, 1.0)), | |
| status.info(message) | |
| ) | |
| ) | |
| if success: | |
| progress.progress(100) | |
| status.success(f"✅ Document ajouté sous le nom '{custom_name}' !") | |
| collection_info = qdrant_client.get_collection(collection_name) | |
| # st.info(f"📊 Nombre de vecteurs indexés : {collection_info.vectors_count}") | |
| else: | |
| status.error("❌ Échec du traitement.") | |
| except Exception as e: | |
| st.error(f"❌ Erreur lors de la récupération des collections : {e}") | |
| except Exception as e: | |
| st.error(f"❌ Echec du traitement du document {e}") | |
| ######################################################################################### | |
| #### ---------- 3c. INTERFACE DE REPONSE AUX QUESTIONS PARLEMENTAIRES -------------- #### | |
| ######################################################################################### | |
| elif mode == "Réponse parlementaire": | |
| st.markdown("#### Question parlementaire") | |
| question = st.text_area( | |
| "Question parlementaire", # libellé non vide | |
| height=150, | |
| placeholder="Copier ici le texte de la question parlementaire", | |
| key="question_input", | |
| label_visibility="collapsed" | |
| ) | |
| # Initialisation des variables des boutons (pour éviter NameError) | |
| search_button = False | |
| # --- Boutons conditionnels selon le mode --- | |
| if mode == "Réponse parlementaire": | |
| # --- Paramètres de réponse --- | |
| st.markdown("#### Paramètres de réponse") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| response_orientation_options = [ | |
| "Répondre de façon neutre", | |
| "Répondre négativement aux propositions du parlementaire", | |
| "Répondre positivement aux propositions du parlementaire", | |
| "Répondre de manière technique et détaillée" | |
| ] | |
| selected_orientation = st.selectbox( | |
| "Orientation de la réponse", | |
| response_orientation_options, | |
| index=0 | |
| ) | |
| with col2: | |
| longueur = st.selectbox( | |
| "Longueur de la réponse", | |
| ["Courte (300 mots)", "Moyenne (500 mots)", "Longue (1000 mots)"], | |
| index=1 | |
| ) | |
| with col3: | |
| include_legal_articles = st.selectbox( | |
| "Inclure une recherche dans les codes juridiques", | |
| ["Non", "Oui"], | |
| index=0, | |
| key="include_legal_articles_select" | |
| ) == "Oui" | |
| MAX_LEN = 300 | |
| # Ligne avec deux colonnes : champ texte + case à cocher | |
| colA, colB = st.columns([2, 1]) | |
| with colA: | |
| MAX_LEN = 300 | |
| custom_instructions = st.text_area( | |
| "Optionnel : instructions supplémentaires pour la réponse (max. 300 caractères)", | |
| placeholder="Ex: Insister sur l'aspect budgétaire, mentionner le projet de loi X...", | |
| height=100, | |
| key="custom_instructions" | |
| ) | |
| if custom_instructions: | |
| remaining = MAX_LEN - len(custom_instructions) | |
| if remaining < 0: | |
| st.warning(f"⚠️ Vous avez dépassé la limite de {MAX_LEN} caractères ({len(custom_instructions)} actuellement).") | |
| custom_instructions = custom_instructions[:MAX_LEN] | |
| with colB: | |
| detail_juridique = st.slider( | |
| "Niveau de détail juridique (1 = bas, 5 = élevé)", | |
| min_value=1, | |
| max_value=5, | |
| value=3 | |
| ) | |
| if include_legal_articles: | |
| must_contain = st.text_input( | |
| "Optionnel : les articles sélectionnés doivent contenir (mot ou expression exacte)", | |
| key="must_contain_input", | |
| placeholder="Ex: allocation, article 123, décret 2020-..." | |
| ) | |
| else: | |
| must_contain = "" | |
| st.markdown("**Limiter les documents sources** (attention : la réponse n'intègre ni plus les anciennes QE, ni les textes juridiques et ni la recherche internet)") | |
| use_priority_docs = st.checkbox( | |
| "Rechercher uniquement dans les documents suivants :", | |
| value=False, | |
| key="use_priority_docs" | |
| ) | |
| if use_priority_docs: | |
| # Récupère les collections "documents" | |
| collections = qdrant_client.get_collections() | |
| doc_collections = [ | |
| col.name for col in collections.collections | |
| if col.name not in {"Code_de_la_sécurité_sociale", "Code_du_travail", "CASF", "QuestionParlementaire", "Code_de_la_santé_publique"} | |
| and "_" in col.name | |
| ] | |
| # Affiche les noms propres (sans timestamp) | |
| doc_names = [col.split('__')[0].replace('_', ' ') for col in doc_collections] | |
| selected_docs = st.multiselect( | |
| "Sélectionnez les documents", | |
| options=doc_names, | |
| key="priority_docs", | |
| placeholder="Choisir..." | |
| ) | |
| elif mode == "Analyse juridique": | |
| # --- Paramètres de recherche juridique --- | |
| st.markdown("### Paramètres de recherche juridique") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| must_contain = st.text_input("🔎 Doit contenir (mot ou expression exacte)", key="must_contain_input") | |
| with col2: | |
| threshold = st.selectbox( | |
| "📊 Seuil de sélection", | |
| [0.4, 0.5, 0.6, 0.65, 0.70, 0.75, 0.80, 0.85], | |
| index=2, | |
| key="threshold_select" | |
| ) | |
| with col3: | |
| max_articles = st.number_input( | |
| "📌 Nombre maximum d'articles", | |
| min_value=1, | |
| max_value=30, | |
| value=5, | |
| key="max_articles_input" | |
| ) | |
| # --- Affichage du bouton d'analyse juridique --- | |
| search_button = st.button("Rechercher les articles", type="secondary", key="search_button") | |
| ################################################################### | |
| #### --------------- 4. GENERATION DE LA REPONSE ------------- #### | |
| ################################################################### | |
| if generate_parliamentary_button or search_button or generate_analysis_button: | |
| if not question.strip(): | |
| st.warning("Veuillez entrer une question.") | |
| else: | |
| try: | |
| debug_logs = "" | |
| response_data = {} | |
| stats = {} | |
| if mode == "Réponse parlementaire": | |
| # Logique inchangée pour le mode "Réponse parlementaire" | |
| response_data = generate_response( | |
| question=question, | |
| detail_juridique=detail_juridique, | |
| longueur=longueur, | |
| response_orientation=selected_orientation, | |
| custom_instructions=custom_instructions, | |
| include_legal_articles=include_legal_articles, | |
| must_contain=must_contain if include_legal_articles else "", | |
| max_legal_articles=detail_juridique # le nombre d'articles est égal au niveau de détail juridique | |
| ) | |
| elif mode == "Analyse juridique": | |
| response_data = generate_legal_analysis( | |
| question=question, | |
| must_contain=must_contain, | |
| max_articles=max_articles, | |
| threshold=threshold, | |
| model_size=model_size, | |
| search_button=search_button, # Bouton "Rechercher les articles" | |
| generate_analysis_button=generate_analysis_button # Bouton "Générer l'analyse" | |
| ) | |
| # Détermination des onglets à afficher | |
| tabs = [] | |
| if mode == "Réponse parlementaire": | |
| tabs = ["📜 Réponse"] | |
| if include_legal_articles: | |
| tabs.append("⚖️ Articles juridiques") | |
| tabs.extend(["🏛️ Anciennes QE", "📰 Recherches actualités", "📄 Base documentaire"]) | |
| elif mode == "Analyse juridique": | |
| tabs = ["⚖️ Articles juridiques"] | |
| if generate_analysis_button and response_data.get("response"): | |
| tabs.insert(0, "📜 Analyse") # Ajoute "Analyse" en premier si générée | |
| # Création dynamique des onglets | |
| if tabs: | |
| st_tabs = st.tabs(tabs) | |
| # Affichage du contenu en fonction des onglets | |
| for i, tab in enumerate(st_tabs): | |
| with tab: | |
| if mode == "Réponse parlementaire": | |
| if "📜 Réponse" in tabs[i]: | |
| st.markdown("#### Réponse générée") | |
| st.markdown(response_data["response"]) | |
| if response_data.get("debug_logs"): | |
| with st.expander("🐛 Voir les logs de recherche"): | |
| st.text_area("Logs", response_data["debug_logs"], height=200) | |
| # Bouton d'export | |
| if response_data.get("response"): | |
| export_content = build_export_content( | |
| response_data, | |
| mode="parlementaire", # ← Mode codé en dur (valide car dans le bloc "Réponse parlementaire") | |
| include_legal_articles=include_legal_articles # ← Utilise la variable existante | |
| ) | |
| st.download_button( | |
| label="📥 Exporter en TXT", | |
| data=export_content.encode("utf-8"), | |
| file_name="export_reponse_parlementaire.txt", # ← Nom de fichier plus clair | |
| mime="text/plain", | |
| key=f"export_reponse_{i}" # ← Clé unique basée sur l'index | |
| ) | |
| st.markdown('<div style="height: 300px;"></div>', unsafe_allow_html=True) | |
| elif "🏛️ Anciennes QE" in tabs[i]: | |
| st.markdown("#### 5 Questions parlementaires les plus similaires de la plus récente à la plus ancienne") | |
| if not response_data.get("similar_documents"): | |
| st.info("Aucune question parlementaire similaire trouvée.") | |
| else: | |
| similar_documents = response_data["similar_documents"] | |
| # Tri par date décroissante | |
| similar_documents = sorted( | |
| similar_documents, | |
| key=lambda d: safe_parse_date(d.date_reponse), | |
| reverse=True | |
| ) | |
| # Affichage avec score | |
| for idx, doc in enumerate(similar_documents): | |
| chambre = doc.chambre or ("Assemblée nationale" if str(doc.uid).startswith("QAN") else "Sénat") | |
| date_reponse = doc.date_reponse or "Inconnue" | |
| question_text = doc.question or "Question non disponible" | |
| reponse_text = doc.reponse or "Réponse non disponible" | |
| score = f"{doc.score:.2f}" if doc.score is not None else "N/A" | |
| with st.expander(f"{idx+1}. QE {doc.uid} ({chambre}) - Score de proximité : {score}"): | |
| st.markdown(f"**Date de réponse:** {date_reponse}") | |
| st.markdown(f"**Chambre:** {chambre}") | |
| st.markdown(f"**Question:** {question_text}") | |
| st.markdown(f"**Réponse:** {reponse_text}") | |
| st.markdown('<div style="height: 300px;"></div>', unsafe_allow_html=True) | |
| elif "⚖️ Articles juridiques" in tabs[i]: | |
| st.markdown("#### Articles juridiques pertinents") | |
| legal_sources = response_data.get("legal_sources", []) | |
| if not legal_sources: | |
| st.info("Aucun texte juridique cité.") | |
| else: | |
| for idx, art in enumerate(legal_sources): | |
| score = f"{art.score:.2f}" if art.score is not None else "N/A" | |
| with st.expander(f"{idx+1}. Article {art.num} ({art.collection}) - Score : {score}"): | |
| st.markdown(f"**Titre:** {art.titre}") | |
| st.markdown(f"**Contexte hiérarchique:** {art.contexte_hierarchique}") | |
| st.markdown(f"**Texte complet:**\n\n{art.article_complet}") | |
| elif "📰 Recherches actualités" in tabs[i]: | |
| st.markdown("#### Dernières annonces et actualités gouvernementales") | |
| search_results = response_data.get("search_results", []) | |
| if not search_results: | |
| st.info("Aucune actualité trouvée via le moteur de recherche.") | |
| else: | |
| for idx, item in enumerate(search_results): | |
| titre = item.get("title", "Sans titre") | |
| url = item.get("url", "") | |
| # Utiliser "content" pour Tavily, "snippet" pour Google | |
| extrait = item.get("content") or item.get("snippet") or "" | |
| with st.expander(f"{idx+1}. {titre}"): | |
| if url: | |
| st.markdown(f"[Lien vers la source]({url})") | |
| if extrait: | |
| st.markdown(extrait) | |
| elif "📄 Base documentaire" in tabs[i]: | |
| st.markdown("#### Résultats pertinents dans les documents uploadés") | |
| uploaded_results = response_data.get("uploaded_documents", []) | |
| # Filtrer par score | |
| min_score = 0.7 | |
| filtered_results = [res for res in uploaded_results if res.get("score", 0) >= min_score] | |
| if not filtered_results: | |
| st.info("Aucun extrait pertinent trouvé dans les documents uploadés.") | |
| else: | |
| # Regrouper par document | |
| grouped = {} | |
| for res in filtered_results: | |
| doc_name = res["collection"].split('__')[0].replace('_', ' ') | |
| grouped.setdefault(doc_name, []).append(res) | |
| for doc_name, results in grouped.items(): | |
| with st.expander(f"📄 {doc_name} ({len(results)} extraits)"): | |
| for idx, res in enumerate(results, start=1): | |
| score = f"{res['score']:.2f}" if res.get("score") is not None else "N/A" | |
| text_preview = res["text"] | |
| title = res.get("title") or "N/A" | |
| st.markdown(f"**Extrait {idx} (score: {score})**") | |
| st.markdown(text_preview) | |
| st.markdown(f"**Section :** {title}") | |
| st.markdown("---") | |
| # Actions sur la collection | |
| col1, col2 = st.columns([1, 1]) | |
| target_collection = results[0]["collection"] | |
| with col1: | |
| if st.button("✏️ Renommer", key=f"rename_{target_collection}"): | |
| st.session_state.show_rename_modal = True | |
| st.session_state.current_doc_to_rename = target_collection | |
| st.rerun() | |
| with col2: | |
| if st.button("🗑️ Supprimer", key=f"del_{target_collection}"): | |
| qdrant_client.delete_collection(collection_name=target_collection) | |
| st.success(f"Document '{doc_name}' supprimé.") | |
| st.rerun() | |
| elif mode == "Analyse juridique": | |
| if "📜 Analyse" in tabs[i]: | |
| st.markdown("#### Analyse juridique générée") | |
| st.markdown(response_data["response"]) | |
| # Affichage des logs (si disponibles) | |
| if response_data.get("debug_logs"): | |
| with st.expander("🐛 Voir les logs de recherche"): | |
| st.text_area("Logs", response_data["debug_logs"], height=200) | |
| # Bouton d'export | |
| if response_data.get("response"): | |
| export_content = build_export_content( | |
| response_data, | |
| mode="analyse", # Mode pour l'export | |
| include_legal_articles=False | |
| ) | |
| st.download_button( | |
| label="📥 Exporter en TXT", | |
| data=export_content.encode("utf-8"), | |
| file_name=f"analyse_juridique_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", # Nom de fichier unique | |
| mime="text/plain", | |
| key=f"export_analyse_{i}" | |
| ) | |
| st.markdown('<div style="height: 300px;"></div>', unsafe_allow_html=True) | |
| elif "⚖️ Articles juridiques" in tabs[i]: | |
| st.markdown("#### Articles juridiques pertinents") | |
| # Vérification de la présence de sources | |
| if not response_data.get("sources"): | |
| st.info("Aucun article juridique trouvé.") | |
| else: | |
| # Construction de l'arbre législatif (comme dans ton code original) | |
| tree = {} | |
| for source in response_data["sources"]: | |
| art = source["article"] # Objet RetrievedLegalDocument | |
| add_to_tree(tree, art, source) # Ajoute l'article à l'arbre | |
| # Affichage de l'arbre avec render_tree | |
| render_tree(st, tree) | |
| except Exception as e: | |
| st.error(f"Erreur inattendue: {str(e)}") | |
| st.exception(e) | |
| ################################################################### | |
| #### -------- 5. GESTION ET AFFICHAGE DE L'HISTORIQUE -------- #### | |
| ################################################################### | |
| # Ajout d'une entrée à l'historique après génération d'une réponse | |
| mode_value = "parlementaire" | |
| if generate_parliamentary_button or generate_analysis_button: | |
| if not question.strip(): | |
| st.warning("Veuillez entrer une question.") | |
| else: | |
| try: | |
| # Génération d'une clé unique pour la question | |
| q_hash = hashlib.md5(question.encode()).hexdigest() | |
| # Préparation des métadonnées communes | |
| metadata = { | |
| "mode": mode_value, | |
| "timestamp": datetime.now(pytz.timezone('Europe/Paris')).isoformat(), | |
| "model_used": f"mistral-{model_size}-latest", | |
| "include_legal_articles": include_legal_articles if mode_value == "parlementaire" else False, | |
| "legislature": None, | |
| "rubrique": None | |
| } | |
| # Filtrer les documents uploadés par score (seuil = 0.7) | |
| min_score = 0.7 | |
| uploaded_docs_filtered = [ | |
| res for res in response_data.get("uploaded_documents", []) | |
| if res.get("score", 0) >= min_score | |
| ] | |
| # Ajout à l'historique | |
| st.session_state.full_historique[q_hash] = { | |
| "question": question, | |
| "response": response_data.get("response", "Pas de réponse générée"), | |
| "similar_documents": response_data.get("similar_documents", []), | |
| "legal_sources": response_data.get("legal_sources", []), | |
| "sources": response_data.get("sources", []), # Pour le mode "analyse" | |
| "search_results": response_data.get("search_results", []), | |
| "uploaded_documents": uploaded_docs_filtered, | |
| "metadata": metadata | |
| } | |
| save_historique() # Sauvegarde immédiate | |
| except Exception as e: | |
| st.error(f"Erreur lors de l'ajout à l'historique: {str(e)}") | |
| # --- Affichage de l'historique complet --- | |
| st.markdown(""" | |
| <style> | |
| /* Centrage du bloc historique */ | |
| .history-container { | |
| max-width: 1000px; | |
| margin-left: auto; | |
| margin-right: 0; | |
| padding-left: 1rem; | |
| } | |
| /* Style des expanders */ | |
| .history-expander { | |
| margin-bottom: 0.5rem !important; | |
| border: 1px solid #e9ecef; | |
| border-radius: 8px; | |
| } | |
| /* Espacement des éléments */ | |
| .history-entry { | |
| margin-bottom: 1rem; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # Conteneur pour centrer à gauche | |
| with st.container(): | |
| st.markdown('<div class="history-container">', unsafe_allow_html=True) | |
| if hasattr(st.session_state, 'full_historique') and st.session_state.full_historique: | |
| nb_entries = len(st.session_state.full_historique) | |
| st.markdown( | |
| f""" | |
| <div style="display:flex;align-items:center;gap:10px;margin-bottom:1rem;"> | |
| <span style="font-size:18px;font-weight:bold;">🗂️ Historique des questions ({nb_entries})</span> | |
| </div> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| # Tri du plus récent au plus ancien | |
| sorted_historique = sorted( | |
| st.session_state.full_historique.items(), | |
| key=lambda x: x[1]["metadata"].get("timestamp", ""), | |
| reverse=True | |
| ) | |
| for idx, (q_hash, entry) in enumerate(sorted_historique): | |
| with st.expander(f"{idx+1}. {truncate_text(entry['question'], max_tokens=50)}", expanded=False): | |
| # Métadonnées | |
| metadata = entry.get("metadata", {}) | |
| st.caption( | |
| f"🕒 {metadata.get('timestamp', '')[:16].replace('T', ' ')} " | |
| f"| Mode: {metadata.get('mode', 'inconnu')} " | |
| f"| Modèle: {metadata.get('model_used', 'inconnu')}" | |
| ) | |
| # Question | |
| st.markdown(f"**📝 Question:**\n{entry.get('question', 'Non disponible')}") | |
| # Réponse | |
| st.markdown(f"**💬 Réponse:**") | |
| st.markdown(entry.get('response', 'Non disponible')) | |
| # --- Sources juridiques (sans expander) --- | |
| if entry.get("legal_sources") or entry.get("sources"): | |
| st.markdown("### ⚖️ Articles juridiques") | |
| if metadata.get("mode") == "analyse_juridique": | |
| if not entry.get("sources"): | |
| st.info("Aucun article juridique enregistré.") | |
| else: | |
| tree = {} | |
| for source in entry["sources"]: | |
| art = source["article"] | |
| add_to_tree(tree, art, source) | |
| render_tree(st, tree) | |
| else: # Mode "Réponse parlementaire" | |
| for art in entry.get("legal_sources", []): | |
| st.markdown(f"#### Article {getattr(art, 'num', 'N/A')} ({getattr(art, 'collection', 'N/A')})") | |
| st.markdown(f"**Titre:** {getattr(art, 'titre', 'Non disponible')}") | |
| st.markdown(f"**Texte:**\n{getattr(art, 'article_complet', 'Non disponible')}") | |
| st.markdown("---") | |
| # --- Résultats de recherche internet (sans expander) --- | |
| if entry.get("search_results"): | |
| st.markdown("### 🌐 Résultats de recherche internet") | |
| for item_idx, item in enumerate(entry["search_results"]): | |
| st.markdown(f"#### Résultat {item_idx+1}: {item.get('title', 'Sans titre')}") | |
| if item.get("url"): | |
| st.markdown(f"[Lien]({item.get('url')})") | |
| st.markdown(item.get("content") or item.get("snippet", "Aucun extrait disponible")) | |
| st.markdown("---") | |
| # --- Anciennes QE similaires (sans expander) --- | |
| if entry.get("similar_documents"): | |
| st.markdown("### 🏛️ Questions parlementaires similaires") | |
| for doc_idx, doc in enumerate(entry["similar_documents"]): | |
| st.markdown(f"#### QE {doc_idx+1} - Score: {getattr(doc, 'score', 'N/A'):.2f}") | |
| st.markdown(f"**Question:** {getattr(doc, 'question', 'Non disponible')}") | |
| st.markdown(f"**Réponse:** {getattr(doc, 'reponse', 'Non disponible')}") | |
| st.markdown("---") | |
| # --- Résultats vectoriels sur documents uploadés (sans expander) --- | |
| if entry.get("uploaded_documents"): | |
| st.markdown("### 📄 Documents uploadés pertinents") | |
| grouped = {} | |
| for res in entry["uploaded_documents"]: | |
| doc_name = res["collection"].split('__')[0].replace('_', ' ') | |
| grouped.setdefault(doc_name, []).append(res) | |
| for doc_name, results in grouped.items(): | |
| st.markdown(f"#### {doc_name} ({len(results)} extraits)") | |
| for idx_res, res in enumerate(results, start=1): | |
| score = f"{res['score']:.2f}" if res.get("score") is not None else "N/A" | |
| text_full = res["text"] | |
| title = res.get("title") or "N/A" | |
| st.markdown(f"**Extrait {idx_res} (score: {score})**") | |
| st.markdown(text_full) | |
| st.markdown(f"**Section :** {title}") | |
| st.markdown("---") | |
| # Boutons d'action | |
| col1, col2 = st.columns([1, 1]) | |
| with col1: | |
| export_content = build_export_content( | |
| entry, | |
| mode=metadata.get("mode", "parlementaire"), | |
| include_legal_articles=metadata.get("include_legal_articles", False) | |
| ) | |
| st.download_button( | |
| label="⬇️ Exporter en TXT", | |
| data=export_content.encode("utf-8"), | |
| file_name=f"export_{idx+1}_{metadata.get('mode', 'parlementaire')}.txt", | |
| mime="text/plain", | |
| key=f"export_hist_{q_hash}" | |
| ) | |
| with col2: | |
| if st.button("🗑️ Supprimer", key=f"del_hist_{q_hash}"): | |
| del st.session_state.full_historique[q_hash] | |
| save_historique() | |
| st.rerun() | |
| else: | |
| st.info("Aucune question enregistrée dans l'historique pour le moment.") | |
| st.markdown('</div>', unsafe_allow_html=True) | |