# Hosting-page residue (not part of the module): uploader "bbkdevops"; raw download, 7.22 kB.
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
import re
from typing import Any
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]{2,}", re.UNICODE)
def _tokens(text: str) -> list[str]:
out: list[str] = []
for token in TOKEN_RE.findall(text):
low = token.lower()
out.append(low)
if low.endswith("ing") and len(low) > 5:
out.append(low[:-3])
if low.endswith("ed") and len(low) > 4:
out.append(low[:-2])
if low.endswith("s") and len(low) > 4:
out.append(low[:-1])
if re.search(r"[\u0E00-\u0E7F]", low):
chars = [ch for ch in low if "\u0E00" <= ch <= "\u0E7F"]
out.extend("".join(chars[i : i + 3]) for i in range(max(0, len(chars) - 2)))
return out
def _read_evidence(path: str | Path) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
p = Path(path)
for i, line in enumerate(p.read_text(encoding="utf-8", errors="replace").splitlines(), start=1):
if not line.strip():
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
text = str(row.get("text") or row.get("content") or "")
if not text.strip():
continue
row_id = str(row.get("id") or row.get("source") or f"row-{i}")
digest = hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()
rows.append({"id": row_id, "text": text, "sha256": digest, "line": i})
return rows
class DeepResearchRLAgent:
    """Deterministic retrieval-reasoning-reward loop for training/eval evidence.

    Evidence rows are ranked by token overlap with the question, the best
    rows are quoted verbatim as the answer, and a heuristic reward describes
    how well that answer is grounded in the cited rows.

    Attributes:
        top_k: Maximum number of evidence rows cited per answer.
    """

    def __init__(self, top_k: int = 4):
        self.top_k = top_k

    def retrieve(self, question: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Rank *evidence* against *question* and return the best rows.

        A row's score is ``density + 0.15 * rare_bonus`` where, over the
        distinct question tokens:

        * ``density`` is the summed per-token overlap count divided by the
          number of distinct question tokens;
        * ``rare_bonus`` averages ``1 / count_in_row`` over the question
          tokens that occur in the row.

        Rows scoring zero are dropped. Ties are broken by ``"id"`` so the
        ordering is reproducible.

        Args:
            question: Free-text question.
            evidence: Rows carrying at least ``"text"`` and ``"id"``.

        Returns:
            Copies of at most ``top_k`` rows, best first, each with an added
            ``"retrieval_score"`` rounded to six decimals. The input rows are
            not modified.
        """
        limit = self.top_k
        # A negative limit would make the final slice drop rows from the *end*
        # of the ranking instead of returning nothing, so treat any
        # non-positive limit as "cite nothing". ``None`` keeps plain slice
        # semantics (no limit).
        if not evidence or (limit is not None and limit <= 0):
            return []

        q = Counter(_tokens(question))
        denom = max(1, len(q))  # avoids division by zero for token-less questions
        scored: list[tuple[float, dict[str, Any]]] = []
        for row in evidence:
            toks = Counter(_tokens(row["text"]))
            overlap = sum(min(q[t], toks[t]) for t in q)
            density = overlap / denom
            rare_bonus = sum(1.0 / toks[t] for t in q if toks[t]) / denom
            score = density + 0.15 * rare_bonus
            if score > 0:
                enriched = dict(row)
                enriched["retrieval_score"] = round(score, 6)
                scored.append((score, enriched))
        scored.sort(key=lambda pair: (-pair[0], pair[1]["id"]))
        return [row for _, row in scored[:limit]]

    def answer(self, question: str, *, evidence_path: str | Path) -> dict[str, Any]:
        """Answer *question* using only rows from the evidence ledger.

        The ledger at *evidence_path* is read on every call.

        Args:
            question: Free-text question.
            evidence_path: JSON Lines evidence file.

        Returns:
            A sample dict with ``"schema_version"``, ``"question"``,
            ``"plan"``, ``"answer"``, ``"citations"`` (id, sha256, line and
            retrieval score of each cited row), ``"reward"`` and
            ``"claim_gate"``. When nothing is retrieved the answer is a fixed
            refusal sentence and the citation list is empty.
        """
        evidence = _read_evidence(evidence_path)
        retrieved = self.retrieve(question, evidence)
        plan = [
            "decompose_question",
            "retrieve_exact_evidence",
            "synthesize_only_from_cited_text",
            "score_grounding_reward",
        ]
        if not retrieved:
            answer_text = "I do not have enough cited evidence to answer this accurately."
        else:
            facts = [f"[{row['id']}] {row['text'].strip()}" for row in retrieved]
            answer_text = "Evidence-backed answer:\n" + "\n".join(f"- {fact}" for fact in facts)
        reward = self._reward(question, answer_text, retrieved)
        return {
            "schema_version": "tinymind-deep-research-rl-sample-v1",
            "question": question,
            "plan": plan,
            "answer": answer_text,
            "citations": [
                {
                    "id": row["id"],
                    "sha256": row["sha256"],
                    "line": row["line"],
                    "retrieval_score": row["retrieval_score"],
                }
                for row in retrieved
            ],
            "reward": reward,
            "claim_gate": {
                "unsupported_answer_allowed": False,
                "external_internet_claim_allowed": False,
                "reason": "This loop answers only from the supplied evidence ledger unless a separate internet ingestion step supplies fresh evidence.",
            },
        }

    def _reward(self, question: str, answer: str, retrieved: list[dict[str, Any]]) -> dict[str, float]:
        """Score how well *answer* is grounded in the *retrieved* rows.

        Components (each rounded to six decimals in the result):

        * ``coverage``: share of distinct question tokens found in the
          retrieved texts;
        * ``citation_strength``: ``min(1, len(retrieved) / 2)``;
        * ``compactness``: ``1`` up to 2400 answer characters, then
          ``2400 / len(answer)``;
        * ``hallucination_penalty``: ``0.7`` when nothing was retrieved,
          otherwise ``0``.

        ``total`` is ``0.45 * coverage + 0.35 * citation_strength
        + 0.20 * compactness - hallucination_penalty``, floored at zero.
        """
        q_tokens = set(_tokens(question))
        evidence_tokens: set[str] = set()
        for row in retrieved:
            evidence_tokens.update(_tokens(row["text"]))
        coverage = len(q_tokens & evidence_tokens) / max(1, len(q_tokens))
        citation_strength = min(1.0, len(retrieved) / 2.0)
        hallucination_penalty = 0.0 if retrieved else 0.7
        compactness = 1.0 if len(answer) <= 2400 else 2400 / len(answer)
        total = max(
            0.0,
            0.45 * coverage + 0.35 * citation_strength + 0.20 * compactness - hallucination_penalty,
        )
        return {
            "coverage": round(coverage, 6),
            "citation_strength": round(citation_strength, 6),
            "compactness": round(compactness, 6),
            "hallucination_penalty": round(hallucination_penalty, 6),
            "total": round(total, 6),
        }
def build_deep_research_rl_report(
    out_dir: str | Path,
    *,
    questions: list[str],
    evidence_path: str | Path,
    top_k: int = 4,
) -> dict[str, Any]:
    """Answer every question and write the SFT dataset plus a JSON report.

    Two files are created in *out_dir* (which is created if missing):
    ``deep_research_rl_sft.jsonl`` with one chat-formatted record per
    question, and ``deep_research_rl_report.json`` with the full report.

    Args:
        out_dir: Destination directory.
        questions: Questions to answer, in output order.
        evidence_path: JSON Lines evidence file handed to the agent.
        top_k: Maximum number of evidence rows cited per answer.

    Returns:
        The report dict that was written to disk, including the per-question
        samples, their average total reward and both output paths.
    """
    target_dir = Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    agent = DeepResearchRLAgent(top_k=top_k)
    samples = []
    for question in questions:
        samples.append(agent.answer(question, evidence_path=evidence_path))

    def _as_sft_line(sample: dict[str, Any]) -> str:
        # One chat-style training record, serialized as a single JSONL line.
        record = {
            "messages": [
                {"role": "system", "content": "Use Deep Research RL: retrieve, cite, verify, then answer."},
                {"role": "user", "content": sample["question"]},
                {"role": "assistant", "content": sample["answer"]},
            ],
            "source": "deep_research_rl",
            "reward": sample["reward"],
            "citations": sample["citations"],
        }
        return json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"

    sft_path = target_dir / "deep_research_rl_sft.jsonl"
    with sft_path.open("w", encoding="utf-8", newline="\n") as sink:
        sink.writelines(_as_sft_line(sample) for sample in samples)

    # max(1, ...) keeps the average defined (0.0) when there are no questions.
    reward_sum = sum(sample["reward"]["total"] for sample in samples)
    avg_reward = reward_sum / max(1, len(samples))

    report_path = target_dir / "deep_research_rl_report.json"
    report: dict[str, Any] = {
        "schema_version": "tinymind-deep-research-rl-report-v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "evidence_path": str(evidence_path),
        "question_count": len(questions),
        "avg_reward": round(avg_reward, 6),
        "samples": samples,
        "sft_path": str(sft_path),
        "claim_gate": {
            "deep_research_rl_ready": bool(samples),
            "beats_frontier_research_claim_allowed": False,
            "reason": "This is a local evidence-backed RL data/eval loop. External comparison is still required.",
        },
        "json_path": str(report_path),
    }
    serialized = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    report_path.write_text(serialized, encoding="utf-8")
    return report

# Hosting-page residue (not part of the module):
# Xet Storage Details
# Size: 7.22 kB · Xet hash: 4a502ead5291737e8c3e4b4b160cb316848d202da9624069c89c46edfa4d0e42
# Xet efficiently stores files, intelligently splitting them into unique chunks
# and accelerating uploads and downloads.