"""Unified LangGraph orchestrator for InsuCompass.

The graph has two entry paths, chosen per invocation from the persisted state:

* profile building -- one question/answer turn per invocation; when the
  profile agent signals completion the plan recommender runs in the same turn;
* Q&A -- reformulate the question, retrieve and grade documents, optionally
  search the web, then generate the advisor's answer.
"""

import datetime
import json
import logging
import sqlite3
import uuid
from pathlib import Path
from typing import Any, Dict, List

from langchain_core.documents import Document
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict

# Import all our custom agent and service classes
from insucompass.core.agents.advisor_agent import advisor
from insucompass.core.agents.plan_agent import planner
from insucompass.core.agents.profile_agent import profile_builder
from insucompass.core.agents.query_trasformer import QueryTransformationAgent
from insucompass.core.agents.router_agent import router
from insucompass.core.agents.search_agent import searcher
from insucompass.prompts.prompt_loader import load_prompt
from insucompass.services import llm_provider
from insucompass.services.ingestion_service import IngestionService
from insucompass.services.vector_store import vector_store_service

llm = llm_provider.get_gemini_llm()
retriever = vector_store_service.get_retriever()
transformer = QueryTransformationAgent(llm, retriever)
ingestor = IngestionService()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Markers shared by the nodes below.
_AGENT_PREFIX = "Agent: "
_USER_PREFIX = "User: "
_START_SIGNAL = "START_PROFILE_BUILDING"      # user_message that opens profile building
_PROFILE_COMPLETE = "PROFILE_COMPLETE"        # sentinel returned by the profile agent


# --- Unified LangGraph State Definition ---
class AgentState(TypedDict):
    """State carried between graph nodes and persisted by the checkpointer."""

    user_profile: Dict[str, Any]          # profile fields collected so far
    user_message: str                     # the message for the current turn
    conversation_history: List[str]       # entries prefixed "User: " / "Agent: "
    is_profile_complete: bool             # drives the entry-point routing
    standalone_question: str              # output of reformulate_query_node
    documents: List[Document]             # documents handed to the advisor
    is_relevant: bool                     # router's grade of `documents`
    generation: str                       # text returned to the caller this turn
    plan_recommendations: Dict[str, Any]  # output of plan_recommender_node


class _ProfileFields(dict):
    """Mapping for ``str.format_map`` that renders absent fields as 'unknown'."""

    def __missing__(self, key: str) -> str:
        return "unknown"


# --- Graph Nodes ---
def profile_builder_node(state: AgentState) -> Dict[str, Any]:
    """Run a single turn of the profile building conversation.

    On the ``START_PROFILE_BUILDING`` message the history is reset and the
    first question is asked. Otherwise the user's message is treated as the
    answer to the last agent question, the profile is updated, and the next
    question is requested.

    Returns:
        A state update with ``user_profile``, ``is_profile_complete``,
        ``conversation_history`` and ``generation``. When the profile agent
        answers ``PROFILE_COMPLETE`` the generation is the internal marker
        ``PROFILE_COMPLETE_TRANSITION`` and the profile is flagged complete.
    """
    logger.info("---NODE: PROFILE BUILDER---")
    profile = state["user_profile"]
    message = state["user_message"]

    if message == _START_SIGNAL:
        updated_profile = profile
        turn_history: List[str] = []
        agent_response = profile_builder.get_next_question(profile, [])
    else:
        history = list(state.get("conversation_history") or [])
        # The question being answered is the most recent agent entry, if any.
        if history and history[-1].startswith(_AGENT_PREFIX):
            last_question = history[-1][len(_AGENT_PREFIX):]
        else:
            last_question = ""
        updated_profile = profile_builder.update_profile_with_answer(
            profile, last_question, message
        )
        turn_history = history + [f"{_USER_PREFIX}{message}"]
        agent_response = profile_builder.get_next_question(updated_profile, turn_history)

    new_history = turn_history + [f"{_AGENT_PREFIX}{agent_response}"]

    # Checked on both branches: a profile that is already complete when
    # building starts must not surface the sentinel to the user as a question.
    if agent_response == _PROFILE_COMPLETE:
        logger.info("Profile building complete.")
        return {
            "user_profile": updated_profile,
            "is_profile_complete": True,
            "conversation_history": new_history,
            "generation": "PROFILE_COMPLETE_TRANSITION",
        }

    return {
        "user_profile": updated_profile,
        "is_profile_complete": False,
        "conversation_history": new_history,
        "generation": agent_response,
    }


def plan_recommender_node(state: AgentState) -> Dict[str, Any]:
    """Generate initial plan recommendations once the profile is complete.

    Builds a search query from the user profile, runs a live search, ingests
    any documents found, and asks the planner for recommendations.

    Returns:
        A state update with ``plan_recommendations``, ``generation`` (the
        message shown to the user) and ``conversation_history``, in which the
        ``PROFILE_COMPLETE`` sentinel entry is replaced by that message.
    """
    logger.info("---NODE: PLAN RECOMMENDER---")
    user_profile = state["user_profile"]
    logger.info("User Profile: %s", user_profile)

    # 1. Formulate a search query to find up-to-date plans
    # ------------ A) Capture today's date in a friendly format ------------
    current_date = datetime.date.today().strftime("%B %d %Y")  # e.g., "July 21 2025"

    # ------------ B) Tavily search-query template ------------
    query_template = (
        '"Individual health-insurance plans" '
        '"{county} County {state} {zip_code}" "{current_date}" '
        '"{age}-year-old" "household size {household_size}" "income {income}" '
        '"employment {employment_status}" "citizenship {citizenship}" '
        '"medical history {medical_history}" "formulary {medications}" "{special_cases}" '
        '"ACA marketplace OR Healthcare.gov OR CMS Landscape File" '
        '"off-exchange OR private health insurance OR direct-to-consumer plan OR short-term medical" '
        '"premium" "deductible" "out-of-pocket max" "metal tier bronze silver gold platinum" '
        '"network HMO PPO EPO" "Summary of Benefits and Coverage" "CMS 2025 final rate filings"'
    )

    # ------------ C) Inject user-specific values ------------
    # format_map over _ProfileFields tolerates profiles that lack a field
    # (plain .format(**profile) raises KeyError) and profiles that happen to
    # contain a "current_date" key (which would raise TypeError).
    search_query = query_template.format_map(
        _ProfileFields(user_profile, current_date=current_date)
    )
    logger.info("Plan Recommender search query: '%s'", search_query)

    # 2. Use the SearchAgent to get live, up-to-date information
    documents = searcher.search(search_query)

    # Optional: Ingest these newly found documents for future reference
    if documents:
        ingestor.ingest_documents(documents)

    # 3. Generate structured recommendations from the live search results
    recommendations = planner.generate_recommendations(user_profile, documents)

    # Create a human-friendly message to present the plans
    generation_message = "Great, your profile is all set! I've prepared a few initial plan recommendations just for you. \n Ask me anything about these plans or any questions you have related to Health Insurance!"

    # Work on a copy so the incoming state is never mutated in place. Replace
    # the sentinel entry left by profile_builder_node; if it is not there
    # (or the history is empty), append instead of clobbering a real entry.
    history = list(state.get("conversation_history") or [])
    agent_entry = f"{_AGENT_PREFIX}{generation_message}"
    if history and history[-1] == f"{_AGENT_PREFIX}{_PROFILE_COMPLETE}":
        history[-1] = agent_entry
    else:
        history.append(agent_entry)

    return {
        "plan_recommendations": recommendations,
        "generation": generation_message,
        "conversation_history": history,
    }


def reformulate_query_node(state: AgentState) -> Dict[str, Any]:
    """Reformulate the user's question to be self-contained.

    The prompt combines the ``query_reformulator`` template, the user profile,
    the conversation history and the follow-up question.

    Returns:
        A state update with ``standalone_question``.
    """
    logger.info("---NODE: REFORMULATE QUERY---")
    question = state["user_message"]
    history = state.get("conversation_history") or []
    profile_str = str(state["user_profile"])

    prompt = load_prompt("query_reformulator")
    history_str = "\n".join(history)
    full_prompt = f"{prompt}\n\n### User Profile\n{profile_str}\n\n### Conversation History:\n{history_str}\n\n### Follow-up Question:\n{question}"

    response = llm.invoke(full_prompt)
    standalone_question = response.content.strip()
    return {"standalone_question": standalone_question}


def retrieve_and_grade_node(state: AgentState) -> Dict[str, Any]:
    """Retrieve documents for the standalone question and grade them.

    Returns:
        A state update with ``documents`` and the router's ``is_relevant``.
    """
    logger.info("---NODE: RETRIEVE & GRADE---")
    standalone_question = state["standalone_question"]
    documents = transformer.transform_and_retrieve(standalone_question)
    is_relevant = router.grade_documents(standalone_question, documents)
    return {"documents": documents, "is_relevant": is_relevant}


def search_and_ingest_node(state: AgentState) -> Dict[str, Any]:
    """Search the web, ingest the results, and expose them to the advisor.

    This node runs only when the retrieved documents were graded as not
    relevant. The web results are placed ahead of the previously retrieved
    documents so the answer for the current turn can use them; returning an
    empty update would leave the advisor with only the rejected documents.

    Returns:
        ``{"documents": [...]}`` when the search found anything, otherwise an
        empty update.
    """
    logger.info("---NODE: SEARCH & INGEST---")
    web_documents = searcher.search(state["standalone_question"])
    if not web_documents:
        return {}

    ingestor.ingest_documents(web_documents)
    previous_documents = state.get("documents") or []
    return {"documents": list(web_documents) + list(previous_documents)}


def generate_answer_node(state: AgentState) -> Dict[str, Any]:
    """Generate the final answer and record the exchange in the history.

    Returns:
        A state update with ``generation`` and the extended
        ``conversation_history``.
    """
    logger.info("---NODE: GENERATE ADVISOR RESPONSE---")
    generation = advisor.generate_response(
        state["standalone_question"],
        state["user_profile"],
        state["documents"],
    )
    history = list(state.get("conversation_history") or []) + [
        f"{_USER_PREFIX}{state['user_message']}",
        f"{_AGENT_PREFIX}{generation}",
    ]
    return {"generation": generation, "conversation_history": history}


# --- Conditional Edges ---
def should_search_web(state: AgentState) -> str:
    """Route to a web search when the retrieved documents are not relevant."""
    return "search" if not state["is_relevant"] else "generate"


def decide_entry_point(state: AgentState) -> str:
    """Decide the initial path based on profile completion status."""
    logger.info("---ROUTING: ENTRY POINT---")
    if state.get("is_profile_complete"):
        return "qna"
    return "profile"


def after_profile_build(state: AgentState) -> str:
    """Check whether the profile was just completed."""
    if state.get("is_profile_complete"):
        logger.info(">>> Route: Profile just completed. Transitioning to Plan Recommender.")
        return "recommend_plans"
    logger.info(">>> Route: Profile not yet complete. Ending turn.")
    return "end_turn"


# --- Build the Graph ---
# sqlite3.connect does not create missing parent directories.
_CHECKPOINT_DB = Path("data/checkpoints.db")
_CHECKPOINT_DB.parent.mkdir(parents=True, exist_ok=True)
db_connection = sqlite3.connect(str(_CHECKPOINT_DB), check_same_thread=False)
memory = SqliteSaver(db_connection)

builder = StateGraph(AgentState)

builder.add_node("profile_builder", profile_builder_node)
builder.add_node("plan_recommender", plan_recommender_node)
builder.add_node("reformulate_query", reformulate_query_node)
builder.add_node("retrieve_and_grade", retrieve_and_grade_node)
builder.add_node("search_and_ingest", search_and_ingest_node)
builder.add_node("generate_answer", generate_answer_node)

# Conditional entry point: profile building vs. Q&A.
builder.set_conditional_entry_point(
    decide_entry_point,
    {
        "profile": "profile_builder",
        "qna": "reformulate_query",
    },
)

# A profile turn is one full loop: the state is saved and the next call is
# re-evaluated at the entry point. The conditional edge is the only outgoing
# edge of profile_builder; "end_turn" already maps to END, so no additional
# unconditional edge to END is needed.
builder.add_conditional_edges(
    "profile_builder",
    after_profile_build,
    {
        "recommend_plans": "plan_recommender",
        "end_turn": END,
    },
)

# Define graph edges
builder.add_edge("plan_recommender", END)
builder.add_edge("reformulate_query", "retrieve_and_grade")
builder.add_conditional_edges(
    "retrieve_and_grade",
    should_search_web,
    {"search": "search_and_ingest", "generate": "generate_answer"},
)
builder.add_edge("search_and_ingest", "generate_answer")
builder.add_edge("generate_answer", END)

app = builder.compile(checkpointer=memory)

# --- Interactive Test Harness (CORRECTED) ---
# if __name__ == '__main__':
#     print("--- InsuCompass AI Unified Orchestrator Interactive Test ---")
#     print("Type 'quit' at any time to exit.")
#
#     test_thread_id = f"interactive-test-{uuid.uuid4()}"
#     thread_config = {"configurable": {"thread_id": test_thread_id}}
#     print(f"Using conversation thread_id: {test_thread_id}")
#
#     # Initial state for a new user
#     current_state = {
#         "user_profile": {
#             "zip_code": "90210", "county": "Los Angeles", "state": "California", "state_abbreviation": "CA",
#             "age": 45, "gender": "Male", "household_size": 2, "income": 120000,
#             "employment_status": "employed_with_employer_coverage", "citizenship": "US Citizen",
#             "medical_history": None, "medications": None, "special_cases": None
#         },
#         "user_message": "START_PROFILE_BUILDING",
#         "is_profile_complete": False,
#         "conversation_history": [],
#     }
#
#     while True:
#         print("\n" + "="*20 + " INVOKING GRAPH " + "="*20)
#         print(f"Sending message: '{current_state['user_message']}'")
#
#         # The graph is invoked with the current state
#         final_state = app.invoke(current_state, config=thread_config)
#
#         # Update our local state from the graph's final output
#         current_state = final_state
#
#         agent_response = current_state["generation"]
#         print(f"\nInsuCompass Agent: {agent_response}")
#
#         # Get the next input from the user
#         if current_state["is_profile_complete"]:
#             # If the last response was the completion message, prompt for a question
#             if "profile is complete" in agent_response:
#                 next_message = input("Your Question > ")
#             else:  # It was a Q&A response, so prompt for another question
#                 next_message = input("Your Follow-up Question > ")
#         else:
#             next_message = input("Your Answer > ")
#
#         if next_message.lower() == 'quit':
#             print("Exiting test.")
#             break
#
#         # Prepare the state for the next turn
#         current_state["user_message"] = next_message