import re
import numpy as np
import nltk
from sklearn.metrics.pairwise import cosine_similarity
from dotenv import load_dotenv
from langchain_huggingface import HuggingFaceEmbeddings

# Load environment variables from a local .env file.
load_dotenv()


class ActivaSemanticSplitter:
    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2", batch_size=32):
        self.batch_size = batch_size

        print("🔄 Initializing the HuggingFace embedding engine...")

        try:
            self.embedding_model = HuggingFaceEmbeddings(model_name=model_name)
            print("✅ Model loaded successfully.")
        except Exception as e:
            print(f"❌ Error loading model: {e}")
            raise

        # Make sure the NLTK sentence-tokenizer resources are available.
        try:
            nltk.data.find('tokenizers/punkt')
            nltk.data.find('tokenizers/punkt_tab')
        except LookupError:
            print("⬇️ Downloading NLTK resources...")
            nltk.download('punkt', quiet=True)
            nltk.download('punkt_tab', quiet=True)

    def _split_sentences(self, text):
        text = text.strip()
        try:
            # Prefer the Italian Punkt model; fall back to the default tokenizer.
            try:
                tokenizer = nltk.data.load('tokenizers/punkt/italian.pickle')
            except LookupError:
                from nltk.tokenize.punkt import PunktSentenceTokenizer
                tokenizer = PunktSentenceTokenizer()

            # Register common Italian abbreviations so they don't end a sentence.
            custom_abbrevs = ['sec', 's', 'prof', 'dott', 'avv', 'pag', 'fig', 'nr', 'art']
            for abbr in custom_abbrevs:
                tokenizer._params.abbrev_types.add(abbr)

            sentences = tokenizer.tokenize(text)

        except ImportError:
            print("⚠️ NLTK not installed. Falling back to a simple regex split.")
            sentences = re.split(r'(?<=[.?!])\s+', text)
        except Exception as e:
            print(f"⚠️ NLTK error ({e}). Falling back to a regex split.")
            sentences = re.split(r'(?<=[.?!])\s+', text)

        # Drop very short fragments (fewer than 6 characters).
        return [s.strip() for s in sentences if len(s.strip()) > 5]

    def combine_sentences(self, sentences, buffer_size=1):
        # Build a sliding window around each sentence (buffer_size neighbours on
        # each side) so the embeddings capture local context, not isolated sentences.
        combined = []
        for i in range(len(sentences)):
            start = max(0, i - buffer_size)
            end = min(len(sentences), i + 1 + buffer_size)
            combined_context = " ".join(sentences[start:end])
            combined.append(combined_context)
        return combined

    def calculate_cosine_distances(self, sentences):
        # Embed the sentences in batches to limit memory usage.
        embeddings = []
        total = len(sentences)

        for i in range(0, total, self.batch_size):
            batch = sentences[i : i + self.batch_size]
            batch_embeddings = self.embedding_model.embed_documents(batch)
            embeddings.extend(batch_embeddings)

        # Cosine distance (1 - similarity) between consecutive embeddings.
        distances = []
        for i in range(len(embeddings) - 1):
            similarity = cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0]
            distance = 1.0 - similarity
            distances.append(distance)

        return distances, embeddings

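    # Chunking strategy: create_chunks below embeds a sliding window around each
    # sentence, measures the cosine distance between consecutive windows, and
    # treats any distance above the chosen percentile as a topic break; sentences
    # between breaks are joined into a single chunk.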
    def create_chunks(self, text, percentile_threshold=95):
        single_sentences = self._split_sentences(text)
        if not single_sentences:
            return [], [], 0

        combined_sentences = self.combine_sentences(single_sentences)
        distances, _ = self.calculate_cosine_distances(combined_sentences)

        if not distances:
            # Only one sentence: return the whole text as a single chunk.
            return [text], [], 0

        # Distances above this percentile are treated as semantic breakpoints.
        threshold = np.percentile(distances, percentile_threshold)
        indices_above_thresh = [i for i, x in enumerate(distances) if x > threshold]

        chunks = []
        start_index = 0
        breakpoints = indices_above_thresh + [len(single_sentences)]

        # Slice the sentence list at each breakpoint and keep non-trivial chunks.
        for i in breakpoints:
            end_index = i + 1
            chunk_text = " ".join(single_sentences[start_index:end_index])
            if len(chunk_text) > 20:
                chunks.append(chunk_text)
            start_index = end_index

        return chunks, distances, threshold
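

# Illustrative usage sketch, not part of the original module: it assumes the
# embedding model download succeeds and uses a made-up Italian sample text; the
# names below (splitter, sample_text) are placeholders for illustration only.
if __name__ == "__main__":
    splitter = ActivaSemanticSplitter()

    sample_text = (
        "L'art. 5 del regolamento disciplina i requisiti di accesso al servizio. "
        "Il dott. Rossi ha presentato la domanda entro i termini previsti. "
        "La fig. 3 a pag. 12 riassume invece il flusso di approvazione."
    )

    chunks, distances, threshold = splitter.create_chunks(sample_text, percentile_threshold=95)

    print(f"Breakpoint threshold (95th percentile of distances): {threshold:.4f}")
    for n, chunk in enumerate(chunks, start=1):
        print(f"[Chunk {n}] {chunk}")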