# RAG01 / steps.py
# Author: NaderAfshar
# Commit: fd58b95 — "Committing new and updated files"
import warnings
import os
import json
import random
import asyncio
from queue import Queue
from dotenv import load_dotenv
import gradio as gr
from llama_cloud_services import LlamaParse
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.llms.cohere import Cohere
from llama_index.llms.openai import OpenAI
from llama_index.llms.groq import Groq
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import (
VectorStoreIndex,
StorageContext,
load_index_from_storage
)
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
Event,
Context,
InputRequiredEvent,
HumanResponseEvent
)
from llama_index.readers.whisper import WhisperReader
import nest_asyncio
from IPython.display import display, HTML, DisplayHandle
from helper import extract_html_content
from pathlib import Path
# Load environment variables from a local .env file into os.environ.
load_dotenv()
# API keys for the LLM providers this script can use (only Groq is active below).
CO_API_KEY = os.getenv("COHERE_API_KEY")
llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY")
LLAMA_CLOUD_BASE_URL = os.getenv("LLAMA_CLOUD_BASE_URL")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# Suppress all warnings for cleaner demo output.
warnings.filterwarnings('ignore')
# Alternative providers, kept for easy switching:
#llm = Cohere(api_key=CO_API_KEY, model="command-r")
#llm = OpenAI(api_key=OPENAI_API_KEY, model="gpt-4o-mini")
# Module-level LLM used by the workflow steps below.
llm = Groq(api_key=GROQ_API_KEY, model="llama3-70b-8192")
class FirstEvent(Event):
    """Payload emitted by step_one to trigger step_two."""
    first_output: str
class SecondEvent(Event):
    """Payload emitted by step_two; carries the full LLM response text."""
    second_output: str
    response: str
class TextEvent(Event):
    """Streaming event carrying one incremental chunk of LLM output."""
    delta: str
class ProgressEvent(Event):
    """Streaming event carrying a human-readable progress message."""
    msg: str
class MyWorkflow(Workflow):
    """Three-step demo workflow: progress event -> streamed LLM text -> stop.

    Each step forwards events to the workflow's event stream via
    ``ctx.write_event_to_stream`` so a caller iterating
    ``handler.stream_events()`` sees output as it is produced.
    """

    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        """Emit a progress message and hand off to step two."""
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        """Stream an LLM completion, forwarding each delta to the event stream.

        Uses ``llm.astream_complete()`` (the "a" prefix means async); each
        yielded response carries the newest chunk in ``.delta``.
        """
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )
        # Initialize so an empty stream cannot leave `response` unbound
        # (previously str(response) after the loop would raise NameError).
        response = None
        async for response in generator:
            # Allow the workflow to stream this piece of the response.
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response="" if response is None else str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        """Emit a final progress message and finish the workflow."""
        ctx.write_event_to_stream(ProgressEvent(msg="\nStep three is happening"))
        return StopEvent(result="Workflow complete.")
async def main():
    """Run MyWorkflow, print streamed events live, then render its flow diagram."""
    wf = MyWorkflow(timeout=30, verbose=False)
    handler = wf.run(first_input="Start the workflow.")
    # Relay streamed events to stdout as they arrive.
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            print(event.msg)
        if isinstance(event, TextEvent):
            print(event.delta, end="")
    outcome = await handler
    print("Final result = ", outcome)
    # Render an HTML visualization of every possible path through the workflow.
    out_path = Path(__file__).parent / "workflows" / "RAG-EventDriven.html"
    draw_all_possible_flows(wf, filename=str(out_path))
    display(HTML(extract_html_content(out_path)), metadata=dict(isolated=True))
# Script entry point: run the async workflow demo.
if __name__ == "__main__":
    asyncio.run(main())
# NOTE(review): removed a commented-out alternative main() that was kept here
# as a module-level triple-quoted string (a no-op expression). It duplicated
# the live main() above, minus event streaming; recover it from VCS if needed.