File size: 4,635 Bytes
e00e744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import asyncio
import sys
import os
from typing import Dict, Any, List, Optional, Union
import json
from datetime import datetime
import uuid

from .utils import get_logger, get_model, validate_api_keys

logger = get_logger(__name__)

from .utils.tracers import LogTracer
from .agent_manager import agent_manager
from agents import add_trace_processor
from .models.models import AgentResult


class FrameworkAgent:
    
    def __init__(self, agent_name: str, model_name: str = "gpt-4o-mini", 
                 source_code: str = None):
        """
        Initialize the Agent Framework.
        
        Args:
            agent_name (str): The name of the agent to use
            model_name (str): The model to use for the agents (default: "gpt-4o-mini")
            lineage_config (LineageConfig): Configuration for OpenLineage event metadata
            
        Raises:
            ValueError: If lineage_config is not provided   
        """
        if not source_code:
            raise ValueError("source_code is required and cannot be None")
        
        self.agent_name = agent_name
        self.model_name = model_name
        self.source_code = source_code
        self.agent_manager = agent_manager
        
        # Validate API keys on initialization
        validate_api_keys()
        
        logger.info(f"FrameworkAgent initialized: agent_name={agent_name}, model_name={model_name}")

    
    
    async def run_agent_plugin(self, **kwargs) -> Dict[str, Any]:
        """
        Run a specific agent with a source code.
        
        Args:
            **kwargs: Additional arguments to pass to the agent
            
        Returns:
            Dict[str, Any]: The results from the agent with merged OpenLineage metadata
        """
        logger.info(f"Starting agent: {self.agent_name} with model: {self.model_name}")
        add_trace_processor(LogTracer())
        
        try:
            # Create the agent using the plugin's factory function
            logger.info(f"Creating agent instance for: {self.agent_name}")
            agent = self.agent_manager.create_agent(
                agent_name=self.agent_name, 
                source_code=self.source_code, 
                model_name=self.model_name,
                **kwargs
            )
            
            # Run the agent
            logger.info(f"Running agent: {self.agent_name}")
            results = await agent.run()
            logger.info(f"Agent {self.agent_name} completed successfully")
                                  
            return results
            
        except Exception as e:
            logger.error(f"Error running agent {self.agent_name}: {e}")
            return {"error": str(e)}

    def map_results_to_objects(self, results: Dict[str, Any]) -> Union[AgentResult, Dict[str, Any]]:
        """
        Map JSON results from agent to structured AgentResult objects.
        
        Args:
            results: Dictionary containing the agent results
            
        Returns:
            AgentResult: Structured object representation of the results, or original dict if mapping fails
        """
        try:
            
            # Check if it's an error response
            if "error" in results:
                return results
            
            # Check if it has the expected structure for lineage results
            if "inputs" in results and "outputs" in results:
                return AgentResult.from_dict(results)
            
            # If it doesn't match the expected structure, return as-is
            return results
            
        except Exception as e:
            logger.error(f"Error mapping results to objects: {e}")
            return results

    async def run_agent(self, **kwargs) -> Union[AgentResult, Dict[str, Any]]:
        """
        Run a specific agent and return structured objects instead of raw dictionaries.
        
        Args:
            **kwargs: Additional arguments to pass to the agent
            
        Returns:
            Union[AgentResult, Dict[str, Any]]: Structured AgentResult object or error dict
        """
        logger.info(f"Starting run_agent for {self.agent_name}")
        raw_results = await self.run_agent_plugin(**kwargs)
        mapped_results = self.map_results_to_objects(raw_results)
        logger.info(f"Agent {self.agent_name} completed. Results type: {type(mapped_results)}")
        if hasattr(mapped_results, 'to_dict'):
            logger.info(f"Mapped results: {mapped_results.to_dict()}")
        else:
            logger.info(f"Raw results: {mapped_results}")
        return mapped_results