Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from constants import OUTPUT_DIR | |
| DIAGNOSTICS_DIR = os.path.join(OUTPUT_DIR, "diagnostics") | |
| os.makedirs(DIAGNOSTICS_DIR, exist_ok=True) | |
| class TraceManager: | |
| _instance = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super(TraceManager, cls).__new__(cls) | |
| cls._instance.active_tasks = {} | |
| return cls._instance | |
| def start_trace(self, task_id: str): | |
| self.active_tasks[task_id] = [] | |
| self._save(task_id) | |
| def add_flag(self, task_id: str, flag: str, details: str = ""): | |
| if not task_id: | |
| return | |
| if task_id not in self.active_tasks: | |
| self.active_tasks[task_id] = [] | |
| entry = { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "flag": flag, | |
| "details": str(details)[:500] # truncate details to keep json small | |
| } | |
| self.active_tasks[task_id].append(entry) | |
| self._save(task_id) | |
| # Also log to standard logger | |
| logging.getLogger("wealth_engine").info(f"[{flag}] {details}") | |
| def _save(self, task_id: str): | |
| try: | |
| path = os.path.join(DIAGNOSTICS_DIR, f"trace_{task_id}.json") | |
| with open(path, "w") as f: | |
| json.dump({ | |
| "task_id": task_id, | |
| "last_updated": datetime.utcnow().isoformat(), | |
| "flags": self.active_tasks[task_id] | |
| }, f, indent=4) | |
| except Exception: | |
| pass | |
| def get_trace(self, task_id: str): | |
| return self.active_tasks.get(task_id, []) | |
| tracer = TraceManager() | |