import asyncio import os import datetime from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step, Context from llama_index.llms.nebius import NebiusLLM from agent import BasicAgent """ 1. Decide what to use: context or event payload? 2. Investigate alternative to initializate llm at each step|global llm 3. Throw relevant data throght steps """ class EvalPlanEvent(Event): planStep: str class FinalAnswerEvent(Event): finalAnswer: str class UpdatePlanEvent(Event): planStep: str planStepResult: str class MultiStepWorkflow(Workflow): def __init__(self, **kwargs): self.llm = NebiusLLM( api_key=os.getenv("NEBIUS_API_KEY"), model="Qwen/Qwen3-235B-A22B-Instruct-2507", api_base="https://api.tokenfactory.nebius.com/v1", system_prompt=f"Today's date is {datetime.datetime.now().strftime('%Y-%m-%d')}." ) self.agent = BasicAgent(verbose=kwargs.get("verbose", False)) super().__init__(**kwargs) @step async def makePlanStep(self, ctx: Context, ev: StartEvent) -> EvalPlanEvent: if not hasattr(ev, "question"): raise ValueError("question field is required") await ctx.store.set("question", ev.question) plan = await self.llm.acomplete("""Make a plan to answer the question. Plan should be a list of steps. Each step should contain enough context to execute the step. Formulate the steps in a way that can be executed by the agent. Maximum 7 steps. Return only the plan, no other text. The question: """ + ev.question) await ctx.store.set("plan", str(plan)) step = await self.llm.acomplete("""Get the first step of the plan. Return only the step, no other text. The plan: """ + str(plan)) print(f'Plan is {plan}') return EvalPlanEvent(planStep=str(step)) @step async def evalPlanStep(self, ctx: Context, ev: EvalPlanEvent) -> UpdatePlanEvent | FinalAnswerEvent: if not hasattr(ev, "planStep"): raise ValueError("planStep field is required") result = await self.agent(f"The question is: {await ctx.store.get('question')} \n\n The plan is: {await ctx.store.get('plan')} \n\n Execute only the step: {ev.planStep}") return UpdatePlanEvent(planStep=ev.planStep, planStepResult=str(result)) @step async def updatePlanStep(self, ctx: Context, ev: UpdatePlanEvent)-> EvalPlanEvent | FinalAnswerEvent: if not hasattr(ev, "planStep"): raise ValueError("planStep field is required") if not hasattr(ev, "planStepResult"): raise ValueError("planStepResult field is required") plan = await ctx.store.get("plan") question = await ctx.store.get("question") plan = await self.llm.acomplete("""Update the plan based on the plan step result. Note that each plan step should contain enough context to execute the step and formulate the steps in a way that can be executed by the agent. Maximum 7 steps. Return only the updated plan, no other text. The plan step: """ + ev.planStep + """ The plan step result: """ + ev.planStepResult + """ The question: """ + question + """ The plan: """ + plan) await ctx.store.set("plan", str(plan)) verdict = await self.llm.acomplete("""Check the question and the plan. If there is enough information to answer the question, then return answer with the following template: FINAL ANSWER: [FINAL ANSWER]. Otherwise return an empty string. The question: """ + question + """ The plan: """ + str(plan)) print(f'Plan is {plan} \n\nVerdict is {verdict}') if "FINAL ANSWER" in str(verdict): return FinalAnswerEvent(finalAnswer=str(verdict).split("FINAL ANSWER:")[1]) step = await self.llm.acomplete("""Get the next step to evaluate of the plan. Return only the step, no other text. The plan: """ + str(plan)) return EvalPlanEvent(planStep=str(step)) @step async def finalAnswerStep(self, ctx: Context, ev: FinalAnswerEvent) -> StopEvent: if not hasattr(ev, "finalAnswer"): raise ValueError("finalAnswer field is required") question = await ctx.store.get("question") formattedAnswer = await self.llm.acomplete(""" Help me to format the answer to the correct format. Return only the formatted answer, no other text. """ + question + """ """ + ev.finalAnswer + """ Answer should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. """ ) return StopEvent(result=str(formattedAnswer)) if __name__ == "__main__": async def main(): questionAlbums = "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia." questionReverse = ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI" questionDinosaur = "Who nominated the only Featured Article on English Wikipedia about a dinosaur that was promoted in November 2016?" questionTable = "Given this table defining * on the set S = {a, b, c, d, e}\n\n|*|a|b|c|d|e|\n|---|---|---|---|---|---|\n|a|a|b|c|b|d|\n|b|b|c|a|e|c|\n|c|c|a|b|b|a|\n|d|b|e|b|e|d|\n|e|d|b|a|d|c|\n\nprovide the subset of S involved in any possible counter-examples that prove * is not commutative. Provide your answer as a comma separated list of the elements in the set in alphabetical order." questionSurname = "What is the surname of the equine veterinarian mentioned in 1.E Exercises from the chemistry materials licensed by Marisa Alviar-Agnew & Henry Agnew under the CK-12 license in LibreText's Introductory Chemistry materials as compiled 08/21/2023?" from dotenv import load_dotenv load_dotenv() workflow = MultiStepWorkflow(timeout=300, verbose=False) response = await workflow.run(question=questionSurname) print(response) asyncio.run(main())