""" Logging utilities for pharmaceutical data management agents. """ import os import json import time from datetime import datetime from typing import Dict, Any, Optional def log_agent_activity(agent_type: str, state: Dict[str, Any], log_dir: str = "./logs/agent_activity"): """ Enhanced agent activity logging with detailed state information. Args: agent_type: The type of agent or component being logged state: The current state of the agent workflow log_dir: Directory to store logs """ # Ensure log directory exists os.makedirs(log_dir, exist_ok=True) # Create log timestamp timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Extract key state information for logging current_agent = state.get("current_agent", "unknown") # Create log entry log_entry = { "timestamp": timestamp, "agent_type": agent_type, "current_agent": current_agent, "user_intent_available": "user_intent" in state and bool(state.get("user_intent")), "pipeline_plan_available": "pipeline_plan" in state and bool(state.get("pipeline_plan")), "sql_queries_available": "sql_queries" in state and bool(state.get("sql_queries")), "execution_status": state.get("status", "unknown"), } # Generate log file name based on date log_file = os.path.join(log_dir, f"agent_log_{datetime.now().strftime('%Y%m%d')}.jsonl") # Append to log file with open(log_file, "a") as f: f.write(json.dumps(log_entry) + "\n") def log_ai_function(response: str, file_name: str, log: bool = True, log_path: str = './logs/', overwrite: bool = True): """ Logs the response of an AI function to a file. Parameters ---------- response : str The response of the AI function. file_name : str The name of the file to save the response to. log : bool, optional Whether to log the response or not. The default is True. log_path : str, optional The path to save the log file. The default is './logs/'. overwrite : bool, optional Whether to overwrite the file if it already exists. The default is True. - If True, the file will be overwritten. - If False, a unique file name will be created. Returns ------- tuple The path and name of the log file. """ if log: # Ensure the directory exists os.makedirs(log_path, exist_ok=True) # file_name = 'data_wrangler.py' file_path = os.path.join(log_path, file_name) if not overwrite: # If file already exists and we're NOT overwriting, we create a new name if os.path.exists(file_path): # Use an incremental suffix (e.g., data_wrangler_1.py, data_wrangler_2.py, etc.) # or a time-based suffix if you prefer. base_name, ext = os.path.splitext(file_name) i = 1 while True: new_file_name = f"{base_name}_{i}{ext}" new_file_path = os.path.join(log_path, new_file_name) if not os.path.exists(new_file_path): file_path = new_file_path file_name = new_file_name break i += 1 # Write the file with open(file_path, 'w', encoding='utf-8') as file: file.write(response) print(f" File saved to: {file_path}") return (file_path, file_name) else: return (None, None) def get_execution_metrics(state: Dict[str, Any]) -> Dict[str, Any]: """ Collects execution metrics from the agent workflow state. Args: state: The current state of the agent workflow Returns: Dictionary containing execution metrics """ execution_results = state.get("execution_results", {}) metrics = { "success_rate": execution_results.get("success_rate", 0), "queries_executed": execution_results.get("queries_executed", 0), "time_taken": 0, # Placeholder for actual timing "confidence_scores": state.get("confidence_scores", {}) } # Calculate time taken if we have timestamps if "execution_results" in state and "completed_at" in state["execution_results"]: # Find the earliest timestamp in the state start_time = None if "user_intent" in state and "time" in state["user_intent"]: start_time = state["user_intent"]["time"] elif "pipeline_plan" in state and "created_at" in state["pipeline_plan"]: start_time = state["pipeline_plan"]["created_at"] if start_time: end_time = state["execution_results"]["completed_at"] metrics["time_taken"] = round(end_time - start_time, 2) return metrics