from helper import extract_html_content
from IPython.display import display, HTML
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import FunctionCallingAgent
from llama_index.core import Settings
from llama_parse import LlamaParse
from llama_index.llms.groq import Groq
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import (
VectorStoreIndex,
StorageContext,
load_index_from_storage
)
import nest_asyncio
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
Event,
Context
)
from pathlib import Path
from queue import Queue
import gradio as gr
import whisper
from dotenv import load_dotenv
import os, json
import asyncio
# Directory where the resume's vector index is persisted and later reloaded
# from (read by RAGWorkflow.set_up).
storage_dir = "./storage"
# Path to the sample application form. Not referenced again in the visible
# code; main() passes an equivalent literal path.
application_file = "./data/fake_application_form.pdf"
# Applied before any event loop is started.
# NOTE(review): presumably needed so nested event loops work (e.g. sync
# library calls made from inside async workflow steps) - confirm.
nest_asyncio.apply()
# Load environment variables (API keys) before they are read below.
load_dotenv()
# NOTE(review): llama_cloud_api_key and LLAMA_CLOUD_BASE_URL are read here but
# not passed to anything in the visible code; LlamaParse presumably picks its
# credentials up from the environment directly - confirm.
llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
LLAMA_CLOUD_BASE_URL = os.getenv("LLAMA_CLOUD_BASE_URL")
# Shared LLM and embedding model used by the workflow steps.
global_llm = Groq(api_key=GROQ_API_KEY, model="llama3-70b-8192")
global_embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Register the embedding model as the library-wide default as well.
Settings.embed_model = global_embed_model
class ParseFormEvent(Event):
    """Returned by ``RAGWorkflow.set_up``; triggers ``parse_form``."""

    # The application form that was passed to the workflow run; handed
    # unchanged to the parser's load_data() in parse_form.
    application_form: str
class QueryEvent(Event):
    """One question about one form field.

    Sent by ``RAGWorkflow.generate_questions`` (one per field) and consumed
    by ``ask_question``.
    """

    # Full question text sent to the query engine.
    query: str
    # The form field this question is about.
    field: str
class ResponseEvent(Event):
    """Answer to a single ``QueryEvent``.

    Returned by ``RAGWorkflow.ask_question`` and collected by
    ``fill_in_application``.
    """

    # NOTE(review): ask_question also passes ``field=...`` when building this
    # event and fill_in_application reads ``r.field``, but ``field`` is not
    # declared here. Presumably the Event base class keeps undeclared keyword
    # arguments as dynamic attributes - confirm, or declare ``field: str``.
    # Answer text produced by the query engine.
    response: str
# Carries human feedback back into question generation.
class FeedbackEvent(Event):
    """Returned by ``RAGWorkflow.get_feedback`` when the verdict is not 'OKAY'.

    Consumed by ``generate_questions``, which appends the feedback text to
    every question it sends.
    """

    # The human's response text, passed through unchanged.
    feedback: str
class GenerateQuestionsEvent(Event):
    """Payload-free signal returned by ``RAGWorkflow.parse_form``.

    Triggers ``generate_questions`` once the field list has been stored in
    the workflow context.
    """

    pass
class RAGWorkflow(Workflow):
    """Fill in a job application form from a resume, with human feedback.

    Steps, in the order they normally fire:

    1. ``set_up`` builds (or reloads) a vector index over the resume and
       creates a query engine from it.
    2. ``parse_form`` parses the application form and asks the LLM for the
       list of fields to fill in.
    3. ``generate_questions`` sends one ``QueryEvent`` per field.
    4. ``ask_question`` answers each query against the resume index.
    5. ``fill_in_application`` waits for every answer, has the LLM merge
       them, and asks a human for feedback via ``InputRequiredEvent``.
    6. ``get_feedback`` either stops the workflow or loops back to step 3
       with the feedback attached.

    Prompt strings are deliberately written flush-left inside the methods so
    that no source indentation leaks into the text sent to the LLM.
    """

    storage_dir = "./storage"
    llm: Groq
    # NOTE(review): annotated as VectorStoreIndex, but set_up assigns the
    # object returned by ``index.as_query_engine(...)`` here.
    query_engine: VectorStoreIndex

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:
        """Validate the inputs and prepare the resume query engine.

        Args:
            ctx: Workflow context (unused here).
            ev: Start event; expected to carry ``resume_file`` and
                ``application_form``.

        Returns:
            A ``ParseFormEvent`` carrying the application form.

        Raises:
            ValueError: If ``resume_file`` or ``application_form`` is
                missing or empty.
        """
        self.llm = global_llm
        self.storage_dir = storage_dir

        # Look the inputs up with a default so that an omitted keyword is
        # reported by the ValueError below rather than by whatever the
        # event's own attribute lookup raises.
        resume_file = getattr(ev, "resume_file", None)
        application_form = getattr(ev, "application_form", None)
        if not resume_file:
            raise ValueError("No resume file provided")
        if not application_form:
            raise ValueError("No application form provided")

        # Ingest the data and set up the query engine.
        if os.path.exists(self.storage_dir):
            # The resume was already ingested on an earlier run: reload it.
            storage_context = StorageContext.from_defaults(persist_dir=self.storage_dir)
            index = load_index_from_storage(storage_context)
        else:
            # Parse and load the resume document.
            documents = LlamaParse(
                result_type="markdown",
                content_guideline_instruction="This is a resume, gather related facts together and format it as "
                                              "bullet points with headers"
            ).load_data(resume_file)
            # Embed and index the documents, then persist for later runs.
            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=global_embed_model
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)

        # No query is passed in; the queries are generated from the form,
        # which is parsed in its own step.
        return ParseFormEvent(application_form=application_form)

    @staticmethod
    def _parse_fields(raw_text: str) -> list:
        """Extract the ``fields`` list from the LLM's JSON reply.

        Tolerates a reply wrapped in a Markdown code fence, since the model
        does not always honor the "no markdown" instruction.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
            KeyError: If the JSON object has no ``fields`` key.
        """
        cleaned = raw_text.strip()
        if cleaned.startswith("```"):
            # Drop the opening fence line (with any language tag) and
            # everything from the closing fence onwards.
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            cleaned = cleaned.rsplit("```", 1)[0]
        return json.loads(cleaned)["fields"]

    # Form parsing is separated from question generation.
    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionsEvent:
        """Parse the application form and store its field list in the context.

        Writes the list under the context key ``fields_to_fill``.

        Returns:
            A ``GenerateQuestionsEvent`` to start question generation.
        """
        parser = LlamaParse(
            result_type="markdown",
            content_guideline_instruction="This is a job application form. Create a list of all the fields "
                                          "that need to be filled in.",
            formatting_instruction="Return a bulleted list of the fields ONLY."
        )
        result = parser.load_data(ev.application_form)[0]

        # Get the LLM to convert the parsed form into JSON. The parsed form
        # text must be part of the prompt, otherwise the model has nothing
        # to convert.
        raw_json = self.llm.complete(
            f"""
This is a parsed form.
Convert it into a JSON object containing only the list
of fields to be filled in, in the form {{ fields: [...] }}.
<form>{result.text}</form>.
Return JSON ONLY, no markdown.
""")
        fields = self._parse_fields(raw_json.text)
        await ctx.set("fields_to_fill", fields)
        print("\n DEBUG: all fields written to Context >>>>>>>>>>>>>>>>>>>>>>>>>>\n")
        return GenerateQuestionsEvent()

    # Triggered either by a GenerateQuestionsEvent or by a FeedbackEvent.
    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionsEvent | FeedbackEvent) -> QueryEvent:
        """Send one ``QueryEvent`` per form field.

        Reads ``fields_to_fill`` from the context and writes
        ``total_fields``. When triggered by a ``FeedbackEvent`` the feedback
        text is appended to every question.

        The queries are emitted with ``ctx.send_event``; the method itself
        returns ``None``. The ``QueryEvent`` return annotation is kept as in
        the original (presumably it is what declares the emitted event type
        to the workflow - confirm).
        """
        fields = await ctx.get("fields_to_fill")
        print("\n DEBUG:all fields Read from Context >>>>>>>>>>>>>>>>>>>>>>>>>>\n")

        # Store the number of fields before any query is dispatched, so the
        # count is already in the context whenever a response arrives.
        await ctx.set("total_fields", len(fields))

        # Generate one query for each of the fields, and fire them off.
        for field in fields:
            question = f"How would you answer this question about the candidate? {field}"
            # Is there feedback? If so, add it to the query.
            if isinstance(ev, FeedbackEvent):
                question += f"""
\nWe previously got feedback about how we answered the questions.
It might not be relevant to this particular field, but here it is:
{ev.feedback}
"""
            print("\n question : ", question)
            ctx.send_event(QueryEvent(
                field=field,
                query=question
            ))

        print(f"\n DEBUG: total fields from Context : {len(fields)}")
        return None

    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> ResponseEvent:
        """Answer one query against the resume index.

        Returns:
            A ``ResponseEvent`` carrying the field name and the answer text.
        """
        response = self.query_engine.query(
            f"This is a question about the specific resume we have in our database: {ev.query}")
        return ResponseEvent(field=ev.field, response=response.response)

    # Emits an InputRequiredEvent once every answer has arrived.
    @step
    async def fill_in_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent:
        """Combine all answers into a filled form and ask for human feedback.

        Returns ``None`` until ``total_fields`` responses have been
        collected. Once they have, the LLM's combined answer is stored in
        the context under ``filled_form`` and an ``InputRequiredEvent`` is
        returned.
        """
        # Get the total number of fields to wait for.
        total_fields = await ctx.get("total_fields")
        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if responses is None:
            return None  # not all responses are in yet

        # All the responses have arrived.
        response_list = "\n".join(
            f"Field: {r.field}\nResponse: {r.response}" for r in responses
        )
        print("\n DEBUG: got all responses :\n")
        result = self.llm.complete(f"""
You are given a list of fields in an application form and responses to
questions about those fields from a resume. Combine the two into a list of
fields and succinct, factual answers to fill in those fields.
{response_list}
""")
        print("\n DEBUG: llm combined the fields and responses from resume")

        # Save the result so get_feedback can return it when the human is
        # satisfied.
        await ctx.set("filled_form", str(result))
        print("\n DEBUG: Write all form fields to context. Now will emit InputRequiredEvent")

        # Get a human in the loop.
        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=result
        )

    # Accept the human feedback.
    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:
        """Decide whether the human accepted the form or wants changes.

        Returns:
            ``StopEvent`` with the stored ``filled_form`` when the LLM's
            verdict is 'OKAY'; otherwise a ``FeedbackEvent`` carrying the
            human's response.
        """
        result = self.llm.complete(f"""
You have received some human feedback on the form-filling task you've done.
Does everything look good, or is there more work to be done?
{ev.response}
If everything is fine, respond with just the word 'OKAY'.
If there's any other feedback, respond with just the word 'FEEDBACK'.
""")
        verdict = result.text.strip()
        print(f"LLM says the verdict was {verdict}")

        # Accept the verdict even if the model echoes the quotes from the
        # prompt, adds a trailing period, or changes the letter case.
        if verdict.strip("'\"`. ").upper() == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        return FeedbackEvent(feedback=ev.response)
def transcribe_speech(filepath):
    """Transcribe an audio file with Whisper's "base" model.

    Args:
        filepath: Path of the recorded audio file, or ``None`` when nothing
            was recorded.

    Returns:
        The transcribed text.

    Raises:
        gr.Error: If ``filepath`` is ``None``. Previously only a warning was
            shown and the function went on to transcribe ``None``, which
            fails inside Whisper; raising here stops the handler before the
            model is loaded and asks the user to retry.
    """
    if filepath is None:
        raise gr.Error("No audio found, please retry.")
    model = whisper.load_model("base")
    result = model.transcribe(filepath, fp16=False)
    return result["text"]
# Transcription handler: hands microphone transcriptions to the workflow loop.
class TranscriptionHandler:
    """Collect microphone transcriptions from a Gradio UI through a queue."""

    def __init__(self):
        # UI handles are filled in by create_interface().
        self.log_display = None
        self.interface = None
        # Holds every transcription produced by the UI until it is consumed.
        self.transcription_queue = Queue()

    def store_transcription(self, output):
        """Queue ``output`` and return it unchanged so the UI can show it."""
        self.transcription_queue.put(output)
        return output

    def create_interface(self):
        """Build the Gradio Blocks UI, store it on ``self`` and return it.

        Same interface and transcription logic as a global-based version
        would have, except that results go into the queue.
        """
        # Log textbox, created first and outside the Blocks context.
        # NOTE(review): confirm that gr.Textbox accepts ``height`` in the
        # installed Gradio version.
        log_textbox = gr.Textbox(
            label="Log Output",
            interactive=False,
            value="Waiting for user interaction...\n",
            height=200
        )

        # Microphone panel: transcribe the recording, queue the text.
        mic_panel = gr.Interface(
            fn=lambda x: self.store_transcription(transcribe_speech(x)),
            inputs=gr.Audio(sources=["microphone"], type="filepath"),
            outputs=gr.Textbox(label="Transcription")
        )

        tab_pages = [log_textbox, mic_panel]
        tab_titles = ["Log", "Transcribe Microphone"]

        blocks = gr.Blocks()
        self.interface = blocks
        with blocks:
            with gr.Row():
                # Keep a handle on the log textbox for update_log().
                self.log_display = log_textbox
            with gr.Row():
                # Tabs: the log first, then the microphone input.
                gr.TabbedInterface(tab_pages, tab_titles)
        return blocks

    async def get_transcription(self):
        """Launch the UI, wait for one transcription, close the UI, return it."""
        self.interface = self.create_interface()
        self.interface.launch(
            share=True,  # Remove when running on Hugging Face Spaces
            ssr_mode=False,
            prevent_thread_lock=True
        )

        # Poll every 1.5 seconds until a transcription has been queued.
        while self.transcription_queue.empty():
            await asyncio.sleep(1.5)

        transcript = self.transcription_queue.get()
        if self.interface is not None:
            self.interface.close()
        return transcript

    def update_log(self, message):
        """Replace the log textbox content with ``message`` if a log exists."""
        if not self.log_display:
            return
        # NOTE(review): confirm that components still expose ``.update`` in
        # the installed Gradio version.
        self.log_display.update(value=f"{message}\n")
async def main(
    resume_file: str = "data/fake_resume.pdf",
    application_form: str = "data/fake_application_form.pdf",
):
    """Run the form-filling workflow end to end, then render its diagram.

    Streams the workflow's events. Each time the workflow asks for human
    input, the filled form is printed, a spoken response is collected
    through ``TranscriptionHandler`` and sent back as a
    ``HumanResponseEvent``. After the workflow finishes, the final result is
    printed and a diagram of the workflow is written to
    ``workflows/form_parsing_workflow.html`` next to this file and displayed.

    Args:
        resume_file: Path to the resume PDF to index.
        application_form: Path to the application form PDF to fill in.
    """
    w = RAGWorkflow(timeout=600, verbose=True)
    handler = w.run(
        resume_file=resume_file,
        application_form=application_form
    )

    print("DEBUG: Starting event stream...")
    async for event in handler.stream_events():
        print(f"DEBUG: Received event type {type(event).__name__}")
        if isinstance(event, InputRequiredEvent):
            print("We've filled in your form! Here are the results:\n")
            print(event.result)
            # Get the human's spoken feedback and hand it to the workflow.
            transcription_handler = TranscriptionHandler()
            feedback = await transcription_handler.get_transcription()
            handler.ctx.send_event(
                HumanResponseEvent(
                    response=feedback
                )
            )
        else:
            print("\n handler received event ", event)

    response = await handler
    print("Agent complete! Here's your final result:")
    print(str(response))

    # Display of the workflow.
    workflow_file = Path(__file__).parent / "workflows" / "form_parsing_workflow.html"
    # Make sure the output directory exists before the diagram is written
    # into it; nothing else in this file creates it.
    workflow_file.parent.mkdir(parents=True, exist_ok=True)
    draw_all_possible_flows(w, filename=str(workflow_file))
    html_content = extract_html_content(str(workflow_file))
    display(HTML(html_content), metadata=dict(isolated=True))
# Script entry point: run the workflow only when executed directly, not when
# this module is imported.
if __name__ == "__main__":
    asyncio.run(main())