File size: 5,050 Bytes
96abbd8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""Convert debug_memory.jsonl records into a searchable MemoryBank."""

from __future__ import annotations

import argparse
import glob
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from .memory_bank import MemoryBank

# Directory two levels above the one that contains this file
# (parents[0] is the file's own directory).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Sibling of PROJECT_ROOT named "debate_with_memory"; presumably an earlier
# project layout whose debug logs should still be picked up — TODO confirm.
LEGACY_ROOT = PROJECT_ROOT.parent / "debate_with_memory"


def _default_inputs() -> List[str]:
    """Return the default input paths/glob patterns for debug records.

    For both the current and the legacy root this yields the live
    ``memory_storage/debug_memory.jsonl`` file first, followed by a glob
    pattern matching the same file inside every ``backups/*`` subdirectory.
    """
    log_name = "debug_memory.jsonl"
    storage_dirs = [root / "memory_storage" for root in (PROJECT_ROOT, LEGACY_ROOT)]
    # Live logs come first, then the per-backup wildcard patterns.
    live_logs = [str(storage / log_name) for storage in storage_dirs]
    backup_patterns = [str(storage / "backups" / "*" / log_name) for storage in storage_dirs]
    return live_logs + backup_patterns


def _stable_id(signature: str) -> int:
    """Derive a deterministic integer identifier from a signature string.

    The result is the first six bytes (twelve hex digits) of the SHA-1 digest
    of the UTF-8 encoded signature, interpreted as a big-endian unsigned
    integer, so it always lies in ``[0, 2**48)``.
    """
    raw_digest = hashlib.sha1(signature.encode("utf-8")).digest()
    return int.from_bytes(raw_digest[:6], "big")


def _parse_timestamp(ts: Optional[str]) -> datetime:
    """Parse an ISO-8601 timestamp into a naive datetime usable for ordering.

    Args:
        ts: Timestamp text as produced by ``datetime.isoformat()``; a trailing
            ``Z`` is accepted as UTC. ``None``, empty, or non-string values
            are treated as "no timestamp".

    Returns:
        The parsed datetime. Offset-aware inputs are converted to UTC and
        stripped of their tzinfo so that every result can be compared with
        every other result and with ``datetime.min``. ``datetime.min`` is
        returned when the value is missing or cannot be parsed.
    """
    from datetime import timezone  # local import: only this helper needs it

    # Non-string values (e.g. epoch numbers) would make fromisoformat() raise
    # TypeError; treat them like a missing timestamp instead.
    if not ts or not isinstance(ts, str):
        return datetime.min

    text = ts
    # fromisoformat() only understands the "Z" suffix from Python 3.11 on.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"

    try:
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is not None:
            # Mixing aware and naive datetimes raises TypeError on "<"/">",
            # so normalise everything to naive UTC.
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    except (ValueError, OverflowError):
        # OverflowError: offset conversion fell outside the datetime range.
        return datetime.min
    return parsed


def load_debug_records(input_globs: List[str]) -> Dict[str, Dict]:
    """Load debug records from JSONL files, keeping the newest per signature.

    Every entry of ``input_globs`` is expanded with ``glob.glob``. The
    matching regular files are de-duplicated and read in sorted path order,
    one JSON document per line.

    A line is skipped when it is blank, is not valid JSON, does not decode to
    a JSON object, or has no non-empty string ``"signature"``.

    Args:
        input_globs: File paths or glob patterns to read.

    Returns:
        Mapping from signature to the record with the greatest parsed
        ``"timestamp"`` (ties keep the record read first). Each stored record
        carries an extra ``"_ts"`` key holding that parsed datetime.
    """
    # Only regular files: a directory that matches a pattern cannot be opened.
    source_files = sorted(
        {
            Path(match)
            for pattern in input_globs
            for match in glob.glob(pattern)
            if Path(match).is_file()
        }
    )

    records: Dict[str, Dict] = {}
    for file_path in source_files:
        with file_path.open("r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Valid JSON that is not an object (list, number, null, ...)
                # has no fields to read; skip it like any other bad line.
                if not isinstance(record, dict):
                    continue
                signature = record.get("signature")
                # The signature is used as a dict key and later hashed as
                # text, so anything but a non-empty string is unusable.
                if not signature or not isinstance(signature, str):
                    continue
                ts = _parse_timestamp(record.get("timestamp"))
                existing = records.get(signature)
                if existing is None or ts > existing.get("_ts", datetime.min):
                    record["_ts"] = ts
                    records[signature] = record
    return records


def build_debug_memory(records: Dict[str, Dict], output_dir: Path, clear: bool) -> None:
    """Write one MemoryBank case per debug record into ``output_dir``.

    Args:
        records: Mapping from signature to debug record, as returned by
            ``load_debug_records``. The fields read from each record are
            ``description``, ``error_text``, ``guidance``, ``status``,
            ``timestamp``, ``metadata`` and ``problem_id``.
        output_dir: Directory handed to ``MemoryBank(memory_dir=...)``.
        clear: When true and ``output_dir`` exists, delete everything inside
            it before the bank is created.

    A record whose ``add_case`` call fails (including a ``problem_id`` that
    cannot be converted to ``int``) is reported on stdout and skipped; the
    remaining records are still processed.
    """
    import shutil  # function-scope import, hoisted out of the deletion loop

    if clear and output_dir.exists():
        for child in output_dir.iterdir():
            # shutil.rmtree() refuses symbolic links, so only real directories
            # go through it; files and links of any kind are unlinked.
            if child.is_dir() and not child.is_symlink():
                shutil.rmtree(child)
            else:
                child.unlink()

    bank = MemoryBank(memory_dir=str(output_dir))
    added = 0
    for signature, record in records.items():
        description = record.get("description", "Unknown problem")
        error_text = record.get("error_text", "")
        guidance = record.get("guidance", "")
        status = record.get("status", "")

        # A malformed (non-object) "metadata" field must not abort the build.
        extra_metadata = record.get("metadata")
        if not isinstance(extra_metadata, dict):
            extra_metadata = {}
        # Record-supplied metadata is merged last, so it can override the
        # three base keys (same precedence as before).
        metadata = {
            "signature": signature,
            "status": status,
            "timestamp": record.get("timestamp"),
            **extra_metadata,
        }

        # Human-readable note stored in the case's solution_code field.
        note_lines = ["# Debug Memory Case", f"Signature: {signature}", f"Status: {status}"]
        if guidance:
            note_lines.append(f"Guidance: {guidance}")
        note_lines.append("---")
        if error_text:
            note_lines.append("Error snippet:\n" + error_text)
        note_lines.append("---")
        note_lines.append(f"Source metadata: {metadata}")

        # Text stored as the case's problem description.
        prompt_desc = (
            f"{description}\n\n## Error Details\n```\n{error_text}\n```\n"
            f"## Guidance\n{guidance or 'N/A'}\n"
        )

        # Fall back to a hash-derived id when the record carries none.
        problem_id = record.get("problem_id")
        if problem_id is None:
            problem_id = _stable_id(signature)
        try:
            bank.add_case(
                problem_id=int(problem_id),
                problem_desc=prompt_desc,
                solution_code="\n".join(note_lines),
                objective_value=0.0,
                is_correct=True,
                metadata=metadata,
            )
            added += 1
        except Exception as exc:  # noqa: BLE001 - best effort: report and continue
            print(f"Failed to add debug case {signature}: {exc}")
    print(f"✅ Added {added} debug cases to {output_dir}")


def parse_args():
    """Parse the command-line options of the debug-memory builder.

    Returns:
        The ``argparse`` namespace with ``input`` (list of files/globs),
        ``output_dir`` (string path) and ``clear`` (boolean flag).
    """
    cli = argparse.ArgumentParser(
        description="Build debug memory bank from debug_memory.jsonl records"
    )
    # Each entry is (flag, keyword arguments for add_argument), in CLI order.
    option_specs = (
        (
            "--input",
            {
                "nargs": "*",
                "default": _default_inputs(),
                "help": "Input files/globs containing debug records",
            },
        ),
        (
            "--output_dir",
            {
                "type": str,
                "default": str(PROJECT_ROOT / "debug_case_memory"),
                "help": "Where to store the constructed memory bank",
            },
        ),
        (
            "--clear",
            {
                "action": "store_true",
                "help": "Remove existing output_dir contents before rebuilding",
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()


def main():
    """Run the CLI: load debug records, report the count, build the bank."""
    options = parse_args()
    target_dir = Path(options.output_dir)
    debug_records = load_debug_records(options.input)
    print(f"Loaded {len(debug_records)} unique debug signatures")
    build_debug_memory(debug_records, target_dir, clear=options.clear)


# Run the command-line entry point only when this file is executed as the
# main module, not when it is imported.
if __name__ == "__main__":
    main()