| import os |
| import streamlit as st |
| import chromadb |
| from chromadb import Documents, EmbeddingFunction, Embeddings |
| import PyPDF2 |
| import zipfile |
|
|
|
|
| import google.generativeai as genai |
| from google.generativeai import types |
| from google.api_core import retry |
| from dotenv import load_dotenv |
| import gdown |
|
|
|
|
|
|
| |
| from PIL import Image |
| import base64 |
|
|
| |
# Sidebar branding: app logo (served from Google Drive) and app title.
with st.sidebar:
    st.image("https://drive.google.com/uc?export=view&id=17-9xtpS0LSToUJBky4_O11JOj5F3tysI", width=180)
    st.markdown("<h2 style='text-align:center;color:#2d6a4f;'>BiblioIA</h2>", unsafe_allow_html=True)
|
|
| |
# Global CSS overrides: green theme for buttons, text inputs and headings.
st.markdown('''
<style>
.main {background-color: #f8f9fa;}
.stButton>button {background-color: #2d6a4f; color:white; border-radius:8px;}
.stDownloadButton>button {background-color: #40916c; color:white; border-radius:8px;}
.stTextInput>div>div>input {background-color: #fff !important; color: #222 !important; border: 1.5px solid #2d6a4f; border-radius: 6px; font-size: 1.05em;}
.stMarkdown h2, .stMarkdown h3 {color: #2d6a4f;}
</style>
''', unsafe_allow_html=True)
|
|
| |
# Load environment variables from a local .env file (no-op when absent).
load_dotenv()
# Directory holding the persistent ChromaDB index.
PERSIST_DIRECTORY = "chroma_db_bible_NEW"
INDEX_LOCAL_DIR = PERSIST_DIRECTORY

# Pre-built index archive hosted on Hugging Face, downloaded on first run.
INDEX_ZIP_URL = "https://huggingface.co/datasets/Sidoineko/kolaTech/resolve/main/chroma_db_bible_NEW.zip"
INDEX_ZIP_PATH = "chroma_db_bible_NEW.zip"
|
|
| |
# Sidebar index-status indicator. Only the directory's existence is
# checked here; its contents are not validated.
if os.path.exists(INDEX_LOCAL_DIR):
    st.sidebar.success("Index chargé ✅")
else:
    st.sidebar.error("Index absent ❌")
|
|
| |
# Per-session Q&A history, stored as (question, answer, feedback) tuples.
if "qa_history" not in st.session_state:
    st.session_state.qa_history = []

# Render the last 5 exchanges, most recent first.
# NOTE(review): nothing visible in this file appends to qa_history or
# sets feedback — presumably done elsewhere or planned; verify.
if st.session_state.qa_history:
    st.markdown("<h4>Historique de session</h4>", unsafe_allow_html=True)
    for idx, (q, a, fb) in enumerate(reversed(st.session_state.qa_history[-5:])):
        st.markdown(f"<b>Q:</b> {q}", unsafe_allow_html=True)
        st.markdown(f"<b>R:</b> {a}", unsafe_allow_html=True)
        if fb:
            # fb convention: 1 = thumbs-up, anything else truthy = thumbs-down.
            st.markdown(f"Feedback : {'👍' if fb==1 else '👎'}", unsafe_allow_html=True)
        st.markdown("<hr>")
|
|
|
|
# Resolve the Gemini API key: environment variable first, then Streamlit
# secrets. Accessing st.secrets raises (FileNotFoundError) when no
# secrets.toml exists — e.g. plain local runs — so guard the lookup.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    try:
        GOOGLE_API_KEY = st.secrets.get("GOOGLE_API_KEY")
    except Exception:
        GOOGLE_API_KEY = None
if not GOOGLE_API_KEY:
    st.error("Clé API Google non trouvée. Définir GOOGLE_API_KEY dans les secrets Hugging Face.")
    st.stop()

# Model and data configuration.
DB_COLLECTION_NAME = "bible_rag_collection"
EMBEDDING_MODEL_NAME = "models/text-embedding-004"
LLM_MODEL_NAME = "models/gemini-1.5-flash-latest"
PDF_PATH = "bible_english.pdf"
| |
| |
|
|
| |
| INDEX_DRIVE_FOLDER_URL = "https://drive.google.com/drive/folders/1BmvOh8eCcvC6QKwutcBOKLVi_c2VRJrj?usp=sharing" |
|
|
def check_index_zip():
    """Ensure the ChromaDB index directory exists locally.

    If INDEX_LOCAL_DIR is missing, download the pre-built index archive
    from Hugging Face (INDEX_ZIP_URL) and extract it there. On any
    download/extraction failure, show an error and halt the app.
    """
    if os.path.exists(INDEX_LOCAL_DIR):
        # Index already present — nothing to do (replaces dead `else: pass`).
        return
    st.warning("🗂️ Aucun index local trouvé. Téléchargement de l'index depuis Hugging Face... ⏬")
    try:
        # gdown also handles plain HTTPS URLs; fuzzy=True tolerates
        # share-link / redirect URL formats.
        gdown.download(INDEX_ZIP_URL, INDEX_ZIP_PATH, quiet=False, fuzzy=True)
        with zipfile.ZipFile(INDEX_ZIP_PATH, 'r') as zip_ref:
            zip_ref.extractall(INDEX_LOCAL_DIR)
        st.success(f"✅ Index téléchargé et extrait dans {INDEX_LOCAL_DIR}. Prêt à répondre aux questions ! 🚀")
    except Exception as e:
        st.error(f"❌ Erreur lors du téléchargement ou de l'extraction de l'index ChromaDB : {e}")
        st.stop()
|
|
def download_from_gdrive(gdrive_url, dest_path):
    """Download a Google Drive file to *dest_path* with a Streamlit progress bar.

    Parameters
    ----------
    gdrive_url : str
        A Drive share link containing ``/d/<file_id>``.
    dest_path : str
        Local path where the downloaded file is written.

    Raises
    ------
    ValueError
        If no file id can be extracted from *gdrive_url* (the original
        called ``.group(1)`` on a possibly-None match, raising an
        opaque AttributeError).
    requests.HTTPError
        If the download request fails.
    """
    import re
    import requests
    match = re.search(r'/d/([a-zA-Z0-9_-]+)', gdrive_url)
    if match is None:
        raise ValueError(f"URL Google Drive invalide (pas d'identifiant /d/...) : {gdrive_url}")
    file_id = match.group(1)
    download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
    with requests.get(download_url, stream=True) as r:
        r.raise_for_status()
        # content-length may be absent; 0 disables percentage display.
        total_length = int(r.headers.get('content-length', 0))
        chunk_size = 8192
        downloaded = 0
        progress_bar = st.progress(0, text="Téléchargement du PDF en cours...")
        with open(dest_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_length > 0:
                        progress = min(downloaded / total_length, 1.0)
                        progress_bar.progress(progress, text=f"Téléchargement du PDF : {int(progress*100)}%")
    progress_bar.progress(1.0, text="Téléchargement terminé !")
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Configure the Gemini client once at startup; abort the app if the SDK
# rejects the key or fails to initialise.
try:
    genai.configure(api_key=GOOGLE_API_KEY)
except Exception as e:
    st.error(f"Erreur lors de la configuration de l'API Google Generative : {e}")
    st.stop()
|
|
| |
def safe_api_call(callable_fn, *args, **kwargs):
    """Run *callable_fn* and surface any exception as a Streamlit error.

    Returns the callable's result on success; on failure shows a
    diagnostic message (with the full traceback) and stops the app.
    """
    try:
        return callable_fn(*args, **kwargs)
    except Exception as exc:
        import traceback
        details = [
            "Erreur de connexion à l’API Google (ex : 503 Service Unavailable, Timeout, etc.).",
            "Vérifie ta connexion Internet, ta clé API, ou réessaie plus tard.",
            "",
            f"Détail technique : {exc}",
            "",
            "Traceback :",
            traceback.format_exc(),
        ]
        st.error("\n".join(details))
        st.stop()
|
|
| |
| is_retriable = lambda e: hasattr(e, 'reason') and e.reason in ['RATE_LIMIT_EXCEEDED', 'SERVICE_UNAVAILABLE', 'INTERNAL'] |
|
|
class GeminiEmbeddingFunction(EmbeddingFunction):
    """Chroma embedding function backed by the Gemini embeddings API.

    Embeds one document per API call, retrying transient failures
    (rate limits, 5xx) with exponential backoff via ``is_retriable``.
    """

    def __init__(self, model_name=EMBEDDING_MODEL_NAME, task_type="retrieval_document"):
        # task_type distinguishes document indexing from query embedding;
        # it is switched at query time via set_task_type().
        self._model_name = model_name
        self._task_type = task_type

    @retry.Retry(predicate=is_retriable, initial=1.0, maximum=15.0, multiplier=2.0)
    def embed_content_with_retry(self, text: str):
        """Call the embeddings API for a single text, with retries."""
        return genai.embed_content(model=self._model_name, content=text, task_type=self._task_type)

    def __call__(self, input_texts: Documents) -> Embeddings:
        """Embed each input document; abort on the first hard failure."""
        embeddings = []
        for doc in input_texts:
            try:
                response = self.embed_content_with_retry(doc)
                embeddings.append(response['embedding'])
            except Exception as e:
                st.error(f"Erreur embedding: {e}")
                # Bare raise preserves the original traceback
                # (the original `raise e` re-raised from this frame).
                raise
        return embeddings

    def set_task_type(self, task_type: str):
        """Switch the embedding mode, e.g. to 'retrieval_query' for queries."""
        self._task_type = task_type
|
|
@st.cache_resource
def get_embedding_function_instance():
    """Return a process-wide cached GeminiEmbeddingFunction instance."""
    embedding_fn = GeminiEmbeddingFunction(model_name=EMBEDDING_MODEL_NAME)
    return embedding_fn
|
|
@st.cache_resource
def extract_and_chunk_pdf(pdf_path, chunk_size=1000):
    """Extract the full text of *pdf_path* and split it into fixed-size chunks.

    Tries pdfplumber first; falls back to PyPDF2 (already imported at
    module level) when pdfplumber fails or is missing. Cached by
    Streamlit so the PDF is parsed only once per process.

    Returns
    -------
    tuple
        (chunks, extract_info, text_len) where chunks is a list of
        ``chunk_size``-character slices, extract_info is a human-readable
        extraction summary, and text_len the total character count.
    """
    import time
    start_time = time.time()
    try:
        # Import inside the try so a missing pdfplumber also triggers
        # the PyPDF2 fallback instead of crashing the whole function.
        import pdfplumber
        with pdfplumber.open(pdf_path) as pdf:
            pages = [page.extract_text() or "" for page in pdf.pages]
        full_text = "\n".join(pages)
        extract_info = f"Texte extrait avec pdfplumber ({len(pages)} pages) en {time.time()-start_time:.2f}s."
    except Exception as e:
        with open(pdf_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            # Extract each page once (the original called extract_text()
            # twice per page) and skip pages with no extractable text.
            page_texts = [page.extract_text() for page in reader.pages]
        full_text = "\n".join(t for t in page_texts if t)
        extract_info = f"pdfplumber a échoué ({e}), fallback PyPDF2. Extraction en {time.time()-start_time:.2f}s."
    chunks = [full_text[i:i+chunk_size] for i in range(0, len(full_text), chunk_size)]
    return chunks, extract_info, len(full_text)
|
|
| |
| |
# Indexing controls: either index the entire PDF or cap the number of
# chunks (keeps first-run time manageable on large files).
index_all_chunks = st.checkbox("Indexer tout le PDF (ignorer la limite de chunks)")
if index_all_chunks:
    # None means "no limit" downstream.
    max_chunks = None
    st.info("Tous les chunks du PDF seront indexés. Cela peut prendre plusieurs minutes sur un gros fichier.")
else:
    max_chunks = st.number_input(
        "Nombre maximal de chunks à indexer",
        min_value=1,
        max_value=10000,
        value=5000,
        step=1
    )
|
|
# Build the ChromaDB index on first run, or open the persisted one.
embed_fn = get_embedding_function_instance()
chunk_size = 1000
if not os.path.exists(PERSIST_DIRECTORY):
    # First run: extract, chunk and index the PDF.
    chunks, extract_info, text_len = extract_and_chunk_pdf(PDF_PATH, chunk_size)
    st.info(extract_info)
    st.write(f"Taille du texte extrait : {text_len} caractères")
    st.write(f"Nombre de chunks extraits : {len(chunks)}")
    if len(chunks) > 0:
        st.write("Exemple chunk 1:", chunks[0][:300])
    if len(chunks) > 1:
        st.write("Exemple chunk 2:", chunks[1][:300])
    if max_chunks is not None and max_chunks > 0:
        chunks = chunks[:max_chunks]
        st.write(f"Limite max_chunks appliquée : {len(chunks)} chunks à indexer")
    client = chromadb.PersistentClient(path=PERSIST_DIRECTORY)
    collection = client.create_collection(name=DB_COLLECTION_NAME, embedding_function=embed_fn)
    # Index in batches so progress can be reported and a single API
    # failure does not lose all prior work.
    batch_size = 200
    total = len(chunks)
    st.info(f"Indexation en batchs de {batch_size} chunks...")
    progress_bar = st.progress(0, text="Indexation des chunks en cours...")
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        collection.add(
            documents=chunks[start:end],
            ids=[f"chunk_{i}" for i in range(start, end)],
        )
        st.write(f"Batch indexé : {start} à {end}")
        # progress_bar is always created above, so no None-guard needed
        # (the original checked `if progress_bar is not None`).
        progress_bar.progress(end / total, text=f"Chunks {end}/{total} indexés...")
    progress_bar.progress(1.0, text="Indexation terminée !")
    st.success(f"Indexation terminée : {total} chunks indexés dans la collection '{DB_COLLECTION_NAME}' !")
else:
    # Index directory already exists: open the persisted collection.
    # (Removed: a redundant mid-file `import os`, a dead `try: pass`
    # block, and an unused `client.list_collections()` call.)
    client = chromadb.PersistentClient(path=PERSIST_DIRECTORY)
    collection = client.get_collection(name=DB_COLLECTION_NAME, embedding_function=embed_fn)
|
|
@st.cache_resource
def get_generative_model(_model_name):
    """Return a cached GenerativeModel for *_model_name*, or None on failure."""
    try:
        model = genai.GenerativeModel(_model_name)
    except Exception as e:
        st.error(f"Erreur initialisation modèle génératif: {e}")
        return None
    return model
|
|
def ask_bible_streamlit(question: str, collection, embed_fn, gen_model, n_results: int = 3):
    """Answer *question* with RAG over the Bible collection.

    Embeds the question, retrieves the top *n_results* passages from
    *collection*, and asks *gen_model* to answer using those passages.

    Returns
    -------
    tuple
        (answer_text, context) on success; (None, None) when the
        question is empty or no passage is retrieved.
    """
    if not question:
        return None, None
    # Switch to query-mode embedding (documents were indexed in document mode).
    embed_fn.set_task_type("retrieval_query")
    query_embedding = embed_fn([question])[0]
    hits = collection.query(query_embeddings=[query_embedding], n_results=n_results)
    documents = hits["documents"]
    if not documents or not documents[0]:
        return None, None
    context = "\n\n".join(documents[0])
    gen_prompt = f"Voici des extraits de la Bible :\n{context}\n\nQuestion : {question}\nRéponse :"
    response = gen_model.generate_content(gen_prompt)
    return response.text, context
|
|
| |
# Page header. A raw HTML <img src='logo_kola.jpg'> cannot load a
# server-local file in the visitor's browser, so the logo is embedded
# as a base64 data URI instead (base64 is imported at module level).
if os.path.exists("logo_kola.jpg"):
    with open("logo_kola.jpg", "rb") as _logo_file:
        _logo_b64 = base64.b64encode(_logo_file.read()).decode()
    st.markdown(f"""
<div style='text-align:center;'>
<img src='data:image/jpeg;base64,{_logo_b64}' width='120'>
</div>
""", unsafe_allow_html=True)
st.markdown("<h1 style='text-align:center; color:#2d8659;'>❓ BiblioIA</h1>", unsafe_allow_html=True)
st.markdown("<p style='text-align:center; color:#444;'>📖 Posez une question sur la Bible (en anglais)</p>", unsafe_allow_html=True)
st.markdown("<hr style='margin-top:1em;margin-bottom:1em'>", unsafe_allow_html=True)
|
|
# Sidebar: settings, export-format choice and help/contact details.
with st.sidebar:
    st.image("logo_kola.jpg", width=100)
    st.markdown("<h3 style='text-align:center;'>⚙️ Configuration</h3>", unsafe_allow_html=True)
    st.info("Ce Space indexe le PDF à la première exécution (quelques minutes). Clé API Gemini requise dans les secrets.")
    # Number of retrieved passages fed to the generator (1-5, default 3).
    n_results = st.slider("Nombre de passages à utiliser", 1, 5, 3)
    st.markdown("---")
    st.markdown("<h4>📤 Exporter vos résultats</h4>", unsafe_allow_html=True)
    # Export format read later by the answer-export block.
    export_format = st.radio("Format d'export", ["Aucun", "CSV", "Texte"], index=0)
    st.markdown("---")
    if st.button("Aide / FAQ"):
        st.info("- Posez votre question en anglais.\n- Choisissez le nombre de passages à utiliser pour la réponse.\n- Vous pouvez exporter les résultats après génération.")
    st.markdown("""
<div style='font-size:13px;'>
<b>Besoin d'aide ?</b><br>
Contact : <a href='mailto:sidoineko@gmail.com'>sidoineko@gmail.com</a><br>
Nom : YEBADOKPO SIDOINE<br>
LinkedIn : <a href='https://www.linkedin.com/in/sidoineko' target='_blank'>www.linkedin.com/in/sidoineko</a><br>
Tél : +229 0196911346
</div>
""", unsafe_allow_html=True)
|
|
# Main question input. The corpus is an English Bible PDF
# (bible_english.pdf), hence the English-question instruction.
question = st.text_input("Votre question sur la Bible (en anglais)",
    placeholder="Ex: Who was Moses?",
    key="main_question_box",
    label_visibility="visible"
)
|
|
# Cached, process-wide resources used by the Q&A handler below.
embed_fn = get_embedding_function_instance()

gen_model = get_generative_model(LLM_MODEL_NAME)
|
|
# Main action: retrieve passages, generate the answer, then offer export.
if st.button("Envoyer"):
    if not question:
        st.warning("Entrez une question.")
        st.stop()
    with st.spinner("Recherche et génération de la réponse..."):
        answer, context = ask_bible_streamlit(question, collection, embed_fn, gen_model, n_results)
    if answer:
        st.success(answer)
        with st.expander("Passages utilisés"):
            st.write(context)
        # Export section: format was chosen in the sidebar radio.
        st.markdown("<hr>", unsafe_allow_html=True)
        st.markdown("<b>📤 Exporter ce résultat :</b>", unsafe_allow_html=True)
        if export_format == "Aucun":
            st.warning("Sélectionnez un format d'export dans le menu de gauche pour activer le bouton d'export.")
        else:
            if export_format == "CSV":
                import pandas as pd
                df = pd.DataFrame({"Réponse":[answer], "Passages":[context]})
                st.download_button("⬇️ Télécharger la réponse (CSV)", df.to_csv(index=False).encode(), "reponse_bible.csv", "text/csv", key="dl_csv")
            elif export_format == "Texte":
                txt = f"Réponse :\n{answer}\n\nPassages utilisés :\n{context}"
                st.download_button("⬇️ Télécharger la réponse (TXT)", txt, "reponse_bible.txt", "text/plain", key="dl_txt")
    else:
        st.error("Aucune réponse trouvée ou erreur lors de la génération.")
|
|
| |
from datetime import datetime

# Footer: copyright plus the timestamp of this page render.
st.markdown("<hr style='margin-top:2em;margin-bottom:1em'>", unsafe_allow_html=True)
last_updated = datetime.now().strftime('%Y-%m-%d %H:%M')
st.markdown(
    f"<div style='text-align:center; color:gray;'>© Kola Tech<br>Dernière mise à jour : {last_updated} </div>",
    unsafe_allow_html=True,
)
|
|
|
|