#!/usr/bin/env python3
"""
PDF Analysis Agent - Extracts and analyzes legal documents from PDF files
"""

import asyncio
import base64
import logging
import os
from pathlib import Path
from typing import Optional

import pypdf
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from mistralai import Mistral

from agent_states.pdf_analyzer_state import PDFAnalyzerState
from prompts.pdf_analyzer import (
    SYSTEM_PROMPT,
    EXTRACT_ACTORS_PROMPT,
    EXTRACT_KEY_DETAILS_PROMPT,
    GENERATE_SUMMARY_PROMPT,
)

logger = logging.getLogger(__name__)

# MIME type for every file suffix the OCR path knows how to label.
_OCR_MIME_TYPES = {
    ".pdf": "application/pdf",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    ".webp": "image/webp",
}

# Suffixes that are treated as images and always routed through OCR.
_IMAGE_EXTENSIONS = frozenset(
    {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"}
)

# A PDF whose first page yields fewer stripped characters than this is
# considered scanned and is sent to OCR.
_MIN_TEXT_CHARS = 50

# Number of characters of an LLM response shown in the log preview.
_PREVIEW_CHARS = 300


class PDFAnalyzerAgent:
    """Agent that analyzes PDF documents to extract summary, actors, and key details"""

    def __init__(self, llm, mistral_client: Optional[Mistral]):
        """Store the collaborators and compile the analysis workflow.

        Args:
            llm: Chat model; only its ``ainvoke(messages)`` coroutine is used.
            mistral_client: Client used for OCR. May be ``None``, in which
                case any document that needs OCR ends with status "failed".
        """
        self.llm = llm
        self.mistral_client = mistral_client
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build and compile the graph.

        Flow: detect type -> (OCR | direct text extraction) -> actors ->
        key details -> summary.
        """
        workflow = StateGraph(PDFAnalyzerState)

        workflow.add_node("detect_pdf_type", self._detect_pdf_type)
        workflow.add_node("extract_content", self._extract_content)
        workflow.add_node("ocr_pdf", self._ocr_pdf)
        workflow.add_node("extract_actors", self._extract_actors)
        workflow.add_node("extract_key_details", self._extract_key_details)
        workflow.add_node("generate_summary", self._generate_summary)

        workflow.set_entry_point("detect_pdf_type")
        workflow.add_conditional_edges(
            "detect_pdf_type",
            self._should_use_ocr,
            {"ocr": "ocr_pdf", "extract": "extract_content"},
        )
        workflow.add_edge("ocr_pdf", "extract_actors")
        workflow.add_edge("extract_content", "extract_actors")
        workflow.add_edge("extract_actors", "extract_key_details")
        workflow.add_edge("extract_key_details", "generate_summary")
        workflow.add_edge("generate_summary", END)

        return workflow.compile()

    def _should_use_ocr(self, state: PDFAnalyzerState) -> str:
        """Return the routing key ("ocr" or "extract") for the conditional edge."""
        return "ocr" if state.get("needs_ocr", False) else "extract"

    async def _detect_pdf_type(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Classify the input file and decide whether it needs OCR.

        Images and unknown suffixes always go to OCR. A PDF goes to OCR when
        its first page yields fewer than ``_MIN_TEXT_CHARS`` characters of
        text. Errors raised while opening or parsing the PDF propagate.
        """
        file_path = state["pdf_path"]
        file_ext = Path(file_path).suffix.lower()

        if file_ext in _IMAGE_EXTENSIONS:
            state["needs_ocr"] = True
            state["document_type"] = "image"
            logger.info("🖼️ Image file detected: %s", file_ext)
        elif file_ext == ".pdf":
            with open(file_path, "rb") as f:
                reader = pypdf.PdfReader(f)
                # Only the first page is sampled; it must be read while the
                # file is still open.
                text = reader.pages[0].extract_text() if reader.pages else ""
            state["needs_ocr"] = len((text or "").strip()) < _MIN_TEXT_CHARS
            state["document_type"] = "pdf"
            logger.info("📄 PDF detected, OCR needed: %s", state["needs_ocr"])
        else:
            # Unknown format, try OCR as fallback
            state["needs_ocr"] = True
            state["document_type"] = "unknown"
            logger.warning(
                "⚠️ Unknown file format: %s, will attempt OCR", file_ext
            )

        state["processing_status"] = "extracting"
        return state

    async def _ocr_pdf(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Run Mistral OCR on the input file and store the recognised text.

        On success ``extracted_text`` holds the pages' markdown joined by
        blank lines and the status becomes "analyzing". On any failure the
        status becomes "failed" and ``extracted_text`` holds an error
        message; the later analysis steps skip a failed state.
        """
        try:
            if self.mistral_client is None:
                raise RuntimeError("Mistral client is not configured")

            file_path = state["pdf_path"]
            file_ext = Path(file_path).suffix.lower()
            mime_type = _OCR_MIME_TYPES.get(file_ext, "application/octet-stream")

            # Read and encode file
            with open(file_path, "rb") as f:
                file_b64 = base64.b64encode(f.read()).decode()

            # Construct data URI
            data_uri = f"data:{mime_type};base64,{file_b64}"

            # Mistral OCR documents images as "image_url" chunks and
            # PDFs/other documents as "document_url" chunks.
            if mime_type.startswith("image/"):
                document = {"type": "image_url", "image_url": data_uri}
            else:
                document = {"type": "document_url", "document_url": data_uri}

            logger.info("🔍 Running OCR on %s file (%s)", file_ext, mime_type)

            # The SDK call is synchronous; run it in a worker thread so the
            # event loop is not blocked while waiting for the OCR service.
            result = await asyncio.to_thread(
                self.mistral_client.ocr.process,
                model="mistral-ocr-latest",
                document=document,
            )

            state["extracted_text"] = "\n\n".join(
                page.markdown for page in result.pages
            )
            state["ocr_performed"] = True
            state["ocr_method"] = "mistral"
            logger.info("✅ OCR complete")
        except Exception as e:
            # Boundary handler: any OCR problem is reported through the state
            # rather than aborting the whole workflow.
            logger.error("❌ OCR failed: %s", e)
            state["processing_status"] = "failed"
            state["extracted_text"] = f"Error: OCR processing failed - {str(e)}"
            return state

        state["processing_status"] = "analyzing"
        return state

    async def _extract_content(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Extract text content from PDF file

        Every page's text is followed by a blank line. Errors are logged and
        re-raised.
        """
        state["processing_status"] = "extracting"

        try:
            pdf_path = state["pdf_path"]
            logger.info("📄 Extracting content from PDF: %s", pdf_path)

            with open(pdf_path, "rb") as file:
                reader = pypdf.PdfReader(file)
                # "or ''" guards against a page that yields no text object.
                page_texts = [page.extract_text() or "" for page in reader.pages]

            state["extracted_text"] = "".join(
                f"{page_text}\n\n" for page_text in page_texts
            )
            state["processing_status"] = "analyzing"

            logger.info("✅ Extracted %d pages from PDF", len(page_texts))
        except Exception as e:
            logger.error("❌ Error extracting PDF content: %s", e)
            raise

        return state

    @staticmethod
    def _has_analyzable_text(state: PDFAnalyzerState) -> bool:
        """Return True when the state holds real document text to analyze.

        A "failed" state is excluded because ``_ocr_pdf`` stores an error
        message in ``extracted_text``, which must not be sent to the LLM as
        if it were the document.
        """
        if state.get("processing_status") == "failed":
            return False
        return bool(state.get("extracted_text"))

    @staticmethod
    def _log_llm_response(step_name: str, content) -> None:
        """Log the length and a short preview of an LLM response."""
        # Content is normally a string; stringify anything else so that
        # slicing and concatenation below cannot fail.
        text = content if isinstance(content, str) else str(content)
        preview = text[:_PREVIEW_CHARS]
        if len(text) > _PREVIEW_CHARS:
            preview += "..."

        logger.info("=" * 80)
        logger.info("🤖 LLM RESPONSE (%s)", step_name)
        logger.info("=" * 80)
        logger.info("📊 Response length: %d characters", len(text))
        logger.info("📄 Content preview (first 300 chars):")
        logger.info("%s", preview)
        logger.info("=" * 80)

    async def _run_analysis_step(
        self,
        state: PDFAnalyzerState,
        prompt: str,
        result_key: str,
        step_name: str,
    ) -> PDFAnalyzerState:
        """Append ``prompt`` to the conversation, call the LLM, store the reply.

        The reply's content is written to ``state[result_key]`` and the
        reply itself is appended to ``intermediate_steps`` so that later
        steps continue the same conversation.
        """
        messages = state.get("intermediate_steps") or []
        messages.append(HumanMessage(content=prompt))

        response = await self.llm.ainvoke(messages)
        messages.append(response)

        state[result_key] = response.content
        state["intermediate_steps"] = messages

        self._log_llm_response(step_name, response.content)
        return state

    async def _extract_actors(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Extract and categorize actors from the document - first analysis step"""
        if not self._has_analyzable_text(state):
            logger.warning("⚠️ No extracted text available for actors extraction")
            return state

        logger.info("👥 Extracting actors...")

        # Seed the conversation with the system prompt and the document
        # itself, unless a conversation already exists.
        messages = state.get("intermediate_steps") or []
        if not messages:
            messages.append(SystemMessage(content=SYSTEM_PROMPT))
            messages.append(
                HumanMessage(
                    content=f"Here is the legal document to analyze:\n\n{state['extracted_text']}"
                )
            )
        state["intermediate_steps"] = messages

        await self._run_analysis_step(
            state, EXTRACT_ACTORS_PROMPT, "actors", "extract_actors"
        )

        logger.info("✅ Actors extracted")
        return state

    async def _extract_key_details(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Extract key details from the document - second analysis step"""
        if not self._has_analyzable_text(state):
            logger.warning(
                "⚠️ No extracted text available for key details extraction"
            )
            return state

        logger.info("🔑 Extracting key details...")

        # Continue the conversation
        await self._run_analysis_step(
            state, EXTRACT_KEY_DETAILS_PROMPT, "key_details", "extract_key_details"
        )

        logger.info("✅ Key details extracted")
        return state

    async def _generate_summary(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Generate high-level summary of the document - final analysis step"""
        if not self._has_analyzable_text(state):
            logger.warning("⚠️ No extracted text available for summary generation")
            return state

        logger.info("📝 Generating document summary...")

        # Continue the conversation
        await self._run_analysis_step(
            state, GENERATE_SUMMARY_PROMPT, "summary", "generate_summary"
        )
        state["processing_status"] = "complete"

        logger.info("✅ Summary generated")
        return state

    async def analyze_pdf(self, pdf_path: str) -> dict:
        """
        Main method to analyze a PDF document

        Args:
            pdf_path: Path to the PDF file to analyze

        Returns:
            Dictionary containing summary, actors, and key details, plus
            ``processing_status``, ``ocr_used`` and ``ocr_method``. When OCR
            fails, ``processing_status`` is "failed" and the analysis fields
            are ``None``.
        """
        file_ext = Path(pdf_path).suffix.lower()

        initial_state: PDFAnalyzerState = {
            "pdf_path": pdf_path,
            "pdf_content": None,
            "extracted_text": None,
            "summary": None,
            "actors": None,
            "key_details": None,
            "document_type": "image" if file_ext in _IMAGE_EXTENSIONS else None,
            "processing_status": "pending",
            "intermediate_steps": [],
            "needs_ocr": False,
            "ocr_performed": False,
            "ocr_method": None,
        }

        logger.info("🚀 Starting PDF analysis for: %s", pdf_path)

        final_state = await self.workflow.ainvoke(initial_state)

        logger.info(
            "✅ PDF analysis complete. Status: %s",
            final_state.get("processing_status"),
        )

        return {
            "summary": final_state.get("summary"),
            "actors": final_state.get("actors"),
            "key_details": final_state.get("key_details"),
            "processing_status": final_state.get("processing_status"),
            "ocr_used": final_state.get("ocr_performed", False),
            "ocr_method": final_state.get("ocr_method"),
        }