File size: 7,575 Bytes
e47da65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""
File: web_app/module_agent_web_search.py
Description: Gradio module for the Agent Web Search functionality.
Author: Didier Guillevic
Date: 2025-10-20
"""

import gradio as gr

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search
from google.genai import types

import asyncio
import uuid

# Application name used both when creating sessions and when building runners.
APP_NAME = "google_search_agent"
# Fixed session identifier used for session creation and for every agent run.
SESSION_ID = "1234"

# Model identifier handed to the agent definition below.
model = "gemini-2.5-flash"

#
# ===== agent =====
#
# Module-level agent definition. The pre-built `google_search` tool is attached
# so the agent can perform Google searches when answering.
root_agent = Agent(
    model=model,
    name="basic_search_agent",
    tools=[google_search],
    instruction=(
        "I can answer your questions from my own knowledge or by searching the "
        "web using Google Search. Just ask me anything!"
    ),
    description=(
        "Agent to answer questions with the option to call Google Search "
        "if needed for up-to-date information."
    ),
)

#
# ===== Session and Runner =====
#
async def setup_session_and_runner(user_id: str):
    """Create a new in-memory session and a runner for the root agent.

    A new ``InMemorySessionService`` is instantiated on every call; the
    session is created in it under ``APP_NAME`` / ``SESSION_ID`` and the
    returned runner is bound to that same service.

    Args:
        user_id: Identifier of the user the session is created for.

    Returns:
        tuple: ``(session, runner)``.
    """
    service = InMemorySessionService()
    new_session = await service.create_session(
        app_name=APP_NAME, user_id=user_id, session_id=SESSION_ID
    )
    agent_runner = Runner(
        agent=root_agent, app_name=APP_NAME, session_service=service
    )
    return new_session, agent_runner


#
# ===== Call Agent Asynchronously =====
#
async def call_agent_async(query: str, user_id: str):
    """Run the agent on ``query`` and return its final answer with grounding.

    All events produced by the runner are consumed; only events flagged as a
    final response are inspected. If several such events occur, the values
    from the last one win.

    Args:
        query: The user question, sent to the agent as a single text part.
        user_id: Identifier used for the session and for the agent run.

    Returns:
        tuple: ``(final_response, rendered_content)``.
            ``final_response`` is the text of the first part of the last
            final-response event that carried text, or ``""`` if none did.
            ``rendered_content`` is the search entry point's rendered content
            from the last final-response event, or ``None`` when that event
            had no grounding (or no final-response event was seen).
    """
    content = types.Content(role='user', parts=[types.Part(text=query)])
    _session, runner = await setup_session_and_runner(user_id=user_id)
    events = runner.run_async(
        user_id=user_id,
        session_id=SESSION_ID,
        new_message=content
    )

    final_response = ""
    # None (not "") so "no grounding" is reported the same way on every path.
    rendered_content = None

    async for event in events:
        if not event.is_final_response():
            continue

        # A final event may come without content, without parts, or with a
        # first part that has no text; indexing blindly would raise or would
        # replace the answer with None. Keep the previous text in that case.
        parts = event.content.parts if event.content else None
        if parts and parts[0].text:
            final_response = parts[0].text

        # Walk the optional grounding chain one level at a time.
        metadata = event.grounding_metadata
        entry_point = metadata.search_entry_point if metadata else None
        if entry_point and entry_point.rendered_content:
            rendered_content = entry_point.rendered_content
        else:
            rendered_content = None

    return final_response, rendered_content


#
# ===== Call Agent Asynchronously with Streaming =====
#
async def call_agent_streaming(query: str, user_id: str):
    """Run the agent on ``query`` and yield the growing answer as events arrive.

    Args:
        query: The user question, sent to the agent as a single text part.
        user_id: Identifier used for the session and for the agent run; it is
            echoed back as the third element of every yielded tuple.

    Yields:
        tuple: ``(accumulated_text, rendered_content, user_id)``.
            ``rendered_content`` is ``None`` on intermediate updates and is
            filled in (when available) on the update emitted for a
            final-response event.
    """
    user_message = types.Content(role='user', parts=[types.Part(text=query)])
    _session, runner = await setup_session_and_runner(user_id=user_id)
    event_stream = runner.run_async(
        user_id=user_id,
        session_id=SESSION_ID,
        new_message=user_message
    )

    text_so_far = ""
    grounding_block = None

    async for event in event_stream:
        # Any event whose first part carries text extends the running answer.
        message = event.content
        if message and message.parts and message.parts[0].text:
            text_so_far += message.parts[0].text
            yield text_so_far, None, user_id

        if not event.is_final_response():
            continue

        # Final event: pick up the grounding content if the whole chain exists.
        metadata = event.grounding_metadata
        entry_point = metadata.search_entry_point if metadata else None
        if entry_point and entry_point.rendered_content:
            grounding_block = entry_point.rendered_content

        # Emit once more so the grounding output is updated with the text.
        yield text_so_far, grounding_block, user_id

    # Without grounding content, emit the accumulated text one last time.
    if grounding_block is None:
        yield text_so_far, None, user_id


#
# ===== User interface Block =====
#
def agent_web_search(query: str, user_id=None):
    """Calls a language model agent with Google Search tool to answer the query.

    Args:
        query (str): The user query.
        user_id (str, optional): The user ID for session management. If None, a new ID is generated. Defaults to None.

    Returns:
        tuple: A tuple containing the agent's response (str), rendered grounding content (str or None), and user_id (str).
    """
    # NOTE(review): the app is launched with mcp_server=True, so this docstring
    # is presumably surfaced as the tool description; it is kept verbatim.
    # Confirm before rewording it.

    # First call for this user: mint a unique ID to hand back to the caller.
    effective_user_id = str(uuid.uuid4()) if user_id is None else user_id

    # Drive the async agent call to completion from this synchronous handler.
    answer, grounding_block = asyncio.run(
        call_agent_async(query, effective_user_id)
    )
    return answer, grounding_block, effective_user_id


async def agent_web_search_streaming(query: str, current_user_id: str | None):
    """Stream the agent's answer for ``query`` as an async generator.

    Args:
        query: The user query.
        current_user_id: The user ID held in the UI state, or ``None`` on the
            first run, in which case a new unique ID is generated.

    Yields:
        tuple: Each update produced by ``call_agent_streaming``:
            ``(accumulated_text, rendered_content_or_None, user_id)``. The
            user ID is part of every update so the caller can store it.
    """
    user_id = str(uuid.uuid4()) if current_user_id is None else current_user_id

    # This function must itself be an async generator. Gradio decides whether
    # to stream by inspecting the handler function, so a coroutine that merely
    # returns a generator object is awaited once and never iterated.
    async for update in call_agent_streaming(query, user_id):
        yield update


# Build the Gradio UI: a query box, Submit/Clear buttons, the agent response,
# the grounding content block and a collapsed list of example queries.
with gr.Blocks() as demo:
    gr.Markdown(
        """
        **Agent with Google Search tool**: be patient :-) Currently looking into (async) streaming support...
        """
    )

    with gr.Row():
        input_text = gr.Textbox(
            lines=2,
            placeholder="Enter your query here...",
            label="Query",
            render=True
        )

    # Holds the user ID between submissions; starts as None so that
    # agent_web_search generates one on the first call.
    user_id = gr.State(None)

    with gr.Row():
        submit_button = gr.Button("Submit", variant="primary")
        clear_button = gr.Button("Clear", variant="secondary")
    
    with gr.Row():
        output_text = gr.Markdown(
            label="Agent Response",
            render=True
        )
    
    with gr.Row():
        grounding = gr.HTML(
            label="Grounding Content",
            render=True
        )
    
    with gr.Accordion("Examples", open=False):
        examples = gr.Examples(
            examples=[
                ["What is the prime number factorization of 15?",], # no need for Google Search
                ["Who won the Nobel Peace Prize in 2025?",],
                ["What is the weather like tomorrow in Montreal, Canada?",],
                ["What are the latest news about Graph Neural Networks?",],
            ],
            inputs=[input_text,],
            cache_examples=False,
            label="Click to use an example"
        )

    # ===== Button Actions =====
    # Submit runs the non-streaming handler; its third return value (the user
    # ID) is written back into the state.
    submit_button.click(
        fn=agent_web_search,
        inputs=[input_text, user_id],
        outputs=[output_text, grounding, user_id]
    )
    # Clear resets the query, the response and the grounding block. The
    # user_id state is not among the outputs, so it is left unchanged.
    clear_button.click(
        fn=lambda : ('', '', None),
        inputs=None,
        outputs=[input_text, output_text, grounding]
    )


# Launch the app (with the MCP server option enabled) when run as a script.
if __name__ == "__main__":
    demo.launch(mcp_server=True)