""" GraphoLab core — RAG (Retrieval-Augmented Generation) + Ollama integration. Provides: - check_ollama() check whether Ollama server is reachable - ollama_list_models() list available models - set_rag_model() change the active generation model - rag_load_docs() load synthetic + cached documents at startup - rag_add_docs() index new uploaded PDF/DOCX files - rag_remove_doc() remove a document from the index - rag_doc_list() list indexed documents - rag_doc_choices() list indexed document names - rag_retrieve() retrieve top-k chunks for a query - stream_ollama() stream tokens from Ollama /api/generate - rag_chat_stream() full RAG chat: retrieve + build prompt + stream tokens - pipeline_llm_synthesis() LLM synthesis of forensic pipeline results """ from __future__ import annotations import hashlib import json import threading from pathlib import Path from typing import Generator import numpy as np import requests # ────────────────────────────────────────────────────────────────────────────── # Configuration # ────────────────────────────────────────────────────────────────────────────── OLLAMA_URL = "http://localhost:11434" OLLAMA_MODEL = "qwen3:4b" # text/reasoning model (agent, RAG, compliance) VLM_MODEL = "qwen3-vl:8b" # vision-language model (image analysis only) _embed_model = "nomic-embed-text" # dedicated embedding model — changing it invalidates cache _ENV_FILE = Path(__file__).parent.parent / ".env" def _load_model_from_env(key: str, default: str) -> str: # noqa: D401 """Read a model name from the .env file, falling back to default.""" try: if _ENV_FILE.exists(): for line in _ENV_FILE.read_text(encoding="utf-8").splitlines(): if line.startswith(f"{key}="): val = line.split("=", 1)[1].strip() if val: return val except Exception: pass return default _rag_model: str = _load_model_from_env("OLLAMA_MODEL", OLLAMA_MODEL) _vlm_model: str = _load_model_from_env("VLM_MODEL", VLM_MODEL) _embed_model = _load_model_from_env("OPENAI_EMBED_MODEL", _embed_model) # may be an OpenAI embed model # ────────────────────────────────────────────────────────────────────────────── # In-memory state # ────────────────────────────────────────────────────────────────────────────── _rag_chunks: list = [] _rag_indexed_files: set = set() _rag_ready = False _rag_lock = threading.Lock() # Built-in synthetic knowledge base _RAG_SYNTHETIC_DOCS = [ ( "Analisi della pressione", "La pressione grafica indica la forza con cui la penna o la matita viene premuta sul foglio. " "Una pressione forte (tratti profondi, rilevabili anche sul retro del foglio) è associata a " "carattere deciso, vitalità e a volte aggressività. Una pressione leggera (tratti quasi " "impercettibili) può indicare sensibilità, adattabilità o, in contesti patologici, stanchezza " "e astenia. La pressione irregolare — alternanza di tratti forti e deboli nello stesso scritto — " "può segnalare instabilità emotiva, stati di ansia o condizioni neurologiche. In grafologia " "forense la pressione è fondamentale per distinguere scritture apposte in condizioni normali " "da quelle prodotte sotto costrizione fisica o psicologica.", ), ( "Inclinazione del tratto", "L'inclinazione della scrittura descrive l'angolo dei tratti verticali delle lettere rispetto " "alla riga di base. Una scrittura verticale (0°) indica equilibrio e obiettività. " "L'inclinazione a destra (>15°) è associata a estroversia, impulsività e orientamento verso " "il futuro. L'inclinazione a sinistra (<−10°) può indicare introversione, tendenza al ripiegamento " "su se stessi o, in contesti forensi, un tentativo di camuffare la propria calligrafia. " "L'inclinazione variabile (misto destra/sinistra nello stesso testo) è indicatore di " "instabilità emotiva. La misurazione forense dell'inclinazione avviene tramite analisi " "angolare dei tratti ascendenti (h, l, b, f) e discendenti (g, p, q).", ), ( "Spaziatura grafica", "La spaziatura riguarda la distanza tra lettere, parole e righe. Spaziatura ampia tra le parole " "indica bisogno di spazio personale, pensiero indipendente e, talvolta, solitudine. " "Spaziatura ridotta (parole quasi attaccate) è correlata a socievolezza eccessiva, difficoltà " "nei confini relazionali e, in casi estremi, pensiero confusionario. La spaziatura irregolare — " "alternanza di parole distanti e ravvicinate — è un indicatore di disorganizzazione cognitiva " "o di scrittura non spontanea (es. copiatura o dettatura lenta). In perizie forensi, " "la spaziatura viene misurata in millimetri su campioni standardizzati.", ), ( "Margini e layout", "I margini del foglio riflettono il rapporto dello scrittore con l'ambiente e il contesto " "sociale. Un margine sinistro ampio e costante indica rispetto delle regole e pianificazione. " "Un margine sinistro che si allarga progressivamente (testo che 'scivola' verso destra) " "suggerisce entusiasmo crescente o impulsività. Margine destro ampio è associato a prudenza, " "timore del futuro e riservatezza. L'assenza di margini (testo che occupa tutto il foglio) " "indica esuberanza comunicativa o senso di urgenza. In perizia, il margine aiuta a " "distinguere scritti autentici da trascrizioni o copie, poiché l'autore mantiene " "inconsciamente le proprie abitudini spaziali.", ), ( "Firme autentiche", "Una firma autentica possiede caratteristiche di naturalezza e fluidità del movimento. " "I tratti sono continui, con accelerazione e decelerazione tipiche del gesto automatizzato. " "La pressione varia in modo coerente con il ritmo del tratto. I legamenti tra le lettere " "sono coerenti con il corpus grafico dello scrittore. La firma autentica presenta micro-tremori " "naturali (diversi dai tremori patologici) e piccole variazioni tra esecuzioni successive, " "mai perfettamente identiche. In perizia calligrafica, si confrontano almeno 10-15 firme " "autentiche per stabilire la 'gamma di variazione naturale' prima di esaminare la firma contestata.", ), ( "Firme false", "Le firme contraffatte si distinguono per diversi indicatori: velocità di esecuzione " "innaturalmente lenta (visibile nei 'tocchi' del pennino e nelle esitazioni), tremori " "artificiali (regolari, non spontanei), ritocchi e correzioni del tratto, interruzioni " "anomale del gesto. La falsificazione per imitazione diretta (calco o copia visiva) produce " "una firma con aspetto simile all'originale ma con movimenti invertiti rispetto alla direzione " "naturale. Il falsario tende a concentrarsi sulla forma complessiva trascurando i dettagli " "minuti (proporzioni tra lettere, angolo di attacco del tratto, pressione). " "L'analisi forense utilizza ingrandimenti 10x-40x e, nei casi dubbi, grafometria digitale.", ), ( "Velocità e ritmo", "La velocità di scrittura si manifesta nella forma delle lettere (semplificazione dei tratti " "in scrittura rapida), nell'inclinazione (più marcata ad alta velocità), nelle legature " "(frequenti in scrittura veloce, assenti in quella lenta). Il ritmo è la regolarità con cui " "si alternano tensione e distensione nel movimento grafico. Un ritmo regolare indica " "equilibrio psicofisico. Un ritmo aritmico (alternanza caotica di tratti tesi e distesi) " "può segnalare stati emotivi alterati, patologie neurologiche o scrittura non spontanea. " "In perizia forense la velocità è cruciale: una firma depositata 'lentamente' da una persona " "abitualmente veloce è un forte indicatore di contraffazione.", ), ( "Datazione documenti", "La datazione grafica di un documento si basa su elementi intrinseci ed estrinseci. " "Elementi intrinseci: evoluzione dello stile grafico dell'autore nel tempo (campioni noti " "datati permettono di costruire una 'curva di evoluzione'), deterioramento della calligrafia " "legato all'età, variazioni nelle abitudini punteggiatura e abbreviazioni. " "Elementi estrinseci: tipo di inchiostro (analisi spettroscopica), supporto cartaceo " "(filigrana, composizione chimica), strumento di scrittura (biro, stilografica, matita). " "L'analisi dell'inchiostro mediante cromatografia liquida può stabilire se l'inchiostro " "è compatibile con la data dichiarata. In perizia, la datazione grafica va sempre " "abbinata ad analisi chimiche per raggiungere un grado di certezza forense.", ), ] # ────────────────────────────────────────────────────────────────────────────── # Ollama helpers # ────────────────────────────────────────────────────────────────────────────── def check_ollama() -> bool: """Return True if Ollama server is reachable.""" try: requests.get(f"{OLLAMA_URL}/api/tags", timeout=3) return True except Exception: return False def ollama_list_models() -> list[str]: """Return sorted list of model names available in Ollama.""" try: r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=3) models = [m["name"] for m in r.json().get("models", [])] return sorted(models) if models else [OLLAMA_MODEL] except Exception: return [OLLAMA_MODEL] def set_rag_model(model_name: str) -> str: """Set the active Ollama generation model and persist to .env. Returns a status message.""" global _rag_model if not model_name: return f"✅ Modello attivo: **{_rag_model}**" _rag_model = model_name _persist_model_to_env(model_name) return f"✅ Modello attivo: **{_rag_model}**" def _persist_model_to_env(model_name: str) -> None: """Write OLLAMA_MODEL= into the .env file, preserving all other lines.""" try: if _ENV_FILE.exists(): lines = _ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True) else: lines = [] new_line = f"OLLAMA_MODEL={model_name}\n" for i, line in enumerate(lines): if line.startswith("OLLAMA_MODEL="): lines[i] = new_line _ENV_FILE.write_text("".join(lines), encoding="utf-8") return # Key not found — append it if lines and not lines[-1].endswith("\n"): lines.append("\n") lines.append(new_line) _ENV_FILE.write_text("".join(lines), encoding="utf-8") except Exception as e: print(f"[RAG] Warning: could not persist model to .env: {e}") def get_vlm_model() -> str: """Return the currently active VLM (vision-language) model name.""" return _vlm_model def set_vlm_model(model_name: str) -> str: """Set the active VLM model and persist to .env. Returns a status message.""" global _vlm_model if not model_name: return f"✅ Modello VLM attivo: **{_vlm_model}**" _vlm_model = model_name _persist_vlm_model_to_env(model_name) return f"✅ Modello VLM attivo: **{_vlm_model}**" def _persist_vlm_model_to_env(model_name: str) -> None: """Write VLM_MODEL= into the .env file, preserving all other lines.""" try: if _ENV_FILE.exists(): lines = _ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True) else: lines = [] new_line = f"VLM_MODEL={model_name}\n" for i, line in enumerate(lines): if line.startswith("VLM_MODEL="): lines[i] = new_line _ENV_FILE.write_text("".join(lines), encoding="utf-8") return # Key not found — append it if lines and not lines[-1].endswith("\n"): lines.append("\n") lines.append(new_line) _ENV_FILE.write_text("".join(lines), encoding="utf-8") except Exception as e: print(f"[RAG] Warning: could not persist VLM model to .env: {e}") def get_embed_model() -> str: """Return the currently active embedding model name.""" return _embed_model def set_embed_model(model_name: str) -> str: """ Set the active embedding model. If the provider changes (Ollama ↔ OpenAI) the embedding dimensions change (768 vs 1536/3072), so all cached .npz files are deleted and the in-memory index is reset to synthetic-only chunks (they will be re-embedded on next startup). Returns a status message. """ global _embed_model, _rag_chunks, _rag_ready if not model_name: return f"✅ Modello embedding attivo: **{_embed_model}**" from core.providers import is_openai_model old_is_openai = is_openai_model(_embed_model) new_is_openai = is_openai_model(model_name) _embed_model = model_name env_key = "OPENAI_EMBED_MODEL" if new_is_openai else "EMBED_MODEL" _persist_env_key(env_key, model_name) if old_is_openai != new_is_openai: # Provider changed — dimensions differ, cache is invalid try: from backend.config import settings cache_dir: Path = settings.rag_cache_dir except Exception: cache_dir = Path(__file__).parent.parent / "data" / "rag_cache" deleted = 0 if cache_dir.exists(): for f in cache_dir.glob("*.npz"): try: f.unlink() deleted += 1 except Exception: pass # Reset to synthetic-only (embeddings will be regenerated) with _rag_lock: synthetic_sources = {s for s, _ in _RAG_SYNTHETIC_DOCS} _rag_chunks = [c for c in _rag_chunks if c["source"] in synthetic_sources] for c in _rag_chunks: c["emb"] = None # force re-embedding with new model _rag_ready = False msg = ( f"✅ Modello embedding: **{_embed_model}**\n" f"⚠️ Cache invalidata ({deleted} file eliminati) — " "ri-carica i documenti per re-indicizzarli." ) return msg return f"✅ Modello embedding attivo: **{_embed_model}**" def _persist_env_key(env_key: str, value: str) -> None: """Update or append *env_key*=*value* in the .env file.""" try: if _ENV_FILE.exists(): lines = _ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True) else: lines = [] new_line = f"{env_key}={value}\n" for i, line in enumerate(lines): if line.startswith(f"{env_key}="): lines[i] = new_line _ENV_FILE.write_text("".join(lines), encoding="utf-8") return if lines and not lines[-1].endswith("\n"): lines.append("\n") lines.append(new_line) _ENV_FILE.write_text("".join(lines), encoding="utf-8") except Exception as e: print(f"[RAG] Warning: could not persist {env_key} to .env: {e}") def stream_ollama(prompt: str) -> Generator[str, None, None]: """Yield response tokens from Ollama one at a time (streaming).""" with requests.post( f"{OLLAMA_URL}/api/generate", json={ "model": _rag_model, "prompt": prompt, "stream": True, "keep_alive": "10m", "options": {"num_ctx": 4096}, }, stream=True, timeout=180, ) as r: for line in r.iter_lines(): if line: data = json.loads(line) if not data.get("done"): yield data.get("response", "") def _ollama_embed(text: str) -> np.ndarray | None: try: r = requests.post( f"{OLLAMA_URL}/api/embeddings", json={"model": _embed_model, "prompt": text, "keep_alive": "10m"}, timeout=30, ) return np.array(r.json()["embedding"], dtype=np.float32) except Exception: return None def _ollama_embed_batch(texts: list[str]) -> list[np.ndarray | None]: """Embed a list of texts. Falls back to sequential calls if batch endpoint unavailable.""" try: r = requests.post( f"{OLLAMA_URL}/api/embed", json={"model": _embed_model, "input": texts, "keep_alive": "10m"}, timeout=max(30, len(texts) * 3), ) r.raise_for_status() data = r.json() embeddings = data.get("embeddings") or data.get("embedding") if embeddings and len(embeddings) == len(texts): return [np.array(e, dtype=np.float32) for e in embeddings] except Exception: pass return [_ollama_embed(t) for t in texts] # ── Provider-aware embedding wrappers ───────────────────────────────────────── def _embed(text: str) -> np.ndarray | None: """Embed *text* using the active provider (OpenAI or Ollama).""" from core.providers import is_openai_model if is_openai_model(_embed_model): try: from core.providers import get_openai_client client = get_openai_client() resp = client.embeddings.create(model=_embed_model, input=text) return np.array(resp.data[0].embedding, dtype=np.float32) except Exception as e: print(f"[RAG] OpenAI embed error: {e}") return None return _ollama_embed(text) def _embed_batch(texts: list[str]) -> list[np.ndarray | None]: """Embed a list of texts using the active provider (OpenAI or Ollama).""" from core.providers import is_openai_model if is_openai_model(_embed_model): try: from core.providers import get_openai_client client = get_openai_client() resp = client.embeddings.create(model=_embed_model, input=texts) items = sorted(resp.data, key=lambda x: x.index) return [np.array(d.embedding, dtype=np.float32) for d in items] except Exception as e: print(f"[RAG] OpenAI embed_batch error: {e}") return [_embed(t) for t in texts] return _ollama_embed_batch(texts) def stream_llm(prompt: str) -> Generator[str, None, None]: """Yield response tokens using the active LLM provider (OpenAI or Ollama).""" from core.providers import is_openai_model if is_openai_model(_rag_model): from core.providers import get_openai_client client = get_openai_client() stream = client.chat.completions.create( model=_rag_model, messages=[{"role": "user", "content": prompt}], stream=True, temperature=0, ) for chunk in stream: delta = chunk.choices[0].delta.content or "" if delta: yield delta else: yield from stream_ollama(prompt) # ────────────────────────────────────────────────────────────────────────────── # Cache helpers # ────────────────────────────────────────────────────────────────────────────── def _rag_cache_path(cache_dir: Path, filename: str, file_bytes: bytes) -> Path: h = hashlib.sha256(file_bytes).hexdigest()[:8] stem = Path(filename).stem[:40] safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in stem) return cache_dir / f"{safe}_{h}.npz" def _rag_cache_save(cache_path: Path, chunks: list, filename: str) -> None: cache_path.parent.mkdir(parents=True, exist_ok=True) good = [c for c in chunks if c["emb"] is not None] if not good: return np.savez_compressed( str(cache_path), texts=np.array([c["text"] for c in good], dtype=object), sources=np.array([c["source"] for c in good], dtype=object), embs=np.stack([c["emb"] for c in good]), filename=np.array(filename, dtype=object), ) def _rag_cache_load(cache_path: Path) -> tuple[list, str]: """Returns (chunks, original_filename).""" data = np.load(str(cache_path), allow_pickle=True) filename = str(data["filename"]) chunks = [ {"text": str(t), "source": str(s), "emb": e} for t, s, e in zip(data["texts"], data["sources"], data["embs"]) ] return chunks, filename # ────────────────────────────────────────────────────────────────────────────── # Chunking # ────────────────────────────────────────────────────────────────────────────── def _chunk_text(text: str, source: str, size: int = 500, overlap: int = 50) -> list: chunks = [] start = 0 while start < len(text): end = min(start + size, len(text)) chunk = text[start:end].strip() if chunk: chunks.append({"text": chunk, "source": source, "emb": None}) start += size - overlap return chunks # ────────────────────────────────────────────────────────────────────────────── # Document index queries # ────────────────────────────────────────────────────────────────────────────── def rag_doc_list() -> list[list]: """Return rows [[filename, chunk_count]] (user docs only, not synthetic).""" synthetic_sources = {s for s, _ in _RAG_SYNTHETIC_DOCS} counts: dict = {} for c in _rag_chunks: src = c["source"] if src not in synthetic_sources: counts[src] = counts.get(src, 0) + 1 return [[name, cnt] for name, cnt in sorted(counts.items())] def rag_doc_choices() -> list[str]: return [row[0] for row in rag_doc_list()] # ────────────────────────────────────────────────────────────────────────────── # Document loading and indexing # ────────────────────────────────────────────────────────────────────────────── def _extract_pdf_text(path: Path, ocr_fallback: bool = True) -> str: """Extract text from a PDF, optionally falling back to EasyOCR for scanned pages. Args: path: Path to the PDF file. ocr_fallback: If True (default), pages with fewer than 50 extracted characters are re-processed with EasyOCR. Set to False for structured PDFs where OCR is unnecessary and wastes memory. """ full_text = [] try: import pypdf except ImportError: print(f"[RAG] pypdf not installed — skipping {path.name}") return "" try: reader = pypdf.PdfReader(str(path)) for page_num, page in enumerate(reader.pages): page_text = page.extract_text() or "" if len(page_text.strip()) >= 50: full_text.append(page_text) elif ocr_fallback: try: import fitz import numpy as np from core.ocr import get_easyocr doc = fitz.open(str(path)) fitz_page = doc[page_num] mat = fitz.Matrix(150 / 72, 150 / 72) pix = fitz_page.get_pixmap(matrix=mat) img_arr = np.frombuffer(pix.samples, dtype=np.uint8).reshape( pix.height, pix.width, pix.n ) if pix.n == 4: img_arr = img_arr[:, :, :3] ocr_result = get_easyocr().readtext(img_arr, detail=0, paragraph=True) full_text.append(" ".join(ocr_result)) doc.close() except ImportError: print(f"[RAG] pymupdf not installed — cannot OCR scanned page {page_num+1}") except Exception as e: print(f"[RAG] OCR error on page {page_num+1} of {path.name}: {e}") # else: ocr_fallback=False — skip pages with little text silently except Exception as e: print(f"[RAG] Error reading PDF {path.name}: {e}") return "\n".join(full_text) def rag_load_docs(cache_dir: Path) -> None: """Load synthetic knowledge + cached user documents at startup (call once in background).""" global _rag_chunks, _rag_indexed_files, _rag_ready with _rag_lock: chunks: list = [] for source, text in _RAG_SYNTHETIC_DOCS: chunks.extend(_chunk_text(text, source)) cache_dir.mkdir(parents=True, exist_ok=True) for cache_file in sorted(cache_dir.glob("*.npz")): try: cached_chunks, orig_filename = _rag_cache_load(cache_file) chunks.extend(cached_chunks) _rag_indexed_files.add(orig_filename) print(f"[RAG] Loaded from cache: {orig_filename} ({len(cached_chunks)} chunks)") except Exception as e: print(f"[RAG] Corrupt cache file {cache_file.name}: {e} — skipping") _rag_chunks = chunks _rag_ready = True print(f"[RAG] Chunks loaded: {len(chunks)} (synthetic + cached)") to_embed = [c for c in _rag_chunks if c["emb"] is None] if to_embed: embeddings = _embed_batch([c["text"] for c in to_embed]) embedded = 0 for chunk, emb in zip(to_embed, embeddings): if emb is not None: chunk["emb"] = emb embedded += 1 print(f"[RAG] Synthetic embedding done: {embedded} chunks") def rag_add_docs(files: list, cache_dir: Path) -> tuple[str, list]: """Index uploaded PDF/DOCX files. Returns (status_message, doc_list).""" global _rag_indexed_files if not files: return "Nessun file caricato.", rag_doc_list() from core.providers import is_openai_model if not is_openai_model(_embed_model): try: requests.get(f"{OLLAMA_URL}/api/tags", timeout=3) except Exception: return ( "❌ Ollama non raggiungibile — i documenti non possono essere indicizzati.\n" "Avvia `ollama serve` e ricarica.", rag_doc_list(), ) lines = [] for f in files: path = Path(f.name) suffix = path.suffix.lower() if path.name in _rag_indexed_files: lines.append(f"ℹ️ `{path.name}` — già indicizzato, saltato.") continue file_bytes = path.read_bytes() cache_path = _rag_cache_path(cache_dir, path.name, file_bytes) if cache_path.exists(): try: cached_chunks, _ = _rag_cache_load(cache_path) with _rag_lock: _rag_chunks.extend(cached_chunks) _rag_indexed_files.add(path.name) lines.append(f"✅ `{path.name}` — {len(cached_chunks)} chunk caricati dalla cache.") continue except Exception: pass try: if suffix == ".pdf": text = _extract_pdf_text(path) elif suffix in (".docx", ".doc"): import docx as _docx doc_obj = _docx.Document(str(path)) text = "\n".join(p.text for p in doc_obj.paragraphs) else: lines.append(f"⚠️ `{path.name}` — formato non supportato (solo PDF/DOCX).") continue except Exception as e: lines.append(f"❌ `{path.name}` — errore: {e}") continue if not text.strip(): lines.append(f"⚠️ `{path.name}` — nessun testo estratto.") continue chunks = _chunk_text(text, path.name) embeddings = _embed_batch([c["text"] for c in chunks]) embedded = 0 for chunk, emb in zip(chunks, embeddings): if emb is not None: chunk["emb"] = emb embedded += 1 try: _rag_cache_save(cache_path, chunks, path.name) except Exception as e: print(f"[RAG] Cache write failed for {path.name}: {e}") with _rag_lock: _rag_chunks.extend(chunks) _rag_indexed_files.add(path.name) lines.append(f"✅ `{path.name}` — {len(chunks)} chunk, {embedded} indicizzati.") return "\n".join(lines), rag_doc_list() def rag_remove_doc(filename: str, cache_dir: Path) -> tuple[str, list]: """Remove all chunks for a document from memory and delete its cache file.""" global _rag_chunks, _rag_indexed_files if not filename or not filename.strip(): return "Nessun documento selezionato.", rag_doc_list() with _rag_lock: before = len(_rag_chunks) _rag_chunks = [c for c in _rag_chunks if c["source"] != filename] removed_chunks = before - len(_rag_chunks) _rag_indexed_files.discard(filename) deleted_files = 0 if cache_dir.exists(): for cache_file in cache_dir.glob("*.npz"): try: with np.load(str(cache_file), allow_pickle=True) as data: match = str(data["filename"]) == filename if match: cache_file.unlink() deleted_files += 1 except Exception: pass if removed_chunks == 0: return f"⚠️ `{filename}` non trovato nell'indice.", rag_doc_list() msg = f"🗑️ `{filename}` rimosso ({removed_chunks} chunk eliminati" if deleted_files: msg += ", cache eliminata" msg += ")." return msg, rag_doc_list() # ────────────────────────────────────────────────────────────────────────────── # Retrieval # ────────────────────────────────────────────────────────────────────────────── def rag_retrieve(question: str) -> tuple[list | None, str | None]: """Return (results, error_str). results is list of (score, chunk).""" embedded_chunks = [c for c in _rag_chunks if c["emb"] is not None] if not embedded_chunks: total = len(_rag_chunks) return None, ( f"⏳ Embedding in corso (0/{total} chunk pronti). " "Riprovare tra qualche secondo — l'indicizzazione procede in background." ) q_emb = _embed(question) if q_emb is None: from core.providers import is_openai_model provider = "OpenAI" if is_openai_model(_embed_model) else "Ollama" return None, f"❌ Impossibile generare l'embedding della domanda. {provider} è raggiungibile?" synthetic_sources = {s for s, _ in _RAG_SYNTHETIC_DOCS} user_chunks = [c for c in _rag_chunks if c["emb"] is not None and c["source"] not in synthetic_sources] synth_chunks = [c for c in _rag_chunks if c["emb"] is not None and c["source"] in synthetic_sources] def _top_k_from(pool, q, k): if not pool: return [] embs = np.stack([c["emb"] for c in pool]) q_n = q / (np.linalg.norm(q) + 1e-9) norms = np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9 scores = (embs / norms) @ q_n idxs = np.argsort(scores)[::-1][:k] return [(float(scores[i]), pool[i]) for i in idxs] user_results = _top_k_from(user_chunks, q_emb, 5) synth_results = _top_k_from(synth_chunks, q_emb, 2 if user_results else 4) return user_results + synth_results, None # ────────────────────────────────────────────────────────────────────────────── # Chat stream (framework-agnostic) # ────────────────────────────────────────────────────────────────────────────── def rag_chat_stream( message: str, history: list[dict], ) -> Generator[tuple[str, str | None], None, None]: """Core RAG chat logic. Yields (partial_response, sources_footer|None). The caller (e.g. Gradio wrapper) is responsible for formatting history. history is a list of {"role": "user"|"assistant", "content": str}. """ from core.providers import is_openai_model if not is_openai_model(_rag_model) and not check_ollama(): yield ( "❌ **Ollama non raggiungibile.**\n\n" "Avvia il server con:\n```\nollama serve\n```\n" "e assicurati che il modello sia scaricato:\n" "```\nollama pull llama3.2\n```", None, ) return if not _rag_ready: yield "⏳ Indice della knowledge base in costruzione, riprovare tra qualche secondo…", None return results, err = rag_retrieve(message) if err: yield err, None return context = "\n\n".join(f"[{c['source']}]\n{c['text']}" for _, c in results) recent = history[-12:] if len(history) > 12 else history conv_text = "" i = 0 while i < len(recent) - 1: if recent[i]["role"] == "user" and recent[i + 1]["role"] == "assistant": u = recent[i]["content"] a = recent[i + 1]["content"].split("\n\n---\n")[0] conv_text += f"Utente: {u}\nAssistente: {a}\n\n" i += 2 else: i += 1 prompt = ( "Sei un esperto di grafologia forense. Rispondi in italiano, in modo preciso e " "conciso, basandoti ESCLUSIVAMENTE sui seguenti estratti.\n\n" f"{context}\n\n" ) if conv_text: prompt += f"Conversazione precedente:\n{conv_text}\n" prompt += f"Domanda: {message}\n\nRisposta:" sources = list(dict.fromkeys(c["source"] for _, c in results)) sources_footer = f"\n\n---\n*Fonti: {', '.join(sources)}*" partial = "" try: for token in stream_llm(prompt): partial += token yield partial, None except Exception as e: yield f"❌ Errore nella generazione: {e}", None return yield partial, sources_footer # ────────────────────────────────────────────────────────────────────────────── # Pipeline LLM synthesis # ────────────────────────────────────────────────────────────────────────────── def pipeline_llm_synthesis( step1_summary: str, step2_text: str, step3_summary: str, step4_report: str, step5_report: str, step6_report: str, ) -> str: """Call the active LLM to synthesise forensic pipeline results into a narrative report.""" from core.providers import is_openai_model if not is_openai_model(_rag_model): try: requests.get(f"{OLLAMA_URL}/api/tags", timeout=3) except Exception: return ( "❌ **Ollama non raggiungibile.** Avvia il server con:\n" "```\nollama serve\n```" ) prompt = ( "Sei un perito calligrafo forense esperto. " "Sulla base delle seguenti analisi tecniche su un documento, " "fornisci in italiano una valutazione complessiva professionale: " "evidenzia elementi di interesse forense, coerenze e incoerenze tra i risultati, " "e suggerisci eventuali ulteriori verifiche.\n\n" f"=== RILEVAMENTO FIRMA ===\n{step1_summary}\n\n" f"=== TRASCRIZIONE HTR ===\n{step2_text}\n\n" f"=== ENTITÀ RICONOSCIUTE (NER) ===\n{step3_summary}\n\n" f"=== IDENTIFICAZIONE AUTORE ===\n{step4_report}\n\n" f"=== ANALISI GRAFOLOGICA ===\n{step5_report}\n\n" f"=== VERIFICA FIRMA ===\n{step6_report}\n\n" "Valutazione forense integrata:" ) result = "" try: for token in stream_llm(prompt): result += token except Exception as e: return f"❌ Errore nella generazione LLM: {e}" return result if result else "*(Nessuna risposta dal modello)*"