Jb_Applier_AI_Agent / src /libs /llm_manager.py
jaothan's picture
Upload 59 files
08d0f6d verified
import json
import os
import re
import textwrap
import time
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Union
import httpx
from dotenv import load_dotenv
from langchain_core.messages import BaseMessage
from langchain_core.messages.ai import AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompt_values import StringPromptValue
from langchain_core.prompts import ChatPromptTemplate
from Levenshtein import distance
import ai_hawk.llm.prompts as prompts
from config import JOB_SUITABILITY_SCORE
from src.utils.constants import (
AVAILABILITY,
CERTIFICATIONS,
CLAUDE,
COMPANY,
CONTENT,
COVER_LETTER,
EDUCATION_DETAILS,
EXPERIENCE_DETAILS,
FINISH_REASON,
GEMINI,
HUGGINGFACE,
ID,
INPUT_TOKENS,
INTERESTS,
JOB_APPLICATION_PROFILE,
JOB_DESCRIPTION,
LANGUAGES,
LEGAL_AUTHORIZATION,
LLM_MODEL_TYPE,
LOGPROBS,
MODEL,
MODEL_NAME,
OLLAMA,
OPENAI,
PERPLEXITY,
OPTIONS,
OUTPUT_TOKENS,
PERSONAL_INFORMATION,
PHRASE,
PROJECTS,
PROMPTS,
QUESTION,
REPLIES,
RESPONSE_METADATA,
RESUME,
RESUME_EDUCATIONS,
RESUME_JOBS,
RESUME_PROJECTS,
RESUME_SECTION,
SALARY_EXPECTATIONS,
SELF_IDENTIFICATION,
SYSTEM_FINGERPRINT,
TEXT,
TIME,
TOKEN_USAGE,
TOTAL_COST,
TOTAL_TOKENS,
USAGE_METADATA,
WORK_PREFERENCES,
)
from src.job import Job
from src.logging import logger
import config as cfg
load_dotenv()
class AIModel(ABC):
@abstractmethod
def invoke(self, prompt: str) -> str:
pass
class OpenAIModel(AIModel):
def __init__(self, api_key: str, llm_model: str):
from langchain_openai import ChatOpenAI
self.model = ChatOpenAI(
model_name=llm_model, openai_api_key=api_key, temperature=0.4
)
def invoke(self, prompt: str) -> BaseMessage:
logger.debug("Invoking OpenAI API")
response = self.model.invoke(prompt)
return response
class ClaudeModel(AIModel):
def __init__(self, api_key: str, llm_model: str):
from langchain_anthropic import ChatAnthropic
self.model = ChatAnthropic(model=llm_model, api_key=api_key, temperature=0.4)
def invoke(self, prompt: str) -> BaseMessage:
response = self.model.invoke(prompt)
logger.debug("Invoking Claude API")
return response
class OllamaModel(AIModel):
def __init__(self, llm_model: str, llm_api_url: str):
from langchain_ollama import ChatOllama
if len(llm_api_url) > 0:
logger.debug(f"Using Ollama with API URL: {llm_api_url}")
self.model = ChatOllama(model=llm_model, base_url=llm_api_url)
else:
self.model = ChatOllama(model=llm_model)
def invoke(self, prompt: str) -> BaseMessage:
response = self.model.invoke(prompt)
return response
class PerplexityModel(AIModel):
def __init__(self, api_key: str, llm_model: str):
from langchain_community.chat_models import ChatPerplexity
self.model = ChatPerplexity(model=llm_model, api_key=api_key, temperature=0.4)
def invoke(self, prompt: str) -> BaseMessage:
response = self.model.invoke(prompt)
return response
# gemini doesn't seem to work because API doesn't rstitute answers for questions that involve answers that are too short
class GeminiModel(AIModel):
def __init__(self, api_key: str, llm_model: str):
from langchain_google_genai import (
ChatGoogleGenerativeAI,
HarmBlockThreshold,
HarmCategory,
)
self.model = ChatGoogleGenerativeAI(
model=llm_model,
google_api_key=api_key,
safety_settings={
HarmCategory.HARM_CATEGORY_UNSPECIFIED: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_DEROGATORY: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_TOXICITY: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_VIOLENCE: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_SEXUAL: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_MEDICAL: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_DANGEROUS: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
},
)
def invoke(self, prompt: str) -> BaseMessage:
response = self.model.invoke(prompt)
return response
class HuggingFaceModel(AIModel):
def __init__(self, api_key: str, llm_model: str):
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
self.model = HuggingFaceEndpoint(
repo_id=llm_model, huggingfacehub_api_token=api_key, temperature=0.4
)
self.chatmodel = ChatHuggingFace(llm=self.model)
def invoke(self, prompt: str) -> BaseMessage:
response = self.chatmodel.invoke(prompt)
logger.debug(
f"Invoking Model from Hugging Face API. Response: {response}, Type: {type(response)}"
)
return response
class AIAdapter:
def __init__(self, config: dict, api_key: str):
self.model = self._create_model(config, api_key)
def _create_model(self, config: dict, api_key: str) -> AIModel:
llm_model_type = cfg.LLM_MODEL_TYPE
llm_model = cfg.LLM_MODEL
llm_api_url = cfg.LLM_API_URL
logger.debug(f"Using {llm_model_type} with {llm_model}")
if llm_model_type == OPENAI:
return OpenAIModel(api_key, llm_model)
elif llm_model_type == CLAUDE:
return ClaudeModel(api_key, llm_model)
elif llm_model_type == OLLAMA:
return OllamaModel(llm_model, llm_api_url)
elif llm_model_type == GEMINI:
return GeminiModel(api_key, llm_model)
elif llm_model_type == HUGGINGFACE:
return HuggingFaceModel(api_key, llm_model)
elif llm_model_type == PERPLEXITY:
return PerplexityModel(api_key, llm_model)
else:
raise ValueError(f"Unsupported model type: {llm_model_type}")
def invoke(self, prompt: str) -> str:
return self.model.invoke(prompt)
class LLMLogger:
def __init__(self, llm: Union[OpenAIModel, OllamaModel, ClaudeModel, GeminiModel]):
self.llm = llm
logger.debug(f"LLMLogger successfully initialized with LLM: {llm}")
@staticmethod
def log_request(prompts, parsed_reply: Dict[str, Dict]):
logger.debug("Starting log_request method")
logger.debug(f"Prompts received: {prompts}")
logger.debug(f"Parsed reply received: {parsed_reply}")
try:
calls_log = os.path.join(Path("data_folder/output"), "open_ai_calls.json")
logger.debug(f"Logging path determined: {calls_log}")
except Exception as e:
logger.error(f"Error determining the log path: {str(e)}")
raise
if isinstance(prompts, StringPromptValue):
logger.debug("Prompts are of type StringPromptValue")
prompts = prompts.text
logger.debug(f"Prompts converted to text: {prompts}")
elif isinstance(prompts, Dict):
logger.debug("Prompts are of type Dict")
try:
prompts = {
f"prompt_{i + 1}": prompt.content
for i, prompt in enumerate(prompts.messages)
}
logger.debug(f"Prompts converted to dictionary: {prompts}")
except Exception as e:
logger.error(f"Error converting prompts to dictionary: {str(e)}")
raise
else:
logger.debug("Prompts are of unknown type, attempting default conversion")
try:
prompts = {
f"prompt_{i + 1}": prompt.content
for i, prompt in enumerate(prompts.messages)
}
logger.debug(
f"Prompts converted to dictionary using default method: {prompts}"
)
except Exception as e:
logger.error(f"Error converting prompts using default method: {str(e)}")
raise
try:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.debug(f"Current time obtained: {current_time}")
except Exception as e:
logger.error(f"Error obtaining current time: {str(e)}")
raise
try:
token_usage = parsed_reply[USAGE_METADATA]
output_tokens = token_usage[OUTPUT_TOKENS]
input_tokens = token_usage[INPUT_TOKENS]
total_tokens = token_usage[TOTAL_TOKENS]
logger.debug(
f"Token usage - Input: {input_tokens}, Output: {output_tokens}, Total: {total_tokens}"
)
except KeyError as e:
logger.error(f"KeyError in parsed_reply structure: {str(e)}")
raise
try:
model_name = parsed_reply[RESPONSE_METADATA][MODEL_NAME]
logger.debug(f"Model name: {model_name}")
except KeyError as e:
logger.error(f"KeyError in response_metadata: {str(e)}")
raise
try:
prompt_price_per_token = 0.00000015
completion_price_per_token = 0.0000006
total_cost = (input_tokens * prompt_price_per_token) + (
output_tokens * completion_price_per_token
)
logger.debug(f"Total cost calculated: {total_cost}")
except Exception as e:
logger.error(f"Error calculating total cost: {str(e)}")
raise
try:
log_entry = {
MODEL: model_name,
TIME: current_time,
PROMPTS: prompts,
REPLIES: parsed_reply[CONTENT],
TOTAL_TOKENS: total_tokens,
INPUT_TOKENS: input_tokens,
OUTPUT_TOKENS: output_tokens,
TOTAL_COST: total_cost,
}
logger.debug(f"Log entry created: {log_entry}")
except KeyError as e:
logger.error(
f"Error creating log entry: missing key {str(e)} in parsed_reply"
)
raise
try:
with open(calls_log, "a", encoding="utf-8") as f:
json_string = json.dumps(log_entry, ensure_ascii=False, indent=4)
f.write(json_string + "\n")
logger.debug(f"Log entry written to file: {calls_log}")
except Exception as e:
logger.error(f"Error writing log entry to file: {str(e)}")
raise
class LoggerChatModel:
def __init__(self, llm: Union[OpenAIModel, OllamaModel, ClaudeModel, GeminiModel]):
self.llm = llm
logger.debug(f"LoggerChatModel successfully initialized with LLM: {llm}")
def __call__(self, messages: List[Dict[str, str]]) -> str:
logger.debug(f"Entering __call__ method with messages: {messages}")
while True:
try:
logger.debug("Attempting to call the LLM with messages")
reply = self.llm.invoke(messages)
logger.debug(f"LLM response received: {reply}")
parsed_reply = self.parse_llmresult(reply)
logger.debug(f"Parsed LLM reply: {parsed_reply}")
LLMLogger.log_request(prompts=messages, parsed_reply=parsed_reply)
logger.debug("Request successfully logged")
return reply
except httpx.HTTPStatusError as e:
logger.error(f"HTTPStatusError encountered: {str(e)}")
if e.response.status_code == 429:
retry_after = e.response.headers.get("retry-after")
retry_after_ms = e.response.headers.get("retry-after-ms")
if retry_after:
wait_time = int(retry_after)
logger.warning(
f"Rate limit exceeded. Waiting for {wait_time} seconds before retrying (extracted from 'retry-after' header)..."
)
time.sleep(wait_time)
elif retry_after_ms:
wait_time = int(retry_after_ms) / 1000.0
logger.warning(
f"Rate limit exceeded. Waiting for {wait_time} seconds before retrying (extracted from 'retry-after-ms' header)..."
)
time.sleep(wait_time)
else:
wait_time = 30
logger.warning(
f"'retry-after' header not found. Waiting for {wait_time} seconds before retrying (default)..."
)
time.sleep(wait_time)
else:
logger.error(
f"HTTP error occurred with status code: {e.response.status_code}, waiting 30 seconds before retrying"
)
time.sleep(30)
except Exception as e:
logger.error(f"Unexpected error occurred: {str(e)}")
logger.info(
"Waiting for 30 seconds before retrying due to an unexpected error."
)
time.sleep(30)
continue
def parse_llmresult(self, llmresult: AIMessage) -> Dict[str, Dict]:
logger.debug(f"Parsing LLM result: {llmresult}")
try:
if hasattr(llmresult, USAGE_METADATA):
content = llmresult.content
response_metadata = llmresult.response_metadata
id_ = llmresult.id
usage_metadata = llmresult.usage_metadata
parsed_result = {
CONTENT: content,
RESPONSE_METADATA: {
MODEL_NAME: response_metadata.get(
MODEL_NAME, ""
),
SYSTEM_FINGERPRINT: response_metadata.get(
SYSTEM_FINGERPRINT, ""
),
FINISH_REASON: response_metadata.get(
FINISH_REASON, ""
),
LOGPROBS: response_metadata.get(
LOGPROBS, None
),
},
ID: id_,
USAGE_METADATA: {
INPUT_TOKENS: usage_metadata.get(
INPUT_TOKENS, 0
),
OUTPUT_TOKENS: usage_metadata.get(
OUTPUT_TOKENS, 0
),
TOTAL_TOKENS: usage_metadata.get(
TOTAL_TOKENS, 0
),
},
}
else:
content = llmresult.content
response_metadata = llmresult.response_metadata
id_ = llmresult.id
token_usage = response_metadata[TOKEN_USAGE]
parsed_result = {
CONTENT: content,
RESPONSE_METADATA: {
MODEL_NAME: response_metadata.get(
MODEL, ""
),
FINISH_REASON: response_metadata.get(
FINISH_REASON, ""
),
},
ID: id_,
USAGE_METADATA: {
INPUT_TOKENS: token_usage.prompt_tokens,
OUTPUT_TOKENS: token_usage.completion_tokens,
TOTAL_TOKENS: token_usage.total_tokens,
},
}
logger.debug(f"Parsed LLM result successfully: {parsed_result}")
return parsed_result
except KeyError as e:
logger.error(f"KeyError while parsing LLM result: missing key {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error while parsing LLM result: {str(e)}")
raise
class GPTAnswerer:
def __init__(self, config, llm_api_key):
self.ai_adapter = AIAdapter(config, llm_api_key)
self.llm_cheap = LoggerChatModel(self.ai_adapter)
@property
def job_description(self):
return self.job.description
@staticmethod
def find_best_match(text: str, options: list[str]) -> str:
logger.debug(f"Finding best match for text: '{text}' in options: {options}")
distances = [
(option, distance(text.lower(), option.lower())) for option in options
]
best_option = min(distances, key=lambda x: x[1])[0]
logger.debug(f"Best match found: {best_option}")
return best_option
@staticmethod
def _remove_placeholders(text: str) -> str:
logger.debug(f"Removing placeholders from text: {text}")
text = text.replace("PLACEHOLDER", "")
return text.strip()
@staticmethod
def _preprocess_template_string(template: str) -> str:
logger.debug("Preprocessing template string")
return textwrap.dedent(template)
def set_resume(self, resume):
logger.debug(f"Setting resume: {resume}")
self.resume = resume
def set_job(self, job: Job):
logger.debug(f"Setting job: {job}")
self.job = job
self.job.set_summarize_job_description(
self.summarize_job_description(self.job.description)
)
def set_job_application_profile(self, job_application_profile):
logger.debug(f"Setting job application profile: {job_application_profile}")
self.job_application_profile = job_application_profile
def _clean_llm_output(self, output: str) -> str:
return output.replace("*", "").replace("#", "").strip()
def summarize_job_description(self, text: str) -> str:
logger.debug(f"Summarizing job description: {text}")
prompts.summarize_prompt_template = self._preprocess_template_string(
prompts.summarize_prompt_template
)
prompt = ChatPromptTemplate.from_template(prompts.summarize_prompt_template)
chain = prompt | self.llm_cheap | StrOutputParser()
raw_output = chain.invoke({TEXT: text})
output = self._clean_llm_output(raw_output)
logger.debug(f"Summary generated: {output}")
return output
def _create_chain(self, template: str):
logger.debug(f"Creating chain with template: {template}")
prompt = ChatPromptTemplate.from_template(template)
return prompt | self.llm_cheap | StrOutputParser()
def answer_question_textual_wide_range(self, question: str) -> str:
logger.debug(f"Answering textual question: {question}")
chains = {
PERSONAL_INFORMATION: self._create_chain(
prompts.personal_information_template
),
SELF_IDENTIFICATION: self._create_chain(
prompts.self_identification_template
),
LEGAL_AUTHORIZATION: self._create_chain(
prompts.legal_authorization_template
),
WORK_PREFERENCES: self._create_chain(
prompts.work_preferences_template
),
EDUCATION_DETAILS: self._create_chain(
prompts.education_details_template
),
EXPERIENCE_DETAILS: self._create_chain(
prompts.experience_details_template
),
PROJECTS: self._create_chain(prompts.projects_template),
AVAILABILITY: self._create_chain(prompts.availability_template),
SALARY_EXPECTATIONS: self._create_chain(
prompts.salary_expectations_template
),
CERTIFICATIONS: self._create_chain(
prompts.certifications_template
),
LANGUAGES: self._create_chain(prompts.languages_template),
INTERESTS: self._create_chain(prompts.interests_template),
COVER_LETTER: self._create_chain(prompts.coverletter_template),
}
prompt = ChatPromptTemplate.from_template(prompts.determine_section_template)
chain = prompt | self.llm_cheap | StrOutputParser()
raw_output = chain.invoke({QUESTION: question})
output = self._clean_llm_output(raw_output)
match = re.search(
r"(Personal information|Self Identification|Legal Authorization|Work Preferences|Education "
r"Details|Experience Details|Projects|Availability|Salary "
r"Expectations|Certifications|Languages|Interests|Cover letter)",
output,
re.IGNORECASE,
)
if not match:
raise ValueError("Could not extract section name from the response.")
section_name = match.group(1).lower().replace(" ", "_")
if section_name == "cover_letter":
chain = chains.get(section_name)
raw_output = chain.invoke(
{
RESUME: self.resume,
JOB_DESCRIPTION: self.job_description,
COMPANY: self.job.company,
}
)
output = self._clean_llm_output(raw_output)
logger.debug(f"Cover letter generated: {output}")
return output
resume_section = getattr(self.resume, section_name, None) or getattr(
self.job_application_profile, section_name, None
)
if resume_section is None:
logger.error(
f"Section '{section_name}' not found in either resume or job_application_profile."
)
raise ValueError(
f"Section '{section_name}' not found in either resume or job_application_profile."
)
chain = chains.get(section_name)
if chain is None:
logger.error(f"Chain not defined for section '{section_name}'")
raise ValueError(f"Chain not defined for section '{section_name}'")
raw_output = chain.invoke(
{RESUME_SECTION: resume_section, QUESTION: question}
)
output = self._clean_llm_output(raw_output)
logger.debug(f"Question answered: {output}")
return output
def answer_question_numeric(
self, question: str, default_experience: str = 3
) -> str:
logger.debug(f"Answering numeric question: {question}")
func_template = self._preprocess_template_string(
prompts.numeric_question_template
)
prompt = ChatPromptTemplate.from_template(func_template)
chain = prompt | self.llm_cheap | StrOutputParser()
raw_output_str = chain.invoke(
{
RESUME_EDUCATIONS: self.resume.education_details,
RESUME_JOBS: self.resume.experience_details,
RESUME_PROJECTS: self.resume.projects,
QUESTION: question,
}
)
output_str = self._clean_llm_output(raw_output_str)
logger.debug(f"Raw output for numeric question: {output_str}")
try:
output = self.extract_number_from_string(output_str)
logger.debug(f"Extracted number: {output}")
except ValueError:
logger.warning(
f"Failed to extract number, using default experience: {default_experience}"
)
output = default_experience
return output
def extract_number_from_string(self, output_str):
logger.debug(f"Extracting number from string: {output_str}")
numbers = re.findall(r"\d+", output_str)
if numbers:
logger.debug(f"Numbers found: {numbers}")
return str(numbers[0])
else:
logger.error("No numbers found in the string")
raise ValueError("No numbers found in the string")
def answer_question_from_options(self, question: str, options: list[str]) -> str:
logger.debug(f"Answering question from options: {question}")
func_template = self._preprocess_template_string(prompts.options_template)
prompt = ChatPromptTemplate.from_template(func_template)
chain = prompt | self.llm_cheap | StrOutputParser()
raw_output_str = chain.invoke(
{
RESUME: self.resume,
JOB_APPLICATION_PROFILE: self.job_application_profile,
QUESTION: question,
OPTIONS: options,
}
)
output_str = self._clean_llm_output(raw_output_str)
logger.debug(f"Raw output for options question: {output_str}")
best_option = self.find_best_match(output_str, options)
logger.debug(f"Best option determined: {best_option}")
return best_option
def determine_resume_or_cover(self, phrase: str) -> str:
logger.debug(
f"Determining if phrase refers to resume or cover letter: {phrase}"
)
prompt = ChatPromptTemplate.from_template(
prompts.resume_or_cover_letter_template
)
chain = prompt | self.llm_cheap | StrOutputParser()
raw_response = chain.invoke({PHRASE: phrase})
response = self._clean_llm_output(raw_response)
logger.debug(f"Response for resume_or_cover: {response}")
if "resume" in response:
return "resume"
elif "cover" in response:
return "cover"
else:
return "resume"
def is_job_suitable(self):
logger.info("Checking if job is suitable")
prompt = ChatPromptTemplate.from_template(prompts.is_relavant_position_template)
chain = prompt | self.llm_cheap | StrOutputParser()
raw_output = chain.invoke(
{
RESUME: self.resume,
JOB_DESCRIPTION: self.job_description,
}
)
output = self._clean_llm_output(raw_output)
logger.debug(f"Job suitability output: {output}")
try:
score = re.search(r"Score:\s*(\d+)", output, re.IGNORECASE).group(1)
reasoning = re.search(r"Reasoning:\s*(.+)", output, re.IGNORECASE | re.DOTALL).group(1)
except AttributeError:
logger.warning("Failed to extract score or reasoning from LLM. Proceeding with application, but job may or may not be suitable.")
return True
logger.info(f"Job suitability score: {score}")
if int(score) < JOB_SUITABILITY_SCORE:
logger.debug(f"Job is not suitable: {reasoning}")
return int(score) >= JOB_SUITABILITY_SCORE