# Source: CyberLegalAI endpoint / subagents / pdf_analyzer.py
# Author: Charles Grandjean — commit 47fa0a5 ("force tool call more logs")
#!/usr/bin/env python3
"""
PDF Analysis Agent - Extracts and analyzes legal documents from PDF files
"""
import os
import logging
import pypdf
from typing import Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage
from mistralai import Mistral
from agent_states.pdf_analyzer_state import PDFAnalyzerState
from prompts.pdf_analyzer import SYSTEM_PROMPT, EXTRACT_ACTORS_PROMPT, EXTRACT_KEY_DETAILS_PROMPT, GENERATE_SUMMARY_PROMPT
logger = logging.getLogger(__name__)
class PDFAnalyzerAgent:
    """Agent that analyzes PDF documents to extract summary, actors, and key details"""

    def __init__(self, llm, mistral_client: Optional[Mistral]):
        # Chat model used for the three sequential analysis steps
        # (actors -> key details -> summary).
        self.llm = llm
        # Optional Mistral client; only exercised when a document needs OCR.
        # NOTE(review): _ocr_pdf dereferences this without a None check.
        self.mistral_client = mistral_client
        # Compiled LangGraph pipeline: detect -> (ocr | extract) -> analysis steps.
        self.workflow = self._build_workflow()
def _build_workflow(self):
workflow = StateGraph(PDFAnalyzerState)
workflow.add_node("detect_pdf_type", self._detect_pdf_type)
workflow.add_node("extract_content", self._extract_content)
workflow.add_node("ocr_pdf", self._ocr_pdf)
workflow.add_node("extract_actors", self._extract_actors)
workflow.add_node("extract_key_details", self._extract_key_details)
workflow.add_node("generate_summary", self._generate_summary)
workflow.set_entry_point("detect_pdf_type")
workflow.add_conditional_edges("detect_pdf_type", self._should_use_ocr, {"ocr": "ocr_pdf", "extract": "extract_content"})
workflow.add_edge("ocr_pdf", "extract_actors")
workflow.add_edge("extract_content", "extract_actors")
workflow.add_edge("extract_actors", "extract_key_details")
workflow.add_edge("extract_key_details", "generate_summary")
workflow.add_edge("generate_summary", END)
return workflow.compile()
def _should_use_ocr(self, state: PDFAnalyzerState) -> str:
return "ocr" if state.get("needs_ocr", False) else "extract"
async def _detect_pdf_type(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
import os
from pathlib import Path
file_path = state["pdf_path"]
file_ext = Path(file_path).suffix.lower()
# Check if it's an image file
if file_ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp']:
state["needs_ocr"] = True
state["document_type"] = "image"
logger.info(f"πŸ–ΌοΈ Image file detected: {file_ext}")
elif file_ext == '.pdf':
with open(file_path, 'rb') as f:
reader = pypdf.PdfReader(f)
text = reader.pages[0].extract_text() if reader.pages else ""
state["needs_ocr"] = not text or len(text.strip()) < 50
state["document_type"] = "pdf"
logger.info(f"πŸ“„ PDF detected, OCR needed: {state['needs_ocr']}")
else:
# Unknown format, try OCR as fallback
state["needs_ocr"] = True
state["document_type"] = "unknown"
logger.warning(f"⚠️ Unknown file format: {file_ext}, will attempt OCR")
state["processing_status"] = "extracting"
return state
async def _ocr_pdf(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
try:
import base64
from pathlib import Path
file_path = state["pdf_path"]
file_ext = Path(file_path).suffix.lower()
# Determine MIME type
mime_types = {
'.pdf': 'application/pdf',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.bmp': 'image/bmp',
'.tiff': 'image/tiff',
'.webp': 'image/webp'
}
mime_type = mime_types.get(file_ext, 'application/octet-stream')
# Read and encode file
with open(file_path, 'rb') as f:
file_bytes = f.read()
file_b64 = base64.b64encode(file_bytes).decode()
# Construct data URI
data_uri = f"data:{mime_type};base64,{file_b64}"
logger.info(f"πŸ” Running OCR on {file_ext} file ({mime_type})")
result = self.mistral_client.ocr.process(
model="mistral-ocr-latest",
document={"type": "document_url", "document_url": data_uri}
)
text = "\n\n".join([p.markdown for p in result.pages])
state["extracted_text"] = text
state["ocr_performed"] = True
state["ocr_method"] = "mistral"
logger.info("βœ… OCR complete")
except Exception as e:
logger.error(f"❌ OCR failed: {e}")
state["processing_status"] = "failed"
state["extracted_text"] = f"Error: OCR processing failed - {str(e)}"
return state
state["processing_status"] = "analyzing"
return state
async def _extract_content(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
"""Extract text content from PDF file"""
state["processing_status"] = "extracting"
try:
pdf_path = state["pdf_path"]
logger.info(f"πŸ“„ Extracting content from PDF: {pdf_path}")
extracted_text = ""
with open(pdf_path, 'rb') as file:
reader = pypdf.PdfReader(file)
num_pages = len(reader.pages)
for page_num in range(num_pages):
page = reader.pages[page_num]
extracted_text += page.extract_text() + "\n\n"
state["extracted_text"] = extracted_text
state["processing_status"] = "analyzing"
logger.info(f"βœ… Extracted {num_pages} pages from PDF")
except Exception as e:
logger.error(f"❌ Error extracting PDF content: {e}")
raise
return state
async def _extract_actors(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
"""Extract and categorize actors from the document - first analysis step"""
if not state.get("extracted_text"):
logger.warning("⚠️ No extracted text available for actors extraction")
return state
logger.info("πŸ‘₯ Extracting actors...")
# Build conversation history with system message and document content
intermediate_steps = state.get("intermediate_steps", [])
# Add system message if not present
if not intermediate_steps:
intermediate_steps.append(SystemMessage(content=SYSTEM_PROMPT))
intermediate_steps.append(HumanMessage(content=f"Here is the legal document to analyze:\n\n{state['extracted_text']}"))
# Add prompt to extract actors
intermediate_steps.append(HumanMessage(content=EXTRACT_ACTORS_PROMPT))
response = await self.llm.ainvoke(intermediate_steps)
intermediate_steps.append(response)
state["actors"] = response.content
state["intermediate_steps"] = intermediate_steps
# Log detailed LLM response
logger.info("=" * 80)
logger.info("πŸ€– LLM RESPONSE (extract_actors)")
logger.info("=" * 80)
logger.info(f"πŸ“Š Response length: {len(response.content)} characters")
logger.info(f"πŸ“„ Content preview (first 300 chars):")
logger.info(response.content[:300] + ("..." if len(response.content) > 300 else ""))
logger.info("=" * 80)
logger.info("βœ… Actors extracted")
return state
async def _extract_key_details(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
"""Extract key details from the document - second analysis step"""
if not state.get("extracted_text"):
logger.warning("⚠️ No extracted text available for key details extraction")
return state
logger.info("πŸ”‘ Extracting key details...")
# Continue the conversation
intermediate_steps = state.get("intermediate_steps", [])
intermediate_steps.append(HumanMessage(content=EXTRACT_KEY_DETAILS_PROMPT))
response = await self.llm.ainvoke(intermediate_steps)
intermediate_steps.append(response)
state["key_details"] = response.content
state["intermediate_steps"] = intermediate_steps
# Log detailed LLM response
logger.info("=" * 80)
logger.info("πŸ€– LLM RESPONSE (extract_key_details)")
logger.info("=" * 80)
logger.info(f"πŸ“Š Response length: {len(response.content)} characters")
logger.info(f"πŸ“„ Content preview (first 300 chars):")
logger.info(response.content[:300] + ("..." if len(response.content) > 300 else ""))
logger.info("=" * 80)
logger.info("βœ… Key details extracted")
return state
async def _generate_summary(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
"""Generate high-level summary of the document - final analysis step"""
if not state.get("extracted_text"):
logger.warning("⚠️ No extracted text available for summary generation")
return state
logger.info("πŸ“ Generating document summary...")
# Continue the conversation
intermediate_steps = state.get("intermediate_steps", [])
intermediate_steps.append(HumanMessage(content=GENERATE_SUMMARY_PROMPT))
response = await self.llm.ainvoke(intermediate_steps)
intermediate_steps.append(response)
state["summary"] = response.content
state["intermediate_steps"] = intermediate_steps
state["processing_status"] = "complete"
# Log detailed LLM response
logger.info("=" * 80)
logger.info("πŸ€– LLM RESPONSE (generate_summary)")
logger.info("=" * 80)
logger.info(f"πŸ“Š Response length: {len(response.content)} characters")
logger.info(f"πŸ“„ Content preview (first 300 chars):")
logger.info(response.content[:300] + ("..." if len(response.content) > 300 else ""))
logger.info("=" * 80)
logger.info("βœ… Summary generated")
return state
async def analyze_pdf(self, pdf_path: str) -> dict:
"""
Main method to analyze a PDF document
Args:
pdf_path: Path to the PDF file to analyze
Returns:
Dictionary containing summary, actors, and key details
"""
from pathlib import Path
file_ext = Path(pdf_path).suffix.lower()
initial_state: PDFAnalyzerState = {
"pdf_path": pdf_path,
"pdf_content": None,
"extracted_text": None,
"summary": None,
"actors": None,
"key_details": None,
"document_type": "image" if file_ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'] else None,
"processing_status": "pending",
"intermediate_steps": [],
"needs_ocr": False,
"ocr_performed": False,
"ocr_method": None
}
logger.info(f"πŸš€ Starting PDF analysis for: {pdf_path}")
final_state = await self.workflow.ainvoke(initial_state)
logger.info(f"βœ… PDF analysis complete. Status: {final_state['processing_status']}")
return {
"summary": final_state.get("summary"),
"actors": final_state.get("actors"),
"key_details": final_state.get("key_details"),
"processing_status": final_state.get("processing_status"),
"ocr_used": final_state.get("ocr_performed", False),
"ocr_method": final_state.get("ocr_method")
}