File size: 2,405 Bytes
55b7d0c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
Event,
Context,
)
import asyncio
import nest_asyncio
from llama_index.llms.groq import Groq
from llama_index.utils.workflow import draw_all_possible_flows
from IPython.display import display, HTML
from dotenv import load_dotenv
from helper import extract_html_content
from pathlib import Path
import os
nest_asyncio.apply()
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
global_llm = Groq(api_key=GROQ_API_KEY, model="llama3-70b-8192")
class FirstEvent(Event):
first_output: str
class SecondEvent(Event):
second_output: str
response: str
class ProgressEvent(Event):
msg: str
class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
return FirstEvent(first_output="First step complete.")
@step
async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
llm = global_llm
generator = await llm.astream_complete(
"Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
)
async for response in generator:
# Allow the workflow to stream this piece of response
ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
return SecondEvent(
second_output="Second step complete, full response attached",
response=str(response),
)
@step
async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
return StopEvent(result="Workflow complete.")
async def main():
w = MyWorkflow(timeout=30, verbose=True)
handler = w.run(first_input="Start the workflow.")
async for ev in handler.stream_events():
if isinstance(ev, ProgressEvent):
print(ev.msg)
final_result = await handler
print("Final result", final_result)
workflow_file = Path(__file__).parent / "workflows" / "streaming_workflow.html"
draw_all_possible_flows(w, filename=str(workflow_file))
html_content = extract_html_content(str(workflow_file))
display(HTML(html_content), metadata=dict(isolated=True))
if __name__ == "__main__":
asyncio.run(main()) |