# PII-Image-Masking-mpc-server / pii_image_processing.py
# (Hugging Face page header preserved as a comment: upload by Sor0ush,
#  commit 51f60c6 "Improve prompt", verified — the raw lines were not code.)
## Image Handler
import base64
import requests
from io import BytesIO
from PIL import Image
class ImageHandler:
    """Loads and saves PIL images from local paths, web URLs, or base64 payloads."""

    @staticmethod
    def load_image_from_local(path: str) -> Image.Image:
        """Open and fully decode an image from a local file path.

        Raises:
            IOError: if the file cannot be opened or decoded.
        """
        try:
            image = Image.open(path)
            image.load()  # force full decode now so errors surface here, not later
            return image
        except Exception as e:
            raise IOError(f"Error loading local image: {e}") from e

    @staticmethod
    def load_image_from_web(url: str, timeout: float = 10.0) -> Image.Image:
        """Download an image over HTTP(S) and decode it.

        Args:
            url: http(s) URL of the image.
            timeout: seconds to wait for the server; without it requests.get
                can block indefinitely on an unresponsive host.

        Raises:
            IOError: on network failure, HTTP error status, or decode failure.
        """
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
            image.load()
            return image
        except Exception as e:
            raise IOError(f"Error loading web image: {e}") from e

    @staticmethod
    def load_image_from_base64(base64_str: str) -> Image.Image:
        """Decode a raw (headerless) base64 string into an image.

        Raises:
            IOError: if the payload is not valid base64 or not a decodable image.
        """
        try:
            image_data = base64.b64decode(base64_str)
            image = Image.open(BytesIO(image_data))
            image.load()
            return image
        except Exception as e:
            raise IOError(f"Error loading base64 image: {e}") from e

    @staticmethod
    def save_image(image: Image.Image, path: str) -> None:
        """Write the image to *path* (format inferred from the extension).

        Raises:
            IOError: if the file cannot be written or encoded.
        """
        try:
            image.save(path)
        except Exception as e:
            raise IOError(f"Error saving image: {e}") from e

    @staticmethod
    def load_image(path: str) -> Image.Image:
        """Dispatch on input form: http(s) URL, data URL, or local file path."""
        if path.startswith(('http://', 'https://')):
            return ImageHandler.load_image_from_web(path)
        elif path.startswith('data:image/') and ';base64,' in path:
            # Strip the data-URL header; everything after the first marker is payload.
            base64_str = path.split(';base64,', 1)[1]
            return ImageHandler.load_image_from_base64(base64_str)
        else:
            return ImageHandler.load_image_from_local(path)
## Area Covering
import random
import copy
from PIL import ImageFilter, ImageDraw
class CoverStrategy:
    """Interface for strategies that obscure one rectangular region of an image."""

    def cover(self, image, coordinates):
        """Obscure the box described by *coordinates* on *image*; return the image."""
        raise NotImplementedError("Cover method must be implemented by subclasses")
class BlurStrategy(CoverStrategy):
    """Covers a region by Gaussian-blurring it in place."""

    def __init__(self, blur_amount=5):
        # Radius for the Gaussian blur applied to the covered region.
        self.blur_amount = blur_amount

    def cover(self, image, coordinates):
        """Blur the coordinate box on *image* (mutates and returns it).

        Coordinates are normalised and clamped first: after random jitter
        (CoordinateBlurrer) a box can arrive inverted (x2 < x1) or outside
        the image, and Image.crop would then pad with black pixels that get
        pasted back over real content.
        """
        x1 = int(coordinates.get('x1', 0))
        y1 = int(coordinates.get('y1', 0))
        x2 = int(coordinates.get('x2', 0))
        y2 = int(coordinates.get('y2', 0))
        left, right = sorted((x1, x2))
        top, bottom = sorted((y1, y2))
        # Clamp to the image bounds so the crop never pads.
        left, top = max(0, left), max(0, top)
        right, bottom = min(image.width, right), min(image.height, bottom)
        if right <= left or bottom <= top:
            return image  # degenerate box: nothing to blur
        region = image.crop((left, top, right, bottom))
        blurred_region = region.filter(ImageFilter.GaussianBlur(radius=self.blur_amount))
        image.paste(blurred_region, (left, top))
        return image
class SingleColorStrategy(CoverStrategy):
    """Covers a region by painting an opaque rectangle over it."""

    def __init__(self, color=(0, 0, 0)):
        # RGB fill colour of the covering rectangle.
        self.color = color

    def cover(self, image, coordinates):
        """Draw a filled rectangle over the coordinate box (mutates and returns image).

        The box is normalised first: after random jitter a box can arrive
        inverted (x2 < x1 or y2 < y1), which newer Pillow versions reject
        with a ValueError in ImageDraw.rectangle.
        """
        x1 = int(coordinates.get('x1', 0))
        y1 = int(coordinates.get('y1', 0))
        x2 = int(coordinates.get('x2', 0))
        y2 = int(coordinates.get('y2', 0))
        left, right = min(x1, x2), max(x1, x2)
        top, bottom = min(y1, y2), max(y1, y2)
        draw = ImageDraw.Draw(image)
        draw.rectangle([left, top, right, bottom], fill=self.color)
        return image
class CoordinateBlurrer:
    """Adds random jitter to PII coordinates and applies a covering strategy."""

    def __init__(self, strategy: CoverStrategy):
        # Strategy object that knows how to obscure one rectangular region.
        self.strategy = strategy

    def blur_coordinates(self, data, blur_amount=5):
        """Return deep copies of *data* with every numeric coordinate shifted
        by a uniform random offset in [-blur_amount, +blur_amount].

        Non-numeric coordinate values are carried through unchanged; the
        input list and its items are never mutated.
        """
        jittered_items = []
        for entry in data:
            clone = copy.deepcopy(entry)
            clone['coordinates'] = {
                key: (value + random.uniform(-blur_amount, blur_amount)
                      if isinstance(value, (int, float)) else value)
                for key, value in clone.get('coordinates', {}).items()
            }
            jittered_items.append(clone)
        return jittered_items

    def cover_areas(self, image, data):
        """Apply the covering strategy to every item's coordinate box; return the image."""
        for entry in data:
            image = self.strategy.cover(image, entry.get('coordinates', {}))
        return image
# PII Extractor
from dotenv import load_dotenv
load_dotenv()
import base64
import os
from abc import ABC, abstractmethod
from typing import List, Optional, Union, Dict, Any
from pydantic import BaseModel
class Coordinates(BaseModel):
    """Axis-aligned bounding box in pixel space.

    Per the extraction prompt's contract, (x1, y1) is the top-left corner
    and (x2, y2) the bottom-right, with x2 > x1 and y2 > y1.
    """
    x1: int
    y1: int
    x2: int
    y2: int
class PIIItem(BaseModel):
    """One detected piece of PII and its metadata."""
    name: str                        # PII label (e.g. a name or business name found)
    coordinates: Coordinates         # bounding box of the text in the image
    confidence: float                # model confidence, 0-1 per the prompt
    severity: str                    # "low" | "medium" | "high" per the prompt
    type: str                        # PII category
    probable_regulations: List[str]  # regulation codes, e.g. GDPR, HIPAA, CCPA
class PIIResponse(BaseModel):
    """Structured-output schema handed to the LLM via response_format."""
    piis: List[PIIItem]   # all detected PII items
    containing_text: str  # full text visible in the image
class BaseVisionExtractor(ABC):
    """Abstract base class for vision-based PII extractors.

    Subclasses supply the provider-specific pieces (client construction,
    message formatting, request dispatch); this base handles image-input
    normalisation and the default extraction prompt.
    """
    def __init__(self, api_key: Optional[str] = None, model: Optional[str] = None):
        # Credentials and model alias for the concrete provider.
        self.api_key = api_key
        self.model = model
        # Provider SDK client; created lazily on the first extract_pii() call.
        self._client = None

    @abstractmethod
    def _initialize_client(self):
        """Initialize the specific client (Mistral, OpenAI, etc.)"""
        pass

    @abstractmethod
    def _create_messages(self, image_input: str, prompt: str) -> List[Dict[str, Any]]:
        """Create messages in the format expected by the specific API"""
        pass

    @abstractmethod
    def _make_request(self, messages: List[Dict[str, Any]]) -> Any:
        """Make the actual API request"""
        pass

    @staticmethod
    def encode_image_to_base64(image_path: str) -> Optional[str]:
        """Encode a local image file to a base64 string.

        Returns None (after printing the error) instead of raising, so
        callers must check the result — see prepare_image_input().
        """
        try:
            with open(image_path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except FileNotFoundError:
            print(f"Error: The file {image_path} was not found.")
            return None
        except Exception as e:
            print(f"Error encoding image: {e}")
            return None

    @staticmethod
    def is_url(input_string: str) -> bool:
        """Check if the input is an http(s) URL"""
        return input_string.startswith(('http://', 'https://'))

    @staticmethod
    def is_base64(input_string: str) -> bool:
        """Check if the input is already a base64 data URL"""
        return input_string.startswith('data:image/')

    def prepare_image_input(self, image_input: str) -> str:
        """
        Prepare image input - handles URL, base64, or local file path
        Args:
            image_input: Can be:
                - URL (http://... or https://...)
                - Base64 encoded string (data:image/...)
                - Local file path
        Returns:
            Properly formatted image input for API
        Raises:
            ValueError: if a local path cannot be read and encoded.
        """
        if self.is_url(image_input):
            return image_input
        elif self.is_base64(image_input):
            return image_input
        else:
            # Assume it's a local file path
            base64_image = self.encode_image_to_base64(image_input)
            if base64_image:
                # Detect image format from file extension
                file_ext = image_input.lower().split('.')[-1]
                if file_ext in ['jpg', 'jpeg']:
                    mime_type = 'image/jpeg'
                elif file_ext == 'png':
                    mime_type = 'image/png'
                elif file_ext == 'webp':
                    mime_type = 'image/webp'
                elif file_ext == 'gif':
                    mime_type = 'image/gif'
                else:
                    mime_type = 'image/jpeg'  # Default fallback
                return f"data:{mime_type};base64,{base64_image}"
            else:
                raise ValueError(f"Could not process image input: {image_input}")

    def extract_pii(self, image_input: str, custom_prompt: Optional[str] = None) -> Any:
        """Extract PII from image.

        Lazily initialises the provider client, normalises the image input,
        and dispatches a single request with the default (or custom) prompt.
        """
        if not self._client:
            self._initialize_client()
        prepared_image = self.prepare_image_input(image_input)
        prompt = custom_prompt or self.get_default_prompt()
        messages = self._create_messages(prepared_image, prompt)
        return self._make_request(messages)

    def get_default_prompt(self) -> str:
        """Get the default PII extraction prompt"""
        return """
        Extract all the PII in the image and the corresponding coordinates (x1, y1, x2, y2) in pixels.
        CRITICAL REQUIREMENTS:
        1. Coordinates must form the minimal rectangle that FULLY CONTAINS ALL VISIBLE PARTS of the text
        2. Expand boxes to include:
        - Character descenders (g,j,p,q,y)
        - Accent marks (ç, ñ, ü)
        - Text margins (add 2-5px buffer around text edges)
        3. Ensure x2 > x1 and y2 > y1 (strictly)
        4. Verify coordinates match the image dimensions (e.g., 0 ≤ x < width, 0 ≤ y < height)
        5. Handle multi-line text as single rectangle covering all lines
        Output JSON format:
        {
        "containing_text": "",
        "piis": [
        {
        "name": "[PII type]",
        "coordinates": [[x1,y1],[x2,y2]],
        "x1": number,
        "y1": number,
        "x2": number,
        "y2": number,
        "confidence": 0-1,
        "severity": "low|medium|high",
        "type": "[PII category]",
        "probable_regulations": [ GDPR, HIPAA, CCPA, PECR, LGPD, PDPA ]
        }
        ]
        }
        ---- Additional information ----
        REGULATIONS = {
        "GDPR": "General Data Protection Regulation (EU)",
        "CCPA": "California Consumer Privacy Act",
        "PIPEDA": "Personal Information Protection and Electronic Documents Act (Canada)",
        "LGPD": "Lei Geral de Proteção de Dados (Brazil)",
        "PDPA": "Personal Data Protection Act (Singapore)",
        "PECR": "Privacy and Electronic Communications Regulations (UK)",
        "HIPAA": "Health Insurance Portability and Accountability Act (USA)",
        }
        Critical Checks Before Output:
        - Compare box width/height to text size: Box should be wider than text length × avg. character width
        - Ensure multi-line text has vertical coverage from topmost to bottommost pixel
        - Reject any box where (x2-x1) < 5 or (y2-y1) < 5 pixels
        For coordinates:
        - (x1, y1) = Top-left corner
        - (x2, y2) = Bottom-right corner
        - x2 MUST be > x1 and y2 MUST be > y1
        - Verify box fully encloses text width/height including diacritics.
        """
class MistralPIIExtractor(BaseVisionExtractor):
    """PII extractor backed by Mistral's vision models."""

    def __init__(self, api_key: Optional[str] = None, model: str = 'pixtral-large-latest'):
        # Fall back to the MISTRAL_API_KEY environment variable when no key is given.
        super().__init__(api_key or os.environ.get('MISTRAL_API_KEY'), model)

    def _initialize_client(self):
        """Lazily construct the Mistral SDK client."""
        from mistralai import Mistral
        self._client = Mistral(api_key=self.api_key)

    def _create_messages(self, image_input: str, prompt: str) -> List[Dict[str, Any]]:
        """Build the single user-turn payload in Mistral's content-list format."""
        content = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": image_input},
        ]
        return [{"role": "user", "content": content}]

    def _make_request(self, messages: List[Dict[str, Any]]) -> str:
        """Call Mistral's structured-output chat endpoint; return the raw JSON text."""
        chat_response = self._client.chat.parse(
            model=self.model,
            messages=messages,
            response_format=PIIResponse,
            temperature=0,
        )
        return chat_response.choices[0].message.content
class OpenAIPIIExtractor(BaseVisionExtractor):
    """PII extractor backed by OpenAI vision models (demonstrates extensibility)."""

    def __init__(self, api_key: Optional[str] = None, model: str = 'gpt-4-vision-preview'):
        # Fall back to the OPENAI_API_KEY environment variable when no key is given.
        super().__init__(api_key or os.environ.get('OPENAI_API_KEY'), model)

    def _initialize_client(self):
        """Lazily construct the OpenAI SDK client."""
        from openai import OpenAI
        self._client = OpenAI(api_key=self.api_key)

    def _create_messages(self, image_input: str, prompt: str) -> List[Dict[str, Any]]:
        """Build the single user-turn payload; OpenAI nests the image URL in a dict."""
        content = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": image_input}},
        ]
        return [{"role": "user", "content": content}]

    def _make_request(self, messages: List[Dict[str, Any]]) -> str:
        """Call the chat-completions endpoint and return the raw text reply."""
        response = self._client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=1000,
        )
        return response.choices[0].message.content
# Factory for easy model switching
class PIIExtractorFactory:
"""Factory to create different PII extractors"""
@staticmethod
def create_extractor(provider: str, **kwargs) -> BaseVisionExtractor:
"""
Create a PII extractor for the specified provider
Args:
provider: 'mistral', 'openai', etc.
**kwargs: Additional arguments passed to the extractor
"""
if provider.lower() == 'mistral':
return MistralPIIExtractor(**kwargs)
elif provider.lower() == 'openai':
return OpenAIPIIExtractor(**kwargs)
else:
raise ValueError(f"Unsupported provider: {provider}")
# Image Processing Facade
import json
class ImageProcessingService:
    """Facade that runs PII extraction on an image and returns parsed results."""

    @staticmethod
    def process_image(image):
        """Extract PII boxes and the containing text from *image*.

        Args:
            image: image reference accepted by the extractor
                (local path, URL, or base64 data URL).

        Returns:
            (piis, containing_text): list of PII dicts and the full text.

        Raises:
            Exception: re-raised from extraction or JSON parsing failures
                (logged first).
        """
        extractor = MistralPIIExtractor()  # fixed local-name typo: was 'extracotr'
        try:
            data_str = extractor.extract_pii(image)
            print(f'DEBUG - Extracted PII: {data_str}')
            data = json.loads(data_str)
            return data['piis'], data['containing_text']
        except Exception as e:
            print({"error": f"Failed to extract PII: {e}"})
            raise  # bare raise preserves the original traceback
class MockImageProcessingService:
    """Drop-in stand-in for ImageProcessingService returning canned results."""

    @staticmethod
    def process_image(image):
        """Return a fixed (piis, containing_text) pair; *image* is ignored.

        Mimics what a real OCR / vision pipeline would produce.
        """
        canned_piis = [
            {
                "name": "Trattoria Il Gabbiano",
                "coordinates": {"x1": 50, "y1": 20, "x2": 280, "y2": 40},
                "confidence": 0.99,
                "severity": "low",
                "type": "business_name",
            },
            {
                "name": "Tarta sas di Fontana Stefania & c.",
                "coordinates": {"x1": 90, "y1": 40, "x2": 320, "y2": 55},
                "confidence": 0.98,
                "severity": "medium",
                "type": "business_name",
            },
        ]
        return canned_piis, "the containing text mocked"
class ImageProcessingFacade:
    """Convenience wrapper over the full pipeline: load → detect → jitter → cover → save."""

    def __init__(self):
        # Handler used to load the input image.
        self.image_handler = ImageHandler()

    def process(self, image_path, strategy_name='blur', blur_amount=5, color=(0, 0, 0), output_path=None):
        """Mask PII in the image at *image_path*.

        Returns a dict with 'data', 'processed_image' and 'success' on
        success, or a dict with a single 'error' key on any failure.
        """
        try:
            image = self.image_handler.load_image(image_path)
        except Exception as e:
            return {"error": f"Failed to load image: {e}"}

        # Select covering strategy (lazy factories: only the chosen one is built).
        builders = {
            'blur': lambda: BlurStrategy(blur_amount),
            'single_color': lambda: SingleColorStrategy(color),
        }
        builder = builders.get(strategy_name)
        if builder is None:
            return {"error": f"Unknown strategy: {strategy_name}"}
        strategy = builder()

        # Detect PII (the service reloads the image from the original reference).
        try:
            piis, containing_text = ImageProcessingService.process_image(image_path)
        except Exception as e:
            return {"error": f"Failed to process image: {e}"}

        # Jitter the boxes, cover them, and optionally persist the result.
        try:
            blurrer = CoordinateBlurrer(strategy)
            blurred_data = blurrer.blur_coordinates(piis, blur_amount)
            processed_image = blurrer.cover_areas(image.copy(), blurred_data)
            if output_path:
                self.image_handler.save_image(processed_image, output_path)
            return {
                "data": blurred_data,
                "processed_image": processed_image,
                "success": True,
            }
        except Exception as e:
            return {"error": f"Failed to process coordinates: {e}"}
def _resolve_cover_strategy(name, blur_amount, color):
    """Map a strategy name to a CoverStrategy instance, or None when unknown.

    Shared by both branches of process_image_api (flat strategy and
    per-regulation mapping) — previously this selection logic was duplicated.
    """
    if name == "blur":
        return BlurStrategy(blur_amount)
    if name == "single_color":
        return SingleColorStrategy(color)
    return None


def process_image_api(image_path,
                      strategy_name='blur',
                      blur_amount=5,
                      color=(0, 0, 0),
                      output_path=None,
                      provider='mistral',
                      model=None,
                      regulation_map=None):
    """
    API function to process images with coordinate blurring and area covering.
    Args:
        image_path (str): Path to image (local, web URL, or base64)
        strategy_name (str): Default covering strategy when regulation_map is not provided ('blur' or 'single_color')
        blur_amount (int): Amount of blur for coordinates and blur strategy
        color (tuple): RGB color for single_color strategy
        output_path (str, optional): Path to save processed image
        provider (str): PII extractor provider ('mistral' or 'openai')
        model (str, optional): Model name for the PII extractor
        regulation_map (dict, optional): Mapping of regulation names to strategy names or None
    Returns:
        dict: Processing results with data and success status, or a dict
        with a single 'error' key on any failure (no exception escapes).
    """
    # Load image
    try:
        print(f"DEBUG - Loading image from: {image_path}")
        image = ImageHandler.load_image(image_path)
    except Exception as e:
        return {"error": f"Failed to load image: {e}"}
    # Create PII extractor
    try:
        extractor_kwargs = {}
        if model is not None:
            extractor_kwargs["model"] = model
        extractor = PIIExtractorFactory.create_extractor(provider, **extractor_kwargs)
    except Exception as e:
        return {"error": f"Failed to create PII extractor: {e}"}
    # Extract PII (the extractor handles loading from the original reference itself)
    try:
        data_str = extractor.extract_pii(image_path)
        data = json.loads(data_str)
        piis = data.get("piis", [])
    except Exception as e:
        return {"error": f"Failed to extract PII: {e}"}
    processed_data = []
    processed_image = image.copy()
    # Apply covering
    try:
        if regulation_map is not None:
            for item in piis:
                regs = item.get("probable_regulations", [])
                # First regulation present in the map wins.
                reg = next((r for r in regs if r in regulation_map), None)
                strategy_for_item = regulation_map[reg] if reg is not None else None
                if strategy_for_item is None:
                    # No rule (or explicit None): keep the item uncovered.
                    processed_data.append(item)
                    continue
                strategy = _resolve_cover_strategy(strategy_for_item, blur_amount, color)
                if strategy is None:
                    return {"error": f"Unknown strategy for regulation {reg}: {strategy_for_item}"}
                blurrer = CoordinateBlurrer(strategy)
                blurred_item = blurrer.blur_coordinates([item], blur_amount)[0]
                processed_image = blurrer.cover_areas(processed_image, [blurred_item])
                processed_data.append(blurred_item)
        else:
            strategy = _resolve_cover_strategy(strategy_name, blur_amount, color)
            if strategy is None:
                return {"error": f"Unknown strategy: {strategy_name}"}
            blurrer = CoordinateBlurrer(strategy)
            processed_data = blurrer.blur_coordinates(piis, blur_amount)
            processed_image = blurrer.cover_areas(image.copy(), processed_data)
    except Exception as e:
        return {"error": f"Failed to apply covering: {e}"}
    # Save processed image if provided
    if output_path:
        try:
            ImageHandler.save_image(processed_image, output_path)
        except Exception as e:
            return {"error": f"Failed to save processed image: {e}"}
    return {"data": processed_data, "processed_image": processed_image, "success": True}
from enum import Enum
class CoverStrategy(Enum):
    """Enumeration of the supported covering-strategy names.

    NOTE(review): this rebinds the module-level name ``CoverStrategy`` that
    the strategy base class earlier in this file already uses — any code after
    this point that references ``CoverStrategy`` gets this Enum, not the base
    class. Confirm the shadowing is intentional; renaming one of the two
    would be safer.
    """
    BLUR = "blur"
    SINGLE_COLOR = "single_color"
class MistralModels(Enum):
    """Mistral model aliases relevant to this project.

    Reference: https://docs.mistral.ai/getting-started/models/models_overview/

    Alias -> pinned version at the time of writing:
        mistral-large-latest      -> mistral-large-2411
        pixtral-large-latest      -> pixtral-large-2411
        mistral-medium-latest     -> mistral-medium-2505
        mistral-moderation-latest -> mistral-moderation-2411
        ministral-3b-latest       -> ministral-3b-2410
        ministral-8b-latest       -> ministral-8b-2410
        open-mistral-nemo         -> open-mistral-nemo-2407
        mistral-small-latest      -> mistral-small-2503
        devstral-small-latest     -> devstral-small-2505
        mistral-saba-latest       -> mistral-saba-2502
        codestral-latest          -> codestral-2501
        mistral-ocr-latest        -> mistral-ocr-2505
    """
    PIXTRAL_LARGE_LATEST = 'pixtral-large-latest'
    MISTRAL_OCR_LATEST = 'mistral-ocr-latest'
    # MISTRAL_SABA_2502 = 'mistral-saba-2502'
    MISTRAL_MEDIUM_2505 = 'mistral-medium-2505'
if __name__ == "__main__":
    # Demo driver: run the pipeline on a local receipt photo and a web image.
    myhome = os.environ.get('HOME')
    # BUGFIX: os.path.join discards all earlier components when a later one is
    # absolute, so joining "/Pictures/..." silently ignored $HOME. The second
    # component must be relative for the path to land under the home directory.
    image = os.path.join(myhome, "Pictures/tmp/lo-scontrino-fiscale.jpg")
    result = ImageProcessingService.process_image(image)
    print(result)
    # Process with blur strategy
    result = process_image_api(
        image_path=image,
        strategy_name="blur",
        blur_amount=3,
        output_path="tmp/processed_image.jpg"
    )
    print("Result1")
    print(result)
    # Process with single color covering
    result2 = process_image_api(
        image_path="https://www.servizicontabiliefiscaliviterbo.it/wordpress/wp-content/uploads/2016/03/lo-scontrino-fiscale.jpg",
        strategy_name="single_color",
        color=(255, 0, 0),  # Red
        blur_amount=2
    )
    print("Result2")
    print(result2)