# CandidateExplorer / services/agents/AutograderAgent.py
# Last commit: 478dec6 "clean init" (ishaq101)
import asyncio
# import os, glob
import uuid
import time
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
from typing import AsyncIterable, List, Optional, Dict, Union
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.callbacks import AsyncCallbackHandler
# from fastapi.middleware.cors import CORSMiddleware
# from fastapi.responses import StreamingResponse
# from langchain.chat_models import ChatOpenAI
# from pydantic import BaseModel
# from langchain_google_genai import ChatGoogleGenerativeAI
from services.llms.LLM import model_5mini, model_4o_2
from services.agents.LLMAgent import LLMAgent
from models.data_model import OutProfile, Profile
from services.prompts.profile_extraction import extract_one_profile
from utils.utils import pdf_reader, ingest_one_profile, ingest_bulk_profile, retrieve_profile, pretty_profiles
# from concurrent.futures import ThreadPoolExecutor, as_completed
prompt_template_filename = "AutograderPrompt.md"
# Load the autograder prompt template once at import time. The file is read as
# raw bytes and decoded explicitly as UTF-8, so the result does not depend on the
# platform's default encoding. The `with` block guarantees the handle is closed.
# NOTE(review): the path is relative, so it resolves against the process's
# current working directory, not this module's location.
with open(f"src/prompts/{prompt_template_filename}", "rb") as _prompt_file:
    prompt_autograder = _prompt_file.read().decode('utf-8')
class AutograderAgent(LLMAgent):
    """LLM agent that streams autograder output and extracts structured CV profiles.

    Besides the LLM calls, it forwards profile ingestion, retrieval and
    DataFrame formatting to helpers imported from ``utils.utils``.
    """

    def __init__(self, model=model_4o_2):
        """Create the agent.

        Args:
            model: Chat model handed to ``LLMAgent.__init__``; defaults to
                ``model_4o_2``. NOTE(review): the methods below read it back as
                ``self.model``, so the base class presumably stores it there.
        """
        super().__init__(model)
        self.agent_name = "AutograderAgent"
        # Raw prompt text; `generate` fills its `{user_profile}` placeholder via str.format.
        self.prompt_template = prompt_autograder
        # Prompt object piped (`|`) into the structured-output model for CV extraction.
        self.prompt_extract_one_profile = extract_one_profile

    async def generate(self, user_profile: str) -> AsyncIterable[str]:
        """Stream the autograder response for ``user_profile`` chunk by chunk.

        Yields the ``content`` of each chunk produced by ``self.model.astream``.
        Errors raised while streaming are printed and swallowed, so the stream
        simply ends early without an exception. Errors from formatting the
        prompt happen before streaming starts and do propagate.
        """
        # Stored on the instance, as before, in case other code reads them.
        self.callback = AsyncCallbackHandler()
        self.callbacks = [self.callback]
        messages = [
            HumanMessage(content=self.prompt_template.format(user_profile=user_profile)),
        ]
        try:
            async for token in self.model.astream(input=messages, callbacks=self.callbacks):
                yield token.content
        except Exception as e:
            # Best-effort streaming: report the error and end the stream quietly.
            print(f"Caught exception: {e}")

    async def generate_one(self, file_path: str) -> Optional[OutProfile]:
        """Extract a structured profile from a single CV (curriculum vitae) PDF.

        Args:
            file_path: Path handed to ``pdf_reader``.

        Returns:
            The structured output of the extraction chain.

        Raises:
            NotImplementedError: Wraps any failure while reading or extracting.
                The type is kept for compatibility with existing callers. The
                underlying error is chained as ``__cause__``.
        """
        try:
            return await self._helper_generate_one(file_path)
        except Exception as err:
            print(f"Failed to generate one profile for {file_path} due to error, {err}")
            raise NotImplementedError(
                f"Failed to generate one profile for {file_path} due to error, {err}"
            ) from err

    async def _helper_generate_one(self, file_path):
        """Read one CV PDF and run the structured extraction chain on it.

        Prints the wall-clock runtime and returns the model output, which is
        requested as ``OutProfile``. Exceptions propagate unchanged.
        """
        st = time.time()
        cv = await pdf_reader(file_path)
        llm = self.model.with_structured_output(OutProfile)
        chain = self.prompt_extract_one_profile | llm
        profile = await chain.ainvoke({"cv": cv}, config=None)
        rt = time.time() - st
        print(f"Runtime extract one profile: {round(rt,2)}")
        return profile

    async def generate_bulk(self, pdfs: List, export_csv: bool = False) -> Optional[List[OutProfile]]:
        """Extract profiles from many CV PDFs concurrently.

        Args:
            pdfs: Iterable of inputs for ``pdf_reader``, one per CV.
            export_csv: Currently unused; kept for interface compatibility.

        Returns:
            Extracted profiles, in the same order as ``pdfs``.

        Raises:
            NotImplementedError: If any extraction fails. The message names the
                file that actually failed, and the original error is chained.
        """
        st = time.time()
        # Materialise once so the count and the zip below see the same items.
        paths = list(pdfs)
        n_files = len(paths)
        tasks = []
        for i, file_path in enumerate(paths):
            print(f"Reading file [{i+1}/{n_files}]")
            tasks.append(asyncio.create_task(self._helper_generate_one(file_path)))

        # return_exceptions=True waits for every task, so none is left running
        # unobserved when one fails, and lets us attribute a failure to its file.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        profiles = []
        for i, (file_path, result) in enumerate(zip(paths, results)):
            if isinstance(result, Exception):
                print(f"Failed to generate one profile for {file_path} due to error, {result}")
                raise NotImplementedError(
                    f"Failed to generate one profile for {file_path} due to error, {result}"
                ) from result
            if isinstance(result, BaseException):
                # e.g. CancelledError: propagate as-is rather than wrapping it.
                raise result
            profiles.append(result)
            print(f"[{i+1}/{n_files}] profile extracted ✅")

        elapsed = time.time() - st
        print(f"✅ Finish in {elapsed//60} min, {elapsed%60} sec")
        return profiles

    async def insert_one_profile(self, profile: Profile):
        """Hand a single profile to ``ingest_one_profile``."""
        await ingest_one_profile(profile)

    async def insert_bulk_profile(self, profiles: List[Profile]):
        """Hand a list of profiles to ``ingest_bulk_profile``."""
        await ingest_bulk_profile(profiles)

    async def get_profiles(self, criteria: str, limit: int):
        """Return the result of ``retrieve_profile`` for ``criteria``.

        ``criteria`` is passed as ``input_user`` and ``limit`` is forwarded
        unchanged.
        """
        retrieved_profiles = await retrieve_profile(input_user=criteria, limit=limit)
        return retrieved_profiles

    async def get_dataframe_profiles(self, profiles: List[Profile]) -> pd.DataFrame:
        """Format ``profiles`` as a DataFrame via ``pretty_profiles``."""
        df = await pretty_profiles(profiles)
        return df
# import asyncio
# # myagent = AutograderAgent(model=model_gemini)
# myagent2 = AutograderAgent(model=model_4o)
# folder_path="src/data/cvs"
# files_path = glob.glob(f"{folder_path}/*.pdf")
# print(len(files_path))
# # res = asyncio.run(myagent.generate_one(file_path=files_path[1]))
# res_bulk = asyncio.run(myagent2.generate_bulk(folder_path=folder_path))
# profiles = asyncio.run(helper_prepare_profiles(files_path, res_bulk))
# asyncio.run(myagent2.insert_bulk_profile(profiles))