Spaces:
Running
Running
| from enum import Enum, auto | |
| from typing import List | |
| from google import genai | |
| from google.genai import types | |
| from google.genai.types import * | |
| import os | |
| from dotenv import load_dotenv | |
| import sys | |
| from src.manager.agent_manager import AgentManager | |
| from src.manager.budget_manager import BudgetManager | |
| from src.manager.tool_manager import ToolManager | |
| from src.manager.utils.suppress_outputs import suppress_output | |
| import logging | |
| import gradio as gr | |
| from sentence_transformers import SentenceTransformer | |
| import torch | |
| from src.tools.default_tools.memory_manager import MemoryManager | |
| from pathlib import Path | |
| from google.genai.errors import APIError | |
| import backoff | |
| import mimetypes | |
| import json | |
| import traceback | |
# Module-wide logger. Records are written to stdout through `handler`.
# No explicit level is set on either object here; a handler-level override
# such as handler.setLevel(logging.DEBUG) was deliberately left disabled.
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
class Mode(Enum):
    """Feature switches that can be passed to ``GeminiManager``.

    Each member names one capability. ``GeminiManager.set_modes`` checks
    which members are present and forwards an on/off flag to the matching
    manager. Values come from ``auto()`` and carry no meaning of their own.
    """

    # Agent management
    ENABLE_AGENT_CREATION = auto()
    ENABLE_LOCAL_AGENTS = auto()
    ENABLE_CLOUD_AGENTS = auto()

    # Tool management
    ENABLE_TOOL_CREATION = auto()
    ENABLE_TOOL_INVOCATION = auto()

    # Budget tracking
    ENABLE_RESOURCE_BUDGET = auto()
    ENABLE_ECONOMY_BUDGET = auto()

    # Memory retrieval before each model call
    ENABLE_MEMORY = auto()
def format_tool_response(response, indent=2):
    """Return *response* pretty-printed as JSON text for display.

    Args:
        response: The value to render (typically a tool's arguments or its
            returned payload).
        indent: Indentation width passed straight to ``json.dumps``.

    Returns:
        The JSON string. Non-ASCII characters are kept as-is.

    The output is only shown to the user, so values that JSON cannot
    represent natively are deliberately rendered with ``str()`` instead of
    raising ``TypeError`` and aborting the caller.
    """
    return json.dumps(response, indent=indent, ensure_ascii=False, default=str)
| class GeminiManager: | |
| def __init__(self, system_prompt_file="./src/models/system6.prompt", | |
| gemini_model="gemini-2.5-pro-exp-03-25", | |
| modes: List[Mode] = []): | |
| self.input_tokens = 0 | |
| self.output_tokens = 0 | |
| load_dotenv() | |
| self.budget_manager = BudgetManager() | |
| self.toolsLoader: ToolManager = ToolManager() | |
| self.agentManager: AgentManager = AgentManager() | |
| self.API_KEY = os.getenv("GEMINI_KEY") | |
| self.client = genai.Client(api_key=self.API_KEY) | |
| self.model_name = gemini_model | |
| self.memory_manager = MemoryManager() | |
| with open(system_prompt_file, 'r', encoding="utf8") as f: | |
| self.system_prompt = f.read() | |
| self.messages = [] | |
| self.set_modes(modes) | |
| self.safety_settings = [ | |
| { | |
| "category": "HARM_CATEGORY_HARASSMENT", | |
| "threshold": "BLOCK_NONE", | |
| }, | |
| { | |
| "category": "HARM_CATEGORY_HATE_SPEECH", | |
| "threshold": "BLOCK_NONE", | |
| }, | |
| { | |
| "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", | |
| "threshold": "BLOCK_NONE", | |
| }, | |
| { | |
| "category": "HARM_CATEGORY_DANGEROUS_CONTENT", | |
| "threshold": "BLOCK_NONE", | |
| }, | |
| ] | |
| def get_current_modes(self): | |
| return [mode.name for mode in self.modes] | |
| def set_modes(self, modes: List[Mode]): | |
| self.modes = modes | |
| self.budget_manager.set_resource_budget_status( | |
| self.check_mode(Mode.ENABLE_RESOURCE_BUDGET)) | |
| self.budget_manager.set_expense_budget_status( | |
| self.check_mode(Mode.ENABLE_ECONOMY_BUDGET)) | |
| self.toolsLoader.set_creation_mode( | |
| self.check_mode(Mode.ENABLE_TOOL_CREATION)) | |
| self.toolsLoader.set_invocation_mode( | |
| self.check_mode(Mode.ENABLE_TOOL_INVOCATION)) | |
| self.agentManager.set_creation_mode( | |
| self.check_mode(Mode.ENABLE_AGENT_CREATION)) | |
| self.agentManager.set_local_invocation_mode( | |
| self.check_mode(Mode.ENABLE_LOCAL_AGENTS)) | |
| self.agentManager.set_cloud_invocation_mode( | |
| self.check_mode(Mode.ENABLE_CLOUD_AGENTS)) | |
| def check_mode(self, mode: Mode): | |
| return mode in self.modes | |
| def generate_response(self, messages): | |
| tools = self.toolsLoader.getTools() | |
| response = self.client.models.count_tokens( | |
| model=self.model_name, | |
| contents=messages, | |
| ) | |
| self.budget_manager.add_to_expense_budget( | |
| response.total_tokens * 0.10/1000000 # Assuming $0.10 per million tokens | |
| ) | |
| self.input_tokens += response.total_tokens | |
| return self.client.models.generate_content_stream( | |
| model=self.model_name, | |
| contents=messages, | |
| config=types.GenerateContentConfig( | |
| system_instruction=self.system_prompt, | |
| temperature=0.2, | |
| tools=tools, | |
| safety_settings=self.safety_settings, | |
| ), | |
| ) | |
| def handle_tool_calls(self, function_calls): | |
| parts = [] | |
| i = 0 | |
| for function_call in function_calls: | |
| title = "" | |
| thinking = "" | |
| toolResponse = None | |
| logger.info( | |
| f"Function Name: {function_call.name}, Arguments: {function_call.args}") | |
| title = f"Invoking `{function_call.name}` with \n```json\n{format_tool_response(function_call.args)}\n```\n" | |
| yield { | |
| "role": "assistant", | |
| "content": thinking, | |
| "metadata": { | |
| "title": title, | |
| "id": i, | |
| "status": "pending", | |
| } | |
| } | |
| try: | |
| self.input_tokens += len(repr(function_call).split()) | |
| toolResponse = self.toolsLoader.runTool( | |
| function_call.name, function_call.args) | |
| except Exception as e: | |
| logger.warning(f"Error running tool: {e}") | |
| toolResponse = { | |
| "status": "error", | |
| "message": f"Tool `{function_call.name}` failed to run.", | |
| "output": str(e), | |
| } | |
| logger.debug(f"Tool Response: {toolResponse}") | |
| thinking += f"Tool responded with \n```json\n{format_tool_response(toolResponse)}\n```\n" | |
| yield { | |
| "role": "assistant", | |
| "content": thinking, | |
| "metadata": { | |
| "title": title, | |
| "id": i, | |
| "status": "done", | |
| } | |
| } | |
| tool_content = types.Part.from_function_response( | |
| name=function_call.name, | |
| response={"result": toolResponse}) | |
| try: | |
| if function_call.name == "ToolCreator" or function_call.name == "ToolDeletor": | |
| self.toolsLoader.load_tools() | |
| except Exception as e: | |
| logger.info( | |
| f"Error loading tools: {str(e)}. Deleting the tool.") | |
| yield { | |
| "role": "assistant", | |
| "content": f"Error loading tools: {str(e)}. Deleting the tool.\n", | |
| "metadata": { | |
| "title": "Trying to load the newly created tool", | |
| "id": i, | |
| "status": "done", | |
| } | |
| } | |
| # delete the created tool | |
| self.toolsLoader.delete_tool( | |
| toolResponse['output']['tool_name'], toolResponse['output']['tool_file_path']) | |
| tool_content = types.Part.from_function_response( | |
| name=function_call.name, | |
| response={"result": f"{function_call.name} with {function_call.args} doesn't follow the required format, please read the other tool implementations for reference." + str(e)}) | |
| parts.append(tool_content) | |
| i += 1 | |
| self.output_tokens += len(repr(parts).split()) | |
| yield { | |
| "role": "tool", | |
| "content": repr(types.Content( | |
| role='model' if self.model_name == "gemini-2.5-pro-exp-03-25" else 'tool', | |
| parts=parts | |
| )) | |
| } | |
| def format_chat_history(self, messages=[]): | |
| formatted_history = [] | |
| for message in messages: | |
| # Skip thinking messages (messages with metadata) | |
| if not ((message.get("role") == "assistant" and "metadata" in message | |
| and message["metadata"] is not None)): | |
| role = "model" | |
| match message.get("role"): | |
| case "user": | |
| role = "user" | |
| if isinstance(message["content"], tuple): | |
| path = message["content"][0] | |
| try: | |
| image_bytes = open(path, "rb").read() | |
| mime_type, _ = mimetypes.guess_type(path) | |
| parts = [ | |
| types.Part.from_bytes( | |
| data=image_bytes, | |
| mime_type=mime_type | |
| ), | |
| ] | |
| except Exception as e: | |
| logger.error(f"Error uploading file: {e}") | |
| parts = [types.Part.from_text( | |
| text="Error uploading file: "+str(e))] | |
| formatted_history.append( | |
| types.Content( | |
| role=role, | |
| parts=parts | |
| )) | |
| continue | |
| else: | |
| parts = [types.Part.from_text( | |
| text=message.get("content", ""))] | |
| case "memories": | |
| role = "user" | |
| parts = [types.Part.from_text( | |
| text="Here are the relevant memories for the user's query: "+message.get("content", ""))] | |
| case "tool": | |
| role = "tool" | |
| formatted_history.append( | |
| eval(message.get("content", ""))) | |
| continue | |
| case "function_call": | |
| role = "model" | |
| formatted_history.append( | |
| eval(message.get("content", ""))) | |
| continue | |
| case _: | |
| role = "model" | |
| content = message.get("content", "") | |
| if content.strip() == "": | |
| print("Empty message received: ", message) | |
| continue | |
| parts = [types.Part.from_text( | |
| text=content)] | |
| formatted_history.append(types.Content( | |
| role=role, | |
| parts=parts | |
| )) | |
| return formatted_history | |
| def get_k_memories(self, query, k=5, threshold=0.0): | |
| raw_memories = MemoryManager().get_memories() | |
| memories = [] | |
| for i in range(len(raw_memories)): | |
| memories.append(raw_memories[i]['memory']) | |
| if len(memories) == 0: | |
| return [] | |
| top_k = min(k, len(memories)) | |
| # Semantic Retrieval with GPU | |
| if torch.cuda.is_available(): | |
| device = 'cuda' | |
| elif torch.backends.mps.is_available() and torch.backends.mps.is_built(): | |
| device = 'mps' | |
| else: | |
| device = 'cpu' | |
| model = SentenceTransformer('all-MiniLM-L6-v2', device=device) | |
| doc_embeddings = model.encode( | |
| memories, convert_to_tensor=True, device=device) | |
| query_embedding = model.encode( | |
| query, convert_to_tensor=True, device=device) | |
| similarity_scores = model.similarity( | |
| query_embedding, doc_embeddings)[0] | |
| scores, indices = torch.topk(similarity_scores, k=top_k) | |
| results = [] | |
| for score, idx in zip(scores, indices): | |
| if score >= threshold: | |
| results.append(raw_memories[idx.item()]) | |
| return results | |
| def run(self, messages): | |
| try: | |
| if self.check_mode(Mode.ENABLE_MEMORY) and len(messages) > 0: | |
| memories = self.get_k_memories( | |
| messages[-1]['content'], k=5, threshold=0.1) | |
| if len(memories) > 0: | |
| messages.append({ | |
| "role": "memories", | |
| "content": f"{memories}", | |
| }) | |
| messages.append({ | |
| "role": "assistant", | |
| "content": f"Memories: \n```json\n{format_tool_response(memories)}\n```\n", | |
| "metadata": {"title": "Memories"} | |
| }) | |
| yield messages | |
| except Exception as e: | |
| pass | |
| yield from self.invoke_manager(messages) | |
| print("Tokens used: Input: {}, Output: {}".format( | |
| self.input_tokens, self.output_tokens)) | |
| def invoke_manager(self, messages): | |
| chat_history = self.format_chat_history(messages) | |
| logger.debug(f"Chat history: {chat_history}") | |
| try: | |
| response_stream = self.generate_response(chat_history) | |
| full_text = "" # Accumulate the text from the stream | |
| function_calls = [] | |
| function_call_requests = [] | |
| for chunk in response_stream: | |
| if chunk.text: | |
| full_text += chunk.text | |
| if full_text.strip() != "": | |
| yield messages + [{ | |
| "role": "assistant", | |
| "content": full_text | |
| }] | |
| else: | |
| print("Empty chunk received") | |
| print(chunk) | |
| for candidate in chunk.candidates: | |
| if candidate.content and candidate.content.parts: | |
| has_function_call = False | |
| for part in candidate.content.parts: | |
| if part.function_call: | |
| has_function_call = True | |
| function_calls.append(part.function_call) | |
| if has_function_call: | |
| function_call_requests.append({ | |
| "role": "function_call", | |
| "content": repr(candidate.content), | |
| }) | |
| if full_text.strip() != "": | |
| messages.append({ | |
| "role": "assistant", | |
| "content": full_text, | |
| }) | |
| self.output_tokens += len(full_text.split()) | |
| self.budget_manager.add_to_expense_budget( | |
| len(full_text.split()) * 0.40/1000000 # Assuming $0.40 per million tokens | |
| ) | |
| if function_call_requests: | |
| messages = messages + function_call_requests | |
| yield messages | |
| except Exception as e: | |
| traceback.print_exc(file=sys.stdout) | |
| print(messages) | |
| print(chat_history) | |
| messages.append({ | |
| "role": "assistant", | |
| "content": f"Error generating response: {str(e)}", | |
| "metadata": { | |
| "title": "Error generating response", | |
| "id": 0, | |
| "status": "done" | |
| } | |
| }) | |
| logger.error(f"Error generating response{e}") | |
| yield messages | |
| return messages | |
| # Check if any text was received | |
| if len(full_text.strip()) == 0 and len(function_calls) == 0: | |
| messages.append({ | |
| "role": "assistant", | |
| "content": "No response from the model.", | |
| "metadata": {"title": "No response from the model."} | |
| }) | |
| if function_calls and len(function_calls) > 0: | |
| for call in self.handle_tool_calls(function_calls): | |
| yield messages + [call] | |
| if (call.get("role") == "tool" | |
| or (call.get("role") == "assistant" and call.get("metadata", {}).get("status") == "done")): | |
| messages.append(call) | |
| yield from self.invoke_manager(messages) | |
| else: | |
| yield messages | |