Spaces:
Runtime error
Runtime error
File size: 4,947 Bytes
a7c5ebb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | """
Logging utilities for pharmaceutical data management agents.
"""
import os
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional
def log_agent_activity(agent_type: str, state: Dict[str, Any], log_dir: str = "./logs/agent_activity") -> None:
    """
    Append a one-line JSON summary of the agent workflow state to a daily log.

    The entry is written to ``<log_dir>/agent_log_<YYYYMMDD>.jsonl`` (one JSON
    object per line); the directory is created if it does not exist.

    Args:
        agent_type: The type of agent or component being logged
        state: The current state of the agent workflow. Only the keys
            ``current_agent``, ``user_intent``, ``pipeline_plan``,
            ``sql_queries`` and ``status`` are read.
        log_dir: Directory to store logs
    """
    # Ensure log directory exists
    os.makedirs(log_dir, exist_ok=True)

    # Read the clock once so the entry's timestamp and the dated file name
    # can never disagree (two separate reads could straddle midnight).
    now = datetime.now()

    log_entry = {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "agent_type": agent_type,
        "current_agent": state.get("current_agent", "unknown"),
        # "available" means: the key is present AND its value is truthy.
        "user_intent_available": bool(state.get("user_intent")),
        "pipeline_plan_available": bool(state.get("pipeline_plan")),
        "sql_queries_available": bool(state.get("sql_queries")),
        "execution_status": state.get("status", "unknown"),
    }

    # Serialize before opening the file. `default=str` keeps logging
    # best-effort: a status/agent value that is not JSON-native (enum,
    # datetime, ...) is recorded as its string form instead of raising.
    line = json.dumps(log_entry, default=str) + "\n"

    # One log file per calendar day, opened in append mode
    log_file = os.path.join(log_dir, f"agent_log_{now.strftime('%Y%m%d')}.jsonl")
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(line)
def log_ai_function(response: str, file_name: str, log: bool = True, log_path: str = './logs/', overwrite: bool = True):
    """
    Write an AI function's response text to a file under ``log_path``.

    Parameters
    ----------
    response : str
        Text to write to the file.
    file_name : str
        Name of the target file inside ``log_path``.
    log : bool, optional
        When False nothing is written and ``(None, None)`` is returned.
        The default is True.
    log_path : str, optional
        Directory that receives the file; created if missing.
        The default is './logs/'.
    overwrite : bool, optional
        The default is True.
        - If True, an existing file of the same name is replaced.
        - If False and the name is taken, the first free name of the form
          ``<base>_<i><ext>`` (i = 1, 2, ...) is used instead.

    Returns
    -------
    tuple
        ``(file_path, file_name)`` of the file actually written, or
        ``(None, None)`` when ``log`` is False.
    """
    # Logging disabled: nothing to do
    if not log:
        return (None, None)

    os.makedirs(log_path, exist_ok=True)
    target_path = os.path.join(log_path, file_name)

    if not overwrite and os.path.exists(target_path):
        # Name is taken: probe data_1.ext, data_2.ext, ... until one is free
        stem, suffix = os.path.splitext(file_name)
        counter = 1
        candidate_name = f"{stem}_{counter}{suffix}"
        candidate_path = os.path.join(log_path, candidate_name)
        while os.path.exists(candidate_path):
            counter += 1
            candidate_name = f"{stem}_{counter}{suffix}"
            candidate_path = os.path.join(log_path, candidate_name)
        file_name = candidate_name
        target_path = candidate_path

    with open(target_path, 'w', encoding='utf-8') as handle:
        handle.write(response)

    print(f" File saved to: {target_path}")
    return (target_path, file_name)
def get_execution_metrics(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Collects execution metrics from the agent workflow state.

    Args:
        state: The current state of the agent workflow. Reads
            ``execution_results`` (``success_rate``, ``queries_executed``,
            ``completed_at``), ``confidence_scores``, and a start timestamp
            from ``user_intent["time"]`` or, failing that,
            ``pipeline_plan["created_at"]``.

    Returns:
        Dictionary with the keys ``success_rate``, ``queries_executed``,
        ``time_taken`` and ``confidence_scores``. ``time_taken`` is
        ``completed_at - start`` rounded to 2 decimals, or 0 when either
        timestamp is unavailable.
    """
    # `or {}` also covers a state where the key exists but is still None
    execution_results = state.get("execution_results") or {}

    metrics = {
        "success_rate": execution_results.get("success_rate", 0),
        "queries_executed": execution_results.get("queries_executed", 0),
        "time_taken": 0,  # Stays 0 unless both timestamps are available
        "confidence_scores": state.get("confidence_scores", {}),
    }

    end_time = execution_results.get("completed_at")
    if end_time is None:
        return metrics

    # Pick the start timestamp: user intent first, then pipeline plan.
    # Sections that are not dicts (e.g. a raw intent string) are skipped
    # rather than indexed, and a None value falls through to the next source.
    start_time = None
    for section_name, key in (("user_intent", "time"), ("pipeline_plan", "created_at")):
        section = state.get(section_name)
        if isinstance(section, dict) and section.get(key) is not None:
            start_time = section[key]
            break

    # Compare against None so a legitimate start of 0 is not treated as missing
    if start_time is not None:
        elapsed = end_time - start_time
        # datetime operands yield a timedelta; convert to seconds for round()
        if hasattr(elapsed, "total_seconds"):
            elapsed = elapsed.total_seconds()
        metrics["time_taken"] = round(elapsed, 2)

    return metrics