"""Agentic RAG workflow that fills a job-application form from a resume,
with a human-in-the-loop feedback step driven by voice transcription."""

# Standard library
import asyncio
import json
import os
from pathlib import Path
from queue import Queue

# Third-party
import gradio as gr
import nest_asyncio
import whisper
from dotenv import load_dotenv
from IPython.display import HTML, display

# LlamaIndex
from llama_index.core import (
    Settings,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.agent import FunctionCallingAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import (
    Context,
    Event,
    HumanResponseEvent,
    InputRequiredEvent,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.groq import Groq
from llama_index.utils.workflow import draw_all_possible_flows
from llama_parse import LlamaParse

# Project-local
from helper import extract_html_content

# Where the resume's vector index is persisted between runs.
storage_dir = "./storage"
application_file = "./data/fake_application_form.pdf"

# Allow nested event loops (needed when running inside notebooks / IPython).
nest_asyncio.apply()
load_dotenv()

llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
LLAMA_CLOUD_BASE_URL = os.getenv("LLAMA_CLOUD_BASE_URL")

# Shared LLM and embedding model used across the workflow.
global_llm = Groq(api_key=GROQ_API_KEY, model="llama3-70b-8192")
global_embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
Settings.embed_model = global_embed_model


class ParseFormEvent(Event):
    # Path to the application-form PDF to parse.
    application_form: str


class QueryEvent(Event):
    # One question about one form field, answered against the resume index.
    query: str
    field: str


class ResponseEvent(Event):
    # FIX: declare `field` explicitly — ask_question constructs
    # ResponseEvent(field=..., response=...) and fill_in_application reads
    # r.field, but only `response` was declared (QueryEvent declares both).
    field: str
    response: str

# new!
class FeedbackEvent(Event):
    # Raw human feedback text about the filled-in form.
    feedback: str


class GenerateQuestionsEvent(Event):
    # Marker event: fire one QueryEvent per form field.
    pass


class RAGWorkflow(Workflow):
    """Parse an application form, answer each field from an indexed resume,
    and loop in a human for feedback before emitting the final filled form."""

    storage_dir = "./storage"
    llm: Groq
    query_engine: VectorStoreIndex

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:
        """Validate inputs, build (or reload) the resume index, and hand the
        application form off to the parsing step.

        Expects ``ev.resume_file`` and ``ev.application_form`` (file paths).
        Raises ValueError when either is missing.
        """
        self.llm = global_llm
        self.storage_dir = storage_dir

        if not ev.resume_file:
            raise ValueError("No resume file provided")
        if not ev.application_form:
            raise ValueError("No application form provided")

        # Ingest the resume and set up the query engine.
        if os.path.exists(self.storage_dir):
            # The resume was already ingested on a previous run — reload it.
            storage_context = StorageContext.from_defaults(persist_dir=self.storage_dir)
            index = load_index_from_storage(storage_context)
        else:
            # Parse the resume PDF into markdown documents.
            documents = LlamaParse(
                result_type="markdown",
                content_guideline_instruction="This is a resume, gather related facts together and format it as "
                "bullet points with headers"
            ).load_data(ev.resume_file)
            # Embed, index, and persist for future runs.
            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=global_embed_model
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

        # Query engine over the resume. No user query is passed in — the
        # workflow generates one query per form field later.
        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)

        # Pass the application form to a new step to parse it.
        return ParseFormEvent(application_form=ev.application_form)

    # new - separated the form parsing from the question generation
    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionsEvent:
        """Parse the application form and store its field list in the context."""
        parser = LlamaParse(
            result_type="markdown",
            content_guideline_instruction="This is a job application form. Create a list of all the fields "
            "that need to be filled in.",
            formatting_instruction="Return a bulleted list of the fields ONLY."
        )

        # Get the LLM to convert the parsed form into JSON.
        result = parser.load_data(ev.application_form)[0]
        raw_json = self.llm.complete(
            f"""
            This is a parsed form.
            Convert it into a JSON object containing only the list
            of fields to be filled in, in the form {{ fields: [...] }}.
            {result.text}
            . Return JSON ONLY, no markdown.
            """)
        fields = json.loads(raw_json.text)["fields"]

        await ctx.set("fields_to_fill", fields)
        print("\n DEBUG: all fields written to Context >>>>>>>>>>>>>>>>>>>>>>>>>>\n")
        return GenerateQuestionsEvent()

    # new - this step can get triggered either by GenerateQuestionsEvent or a FeedbackEvent
    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionsEvent | FeedbackEvent) -> QueryEvent | None:
        """Fan out one QueryEvent per form field, folding any human feedback
        into every query.

        FIX: return annotation now includes ``| None`` — this step emits its
        QueryEvents via ctx.send_event and itself returns None.
        """
        # Get the list of fields to fill in.
        fields = await ctx.get("fields_to_fill")
        print("\n DEBUG:all fields Read from Context >>>>>>>>>>>>>>>>>>>>>>>>>>\n")

        # Generate one query for each of the fields, and fire them off.
        for field in fields:
            question = f"How would you answer this question about the candidate? {field}"

            # Is there feedback? If so, add it to the query:
            if hasattr(ev, "feedback"):
                question += f"""
                \nWe previously got feedback about how we answered the questions.
                It might not be relevant to this particular field, but here it is:
                {ev.feedback}
                """

            print("\n question : ", question)
            ctx.send_event(QueryEvent(
                field=field,
                query=question
            ))

        # Store the number of fields, so we know how many to wait for later.
        await ctx.set("total_fields", len(fields))
        print(f"\n DEBUG: total fields from Context : {len(fields)}")
        return None

    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> ResponseEvent:
        """Answer a single field's question against the resume index."""
        response = self.query_engine.query(
            f"This is a question about the specific resume we have in our database: {ev.query}")
        return ResponseEvent(field=ev.field, response=response.response)

    # new - we now emit an InputRequiredEvent
    @step
    async def fill_in_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent | None:
        """Collect all per-field responses, combine them into a filled form,
        then ask a human to review it.

        FIX: return annotation now includes ``| None`` — the step returns None
        on every invocation until all ResponseEvents have been collected.
        """
        # Get the total number of fields to wait for.
        total_fields = await ctx.get("total_fields")

        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if responses is None:
            return None  # do nothing if there's nothing to do yet

        # We've got all the responses!
        responseList = "\n".join("Field: " + r.field + "\n" + "Response: " + r.response for r in responses)
        print("\n DEBUG: got all responses :\n")

        result = self.llm.complete(f"""
            You are given a list of fields in an application form and
            responses to questions about those fields from a resume.
            Combine the two into a list of fields and succinct, factual
            answers to fill in those fields.
            {responseList}
        """)
        print("\n DEBUG: llm combined the fields and responses from resume")

        # Save the result so get_feedback can return it on approval.
        await ctx.set("filled_form", str(result))
        print("\n DEBUG: Write all form fields to context. Now will emit InputRequiredEvent")

        # Let's get a human in the loop.
        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=result
        )

    # new! Accept the feedback.
    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:
        """Classify the human feedback: stop when satisfied, otherwise loop
        back to question generation with the feedback attached."""
        result = self.llm.complete(f"""
            You have received some human feedback on the form-filling task you've done.
            Does everything look good, or is there more work to be done?
            {ev.response}
            If everything is fine, respond with just the word 'OKAY'.
            If there's any other feedback, respond with just the word 'FEEDBACK'.
        """)

        verdict = result.text.strip()
        print(f"LLM says the verdict was {verdict}")
        if verdict == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        else:
            return FeedbackEvent(feedback=ev.response)


def transcribe_speech(filepath):
    """Transcribe an audio file with Whisper and return the text.

    FIX: a missing filepath previously only warned and then fell through to
    ``model.transcribe(None)``, which crashes — now it returns an empty
    transcription after the warning instead.
    """
    if filepath is None:
        gr.Warning("No audio found, please retry.")
        return ""
    model = whisper.load_model("base")
    result = model.transcribe(filepath, fp16=False)
    return result["text"]

# New! Transcription handler.
class TranscriptionHandler:
    """Bridges the Gradio microphone UI and the async workflow: launches a
    transcription interface and waits until one transcription is queued."""

    def __init__(self):
        # Queue holding finished transcriptions (producer: the Gradio
        # callback; consumer: get_transcription's polling loop).
        self.transcription_queue = Queue()
        self.interface = None
        self.log_display = None

    def store_transcription(self, output):
        """Put a finished transcription on the queue and echo it to the UI.

        Every time we record something we put it in the queue instead of a
        global, so get_transcription can pick it up asynchronously.
        """
        self.transcription_queue.put(output)
        return output

    def create_interface(self):
        """Build the Blocks UI: a log tab plus a microphone-transcription tab."""
        # Initial log display.
        # NOTE(review): `height=` may not be accepted by every gr.Textbox
        # version (some only take `lines`/`max_lines`) — confirm against the
        # pinned gradio release.
        log_box = gr.Textbox(
            label="Log Output",
            interactive=False,
            value="Waiting for user interaction...\n",
            height=200
        )

        # Transcription area; stores results via store_transcription.
        mic_transcribe = gr.Interface(
            fn=lambda x: self.store_transcription(transcribe_speech(x)),
            inputs=gr.Audio(sources=["microphone"], type="filepath"),
            outputs=gr.Textbox(label="Transcription")
        )

        self.interface = gr.Blocks()
        with self.interface:
            with gr.Row():
                self.log_display = log_box  # display log
            with gr.Row():
                # Tabbed interface: the log first, then the microphone input.
                gr.TabbedInterface([log_box, mic_transcribe], ["Log", "Transcribe Microphone"])
        return self.interface

    async def get_transcription(self):
        """Launch the UI, poll until a transcription is queued, close, return it."""
        self.interface = self.create_interface()
        self.interface.launch(
            share=True,  # Remove when running on Hugging Face Spaces
            ssr_mode=False,
            prevent_thread_lock=True
        )

        # Poll every 1.5 seconds, checking if a transcription has been queued.
        while True:
            if not self.transcription_queue.empty():
                result = self.transcription_queue.get()
                if self.interface is not None:
                    self.interface.close()
                return result
            await asyncio.sleep(1.5)

    def update_log(self, message):
        """Update the log display dynamically as the workflow progresses.

        NOTE(review): component ``.update(...)`` was removed/changed in newer
        gradio releases — confirm this call against the pinned version.
        """
        if self.log_display:
            self.log_display.update(value=f"{message}\n")


async def main():
    """Run the workflow, routing each InputRequiredEvent to voice transcription
    and feeding the spoken feedback back in as a HumanResponseEvent."""
    w = RAGWorkflow(timeout=600, verbose=True)
    handler = w.run(
        resume_file="data/fake_resume.pdf",
        application_form="data/fake_application_form.pdf"
    )

    print("DEBUG: Starting event stream...")
    async for event in handler.stream_events():
        print(f"DEBUG: Received event type {type(event).__name__}")
        if isinstance(event, InputRequiredEvent):
            print("We've filled in your form! Here are the results:\n")
            print(event.result)

            # Collect spoken feedback and send it back into the workflow.
            transcription_handler = TranscriptionHandler()
            response = await transcription_handler.get_transcription()
            handler.ctx.send_event(
                HumanResponseEvent(
                    response=response
                )
            )
        else:
            print("\n handler received event ", event)

    response = await handler
    print("Agent complete! Here's your final result:")
    print(str(response))

    # Display of the workflow.
    workflow_file = Path(__file__).parent / "workflows" / "form_parsing_workflow.html"
    # FIX: ensure the output directory exists — draw_all_possible_flows raises
    # FileNotFoundError on a fresh checkout where ./workflows is missing.
    workflow_file.parent.mkdir(parents=True, exist_ok=True)
    draw_all_possible_flows(w, filename=str(workflow_file))
    html_content = extract_html_content(str(workflow_file))
    display(HTML(html_content), metadata=dict(isolated=True))


if __name__ == "__main__":
    asyncio.run(main())