# InsuCompass unified orchestrator: profile building, plan recommendation, and Q&A graph.
| import logging | |
| import json | |
| import uuid | |
| import datetime | |
| import sqlite3 | |
| from typing import List, Dict, Any | |
| from typing_extensions import TypedDict | |
| from langchain_core.documents import Document | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.sqlite import SqliteSaver | |
| # Import all our custom agent and service classes | |
| from insucompass.core.agents.profile_agent import profile_builder | |
| from insucompass.core.agents.plan_agent import planner | |
| from insucompass.core.agents.query_trasformer import QueryTransformationAgent | |
| from insucompass.core.agents.router_agent import router | |
| from insucompass.services.ingestion_service import IngestionService | |
| from insucompass.core.agents.search_agent import searcher | |
| from insucompass.core.agents.advisor_agent import advisor | |
| from insucompass.services import llm_provider | |
| from insucompass.prompts.prompt_loader import load_prompt | |
| from insucompass.services.vector_store import vector_store_service | |
# --- Module-level singletons shared by every graph node ---
# NOTE(review): these are constructed at import time, so importing this module
# performs real service initialization (LLM client, vector store, ingestion
# service) — confirm that side effect is intended.
llm = llm_provider.get_gemini_llm()
retriever = vector_store_service.get_retriever()
transformer = QueryTransformationAgent(llm, retriever)  # query rewriting + retrieval
ingestor = IngestionService()  # persists newly fetched documents into the store
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Unified LangGraph State Definition ---
class AgentState(TypedDict):
    """Shared state carried between all graph nodes for one conversation turn."""
    # Structured profile fields collected during profile building.
    user_profile: Dict[str, Any]
    # The raw message the user sent this turn.
    user_message: str
    # Alternating "User: ..." / "Agent: ..." strings, oldest first.
    conversation_history: List[str]
    # True once the profile builder has gathered every required field.
    is_profile_complete: bool
    # The user's question rewritten to be self-contained (Q&A path only).
    standalone_question: str
    # Documents retrieved (or web-searched) for the current question.
    documents: List[Document]
    # Whether the retrieved documents were graded relevant to the question.
    is_relevant: bool
    # The agent-facing text produced this turn.
    generation: str
    # Structured plan recommendations produced after profile completion.
    plan_recommendations: Dict[str, Any]
| # --- Graph Nodes --- | |
def profile_builder_node(state: AgentState) -> Dict[str, Any]:
    """Run a single turn of the profile-building conversation.

    Handles the "START_PROFILE_BUILDING" sentinel (first turn, no user answer),
    otherwise records the user's answer to the previous agent question and asks
    the next one. When the agent signals "PROFILE_COMPLETE", the returned
    generation is the "PROFILE_COMPLETE_TRANSITION" sentinel so the graph can
    route to the plan recommender.
    """
    logger.info("---NODE: PROFILE BUILDER---")
    profile = state["user_profile"]
    message = state["user_message"]
    history = state.get("conversation_history", [])

    # First turn: the sentinel message kicks off the interview; there is no
    # user answer to record yet.
    if message == "START_PROFILE_BUILDING":
        agent_response = profile_builder.get_next_question(profile, [])
        return {
            "conversation_history": [f"Agent: {agent_response}"],
            "generation": agent_response,
            "user_profile": profile,
            "is_profile_complete": False,
        }

    # Recover the question the user is answering from the last agent turn.
    # (FIX) The guard and the slice now use the same prefix; previously the
    # code tested startswith("Agent:") but sliced len("Agent: "), which would
    # drop the first character of a question stored without the space.
    agent_prefix = "Agent: "
    if history and history[-1].startswith(agent_prefix):
        last_question = history[-1][len(agent_prefix):]
    else:
        last_question = ""

    updated_profile = profile_builder.update_profile_with_answer(profile, last_question, message)
    agent_response = profile_builder.get_next_question(updated_profile, history + [f"User: {message}"])
    new_history = history + [f"User: {message}", f"Agent: {agent_response}"]

    if agent_response == "PROFILE_COMPLETE":
        logger.info("Profile building complete.")
        # Sentinel generation: the graph routes to plan_recommender, which
        # replaces the trailing history entry with a user-facing message.
        return {
            "user_profile": updated_profile,
            "is_profile_complete": True,
            "conversation_history": new_history,
            "generation": "PROFILE_COMPLETE_TRANSITION",
        }

    return {
        "user_profile": updated_profile,
        "is_profile_complete": False,
        "conversation_history": new_history,
        "generation": agent_response,
    }
def plan_recommender_node(state: AgentState) -> Dict[str, Any]:
    """Generate initial plan recommendations right after profile completion.

    Builds a date-anchored web-search query from the user profile, searches for
    live plan data, ingests the findings for future retrieval, and asks the
    planner for structured recommendations. Replaces the trailing
    "Agent: PROFILE_COMPLETE" history entry with a user-facing message.
    """
    logger.info("---NODE: PLAN RECOMMENDER---")
    user_profile = state["user_profile"]
    logger.info(f"User Profile: {user_profile}")

    # 1. Formulate a search query for up-to-date plans; the date anchors the
    #    search to fresh results, e.g. "July 21 2025".
    current_date = datetime.date.today().strftime("%B %d %Y")
    query_template = (
        '"Individual health-insurance plans" '
        '"{county} County {state} {zip_code}" "{current_date}" '
        '"{age}-year-old" "household size {household_size}" "income {income}" '
        '"employment {employment_status}" "citizenship {citizenship}" '
        '"medical history {medical_history}" "formulary {medications}" "{special_cases}" '
        '"ACA marketplace OR Healthcare.gov OR CMS Landscape File" '
        '"off-exchange OR private health insurance OR direct-to-consumer plan OR short-term medical" '
        '"premium" "deductible" "out-of-pocket max" "metal tier bronze silver gold platinum" '
        '"network HMO PPO EPO" "Summary of Benefits and Coverage" "CMS 2025 final rate filings"'
    )
    # Inject user-specific values. NOTE(review): str.format(**user_profile)
    # raises KeyError if the profile lacks any placeholder key — assumes the
    # profile builder guarantees all fields; confirm.
    search_query = query_template.format(current_date=current_date, **user_profile)
    logger.info(f"Plan Recommender search query: '{search_query}'")

    # 2. Use the SearchAgent to get live, up-to-date information.
    documents = searcher.search(search_query)
    # Ingest the newly found documents for future reference.
    if documents:
        ingestor.ingest_documents(documents)

    # 3. Generate structured recommendations from the live search results.
    recommendations = planner.generate_recommendations(user_profile, documents)

    generation_message = "Great, your profile is all set! I've prepared a few initial plan recommendations just for you. \n Ask me anything about these plans or any questions you have related to Health Insurance!"
    # (FIX) Work on a copy instead of mutating the state list in place, and
    # guard against an empty history: the original `history[-1] = ...`
    # raised IndexError when the list was empty.
    history = list(state["conversation_history"])
    if history:
        # Replace the trailing "Agent: PROFILE_COMPLETE" sentinel entry.
        history[-1] = f"Agent: {generation_message}"
    else:
        history.append(f"Agent: {generation_message}")

    return {
        "plan_recommendations": recommendations,
        "generation": generation_message,
        "conversation_history": history,
    }
def reformulate_query_node(state: AgentState) -> Dict[str, Any]:
    """Rewrite the user's follow-up into a self-contained standalone question."""
    logger.info("---NODE: REFORMULATE QUERY---")
    base_prompt = load_prompt("query_reformulator")
    history_text = "\n".join(state["conversation_history"])
    profile_text = str(state["user_profile"])
    full_prompt = (
        f"{base_prompt}\n\n"
        f"### User Profile\n{profile_text}\n\n"
        f"### Conversation History:\n{history_text}\n\n"
        f"### Follow-up Question:\n{state['user_message']}"
    )
    reply = llm.invoke(full_prompt)
    return {"standalone_question": reply.content.strip()}
def retrieve_and_grade_node(state: AgentState) -> Dict[str, Any]:
    """Retrieve documents for the standalone question and grade their relevance."""
    logger.info("---NODE: RETRIEVE & GRADE---")
    question = state["standalone_question"]
    docs = transformer.transform_and_retrieve(question)
    return {
        "documents": docs,
        "is_relevant": router.grade_documents(question, docs),
    }
def search_and_ingest_node(state: AgentState) -> Dict[str, Any]:
    """Fall back to a live web search and ingest whatever is found.

    (FIX) The original always returned {}, so the freshly searched web
    documents never entered state and generate_answer_node answered from the
    documents that had just been graded irrelevant. Returning the web
    documents makes them available to the generation step.
    """
    logger.info("---NODE: SEARCH & INGEST---")
    web_documents = searcher.search(state["standalone_question"])
    if web_documents:
        # Persist for future retrievals, and surface them for this turn.
        ingestor.ingest_documents(web_documents)
        return {"documents": web_documents}
    # Nothing found: keep the existing (irrelevant) documents rather than
    # clobbering state with an empty list.
    return {}
def generate_answer_node(state: AgentState) -> Dict[str, Any]:
    """Produce the final advisor answer and append this turn to the history."""
    logger.info("---NODE: GENERATE ADVISOR RESPONSE---")
    answer = advisor.generate_response(
        state["standalone_question"], state["user_profile"], state["documents"]
    )
    this_turn = [f"User: {state['user_message']}", f"Agent: {answer}"]
    return {
        "generation": answer,
        "conversation_history": state["conversation_history"] + this_turn,
    }
# --- Conditional Edges ---
def should_search_web(state: AgentState) -> str:
    """Route to web search when the retrieved documents were graded irrelevant."""
    if state["is_relevant"]:
        return "generate"
    return "search"
# Entry-point router: re-evaluated on every invocation of the graph.
def decide_entry_point(state: AgentState) -> str:
    """Pick the initial node: Q&A once the profile is complete, else profile building."""
    logger.info("---ROUTING: ENTRY POINT---")
    return "qna" if state.get("is_profile_complete") else "profile"
def after_profile_build(state: AgentState) -> str:
    """Route to plan recommendation if the profile was just completed, else end the turn."""
    if not state.get("is_profile_complete"):
        logger.info(">>> Route: Profile not yet complete. Ending turn.")
        return "end_turn"
    logger.info(">>> Route: Profile just completed. Transitioning to Plan Recommender.")
    return "recommend_plans"
# --- Build the Graph ---
# NOTE(review): sqlite3.connect does not create parent directories — assumes
# "data/" already exists at runtime; confirm deployment guarantees it.
db_connection = sqlite3.connect("data/checkpoints.db", check_same_thread=False)
memory = SqliteSaver(db_connection)

builder = StateGraph(AgentState)
builder.add_node("profile_builder", profile_builder_node)
builder.add_node("plan_recommender", plan_recommender_node)
builder.add_node("reformulate_query", reformulate_query_node)
builder.add_node("retrieve_and_grade", retrieve_and_grade_node)
builder.add_node("search_and_ingest", search_and_ingest_node)
builder.add_node("generate_answer", generate_answer_node)

# Entry point: resume profile building until complete, then serve Q&A.
builder.set_conditional_entry_point(
    decide_entry_point,
    {
        "profile": "profile_builder",
        "qna": "reformulate_query",
    },
)

# After a profile turn, either recommend plans (profile just completed) or
# end the turn; the next user message re-enters at the conditional entry point.
builder.add_conditional_edges(
    "profile_builder",
    after_profile_build,
    {
        "recommend_plans": "plan_recommender",
        "end_turn": END,
    },
)
# (FIX) Removed `builder.add_edge("profile_builder", END)`: a plain edge to
# END alongside the conditional edges above makes LangGraph fan out to both
# targets from the same node, defeating the conditional routing.

builder.add_edge("plan_recommender", END)
builder.add_edge("reformulate_query", "retrieve_and_grade")
builder.add_conditional_edges(
    "retrieve_and_grade",
    should_search_web,
    {"search": "search_and_ingest", "generate": "generate_answer"},
)
builder.add_edge("search_and_ingest", "generate_answer")
builder.add_edge("generate_answer", END)

app = builder.compile(checkpointer=memory)
| # --- Interactive Test Harness (CORRECTED) --- | |
| # if __name__ == '__main__': | |
| # print("--- InsuCompass AI Unified Orchestrator Interactive Test ---") | |
| # print("Type 'quit' at any time to exit.") | |
| # test_thread_id = f"interactive-test-{uuid.uuid4()}" | |
| # thread_config = {"configurable": {"thread_id": test_thread_id}} | |
| # print(f"Using conversation thread_id: {test_thread_id}") | |
| # # Initial state for a new user | |
| # current_state = { | |
| # "user_profile": { | |
| # "zip_code": "90210", "county": "Los Angeles", "state": "California", "state_abbreviation": "CA", | |
| # "age": 45, "gender": "Male", "household_size": 2, "income": 120000, | |
| # "employment_status": "employed_with_employer_coverage", "citizenship": "US Citizen", | |
| # "medical_history": None, "medications": None, "special_cases": None | |
| # }, | |
| # "user_message": "START_PROFILE_BUILDING", | |
| # "is_profile_complete": False, | |
| # "conversation_history": [], | |
| # } | |
| # while True: | |
| # print("\n" + "="*20 + " INVOKING GRAPH " + "="*20) | |
| # print(f"Sending message: '{current_state['user_message']}'") | |
| # # The graph is invoked with the current state | |
| # final_state = app.invoke(current_state, config=thread_config) | |
| # # Update our local state from the graph's final output | |
| # current_state = final_state | |
| # agent_response = current_state["generation"] | |
| # print(f"\nInsuCompass Agent: {agent_response}") | |
| # # Get the next input from the user | |
| # if current_state["is_profile_complete"]: | |
| # # If the last response was the completion message, prompt for a question | |
| # if "profile is complete" in agent_response: | |
| # next_message = input("Your Question > ") | |
| # else: # It was a Q&A response, so prompt for another question | |
| # next_message = input("Your Follow-up Question > ") | |
| # else: | |
| # next_message = input("Your Answer > ") | |
| # if next_message.lower() == 'quit': | |
| # print("Exiting test.") | |
| # break | |
| # # Prepare the state for the next turn | |
| # current_state["user_message"] = next_message |