import streamlit as st import hashlib import requests import os import pytz import time import unicodedata import importlib.metadata import re import tempfile import uuid import base64 import logging from pydantic import BaseModel, validator from dotenv import load_dotenv from sentence_transformers import SentenceTransformer from qdrant_client import QdrantClient from qdrant_client.http import models from typing import List, Optional, Union, Dict, Any, Literal, Set from datetime import datetime, timedelta from collections import defaultdict from PyPDF2 import PdfReader from docx import Document from urllib.parse import urlparse, parse_qs try: import pdfplumber print("✅ pdfplumber est installé et fonctionnel.") except ImportError: print("❌ pdfplumber n'est pas installé. Exécutez : pip install pdfplumber") raise ################################################################# # 1. CLASSES, FONCTIONS UTILITAIRES, INITIALISATION DES VARIABLES ################################################################# # --- Persistance de l'historique via session_state --- def save_historique(): # Limite le nombre d'entrées pour éviter la surcharge (ex: 50 dernières) if len(st.session_state.full_historique) > 50: # Supprime les entrées les plus anciennes sorted_entries = sorted( st.session_state.full_historique.items(), key=lambda x: x[1]["metadata"].get("timestamp", "") ) st.session_state.full_historique = dict(sorted_entries[-50:]) st.session_state.historique_cache = st.session_state.full_historique.copy() def load_historique(): if "historique_cache" in st.session_state: st.session_state.full_historique = st.session_state.historique_cache.copy() # Pour permettre de renommer une collection if 'show_rename_modal' not in st.session_state: st.session_state.show_rename_modal = False if 'current_doc_to_rename' not in st.session_state: st.session_state.current_doc_to_rename = None # Initialisation des états de certaines variables si non existants if 'use_priority_docs' not in st.session_state: st.session_state.use_priority_docs = False if 'priority_docs' not in st.session_state: st.session_state.priority_docs = [] # Estimation du nombre de tokens d'un texte en français def estimate_tokens(text): """Estime le nombre de tokens pour Mistral Large (1 token ≈ 4 caractères en français).""" return len(text) // 4 # Fonction utilitaire pour tronquer le texte def truncate_text(text: str, max_tokens: int = 500) -> str: """Tronque un texte à un nombre maximal de tokens (1 token ≈ 4 caractères).""" max_chars = max_tokens * 4 return (text[:max_chars] + "...") if len(text) > max_chars else text # Fonction de conversion des dates def safe_parse_date(date_str: Optional[str]) -> datetime: """Convertit une date hétérogène en datetime, ou datetime.min si invalide.""" if not date_str: return datetime.min for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%Y/%m/%d"): try: return datetime.strptime(date_str, fmt) except ValueError: continue return datetime.min # Masquer les messages de warning de pdfminer logging.getLogger("pdfminer").setLevel(logging.WARNING) # Clé numérique def sort_key(article_num: str): """ Transforme un identifiant d'article (ex: 'D146-12-1') en tuple numérique pour un tri correct. """ if not article_num: return ("",) prefix = article_num[0] parts = re.findall(r'\d+', article_num) nums = [int(p) for p in parts] return (prefix, *nums) # Charger l'historique au démarrage load_historique() # --- Configuration de la page (doit être unique et en premier) --- st.set_page_config( page_title="Parlement RAG", page_icon="🗳️", layout="wide" # ← élargit toute la page ) # --- Classe CSS pour tous les messages de statut st.markdown(""" """, unsafe_allow_html=True) # --- Chargement des variables d'environnement --- load_dotenv() QDRANT_URL = os.getenv("QDRANT_URL", "").strip() QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "").strip() MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "").strip() TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "").strip() GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "").strip() GOOGLE_CX = os.getenv("GOOGLE_CX", "").strip() QDRANT_COLLECTION = "QuestionParlementaire" if not QDRANT_URL.startswith("https://"): raise RuntimeError("❌ QDRANT_URL doit commencer par https://") if not QDRANT_API_KEY: raise RuntimeError("❌ QDRANT_API_KEY manquant. Vérifiez votre fichier .env") if not MISTRAL_API_KEY: raise RuntimeError("❌ MISTRAL_API_KEY manquant. Vérifiez votre fichier .env") print("QDRANT_URL:", QDRANT_URL) print("QDRANT_API_KEY (début):", QDRANT_API_KEY[:10], "...") # --- Chargement du modèle --- LOCAL_MODEL_PATH = "./models/camembert_finetuned_progressive" HUB_MODEL_PATH = "Whisler/camembert_finetuned_progressive" @st.cache_resource def load_embedding_model(): try: if os.path.exists(LOCAL_MODEL_PATH): model = SentenceTransformer(LOCAL_MODEL_PATH) print("✅ Modèle chargé en local.") else: model = SentenceTransformer(HUB_MODEL_PATH) print("✅ Modèle téléchargé depuis HuggingFace Hub.") test_embedding = model.encode("Test de chargement du modèle.") VECTOR_SIZE = len(test_embedding) print("Dimension des embeddings:", VECTOR_SIZE) return model except Exception as e: st.error(f"❌ Erreur de chargement du modèle: {str(e)}") raise embedding_model = load_embedding_model() # --- Connexion à Qdrant --- try: qdrant_client = QdrantClient( url=QDRANT_URL, api_key=QDRANT_API_KEY, timeout=10.0, check_compatibility=False ) collections = qdrant_client.get_collections() print(f"✅ Connexion réussie. Collections disponibles: {[c.name for c in collections.collections]}") except Exception as e: print(f"❌ Erreur de connexion à Qdrant: {e}") raise # --- Modèles Pydantic --- # Modele Pydantic pour les articles juridiques class BaseLegislativeRef(BaseModel): uid: str collection: str class RetrievedLegalDocument(BaseModel): # Identifiants chunk_id: str num: str titre: str # Contenu contenu: str article_complet: str # Contexte contexte_hierarchique: str collection: str # Hiérarchie optionnelle partie: Optional[str] = None livre: Optional[str] = None titre_structure: Optional[str] = None chapitre: Optional[str] = None section: Optional[str] = None sous_section: Optional[str] = None paragraphe: Optional[str] = None sous_paragraphe: Optional[str] = None # Références législatives base_legislative: Optional[List[BaseLegislativeRef]] = None # Score du retrieval score: Optional[float] = None # Modele Pydantic pour les documents generiques class GenericDocument(BaseModel): # Identifiants uid: Optional[str] = None # Contenu text: str title: Optional[str] = None part: Optional[str] = None # ex. "Annexe 7", "partie 1" # Métadonnées source: Optional[str] = None date_document: Optional[str] = None type_document: Optional[str] = None # Score du retrieval score: Optional[float] = None # Modele Pydantic pour les reponses RAG class ResponseDocument(BaseModel): # Identifiants uid: str # Contenu question: str reponse: str # Métadonnées legislature: Optional[str] = None chambre: Optional[str] = None # Assemblée ou Sénat (à ajouter si tu l’as dans tes données) rubrique: Optional[str] = None analyse: Optional[str] = None ministeres_attribues: Optional[List[str]] = None # Dates date_question: Optional[str] = None date_reponse: Optional[str] = None # Références juridiques éventuelles textes_juridiques: Optional[List[str]] = None # Score du retrieval score: Optional[float] = None ################################################################# # -------------- 1. PRINCIPALES FONCTIONS ----------------------- ################################################################# # --- 1a. Fonctions d'upload, d'indexation et d'embedding # Fonction pour extraire le texte d'un document pdf def extract_text(pdf_path: str, max_pages: Optional[int] = None) -> str: """ Extrait le texte d'un PDF avec plusieurs stratégies : - pdfplumber pour le texte brut - fallback OCR si une page est vide - nettoyage des espaces et des sauts de ligne """ text_chunks = [] try: with pdfplumber.open(pdf_path) as pdf: pages = pdf.pages[:max_pages] if max_pages else pdf.pages for i, page in enumerate(pages, start=1): try: # Extraction brute page_text = page.extract_text() or "" word_count = len(page_text.split()) if page_text else 0 # # Si vide, tenter une extraction par OCR (optionnel) # if not page_text.strip(): # try: # from pdf2image import convert_from_path # import pytesseract # images = convert_from_path(pdf_path, first_page=i, last_page=i) # ocr_text = pytesseract.image_to_string(images[0], lang="fra") # page_text = ocr_text # st.write(f"Page {i} → OCR fallback → {len(page_text.split())} mots") # except Exception as e: # print(f"⚠️ OCR non disponible pour la page {i}: {e}") # pass # Nettoyage basique page_text = re.sub(r"\s+", " ", page_text).strip() if page_text: text_chunks.append(page_text) except Exception as e: print(f"⚠️ Erreur page {i}: {e}") continue except Exception as e: print(f"❌ Erreur lors de l'ouverture du PDF: {e}") return "" raw_text = "\n\n".join(text_chunks).strip() return raw_text # Fonction pour extraire le texte d'un document Word def extract_text_from_docx(file_path: str) -> str: """Extrait le texte d'un fichier Word.""" try: doc = Document(file_path) text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) return text.strip() except Exception as e: st.error(f"❌ Erreur lors de l'extraction du DOCX: {e}") return "" # Fonction pour nettoyer les textes extraits def clean_text(text: str) -> str: """Nettoie le texte extrait d'un PDF administratif en préservant la structure et les données utiles.""" lines = text.split('\n') cleaned_lines = [] for line in lines: line = line.strip() # Ignore les lignes vides ou presque vides if not line: continue # Ignore les numéros de page isolés (ex: "Page 57" ou "57") if re.match(r'^(?:Page\s*)?\d+\s*$', line, re.IGNORECASE): continue # Ignore les en-têtes/pieds de page répétitifs (ex: "PLFSS 2025 - Annexe 7") if re.match(r'^(?:PLFSS\s*\d{4}\s*-\s*Annexe\s*\d+|ANNEXE\s*DÉPENSES\s*DE\s*LA\s*BRANCHE\s*.*|securite-sociale\.fr|Source\s*:?\s*.*|Génération\s*X-Book)$', line, re.IGNORECASE): continue # Ignore les lignes avec seulement des caractères spéciaux ou des tirets if re.match(r'^[•○◘\-—~=]+$', line): continue # Conserve les lignes même courtes (titres, sous-titres, etc.) cleaned_lines.append(line) # Reconstitue le texte text = '\n'.join(cleaned_lines) # Nettoyage global text = re.sub(r'\s+', ' ', text) # Espaces multiples text = re.sub(r'(\w)\s+-\s+(\w)', r'\1\2', text) # Mots coupés par tiret text = re.sub(r'\bhttps?://\S+', '', text) # URLs text = re.sub(r'[•○◘]+|[-=~]{5,}', '', text) # Artefacts visuels # Nettoie les espaces résiduels text = text.strip() return text # Fonction de suppression du sommaire def remove_summary(text: str) -> str: """Supprime le sommaire et les tables des matières.""" cleaned = re.sub( r"(?i)(SOMMAIRE|TABLE DES MATIÈRES).*?(?=PARTIE\s+\d+|ANNEXE\s+\d+|Article\s+\d+|$)", "", text, flags=re.DOTALL ) return cleaned.strip() if cleaned.strip() else text # Fonction de pré-traitement des titres def preprocess_for_titles(text: str) -> str: """Insère des sauts de ligne avant chaque titre pour améliorer la segmentation.""" sentences = re.split(r'(?<=[.!?])\s+', text) processed_text = [] for sentence in sentences: sentence = sentence.strip() if not sentence: continue if detect_titles(sentence): processed_text.append(f"\n{sentence}\n") else: processed_text.append(sentence) text = " ".join(processed_text) # Ajout de sauts de ligne autour des titres title_patterns = [ r"(Article\s*\d*\s*[-–]?)", r"(ANNEXE\s+\d+)", r"(PARTIE\s+\d+)", r"(Fiches?\s+d’?évaluation\s+préalable)", r"(\d+\.\s+)", r"([IVXLCDM]+\.\s+)" ] for pattern in title_patterns: text = re.sub(pattern, r"\n\1\n", text, flags=re.IGNORECASE) text = re.sub(r'\n\s*\n', '\n', text) return text.strip() # Fonction de détection des titres def detect_titles(line: str) -> bool: """Détecte les titres de manière robuste.""" line = line.strip() if not line: return False regex_patterns = [ r"^Article\s+\d+\s*[–—-]?", r"^ANNEXE\s+\d+", r"^PARTIE\s+\d+", r"^(TITRE|Chapitre|Section)\s+\d+", r"^[IVXLCDM]+\.\s+", r"^\d+(\.\d+)*\s+" ] if any(re.match(p, line, flags=re.IGNORECASE) for p in regex_patterns): return True title_keywords = [ "Article ", "ANNEXE ", "PARTIE ", "Fiches d’évaluation préalable", "Synthèse", "Conclusion", "TITRE ", "Chapitre ", "Section ", "I. ", "II. ", "III. ", "1. ", "2. " ] return any(keyword.lower() in line.lower() for keyword in title_keywords) # Fonction de segmentation du texte def segment_text(text: str, max_words: int = 300, min_words: int = 50) -> List[Dict[str, str]]: """Découpe le texte en segments avec titres pour Qdrant.""" blocks = re.split(r'\n\n|\.\s+', text) segments = [] current_title = "AUTRE" current_content = [] for block in blocks: block = block.strip() if not block: continue if detect_titles(block): if current_content: seg_text = ' '.join(current_content) if len(seg_text.split()) >= min_words: segments.append({"title": current_title, "text": seg_text}) current_content = [] current_title = block else: current_content.append(block) if current_content: seg_text = ' '.join(current_content) if len(seg_text.split()) >= min_words: segments.append({"title": current_title, "text": seg_text}) return segments # Fonction pour préparer les chunks def prepare_chunks_fixed(text: str, file_name: str, chunk_size=350, overlap=50) -> List[Dict]: """ Découpe le texte en chunks fixes (~350 mots ≈ 512 tokens) avec overlap, en conservant le dernier titre détecté comme métadonnée. """ words = text.split() chunks = [] start = 0 current_title = "AUTRE" while start < len(words): end = start + chunk_size chunk_words = words[start:end] # Met à jour le titre courant si un mot ressemble à un titre for w in chunk_words: if detect_titles(w): current_title = w if len(chunk_words) >= 50: # filtre chunks trop courts chunk_text = " ".join(chunk_words) chunks.append({ "id": str(uuid.uuid4()), "text": chunk_text, "metadata": { "source": file_name, "section": current_title, "position": start, "word_count": len(chunk_words), "upload_date": datetime.now().isoformat() } }) start += chunk_size - overlap return chunks # Fonction pour l'upload et l'indexation avec logs détaillés def process_and_index_document(file_path: str, file_type: str, collection_name: str, qdrant_client=None, embedding_model=None, progress_callback=None): """Traite et indexe un document dans SA PROPRE COLLECTION avec suivi de progression et logs détaillés.""" try: # 1. Extraction du texte try: if file_type == "pdf": raw_text = extract_text(file_path) else: raw_text = extract_text_from_docx(file_path) if not raw_text: if progress_callback: progress_callback(0, 100, "Échec : extraction du texte") return False if progress_callback: progress_callback(5, 100, "Extraction du texte terminée") except Exception as e: st.error(f"❌ Erreur extraction: {repr(e)}") return False # 2. Nettoyage et segmentation try: cleaned_text = clean_text(raw_text) segments = segment_text(cleaned_text) or [{"title": "Document", "text": cleaned_text}] if progress_callback: progress_callback(10, 100, "Nettoyage et segmentation terminés") except Exception as e: st.error(f"❌ Erreur nettoyage/segmentation: {repr(e)}") return False # 3. Pré-traitement titres try: preprocessed_text = preprocess_for_titles(cleaned_text) except Exception as e: st.error(f"❌ Erreur preprocess_for_titles: {repr(e)}") return False # 4. Chunking try: chunks = prepare_chunks_fixed( preprocessed_text, file_name=collection_name, chunk_size=350, # ≈ 512 tokens overlap=50 ) except Exception as e: st.error(f"❌ Erreur chunking: {repr(e)}") return False if progress_callback: progress_callback(30, 100, f"Préparation des chunks terminée ({len(chunks)} chunks)") # 5. Génération des embeddings (par petits lots) texts = [chunk["text"] for chunk in chunks] embeddings = [] total_chunks = len(chunks) for i in range(0, total_chunks, 5): batch_texts = texts[i:i+5] try: batch_embeddings = embedding_model.encode(batch_texts).tolist() embeddings.extend(batch_embeddings) except Exception as e: st.error(f"❌ Erreur génération embeddings: {repr(e)}") if progress_callback: progress_callback(0, 100, f"Erreur génération embeddings: {str(e)}") return False if progress_callback: current_chunk = min(i + 5, total_chunks) progress_callback(30 + int(30 * current_chunk / total_chunks), 100, f"Génération des embeddings : {current_chunk}/{total_chunks}") # 6. Indexation dans Qdrant (par petits lots avec réessais) points = [ models.PointStruct( id=chunk["id"], vector=embedding, payload={"text": chunk["text"], **chunk["metadata"]} ) for chunk, embedding in zip(chunks, embeddings) ] batch_size = 10 max_retries = 2 retry_delay = 2 for i in range(0, len(points), batch_size): batch = points[i:i + batch_size] success = False retry_count = 0 while retry_count < max_retries and not success: try: qdrant_client.upsert( collection_name=collection_name, points=batch, wait=True, ) success = True except Exception as e: retry_count += 1 st.error(f"⚠️ Erreur upsert batch {i//batch_size+1}: {repr(e)}") if progress_callback: progress_callback(0, 100, f"Échec batch {i//batch_size + 1}, tentative {retry_count}/{max_retries}") if retry_count < max_retries: time.sleep(retry_delay) if not success: if progress_callback: progress_callback(0, 100, f"Échec définitif batch {i//batch_size + 1}") return False if progress_callback: current_point = min(i + batch_size, len(points)) progress_callback(60 + int(40 * current_point / len(points)), 100, f"Indexation : {current_point}/{len(points)} chunks") if progress_callback: progress_callback(100, 100, "Indexation terminée avec succès") st.success("🎉 Document indexé avec succès") return True except Exception as e: if progress_callback: progress_callback(0, 100, f"Erreur : {str(e)}") st.error(f"Erreur dans process_and_index_document: {e}") return False # --- 1b. Fonctions de recherche --- # Fonction qui supprime les accents def normalize_query(query: str) -> str: # Supprime les accents et normalise en ASCII return ''.join( c for c in unicodedata.normalize('NFD', query) if unicodedata.category(c) != 'Mn' ) # Fonction qui extrait le sujet principal de la question def extract_subject(question: str) -> str: """ Extrait le sujet principal d'une question parlementaire sous forme de 3 à 5 mots-clés. - Supprime toute mention de députés, du Gouvernement ou de formulations inutiles. - Nettoie la ponctuation et tronque à quelques mots. """ url = "https://api.mistral.ai/v1/chat/completions" headers = { "Authorization": f"Bearer {MISTRAL_API_KEY}", "Content-Type": "application/json" } payload = { "model": "mistral-small-latest", # possibilité de remplacer "small" par "medium" "messages": [ { "role": "system", "content": ( "Tu es un assistant qui identifie uniquement le sujet principal " "d'une question parlementaire. " "Ne mentionne jamais le député ou le gouvernement. " "Donne une réponse sous forme de 3 à 5 mots-clés concis, " "centrés sur le thème (pas de phrase complète)." ) }, { "role": "user", "content": question } ], "temperature": 0.2, "max_tokens": 20 } response = requests.post(url, headers=headers, json=payload) response.raise_for_status() data = response.json() try: subject = data["choices"][0]["message"]["content"].strip() except (KeyError, IndexError): subject = "sujet non identifié" # Nettoyage supplémentaire côté Python subject = re.sub(r"\b(députée?|député|gouvernement|ministre|assemblée nationale|sénat)\b", "", subject, flags=re.IGNORECASE) subject = subject.replace("**Sujet principal**", "").strip() subject = re.sub(r"[^\w\s]", " ", subject) # supprime ponctuation subject = re.sub(r"\s+", " ", subject) # Tronquer à 5 mots max tokens = subject.split() subject = " ".join(tokens[:5]) print("=== Sujet extrait (compact) ===", subject) return subject # Fonction de recherche Tavily (et filtre pour les scores de pertinence <0.5) def search_tavily_government(subject: str, min_score: float = 0.5): """ Recherche les annonces gouvernementales récentes sur un sujet donné via Tavily. - Filtre par domaines autorisés - Filtre par date (moins d'un an si disponible) - Filtre par score de pertinence (>= min_score) """ url = "https://api.tavily.com/search" headers = {"Authorization": f"Bearer {TAVILY_API_KEY}"} allowed_domains = [ "gouvernement.fr", "education.gouv.fr", "vie-publique.fr", "elysee.fr", "solidarites.gouv.fr", "sante.gouv.fr", "travail-sante-solidarites.gouv.fr", "securite-sociale.fr", "ameli.fr", "lassuranceretraite.fr", "caf.fr", "msa.fr", "urssaf.fr", "legifrance.gouv.fr", "drees.solidarites-sante.gouv.fr", "ars.sante.fr", "cnsa.fr", "en3s.fr", "francetravail.fr" ] payload = { "query": f"dernières annonces gouvernement France {subject}", "max_results": 100, "include_answer": True, "include_domains": allowed_domains } response = requests.post(url, json=payload, headers=headers) response.raise_for_status() data = response.json() raw_results = data.get("results", []) # Filtre par domaine by_domain = [r for r in raw_results if any(d in r.get("url", "") for d in allowed_domains)] # Filtre par date (moins d'un an si dispo) cutoff = datetime.now() - timedelta(days=365) recent = [] for r in by_domain: ds = r.get("published_date") or r.get("date") if ds: try: pub = datetime.fromisoformat(ds.replace("Z", "")) # ✅ Condition supplémentaire : année 2025 if pub.year == 2025 or pub >= cutoff: recent.append(r) continue else: continue except Exception: # date non exploitable → on garde recent.append(r) else: recent.append(r) # Filtre par score filtered = [r for r in recent if r.get("score", 0) >= min_score] filtered.sort(key=lambda x: x.get("score", 0), reverse=True) # ✅ Limiter à 10 après filtrage data["results"] = filtered[:10] return data # Fonction de recherche Google def search_google_government(subject: str, min_score: float = 0.5, max_results: int = 10): """ Recherche des informations via Google Custom Search API sur un sujet donné. - Même structure et format que search_tavily_government - Entrée: subject (str), min_score, max_results - Sortie: dict {"results": [{"title","url","content","score","published_date"}]} """ url = "https://www.googleapis.com/customsearch/v1" params = { "q": f"dernières annonces gouvernement France {subject}", "key": GOOGLE_API_KEY, "cx": GOOGLE_CX, "num": max_results } response = requests.get(url, params=params, timeout=20) response.raise_for_status() data = response.json() raw_results = data.get("items", []) allowed_domains = [ "gouvernement.fr", "info.gouv.fr", "elysee.fr", "vie-publique.fr", "education.gouv.fr", "solidarites.gouv.fr", "sante.gouv.fr", "travail-sante-solidarites.gouv.fr", "securite-sociale.fr", "ameli.fr", "lassuranceretraite.fr", "caf.fr", "msa.fr", "urssaf.fr", "legifrance.gouv.fr", "drees.solidarites-sante.gouv.fr", "ars.sante.fr", "cnsa.fr", "en3s.fr", "francetravail.fr" ] # Filtre par domaine by_domain = [r for r in raw_results if any(d in r.get("link", "") for d in allowed_domains)] # Filtre par date (moins d'un an si dispo) cutoff = datetime.now() - timedelta(days=365) recent = [] for r in by_domain: date_str = r.get("pagemap", {}).get("metatags", [{}])[0].get("article:published_time") if date_str: try: pub = datetime.fromisoformat(date_str.replace("Z", "")) if pub >= cutoff: recent.append(r) except Exception: recent.append(r) else: recent.append(r) # Filtre par score (Google ne fournit pas de score → fallback = 1.0 si snippet présent) filtered = [r for r in recent if r.get("snippet")] filtered = filtered[:max_results] # Format homogène comme Tavily results = [] for r in filtered: results.append({ "title": r.get("title", ""), "url": r.get("link", ""), "content": r.get("snippet", ""), # Tavily utilisait "content" "score": 1.0, # Valeur par défaut "published_date": date_str if r.get("pagemap", {}).get("metatags") else None }) return {"results": results} # --- Fonction de log pour le debug --- def log_debug(title: str, data: Any, max_length: int = 500): """Affiche un log de debug dans un fichier.""" import os from datetime import datetime # Chemin absolu vers le dossier de logs (dans le répertoire courant) log_dir = os.path.join(os.getcwd(), "logs") # Créer le dossier s'il n'existe pas if not os.path.exists(log_dir): os.makedirs(log_dir) print(f"📁 Dossier créé : {log_dir}") # Chemin absolu vers le fichier de log log_file = os.path.join(log_dir, "debug_logs.txt") try: with open(log_file, "a", encoding="utf-8") as f: f.write(f"\n--- {title} ---\n") f.write(f"Timestamp: {datetime.now().isoformat()}\n") if isinstance(data, dict): for k, v in data.items(): f.write(f"{k}: {str(v)[:max_length]}\n") elif isinstance(data, list): f.write(f"List of {len(data)} items:\n") for i, item in enumerate(data[:3]): f.write(f" Item {i}: {str(item)[:max_length]}\n") else: f.write(f"{str(data)[:max_length]}\n") f.write("---\n") # Confirmation dans le terminal print(f"✅ Log enregistré dans {log_file}: {title}") except Exception as e: print(f"❌ Erreur lors de l'écriture du log : {str(e)}") # Fonctions utilitaires car le champ base_legislative peut contenir soit des objets BaseLegislativeRef, soit des dicts (selon la provenance des données) def get_ref_uid(ref) -> str | None: return ref.uid if hasattr(ref, "uid") else ref.get("uid") def get_ref_collection(ref, default_collection: str) -> str: return ref.collection if hasattr(ref, "collection") else ref.get("collection", default_collection) # Préparer pour préparer le contenu en vue d'un export .txt def build_export_content(response_data: dict, mode: str, include_legal_articles: bool = False) -> str: lines = [] # Question question = response_data.get("question") if question: lines.append("❓ Question\n") lines.append(str(question)) lines.append("\n\n") # En-tête (Réponse ou Analyse) if mode == "analyse": lines.append("🔎 Analyse juridique\n") else: lines.append("📜 Réponse\n") lines.append(str(response_data.get("response", "Pas de texte généré"))) lines.append("\n\n") # Résumé (si présent) summary = response_data.get("summary") if summary: lines.append("📝 Résumé\n") lines.append(str(summary)) lines.append("\n\n") # Résultats de recherche (si présents) search_results = response_data.get("search_results", []) if search_results: lines.append("🌐 Résultats de recherche\n") for idx, item in enumerate(search_results, start=1): titre = item.get("title", "Sans titre") url = item.get("url", "") extrait = item.get("content") or item.get("snippet") or "" score = item.get("score", "N/A") date = item.get("published_date", "N/A") lines.append(f"{idx}. {titre}\n") if url: lines.append(f" Lien: {url}\n") if extrait: lines.append(f" Extrait: {extrait}\n") lines.append(f" Score: {score} | Date: {date}\n\n") # Anciennes QE (si présentes) lines.append("🏛️ Anciennes QE\n") for doc in response_data.get("similar_documents", []): score = f"{getattr(doc, 'score', 0):.2f}" if getattr(doc, "score", None) is not None else "N/A" chambre = getattr(doc, "chambre", "Inconnue") lines.append(f"- QE {doc.uid} ({chambre}) - Score : {score}\n") lines.append(f" Question: {doc.question}\n") lines.append(f" Réponse: {doc.reponse}\n\n") # Articles juridiques (pour les deux modes) if mode == "analyse" or include_legal_articles: lines.append("⚖️ Articles juridiques\n") # Mode "Analyse juridique" : utilise "sources" (avec relations parents/enfants) if mode == "analyse": sources = response_data.get("sources", []) if not sources: lines.append("Aucun article juridique enregistré.\n") else: for source in sources: art = source["article"] lines.append(f"--- Article {art.num} ({art.collection}) ---\n") lines.append(f"Titre: {art.titre}\n") lines.append(f"Texte: {art.article_complet}\n") # Relations parents (articles cités) if source.get("parents"): lines.append("Articles cités:\n") for parent in source["parents"]: lines.append(f"- {parent.num}: {parent.titre}\n") # Relations enfants (référencé par) if source.get("enfants"): lines.append("Référencé par:\n") for enfant in source["enfants"]: lines.append(f"- {enfant.num}: {enfant.titre}\n") lines.append("\n") # Mode "Réponse parlementaire" : utilise "legal_sources" (sans relations) else: legal_sources = response_data.get("legal_sources", []) if not legal_sources: lines.append("Aucun texte juridique cité.\n") else: for art in legal_sources: lines.append(f"- Article {getattr(art, 'num', 'N/A')} ({getattr(art, 'collection', 'N/A')})\n") if getattr(art, "titre", None): lines.append(f" Titre: {art.titre}\n") contenu = art.article_complet if hasattr(art, 'article_complet') else getattr(art, 'contenu', '') lines.append(" Texte:\n" + str(contenu) + "\n\n") return "\n".join(str(x) for x in lines) # Fonction qui construit un objet RetrievedLegalDocument à partir d'un payload Qdrant def make_retrieved_document(payload: dict, uid: str, collection: str, score: float = 0) -> RetrievedLegalDocument: """Construit un objet RetrievedLegalDocument à partir d'un payload Qdrant.""" return RetrievedLegalDocument( chunk_id=uid, num=payload.get("num", ""), titre=payload.get("titre", ""), contenu=payload.get("contenu", ""), article_complet=payload.get("article_complet", payload.get("contenu", "")), contexte_hierarchique=payload.get("contexte_hierarchique", ""), collection=collection, partie=payload.get("partie"), livre=payload.get("livre"), titre_structure=payload.get("titre_structure"), chapitre=payload.get("chapitre"), section=payload.get("section"), sous_section=payload.get("sous_section"), paragraphe=payload.get("paragraphe"), sous_paragraphe=payload.get("sous_paragraphe"), base_legislative=payload.get("base_legislative", []), score=score ) # Fonction qui recherche des articles juridiques dans les collections Qdrant en utilisant des embeddings def search_articles( query: str, partie: Optional[str] = None, limit: int = 5, must_contain: Optional[str] = None, debug: bool = False, threshold: float = 0.0 ) -> Dict[str, Any]: """Recherche optimisée d'articles juridiques dans Qdrant.""" target_collections = ["CASF", "Code du travail", "Code de la santé publique", "Code de la sécurité sociale"] collections = qdrant_client.get_collections() valid_collections = [c.name for c in collections.collections if c.name in target_collections] if not valid_collections: return {"sources": [], "total": 0, "limit": limit, "offset": 0} try: # Recherche vectorielle embedding = embedding_model.encode(query).tolist() query_filter = models.Filter( must=[models.FieldCondition(key="partie", match=models.MatchValue(value=partie))] ) if partie else None all_results = [] for collection in valid_collections: try: hits = qdrant_client.search( collection_name=collection, query_vector=embedding, query_filter=query_filter, limit=limit, with_payload=True, with_vectors=False ) all_results.extend(hits) except Exception as e: continue # Filtrage par score results = [r for r in all_results if r.score >= threshold] # Filtrage par mot-clé if must_contain: results = [ r for r in results if must_contain.lower() in r.payload.get("contenu", "").lower() ] if not results: return {"sources": [], "total": 0, "limit": limit, "offset": 0} # Mapping vers RetrievedLegalDocument documents: List[RetrievedLegalDocument] = [] for result in results: try: # Utilise payload["chunk_id"] (str) au lieu de result.id (int/UUID) chunk_id = result.payload.get("chunk_id") # Ex: "L111-1_chunk0" if not chunk_id: chunk_id = result.payload.get("uid", str(result.id)) # Fallback pour les collections sans chunk_id article = make_retrieved_document( payload=result.payload, uid=chunk_id, # ← Ici, chunk_id est TOUJOURS une chaîne collection=result.payload.get("collection", ""), score=result.score ) documents.append(article) except Exception as e: continue # Déduplication par numéro seen = set() unique_documents = [] for doc in documents: if doc.num not in seen: seen.add(doc.num) unique_documents.append(doc) # Tri par numéro unique_documents.sort(key=lambda d: d.num) return { "sources": unique_documents[:limit], "total": len(unique_documents), "limit": limit, "offset": 0 } except Exception as e: import traceback traceback.print_exc() return {"sources": [], "total": 0, "limit": limit, "offset": 0} # Fonction pour construire un arbre législatif def build_legislative_tree(sources: List[RetrievedLegalDocument]) -> Dict[str, Any]: """ Version corrigée qui: 1. Évite de modifier le dictionnaire pendant l'itération 2. Utilise une copie des clés pour l'itération 3. Conserve tous les logs et la structure uniforme """ # 1. Ajouter tous les articles sources (déjà des objets) enrichis = {} for art in sources: enrichis[art.num] = { "article": art, "parents": [], "enfants": [], "type": art.num[0] if art.num else "?" } # 2. Pour chaque article source, récupérer les articles de même groupe hiérarchique for art in sources: # Priorité des niveaux hiérarchiques (du plus précis au plus général) hierarchy_levels = [ ("sous_paragraphe", art.sous_paragraphe), ("paragraphe", art.paragraphe), ("sous_section", art.sous_section), ("section", art.section), ("chapitre", art.chapitre) ] for level_key, level_value in hierarchy_levels: if not level_value: continue try: flt = models.Filter( must=[ models.FieldCondition( key=level_key, match=models.MatchText(text=level_value) ), models.FieldCondition( key="collection", match=models.MatchValue(value=art.collection) ) ] ) points, _ = qdrant_client.scroll( collection_name=art.collection, scroll_filter=flt, limit=100, with_payload=True, with_vectors=False, ) for point in points: payload = point.payload art_num = payload.get("num") if not art_num or art_num in enrichis: continue new_art = make_retrieved_document( payload=payload, uid=payload.get("chunk_id", str(point.id)), collection=payload.get("collection", art.collection), score=0 ) enrichis[art_num] = { "article": new_art, "parents": [], "enfants": [], "type": art_num[0] if art_num else "?" } break except Exception as e: continue # 3. Gestion des références (base_legislative) pour TOUS les articles # ✅ Solution: Itérer sur une COPIE des clés pour éviter de modifier le dict pendant l'itération for art_uid in list(enrichis.keys()): # ← COPIE des clés ! art_data = enrichis[art_uid] art = art_data["article"] for ref in art.base_legislative or []: ref_uid = ref.uid if hasattr(ref, 'uid') else ref.get('uid') if not ref_uid or ref_uid in enrichis: continue try: ref_collection = ref.collection if hasattr(ref, 'collection') else art.collection ref_points, _ = qdrant_client.scroll( collection_name=ref_collection, scroll_filter=models.Filter( must=[models.FieldCondition( key="num", match=models.MatchValue(value=ref_uid) )] ), limit=1, with_payload=True, with_vectors=False, ) if ref_points: ref_payload = ref_points[0].payload ref_art = make_retrieved_document( payload=ref_payload, uid=ref_payload.get("chunk_id", str(ref_points[0].id)), collection=ref_payload.get("collection", ref_collection), score=0 ) enrichis[ref_uid] = { # ✅ Ajout d'un nouvel élément au dict "article": ref_art, "parents": [], "enfants": [], "type": ref_uid[0] if ref_uid else "?" } # Ajout des relations parent/enfant art_data["parents"].append(enrichis[ref_uid]["article"]) enrichis[ref_uid]["enfants"].append(art_data["article"]) except Exception as e: continue return enrichis # Fonction de tri des numéros def sort_key_num(article_data: Dict[str, Any]) -> tuple: """Clé de tri pour les numéros d'article (ex: L241-3).""" num = article_data["article"].num # Accès direct à l'attribut Pydantic m = re.match(r"[LRD](\d+)(?:-(\d+))?", num) if m: base = int(m.group(1)) suffix = int(m.group(2)) if m.group(2) else 0 return (base, suffix) return (float("inf"), float("inf")) # Fonction qui ajoute un article dans l'arbre partagé selon les niveaux hiérarchiques def add_to_tree(tree: dict, article: RetrievedLegalDocument, item: dict): """ Ajoute un article dans l'arbre hiérarchique avec ses relations. - article: Objet RetrievedLegalDocument (pour la hiérarchie) - item: Dictionnaire COMPLET avec parents/enfants (pour les relations) """ # Construire la hiérarchie depuis les champs de l'article levels = [ article.collection, article.partie, article.livre, article.titre_structure, article.chapitre, article.section, article.sous_section, article.paragraphe, article.sous_paragraphe, ] # Filtrer les niveaux vides levels = [lvl for lvl in levels if lvl] # Naviguer dans l'arbre node = tree for label in levels: node = node.setdefault(label, {}) # Ajouter l'article AVEC SES RELATIONS node.setdefault("_items", []).append(item) # ✅ item contient parents/enfants # Fonction qui affiche récursivement l'arbre sous forme d'expanders def render_tree(container, node: dict, level: int = 0): """ Version simplifiée qui affiche : 1. Tous les articles dans une arborescence unique 2. Pour chaque article : uniquement "Référencé par" et "Articles cités" """ # Styles CSS container.markdown(""" """, unsafe_allow_html=True) # Affichage des articles avec tri numérique for data in sorted(node.get("_items", []), key=lambda x: [ int(part) if part.isdigit() else part for part in x["article"].num.split('-') ]): art = data["article"] with container.expander(f"📜 {art.num} - {art.titre}"): # Contenu de l'article container.markdown(f'
Veuillez entrer vos identifiants pour accéder au générateur de réponses aux questions écrites.
', unsafe_allow_html=True) # Colonnes pour réduire la largeur et centrer les champs # Ajuste les ratios pour obtenir la largeur souhaitée (ici ~25% de la page) left, center, right = st.columns([3, 2, 3]) with center: st.text_input("Nom d'utilisateur", key="username") st.text_input("Mot de passe", type="password", key="password") if st.button("Se connecter"): check_password() st.rerun() if st.session_state.authentication_status is False: st.error("Identifiants incorrects. Veuillez réessayer.") else: # Réafficher la sidebar après connexion st.markdown(""" """, unsafe_allow_html=True) ################################################################# ### ---- 3. AFFICHAGE DU SITE APRES CONNEXION --------------- ### ################################################################# # --- Initialisation de l'historique (UNIQUEMENT si non existant) --- if "full_historique" not in st.session_state: st.session_state.full_historique = {} save_historique() # Sauvegarde initiale # --- Chargement de l'historique depuis session_state (si vide, on vérifie le cache) --- if not st.session_state.full_historique and "historique_cache" in st.session_state: st.session_state.full_historique = st.session_state.historique_cache # --- Configuration de la page et CSS --- st.markdown(""" """, unsafe_allow_html=True) st.title("🏛️ Générateur de réponses aux questions écrites parlementaires") st.markdown(""" Application (version Beta) de réponse aux questions parlementaires, appuyée sur une base documentaire (embedding avec **camemBERT**), un moteur de recherche (**Tavily** ou **Google**) et le modèle **Mistral**. """) ################################################################# #### -------------------- 3a. SIDEBAR --------------------- #### ################################################################# with st.sidebar: # Bouton Déconnexion if st.button('Déconnexion', key="logout"): st.session_state.authentication_status = None st.rerun() # Message de bienvenue st.write(f'Bienvenue *{st.session_state["name"]}*') # --- Version du site --- st.markdown( """