File size: 4,947 Bytes
a7c5ebb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
Logging utilities for pharmaceutical data management agents.
"""

import os
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional

def log_agent_activity(agent_type: str, state: Dict[str, Any], log_dir: str = "./logs/agent_activity") -> None:
    """
    Append a one-line JSON summary of the agent workflow state to a daily log.

    Each call writes one JSON Lines record to
    ``<log_dir>/agent_log_YYYYMMDD.jsonl``. The record holds the timestamp,
    the agent type, the current agent, flags telling whether the user intent,
    pipeline plan and SQL queries are present and non-empty, and the
    execution status.

    Args:
        agent_type: The type of agent or component being logged
        state: The current state of the agent workflow
        log_dir: Directory to store logs (created if it does not exist)

    Raises:
        TypeError: If a logged value (``agent_type``, ``state["current_agent"]``
            or ``state["status"]``) is not JSON serializable.
    """
    # Ensure log directory exists
    os.makedirs(log_dir, exist_ok=True)

    # Read the clock once: the entry timestamp and the dated file name are
    # derived from the same instant, so they cannot disagree when a call
    # straddles midnight.
    now = datetime.now()

    log_entry = {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "agent_type": agent_type,
        "current_agent": state.get("current_agent", "unknown"),
        # bool(state.get(key)) is False both for a missing key and for an
        # empty/falsy value, which is exactly the "available" semantics.
        "user_intent_available": bool(state.get("user_intent")),
        "pipeline_plan_available": bool(state.get("pipeline_plan")),
        "sql_queries_available": bool(state.get("sql_queries")),
        "execution_status": state.get("status", "unknown"),
    }

    # One log file per calendar day
    log_file = os.path.join(log_dir, f"agent_log_{now.strftime('%Y%m%d')}.jsonl")

    # Append so earlier entries of the same day are preserved; the explicit
    # encoding keeps the file identical across platforms.
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(log_entry) + "\n")

def log_ai_function(response: str, file_name: str, log: bool = True, log_path: str = './logs/', overwrite: bool = True):
    """
    Logs the response of an AI function to a file.

    Parameters
    ----------
    response : str
        The response of the AI function.
    file_name : str
        The name of the file to save the response to.
    log : bool, optional
        Whether to log the response or not. The default is True.
    log_path : str, optional
        The path to save the log file. The default is './logs/'.
        The directory is created if it does not exist.
    overwrite : bool, optional
        Whether to overwrite the file if it already exists. The default is True.
        - If True, the file will be overwritten.
        - If False, a unique file name will be created by appending an
          incremental suffix before the extension (``name_1.ext``,
          ``name_2.ext``, ...).

    Returns
    -------
    tuple
        ``(file_path, file_name)`` of the file that was written, or
        ``(None, None)`` when ``log`` is False.
    """
    if not log:
        return (None, None)

    # Ensure the directory exists
    os.makedirs(log_path, exist_ok=True)
    file_path = os.path.join(log_path, file_name)

    if overwrite:
        file = open(file_path, 'w', encoding='utf-8')
    else:
        base_name, ext = os.path.splitext(file_name)
        suffix = 0
        while True:
            try:
                # 'x' (exclusive creation) fails if the path already exists,
                # so the existence check and the creation are one atomic
                # step: a file created by someone else between "check" and
                # "write" can never be silently overwritten.
                file = open(file_path, 'x', encoding='utf-8')
            except FileExistsError:
                # Name is taken: move on to the next incremental suffix.
                suffix += 1
                file_name = f"{base_name}_{suffix}{ext}"
                file_path = os.path.join(log_path, file_name)
            else:
                break

    # Write the file
    with file:
        file.write(response)

    print(f"      File saved to: {file_path}")

    return (file_path, file_name)

def get_execution_metrics(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Collects execution metrics from the agent workflow state.

    ``time_taken`` is ``completed_at`` (from ``state["execution_results"]``)
    minus a start time, rounded to two decimals. The start time is the first
    available of ``state["user_intent"]["time"]`` and
    ``state["pipeline_plan"]["created_at"]``, in that order. It stays 0 when
    either end of the interval is unavailable.

    Args:
        state: The current state of the agent workflow

    Returns:
        Dictionary containing execution metrics with the keys
        ``success_rate``, ``queries_executed``, ``time_taken`` and
        ``confidence_scores``

    Note:
        Timestamps are assumed to be numeric values taken from the same
        clock (e.g. seconds); non-numeric timestamps make the subtraction
        raise ``TypeError``. TODO confirm against the code that fills them in.
    """
    # A key that is present but holds None must behave like a missing key.
    execution_results = state.get("execution_results") or {}

    metrics = {
        "success_rate": execution_results.get("success_rate", 0),
        "queries_executed": execution_results.get("queries_executed", 0),
        "time_taken": 0,  # Stays 0 unless both timestamps are available
        "confidence_scores": state.get("confidence_scores", {}),
    }

    end_time = execution_results.get("completed_at")
    if end_time is None:
        return metrics

    # Pick the first available start timestamp, in priority order.
    start_time = None
    for section, key in (("user_intent", "time"), ("pipeline_plan", "created_at")):
        try:
            candidate = state[section][key]
        except (KeyError, TypeError, IndexError):
            # Section missing, key missing, or the section is not a mapping
            # (e.g. the user intent is stored as a plain string).
            continue
        if candidate is not None:
            start_time = candidate
            break

    # Compare against None explicitly: a start time of 0 is a valid value.
    if start_time is not None:
        metrics["time_taken"] = round(end_time - start_time, 2)

    return metrics