# Author: Kacper Łukawski
# Last change: "Allow overriding the model with env var" (commit e1233ed)
import asyncio
import logging
import os
from collections.abc import AsyncIterable
from haystack.dataclasses import ChatMessage
from haystack.components.agents import Agent
from haystack.tools import ComponentTool
from haystack_integrations.components.generators.nvidia import NvidiaChatGenerator
from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo
from app.helper import stream_to_chunks
from app.models import TextChunk
from app.queue import StreamingQueue
logger = logging.getLogger(__name__)

# Model name handed to every NvidiaChatGenerator; the MODEL environment
# variable takes precedence over the built-in default.
MODEL = os.environ.get("MODEL", "nvidia/nemotron-3-super-120b-a12b")

# JSON schema used as the tool parameters of every sub-agent: one required
# string argument called "query".
_QUERY_ONLY_SCHEMA = dict(
    type="object",
    properties=dict(
        query=dict(
            type="string",
            description="The task or question for this agent.",
        ),
    ),
    required=["query"],
)
def make_generator() -> NvidiaChatGenerator:
    """Build a new streaming NvidiaChatGenerator for one agent.

    Each call returns a separate instance that uses the module-level MODEL.
    Sampling is fixed (temperature 1.0, top_p 0.95, up to 96k tokens).
    The ``enable_thinking`` and ``low_effort`` template flags are placed
    under ``extra_body`` -> ``chat_template_kwargs``.
    """
    template_kwargs = {"enable_thinking": True, "low_effort": True}
    sampling = {
        "stream": True,
        "max_tokens": 96_000,
        "temperature": 1.0,
        "top_p": 0.95,
        "extra_body": {"chat_template_kwargs": template_kwargs},
    }
    return NvidiaChatGenerator(model=MODEL, generation_kwargs=sampling)
# Default address of the Google Drive MCP server; override with MCP_SERVER_URL.
_DEFAULT_MCP_SERVER_URL = "http://127.0.0.1:3100/mcp"

# Jinja chat template shared by all sub-agents: the tool's "query" argument
# becomes the single user message.
_QUERY_USER_PROMPT = """{% message role="user" %}{{ query }}{% endmessage %}"""


def _build_planner(streaming_callback) -> Agent:
    """Create the planner sub-agent, which has no tools.

    It turns the user's question into an actionable plan: search queries and
    data points for the researcher, plus format instructions for the writer.

    :param streaming_callback: Callback that receives the planner's streamed output.
    """
    return Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_callback,
        system_prompt=(
            "You plan research and writing for a conference assistant that answers any question about conference content. "
            "The user's question may be broad ('what were the main themes?') or specific ('summarise the talk on federated learning'). "
            "Adapt your plan to the question — do not assume a fixed report structure. "
            "\n\nAgent capabilities you must plan around:"
            "\n- researcher: has Google Drive access; can search, list, and read documents. "
            "Give it specific search queries and a list of data points to extract "
            "(model sizes, latency numbers, GitHub links, policy citations, speaker names, etc.). "
            "\n- writer: has Google Drive access to CREATE new documents in the 'Reports' folder and share them. "
            "It will save the report there and return a public link. "
            "Tell it to save the report to Drive. "
            "Your plan must tell the orchestrator to forward researcher output to the writer verbatim. "
            "\n\nFor each plan, output:"
            "\n1. What the user is actually asking (one sentence)."
            "\n2. Specific search queries for the researcher and the data points to extract."
            "\n3. Clear format instruction for the writer: tell it what style and structure fits this question "
            "(e.g. Twitter thread, one-paragraph answer, bullet list, talk-by-talk breakdown — whatever serves the question). "
            "Be concise."
        ),
        user_prompt=_QUERY_USER_PROMPT,
    )


def _build_researcher(streaming_callback, toolset: MCPToolset) -> Agent:
    """Create the researcher sub-agent.

    It searches Google Drive and extracts structured findings for the writer.

    :param streaming_callback: Callback that receives the researcher's streamed output.
    :param toolset: The Drive tools the researcher may call (the full MCP toolset).
    """
    return Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_callback,
        system_prompt=(
            "You are a research assistant with access to Google Drive. "
            "Search Drive for documents matching the queries you are given, read them, "
            "and return structured findings. "
            "\n\nFor each document extract and report:"
            "\n- Talk title, speaker, organisation"
            "\n- Key points (≤5 bullets)"
            "\n- Quantitative data: model sizes, FLOPs, latency, accuracy, privacy guarantees"
            "\n- GitHub/GitLab URLs mentioned"
            "\n- External citations (policy docs, white-papers, standards)"
            "\n- Direct link to the source document"
            "\n\nDo not invent or assume facts not present in the documents. "
            "If a document is not accessible or irrelevant, say so and move on. "
            "Return all findings as structured text so the writer can use them directly. "
            "ALWAYS return all the files you found relevant as links to Google Drive, assuming they're public."
        ),
        user_prompt=_QUERY_USER_PROMPT,
        tools=toolset,
    )


def _build_writer(streaming_callback, toolset: MCPToolset) -> Agent:
    """Create the writer sub-agent.

    It composes the final answer from the plan and the research notes. It then
    saves the answer as a publicly readable Google Doc in the 'Reports' folder.

    :param streaming_callback: Callback that receives the writer's streamed output.
    :param toolset: The restricted Drive tools the writer may call.
    """
    return Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_callback,
        system_prompt=(
            "You answer questions about conference content. "
            "You have NO internet access. "
            "You work exclusively from the text passed to you — the research plan and researcher findings. "
            "Do not invent facts, links, or data not present in the input. "
            "If a piece of information is missing, say so explicitly rather than guessing. "
            "\n\nTone: write like a Twitter thread — casual, punchy, short sentences, one idea per beat. "
            "Lead with the most interesting thing. Make engineers go 'wow, I didn't know that.' "
            "No corporate speak. No filler. No rigid section headers unless the planner explicitly asked for them. "
            "\n\nFormat: follow the format the planner specified for this question. "
            "It may be a thread, a one-paragraph answer, a bullet list, a talk-by-talk breakdown — whatever fits. "
            "If the planner did not specify a format, default to a short Twitter-thread-style answer. "
            "\n\nAlways end with a sources line: for each source include the talk/doc name and the real link from the researcher output. "
            "Use real links from the researcher output only — never invent URLs."
            "\n\nAfter composing your answer, save it to Google Drive:"
            "\n1. Use listFolder to find the folder named 'Reports' (search in the root or My Drive). "
            "If it does not exist, create it with createFolder."
            "\n2. Use createGoogleDoc to create a new document inside 'Reports'. "
            "Give it a short descriptive title derived from the user's question."
            "\n3. Use insertText or updateGoogleDoc to fill the document with your answer."
            "\n4. Use addPermission with type='anyone' and role='reader' to make it publicly readable."
            "\n5. Use getDocumentInfo to retrieve the document URL."
            "\n6. Append this line at the very end of your text response: "
            "'📄 Report saved: <url>' — use the real URL from step 5, never invent one."
        ),
        user_prompt=_QUERY_USER_PROMPT,
        tools=toolset,
    )


def _build_orchestrator(planner: Agent, researcher: Agent, writer: Agent) -> Agent:
    """Create the orchestrator, which delegates all work to the three sub-agents.

    Each sub-agent is exposed as a ComponentTool that takes a single "query"
    string and returns the sub-agent's last message as text. The orchestrator
    has no Drive tools of its own; it only talks to external services through
    the sub-agents.
    """
    return Agent(
        chat_generator=make_generator(),
        system_prompt=(
            "You answer questions about conference content by coordinating a planner, researcher, and writer. "
            "The user may ask anything: themes, specific talks, speakers, tools, comparisons, summaries — any question is valid. "
            "Delegate all work to your tools. Do not answer the question yourself. "
            "When the writer tool returns its output, copy it word-for-word as your final response. "
            "No intro, no outro, no paraphrasing — the writer's text is the answer."
        ),
        # A Haystack Agent is a component, so it can be wrapped directly as a tool.
        tools=[
            ComponentTool(
                planner,
                name="planner",
                description=(
                    "Plan how to research and answer the user's question. "
                    "Returns search queries for the researcher and format instructions for the writer. "
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
            ComponentTool(
                researcher,
                name="researcher",
                description=(
                    "Search Google Drive for documents relevant to a given theme or topic, "
                    "read their contents, and return a summary of the key findings. "
                    "Use this tool whenever you need to gather information from Drive."
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
            ComponentTool(
                writer,
                name="writer",
                description=(
                    "Write the final answer from a research plan and collected notes. "
                    "Call this after planning and research are complete."
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
        ],
    )


async def run_agent(input: str) -> AsyncIterable[TextChunk]:
    """
    Create and run a per-request agent team, streaming all the messages sent by
    the orchestrator and any of its sub-agents back to the caller.

    A fresh planner, researcher, writer and orchestrator are built for every
    call, and all of them feed one shared StreamingQueue. The orchestrator runs
    in a background task while this generator drains the queue.

    The MCP server address comes from the MCP_SERVER_URL environment variable,
    defaulting to http://127.0.0.1:3100/mcp.

    :param input: The user's question. The name shadows the ``input`` builtin
        but is kept so existing keyword callers keep working.
    :returns: Text chunks as produced by ``stream_to_chunks``.
    :raises Exception: Any exception raised by the orchestrator run. It is
        re-raised after the stream has been drained instead of being lost.
    """
    server_info = StreamableHttpServerInfo(
        url=os.environ.get("MCP_SERVER_URL", _DEFAULT_MCP_SERVER_URL)
    )
    # The MCP server provides several tools useful to work with Google Drive;
    # the researcher may use all of them.
    google_drive_toolset = MCPToolset(server_info=server_info)
    # Writer gets a narrower toolset: create/format/share only - no delete or
    # rename of existing files.
    writer_toolset = MCPToolset(
        server_info=server_info,
        tool_names=[
            "listFolder", "listSharedDrives",
            "createFolder", "createGoogleDoc",
            "insertText", "updateGoogleDoc",
            "formatGoogleDocText", "formatGoogleDocParagraph",
            "applyTextStyle", "applyParagraphStyle",
            "createParagraphBullets", "findAndReplaceInDoc", "insertTable",
            "readGoogleDoc", "getGoogleDocContent", "getDocumentInfo", "listGoogleDocs",
            "addPermission", "listPermissions", "shareFile",
        ],
    )

    # Streaming queue receives the individual tokens sent by every model.
    streaming_queue = StreamingQueue()
    sub_agent_callback = streaming_queue.sync_callback
    orchestrator = _build_orchestrator(
        planner=_build_planner(sub_agent_callback),
        researcher=_build_researcher(sub_agent_callback, google_drive_toolset),
        writer=_build_writer(sub_agent_callback, writer_toolset),
    )
    messages = [ChatMessage.from_user(f"User question: {input}")]

    async def _produce() -> None:
        # The orchestrator runs inside the queue's async context. Presumably,
        # leaving that context is what ends stream_to_chunks (TODO confirm
        # against StreamingQueue).
        async with streaming_queue:
            await orchestrator.run_async(
                messages=messages,
                streaming_callback=streaming_queue.callback,
            )

    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-run.
    producer = asyncio.create_task(_produce())
    try:
        async for text_chunk in stream_to_chunks(streaming_queue):
            yield text_chunk
        # Surface orchestrator failures to the caller instead of losing them
        # as "Task exception was never retrieved".
        await producer
    finally:
        # If the consumer stopped early (e.g. the client disconnected), do not
        # leave the orchestrator running unobserved in the background.
        if not producer.done():
            producer.cancel()