File size: 4,015 Bytes
18ba912
7b5ca41
18ba912
 
 
c75ced2
18ba912
7b5ca41
 
 
18ba912
 
 
 
 
 
 
 
7b5ca41
18ba912
7b5ca41
18ba912
7b5ca41
 
 
 
 
 
c75ced2
7b5ca41
c75ced2
18ba912
 
 
 
7b5ca41
 
 
 
 
 
c75ced2
7b5ca41
 
 
 
 
 
 
 
 
 
c75ced2
7b5ca41
 
 
 
 
 
 
 
 
c75ced2
7b5ca41
 
 
 
 
 
c75ced2
7b5ca41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18ba912
7b5ca41
 
 
18ba912
7b5ca41
18ba912
 
 
7b5ca41
18ba912
 
 
 
 
 
 
 
 
7b5ca41
18ba912
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
Agent loop using the Gemini API via the custom GeminiClient (google-genai SDK).
Receives user input, calls tools as needed, and returns results.
"""

import json
import logging
from .config import LOG_LEVEL
from .tools import get_tool_schemas, execute_tool, TOOLS
from .gemini_client import gemini_client

# Configure the root logger at import time; the level comes from .config.LOG_LEVEL.
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# System instruction passed to the model on every chat call made by run_agent_loop.
SYSTEM_PROMPT = """You are an intelligent AI assistant.
You can use the provided tools to complete tasks.
Think step by step and use tools when necessary."""

def _copy_call_args(args) -> dict:
    """Return a plain-dict copy of a function call's args ({} when none were sent)."""
    # The model may emit a call without arguments, in which case args is None.
    return dict(args.items()) if args else {}


def _parts_to_history(parts) -> list:
    """Convert the model's response parts into plain-dict parts for the history."""
    history_parts = []
    for part in parts:
        if part.function_call:
            history_parts.append({
                "function_call": {
                    "name": part.function_call.name,
                    "args": _copy_call_args(part.function_call.args),
                }
            })
        elif part.text is not None:
            # Parts carrying neither a function call nor text are skipped
            # instead of being recorded as {"text": None}.
            history_parts.append({"text": part.text})
    return history_parts


def run_agent_loop(user_input: str, max_turns: int = 10) -> str:
    """
    Run the agent loop using Gemini: send message -> receive response -> call tool -> repeat.

    Args:
        user_input: The user's prompt; it becomes the first message of the history.
        max_turns: Maximum number of model round-trips before giving up.

    Returns:
        The model's final text answer, or a fallback string:
        "No response text found." when the final response carries no text,
        "Error: ..." when the model returns no candidates or any step raises,
        or a notice that the turn limit was reached.
    """
    system_instruction = SYSTEM_PROMPT

    # Tool definitions for google-genai: the callables registered in TOOLS.
    gemini_tools = [entry["fn"] for entry in TOOLS.values()]

    # History for the chat session; the full list is re-sent on every turn.
    messages = [
        {"role": "user", "parts": [{"text": user_input}]}
    ]

    for turn in range(max_turns):
        logger.info(f"Turn {turn + 1}/{max_turns}")

        try:
            response = gemini_client.chat(
                messages=messages,
                system_instruction=system_instruction,
                tools=gemini_tools
            )

            # A response may come back without candidates; report that
            # explicitly instead of failing on candidates[0].
            candidates = response.candidates
            if not candidates:
                logger.error("Model returned no candidates.")
                return "Error: model returned no candidates."

            # The candidate's content (and its parts) may be absent.
            model_content = candidates[0].content
            parts = (model_content.parts if model_content else None) or []

            # Collect every function call the model requested in this turn.
            function_calls = [p.function_call for p in parts if p.function_call]

            # If no function calls, the model is done: return its text.
            if not function_calls:
                return response.text or "No response text found."

            # Handle tool calls
            logger.info(f"Agent requested {len(function_calls)} tool calls.")

            # Add the model's response to history as plain dicts so the
            # messages list keeps a single, consistent shape.
            messages.append({"role": "model", "parts": _parts_to_history(parts)})

            # Run each requested tool and collect its result.
            tool_parts = []
            for fc in function_calls:
                args = _copy_call_args(fc.args)
                logger.info(f"Calling tool: {fc.name}({args})")

                result = execute_tool(fc.name, args)
                logger.info(f"Result: {str(result)[:200]}")

                tool_parts.append({
                    "function_response": {
                        "name": fc.name,
                        "response": {"result": str(result)}
                    }
                })

            # Add tool results as a user turn
            messages.append({"role": "user", "parts": tool_parts})

        except Exception as e:
            # Top-level boundary of the loop: log with traceback and hand the
            # caller a string, which is what callers of this function expect.
            logger.exception(f"Error in agent loop: {e}")
            return f"Error: {e}"

    return "Agent reached the maximum number of processing turns."

def main():
    """Interactive loop - enter a prompt and receive results.

    Reads prompts from stdin until the user enters an empty line or one of
    'quit' / 'exit' / 'q', or until stdin is closed or interrupted
    (Ctrl-D / Ctrl-C at the prompt).
    """
    print("Agentic App (Powered by Gemini) - type 'quit' to exit")
    print("-" * 50)

    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C at the prompt: exit cleanly instead of
            # crashing with a traceback.
            print("\nBye!")
            break

        if not user_input or user_input.lower() in ("quit", "exit", "q"):
            print("Bye!")
            break

        try:
            response = run_agent_loop(user_input)
            print(f"\nAgent: {response}")
        except Exception as e:
            # Keep the session alive if a single request fails.
            logger.exception(f"Error: {e}")
            print(f"\nError: {e}")

# Script entry point. This module uses package-relative imports, so it has to
# be started as part of its package (e.g. via `python -m`), not as a bare file.
if __name__ == "__main__":
    main()