Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| PDF Analysis Agent - Extracts and analyzes legal documents from PDF files | |
| """ | |
| import os | |
| import logging | |
| import pypdf | |
| from typing import Optional | |
| from langgraph.graph import StateGraph, END | |
| from langchain_openai import ChatOpenAI | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| from mistralai import Mistral | |
| from agent_states.pdf_analyzer_state import PDFAnalyzerState | |
| from prompts.pdf_analyzer import SYSTEM_PROMPT, EXTRACT_ACTORS_PROMPT, EXTRACT_KEY_DETAILS_PROMPT, GENERATE_SUMMARY_PROMPT | |
| logger = logging.getLogger(__name__) | |
class PDFAnalyzerAgent:
    """Agent that analyzes PDF documents to extract summary, actors, and key details.

    The work is organised as a LangGraph workflow:

    1. ``detect_pdf_type`` decides whether the file needs OCR.
    2. ``ocr_pdf`` (OCR route) or ``extract_content`` (native text route)
       fills ``state["extracted_text"]``.
    3. ``extract_actors``, ``extract_key_details`` and ``generate_summary``
       run as one continuous LLM conversation over that text.

    If OCR fails, the workflow stops right after ``ocr_pdf`` so that the
    error message stored in ``extracted_text`` is never sent to the LLM and
    ``processing_status`` stays ``"failed"``.
    """

    # Extensions treated as images and therefore always routed to OCR.
    _IMAGE_EXTENSIONS = frozenset(
        {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"}
    )

    # MIME type used in the data URI sent to the OCR service.
    _MIME_TYPES = {
        ".pdf": "application/pdf",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".bmp": "image/bmp",
        ".tiff": "image/tiff",
        ".webp": "image/webp",
    }
    _DEFAULT_MIME_TYPE = "application/octet-stream"

    # A PDF whose first page yields fewer stripped characters than this is
    # assumed to be scanned and is sent to OCR.
    _MIN_NATIVE_TEXT_CHARS = 50

    # Number of characters of each LLM response echoed to the log.
    _LOG_PREVIEW_CHARS = 300

    def __init__(self, llm, mistral_client: Optional[Mistral]):
        """Store the collaborators and compile the workflow.

        Args:
            llm: Chat model; must provide an awaitable ``ainvoke(messages)``
                whose result has a ``content`` attribute.
            mistral_client: Client used for OCR, or ``None``. When ``None``,
                any document routed to OCR ends with status ``"failed"``.
        """
        self.llm = llm
        self.mistral_client = mistral_client
        self.workflow = self._build_workflow()

    # ------------------------------------------------------------------
    # Graph construction and routing
    # ------------------------------------------------------------------

    def _build_workflow(self):
        """Build and compile the analysis graph described in the class docstring."""
        workflow = StateGraph(PDFAnalyzerState)

        workflow.add_node("detect_pdf_type", self._detect_pdf_type)
        workflow.add_node("extract_content", self._extract_content)
        workflow.add_node("ocr_pdf", self._ocr_pdf)
        workflow.add_node("extract_actors", self._extract_actors)
        workflow.add_node("extract_key_details", self._extract_key_details)
        workflow.add_node("generate_summary", self._generate_summary)

        workflow.set_entry_point("detect_pdf_type")
        workflow.add_conditional_edges(
            "detect_pdf_type",
            self._should_use_ocr,
            {"ocr": "ocr_pdf", "extract": "extract_content"},
        )
        # A failed OCR run must not reach the LLM steps: they would treat the
        # stored error message as the document and then report "complete".
        workflow.add_conditional_edges(
            "ocr_pdf",
            self._route_after_ocr,
            {"analyze": "extract_actors", "stop": END},
        )
        workflow.add_edge("extract_content", "extract_actors")
        workflow.add_edge("extract_actors", "extract_key_details")
        workflow.add_edge("extract_key_details", "generate_summary")
        workflow.add_edge("generate_summary", END)

        return workflow.compile()

    def _should_use_ocr(self, state: PDFAnalyzerState) -> str:
        """Return ``"ocr"`` when the state is flagged ``needs_ocr``, else ``"extract"``."""
        return "ocr" if state.get("needs_ocr", False) else "extract"

    def _route_after_ocr(self, state: PDFAnalyzerState) -> str:
        """Return ``"stop"`` when OCR marked the run as failed, else ``"analyze"``."""
        if state.get("processing_status") == "failed":
            return "stop"
        return "analyze"

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _file_extension(file_path: str) -> str:
        """Return the lower-cased suffix of ``file_path`` (e.g. ``".pdf"``)."""
        from pathlib import Path

        return Path(file_path).suffix.lower()

    def _ready_for_analysis(self, state: PDFAnalyzerState, purpose: str) -> bool:
        """Tell whether an LLM analysis step should run for this state.

        Logs a warning and returns ``False`` when an earlier step failed or
        when there is no extracted text; ``purpose`` only labels the warning.
        """
        if state.get("processing_status") == "failed":
            logger.warning("Skipping %s: an earlier processing step failed", purpose)
            return False
        if not state.get("extracted_text"):
            logger.warning("No extracted text available for %s", purpose)
            return False
        return True

    def _log_llm_response(self, step_name: str, content) -> None:
        """Log the length and a short preview of an LLM response."""
        # Chat models may return a list of content parts instead of a string;
        # coerce for logging only so the preview cannot raise TypeError.
        text = content if isinstance(content, str) else str(content)
        limit = self._LOG_PREVIEW_CHARS
        preview = text[:limit] + ("..." if len(text) > limit else "")
        separator = "=" * 80

        logger.info(separator)
        logger.info("LLM RESPONSE (%s)", step_name)
        logger.info(separator)
        logger.info("Response length: %d characters", len(text))
        logger.info("Content preview (first %d chars):", limit)
        logger.info(preview)
        logger.info(separator)

    async def _run_analysis_step(
        self,
        state: PDFAnalyzerState,
        step_name: str,
        prompt: str,
        result_key: str,
    ) -> PDFAnalyzerState:
        """Append ``prompt`` to the conversation, call the LLM, store the answer.

        The conversation lives in ``state["intermediate_steps"]``. When it is
        empty it is first seeded with the system prompt and the document
        text, so every step sees the document even if it runs first. The
        response content is written to ``state[result_key]``.
        """
        # Work on a copy so the caller's list is not mutated in place.
        messages = list(state.get("intermediate_steps") or [])
        if not messages:
            messages.append(SystemMessage(content=SYSTEM_PROMPT))
            messages.append(
                HumanMessage(
                    content=f"Here is the legal document to analyze:\n\n{state['extracted_text']}"
                )
            )
        messages.append(HumanMessage(content=prompt))

        response = await self.llm.ainvoke(messages)
        messages.append(response)

        state[result_key] = response.content
        state["intermediate_steps"] = messages
        self._log_llm_response(step_name, response.content)
        return state

    # ------------------------------------------------------------------
    # Graph nodes
    # ------------------------------------------------------------------

    async def _detect_pdf_type(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Classify the input file and decide whether OCR is required.

        Sets ``needs_ocr`` and ``document_type`` (``"image"``, ``"pdf"`` or
        ``"unknown"``) and moves ``processing_status`` to ``"extracting"``.
        Errors raised while opening or parsing a PDF propagate to the caller.
        """
        file_path = state["pdf_path"]
        file_ext = self._file_extension(file_path)

        if file_ext in self._IMAGE_EXTENSIONS:
            state["needs_ocr"] = True
            state["document_type"] = "image"
            logger.info("Image file detected: %s", file_ext)
        elif file_ext == ".pdf":
            with open(file_path, "rb") as f:
                reader = pypdf.PdfReader(f)
                # Heuristic: only the first page is sampled.
                first_page_text = (
                    reader.pages[0].extract_text() if reader.pages else ""
                )
            stripped_length = len((first_page_text or "").strip())
            state["needs_ocr"] = stripped_length < self._MIN_NATIVE_TEXT_CHARS
            state["document_type"] = "pdf"
            logger.info("PDF detected, OCR needed: %s", state["needs_ocr"])
        else:
            # Unknown format, try OCR as fallback
            state["needs_ocr"] = True
            state["document_type"] = "unknown"
            logger.warning("Unknown file format: %s, will attempt OCR", file_ext)

        state["processing_status"] = "extracting"
        return state

    async def _ocr_pdf(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Run OCR on the file and store the recognised text.

        On success sets ``extracted_text`` (page markdown joined by blank
        lines), ``ocr_performed``, ``ocr_method`` and status ``"analyzing"``.
        On any error the exception is logged, status becomes ``"failed"`` and
        ``extracted_text`` holds an ``"Error: ..."`` message; nothing is raised.
        """
        try:
            import base64

            if self.mistral_client is None:
                # Fail with a clear message instead of an AttributeError on None.
                raise RuntimeError("no Mistral client configured, OCR is unavailable")

            file_path = state["pdf_path"]
            file_ext = self._file_extension(file_path)
            mime_type = self._MIME_TYPES.get(file_ext, self._DEFAULT_MIME_TYPE)

            # The file is sent inline as a base64 data URI.
            with open(file_path, "rb") as f:
                file_b64 = base64.b64encode(f.read()).decode()
            data_uri = f"data:{mime_type};base64,{file_b64}"

            logger.info("Running OCR on %s file (%s)", file_ext, mime_type)
            # NOTE(review): this is a synchronous call inside a coroutine, so
            # it blocks the event loop while the request runs; consider an
            # async variant if the SDK offers one.
            # NOTE(review): images are also sent as a "document_url" chunk;
            # confirm the OCR API accepts image data URIs in that form.
            result = self.mistral_client.ocr.process(
                model="mistral-ocr-latest",
                document={"type": "document_url", "document_url": data_uri},
            )
            text = "\n\n".join(page.markdown for page in result.pages)
        except Exception as e:
            # Best-effort boundary: report the failure through the state.
            logger.error("OCR failed: %s", e)
            state["processing_status"] = "failed"
            state["extracted_text"] = f"Error: OCR processing failed - {str(e)}"
            return state

        state["extracted_text"] = text
        state["ocr_performed"] = True
        state["ocr_method"] = "mistral"
        state["processing_status"] = "analyzing"
        logger.info("OCR complete")
        return state

    async def _extract_content(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Extract text content from PDF file.

        Each page's text is followed by a blank line. Sets status to
        ``"analyzing"`` on success; any error is logged and re-raised.
        """
        state["processing_status"] = "extracting"
        try:
            pdf_path = state["pdf_path"]
            logger.info("Extracting content from PDF: %s", pdf_path)

            with open(pdf_path, "rb") as file:
                reader = pypdf.PdfReader(file)
                # `or ""` guards against a page that yields no text object.
                page_texts = [page.extract_text() or "" for page in reader.pages]

            state["extracted_text"] = "".join(f"{text}\n\n" for text in page_texts)
            state["processing_status"] = "analyzing"
            logger.info("Extracted %d pages from PDF", len(page_texts))
        except Exception as e:
            logger.error("Error extracting PDF content: %s", e)
            raise
        return state

    async def _extract_actors(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Extract and categorize actors from the document - first analysis step."""
        if not self._ready_for_analysis(state, "actors extraction"):
            return state

        logger.info("Extracting actors...")
        state = await self._run_analysis_step(
            state, "extract_actors", EXTRACT_ACTORS_PROMPT, "actors"
        )
        logger.info("Actors extracted")
        return state

    async def _extract_key_details(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Extract key details from the document - second analysis step."""
        if not self._ready_for_analysis(state, "key details extraction"):
            return state

        logger.info("Extracting key details...")
        state = await self._run_analysis_step(
            state, "extract_key_details", EXTRACT_KEY_DETAILS_PROMPT, "key_details"
        )
        logger.info("Key details extracted")
        return state

    async def _generate_summary(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
        """Generate high-level summary of the document - final analysis step.

        Marks ``processing_status`` as ``"complete"`` once the summary is stored.
        """
        if not self._ready_for_analysis(state, "summary generation"):
            return state

        logger.info("Generating document summary...")
        state = await self._run_analysis_step(
            state, "generate_summary", GENERATE_SUMMARY_PROMPT, "summary"
        )
        state["processing_status"] = "complete"
        logger.info("Summary generated")
        return state

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def analyze_pdf(self, pdf_path: str) -> dict:
        """
        Main method to analyze a PDF document

        Args:
            pdf_path: Path to the PDF file to analyze

        Returns:
            Dictionary with the keys ``summary``, ``actors``, ``key_details``,
            ``processing_status``, ``ocr_used`` and ``ocr_method``. The three
            analysis fields stay ``None`` when no analysis was performed,
            e.g. after a failed OCR run.
        """
        file_ext = self._file_extension(pdf_path)
        is_image = file_ext in self._IMAGE_EXTENSIONS

        initial_state: PDFAnalyzerState = {
            "pdf_path": pdf_path,
            "pdf_content": None,
            "extracted_text": None,
            "summary": None,
            "actors": None,
            "key_details": None,
            "document_type": "image" if is_image else None,
            "processing_status": "pending",
            "intermediate_steps": [],
            "needs_ocr": False,
            "ocr_performed": False,
            "ocr_method": None,
        }

        logger.info("Starting PDF analysis for: %s", pdf_path)
        final_state = await self.workflow.ainvoke(initial_state)
        logger.info(
            "PDF analysis finished. Status: %s",
            final_state.get("processing_status"),
        )

        return {
            "summary": final_state.get("summary"),
            "actors": final_state.get("actors"),
            "key_details": final_state.get("key_details"),
            "processing_status": final_state.get("processing_status"),
            "ocr_used": final_state.get("ocr_performed", False),
            "ocr_method": final_state.get("ocr_method"),
        }