# zenith-backend/app/services/metadata_extraction_service.py
# Provenance: author teoat, commit d29a5a0 (verified)
# "fix(backend): fix port and health check robustness"
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel
try:
import pymupdf
except ImportError:
pymupdf = None
try:
from PIL import ExifTags, Image
except ImportError:
Image = None
ExifTags = None
logger = logging.getLogger(__name__)
class ForensicFlags(BaseModel):
    """Outcome of a heuristic tampering analysis for a single file."""

    # True when a strong indicator fired or the aggregate score exceeded 3.0.
    is_tampered: bool = False
    # Human-readable description of each suspicious finding.
    indicators: list[str] = []  # pydantic deep-copies mutable defaults per instance, so this is safe
    # Aggregate heuristic score; the analyzers cap it at 10.0.
    risk_score: float = 0.0
class DocumentMetadata(BaseModel):
    """Normalized metadata extracted from a file, plus forensic results."""

    # Document title (PDF "title" or EXIF ImageDescription); None when absent.
    title: str | None = None
    # Author/creator (PDF "author" or EXIF Artist/Software).
    author: str | None = None
    # Creation timestamp as reported by the source format (raw string, not parsed).
    created_at: str | None = None
    # Last-modified timestamp (source-format string, or ISO-8601 mtime for the general path).
    modified_at: str | None = None
    # Producing software (PDF "producer" or EXIF Software).
    software: str | None = None
    # SHA-256 hex digest of the file contents; "" when hashing failed.
    content_hash: str | None = None
    file_size_bytes: int = 0
    mime_type: str = "application/octet-stream"
    # Tampering analysis results; populated by all extraction paths.
    forensic: ForensicFlags | None = None
    # Format-specific extras (page count, EXIF dict, dimensions, ...).
    raw_metadata: dict[str, Any] = {}  # pydantic deep-copies mutable defaults per instance
class MetadataExtractionService:
    """
    Service for extracting and analyzing metadata from files.

    Supports PDFs (via PyMuPDF) and images (via Pillow), with a generic
    filesystem-level fallback for every other file type. Both libraries are
    optional: availability is probed at module import time, and when one is
    missing the corresponding extractor degrades to the general-purpose path
    instead of raising.
    """

    def __init__(self):
        # Capability flags derived from the optional module-level imports.
        self.pdf_support = pymupdf is not None
        self.image_support = Image is not None

    def extract_metadata(self, file_path: Path) -> DocumentMetadata:
        """
        Extract metadata from a file, dispatching on its detected MIME type.

        Raises:
            FileNotFoundError: if *file_path* does not exist.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        file_size = file_path.stat().st_size
        mime_type = self._detect_mime_type(file_path)

        if mime_type == "application/pdf":
            return self._extract_pdf_metadata(file_path, file_size, mime_type)
        elif mime_type.startswith("image/"):
            return self._extract_image_metadata(file_path, file_size, mime_type)
        else:
            return self._extract_general_metadata(file_path, file_size, mime_type)

    def _detect_mime_type(self, file_path: Path) -> str:
        """
        Map a file's extension (case-insensitive) to a MIME type.

        Unknown extensions fall back to "application/octet-stream".
        """
        mime_map = {
            ".pdf": "application/pdf",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".tif": "image/tiff",  # single-f TIFF variant was missing originally
            ".tiff": "image/tiff",
            ".gif": "image/gif",
            ".doc": "application/msword",
            ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            ".txt": "text/plain",
        }
        return mime_map.get(file_path.suffix.lower(), "application/octet-stream")

    def _extract_pdf_metadata(self, file_path: Path, file_size: int, mime_type: str) -> DocumentMetadata:
        """
        Extract metadata and forensic indicators from a PDF file.

        Falls back to general extraction when PyMuPDF is unavailable or the
        document cannot be parsed.
        """
        if not self.pdf_support:
            logger.warning("PyMuPDF not available, skipping PDF metadata extraction")
            return self._extract_general_metadata(file_path, file_size, mime_type)

        try:
            # Context manager guarantees the document handle is closed even
            # when analysis raises (the previous version leaked the handle).
            with pymupdf.Document(str(file_path)) as doc:
                # doc.metadata may be None for some documents; normalize to {}.
                metadata = doc.metadata or {}
                content_hash = self._calculate_file_hash(file_path)
                forensic_flags = self._analyze_pdf_tampering(doc, content_hash)
                return DocumentMetadata(
                    title=metadata.get("title"),
                    author=metadata.get("author"),
                    created_at=metadata.get("creationDate"),
                    modified_at=metadata.get("modDate"),
                    software=metadata.get("producer"),
                    content_hash=content_hash,
                    file_size_bytes=file_size,
                    mime_type=mime_type,
                    forensic=forensic_flags,
                    raw_metadata={
                        "pages": doc.page_count,
                        "is_encrypted": doc.is_encrypted,
                        "is_pdf": True,
                    },
                )
        except Exception as e:
            logger.error(f"PDF metadata extraction failed: {e}")
            return self._extract_general_metadata(file_path, file_size, mime_type)

    def _extract_image_metadata(self, file_path: Path, file_size: int, mime_type: str) -> DocumentMetadata:
        """
        Extract metadata (including EXIF tags) and forensic indicators from
        an image file.

        Falls back to general extraction when Pillow is unavailable or the
        image cannot be parsed.
        """
        if not self.image_support:
            logger.warning("PIL not available, skipping image metadata extraction")
            return self._extract_general_metadata(file_path, file_size, mime_type)

        try:
            with Image.open(file_path) as img:
                exif_data: dict[str, Any] = {}
                # _getexif() is a private Pillow API; it returns None when the
                # image carries no EXIF block.
                # NOTE(review): the public img.getexif() is the modern
                # replacement — consider migrating once parity is verified.
                if hasattr(img, "_getexif"):
                    exif_dict = img._getexif()
                    if exif_dict:
                        for tag, value in exif_dict.items():
                            # Translate numeric EXIF tag IDs to readable names.
                            tag_name = ExifTags.TAGS.get(tag, tag)
                            exif_data[tag_name] = value

                content_hash = self._calculate_file_hash(file_path)
                forensic_flags = self._analyze_image_tampering(img, exif_data, content_hash)
                return DocumentMetadata(
                    title=exif_data.get("ImageDescription"),
                    author=exif_data.get("Artist") or exif_data.get("Software"),
                    created_at=exif_data.get("DateTimeOriginal"),
                    modified_at=exif_data.get("DateTime"),
                    software=exif_data.get("Software"),
                    content_hash=content_hash,
                    file_size_bytes=file_size,
                    mime_type=mime_type,
                    forensic=forensic_flags,
                    raw_metadata={
                        "width": img.width,
                        "height": img.height,
                        "format": img.format,
                        "mode": img.mode,
                        "exif": exif_data,
                    },
                )
        except Exception as e:
            logger.error(f"Image metadata extraction failed: {e}")
            return self._extract_general_metadata(file_path, file_size, mime_type)

    def _extract_general_metadata(self, file_path: Path, file_size: int, mime_type: str) -> DocumentMetadata:
        """
        Extract filesystem-level metadata when no specialized extractor applies.
        """
        content_hash = self._calculate_file_hash(file_path)
        mod_time = datetime.fromtimestamp(file_path.stat().st_mtime)
        return DocumentMetadata(
            title=file_path.stem,
            author=None,
            created_at=None,
            modified_at=mod_time.isoformat(),
            software=None,
            content_hash=content_hash,
            file_size_bytes=file_size,
            mime_type=mime_type,
            forensic=ForensicFlags(
                is_tampered=False,
                indicators=["No forensic analysis performed"],
                risk_score=0.0,
            ),
            raw_metadata={
                "filename": file_path.name,
                # st_mtime is not a true creation time; the key is kept for
                # backward compatibility with the original payload shape.
                "created": mod_time.isoformat(),
            },
        )

    def _calculate_file_hash(self, file_path: Path) -> str:
        """
        Return the SHA-256 hex digest of a file, or "" on read failure.

        Reads in 8 KiB chunks so arbitrarily large files never need to fit
        in memory.
        """
        hash_sha256 = hashlib.sha256()
        chunk_size = 8192
        try:
            with open(file_path, "rb") as f:
                while chunk := f.read(chunk_size):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest()
        except Exception as e:
            logger.error(f"Hash calculation failed: {e}")
            return ""

    def _analyze_pdf_tampering(self, doc: "pymupdf.Document", file_hash: str) -> ForensicFlags:
        """
        Run heuristic tampering checks on an open PDF document.

        *file_hash* is currently unused but kept in the signature for
        forward compatibility (e.g. comparison against known-good hashes).
        """
        indicators: list[str] = []
        risk_score = 0.0
        is_tampered = False

        # Encryption is normal for sensitive documents but warrants review.
        if doc.is_encrypted:
            indicators.append("Document is encrypted - password required")
            risk_score += 2.0

        # doc.metadata can be None; normalize so the .get() calls below
        # cannot crash (the previous version raised AttributeError here).
        metadata = doc.metadata or {}
        if not any(metadata.values()):
            indicators.append("No metadata found - possibly tampered or stripped")
            risk_score += 1.5
            is_tampered = True

        # Editing tools in the producer string suggest post-processing.
        # "or ''" guards against a None producer value.
        producer = (metadata.get("producer") or "").lower()
        if "photoshop" in producer or "gimp" in producer or "ilovepdf" in producer:
            indicators.append(f"Suspicious software detected: {producer}")
            risk_score += 2.0
            is_tampered = True

        if doc.page_count < 1:
            indicators.append("Invalid page count detected")
            risk_score += 1.0

        # Accumulated medium-risk signals also tip the tampered verdict.
        if risk_score > 3.0:
            is_tampered = True
        return ForensicFlags(
            is_tampered=is_tampered,
            indicators=indicators,
            risk_score=min(risk_score, 10.0),
        )

    def _analyze_image_tampering(self, img: "Image.Image", exif_data: dict[str, Any], file_hash: str) -> ForensicFlags:
        """
        Run heuristic tampering checks on an image and its EXIF tags.

        *file_hash* is currently unused but kept in the signature for
        forward compatibility.
        """
        indicators: list[str] = []
        risk_score = 0.0
        is_tampered = False

        # Missing EXIF often means the image was re-encoded by an editor.
        # (The original condition was the duplicated "not exif_data or not exif_data".)
        if not exif_data:
            indicators.append("No EXIF data - image may be edited")
            risk_score += 1.5
            is_tampered = True
        else:
            # EXIF values are not guaranteed to be strings; coerce defensively
            # and guard against None before .lower().
            software = str(exif_data.get("Software") or "").lower()
            if "photoshop" in software or "gimp" in software:
                indicators.append(f"Suspicious software detected: {software}")
                risk_score += 2.0
                is_tampered = True

            # DateTime differing from DateTimeOriginal implies a re-save.
            original_date = exif_data.get("DateTimeOriginal")
            modified_date = exif_data.get("DateTime")
            if original_date and modified_date and original_date != modified_date:
                indicators.append("Image modification date differs from original")
                risk_score += 1.0

            # Camera identity is usually present in genuine photographs.
            make = exif_data.get("Make", "")
            model = exif_data.get("Model", "")
            if not make or not model:
                indicators.append("Missing camera/equipment information")
                risk_score += 0.5

        # Extreme aspect ratios are rare in genuine photographs.
        width, height = img.size
        aspect_ratio = width / height if height > 0 else 0
        if aspect_ratio < 0.1 or aspect_ratio > 10:
            indicators.append(f"Unusual aspect ratio: {aspect_ratio:.2f}")
            risk_score += 1.0

        if risk_score > 3.0:
            is_tampered = True
        return ForensicFlags(
            is_tampered=is_tampered,
            indicators=indicators,
            risk_score=min(risk_score, 10.0),
        )

    def detect_malware(self, file_path: Path) -> dict[str, Any]:
        """
        Scan a file for malware (placeholder for ClamAV integration).

        Always reports "not scanned" until a real engine is wired in.
        """
        # TODO: integrate a real scanner, e.g. the clamd client:
        #   import clamd
        #   scanner = clamd.ClamdUnixSocket()
        #   result = scanner.scan(str(file_path))
        return {
            "file_path": str(file_path),
            "scanned": False,
            "malware_detected": False,
            "threats": [],
            "message": "Malware scanning not configured",
        }

    def extract_text_content(self, file_path: Path) -> Optional[str]:
        """
        Extract the full text of a PDF, or None when PyMuPDF is missing or
        extraction fails.
        """
        if not self.pdf_support:
            return None
        try:
            # Context manager closes the document (previously leaked).
            with pymupdf.Document(str(file_path)) as doc:
                return "\n".join(page.get_text() for page in doc)
        except Exception as e:
            logger.error(f"Text extraction failed: {e}")
            return None

    def batch_extract_metadata(self, file_paths: list[Path]) -> list[DocumentMetadata]:
        """
        Extract metadata from multiple files, logging and skipping failures.
        """
        results: list[DocumentMetadata] = []
        for file_path in file_paths:
            try:
                results.append(self.extract_metadata(file_path))
            except Exception as e:
                logger.error(f"Failed to extract metadata from {file_path}: {e}")
        return results
metadata_service = MetadataExtractionService()