Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Lawyer Selection Agent - Analyzes conversations and recommends the best lawyers | |
| """ | |
| import os | |
| import json | |
| import aiohttp | |
| from typing import List, Optional, Literal | |
| from dotenv import load_dotenv | |
| from langgraph.graph import StateGraph, END | |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage | |
| from pydantic import BaseModel, Field | |
| from logging import getLogger | |
| from prompts.lawyer_selector import LAWYER_SELECTION_PROMPT | |
| logger = getLogger(__name__) | |
| load_dotenv() | |
class LawyerSelectorAgent:
    """Agent that analyzes a conversation and recommends up to 3 matching lawyers.

    Runs a single-node LangGraph workflow: fetch the lawyer database from the
    frontend API, ask the LLM to rank the best matches via structured output
    (with lawyer IDs constrained to the retrieved set), and format the chosen
    profiles into a human-readable result string.
    """

    def __init__(self, llm):
        """Store the LLM client and compile the selection workflow.

        Args:
            llm: A LangChain chat model supporting ``with_structured_output``.
        """
        self.llm = llm
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build the one-step LangGraph workflow: select_lawyers -> END."""
        workflow = StateGraph(dict)
        workflow.add_node("select_lawyers", self._select_lawyers)
        workflow.set_entry_point("select_lawyers")
        workflow.add_edge("select_lawyers", END)
        return workflow.compile()

    def _format_lawyers(self, lawyers: List[dict]) -> str:
        """Render the lawyer list as a compact text block for the LLM prompt."""
        return "\n\n".join([
            f"Lawyer ID: {l['lawyer_id']}\n- Name: {l['full_name']}\n- Specialty: {l['primary_specialty']}\n- Experience Level: {l.get('experience_level', 'N/A')}\n- Years: {l.get('experience_years', 'N/A')}\n- Description: {l.get('lawyer_description', 'N/A')}..."
            for l in lawyers
        ])

    def _format_lawyer_profile(self, lawyer: dict, rank: int, reasoning: str) -> str:
        """Format a single lawyer profile (plus match reasoning) for the result output.

        Args:
            lawyer: Lawyer record dict from the frontend API.
            rank: 1-based recommendation rank shown in the header.
            reasoning: LLM-generated explanation of why this lawyer matches.

        Returns:
            A multi-line text section for one recommendation.
        """
        lines = [
            "\n" + "─" * 80,
            f"RECOMMENDATION #{rank}",
            "─" * 80,
            f"\n👤 {lawyer['full_name']}",
            f"\n📊 Experience: {lawyer.get('experience_years', 'N/A')} years ({lawyer.get('experience_level', 'N/A')})",
            f"🎯 Primary Specialty: {lawyer['primary_specialty']}",
        ]
        # Optional sections are included only when the record provides them.
        if lawyer.get('legal_specialties'):
            lines.append("\n📚 Legal Specialties:")
            for specialty in lawyer['legal_specialties']:
                lines.append(f"  • {specialty}")
        if lawyer.get('jurisdiction'):
            lines.append(f"\n🌍 Jurisdiction: {lawyer['jurisdiction'].capitalize()}")
        if lawyer.get('languages'):
            lines.append(f"\n💬 Languages: {', '.join(lawyer['languages'])}")
        if lawyer.get('lawyer_description'):
            lines.append(f"\n📝 {lawyer['lawyer_description']}")
        lines.append("\n✅ Why this lawyer matches your case:")
        lines.append(f"  {reasoning}")
        lines.append("")
        return "\n".join(lines)

    async def _fetch_lawyers_from_frontend(self) -> List[dict]:
        """Fetch the lawyer database from the frontend API.

        Returns:
            The list of lawyer dicts from the response's ``data`` field.

        Raises:
            Exception: If configuration is missing, the connection fails, the
                API returns a non-200 status, or the payload has an
                unexpected format.
        """
        base_url = os.getenv("SUPABASE_BASE_URL")
        if not base_url:
            raise Exception("SUPABASE_BASE_URL not configured in environment")
        api_key = os.getenv("CYBERLGL_API_KEY")
        if not api_key:
            # Fail fast with a clear message instead of sending a None header
            # value into aiohttp, which errors obscurely.
            raise Exception("CYBERLGL_API_KEY not configured in environment")
        api_url = f"{base_url}/functions/v1/get-lawyer-database"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    api_url,
                    headers={"X-API-Key": api_key}
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise Exception(f"Frontend API error: {response.status} - {error_text}")
                    data = await response.json()
        except aiohttp.ClientError as e:
            # Chain the cause so the original network error is preserved.
            # NOTE: unlike the previous version, deliberate raises above are
            # no longer re-wrapped by a blanket `except Exception`.
            raise Exception(f"Failed to connect to frontend API: {str(e)}") from e
        if data.get('success') and data.get('data'):
            return data['data']
        raise Exception(f"Frontend API returned unexpected format: {data}")

    async def _select_lawyers(self, state: dict) -> dict:
        """Workflow node: rank lawyers for the conversation held in ``state``.

        Reads ``state["messages"]`` (dicts with role/content keys, or
        Message-like objects) and writes the formatted recommendation text —
        or a user-facing error message — into ``state["result"]``.
        """
        # Fetch the lawyer database; surface failures as a result message
        # rather than crashing the workflow.
        try:
            lawyers = await self._fetch_lawyers_from_frontend()
        except Exception as e:
            logger.error("Failed to fetch lawyers: %s", e)
            state["result"] = f"❌ Error: {str(e)}\n\nPlease try again later or contact support."
            return state
        if not lawyers:
            state["result"] = "❌ No lawyers available in the system."
            return state

        valid_lawyer_ids = [l['lawyer_id'] for l in lawyers]
        logger.info("Retrieved %d lawyers with IDs: %s", len(lawyers), valid_lawyer_ids)
        # Map lawyer_id -> record for O(1) lookup when assembling output.
        lawyer_map = {l['lawyer_id']: l for l in lawyers}

        # Constrain the model's lawyer_id field to the retrieved IDs via a
        # dynamically built Literal type, so hallucinated IDs fail validation.
        LawyerID = Literal[tuple(valid_lawyer_ids)] if valid_lawyer_ids else str

        class LawyerRanking(BaseModel):
            reasoning: str = Field(description="Client-friendly explanation of how this lawyer can help with their specific legal problem")
            rank: int = Field(description="1, 2, or 3")
            lawyer_id: LawyerID = Field(description="The unique ID of the lawyer from the retrieved list")

        class LawyerRankings(BaseModel):
            rankings: List[LawyerRanking] = Field(description="List of 0 to 3 lawyer rankings")

        # Build the selection prompt from the formatted lawyer list.
        lawyers_text = self._format_lawyers(lawyers)
        prompt = LAWYER_SELECTION_PROMPT.format(lawyers=lawyers_text)

        # Normalize incoming messages (dicts or Message objects) to LangChain
        # message types; messages with unknown roles are silently dropped.
        messages = []
        for msg in state["messages"]:
            role = msg.get("role") if isinstance(msg, dict) else msg.role
            content = msg.get("content") if isinstance(msg, dict) else msg.content
            if role == "system":
                messages.append(SystemMessage(content=content))
            elif role == "user":
                messages.append(HumanMessage(content=content))
            elif role == "assistant":
                messages.append(AIMessage(content=content))
        # Append the selection instructions as the final user turn.
        messages.append(HumanMessage(content=prompt))

        # Log the conversation being sent to the LLM for debugging.
        logger.info("Sending %d messages to LLM for lawyer selection", len(messages))
        for i, msg in enumerate(messages):
            logger.info("Message %d (%s): %s...", i + 1, type(msg).__name__, msg.content[:200])

        # Ask the LLM for structured rankings; the Literal-typed lawyer_id
        # rejects IDs outside the retrieved set at validation time.
        try:
            result = await self.llm.with_structured_output(LawyerRankings).ainvoke(messages)
            logger.info("Raw response from LLM: %s", result)
            rankings = result.rankings
            logger.info("Received %d rankings from LLM", len(rankings))
            for r in rankings:
                logger.info("  - Rank %d: Lawyer ID %s, Reasoning: %s...", r.rank, r.lawyer_id, r.reasoning[:100])
        except Exception as e:
            logger.error("Structured output error: %s", e)
            state["result"] = f"❌ Error processing lawyer recommendations: {str(e)}"
            return state

        # Assemble the final text: a "no match" notice or the ranked profiles.
        if not rankings:
            output = ["=" * 80, "LAWYER RECOMMENDATIONS", "=" * 80]
            output.append("\n❌ No lawyers available for this particular case.")
            output.append("Your legal issue may fall outside our current areas of expertise.")
            output.append("Please consider refining your request or contacting a general legal service.")
        else:
            output = ["=" * 80, f"{len(rankings)} RECOMMENDED LAWYERS FOR YOUR CASE", "=" * 80]
            for r in rankings:
                lawyer = lawyer_map.get(r.lawyer_id)
                if lawyer:
                    output.append(self._format_lawyer_profile(lawyer, r.rank, r.reasoning))
                else:
                    # Should be unreachable thanks to the Literal constraint,
                    # but log defensively rather than KeyError-ing.
                    logger.error("Lawyer with ID %s not found in lawyer_map", r.lawyer_id)
        state["result"] = "\n".join(output)
        return state

    async def select_lawyers(self, conversation_history: List[dict]) -> str:
        """Run the selection workflow over a conversation.

        Args:
            conversation_history: Messages as dicts with ``role``/``content``
                keys (or Message-like objects with those attributes).

        Returns:
            The formatted recommendation (or error) text.
        """
        result = await self.workflow.ainvoke({
            "messages": conversation_history
        })
        return result["result"]