Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import os | |
| from collections.abc import AsyncIterable | |
| from haystack.dataclasses import ChatMessage | |
| from haystack.components.agents import Agent | |
| from haystack.tools import ComponentTool | |
| from haystack_integrations.components.generators.nvidia import NvidiaChatGenerator | |
| from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo | |
| from app.helper import stream_to_chunks | |
| from app.models import TextChunk | |
| from app.queue import StreamingQueue | |
logger = logging.getLogger(__name__)

# Chat model used by every agent; can be overridden through the MODEL env var.
MODEL = os.getenv("MODEL", "nvidia/nemotron-3-super-120b-a12b")

# JSON schema shared by every agent exposed as a tool: the calling model must
# supply exactly one string argument, ``query``, describing the sub-task.
_QUERY_ONLY_SCHEMA = dict(
    type="object",
    properties=dict(
        query=dict(
            type="string",
            description="The task or question for this agent.",
        ),
    ),
    required=["query"],
)
def make_generator() -> NvidiaChatGenerator:
    """Return a new ``NvidiaChatGenerator`` for ``MODEL`` with fixed sampling settings.

    A new instance is built on every call. The generation kwargs set
    ``stream=True``, a 96,000-token ``max_tokens`` cap, ``temperature=1.0`` and
    ``top_p=0.95``. They also pass ``chat_template_kwargs`` through
    ``extra_body``; presumably these switch the model's chat template into a
    low-effort thinking mode (confirm against the model card).
    """
    template_options = {"enable_thinking": True, "low_effort": True}
    sampling_kwargs = dict(
        stream=True,
        max_tokens=96_000,
        temperature=1.0,
        top_p=0.95,
        extra_body={"chat_template_kwargs": template_options},
    )
    return NvidiaChatGenerator(model=MODEL, generation_kwargs=sampling_kwargs)
async def run_agent(input: str) -> AsyncIterable[TextChunk]:
    """
    Create and run a per-request multi-agent pipeline and stream its output.

    A new planner, researcher, writer and orchestrator are built on every call.
    The orchestrator delegates to the other three agents, which it sees as
    tools. Every agent forwards the tokens it generates to one shared
    ``StreamingQueue``, and this async generator relays that stream to the
    caller.

    Args:
        input: The user's question. The name shadows the ``input`` builtin but
            is kept because callers may pass it by keyword.

    Yields:
        ``TextChunk`` items produced by ``stream_to_chunks`` from the shared
        streaming queue.

    Raises:
        Exception: Whatever the orchestrator run raised. It is logged and
            re-raised once the stream has been drained, instead of being lost
            in an unobserved background task.
    """
    # Every Google Drive operation goes through this local MCP server.
    mcp_server = StreamableHttpServerInfo(url="http://127.0.0.1:3100/mcp")
    # The researcher gets every tool the MCP server exposes.
    google_drive_toolset = MCPToolset(server_info=mcp_server)
    # The writer gets an allow-listed subset of tools: browse, create, edit,
    # format and share documents. No delete or rename tools are included.
    # NOTE(review): insertText/updateGoogleDoc/findAndReplaceInDoc and
    # addPermission/shareFile presumably also accept existing files. If so, this
    # list narrows the writer but does not fully sandbox it; confirm against the
    # server.
    # NOTE(review): both toolsets are built per request and never closed here;
    # check whether MCPToolset holds a connection that should be released.
    writer_toolset = MCPToolset(
        server_info=mcp_server,
        tool_names=[
            "listFolder", "listSharedDrives",
            "createFolder", "createGoogleDoc",
            "insertText", "updateGoogleDoc",
            "formatGoogleDocText", "formatGoogleDocParagraph",
            "applyTextStyle", "applyParagraphStyle",
            "createParagraphBullets", "findAndReplaceInDoc", "insertTable",
            "readGoogleDoc", "getGoogleDocContent", "getDocumentInfo", "listGoogleDocs",
            "addPermission", "listPermissions", "shareFile",
        ],
    )

    # The sub-agents push their tokens through ``sync_callback``; the
    # orchestrator uses the async ``callback`` when it is run below.
    streaming_queue = StreamingQueue()

    # The planner produces an actionable plan for answering the question. It
    # can be called again to adjust the plan as more is learned. It has no tools.
    planner = Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_queue.sync_callback,
        system_prompt=(
            "You plan research and writing for a conference assistant that answers any question about conference content. "
            "The user's question may be broad ('what were the main themes?') or specific ('summarise the talk on federated learning'). "
            "Adapt your plan to the question — do not assume a fixed report structure. "
            "\n\nAgent capabilities you must plan around:"
            "\n- researcher: has Google Drive access; can search, list, and read documents. "
            "Give it specific search queries and a list of data points to extract "
            "(model sizes, latency numbers, GitHub links, policy citations, speaker names, etc.). "
            "\n- writer: has Google Drive access to CREATE new documents in the 'Reports' folder and share them. "
            "It will save the report there and return a public link. "
            "Tell it to save the report to Drive. "
            "Your plan must tell the orchestrator to forward researcher output to the writer verbatim. "
            "\n\nFor each plan, output:"
            "\n1. What the user is actually asking (one sentence)."
            "\n2. Specific search queries for the researcher and the data points to extract."
            "\n3. Clear format instruction for the writer: tell it what style and structure fits this question "
            "(e.g. Twitter thread, one-paragraph answer, bullet list, talk-by-talk breakdown — whatever serves the question). "
            "Be concise."
        ),
        user_prompt="""{% message role="user" %}{{ query }}{% endmessage %}""",
    )

    # The researcher searches Google Drive and extracts the relevant content.
    researcher = Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_queue.sync_callback,
        system_prompt=(
            "You are a research assistant with access to Google Drive. "
            "Search Drive for documents matching the queries you are given, read them, "
            "and return structured findings. "
            "\n\nFor each document extract and report:"
            "\n- Talk title, speaker, organisation"
            "\n- Key points (≤5 bullets)"
            "\n- Quantitative data: model sizes, FLOPs, latency, accuracy, privacy guarantees"
            "\n- GitHub/GitLab URLs mentioned"
            "\n- External citations (policy docs, white-papers, standards)"
            "\n- Direct link to the source document"
            "\n\nDo not invent or assume facts not present in the documents. "
            "If a document is not accessible or irrelevant, say so and move on. "
            "Return all findings as structured text so the writer can use them directly. "
            "ALWAYS return all the files you found relevant as links to Google Drive, assuming they're public."
        ),
        user_prompt="""{% message role="user" %}{{ query }}{% endmessage %}""",
        # Full Drive toolset, so the researcher can find and read any document.
        tools=google_drive_toolset,
    )

    # The writer turns the collected findings into a concise answer that links
    # back to the source documents. It then saves the answer to Drive and
    # shares it.
    writer = Agent(
        chat_generator=make_generator(),
        streaming_callback=streaming_queue.sync_callback,
        system_prompt=(
            "You answer questions about conference content. "
            "You have NO internet access. "
            "You work exclusively from the text passed to you — the research plan and researcher findings. "
            "Do not invent facts, links, or data not present in the input. "
            "If a piece of information is missing, say so explicitly rather than guessing. "
            "\n\nTone: write like a Twitter thread — casual, punchy, short sentences, one idea per beat. "
            "Lead with the most interesting thing. Make engineers go 'wow, I didn't know that.' "
            "No corporate speak. No filler. No rigid section headers unless the planner explicitly asked for them. "
            "\n\nFormat: follow the format the planner specified for this question. "
            "It may be a thread, a one-paragraph answer, a bullet list, a talk-by-talk breakdown — whatever fits. "
            "If the planner did not specify a format, default to a short Twitter-thread-style answer. "
            "\n\nAlways end with a sources line: for each source include the talk/doc name and the real link from the researcher output. "
            "Use real links from the researcher output only — never invent URLs."
            "\n\nAfter composing your answer, save it to Google Drive:"
            "\n1. Use listFolder to find the folder named 'Reports' (search in the root or My Drive). "
            "If it does not exist, create it with createFolder."
            "\n2. Use createGoogleDoc to create a new document inside 'Reports'. "
            "Give it a short descriptive title derived from the user's question."
            "\n3. Use insertText or updateGoogleDoc to fill the document with your answer."
            "\n4. Use addPermission with type='anyone' and role='reader' to make it publicly readable."
            "\n5. Use getDocumentInfo to retrieve the document URL."
            "\n6. Append this line at the very end of your text response: "
            "'📄 Report saved: <url>' — use the real URL from step 5, never invent one."
        ),
        user_prompt="""{% message role="user" %}{{ query }}{% endmessage %}""",
        tools=writer_toolset,
    )

    # The orchestrator coordinates the other agents. Each of them can focus on
    # its own task, and no single agent's context is bloated with every detail.
    orchestrator = Agent(
        chat_generator=make_generator(),
        system_prompt=(
            "You answer questions about conference content by coordinating a planner, researcher, and writer. "
            "The user may ask anything: themes, specific talks, speakers, tools, comparisons, summaries — any question is valid. "
            "Delegate all work to your tools. Do not answer the question yourself. "
            "When the writer tool returns its output, copy it word-for-word as your final response. "
            "No intro, no outro, no paraphrasing — the writer's text is the answer."
        ),
        # A Haystack Agent is a component, so it can be wrapped as a tool. The
        # orchestrator itself only delegates and plans. It does not talk to
        # external services such as Google Drive directly.
        tools=[
            ComponentTool(
                planner,
                name="planner",
                description=(
                    "Plan how to research and answer the user's question. "
                    "Returns search queries for the researcher and format instructions for the writer. "
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
            ComponentTool(
                researcher,
                name="researcher",
                description=(
                    "Search Google Drive for documents relevant to a given theme or topic, "
                    "read their contents, and return a summary of the key findings. "
                    "Use this tool whenever you need to gather information from Drive."
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
            ComponentTool(
                writer,
                name="writer",
                description=(
                    "Write the final answer from a research plan and collected notes. "
                    "Call this after planning and research are complete."
                ),
                parameters=_QUERY_ONLY_SCHEMA,
                outputs_to_string={"source": "last_message"},
            ),
        ],
    )

    messages = [ChatMessage.from_user(f"User question: {input}")]

    async def run() -> None:
        # The queue's context spans the whole orchestrator run. Presumably its
        # __aexit__ marks the end of the stream for stream_to_chunks, on both
        # success and error; confirm in app.queue.
        async with streaming_queue:
            await orchestrator.run_async(
                messages=messages,
                streaming_callback=streaming_queue.callback,
            )

    # Keep a strong reference: the event loop holds only weak references to
    # tasks, so an unreferenced task can be garbage-collected mid-run.
    orchestrator_task = asyncio.create_task(run())
    try:
        # Relay the streamed tokens to the caller as they arrive.
        async for text_chunk in stream_to_chunks(streaming_queue):
            yield text_chunk
        # The stream is drained. Await the task so an orchestrator failure
        # reaches the logs and the caller instead of vanishing silently.
        try:
            await orchestrator_task
        except Exception:
            logger.exception("Orchestrator run failed")
            raise
    finally:
        # The consumer may stop early (e.g. the generator is closed on client
        # disconnect). In that case, stop the agents rather than let them run
        # unobserved.
        if not orchestrator_task.done():
            orchestrator_task.cancel()