Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| FastAPI interface for the LangGraph cyber-legal assistant | |
| """ | |
| import os | |
| import asyncio | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from pydantic import BaseModel, Field | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| from dotenv import load_dotenv | |
| from fastapi import Depends | |
| from fastapi.security import APIKeyHeader | |
| import secrets | |
| from structured_outputs.api_models import ( | |
| Message, DocumentAnalysis, ChatRequest, ChatResponse, | |
| HealthResponse, AnalyzePDFRequest, AnalyzePDFResponse, | |
| LawyerProfile | |
| ) | |
| from langraph_agent import CyberLegalAgent | |
| from utils.conversation_manager import ConversationManager | |
| from utils.utils import validate_query | |
| from utils.lightrag_client import LightRAGClient | |
| from utils import tools | |
| from subagents.lawyer_selector import LawyerSelectorAgent | |
| from subagents.lawyer_messenger import LawyerMessengerAgent | |
| from prompts.main import SYSTEM_PROMPT_CLIENT, SYSTEM_PROMPT_LAWYER | |
| from subagents.pdf_analyzer import PDFAnalyzerAgent | |
| from langchain_openai import ChatOpenAI | |
| from mistralai import Mistral | |
| import logging | |
| import traceback | |
| import base64 | |
| import tempfile | |
| import os as pathlib | |
| from langchain_tavily import TavilySearch | |
| import resend | |
# Load environment variables.
# override=False here: values already set in the process environment are kept.
load_dotenv(dotenv_path=".env", override=False)

# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="CyberLegal AI API",
    description="LangGraph-powered cyber-legal assistant API",
    version="1.0.0"
)

# Add CORS middleware
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True. That is the most permissive CORS setup possible;
# confirm it is intended, or restrict allow_origins to the known front-ends.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
API_PASSWORD = os.getenv("API_PASSWORD", "")  # set this in HF Space Secrets
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


def require_password(x_api_key: Optional[str] = Depends(api_key_header)):
    """
    FastAPI dependency that checks the ``X-API-Key`` header against API_PASSWORD.

    Args:
        x_api_key: Value of the ``X-API-Key`` header, or None when the header
            is absent (the header scheme is created with ``auto_error=False``).

    Raises:
        HTTPException: 401 when a password is configured and the header is
            missing or does not match.
    """
    if not API_PASSWORD:
        # Deliberate fail-open: if you forgot to set it, it won't lock you out.
        return
    # Compare as bytes: compare_digest() on str raises TypeError when either
    # string contains non-ASCII characters, which would turn a bad header into
    # a 500 instead of a 401.
    if x_api_key and secrets.compare_digest(
        x_api_key.encode("utf-8"), API_PASSWORD.encode("utf-8")
    ):
        return
    raise HTTPException(status_code=401, detail="Unauthorized")
# Global agent instance
# NOTE(review): nothing in the visible code reads or assigns this name after
# this line — confirm whether it is still needed.
agent_instance = None
class CyberLegalAPI:
    """
    API wrapper for the LangGraph agent.

    The constructor builds one shared LLM client, the helper clients that
    ``utils.tools`` expects to find as module attributes, two
    ``CyberLegalAgent`` instances (client-facing and lawyer-facing) and the
    PDF analyzer. The async methods are the request handlers that the HTTP
    endpoints delegate to.
    """

    def __init__(self):
        """Read configuration from the environment and build every client and agent."""
        # override=True: values from .env replace variables already set in the process.
        load_dotenv(dotenv_path=".env", override=True)
        llm_provider = os.getenv("LLM_PROVIDER", "openai").lower()
        # Only used for reporting; the LLM below is always a ChatOpenAI client.
        self.llm_provider = llm_provider
        llm = ChatOpenAI(
            model=os.getenv("LLM_MODEL", "gpt-5-nano-2025-08-07"),
            reasoning_effort="low",
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("LLM_BINDING_HOST", "https://api.openai.com/v1"),
            default_headers={
                "X-Cerebras-3rd-Party-Integration": "langgraph"
            }
        )
        mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
        logger.info("β Mistral OCR client initialized")

        # Initialize subagents and set them globally in tools.py
        global lawyer_selector_agent, lawyer_messenger_agent, lightrag_client, tavily_search
        lawyer_selector_agent = LawyerSelectorAgent(llm=llm)
        tools.lawyer_selector_agent = lawyer_selector_agent
        lawyer_messenger_agent = LawyerMessengerAgent(llm=llm)
        tools.lawyer_messenger_agent = lawyer_messenger_agent
        logger.info("β LawyerMessengerAgent initialized")
        lightrag_client = LightRAGClient()
        tools.lightrag_client = lightrag_client
        tavily_search = TavilySearch(
            api_key=os.getenv("TAVILY_API_KEY"),
            max_results=5,
            topic="general",
            search_depth="advanced",
            include_answer=True,
            include_raw_content=False
        )
        tools.tavily_search = tavily_search
        logger.info("β Tavily search client initialized")

        # Initialize Resend
        resend.api_key = os.getenv("RESEND_API_KEY")
        logger.info("β Resend client initialized")

        # One agent per audience; they differ only in the tool sets they receive.
        self.agent_client = CyberLegalAgent(
            llm=llm,
            tools=tools.tools_for_client,
            tools_facade=tools.tools_for_client_facade,
        )
        self.agent_lawyer = CyberLegalAgent(
            llm=llm,
            tools=tools.tools_for_lawyer,
            tools_facade=tools.tools_for_lawyer_facade,
        )
        self.pdf_analyzer = PDFAnalyzerAgent(llm=llm, mistral_client=mistral_client)
        self.conversation_manager = ConversationManager()
        logger.info(f"π§ CyberLegalAPI initialized with {llm_provider.upper()} provider")

    def _build_lawyer_prompt(
        self,
        document_analyses: Optional[List[DocumentAnalysis]],
        jurisdiction: str,
        lawyer_profile: Optional[LawyerProfile] = None,
    ) -> str:
        """
        Build lawyer prompt with optional document context and lawyer profile.

        Args:
            document_analyses: Previously analysed documents to list in the prompt.
            jurisdiction: Value substituted into SYSTEM_PROMPT_LAWYER.
            lawyer_profile: Optional profile; only its non-empty fields are listed.

        Returns:
            The formatted base prompt, followed by the profile section and the
            documents section when they are present.
        """
        prompt_parts = []

        # Add lawyer profile context if available
        if lawyer_profile:
            profile_chunks = ["\n\n### Lawyer Profile Context\n"]
            if lawyer_profile.full_name:
                profile_chunks.append(f"Name: {lawyer_profile.full_name}\n")
            if lawyer_profile.primary_specialty:
                profile_chunks.append(f"Primary Specialty: {lawyer_profile.primary_specialty}\n")
            if lawyer_profile.legal_specialties:
                profile_chunks.append(f"Specialties: {', '.join(lawyer_profile.legal_specialties)}\n")
            if lawyer_profile.experience_level:
                profile_chunks.append(f"Experience Level: {lawyer_profile.experience_level}\n")
            if lawyer_profile.languages:
                profile_chunks.append(f"Languages: {', '.join(lawyer_profile.languages)}\n")
            if lawyer_profile.lawyer_description:
                profile_chunks.append(f"Description: {lawyer_profile.lawyer_description}\n")
            profile_chunks.append(
                "\nWhen answering, consider this lawyer's expertise and experience level. Tailor your responses to be appropriate for their seniority and specialization.\n"
            )
            prompt_parts.append("".join(profile_chunks))

        # Add document analyses if available
        if document_analyses:
            doc_chunks = ["\n### Documents parsed in the lawyer profile\n"]
            for i, doc in enumerate(document_analyses, 1):
                doc_chunks.append(f"[Doc {i}] {doc.file_name}\n")
                if doc.summary:
                    doc_chunks.append(f"Summary: {doc.summary}\n")
                if doc.actors:
                    doc_chunks.append(f"Actors: {doc.actors}\n")
                if doc.key_details:
                    doc_chunks.append(f"Key Details: {doc.key_details}\n")
                doc_chunks.append("\n")
            doc_chunks.append("Use these documents if the user's question is related to their content.\n")
            prompt_parts.append("".join(doc_chunks))

        # Combine base prompt with context
        base_prompt = SYSTEM_PROMPT_LAWYER.format(jurisdiction=jurisdiction)
        if prompt_parts:
            return base_prompt + "\n".join(prompt_parts)
        return base_prompt

    async def process_request(self, request: ChatRequest) -> ChatResponse:
        """
        Process chat request through the agent.

        Args:
            request: Chat request; ``userType == "lawyer"`` selects the lawyer
                agent and prompt, anything else the client agent and prompt.

        Returns:
            ChatResponse built from the dict returned by the agent.

        Raises:
            HTTPException: 400 when ``validate_query`` rejects the message;
                500 when prompt building or the agent call fails. An
                HTTPException raised further down is passed through unchanged.
        """
        is_valid, error_msg = validate_query(request.message)
        if not is_valid:
            raise HTTPException(status_code=400, detail=error_msg)

        # NOTE(review): this logs the whole request (message and history) at INFO.
        logger.info(f"Received request: {request}")

        # Select appropriate agent
        is_lawyer = request.userType == "lawyer"
        if is_lawyer:
            agent = self.agent_lawyer
            logger.info("π¨ββοΈ Using lawyer specialist agent")
        else:
            agent = self.agent_client
            logger.info("π€ Using client-friendly agent")

        # Convert conversation history format
        conversation_history = [
            {"role": msg.role, "content": msg.content}
            for msg in request.conversationHistory or []
        ]
        logger.info(f"π Starting request processing - user_type: {request.userType}, jurisdiction: {request.jurisdiction}")
        logger.info(f"π¬ User query: {request.message}")

        try:
            # Build dynamic system prompt for lawyers with document analyses and/or lawyer profile
            if is_lawyer:
                system_prompt = self._build_lawyer_prompt(
                    request.documentAnalyses,
                    request.jurisdiction,
                    request.lawyerProfile
                )
                context_parts = []
                if request.lawyerProfile:
                    context_parts.append("lawyer profile")
                if request.documentAnalyses:
                    context_parts.append(f"{len(request.documentAnalyses)} document analyses")
                if context_parts:
                    logger.info(f"π Using lawyer prompt with {', '.join(context_parts)}")
                else:
                    logger.info(f"π Using default lawyer prompt with jurisdiction: {request.jurisdiction}")
            else:
                system_prompt = SYSTEM_PROMPT_CLIENT.format(jurisdiction=request.jurisdiction)
                logger.info(f"π€ Using client prompt with jurisdiction: {request.jurisdiction}")

            # Process through selected agent with raw message and conversation history
            logger.info(f"π€ Calling agent.process_query with jurisdiction: {request.jurisdiction}")
            result = await agent.process_query(
                user_query=request.message,
                client_id=request.clientId,
                conversation_history=conversation_history,
                jurisdiction=request.jurisdiction,
                system_prompt=system_prompt
            )
            logger.info(f"β Agent processing completed successfully")

            # Create response; only "response" is mandatory in the agent result.
            response = ChatResponse(
                response=result["response"],
                processing_time=result.get("processing_time", 0.0),
                references=result.get("references", []),
                timestamp=result.get("timestamp", datetime.now().isoformat()),
                error=result.get("error")
            )
            logger.info(f"π€ Returning response to user")
            return response
        except HTTPException:
            # Keep deliberate HTTP errors (status and detail) instead of
            # re-wrapping them as a generic 500.
            raise
        except Exception as e:
            # Log full traceback for debugging
            error_traceback = traceback.format_exc()
            logger.error(f"β Request processing failed: {str(e)}")
            logger.error(f"π Full traceback:\n{error_traceback}")
            # NOTE(review): the traceback is returned to the caller; consider
            # dropping it from the response body outside debugging.
            raise HTTPException(
                status_code=500,
                detail={
                    "error": "Processing failed",
                    "message": str(e),
                    "traceback": error_traceback,
                    "timestamp": datetime.now().isoformat()
                }
            ) from e

    async def health_check(self) -> HealthResponse:
        """
        Check health status of the API and dependencies.

        Returns:
            HealthResponse with status "healthy" or "degraded" depending on the
            LightRAG check, or "unhealthy" when the check itself raises.
        """
        try:
            # A fresh client is created for every check.
            # NOTE(review): health_check() is called without await — confirm it
            # is a synchronous call on LightRAGClient.
            lightrag_healthy = LightRAGClient().health_check()
            return HealthResponse(
                status="healthy" if lightrag_healthy else "degraded",
                agent_ready=True,
                lightrag_healthy=lightrag_healthy,
                timestamp=datetime.now().isoformat()
            )
        except Exception:
            # Still report "unhealthy" rather than failing the endpoint, but
            # leave a trace of why instead of swallowing the error silently.
            logger.exception("Health check failed")
            return HealthResponse(
                status="unhealthy",
                agent_ready=False,
                lightrag_healthy=False,
                timestamp=datetime.now().isoformat()
            )

    async def analyze_pdf(self, request: AnalyzePDFRequest) -> AnalyzePDFResponse:
        """
        Analyze PDF document through the PDF analyzer agent.

        The base64 payload is decoded, written to a temporary ``.pdf`` file,
        handed to ``self.pdf_analyzer.analyze_pdf`` by path, and the temporary
        file is removed afterwards whether or not the analysis succeeded.

        Args:
            request: Request carrying ``pdf_content`` (base64) and ``filename``.

        Returns:
            AnalyzePDFResponse built from the analyzer's result dict.

        Raises:
            HTTPException: 400 when ``pdf_content`` is not valid base64;
                500 when writing the file or the analysis fails.
        """
        start_time = datetime.now()

        # Decode base64 PDF content. A malformed payload is a client error,
        # so report it as 400 rather than as an internal failure.
        try:
            pdf_bytes = base64.b64decode(request.pdf_content)
        except ValueError as e:  # binascii.Error is a ValueError subclass
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "Invalid base64 content",
                    "message": str(e),
                    "timestamp": datetime.now().isoformat()
                }
            ) from e

        tmp_file_path = None
        try:
            # Create temporary file to save PDF. The path is recorded before
            # writing so the file is cleaned up even if the write fails.
            # NOTE(review): the suffix is always ".pdf", whatever request.filename is.
            with tempfile.NamedTemporaryFile(mode='wb', suffix='.pdf', delete=False) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(pdf_bytes)
            logger.info(f"π Analyzing PDF: {request.filename}")

            # Analyze the PDF
            result = await self.pdf_analyzer.analyze_pdf(tmp_file_path)

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()

            # Create response
            response = AnalyzePDFResponse(
                actors=result.get("actors", ""),
                key_details=result.get("key_details", ""),
                summary=result.get("summary", ""),
                processing_status=result.get("processing_status", "unknown"),
                processing_time=processing_time,
                timestamp=datetime.now().isoformat(),
                error=result.get("error")
            )
            logger.info(f"β PDF analysis completed in {processing_time:.2f}s")
            return response
        except HTTPException:
            # Do not re-wrap deliberate HTTP errors as a generic 500.
            raise
        except Exception as e:
            error_traceback = traceback.format_exc()
            logger.error(f"β PDF analysis failed: {str(e)}")
            logger.error(f"π Full traceback:\n{error_traceback}")
            # NOTE(review): the traceback is returned to the caller; consider
            # dropping it from the response body outside debugging.
            raise HTTPException(
                status_code=500,
                detail={
                    "error": "PDF analysis failed",
                    "message": str(e),
                    "traceback": error_traceback,
                    "timestamp": datetime.now().isoformat()
                }
            ) from e
        finally:
            # Clean up temporary file. Best effort: a cleanup problem must not
            # replace the real response or error.
            if tmp_file_path is not None:
                try:
                    os.unlink(tmp_file_path)
                    logger.debug(f"ποΈ Cleaned up temporary file: {tmp_file_path}")
                except FileNotFoundError:
                    pass
                except OSError:
                    logger.warning("Could not remove temporary file %s", tmp_file_path, exc_info=True)
# Initialize API instance
# Built once at import time and shared by all endpoint functions below. The
# constructor loads .env and creates the LLM, search, OCR and agent clients,
# so importing this module performs that setup.
api = CyberLegalAPI()
# Registered explicitly: without this decorator the coroutine is never
# invoked and the startup banner is never printed.
@app.on_event("startup")
async def startup_event():
    """
    Initialize the API on startup.

    Prints a banner with the configured LLM provider and the available
    endpoints; it performs no other initialization.
    """
    llm_provider = os.getenv("LLM_PROVIDER", "openai").upper()
    print("π Starting CyberLegal AI API...")
    print(f"π€ LLM Provider: {llm_provider}")
    print("π§ Powered by: LangGraph + LightRAG")
    print("π API endpoints:")
    print(" - POST /chat - Chat with the assistant")
    # Listed for consistency with the endpoint table returned by root().
    print(" - POST /analyze-pdf - Analyze PDF document")
    print(" - GET /health - Health check")
    print(" - GET / - API info")
# Route registration: the path and method are the ones advertised by the
# startup banner and by root(). require_password is attached so the
# X-API-Key check defined in this module is actually enforced here.
# NOTE(review): confirm this route is meant to be password-protected.
@app.post(
    "/chat",
    response_model=ChatResponse,
    dependencies=[Depends(require_password)],
)
async def chat_endpoint(request: ChatRequest):
    """
    Chat endpoint for the cyber-legal assistant

    Args:
        request: Chat request with message, user_type (client/lawyer), and history

    Returns:
        ChatResponse with assistant's response and metadata

    User Types:
        - client: For general users (default) - client-friendly language, can find lawyers
        - lawyer: For legal professionals - technical language, knowledge graph access only
    """
    # All validation and error mapping happens in CyberLegalAPI.process_request.
    return await api.process_request(request)
# Route registration: "GET /health" is the path advertised by the startup
# banner and by root(). Left unauthenticated so probes can call it.
@app.get("/health", response_model=HealthResponse)
async def health_endpoint():
    """
    Health check endpoint

    Returns:
        HealthResponse with system status
    """
    return await api.health_check()
# Route registration: "POST /analyze-pdf" is the path advertised by root().
# require_password is attached so the X-API-Key check defined in this module
# is actually enforced here.
# NOTE(review): confirm this route is meant to be password-protected.
@app.post(
    "/analyze-pdf",
    response_model=AnalyzePDFResponse,
    dependencies=[Depends(require_password)],
)
async def analyze_pdf_endpoint(request: AnalyzePDFRequest):
    """
    Analyze document endpoint (PDF or images)

    Args:
        request: Document analysis request with base64-encoded content
            - Supports: PDF, JPG, JPEG, PNG, BMP, TIFF, WEBP

    Returns:
        AnalyzePDFResponse with actors, key_details, summary, and metadata

    Usage:
        - Upload a PDF or image file as base64 encoded string
        - PDFs: Text-based (direct extraction) or scanned (OCR)
        - Images: Always use Mistral OCR
        - The endpoint extracts text, analyzes actors, key details, and generates summary
        - Results are compact and suitable for further processing

    Supported Formats:
        - PDF (.pdf): Both text-based and scanned documents
        - Images (.jpg, .jpeg, .png, .bmp, .tiff, .webp): Using Mistral OCR
    """
    # Decoding, temp-file handling and error mapping live in CyberLegalAPI.analyze_pdf.
    return await api.analyze_pdf(request)
# Route registration: "GET /" is the path advertised by the startup banner.
@app.get("/")
async def root():
    """
    Root endpoint with API information

    Returns:
        A dict with the API name, version, description, the configured LLM
        provider, a technology label, the endpoint list and areas of expertise.
    """
    llm_provider = os.getenv("LLM_PROVIDER", "openai").upper()
    # Provider-specific label; unknown providers fall back to the generic one.
    technology_map = {
        "OPENAI": "LangGraph + RAG + Cerebras (GPT-5-Nano)"
    }
    return {
        "name": "CyberLegal AI API",
        "version": "1.0.0",
        "description": "LangGraph-powered cyber-legal assistant API",
        "llm_provider": llm_provider,
        "technology": technology_map.get(llm_provider, "LangGraph + RAG + Cerebras"),
        "endpoints": {
            "chat": "POST /chat - Chat with the assistant",
            "analyze-pdf": "POST /analyze-pdf - Analyze PDF document",
            "health": "GET /health - Health check"
        },
        "expertise": [
            "GDPR", "NIS2", "DORA", "Cyber Resilience Act", "eIDAS 2.0"
        ]
    }
# Registered for every otherwise-unhandled exception; without this decorator
# the handler is never called.
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """
    Global exception handler with full traceback for debugging

    Args:
        request: The incoming request (unused).
        exc: The unhandled exception.

    Returns:
        JSONResponse with status 500 describing the error.
    """
    # Format the traceback from `exc` itself rather than from the ambient
    # exception state, so the output is correct however the handler is called.
    error_traceback = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    logger.error(f"β Unhandled exception: {str(exc)}")
    logger.error(f"π Full traceback:\n{error_traceback}")
    # NOTE(review): the traceback is returned to the caller; consider dropping
    # it from the response body outside debugging.
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc),
            "traceback": error_traceback,
            "timestamp": datetime.now().isoformat()
        }
    )
if __name__ == "__main__":
    # PORT takes precedence; API_PORT is the fallback, then 8000.
    fallback_port = os.getenv("API_PORT", "8000")
    port = int(os.getenv("PORT", fallback_port))

    # Serve the app object of this module on all interfaces, without auto-reload.
    server_options = {
        "host": "0.0.0.0",
        "port": port,
        "reload": False,
        "log_level": "info",
    }
    uvicorn.run("agent_api:app", **server_options)