biblioIA / app.py
Sidoine1991
Change titre principal : ❓ BiblioIA
2044402
import os
import streamlit as st
import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings
import PyPDF2
import zipfile
import google.generativeai as genai
from google.generativeai import types
from google.api_core import retry
from dotenv import load_dotenv
import gdown
# --- Personnalisation de l’interface ---
from PIL import Image
import base64
# Sidebar logo
with st.sidebar:
st.image("https://drive.google.com/uc?export=view&id=17-9xtpS0LSToUJBky4_O11JOj5F3tysI", width=180)
st.markdown("<h2 style='text-align:center;color:#2d6a4f;'>BiblioIA</h2>", unsafe_allow_html=True)
# Couleurs personnalisées via CSS
st.markdown('''
<style>
.main {background-color: #f8f9fa;}
.stButton>button {background-color: #2d6a4f; color:white; border-radius:8px;}
.stDownloadButton>button {background-color: #40916c; color:white; border-radius:8px;}
.stTextInput>div>div>input {background-color: #fff !important; color: #222 !important; border: 1.5px solid #2d6a4f; border-radius: 6px; font-size: 1.05em;}
.stMarkdown h2, .stMarkdown h3 {color: #2d6a4f;}
</style>
''', unsafe_allow_html=True)
# --- Config ---
load_dotenv()
PERSIST_DIRECTORY = "chroma_db_bible_NEW"
INDEX_LOCAL_DIR = PERSIST_DIRECTORY
# --- ChromaDB Index Google Drive Download ---
INDEX_ZIP_URL = "https://huggingface.co/datasets/Sidoineko/kolaTech/resolve/main/chroma_db_bible_NEW.zip"
INDEX_ZIP_PATH = "chroma_db_bible_NEW.zip"
# --- Statut index ---
if os.path.exists(INDEX_LOCAL_DIR):
st.sidebar.success("Index chargé ✅")
else:
st.sidebar.error("Index absent ❌")
# --- Historique Q/R ---
if "qa_history" not in st.session_state:
st.session_state.qa_history = []
# --- Affichage historique ---
if st.session_state.qa_history:
st.markdown("<h4>Historique de session</h4>", unsafe_allow_html=True)
for idx, (q, a, fb) in enumerate(reversed(st.session_state.qa_history[-5:])):
st.markdown(f"<b>Q:</b> {q}", unsafe_allow_html=True)
st.markdown(f"<b>R:</b> {a}", unsafe_allow_html=True)
if fb:
st.markdown(f"Feedback : {'👍' if fb==1 else '👎'}", unsafe_allow_html=True)
st.markdown("<hr>")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") or st.secrets.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
st.error("Clé API Google non trouvée. Définir GOOGLE_API_KEY dans les secrets Hugging Face.")
st.stop()
DB_COLLECTION_NAME = "bible_rag_collection"
EMBEDDING_MODEL_NAME = "models/text-embedding-004"
LLM_MODEL_NAME = "models/gemini-1.5-flash-latest"
PDF_PATH = "bible_english.pdf"
# GDRIVE_URL = "https://drive.google.com/file/d/11HP55cJJJluaNEfn2u0yznwFOmpmOEky/view?usp=sharing"
# Lien PDF supprimé à la demande de l'utilisateur
# Préparation pour gestion de l'index via Google Drive
INDEX_DRIVE_FOLDER_URL = "https://drive.google.com/drive/folders/1BmvOh8eCcvC6QKwutcBOKLVi_c2VRJrj?usp=sharing"
def check_index_zip():
zip_path = "chroma_db_bible_NEW.zip"
if not os.path.exists(INDEX_LOCAL_DIR):
st.warning("🗂️ Aucun index local trouvé. Téléchargement de l'index depuis Hugging Face... ⏬")
try:
gdown.download(INDEX_ZIP_URL, INDEX_ZIP_PATH, quiet=False, fuzzy=True)
with zipfile.ZipFile(INDEX_ZIP_PATH, 'r') as zip_ref:
zip_ref.extractall(INDEX_LOCAL_DIR)
st.success(f"✅ Index téléchargé et extrait dans {INDEX_LOCAL_DIR}. Prêt à répondre aux questions ! 🚀")
except Exception as e:
st.error(f"❌ Erreur lors du téléchargement ou de l'extraction de l'index ChromaDB : {e}")
st.stop()
else:
pass
def download_from_gdrive(gdrive_url, dest_path):
import re
import requests
file_id = re.search(r'/d/([a-zA-Z0-9_-]+)', gdrive_url).group(1)
download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
total_length = int(r.headers.get('content-length', 0))
chunk_size = 8192
downloaded = 0
progress_bar = st.progress(0, text="Téléchargement du PDF en cours...")
with open(dest_path, "wb") as f:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_length > 0:
progress = min(downloaded / total_length, 1.0)
progress_bar.progress(progress, text=f"Téléchargement du PDF : {int(progress*100)}%")
progress_bar.progress(1.0, text="Téléchargement terminé !")
# Désactivation de la vérification du PDF car l'index ChromaDB est déjà présent
# if not os.path.exists(PDF_PATH):
# st.warning(
# "Impossible de télécharger automatiquement le PDF. "
# "Merci de le déposer manuellement dans le dossier d'exécution sous le nom 'bible_english.pdf'."
# )
# st.stop()
# else:
# # st.info(f"PDF trouvé localement : {PDF_PATH}")
# pass
try:
genai.configure(api_key=GOOGLE_API_KEY)
except Exception as e:
st.error(f"Erreur lors de la configuration de l'API Google Generative : {e}")
st.stop()
# --- Gestion d’erreur réseau globale pour les appels API ---
def safe_api_call(callable_fn, *args, **kwargs):
try:
return callable_fn(*args, **kwargs)
except Exception as e:
import traceback
st.error(
"\n".join([
"Erreur de connexion à l’API Google (ex : 503 Service Unavailable, Timeout, etc.).",
"Vérifie ta connexion Internet, ta clé API, ou réessaie plus tard.",
"",
f"Détail technique : {e}",
"",
"Traceback :",
traceback.format_exc()
])
)
st.stop()
# Correction : ne référence pas genai.APIError qui n’existe pas
is_retriable = lambda e: hasattr(e, 'reason') and e.reason in ['RATE_LIMIT_EXCEEDED', 'SERVICE_UNAVAILABLE', 'INTERNAL']
class GeminiEmbeddingFunction(EmbeddingFunction):
def __init__(self, model_name=EMBEDDING_MODEL_NAME, task_type="retrieval_document"):
self._model_name = model_name
self._task_type = task_type
@retry.Retry(predicate=is_retriable, initial=1.0, maximum=15.0, multiplier=2.0)
def embed_content_with_retry(self, text: str):
return genai.embed_content(model=self._model_name, content=text, task_type=self._task_type)
def __call__(self, input_texts: Documents) -> Embeddings:
embeddings = []
for doc in input_texts:
try:
response = self.embed_content_with_retry(doc)
embeddings.append(response['embedding'])
except Exception as e:
st.error(f"Erreur embedding: {e}")
raise e
return embeddings
def set_task_type(self, task_type: str):
self._task_type = task_type
@st.cache_resource
def get_embedding_function_instance():
return GeminiEmbeddingFunction(model_name=EMBEDDING_MODEL_NAME)
@st.cache_resource
def extract_and_chunk_pdf(pdf_path, chunk_size=1000):
import pdfplumber, time
start_time = time.time()
try:
with pdfplumber.open(pdf_path) as pdf:
pages = [page.extract_text() or "" for page in pdf.pages]
full_text = "\n".join(pages)
extract_info = f"Texte extrait avec pdfplumber ({len(pages)} pages) en {time.time()-start_time:.2f}s."
except Exception as e:
import PyPDF2
with open(pdf_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
full_text = "\n".join(page.extract_text() for page in reader.pages if page.extract_text())
extract_info = f"pdfplumber a échoué ({e}), fallback PyPDF2. Extraction en {time.time()-start_time:.2f}s."
chunks = [full_text[i:i+chunk_size] for i in range(0, len(full_text), chunk_size)]
return chunks, extract_info, len(full_text)
# --- Dans le script principal (remplace l'appel à ensure_chroma_index) ---
# Option : indexer tout le PDF (ignorer la limite de chunks)
index_all_chunks = st.checkbox("Indexer tout le PDF (ignorer la limite de chunks)")
if index_all_chunks:
max_chunks = None
st.info("Tous les chunks du PDF seront indexés. Cela peut prendre plusieurs minutes sur un gros fichier.")
else:
max_chunks = st.number_input(
"Nombre maximal de chunks à indexer",
min_value=1,
max_value=10000,
value=5000,
step=1
)
embed_fn = get_embedding_function_instance()
chunk_size = 1000
if not os.path.exists(PERSIST_DIRECTORY):
# Extraction et indexation UNIQUEMENT si l’index n’existe pas
chunks, extract_info, text_len = extract_and_chunk_pdf(PDF_PATH, chunk_size)
st.info(extract_info)
st.write(f"Taille du texte extrait : {text_len} caractères")
st.write(f"Nombre de chunks extraits : {len(chunks)}")
if len(chunks) > 0:
st.write("Exemple chunk 1:", chunks[0][:300])
if len(chunks) > 1:
st.write("Exemple chunk 2:", chunks[1][:300])
if max_chunks is not None and max_chunks > 0:
chunks = chunks[:max_chunks]
st.write(f"Limite max_chunks appliquée : {len(chunks)} chunks à indexer")
client = chromadb.PersistentClient(path=PERSIST_DIRECTORY)
collection = client.create_collection(name=DB_COLLECTION_NAME, embedding_function=embed_fn)
batch_size = 200
total = len(chunks)
st.info(f"Indexation en batchs de {batch_size} chunks...")
progress_bar = st.progress(0, text="Indexation des chunks en cours...")
for start in range(0, total, batch_size):
end = min(start + batch_size, total)
batch_chunks = chunks[start:end]
batch_ids = [f"chunk_{i}" for i in range(start, end)]
collection.add(documents=batch_chunks, ids=batch_ids)
st.write(f"Batch indexé : {start} à {end}")
progress = (end) / total
if progress_bar is not None:
progress_bar.progress(progress, text=f"Chunks {end}/{total} indexés...")
if progress_bar is not None:
progress_bar.progress(1.0, text="Indexation terminée !")
st.success(f"Indexation terminée : {total} chunks indexés dans la collection '{DB_COLLECTION_NAME}' !")
else:
# L’index existe déjà, on ne touche pas au PDF
# Diagnostic : afficher le contenu du dossier d'index
import os
try:
# st.info(f"Contenu du dossier index : {os.listdir(PERSIST_DIRECTORY)}") # Debug (désactivé pour l'utilisateur final)
pass
except Exception as e:
st.warning(f"Impossible de lister le contenu du dossier d'index : {e}")
client = chromadb.PersistentClient(path=PERSIST_DIRECTORY)
# Debug: list all collections
try:
all_collections = client.list_collections()
# st.info(f"Collections trouvées dans la base ChromaDB : {[c.name for c in all_collections]}") # Debug (désactivé pour l'utilisateur final)
except Exception as e:
st.warning(f"Impossible de lister les collections ChromaDB : {e}")
collection = client.get_collection(name=DB_COLLECTION_NAME, embedding_function=embed_fn)
@st.cache_resource
def get_generative_model(_model_name):
try:
return genai.GenerativeModel(_model_name)
except Exception as e:
st.error(f"Erreur initialisation modèle génératif: {e}")
return None
def ask_bible_streamlit(question: str, collection, embed_fn, gen_model, n_results: int = 3):
if not question:
return None, None
embed_fn.set_task_type("retrieval_query")
question_emb = embed_fn([question])[0]
results = collection.query(query_embeddings=[question_emb], n_results=n_results)
if not results["documents"] or len(results["documents"][0]) == 0:
return None, None
context = "\n\n".join(results["documents"][0])
gen_prompt = f"Voici des extraits de la Bible :\n{context}\n\nQuestion : {question}\nRéponse :"
response = gen_model.generate_content(gen_prompt)
return response.text, context
# --- Interface Streamlit ---
st.markdown("""
<div style='text-align:center;'>
<img src='logo_kola.jpg' width='120'>
</div>
""", unsafe_allow_html=True)
st.markdown("<h1 style='text-align:center; color:#2d8659;'>❓ BiblioIA</h1>", unsafe_allow_html=True)
st.markdown("<p style='text-align:center; color:#444;'>📖 Posez une question sur la Bible (en anglais)</p>", unsafe_allow_html=True)
st.markdown("<hr style='margin-top:1em;margin-bottom:1em'>", unsafe_allow_html=True)
with st.sidebar:
st.image("logo_kola.jpg", width=100)
st.markdown("<h3 style='text-align:center;'>⚙️ Configuration</h3>", unsafe_allow_html=True)
st.info("Ce Space indexe le PDF à la première exécution (quelques minutes). Clé API Gemini requise dans les secrets.")
n_results = st.slider("Nombre de passages à utiliser", 1, 5, 3)
st.markdown("---")
st.markdown("<h4>📤 Exporter vos résultats</h4>", unsafe_allow_html=True)
export_format = st.radio("Format d'export", ["Aucun", "CSV", "Texte"], index=0)
st.markdown("---")
if st.button("Aide / FAQ"):
st.info("- Posez votre question en anglais.\n- Choisissez le nombre de passages à utiliser pour la réponse.\n- Vous pouvez exporter les résultats après génération.")
st.markdown("""
<div style='font-size:13px;'>
<b>Besoin d'aide ?</b><br>
Contact : <a href='mailto:sidoineko@gmail.com'>sidoineko@gmail.com</a><br>
Nom : YEBADOKPO SIDOINE<br>
LinkedIn : <a href='https://www.linkedin.com/in/sidoineko' target='_blank'>www.linkedin.com/in/sidoineko</a><br>
Tél : +229 0196911346
</div>
""", unsafe_allow_html=True)
question = st.text_input("Votre question sur la Bible (en anglais)",
placeholder="Ex: Who was Moses?",
key="main_question_box",
label_visibility="visible"
)
embed_fn = get_embedding_function_instance()
# La variable collection est déjà définie plus haut selon la présence de l’index
# (Suppression de l’appel à ensure_chroma_index)
gen_model = get_generative_model(LLM_MODEL_NAME)
if st.button("Envoyer"):
if not question:
st.warning("Entrez une question.")
st.stop()
with st.spinner("Recherche et génération de la réponse..."):
answer, context = ask_bible_streamlit(question, collection, embed_fn, gen_model, n_results)
if answer:
st.success(answer)
with st.expander("Passages utilisés"):
st.write(context)
# --- Export options ---
st.markdown("<hr>", unsafe_allow_html=True)
st.markdown("<b>📤 Exporter ce résultat :</b>", unsafe_allow_html=True)
if export_format == "Aucun":
st.warning("Sélectionnez un format d'export dans le menu de gauche pour activer le bouton d'export.")
else:
if export_format == "CSV":
import pandas as pd
df = pd.DataFrame({"Réponse":[answer], "Passages":[context]})
st.download_button("⬇️ Télécharger la réponse (CSV)", df.to_csv(index=False).encode(), "reponse_bible.csv", "text/csv", key="dl_csv")
elif export_format == "Texte":
txt = f"Réponse :\n{answer}\n\nPassages utilisés :\n{context}"
st.download_button("⬇️ Télécharger la réponse (TXT)", txt, "reponse_bible.txt", "text/plain", key="dl_txt")
else:
st.error("Aucune réponse trouvée ou erreur lors de la génération.")
# --- Footer copyright et date de mise à jour ---
from datetime import datetime
st.markdown("<hr style='margin-top:2em;margin-bottom:1em'>", unsafe_allow_html=True)
st.markdown("<div style='text-align:center; color:gray;'>© Kola Tech<br>Dernière mise à jour : {} </div>".format(datetime.now().strftime('%Y-%m-%d %H:%M')), unsafe_allow_html=True)