Spaces:
Sleeping
Sleeping
File size: 4,635 Bytes
e00e744 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
import asyncio
import sys
import os
from typing import Dict, Any, List, Optional, Union
import json
from datetime import datetime
import uuid
from .utils import get_logger, get_model, validate_api_keys
logger = get_logger(__name__)
from .utils.tracers import LogTracer
from .agent_manager import agent_manager
from agents import add_trace_processor
from .models.models import AgentResult
class FrameworkAgent:
def __init__(self, agent_name: str, model_name: str = "gpt-4o-mini",
source_code: str = None):
"""
Initialize the Agent Framework.
Args:
agent_name (str): The name of the agent to use
model_name (str): The model to use for the agents (default: "gpt-4o-mini")
lineage_config (LineageConfig): Configuration for OpenLineage event metadata
Raises:
ValueError: If lineage_config is not provided
"""
if not source_code:
raise ValueError("source_code is required and cannot be None")
self.agent_name = agent_name
self.model_name = model_name
self.source_code = source_code
self.agent_manager = agent_manager
# Validate API keys on initialization
validate_api_keys()
logger.info(f"FrameworkAgent initialized: agent_name={agent_name}, model_name={model_name}")
async def run_agent_plugin(self, **kwargs) -> Dict[str, Any]:
"""
Run a specific agent with a source code.
Args:
**kwargs: Additional arguments to pass to the agent
Returns:
Dict[str, Any]: The results from the agent with merged OpenLineage metadata
"""
logger.info(f"Starting agent: {self.agent_name} with model: {self.model_name}")
add_trace_processor(LogTracer())
try:
# Create the agent using the plugin's factory function
logger.info(f"Creating agent instance for: {self.agent_name}")
agent = self.agent_manager.create_agent(
agent_name=self.agent_name,
source_code=self.source_code,
model_name=self.model_name,
**kwargs
)
# Run the agent
logger.info(f"Running agent: {self.agent_name}")
results = await agent.run()
logger.info(f"Agent {self.agent_name} completed successfully")
return results
except Exception as e:
logger.error(f"Error running agent {self.agent_name}: {e}")
return {"error": str(e)}
def map_results_to_objects(self, results: Dict[str, Any]) -> Union[AgentResult, Dict[str, Any]]:
"""
Map JSON results from agent to structured AgentResult objects.
Args:
results: Dictionary containing the agent results
Returns:
AgentResult: Structured object representation of the results, or original dict if mapping fails
"""
try:
# Check if it's an error response
if "error" in results:
return results
# Check if it has the expected structure for lineage results
if "inputs" in results and "outputs" in results:
return AgentResult.from_dict(results)
# If it doesn't match the expected structure, return as-is
return results
except Exception as e:
logger.error(f"Error mapping results to objects: {e}")
return results
async def run_agent(self, **kwargs) -> Union[AgentResult, Dict[str, Any]]:
"""
Run a specific agent and return structured objects instead of raw dictionaries.
Args:
**kwargs: Additional arguments to pass to the agent
Returns:
Union[AgentResult, Dict[str, Any]]: Structured AgentResult object or error dict
"""
logger.info(f"Starting run_agent for {self.agent_name}")
raw_results = await self.run_agent_plugin(**kwargs)
mapped_results = self.map_results_to_objects(raw_results)
logger.info(f"Agent {self.agent_name} completed. Results type: {type(mapped_results)}")
if hasattr(mapped_results, 'to_dict'):
logger.info(f"Mapped results: {mapped_results.to_dict()}")
else:
logger.info(f"Raw results: {mapped_results}")
return mapped_results
|