| """ |
| Agent loop using the Gemini API via the custom GeminiClient (google-genai SDK). |
| Receives user input, calls tools as needed, and returns results. |
| """ |
|
|
| import json |
| import logging |
| from .config import LOG_LEVEL |
| from .tools import get_tool_schemas, execute_tool, TOOLS |
| from .gemini_client import gemini_client |
|
|
# Configure logging once at import time; LOG_LEVEL is supplied by .config.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=LOG_LEVEL,
)
logger = logging.getLogger(__name__)
|
|
# System instruction text handed to the model by the agent loop.
SYSTEM_PROMPT = (
    "You are an intelligent AI assistant.\n"
    "You can use the provided tools to complete tasks.\n"
    "Think step by step and use tools when necessary."
)
|
|
def run_agent_loop(user_input: str, max_turns: int = 10) -> str:
    """Run the Gemini agent loop until the model answers without tool calls.

    Each turn sends the accumulated conversation to ``gemini_client.chat``.
    When the reply contains function calls, every call is dispatched through
    ``execute_tool`` and the stringified results are appended to the
    conversation as one ``user`` message of ``function_response`` parts, then
    the loop repeats. When the reply contains no function calls, its text is
    returned.

    Args:
        user_input: The user's prompt; it becomes the first conversation
            message.
        max_turns: Maximum number of model round-trips before giving up.

    Returns:
        The model's final text; ``"No response text found."`` when the final
        reply carries no text; ``"Error: <message>"`` when any step of a turn
        raises; or a fixed notice when ``max_turns`` is exhausted.
    """
    gemini_tools = [entry["fn"] for entry in TOOLS.values()]
    messages = [{"role": "user", "parts": [{"text": user_input}]}]

    for turn in range(max_turns):
        logger.info("Turn %d/%d", turn + 1, max_turns)

        try:
            response = gemini_client.chat(
                messages=messages,
                system_instruction=SYSTEM_PROMPT,
                tools=gemini_tools,
            )

            # Tolerate a reply with no candidates or no content (presumably
            # possible for blocked/empty generations -- confirm against the
            # SDK) by treating it as "no parts" rather than raising.
            candidates = response.candidates or []
            content = candidates[0].content if candidates else None
            parts = (content.parts if content is not None else None) or []

            function_calls = [p.function_call for p in parts if p.function_call]
            if not function_calls:
                return response.text or "No response text found."

            logger.info("Agent requested %d tool calls.", len(function_calls))

            # Echo the model's turn back into the history as plain dicts.
            # Parts that are neither a function call nor text are skipped so
            # the history never contains a {"text": None} entry.
            model_parts = []
            for part in parts:
                call = part.function_call
                if call:
                    model_parts.append(
                        {
                            "function_call": {
                                "name": call.name,
                                # args may be None for a zero-argument call.
                                "args": dict(call.args or {}),
                            }
                        }
                    )
                elif part.text is not None:
                    model_parts.append({"text": part.text})
            messages.append({"role": "model", "parts": model_parts})

            tool_parts = []
            for call in function_calls:
                args = dict(call.args or {})
                logger.info("Calling tool: %s(%s)", call.name, args)

                result_text = str(execute_tool(call.name, args))
                logger.info("Result: %s", result_text[:200])

                tool_parts.append(
                    {
                        "function_response": {
                            "name": call.name,
                            "response": {"result": result_text},
                        }
                    }
                )

            # All tool results for this turn go back in a single message.
            messages.append({"role": "user", "parts": tool_parts})

        except Exception as e:
            # Top-level boundary: report the failure to the caller as text,
            # but keep the traceback in the log for diagnosis.
            logger.exception("Error in agent loop: %s", e)
            return f"Error: {e}"

    return "Agent reached the maximum number of processing turns."
|
|
def main() -> None:
    """Interactive loop - enter a prompt and receive results.

    Reads prompts from standard input and prints the agent's reply for each.
    The loop ends on an empty line, on ``quit`` / ``exit`` / ``q``
    (case-insensitive), or when input is closed or interrupted
    (Ctrl-D / Ctrl-C at the prompt).
    """
    print("Agentic App (Powered by Gemini) - type 'quit' to exit")
    print("-" * 50)

    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # End of input or Ctrl-C at the prompt: leave cleanly instead of
            # dying with a traceback.
            print("\nBye!")
            break

        if not user_input or user_input.lower() in ("quit", "exit", "q"):
            print("Bye!")
            break

        try:
            response = run_agent_loop(user_input)
            print(f"\nAgent: {response}")
        except Exception as e:
            # Keep the session alive if a single request fails.
            logger.error(f"Error: {e}")
            print(f"\nError: {e}")


if __name__ == "__main__":
    main()
|
|