# main_redaction_processor.py
# Required packages: pip install requests Pillow
"""Redact text that appears inside 'Picture' regions of an image.

The pipeline sends a (possibly downscaled) copy of the image to the
NemoRetriever parse endpoint to locate 'Picture' elements, crops each of
those regions from the full-resolution image, sends each crop to the
NemoRetriever OCR endpoint, paints over every detected text box with the
average colour of the pixels surrounding it, and pastes the crop back.
"""

import base64
import io
import json
import os
from typing import Callable, Optional

import requests
from PIL import Image, ImageDraw

# --- Configuration ---
# API endpoints should remain constant
INVOKE_URL_OCR = "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-ocr-v1"
INVOKE_URL_PARSER = "https://integrate.api.nvidia.com/v1/chat/completions"

# Define a max pixel count for the parser model to avoid sending overly large images.
MAX_PIXELS_FOR_PARSER = 1024 * 1024  # 1 Megapixel


# --- Internal Helper Functions ---
def _get_average_color_from_regions(image: Image.Image, regions: list[tuple]):
    """Return the average RGB colour over a list of rectangular regions.

    Args:
        image: Image to sample. Any mode is accepted; it is converted to RGB
            for sampling (the caller's image is not modified).
        regions: ``(x1, y1, x2, y2)`` boxes in pixel coordinates. Values may
            be floats and may lie outside the image; they are truncated to
            int and clamped to the image bounds.

    Returns:
        An ``(r, g, b)`` tuple of ints, or ``(0, 0, 0)`` when the regions
        cover no pixels.
    """
    # Any non-RGB mode (RGBA, L, P, CMYK, ...) would break the 3-tuple
    # unpacking below, so normalise everything, not just RGBA.
    if image.mode != 'RGB':
        image = image.convert('RGB')

    img_width, img_height = image.size
    pixels = image.load()

    total_r = total_g = total_b = 0
    pixel_count = 0
    for region in regions:
        x1, y1, x2, y2 = [max(0, int(c)) for c in region]
        x2 = min(img_width, x2)
        y2 = min(img_height, y2)
        # Empty or inverted ranges simply contribute no pixels.
        for x in range(x1, x2):
            for y in range(y1, y2):
                r, g, b = pixels[x, y]
                total_r += r
                total_g += g
                total_b += b
                pixel_count += 1

    if pixel_count == 0:
        return (0, 0, 0)
    return (total_r // pixel_count, total_g // pixel_count, total_b // pixel_count)


def _detect_pictures_with_parser(image_to_process: Image.Image, api_key: str):
    """Ask the NemoRetriever parse model for the 'Picture' elements in an image.

    Args:
        image_to_process: Image to analyse; it is PNG-encoded and embedded in
            the request as a base64 data URI.
        api_key: Bearer token for the NVIDIA API.

    Returns:
        A list with the ``bbox`` value of every element whose ``type`` is
        ``"Picture"``. Empty when the response carries no tool call or no
        such element.

    Raises:
        requests.exceptions.RequestException: On network failure or a non-2xx
            response.
        json.JSONDecodeError: If the tool-call arguments are not valid JSON.
    """
    headers = {"Authorization": f"Bearer {api_key}", "Accept": "application/json"}

    buffered = io.BytesIO()
    image_to_process.save(buffered, format="PNG")
    b64_str = base64.b64encode(buffered.getvalue()).decode("ascii")

    # The image travels inside the message content as an <img> tag holding a
    # data URI. Without it the model receives an empty prompt and has
    # nothing to analyse.
    content = f'<img src="data:image/png;base64,{b64_str}" />'
    tool_name = "markdown_bbox"
    payload = {
        "model": "nvidia/nemoretriever-parse",
        "messages": [{"role": "user", "content": content}],
        "tools": [{"type": "function", "function": {"name": tool_name}}],
        "tool_choice": {"type": "function", "function": {"name": tool_name}},
        "max_tokens": 2048,
    }

    response = requests.post(INVOKE_URL_PARSER, headers=headers, json=payload, timeout=120)
    response.raise_for_status()
    response_json = response.json()

    # `or` fallbacks cover keys that are present but empty/null, which the
    # plain .get(key, default) form does not.
    choices = response_json.get('choices') or [{}]
    message = choices[0].get('message') or {}
    tool_calls = message.get('tool_calls') or []
    if not tool_calls:
        return []

    function_call = tool_calls[0].get('function') or {}
    arguments_str = function_call.get('arguments') or '[]'
    parsed_arguments = json.loads(arguments_str)
    if not parsed_arguments or not isinstance(parsed_arguments, list):
        return []

    # NOTE(review): assumes the arguments are a list of pages, each a list of
    # element dicts, and that only the first page matters -- confirm against
    # the API reference.
    first_page = parsed_arguments[0]
    if not isinstance(first_page, list):
        return []

    return [
        element["bbox"]
        for element in first_page
        if isinstance(element, dict)
        and element.get("type") == "Picture"
        and element.get("bbox")
    ]


def _redact_text_in_image(input_image: Image.Image, api_key: str):
    """Run OCR on an image and paint over every detected text box.

    Each box is filled with the average colour of a thin band of pixels
    around it so the patch blends into the background.

    This is best-effort: if the request fails or the response does not have
    the expected shape, the input image is returned unchanged.

    Args:
        input_image: The (cropped) image to redact. It is not modified.
        api_key: Bearer token for the NVIDIA API.

    Returns:
        A redacted copy of ``input_image``, or ``input_image`` itself when
        OCR failed or its response could not be interpreted.
    """
    headers = {"Authorization": f"Bearer {api_key}", "Accept": "application/json"}

    buffered = io.BytesIO()
    input_image.save(buffered, format="PNG")
    image_b64 = base64.b64encode(buffered.getvalue()).decode()
    payload = {"input": [{"type": "image_url", "url": f"data:image/png;base64,{image_b64}"}]}

    try:
        response = requests.post(INVOKE_URL_OCR, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        response_json = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers a non-JSON body on requests versions whose
        # decode error is not a RequestException subclass.
        return input_image

    image_with_redactions = input_image.copy()
    draw = ImageDraw.Draw(image_with_redactions)
    img_width, img_height = image_with_redactions.size
    # Width of the sampling band around each box: 1% of the image diagonal,
    # at least one pixel.
    radius = max(1, int(((img_width**2 + img_height**2)**0.5) / 100))

    try:
        detections = response_json['data'][0]['text_detections']
        for detection in detections:
            bbox = detection.get("bounding_box")
            if not (bbox and bbox.get("points")):
                continue

            # Point coordinates are scaled by the image size, i.e. treated as
            # normalised. Use the extent of ALL points rather than trusting
            # points[0]/points[2] to be the top-left/bottom-right corners:
            # that keeps rotated quads fully covered and guarantees the
            # ordered coordinates ImageDraw.rectangle requires.
            xs = [point['x'] * img_width for point in bbox["points"]]
            ys = [point['y'] * img_height for point in bbox["points"]]
            left, right = min(xs), max(xs)
            top, bottom = min(ys), max(ys)

            sample_regions = [
                (left, top - radius, right, top),        # band above
                (left, bottom, right, bottom + radius),  # band below
                (left - radius, top, left, bottom),      # band to the left
                (right, top, right + radius, bottom),    # band to the right
            ]
            redaction_color = _get_average_color_from_regions(image_with_redactions, sample_regions)
            draw.rectangle([(left, top), (right, bottom)], fill=redaction_color)
        return image_with_redactions
    except (KeyError, IndexError, TypeError, AttributeError, ValueError):
        # Unexpected response shape: fall back to the untouched input.
        return input_image


# --- Main Public Function ---
def redact_pictures_in_image(
    image_source: str,
    api_key: str,
    callback: Optional[Callable[[str], None]] = None,
) -> Image.Image:
    """
    Analyzes an image to find pictures, then redacts text within those pictures.

    Args:
        image_source (str): The source of the image. Can be a local file path
            or a base64 encoded string.
        api_key (str): Your NVIDIA API key.
        callback (callable, optional): A function to call with progress
            updates. Defaults to None. The function should accept a single
            string argument.

    Returns:
        Image.Image: A PIL Image object (RGB) with the text inside pictures
        redacted. When no pictures are found the loaded image is returned
        unchanged.

    Raises:
        ValueError: If ``image_source`` is neither a readable image file nor
            a base64-encoded image.
        requests.exceptions.RequestException: If the picture-detection
            request fails (reported through ``callback`` first).
    """
    def _progress(message: str):
        if callback:
            callback(message)

    _progress("Step 1: Loading image...")
    try:
        if os.path.exists(image_source):
            # convert() returns a fully loaded copy, so the file handle can
            # be released right away.
            with Image.open(image_source) as opened:
                input_image = opened.convert("RGB")
        else:
            image_bytes = base64.b64decode(image_source)
            input_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except Exception as e:
        raise ValueError(
            f"Invalid image_source: not a valid file path or base64 string. Error: {e}"
        ) from e

    # --- Resize if necessary for analysis ---
    image_to_analyze = input_image
    original_width, original_height = input_image.size
    if (original_width * original_height) > MAX_PIXELS_FOR_PARSER:
        _progress("Image is large, resizing for initial analysis...")
        scale = (MAX_PIXELS_FOR_PARSER / (original_width * original_height))**0.5
        # max(1, ...) keeps extreme aspect ratios from rounding a side to 0.
        new_dims = (
            max(1, int(original_width * scale)),
            max(1, int(original_height * scale)),
        )
        image_to_analyze = input_image.resize(new_dims, Image.Resampling.LANCZOS)

    # --- Detect Pictures ---
    _progress("Step 2: Detecting 'Picture' elements...")
    try:
        picture_bboxes = _detect_pictures_with_parser(image_to_analyze, api_key)
    except requests.exceptions.RequestException as e:
        _progress(f"API Error during picture detection: {e}")
        raise  # Re-raise the exception after reporting progress

    if not picture_bboxes:
        _progress("No 'Picture' elements were found. Returning original image.")
        return input_image

    _progress(f"Step 3: Found {len(picture_bboxes)} 'Picture' element(s). Redacting text...")
    final_image = input_image.copy()

    # --- Crop, Redact, and Paste ---
    for i, box in enumerate(picture_bboxes):
        _progress(f" - Processing picture {i + 1} of {len(picture_bboxes)}...")
        # Box values are scaled by the ORIGINAL size, i.e. treated as
        # normalised, so boxes found on the downscaled copy map directly onto
        # the full-resolution image. Clamp to the image so a slightly
        # out-of-range box cannot produce an oversized or negative crop.
        x1 = min(original_width, max(0, int(box["xmin"] * original_width)))
        y1 = min(original_height, max(0, int(box["ymin"] * original_height)))
        x2 = min(original_width, max(0, int(box["xmax"] * original_width)))
        y2 = min(original_height, max(0, int(box["ymax"] * original_height)))
        if x2 <= x1 or y2 <= y1:
            # A zero-area crop cannot be PNG-encoded for the OCR request.
            _progress(f" - Skipping picture {i + 1}: empty bounding box.")
            continue

        # Crop from the original, high-resolution image
        cropped_element = input_image.crop((x1, y1, x2, y2))
        redacted_crop = _redact_text_in_image(cropped_element, api_key)
        # Paste the redacted, high-resolution crop back
        final_image.paste(redacted_crop, (x1, y1))

    _progress("Step 4: Redaction process complete.")
    return final_image


# --- Example Usage ---
def _main():
    """Redact the sample image named below and save the result as a PNG."""
    # Define a simple callback function to print progress to the console.
    def print_progress(message: str):
        print(f"[PROGRESS] {message}")

    # 1. Get API Key from environment variable
    my_api_key = os.getenv("NVIDIA_API_KEY")
    if not my_api_key:
        print("ERROR: Please set the NVIDIA_API_KEY environment variable.")
        return

    # 2. Define the path to your input image
    # (replace with your actual image file)
    input_image_path = "yolox1.png"  # Make sure this image exists
    if not os.path.exists(input_image_path):
        print(f"ERROR: Input image not found at '{input_image_path}'")
        return

    print("--- Running Redaction on Image Path ---")
    try:
        # 3. Call the main function with the image path and callback
        redacted_image = redact_pictures_in_image(
            image_source=input_image_path,
            api_key=my_api_key,
            callback=print_progress,
        )
        # 4. Save the result
        output_path = "redacted_output.png"
        redacted_image.save(output_path)
        print(f"\nSuccessfully saved redacted image to '{output_path}'")
    except Exception as e:
        # Top-level script boundary: report any failure instead of a traceback.
        print(f"\nAn error occurred: {e}")


if __name__ == "__main__":
    _main()