# RAG_APP/src/utils/search_docs_utils.py
# Hosting-page residue kept for reference: "sxid003's picture";
# commit message "Update src/utils/search_docs_utils.py"; revision 5c94786 (verified).
import os
import csv
import json
import numpy as np
from sentence_transformers import SentenceTransformer, util
from langdetect import detect
from typing import List, Dict
import functools
from src.configs.config import EMBEDDING_MODEL, RAW_CSV, METADATA_FILE, OUTPUT_DIR
import pandas as pd
from src.models.llm_wrapper import GeminiWrapper
# ------------------------------------------------------------------
# Default on-disk locations of the working files.
# All of them are built from OUTPUT_DIR (imported from the project config).
# ------------------------------------------------------------------
# Earlier hard-coded CSV location, kept commented out for reference:
#CSV_PATH = os.path.join("dataset", "docs_metadata.csv")
EMBEDDINGS_PATH_TITLE_CAT = os.path.join(OUTPUT_DIR, "title_cat_embeddings.npy")
METADATA_PATH_TITLE_CAT = os.path.join(OUTPUT_DIR, "title_cat_metadatas.json")
PARLEMENT_EMBEDDINGS_PATH = os.path.join(OUTPUT_DIR, "parlement_titledate_embeddings.npy")
PARLEMENT_METADATA_PATH = os.path.join(OUTPUT_DIR, "parlement_titledate_metadatas.json")


def _load_embeddings_with_metadata(npy_path, json_path):
    """
    Read an embeddings array and its metadata list from disk.

    Args:
        npy_path (str): Path of the ``.npy`` embeddings file.
        json_path (str): Path of the UTF-8 JSON metadata file.
    Returns:
        tuple: ``(embeddings, metadata)``, or ``(None, None)`` when either
            file is missing. Any other error propagates to the importer.
    """
    try:
        vectors = np.load(npy_path)
        with open(json_path, encoding="utf-8") as handle:
            records = json.load(handle)
    except FileNotFoundError:
        # Preprocessing has not been run yet; callers check for None.
        return None, None
    return vectors, records


# Loaded eagerly at import time so the search functions can reuse them.
embeddings, metadatas = _load_embeddings_with_metadata(
    EMBEDDINGS_PATH_TITLE_CAT, METADATA_PATH_TITLE_CAT
)
parlement_embeddings, parlement_metadatas = _load_embeddings_with_metadata(
    PARLEMENT_EMBEDDINGS_PATH, PARLEMENT_METADATA_PATH
)
@functools.lru_cache(maxsize=1)
def get_model():
    """
    Return the SentenceTransformer built from EMBEDDING_MODEL.

    The one-slot LRU cache means the model is constructed on the first call
    and the same object is returned on every later call in this process.

    Returns:
        SentenceTransformer: the cached embedding model
    """
    encoder = SentenceTransformer(EMBEDDING_MODEL)
    return encoder
def detect_language(text: str) -> str:
    """
    Guess the main language of a text with the langdetect library.

    Args:
        text (str): The text to analyze.
    Returns:
        str: The code reported by ``detect`` (e.g. 'fr', 'ar'), or
            'unknown' when detection raises for any reason.
    """
    try:
        return detect(text)
    except Exception:
        # Best effort: a failed detection must never break the search flow.
        return 'unknown'
def preprocess_and_save_documents(csv_path=METADATA_FILE, embeddings_path=EMBEDDINGS_PATH_TITLE_CAT, metadata_path=METADATA_PATH_TITLE_CAT):
    """
    Preprocess a CSV, compute embeddings for title+category, and save embeddings and metadata.

    Each row is embedded as ``"<Nom du document> <Catégorie>"``. The columns
    'Nom du document', 'Lien', 'Catégorie', 'Langue' and 'Id' are copied
    into the metadata, in the same row order as the embeddings.

    Args:
        csv_path (str): Path to the source CSV (UTF-8, with a header row).
        embeddings_path (str): Path to save the embeddings (npy).
        metadata_path (str): Path to save the metadata (json).
    Raises:
        KeyError: if a required column is missing from the CSV.
    """
    kept_columns = ("Nom du document", "Lien", "Catégorie", "Langue", "Id")
    model = get_model()
    embeddings = []
    metadatas = []
    with open(csv_path, newline='', encoding='utf-8') as csvfile:
        for row in csv.DictReader(csvfile):
            text = row['Nom du document'] + ' ' + row['Catégorie']
            embeddings.append(model.encode(text))
            metadatas.append({column: row[column] for column in kept_columns})

    # Create the parent directory of BOTH outputs. A bare file name has an
    # empty dirname, and os.makedirs('') raises, so skip that case.
    for target in (embeddings_path, metadata_path):
        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)

    np.save(embeddings_path, np.array(embeddings))
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadatas, f, ensure_ascii=False, indent=2)
    print(f"Saved {len(embeddings)} embeddings and metadatas.")
def filter_by_language(metadatas: List[Dict], lang: str) -> List[int]:
    """
    Return the indices of documents whose language matches 'lang'.

    The comparison ignores case and surrounding whitespace on both sides.
    When 'lang' is empty, None or 'unknown' (language detection failed),
    there is nothing to match, so every document whose own language is
    not 'unknown' is kept.

    Args:
        metadatas (List[Dict]): List of metadata dicts, each with a 'Langue' key.
        lang (str): Language code ('fr', 'ar', ...)
    Returns:
        List[int]: Indices of filtered documents, in their original order
    """
    wanted = (lang or '').strip().lower()
    if not wanted or wanted == 'unknown':
        # No usable query language: only drop documents of unknown language.
        return [
            i for i, doc in enumerate(metadatas)
            if doc['Langue'].strip().lower() != 'unknown'
        ]
    return [
        i for i, doc in enumerate(metadatas)
        if doc['Langue'].strip().lower() == wanted
    ]
def select_documents(query: str, embeddings: np.ndarray, metadatas: List[Dict], lang: str = None, top_k: int = 5) -> List[Dict]:
    """
    Select the most relevant documents for the query, optionally filtered by language.

    Documents are ranked by cosine similarity between the query embedding
    and the document embeddings. A bonus of 0.3 is added when the document's
    (non-empty) category appears in the query, and another 0.3 when its
    (non-empty) title does. Only the ``top_k`` best documents whose final
    score is strictly above 0.3 are returned.

    Args:
        query (str): User query
        embeddings (np.ndarray): Document embeddings, row-aligned with ``metadatas``
        metadatas (List[Dict]): Associated metadata ('Catégorie', 'Nom du document', ...)
        lang (str, optional): Language to filter
        top_k (int): Number of results to return
    Returns:
        List[Dict]: Most relevant documents, best first (possibly empty)
    """
    keyword_bonus = 0.3
    min_score = 0.3

    # Filter by language if specified
    if lang:
        indices = filter_by_language(metadatas, lang)
        if not indices:
            return []
        filtered_embeddings = embeddings[indices]
        filtered_metadatas = [metadatas[i] for i in indices]
    else:
        filtered_embeddings = embeddings
        filtered_metadatas = metadatas
    if not filtered_metadatas:
        # Nothing to rank; skip encoding the query.
        return []

    import torch
    model = get_model()
    query_emb = model.encode(query)
    embeddings_tensor = torch.tensor(filtered_embeddings)
    query_tensor = torch.tensor(query_emb)
    cos_sim = util.cos_sim(query_tensor, embeddings_tensor)[0]

    query_lower = query.lower()
    scored_docs = []
    for idx, sim in enumerate(cos_sim.tolist()):
        doc = filtered_metadatas[idx]
        category = doc['Catégorie'].strip().lower()
        title = doc['Nom du document'].strip().lower()
        bonus = 0
        # '' is a substring of every string, so an empty field must be
        # excluded explicitly or it would always earn the bonus.
        if category and category in query_lower:
            bonus += keyword_bonus
        if title and title in query_lower:
            bonus += keyword_bonus
        scored_docs.append((sim + bonus, doc))

    scored_docs.sort(reverse=True, key=lambda x: x[0])
    return [doc for score, doc in scored_docs[:top_k] if score > min_score]
def find_relevant_documents(query: str, embeddings_path=EMBEDDINGS_PATH_TITLE_CAT, metadata_path=METADATA_PATH_TITLE_CAT, top_k: int = 5) -> List[Dict]:
    """
    Run the whole lookup from files on disk: load the embeddings and
    metadata, detect the query language, then rank the documents.

    Args:
        query (str): User query
        embeddings_path (str): Path to embeddings
        metadata_path (str): Path to metadata
        top_k (int): Number of results
    Returns:
        List[Dict]: Most relevant documents
    """
    doc_vectors = np.load(embeddings_path)
    with open(metadata_path, encoding="utf-8") as handle:
        doc_records = json.load(handle)

    query_lang = detect_language(query)
    print(f"Detected query language: {query_lang}")

    return select_documents(
        query,
        doc_vectors,
        doc_records,
        lang=query_lang,
        top_k=top_k,
    )
def preprocess_and_save_parlement(csv_path=None, embeddings_path=None, metadata_path=None):
    """
    Preprocess the parliamentary transcript CSV, compute embeddings for title+date, and save embeddings and metadata (without subtitle).

    Only the columns 'id', 'titre', 'date', 'langue' and 'lien' are read.
    Each row is embedded as ``"<titre> <date>"``, and the metadata is
    written in the same row order as the embeddings.

    Args:
        csv_path (str): Path to the source CSV (defaults to RAW_CSV)
        embeddings_path (str): Path to save embeddings (defaults to PARLEMENT_EMBEDDINGS_PATH)
        metadata_path (str): Path to save metadata (defaults to PARLEMENT_METADATA_PATH)
    """
    if csv_path is None:
        csv_path = RAW_CSV
    # Reuse the module-level defaults instead of re-spelling the file names.
    if embeddings_path is None:
        embeddings_path = PARLEMENT_EMBEDDINGS_PATH
    if metadata_path is None:
        metadata_path = PARLEMENT_METADATA_PATH

    model = get_model()
    cols = ["id", "titre", "date", "langue", "lien"]
    df = pd.read_csv(csv_path, usecols=cols)

    embeddings = []
    metadatas = []
    for _, row in df.iterrows():
        # NOTE(review): assumes 'titre' and 'date' are strings in every row;
        # a missing or numeric cell would make this concatenation fail.
        text = row['titre'] + ' ' + row['date']
        embeddings.append(model.encode(text))
        metadatas.append({column: row[column] for column in cols})

    # Create the parent directory of BOTH outputs. A bare file name has an
    # empty dirname, and os.makedirs('') raises, so skip that case.
    for target in (embeddings_path, metadata_path):
        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)

    np.save(embeddings_path, np.array(embeddings))
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadatas, f, ensure_ascii=False, indent=2)
    print(f"Saved {len(embeddings)} parlement embeddings and metadatas (sans sous-titre).")
def detect_intention(query):
    """
    Ask the Gemini model whether a query targets legal texts or
    parliamentary content, and normalise its answer to one of two labels.

    Any failure while calling the model, and any answer that cannot be
    recognised, falls back to 'lois/règlements'.

    Args:
        query (str): User query
    Returns:
        str: detected intention ('lois/règlements' or 'parlement')
    """
    default_label = 'lois/règlements'
    prompt = f"""
Classify the following query: "{query}"
Context: This is for a Moroccan legal document search system with two categories:
- "parlement": Parliamentary content including debates, speeches, and discussions from the Moroccan Parliament (Majlis al-Nawab). This includes transcripts of parliamentary sessions, what parliamentarians said, debates on various topics, and parliamentary proceedings.
- "lois/règlements": Official legal documents including laws, codes, regulations, and legislative texts. This includes the Penal Code, Civil Code, Commercial Code, environmental laws, social security regulations, traffic rules, and other official legal texts.
Respond only with "lois/règlements" or "parlement".
"""
    try:
        answer = GeminiWrapper().generate(prompt).strip().lower()
        # Drop quotation marks the model may wrap around its answer.
        for mark in ('«', '»', '"', "'"):
            answer = answer.replace(mark, '')
        answer = answer.strip()
        # 'lois' takes precedence; 'parlement' only wins when 'lois' is absent.
        if 'lois' not in answer and 'parlement' in answer:
            return 'parlement'
        return default_label
    except Exception as e:
        print(f"Error using Gemini model: {e}")
        # Fall back to the default category
        return default_label
def select_parlement_transcript(query: str, embeddings_path="output/parlement_titledate_embeddings.npy", metadata_path="output/parlement_titledate_metadatas.json", top_k: int = 1):
    """
    Rank the parliamentary transcripts against a query and return the best ones.

    The embeddings and metadata are read from disk on every call, and
    ranking uses cosine similarity against the stored (title + date)
    embeddings. No score threshold is applied.

    NOTE(review): the default paths are hard-coded relative "output/..."
    strings, not built from OUTPUT_DIR like the module constants - confirm
    both point to the same place.

    Args:
        query (str): User query
        embeddings_path (str): Path to parliamentary embeddings
        metadata_path (str): Path to parliamentary metadata
        top_k (int): Number of results to return
    Returns:
        List[Dict]: The ``top_k`` best-matching metadata entries, best first
    """
    import torch

    model = get_model()
    transcript_vectors = np.load(embeddings_path)
    with open(metadata_path, encoding="utf-8") as handle:
        transcript_records = json.load(handle)

    query_vector = model.encode(query)
    similarities = util.cos_sim(
        torch.tensor(query_vector),
        torch.tensor(transcript_vectors),
    )[0]

    scored = [
        (score, transcript_records[position])
        for position, score in enumerate(similarities.tolist())
    ]
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [record for _, record in ranked[:top_k]]
def search_relevant_documents(query: str, top_k: int = 3) -> dict:
    """
    Search the preloaded document corpus for the entries most relevant to a query.

    The query language is detected first, then ranking and language
    filtering are delegated to ``select_documents`` on the module-level
    ``embeddings`` / ``metadatas`` loaded at import time.

    Args:
        query (str): User query
        top_k (int): Maximum number of documents to return
    Returns:
        dict: ``{"language", "results", "count"}`` on success, or
            ``{"error", "language"}`` when the embeddings or metadata
            were not found at import time.
    """
    # Detect language
    lang = detect_language(query)
    if embeddings is None or metadatas is None:
        return {
            "error": "Document embeddings not available. Please run preprocessing first.",
            "language": lang
        }
    # select_documents already filters by language when 'lang' is given, so
    # the full corpus is passed and filtered once, not twice.
    results = select_documents(
        query,
        embeddings,
        metadatas,
        lang=lang,
        top_k=top_k
    )
    return {
        "language": lang,
        "results": results,
        "count": len(results)
    }