Spaces:
Paused
Paused
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

from pydantic import BaseModel

# Optional dependency: PyMuPDF for PDF parsing.  When missing, the service
# degrades to generic filesystem metadata instead of failing at import time.
try:
    import pymupdf
except ImportError:
    pymupdf = None

# Optional dependency: Pillow for image/EXIF parsing; same graceful fallback.
try:
    from PIL import ExifTags, Image
except ImportError:
    Image = None
    ExifTags = None

# Module-level logger, per stdlib logging convention.
logger = logging.getLogger(__name__)
class ForensicFlags(BaseModel):
    """Outcome of a tampering analysis for a single file."""

    # True when at least one high-confidence tampering indicator fired.
    is_tampered: bool = False
    # Human-readable descriptions of the suspicious findings.
    indicators: list[str] = []
    # Aggregate risk score, clamped by callers to the range [0.0, 10.0].
    risk_score: float = 0.0
class DocumentMetadata(BaseModel):
    """Normalized metadata extracted from one file, plus optional forensic flags."""

    title: str | None = None
    author: str | None = None
    # Timestamps are the source format's raw strings (e.g. PDF "D:..." dates,
    # EXIF "YYYY:MM:DD HH:MM:SS", or an ISO string for the generic fallback) —
    # they are not parsed into datetime objects.
    created_at: str | None = None
    modified_at: str | None = None
    # Producing application, when the source format records one.
    software: str | None = None
    # SHA-256 hex digest of the file contents ("" when hashing failed).
    content_hash: str | None = None
    file_size_bytes: int = 0
    mime_type: str = "application/octet-stream"
    # None when no forensic analysis was performed for this file.
    forensic: ForensicFlags | None = None
    # Format-specific extras (page count, EXIF dict, image dimensions, ...).
    raw_metadata: dict[str, Any] = {}
class MetadataExtractionService:
    """
    Service for extracting and analyzing metadata from files.

    Supports PDFs (via PyMuPDF) and images (via Pillow) for forensic
    analysis, with a generic filesystem-metadata fallback for every other
    file type.  Both third-party libraries are optional; when one is
    missing, extraction degrades gracefully instead of raising.
    """

    # Extension -> MIME type table, built once instead of per call.
    # Detection is by extension only; file contents are not sniffed.
    _MIME_MAP = {
        ".pdf": "application/pdf",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".tif": "image/tiff",  # common short form, previously unmapped
        ".tiff": "image/tiff",
        ".gif": "image/gif",
        ".doc": "application/msword",
        ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ".txt": "text/plain",
    }

    def __init__(self):
        # Feature flags reflect which optional dependencies imported successfully.
        self.pdf_support = pymupdf is not None
        self.image_support = Image is not None

    def extract_metadata(self, file_path: Path) -> "DocumentMetadata":
        """
        Extract metadata from a file, dispatching on its detected MIME type.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        file_size = file_path.stat().st_size
        mime_type = self._detect_mime_type(file_path)
        if mime_type == "application/pdf":
            return self._extract_pdf_metadata(file_path, file_size, mime_type)
        elif mime_type.startswith("image/"):
            return self._extract_image_metadata(file_path, file_size, mime_type)
        else:
            return self._extract_general_metadata(file_path, file_size, mime_type)

    def _detect_mime_type(self, file_path: Path) -> str:
        """
        Detect the MIME type of a file from its extension (case-insensitive).
        Unknown extensions map to "application/octet-stream".
        """
        return self._MIME_MAP.get(file_path.suffix.lower(), "application/octet-stream")

    def _extract_pdf_metadata(self, file_path: Path, file_size: int, mime_type: str) -> "DocumentMetadata":
        """
        Extract metadata from PDF files.

        Falls back to generic extraction when PyMuPDF is unavailable or
        the document cannot be parsed.
        """
        if not self.pdf_support:
            logger.warning("PyMuPDF not available, skipping PDF metadata extraction")
            return self._extract_general_metadata(file_path, file_size, mime_type)
        try:
            doc = pymupdf.Document(str(file_path))
            try:
                # doc.metadata can be None for some documents; normalize.
                metadata = doc.metadata or {}
                content_hash = self._calculate_file_hash(file_path)
                forensic_flags = self._analyze_pdf_tampering(doc, content_hash)
                return DocumentMetadata(
                    title=metadata.get("title"),
                    author=metadata.get("author"),
                    created_at=metadata.get("creationDate"),
                    modified_at=metadata.get("modDate"),
                    software=metadata.get("producer"),
                    content_hash=content_hash,
                    file_size_bytes=file_size,
                    mime_type=mime_type,
                    forensic=forensic_flags,
                    raw_metadata={
                        "pages": doc.page_count,
                        "is_encrypted": doc.is_encrypted,
                        "is_pdf": True,
                    },
                )
            finally:
                # Always release the document handle (was previously leaked).
                doc.close()
        except Exception as e:
            logger.error(f"PDF metadata extraction failed: {e}")
            return self._extract_general_metadata(file_path, file_size, mime_type)

    def _extract_image_metadata(self, file_path: Path, file_size: int, mime_type: str) -> "DocumentMetadata":
        """
        Extract metadata from image files including EXIF data.

        Falls back to generic extraction when Pillow is unavailable or
        the image cannot be opened.
        """
        if not self.image_support:
            logger.warning("PIL not available, skipping image metadata extraction")
            return self._extract_general_metadata(file_path, file_size, mime_type)
        try:
            with Image.open(file_path) as img:
                exif_data: dict[str, Any] = {}
                # NOTE(review): _getexif() is a private PIL API; kept for
                # behavioral parity — consider img.getexif() when upgrading.
                if hasattr(img, "_getexif"):
                    exif_dict = img._getexif()
                    if exif_dict:
                        for tag, value in exif_dict.items():
                            # Map numeric EXIF tag ids to readable names.
                            tag_name = ExifTags.TAGS.get(tag, tag)
                            exif_data[tag_name] = value
                content_hash = self._calculate_file_hash(file_path)
                forensic_flags = self._analyze_image_tampering(img, exif_data, content_hash)
                return DocumentMetadata(
                    title=exif_data.get("ImageDescription"),
                    author=exif_data.get("Artist") or exif_data.get("Software"),
                    created_at=exif_data.get("DateTimeOriginal"),
                    modified_at=exif_data.get("DateTime"),
                    software=exif_data.get("Software"),
                    content_hash=content_hash,
                    file_size_bytes=file_size,
                    mime_type=mime_type,
                    forensic=forensic_flags,
                    raw_metadata={
                        "width": img.width,
                        "height": img.height,
                        "format": img.format,
                        "mode": img.mode,
                        "exif": exif_data,
                    },
                )
        except Exception as e:
            logger.error(f"Image metadata extraction failed: {e}")
            return self._extract_general_metadata(file_path, file_size, mime_type)

    def _extract_general_metadata(self, file_path: Path, file_size: int, mime_type: str) -> "DocumentMetadata":
        """
        Extract general filesystem metadata when specialized extraction is
        not available (or has failed).
        """
        content_hash = self._calculate_file_hash(file_path)
        # NOTE(review): st_mtime is the *modification* time; it is also
        # reported under the "created" key below for backward compatibility.
        mod_time = datetime.fromtimestamp(file_path.stat().st_mtime)
        return DocumentMetadata(
            title=file_path.stem,
            author=None,
            created_at=None,
            modified_at=mod_time.isoformat(),
            software=None,
            content_hash=content_hash,
            file_size_bytes=file_size,
            mime_type=mime_type,
            forensic=ForensicFlags(is_tampered=False, indicators=["No forensic analysis performed"], risk_score=0.0),
            raw_metadata={
                "filename": file_path.name,
                "created": mod_time.isoformat(),
            },
        )

    def _calculate_file_hash(self, file_path: Path) -> str:
        """
        Calculate the SHA-256 hash of a file for integrity verification.

        Returns "" (and logs) when the file cannot be read.
        """
        hash_sha256 = hashlib.sha256()
        chunk_size = 8192  # stream in chunks so large files stay cheap
        try:
            with open(file_path, "rb") as f:
                while chunk := f.read(chunk_size):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest()
        except Exception as e:
            logger.error(f"Hash calculation failed: {e}")
            return ""

    def _analyze_pdf_tampering(self, doc: "pymupdf.Document", file_hash: str) -> "ForensicFlags":
        """
        Analyze a PDF for potential tampering indicators.

        ``file_hash`` is currently unused; kept for interface stability.
        """
        indicators: list[str] = []
        risk_score = 0.0
        is_tampered = False
        # Encryption is normal for sensitive docs, but flags manual review.
        if doc.is_encrypted:
            indicators.append("Document is encrypted - password required")
            risk_score += 2.0
        # doc.metadata may be None; normalize before .get() (previously crashed).
        metadata = doc.metadata or {}
        if not any(metadata.values()):
            indicators.append("No metadata found - possibly tampered or stripped")
            risk_score += 1.5
            is_tampered = True
        # Producer may be None; coerce before lowering (previously crashed).
        producer = (metadata.get("producer") or "").lower()
        if "photoshop" in producer or "gimp" in producer or "ilovepdf" in producer:
            indicators.append(f"Suspicious software detected: {producer}")
            risk_score += 2.0
            is_tampered = True
        # Sanity check: a valid PDF must have at least one page.
        if doc.page_count < 1:
            indicators.append("Invalid page count detected")
            risk_score += 1.0
        # Final risk assessment: enough small indicators imply tampering.
        if risk_score > 3.0:
            is_tampered = True
        return ForensicFlags(is_tampered=is_tampered, indicators=indicators, risk_score=min(risk_score, 10.0))

    def _analyze_image_tampering(self, img: "Image.Image", exif_data: dict[str, Any], file_hash: str) -> "ForensicFlags":
        """
        Analyze an image for potential tampering indicators.

        ``file_hash`` is currently unused; kept for interface stability.
        """
        indicators: list[str] = []
        risk_score = 0.0
        is_tampered = False
        if not exif_data:
            # Missing EXIF is weak evidence of editing/stripping.
            indicators.append("No EXIF data - image may be edited")
            risk_score += 1.5
            is_tampered = True
        else:
            # Editing-software fingerprint; value may be non-str, so coerce.
            software = str(exif_data.get("Software") or "").lower()
            if "photoshop" in software or "gimp" in software:
                indicators.append(f"Suspicious software detected: {software}")
                risk_score += 2.0
                is_tampered = True
            # Timestamp anomaly: modified after capture.
            original_date = exif_data.get("DateTimeOriginal")
            modified_date = exif_data.get("DateTime")
            if original_date and modified_date:
                if original_date != modified_date:
                    indicators.append("Image modification date differs from original")
                    risk_score += 1.0
            # Cameras normally record make/model; absence is mildly suspicious.
            make = exif_data.get("Make", "")
            model = exif_data.get("Model", "")
            if not make or not model:
                indicators.append("Missing camera/equipment information")
                risk_score += 0.5
        # Extreme aspect ratios suggest cropping/compositing artifacts.
        width, height = img.size
        aspect_ratio = width / height if height > 0 else 0
        if aspect_ratio < 0.1 or aspect_ratio > 10:
            indicators.append(f"Unusual aspect ratio: {aspect_ratio:.2f}")
            risk_score += 1.0
        # Final risk assessment: enough small indicators imply tampering.
        if risk_score > 3.0:
            is_tampered = True
        return ForensicFlags(is_tampered=is_tampered, indicators=indicators, risk_score=min(risk_score, 10.0))

    def detect_malware(self, file_path: Path) -> dict[str, Any]:
        """
        Scan file for malware (placeholder for ClamAV integration).

        Always returns a static "not configured" result until a real
        scanner is wired in.
        """
        # TODO: integrate a real scanner, e.g. clamd over its socket API:
        #   cd = clamd.ClamdUnixSocket(); result = cd.scan(str(file_path))
        return {
            "file_path": str(file_path),
            "scanned": False,
            "malware_detected": False,
            "threats": [],
            "message": "Malware scanning not configured",
        }

    def extract_text_content(self, file_path: Path) -> Optional[str]:
        """
        Extract plain-text content from a PDF for analysis.

        Returns None when PyMuPDF is unavailable or extraction fails.
        """
        if not self.pdf_support:
            return None
        try:
            doc = pymupdf.Document(str(file_path))
            try:
                return "\n".join(page.get_text() for page in doc)
            finally:
                # Always release the document handle (was previously leaked).
                doc.close()
        except Exception as e:
            logger.error(f"Text extraction failed: {e}")
            return None

    def batch_extract_metadata(self, file_paths: list[Path]) -> list["DocumentMetadata"]:
        """
        Extract metadata from multiple files.

        Files that fail extraction are logged and skipped, so the result
        may be shorter than the input list.
        """
        results = []
        for file_path in file_paths:
            try:
                results.append(self.extract_metadata(file_path))
            except Exception as e:
                logger.error(f"Failed to extract metadata from {file_path}: {e}")
                continue
        return results
| metadata_service = MetadataExtractionService() | |