Spaces:
Sleeping
Sleeping
File size: 2,688 Bytes
6df13ef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# services/agentic_integration_logger.py
"""
Agentic Integration Logger - Comprehensive audit trail for all agentic executions.
"""
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional
logger = logging.getLogger("agentic.integration")
def log_agentic_attempt(
session_id: str,
pipeline: Dict[str, Any],
decision: str
):
"""Log decision to use/skip agentic orchestration."""
logger.info(json.dumps({
"event": "agentic_decision",
"session_id": session_id,
"pipeline_name": pipeline.get("pipeline_name"),
"decision": decision,
"timestamp": datetime.now(timezone.utc).isoformat()
}))
def log_agentic_execution(
session_id: str,
pipeline: Dict[str, Any],
agentic_summary: Dict[str, Any],
result: str,
fallback_reason: Optional[str] = None
):
"""Log complete agentic execution with all metadata."""
log_entry = {
"event": "agentic_execution",
"session_id": session_id,
"pipeline_id": pipeline.get("pipeline_id"),
"pipeline_name": pipeline.get("pipeline_name"),
"result": result, # "success" | "fallback"
"timestamp": datetime.now(timezone.utc).isoformat(),
"plan_versions": len(agentic_summary.get("plan_versions", [])),
"total_messages": agentic_summary.get("total_messages", 0),
"rejections": len(agentic_summary.get("rejections", [])),
"verification": agentic_summary.get("verification", {})
}
if fallback_reason:
log_entry["fallback_reason"] = fallback_reason
if result == "success":
logger.info(json.dumps(log_entry))
else:
logger.warning(json.dumps(log_entry))
def log_fallback_trigger(
session_id: str,
reason: str,
exception: Optional[Exception] = None
):
"""Log when fallback to legacy is triggered."""
logger.warning(json.dumps({
"event": "fallback_triggered",
"session_id": session_id,
"reason": reason,
"exception": str(exception) if exception else None,
"timestamp": datetime.now(timezone.utc).isoformat()
}))
def log_shadow_comparison(
session_id: str,
legacy_result: Dict[str, Any],
agentic_result: Dict[str, Any],
differences: Dict[str, Any]
):
"""Log shadow mode execution comparison."""
logger.info(json.dumps({
"event": "shadow_mode_comparison",
"session_id": session_id,
"legacy_status": legacy_result.get("status"),
"agentic_status": agentic_result.get("status"),
"differences": differences,
"timestamp": datetime.now(timezone.utc).isoformat()
}))
|