import os
import re
import time
import gzip
import pickle
import logging
import threading
import random
import uuid
import unicodedata

import numpy as np
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
from multiprocessing import Pool, cpu_count
from concurrent.futures import ThreadPoolExecutor, as_completed

# langchain-ish imports (adapt to your environment)
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS as LangchainFAISS
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder, SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from langchain_groq import ChatGroq

# Optional tokenizer (HuggingFace)
from transformers import AutoTokenizer
import spacy

# ==========================================================
# CONFIGURATION
# ==========================================================
logger = logging.getLogger("hybrid_rag_modular")
logging.basicConfig(level=logging.INFO)

# -----------------------------
# Config / Defaults
# -----------------------------
# Never hard-code API keys in source: read them from the environment
# (comma-separated in GROQ_API_KEYS).
GROQ_KEYS = [k.strip() for k in os.environ.get("GROQ_API_KEYS", "").split(",") if k.strip()]

DEFAULT_TOKENIZER = AutoTokenizer.from_pretrained("bert-base-uncased")
DEFAULT_MAX_CONTEXT_TOKENS = 4000
DEFAULT_SIM_THRESHOLD = 0.3    # dense score threshold (cosine/similarity scale depends on store)
DEFAULT_DENSE_WEIGHT = 0.375   # was 0.7
DEFAULT_BM25_WEIGHT = 0.625    # was 0.3
PENALTY_MISSING_BM25 = 0.1     # harsh: BM25 is the main component
PENALTY_MISSING_DENSE = 0.02   # mild: dense is the secondary component

# -----------------------------
# Utilities
# -----------------------------
def normalize_tokens(text: str) -> List[str]:
    """Fast tokenization: returns word tokens, lowercased."""
    if not isinstance(text, str):
        return []
    return re.findall(r"\b\w+\b", text.lower())

# Lightweight Italian spaCy model; the sentencizer is required because the
# parser is disabled (spaCy raises E030 otherwise).
nlp_model = spacy.load("it_core_news_sm", disable=["ner", "parser"])
nlp_model.add_pipe("sentencizer")

class SemanticChunker:
    def __init__(self, embedder, max_tokens=220, min_tokens=40,
                 sim_threshold=0.75, overlap_ratio=0.30,
                 spacy_model="it_core_news_sm"):
        self.embedder = embedder
        self.max_tokens = max_tokens
        self.min_tokens = min_tokens
        self.sim_threshold = sim_threshold
        self.overlap_ratio = overlap_ratio
        if spacy_model == "it_core_news_sm":
            # reuse the shared lightweight spaCy model
            self.nlp = nlp_model
        else:
            # bug fix: load the requested model (not the hard-coded default)
            # and attach the sentencizer to it, not to the global model
            self.nlp = spacy.load(spacy_model, disable=["ner", "parser"])
            if "sentencizer" not in self.nlp.pipe_names:
                self.nlp.add_pipe("sentencizer")

    def tokenize_sentences(self, text):
        doc = self.nlp(text)
        return [s.text.strip() for s in doc.sents if len(s.text.strip()) > 0]

    def embed(self, text):
        return np.array(self.embedder.embed_query(text), dtype=float)

    def chunk(self, text):
        sentences = self.tokenize_sentences(text)
        if not sentences:
            return []
        chunks = []
        current = []
        prev_emb = None
        for s in sentences:
            current.append(s)
            joined = " ".join(current)
            token_len = len(joined.split())  # whitespace-token count

            # HARD LIMIT: flush the chunk and keep a sentence overlap
            if token_len >= self.max_tokens:
                chunks.append(joined)
                keep_n = max(1, int(len(current) * self.overlap_ratio))
                current = current[-keep_n:]
                prev_emb = None
                continue

            # SEMANTIC BOUNDARY: split when consecutive sentences diverge
            emb = self.embed(s)
            if prev_emb is not None:
                sim = float(np.dot(emb, prev_emb) / (np.linalg.norm(emb) * np.linalg.norm(prev_emb)))
                if sim < self.sim_threshold and token_len > self.min_tokens:
                    chunks.append(" ".join(current[:-1]))
                    current = [s]
            prev_emb = emb
        if current:
            chunks.append(" ".join(current))
        return chunks
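# Usage sketch for SemanticChunker (illustrative only: the embedding model
# name below is an assumption; any LangChain embedder exposing `embed_query`
# works).
def _demo_semantic_chunker() -> List[str]:
    embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    chunker = SemanticChunker(embedder=embedder, max_tokens=120, min_tokens=20)
    text = ("Il pino domina la piazza del paese. Fu piantato nel 1920 dai reduci. "
            "La quercia del parco, invece, ha oltre trecento anni.")
    return chunker.chunk(text)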
# -----------------------------
# Tokenizer / Context management
# -----------------------------
@dataclass
class ContextManager:
    tokenizer: Any = DEFAULT_TOKENIZER
    max_tokens: int = DEFAULT_MAX_CONTEXT_TOKENS
    truncate_enabled: bool = False

    def truncate_context(self, text: str, max_tokens: Optional[int] = None) -> str:
        try:
            toks = self.tokenizer.encode(text, add_special_tokens=False)
            if not self.truncate_enabled:
                return self.tokenizer.decode(toks, clean_up_tokenization_spaces=True)
            if max_tokens is None:
                max_tokens = self.max_tokens
            if len(toks) <= max_tokens:
                return self.tokenizer.decode(toks, clean_up_tokenization_spaces=True)
            # keep the tail of the context
            toks = toks[-max_tokens:]
            return self.tokenizer.decode(toks, clean_up_tokenization_spaces=True)
        except Exception as e:
            print(f"⚠️ Error in ContextManager.truncate_context: {e}")
            # Simple character-based fallback (~4 chars per token); bug fix:
            # max_tokens may still be None here, so resolve it first
            limit = (max_tokens or self.max_tokens) * 4
            if not self.truncate_enabled or len(text) <= limit:
                return text
            return text[-limit:]  # truncate by characters

# -----------------------------
# Prompt manager
# -----------------------------
@dataclass
class PromptManager:
    templates: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        # default templates (small and editable); kept in Italian because the
        # service answers in Italian
        self.templates.setdefault(
            "narrativo",
            "Rispondi in una frase in 1ª persona in tono epico e solenne come se fossi {treename}.\n"
            "Usa solo il contesto.\nContesto:{context}\nDomanda:{question}\nRisposta breve:")
        self.templates.setdefault(
            "scientifico",
            "Rispondi in terza persona con stile enciclopedico.\n"
            "Usa solo il contesto.\nContesto:{context}\nDomanda:{question}\nRisposta breve:")

    def build(self, style: str, context: str, question: str, treename: Optional[str] = None) -> str:
        template = self.templates.get(style)
        if template is None:
            template = self.templates["scientifico"]
        # bug fix: the question is passed as a format *argument*, so escaping
        # its braces would leave literal "{{...}}" in the prompt
        return template.format(context=context, question=question,
                               treename=treename or "l'albero")
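# Minimal PromptManager sketch: with the default "scientifico" template this
# returns the filled prompt ending in "Risposta breve:".
def _demo_prompt_manager() -> str:
    pm = PromptManager()
    return pm.build("scientifico",
                    context="Il pino di Piazza Dante ha circa cento anni.",
                    question="Quanti anni ha il pino?",
                    treename="Pino di Piazza Dante")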
# -----------------------------
# Index Manager (FAISS + BM25) - optimized version
# -----------------------------
class IndexManager:
    def __init__(self, embedding_model: Any):
        self.embedding = embedding_model
        self.index_cache: Dict[str, Dict[str, Any]] = {}
        self.cache_lock = threading.Lock()
        # cache for already-computed embeddings
        self.embedding_cache = {}
        self.embedding_cache_lock = threading.Lock()

    def _prepare_texts(self, docs: List[Document]) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Split documents into parallel lists of texts and metadata."""
        texts = [d.page_content for d in docs]
        metadatas = [d.metadata for d in docs]
        return texts, metadatas

    def _embed_batch_parallel(self, texts: List[str], batch_size: int = 128) -> List[List[float]]:
        """Parallel embedding with a hash-based cache."""
        # check the cache before computing anything
        with self.embedding_cache_lock:
            cached_results = []
            texts_to_embed = []
            indices_to_embed = []
            for i, text in enumerate(texts):
                text_hash = hash(text)
                if text_hash in self.embedding_cache:
                    cached_results.append((i, self.embedding_cache[text_hash]))
                else:
                    texts_to_embed.append(text)
                    indices_to_embed.append(i)

        # everything cached: return in the original order
        if not texts_to_embed:
            return [res for _, res in sorted(cached_results, key=lambda t: t[0])]

        # compute embeddings for the missing texts
        with ThreadPoolExecutor(max_workers=min(8, cpu_count())) as executor:
            batches = [texts_to_embed[i:i + batch_size]
                       for i in range(0, len(texts_to_embed), batch_size)]
            futures = [executor.submit(self.embedding.embed_documents, batch)
                       for batch in batches]
            new_embeddings = []
            # bug fix: collect in submission order (as_completed can yield
            # batches out of order and misalign embeddings with texts)
            for future in futures:
                new_embeddings.extend(future.result())

        # merge cached and fresh results, updating the cache
        results = [None] * len(texts)
        for idx, emb in cached_results:
            results[idx] = emb
        for idx, text_idx in enumerate(indices_to_embed):
            text = texts_to_embed[idx]
            embedding = new_embeddings[idx]
            with self.embedding_cache_lock:
                self.embedding_cache[hash(text)] = embedding
            results[text_idx] = embedding
        return results

    def initialize_space(self, space_name: str, docs: List[Document],
                         index_path: Optional[str] = None) -> Dict[str, Any]:
        if not docs:
            raise ValueError("docs empty")
        logger.info("Initializing space %s with %d docs", space_name, len(docs))
        t_init = time.time()
        texts, metadatas = self._prepare_texts(docs)

        # OPTIMIZATION 1: parallel embedding with cache
        logger.info("Computing embeddings in parallel with cache...")
        start_time = time.time()
        try:
            embeddings = self._embed_batch_parallel(texts)
            # LangChain's original path, but with pre-computed embeddings
            text_embeddings = list(zip(texts, embeddings))
            store = LangchainFAISS.from_embeddings(text_embeddings, self.embedding,
                                                   metadatas=metadatas)
        except Exception as e:
            logger.warning("Parallel embedding failed, falling back to sequential: %s", e)
            store = LangchainFAISS.from_documents(docs, self.embedding)
        embedding_time = time.time() - start_time
        logger.info(f"Embedding completed in {embedding_time:.2f} seconds")

        # BM25 - optimized version with chunked tokenization
        logger.info("Building BM25 index...")
        start_time = time.time()
        # process in chunks to avoid memory spikes, reusing a single pool
        chunk_size = 1000
        tokenized = []
        with Pool(min(cpu_count(), 4)) as p:
            for i in range(0, len(texts), chunk_size):
                tokenized.extend(p.map(normalize_tokens, texts[i:i + chunk_size]))
        bm25_index = BM25Okapi(tokenized)
        bm25_time = time.time() - start_time
        logger.info(f"BM25 indexing completed in {bm25_time:.2f} seconds")

        data = {
            "store": store,
            "docs": docs,
            "bm25": {
                "bm25_index": bm25_index,
                "texts": texts,
                "metadatas": metadatas,
                "tokenized": tokenized,
            },
            "path": index_path,
            "timestamp": time.time(),
        }

        # optional persistence, done in the background
        if index_path:
            def save_index_async():
                try:
                    os.makedirs(index_path, exist_ok=True)
                    store.save_local(index_path)
                    with gzip.open(os.path.join(index_path, "bm25_tokens.pkl.gz"), "wb") as f:
                        pickle.dump(tokenized, f)
                    logger.info("Index saved asynchronously to %s", index_path)
                except Exception as e:
                    logger.warning("Could not save index to %s: %s", index_path, e)

            threading.Thread(target=save_index_async, daemon=True).start()

        with self.cache_lock:
            self.index_cache[space_name] = data
        # bug fix: measure from the start of initialization, not from the
        # BM25 phase only
        logger.info("Space %s initialized in %.2f seconds", space_name, time.time() - t_init)
        return data

    def get_index(self, space_name: str) -> Optional[Dict[str, Any]]:
        with self.cache_lock:
            return self.index_cache.get(space_name)
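# Sketch of building a hybrid (FAISS + BM25) space; the embedder name is an
# illustrative assumption, any LangChain embeddings object works.
def _demo_index_manager() -> Dict[str, Any]:
    im = IndexManager(HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
    docs = [Document(page_content="Il pino è alto venti metri.", metadata={"source": "a"}),
            Document(page_content="La quercia ha trecento anni.", metadata={"source": "b"})]
    return im.initialize_space("demo_space", docs)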
# -----------------------------
# Retriever: hybrid (dense + bm25) with weighted fusion
# -----------------------------
class Retriever:
    def __init__(self, index_manager: IndexManager,
                 dense_weight: float = DEFAULT_DENSE_WEIGHT,
                 bm25_weight: float = DEFAULT_BM25_WEIGHT):
        self.index_manager = index_manager
        self.dense_weight = dense_weight
        self.bm25_weight = bm25_weight

    def _run_dense(self, store, query, k):
        """Dense retrieval, run in a thread. Assumes the store returns
        similarity-like scores (higher is better)."""
        try:
            dense_results = store.similarity_search_with_score(query, k=k)
            dense_docs, dense_scores = zip(*dense_results) if dense_results else ([], [])
            return list(dense_docs), list(dense_scores)
        except Exception:
            docs = store.similarity_search(query, k=k)
            return list(docs), [0.0] * len(docs)

    def _run_bm25(self, bm25_index, texts, query):
        """BM25 retrieval, run in a thread."""
        qtok = normalize_tokens(query)
        return np.array(bm25_index.get_scores(qtok))

    def retrieve(self, space_name: str, query: str, top_k: int = 5,
                 similarity_threshold: float = DEFAULT_SIM_THRESHOLD) -> List[Tuple[Document, float]]:
        index = self.index_manager.get_index(space_name)
        if index is None:
            return []
        store = index["store"]
        bm25 = index["bm25"]

        # -------- run both retrievers in parallel --------
        with ThreadPoolExecutor(max_workers=2) as exe:
            fut_dense = exe.submit(self._run_dense, store, query, top_k)
            fut_bm25 = exe.submit(self._run_bm25, bm25["bm25_index"], bm25["texts"], query)
            # .result() blocks until each future completes
            dense_docs, dense_scores = fut_dense.result()
            bm25_scores = fut_bm25.result()
        dense_scores = np.array(dense_scores)

        # ---- build the candidate pool ----
        candidates = {}

        # dense candidates
        top_dense_idx = np.argsort(dense_scores)[::-1][:top_k] if len(dense_scores) else []
        for i in top_dense_idx:
            doc = dense_docs[i]
            score = float(dense_scores[i])
            key = (doc.metadata.get("source", ""), doc.metadata.get("chunk_index", ""),
                   hash(doc.page_content))
            candidates[key] = {"doc": doc, "dense_score": score, "bm25_score": 0.0}

        # bm25 candidates
        top_bm25_idx = np.argsort(bm25_scores)[::-1][:top_k]
        for i in top_bm25_idx:
            key_meta = (bm25["metadatas"][i].get("source", ""),
                        bm25["metadatas"][i].get("chunk_index", ""),
                        hash(bm25["texts"][i]))
            if key_meta in candidates:
                candidates[key_meta]["bm25_score"] = float(bm25_scores[i])
            else:
                doc = Document(page_content=bm25["texts"][i], metadata=bm25["metadatas"][i])
                candidates[key_meta] = {"doc": doc, "dense_score": 0.0,
                                        "bm25_score": float(bm25_scores[i])}

        # ---------------------- NORMALIZATION ----------------------
        dense_vals = np.array([v["dense_score"] for v in candidates.values()])
        bm25_vals = np.array([v["bm25_score"] for v in candidates.values()])

        # z-score for dense
        dense_norm = ((dense_vals - dense_vals.mean()) / (dense_vals.std() + 1e-9)
                      if len(dense_vals) > 0 else dense_vals)

        # log + z-score for BM25 (more stable)
        bm25_log = np.log1p(bm25_vals)
        bm25_norm = ((bm25_log - bm25_log.mean()) / (bm25_log.std() + 1e-9)
                     if len(bm25_vals) > 0 else bm25_vals)

        # ---------------------- WEIGHTED FUSION (pre-rank) ----------------------
        fused_scores = self.dense_weight * dense_norm + self.bm25_weight * bm25_norm

        # ---------------------- PENALTY FOR A MISSING MODULE ----------------------
        fused_scores = fused_scores.copy()
        for i, (key, val) in enumerate(candidates.items()):
            if val["dense_score"] == 0:
                fused_scores[i] -= PENALTY_MISSING_DENSE
            if val["bm25_score"] == 0:
                fused_scores[i] -= PENALTY_MISSING_BM25

        # ---------------------- FUSION RANKING ----------------------
        # global rank over the fused scores (rank 0 = best)
        fused_rank = np.argsort(np.argsort(-fused_scores))

        # ---------------------- RRF OVER THE FUSED RANK ----------------------
        k_rrf = 60
        final_candidates = []
        for (key, val), r in zip(candidates.items(), fused_rank):
            # RRF score over a single rank (the fused one)
            rrf_score = 1.0 / (k_rrf + r)
            # safety filter: drop weak dense hits with no BM25 support
            if val["dense_score"] and val["dense_score"] < similarity_threshold:
                if val["bm25_score"] <= 0:
                    continue
            final_candidates.append((val["doc"], float(rrf_score)))

        # final ordering
        return sorted(final_candidates, key=lambda x: x[1], reverse=True)[:top_k]
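# Worked example of the fused-rank RRF step above: with k_rrf = 60, the
# candidate ranked 0 by the weighted fusion scores 1/60 ≈ 0.0167, rank 1
# scores 1/61 ≈ 0.0164, rank 2 scores 1/62 ≈ 0.0161. The fused ordering is
# preserved; the scores are merely squashed onto a bounded scale that does
# not depend on the vector store's raw score range.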
# -----------------------------
# Reranker
# -----------------------------
CROSS_ENCODER = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
E5_MODEL = SentenceTransformer('intfloat/e5-base-v2')

class AdvancedHybridReranker:
    def __init__(self, tokenizer: Any = DEFAULT_TOKENIZER, max_doc_tokens: int = 400,
                 variant: str = "narrativo", w_variant: float = 0.5):
        self.tokenizer = tokenizer
        self.max_doc_tokens = max_doc_tokens
        self.name = "advanced_hybrid"
        # extra cross-encoder weight only for the "scientifico" variant
        self.w_variant = w_variant if variant == "scientifico" else 0
        self._initialize_models()

    def _initialize_models(self):
        """Attach both scoring models."""
        try:
            # reuse the globals initialized at import time
            self.cross_encoder = CROSS_ENCODER
            self.e5_model = E5_MODEL
            print("✅ Advanced Hybrid Reranker loaded successfully")
        except Exception as e:
            print(f"❌ Error loading reranker models: {e}")
            self.cross_encoder = None
            self.e5_model = None

    def _dynamic_weight_calculation(self, query: str, docs: List[Document]) -> Tuple[float, float]:
        """Compute dynamic weights from query complexity and document
        diversity."""
        query_complexity = self._calculate_query_complexity(query)
        doc_diversity = self._calculate_document_diversity(docs)

        # simplified weighting
        if query_complexity > 0.7:
            cross_weight = min(0.7 + self.w_variant, 0.975)
        elif doc_diversity > 0.6:
            cross_weight = min(0.4 + self.w_variant, 0.975)
        else:
            cross_weight = min(0.475 + self.w_variant, 0.975)
        e5_weight = 1.0 - cross_weight
        return cross_weight, e5_weight
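    # Worked example of the weighting above: for the "scientifico" variant
    # (w_variant = 0.5) a complex query gives cross_weight =
    # min(0.7 + 0.5, 0.975) = 0.975 and e5_weight = 0.025, so the
    # cross-encoder dominates; for "narrativo" (w_variant = 0) the same
    # query gives 0.7 / 0.3.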
    def _calculate_query_complexity(self, query: str) -> float:
        """Estimate query complexity from length and specificity."""
        words = query.lower().split()
        unique_words = set(words)

        # complexity factors
        length_factor = min(len(words) / 20, 1.0)  # normalized over 20 words
        uniqueness_factor = len(unique_words) / len(words) if words else 0

        # queries with specific terms (proper nouns, numbers) are more complex
        specific_terms = sum(1 for word in words if word.istitle() or word.isdigit())
        specificity_factor = specific_terms / len(words) if words else 0

        complexity = (length_factor * 0.4 + uniqueness_factor * 0.3 + specificity_factor * 0.3)
        return min(complexity, 1.0)

    def _calculate_document_diversity(self, docs: List[Document]) -> float:
        """Measure how different the documents are from each other."""
        if len(docs) <= 1:
            return 0.0

        # sample the first 100 characters of each document for efficiency,
        # limited to 5 docs for performance
        sample_texts = [doc.page_content[:100] for doc in docs[:5]]
        try:
            vectorizer = TfidfVectorizer(max_features=50, stop_words='english')
            tfidf_matrix = vectorizer.fit_transform(sample_texts)

            # mean pairwise similarity, excluding self-similarity
            similarity_matrix = cosine_similarity(tfidf_matrix)
            np.fill_diagonal(similarity_matrix, 0)

            # diversity = 1 - mean similarity
            avg_similarity = np.mean(similarity_matrix) if similarity_matrix.size > 0 else 0
            return 1 - avg_similarity
        except Exception:
            return 0.5  # fallback

    def rerank(self, query: str, docs: List[Document], top_k: int = 5) -> List[Document]:
        if not docs or not self.cross_encoder or not self.e5_model:
            return docs[:top_k]
        try:
            # truncate documents for the cross-encoder
            truncated_docs = [self._truncate_for_reranker(doc.page_content) for doc in docs]

            # 1. dynamic weights
            cross_weight, e5_weight = self._dynamic_weight_calculation(query, docs)
            print(f"🎯 Dynamic weights - Cross: {cross_weight:.2f}, E5: {e5_weight:.2f}")

            # 2. cross-encoder scores (precise but computationally expensive)
            cross_pairs = [[query, doc_text] for doc_text in truncated_docs]
            cross_scores = np.array(self.cross_encoder.predict(cross_pairs, batch_size=32))

            # 3. E5 scores (fast, good semantic coverage)
            query_text = f"query: {query}"
            doc_texts = [f"passage: {doc_text}" for doc_text in truncated_docs]
            query_emb = self.e5_model.encode(query_text, normalize_embeddings=True)
            doc_embs = self.e5_model.encode(doc_texts, normalize_embeddings=True)
            e5_scores = util.cos_sim(query_emb, doc_embs)[0].numpy()

            # 4. robust min-max normalization
            def robust_normalize(scores):
                if len(scores) == 0:
                    return scores
                scores = np.array(scores)
                if scores.max() == scores.min():
                    return np.ones_like(scores) * 0.5
                return (scores - scores.min()) / (scores.max() - scores.min())

            cross_norm = robust_normalize(cross_scores)
            e5_norm = robust_normalize(e5_scores)

            # 5. weighted fusion with confidence scoring
            final_scores = []
            confidence_scores = []
            for cross_score, e5_score in zip(cross_norm, e5_norm):
                # base score with dynamic weights
                base_score = cross_weight * cross_score + e5_weight * e5_score
                # confidence bonus when the two models agree
                agreement = 1 - abs(cross_score - e5_score)
                confidence_bonus = agreement * 0.1  # up to a 10% bonus
                final_scores.append(base_score + confidence_bonus)
                confidence_scores.append(agreement)

            # 6. final diversity-aware ranking
            ranked_indices = np.argsort(final_scores)[::-1]

            # lightly penalize near-duplicate documents among the top results
            final_ranking = []
            selected_indices = set()
            for idx in ranked_indices:
                if len(final_ranking) >= top_k:
                    break
                # skip candidates too similar to already-selected documents
                if self._is_too_similar(idx, selected_indices, doc_embs, final_ranking):
                    continue
                final_ranking.append(docs[idx])
                selected_indices.add(idx)

            # if we lack enough diverse documents, add the best remaining ones
            if len(final_ranking) < top_k:
                remaining_indices = [i for i in ranked_indices if i not in selected_indices]
                for idx in remaining_indices[:top_k - len(final_ranking)]:
                    final_ranking.append(docs[idx])

            print(f"📊 Advanced Hybrid - Top {len(final_ranking)} documents selected")
            return final_ranking

        except Exception as e:
            print(f"❌ Error in Advanced Hybrid Reranker: {e}")
            # fallback to the plain cross-encoder
            try:
                pairs = [[query, self._truncate_for_reranker(doc.page_content)] for doc in docs]
                scores = self.cross_encoder.predict(pairs, batch_size=32)
                idx = np.argsort(scores)[::-1][:top_k]
                return [docs[i] for i in idx]
            except Exception:
                return docs[:top_k]
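    # Worked example of the confidence bonus in step 5: if cross_norm = 0.9
    # and e5_norm = 0.7 for a document, agreement = 1 - |0.9 - 0.7| = 0.8 and
    # the bonus is 0.08, so documents on which the two models agree are
    # nudged upward.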
    def _is_too_similar(self, candidate_idx: int, selected_indices: set, doc_embs,
                        current_ranking: List[Document], threshold: float = 0.85) -> bool:
        """Check whether the candidate is too similar to already-selected
        documents (embeddings are normalized, so the dot product is a cosine
        similarity)."""
        if not selected_indices:
            return False
        candidate_emb = doc_embs[candidate_idx]
        for selected_idx in selected_indices:
            similarity = np.dot(candidate_emb, doc_embs[selected_idx])
            if similarity > threshold:
                return True
        return False

    def _truncate_for_reranker(self, text: str) -> str:
        """Truncate a document so it fits the CrossEncoder input limit
        (needed by rerank() above; mirrored in the wrapper below)."""
        toks = self.tokenizer.encode(text, add_special_tokens=False)
        if len(toks) <= self.max_doc_tokens:
            return text
        return self.tokenizer.decode(toks[:self.max_doc_tokens], skip_special_tokens=True)

# The class header below was missing in the original source: the following
# __init__/rerank pair clearly belongs to a thin wrapper around
# AdvancedHybridReranker. The name RerankerWrapper is chosen here, not taken
# from the original.
class RerankerWrapper:
    def __init__(self, model_name: str = "", variant="narrativo",
                 tokenizer: Any = DEFAULT_TOKENIZER, max_doc_tokens: int = 400):
        """
        max_doc_tokens: maximum number of tokens allowed for a document passed
        to the CrossEncoder (MiniLM has a 512-token limit, so 400 is safe).
        """
        if model_name == "advance":
            self.model = AdvancedHybridReranker(tokenizer=tokenizer, variant=variant)
        else:
            self.model = None
        self.tokenizer = tokenizer
        self.max_doc_tokens = max_doc_tokens
""" if model_name == "advance": self.model = AdvancedHybridReranker(tokenizer=tokenizer, variant=variant) else: self.model = None self.tokenizer = tokenizer self.max_doc_tokens = max_doc_tokens def _truncate_for_reranker(self, text: str) -> str: """Tronca il documento per non superare il limite del CrossEncoder.""" toks = self.tokenizer.encode(text, add_special_tokens=False) if len(toks) <= self.max_doc_tokens: return text toks = toks[:self.max_doc_tokens] return self.tokenizer.decode(toks, skip_special_tokens=True) def rerank(self, query: str, docs: List[Document], top_k: int = 5) -> List[Document]: # Se non c'è modello, restituisci i primi top_k if self.model is None: return list(docs[:top_k]) if not docs: return [] # ✅ CORREZIONE: Usa direttamente il metodo rerank di AdvancedHybridReranker if isinstance(self.model, AdvancedHybridReranker): return self.model.rerank(query, docs, top_k) # ✅ CORREZIONE: Fallback per altri tipi di reranker try: # Prepara le coppie (query, documento troncato) pairs = [] for d in docs: truncated = self._truncate_for_reranker(d.page_content) pairs.append([query, truncated]) # Predizione con cross-encoder standard scores = self.model.predict(pairs, batch_size=64) # Ordina per score decrescente idx = np.argsort(scores)[::-1][:top_k] return [docs[i] for i in idx] except Exception as e: print(f"❌ Errore nel reranking: {e}") return docs[:top_k] # ----------------------------- # LLM Client (stub for Groq + local fallback) # ----------------------------- class LLMClient: def __init__(self, remote_keys: Optional[List[str]] = None, model_id: str = "llama-3.1-8b-instant", temperature: float = 0.2, timeout: int = 15): self.remote_keys = remote_keys or [] self.model_id = model_id self.temperature = temperature self.timeout = timeout def variant(self, temperature: float): self.temperature = temperature def call(self, prompt: str) -> str: last_err = None for key in GROQ_KEYS: try: llm = ChatGroq( model=self.model_id, groq_api_key=key, temperature=self.temperature, timeout=30,) ans = llm.invoke(prompt) return ans.content if hasattr(ans, "content") else str(ans) except Exception as e: last_err = e time.sleep(1 + random.random()) print(last_err) # fallback return f"[LLM FALLBACK] remote generation failed: {str(last_err)}" embedding_narrativo = HuggingFaceEmbeddings( model_name="dbmdz/bert-base-italian-xxl-cased", model_kwargs={'device': 'cpu'}, encode_kwargs={'normalize_embeddings': True}) embedding_scientifico = HuggingFaceEmbeddings( model_name="Geotrend/bert-base-en-it-cased", model_kwargs={'device': 'cpu'}) # Aggiungi questa funzione per selezionare l'embedder in base alla variant def get_embedding_model(variant: str = "narrativo"): """ Restituisce il modello di embedding appropriato in base alla variant. 
""" variant = variant.lower().strip() if variant == "narrativo": return embedding_narrativo elif variant == "scientifico": return embedding_scientifico else: # Fallback default return HuggingFaceEmbeddings( model_name="sentence-transformers/all-mpnet-base-v2", model_kwargs={'device': 'cpu'} ) # ----------------------------- # Pipeline # ----------------------------- class HybridCrossEncoderRAG: def __init__(self, embedding_model: Any, tokenizer: Any = DEFAULT_TOKENIZER, index_manager: Optional[IndexManager] = None, retriever: Optional[Retriever] = None, reranker: Optional[AdvancedHybridReranker] = None, variant: str = "narrativo", context_manager: Optional[ContextManager] = None, prompt_manager: Optional[PromptManager] = None, llm_client: Optional[LLMClient] = None): # COMPONENTI PRINCIPALI (iniezione + fallback) self.index_manager = index_manager or IndexManager(embedding_model) self.retriever = retriever or Retriever(self.index_manager) self.reranker = reranker or AdvancedHybridReranker(tokenizer=tokenizer, variant=variant, max_doc_tokens=400) self.context_manager = context_manager or ContextManager(tokenizer) self.prompt_manager = prompt_manager or PromptManager() self.llm_client = llm_client or LLMClient() # default prompt style self.set_mode(variant) # ---------- # Helpers # ---------- def clean_text(self, text: str) -> str: # lowercase text = text.lower() # normalizzazione unicode (rimuove accenti combinati strani, simboli anomali) text = unicodedata.normalize("NFKC", text) # rimozione segni grafici strani (tieni la punteggiatura standard . , ; : ! ? ' " ( ) - text = re.sub(r"[^a-z0-9à-ü\s\.,;:!\?\'\"\(\)\-\n]", " ", text) # rimuove punteggiatura opzionale (se vuoi eliminarla del tutto) # text = re.sub(r"[^\w\sà-ü]", " ", text) # sostituisce spazi multipli con uno solo text = re.sub(r"\s+", " ", text) # trim finale return text.strip() def build_docs_from_text(self, tree_data: str, preprocessing=True) -> List[Document]: tree_data = self.clean_text(tree_data) sections = tree_data.split('\n\n') all_docs = [] for i, section in enumerate(sections): match = re.search(r"(?i)DATI\s+([A-ZÀ-Ü\s]+)[:\-]?", section) title = match.group(1).strip() if match else f"Parte {i+1}" content = section.replace(match.group(0), '').strip() if match else section doc = Document(page_content=content, metadata={"source": f"tree_data_part_{i+1}", "title": f"DATI {title.upper()}"}) all_docs.append(doc) return all_docs def document_to_chunks(self, all_docs): """ Trasforma i Document originali in chunk semantici per il RAG, usando SemanticChunker (SpaCy + embeddings). 
""" # inizializza il semantic chunker usando lo stesso embedder del retriever chunker = SemanticChunker(embedder=self.index_manager.embedding, max_tokens=220, min_tokens=40, sim_threshold=0.75, overlap_ratio=0.30) final_chunks = [] for doc in all_docs: text = doc.page_content # generazione chunk semantici pieces = chunker.chunk(text) # ricostruzione Document con metadata for i, chunk_text in enumerate(pieces): metadata = dict(doc.metadata) metadata.update({ "chunk_index": i, "chunk_type": "semantic", "source_doc": doc.metadata.get("source", None) }) final_chunks.append(Document( page_content=chunk_text, metadata=metadata )) return final_chunks # ---------- # Public API # ---------- def build_space(self, space_name: str, context: str, index_path: Optional[str] = None) -> None: docs = self.build_docs_from_text(context) chunks = self.document_to_chunks(docs) self.index_manager.initialize_space(space_name, chunks, index_path=index_path) def set_mode(self, mode: str): """ Imposta prompt style e temperatura del modello in base a una stringa. - "narrativo" -> prompt narrativo, temperatura 3.0 - "scientifico" -> prompt scientifico, temperatura 1.5 """ mode = mode.lower().strip() if mode == "narrativo": self.prompt_style = "narrativo" self.llm_client.variant(temperature=0.3) elif mode == "scientifico": self.prompt_style = "scientifico" self.llm_client.variant(temperature=0.15) else: raise ValueError(f"Modalità '{mode}' non riconosciuta. Usa 'narrativo' o 'scientifico'.") def run(self, space_name: str, question: str, treename: Optional[str] = None, top_k: int = 3) -> str: # 1) retrieve hybrid candidates = self.retriever.retrieve(space_name, question, top_k=top_k*2) docs = [d for d, _ in candidates] if not docs: return "Nessun documento rilevante trovato nel contesto." # 2) rerank top candidates reranked = self.reranker.rerank(question, docs, top_k=top_k) # 3) build context ctx = "\n\n".join(d.page_content for d in reranked) ctx = self.context_manager.truncate_context(ctx) # 4) final prompt prompt = self.prompt_manager.build(self.prompt_style, ctx, question, treename=treename) # 5) call LLM return self.llm_client.call(prompt) # ========================================================== # MANAGER # ========================================================== class SimpleRAGManager: """ Gestore semplificato per multiple istanze RAG con cleanup automatico dopo 12 ore """ def __init__(self): self.instances = {} # instance_id -> HybridCrossEncoderRAG self.instance_spaces = {} # instance_id -> [space1, space2, ...] 
    def _start_cleanup_thread(self):
        """Start the background thread that runs cleanup every 30 minutes."""
        def cleanup_loop():
            while True:
                time.sleep(1800)  # check every 30 minutes
                self.cleanup_old_instances()

        cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True)
        cleanup_thread.start()
        print("🔄 Automatic cleanup thread started (checks every 30 minutes)")

    def create_instance_with_variant(self, variant: str = "narrativo") -> str:
        """Create a new RAG instance for the given variant."""
        instance_id = str(uuid.uuid4())

        # component setup
        my_embedder = get_embedding_model(variant)
        custom_llm = LLMClient(model_id="meta-llama/llama-4-scout-17b-16e-instruct",
                               temperature=0.3)
        custom_prompt = PromptManager()
        reranker = AdvancedHybridReranker(variant=variant)

        # build the RAG instance
        rag_instance = HybridCrossEncoderRAG(
            embedding_model=my_embedder,
            llm_client=custom_llm,
            prompt_manager=custom_prompt,
            reranker=reranker,
            variant=variant
        )

        self.instances[instance_id] = rag_instance
        self.instance_spaces[instance_id] = []
        self.last_used_times[instance_id] = time.time()
        print(f"✅ Created RAG instance: {instance_id} with variant: {variant}")
        return instance_id

    def initialize_space(self, instance_id: str, context: str) -> str:
        """Initialize a space for an instance."""
        if instance_id not in self.instances:
            raise ValueError(f"Instance {instance_id} not found")

        rag_instance = self.instances[instance_id]
        space_id = str(uuid.uuid4())

        # initialize the space, timing the build
        s = time.time()
        rag_instance.build_space(space_id, context)
        print(f"Time: {time.time() - s}sec")

        self.instance_spaces[instance_id].append(space_id)
        self.space_creation_times[space_id] = time.time()
        self.last_used_times[instance_id] = time.time()
        print(f"✅ Space {space_id} created for instance {instance_id}")
        return space_id

    def query(self, instance_id: str, space_id: str, question: str, tree_name: str = "") -> str:
        """Run a query and refresh the instance's last-used timestamp."""
        if instance_id not in self.instances:
            return "❌ Istanza non trovata"

        self.last_used_times[instance_id] = time.time()
        rag_instance = self.instances[instance_id]
        return rag_instance.run(space_name=space_id, question=question, treename=tree_name)

    def cleanup_old_instances(self, max_age_hours: int = 12):
        """Automatically delete instances and spaces unused for the last
        max_age_hours hours."""
        current_time = time.time()
        max_age_seconds = max_age_hours * 60 * 60
        instances_to_delete = []
        spaces_to_delete = []

        # find stale instances
        for instance_id, last_used in self.last_used_times.items():
            if current_time - last_used > max_age_seconds:
                instances_to_delete.append(instance_id)
                # collect all spaces of this instance
                if instance_id in self.instance_spaces:
                    spaces_to_delete.extend(self.instance_spaces[instance_id])
                print(f"⏰ Instance {instance_id} inactive for "
                      f"{(current_time - last_used) / 3600:.1f} hours - marked for deletion")

        # find orphan spaces (instance deleted but space still in the map)
        for space_id, creation_time in self.space_creation_times.items():
            if current_time - creation_time > max_age_seconds:
                # check whether the space still belongs to a valid instance
                space_has_instance = False
                for instance_id, spaces in self.instance_spaces.items():
                    if space_id in spaces and instance_id in self.instances:
                        space_has_instance = True
                        break
                if not space_has_instance and space_id not in spaces_to_delete:
                    spaces_to_delete.append(space_id)
                    print(f"⏰ Orphan space {space_id} - marked for deletion")

        # perform the deletions
        deleted_count = 0
        for instance_id in instances_to_delete:
            if self._delete_instance_internal(instance_id):
                deleted_count += 1
                print(f"🧹 AUTO cleanup: deleted instance {instance_id} "
                      f"(inactive for {max_age_hours}h)")
        for space_id in spaces_to_delete:
            if self._delete_space_internal(space_id):
                deleted_count += 1
                print(f"🧹 AUTO cleanup: deleted space {space_id} "
                      f"(inactive for {max_age_hours}h)")

        self.last_cleanup_time = current_time
        if deleted_count > 0:
            print(f"🧹 AUTO cleanup finished: {deleted_count} items deleted "
                  f"after {max_age_hours} hours of inactivity")
        return deleted_count
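    # Timing note: with a 12-hour policy checked every 30 minutes, the
    # worst-case lifetime of an abandoned instance is about 12.5 hours
    # (it can go stale just after a check and be reclaimed at the next one).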
    def _delete_instance_internal(self, instance_id: str) -> bool:
        """Delete an instance internally (no user-facing message)."""
        if instance_id in self.instances:
            # delete all associated spaces
            if instance_id in self.instance_spaces:
                for space_id in self.instance_spaces[instance_id]:
                    self._delete_space_cache(space_id)
                    if space_id in self.space_creation_times:
                        del self.space_creation_times[space_id]
                del self.instance_spaces[instance_id]
            # delete the instance itself
            del self.instances[instance_id]
            if instance_id in self.last_used_times:
                del self.last_used_times[instance_id]
            return True
        return False

    def _delete_space_internal(self, space_id: str) -> bool:
        """Delete a space internally (no user-facing message)."""
        # search every instance
        for instance_id, spaces in self.instance_spaces.items():
            if space_id in spaces:
                spaces.remove(space_id)
                self._delete_space_cache(space_id)
                if space_id in self.space_creation_times:
                    del self.space_creation_times[space_id]
                return True
        return False

    def _delete_space_cache(self, space_id: str):
        """Evict a space from every instance's FAISS cache."""
        for instance_id, rag_instance in self.instances.items():
            if space_id in rag_instance.index_manager.index_cache:
                del rag_instance.index_manager.index_cache[space_id]

    # The public delete method is intentionally disabled: deletion is fully
    # automatic. Re-enable it if manual deletion is ever needed.
    # def delete(self, instance_id: str = None, space_id: str = None):
    #     """Delete an instance OR a space - single DELETE method."""
    #     if instance_id:
    #         if self._delete_instance_internal(instance_id):
    #             return f"✅ Istanza {instance_id} eliminata"
    #         else:
    #             return "❌ Istanza non trovata"
    #     elif space_id:
    #         if self._delete_space_internal(space_id):
    #             return f"✅ Spazio {space_id} eliminato"
    #         else:
    #             return "❌ Spazio non trovato"
    #     else:
    #         return "❌ Specificare instance_id O space_id"

    def get_instance_by_space(self, space_id: str) -> Optional[str]:
        """Find the instance_id owning a given space_id."""
        for instance_id, spaces in self.instance_spaces.items():
            if space_id in spaces:
                return instance_id
        return None

    def get_stats(self) -> Dict:
        """Return manager statistics, including cleanup info."""
        current_time = time.time()
        old_instances = []
        # bug fix: the original listed instances past 12h and flagged >11h,
        # so the flag was always true; list those approaching the threshold
        # and flag those already past it
        for instance_id, last_used in self.last_used_times.items():
            hours_old = (current_time - last_used) / 3600
            if hours_old > 11:
                old_instances.append({
                    "instance_id": instance_id,
                    "hours_inactive": round(hours_old, 1),
                    "will_be_deleted_soon": hours_old > 12
                })
        return {
            "total_instances": len(self.instances),
            "total_spaces": len(self.space_creation_times),
"cleanup_policy": "AUTOMATICO dopo 12 ore di inattività", # ✅ Aggiunto info policy "last_cleanup": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.last_cleanup_time)), "old_instances_count": len(old_instances), "old_instances": old_instances } def manual_cleanup(self, max_age_hours: int = 12) -> Dict: # ✅ CAMBIATO: default 12 ore """ Esegue cleanup manuale e restituisce report """ deleted_count = self.cleanup_old_instances(max_age_hours) return { "success": True, "deleted_count": deleted_count, "max_age_hours": max_age_hours, "message": f"Cleanup manuale completato: {deleted_count} elementi eliminati" } # Istanza globale rag_manager = SimpleRAGManager() # ========================================================== # ENDPOINTS # ========================================================== app = FastAPI() # CORS per Vercel o altri frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.post("/initialize") async def initialize(request: Request): """Crea automaticamente istanza e spazio con configurazione variant""" try: data = await request.json() context = data.get("context", "") variant = data.get("variant", "narrativo") # ✅ AGGIUNGI variant if not context: return JSONResponse( status_code=400, content={"error": "Context vuoto"} ) # ✅ CREA ISTANZA CON VARIANT SPECIFICO instance_id = rag_manager.create_instance_with_variant(variant) # ✅ INIZIALIZZA SPAZIO space_id = rag_manager.initialize_space(instance_id, context) return { "success": True, "space_id": space_id, "instance_id": instance_id, # ✅ RESTITUISCI ANCHE instance_id "variant": variant, "message": f"Spazio creato con variant: {variant}" } except Exception as e: return JSONResponse( status_code=400, content={"error": str(e)} ) @app.post("/query") async def query(request: Request): """Query semplice - basta space_id, NO prompt_style""" try: data = await request.json() space_id = data.get("space_id") question = data.get("question", "") tree_name = data.get("tree_name", "") if not space_id: return JSONResponse( status_code=400, content={"error": "space_id mancante"} ) # Cerca l'istanza che contiene questo space_id instance_id = rag_manager.get_instance_by_space(space_id) if not instance_id: return JSONResponse( status_code=400, content={"error": "Spazio non trovato"} ) result = rag_manager.query( instance_id=instance_id, space_id=space_id, question=question, tree_name=tree_name ) return { "success": True, "result": result } except Exception as e: return JSONResponse( status_code=400, content={"error": str(e)} ) # ✅ COMMENTA O RIMUOVI QUESTI ENDPOINT: # @app.delete("/delete") # async def delete_item(request: Request): # """UNICO DELETE - elimina quello che gli passi""" # try: # data = await request.json() # instance_id = data.get("instance_id") # space_id = data.get("space_id") # # # ✅ SE PASSI SPACE_ID, TROVA AUTOMATICAMENTE INSTANCE_ID # if space_id and not instance_id: # instance_id = rag_manager.get_instance_by_space(space_id) # # result = rag_manager.delete(instance_id=instance_id, space_id=space_id) # # return { # "success": True, # "message": result # } # # except Exception as e: # return JSONResponse( # status_code=400, # content={"error": str(e)} # ) @app.get("/cleanup/stats") async def get_cleanup_stats(): """Restituisce statistiche sul cleanup automatico""" try: stats = rag_manager.get_stats() return { "success": True, "cleanup_stats": stats } except Exception as e: return JSONResponse( status_code=400, content={"error": str(e)} ) 
@app.post("/cleanup/run") async def run_manual_cleanup(request: Request): """Esegue un cleanup manuale""" try: data = await request.json() max_age_hours = data.get("max_age_hours", 24) result = rag_manager.manual_cleanup(max_age_hours) return result except Exception as e: return JSONResponse( status_code=400, content={"error": str(e)} ) @app.get("/debug/instances") async def debug_instances(): """Endpoint di debug per vedere tutte le istanze""" try: instances_info = {} current_time = time.time() for instance_id, last_used in rag_manager.last_used_times.items(): hours_old = (current_time - last_used) / 3600 spaces_count = len(rag_manager.instance_spaces.get(instance_id, [])) instances_info[instance_id] = { "last_used_hours_ago": round(hours_old, 1), "spaces_count": spaces_count, "spaces": rag_manager.instance_spaces.get(instance_id, []), "last_used_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_used)) } return { "success": True, "instances": instances_info, "total_instances": len(rag_manager.instances), "total_spaces": len(rag_manager.space_creation_times) } except Exception as e: return JSONResponse( status_code=400, content={"error": str(e)} ) @app.get("/") async def health_check(): """Health check semplicissimo""" return {"status": "OK", "service": "Simple RAG API"} @app.get("/api/rag") async def get_rag_status(): """Ping di salute""" return {"status": "ok", "message": "Microservizio RAG attivo e funzionante 🚀"}