| """ |
| Meta agent for extracting and comparing document metadata. |
| Handles document properties like title, author, dates, etc. |
| """ |
| from typing import Dict, Any, Optional |
| from pathlib import Path |
| from datetime import datetime |
| import re |
|
|
| from agents.base_agent import BaseAgent |
| from models.document import MetadataExtraction, RawDocument |
|
|
| |
| try: |
| import fitz |
| PYMUPDF_AVAILABLE = True |
| except (ImportError, OSError): |
| PYMUPDF_AVAILABLE = False |
|
|
| try: |
| from pypdf import PdfReader |
| PYPDF_AVAILABLE = True |
| except ImportError: |
| PYPDF_AVAILABLE = False |
|
|
| |
| try: |
| from docx import Document |
| DOCX_AVAILABLE = True |
| except ImportError: |
| DOCX_AVAILABLE = False |
|
|
|
|
| class MetaAgent(BaseAgent): |
| """Agent responsible for extracting document metadata.""" |
|
|
def __init__(self, config_dict: Optional[Dict[str, Any]] = None) -> None:
    """
    Initialise the agent by delegating to the BaseAgent initialiser.

    Args:
        config_dict: Optional configuration dictionary; passed unchanged to
            ``BaseAgent.__init__`` (None when omitted)
    """
    super().__init__(config_dict)
|
|
def get_agent_name(self) -> str:
    """
    Return the name of this agent.

    Returns:
        The constant string "MetaAgent"
    """
    return "MetaAgent"
|
|
async def process(self, raw_document: RawDocument) -> MetadataExtraction:
    """
    Extract metadata from a raw document, dispatching on its file type.

    Args:
        raw_document: Raw document; its ``metadata`` mapping is read for a
            "file_path" entry and its ``file_type`` selects the extractor

    Returns:
        MetadataExtraction produced by the PDF or DOCX extractor, or an
        empty MetadataExtraction for any other file type
    """
    source_path = raw_document.metadata.get("file_path")
    kind = raw_document.file_type

    # One early return per supported format.
    if kind == "pdf":
        return self._extract_pdf_metadata(source_path, raw_document)
    if kind == "docx":
        return self._extract_docx_metadata(source_path, raw_document)

    # Anything else yields an empty metadata object.
    return MetadataExtraction()
|
|
def _extract_pdf_metadata(
    self,
    file_path: str,
    raw_document: RawDocument
) -> MetadataExtraction:
    """
    Extract metadata from PDF file.

    PyMuPDF is tried first when it is installed. pypdf is used when PyMuPDF
    is not installed or when PyMuPDF raised while reading this file, so one
    failing backend does not prevent the other from being tried. When no
    title is found in the PDF itself, a title is guessed from the document
    text.

    Args:
        file_path: Path to PDF file
        raw_document: Raw document; its ``raw_text`` feeds the title fallback

    Returns:
        MetadataExtraction object (left empty when ``file_path`` is falsy)
    """
    metadata = MetadataExtraction()

    if not file_path:
        return metadata

    # Each backend exposes the same information under different key names.
    pymupdf_keys = {
        "title": "title",
        "author": "author",
        "subject": "subject",
        "creator": "creator",
        "producer": "producer",
        "keywords": "keywords",
        "creation_date": "creationDate",
        "modification_date": "modDate",
    }
    pypdf_keys = {
        "title": "/Title",
        "author": "/Author",
        "subject": "/Subject",
        "creator": "/Creator",
        "producer": "/Producer",
        "keywords": "/Keywords",
        "creation_date": "/CreationDate",
        "modification_date": "/ModDate",
    }

    def apply_info(info: Any, keys: Dict[str, str]) -> None:
        """Copy a backend's document-information mapping onto ``metadata``."""
        if not info:
            return

        for field in ("title", "author", "subject", "creator", "producer"):
            setattr(metadata, field, info.get(keys[field]))

        keywords_str = info.get(keys["keywords"], "")
        if keywords_str:
            # Skip blank entries left by stray or trailing commas.
            keywords = [k.strip() for k in keywords_str.split(",") if k.strip()]
            if keywords:
                metadata.keywords = keywords

        creation_date = info.get(keys["creation_date"])
        if creation_date:
            metadata.creation_date = self._parse_pdf_date(creation_date)

        mod_date = info.get(keys["modification_date"])
        if mod_date:
            metadata.modification_date = self._parse_pdf_date(mod_date)

    extracted = False

    if PYMUPDF_AVAILABLE:
        try:
            with fitz.open(file_path) as pdf_doc:
                apply_info(pdf_doc.metadata, pymupdf_keys)
                metadata.page_count = pdf_doc.page_count
            extracted = True
        except Exception as e:
            # Best effort: report the failure and fall through to pypdf.
            print(f"Error extracting PDF metadata with PyMuPDF: {e}")

    if not extracted and PYPDF_AVAILABLE:
        try:
            reader = PdfReader(file_path)
            apply_info(reader.metadata, pypdf_keys)
            metadata.page_count = len(reader.pages)
        except Exception as e:
            print(f"Error extracting PDF metadata with pypdf: {e}")

    # Neither backend supplied a title: guess one from the text.
    if not metadata.title:
        metadata.title = self._extract_title_from_text(raw_document.raw_text)

    return metadata
|
|
def _extract_docx_metadata(
    self,
    file_path: str,
    raw_document: RawDocument
) -> MetadataExtraction:
    """
    Extract metadata from DOCX file.

    Core properties are read with python-docx when it is installed. When it
    is missing, or the file cannot be read, the text-based title fallback
    still runs, matching the behaviour of the PDF extractor.

    Args:
        file_path: Path to DOCX file
        raw_document: Raw document; its ``raw_text`` feeds the title fallback

    Returns:
        MetadataExtraction object (left empty when ``file_path`` is falsy)
    """
    metadata = MetadataExtraction()

    if not file_path:
        return metadata

    if DOCX_AVAILABLE:
        try:
            core_props = Document(file_path).core_properties

            metadata.title = core_props.title
            metadata.author = core_props.author
            metadata.subject = core_props.subject
            # NOTE(review): ``creator`` is filled from ``last_modified_by``;
            # confirm this mapping is intended.
            metadata.creator = core_props.last_modified_by

            if core_props.keywords:
                # Skip blank entries left by stray or trailing commas.
                keywords = [
                    k.strip() for k in core_props.keywords.split(",") if k.strip()
                ]
                if keywords:
                    metadata.keywords = keywords

            if core_props.created:
                metadata.creation_date = core_props.created.isoformat()

            if core_props.modified:
                metadata.modification_date = core_props.modified.isoformat()

            # NOTE(review): page count is a fixed placeholder of 1; no real
            # page count is read from the document here.
            metadata.page_count = 1

        except Exception as e:
            # Best effort: report the failure and keep whatever was read.
            print(f"Error extracting DOCX metadata: {e}")

    # No title in the core properties: guess one from the text.
    if not metadata.title:
        metadata.title = self._extract_title_from_text(raw_document.raw_text)

    return metadata
|
|
| def _parse_pdf_date(self, date_str: str) -> Optional[str]: |
| """ |
| Parse PDF date format to ISO format. |
| |
| Args: |
| date_str: PDF date string (e.g., "D:20230101120000") |
| |
| Returns: |
| ISO format date string or None |
| """ |
| if not date_str: |
| return None |
|
|
| try: |
| |
| match = re.match(r"D:(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})", date_str) |
| if match: |
| year, month, day, hour, minute, second = match.groups() |
| dt = datetime( |
| int(year), int(month), int(day), |
| int(hour), int(minute), int(second) |
| ) |
| return dt.isoformat() |
| except Exception as e: |
| print(f"Error parsing PDF date: {e}") |
|
|
| return date_str |
|
|
| def _extract_title_from_text(self, text: str) -> Optional[str]: |
| """ |
| Extract likely title from document text. |
| |
| Args: |
| text: Document text |
| |
| Returns: |
| Extracted title or None |
| """ |
| if not text: |
| return None |
|
|
| |
| lines = [l.strip() for l in text.split('\n') if l.strip()] |
|
|
| if not lines: |
| return None |
|
|
| |
| first_line = lines[0] |
|
|
| |
| if first_line.isupper() and len(first_line) < 100: |
| return first_line |
|
|
| |
| if len(first_line) < 100 and not first_line.lower().startswith(('the ', 'a ', 'an ')): |
| return first_line |
|
|
| return None |
|
|
def compute_metadata_similarity(
    self,
    meta1: "MetadataExtraction",
    meta2: "MetadataExtraction"
) -> float:
    """
    Compute similarity between two metadata objects.

    Every field that is populated on both objects contributes a similarity
    in [0, 1]. The result is the weighted mean of those contributions, so
    it also stays in [0, 1]. Title has weight 2 and author 1.5; subject,
    keywords and page count have weight 1.

    Args:
        meta1: First metadata
        meta2: Second metadata

    Returns:
        Similarity score (0.0 to 1.0); 0.0 when no field is populated on
        both objects
    """
    # (similarity, weight) for each field present on both sides.
    weighted_scores = []

    if meta1.title and meta2.title:
        title_sim = self._string_similarity(meta1.title, meta2.title)
        weighted_scores.append((title_sim, 2.0))

    if meta1.author and meta2.author:
        author_sim = self._string_similarity(meta1.author, meta2.author)
        weighted_scores.append((author_sim, 1.5))

    if meta1.subject and meta2.subject:
        subject_sim = self._string_similarity(meta1.subject, meta2.subject)
        weighted_scores.append((subject_sim, 1.0))

    if meta1.keywords and meta2.keywords:
        keywords_sim = self._list_similarity(meta1.keywords, meta2.keywords)
        weighted_scores.append((keywords_sim, 1.0))

    if meta1.page_count and meta2.page_count:
        # Ratio of the smaller page count to the larger one.
        page_ratio = min(meta1.page_count, meta2.page_count) / \
            max(meta1.page_count, meta2.page_count)
        weighted_scores.append((page_ratio, 1.0))

    if not weighted_scores:
        return 0.0

    # Divide by the sum of the weights (not the number of fields) so the
    # weighted scores cannot push the result above 1.0.
    total_weight = sum(weight for _, weight in weighted_scores)
    return sum(score * weight for score, weight in weighted_scores) / total_weight
|
|
| def _string_similarity(self, s1: str, s2: str) -> float: |
| """ |
| Compute similarity between two strings. |
| |
| Args: |
| s1: First string |
| s2: Second string |
| |
| Returns: |
| Similarity score (0.0 to 1.0) |
| """ |
| if not s1 or not s2: |
| return 0.0 |
|
|
| |
| s1 = s1.lower().strip() |
| s2 = s2.lower().strip() |
|
|
| |
| if s1 == s2: |
| return 1.0 |
|
|
| |
| words1 = set(s1.split()) |
| words2 = set(s2.split()) |
|
|
| intersection = words1 & words2 |
| union = words1 | words2 |
|
|
| if not union: |
| return 0.0 |
|
|
| return len(intersection) / len(union) |
|
|
| def _list_similarity(self, list1: list, list2: list) -> float: |
| """ |
| Compute Jaccard similarity between two lists. |
| |
| Args: |
| list1: First list |
| list2: Second list |
| |
| Returns: |
| Similarity score (0.0 to 1.0) |
| """ |
| if not list1 or not list2: |
| return 0.0 |
|
|
| set1 = set(item.lower() if isinstance(item, str) else item for item in list1) |
| set2 = set(item.lower() if isinstance(item, str) else item for item in list2) |
|
|
| intersection = set1 & set2 |
| union = set1 | set2 |
|
|
| if not union: |
| return 0.0 |
|
|
| return len(intersection) / len(union) |
|
|