bbkdevops's picture
download
raw
6.36 kB
"""Evidence-gated self-rule evolution for TinyMind.
The model may propose new operating or training rules, but rules are promoted
only after measurable improvement, purity preservation, and rollback evidence.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from hashlib import sha256
from typing import Any
VALID_RULE_SCOPES = {"data", "training", "inference", "tooling", "retrieval", "evaluation"}
VALID_RULE_ACTIONS = {"rebalance", "mask_loss", "retrieve_first", "constrain_format", "reject", "promote", "compress"}
@dataclass(frozen=True)
class RuleMetricSnapshot:
quality: float
purity: float
grounding: float
instruction: float
latency_ms: float
vram_mb: float
def score(self) -> float:
capability = (self.quality + self.purity + self.grounding + self.instruction) / 4.0
resource = max(self.latency_ms / 1000.0, 0.001) + max(self.vram_mb / 1024.0, 0.001)
return capability / resource
@dataclass(frozen=True)
class SelfRuleCandidate:
name: str
scope: str
trigger: str
action: str
rule_text: str
before: RuleMetricSnapshot
after: RuleMetricSnapshot
generated_by: str = "tinymind-rule-synthesizer"
evidence_paths: tuple[str, ...] = field(default_factory=tuple)
rollback_plan: str = "disable rule id and restore previous policy manifest"
risk_score: float = 0.0
def rule_id(self) -> str:
payload = f"{self.name}|{self.scope}|{self.trigger}|{self.action}|{self.rule_text}"
return sha256(payload.encode("utf-8")).hexdigest()[:16]
def deltas(self) -> dict[str, float]:
return {
"quality": self.after.quality - self.before.quality,
"purity": self.after.purity - self.before.purity,
"grounding": self.after.grounding - self.before.grounding,
"instruction": self.after.instruction - self.before.instruction,
"latency_ms": self.after.latency_ms - self.before.latency_ms,
"vram_mb": self.after.vram_mb - self.before.vram_mb,
"score": self.after.score() - self.before.score(),
}
def to_record(self) -> dict[str, Any]:
return {
"rule_id": self.rule_id(),
**asdict(self),
"before": asdict(self.before),
"after": asdict(self.after),
"deltas": self.deltas(),
}
class RuleEvolutionGovernor:
policy = {
"self_generated_rules_allowed": True,
"direct_weight_or_policy_mutation_without_eval": False,
"must_have_schema_valid_rule": True,
"must_have_holdout_or_probe_evidence": True,
"must_preserve_purity": True,
"must_preserve_grounding": True,
"must_preserve_instruction_following": True,
"rollback_required": True,
"world_best_claim_allowed_from_rule_evolution": False,
}
def __init__(
self,
min_quality_gain: float = 0.01,
min_score_gain: float = 0.0,
max_purity_drop: float = 0.0,
max_grounding_drop: float = 0.0,
max_instruction_drop: float = 0.0,
max_risk_score: float = 0.20,
):
self.min_quality_gain = float(min_quality_gain)
self.min_score_gain = float(min_score_gain)
self.max_purity_drop = float(max_purity_drop)
self.max_grounding_drop = float(max_grounding_drop)
self.max_instruction_drop = float(max_instruction_drop)
self.max_risk_score = float(max_risk_score)
def _schema_error(self, rule: SelfRuleCandidate) -> str | None:
if rule.scope not in VALID_RULE_SCOPES:
return f"invalid_scope:{rule.scope}"
if rule.action not in VALID_RULE_ACTIONS:
return f"invalid_action:{rule.action}"
if not rule.trigger.strip():
return "missing_trigger"
if len(rule.rule_text.strip()) < 24:
return "rule_text_too_short"
if not rule.evidence_paths:
return "missing_evidence_paths"
if not rule.rollback_plan.strip():
return "missing_rollback_plan"
return None
def _block_reason(self, rule: SelfRuleCandidate) -> str | None:
schema_error = self._schema_error(rule)
if schema_error:
return schema_error
deltas = rule.deltas()
if rule.risk_score > self.max_risk_score:
return f"risk_too_high:{rule.risk_score:.6f}"
if deltas["quality"] < self.min_quality_gain:
return f"quality_gain_too_small:{deltas['quality']:.6f}"
if -deltas["purity"] > self.max_purity_drop:
return f"purity_regression:{-deltas['purity']:.6f}"
if -deltas["grounding"] > self.max_grounding_drop:
return f"grounding_regression:{-deltas['grounding']:.6f}"
if -deltas["instruction"] > self.max_instruction_drop:
return f"instruction_regression:{-deltas['instruction']:.6f}"
if deltas["score"] < self.min_score_gain:
return f"efficiency_score_regression:{deltas['score']:.6f}"
return None
def decide(self, rules: list[SelfRuleCandidate]) -> dict[str, Any]:
accepted: list[SelfRuleCandidate] = []
blocked: list[dict[str, str]] = []
for rule in rules:
reason = self._block_reason(rule)
if reason:
blocked.append({"rule_id": rule.rule_id(), "name": rule.name, "reason": reason})
else:
accepted.append(rule)
selected = sorted(accepted, key=lambda rule: rule.deltas()["score"], reverse=True)
return {
"schema_version": "tinymind-rule-evolution-governor-v1",
"policy": dict(self.policy),
"candidate_count": len(rules),
"accepted_count": len(accepted),
"blocked_count": len(blocked),
"accepted_rules": [rule.to_record() for rule in selected],
"blocked_rules": blocked,
"promote_rule_ids": [rule.rule_id() for rule in selected],
"self_evolution_ready": bool(selected),
"world_best_claim_allowed": False,
"notes": [
"Rules are proposed by the system but promoted only after evidence-backed deltas.",
"No rule may mutate model behavior permanently without rollback metadata.",
],
}

Xet Storage Details

Size:
6.36 kB
·
Xet hash:
d5214d01f1a7709d5e04eb7806b43e2e6496599be241f54dc80512920c9baf54

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.