from __future__ import annotations import json import os import uuid from datetime import datetime from pathlib import Path from config import settings _memory_file: Path | None = None def _get_memory_file() -> Path: global _memory_file if _memory_file is None: os.makedirs(settings.memory_dir, exist_ok=True) _memory_file = Path(settings.memory_dir) / "problems.jsonl" return _memory_file def save_record( input_type: str, extracted_text: str, parsed_problem: dict, topic: str, retrieved_chunks: list[str], solution: str, solution_steps: list[str], verification: dict, explanation: str, user_feedback: str = "", user_comment: str = "", ) -> str: record_id = str(uuid.uuid4()) record = { "id": record_id, "timestamp": datetime.now().isoformat(), "input_type": input_type, "extracted_text": extracted_text, "parsed_problem": parsed_problem, "topic": topic, "retrieved_chunk_sources": retrieved_chunks, "solution": solution, "solution_steps": solution_steps, "verification": verification, "explanation": explanation, "user_feedback": user_feedback, "user_comment": user_comment, } path = _get_memory_file() with open(path, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") return record_id def get_all_records() -> list[dict]: path = _get_memory_file() if not path.exists(): return [] records = [] with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: try: records.append(json.loads(line)) except json.JSONDecodeError: continue return records def update_feedback(record_id: str, feedback: str, comment: str = "") -> bool: path = _get_memory_file() if not path.exists(): return False records = get_all_records() updated = False with open(path, "w", encoding="utf-8") as f: for record in records: if record.get("id") == record_id: record["user_feedback"] = feedback record["user_comment"] = comment updated = True f.write(json.dumps(record, ensure_ascii=False) + "\n") return updated