# NOTE(review): "Spaces: Sleeping" page-status residue from extraction of a
# hosted app page — not code; commented out so the module can parse.
| import asyncio | |
| # import os, glob | |
| import uuid | |
| import time | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from typing import AsyncIterable, List, Optional, Dict, Union | |
| from langchain_core.messages import HumanMessage, AIMessage | |
| from langchain_core.callbacks import AsyncCallbackHandler | |
| # from fastapi.middleware.cors import CORSMiddleware | |
| # from fastapi.responses import StreamingResponse | |
| # from langchain.chat_models import ChatOpenAI | |
| # from pydantic import BaseModel | |
| # from langchain_google_genai import ChatGoogleGenerativeAI | |
| from services.llms.LLM import model_5mini, model_4o_2 | |
| from services.agents.LLMAgent import LLMAgent | |
| from models.data_model import OutProfile, Profile | |
| from services.prompts.profile_extraction import extract_one_profile | |
| from utils.utils import pdf_reader, ingest_one_profile, ingest_bulk_profile, retrieve_profile, pretty_profiles | |
| # from concurrent.futures import ThreadPoolExecutor, as_completed | |
# Autograder prompt template, loaded once at import time.
prompt_template_filename = "AutograderPrompt.md"
# Use a context manager so the file handle is closed (the original leaked it),
# and read in text mode with an explicit UTF-8 encoding instead of reading
# bytes and decoding by hand.
with open(f"src/prompts/{prompt_template_filename}", encoding="utf-8") as _prompt_file:
    prompt_autograder = _prompt_file.read()
class AutograderAgent(LLMAgent):
    """LLM-backed agent that grades CVs and extracts structured candidate profiles."""

    def __init__(self, model=model_4o_2):
        """Initialize the agent with a chat model (defaults to ``model_4o_2``)."""
        super().__init__(model)
        # Prompt templates are module-level constants loaded at import time.
        self.prompt_template = prompt_autograder
        self.prompt_extract_one_profile = extract_one_profile
        self.agent_name = "AutograderAgent"
| async def generate(self, user_profile: str) -> AsyncIterable[str]: | |
| """Generates a response from messages using the model's astream method.""" | |
| self.callback = AsyncCallbackHandler() | |
| self.callbacks = [self.callback] | |
| input = [ | |
| HumanMessage(content=self.prompt_template.format(user_profile=user_profile)), | |
| ] | |
| try: | |
| async for token in self.model.astream(input=input, callbacks=self.callbacks): | |
| yield token.content | |
| except Exception as e: | |
| print(f"Caught exception: {e}") | |
| async def generate_one(self, file_path:str) -> Optional[OutProfile]: | |
| "Generate extracted profile from a CV (curriculum vitae)" | |
| try: | |
| llm = self.model.with_structured_output(OutProfile) | |
| cv = await pdf_reader(file_path) # get_pdf(path) | |
| # extract_one_profile = extract_one_profile.format(cv=cv) | |
| chain = self.prompt_extract_one_profile | llm | |
| input_chain = { | |
| "cv":cv | |
| } | |
| # profile = chain.invoke(input_chain, config=None) | |
| profile = await chain.ainvoke(input_chain, config=None) | |
| return profile | |
| except Exception as E: | |
| print(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| raise NotImplementedError(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| # async def generate_bulk(self, folder_path:str, export_csv:bool=False) -> Optional[List[OutProfile]]: | |
| # "Generate extracted profile from a CV (curriculum vitae)" | |
| # try: | |
| # st = time.time() | |
| # llm = self.model.with_structured_output(OutProfile) | |
| # files_path = glob.glob(f"{folder_path}/*.pdf") | |
| # profiles = [] | |
| # n_files = len(files_path) | |
| # for i, file_path in enumerate(files_path): | |
| # cv = await pdf_reader(file_path) # get_pdf(path) | |
| # chain = self.prompt_extract_one_profile | llm | |
| # input_chain = { | |
| # "cv":cv | |
| # } | |
| # profile = await chain.ainvoke(input_chain, config=None) | |
| # profiles.append(profile) | |
| # print(f"[{i+1}/{n_files}] profile extracted β ") | |
| # print(f"β Finish in {(time.time() - st)//60} min, {(time.time() - st)%60} sec") | |
| # return profiles | |
| # except Exception as E: | |
| # print(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| # raise NotImplementedError(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| # async def generate_bulk(self, folder_path:str, export_csv:bool=False) -> Optional[List[OutProfile]]: | |
| # "Generate extracted profile from a CV (curriculum vitae)" | |
| # try: | |
| # st = time.time() | |
| # llm = self.model.with_structured_output(OutProfile) | |
| # files_path = glob.glob(f"{folder_path}/*.pdf") | |
| # profiles = [] | |
| # n_files = len(files_path) | |
| # for i, file_path in enumerate(files_path): | |
| # cv = await pdf_reader(file_path) # get_pdf(path) | |
| # chain = self.prompt_extract_one_profile | llm | |
| # input_chain = { | |
| # "cv":cv | |
| # } | |
| # profile = await chain.ainvoke(input_chain, config=None) | |
| # profiles.append(profile) | |
| # print(f"[{i+1}/{n_files}] profile extracted β ") | |
| # print(f"β Finish in {(time.time() - st)//60} min, {(time.time() - st)%60} sec") | |
| # return profiles | |
| # except Exception as E: | |
| # print(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| # raise NotImplementedError(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| # not using threadpool | |
| # async def generate_bulk(self, pdfs:List, export_csv:bool=False) -> Optional[List[OutProfile]]: | |
| # "Generate extracted profile from a CV (curriculum vitae)" | |
| # try: | |
| # st = time.time() | |
| # llm = self.model.with_structured_output(OutProfile) | |
| # profiles = [] | |
| # n_files = len(pdfs) | |
| # for i, file_path in enumerate(pdfs): | |
| # print(f"Reading file [{i+1}/{n_files}]") | |
| # cv = await pdf_reader(file_path) # get_pdf(path) | |
| # chain = self.prompt_extract_one_profile | llm | |
| # input_chain = { | |
| # "cv":cv | |
| # } | |
| # profile = await chain.ainvoke(input_chain, config=None) | |
| # profiles.append(profile) | |
| # print(f"[{i+1}/{n_files}] profile extracted β ") | |
| # print(f"β Finish in {(time.time() - st)//60} min, {(time.time() - st)%60} sec") | |
| # return profiles | |
| # except Exception as E: | |
| # print(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| # raise NotImplementedError(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| async def _helper_generate_one(self, file_path): | |
| st = time.time() | |
| cv = await pdf_reader(file_path) # get_pdf(path) | |
| llm = self.model.with_structured_output(OutProfile) | |
| chain = self.prompt_extract_one_profile | llm | |
| input_chain = { | |
| "cv":cv | |
| } | |
| profile = await chain.ainvoke(input_chain, config=None) | |
| rt = time.time() - st | |
| print(f"Runtime extract one profile: {round(rt,2)}") | |
| return profile | |
| async def generate_bulk(self, pdfs:List, export_csv:bool=False) -> Optional[List[OutProfile]]: | |
| "Generate extracted profile from a CV (curriculum vitae)" | |
| try: | |
| st = time.time() | |
| profiles = [] | |
| n_files = len(pdfs) | |
| tasks = [] | |
| for i, file_path in enumerate(pdfs): | |
| print(f"Reading file [{i+1}/{n_files}]") | |
| task = asyncio.create_task(self._helper_generate_one(file_path)) | |
| tasks.append(task) | |
| print(f"[{i+1}/{n_files}] profile extracted β ") | |
| profiles = await asyncio.gather(*tasks) | |
| print(f"β Finish in {(time.time() - st)//60} min, {(time.time() - st)%60} sec") | |
| return profiles | |
| except Exception as E: | |
| print(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| raise NotImplementedError(f"Failed to generate one profile for {file_path} due to error, {E}") | |
| async def insert_one_profile(self, profile:Profile): | |
| await ingest_one_profile(profile) | |
| async def insert_bulk_profile(self, profiles:List[Profile]): | |
| await ingest_bulk_profile(profiles) | |
| async def get_profiles(self, criteria:str, limit:int): | |
| retrieved_profiles = await retrieve_profile(input_user=criteria, limit=limit) | |
| return retrieved_profiles | |
| async def get_dataframe_profiles(self, profiles:List[Profile]) -> pd.DataFrame: | |
| df = await pretty_profiles(profiles) | |
| return df | |
| # import asyncio | |
| # # myagent = AutograderAgent(model=model_gemini) | |
| # myagent2 = AutograderAgent(model=model_4o) | |
| # folder_path="src/data/cvs" | |
| # files_path = glob.glob(f"{folder_path}/*.pdf") | |
| # print(len(files_path)) | |
| # # res = asyncio.run(myagent.generate_one(file_path=files_path[1])) | |
| # res_bulk = asyncio.run(myagent2.generate_bulk(folder_path=folder_path)) | |
| # profiles = asyncio.run(helper_prepare_profiles(files_path, res_bulk)) | |
| # asyncio.run(myagent2.insert_bulk_profile(profiles)) | |