Spaces:
Sleeping
Sleeping
File size: 1,733 Bytes
208fbf8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | import os
import json
import logging
from datetime import datetime
from constants import OUTPUT_DIR
DIAGNOSTICS_DIR = os.path.join(OUTPUT_DIR, "diagnostics")
os.makedirs(DIAGNOSTICS_DIR, exist_ok=True)
class TraceManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(TraceManager, cls).__new__(cls)
cls._instance.active_tasks = {}
return cls._instance
def start_trace(self, task_id: str):
self.active_tasks[task_id] = []
self._save(task_id)
def add_flag(self, task_id: str, flag: str, details: str = ""):
if not task_id:
return
if task_id not in self.active_tasks:
self.active_tasks[task_id] = []
entry = {
"timestamp": datetime.utcnow().isoformat(),
"flag": flag,
"details": str(details)[:500] # truncate details to keep json small
}
self.active_tasks[task_id].append(entry)
self._save(task_id)
# Also log to standard logger
logging.getLogger("wealth_engine").info(f"[{flag}] {details}")
def _save(self, task_id: str):
try:
path = os.path.join(DIAGNOSTICS_DIR, f"trace_{task_id}.json")
with open(path, "w") as f:
json.dump({
"task_id": task_id,
"last_updated": datetime.utcnow().isoformat(),
"flags": self.active_tasks[task_id]
}, f, indent=4)
except Exception:
pass
def get_trace(self, task_id: str):
return self.active_tasks.get(task_id, [])
tracer = TraceManager()
|