Spaces:
Sleeping
Sleeping
| from typing import TypedDict, List, Dict, Any, Optional | |
| from langgraph.graph import StateGraph, END | |
| from langchain_core.messages import BaseMessage, HumanMessage, AIMessage | |
| from app.services.gemini_service import gemini_service | |
| from app.core.logging_config import logger | |
| class InterviewState(TypedDict): | |
| # Chat history | |
| messages: List[BaseMessage] | |
| history: List[str] | |
| # State tracking | |
| current_question: Optional[str] | |
| current_question_num: int | |
| total_questions: int | |
| follow_up_count: int # Current follow-ups for this question | |
| max_follow_ups: int # Max allowed | |
| # Context | |
| target_company: str | |
| interview_style: str | |
| job_role: str | |
| difficulty: str | |
| topic: str | |
| resume_text: Optional[str] # New field | |
| # Results | |
| analysis_data: List[Dict[str, Any]] | |
| final_report: Optional[str] | |
| # --- Nodes --- | |
| async def generate_question_node(state: InterviewState): | |
| """Node: Generates the next question or ends interview.""" | |
| logger.info(f"Generating question {state['current_question_num'] + 1}/{state['total_questions']}") | |
| question = await gemini_service.generate_question( | |
| target_company=state["target_company"], | |
| interview_style=state["interview_style"], | |
| job_role=state["job_role"], | |
| difficulty=state["difficulty"], | |
| topic=state["topic"], | |
| question_num=state["current_question_num"] + 1, | |
| total_questions=state["total_questions"], | |
| history=state["history"], | |
| resume_text=state.get("resume_text") | |
| ) | |
| # Update state | |
| state["current_question"] = question | |
| state["current_question_num"] += 1 | |
| # Add to message history (as AI) | |
| state["messages"].append(AIMessage(content=question)) | |
| # Reset follow-up count for new question | |
| state["follow_up_count"] = 0 | |
| return state | |
| async def generate_follow_up_node(state: InterviewState): | |
| """Node: Generates a follow-up question.""" | |
| logger.info("Generating Follow-up Question...") | |
| last_user_msg = state["messages"][-1] | |
| last_answer = last_user_msg.content if isinstance(last_user_msg, HumanMessage) else "" | |
| question = await gemini_service.generate_followup_question( | |
| target_company=state["target_company"], | |
| question=state["current_question"], | |
| answer=last_answer | |
| ) | |
| # Update state | |
| state["current_question"] = question | |
| # Do NOT increment current_question_num, as it's the same topic | |
| state["follow_up_count"] += 1 | |
| # Add to message history | |
| state["messages"].append(AIMessage(content=question)) | |
| return state | |
| async def analyze_answer_node(state: InterviewState): | |
| """Node: Analyzes the user's latest response.""" | |
| last_message = state["messages"][-1] | |
| if not isinstance(last_message, HumanMessage): | |
| # Should not happen in normal flow | |
| return state | |
| user_answer = last_message.content | |
| logger.info("Analyzing user answer...") | |
| analysis = await gemini_service.analyze_response( | |
| question=state["current_question"], | |
| answer=user_answer, | |
| job_role=state["job_role"], | |
| difficulty=state["difficulty"] | |
| ) | |
| # Append analysis to list | |
| if "analysis_data" not in state: | |
| state["analysis_data"] = [] | |
| # Store complete analysis object | |
| analysis_record = { | |
| "question": state["current_question"], | |
| "answer": user_answer, | |
| "analysis": analysis, | |
| "question_num": state["current_question_num"] | |
| } | |
| state["analysis_data"].append(analysis_record) | |
| # Add context to history for the next question generator | |
| # We include a brief summary so the AI knows how the user did, but not the full JSON | |
| feedback_short = f"Question: {state['current_question']}\nAnswer: {user_answer}\nFeedback: {analysis.get('feedback', '')}" | |
| state["history"].append(feedback_short) | |
| # Adaptive Difficulty Logic | |
| # If strongly positive, increase difficulty. If negative, decrease. | |
| # Simple implementation for now. | |
| score = analysis.get("sentiment_score", 0) | |
| current_diff = state["difficulty"] | |
| if score > 0.7 and current_diff == "Easy": | |
| state["difficulty"] = "Medium" | |
| elif score > 0.8 and current_diff == "Medium": | |
| state["difficulty"] = "Hard" | |
| elif score < 0.3 and current_diff == "Hard": | |
| state["difficulty"] = "Medium" | |
| elif score < 0.2 and current_diff == "Medium": | |
| state["difficulty"] = "Easy" | |
| return state | |
| async def generate_report_node(state: InterviewState): | |
| """Node: Generates the final report after all questions.""" | |
| logger.info("Generating Final Report...") | |
| # Prepare data for the prompt | |
| interview_data_str = json.dumps(state["analysis_data"], indent=2) | |
| report = await gemini_service.generate_final_report( | |
| target_company=state["target_company"], | |
| job_role=state["job_role"], | |
| interview_data=interview_data_str | |
| ) | |
| state["final_report"] = report | |
| return state | |
| import json | |
| # --- Routing --- | |
| def route_interview(state: InterviewState): | |
| """Decides whether to continue questioning, follow-up, or end.""" | |
| # 1. Check if we should ask a follow-up | |
| if state.get("follow_up_count", 0) < state.get("max_follow_ups", 0): | |
| return "generate_follow_up" | |
| if state["current_question_num"] >= state["total_questions"]: | |
| return "generate_report" | |
| return "generate_question" | |
| # --- Graph Definition --- | |
| workflow = StateGraph(InterviewState) | |
| workflow.add_node("generate_question", generate_question_node) | |
| workflow.add_node("generate_follow_up", generate_follow_up_node) | |
| workflow.add_node("analyze_answer", analyze_answer_node) | |
| workflow.add_node("generate_report", generate_report_node) | |
| # Entry point | |
| workflow.set_entry_point("generate_question") | |
| # Transition from Question extraction -> Wait for user input | |
| # NOTE: In a real API, we would pause here. | |
| # For this graph, we assume the HumanMessage is injected into state | |
| # externally before resuming. | |
| # BUT `StateGraph` in basic form runs until END or interrupt. | |
| # Since we are building an API, we will likely run one step at a time or use `interrupt`. | |
| # For MVP simplicity: | |
| # The "cycle" is: Generate Question -> END (Return to user) -> (User calls API) -> Analyze Answer -> Route | |
| # However, to visualize the logic: | |
| # generate_question -> END (user sees question) | |
| # ... User inputs answer ... | |
| # (Resume with answer) -> analyze_answer -> route -> generate_question/report | |
| # We will define the edge from analyze to route | |
| workflow.add_conditional_edges( | |
| "analyze_answer", | |
| route_interview, | |
| { | |
| "generate_question": "generate_question", | |
| "generate_follow_up": "generate_follow_up", | |
| "generate_report": "generate_report" | |
| } | |
| ) | |
| workflow.add_edge("generate_report", END) | |
| # We define the edge that "ends" a turn to wait for user input. | |
| # In LangGraph terms, `generate_question` finishes, and we return state to the caller. | |
| # The caller (FastAPI) will persist state. | |
| # When user replies, we invoke `analyze_answer` directly? | |
| # OR we define the full loop and use `interrupt_before`. | |
| # Let's use the explicit loop for clarity and compilation, | |
| # but at runtime we might use it differently. | |
| # Ideally: generate_question -> END. | |
| # Then user submits answer -> analyze_answer -> check condition. | |