grapholab / core /rag.py
Fabio Antonini
feat: enable RAG and Agent on HF Spaces via OpenAI API key
28a01c7
"""
GraphoLab core — RAG (Retrieval-Augmented Generation) + Ollama integration.
Provides:
- check_ollama() check whether Ollama server is reachable
- ollama_list_models() list available models
- set_rag_model() change the active generation model
- rag_load_docs() load synthetic + cached documents at startup
- rag_add_docs() index new uploaded PDF/DOCX files
- rag_remove_doc() remove a document from the index
- rag_doc_list() list indexed documents
- rag_doc_choices() list indexed document names
- rag_retrieve() retrieve top-k chunks for a query
- stream_ollama() stream tokens from Ollama /api/generate
- rag_chat_stream() full RAG chat: retrieve + build prompt + stream tokens
- pipeline_llm_synthesis() LLM synthesis of forensic pipeline results
"""
from __future__ import annotations
import hashlib
import json
import threading
from pathlib import Path
from typing import Generator
import numpy as np
import requests
# ──────────────────────────────────────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────────────────────────────────────
OLLAMA_URL = "http://localhost:11434"
OLLAMA_MODEL = "qwen3:4b" # text/reasoning model (agent, RAG, compliance)
VLM_MODEL = "qwen3-vl:8b" # vision-language model (image analysis only)
_embed_model = "nomic-embed-text" # dedicated embedding model — changing it invalidates cache
_ENV_FILE = Path(__file__).parent.parent / ".env"
def _load_model_from_env(key: str, default: str) -> str: # noqa: D401
"""Read a model name from the .env file, falling back to default."""
try:
if _ENV_FILE.exists():
for line in _ENV_FILE.read_text(encoding="utf-8").splitlines():
if line.startswith(f"{key}="):
val = line.split("=", 1)[1].strip()
if val:
return val
except Exception:
pass
return default
_rag_model: str = _load_model_from_env("OLLAMA_MODEL", OLLAMA_MODEL)
_vlm_model: str = _load_model_from_env("VLM_MODEL", VLM_MODEL)
_embed_model = _load_model_from_env("OPENAI_EMBED_MODEL", _embed_model) # may be an OpenAI embed model
# ──────────────────────────────────────────────────────────────────────────────
# In-memory state
# ──────────────────────────────────────────────────────────────────────────────
_rag_chunks: list = []
_rag_indexed_files: set = set()
_rag_ready = False
_rag_lock = threading.Lock()
# Built-in synthetic knowledge base
_RAG_SYNTHETIC_DOCS = [
(
"Analisi della pressione",
"La pressione grafica indica la forza con cui la penna o la matita viene premuta sul foglio. "
"Una pressione forte (tratti profondi, rilevabili anche sul retro del foglio) è associata a "
"carattere deciso, vitalità e a volte aggressività. Una pressione leggera (tratti quasi "
"impercettibili) può indicare sensibilità, adattabilità o, in contesti patologici, stanchezza "
"e astenia. La pressione irregolare — alternanza di tratti forti e deboli nello stesso scritto — "
"può segnalare instabilità emotiva, stati di ansia o condizioni neurologiche. In grafologia "
"forense la pressione è fondamentale per distinguere scritture apposte in condizioni normali "
"da quelle prodotte sotto costrizione fisica o psicologica.",
),
(
"Inclinazione del tratto",
"L'inclinazione della scrittura descrive l'angolo dei tratti verticali delle lettere rispetto "
"alla riga di base. Una scrittura verticale (0°) indica equilibrio e obiettività. "
"L'inclinazione a destra (>15°) è associata a estroversia, impulsività e orientamento verso "
"il futuro. L'inclinazione a sinistra (<−10°) può indicare introversione, tendenza al ripiegamento "
"su se stessi o, in contesti forensi, un tentativo di camuffare la propria calligrafia. "
"L'inclinazione variabile (misto destra/sinistra nello stesso testo) è indicatore di "
"instabilità emotiva. La misurazione forense dell'inclinazione avviene tramite analisi "
"angolare dei tratti ascendenti (h, l, b, f) e discendenti (g, p, q).",
),
(
"Spaziatura grafica",
"La spaziatura riguarda la distanza tra lettere, parole e righe. Spaziatura ampia tra le parole "
"indica bisogno di spazio personale, pensiero indipendente e, talvolta, solitudine. "
"Spaziatura ridotta (parole quasi attaccate) è correlata a socievolezza eccessiva, difficoltà "
"nei confini relazionali e, in casi estremi, pensiero confusionario. La spaziatura irregolare — "
"alternanza di parole distanti e ravvicinate — è un indicatore di disorganizzazione cognitiva "
"o di scrittura non spontanea (es. copiatura o dettatura lenta). In perizie forensi, "
"la spaziatura viene misurata in millimetri su campioni standardizzati.",
),
(
"Margini e layout",
"I margini del foglio riflettono il rapporto dello scrittore con l'ambiente e il contesto "
"sociale. Un margine sinistro ampio e costante indica rispetto delle regole e pianificazione. "
"Un margine sinistro che si allarga progressivamente (testo che 'scivola' verso destra) "
"suggerisce entusiasmo crescente o impulsività. Margine destro ampio è associato a prudenza, "
"timore del futuro e riservatezza. L'assenza di margini (testo che occupa tutto il foglio) "
"indica esuberanza comunicativa o senso di urgenza. In perizia, il margine aiuta a "
"distinguere scritti autentici da trascrizioni o copie, poiché l'autore mantiene "
"inconsciamente le proprie abitudini spaziali.",
),
(
"Firme autentiche",
"Una firma autentica possiede caratteristiche di naturalezza e fluidità del movimento. "
"I tratti sono continui, con accelerazione e decelerazione tipiche del gesto automatizzato. "
"La pressione varia in modo coerente con il ritmo del tratto. I legamenti tra le lettere "
"sono coerenti con il corpus grafico dello scrittore. La firma autentica presenta micro-tremori "
"naturali (diversi dai tremori patologici) e piccole variazioni tra esecuzioni successive, "
"mai perfettamente identiche. In perizia calligrafica, si confrontano almeno 10-15 firme "
"autentiche per stabilire la 'gamma di variazione naturale' prima di esaminare la firma contestata.",
),
(
"Firme false",
"Le firme contraffatte si distinguono per diversi indicatori: velocità di esecuzione "
"innaturalmente lenta (visibile nei 'tocchi' del pennino e nelle esitazioni), tremori "
"artificiali (regolari, non spontanei), ritocchi e correzioni del tratto, interruzioni "
"anomale del gesto. La falsificazione per imitazione diretta (calco o copia visiva) produce "
"una firma con aspetto simile all'originale ma con movimenti invertiti rispetto alla direzione "
"naturale. Il falsario tende a concentrarsi sulla forma complessiva trascurando i dettagli "
"minuti (proporzioni tra lettere, angolo di attacco del tratto, pressione). "
"L'analisi forense utilizza ingrandimenti 10x-40x e, nei casi dubbi, grafometria digitale.",
),
(
"Velocità e ritmo",
"La velocità di scrittura si manifesta nella forma delle lettere (semplificazione dei tratti "
"in scrittura rapida), nell'inclinazione (più marcata ad alta velocità), nelle legature "
"(frequenti in scrittura veloce, assenti in quella lenta). Il ritmo è la regolarità con cui "
"si alternano tensione e distensione nel movimento grafico. Un ritmo regolare indica "
"equilibrio psicofisico. Un ritmo aritmico (alternanza caotica di tratti tesi e distesi) "
"può segnalare stati emotivi alterati, patologie neurologiche o scrittura non spontanea. "
"In perizia forense la velocità è cruciale: una firma depositata 'lentamente' da una persona "
"abitualmente veloce è un forte indicatore di contraffazione.",
),
(
"Datazione documenti",
"La datazione grafica di un documento si basa su elementi intrinseci ed estrinseci. "
"Elementi intrinseci: evoluzione dello stile grafico dell'autore nel tempo (campioni noti "
"datati permettono di costruire una 'curva di evoluzione'), deterioramento della calligrafia "
"legato all'età, variazioni nelle abitudini punteggiatura e abbreviazioni. "
"Elementi estrinseci: tipo di inchiostro (analisi spettroscopica), supporto cartaceo "
"(filigrana, composizione chimica), strumento di scrittura (biro, stilografica, matita). "
"L'analisi dell'inchiostro mediante cromatografia liquida può stabilire se l'inchiostro "
"è compatibile con la data dichiarata. In perizia, la datazione grafica va sempre "
"abbinata ad analisi chimiche per raggiungere un grado di certezza forense.",
),
]
# ──────────────────────────────────────────────────────────────────────────────
# Ollama helpers
# ──────────────────────────────────────────────────────────────────────────────
def check_ollama() -> bool:
"""Return True if Ollama server is reachable."""
try:
requests.get(f"{OLLAMA_URL}/api/tags", timeout=3)
return True
except Exception:
return False
def ollama_list_models() -> list[str]:
"""Return sorted list of model names available in Ollama."""
try:
r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=3)
models = [m["name"] for m in r.json().get("models", [])]
return sorted(models) if models else [OLLAMA_MODEL]
except Exception:
return [OLLAMA_MODEL]
def set_rag_model(model_name: str) -> str:
"""Set the active Ollama generation model and persist to .env. Returns a status message."""
global _rag_model
if not model_name:
return f"✅ Modello attivo: **{_rag_model}**"
_rag_model = model_name
_persist_model_to_env(model_name)
return f"✅ Modello attivo: **{_rag_model}**"
def _persist_model_to_env(model_name: str) -> None:
"""Write OLLAMA_MODEL=<model_name> into the .env file, preserving all other lines."""
try:
if _ENV_FILE.exists():
lines = _ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True)
else:
lines = []
new_line = f"OLLAMA_MODEL={model_name}\n"
for i, line in enumerate(lines):
if line.startswith("OLLAMA_MODEL="):
lines[i] = new_line
_ENV_FILE.write_text("".join(lines), encoding="utf-8")
return
# Key not found — append it
if lines and not lines[-1].endswith("\n"):
lines.append("\n")
lines.append(new_line)
_ENV_FILE.write_text("".join(lines), encoding="utf-8")
except Exception as e:
print(f"[RAG] Warning: could not persist model to .env: {e}")
def get_vlm_model() -> str:
"""Return the currently active VLM (vision-language) model name."""
return _vlm_model
def set_vlm_model(model_name: str) -> str:
"""Set the active VLM model and persist to .env. Returns a status message."""
global _vlm_model
if not model_name:
return f"✅ Modello VLM attivo: **{_vlm_model}**"
_vlm_model = model_name
_persist_vlm_model_to_env(model_name)
return f"✅ Modello VLM attivo: **{_vlm_model}**"
def _persist_vlm_model_to_env(model_name: str) -> None:
"""Write VLM_MODEL=<model_name> into the .env file, preserving all other lines."""
try:
if _ENV_FILE.exists():
lines = _ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True)
else:
lines = []
new_line = f"VLM_MODEL={model_name}\n"
for i, line in enumerate(lines):
if line.startswith("VLM_MODEL="):
lines[i] = new_line
_ENV_FILE.write_text("".join(lines), encoding="utf-8")
return
# Key not found — append it
if lines and not lines[-1].endswith("\n"):
lines.append("\n")
lines.append(new_line)
_ENV_FILE.write_text("".join(lines), encoding="utf-8")
except Exception as e:
print(f"[RAG] Warning: could not persist VLM model to .env: {e}")
def get_embed_model() -> str:
"""Return the currently active embedding model name."""
return _embed_model
def set_embed_model(model_name: str) -> str:
"""
Set the active embedding model.
If the provider changes (Ollama ↔ OpenAI) the embedding dimensions change
(768 vs 1536/3072), so all cached .npz files are deleted and the in-memory
index is reset to synthetic-only chunks (they will be re-embedded on next startup).
Returns a status message.
"""
global _embed_model, _rag_chunks, _rag_ready
if not model_name:
return f"✅ Modello embedding attivo: **{_embed_model}**"
from core.providers import is_openai_model
old_is_openai = is_openai_model(_embed_model)
new_is_openai = is_openai_model(model_name)
_embed_model = model_name
env_key = "OPENAI_EMBED_MODEL" if new_is_openai else "EMBED_MODEL"
_persist_env_key(env_key, model_name)
if old_is_openai != new_is_openai:
# Provider changed — dimensions differ, cache is invalid
try:
from backend.config import settings
cache_dir: Path = settings.rag_cache_dir
except Exception:
cache_dir = Path(__file__).parent.parent / "data" / "rag_cache"
deleted = 0
if cache_dir.exists():
for f in cache_dir.glob("*.npz"):
try:
f.unlink()
deleted += 1
except Exception:
pass
# Reset to synthetic-only (embeddings will be regenerated)
with _rag_lock:
synthetic_sources = {s for s, _ in _RAG_SYNTHETIC_DOCS}
_rag_chunks = [c for c in _rag_chunks if c["source"] in synthetic_sources]
for c in _rag_chunks:
c["emb"] = None # force re-embedding with new model
_rag_ready = False
msg = (
f"✅ Modello embedding: **{_embed_model}**\n"
f"⚠️ Cache invalidata ({deleted} file eliminati) — "
"ri-carica i documenti per re-indicizzarli."
)
return msg
return f"✅ Modello embedding attivo: **{_embed_model}**"
def _persist_env_key(env_key: str, value: str) -> None:
"""Update or append *env_key*=*value* in the .env file."""
try:
if _ENV_FILE.exists():
lines = _ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True)
else:
lines = []
new_line = f"{env_key}={value}\n"
for i, line in enumerate(lines):
if line.startswith(f"{env_key}="):
lines[i] = new_line
_ENV_FILE.write_text("".join(lines), encoding="utf-8")
return
if lines and not lines[-1].endswith("\n"):
lines.append("\n")
lines.append(new_line)
_ENV_FILE.write_text("".join(lines), encoding="utf-8")
except Exception as e:
print(f"[RAG] Warning: could not persist {env_key} to .env: {e}")
def stream_ollama(prompt: str) -> Generator[str, None, None]:
"""Yield response tokens from Ollama one at a time (streaming)."""
with requests.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": _rag_model,
"prompt": prompt,
"stream": True,
"keep_alive": "10m",
"options": {"num_ctx": 4096},
},
stream=True,
timeout=180,
) as r:
for line in r.iter_lines():
if line:
data = json.loads(line)
if not data.get("done"):
yield data.get("response", "")
def _ollama_embed(text: str) -> np.ndarray | None:
try:
r = requests.post(
f"{OLLAMA_URL}/api/embeddings",
json={"model": _embed_model, "prompt": text, "keep_alive": "10m"},
timeout=30,
)
return np.array(r.json()["embedding"], dtype=np.float32)
except Exception:
return None
def _ollama_embed_batch(texts: list[str]) -> list[np.ndarray | None]:
"""Embed a list of texts. Falls back to sequential calls if batch endpoint unavailable."""
try:
r = requests.post(
f"{OLLAMA_URL}/api/embed",
json={"model": _embed_model, "input": texts, "keep_alive": "10m"},
timeout=max(30, len(texts) * 3),
)
r.raise_for_status()
data = r.json()
embeddings = data.get("embeddings") or data.get("embedding")
if embeddings and len(embeddings) == len(texts):
return [np.array(e, dtype=np.float32) for e in embeddings]
except Exception:
pass
return [_ollama_embed(t) for t in texts]
# ── Provider-aware embedding wrappers ─────────────────────────────────────────
def _embed(text: str) -> np.ndarray | None:
"""Embed *text* using the active provider (OpenAI or Ollama)."""
from core.providers import is_openai_model
if is_openai_model(_embed_model):
try:
from core.providers import get_openai_client
client = get_openai_client()
resp = client.embeddings.create(model=_embed_model, input=text)
return np.array(resp.data[0].embedding, dtype=np.float32)
except Exception as e:
print(f"[RAG] OpenAI embed error: {e}")
return None
return _ollama_embed(text)
def _embed_batch(texts: list[str]) -> list[np.ndarray | None]:
"""Embed a list of texts using the active provider (OpenAI or Ollama)."""
from core.providers import is_openai_model
if is_openai_model(_embed_model):
try:
from core.providers import get_openai_client
client = get_openai_client()
resp = client.embeddings.create(model=_embed_model, input=texts)
items = sorted(resp.data, key=lambda x: x.index)
return [np.array(d.embedding, dtype=np.float32) for d in items]
except Exception as e:
print(f"[RAG] OpenAI embed_batch error: {e}")
return [_embed(t) for t in texts]
return _ollama_embed_batch(texts)
def stream_llm(prompt: str) -> Generator[str, None, None]:
"""Yield response tokens using the active LLM provider (OpenAI or Ollama)."""
from core.providers import is_openai_model
if is_openai_model(_rag_model):
from core.providers import get_openai_client
client = get_openai_client()
stream = client.chat.completions.create(
model=_rag_model,
messages=[{"role": "user", "content": prompt}],
stream=True,
temperature=0,
)
for chunk in stream:
delta = chunk.choices[0].delta.content or ""
if delta:
yield delta
else:
yield from stream_ollama(prompt)
# ──────────────────────────────────────────────────────────────────────────────
# Cache helpers
# ──────────────────────────────────────────────────────────────────────────────
def _rag_cache_path(cache_dir: Path, filename: str, file_bytes: bytes) -> Path:
h = hashlib.sha256(file_bytes).hexdigest()[:8]
stem = Path(filename).stem[:40]
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in stem)
return cache_dir / f"{safe}_{h}.npz"
def _rag_cache_save(cache_path: Path, chunks: list, filename: str) -> None:
cache_path.parent.mkdir(parents=True, exist_ok=True)
good = [c for c in chunks if c["emb"] is not None]
if not good:
return
np.savez_compressed(
str(cache_path),
texts=np.array([c["text"] for c in good], dtype=object),
sources=np.array([c["source"] for c in good], dtype=object),
embs=np.stack([c["emb"] for c in good]),
filename=np.array(filename, dtype=object),
)
def _rag_cache_load(cache_path: Path) -> tuple[list, str]:
"""Returns (chunks, original_filename)."""
data = np.load(str(cache_path), allow_pickle=True)
filename = str(data["filename"])
chunks = [
{"text": str(t), "source": str(s), "emb": e}
for t, s, e in zip(data["texts"], data["sources"], data["embs"])
]
return chunks, filename
# ──────────────────────────────────────────────────────────────────────────────
# Chunking
# ──────────────────────────────────────────────────────────────────────────────
def _chunk_text(text: str, source: str, size: int = 500, overlap: int = 50) -> list:
chunks = []
start = 0
while start < len(text):
end = min(start + size, len(text))
chunk = text[start:end].strip()
if chunk:
chunks.append({"text": chunk, "source": source, "emb": None})
start += size - overlap
return chunks
# ──────────────────────────────────────────────────────────────────────────────
# Document index queries
# ──────────────────────────────────────────────────────────────────────────────
def rag_doc_list() -> list[list]:
"""Return rows [[filename, chunk_count]] (user docs only, not synthetic)."""
synthetic_sources = {s for s, _ in _RAG_SYNTHETIC_DOCS}
counts: dict = {}
for c in _rag_chunks:
src = c["source"]
if src not in synthetic_sources:
counts[src] = counts.get(src, 0) + 1
return [[name, cnt] for name, cnt in sorted(counts.items())]
def rag_doc_choices() -> list[str]:
return [row[0] for row in rag_doc_list()]
# ──────────────────────────────────────────────────────────────────────────────
# Document loading and indexing
# ──────────────────────────────────────────────────────────────────────────────
def _extract_pdf_text(path: Path, ocr_fallback: bool = True) -> str:
"""Extract text from a PDF, optionally falling back to EasyOCR for scanned pages.
Args:
path: Path to the PDF file.
ocr_fallback: If True (default), pages with fewer than 50 extracted characters
are re-processed with EasyOCR. Set to False for structured PDFs
where OCR is unnecessary and wastes memory.
"""
full_text = []
try:
import pypdf
except ImportError:
print(f"[RAG] pypdf not installed — skipping {path.name}")
return ""
try:
reader = pypdf.PdfReader(str(path))
for page_num, page in enumerate(reader.pages):
page_text = page.extract_text() or ""
if len(page_text.strip()) >= 50:
full_text.append(page_text)
elif ocr_fallback:
try:
import fitz
import numpy as np
from core.ocr import get_easyocr
doc = fitz.open(str(path))
fitz_page = doc[page_num]
mat = fitz.Matrix(150 / 72, 150 / 72)
pix = fitz_page.get_pixmap(matrix=mat)
img_arr = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
pix.height, pix.width, pix.n
)
if pix.n == 4:
img_arr = img_arr[:, :, :3]
ocr_result = get_easyocr().readtext(img_arr, detail=0, paragraph=True)
full_text.append(" ".join(ocr_result))
doc.close()
except ImportError:
print(f"[RAG] pymupdf not installed — cannot OCR scanned page {page_num+1}")
except Exception as e:
print(f"[RAG] OCR error on page {page_num+1} of {path.name}: {e}")
# else: ocr_fallback=False — skip pages with little text silently
except Exception as e:
print(f"[RAG] Error reading PDF {path.name}: {e}")
return "\n".join(full_text)
def rag_load_docs(cache_dir: Path) -> None:
"""Load synthetic knowledge + cached user documents at startup (call once in background)."""
global _rag_chunks, _rag_indexed_files, _rag_ready
with _rag_lock:
chunks: list = []
for source, text in _RAG_SYNTHETIC_DOCS:
chunks.extend(_chunk_text(text, source))
cache_dir.mkdir(parents=True, exist_ok=True)
for cache_file in sorted(cache_dir.glob("*.npz")):
try:
cached_chunks, orig_filename = _rag_cache_load(cache_file)
chunks.extend(cached_chunks)
_rag_indexed_files.add(orig_filename)
print(f"[RAG] Loaded from cache: {orig_filename} ({len(cached_chunks)} chunks)")
except Exception as e:
print(f"[RAG] Corrupt cache file {cache_file.name}: {e} — skipping")
_rag_chunks = chunks
_rag_ready = True
print(f"[RAG] Chunks loaded: {len(chunks)} (synthetic + cached)")
to_embed = [c for c in _rag_chunks if c["emb"] is None]
if to_embed:
embeddings = _embed_batch([c["text"] for c in to_embed])
embedded = 0
for chunk, emb in zip(to_embed, embeddings):
if emb is not None:
chunk["emb"] = emb
embedded += 1
print(f"[RAG] Synthetic embedding done: {embedded} chunks")
def rag_add_docs(files: list, cache_dir: Path) -> tuple[str, list]:
"""Index uploaded PDF/DOCX files. Returns (status_message, doc_list)."""
global _rag_indexed_files
if not files:
return "Nessun file caricato.", rag_doc_list()
from core.providers import is_openai_model
if not is_openai_model(_embed_model):
try:
requests.get(f"{OLLAMA_URL}/api/tags", timeout=3)
except Exception:
return (
"❌ Ollama non raggiungibile — i documenti non possono essere indicizzati.\n"
"Avvia `ollama serve` e ricarica.",
rag_doc_list(),
)
lines = []
for f in files:
path = Path(f.name)
suffix = path.suffix.lower()
if path.name in _rag_indexed_files:
lines.append(f"ℹ️ `{path.name}` — già indicizzato, saltato.")
continue
file_bytes = path.read_bytes()
cache_path = _rag_cache_path(cache_dir, path.name, file_bytes)
if cache_path.exists():
try:
cached_chunks, _ = _rag_cache_load(cache_path)
with _rag_lock:
_rag_chunks.extend(cached_chunks)
_rag_indexed_files.add(path.name)
lines.append(f"✅ `{path.name}` — {len(cached_chunks)} chunk caricati dalla cache.")
continue
except Exception:
pass
try:
if suffix == ".pdf":
text = _extract_pdf_text(path)
elif suffix in (".docx", ".doc"):
import docx as _docx
doc_obj = _docx.Document(str(path))
text = "\n".join(p.text for p in doc_obj.paragraphs)
else:
lines.append(f"⚠️ `{path.name}` — formato non supportato (solo PDF/DOCX).")
continue
except Exception as e:
lines.append(f"❌ `{path.name}` — errore: {e}")
continue
if not text.strip():
lines.append(f"⚠️ `{path.name}` — nessun testo estratto.")
continue
chunks = _chunk_text(text, path.name)
embeddings = _embed_batch([c["text"] for c in chunks])
embedded = 0
for chunk, emb in zip(chunks, embeddings):
if emb is not None:
chunk["emb"] = emb
embedded += 1
try:
_rag_cache_save(cache_path, chunks, path.name)
except Exception as e:
print(f"[RAG] Cache write failed for {path.name}: {e}")
with _rag_lock:
_rag_chunks.extend(chunks)
_rag_indexed_files.add(path.name)
lines.append(f"✅ `{path.name}` — {len(chunks)} chunk, {embedded} indicizzati.")
return "\n".join(lines), rag_doc_list()
def rag_remove_doc(filename: str, cache_dir: Path) -> tuple[str, list]:
"""Remove all chunks for a document from memory and delete its cache file."""
global _rag_chunks, _rag_indexed_files
if not filename or not filename.strip():
return "Nessun documento selezionato.", rag_doc_list()
with _rag_lock:
before = len(_rag_chunks)
_rag_chunks = [c for c in _rag_chunks if c["source"] != filename]
removed_chunks = before - len(_rag_chunks)
_rag_indexed_files.discard(filename)
deleted_files = 0
if cache_dir.exists():
for cache_file in cache_dir.glob("*.npz"):
try:
with np.load(str(cache_file), allow_pickle=True) as data:
match = str(data["filename"]) == filename
if match:
cache_file.unlink()
deleted_files += 1
except Exception:
pass
if removed_chunks == 0:
return f"⚠️ `{filename}` non trovato nell'indice.", rag_doc_list()
msg = f"🗑️ `{filename}` rimosso ({removed_chunks} chunk eliminati"
if deleted_files:
msg += ", cache eliminata"
msg += ")."
return msg, rag_doc_list()
# ──────────────────────────────────────────────────────────────────────────────
# Retrieval
# ──────────────────────────────────────────────────────────────────────────────
def rag_retrieve(question: str) -> tuple[list | None, str | None]:
"""Return (results, error_str). results is list of (score, chunk)."""
embedded_chunks = [c for c in _rag_chunks if c["emb"] is not None]
if not embedded_chunks:
total = len(_rag_chunks)
return None, (
f"⏳ Embedding in corso (0/{total} chunk pronti). "
"Riprovare tra qualche secondo — l'indicizzazione procede in background."
)
q_emb = _embed(question)
if q_emb is None:
from core.providers import is_openai_model
provider = "OpenAI" if is_openai_model(_embed_model) else "Ollama"
return None, f"❌ Impossibile generare l'embedding della domanda. {provider} è raggiungibile?"
synthetic_sources = {s for s, _ in _RAG_SYNTHETIC_DOCS}
user_chunks = [c for c in _rag_chunks if c["emb"] is not None and c["source"] not in synthetic_sources]
synth_chunks = [c for c in _rag_chunks if c["emb"] is not None and c["source"] in synthetic_sources]
def _top_k_from(pool, q, k):
if not pool:
return []
embs = np.stack([c["emb"] for c in pool])
q_n = q / (np.linalg.norm(q) + 1e-9)
norms = np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9
scores = (embs / norms) @ q_n
idxs = np.argsort(scores)[::-1][:k]
return [(float(scores[i]), pool[i]) for i in idxs]
user_results = _top_k_from(user_chunks, q_emb, 5)
synth_results = _top_k_from(synth_chunks, q_emb, 2 if user_results else 4)
return user_results + synth_results, None
# ──────────────────────────────────────────────────────────────────────────────
# Chat stream (framework-agnostic)
# ──────────────────────────────────────────────────────────────────────────────
def rag_chat_stream(
message: str,
history: list[dict],
) -> Generator[tuple[str, str | None], None, None]:
"""Core RAG chat logic. Yields (partial_response, sources_footer|None).
The caller (e.g. Gradio wrapper) is responsible for formatting history.
history is a list of {"role": "user"|"assistant", "content": str}.
"""
from core.providers import is_openai_model
if not is_openai_model(_rag_model) and not check_ollama():
yield (
"❌ **Ollama non raggiungibile.**\n\n"
"Avvia il server con:\n```\nollama serve\n```\n"
"e assicurati che il modello sia scaricato:\n"
"```\nollama pull llama3.2\n```",
None,
)
return
if not _rag_ready:
yield "⏳ Indice della knowledge base in costruzione, riprovare tra qualche secondo…", None
return
results, err = rag_retrieve(message)
if err:
yield err, None
return
context = "\n\n".join(f"[{c['source']}]\n{c['text']}" for _, c in results)
recent = history[-12:] if len(history) > 12 else history
conv_text = ""
i = 0
while i < len(recent) - 1:
if recent[i]["role"] == "user" and recent[i + 1]["role"] == "assistant":
u = recent[i]["content"]
a = recent[i + 1]["content"].split("\n\n---\n")[0]
conv_text += f"Utente: {u}\nAssistente: {a}\n\n"
i += 2
else:
i += 1
prompt = (
"Sei un esperto di grafologia forense. Rispondi in italiano, in modo preciso e "
"conciso, basandoti ESCLUSIVAMENTE sui seguenti estratti.\n\n"
f"{context}\n\n"
)
if conv_text:
prompt += f"Conversazione precedente:\n{conv_text}\n"
prompt += f"Domanda: {message}\n\nRisposta:"
sources = list(dict.fromkeys(c["source"] for _, c in results))
sources_footer = f"\n\n---\n*Fonti: {', '.join(sources)}*"
partial = ""
try:
for token in stream_llm(prompt):
partial += token
yield partial, None
except Exception as e:
yield f"❌ Errore nella generazione: {e}", None
return
yield partial, sources_footer
# ──────────────────────────────────────────────────────────────────────────────
# Pipeline LLM synthesis
# ──────────────────────────────────────────────────────────────────────────────
def pipeline_llm_synthesis(
step1_summary: str,
step2_text: str,
step3_summary: str,
step4_report: str,
step5_report: str,
step6_report: str,
) -> str:
"""Call the active LLM to synthesise forensic pipeline results into a narrative report."""
from core.providers import is_openai_model
if not is_openai_model(_rag_model):
try:
requests.get(f"{OLLAMA_URL}/api/tags", timeout=3)
except Exception:
return (
"❌ **Ollama non raggiungibile.** Avvia il server con:\n"
"```\nollama serve\n```"
)
prompt = (
"Sei un perito calligrafo forense esperto. "
"Sulla base delle seguenti analisi tecniche su un documento, "
"fornisci in italiano una valutazione complessiva professionale: "
"evidenzia elementi di interesse forense, coerenze e incoerenze tra i risultati, "
"e suggerisci eventuali ulteriori verifiche.\n\n"
f"=== RILEVAMENTO FIRMA ===\n{step1_summary}\n\n"
f"=== TRASCRIZIONE HTR ===\n{step2_text}\n\n"
f"=== ENTITÀ RICONOSCIUTE (NER) ===\n{step3_summary}\n\n"
f"=== IDENTIFICAZIONE AUTORE ===\n{step4_report}\n\n"
f"=== ANALISI GRAFOLOGICA ===\n{step5_report}\n\n"
f"=== VERIFICA FIRMA ===\n{step6_report}\n\n"
"Valutazione forense integrata:"
)
result = ""
try:
for token in stream_llm(prompt):
result += token
except Exception as e:
return f"❌ Errore nella generazione LLM: {e}"
return result if result else "*(Nessuna risposta dal modello)*"