Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| from config import settings | |
| _memory_file: Path | None = None | |
| def _get_memory_file() -> Path: | |
| global _memory_file | |
| if _memory_file is None: | |
| os.makedirs(settings.memory_dir, exist_ok=True) | |
| _memory_file = Path(settings.memory_dir) / "problems.jsonl" | |
| return _memory_file | |
| def save_record( | |
| input_type: str, | |
| extracted_text: str, | |
| parsed_problem: dict, | |
| topic: str, | |
| retrieved_chunks: list[str], | |
| solution: str, | |
| solution_steps: list[str], | |
| verification: dict, | |
| explanation: str, | |
| user_feedback: str = "", | |
| user_comment: str = "", | |
| ) -> str: | |
| record_id = str(uuid.uuid4()) | |
| record = { | |
| "id": record_id, | |
| "timestamp": datetime.now().isoformat(), | |
| "input_type": input_type, | |
| "extracted_text": extracted_text, | |
| "parsed_problem": parsed_problem, | |
| "topic": topic, | |
| "retrieved_chunk_sources": retrieved_chunks, | |
| "solution": solution, | |
| "solution_steps": solution_steps, | |
| "verification": verification, | |
| "explanation": explanation, | |
| "user_feedback": user_feedback, | |
| "user_comment": user_comment, | |
| } | |
| path = _get_memory_file() | |
| with open(path, "a", encoding="utf-8") as f: | |
| f.write(json.dumps(record, ensure_ascii=False) + "\n") | |
| return record_id | |
| def get_all_records() -> list[dict]: | |
| path = _get_memory_file() | |
| if not path.exists(): | |
| return [] | |
| records = [] | |
| with open(path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| try: | |
| records.append(json.loads(line)) | |
| except json.JSONDecodeError: | |
| continue | |
| return records | |
| def update_feedback(record_id: str, feedback: str, comment: str = "") -> bool: | |
| path = _get_memory_file() | |
| if not path.exists(): | |
| return False | |
| records = get_all_records() | |
| updated = False | |
| with open(path, "w", encoding="utf-8") as f: | |
| for record in records: | |
| if record.get("id") == record_id: | |
| record["user_feedback"] = feedback | |
| record["user_comment"] = comment | |
| updated = True | |
| f.write(json.dumps(record, ensure_ascii=False) + "\n") | |
| return updated | |