import warnings import os import json import random import asyncio from queue import Queue from dotenv import load_dotenv import gradio as gr from llama_cloud_services import LlamaParse from llama_index.utils.workflow import draw_all_possible_flows from llama_index.llms.cohere import Cohere from llama_index.llms.openai import OpenAI from llama_index.llms.groq import Groq from llama_index.embeddings.huggingface import HuggingFaceEmbedding from llama_index.core import ( VectorStoreIndex, StorageContext, load_index_from_storage ) from llama_index.core.workflow import ( StartEvent, StopEvent, Workflow, step, Event, Context, InputRequiredEvent, HumanResponseEvent ) from llama_index.readers.whisper import WhisperReader import nest_asyncio from IPython.display import display, HTML, DisplayHandle from helper import extract_html_content from pathlib import Path # Load environment variables load_dotenv() CO_API_KEY = os.getenv("COHERE_API_KEY") llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY") LLAMA_CLOUD_BASE_URL = os.getenv("LLAMA_CLOUD_BASE_URL") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") GROQ_API_KEY = os.getenv("GROQ_API_KEY") warnings.filterwarnings('ignore') #llm = Cohere(api_key=CO_API_KEY, model="command-r") #llm = OpenAI(api_key=OPENAI_API_KEY, model="gpt-4o-mini") llm = Groq(api_key=GROQ_API_KEY, model="llama3-70b-8192") class FirstEvent(Event): first_output: str class SecondEvent(Event): second_output: str response: str class TextEvent(Event): delta: str class ProgressEvent(Event): msg: str class MyWorkflow(Workflow): @step async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent: ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening")) return FirstEvent(first_output="First step complete.") @step async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent: # Pay attention to this API: ;;m.astream.complete(). Here "a" is for async. generator = await llm.astream_complete( "Please give me the first 50 words of Moby Dick, a book in the public domain." ) async for response in generator: # Allow the workflow to stream this piece of response ctx.write_event_to_stream(TextEvent(delta=response.delta)) return SecondEvent( second_output="Second step complete, full response attached", response=str(response), ) @step async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent: ctx.write_event_to_stream(ProgressEvent(msg="\nStep three is happening")) return StopEvent(result="Workflow complete.") async def main(): workflow = MyWorkflow(timeout=30, verbose=False) handler = workflow.run(first_input="Start the workflow.") async for ev in handler.stream_events(): if isinstance(ev, ProgressEvent): print(ev.msg) if isinstance(ev, TextEvent): print(ev.delta, end="") final_result = await handler print("Final result = ", final_result) # Display of the workflow workflow_file = Path(__file__).parent / "workflows" / "RAG-EventDriven.html" draw_all_possible_flows(workflow, filename=str(workflow_file)) html_content = extract_html_content(workflow_file) display(HTML(html_content), metadata=dict(isolated=True)) if __name__ == "__main__": asyncio.run(main()) ''' # instantiate the workflow async def main(): from pathlib import Path workflow = MyWorkflow(timeout=10, verbose=False) result = await workflow.run(first_input="Start the workflow.") print(result) # Test the display of the workflow WORKFLOW_FILE = Path(__file__).parent / "workflows" / "RAG-EventDriven.html" draw_all_possible_flows(workflow, filename=str(WORKFLOW_FILE)) html_content = extract_html_content(WORKFLOW_FILE) display(HTML(html_content), metadata=dict(isolated=True)) print(result) if __name__ == "__main__": asyncio.run(main()) '''