TalentTalkPro / backend /app /agents /interview_graph.py
Param2121's picture
Refactor: Move project to root of repository
3abfc90
Raw
History Blame Contribute Delete
7.4 kB
from typing import TypedDict, List, Dict, Any, Optional
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from app.services.gemini_service import gemini_service
from app.core.logging_config import logger
class InterviewState(TypedDict):
# Chat history
messages: List[BaseMessage]
history: List[str]
# State tracking
current_question: Optional[str]
current_question_num: int
total_questions: int
follow_up_count: int # Current follow-ups for this question
max_follow_ups: int # Max allowed
# Context
target_company: str
interview_style: str
job_role: str
difficulty: str
topic: str
resume_text: Optional[str] # New field
# Results
analysis_data: List[Dict[str, Any]]
final_report: Optional[str]
# --- Nodes ---
async def generate_question_node(state: InterviewState):
"""Node: Generates the next question or ends interview."""
logger.info(f"Generating question {state['current_question_num'] + 1}/{state['total_questions']}")
question = await gemini_service.generate_question(
target_company=state["target_company"],
interview_style=state["interview_style"],
job_role=state["job_role"],
difficulty=state["difficulty"],
topic=state["topic"],
question_num=state["current_question_num"] + 1,
total_questions=state["total_questions"],
history=state["history"],
resume_text=state.get("resume_text")
)
# Update state
state["current_question"] = question
state["current_question_num"] += 1
# Add to message history (as AI)
state["messages"].append(AIMessage(content=question))
# Reset follow-up count for new question
state["follow_up_count"] = 0
return state
async def generate_follow_up_node(state: InterviewState):
"""Node: Generates a follow-up question."""
logger.info("Generating Follow-up Question...")
last_user_msg = state["messages"][-1]
last_answer = last_user_msg.content if isinstance(last_user_msg, HumanMessage) else ""
question = await gemini_service.generate_followup_question(
target_company=state["target_company"],
question=state["current_question"],
answer=last_answer
)
# Update state
state["current_question"] = question
# Do NOT increment current_question_num, as it's the same topic
state["follow_up_count"] += 1
# Add to message history
state["messages"].append(AIMessage(content=question))
return state
async def analyze_answer_node(state: InterviewState):
"""Node: Analyzes the user's latest response."""
last_message = state["messages"][-1]
if not isinstance(last_message, HumanMessage):
# Should not happen in normal flow
return state
user_answer = last_message.content
logger.info("Analyzing user answer...")
analysis = await gemini_service.analyze_response(
question=state["current_question"],
answer=user_answer,
job_role=state["job_role"],
difficulty=state["difficulty"]
)
# Append analysis to list
if "analysis_data" not in state:
state["analysis_data"] = []
# Store complete analysis object
analysis_record = {
"question": state["current_question"],
"answer": user_answer,
"analysis": analysis,
"question_num": state["current_question_num"]
}
state["analysis_data"].append(analysis_record)
# Add context to history for the next question generator
# We include a brief summary so the AI knows how the user did, but not the full JSON
feedback_short = f"Question: {state['current_question']}\nAnswer: {user_answer}\nFeedback: {analysis.get('feedback', '')}"
state["history"].append(feedback_short)
# Adaptive Difficulty Logic
# If strongly positive, increase difficulty. If negative, decrease.
# Simple implementation for now.
score = analysis.get("sentiment_score", 0)
current_diff = state["difficulty"]
if score > 0.7 and current_diff == "Easy":
state["difficulty"] = "Medium"
elif score > 0.8 and current_diff == "Medium":
state["difficulty"] = "Hard"
elif score < 0.3 and current_diff == "Hard":
state["difficulty"] = "Medium"
elif score < 0.2 and current_diff == "Medium":
state["difficulty"] = "Easy"
return state
async def generate_report_node(state: InterviewState):
"""Node: Generates the final report after all questions."""
logger.info("Generating Final Report...")
# Prepare data for the prompt
interview_data_str = json.dumps(state["analysis_data"], indent=2)
report = await gemini_service.generate_final_report(
target_company=state["target_company"],
job_role=state["job_role"],
interview_data=interview_data_str
)
state["final_report"] = report
return state
import json
# --- Routing ---
def route_interview(state: InterviewState):
"""Decides whether to continue questioning, follow-up, or end."""
# 1. Check if we should ask a follow-up
if state.get("follow_up_count", 0) < state.get("max_follow_ups", 0):
return "generate_follow_up"
if state["current_question_num"] >= state["total_questions"]:
return "generate_report"
return "generate_question"
# --- Graph Definition ---
workflow = StateGraph(InterviewState)
workflow.add_node("generate_question", generate_question_node)
workflow.add_node("generate_follow_up", generate_follow_up_node)
workflow.add_node("analyze_answer", analyze_answer_node)
workflow.add_node("generate_report", generate_report_node)
# Entry point
workflow.set_entry_point("generate_question")
# Transition from Question extraction -> Wait for user input
# NOTE: In a real API, we would pause here.
# For this graph, we assume the HumanMessage is injected into state
# externally before resuming.
# BUT `StateGraph` in basic form runs until END or interrupt.
# Since we are building an API, we will likely run one step at a time or use `interrupt`.
# For MVP simplicity:
# The "cycle" is: Generate Question -> END (Return to user) -> (User calls API) -> Analyze Answer -> Route
# However, to visualize the logic:
# generate_question -> END (user sees question)
# ... User inputs answer ...
# (Resume with answer) -> analyze_answer -> route -> generate_question/report
# We will define the edge from analyze to route
workflow.add_conditional_edges(
"analyze_answer",
route_interview,
{
"generate_question": "generate_question",
"generate_follow_up": "generate_follow_up",
"generate_report": "generate_report"
}
)
workflow.add_edge("generate_report", END)
# We define the edge that "ends" a turn to wait for user input.
# In LangGraph terms, `generate_question` finishes, and we return state to the caller.
# The caller (FastAPI) will persist state.
# When user replies, we invoke `analyze_answer` directly?
# OR we define the full loop and use `interrupt_before`.
# Let's use the explicit loop for clarity and compilation,
# but at runtime we might use it differently.
# Ideally: generate_question -> END.
# Then user submits answer -> analyze_answer -> check condition.