"""JobWorkflow orchestrator for the job application writer.""" import logging import os from datetime import datetime from typing import Any from langchain_core.runnables import RunnableConfig from langchain_core.tracers import ConsoleCallbackHandler, LangChainTracer from job_writing_agent.classes import DataLoadState, node_name logger = logging.getLogger(__name__) class JobWorkflow: """ Workflow orchestrator for the job application writer. The workflow consists of: 1. Data Loading: Parse resume and job description (parallel subgraph) 2. Research: Company research and relevance filtering (subgraph) 3. Draft Creation: Generate initial application material 4. Critique: AI-powered feedback on the draft 5. Human Approval: User feedback collection 6. Finalization: Incorporate feedback and produce final output """ current_time: datetime = datetime.now() thread_id: str = f"job_workflow_session_{current_time:%Y%m%d%H%M%S}" timestamp: str = current_time.strftime("%Y%m%d-%H%M%S") def __init__(self, resume: str, job_description_source: str, content: str): """ Initialize the JobWorkflow orchestrator. Parameters ---------- resume: str Path to the resume file or resume text. job_description_source: str URL, file path, or text content of the job description. content: str Type of application material to generate ("cover_letter", "bullets", "linkedin_note"). """ self.resume = resume self.job_description_source = job_description_source self.content = content def __repr__(self) -> str: return ( f"JobWorkflow(resume={self.resume!r}, " f"job_description_source={self.job_description_source!r}, " f"content={self.content!r})" ) def _build_initial_workflow_state(self) -> DataLoadState: """ Get the initial application state for the workflow. Returns ------- DataLoadState Initialized state dictionary with resume path, job description source, content type, and empty messages list. """ return { "resume_path": self.resume, "job_description_source": self.job_description_source, "content_category": self.content, "current_node": "", "messages": [], "company_research_data": {}, } def _get_callbacks(self) -> list[Any]: """ Get list of callbacks including LangSmith tracer with metadata. This method creates callback handlers for LangGraph execution, including LangSmith tracing with workflow-level metadata and tags for better observability and filtering in the LangSmith UI. Returns ------- list List of callback handlers for LangGraph execution, including: - ConsoleCallbackHandler: Console output - LangChainTracer: LangSmith tracing (if enabled) """ callbacks: list[Any] = [ConsoleCallbackHandler()] if os.getenv("LANGSMITH_TRACING", "").lower() == "true": try: langsmith_tracer = LangChainTracer( project_name=os.getenv( "LANGSMITH_PROJECT", "job_application_writer" ) ) callbacks.append(langsmith_tracer) logger.info("Enabled LangSmith Tracing...") except Exception as exc: logger.warning( "Failed to initialize LangSmith tracer: %s. Continuing without tracing.", exc, ) else: logger.debug( "LangSmith tracing is not enabled (set environment variable LANGSMITH_TRACING to 'true')" ) return callbacks def _build_runnable_config(self) -> RunnableConfig: """ Build RunnableConfig with LangSmith tracing metadata. Creates a config with workflow-specific tags, metadata, and callbacks for comprehensive observability across all LLM calls. Returns ------- RunnableConfig Configured for LangSmith tracing with content-specific metadata. """ return { "configurable": {"thread_id": self.thread_id}, "callbacks": self._get_callbacks(), "run_name": f"JobAppWorkflow.{self.content}.{self.timestamp}", "metadata": { "workflow": "job_application_writer", "content_type": self.content, "session_id": self.thread_id, }, "tags": ["job-application-workflow", self.content], "recursion_limit": 10, } async def run_workflow(self) -> dict[str, Any] | None: """ Execute the complete job application writer workflow. This method compiles the graph, configures LangSmith tracing with enhanced metadata, and executes the workflow. It handles errors gracefully and returns the final state or None if execution fails. Returns ------- Optional[Dict[str, Any]] Final workflow state containing the generated application material in the "output_data" field, or None if execution fails. """ from job_writing_agent.graph import job_app_graph try: compiled_graph = job_app_graph except Exception as exc: logger.error("Error compiling graph: %s", exc, exc_info=True) return None initial_workflow_state = self._build_initial_workflow_state() run_name = f"JobAppWorkflow.{self.content}.{self.timestamp}" config: RunnableConfig = self._build_runnable_config() try: initial_workflow_state["current_node"] = node_name.LOAD logger.info( "Starting workflow execution: %s (content_type=%s, session_id=%s)", run_name, self.content, self.thread_id, ) graph_output = await compiled_graph.ainvoke( initial_workflow_state, config=config, ) logger.info("Workflow execution completed successfully") return graph_output except Exception as exc: logger.error("Error running graph: %s", exc, exc_info=True) return None