cryogenic22's picture
Create utils/logging.py
a7c5ebb verified
"""
Logging utilities for pharmaceutical data management agents.
"""
import os
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional
def log_agent_activity(agent_type: str, state: Dict[str, Any], log_dir: str = "./logs/agent_activity"):
    """
    Append a one-line JSON summary of the agent workflow state to a daily log.

    The entry records which agent is active, whether the main workflow
    artifacts are present and non-empty, and the current execution status.
    Entries are appended to ``agent_log_YYYYMMDD.jsonl`` inside ``log_dir``
    (JSON Lines: one JSON object per line).

    Args:
        agent_type: The type of agent or component being logged.
        state: The current state of the agent workflow. The keys read are
            ``current_agent``, ``user_intent``, ``pipeline_plan``,
            ``sql_queries`` and ``status``; all of them are optional.
        log_dir: Directory to store logs. Created if it does not exist.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written.
    """
    # Ensure log directory exists
    os.makedirs(log_dir, exist_ok=True)

    # Read the clock exactly once so the entry's timestamp and the daily file
    # it is written to can never disagree when a call straddles midnight.
    now = datetime.now()

    log_entry = {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "agent_type": agent_type,
        "current_agent": state.get("current_agent", "unknown"),
        # A key that is absent and a key holding an empty value both count as
        # "not available"; state.get() returns None for the absent case.
        "user_intent_available": bool(state.get("user_intent")),
        "pipeline_plan_available": bool(state.get("pipeline_plan")),
        "sql_queries_available": bool(state.get("sql_queries")),
        "execution_status": state.get("status", "unknown"),
    }

    # One file per calendar day
    log_file = os.path.join(log_dir, f"agent_log_{now.strftime('%Y%m%d')}.jsonl")

    # Explicit encoding keeps the log readable regardless of the platform's
    # default locale. default=str is deliberate: a status or agent value that
    # is not JSON-native (enum, datetime, ...) is logged as its string form
    # instead of making a logging call abort the workflow.
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(log_entry, default=str) + "\n")
def log_ai_function(response: str, file_name: str, log: bool = True, log_path: str = './logs/', overwrite: bool = True):
    """
    Logs the response of an AI function to a file.

    Parameters
    ----------
    response : str
        The response of the AI function.
    file_name : str
        The name of the file to save the response to.
    log : bool, optional
        Whether to log the response or not. The default is True.
        When False nothing is written and no directory is created.
    log_path : str, optional
        The path to save the log file. The default is './logs/'.
        The directory is created if it does not exist.
    overwrite : bool, optional
        Whether to overwrite the file if it already exists. The default is True.
        - If True, the file will be overwritten.
        - If False, a unique file name will be created by appending an
          incremental suffix before the extension (``name_1.ext``,
          ``name_2.ext``, ...).

    Returns
    -------
    tuple
        The path and name of the log file actually written, or
        ``(None, None)`` when ``log`` is False.
    """
    if not log:
        return (None, None)

    # Ensure the directory exists
    os.makedirs(log_path, exist_ok=True)
    file_path = os.path.join(log_path, file_name)

    if overwrite:
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(response)
    else:
        base_name, ext = os.path.splitext(file_name)
        suffix = 0
        while True:
            try:
                # Mode 'x' creates the file only if it does not exist yet.
                # Checking with os.path.exists() and then opening with 'w'
                # leaves a window in which two writers can pick the same
                # "free" name and one silently clobbers the other.
                with open(file_path, 'x', encoding='utf-8') as file:
                    file.write(response)
                break
            except FileExistsError:
                # Name is taken: move on to the next incremental suffix.
                suffix += 1
                file_name = f"{base_name}_{suffix}{ext}"
                file_path = os.path.join(log_path, file_name)

    print(f" File saved to: {file_path}")
    return (file_path, file_name)
def get_execution_metrics(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Collects execution metrics from the agent workflow state.

    Args:
        state: The current state of the agent workflow. The keys read are
            ``execution_results`` (``success_rate``, ``queries_executed``,
            ``completed_at``), ``confidence_scores``, ``user_intent``
            (``time``) and ``pipeline_plan`` (``created_at``). Any of them
            may be missing or None.

    Returns:
        Dictionary containing execution metrics: ``success_rate``,
        ``queries_executed``, ``time_taken`` and ``confidence_scores``.
        ``time_taken`` is the elapsed time rounded to two decimals, or 0
        when a start or end timestamp is unavailable.

    Raises:
        TypeError: If the start and end timestamps are present but cannot be
            subtracted from one another (e.g. a string and a number).
    """

    def _nested(section: str, key: str) -> Optional[Any]:
        """Return state[section][key], or None if either level is absent or not a dict."""
        container = state.get(section)
        if isinstance(container, dict):
            return container.get(key)
        return None

    execution_results = state.get("execution_results")
    if not isinstance(execution_results, dict):
        # Key missing or explicitly None: fall back to the defaults below.
        execution_results = {}

    metrics = {
        "success_rate": execution_results.get("success_rate", 0),
        "queries_executed": execution_results.get("queries_executed", 0),
        "time_taken": 0,  # Stays 0 unless both timestamps are available
        "confidence_scores": state.get("confidence_scores", {}),
    }

    end_time = execution_results.get("completed_at")

    # Prefer the user-intent time; fall back to the pipeline plan creation
    # time when the former is missing or None.
    start_time = _nested("user_intent", "time")
    if start_time is None:
        start_time = _nested("pipeline_plan", "created_at")

    # Compare against None rather than truthiness so that a start timestamp
    # of 0 is still treated as a valid value.
    if start_time is not None and end_time is not None:
        elapsed = end_time - start_time
        if hasattr(elapsed, "total_seconds"):
            # datetime - datetime yields a timedelta, which round() rejects.
            elapsed = elapsed.total_seconds()
        metrics["time_taken"] = round(elapsed, 2)

    return metrics