Spaces:
Runtime error
Runtime error
| """ | |
| Logging utilities for pharmaceutical data management agents. | |
| """ | |
| import os | |
| import json | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional | |
| def log_agent_activity(agent_type: str, state: Dict[str, Any], log_dir: str = "./logs/agent_activity"): | |
| """ | |
| Enhanced agent activity logging with detailed state information. | |
| Args: | |
| agent_type: The type of agent or component being logged | |
| state: The current state of the agent workflow | |
| log_dir: Directory to store logs | |
| """ | |
| # Ensure log directory exists | |
| os.makedirs(log_dir, exist_ok=True) | |
| # Create log timestamp | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| # Extract key state information for logging | |
| current_agent = state.get("current_agent", "unknown") | |
| # Create log entry | |
| log_entry = { | |
| "timestamp": timestamp, | |
| "agent_type": agent_type, | |
| "current_agent": current_agent, | |
| "user_intent_available": "user_intent" in state and bool(state.get("user_intent")), | |
| "pipeline_plan_available": "pipeline_plan" in state and bool(state.get("pipeline_plan")), | |
| "sql_queries_available": "sql_queries" in state and bool(state.get("sql_queries")), | |
| "execution_status": state.get("status", "unknown"), | |
| } | |
| # Generate log file name based on date | |
| log_file = os.path.join(log_dir, f"agent_log_{datetime.now().strftime('%Y%m%d')}.jsonl") | |
| # Append to log file | |
| with open(log_file, "a") as f: | |
| f.write(json.dumps(log_entry) + "\n") | |
| def log_ai_function(response: str, file_name: str, log: bool = True, log_path: str = './logs/', overwrite: bool = True): | |
| """ | |
| Logs the response of an AI function to a file. | |
| Parameters | |
| ---------- | |
| response : str | |
| The response of the AI function. | |
| file_name : str | |
| The name of the file to save the response to. | |
| log : bool, optional | |
| Whether to log the response or not. The default is True. | |
| log_path : str, optional | |
| The path to save the log file. The default is './logs/'. | |
| overwrite : bool, optional | |
| Whether to overwrite the file if it already exists. The default is True. | |
| - If True, the file will be overwritten. | |
| - If False, a unique file name will be created. | |
| Returns | |
| ------- | |
| tuple | |
| The path and name of the log file. | |
| """ | |
| if log: | |
| # Ensure the directory exists | |
| os.makedirs(log_path, exist_ok=True) | |
| # file_name = 'data_wrangler.py' | |
| file_path = os.path.join(log_path, file_name) | |
| if not overwrite: | |
| # If file already exists and we're NOT overwriting, we create a new name | |
| if os.path.exists(file_path): | |
| # Use an incremental suffix (e.g., data_wrangler_1.py, data_wrangler_2.py, etc.) | |
| # or a time-based suffix if you prefer. | |
| base_name, ext = os.path.splitext(file_name) | |
| i = 1 | |
| while True: | |
| new_file_name = f"{base_name}_{i}{ext}" | |
| new_file_path = os.path.join(log_path, new_file_name) | |
| if not os.path.exists(new_file_path): | |
| file_path = new_file_path | |
| file_name = new_file_name | |
| break | |
| i += 1 | |
| # Write the file | |
| with open(file_path, 'w', encoding='utf-8') as file: | |
| file.write(response) | |
| print(f" File saved to: {file_path}") | |
| return (file_path, file_name) | |
| else: | |
| return (None, None) | |
| def get_execution_metrics(state: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Collects execution metrics from the agent workflow state. | |
| Args: | |
| state: The current state of the agent workflow | |
| Returns: | |
| Dictionary containing execution metrics | |
| """ | |
| execution_results = state.get("execution_results", {}) | |
| metrics = { | |
| "success_rate": execution_results.get("success_rate", 0), | |
| "queries_executed": execution_results.get("queries_executed", 0), | |
| "time_taken": 0, # Placeholder for actual timing | |
| "confidence_scores": state.get("confidence_scores", {}) | |
| } | |
| # Calculate time taken if we have timestamps | |
| if "execution_results" in state and "completed_at" in state["execution_results"]: | |
| # Find the earliest timestamp in the state | |
| start_time = None | |
| if "user_intent" in state and "time" in state["user_intent"]: | |
| start_time = state["user_intent"]["time"] | |
| elif "pipeline_plan" in state and "created_at" in state["pipeline_plan"]: | |
| start_time = state["pipeline_plan"]["created_at"] | |
| if start_time: | |
| end_time = state["execution_results"]["completed_at"] | |
| metrics["time_taken"] = round(end_time - start_time, 2) | |
| return metrics |