File size: 5,366 Bytes
96abbd8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | # -*- coding: utf-8 -*-
"""Lightweight persistence for debugging experiences."""
from __future__ import annotations
import hashlib
import json
import threading
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _normalise_error(text: str) -> str:
return (text or "").strip()
@dataclass
class DebugRecord:
"""Single debugging observation stored on disk."""
signature: str
status: str
error_text: str
guidance: str
problem_id: Optional[int]
description: str
metadata: Dict[str, Any]
timestamp: str
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
_PKG_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _PKG_DIR.parent.parent
class DebugMemoryStore:
"""Append-only store keyed by error signature."""
DEFAULT_PATH = _PROJECT_ROOT / "memory_storage" / "debug_memory.jsonl"
def __init__(self, path: Optional[str] = None):
self.path = Path(path) if path else self.DEFAULT_PATH
self.path.parent.mkdir(parents=True, exist_ok=True)
if not self.path.exists():
self.path.touch()
self._lock = threading.Lock()
@staticmethod
def _signature_from_error(error_text: str, status: str) -> str:
basis = _normalise_error(error_text)
if not basis:
basis = status or "unknown"
digest = hashlib.sha1(basis.encode("utf-8")).hexdigest()[:12]
return digest
def _append(self, record: DebugRecord) -> None:
payload = json.dumps(record.to_dict(), ensure_ascii=False)
with self._lock, self.path.open("a", encoding="utf-8") as fh:
fh.write(payload + "\n")
def record_execution_feedback(
self,
*,
problem_id: Optional[int],
description: str,
status: str,
error_text: str,
guidance: str,
source: str,
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""Persist execution feedback and return the signature used."""
signature_core = self._signature_from_error(error_text, status)
signature = f"exec:{signature_core}"
record = DebugRecord(
signature=signature,
status=status or "unknown",
error_text=_normalise_error(error_text) or status or "",
guidance=(guidance or "").strip(),
problem_id=problem_id,
description=(description or "").strip(),
metadata={
"source": source,
**(metadata or {}),
},
timestamp=_now_iso(),
)
self._append(record)
return signature
def record_validation_feedback(
self,
*,
problem_id: Optional[int],
issues: Iterable[str],
metadata: Optional[Dict[str, Any]] = None,
source: str = "validation",
) -> List[str]:
"""Persist validation feedback items and return the signatures used."""
signatures: List[str] = []
for issue in issues:
if not issue:
continue
signature_core = self._signature_from_error(issue, "validation")
signature = f"validation:{signature_core}"
record = DebugRecord(
signature=signature,
status="validation",
error_text=_normalise_error(issue),
guidance="",
problem_id=problem_id,
description="",
metadata={
"source": source,
**(metadata or {}),
},
timestamp=_now_iso(),
)
self._append(record)
signatures.append(signature)
return signatures
def retrieve_for_problem(self, problem_id: int, limit: int = 3) -> List[DebugRecord]:
"""Return recent records for a given problem id (best-effort)."""
if problem_id is None:
return []
matches: List[DebugRecord] = []
with self.path.open("r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
payload = json.loads(line)
except json.JSONDecodeError:
continue
if payload.get("problem_id") != problem_id:
continue
matches.append(
DebugRecord(
signature=payload.get("signature", ""),
status=payload.get("status", ""),
error_text=payload.get("error_text", ""),
guidance=payload.get("guidance", ""),
problem_id=payload.get("problem_id"),
description=payload.get("description", ""),
metadata=payload.get("metadata", {}) or {},
timestamp=payload.get("timestamp", ""),
)
)
matches.sort(key=lambda item: item.timestamp, reverse=True)
return matches[:limit] if limit else matches
__all__ = ["DebugMemoryStore", "DebugRecord"]
|