Update recursion limit in job workflow and replace LLM model references with Google Gemma. Add new data loading and research subgraph modules for modularity
69e77a3 | # -*- coding: utf-8 -*- | |
| """ | |
| Data Loading Subgraph Module | |
| This module defines the data loading subgraph, including all node | |
| functions and the subgraph definition. It uses the separate loader classes | |
| (ResumeLoader, JobDescriptionLoader, SystemInitializer) following the | |
| Single Responsibility Principle. | |
| Advanced patterns used: | |
| - **Decorators**: @log_async (cross-cutting logging), @retry_async_load (tenacity | |
| retry for I/O nodes). Stack order: retry innermost, log_async outermost so we | |
| log once per logical call and retry inside. | |
| - **Type hints**: Literal for conditional edge targets (type-safe routing), | |
| StateUpdate alias for partial state dicts. | |
| - **functools.wraps**: Preserved in log_async and in custom retry decorator | |
| so __name__ and docstrings are correct for debugging and LangGraph. | |
| """ | |
| import logging | |
| from typing import Any, Literal, Callable | |
| from functools import wraps | |
| from langgraph.graph import StateGraph | |
| from tenacity import ( | |
| retry, | |
| retry_if_exception_type, | |
| stop_after_attempt, | |
| wait_exponential, | |
| ) | |
| from job_writing_agent.classes import ( | |
| DataLoadState, | |
| CompanyResearchData, | |
| DataLoadingNodes, | |
| AgentWorkflowNodes, | |
| ) | |
| from job_writing_agent.nodes.graph_interrupt import GraphInterrupt | |
| from job_writing_agent.nodes.resume_loader import ResumeLoader | |
| from job_writing_agent.nodes.job_description_loader import JobDescriptionLoader | |
| from job_writing_agent.nodes.system_initializer import SystemInitializer | |
| from job_writing_agent.utils.document_processing import analyze_candidate_job_fit | |
| from job_writing_agent.utils.logging.logging_decorators import log_async | |
| logger = logging.getLogger(__name__) | |
| # Type alias for LangGraph partial state updates (nodes return dicts that get merged). | |
| StateUpdate = dict[str, Any] | |
| # Retry config for I/O-bound load nodes (resume, job description). | |
| _LOAD_RETRY_ATTEMPTS = 3 | |
| _LOAD_RETRY_MIN_WAIT = 1 | |
| _LOAD_RETRY_MAX_WAIT = 10 | |
| def _get_cr(state: DataLoadState) -> CompanyResearchData: | |
| """Safe access to company_research_data; returns empty instance if missing.""" | |
| return state.company_research_data or CompanyResearchData() | |
| def retry_async_load(func: Callable) -> Callable: | |
| """ | |
| Decorator: retry async load functions with exponential backoff. | |
| Uses tenacity for retries. Apply below @log_async so we log once per | |
| logical invocation; retries happen inside. Preserves __name__ etc. via wraps. | |
| """ | |
| async def wrapper(state: DataLoadState) -> StateUpdate: | |
| return await func(state) | |
| return wrapper | |
| graph_interrupt = GraphInterrupt() | |
| # ============================================================================ | |
| # Data Loading Subgraph Node Functions | |
| # ============================================================================ | |
| async def set_agent_system_message_node(state: DataLoadState) -> StateUpdate: | |
| """ | |
| Node function to initialize system message in workflow state. | |
| This node wraps the SystemInitializer.set_agent_system_message method | |
| for use in the LangGraph workflow. | |
| Parameters | |
| ---------- | |
| state: DataLoadState | |
| Current workflow state. | |
| Returns | |
| ------- | |
| StateUpdate | |
| Partial state update with system message added to messages list. | |
| """ | |
| initializer = SystemInitializer() | |
| return await initializer.set_agent_system_message(state) | |
| async def load_resume_node(state: DataLoadState) -> StateUpdate: | |
| """ | |
| Load the resume from the configured source (file path). Runs before job description parsing. | |
| Returns only the resume data; LangGraph merges this update with state. | |
| Retries on OSError/ConnectionError/TimeoutError (tenacity). | |
| Parameters | |
| ---------- | |
| state: DataLoadState | |
| Current workflow state containing resume_path. | |
| Returns | |
| ------- | |
| StateUpdate | |
| Partial state update with resume data in company_research_data. | |
| """ | |
| resume_src = state.workflow_inputs.resume_file_path_ | |
| if not resume_src: | |
| resume_text = "" | |
| else: | |
| loader = ResumeLoader() | |
| resume_text = await loader.load_resume(resume_src) | |
| logger.info(f"Resume loaded: {len(resume_text)} characters") | |
| cr = _get_cr(state) | |
| return {"company_research_data": cr.model_copy(update={"resume": resume_text})} | |
| def prompt_user_for_resume_node(state: DataLoadState) -> StateUpdate: | |
| """ | |
| Prompt user to provide resume manually via chat (paste text). | |
| Used when resume extraction failed or no path was given. The workflow | |
| pauses; the frontend shows the interrupt payload so the user can paste | |
| their resume. The client resumes with Command(resume=user_input). If the | |
| user sends empty input, we return {} so the router sends execution back here. | |
| """ | |
| logger.info("Resume missing or empty, prompting user to paste resume via chat") | |
| return graph_interrupt.request_input_for_field( | |
| state, "resume", "Please paste your resume in text format:", "resume" | |
| ) | |
| def prompt_user_for_job_description_node(state: DataLoadState) -> StateUpdate: | |
| """ | |
| Prompt user to provide job description manually via chat (paste text). | |
| Used when job description extraction failed or no URL was given. The workflow | |
| pauses; the frontend shows the interrupt so the user can paste the job | |
| description. The client resumes with Command(job_description=user_input). | |
| If the user sends empty input, we return {} so the router sends execution back here. | |
| """ | |
| logger.info("Job description missing or empty, prompting user to paste via chat") | |
| return graph_interrupt.request_input_for_field( | |
| state, "job_description", "Please paste the job description:", "job description" | |
| ) | |
| def route_after_resume_load( | |
| state: DataLoadState, | |
| ) -> Literal["prompt_user_for_resume", "load_job_description"]: | |
| """After load_resume: if resume is empty, go to prompt_user_for_resume; else go to load_job_description.""" | |
| cr = state.company_research_data | |
| resume = (cr.resume if cr else "") or "" | |
| resume = str(resume).strip() | |
| if not resume: | |
| logger.info("Resume is empty, routing to prompt_user_for_resume") | |
| return "prompt_user_for_resume" | |
| logger.info("Resume is present, routing to load_job_description") | |
| return "load_job_description" | |
| def route_after_job_load( | |
| state: DataLoadState, | |
| ) -> Literal["prompt_user_for_job_description", "candidate_job_fit_analysis"]: | |
| """After load_job_description: if job_description is empty, go to prompt_user_for_job_description; else go to candidate_job_fit_analysis.""" | |
| cr = state.company_research_data | |
| job_desc = (cr.job_description if cr else "") or "" | |
| job_desc = str(job_desc).strip() | |
| if not job_desc: | |
| logger.info( | |
| "Job description is empty, routing to prompt_user_for_job_description" | |
| ) | |
| return "prompt_user_for_job_description" | |
| logger.info("Job description is present, routing to candidate_job_fit_analysis") | |
| return "candidate_job_fit_analysis" | |
| async def load_job_description_node(state: DataLoadState) -> StateUpdate: | |
| """ | |
| Load the job description from the configured URL. Runs after resume is loaded or provided via interrupt. | |
| Returns job description and company name in company_research_data; LangGraph merges this update with state. | |
| Retries on OSError/ConnectionError/TimeoutError (tenacity). | |
| Parameters | |
| ---------- | |
| state: DataLoadState | |
| Current workflow state containing job_description_url_. | |
| Returns | |
| ------- | |
| StateUpdate | |
| Partial state update with job description and company name in company_research_data. | |
| """ | |
| jd_src = state.workflow_inputs.job_description_url_ | |
| if not jd_src: | |
| job_text = "" | |
| company_name = "" | |
| else: | |
| loader = JobDescriptionLoader() | |
| job_text, company_name = await loader.load_job_description(jd_src) | |
| cr = _get_cr(state) | |
| resume_text = cr.resume or "" | |
| logger.info( | |
| f"Job description loaded: {len(job_text)} characters, company: {company_name}" | |
| ) | |
| return { | |
| "company_research_data": cr.model_copy( | |
| update={ | |
| "resume": resume_text, | |
| "job_description": job_text, | |
| "company_name": company_name, | |
| } | |
| ) | |
| } | |
| async def candidate_job_fit_analysis_node(state: DataLoadState) -> StateUpdate: | |
| """ | |
| Analyze candidate-job fit using DSPy after resume and job description are loaded. | |
| Uses the resume and job description to generate actionable insights | |
| for downstream content generation (cover letter, bullets, LinkedIn note). | |
| Parameters | |
| ---------- | |
| state: DataLoadState | |
| Current workflow state with resume and job description loaded. | |
| Returns | |
| ------- | |
| StateUpdate | |
| Partial state update with candidate_job_fit_analysis in company_research_data | |
| and next_node set to NodeName.RESEARCH for main graph routing. | |
| """ | |
| cr = _get_cr(state) | |
| resume_text = cr.resume or "" | |
| job_description = cr.job_description or "" | |
| company_name = cr.company_name or "" | |
| # Validate inputs (should always pass due to routing, but log if not) | |
| if not resume_text.strip(): | |
| logger.warning("Resume is empty in candidate_job_fit_analysis_node") | |
| if not job_description.strip(): | |
| logger.warning("Job description is empty in candidate_job_fit_analysis_node") | |
| analysis = await analyze_candidate_job_fit( | |
| resume_text=resume_text, | |
| job_description=job_description, | |
| company_name=company_name, | |
| ) | |
| logger.info("Candidate-job fit analysis node completed") | |
| return { | |
| "company_research_data": cr.model_copy( | |
| update={"candidate_job_fit_analysis": analysis} | |
| ), | |
| "next_node": AgentWorkflowNodes.RESEARCH, | |
| } | |
| # ============================================================================ | |
| # Data Loading Subgraph Definition | |
| # ============================================================================ | |
| N = DataLoadingNodes # Shorthand for graph construction | |
| data_loading_subgraph = StateGraph(DataLoadState) | |
| data_loading_subgraph.add_node( | |
| N.SET_AGENT_SYSTEM_MESSAGE.value, set_agent_system_message_node | |
| ) | |
| data_loading_subgraph.add_node(N.LOAD_RESUME.value, load_resume_node) | |
| data_loading_subgraph.add_node(N.LOAD_JOB_DESCRIPTION.value, load_job_description_node) | |
| data_loading_subgraph.add_node( | |
| N.PROMPT_USER_FOR_RESUME.value, prompt_user_for_resume_node | |
| ) | |
| data_loading_subgraph.add_node( | |
| N.PROMPT_USER_FOR_JOB_DESCRIPTION.value, prompt_user_for_job_description_node | |
| ) | |
| data_loading_subgraph.add_node( | |
| N.CANDIDATE_JOB_FIT_ANALYSIS.value, candidate_job_fit_analysis_node | |
| ) | |
| data_loading_subgraph.set_entry_point(N.SET_AGENT_SYSTEM_MESSAGE.value) | |
| data_loading_subgraph.set_finish_point(N.CANDIDATE_JOB_FIT_ANALYSIS.value) | |
| data_loading_subgraph.add_edge(N.SET_AGENT_SYSTEM_MESSAGE.value, N.LOAD_RESUME.value) | |
| data_loading_subgraph.add_conditional_edges( | |
| N.LOAD_RESUME.value, | |
| route_after_resume_load, | |
| { | |
| N.PROMPT_USER_FOR_RESUME.value: N.PROMPT_USER_FOR_RESUME.value, | |
| N.LOAD_JOB_DESCRIPTION.value: N.LOAD_JOB_DESCRIPTION.value, | |
| }, | |
| ) | |
| data_loading_subgraph.add_edge( | |
| N.PROMPT_USER_FOR_RESUME.value, N.LOAD_JOB_DESCRIPTION.value | |
| ) | |
| data_loading_subgraph.add_conditional_edges( | |
| N.LOAD_JOB_DESCRIPTION.value, | |
| route_after_job_load, | |
| { | |
| N.PROMPT_USER_FOR_JOB_DESCRIPTION.value: N.PROMPT_USER_FOR_JOB_DESCRIPTION.value, | |
| N.CANDIDATE_JOB_FIT_ANALYSIS.value: N.CANDIDATE_JOB_FIT_ANALYSIS.value, | |
| }, | |
| ) | |
| data_loading_subgraph.add_edge( | |
| N.PROMPT_USER_FOR_JOB_DESCRIPTION.value, N.CANDIDATE_JOB_FIT_ANALYSIS.value | |
| ) | |
| data_loading_workflow = data_loading_subgraph.compile(name="Data Load Subgraph") | |