NextQuest.ai / src /observability.py
ajeet9843
Deploy to Hugging Face Spaces
f10fe83
import logging
import time
import uuid
from contextlib import contextmanager
from functools import wraps
from typing import Optional, Callable, Any
from datetime import datetime, timezone
from src.config import config
class StructuredLogger:
def __init__(self, name: str, correlation_id: Optional[str] = None):
self.logger = logging.getLogger(name)
self.correlation_id = correlation_id or str(uuid.uuid4())[:8]
self._set_level()
def _set_level(self):
level = getattr(logging, config.observability.log_level)
self.logger.setLevel(level)
for handler in self.logger.handlers:
handler.setLevel(level)
def _format(self, msg: str, **kwargs) -> str:
parts = [f"[{self.correlation_id}]", msg]
for k, v in kwargs.items():
parts.append(f"{k}={v}")
return " ".join(parts)
def debug(self, msg: str, **kwargs):
self.logger.debug(self._format(msg, **kwargs))
def info(self, msg: str, **kwargs):
self.logger.info(self._format(msg, **kwargs))
def warning(self, msg: str, **kwargs):
self.logger.warning(self._format(msg, **kwargs))
def error(self, msg: str, **kwargs):
self.logger.error(self._format(msg, **kwargs))
class ProgressTracker:
def __init__(self, correlation_id: str, total_steps: int = 5):
self.correlation_id = correlation_id
self.total_steps = total_steps
self.current_step = 0
self.started_at = datetime.now(timezone.utc)
self.step_times: list[tuple[str, float]] = []
def update(self, step: str, node_name: str):
if not config.observability.trace_nodes:
return
elapsed = (datetime.now(timezone.utc) - self.started_at).total_seconds()
self.step_times.append((node_name, elapsed))
self.current_step += 1
progress_pct = min(100, int((self.current_step / self.total_steps) * 100))
logging.getLogger("progress").info(
f"[{self.correlation_id}] {step} ({node_name}) - {progress_pct}% complete"
)
def summary(self) -> dict:
if not self.step_times:
return {}
return {
"correlation_id": self.correlation_id,
"total_duration_s": round(self.step_times[-1][1], 2) if self.step_times else 0,
"node_count": len(self.step_times),
}
class TokenMonitor:
def __init__(self):
self.total_input_tokens = 0
self.total_output_tokens = 0
self.agent_costs: dict[str, dict[str, int]] = {}
def record(self, agent_name: str, input_tokens: int, output_tokens: int):
if not config.observability.track_tokens:
return
self.total_input_tokens += input_tokens
self.total_output_tokens += output_tokens
if agent_name not in self.agent_costs:
self.agent_costs[agent_name] = {"input": 0, "output": 0, "calls": 0}
self.agent_costs[agent_name]["input"] += input_tokens
self.agent_costs[agent_name]["output"] += output_tokens
self.agent_costs[agent_name]["calls"] += 1
logging.getLogger("tokens").debug(
f"[{agent_name}] input={input_tokens} output={output_tokens}"
)
def summary(self) -> dict:
return {
"total_input_tokens": self.total_input_tokens,
"total_output_tokens": self.total_output_tokens,
"total_tokens": self.total_input_tokens + self.total_output_tokens,
"by_agent": dict(self.agent_costs),
}
_global_token_monitor = TokenMonitor()
def get_token_monitor() -> TokenMonitor:
return _global_token_monitor
@contextmanager
def track_node_execution(node_name: str, correlation_id: str):
start = time.perf_counter()
logger = logging.getLogger("nodes")
logger.debug(f"[{correlation_id}] → {node_name} started")
try:
yield
finally:
elapsed = time.perf_counter() - start
logger.info(f"[{correlation_id}] ← {node_name} completed in {elapsed:.2f}s")
def with_token_tracking(agent_name: str):
def decorator(fn: Callable) -> Callable:
@wraps(fn)
def wrapper(*args, **kwargs):
monitor = get_token_monitor()
start_tokens = monitor.total_input_tokens + monitor.total_output_tokens
result = fn(*args, **kwargs)
end_tokens = monitor.total_input_tokens + monitor.total_output_tokens
if end_tokens > start_tokens:
tokens_delta = end_tokens - start_tokens
monitor.record(agent_name, tokens_delta // 2, tokens_delta // 2)
return result
return wrapper
return decorator