File size: 12,145 Bytes
69e77a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# -*- coding: utf-8 -*-
"""
Data Loading Subgraph Module

This module defines the data loading subgraph, including all node
functions and the subgraph definition. It uses the separate loader classes
(ResumeLoader, JobDescriptionLoader, SystemInitializer) following the
Single Responsibility Principle.

Advanced patterns used:
- **Decorators**: @log_async (cross-cutting logging), @retry_async_load (tenacity
  retry for I/O nodes). Stack order: retry innermost, log_async outermost so we
  log once per logical call and retry inside.
- **Type hints**: Literal for conditional edge targets (type-safe routing),
  StateUpdate alias for partial state dicts.
- **functools.wraps**: Preserved in log_async and in custom retry decorator
  so __name__ and docstrings are correct for debugging and LangGraph.
"""

import logging
from typing import Any, Literal, Callable
from functools import wraps

from langgraph.graph import StateGraph
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from job_writing_agent.classes import (
    DataLoadState,
    CompanyResearchData,
    DataLoadingNodes,
    AgentWorkflowNodes,
)
from job_writing_agent.nodes.graph_interrupt import GraphInterrupt
from job_writing_agent.nodes.resume_loader import ResumeLoader
from job_writing_agent.nodes.job_description_loader import JobDescriptionLoader
from job_writing_agent.nodes.system_initializer import SystemInitializer
from job_writing_agent.utils.document_processing import analyze_candidate_job_fit
from job_writing_agent.utils.logging.logging_decorators import log_async

logger = logging.getLogger(__name__)

# Partial state update returned by every node in this subgraph; LangGraph
# merges each returned mapping into the running DataLoadState.
StateUpdate = dict[str, Any]

# Tenacity policy for the I/O-bound load nodes (resume, job description):
# total attempts, then the lower/upper bound of the exponential wait in seconds.
_LOAD_RETRY_ATTEMPTS: int = 3
_LOAD_RETRY_MIN_WAIT: int = 1
_LOAD_RETRY_MAX_WAIT: int = 10


def _get_cr(state: DataLoadState) -> CompanyResearchData:
    """Return ``state.company_research_data``, or a new empty
    ``CompanyResearchData`` when that attribute is falsy (e.g. ``None``)."""
    research = state.company_research_data
    if research:
        return research
    return CompanyResearchData()


def retry_async_load(func: Callable) -> Callable:
    """
    Wrap an async load node so transient I/O failures are retried.

    The returned coroutine function awaits ``func(state)`` under a tenacity
    policy:

    - at most ``_LOAD_RETRY_ATTEMPTS`` attempts;
    - exponential backoff (multiplier 1) clamped between
      ``_LOAD_RETRY_MIN_WAIT`` and ``_LOAD_RETRY_MAX_WAIT`` seconds;
    - retries only on OSError, ConnectionError or TimeoutError;
    - after the final failed attempt, the last exception is re-raised as-is
      (``reraise=True``) instead of being wrapped in tenacity's RetryError.

    Stack it beneath ``@log_async`` so each logical call is logged once while
    the retries happen inside. ``functools.wraps`` copies ``func``'s
    ``__name__``/``__doc__`` onto the inner coroutine before tenacity wraps it.
    """
    # ConnectionError and TimeoutError are already OSError subclasses; they
    # are listed explicitly to document which failures are considered transient.
    transient_errors = (OSError, ConnectionError, TimeoutError)
    policy = retry(
        stop=stop_after_attempt(_LOAD_RETRY_ATTEMPTS),
        wait=wait_exponential(
            multiplier=1, min=_LOAD_RETRY_MIN_WAIT, max=_LOAD_RETRY_MAX_WAIT
        ),
        retry=retry_if_exception_type(transient_errors),
        reraise=True,
    )

    async def _attempt(state: DataLoadState) -> StateUpdate:
        return await func(state)

    return policy(wraps(func)(_attempt))


# Single module-level GraphInterrupt helper shared by
# prompt_user_for_resume_node and prompt_user_for_job_description_node, which
# both call its request_input_for_field to ask the user for pasted text.
graph_interrupt = GraphInterrupt()


# ============================================================================
# Data Loading Subgraph Node Functions
# ============================================================================


@log_async
async def set_agent_system_message_node(state: DataLoadState) -> StateUpdate:
    """
    Seed the workflow state with the agent's system message.

    LangGraph node adapter around ``SystemInitializer.set_agent_system_message``.
    A fresh SystemInitializer is created on every call, and its result is
    returned unchanged.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state.

    Returns
    -------
    StateUpdate
        The partial update produced by ``set_agent_system_message``
        (presumably the system message appended to ``messages`` — confirm in
        SystemInitializer).
    """
    return await SystemInitializer().set_agent_system_message(state)


@log_async
@retry_async_load
async def load_resume_node(state: DataLoadState) -> StateUpdate:
    """
    Read the resume from ``workflow_inputs.resume_file_path_``, if one is set.

    This is the first loading step; job description parsing runs after it.
    An unset or empty path yields an empty resume instead of an error, so the
    downstream router can send the user to the manual-paste prompt. Only the
    ``resume`` field is changed, inside a copy of ``company_research_data``;
    LangGraph merges the returned update into the state. OSError,
    ConnectionError and TimeoutError are retried by ``@retry_async_load``.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state; ``workflow_inputs.resume_file_path_`` is read.

    Returns
    -------
    StateUpdate
        ``{"company_research_data": <copy with resume set>}``.
    """
    source_path = state.workflow_inputs.resume_file_path_
    if source_path:
        resume_text = await ResumeLoader().load_resume(source_path)
    else:
        resume_text = ""
    logger.info(f"Resume loaded: {len(resume_text)} characters")
    updated = _get_cr(state).model_copy(update={"resume": resume_text})
    return {"company_research_data": updated}


def prompt_user_for_resume_node(state: DataLoadState) -> StateUpdate:
    """
    Prompt the user to paste their resume as plain text via chat.

    Reached from load_resume when the loaded resume is empty: either no path
    was configured or extraction produced no text (see
    ``route_after_resume_load``). The work is delegated to
    ``graph_interrupt.request_input_for_field``, which is expected to pause
    the run with an interrupt. The frontend then shows the prompt, and the
    client resumes the run with ``Command(resume=user_input)``.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state.

    Returns
    -------
    StateUpdate
        Partial update for the ``resume`` field produced by
        ``request_input_for_field``. NOTE(review): presumably ``{}`` when the
        user submits empty input — confirm in GraphInterrupt. The node that
        runs next is decided by this node's outgoing edge in the subgraph
        definition.
    """
    logger.info("Resume missing or empty, prompting user to paste resume via chat")
    return graph_interrupt.request_input_for_field(
        state, "resume", "Please paste your resume in text format:", "resume"
    )


def prompt_user_for_job_description_node(state: DataLoadState) -> StateUpdate:
    """
    Prompt the user to paste the job description as plain text via chat.

    Reached from load_job_description when no job description text is
    present: either no URL was configured or extraction produced nothing (see
    ``route_after_job_load``). The work is delegated to
    ``graph_interrupt.request_input_for_field``, which is expected to pause
    the run with an interrupt. The frontend then shows the prompt, and the
    client resumes the run with ``Command(resume=user_input)``. LangGraph
    delivers resume values only through ``Command``'s ``resume`` argument.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state.

    Returns
    -------
    StateUpdate
        Partial update for the ``job_description`` field produced by
        ``request_input_for_field``. NOTE(review): presumably ``{}`` when the
        user submits empty input — confirm in GraphInterrupt. The node that
        runs next is decided by this node's outgoing edge in the subgraph
        definition.
    """
    logger.info("Job description missing or empty, prompting user to paste via chat")
    return graph_interrupt.request_input_for_field(
        state, "job_description", "Please paste the job description:", "job description"
    )


def route_after_resume_load(
    state: DataLoadState,
) -> Literal["prompt_user_for_resume", "load_job_description"]:
    """
    Router: decide whether a usable resume is present.

    Returns "load_job_description" when ``company_research_data.resume``
    contains non-whitespace text. Otherwise it returns "prompt_user_for_resume",
    which also covers a missing ``company_research_data`` and a ``None``
    resume.
    """
    research = state.company_research_data
    raw_resume = research.resume if research else ""
    if str(raw_resume or "").strip():
        logger.info("Resume is present, routing to load_job_description")
        return "load_job_description"
    logger.info("Resume is empty, routing to prompt_user_for_resume")
    return "prompt_user_for_resume"


def route_after_job_load(
    state: DataLoadState,
) -> Literal["prompt_user_for_job_description", "candidate_job_fit_analysis"]:
    """
    Router: decide whether a usable job description is present.

    Returns "candidate_job_fit_analysis" when
    ``company_research_data.job_description`` contains non-whitespace text.
    Otherwise it returns "prompt_user_for_job_description", which also covers
    a missing ``company_research_data`` and a ``None`` job description.
    """
    research = state.company_research_data
    raw_description = research.job_description if research else ""
    if str(raw_description or "").strip():
        logger.info("Job description is present, routing to candidate_job_fit_analysis")
        return "candidate_job_fit_analysis"
    logger.info(
        "Job description is empty, routing to prompt_user_for_job_description"
    )
    return "prompt_user_for_job_description"


@log_async
@retry_async_load
async def load_job_description_node(state: DataLoadState) -> StateUpdate:
    """
    Fetch the job description from ``workflow_inputs.job_description_url_``.

    Runs after the resume has been loaded or pasted. When no URL is set, both
    the job text and the company name are empty strings. The returned copy of
    ``company_research_data`` sets ``job_description`` and ``company_name``.
    It also rewrites ``resume`` to its current value, or "" when it was unset.
    OSError, ConnectionError and TimeoutError are retried by
    ``@retry_async_load``.

    Parameters
    ----------
    state: DataLoadState
        Current workflow state; ``workflow_inputs.job_description_url_`` is read.

    Returns
    -------
    StateUpdate
        ``{"company_research_data": <copy with job fields set>}``.
    """
    source_url = state.workflow_inputs.job_description_url_
    if source_url:
        job_text, company_name = await JobDescriptionLoader().load_job_description(
            source_url
        )
    else:
        job_text, company_name = "", ""
    research = _get_cr(state)
    logger.info(
        f"Job description loaded: {len(job_text)} characters, company: {company_name}"
    )
    fields = {
        "resume": research.resume or "",
        "job_description": job_text,
        "company_name": company_name,
    }
    return {"company_research_data": research.model_copy(update=fields)}


@log_async
async def candidate_job_fit_analysis_node(state: DataLoadState) -> StateUpdate:
    """
    Analyze candidate-job fit once the resume and job description are loaded.

    Passes the resume, job description and company name from
    ``company_research_data`` to ``analyze_candidate_job_fit``. The result is
    stored for downstream content generation (cover letter, bullets, LinkedIn
    note).

    Parameters
    ----------
    state: DataLoadState
        Current workflow state with resume and job description loaded.

    Returns
    -------
    StateUpdate
        Partial state update with ``candidate_job_fit_analysis`` set in
        ``company_research_data`` and ``next_node`` set to
        ``AgentWorkflowNodes.RESEARCH`` for main-graph routing.
    """
    cr = _get_cr(state)
    resume_text = cr.resume or ""
    job_description = cr.job_description or ""
    company_name = cr.company_name or ""

    # The routers are meant to keep empty inputs away from this node. If one
    # slips through, the analysis still runs, but a warning is logged.
    if not resume_text.strip():
        logger.warning("Resume is empty in candidate_job_fit_analysis_node")
    if not job_description.strip():
        logger.warning("Job description is empty in candidate_job_fit_analysis_node")

    analysis = await analyze_candidate_job_fit(
        resume_text=resume_text,
        job_description=job_description,
        company_name=company_name,
    )

    logger.info("Candidate-job fit analysis node completed")

    return {
        "company_research_data": cr.model_copy(
            update={"candidate_job_fit_analysis": analysis}
        ),
        "next_node": AgentWorkflowNodes.RESEARCH,
    }


# ============================================================================
# Data Loading Subgraph Definition
# ============================================================================

N = DataLoadingNodes  # Shorthand for graph construction

data_loading_subgraph = StateGraph(DataLoadState)

data_loading_subgraph.add_node(
    N.SET_AGENT_SYSTEM_MESSAGE.value, set_agent_system_message_node
)
data_loading_subgraph.add_node(N.LOAD_RESUME.value, load_resume_node)
data_loading_subgraph.add_node(N.LOAD_JOB_DESCRIPTION.value, load_job_description_node)
data_loading_subgraph.add_node(
    N.PROMPT_USER_FOR_RESUME.value, prompt_user_for_resume_node
)
data_loading_subgraph.add_node(
    N.PROMPT_USER_FOR_JOB_DESCRIPTION.value, prompt_user_for_job_description_node
)
data_loading_subgraph.add_node(
    N.CANDIDATE_JOB_FIT_ANALYSIS.value, candidate_job_fit_analysis_node
)

# Flow:
#   set_agent_system_message -> load_resume
#   load_resume | prompt_user_for_resume
#       --route_after_resume_load--> prompt_user_for_resume | load_job_description
#   load_job_description | prompt_user_for_job_description
#       --route_after_job_load--> prompt_user_for_job_description
#                                 | candidate_job_fit_analysis
# Each prompt node goes back through the same router as its loader. An empty
# paste therefore re-prompts instead of letting an empty resume or job
# description reach candidate_job_fit_analysis. Every re-prompt is another
# superstep and counts toward the run's LangGraph recursion limit.
data_loading_subgraph.set_entry_point(N.SET_AGENT_SYSTEM_MESSAGE.value)
data_loading_subgraph.set_finish_point(N.CANDIDATE_JOB_FIT_ANALYSIS.value)
data_loading_subgraph.add_edge(N.SET_AGENT_SYSTEM_MESSAGE.value, N.LOAD_RESUME.value)

# Resume stage: loader and manual prompt share route_after_resume_load.
data_loading_subgraph.add_conditional_edges(
    N.LOAD_RESUME.value,
    route_after_resume_load,
    {
        N.PROMPT_USER_FOR_RESUME.value: N.PROMPT_USER_FOR_RESUME.value,
        N.LOAD_JOB_DESCRIPTION.value: N.LOAD_JOB_DESCRIPTION.value,
    },
)
data_loading_subgraph.add_conditional_edges(
    N.PROMPT_USER_FOR_RESUME.value,
    route_after_resume_load,
    {
        N.PROMPT_USER_FOR_RESUME.value: N.PROMPT_USER_FOR_RESUME.value,
        N.LOAD_JOB_DESCRIPTION.value: N.LOAD_JOB_DESCRIPTION.value,
    },
)

# Job description stage: loader and manual prompt share route_after_job_load.
data_loading_subgraph.add_conditional_edges(
    N.LOAD_JOB_DESCRIPTION.value,
    route_after_job_load,
    {
        N.PROMPT_USER_FOR_JOB_DESCRIPTION.value: N.PROMPT_USER_FOR_JOB_DESCRIPTION.value,
        N.CANDIDATE_JOB_FIT_ANALYSIS.value: N.CANDIDATE_JOB_FIT_ANALYSIS.value,
    },
)
data_loading_subgraph.add_conditional_edges(
    N.PROMPT_USER_FOR_JOB_DESCRIPTION.value,
    route_after_job_load,
    {
        N.PROMPT_USER_FOR_JOB_DESCRIPTION.value: N.PROMPT_USER_FOR_JOB_DESCRIPTION.value,
        N.CANDIDATE_JOB_FIT_ANALYSIS.value: N.CANDIDATE_JOB_FIT_ANALYSIS.value,
    },
)

data_loading_workflow = data_loading_subgraph.compile(name="Data Load Subgraph")