| import asyncio |
| import datetime |
| import os |
| import pprint |
| from llama_index.core.agent.workflow.workflow_events import AgentInput, AgentOutput, AgentStream, ToolCall, ToolCallResult |
| from llama_index.core.llms import ChatMessage |
| |
| from llama_index.core.agent.workflow import AgentWorkflow |
| from llama_index.core.tools import FunctionTool |
| from workflows.events import StopEvent, StartEvent, StepState, StepStateChanged |
| from llama_index.tools.arxiv import ArxivToolSpec |
| from llama_index.tools.wikipedia import WikipediaToolSpec |
| from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI |
| from llama_index.llms.nebius import NebiusLLM |
|
|
| import tools |
|
|
| """ |
| TODO: 4. Run HF Space and submit answers to the scoring server. |
| TODO: 5. Get a certificate for the course. |
| """ |
|
|
class BasicAgent:
    """Question-answering agent backed by a llama-index ``AgentWorkflow``.

    The workflow is built once in ``__init__`` from the helper functions in
    the local ``tools`` module plus the arXiv and Wikipedia tool specs, and is
    driven by a Nebius-hosted LLM.  Instances are awaitable callables:
    ``answer = await agent(question)``; ``call_sync`` wraps that for
    synchronous callers.
    """

    # Upper bound on how many characters of a tool result are echoed to the
    # console in verbose mode; full results can be entire web pages.
    TOOL_OUTPUT_PREVIEW_CHARS = 200

    def __init__(self, verbose: bool = False):
        """Build the LLM client and the tool-equipped workflow.

        Args:
            verbose: When true, ``__call__`` streams intermediate events
                (tool calls, tool results, token deltas) to stdout instead
                of only printing the final answer.
        """
        self.verbose = verbose

        # The API key is read from the environment at construction time;
        # os.getenv yields None when NEBIUS_API_KEY is unset.
        llm = NebiusLLM(
            api_key=os.getenv("NEBIUS_API_KEY"),
            model="Qwen/Qwen3-235B-A22B-Instruct-2507",
            api_base="https://api.tokenfactory.nebius.com/v1"
        )
        self.workflow = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(tools.multiply),
                FunctionTool.from_defaults(tools.add),
                FunctionTool.from_defaults(tools.directFetchTool),
                FunctionTool.from_defaults(tools.webSearchTool),
                *ArxivToolSpec().to_tool_list(),
                *WikipediaToolSpec().to_tool_list(),
            ],
            llm=llm,
            # The date is formatted once, here, so a long-lived instance keeps
            # reporting the day on which it was constructed.
            system_prompt=f"Today's date is {datetime.datetime.now().strftime('%Y-%m-%d')}."
        )

    @staticmethod
    def _preview(tool_output: object, limit: int = TOOL_OUTPUT_PREVIEW_CHARS) -> str:
        """Return at most the first ``limit`` characters of ``str(tool_output)``."""
        return str(tool_output)[:limit]

    async def __call__(self, question: str) -> str:
        """Answer ``question`` and return the final result as a string.

        In verbose mode the run is delegated to ``stream_answers`` so that
        intermediate events are printed; otherwise the workflow is awaited
        directly and only the final answer is printed.

        Args:
            question: The user message handed to the workflow.

        Returns:
            ``str()`` of the workflow result, or ``''`` when there is none.
        """
        print(f"Agent received question : {question}\n")

        if self.verbose:
            answer = await self.stream_answers(question)
        else:
            answer = await self.workflow.run(user_msg=question)
            print(f"Agent returning answer: {answer}")

        return '' if answer is None else str(answer)

    def call_sync(self, question: str) -> str:
        """Synchronous wrapper for ``__call__`` (for compatibility with sync code).

        Uses ``asyncio.run``, so it must not be invoked from a thread that
        already has a running event loop.
        """
        return asyncio.run(self(question))

    async def stream_answers(self, question: str) -> "object | None":
        """Run the workflow while echoing its event stream to stdout.

        Args:
            question: The user message handed to the workflow.

        Returns:
            The ``result`` carried by the ``StopEvent``, or ``None`` if the
            event stream ends without one.
        """
        handler = self.workflow.run(user_msg=question, max_iterations=10)
        async for event in handler.stream_events():
            if isinstance(event, (AgentInput, AgentOutput)):
                # Deliberately silent: matched here only so these events do
                # not fall through to the catch-all dump at the bottom.
                continue
            if isinstance(event, ToolCall):
                print(f"\n\tCalled tool: {event.tool_name} {event.tool_kwargs}")
            elif isinstance(event, ToolCallResult):
                # Show only the head of the result so the log stays readable.
                print(f"\n\t{event.tool_name} {event.tool_kwargs} -> {self._preview(event.tool_output)}")
            elif isinstance(event, StopEvent):
                return event.result
            elif isinstance(event, AgentStream):
                # Prefer the answer delta; fall back to the thinking delta.
                if event.delta:
                    print(event.delta, end="", flush=True)
                elif event.thinking_delta:
                    print(event.thinking_delta, end="", flush=True)
            else:
                pprint.pprint(f"Received event: {type(event)} {event}")
        return None
|
|
|
|
if __name__ == "__main__":
    async def main():
        """Manual smoke test: send one chat message to the Nebius LLM and print the reply."""
        # Imported here so python-dotenv is only required when the module is
        # executed as a script.
        from dotenv import load_dotenv

        # Called before the key is read from the environment below
        # (presumably populating it from a local .env file).
        load_dotenv()

        smoke_llm = NebiusLLM(
            api_key=os.getenv("NEBIUS_API_KEY"),
            model="Qwen/Qwen3-235B-A22B-Instruct-2507",
            api_base="https://api.tokenfactory.nebius.com/v1",
        )

        messages = [ChatMessage("Who are you?")]
        # chat() is the synchronous API, so this call is not awaited.
        print(smoke_llm.chat(messages))

    asyncio.run(main())