|
|
from helper import extract_html_content |
|
|
from IPython.display import display, HTML |
|
|
from llama_index.utils.workflow import draw_all_possible_flows |
|
|
from llama_index.core.tools import FunctionTool |
|
|
from llama_index.core.agent import FunctionCallingAgent |
|
|
from llama_index.core import Settings |
|
|
from llama_parse import LlamaParse |
|
|
from llama_index.llms.groq import Groq |
|
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
|
from llama_index.core import ( |
|
|
VectorStoreIndex, |
|
|
StorageContext, |
|
|
load_index_from_storage |
|
|
) |
|
|
import nest_asyncio |
|
|
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent |
|
|
from llama_index.core.workflow import ( |
|
|
StartEvent, |
|
|
StopEvent, |
|
|
Workflow, |
|
|
step, |
|
|
Event, |
|
|
Context |
|
|
) |
|
|
from pathlib import Path |
|
|
from queue import Queue |
|
|
import gradio as gr |
|
|
import whisper |
|
|
from dotenv import load_dotenv |
|
|
import os, json |
|
|
import asyncio |
|
|
|
|
|
# --- Filesystem locations -------------------------------------------------
# Directory where the vector index is persisted / reloaded from.
storage_dir = "./storage"
# Default application form shipped with the sample data.
application_file = "./data/fake_application_form.pdf"

# Patch asyncio so an event loop can be re-entered (the workflow is driven
# with asyncio.run while other libraries may start their own loops).
nest_asyncio.apply()

# --- Credentials ----------------------------------------------------------
# Pull variables from a local .env file into the process environment before
# any of them are read below.
load_dotenv()

llama_cloud_api_key = os.environ.get("LLAMA_CLOUD_API_KEY")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
LLAMA_CLOUD_BASE_URL = os.environ.get("LLAMA_CLOUD_BASE_URL")

# --- Shared model instances -----------------------------------------------
# One LLM and one embedding model are created at import time and reused by
# every workflow instance.
global_llm = Groq(model="llama3-70b-8192", api_key=GROQ_API_KEY)
global_embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

# Make the embedding model the library-wide default as well.
Settings.embed_model = global_embed_model
|
|
|
|
|
|
|
|
class ParseFormEvent(Event):
    """Event asking the workflow to parse an application form.

    Returned by ``RAGWorkflow.set_up`` and consumed by
    ``RAGWorkflow.parse_form``.
    """

    # Location of the application form, forwarded unchanged from the
    # ``application_form`` value given to the workflow's start event.
    application_form: str
|
|
|
|
|
|
|
|
class QueryEvent(Event):
    """Event carrying one question to run against the resume index.

    One instance is sent per form field by
    ``RAGWorkflow.generate_questions`` and handled by
    ``RAGWorkflow.ask_question``.
    """

    # Full question text handed to the query engine.
    query: str
    # Name of the form field the question is about.
    field: str
|
|
|
|
|
|
|
|
class ResponseEvent(Event):
    """Event carrying the query engine's answer for a single form field.

    Produced by ``RAGWorkflow.ask_question`` and collected by
    ``RAGWorkflow.fill_in_application``.
    """

    # NOTE(review): ask_question also passes a ``field`` keyword that is not
    # declared here, and fill_in_application reads ``.field`` back; this
    # relies on the Event base class accepting undeclared attributes --
    # confirm against the installed llama_index version.

    # Text of the answer returned by the query engine.
    response: str
|
|
|
|
|
|
|
|
|
|
|
class FeedbackEvent(Event):
    """Event that sends human feedback back into question generation.

    Returned by ``RAGWorkflow.get_feedback`` when the verdict is not
    'OKAY'; it re-triggers ``RAGWorkflow.generate_questions``.
    """

    # The human's feedback text, copied from the HumanResponseEvent response.
    feedback: str
|
|
|
|
|
|
|
|
class GenerateQuestionsEvent(Event):
    """Payload-free signal that the form fields are ready in the context.

    Returned by ``RAGWorkflow.parse_form`` to trigger
    ``RAGWorkflow.generate_questions``.
    """
|
|
|
|
|
|
|
|
class RAGWorkflow(Workflow):
    """Event-driven workflow that fills in an application form from a resume.

    Step chain (each step is triggered by the event type in its signature):

    * ``set_up``              StartEvent -> ParseFormEvent
    * ``parse_form``          ParseFormEvent -> GenerateQuestionsEvent
    * ``generate_questions``  GenerateQuestionsEvent | FeedbackEvent ->
      one QueryEvent per form field (sent through the context)
    * ``ask_question``        QueryEvent -> ResponseEvent
    * ``fill_in_application`` ResponseEvent (all collected) ->
      InputRequiredEvent
    * ``get_feedback``        HumanResponseEvent -> FeedbackEvent | StopEvent

    Values shared between steps through the workflow context:
    ``fields_to_fill``, ``total_fields`` and ``filled_form``.
    """

    storage_dir = "./storage"
    llm: Groq
    # NOTE(review): set_up assigns the result of index.as_query_engine(...)
    # here, i.e. a query engine rather than a VectorStoreIndex; the
    # annotation is kept as-is because the engine type is not imported.
    query_engine: VectorStoreIndex

    @staticmethod
    def _extract_fields(raw_text: str) -> list:
        """Return the ``fields`` list from the LLM's JSON reply.

        Only the text between the first ``{`` and the last ``}`` is parsed,
        so a reply wrapped in markdown fences or surrounded by prose is
        still accepted.

        Raises:
            json.JSONDecodeError: if no valid JSON object can be parsed.
            KeyError: if the parsed object has no ``fields`` key.
        """
        start = raw_text.find("{")
        end = raw_text.rfind("}")
        if start != -1 and end > start:
            payload = raw_text[start:end + 1]
        else:
            # No braces found: let json.loads report the malformed reply.
            payload = raw_text
        return json.loads(payload)["fields"]

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:
        """Prepare the LLM and the resume query engine.

        Reads ``resume_file`` and ``application_form`` from the start event.
        If ``storage_dir`` exists the persisted index is reloaded; otherwise
        the resume is parsed with LlamaParse, indexed and persisted.

        Raises:
            ValueError: if ``resume_file`` or ``application_form`` is
                missing or empty.
        """
        self.llm = global_llm
        self.storage_dir = storage_dir

        # getattr with a default, so an omitted keyword reaches the
        # ValueError below instead of failing on the attribute lookup.
        resume_file = getattr(ev, "resume_file", None)
        if not resume_file:
            raise ValueError("No resume file provided")

        application_form = getattr(ev, "application_form", None)
        if not application_form:
            raise ValueError("No application form provided")

        if os.path.exists(self.storage_dir):
            # Reuse the index persisted by an earlier run.
            # NOTE(review): the persisted index is reused whatever
            # resume_file is passed -- confirm that is intended.
            storage_context = StorageContext.from_defaults(persist_dir=self.storage_dir)
            index = load_index_from_storage(storage_context)
        else:
            # First run: parse the resume, build the index and persist it.
            documents = LlamaParse(
                result_type="markdown",
                content_guideline_instruction="This is a resume, gather related facts together and format it as "
                                              "bullet points with headers"
            ).load_data(resume_file)

            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=global_embed_model
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)

        return ParseFormEvent(application_form=application_form)

    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionsEvent:
        """Extract the list of fields to fill in from the application form.

        The form is parsed with LlamaParse, the LLM converts the parsed text
        into ``{ fields: [...] }`` JSON, and the resulting list is stored in
        the context under ``fields_to_fill``.

        Raises:
            ValueError: if the parser returns no document for the form.
        """
        parser = LlamaParse(
            result_type="markdown",
            content_guideline_instruction="This is a job application form. Create a list of all the fields "
                                          "that need to be filled in.",
            formatting_instruction="Return a bulleted list of the fields ONLY."
        )

        documents = parser.load_data(ev.application_form)
        if not documents:
            # Fail with a clear message instead of an IndexError on [0].
            raise ValueError(f"No content could be parsed from {ev.application_form!r}")
        result = documents[0]

        raw_json = self.llm.complete(
            f"""
            This is a parsed form.
            Convert it into a JSON object containing only the list
            of fields to be filled in, in the form {{ fields: [...] }}.
            <form>{result.text}</form>.
            Return JSON ONLY, no markdown.
            """)
        # The model does not always honour "no markdown", so parse leniently.
        fields = self._extract_fields(raw_json.text)

        await ctx.set("fields_to_fill", fields)
        print("\n DEBUG: all fields written to Context >>>>>>>>>>>>>>>>>>>>>>>>>>\n")

        return GenerateQuestionsEvent()

    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionsEvent | FeedbackEvent) -> QueryEvent:
        """Send one QueryEvent per form field.

        Runs after ``parse_form`` and again after every FeedbackEvent. When
        the triggering event carries ``feedback``, that text is appended to
        each question. Events are sent through ``ctx.send_event``; the step
        itself returns nothing.
        """
        fields = await ctx.get("fields_to_fill")
        print("\n DEBUG:all fields Read from Context >>>>>>>>>>>>>>>>>>>>>>>>>>\n")

        # Store the expected answer count BEFORE dispatching any query, so
        # fill_in_application can never look it up before it exists.
        await ctx.set("total_fields", len(fields))
        print(f"\n DEBUG: total fields from Context : {len(fields)}")

        # The feedback text is identical for every field: build it once.
        feedback_note = ""
        if hasattr(ev, "feedback"):
            feedback_note = f"""
                \nWe previously got feedback about how we answered the questions.
                It might not be relevant to this particular field, but here it is:
                <feedback>{ev.feedback}</feedback>
            """

        for field in fields:
            question = (
                f"How would you answer this question about the candidate? <field>{field}</field>"
                + feedback_note
            )
            print("\n question : ", question)

            ctx.send_event(QueryEvent(
                field=field,
                query=question
            ))

        return

    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> ResponseEvent:
        """Answer one field question with the resume query engine."""
        response = self.query_engine.query(
            f"This is a question about the specific resume we have in our database: {ev.query}")
        return ResponseEvent(field=ev.field, response=response.response)

    @step
    async def fill_in_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent:
        """Combine all field answers into a filled-in form and ask for review.

        Returns ``None`` until ``total_fields`` ResponseEvents have been
        collected. Then the LLM merges the answers, the text is stored in
        the context under ``filled_form``, and an InputRequiredEvent is
        returned to request human feedback.
        """
        total_fields = await ctx.get("total_fields")

        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if responses is None:
            # Still waiting for the remaining answers.
            return None

        response_list = "\n".join(
            f"Field: {r.field}\nResponse: {r.response}" for r in responses
        )
        print("\n DEBUG: got all responses :\n")

        result = self.llm.complete(f"""
            You are given a list of fields in an application form and responses to
            questions about those fields from a resume. Combine the two into a list of
            fields and succinct, factual answers to fill in those fields.

            <responses>
            {response_list}
            </responses>
        """)

        print("\n DEBUG: llm combined the fields and responses from resume")

        # Kept in the context so get_feedback can return it as the result.
        await ctx.set("filled_form", str(result))

        print("\n DEBUG: Write all form fields to context. Now will emit InputRequiredEvent")

        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=result
        )

    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:
        """Decide whether the human accepted the form or wants changes.

        The LLM classifies ``ev.response`` as 'OKAY' or 'FEEDBACK'. On
        'OKAY' the workflow stops with the stored ``filled_form``; anything
        else loops back with a FeedbackEvent carrying the human's text.
        """
        result = self.llm.complete(f"""
            You have received some human feedback on the form-filling task you've done.
            Does everything look good, or is there more work to be done?
            <feedback>
            {ev.response}
            </feedback>
            If everything is fine, respond with just the word 'OKAY'.
            If there's any other feedback, respond with just the word 'FEEDBACK'.
        """)

        verdict = result.text.strip()

        print(f"LLM says the verdict was {verdict}")

        # Tolerate quotes, trailing punctuation and casing around the word,
        # so a reply such as "Okay." is not treated as more feedback.
        if verdict.strip(" \t\r\n.!'\"").upper() == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        else:
            return FeedbackEvent(feedback=ev.response)
|
|
|
|
|
|
|
|
def transcribe_speech(filepath):
    """Transcribe the audio file at *filepath* with Whisper's "base" model.

    Args:
        filepath: Path of the recorded audio file, or ``None`` when no
            audio was captured.

    Returns:
        The ``"text"`` entry of Whisper's transcription result.

    Raises:
        gr.Error: if *filepath* is ``None``.
    """
    if filepath is None:
        # Stop here: handing None to Whisper would fail with an unrelated
        # error. Raising also keeps the calling Gradio callback from
        # storing a bogus transcription, so the user can simply retry.
        raise gr.Error("No audio found, please retry.")

    model = whisper.load_model("base")
    result = model.transcribe(filepath, fp16=False)

    return result["text"]
|
|
|
|
|
|
|
|
|
|
|
class TranscriptionHandler:
    """Runs a Gradio microphone UI and hands back the first transcription.

    Transcriptions produced by the UI callback are placed on a queue;
    ``get_transcription`` launches the interface and polls that queue.
    """

    def __init__(self):
        """Start with an empty queue and no interface built yet."""
        # Filled by store_transcription, drained by get_transcription.
        self.transcription_queue = Queue()
        # Set by create_interface / get_transcription.
        self.interface = None
        # Textbox used as a log area, set by create_interface.
        self.log_display = None

    def store_transcription(self, output):
        """Queue *output* and return it unchanged so the UI can show it."""
        self.transcription_queue.put(output)
        return output

    def create_interface(self):
        """Build the Gradio Blocks UI, store it on the instance, return it.

        The UI holds a read-only log textbox and a microphone transcription
        interface, presented as two tabs.
        """
        # NOTE(review): confirm that the installed Gradio's Textbox accepts
        # a ``height`` keyword.
        log_box = gr.Textbox(
            label="Log Output",
            interactive=False,
            value="Waiting for user interaction...\n",
            height=200
        )

        # Microphone tab: transcribe the recording, then queue the text.
        mic_transcribe = gr.Interface(
            fn=lambda x: self.store_transcription(transcribe_speech(x)),
            inputs=gr.Audio(sources=["microphone"], type="filepath"),
            outputs=gr.Textbox(label="Transcription")
        )

        self.interface = gr.Blocks()
        with self.interface:
            with gr.Row():
                # Keep a handle on the log box for update_log.
                self.log_display = log_box
            with gr.Row():
                gr.TabbedInterface([log_box, mic_transcribe], ["Log", "Transcribe Microphone"])

        return self.interface

    async def get_transcription(self):
        """Launch the UI and wait until a transcription has been queued.

        The interface is launched with ``prevent_thread_lock=True`` so this
        coroutine keeps running. The queue is polled every 1.5 seconds;
        once an item is available the interface is closed and the item is
        returned.
        """
        self.interface = self.create_interface()
        self.interface.launch(
            share=True,
            ssr_mode=False,
            prevent_thread_lock=True
        )

        # Sleep between polls so the event loop stays responsive.
        while self.transcription_queue.empty():
            await asyncio.sleep(1.5)

        transcript = self.transcription_queue.get()
        if self.interface is not None:
            self.interface.close()
        return transcript

    def update_log(self, message):
        """Show *message* in the log textbox, if one has been created."""
        if not self.log_display:
            return
        # NOTE(review): relies on the component exposing ``update`` --
        # confirm against the installed Gradio version.
        self.log_display.update(value=f"{message}\n")
|
|
|
|
|
|
|
|
async def main():
    """Run the form-filling workflow end to end with spoken human feedback.

    Starts ``RAGWorkflow`` on the sample resume and application form and
    streams its events. Whenever the workflow asks for input, the filled-in
    form is printed and a spoken reply is collected through
    ``TranscriptionHandler`` and sent back as a HumanResponseEvent. Once the
    workflow finishes, the final result is printed and a diagram of the
    workflow is written to ``workflows/form_parsing_workflow.html`` next to
    this file and displayed.
    """
    w = RAGWorkflow(timeout=600, verbose=True)
    handler = w.run(
        resume_file="data/fake_resume.pdf",
        application_form="data/fake_application_form.pdf"
    )

    print("DEBUG: Starting event stream...")
    async for event in handler.stream_events():
        print(f"DEBUG: Received event type {type(event).__name__}")
        if isinstance(event, InputRequiredEvent):
            print("We've filled in your form! Here are the results:\n")
            print(event.result)

            # A fresh handler (and UI) is created for every feedback round.
            transcription_handler = TranscriptionHandler()
            response = await transcription_handler.get_transcription()

            handler.ctx.send_event(
                HumanResponseEvent(
                    response=response
                )
            )
        else:
            print("\n handler received event ", event)

    response = await handler
    print("Agent complete! Here's your final result:")
    print(str(response))

    workflow_file = Path(__file__).parent / "workflows" / "form_parsing_workflow.html"
    # Make sure the output directory exists before the diagram is written
    # into it; it is not created anywhere else in this script.
    workflow_file.parent.mkdir(parents=True, exist_ok=True)
    draw_all_possible_flows(w, filename=str(workflow_file))
    html_content = extract_html_content(str(workflow_file))
    display(HTML(html_content), metadata=dict(isolated=True))
|
|
|
|
|
|
|
|
# Script entry point: only when executed directly (not on import), drive the
# async main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|