# -*- coding: utf-8 -*-
"""
Data Loading Subgraph Module
This module defines the data loading subgraph, including all node
functions and the subgraph definition. It uses the separate loader classes
(ResumeLoader, JobDescriptionLoader, SystemInitializer) following the
Single Responsibility Principle.
Advanced patterns used:
- **Decorators**: @log_async (cross-cutting logging), @retry_async_load (tenacity
retry for I/O nodes). Stack order: retry innermost, log_async outermost so we
log once per logical call and retry inside.
- **Type hints**: Literal for conditional edge targets (type-safe routing),
StateUpdate alias for partial state dicts.
- **functools.wraps**: Preserved in log_async and in custom retry decorator
so __name__ and docstrings are correct for debugging and LangGraph.
"""
import logging
from typing import Any, Literal, Callable
from functools import wraps
from langgraph.graph import StateGraph
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from job_writing_agent.classes import (
DataLoadState,
CompanyResearchData,
DataLoadingNodes,
AgentWorkflowNodes,
)
from job_writing_agent.nodes.graph_interrupt import GraphInterrupt
from job_writing_agent.nodes.resume_loader import ResumeLoader
from job_writing_agent.nodes.job_description_loader import JobDescriptionLoader
from job_writing_agent.nodes.system_initializer import SystemInitializer
from job_writing_agent.utils.document_processing import analyze_candidate_job_fit
from job_writing_agent.utils.logging.logging_decorators import log_async
logger = logging.getLogger(__name__)
# Type alias for LangGraph partial state updates (nodes return dicts that get merged).
StateUpdate = dict[str, Any]
# Retry config for I/O-bound load nodes (resume, job description).
_LOAD_RETRY_ATTEMPTS = 3
_LOAD_RETRY_MIN_WAIT = 1
_LOAD_RETRY_MAX_WAIT = 10
def _get_cr(state: DataLoadState) -> CompanyResearchData:
    """Return the state's company research data, or a fresh empty instance when unset."""
    existing = state.company_research_data
    return existing if existing else CompanyResearchData()
def retry_async_load(func: Callable) -> Callable:
    """
    Decorator: retry an async load function with exponential backoff.

    Uses tenacity to retry on transient I/O failures (OSError,
    ConnectionError, TimeoutError), up to _LOAD_RETRY_ATTEMPTS attempts with
    exponential wait bounded by _LOAD_RETRY_MIN_WAIT/_LOAD_RETRY_MAX_WAIT
    seconds, re-raising the last exception (reraise=True).

    Apply below @log_async so we log once per logical invocation; retries
    happen inside. functools.wraps preserves __name__/__doc__ for debugging
    and LangGraph node registration.

    Parameters
    ----------
    func: Callable
        Async callable to wrap (typically a LangGraph node taking the state).

    Returns
    -------
    Callable
        The wrapped async callable with retry behavior.
    """

    @retry(
        stop=stop_after_attempt(_LOAD_RETRY_ATTEMPTS),
        wait=wait_exponential(
            multiplier=1, min=_LOAD_RETRY_MIN_WAIT, max=_LOAD_RETRY_MAX_WAIT
        ),
        retry=retry_if_exception_type((OSError, ConnectionError, TimeoutError)),
        reraise=True,
    )
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> StateUpdate:
        # Forward all arguments so nodes that also receive e.g. a config
        # argument keep working; the original only forwarded `state`.
        return await func(*args, **kwargs)

    return wrapper
# Shared interrupt helper: the prompt_* nodes below use it to pause the graph
# and request user input from the frontend.
graph_interrupt = GraphInterrupt()
# ============================================================================
# Data Loading Subgraph Node Functions
# ============================================================================
@log_async
async def set_agent_system_message_node(state: DataLoadState) -> StateUpdate:
    """
    Seed the workflow state with the agent's system message.

    Thin LangGraph adapter around SystemInitializer.set_agent_system_message.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state.

    Returns
    -------
    StateUpdate
        Partial state update with the system message added to the messages list.
    """
    return await SystemInitializer().set_agent_system_message(state)
@log_async
@retry_async_load
async def load_resume_node(state: DataLoadState) -> StateUpdate:
    """
    Load the candidate's resume from the configured file path.

    Runs before job description parsing. Returns only the resume field;
    LangGraph merges the partial update into state. Transient I/O errors
    (OSError/ConnectionError/TimeoutError) are retried by @retry_async_load.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state containing resume_path.

    Returns
    -------
    StateUpdate
        Partial state update carrying the resume text in company_research_data.
    """
    source = state.workflow_inputs.resume_file_path_
    if source:
        resume_text = await ResumeLoader().load_resume(source)
    else:
        # No path configured: leave resume empty so the router prompts the user.
        resume_text = ""
    logger.info(f"Resume loaded: {len(resume_text)} characters")
    cr = _get_cr(state)
    return {"company_research_data": cr.model_copy(update={"resume": resume_text})}
def prompt_user_for_resume_node(state: DataLoadState) -> StateUpdate:
    """
    Pause the workflow and ask the user to paste their resume via chat.

    Used when resume extraction failed or no path was given. The frontend
    displays the interrupt payload; the client resumes with
    Command(resume=user_input). Empty input returns {} so the router sends
    execution back to this node.
    """
    logger.info("Resume missing or empty, prompting user to paste resume via chat")
    prompt = "Please paste your resume in text format:"
    return graph_interrupt.request_input_for_field(state, "resume", prompt, "resume")
def prompt_user_for_job_description_node(state: DataLoadState) -> StateUpdate:
    """
    Pause the workflow and ask the user to paste the job description via chat.

    Used when extraction failed or no URL was given. The frontend shows the
    interrupt; the client resumes with Command(job_description=user_input).
    Empty input returns {} so the router sends execution back to this node.
    """
    logger.info("Job description missing or empty, prompting user to paste via chat")
    prompt = "Please paste the job description:"
    return graph_interrupt.request_input_for_field(
        state, "job_description", prompt, "job description"
    )
def route_after_resume_load(
    state: DataLoadState,
) -> Literal["prompt_user_for_resume", "load_job_description"]:
    """Route after load_resume: prompt the user when the resume is empty/whitespace, otherwise continue to load_job_description."""
    cr = state.company_research_data
    resume_text = str((cr.resume if cr else "") or "").strip()
    if resume_text:
        logger.info("Resume is present, routing to load_job_description")
        return "load_job_description"
    logger.info("Resume is empty, routing to prompt_user_for_resume")
    return "prompt_user_for_resume"
def route_after_job_load(
    state: DataLoadState,
) -> Literal["prompt_user_for_job_description", "candidate_job_fit_analysis"]:
    """Route after load_job_description: prompt the user when the job description is empty/whitespace, otherwise continue to candidate_job_fit_analysis."""
    cr = state.company_research_data
    jd_text = str((cr.job_description if cr else "") or "").strip()
    if jd_text:
        logger.info("Job description is present, routing to candidate_job_fit_analysis")
        return "candidate_job_fit_analysis"
    logger.info("Job description is empty, routing to prompt_user_for_job_description")
    return "prompt_user_for_job_description"
@log_async
@retry_async_load
async def load_job_description_node(state: DataLoadState) -> StateUpdate:
    """
    Load the job description from the configured URL.

    Runs after the resume is loaded or supplied via interrupt. Returns the
    job description and company name inside company_research_data; LangGraph
    merges the partial update into state. Transient I/O errors
    (OSError/ConnectionError/TimeoutError) are retried by @retry_async_load.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state containing job_description_url_.

    Returns
    -------
    StateUpdate
        Partial state update with job description and company name in
        company_research_data.
    """
    url = state.workflow_inputs.job_description_url_
    if url:
        job_text, company_name = await JobDescriptionLoader().load_job_description(url)
    else:
        # No URL configured: leave fields empty so the router prompts the user.
        job_text, company_name = "", ""
    cr = _get_cr(state)
    resume_text = cr.resume or ""
    logger.info(
        f"Job description loaded: {len(job_text)} characters, company: {company_name}"
    )
    update = {
        "resume": resume_text,
        "job_description": job_text,
        "company_name": company_name,
    }
    return {"company_research_data": cr.model_copy(update=update)}
@log_async
async def candidate_job_fit_analysis_node(state: DataLoadState) -> StateUpdate:
    """
    Run the DSPy candidate-job fit analysis once resume and job description exist.

    Produces actionable insights consumed by downstream content generation
    (cover letter, bullets, LinkedIn note) and sets next_node so the main
    graph routes to research.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state with resume and job description loaded.

    Returns
    -------
    StateUpdate
        Partial update with candidate_job_fit_analysis stored in
        company_research_data and next_node set to AgentWorkflowNodes.RESEARCH.
    """
    cr = _get_cr(state)
    resume_text = cr.resume or ""
    job_description = cr.job_description or ""
    # Routing should guarantee both inputs exist; warn (don't fail) if not.
    for label, text in (("Resume", resume_text), ("Job description", job_description)):
        if not text.strip():
            logger.warning(f"{label} is empty in candidate_job_fit_analysis_node")
    fit_analysis = await analyze_candidate_job_fit(
        resume_text=resume_text,
        job_description=job_description,
        company_name=cr.company_name or "",
    )
    logger.info("Candidate-job fit analysis node completed")
    updated_cr = cr.model_copy(update={"candidate_job_fit_analysis": fit_analysis})
    return {
        "company_research_data": updated_cr,
        "next_node": AgentWorkflowNodes.RESEARCH,
    }
# ============================================================================
# Data Loading Subgraph Definition
# ============================================================================
N = DataLoadingNodes  # Shorthand for graph construction

# Build the subgraph over DataLoadState; nodes return partial StateUpdate
# dicts that LangGraph merges into the state.
data_loading_subgraph = StateGraph(DataLoadState)
data_loading_subgraph.add_node(
    N.SET_AGENT_SYSTEM_MESSAGE.value, set_agent_system_message_node
)
data_loading_subgraph.add_node(N.LOAD_RESUME.value, load_resume_node)
data_loading_subgraph.add_node(N.LOAD_JOB_DESCRIPTION.value, load_job_description_node)
data_loading_subgraph.add_node(
    N.PROMPT_USER_FOR_RESUME.value, prompt_user_for_resume_node
)
data_loading_subgraph.add_node(
    N.PROMPT_USER_FOR_JOB_DESCRIPTION.value, prompt_user_for_job_description_node
)
data_loading_subgraph.add_node(
    N.CANDIDATE_JOB_FIT_ANALYSIS.value, candidate_job_fit_analysis_node
)
# Entry: system message -> resume load; fit analysis finishes the subgraph.
data_loading_subgraph.set_entry_point(N.SET_AGENT_SYSTEM_MESSAGE.value)
data_loading_subgraph.set_finish_point(N.CANDIDATE_JOB_FIT_ANALYSIS.value)
data_loading_subgraph.add_edge(N.SET_AGENT_SYSTEM_MESSAGE.value, N.LOAD_RESUME.value)
# After resume load: empty resume -> prompt node; otherwise job description.
# NOTE(review): the routers return hardcoded literals ("prompt_user_for_resume",
# "load_job_description", ...) while the mapping keys are enum .value — these
# must match for routing to work; confirm DataLoadingNodes values equal the
# Literal strings in route_after_resume_load / route_after_job_load.
data_loading_subgraph.add_conditional_edges(
    N.LOAD_RESUME.value,
    route_after_resume_load,
    {
        N.PROMPT_USER_FOR_RESUME.value: N.PROMPT_USER_FOR_RESUME.value,
        N.LOAD_JOB_DESCRIPTION.value: N.LOAD_JOB_DESCRIPTION.value,
    },
)
# After the user pastes a resume, resume normal flow at job description load.
data_loading_subgraph.add_edge(
    N.PROMPT_USER_FOR_RESUME.value, N.LOAD_JOB_DESCRIPTION.value
)
# After job description load: empty -> prompt node; otherwise fit analysis.
data_loading_subgraph.add_conditional_edges(
    N.LOAD_JOB_DESCRIPTION.value,
    route_after_job_load,
    {
        N.PROMPT_USER_FOR_JOB_DESCRIPTION.value: N.PROMPT_USER_FOR_JOB_DESCRIPTION.value,
        N.CANDIDATE_JOB_FIT_ANALYSIS.value: N.CANDIDATE_JOB_FIT_ANALYSIS.value,
    },
)
# After the user pastes a job description, continue to fit analysis.
data_loading_subgraph.add_edge(
    N.PROMPT_USER_FOR_JOB_DESCRIPTION.value, N.CANDIDATE_JOB_FIT_ANALYSIS.value
)
data_loading_workflow = data_loading_subgraph.compile(name="Data Load Subgraph")
|