import asyncio
import logging
import os
from collections.abc import AsyncIterable

from haystack.components.agents import Agent
from haystack.dataclasses import ChatMessage
from haystack.tools import ComponentTool
from haystack_integrations.components.generators.nvidia import NvidiaChatGenerator
from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo

from app.helper import stream_to_chunks
from app.models import TextChunk
from app.queue import StreamingQueue

logger = logging.getLogger(__name__)

# Model used by every agent (orchestrator and sub-agents); override with the MODEL env var.
MODEL = os.environ.get("MODEL", "nvidia/nemotron-3-super-120b-a12b")

# Endpoint of the MCP server that provides the Google Drive tools; override
# with the MCP_SERVER_URL env var. The default is the previously hard-coded URL.
MCP_SERVER_URL = os.environ.get("MCP_SERVER_URL", "http://127.0.0.1:3100/mcp")

# Parameter schema shared by all sub-agent tools: the orchestrator passes a
# single free-text "query", which each sub-agent's user_prompt renders.
_QUERY_ONLY_SCHEMA = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": "The task or question for this agent.",
        }
    },
    "required": ["query"],
}


def make_generator() -> NvidiaChatGenerator:
    """Create a streaming NVIDIA chat generator for ``MODEL``.

    Every agent built in ``run_agent`` calls this, so each agent gets its own
    generator instance with identical settings.

    Returns:
        A ``NvidiaChatGenerator`` with streaming enabled and the sampling
        parameters below.
    """
    return NvidiaChatGenerator(
        model=MODEL,
        generation_kwargs={
            "stream": True,
            "max_tokens": 96_000,
            "temperature": 1.0,
            "top_p": 0.95,
            # Extra fields sent in the request body. Presumably consumed by the
            # model's chat template to enable a low-effort "thinking" mode —
            # NOTE(review): confirm against the model's documentation.
            "extra_body": {
                "chat_template_kwargs": {
                    "enable_thinking": True,
                    "low_effort": True,
                },
            },
        },
    )


async def run_agent(input: str) -> AsyncIterable[TextChunk]:
    """
    Create and run a per-request agent, using the streaming queue to capture
    all the messages sent by any of the subagents.

    A fresh set of agents is built on every call: an orchestrator that
    delegates to three sub-agents (planner, researcher, writer), each exposed
    to it as a ``ComponentTool``. All agents write their streamed output to one
    shared ``StreamingQueue``. The orchestrator runs in a background task
    while this generator relays the queue's contents to the caller.

    Args:
        input: The user's question. The name shadows the ``input`` builtin
            but is kept so callers passing it by keyword keep working.

    Yields:
        ``TextChunk`` objects produced by ``stream_to_chunks`` from the queue.
    """
    mcp_server = StreamableHttpServerInfo(url=MCP_SERVER_URL)

    # The MCP server provides several tools useful to work with Google Drive
    google_drive_toolset = MCPToolset(server_info=mcp_server)

    # Writer gets a narrower toolset: create/format/share only - no delete or rename of existing files
    writer_toolset = MCPToolset(
        server_info=mcp_server,
        tool_names=[
            "listFolder",
            "listSharedDrives",
            "createFolder",
            "createGoogleDoc",
            "insertText",
            "updateGoogleDoc",
            "formatGoogleDocText",
            "formatGoogleDocParagraph",
            "applyTextStyle",
            "applyParagraphStyle",
            "createParagraphBullets",
            "findAndReplaceInDoc",
            "insertTable",
            "readGoogleDoc",
            "getGoogleDocContent",
            "getDocumentInfo",
            "listGoogleDocs",
            "addPermission",
            "listPermissions",
            "shareFile",
        ],
    )

    # Streaming queue receives the individual tokens sent by the model
    streaming_queue = StreamingQueue()

    # Planner agent comes up with an actionable plan needed to create the report,
    # and can also make some adjustments in the plan, once we know more during
    # the process. It doesn't use any tools.
    planner = Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_queue.sync_callback,
        system_prompt=(
            "You plan research and writing for a conference assistant that answers any question about conference content. "
            "The user's question may be broad ('what were the main themes?') or specific ('summarise the talk on federated learning'). "
            "Adapt your plan to the question — do not assume a fixed report structure. "
            "\n\nAgent capabilities you must plan around:"
            "\n- researcher: has Google Drive access; can search, list, and read documents. "
            "Give it specific search queries and a list of data points to extract "
            "(model sizes, latency numbers, GitHub links, policy citations, speaker names, etc.). "
            "\n- writer: has Google Drive access to CREATE new documents in the 'Reports' folder and share them. "
            "It will save the report there and return a public link. "
            "Tell it to save the report to Drive. "
            "Your plan must tell the orchestrator to forward researcher output to the writer verbatim. "
            "\n\nFor each plan, output:"
            "\n1. What the user is actually asking (one sentence)."
            "\n2. Specific search queries for the researcher and the data points to extract."
            "\n3. Clear format instruction for the writer: tell it what style and structure fits this question "
            "(e.g. Twitter thread, one-paragraph answer, bullet list, talk-by-talk breakdown — whatever serves the question). "
            "Be concise."
        ),
        user_prompt="""{% message role="user" %}{{ query }}{% endmessage %}""",
    )

    # Researcher searches Google Drive and extracts relevant content for a given theme
    researcher = Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_queue.sync_callback,
        system_prompt=(
            "You are a research assistant with access to Google Drive. "
            "Search Drive for documents matching the queries you are given, read them, "
            "and return structured findings. "
            "\n\nFor each document extract and report:"
            "\n- Talk title, speaker, organisation"
            "\n- Key points (≤5 bullets)"
            "\n- Quantitative data: model sizes, FLOPs, latency, accuracy, privacy guarantees"
            "\n- GitHub/GitLab URLs mentioned"
            "\n- External citations (policy docs, white-papers, standards)"
            "\n- Direct link to the source document"
            "\n\nDo not invent or assume facts not present in the documents. "
            "If a document is not accessible or irrelevant, say so and move on. "
            "Return all findings as structured text so the writer can use them directly. "
            "ALWAYS return all the files you found relevant as links to Google Drive, assuming they're public."
        ),
        user_prompt="""{% message role="user" %}{{ query }}{% endmessage %}""",
        # Allow the researcher to use all the Google Drive tools,
        # so it can find the relevant documents and process them.
        # NOTE(review): this is the unfiltered toolset, which presumably also
        # contains the delete/rename tools withheld from the writer; consider
        # restricting it to search/read tools (least privilege).
        tools=google_drive_toolset,
    )

    # Writer agent uses the collected information to come up with a summary
    # that is concise and informative, but also links back to the relevant
    # documents, assuming they're all public.
    writer = Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_queue.sync_callback,
        system_prompt=(
            "You answer questions about conference content. "
            "You have NO internet access. "
            "You work exclusively from the text passed to you — the research plan and researcher findings. "
            "Do not invent facts, links, or data not present in the input. "
            "If a piece of information is missing, say so explicitly rather than guessing. "
            "\n\nTone: write like a Twitter thread — casual, punchy, short sentences, one idea per beat. "
            "Lead with the most interesting thing. Make engineers go 'wow, I didn't know that.' "
            "No corporate speak. No filler. No rigid section headers unless the planner explicitly asked for them. "
            "\n\nFormat: follow the format the planner specified for this question. "
            "It may be a thread, a one-paragraph answer, a bullet list, a talk-by-talk breakdown — whatever fits. "
            "If the planner did not specify a format, default to a short Twitter-thread-style answer. "
            "\n\nAlways end with a sources line: for each source include the talk/doc name and the real link from the researcher output. "
            "Use real links from the researcher output only — never invent URLs."
            "\n\nAfter composing your answer, save it to Google Drive:"
            "\n1. Use listFolder to find the folder named 'Reports' (search in the root or My Drive). "
            "If it does not exist, create it with createFolder."
            "\n2. Use createGoogleDoc to create a new document inside 'Reports'. "
            "Give it a short descriptive title derived from the user's question."
            "\n3. Use insertText or updateGoogleDoc to fill the document with your answer."
            "\n4. Use addPermission with type='anyone' and role='reader' to make it publicly readable."
            "\n5. Use getDocumentInfo to retrieve the document URL."
            "\n6. Append this line at the very end of your text response: "
            "'📄 Report saved: ' — use the real URL from step 5, never invent one."
        ),
        user_prompt="""{% message role="user" %}{{ query }}{% endmessage %}""",
        tools=writer_toolset,
    )

    # The main agent orchestrates the rest of them, so they can focus
    # on the individual tasks, without bloating the context of a single agent
    # with all the details.
    orchestrator = Agent(
        chat_generator=make_generator(),
        system_prompt=(
            "You answer questions about conference content by coordinating a planner, researcher, and writer. "
            "The user may ask anything: themes, specific talks, speakers, tools, comparisons, summaries — any question is valid. "
            "Delegate all work to your tools. Do not answer the question yourself. "
            "When the writer tool returns its output, copy it word-for-word as your final response. "
            "No intro, no outro, no paraphrasing — the writer's text is the answer."
        ),
        # A Haystack Agent is a component, so it can be wrapped in a
        # ComponentTool. The orchestrator only plans and delegates work; it
        # does not communicate with external services like Google Drive itself.
        tools=[
            ComponentTool(
                planner,
                name="planner",
                description=(
                    "Plan how to research and answer the user's question. "
                    "Returns search queries for the researcher and format instructions for the writer. "
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
            ComponentTool(
                researcher,
                name="researcher",
                description=(
                    "Search Google Drive for documents relevant to a given theme or topic, "
                    "read their contents, and return a summary of the key findings. "
                    "Use this tool whenever you need to gather information from Drive."
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
            ComponentTool(
                writer,
                name="writer",
                description=(
                    "Write the final answer from a research plan and collected notes. "
                    "Call this after planning and research are complete."
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
        ],
    )

    # Build the input message to the orchestrator agent
    messages = [ChatMessage.from_user(f"User question: {input}")]

    async def run() -> None:
        """Run the orchestrator, streaming its own tokens into the shared queue."""
        # Entering/leaving the queue's context brackets the whole run; the
        # relay loop below consumes the queue via stream_to_chunks.
        async with streaming_queue:
            await orchestrator.run_async(
                messages=messages, streaming_callback=streaming_queue.callback
            )

    # Run the orchestrator in the background and relay the streamed messages
    # to the caller so they can be shown to the user as they arrive. Keep a
    # reference to the task: the event loop only keeps weak references to
    # tasks, so an unreferenced task may be garbage-collected before it ends.
    orchestrator_task = asyncio.create_task(run())
    try:
        async for text_chunk in stream_to_chunks(streaming_queue):
            yield text_chunk
    except BaseException:
        # The consumer stopped early (aclose() raises GeneratorExit at the
        # yield, e.g. on client disconnect), the consumer was cancelled, or
        # relaying failed: cancel the orchestrator instead of leaving it
        # running unobserved. NOTE(review): sub-agent work that is not
        # awaiting on the event loop may still run to completion.
        orchestrator_task.cancel()
        raise

    # The stream ended normally. Await the task so an orchestrator failure is
    # logged here instead of being silently dropped together with the task.
    try:
        await orchestrator_task
    except Exception:
        logger.exception("Orchestrator run failed")