DSN / app /task_a_rag.py
nexusbert's picture
Add agent workflow documentation and refactor user modeling and recommendation services
1c181b2
Raw
History Blame Contribute Delete
4.34 kB
from __future__ import annotations
import json
import logging
import math
import re
from collections import defaultdict
from pathlib import Path
from typing import Any
import numpy as np
logger = logging.getLogger(__name__)
def extract_user_id(persona: str) -> str | None:
for pat in (
r"(?im)^user_id:\s*(\S+)",
r"(?im)^User ID:\s*(\S+)",
r"(?i)\buser_id\s*=\s*(\S+)",
):
m = re.search(pat, persona)
if m:
return m.group(1).strip()
return None
class TaskAReviewRagIndex:
def __init__(self, path: Path) -> None:
self.path = path
self._rows: list[dict[str, Any]] = []
self._mat: np.ndarray | None = None
self._by_user: dict[str, list[int]] = defaultdict(list)
self._loaded = False
@property
def loaded(self) -> bool:
return self._loaded
def load(self) -> None:
if self._loaded:
return
if not self.path.is_file():
logger.warning("Task A RAG file missing — run scripts/build_task_a_review_rag.py or Docker build (%s)", self.path)
self._rows = []
self._mat = None
self._loaded = True
return
embeddings: list[list[float]] = []
with self.path.open(encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line:
continue
row = json.loads(line)
emb = row.get("embedding")
if not emb:
continue
uid = row.get("user_id") or ""
clean = {k: v for k, v in row.items() if k != "embedding"}
idx = len(self._rows)
self._rows.append(clean)
embeddings.append(emb)
if uid:
self._by_user[str(uid)].append(idx)
if not embeddings:
logger.warning("Task A RAG file empty or invalid: %s", self.path)
self._mat = None
self._loaded = True
return
self._mat = np.asarray(embeddings, dtype=np.float32)
norms = np.linalg.norm(self._mat, axis=1, keepdims=True)
norms[norms == 0] = 1.0
self._mat = self._mat / norms
self._loaded = True
logger.info("Task A RAG loaded %d snippets from %s", len(self._rows), self.path)
def _encode_query(self, embedder: Any, text: str) -> np.ndarray:
t = text.replace("\n", " ")[:8000]
vec = embedder.encode([t], convert_to_numpy=True, normalize_embeddings=False)[0]
q = np.asarray(vec, dtype=np.float32)
nq = np.linalg.norm(q)
if nq == 0:
q = np.ones_like(q) / math.sqrt(len(q))
else:
q = q / nq
return q
def retrieve(
self,
persona: str,
product: str,
embedder: Any,
top_k: int,
user_examples_cap: int = 3,
) -> list[dict[str, Any]]:
self.load()
if self._mat is None or not self._rows:
return []
k = max(1, top_k)
uid = extract_user_id(persona)
qtext = f"{persona.strip()}\n{product.strip()}"
q = self._encode_query(embedder, qtext)
scores = self._mat @ q
order = np.argsort(-scores)
picked_idx: list[int] = []
seen: set[int] = set()
if uid and uid in self._by_user:
uidx = self._by_user[uid]
ranked_u = sorted(uidx, key=lambda i: float(scores[i]), reverse=True)
for i in ranked_u[: min(user_examples_cap, k)]:
picked_idx.append(i)
seen.add(i)
for i in order.tolist():
if len(picked_idx) >= k:
break
if i in seen:
continue
picked_idx.append(i)
seen.add(i)
out: list[dict[str, Any]] = []
for i in picked_idx[:k]:
r = self._rows[i]
out.append(
{
"stars": r.get("stars"),
"review_excerpt": r.get("review_excerpt", ""),
"business_context": r.get("business_context", ""),
"same_user_as_persona": bool(uid and r.get("user_id") == uid),
}
)
return out