"""Tool-using question-answering agent built on a LlamaIndex ``AgentWorkflow``.

Running this file as a script sends a single chat message to the configured
Nebius-hosted model as a connectivity smoke test.
"""

import asyncio
import datetime
import os
import pprint
from typing import Any

from llama_index.core.agent.workflow.workflow_events import (
    AgentInput,
    AgentOutput,
    AgentStream,
    ToolCall,
    ToolCallResult,
)
from llama_index.core.llms import ChatMessage
# from llama_index.llms.ollama import Ollama
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from workflows.events import StopEvent, StartEvent, StepState, StepStateChanged
from llama_index.tools.arxiv import ArxivToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.llms.nebius import NebiusLLM

import tools

# TODO: 4. Run HF Space and submit answers to the scoring server.
# TODO: 5. Get a certificate for the course.

# Single source of truth for the hosted model, shared by the agent and the
# script entry point below.
_NEBIUS_MODEL = "Qwen/Qwen3-235B-A22B-Instruct-2507"
_NEBIUS_API_BASE = "https://api.tokenfactory.nebius.com/v1"

# Maximum number of characters of a tool result echoed to the console.
_TOOL_OUTPUT_PREVIEW_CHARS = 200


def _build_llm() -> NebiusLLM:
    """Create the Nebius LLM client.

    The API key is read from the ``NEBIUS_API_KEY`` environment variable at
    call time, so any ``.env`` loading must happen before this is called.
    """
    return NebiusLLM(
        api_key=os.getenv("NEBIUS_API_KEY"),
        model=_NEBIUS_MODEL,
        api_base=_NEBIUS_API_BASE,
    )


class BasicAgent:
    """Agent that answers questions using math, web, arXiv and Wikipedia tools."""

    def __init__(self, verbose: bool = False):
        """Build the LLM client and the agent workflow.

        Args:
            verbose: When true, ``__call__`` streams intermediate events
                (tool calls, tool results, token deltas) to stdout.
        """
        self.verbose = verbose
        # Alternative back ends kept for local experimentation:
        # llm = Ollama(
        #     model="gpt-oss:20b",
        #     request_timeout=120.0
        # )
        # llm = HuggingFaceInferenceAPI(
        #     model_name="Qwen/Qwen2.5-Coder-32B-Instruct",
        # )
        llm = _build_llm()

        # The date is baked into the system prompt once, at construction time.
        today = datetime.datetime.now().strftime('%Y-%m-%d')
        self.workflow = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(tools.multiply),
                FunctionTool.from_defaults(tools.add),
                # FunctionTool.from_defaults(tools.transcribeAudio),
                # FunctionTool.from_defaults(tools.describeImage),
                FunctionTool.from_defaults(tools.directFetchTool),
                FunctionTool.from_defaults(tools.webSearchTool),
                *ArxivToolSpec().to_tool_list(),
                *WikipediaToolSpec().to_tool_list(),
            ],
            llm=llm,
            system_prompt=f"Today's date is {today}.",
        )

    async def __call__(self, question: str) -> str:
        """Answer ``question`` and return the final result as a string.

        In verbose mode the run is streamed through ``stream_answers``;
        otherwise the workflow is awaited directly and the answer is printed.

        Args:
            question: The user question passed to the workflow.

        Returns:
            ``str()`` of the workflow result, or ``''`` when there is none.
        """
        print(f"Agent received question : {question}\n")
        if self.verbose:
            answer = await self.stream_answers(question)
        else:
            answer = await self.workflow.run(user_msg=question)
            print(f"Agent returning answer: {answer}")
        return '' if answer is None else str(answer)

    def call_sync(self, question: str) -> str:
        """Synchronous wrapper around ``__call__`` for non-async callers.

        Uses ``asyncio.run``, so it must not be invoked from a thread that
        already has a running event loop.
        """
        return asyncio.run(self(question))

    async def stream_answers(self, question: str) -> Any:
        """Run the workflow, echoing intermediate events to stdout.

        Args:
            question: The user question passed to the workflow.

        Returns:
            The ``result`` carried by the ``StopEvent``, or ``None`` if the
            event stream ends without one.
        """
        handler = self.workflow.run(user_msg=question, max_iterations=10)
        async for event in handler.stream_events():
            if isinstance(event, AgentInput):
                # print(f"\nAgent input: {event.input}")
                pass
            elif isinstance(event, AgentOutput):
                # print(f"\nAgent output: {event}")
                pass
            elif isinstance(event, ToolCall):
                print(f"\n\tCalled tool: {event.tool_name} {event.tool_kwargs}")
            elif isinstance(event, ToolCallResult):
                # Show only the head of the output; the previous slice
                # ([200:]) dropped the head and printed the remainder.
                preview = str(event.tool_output)[:_TOOL_OUTPUT_PREVIEW_CHARS]
                print(f"\n\t{event.tool_name} {event.tool_kwargs} -> {preview}")
            elif isinstance(event, StopEvent):
                return event.result
            elif isinstance(event, AgentStream):
                if event.delta:
                    print(event.delta, end="", flush=True)
                elif event.thinking_delta:
                    print(event.thinking_delta, end="", flush=True)
            else:
                pprint.pprint(f"Received event: {type(event)} {event}")
        return None


async def main():
    """Smoke-test the LLM connection with a single chat message."""
    from dotenv import load_dotenv

    # Load .env first so NEBIUS_API_KEY is visible when the client is built.
    load_dotenv()

    # agent = BasicAgent()
    llm = _build_llm()
    # result = llm.complete("Hello!")
    result = llm.chat([ChatMessage("Who are you?")])
    print(result)

    # Example without streaming (using __call__)
    # print("\n=== Non-Streaming Example ===")
    # result = await agent("What is 5 + 7?")
    # print(f"Final result: {result}")

    # result = await agent.stream_answers("Who did the actor who played Ray in the Polish-language version of Everybody Loves Raymond play in Magda M.? Give only the first name.")
    # print(f"\nFinal result: {result}")


if __name__ == "__main__":
    asyncio.run(main())