# Source: chat/agents/manager_agent.py (Hugging Face Space, author: WeMWish)
# Commit 8d66edb: "Add authentication, token quota tracking, and comprehensive usage logging"
# agents/manager_agent.py
import json # Keep for potential future use, though primary JSON parsing shifts to other agents
import time # Keep for potential delays or timing if added later
# import inspect # Removed, was for Manager's own tool schema gen
# import tools.agent_tools # Removed, schema discovery now in GenerationAgent
import os # Added for image path validation
# import io # Removed as unused
import sys
# import traceback # Removed as unused
import importlib
import base64 # For PDF to image conversion
import io # For PDF to image conversion
from openai import OpenAI
# from contextlib import redirect_stdout # Removed as unused
# Import specialized agents
from agents.generation_agent import GenerationAgent
from agents.supervisor_agent import SupervisorAgent
from agents.executor_agent import ExecutorAgent
# ASSISTANT_NAME and BASE_ASSISTANT_INSTRUCTIONS are removed as Manager no longer has its own Assistant.
# POLLING_INTERVAL_S and MAX_POLLING_ATTEMPTS are removed, polling is handled by individual agents.
class ManagerAgent:
def __init__(self, openai_api_key=None, openai_client: OpenAI = None, r_callback_fn=None, supabase_client=None, user_id=None, hf_user_id=None):
"""
Initialize the Manager Agent with OpenAI credentials and sub-agents.
Args:
openai_api_key: OpenAI API key
openai_client: Pre-initialized OpenAI client
r_callback_fn: Callback function for R integration
supabase_client: Supabase client for logging and quota tracking
user_id: UUID of user from Supabase users table
hf_user_id: Hugging Face user ID
"""
if openai_client:
self.client = openai_client
elif openai_api_key:
self.client = OpenAI(api_key=openai_api_key)
else:
self.client = None
print("ManagerAgent Warning: No OpenAI client provided. Some functionality may be limited.")
# Storage for conversation history - list of dicts like [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
self.conversation_history = []
# Storage for file information - dict like {"file_id": "...", "file_name": "...", "file_path": "..."}
self.file_info = {}
# Storage for pending literature confirmation
self.pending_literature_confirmation = None
self.pending_literature_query = None
# R callback function for thoughts
self.r_callback_fn = r_callback_fn
# Supabase client for logging and quota tracking
self.supabase_client = supabase_client
self.current_user_id = user_id
self.current_hf_user_id = hf_user_id
# Token tracking for current query
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
self.model_name = "gpt-4o"
# Initialize sub-agents
try:
if self.client:
from .generation_agent import GenerationAgent
from .supervisor_agent import SupervisorAgent
from .executor_agent import ExecutorAgent
self.generation_agent = GenerationAgent(client_openai=self.client)
self.supervisor_agent = SupervisorAgent(client_openai=self.client)
self.executor_agent = ExecutorAgent()
print("ManagerAgent: Successfully initialized all sub-agents.")
else:
print("ManagerAgent: No OpenAI client available, sub-agents not initialized.")
self.generation_agent = None
self.supervisor_agent = None
self.executor_agent = None
except Exception as e:
print(f"ManagerAgent: Error initializing sub-agents: {e}")
self.generation_agent = None
self.supervisor_agent = None
self.executor_agent = None
# Obsolete methods related to ManagerAgent's own Assistant will be removed next:
# _load_excel_schema, _prepare_tool_schemas, _create_or_retrieve_assistant,
# _poll_run_for_completion, _display_assistant_response, _start_new_thread (Thread management shifts to individual agents)
def _send_thought_to_r(self, thought_text: str):
"""Sends a thought message to the registered R callback function, if available."""
if self.r_callback_fn:
try:
# print(f"Python Agent: Sending thought to R: {thought_text}") # Optional: uncomment for verbose Python-side logging of thoughts
self.r_callback_fn(thought_text)
except Exception as e:
print(f"ManagerAgent Error: Exception while calling R callback: {e}")
# else:
# print(f"Python Agent (No R callback): Thought: {thought_text}") # Optional: uncomment to see thoughts even if no R callback
def _detect_literature_request(self, plan: dict, user_query: str = "") -> bool:
"""
Detects if the generated plan wants to use literature search or paper.pdf resources.
Returns True if literature resources are requested, False otherwise.
"""
# Check for literature search tools in the plan
plan_status = plan.get("status", "")
python_code = plan.get("python_code", "")
thought = plan.get("thought", "")
# Check for external literature search functions
literature_search_patterns = [
"multi_source_literature_search",
"fetch_text_from_urls",
"arxiv",
"pubmed",
"semantic_scholar"
]
# Check for paper-related patterns in user query (since paper.pdf is auto-loaded)
user_query_lower = user_query.lower()
paper_patterns = [
"what's the title of the paper",
"what does the paper say",
"according to the paper",
"the paper mentions",
"in the paper",
"paper.pdf",
"summarize the paper",
"analyze the paper"
]
# Check if any literature search patterns are in the code
code_has_literature = any(pattern in python_code for pattern in literature_search_patterns)
# Check if any literature search patterns are in the thought process
thought_has_literature = any(pattern in thought.lower() for pattern in literature_search_patterns)
# Check if user query directly references paper
query_references_paper = any(pattern in user_query_lower for pattern in paper_patterns)
result = code_has_literature or thought_has_literature or query_references_paper
print(f"[Manager._detect_literature_request] Result: {result}")
print(f" - Code has literature: {code_has_literature}")
print(f" - Thought has literature: {thought_has_literature}")
print(f" - Query references paper: {query_references_paper}")
return result
# REMOVED: _request_literature_confirmation_upfront - no longer needed
# Literature preferences are now handled as post-analysis options
def _continue_with_literature_plan(self, plan: dict) -> str:
"""Continue processing with the original plan that includes literature search."""
# Execute the original plan as intended
return self._execute_plan_with_literature(plan)
def _continue_without_literature_plan(self, plan: dict) -> str:
"""Continue processing but skip literature search components."""
# Modify the plan to remove literature search calls
modified_plan = self._remove_literature_from_plan(plan)
return self._execute_modified_plan(modified_plan)
def _remove_external_literature_from_plan(self, plan: dict) -> dict:
"""Remove literature search components from the plan."""
modified_plan = plan.copy()
python_code = modified_plan.get("python_code", "")
# Remove external literature search calls and replace with generic response
if "multi_source_literature_search" in python_code or "fetch_text_from_urls" in python_code:
# Replace with a simple response
modified_plan["python_code"] = 'print(json.dumps({"response": "I can provide analysis based on available data, but external literature search was not used per your preference."}))'
modified_plan["status"] = "CODE_COMPLETE"
modified_plan["explanation"] = "Providing analysis without external literature sources as requested."
return modified_plan
def _execute_plan_with_literature(self, plan: dict) -> str:
"""Execute the original plan with literature components."""
# This continues the normal execution flow
# We'll integrate this into the existing _process_turn method
return self._continue_plan_execution(plan)
def _execute_modified_plan(self, plan: dict) -> str:
"""Execute the modified plan without literature."""
return self._continue_plan_execution(plan)
def _continue_plan_execution(self, plan: dict) -> str:
"""Continue with plan execution after literature confirmation."""
# This method will be called from the existing _process_turn logic
# For now, return a simple response - the actual execution logic
# will be integrated into the existing code flow
return plan.get("explanation", "Processing completed.")
def _process_turn(self, user_query_text: str) -> tuple:
"""
Processes a single turn of the conversation.
This is the core logic used by both terminal and Shiny interfaces.
Assumes self.conversation_history has been updated with the latest user_query_text.
Returns a tuple of (response_text, is_image_response, image_path)
"""
print(f"[Manager._process_turn] Processing query: '{user_query_text[:100]}...'")
self._send_thought_to_r(f"Processing query: '{user_query_text[:50]}...'") # THOUGHT
# --- Process with dynamic literature settings based on frontend preference ---
use_external_literature = getattr(self, 'literature_enabled', False) # Default to False
print(f"[Manager._process_turn] Processing with literature enabled: {use_external_literature}")
self._send_thought_to_r(f"Processing query with external literature: {'enabled' if use_external_literature else 'disabled'}")
response_text = self._process_with_literature_preferences(
user_query_text,
use_paper=True, # Keep paper (internal data) always enabled
use_external_literature=use_external_literature # Use frontend preference
)
return response_text, False, None
def _process_with_literature_preferences(self, user_query: str, use_paper: bool, use_external_literature: bool) -> str:
    """
    Continue processing with the plan, either with or without literature.
    This method will execute the plan and return the final response.

    Args:
        user_query: The user's message for this turn (already appended to
            self.conversation_history by the caller).
        use_paper: Whether the auto-loaded paper (internal data) may be used.
        use_external_literature: Whether external literature tools may be used.

    Returns:
        Final response text; on exception, an error message (which is also
        appended to the conversation history).
    """
    try:
        # NOTE: conversation_history is already updated in process_single_query
        # before calling _process_turn, so we don't add the user query again here.

        # Track the current image being processed (if any).
        # NOTE(review): these two locals are never read in this method —
        # presumably left over from an earlier image-handling flow.
        current_image_path = None
        is_image_response = False

        # --- Multi-Stage Generation & Potential Retry Logic ---
        # Outer loop: re-ask the GenerationAgent up to 3 times until a plan
        # is approved for execution (status CODE_COMPLETE).
        max_regeneration_attempts = 3
        current_generation_attempt = 0
        final_plan_for_turn = None
        code_approved_for_execution = False
        current_query_for_generation_agent = user_query
        previous_generation_attempts = []
        # Signatures of code already executed, to detect infinite loops.
        attempted_operations = set()
        # Holds a File ID if the manager uploads a file and needs to re-call
        # generate_code_plan for a direct image-analysis step.
        image_file_id_for_analysis_step = None

        while current_generation_attempt < max_regeneration_attempts and not code_approved_for_execution:
            current_generation_attempt += 1
            print(f"[Manager._process_with_literature_preferences] Generation Attempt: {current_generation_attempt}/{max_regeneration_attempts}")
            self._send_thought_to_r(f"Generation Attempt: {current_generation_attempt}/{max_regeneration_attempts}")
            # Determine the query for the GenerationAgent for this attempt.
            query_to_pass_to_llm = current_query_for_generation_agent
            # Inner loop: data fetching / processing steps within one generation.
            max_data_fetch_attempts_per_generation = 3
            current_data_fetch_attempt = 0
            previous_data_fetch_attempts_for_current_generation = []
            call_ga_again_for_follow_up = True
            current_plan_holder = final_plan_for_turn
            while call_ga_again_for_follow_up and current_data_fetch_attempt < max_data_fetch_attempts_per_generation:
                current_data_fetch_attempt += 1
                print(f"[DEBUG] Data fetch attempt {current_data_fetch_attempt}/{max_data_fetch_attempts_per_generation}")
                # NOTE(review): this guard is unreachable — the while condition
                # above already bounds current_data_fetch_attempt.
                if current_data_fetch_attempt > max_data_fetch_attempts_per_generation:
                    print(f"[DEBUG] Maximum data fetch attempts reached for generation {current_generation_attempt}")
                    break
                call_ga_again_for_follow_up = False
                if not self.generation_agent:
                    self._send_thought_to_r("Error: Generation capabilities are unavailable.")
                    return "Generation capabilities are unavailable. Cannot proceed."
                effective_query_for_ga = query_to_pass_to_llm
                self._send_thought_to_r(f"Asking GenerationAgent for a plan with literature preferences...")
                # DEBUG: Log conversation history being passed to GenerationAgent
                print(f"[DEBUG] Passing conversation history to GenerationAgent:")
                print(f"[DEBUG] - History length: {len(self.conversation_history)}")
                print(f"[DEBUG] - History roles: {[msg['role'] for msg in self.conversation_history]}")
                for i, msg in enumerate(self.conversation_history[-3:]): # Show last 3 messages
                    print(f"[DEBUG] - Message {len(self.conversation_history)-3+i}: {msg['role']} - {msg['content'][:100]}...")
                # Pass literature preferences to GenerationAgent.
                plan = self.generation_agent.generate_code_plan(
                    user_query=effective_query_for_ga,
                    conversation_history=self.conversation_history,
                    image_file_id_for_prompt=image_file_id_for_analysis_step,
                    previous_attempts_feedback=previous_generation_attempts,
                    literature_preferences={
                        "use_paper": use_paper,
                        "use_external_literature": use_external_literature
                    }
                )
                final_plan_for_turn = plan
                current_plan_holder = plan
                # Aggregate token usage from GenerationAgent.
                if 'usage' in plan:
                    self._aggregate_token_usage(plan['usage'])
                    print(f"[Manager] Aggregated GenerationAgent usage: {plan['usage'].get('total_tokens', 0)} tokens")
                # Reset for next potential direct image analysis.
                image_file_id_for_analysis_step = None
                generated_thought = plan.get('thought', 'No thought provided by GenerationAgent.')
                print(f"[GenerationAgent] Thought: {generated_thought}")
                self._send_thought_to_r(f"GenerationAgent thought: {generated_thought}")
                # Process the plan based on its status.
                if plan.get("status") == "CODE_COMPLETE":
                    # Plan is final: stop both loops and fall through to the
                    # final-response section below.
                    self._send_thought_to_r(f"Plan is CODE_COMPLETE. Explanation: {plan.get('explanation', '')[:100]}...")
                    code_approved_for_execution = True
                    call_ga_again_for_follow_up = False
                elif plan.get("status") in ["AWAITING_DATA", "AWAITING_ANALYSIS_CODE"]:
                    # Execute the code in the plan.
                    code_to_execute = plan.get("python_code", "").strip()
                    if not code_to_execute:
                        return "Plan requires code execution but no code provided."
                    if not self.supervisor_agent or not self.executor_agent:
                        return "Cannot execute code, Supervisor or Executor agent is missing."
                    # Have the supervisor review the code for safety first.
                    self._send_thought_to_r("Reviewing code for safety...")
                    review = self.supervisor_agent.review_code(code_to_execute, f"Reviewing plan: {plan.get('thought', '')}")
                    supervisor_status = review.get('safety_status', 'UNKNOWN_STATUS')
                    supervisor_feedback = review.get('safety_feedback', 'No feedback.')
                    # Aggregate token usage from SupervisorAgent.
                    if 'usage' in review:
                        self._aggregate_token_usage(review['usage'])
                        print(f"[Manager] Aggregated SupervisorAgent usage: {review['usage'].get('total_tokens', 0)} tokens")
                    if supervisor_status != "APPROVED_FOR_EXECUTION":
                        return f"Code execution blocked by supervisor: {supervisor_feedback}"
                    # Loop guard: refuse to re-run code we have already executed
                    # this turn (signature = first 100 chars of the code).
                    operation_signature = f"{code_to_execute.strip()[:100]}" # Use first 100 chars as signature
                    if operation_signature in attempted_operations:
                        print(f"[DEBUG] Loop detected! Operation already attempted: {operation_signature[:50]}...")
                        return "Loop detected: This operation has already been attempted. Please try a different approach."
                    # Execute the code.
                    self._send_thought_to_r("Executing code...")
                    attempted_operations.add(operation_signature)
                    print(f"[DEBUG] Added operation to attempted set: {operation_signature[:50]}...")
                    execution_result = self.executor_agent.execute_code(code_to_execute)
                    execution_output = execution_result.get("execution_output", "")
                    execution_status = execution_result.get("execution_status", "UNKNOWN")
                    # Aggregate token usage from ExecutorAgent (captures describe_image API calls).
                    if 'usage' in execution_result:
                        self._aggregate_token_usage(execution_result['usage'])
                        print(f"[Manager] Aggregated ExecutorAgent usage: {execution_result['usage'].get('total_tokens', 0)} tokens")
                    if execution_status == "SUCCESS":
                        self._send_thought_to_r(f"Code execution successful.")
                        # DEBUG: Log conversation history before storing results.
                        print(f"[DEBUG] Conversation history length before storing ExecutorAgent result: {len(self.conversation_history)}")
                        print(f"[DEBUG] ExecutorAgent output being stored: {execution_output[:200]}...")
                        # Add results to conversation history as a fenced JSON block.
                        stored_content = f"```json\n{execution_output}\n```"
                        self.conversation_history.append({"role": "assistant", "content": stored_content})
                        # DEBUG: Log conversation history after storing results.
                        print(f"[DEBUG] Conversation history length after storing result: {len(self.conversation_history)}")
                        print(f"[DEBUG] Last conversation entry: {self.conversation_history[-1]['content'][:200]}...")
                        print(f"[DEBUG] Full conversation history roles: {[msg['role'] for msg in self.conversation_history]}")
                        # Always continue to GenerationAgent for final formatting.
                        # This ensures literature offers and proper response formatting.
                        if "intermediate_data_for_llm" in execution_output:
                            print(f"[DEBUG] Found 'intermediate_data_for_llm' in output - continuing to GenerationAgent for processing")
                            call_ga_again_for_follow_up = True
                        else:
                            print(f"[DEBUG] No 'intermediate_data_for_llm' found - requesting final formatting from GenerationAgent")
                            # Instead of returning raw execution output, let GenerationAgent format it.
                            call_ga_again_for_follow_up = True
                            # Set a flag so GenerationAgent knows this is the final formatting phase.
                            query_to_pass_to_llm = f"FINAL_FORMATTING_REQUEST: Format the results from the previous execution for user presentation. Original query: {user_query}"
                    else:
                        return f"Code execution failed: {execution_output}"
                else:
                    # Unknown status, return explanation.
                    return plan.get("explanation", "Processing completed with unknown status.")
            # Break if approved.
            if code_approved_for_execution:
                break
        # Return final result.
        if final_plan_for_turn:
            final_response = final_plan_for_turn.get('explanation', 'Processing completed.')
            # Add the response to conversation history for future context.
            self.conversation_history.append({"role": "assistant", "content": final_response})
            return final_response
        else:
            error_response = "Processing completed, but no response was generated."
            self.conversation_history.append({"role": "assistant", "content": error_response})
            return error_response
    except Exception as e:
        error_msg = f"Error processing with literature preferences: {str(e)}"
        print(f"[ManagerAgent] {error_msg}")
        # Add error to conversation history.
        self.conversation_history.append({"role": "assistant", "content": error_msg})
        return error_msg
def process_single_query_with_preferences(self, user_query_text: str,
conversation_history_from_r: list = None,
literature_enabled: bool = True) -> str:
"""Process query with explicit literature preference from frontend."""
print(f"[Manager.process_single_query_with_preferences] Literature enabled: {literature_enabled}")
self.literature_enabled = literature_enabled
return self.process_single_query(user_query_text, conversation_history_from_r)
def set_user_context(self, user_id: str = None, hf_user_id: str = None):
"""Set user context for quota tracking and logging"""
self.current_user_id = user_id
self.current_hf_user_id = hf_user_id
print(f"[Manager] Set user context: user_id={user_id}, hf_user_id={hf_user_id}")
def _check_quota_before_processing(self) -> tuple:
"""
Check if user has sufficient quota before processing query
Returns: (has_quota: bool, remaining: int, error_message: str or None)
"""
if not self.supabase_client or not self.supabase_client.is_enabled():
return (True, 999999, None)
if not self.current_hf_user_id:
return (False, 0, "User not authenticated")
try:
has_quota, remaining, used = self.supabase_client.check_quota(self.current_hf_user_id)
if not has_quota:
error_msg = f"Token quota exceeded. Used: {used}, Remaining: {remaining}. Please contact support to increase your quota."
return (False, remaining, error_msg)
return (True, remaining, None)
except Exception as e:
print(f"[Manager] Error checking quota: {e}")
return (True, 999999, None) # Fail open
def _reset_token_tracking(self):
"""Reset token counters for new query"""
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
def _aggregate_token_usage(self, usage_dict: dict):
"""Aggregate token usage from agent responses"""
if usage_dict:
self.last_prompt_tokens += usage_dict.get('prompt_tokens', 0)
self.last_completion_tokens += usage_dict.get('completion_tokens', 0)
self.last_total_tokens += usage_dict.get('total_tokens', 0)
def process_single_query(self, user_query_text: str, conversation_history_from_r: list = None) -> str:
    """
    Processes a single query, suitable for calling from an external system like R/Shiny.
    Manages its own conversation history based on input.
    Includes quota checking and comprehensive logging.

    Args:
        user_query_text: The user's message for this turn.
        conversation_history_from_r: Optional full conversation history from
            the caller (list of {"role", "content"} dicts). When provided it
            REPLACES self.conversation_history for this call.

    Returns:
        The response text. If an image was produced, the text is wrapped as
        "TAIJICHAT_IMAGE_RESPONSE: {json}" so the R/Shiny side can detect it.
    """
    print(f"[Manager.process_single_query] Received query: '{user_query_text[:100]}...'")
    # Reset token tracking for new query.
    self._reset_token_tracking()
    # Check quota BEFORE processing.
    has_quota, remaining, quota_error = self._check_quota_before_processing()
    if not has_quota:
        # Log the quota exceeded error.
        if self.supabase_client and self.supabase_client.is_enabled():
            self.supabase_client.log_usage(
                hf_user_id=self.current_hf_user_id,
                user_id=self.current_user_id,
                query_text=user_query_text,
                error_message=quota_error,
                conversation_history=conversation_history_from_r
            )
        return quota_error
    if conversation_history_from_r is not None:
        # Overwrite self.conversation_history with the caller's copy.
        # Ensure format matches: list of dicts like {"role": "user/assistant", "content": "..."}
        self.conversation_history = [dict(turn) for turn in conversation_history_from_r] # Ensure dicts
    # Add the current user query to the history for processing.
    self.conversation_history.append({"role": "user", "content": user_query_text})
    # Initialize image tracking variables in case _process_turn fails.
    is_image_response = False
    current_image_path = None
    try:
        # Process the query and get response with image information.
        response_text, is_image_response, current_image_path = self._process_turn(user_query_text)
        # IMMEDIATE LOGGING TO SUPABASE AFTER SUCCESSFUL PROCESSING.
        if self.supabase_client and self.supabase_client.is_enabled():
            self.supabase_client.log_usage(
                hf_user_id=self.current_hf_user_id,
                user_id=self.current_user_id,
                query_text=user_query_text,
                prompt_tokens=self.last_prompt_tokens,
                completion_tokens=self.last_completion_tokens,
                total_tokens=self.last_total_tokens,
                model=self.model_name,
                response_text=response_text,
                error_message=None,
                conversation_history=self.conversation_history,
                is_image_response=is_image_response,
                image_path=current_image_path
            )
            # Update user's token usage.
            if self.last_total_tokens > 0:
                self.supabase_client.update_token_usage(self.current_hf_user_id, self.last_total_tokens)
                print(f"[Manager] Updated token usage: +{self.last_total_tokens} tokens")
    except Exception as e:
        print(f"[Manager.process_single_query] Error in _process_turn: {str(e)}")
        response_text = f"I encountered an error processing your request: {str(e)}"
        is_image_response = False
        current_image_path = None
        # LOG ERROR TO SUPABASE.
        if self.supabase_client and self.supabase_client.is_enabled():
            self.supabase_client.log_usage(
                hf_user_id=self.current_hf_user_id,
                user_id=self.current_user_id,
                query_text=user_query_text,
                prompt_tokens=self.last_prompt_tokens,
                completion_tokens=self.last_completion_tokens,
                total_tokens=self.last_total_tokens,
                model=self.model_name,
                error_message=str(e),
                conversation_history=self.conversation_history
            )
    # If an image was processed, format the response to include image information.
    if is_image_response and current_image_path:
        try:
            # Format for R/Shiny to recognize this contains an image.
            # Ensure any nested quotes are properly escaped.
            clean_response = response_text.replace('"', '\\"')
            image_info = {
                "has_image": True,
                "image_path": current_image_path,
                "original_response": clean_response
            }
            # Create clean JSON without whitespace.
            image_info_json = json.dumps(image_info, ensure_ascii=False, separators=(',', ':'))
            # Add the prefix the R side scans for.
            response_text = f"TAIJICHAT_IMAGE_RESPONSE: {image_info_json}"
            print(f"[Manager.process_single_query] Created image response JSON: {image_info_json}")
        except Exception as e:
            print(f"[Manager.process_single_query] Error creating image response JSON: {e}")
            # Fall back to original response.
            pass
    # NOTE: the assistant reply is intentionally NOT appended here —
    # _process_with_literature_preferences already appends it, and the external
    # (Shiny) caller manages its own copy of the history, which is passed back
    # in full on the next call.
    # Trim history if it gets too long.
    MAX_HISTORY_TURNS_INTERNAL = 10
    if len(self.conversation_history) > MAX_HISTORY_TURNS_INTERNAL * 2: # User + Assistant
        self.conversation_history = self.conversation_history[-(MAX_HISTORY_TURNS_INTERNAL*2):]
    return response_text
def start_interactive_session(self):
    """Run a blocking terminal REPL: read queries from stdin until 'exit'/'quit'."""
    print("\nStarting interactive session with TaijiChat (Multi-Agent Architecture)...")
    if not self.client or not self.generation_agent or not self.supervisor_agent:
        # Executor might still be initializable if it has non-LLM functionalities,
        # but the core loop needs generation and supervision, which depend on the client.
        print("CRITICAL: OpenAI client or one or more essential LLM-dependent agents (Generation, Supervisor) are not available. Cannot start full session.")
        if not self.executor_agent:
            print("CRITICAL: Executor agent also not available.")
        return
    user_query = input("\nTaijiChat > How can I help you today? \nUser: ")
    while user_query.lower() not in ["exit", "quit"]:
        # Skip blank input without burning a turn.
        if not user_query.strip():
            user_query = input("User: ")
            continue
        # Add user query to internal history.
        self.conversation_history.append({"role": "user", "content": user_query})
        # Call the core processing method (handles conversation history internally).
        agent_response_text, is_image_response, current_image_path = self._process_turn(user_query)
        # Note: the agent response is already added to conversation history
        # inside _process_with_literature_preferences.
        print(f"TaijiChat > {agent_response_text}")
        # Ensure conversation history doesn't grow indefinitely.
        MAX_HISTORY_TURNS_TERMINAL = 10
        if len(self.conversation_history) > MAX_HISTORY_TURNS_TERMINAL * 2:
            self.conversation_history = self.conversation_history[-(MAX_HISTORY_TURNS_TERMINAL*2):]
        user_query = input("\nUser: ")
    print("Ending interactive session.")
@staticmethod
def force_reload_modules():
"""Force Python to reload our module files to ensure latest changes are used"""
try:
import importlib
import sys
# List of modules to reload
modules_to_reload = [
'agents.generation_agent',
'agents.supervisor_agent',
'agents.executor_agent',
'tools.agent_tools'
]
for module_name in modules_to_reload:
if module_name in sys.modules:
print(f"ManagerAgent: Force reloading module {module_name}")
importlib.reload(sys.modules[module_name])
print("ManagerAgent: Successfully reloaded all agent modules")
return True
except Exception as e:
print(f"ManagerAgent: Error reloading modules: {str(e)}")
return False
# ... (Potentially remove all old private methods from the previous Assistant-based ManagerAgent)
if __name__ == '__main__':
    # This module only defines the agent; run the main orchestration script instead.
    print("ManagerAgent is intended to be orchestrated by a main script (e.g., main.py). ")