"""Autograder agent: streams the autograder prompt and extracts structured profiles from CV PDFs."""

import asyncio
import time
import uuid
from pathlib import Path
from typing import AsyncIterable, Dict, List, Optional, Union

import pandas as pd
from dotenv import load_dotenv

# Load environment variables before the project imports below, which presumably
# read configuration (e.g. API keys) at import time.
load_dotenv()

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.messages import AIMessage, HumanMessage

from models.data_model import OutProfile, Profile
from services.agents.LLMAgent import LLMAgent
from services.llms.LLM import model_4o_2, model_5mini
from services.prompts.profile_extraction import extract_one_profile
from utils.utils import (
    ingest_bulk_profile,
    ingest_one_profile,
    pdf_reader,
    pretty_profiles,
    retrieve_profile,
)

prompt_template_filename = "AutograderPrompt.md"
# Read raw bytes and decode explicitly so the template text is exactly the file
# content (no newline translation). Path.read_bytes closes the file handle.
# NOTE: the path is relative to the current working directory.
prompt_autograder = Path(f"src/prompts/{prompt_template_filename}").read_bytes().decode("utf-8")


class AutograderAgent(LLMAgent):
    """LLM agent that streams autograder output and extracts/stores CV profiles.

    Profile extraction pipes ``extract_one_profile`` (the prompt) into
    ``self.model.with_structured_output(OutProfile)``. Storage and retrieval are
    delegated to helpers in ``utils.utils``.
    """

    def __init__(self, model=model_4o_2):
        super().__init__(model)
        self.agent_name = "AutograderAgent"
        self.prompt_template = prompt_autograder
        self.prompt_extract_one_profile = extract_one_profile

    async def generate(self, user_profile: str) -> AsyncIterable[str]:
        """Stream the model's reply to the autograder prompt filled with ``user_profile``.

        Args:
            user_profile: Text substituted for ``{user_profile}`` in the prompt template.

        Yields:
            The ``content`` of each chunk produced by ``self.model.astream``.

        Errors raised while streaming are printed and end the stream early;
        they are not propagated to the consumer.
        """
        self.callback = AsyncCallbackHandler()
        self.callbacks = [self.callback]
        messages = [
            HumanMessage(content=self.prompt_template.format(user_profile=user_profile)),
        ]
        try:
            async for token in self.model.astream(input=messages, callbacks=self.callbacks):
                yield token.content
        except Exception as e:
            # Best-effort streaming: report and stop instead of raising mid-stream.
            print(f"Caught exception: {e}")

    async def _extract_profile(self, file_path: str) -> Optional[OutProfile]:
        """Read one CV PDF and run the structured-output extraction chain on its text."""
        llm = self.model.with_structured_output(OutProfile)
        cv = await pdf_reader(file_path)
        chain = self.prompt_extract_one_profile | llm
        return await chain.ainvoke({"cv": cv}, config=None)

    async def generate_one(self, file_path: str) -> Optional[OutProfile]:
        """Extract a structured profile from a single CV (curriculum vitae) PDF.

        Args:
            file_path: Path of the PDF passed to ``pdf_reader``.

        Returns:
            The profile produced by the structured-output chain.

        Raises:
            NotImplementedError: if reading or extraction fails. This type is kept
                for compatibility with existing callers; the original error is
                chained as ``__cause__``.
        """
        try:
            return await self._extract_profile(file_path)
        except Exception as err:
            print(f"Failed to generate one profile for {file_path} due to error, {err}")
            raise NotImplementedError(
                f"Failed to generate one profile for {file_path} due to error, {err}"
            ) from err

    async def _helper_generate_one(self, file_path):
        """Extract one profile and print how long it took. Errors propagate unchanged."""
        start = time.perf_counter()
        profile = await self._extract_profile(file_path)
        print(f"Runtime extract one profile: {round(time.perf_counter() - start, 2)}")
        return profile

    async def generate_bulk(self, pdfs: List, export_csv: bool = False) -> Optional[List[OutProfile]]:
        """Extract profiles from several CV PDFs concurrently.

        Args:
            pdfs: Items accepted by ``pdf_reader`` (e.g. file paths), one per CV.
            export_csv: Currently unused; kept for API compatibility.

        Returns:
            The extracted profiles, in the same order as ``pdfs``.

        Raises:
            NotImplementedError: if any extraction fails. The message names the
                file that actually failed, and the original error is chained.
        """
        start = time.perf_counter()
        n_files = len(pdfs)

        async def _extract(index: int, file_path):
            print(f"Reading file [{index}/{n_files}]")
            profile = await self._helper_generate_one(file_path)
            # Reported only after this file's extraction has actually completed.
            print(f"[{index}/{n_files}] profile extracted ✅")
            return profile

        # return_exceptions=True lets every extraction settle, so no task is left
        # running with an unobserved error, and lets us attribute each failure
        # to its own file.
        results = await asyncio.gather(
            *(_extract(i, path) for i, path in enumerate(pdfs, start=1)),
            return_exceptions=True,
        )

        for file_path, result in zip(pdfs, results):
            if not isinstance(result, BaseException):
                continue
            if not isinstance(result, Exception):
                # e.g. CancelledError / KeyboardInterrupt: propagate unchanged.
                raise result
            print(f"Failed to generate one profile for {file_path} due to error, {result}")
            raise NotImplementedError(
                f"Failed to generate one profile for {file_path} due to error, {result}"
            ) from result

        minutes, seconds = divmod(time.perf_counter() - start, 60)
        print(f"✅ Finish in {int(minutes)} min, {round(seconds, 2)} sec")
        return list(results)

    async def insert_one_profile(self, profile: Profile):
        """Store a single profile via ``ingest_one_profile``."""
        await ingest_one_profile(profile)

    async def insert_bulk_profile(self, profiles: List[Profile]):
        """Store several profiles via ``ingest_bulk_profile``."""
        await ingest_bulk_profile(profiles)

    async def get_profiles(self, criteria: str, limit: int):
        """Return up to ``limit`` profiles retrieved for the free-text ``criteria``."""
        retrieved_profiles = await retrieve_profile(input_user=criteria, limit=limit)
        return retrieved_profiles

    async def get_dataframe_profiles(self, profiles: List[Profile]) -> pd.DataFrame:
        """Format ``profiles`` as a table via ``pretty_profiles``."""
        df = await pretty_profiles(profiles)
        return df