|
|
from flask import Flask, request, jsonify |
|
|
import cv2 |
|
|
import mediapipe as mp |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import math |
|
|
import io |
|
|
import base64 |
|
|
import requests |
|
|
import os |
|
|
import threading |
|
|
import time |
|
|
from datetime import datetime |
|
|
import traceback |
|
|
|
|
|
|
|
|
# Optional dependency probe: AI features below degrade gracefully when
# HuggingFace `transformers` is not installed.
try:
    from transformers import pipeline
    TRANSFORMERS_AVAILABLE = True
    print("[AI] ✓ Transformers available")
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("[AI] ✗ Transformers not available")
|
|
|
|
|
|
|
|
# Lazily-initialised face-parsing backend; populated by init_face_parser().
FACE_PARSER = None
# True once a parsing backend (HF transformers or ONNX) loaded successfully.
FACE_PARSING_AVAILABLE = False
|
|
|
|
|
# Label id -> semantic part name for the face-parsing model output
# (CelebAMask-HQ style label set: skin, facial features, hair, accessories).
FACE_PARSING_LABELS = {
    0: 'background',
    1: 'skin',
    2: 'nose',
    3: 'eye_g',    # eyeglasses
    4: 'l_eye',
    5: 'r_eye',
    6: 'l_brow',
    7: 'r_brow',
    8: 'l_ear',
    9: 'r_ear',
    10: 'mouth',
    11: 'u_lip',
    12: 'l_lip',
    13: 'hair',
    14: 'hat',
    15: 'ear_r',   # earring
    16: 'neck_l',  # necklace
    17: 'neck',
    18: 'cloth'
}
|
|
|
|
|
# Display color per label id, used to render parsing masks and legends.
LABEL_COLORS = {
    0: (0, 0, 0),
    1: (204, 0, 0),
    2: (76, 153, 0),
    3: (255, 0, 0),
    4: (51, 51, 255),
    5: (0, 255, 255),
    6: (255, 255, 0),
    7: (204, 102, 0),
    8: (153, 0, 76),
    9: (255, 102, 153),
    10: (102, 255, 153),
    11: (255, 0, 255),
    12: (204, 0, 153),
    13: (0, 204, 204),
    14: (0, 255, 0),
    15: (255, 204, 0),
    16: (204, 0, 204),
    17: (255, 153, 51),
    18: (102, 102, 156)
}
|
|
|
|
|
def init_face_parser():
    """Load a face-parsing segmentation model, trying two backends in order.

    1. HuggingFace `jonathandinu/face-parsing` via transformers.
    2. A local ONNX file (`face_parsing.onnx`) via onnxruntime.

    On success sets the module globals FACE_PARSER / FACE_PARSING_AVAILABLE
    and returns True. On total failure returns False, in which case callers
    fall back to classical CV2 heuristics.
    """
    global FACE_PARSER, FACE_PARSING_AVAILABLE
    # Idempotent: reuse a previously loaded model.
    if FACE_PARSING_AVAILABLE:
        return True

    try:
        print("[FaceParsing] Loading face-parsing model...")

        from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation

        model_name = "jonathandinu/face-parsing"
        print(f"[FaceParsing] Loading from {model_name}...")

        processor = AutoImageProcessor.from_pretrained(model_name)
        model = AutoModelForSemanticSegmentation.from_pretrained(model_name)

        # Stored as a dict; predict_face_parsing() dispatches on its keys.
        FACE_PARSER = {
            'processor': processor,
            'model': model
        }

        FACE_PARSING_AVAILABLE = True
        print("[FaceParsing] ✓ Model loaded successfully!")
        return True

    except Exception as e1:
        print(f"[FaceParsing] Method 1 failed: {e1}")

    # Fallback: local ONNX model, if the file exists in the working directory.
    try:
        print("[FaceParsing] Trying ONNX version...")
        import onnxruntime as ort

        model_path = "face_parsing.onnx"

        if os.path.exists(model_path):
            session = ort.InferenceSession(model_path)
            FACE_PARSER = {'session': session, 'type': 'onnx'}
            FACE_PARSING_AVAILABLE = True
            print("[FaceParsing] ✓ ONNX model loaded!")
            return True
        else:
            print("[FaceParsing] ONNX file not found")

    except Exception as e2:
        print(f"[FaceParsing] Method 2 failed: {e2}")

    print("[FaceParsing] ⚠ Will use CV2 fallback methods")
    return False
|
|
|
|
|
def predict_face_parsing(image_pil):
    """Run the loaded face-parsing model on a PIL image.

    Dispatches on the backend stored in FACE_PARSER by init_face_parser():
    a HuggingFace processor/model pair, or an ONNX Runtime session.

    Returns an (H, W) uint8 label mask using the FACE_PARSING_LABELS ids,
    or None when no model is available or inference fails.
    """
    global FACE_PARSER

    if not FACE_PARSING_AVAILABLE or FACE_PARSER is None:
        return None

    try:
        import torch

        if 'processor' in FACE_PARSER:
            # HuggingFace transformers backend.
            processor = FACE_PARSER['processor']
            model = FACE_PARSER['model']

            inputs = processor(images=image_pil, return_tensors="pt")

            with torch.no_grad():
                outputs = model(**inputs)
            logits = outputs.logits

            # PIL .size is (width, height); torch expects (height, width).
            h, w = image_pil.size[1], image_pil.size[0]

            # Upsample class logits back to the original resolution before
            # taking the per-pixel argmax.
            upsampled_logits = torch.nn.functional.interpolate(
                logits,
                size=(h, w),
                mode="bilinear",
                align_corners=False
            )

            parsing_mask = upsampled_logits.argmax(dim=1)[0].cpu().numpy()

            return parsing_mask.astype(np.uint8)

        elif 'session' in FACE_PARSER:
            # ONNX Runtime backend: fixed 512x512 input, NCHW, [0, 1] floats.
            # NOTE(review): no mean/std normalisation is applied here —
            # confirm this matches what the exported ONNX model expects.
            session = FACE_PARSER['session']

            img_array = np.array(image_pil.resize((512, 512)))
            img_array = img_array.transpose(2, 0, 1)
            img_array = img_array.astype(np.float32) / 255.0
            img_array = np.expand_dims(img_array, 0)

            outputs = session.run(None, {'input': img_array})
            parsing_mask = outputs[0].argmax(axis=1)[0]

            # Resize the mask back to the source resolution; nearest-neighbor
            # keeps the integer label ids intact.
            h, w = image_pil.size[1], image_pil.size[0]
            parsing_mask = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                      interpolation=cv2.INTER_NEAREST)

            return parsing_mask

        return None

    except Exception as e:
        print(f"[FaceParsing] Prediction error: {e}")
        # Use the module-level `traceback` import; the previous local
        # re-import merely shadowed it.
        traceback.print_exc()
        return None
|
|
|
|
|
def predict_face_parsing_xenova(image_pil):
    """Run face parsing via a pipeline-style (callable) FACE_PARSER.

    Expects FACE_PARSER to be an image-segmentation pipeline returning a
    list of {'label': str, 'mask': PIL.Image} dicts; merges the per-label
    soft masks into one (H, W) uint8 label map. Returns None when the
    parser is unavailable or on any error.
    """
    global FACE_PARSER
    if not FACE_PARSING_AVAILABLE or FACE_PARSER is None:
        return None
    try:
        output = FACE_PARSER(image_pil)
        h, w = image_pil.size[1], image_pil.size[0]
        parsing_mask = np.zeros((h, w), dtype=np.uint8)

        for item in output:
            label_name = item['label']
            mask_pil = item['mask']
            mask_np = np.array(mask_pil)

            # Masks may come back as RGB and/or at a different resolution.
            if len(mask_np.shape) == 3:
                mask_np = cv2.cvtColor(mask_np, cv2.COLOR_RGB2GRAY)
            if mask_np.shape != (h, w):
                mask_np = cv2.resize(mask_np, (w, h), interpolation=cv2.INTER_NEAREST)

            # NOTE(review): XENOVA_LABELS is not defined anywhere in this
            # chunk — confirm it exists elsewhere in the file, otherwise
            # this line raises NameError at runtime.
            label_id = XENOVA_LABELS.get(label_name, 0)
            # Threshold the soft mask; later labels overwrite earlier ones.
            parsing_mask[mask_np > 127] = label_id

        return parsing_mask
    except Exception as e:
        print(f"[FaceParsing] Error: {e}")
        return None
|
|
|
|
|
def create_colored_mask(parsing_mask):
    """Render a label-id mask as an RGB image using the LABEL_COLORS palette."""
    rgb = np.zeros(parsing_mask.shape + (3,), dtype=np.uint8)
    for label_id, rgb_color in LABEL_COLORS.items():
        rgb[parsing_mask == label_id] = rgb_color
    return rgb
|
|
|
|
|
def create_transparent_overlay(original_pil, parsing_mask, alpha=0.5):
    """Alpha-blend the colored parsing mask over the original photo.

    The original PIL image is resized to the mask's resolution first;
    `alpha` is the weight given to the colored mask.
    """
    mask_h, mask_w = parsing_mask.shape
    base = np.array(original_pil.resize((mask_w, mask_h)))
    color_layer = create_colored_mask(parsing_mask)
    blended = base * (1 - alpha) + color_layer * alpha
    return blended.astype(np.uint8)
|
|
|
|
|
def add_legend_to_image(image, active_labels):
    """Draw a two-column legend of detected face-part labels onto an image.

    Parameters:
        image: BGR numpy image to annotate (a copy is returned, the input
            is not modified).
        active_labels: iterable of label ids present in the parsing mask.

    Returns the annotated copy. Background (id 0) is never listed; items
    are split into a left ('A') and right ('B') panel, most
    decision-relevant labels first.
    """
    img = image.copy()
    h, w = img.shape[:2]

    # Human-readable name + palette color for each non-background label id.
    important_labels = {
        1: {'name': 'Skin', 'color': LABEL_COLORS[1]},
        2: {'name': 'Nose', 'color': LABEL_COLORS[2]},
        3: {'name': 'Eyeglasses', 'color': LABEL_COLORS[3]},
        4: {'name': 'L Eye', 'color': LABEL_COLORS[4]},
        5: {'name': 'R Eye', 'color': LABEL_COLORS[5]},
        6: {'name': 'L Brow', 'color': LABEL_COLORS[6]},
        7: {'name': 'R Brow', 'color': LABEL_COLORS[7]},
        8: {'name': 'L Ear', 'color': LABEL_COLORS[8]},
        9: {'name': 'R Ear', 'color': LABEL_COLORS[9]},
        10: {'name': 'Mouth', 'color': LABEL_COLORS[10]},
        11: {'name': 'U Lip', 'color': LABEL_COLORS[11]},
        12: {'name': 'L Lip', 'color': LABEL_COLORS[12]},
        13: {'name': 'Hair', 'color': LABEL_COLORS[13]},
        14: {'name': 'Hat', 'color': LABEL_COLORS[14]},
        15: {'name': 'Earring', 'color': LABEL_COLORS[15]},
        16: {'name': 'Necklace', 'color': LABEL_COLORS[16]},
        17: {'name': 'Neck', 'color': LABEL_COLORS[17]},
        18: {'name': 'Cloth', 'color': LABEL_COLORS[18]}
    }

    # Keep only known, non-background labels that are actually present.
    legend_items = []
    for label_id in active_labels:
        if label_id in important_labels and label_id != 0:
            legend_items.append({
                'id': label_id,
                'name': important_labels[label_id]['name'],
                'color': important_labels[label_id]['color']
            })

    if not legend_items:
        return img

    # Show accessories/occluders (glasses, hat, jewellery, hair) before
    # plain face parts; unknown ids sort last.
    priority_order = [3, 14, 15, 16, 13, 4, 5, 2, 10, 18, 1, 17, 8, 9, 6, 7, 11, 12]
    legend_items.sort(key=lambda x: priority_order.index(x['id']) if x['id'] in priority_order else 999)

    # Scale fonts, swatches and spacing with the shorter image side.
    base_size = min(w, h)

    if base_size < 400:
        font_scale = 0.25
        thickness = 1
        line_height = 15
        box_size = 10
        padding = 4
        title_scale = 0.25
        title_line_height = 12
        title_bottom_margin = 10
    elif base_size < 600:
        font_scale = 0.3
        thickness = 1
        line_height = 18
        box_size = 12
        padding = 5
        title_scale = 0.3
        title_line_height = 14
        title_bottom_margin = 12
    elif base_size < 800:
        font_scale = 0.4
        thickness = 1
        line_height = 20
        box_size = 14
        padding = 6
        title_scale = 0.4
        title_line_height = 16
        title_bottom_margin = 14
    else:
        font_scale = 0.5
        thickness = 1
        line_height = 22
        box_size = 16
        padding = 7
        title_scale = 0.5
        title_line_height = 18
        title_bottom_margin = 16

    # Split the (sorted) items roughly evenly into two panels; the left
    # panel gets the extra item when the count is odd.
    num_items = len(legend_items)
    mid_point = (num_items + 1) // 2

    left_column_items = legend_items[:mid_point]
    right_column_items = legend_items[mid_point:]

    def draw_legend_column(items, side='A'):
        """Draw one legend panel in place on `img`.

        side: 'A' anchors the panel at the left edge, 'B' at the right.
        """
        if not items:
            return

        # Widest item name determines the panel width.
        max_text_width = 0
        for item in items:
            (text_w, text_h), _ = cv2.getTextSize(item['name'], cv2.FONT_HERSHEY_SIMPLEX,
                                                  font_scale, thickness)
            max_text_width = max(max_text_width, text_w)

        # Two-line title: "Detected" / "(A):" or "(B):".
        title_line1 = "Detected"
        title_line2 = f"({side}):"

        (title1_w, title1_h), _ = cv2.getTextSize(title_line1, cv2.FONT_HERSHEY_SIMPLEX,
                                                  title_scale, thickness)
        (title2_w, title2_h), _ = cv2.getTextSize(title_line2, cv2.FONT_HERSHEY_SIMPLEX,
                                                  title_scale, thickness)

        max_title_width = max(title1_w, title2_w)

        # Fit the content but cap the panel at 25% of the image width.
        column_w = max(box_size + padding * 3 + max_text_width, max_title_width + padding * 2)
        column_w = min(column_w, int(w * 0.25))

        column_h = (padding * 2 +
                    title_line_height * 2 +
                    title_bottom_margin +
                    len(items) * line_height +
                    padding)

        if side == 'A':
            # Left edge of the image.
            column_x = padding * 2
        else:
            # Right edge of the image.
            column_x = w - column_w - padding * 2

        column_y = padding * 2

        # Clamp the panel fully inside the image.
        if column_x + column_w > w:
            column_x = w - column_w - padding
        if column_y + column_h > h:
            column_y = h - column_h - padding

        # Semi-transparent black backdrop (75% opacity).
        overlay = img.copy()
        cv2.rectangle(overlay,
                      (column_x, column_y),
                      (column_x + column_w, column_y + column_h),
                      (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.75, img, 0.25, 0, img)

        # White border around the panel.
        cv2.rectangle(img,
                      (column_x, column_y),
                      (column_x + column_w, column_y + column_h),
                      (255, 255, 255), 1)

        title_start_y = column_y + padding + title_line_height

        # Centre both title lines horizontally inside the panel.
        title1_x = column_x + (column_w - title1_w) // 2
        cv2.putText(img, title_line1,
                    (title1_x, title_start_y),
                    cv2.FONT_HERSHEY_SIMPLEX, title_scale, (255, 255, 255), thickness)

        title2_x = column_x + (column_w - title2_w) // 2
        cv2.putText(img, title_line2,
                    (title2_x, title_start_y + title_line_height),
                    cv2.FONT_HERSHEY_SIMPLEX, title_scale, (255, 255, 255), thickness)

        start_y = title_start_y + title_line_height + title_bottom_margin

        for idx, item in enumerate(items):
            y_pos = start_y + idx * line_height

            # Stop drawing rather than overflow the panel.
            if y_pos + line_height > column_y + column_h - padding:
                break

            # Filled colour swatch...
            box_y = y_pos - box_size // 2
            cv2.rectangle(img,
                          (column_x + padding, box_y),
                          (column_x + padding + box_size, box_y + box_size),
                          item['color'], -1)

            # ...with a white outline.
            cv2.rectangle(img,
                          (column_x + padding, box_y),
                          (column_x + padding + box_size, box_y + box_size),
                          (255, 255, 255), 1)

            # Label text to the right of the swatch.
            text_x = column_x + padding * 2 + box_size
            cv2.putText(img, item['name'],
                        (text_x, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255), thickness)

    draw_legend_column(left_column_items, side='A')

    draw_legend_column(right_column_items, side='B')

    return img
|
|
|
|
|
class PassportPhotoProcessor:
    """End-to-end passport-photo pipeline.

    Detects face/pose landmarks with MediaPipe, levels the eye line,
    computes a square crop that satisfies eye-position and face-height
    ratio constraints, renders a measurement-overlay image, and compresses
    the result to a JPEG under a byte budget.
    """

    def __init__(self):
        self.mp_face_mesh = mp.solutions.face_mesh
        self.mp_pose = mp.solutions.pose
        # Single-face mesh; refine_landmarks adds iris/eye refinement points.
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            static_image_mode=True,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5
        )
        # model_complexity=2 is the most accurate (and slowest) pose model.
        self.pose = self.mp_pose.Pose(
            static_image_mode=True,
            model_complexity=2,
            min_detection_confidence=0.5
        )

    def detect_landmarks(self, image):
        """Run face-mesh and pose detection on a BGR image.

        Returns (face_landmarks, pose_results) for the first detected face.
        Raises ValueError when no face is found.
        """
        # MediaPipe expects RGB; OpenCV decodes BGR.
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        face_results = self.face_mesh.process(rgb_image)
        pose_results = self.pose.process(rgb_image)

        if not face_results.multi_face_landmarks:
            raise ValueError("No face detected in the image")

        return face_results.multi_face_landmarks[0], pose_results

    def get_eye_centers(self, landmarks, img_width, img_height):
        """Return ((lx, ly), (rx, ry)) eye centers in pixel coordinates.

        Each center is the mean of several FaceMesh landmark indices around
        that eye; landmark coordinates are normalised, so they are scaled
        by the image dimensions.
        """
        left_eye_indices = [33, 133, 160, 159, 158, 157, 173]
        right_eye_indices = [362, 263, 387, 386, 385, 384, 398]

        left_eye_x = np.mean([landmarks.landmark[i].x for i in left_eye_indices]) * img_width
        left_eye_y = np.mean([landmarks.landmark[i].y for i in left_eye_indices]) * img_height

        right_eye_x = np.mean([landmarks.landmark[i].x for i in right_eye_indices]) * img_width
        right_eye_y = np.mean([landmarks.landmark[i].y for i in right_eye_indices]) * img_height

        return (left_eye_x, left_eye_y), (right_eye_x, right_eye_y)

    def get_nose_tip(self, landmarks, img_width, img_height):
        """Return the nose tip (FaceMesh landmark 4) in pixel coordinates."""
        nose_tip = landmarks.landmark[4]
        return nose_tip.x * img_width, nose_tip.y * img_height

    def get_chin(self, landmarks, img_width, img_height):
        """Return the chin point (FaceMesh landmark 152) in pixel coordinates."""
        chin = landmarks.landmark[152]
        return chin.x * img_width, chin.y * img_height

    def get_forehead_top(self, landmarks, img_width, img_height):
        """Estimate (x, y) of the top of the head including hair.

        FaceMesh has no hair landmarks, so the highest point of the face
        oval is extended upward by 65% of the eye-to-chin distance as a
        hair allowance, clamped at the image top.
        """
        # These indices trace the face oval; only its min-y (highest point)
        # and mean-x are used below.
        forehead_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288,
                            397, 365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136,
                            172, 58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109]

        min_y = min([landmarks.landmark[i].y for i in forehead_indices]) * img_height
        avg_x = np.mean([landmarks.landmark[i].x for i in forehead_indices]) * img_width

        chin_y = landmarks.landmark[152].y * img_height
        eye_indices = [33, 133, 362, 263]
        eye_y = np.mean([landmarks.landmark[i].y for i in eye_indices]) * img_height
        face_height = chin_y - eye_y

        # Heuristic hair allowance above the visible forehead.
        hair_extension = face_height * 0.65
        estimated_hair_top = max(0, min_y - hair_extension)

        return avg_x, estimated_hair_top

    def get_shoulders(self, pose_results, img_width, img_height):
        """Return shoulder pixel positions, or None when pose was not found.

        Uses MediaPipe Pose landmarks 11 (left shoulder) / 12 (right
        shoulder); returns {'left': (x, y), 'right': (x, y)}.
        """
        if not pose_results.pose_landmarks:
            return None

        left_shoulder = pose_results.pose_landmarks.landmark[11]
        right_shoulder = pose_results.pose_landmarks.landmark[12]

        return {
            'left': (left_shoulder.x * img_width, left_shoulder.y * img_height),
            'right': (right_shoulder.x * img_width, right_shoulder.y * img_height)
        }

    def calculate_rotation_angle(self, left_eye, right_eye):
        """Return the eye-line tilt in degrees (positive = right eye lower)."""
        dx = right_eye[0] - left_eye[0]
        dy = right_eye[1] - left_eye[1]
        angle = math.degrees(math.atan2(dy, dx))
        return angle

    def rotate_image(self, image, angle, center):
        """Rotate `image` by `angle` degrees about `center`.

        Returns (rotated_image, affine_matrix); the matrix can be applied
        to points via rotate_point(). Border pixels are replicated so the
        canvas stays the same size with no black corners.
        """
        h, w = image.shape[:2]
        matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(image, matrix, (w, h),
                                 flags=cv2.INTER_CUBIC,
                                 borderMode=cv2.BORDER_REPLICATE)
        return rotated, matrix

    def rotate_point(self, point, matrix):
        """Apply a 2x3 affine matrix (from rotate_image) to an (x, y) point."""
        px, py = point
        new_x = matrix[0, 0] * px + matrix[0, 1] * py + matrix[0, 2]
        new_y = matrix[1, 0] * px + matrix[1, 1] * py + matrix[1, 2]
        return (new_x, new_y)

    def calculate_crop_box(self, image, eye_line_y, chin_y, top_hair_y,
                           nose_x, shoulders, body_bottom_y):
        """Search for the best square crop satisfying passport-ratio rules.

        Constraints (as fractions of the crop side):
          - eye line sits 56-69% above the bottom edge;
          - head (hair top to chin) occupies 50-69% of the height.
        The crop is horizontally centred on the nose, adjusted to include
        the shoulders, and scored by deviation from the ratio midpoints
        (out-of-range deviations are penalised much harder), with a mild
        preference for larger crops.

        Returns the best crop dict (left/top/right/bottom/size plus the
        achieved ratios in percent and the score).
        Raises ValueError when the image is too small or no crop works.
        """
        h, w = image.shape[:2]

        EYE_TO_BOTTOM_MIN = 0.56
        EYE_TO_BOTTOM_MAX = 0.69
        FACE_HEIGHT_MIN = 0.50
        FACE_HEIGHT_MAX = 0.69

        face_height = chin_y - top_hair_y

        best_crop = None
        best_score = float('inf')

        # Crop-size bounds implied by the eye-position constraint.
        eye_to_bottom = body_bottom_y - eye_line_y
        min_crop_from_eye = eye_to_bottom / EYE_TO_BOTTOM_MAX
        max_crop_from_eye = eye_to_bottom / EYE_TO_BOTTOM_MIN

        # Crop-size bounds implied by the face-height constraint.
        min_crop_from_face = face_height / FACE_HEIGHT_MAX
        max_crop_from_face = face_height / FACE_HEIGHT_MIN

        # Intersect both with the 600px output minimum and the image size.
        min_crop_size = max(min_crop_from_face, min_crop_from_eye, 600)
        max_crop_size = min(max_crop_from_face, max_crop_from_eye, h, w)

        print(f"[CropBox] Eye to bottom: {eye_to_bottom:.0f}px")
        print(f"[CropBox] Face height: {face_height:.0f}px")
        print(f"[CropBox] Min from eye: {min_crop_from_eye:.0f}, Max: {max_crop_from_eye:.0f}")
        print(f"[CropBox] Min from face: {min_crop_from_face:.0f}, Max: {max_crop_from_face:.0f}")
        print(f"[CropBox] Final range: {min_crop_size:.0f} - {max_crop_size:.0f}")

        if max_crop_size < 600:
            raise ValueError("Image too small to create passport photo")

        # When the two constraint ranges do not overlap, relax around the
        # midpoint of the eye-based range (±15%).
        if min_crop_size > max_crop_size:
            print(f"[CropBox] Constraint conflict detected. Using flexible approach...")

            target_size = (min_crop_from_eye + max_crop_from_eye) / 2
            target_size = max(600, min(1200, target_size, h, w))

            min_crop_size = max(600, target_size * 0.85)
            max_crop_size = min(1200, target_size * 1.15, h, w)

            print(f"[CropBox] Adjusted range: {min_crop_size:.0f} - {max_crop_size:.0f}")

            if max_crop_size < 600:
                raise ValueError("Image too small to create passport photo")

            # Still inverted after relaxation: pin to a single size.
            if min_crop_size > max_crop_size:
                min_crop_size = max_crop_size = target_size
                print(f"[CropBox] Using fixed size: {target_size:.0f}")

        # Scan candidate sizes from largest to smallest (~10px granularity).
        search_steps = max(50, int((max_crop_size - min_crop_size) / 10))

        for size in np.linspace(max_crop_size, min_crop_size, search_steps):
            size = int(size)

            # Place the eye line at the midpoint of its allowed band.
            target_eye_ratio = (EYE_TO_BOTTOM_MIN + EYE_TO_BOTTOM_MAX) / 2
            top = eye_line_y - (size * (1 - target_eye_ratio))

            # Keep a 5% margin above the hair...
            if top > top_hair_y - (size * 0.05):
                top = top_hair_y - (size * 0.05)

            # ...and a 5% margin below the chin.
            if top + size < chin_y + (size * 0.05):
                top = chin_y + (size * 0.05) - size

            # Centre horizontally on the nose.
            left = nose_x - size / 2
            right = left + size
            bottom = top + size

            # Clamp the square inside the image.
            if left < 0:
                left = 0
                right = size
            if right > w:
                right = w
                left = w - size
            if top < 0:
                top = 0
                bottom = size
            if bottom > h:
                bottom = h
                top = h - size

            if shoulders:
                # Reject sizes that cannot contain the shoulders at all.
                shoulder_width = abs(shoulders['right'][0] - shoulders['left'][0])
                if shoulder_width > size * 0.95:
                    continue

                shoulder_left = min(shoulders['left'][0], shoulders['right'][0])
                shoulder_right = max(shoulders['left'][0], shoulders['right'][0])

                # Re-centre on the shoulders when either one would be
                # clipped (with a 2.5% margin on each side).
                if shoulder_left < left + (size * 0.025) or shoulder_right > right - (size * 0.025):
                    shoulder_center = (shoulder_left + shoulder_right) / 2
                    left = shoulder_center - size / 2
                    right = left + size

                    if left < 0 or right > w:
                        continue

            eye_to_bottom_ratio = (bottom - eye_line_y) / size
            face_height_ratio = (chin_y - top_hair_y) / size

            eye_ok = EYE_TO_BOTTOM_MIN <= eye_to_bottom_ratio <= EYE_TO_BOTTOM_MAX
            face_ok = FACE_HEIGHT_MIN <= face_height_ratio <= FACE_HEIGHT_MAX

            # Lower score is better; out-of-range deviations weigh 8x (eye)
            # and 8x (face) more than in-range ones.
            score = 0

            eye_deviation = abs(eye_to_bottom_ratio - target_eye_ratio)
            if not eye_ok:
                score += eye_deviation * 800
            else:
                score += eye_deviation * 100

            target_face_ratio = (FACE_HEIGHT_MIN + FACE_HEIGHT_MAX) / 2
            face_deviation = abs(face_height_ratio - target_face_ratio)
            if not face_ok:
                score += face_deviation * 400
            else:
                score += face_deviation * 50

            # Mild preference for larger crops (more output resolution).
            score += (1200 - size) * 0.5

            if score < best_score:
                best_score = score
                best_crop = {
                    'left': int(left),
                    'top': int(top),
                    'right': int(right),
                    'bottom': int(bottom),
                    'size': size,
                    'eye_to_bottom_ratio': eye_to_bottom_ratio * 100,
                    'face_height_ratio': face_height_ratio * 100,
                    'score': score
                }

            # Early exit once a compliant (or nearly compliant) crop is found.
            if eye_ok and (face_ok or face_deviation < 0.05):
                print(f"[CropBox] Found good solution at size {size:.0f}")
                break

        if not best_crop:
            raise ValueError("Could not create suitable crop. Please ensure photo shows full head and shoulders.")

        print(f"[CropBox] Best crop: {best_crop['size']:.0f}px, " +
              f"eye={best_crop['eye_to_bottom_ratio']:.1f}%, " +
              f"face={best_crop['face_height_ratio']:.1f}%, " +
              f"score={best_crop['score']:.1f}")

        return best_crop

    def compress_to_size(self, image, max_kb=240):
        """Encode `image` as a JPEG no larger than `max_kb` kilobytes.

        Strategy, in order: (1) try quality 100 at the current size;
        (2) shrink in 100px steps while far over budget; (3) shrink in
        10px steps; (4) lower JPEG quality one step at a time down to 50;
        (5) if quality had to drop, grow the side back in +5px steps while
        staying under budget.

        Returns (jpeg_bytes, size_kb, quality, (width, height)).
        Raises ValueError when the input is below the 600x600 minimum.
        """
        h, w = image.shape[:2]

        if h < 600 or w < 600:
            raise ValueError("Image size too small (minimum 600x600px)")

        # Cap the longer side at 1200px before chasing the byte budget.
        if h > 1200 or w > 1200:
            print(f"[Compress] Original size: {w}x{h}, resizing to max 1200px")
            scale = 1200 / max(h, w)
            new_w = int(w * scale)
            new_h = int(h * scale)
            image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
            h, w = new_h, new_w
            print(f"[Compress] Resized to: {w}x{h}")

        # All later resizes start from this copy to avoid compounding blur.
        original_image = image.copy()
        original_h, original_w = h, w

        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        buffer = io.BytesIO()
        pil_image.save(buffer, format='JPEG', quality=100, optimize=True)
        size_kb = buffer.tell() / 1024

        print(f"[Compress] Initial size at quality 100: {size_kb:.2f} KB")

        if size_kb <= max_kb:
            print(f"[Compress] ✓ Already under {max_kb}KB")
            return buffer.getvalue(), size_kb, 100, (w, h)

        # NOTE(review): the loops below resize to (size, size); this
        # assumes a square crop (w == h), which upstream guarantees —
        # confirm before reusing this method on non-square images.
        current_size = w
        best_quality = 100

        # Phase: coarse 100px shrink while more than 150% of budget.
        print(f"[Compress] Starting FAST reduction (step: 100px)")
        while current_size > 600 and size_kb > max_kb * 1.5:
            current_size -= 100
            current_size = max(600, current_size)

            new_size = (current_size, current_size)
            resized = cv2.resize(original_image, new_size, interpolation=cv2.INTER_AREA)

            pil_image = Image.fromarray(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))
            buffer = io.BytesIO()
            pil_image.save(buffer, format='JPEG', quality=best_quality, optimize=True)
            size_kb = buffer.tell() / 1024

            print(f"[Compress] FAST - Size {current_size}px, quality {best_quality}: {size_kb:.2f} KB")

            if size_kb <= max_kb:
                print(f"[Compress] ✓ Target reached at {current_size}px")
                return buffer.getvalue(), size_kb, best_quality, (current_size, current_size)

        # Phase: fine 10px shrink until under budget or at the 600px floor.
        print(f"[Compress] Starting SLOW reduction (step: 10px)")
        while current_size > 600 and size_kb > max_kb:
            current_size -= 10
            current_size = max(600, current_size)

            new_size = (current_size, current_size)
            resized = cv2.resize(original_image, new_size, interpolation=cv2.INTER_AREA)

            pil_image = Image.fromarray(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))
            buffer = io.BytesIO()
            pil_image.save(buffer, format='JPEG', quality=best_quality, optimize=True)
            size_kb = buffer.tell() / 1024

            print(f"[Compress] SLOW - Size {current_size}px, quality {best_quality}: {size_kb:.2f} KB")

            if size_kb <= max_kb:
                print(f"[Compress] ✓ Target reached at {current_size}px")
                return buffer.getvalue(), size_kb, best_quality, (current_size, current_size)

        # Phase: at the minimum size, lower JPEG quality (floor 50).
        print(f"[Compress] Starting QUALITY reduction (step: 1)")
        current_quality = 100
        best_size = current_size

        while current_quality >= 50 and size_kb > max_kb:
            new_size = (current_size, current_size)
            resized = cv2.resize(original_image, new_size, interpolation=cv2.INTER_AREA)
            pil_image = Image.fromarray(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))

            buffer = io.BytesIO()
            pil_image.save(buffer, format='JPEG', quality=current_quality, optimize=True)
            size_kb = buffer.tell() / 1024

            print(f"[Compress] QUALITY - Size {current_size}px, quality {current_quality}: {size_kb:.2f} KB")

            if size_kb <= max_kb:
                best_quality = current_quality
                best_size = current_size
                print(f"[Compress] ✓ Found acceptable quality {best_quality} at size {best_size}")
                break

            current_quality -= 1

        # Phase: with the found quality, claw back resolution in +5px steps
        # for as long as the result still fits the budget.
        if size_kb <= max_kb:
            print(f"[Compress] Starting SIZE OPTIMIZATION (step: +5px)")

            optimized_size = best_size
            optimized_buffer = buffer

            while optimized_size < original_w and optimized_size < 1200:
                test_size = optimized_size + 5

                new_size = (test_size, test_size)
                resized = cv2.resize(original_image, new_size, interpolation=cv2.INTER_AREA)
                pil_image = Image.fromarray(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))

                test_buffer = io.BytesIO()
                pil_image.save(test_buffer, format='JPEG', quality=best_quality, optimize=True)
                test_size_kb = test_buffer.tell() / 1024

                if test_size_kb <= max_kb:
                    optimized_size = test_size
                    optimized_buffer = test_buffer
                    size_kb = test_size_kb
                    print(f"[Compress] OPTIMIZE - Size {optimized_size}px, quality {best_quality}: {size_kb:.2f} KB")
                else:
                    print(f"[Compress] OPTIMIZE - Size {test_size}px exceeds limit: {test_size_kb:.2f} KB")
                    break

            print(f"[Compress] ✓ Optimized to size {optimized_size}px with quality {best_quality}")
            return optimized_buffer.getvalue(), size_kb, best_quality, (optimized_size, optimized_size)

        # NOTE(review): if the quality loop exhausted quality 50 without
        # fitting, `current_quality` here is 49 while the returned buffer
        # was encoded at 50 — confirm whether callers rely on this value.
        print(f"[Compress] ⚠️ Could not reach {max_kb}KB, returning at {size_kb:.2f}KB")
        return buffer.getvalue(), size_kb, current_quality, (current_size, current_size)

    def create_analysis_image(self, image, eye_line_y, chin_y, top_hair_y, nose_x,
                              eye_to_bottom_ratio, face_height_ratio):
        """Render a diagnostic overlay showing the measured geometry.

        Draws the hair-top / eye-line / chin horizontals, the nose vertical,
        two annotated measurement bars (eye-to-bottom on the left, face
        height on the right) and a colour legend. `eye_to_bottom_ratio` and
        `face_height_ratio` are percentages. Returns an annotated copy.
        """
        analysis_img = image.copy()
        h, w = analysis_img.shape[:2]

        # BGR colour constants (OpenCV channel order).
        GREEN = (0, 255, 0)
        BLUE = (255, 0, 0)
        RED = (0, 0, 255)
        YELLOW = (0, 255, 255)
        CYAN = (255, 255, 0)
        MAGENTA = (255, 0, 255)
        WHITE = (255, 255, 255)
        BLACK = (0, 0, 0)

        # Horizontal reference lines: hair top, eye line, chin.
        cv2.line(analysis_img, (0, int(top_hair_y)), (w, int(top_hair_y)), BLUE, 2)
        cv2.line(analysis_img, (0, int(eye_line_y)), (w, int(eye_line_y)), GREEN, 2)
        cv2.line(analysis_img, (0, int(chin_y)), (w, int(chin_y)), RED, 2)

        # Vertical centre line through the nose.
        cv2.line(analysis_img, (int(nose_x), 0), (int(nose_x), h), YELLOW, 2)

        # Eye-to-bottom measurement bar at 15% of the width.
        eye_bottom_x = int(w * 0.15)

        cv2.line(analysis_img,
                 (eye_bottom_x, int(eye_line_y)),
                 (eye_bottom_x, h),
                 CYAN, 3)

        arrow_size = 15

        # Arrowheads at both ends of the bar.
        cv2.arrowedLine(analysis_img,
                        (eye_bottom_x, int(eye_line_y) + 30),
                        (eye_bottom_x, int(eye_line_y)),
                        CYAN, 2, tipLength=0.3)

        cv2.arrowedLine(analysis_img,
                        (eye_bottom_x, h - 30),
                        (eye_bottom_x, h - 1),
                        CYAN, 2, tipLength=0.3)

        # Percentage label, centred on the bar over a black backing box.
        text = f"Eye to Bottom: {eye_to_bottom_ratio:.1f}%"
        text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        text_x = eye_bottom_x - text_size[0] // 2
        text_y = int((eye_line_y + h) / 2)

        cv2.rectangle(analysis_img,
                      (text_x - 5, text_y - text_size[1] - 5),
                      (text_x + text_size[0] + 5, text_y + 5),
                      BLACK, -1)

        cv2.putText(analysis_img, text, (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, CYAN, 2)

        # Face-height measurement bar at 85% of the width.
        face_height_x = int(w * 0.85)

        cv2.line(analysis_img,
                 (face_height_x, int(top_hair_y)),
                 (face_height_x, int(chin_y)),
                 MAGENTA, 3)

        cv2.arrowedLine(analysis_img,
                        (face_height_x, int(top_hair_y) + 30),
                        (face_height_x, int(top_hair_y)),
                        MAGENTA, 2, tipLength=0.3)

        cv2.arrowedLine(analysis_img,
                        (face_height_x, int(chin_y) - 30),
                        (face_height_x, int(chin_y)),
                        MAGENTA, 2, tipLength=0.3)

        text = f"Face Height: {face_height_ratio:.1f}%"
        text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        text_x = face_height_x - text_size[0] // 2
        text_y = int((top_hair_y + chin_y) / 2)

        cv2.rectangle(analysis_img,
                      (text_x - 5, text_y - text_size[1] - 5),
                      (text_x + text_size[0] + 5, text_y + 5),
                      BLACK, -1)

        cv2.putText(analysis_img, text, (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, MAGENTA, 2)

        # Legend panel, top-right corner, on a 70%-opacity black backdrop.
        legend_x = w - 250
        legend_y = 30
        line_height = 25

        overlay = analysis_img.copy()
        cv2.rectangle(overlay, (legend_x - 10, legend_y - 10),
                      (w - 10, legend_y + line_height * 5 + 10),
                      BLACK, -1)
        cv2.addWeighted(overlay, 0.7, analysis_img, 0.3, 0, analysis_img)

        legends = [
            ("Top Hair", BLUE),
            ("Eye Line", GREEN),
            ("Chin", RED),
            ("Nose Center", YELLOW),
            ("Eye-Bottom", CYAN),
            ("Face Height", MAGENTA)
        ]

        for idx, (label, color) in enumerate(legends):
            y_pos = legend_y + idx * line_height

            # Colour sample line followed by its label.
            cv2.line(analysis_img, (legend_x, y_pos),
                     (legend_x + 30, y_pos), color, 2)

            cv2.putText(analysis_img, label, (legend_x + 40, y_pos + 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, WHITE, 1)

        return analysis_img

    def process_image_from_base64(self, image_base64):
        """Full pipeline: base64 photo in, passport JPEG + analysis out.

        Steps: decode -> detect landmarks -> level the eye line by rotating
        about the eye midpoint -> re-detect landmarks on the rotated image
        -> compute the square crop -> render the analysis overlay ->
        compress the crop under 240KB.

        Returns a dict with base64 'final_image' and 'analysis_image' plus
        an 'info' summary. Raises Exception (wrapping the original error)
        on any failure.
        """
        try:
            # Decode base64 -> BGR numpy image.
            image_bytes = base64.b64decode(image_base64)
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            if image is None:
                raise ValueError("Failed to decode image")

            h, w = image.shape[:2]

            face_landmarks, pose_results = self.detect_landmarks(image)

            left_eye, right_eye = self.get_eye_centers(face_landmarks, w, h)

            # Tilt of the eye line; the image is rotated to make it level.
            angle = self.calculate_rotation_angle(left_eye, right_eye)

            # Rotate about the midpoint between the eyes (canvas size kept).
            center = ((left_eye[0] + right_eye[0]) / 2, (left_eye[1] + right_eye[1]) / 2)
            rotated_image, rotation_matrix = self.rotate_image(image, angle, center)

            # Map the eye centers into the rotated frame for the crop math.
            left_eye = self.rotate_point(left_eye, rotation_matrix)
            right_eye = self.rotate_point(right_eye, rotation_matrix)
            eye_line_y = (left_eye[1] + right_eye[1]) / 2

            # Re-detect on the rotated image so the remaining measurements
            # come from the levelled face. (w, h) are unchanged by warpAffine.
            face_landmarks, pose_results = self.detect_landmarks(rotated_image)

            nose_x, nose_y = self.get_nose_tip(face_landmarks, w, h)
            chin_x, chin_y = self.get_chin(face_landmarks, w, h)
            hair_x, top_hair_y = self.get_forehead_top(face_landmarks, w, h)
            shoulders = self.get_shoulders(pose_results, w, h)

            # Lowest shoulder defines the body bottom; fall back to image edge.
            body_bottom_y = h
            if shoulders:
                body_bottom_y = max(shoulders['left'][1], shoulders['right'][1])

            crop_box = self.calculate_crop_box(
                rotated_image, eye_line_y, chin_y, top_hair_y,
                nose_x, shoulders, body_bottom_y
            )

            cropped = rotated_image[
                crop_box['top']:crop_box['bottom'],
                crop_box['left']:crop_box['right']
            ]

            # Overlay uses crop-local coordinates (offset by the crop origin).
            analysis_image = self.create_analysis_image(
                cropped,
                eye_line_y - crop_box['top'],
                chin_y - crop_box['top'],
                top_hair_y - crop_box['top'],
                nose_x - crop_box['left'],
                crop_box['eye_to_bottom_ratio'],
                crop_box['face_height_ratio']
            )

            final_bytes, file_size, quality, final_size = self.compress_to_size(cropped, max_kb=240)

            final_image_b64 = base64.b64encode(final_bytes).decode('utf-8')

            _, analysis_buffer = cv2.imencode('.jpg', analysis_image, [cv2.IMWRITE_JPEG_QUALITY, 95])
            analysis_image_b64 = base64.b64encode(analysis_buffer).decode('utf-8')

            return {
                'service_type': 'processing',
                'final_image': final_image_b64,
                'analysis_image': analysis_image_b64,
                'info': {
                    'size': f"{final_size[0]}x{final_size[1]}",
                    'file_size': f"{file_size:.2f} KB",
                    'quality': quality,
                    'eye_to_bottom': f"{crop_box['eye_to_bottom_ratio']:.1f}%",
                    'face_height': f"{crop_box['face_height_ratio']:.1f}%"
                }
            }

        except Exception as e:
            raise Exception(f"Processing error: {str(e)}")
|
|
|
|
|
|
|
|
class PhotoRequirementsChecker: |
|
|
def __init__(self):
    """Set up MediaPipe detectors and the AI face-parsing backend."""
    # MediaPipe solution modules used throughout the checker.
    self.mp_face_mesh = mp.solutions.face_mesh
    self.mp_face_detection = mp.solutions.face_detection

    # Static-image mesh; up to two faces so multi-person photos can be
    # rejected, refined landmarks for better eye accuracy.
    self.face_mesh = self.mp_face_mesh.FaceMesh(
        static_image_mode=True, max_num_faces=2,
        refine_landmarks=True, min_detection_confidence=0.5)
    self.face_detection = self.mp_face_detection.FaceDetection(
        min_detection_confidence=0.5)

    # Accumulated check outcomes; the background remover loads lazily.
    self.results = []
    self._bg_remover = None

    print("[Checker] Initializing with AI Face Parsing...")
    init_face_parser()
    print("[Checker] Ready with AI model")
|
|
|
|
|
def _load_background_remover(self):
    """Lazily load a background-segmentation model, caching the result.

    Tries briaai/RMBG-2.0 first, then falls back to SegFormer
    (nvidia/segformer-b0-finetuned-ade-512-512). When both fail,
    ``self._bg_remover`` is set to ``False`` so callers switch to the
    CV2 heuristic.

    Returns:
        A callable HF pipeline, a dict ``{'processor', 'model',
        'type': 'segformer'}``, or ``False`` when no AI model loaded.
    """
    if self._bg_remover is None:
        try:
            print("[AI] Loading background removal model...")

            # First choice: dedicated background-removal pipeline.
            try:
                from transformers import pipeline
                # BUG FIX: log previously claimed "U2Net" while actually
                # loading RMBG-2.0 — corrected to match the model used.
                print("[AI] Trying RMBG-2.0 model...")

                self._bg_remover = pipeline(
                    "image-segmentation",
                    model="briaai/RMBG-2.0",
                    device=-1  # CPU
                )
                print("[AI] ✓ Background model loaded (RMBG-2.0)")
                return self._bg_remover

            except Exception as e1:
                print(f"[AI] RMBG-2.0 failed: {e1}")

            # Second choice: generic semantic segmentation.
            try:
                # BUG FIX: log previously claimed "DeepLabV3" while
                # actually loading SegFormer.
                print("[AI] Trying SegFormer...")
                from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation
                import torch  # imported early so a missing torch fails here

                processor = AutoImageProcessor.from_pretrained(
                    "nvidia/segformer-b0-finetuned-ade-512-512"
                )
                model = AutoModelForSemanticSegmentation.from_pretrained(
                    "nvidia/segformer-b0-finetuned-ade-512-512"
                )

                self._bg_remover = {
                    'processor': processor,
                    'model': model,
                    'type': 'segformer'
                }
                print("[AI] ✓ Background model loaded (SegFormer)")
                return self._bg_remover

            except Exception as e2:
                print(f"[AI] SegFormer failed: {e2}")
                print("[AI] ⚠ Using CV2 fallback for background")
                self._bg_remover = False

        except Exception as e:
            print(f"[AI] Background model initialization failed: {e}")
            self._bg_remover = False

    return self._bg_remover
|
|
|
|
|
def add_result(self, category, requirement, status, message, details=""): |
|
|
|
|
|
self.results.append({ |
|
|
'category': category, |
|
|
'requirement': requirement, |
|
|
'status': status, |
|
|
'message': message, |
|
|
'details': details |
|
|
}) |
|
|
|
|
|
def _get_background_mask(self, image, landmarks, img_width, img_height):
    """Return a uint8 mask where 255 marks background pixels.

    Prefers the lazily-loaded AI segmentation model; on any failure or
    when no usable segment is found, falls back to the landmark-based
    heuristic in ``_get_background_mask_cv``.
    """
    # Pipeline callable, segformer dict, or False when unavailable.
    bg_remover = self._load_background_remover()

    if bg_remover and bg_remover is not False:
        try:
            # Models expect RGB PIL input; the incoming frame is BGR (OpenCV).
            pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

            if callable(bg_remover):
                # Hugging Face image-segmentation pipeline path (RMBG-2.0).
                result = bg_remover(pil_image)

                if isinstance(result, list) and len(result) > 0:
                    # Find the person/human segment among pipeline outputs.
                    for item in result:
                        if 'label' in item and ('person' in item['label'].lower() or
                                'human' in item['label'].lower()):
                            mask_pil = item['mask']
                            mask = np.array(mask_pil)

                            # Collapse a 3-channel mask to grayscale.
                            if len(mask.shape) == 3:
                                mask = cv2.cvtColor(mask, cv2.COLOR_RGB2GRAY)

                            _, binary_mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)

                            # Model may rescale; match original resolution.
                            if binary_mask.shape[:2] != (img_height, img_width):
                                binary_mask = cv2.resize(binary_mask, (img_width, img_height))

                            # Person mask -> background mask by inversion.
                            bg_mask = cv2.bitwise_not(binary_mask)
                            print("[AI] ✓ Background mask extracted")
                            return bg_mask

            elif isinstance(bg_remover, dict) and bg_remover.get('type') == 'segformer':
                import torch
                processor = bg_remover['processor']
                model = bg_remover['model']

                inputs = processor(images=pil_image, return_tensors="pt")

                with torch.no_grad():
                    outputs = model(**inputs)
                    logits = outputs.logits

                # Upsample class logits back to the image resolution.
                upsampled = torch.nn.functional.interpolate(
                    logits,
                    size=(img_height, img_width),
                    mode="bilinear",
                    align_corners=False
                )

                # Per-pixel class ids.
                seg_mask = upsampled.argmax(dim=1)[0].cpu().numpy()

                # Class id 12 — presumably the "person" class of this
                # ADE20K-finetuned model; verify against its label map.
                person_mask = (seg_mask == 12).astype(np.uint8) * 255
                bg_mask = cv2.bitwise_not(person_mask)

                print("[AI] ✓ Background mask extracted (SegFormer)")
                return bg_mask

        except Exception as e:
            print(f"[AI] Background segmentation failed: {e}")
            import traceback
            traceback.print_exc()

    # AI unavailable or produced nothing usable: heuristic fallback.
    print("[AI] Using CV2 fallback for background mask")
    return self._get_background_mask_cv(image, landmarks, img_width, img_height)
|
|
|
|
|
def _get_background_mask_cv(self, image, landmarks, img_width, img_height):
    """Heuristic background mask: everything outside the dilated face oval.

    Fallback used when no AI segmentation model is available. Returns a
    uint8 mask where 255 marks presumed background.
    """
    rows, cols = image.shape[:2]

    # MediaPipe FaceMesh indices tracing the face silhouette.
    outline_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454, 323,
                       361, 288, 397, 365, 379, 378, 400, 377, 152, 148,
                       176, 149, 150, 136, 172, 58, 132, 93, 234, 127, 162]

    polygon = np.array([
        [int(landmarks.landmark[i].x * img_width),
         int(landmarks.landmark[i].y * img_height)]
        for i in outline_indices
    ])

    subject_mask = np.zeros((rows, cols), dtype=np.uint8)
    cv2.fillPoly(subject_mask, [polygon], 255)

    # Grow the face region generously so hair/ears/neck count as subject.
    subject_mask = cv2.dilate(subject_mask, np.ones((50, 50), np.uint8), iterations=1)

    # Background is the complement of the dilated face area.
    return cv2.bitwise_not(subject_mask)
|
|
|
|
|
|
|
|
def check_eyeglasses_parsing(self, parsing_mask, image):
    """Detect eyeglasses from the face-parsing mask (label 3 = 'eye_g').

    Records a fail/warning/pass result depending on the fraction of the
    frame classified as eyeglasses; falls back to the CV heuristic on
    any error.
    """
    try:
        h, w = image.shape[:2]
        # Label map may come at model resolution; nearest keeps labels intact.
        mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                  interpolation=cv2.INTER_NEAREST)

        glasses_pixels = np.sum(mask_resized == 3)
        glasses_ratio = glasses_pixels / (h * w)

        if glasses_ratio > 0.005:
            self.add_result("Facial Features", "Eyeglasses", "fail",
                            f"Eyeglasses detected (AI: {min(glasses_ratio*100, 100):.1f}%)",
                            "Eyeglasses not allowed. Exception: Medical reasons with doctor's statement.")
        elif glasses_ratio > 0.002:
            self.add_result("Facial Features", "Eyeglasses", "warning",
                            "Possible eyeglasses detected", "If wearing glasses, remove and retake.")
        else:
            self.add_result("Facial Features", "Eyeglasses", "pass",
                            "No eyeglasses detected (AI)", "Meets requirement")
    except Exception as e:
        print(f"[Parsing] Eyeglasses error: {e}")
        # BUG FIX: w/h were referenced here but are undefined when the
        # exception fires before they are assigned (e.g. image is None);
        # derive them defensively instead.
        fh, fw = image.shape[:2] if image is not None else (0, 0)
        self.check_eyeglasses_cv_fallback(image, None, fw, fh)
|
|
|
|
|
def check_headwear_parsing(self, parsing_mask, image):
    """Detect hats/head coverings from the parsing mask (label 14 = 'hat').

    Filters segmentation noise by pixel count and by location (real
    headwear sits in the top ~30% of the frame), then records a
    pass/warning/fail result. Falls back to the CV heuristic on error.
    """
    try:
        h, w = image.shape[:2]
        mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                  interpolation=cv2.INTER_NEAREST)

        hat_pixels = np.sum(mask_resized == 14)
        hat_ratio = hat_pixels / (h * w)

        # Location sanity check: "hat" pixels whose average row is below
        # 30% of the height cannot be headwear — treat them as noise.
        if hat_pixels > 0:
            hat_y_coords, _ = np.where(mask_resized == 14)

            if len(hat_y_coords) > 0:
                avg_hat_y = np.mean(hat_y_coords)

                if avg_hat_y > h * 0.3:
                    print(f"[DEBUG] Hat pixels in wrong location (y={avg_hat_y:.0f}/{h}), ignoring")
                    hat_pixels = 0
                    hat_ratio = 0

        print(f"[DEBUG] Hat: {hat_pixels} pixels ({hat_ratio*100:.4f}%), location check passed")

        # Tuned thresholds: absolute noise floor plus two ratio cut-offs.
        MIN_HAT_PIXELS = 2000
        FAIL_RATIO = 0.035
        WARN_RATIO = 0.018

        if hat_pixels < MIN_HAT_PIXELS:
            self.add_result("Head Covering", "Headwear/Hat", "pass",
                            f"No headwear (AI: {hat_pixels} pixels - likely noise)", "Meets requirement")

        elif hat_ratio > FAIL_RATIO:
            # Report coverage relative to the head band (top 30%).
            head_region = mask_resized[:int(h*0.3), :]
            hat_in_head = np.sum(head_region == 14)
            coverage = (hat_in_head / head_region.size) * 100

            self.add_result("Head Covering", "Headwear/Hat", "fail",
                            f"Headwear detected (AI: {hat_pixels} pixels, {coverage:.1f}% head coverage)",
                            "Do not wear hats. Exception: Religious covering worn daily.")

        elif hat_ratio > WARN_RATIO:
            self.add_result("Head Covering", "Headwear/Hat", "warning",
                            f"Possible headwear/large hair accessory (AI: {hat_pixels} pixels)",
                            "If wearing hat/large accessory, remove. If hair/hijab, proceed.")

        else:
            self.add_result("Head Covering", "Headwear/Hat", "pass",
                            f"No significant headwear (AI: {hat_pixels} pixels)", "Meets requirement")

    except Exception as e:
        print(f"[Parsing] Headwear error: {e}")
        import traceback
        traceback.print_exc()
        # BUG FIX: w/h were referenced here but are undefined when the
        # exception fires before they are assigned; derive them defensively.
        fh, fw = image.shape[:2] if image is not None else (0, 0)
        self.check_headwear_cv_fallback(image, None, fw, fh)
|
|
|
|
|
def check_eyes_open_parsing(self, parsing_mask, landmarks, img_width, img_height, image):
    """Verify both eyes are open using eye-region area from the parsing mask.

    Labels 4/5 are the left/right eyes; label 3 ('eye_g') is eyeglasses.
    Falls back to the landmark-based check on any error.
    """
    try:
        h, w = image.shape[:2]
        mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                  interpolation=cv2.INTER_NEAREST)

        left_eye_pixels = np.sum(mask_resized == 4)
        right_eye_pixels = np.sum(mask_resized == 5)
        total_eye_pixels = left_eye_pixels + right_eye_pixels
        eye_ratio = total_eye_pixels / (h * w)

        print(f"[DEBUG] Eyes: L={left_eye_pixels}, R={right_eye_pixels}, ratio={eye_ratio*100:.4f}%")

        # BUG FIX: thresholds were ordered 0.0005 (pass) then 0.0008
        # (warning), so the warning branch was unreachable. Check the
        # larger ratio first so both thresholds take effect.
        if eye_ratio > 0.0008:
            self.add_result("Facial Expression", "Eyes Open", "pass",
                            f"Both eyes clearly open (AI: {total_eye_pixels} pixels)", "Neutral expression met")
        elif eye_ratio > 0.0005:
            self.add_result("Facial Expression", "Eyes Open", "warning",
                            f"Eyes may be partially closed (AI: {total_eye_pixels} pixels)",
                            "Both eyes must be fully open")
        else:
            # BUG FIX: eyeglasses are label 3 ('eye_g') in
            # FACE_PARSING_LABELS; label 6 is the left eyebrow.
            glasses_pixels = np.sum(mask_resized == 3)
            if glasses_pixels > total_eye_pixels * 5:
                self.add_result("Facial Expression", "Eyes Open", "warning",
                                "Eyes obscured by eyeglasses", "Eyes must be visible")
            else:
                self.add_result("Facial Expression", "Eyes Open", "fail",
                                f"Eyes appear closed (AI: {total_eye_pixels} pixels)",
                                "Both eyes must be fully open")
    except Exception as e:
        print(f"[Parsing] Eyes error: {e}")
        self.check_eyes_open_cv(landmarks, img_height)
|
|
|
|
|
def check_jewelry_parsing(self, parsing_mask, image):
    """Flag prominent earrings (label 15) or necklaces (label 16)."""
    try:
        h, w = image.shape[:2]
        mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                  interpolation=cv2.INTER_NEAREST)

        frame_area = h * w
        earring_ratio = np.sum(mask_resized == 15) / frame_area
        necklace_ratio = np.sum(mask_resized == 16) / frame_area

        # Collect jewelry kinds exceeding their visibility thresholds.
        found = []
        if earring_ratio > 0.001:
            found.append("earrings")
        if necklace_ratio > 0.002:
            found.append("necklace")

        if found:
            label = " and ".join(found)
            confidence = max(earring_ratio, necklace_ratio) * 100
            self.add_result("Accessories", "Jewelry", "warning",
                            f"{label.capitalize()} detected (AI: {confidence:.1f}%)",
                            "Visible jewelry should be minimal.")
        else:
            self.add_result("Accessories", "Jewelry", "pass",
                            "No prominent jewelry detected (AI)", "Meets requirement")
    except Exception as e:
        print(f"[Parsing] Jewelry error: {e}")
        self.add_result("Accessories", "Jewelry", "pass",
                        "Unable to verify", "Manual review recommended")
|
|
|
|
|
def check_face_covering_parsing(self, parsing_mask, landmarks, img_width, img_height, image):
    """Check the face oval is mostly visible skin (label 1), not covered."""
    try:
        h, w = image.shape[:2]
        mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                  interpolation=cv2.INTER_NEAREST)

        # MediaPipe FaceMesh indices approximating the face oval.
        face_indices = [234, 127, 162, 21, 54, 103, 67, 109, 10, 338, 297, 332, 284,
                        251, 389, 356, 454, 152, 148, 176, 149, 150, 136, 172, 58, 132, 93]

        polygon = np.array([
            [int(landmarks.landmark[i].x * img_width),
             int(landmarks.landmark[i].y * img_height)]
            for i in face_indices
        ])

        face_mask = np.zeros((h, w), dtype=np.uint8)
        cv2.fillPoly(face_mask, [polygon], 255)

        # Parsing labels restricted to the face polygon.
        face_region = mask_resized[face_mask > 0]
        if len(face_region) == 0:
            return

        region_size = len(face_region)
        skin_ratio = np.sum(face_region == 1) / region_size
        hair_ratio = np.sum(face_region == 13) / region_size

        print(f"[DEBUG] Face Covering: skin={skin_ratio*100:.1f}%, hair={hair_ratio*100:.1f}%")

        if skin_ratio < 0.30:
            self.add_result("Head Covering", "Face Covering", "fail",
                            f"Face significantly covered (AI: {skin_ratio*100:.0f}% visible)",
                            "Full face must be visible")
        elif hair_ratio > 0.25:
            self.add_result("Head Covering", "Face Covering", "warning",
                            f"Hair may be covering face ({hair_ratio*100:.0f}%)",
                            "Ensure hair pulled back")
        else:
            self.add_result("Head Covering", "Face Covering", "pass",
                            f"Face fully visible (AI: {skin_ratio*100:.0f}% skin)", "No obstruction")
    except Exception as e:
        print(f"[Parsing] Face covering error: {e}")
        self.add_result("Head Covering", "Face Covering", "pass",
                        "Unable to verify", "Manual review recommended")
|
|
|
|
|
def check_eyes_open_cv(self, landmarks, img_height): |
|
|
|
|
|
left_top = landmarks.landmark[159].y |
|
|
left_bottom = landmarks.landmark[145].y |
|
|
left_opening = abs(left_top - left_bottom) * img_height |
|
|
|
|
|
right_top = landmarks.landmark[386].y |
|
|
right_bottom = landmarks.landmark[374].y |
|
|
right_opening = abs(right_top - right_bottom) * img_height |
|
|
avg_opening = (left_opening + right_opening) / 2 |
|
|
|
|
|
if avg_opening > img_height * 0.012: |
|
|
self.add_result("Facial Expression", "Eyes Open", "pass", |
|
|
"Both eyes open (CV)", "Meets requirement") |
|
|
elif avg_opening > img_height * 0.008: |
|
|
self.add_result("Facial Expression", "Eyes Open", "warning", |
|
|
"Eyes may be partially closed", "Both eyes must be fully open") |
|
|
else: |
|
|
self.add_result("Facial Expression", "Eyes Open", "fail", |
|
|
"Eyes appear closed", "Both eyes must be fully open") |
|
|
|
|
|
def check_eyeglasses_cv_fallback(self, image, landmarks, img_width, img_height): |
|
|
|
|
|
self.add_result("Facial Features", "Eyeglasses", "pass", |
|
|
"Unable to verify with AI", "Manual review recommended") |
|
|
|
|
|
def check_headwear_cv_fallback(self, image, landmarks, img_width, img_height): |
|
|
|
|
|
self.add_result("Head Covering", "Headwear/Hat", "pass", |
|
|
"Unable to verify with AI", "Manual review recommended") |
|
|
|
|
|
|
|
|
def check_dimensions(self, image): |
|
|
h, w = image.shape[:2] |
|
|
if h == w and 600 <= w <= 1200: |
|
|
self.add_result("Dimensions", "Image Size", "pass", |
|
|
f"{w}×{h} pixels", "Meets requirements") |
|
|
else: |
|
|
self.add_result("Dimensions", "Image Size", "fail", |
|
|
f"{w}×{h} pixels", "Must be square (600-1200px)") |
|
|
|
|
|
def check_color_depth(self, image):
    """Verify the photo is a true color image rather than grayscale.

    A 3-channel image whose channels are nearly identical is flagged
    as effectively grayscale.

    Returns:
        bool: True when the image passes the color requirement.
    """
    if len(image.shape) != 3 or image.shape[2] != 3:
        self.add_result("Technical", "Color Depth", "fail",
                        "Image is grayscale", "Must be in color")
        return False

    # Mean absolute differences between channel pairs; near-zero means
    # all channels carry the same data (grayscale stored as color).
    # (Channel slicing is equivalent to cv2.split for this computation.)
    b = image[:, :, 0].astype(float)
    g = image[:, :, 1].astype(float)
    r = image[:, :, 2].astype(float)
    avg_diff = (np.abs(b - g).mean() +
                np.abs(b - r).mean() +
                np.abs(g - r).mean()) / 3

    if avg_diff < 5:
        self.add_result("Technical", "Color Depth", "warning",
                        "Image appears grayscale", "Must be in color")
        return False

    self.add_result("Technical", "Color Depth", "pass",
                    "Image in color (24-bit RGB)", "Meets requirement")
    return True
|
|
|
|
|
def check_face_detection(self, image):
    """Run FaceMesh and require exactly one detected face.

    Returns:
        The landmarks of the single detected face, or None when zero
        or multiple faces are found.
    """
    # FaceMesh expects RGB; frames arrive BGR from OpenCV.
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    face_results = self.face_mesh.process(rgb_image)

    detections = face_results.multi_face_landmarks
    if not detections:
        self.add_result("Composition", "Face Detection", "fail",
                        "No face detected", "Face must be visible")
        return None

    if len(detections) == 1:
        self.add_result("Composition", "Number of People", "pass",
                        "Exactly one face detected", "Meets requirement")
        return detections[0]

    self.add_result("Composition", "Number of People", "fail",
                    f"{len(detections)} faces detected", "Only one person allowed")
    return None
|
|
|
|
|
def check_face_angle(self, landmarks, img_width, img_height): |
|
|
nose_x = landmarks.landmark[4].x * img_width |
|
|
left_face = landmarks.landmark[234].x * img_width |
|
|
right_face = landmarks.landmark[454].x * img_width |
|
|
face_center = (left_face + right_face) / 2 |
|
|
deviation_percent = abs(nose_x - face_center) / img_width * 100 |
|
|
|
|
|
if deviation_percent < 2: |
|
|
self.add_result("Head Position", "Face Direction", "pass", |
|
|
"Face directly facing camera", "Full-face view met") |
|
|
elif deviation_percent < 4: |
|
|
self.add_result("Head Position", "Face Direction", "warning", |
|
|
"Face slightly turned", "Must be in full-face view") |
|
|
else: |
|
|
self.add_result("Head Position", "Face Direction", "fail", |
|
|
"Face significantly turned", "Must be in full-face view") |
|
|
|
|
|
def check_red_eye(self, image, landmarks, img_width, img_height):
    """Detect the red-eye effect: either eye region strongly
    red-dominant relative to the green and blue channels."""

    # Channels hoisted out of the per-eye helper (BGR order).
    b_ch, g_ch, r_ch = cv2.split(image)

    def region_is_red(indices):
        # Filled polygon over the given eye landmarks.
        pts = np.array([[int(landmarks.landmark[i].x * img_width),
                         int(landmarks.landmark[i].y * img_height)]
                        for i in indices])
        mask = np.zeros((img_height, img_width), dtype=np.uint8)
        cv2.fillPoly(mask, [pts], 255)

        selected = mask > 0
        if not selected.any():
            return False

        red_mean = r_ch[selected].mean()
        # Red-eye: red dominates both other channels and is bright.
        return (red_mean > g_ch[selected].mean() * 1.4 and
                red_mean > b_ch[selected].mean() * 1.4 and
                red_mean > 100)

    left_eye = [33, 133, 160, 159, 158, 157, 173]
    right_eye = [362, 263, 387, 386, 385, 384, 398]

    if region_is_red(left_eye) or region_is_red(right_eye):
        self.add_result("Photo Quality", "Red Eye Effect", "fail",
                        "Red eye effect detected", "Photo must not have red eye")
    else:
        self.add_result("Photo Quality", "Red Eye Effect", "pass",
                        "No red eye effect detected", "Meets requirement")
|
|
|
|
|
def check_image_quality(self, image):
    """Assess sharpness (Laplacian variance) and grain (high-frequency
    residual after Gaussian blur), recording one result for each."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Sharpness: low Laplacian variance means few edges, i.e. blur.
    focus_score = cv2.Laplacian(gray, cv2.CV_64F).var()
    if focus_score > 150:
        self.add_result("Photo Quality", "Sharpness", "pass",
                        "Image sharp and in focus", "Meets requirement")
    elif focus_score > 80:
        self.add_result("Photo Quality", "Sharpness", "warning",
                        "Image slightly soft", "Should be sharp")
    else:
        self.add_result("Photo Quality", "Sharpness", "fail",
                        "Image is blurry", "Must be sharp")

    # Noise: spread of the difference between image and its blur.
    residual = cv2.subtract(gray, cv2.GaussianBlur(gray, (5, 5), 0))
    noise_level = np.std(residual)
    if noise_level < 8:
        self.add_result("Photo Quality", "Grain/Noise", "pass",
                        "Minimal grain/noise", "Meets requirement")
    elif noise_level < 15:
        self.add_result("Photo Quality", "Grain/Noise", "warning",
                        "Noticeable grain/noise", "Use better camera")
    else:
        self.add_result("Photo Quality", "Grain/Noise", "fail",
                        "Image is grainy/noisy", "Use better camera")
|
|
|
|
|
def check_head_proportions(self, landmarks, img_height, img_width):
    """Validate eye height (56-69% from bottom) and face height
    (50-69% of image, top of head to chin).

    Uses the AI parsing mask when available (cached on
    ``self._current_parsing_mask``) with several fallbacks; covered
    heads (hijab/hat) get a hybrid segmentation + geometric estimate.
    Debug values accumulate on ``self._debug_data``.
    """
    h, w = img_height, img_width

    # Eye line: mean y of the four eye-corner landmarks.
    eye_indices = [33, 133, 362, 263]
    eye_y = np.mean([landmarks.landmark[i].y for i in eye_indices]) * h
    eye_from_bottom = 100 - (eye_y / h) * 100

    if hasattr(self, '_current_parsing_mask') and self._current_parsing_mask is not None:
        parsing_mask = self._current_parsing_mask
        mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                  interpolation=cv2.INTER_NEAREST)

        # Label 13 = hair, 14 = hat/covering (FACE_PARSING_LABELS).
        hair_pixels = np.sum(mask_resized == 13)
        hat_pixels = np.sum(mask_resized == 14)
        total_head_coverage = hair_pixels + hat_pixels

        head_region = mask_resized[:int(h*0.4), :]
        head_coverage_ratio = total_head_coverage / (head_region.size)

        # NOTE(review): "covered" here means little visible hair OR low
        # hair+hat coverage — i.e. hair hidden under a covering. Confirm
        # the intent of the < comparisons.
        is_head_covered = hair_pixels < 3000 or head_coverage_ratio < 0.15

        print(f"[DEBUG] Head Coverage Analysis:")
        print(f"  Hair pixels: {hair_pixels}")
        print(f"  Hat/covering pixels: {hat_pixels}")
        print(f"  Coverage ratio: {head_coverage_ratio*100:.2f}%")
        print(f"  Is head covered: {is_head_covered}")

        # Sentinel: full height means "not found yet".
        top_of_head = h

        if is_head_covered:
            print("[HEAD] Covered head detected - using hybrid method")

            # Preference order: hat top, then hair top, then skin top.
            if hat_pixels > 1000:
                hat_y_coords, _ = np.where(mask_resized == 14)
                if len(hat_y_coords) > 0:
                    top_of_head = np.min(hat_y_coords)
                    print(f"[HEAD] Using hat/covering top: {top_of_head}")

            elif hair_pixels > 500:
                hair_y_coords, _ = np.where(mask_resized == 13)
                if len(hair_y_coords) > 0:
                    top_of_head = np.min(hair_y_coords)
                    print(f"[HEAD] Using minimal hair top: {top_of_head}")

            else:
                # Topmost skin pixel in the upper 30% of the frame.
                head_skin_region = mask_resized[:int(h*0.3), :]
                skin_y_coords, _ = np.where(head_skin_region == 1)
                if len(skin_y_coords) > 0:
                    top_of_head = np.min(skin_y_coords)
                    print(f"[HEAD] Using head skin top: {top_of_head}")

            # Segmentation gave nothing plausible in the upper half:
            # estimate geometrically from eye-to-forehead distance.
            if top_of_head >= h * 0.5:
                print("[HEAD] Segmentation insufficient, using geometric estimation")

                forehead_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454]
                min_forehead_y = min([landmarks.landmark[i].y for i in forehead_indices]) * h

                eye_to_forehead = min_forehead_y - eye_y

                # Larger extension when a covering adds apparent height.
                if hat_pixels > 1000:
                    hair_extension = eye_to_forehead * 0.85
                else:
                    hair_extension = eye_to_forehead * 0.70

                top_of_head = max(0, min_forehead_y - hair_extension)
                print(f"[HEAD] Estimated top using forehead + {hair_extension:.0f}px extension: {top_of_head}")

        else:
            print("[HEAD] Open head detected - using standard method")

            # Hair top is the usual head top for uncovered heads.
            hair_y_coords, _ = np.where(mask_resized == 13)
            if len(hair_y_coords) > 0:
                top_of_head = np.min(hair_y_coords)
                print(f"[HEAD] Using hair top: {top_of_head}")

            else:
                head_region = mask_resized[:int(h*0.3), :]
                skin_y_coords, _ = np.where(head_region == 1)
                if len(skin_y_coords) > 0:
                    top_of_head = np.min(skin_y_coords)
                    print(f"[HEAD] Using head skin: {top_of_head}")

                else:
                    # Last resort: landmark-only geometric estimate.
                    forehead_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454]
                    min_forehead_y = min([landmarks.landmark[i].y for i in forehead_indices]) * h
                    eye_to_forehead = min_forehead_y - eye_y
                    hair_extension = eye_to_forehead * 0.65
                    top_of_head = max(0, min_forehead_y - hair_extension)
                    print(f"[HEAD] Fallback to landmarks: {top_of_head}")

        # Chin: lowest skin pixel in the bottom half, else landmark 152.
        chin_y = 0

        lower_face_region = mask_resized[int(h*0.5):, :]
        skin_y_coords, _ = np.where(lower_face_region == 1)
        if len(skin_y_coords) > 0:
            chin_y = np.max(skin_y_coords) + int(h*0.5)
        else:
            chin_y = landmarks.landmark[152].y * h

        face_height_pixels = chin_y - top_of_head
        face_height_ratio = (face_height_pixels / h) * 100

        # Plausibility guard: outside 35-85% is considered a
        # segmentation error and recomputed from landmarks only.
        if face_height_ratio < 35 or face_height_ratio > 85:
            print(f"[HEAD] WARNING: Unrealistic face height {face_height_ratio:.1f}%, using fallback")

            chin_y = landmarks.landmark[152].y * h
            forehead_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454]
            min_forehead_y = min([landmarks.landmark[i].y for i in forehead_indices]) * h
            eye_to_forehead = min_forehead_y - eye_y

            hair_extension = eye_to_forehead * (0.85 if is_head_covered else 0.65)
            top_of_head = max(0, min_forehead_y - hair_extension)

            face_height_pixels = chin_y - top_of_head
            face_height_ratio = (face_height_pixels / h) * 100
            print(f"[HEAD] Corrected face height: {face_height_ratio:.1f}%")

        if not hasattr(self, '_debug_data'):
            self._debug_data = {}

        self._debug_data.update({
            'top_of_head': top_of_head,
            'chin_y': chin_y,
            'eye_y': eye_y,
            'face_height_pixels': face_height_pixels,
            'face_height_ratio': face_height_ratio,
            'is_head_covered': is_head_covered,
            'hair_pixels': hair_pixels,
            'hat_pixels': hat_pixels,
            'method': 'AI_Segmentation_Enhanced'
        })

    else:
        # No parsing mask: purely landmark-based estimate.
        chin_y = landmarks.landmark[152].y * h
        forehead_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454]
        min_forehead_y = min([landmarks.landmark[i].y for i in forehead_indices]) * h
        eye_to_forehead = min_forehead_y - eye_y
        hair_extension = eye_to_forehead * 0.75
        top_of_head = max(0, min_forehead_y - hair_extension)
        face_height_pixels = chin_y - top_of_head
        face_height_ratio = (face_height_pixels / h) * 100

        if not hasattr(self, '_debug_data'):
            self._debug_data = {}
        self._debug_data.update({
            'method': 'MediaPipe_Fallback'
        })

    # Requirement 1: eye line 56-69% up from the bottom edge.
    if 56 <= eye_from_bottom <= 69:
        self.add_result("Head Position", "Eye Height", "pass",
                        f"Eye height: {eye_from_bottom:.1f}% from bottom",
                        "Meets requirement (56-69%)")
    else:
        issue = "Eyes too low" if eye_from_bottom < 56 else "Eyes too high"
        suggestion = "Move camera down" if eye_from_bottom < 56 else "Move camera up"
        self.add_result("Head Position", "Eye Height", "fail",
                        f"Eye height: {eye_from_bottom:.1f}%. {issue}",
                        f"Eyes must be 56-69% from bottom. {suggestion}")

    # Requirement 2: head occupies 50-69% of the image height.
    if 50 <= face_height_ratio <= 69:
        self.add_result("Head Position", "Face Height", "pass",
                        f"Face height: {face_height_ratio:.1f}% (top to chin)",
                        "Meets requirement (50-69%)")
    else:
        issue = "Head too small" if face_height_ratio < 50 else "Head too large"
        suggestion = "Move closer" if face_height_ratio < 50 else "Move further"
        self.add_result("Head Position", "Face Height", "fail",
                        f"Face height: {face_height_ratio:.1f}%. {issue}",
                        f"Head must be 50-69% of image. {suggestion}")
|
|
|
|
|
def check_background_ai(self, image, landmarks, bg_mask, img_width, img_height):
    """Check background color (plain white) and uniformity (no texture).

    Prefers the background derived from the AI parsing mask (label 0)
    when it covers enough of the frame; otherwise uses the supplied
    ``bg_mask``. Records "Background/Color" and "Background/Uniformity"
    results; on error records a manual-review pass.
    """
    try:
        h, w = image.shape[:2]

        # Swap in the parsing-mask background when it looks usable.
        if hasattr(self, '_current_parsing_mask') and self._current_parsing_mask is not None:
            parsing_mask = self._current_parsing_mask
            mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                      interpolation=cv2.INTER_NEAREST)

            # Parsing label 0 = background.
            bg_mask_from_parsing = (mask_resized == 0).astype(np.uint8) * 255

            # Require at least 10% of the frame to trust this mask.
            if np.sum(bg_mask_from_parsing > 0) > (h * w * 0.1):
                print("[BG] Using parsing mask for background detection")
                bg_mask = bg_mask_from_parsing
            else:
                print("[BG] Parsing mask insufficient, using CV fallback")

        bg_pixels = image[bg_mask > 0]

        if len(bg_pixels) == 0:
            self.add_result("Background", "Color", "fail",
                            "Cannot analyze background", "Background not detected")
            return

        # Color check: per-channel means; low std across channels means
        # a neutral (gray/white) background.
        bg_mean = np.mean(bg_pixels.reshape(-1, 3), axis=0)
        brightness = np.mean(bg_mean)
        color_variance = np.std(bg_mean)
        is_neutral = color_variance < 15

        if brightness > 200 and is_neutral:
            self.add_result("Background", "Color", "pass",
                            f"Background white/off-white (brightness: {brightness:.0f})",
                            "Plain white requirement met")
        elif brightness > 180 and is_neutral:
            self.add_result("Background", "Color", "warning",
                            f"Background light (brightness: {brightness:.0f})",
                            "Should be plain white")
        else:
            self.add_result("Background", "Color", "fail",
                            f"Background not white (brightness: {brightness:.0f})",
                            "Must be plain white or off-white")

        # Uniformity check: grayscale std-dev plus Canny edge density
        # over the background area only.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        bg_gray = cv2.bitwise_and(gray, gray, mask=bg_mask)

        bg_gray_pixels = bg_gray[bg_mask > 0]
        bg_std = np.std(bg_gray_pixels)

        print(f"[BG-Uniformity] STD: {bg_std:.2f}, Brightness: {brightness:.1f}")

        edges = cv2.Canny(bg_gray, 30, 100)
        edge_pixels = np.sum(edges > 0)
        edge_ratio = edge_pixels / np.sum(bg_mask > 0)

        # Thresholds scale with brightness: brighter backgrounds are
        # expected to be flatter, so they get stricter limits.
        if brightness > 200:
            std_threshold_high = 12
            std_threshold_low = 8
            edge_threshold_high = 0.02
            edge_threshold_low = 0.01
        elif brightness > 180:
            std_threshold_high = 15
            std_threshold_low = 10
            edge_threshold_high = 0.025
            edge_threshold_low = 0.015
        else:
            std_threshold_high = 20
            std_threshold_low = 12
            edge_threshold_high = 0.03
            edge_threshold_low = 0.02

        has_texture = False
        texture_level = "none"

        if bg_std > std_threshold_high or edge_ratio > edge_threshold_high:
            has_texture = True
            texture_level = "high"
        elif bg_std > std_threshold_low or edge_ratio > edge_threshold_low:
            has_texture = True
            texture_level = "slight"

        print(f"[BG-Uniformity] Edge ratio: {edge_ratio:.4f}, Texture: {texture_level}")

        if not has_texture:
            self.add_result("Background", "Uniformity", "pass",
                            "Background plain and uniform", "No patterns detected")
        elif texture_level == "slight":
            # Bright + only mildly varied: tolerate natural shadows.
            if brightness > 190 and bg_std < std_threshold_low * 1.5:
                self.add_result("Background", "Uniformity", "pass",
                                "Background uniform with minimal natural variation",
                                "Slight natural shadow acceptable")
            else:
                self.add_result("Background", "Uniformity", "warning",
                                "Background has slight texture", "Should be completely plain")
        else:
            self.add_result("Background", "Uniformity", "fail",
                            "Background has visible patterns", "Must be plain")

    except Exception as e:
        print(f"[AI] Background check error: {e}")
        import traceback
        traceback.print_exc()
        self.add_result("Background", "Analysis", "pass",
                        "Unable to verify", "Manual review recommended")
|
|
|
|
|
def detect_shadow_mask(self, image, parsing_mask=None):
    """Locate shadow regions on the photo background.

    Works in LAB lightness: background pixels noticeably darker than
    the background's mean/median L are treated as shadow, then the
    raw mask is cleaned morphologically.

    Args:
        image: BGR photo (uint8).
        parsing_mask: face-parsing label map; label 0 = background.
            Required — without it the method returns (None, None).

    Returns:
        (shadow_mask, shadow_info): binary uint8 mask and a statistics
        dict, or (None, None) on failure or insufficient background.
    """
    try:
        h, w = image.shape[:2]

        # Background comes exclusively from the parsing mask (label 0).
        if parsing_mask is not None:
            mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                      interpolation=cv2.INTER_NEAREST)
            bg_mask = (mask_resized == 0).astype(np.uint8) * 255
            print("[Shadow] Using parsing mask for background")
        else:
            print("[Shadow] Parsing mask not available")
            return None, None

        # Need at least 5% of the frame as background for stable stats.
        if np.sum(bg_mask > 0) < (h * w * 0.05):
            print("[Shadow] Background area too small")
            return None, None

        # L channel of LAB isolates lightness from chroma.
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        l_channel = lab[:, :, 0]

        # NOTE(review): bg_l is computed but never used afterwards.
        bg_l = cv2.bitwise_and(l_channel, l_channel, mask=bg_mask)
        bg_pixels = l_channel[bg_mask > 0]

        if len(bg_pixels) == 0:
            return None, None

        # Background lightness statistics.
        bg_mean = np.mean(bg_pixels)
        bg_std = np.std(bg_pixels)
        bg_median = np.median(bg_pixels)

        print(f"[Shadow] BG Stats - Mean: {bg_mean:.1f}, Median: {bg_median:.1f}, STD: {bg_std:.1f}")

        # Shadow threshold: roughly one std-dev below the mean/median,
        # taking the lower (more conservative) of the two candidates.
        shadow_threshold_1 = bg_mean - bg_std * 1.2
        shadow_threshold_2 = bg_median - bg_std * 1.0
        shadow_threshold = min(shadow_threshold_1, shadow_threshold_2)

        # Very bright backgrounds get an extra margin so plain paper
        # texture is not flagged as shadow.
        if bg_mean > 200:
            shadow_threshold = bg_mean - max(bg_std * 1.5, 25)

        print(f"[Shadow] Threshold: {shadow_threshold:.1f}")

        # Raw mask: background pixels darker than the threshold.
        shadow_mask_raw = np.zeros((h, w), dtype=np.uint8)
        shadow_mask_raw[bg_mask > 0] = ((l_channel[bg_mask > 0] < shadow_threshold) * 255).astype(np.uint8)

        # Cleanup: open to drop speckles, close to fill pinholes,
        # blur + re-threshold to smooth the boundary.
        kernel_small = np.ones((3, 3), np.uint8)
        kernel_medium = np.ones((5, 5), np.uint8)

        shadow_mask_clean = cv2.morphologyEx(shadow_mask_raw, cv2.MORPH_OPEN, kernel_small)
        shadow_mask_clean = cv2.morphologyEx(shadow_mask_clean, cv2.MORPH_CLOSE, kernel_medium)
        shadow_mask_clean = cv2.GaussianBlur(shadow_mask_clean, (5, 5), 0)
        _, shadow_mask_clean = cv2.threshold(shadow_mask_clean, 127, 255, cv2.THRESH_BINARY)

        # Statistics for the caller's pass/fail decision.
        shadow_pixels = np.sum(shadow_mask_clean > 0)
        bg_pixels_count = np.sum(bg_mask > 0)
        shadow_ratio = shadow_pixels / bg_pixels_count if bg_pixels_count > 0 else 0

        if shadow_pixels > 0:
            shadow_values = l_channel[shadow_mask_clean > 0]
            shadow_mean = np.mean(shadow_values)
            contrast = bg_mean - shadow_mean
        else:
            shadow_mean = 0
            contrast = 0

        shadow_info = {
            'shadow_ratio': shadow_ratio,
            'shadow_pixels': shadow_pixels,
            'bg_mean': bg_mean,
            'shadow_mean': shadow_mean,
            'contrast': contrast,
            'bg_std': bg_std
        }

        print(f"[Shadow] Detected: {shadow_ratio*100:.1f}% of background, Contrast: {contrast:.1f}")

        return shadow_mask_clean, shadow_info

    except Exception as e:
        print(f"[Shadow] Detection error: {e}")
        import traceback
        traceback.print_exc()
        return None, None
|
|
|
|
|
def create_shadow_visualization(self, image, shadow_mask, parsing_mask=None):
    """Render a red overlay highlighting the detected shadow regions.

    Tints shadowed pixels red, outlines each shadow contour, and draws a
    small legend plus the shadow-coverage percentage in the bottom-left
    corner. Returns an annotated copy of *image*; on any error the
    original image is returned unchanged. *parsing_mask* is accepted for
    interface compatibility but not used by the drawing itself.
    """
    try:
        img_h, img_w = image.shape[:2]
        viz = image.copy()

        # Blend red (BGR) into shadow pixels at 60% opacity.
        in_shadow = shadow_mask > 0
        red_fill = np.full_like(viz[in_shadow], np.array([0, 0, 255], dtype=np.uint8))
        viz[in_shadow] = cv2.addWeighted(viz[in_shadow], 0.4, red_fill, 0.6, 0)

        # Outline every shadow region in red.
        shadow_contours, _ = cv2.findContours(shadow_mask, cv2.RETR_EXTERNAL,
                                              cv2.CHAIN_APPROX_SIMPLE)
        cv2.drawContours(viz, shadow_contours, -1, (0, 0, 255), 2)

        # Semi-transparent black backdrop for the legend (bottom-left).
        lx, ly = 20, img_h - 60
        backdrop = viz.copy()
        cv2.rectangle(backdrop, (lx - 10, ly - 10), (lx + 200, ly + 35), (0, 0, 0), -1)
        cv2.addWeighted(backdrop, 0.7, viz, 0.3, 0, viz)

        # Legend label plus a red swatch with a white border.
        cv2.putText(viz, "Shadow Area", (lx + 35, ly + 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.rectangle(viz, (lx, ly - 5), (lx + 25, ly + 15), (0, 0, 255), -1)
        cv2.rectangle(viz, (lx, ly - 5), (lx + 25, ly + 15), (255, 255, 255), 1)

        # Shadow coverage as a fraction of the whole frame.
        coverage = np.sum(shadow_mask > 0) / (img_h * img_w)
        cv2.putText(viz, f"Shadow: {coverage*100:.1f}%", (lx, ly - 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        return viz

    except Exception as e:
        print(f"[Shadow] Visualization error: {e}")
        return image
|
|
|
|
|
|
|
|
def check_shadows_ai(self, image, landmarks, bg_mask, img_width, img_height):
    """Check the photo for shadows on the face and on the background.

    Records verdicts via ``self.add_result`` under the "Lighting"
    category. Face shadows are graded by the fraction of unusually dark
    pixels inside a polygon spanned by face-oval landmarks; background
    shadows are graded via ``detect_shadow_mask`` when it succeeds,
    otherwise via a simple brightness-spread fallback over ``bg_mask``.

    Args:
        image: BGR photo (np.ndarray).
        landmarks: MediaPipe face landmarks for the detected face.
        bg_mask: uint8 mask (nonzero = background pixel), fallback only.
        img_width, img_height: pixel dimensions used to scale the
            normalized landmark coordinates.
    """
    try:
        h, w = image.shape[:2]
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Face-contour landmark indices (MediaPipe FaceMesh numbering).
        face_indices = [234, 127, 162, 21, 54, 103, 67, 109, 10, 338, 297, 332]
        face_points = [[int(landmarks.landmark[i].x * img_width),
                        int(landmarks.landmark[i].y * img_height)] for i in face_indices]

        # Rasterize the face polygon and sample its gray-level pixels.
        face_mask = np.zeros((h, w), dtype=np.uint8)
        cv2.fillPoly(face_mask, [np.array(face_points)], 255)
        face_pixels = gray[face_mask > 0]

        if len(face_pixels) > 0:
            face_mean = np.mean(face_pixels)
            face_std = np.std(face_pixels)
            # Fraction of face pixels darker than mean - 1.5*std: a proxy
            # for shadowed area on the face itself.
            dark_ratio = np.sum(face_pixels < face_mean - face_std * 1.5) / len(face_pixels)

            if dark_ratio < 0.15:
                self.add_result("Lighting", "Face Shadows", "pass",
                                "No significant shadows on face", "Even lighting met")
            elif dark_ratio < 0.25:
                self.add_result("Lighting", "Face Shadows", "warning",
                                "Slight shadows on face", "Lighting should be even")
            else:
                self.add_result("Lighting", "Face Shadows", "fail",
                                "Shadows detected on face", "Must have even lighting")

        # Prefer the semantic parsing mask (stashed earlier in the check
        # pipeline) for precise background shadow detection.
        parsing_mask = None
        if hasattr(self, '_current_parsing_mask'):
            parsing_mask = self._current_parsing_mask

        shadow_mask, shadow_info = self.detect_shadow_mask(image, parsing_mask)

        if shadow_mask is not None and shadow_info is not None:
            # Stash for the later shadow-visualization step.
            self._shadow_mask = shadow_mask
            self._shadow_info = shadow_info

            shadow_ratio = shadow_info['shadow_ratio']
            contrast = shadow_info['contrast']

            # Grade by covered background area; mid-range coverage is
            # additionally forgiven when the shadow has low contrast.
            if shadow_ratio < 0.05:
                self.add_result("Lighting", "Background Shadows", "pass",
                                f"Minimal background shadow ({shadow_ratio*100:.1f}%, contrast: {contrast:.0f})",
                                "Natural shadow acceptable")
            elif shadow_ratio < 0.15:
                if contrast < 40:
                    self.add_result("Lighting", "Background Shadows", "pass",
                                    f"Slight shadow with low contrast ({shadow_ratio*100:.1f}%, contrast: {contrast:.0f})",
                                    "Acceptable shadow level")
                else:
                    self.add_result("Lighting", "Background Shadows", "warning",
                                    f"Moderate shadow detected ({shadow_ratio*100:.1f}%, contrast: {contrast:.0f})",
                                    "Consider repositioning lighting or moving away from wall")
            else:
                self.add_result("Lighting", "Background Shadows", "fail",
                                f"Strong shadow cast on background ({shadow_ratio*100:.1f}%, contrast: {contrast:.0f})",
                                "Move away from background or improve lighting setup")
        else:
            # Fallback: judge only the brightness spread of the background.
            print("[Shadow] Using fallback method")
            bg_gray_pixels = gray[bg_mask > 0]

            if len(bg_gray_pixels) > 0:
                # NOTE: bg_mean is computed but only bg_std drives the verdict.
                bg_mean = np.mean(bg_gray_pixels)
                bg_std = np.std(bg_gray_pixels)

                if bg_std < 15:
                    self.add_result("Lighting", "Background Shadows", "pass",
                                    f"Background uniform (STD: {bg_std:.1f})", "No significant shadows")
                elif bg_std < 25:
                    self.add_result("Lighting", "Background Shadows", "warning",
                                    "Slight variation in background", "May indicate shadow")
                else:
                    self.add_result("Lighting", "Background Shadows", "fail",
                                    "Non-uniform background detected", "Check for shadows")

    except Exception as e:
        # Never block the overall report on a shadow-analysis failure:
        # record a neutral "pass" and ask for manual review instead.
        print(f"[AI] Shadow check error: {e}")
        import traceback
        traceback.print_exc()
        self.add_result("Lighting", "Shadows", "pass",
                        "Unable to verify", "Manual review recommended")
|
|
|
|
|
|
|
|
def check_image_from_base64(self, image_base64):
    """Run the full requirements-check pipeline on a base64-encoded photo.

    Decodes the image, runs face parsing and every individual check
    (dimensions, color depth, face detection, head proportions, face
    angle, accessories, background, shadows, red-eye, quality), then
    builds an HTML report plus several optional debug visualizations.

    Args:
        image_base64: image bytes (JPEG/PNG) encoded as a base64 string.

    Returns:
        dict with 'service_type', 'html_report', 'results', and four
        base64-encoded visualization images (each None when unavailable).

    Raises:
        Exception: any underlying failure, wrapped as "Checking error: ...".
    """
    try:
        # Reset per-run state.
        self.results = []
        self._current_parsing_mask = None
        self._debug_data = {}

        # Decode base64 -> BGR image.
        image_bytes = base64.b64decode(image_base64)
        nparr = np.frombuffer(image_bytes, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        if image is None:
            raise ValueError("Failed to decode")

        h, w = image.shape[:2]

        # RGB/PIL copies for the parsing model and overlay rendering.
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_pil = Image.fromarray(image_rgb)

        # Semantic face parsing (None when the model is unavailable).
        parsing_mask = predict_face_parsing(image_pil)

        if parsing_mask is not None:
            # Stashed so downstream checks (e.g. shadow detection) can use it.
            self._current_parsing_mask = parsing_mask
            print(f"[Checker] ✓ Parsing mask saved")

        # Checks that do not need landmarks.
        self.check_dimensions(image)
        self.check_color_depth(image)
        face_landmarks = self.check_face_detection(image)

        if face_landmarks:
            bg_mask = self._get_background_mask(image, face_landmarks, w, h)

            self.check_head_proportions(face_landmarks, h, w)
            self.check_face_angle(face_landmarks, w, h)

            # Accessory / eye checks: parsing-based when possible,
            # classic-CV fallbacks otherwise.
            if parsing_mask is not None:
                self.check_eyeglasses_parsing(parsing_mask, image)
                self.check_headwear_parsing(parsing_mask, image)
                self.check_eyes_open_parsing(parsing_mask, face_landmarks, w, h, image)
                self.check_jewelry_parsing(parsing_mask, image)
                self.check_face_covering_parsing(parsing_mask, face_landmarks, w, h, image)
            else:
                print("[Checker] Parsing unavailable, using fallback")
                self.check_eyes_open_cv(face_landmarks, h)
                self.check_eyeglasses_cv_fallback(image, face_landmarks, w, h)
                self.check_headwear_cv_fallback(image, face_landmarks, w, h)

            self.check_background_ai(image, face_landmarks, bg_mask, w, h)
            self.check_shadows_ai(image, face_landmarks, bg_mask, w, h)
            self.check_red_eye(image, face_landmarks, w, h)
            self.check_image_quality(image)

        html_report = self.generate_html_report()

        # Optional visualizations (left as None when not produced).
        parsing_colored_b64 = None
        parsing_overlay_b64 = None
        debug_face_height_b64 = None
        shadow_viz_b64 = None

        if parsing_mask is not None:
            unique_labels = np.unique(parsing_mask).tolist()

            # Colored class map with a legend.
            colored = create_colored_mask(parsing_mask)
            colored_legend = add_legend_to_image(colored, unique_labels)
            _, buf1 = cv2.imencode('.jpg', cv2.cvtColor(colored_legend, cv2.COLOR_RGB2BGR))
            parsing_colored_b64 = base64.b64encode(buf1).decode('utf-8')

            # Semi-transparent class overlay on the original photo.
            overlay = create_transparent_overlay(image_pil, parsing_mask, alpha=0.4)
            overlay_legend = add_legend_to_image(overlay, unique_labels)
            _, buf2 = cv2.imencode('.jpg', cv2.cvtColor(overlay_legend, cv2.COLOR_RGB2BGR))
            parsing_overlay_b64 = base64.b64encode(buf2).decode('utf-8')

            print("[Checker] ✓ Visualizations created")

        # Face-height measurement debug image.
        # NOTE(review): this branch also dereferences parsing_mask
        # (cv2.resize below) but the guard does not check it for None;
        # when parsing failed, the inner except downgrades the crash to
        # the "Debug image failed" log line. Consider adding
        # `parsing_mask is not None` to the guard.
        if face_landmarks and hasattr(self, '_debug_data') and self._debug_data:
            try:
                # Eye line: mean y of inner/outer eye-corner landmarks.
                eye_indices = [33, 133, 362, 263]
                eye_y = np.mean([face_landmarks.landmark[i].y for i in eye_indices]) * h

                mask_resized = cv2.resize(parsing_mask.astype(np.uint8), (w, h),
                                          interpolation=cv2.INTER_NEAREST)

                # Chin: lowest skin pixel (label 1) in the bottom 60% of
                # the frame; landmark 152 (chin tip) as fallback.
                chin_region = mask_resized[int(h*0.4):, :]
                skin_pixels_y, _ = np.where(chin_region == 1)
                chin_y = (np.max(skin_pixels_y) + int(h*0.4)) if len(skin_pixels_y) > 0 else face_landmarks.landmark[152].y * h

                # Top of head: topmost hair pixel (label 13) when enough
                # hair is visible, else topmost skin in the upper half,
                # else extrapolated above the forehead landmarks.
                hair_pixels = np.sum(mask_resized == 13)
                if hair_pixels > 500:
                    hair_y_coords, _ = np.where(mask_resized == 13)
                    top_of_head = np.min(hair_y_coords) if len(hair_y_coords) > 0 else eye_y - 200
                else:
                    head_region = mask_resized[:int(h*0.5), :]
                    skin_head_y, _ = np.where(head_region == 1)
                    if len(skin_head_y) > 0:
                        top_of_head = np.min(skin_head_y)
                    else:
                        forehead_indices = [10, 338, 297, 332, 284, 251, 389, 356, 454]
                        min_forehead_y = min([face_landmarks.landmark[i].y for i in forehead_indices]) * h
                        top_of_head = max(0, min_forehead_y - (min_forehead_y - eye_y) * 0.65)

                debug_img = self._create_face_height_debug_image(
                    image, parsing_mask, top_of_head, chin_y, eye_y, h, w
                )

                _, buf_debug = cv2.imencode('.jpg', debug_img, [cv2.IMWRITE_JPEG_QUALITY, 95])
                debug_face_height_b64 = base64.b64encode(buf_debug).decode('utf-8')

                print("[Checker] ✓ Debug image created")

            except Exception as e:
                print(f"[Checker] Debug image failed: {e}")

        # Shadow visualization (mask stashed by check_shadows_ai).
        if hasattr(self, '_shadow_mask') and self._shadow_mask is not None:
            try:
                shadow_viz = self.create_shadow_visualization(
                    image, self._shadow_mask, parsing_mask
                )
                _, buf_shadow = cv2.imencode('.jpg', shadow_viz, [cv2.IMWRITE_JPEG_QUALITY, 95])
                shadow_viz_b64 = base64.b64encode(buf_shadow).decode('utf-8')
                print("[Checker] ✓ Shadow visualization created")
            except Exception as e:
                print(f"[Checker] Shadow visualization failed: {e}")

        # Clear per-run state before returning.
        self._current_parsing_mask = None
        self._debug_data = {}
        if hasattr(self, '_shadow_mask'):
            delattr(self, '_shadow_mask')
        if hasattr(self, '_shadow_info'):
            delattr(self, '_shadow_info')

        return {
            'service_type': 'checking',
            'html_report': html_report,
            'results': self.results,
            'parsing_colored_mask': parsing_colored_b64,
            'parsing_transparent_overlay': parsing_overlay_b64,
            'debug_face_height': debug_face_height_b64,
            'shadow_visualization': shadow_viz_b64
        }

    except Exception as e:
        import traceback
        traceback.print_exc()
        raise Exception(f"Checking error: {str(e)}")
|
|
|
|
|
def _create_face_height_debug_image(self, image, parsing_mask, top_of_head, chin_y, eye_y, img_h, img_w):
    """Render a measurement-overlay debug image for the head-height check.

    Draws horizontal reference lines for the top of the head (red), chin
    (green) and eye line (blue), a yellow measurement bar, a metrics
    panel (top-left), a color legend (top-right) and an overall
    PASS/REVIEW banner (bottom-center).

    Args:
        image: BGR photo; a copy is annotated, the original is untouched.
        parsing_mask: unused here; kept for call-site symmetry.
        top_of_head, chin_y, eye_y: y-coordinates in pixels (any numeric).
        img_h, img_w: image dimensions in pixels.

    Returns:
        Annotated BGR image (np.ndarray).
    """
    h, w = img_h, img_w
    debug_img = image.copy()

    # BGR color constants.
    RED, GREEN, BLUE = (0,0,255), (0,255,0), (255,0,0)
    YELLOW, WHITE, BLACK = (0,255,255), (255,255,255), (0,0,0)

    top_of_head, chin_y, eye_y = int(top_of_head), int(chin_y), int(eye_y)

    # Full-width reference lines.
    cv2.line(debug_img, (0, top_of_head), (w, top_of_head), RED, 3)
    cv2.line(debug_img, (0, chin_y), (w, chin_y), GREEN, 3)
    cv2.line(debug_img, (0, eye_y), (w, eye_y), BLUE, 2)

    # Vertical head-height bar at 10% width, with arrow caps.
    measurement_x = int(w * 0.1)
    cv2.line(debug_img, (measurement_x, top_of_head), (measurement_x, chin_y), YELLOW, 4)
    cv2.arrowedLine(debug_img, (measurement_x, top_of_head+20),
                    (measurement_x, top_of_head), YELLOW, 3, tipLength=0.3)
    cv2.arrowedLine(debug_img, (measurement_x, chin_y-20),
                    (measurement_x, chin_y), YELLOW, 3, tipLength=0.3)

    # Metrics: face height 50-69% of image height; eye line 56-69% up
    # from the bottom edge (the thresholds checked below).
    face_height_pixels = chin_y - top_of_head
    face_height_ratio = (face_height_pixels / h) * 100
    eye_from_bottom = 100 - (eye_y / h) * 100

    face_status = "PASS ✓" if 50 <= face_height_ratio <= 69 else "FAIL ✗"
    eye_status = "PASS ✓" if 56 <= eye_from_bottom <= 69 else "FAIL ✗"

    # Panel rows: (text, y, color, thickness); empty-text rows are spacers.
    annotations = [
        (f"IMAGE: {h}px", 30, WHITE, 1.5),
        (f"TOP: {top_of_head}px", 60, RED, 1.5),
        (f"CHIN: {chin_y}px", 85, GREEN, 1.5),
        (f"EYE: {eye_y}px", 110, BLUE, 1.5),
        (f"", 130, WHITE, 0.5),
        (f"FACE: {face_height_pixels}px", 150, YELLOW, 1.5),
        (f"FACE %: {face_height_ratio:.1f}% {face_status}", 180, YELLOW, 2),
        (f"Required: 50-69%", 205, YELLOW, 1.5),
        (f"", 225, WHITE, 0.5),
        (f"EYE: {eye_from_bottom:.1f}% {eye_status}", 245, BLUE, 2),
        (f"Required: 56-69%", 270, BLUE, 1.5),
    ]

    # Semi-transparent backdrop for the metrics panel (top-left).
    overlay = debug_img.copy()
    cv2.rectangle(overlay, (10, 10), (350, 290), BLACK, -1)
    cv2.addWeighted(overlay, 0.75, debug_img, 0.25, 0, debug_img)

    for text, y_pos, color, thickness in annotations:
        if text:  # skip spacer rows
            cv2.putText(debug_img, text, (20, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, int(thickness))

    # Legend box (top-right), same backdrop technique.
    legend_x, legend_y = w - 220, 30
    overlay = debug_img.copy()
    cv2.rectangle(overlay, (legend_x-10, legend_y-10), (w-10, legend_y+120), BLACK, -1)
    cv2.addWeighted(overlay, 0.75, debug_img, 0.25, 0, debug_img)

    cv2.putText(debug_img, "Legend:", (legend_x, legend_y+20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, WHITE, 2)

    legends = [("Top", RED, 45), ("Chin", GREEN, 70), ("Eye", BLUE, 95), ("Height", YELLOW, 120)]

    for label, color, y_offset in legends:
        y_pos = legend_y + y_offset
        cv2.line(debug_img, (legend_x, y_pos), (legend_x+30, y_pos), color, 3)
        cv2.putText(debug_img, label, (legend_x+40, y_pos+5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, WHITE, 1)

    # Overall banner: PASS only when both metrics are within range.
    overall_status = "PASS" if (50 <= face_height_ratio <= 69 and 56 <= eye_from_bottom <= 69) else "REVIEW"
    status_color = GREEN if overall_status == "PASS" else RED

    overlay = debug_img.copy()
    cv2.rectangle(overlay, (w//2-150, h-60), (w//2+150, h-10), BLACK, -1)
    cv2.addWeighted(overlay, 0.75, debug_img, 0.25, 0, debug_img)

    cv2.putText(debug_img, f"Status: {overall_status}",
                (w//2-130, h-25), cv2.FONT_HERSHEY_SIMPLEX, 0.9, status_color, 2)

    return debug_img
|
|
|
|
|
def generate_html_report(self):
    """Build the HTML requirements report from ``self.results``.

    Produces a colored summary header (overall verdict + counts), then
    one section per category listing each check with a pass/warning/fail
    card, and finally a static section with the official U.S. visa photo
    requirements, shooting tips, and a link to travel.state.gov.

    Returns:
        str: a self-contained HTML fragment using inline styles only.
    """
    passed = sum(1 for r in self.results if r['status'] == 'pass')
    failed = sum(1 for r in self.results if r['status'] == 'fail')
    warnings = sum(1 for r in self.results if r['status'] == 'warning')

    # Overall verdict: any failure dominates; warnings alone are "minor".
    if failed == 0 and warnings == 0:
        overall, color, subtitle = "✓ ALL CHECKS PASSED", "#28a745", "Your photo meets all requirements"
    elif failed == 0:
        overall, color, subtitle = "⚠ MINOR ISSUES", "#ffc107", "Photo acceptable but could be improved"
    else:
        overall, color, subtitle = "✗ REQUIREMENTS NOT MET", "#dc3545", "Please review issues and consider retaking"

    html = f'''
    <div style="font-family: Arial, sans-serif; max-width: 900px; margin: 0 auto;">
        <div style="padding: 25px; background: {color}; color: white; border-radius: 10px; margin-bottom: 25px; text-align: center;">
            <h2 style="margin: 0; font-size: 28px;">{overall}</h2>
            <p style="margin: 10px 0 0; font-size: 16px;">{subtitle}</p>
            <p style="margin: 10px 0 0;">
                <strong>Passed:</strong> {passed} | <strong>Warnings:</strong> {warnings} | <strong>Failed:</strong> {failed}
            </p>
            <p style="margin: 10px 0 0; font-size: 13px; opacity: 0.9;">
                ✨ AI-Enhanced (Face Detection + Smart Background)
            </p>
        </div>
    '''

    # Group results by their category, preserving first-seen order.
    categories = {}
    for result in self.results:
        cat = result['category']
        if cat not in categories:
            categories[cat] = []
        categories[cat].append(result)

    for category, results in categories.items():
        html += f'<div style="margin-bottom: 30px;">'
        html += f'<h3 style="border-bottom: 3px solid #0073aa; padding-bottom: 10px; color: #0073aa;">{category}</h3>'

        for result in results:
            # Card styling per status (icon, background, border, text color).
            if result['status'] == 'pass':
                icon, bg, border, text_color = "✓", "#d4edda", "#28a745", "#155724"
            elif result['status'] == 'warning':
                icon, bg, border, text_color = "⚠", "#fff3cd", "#ffc107", "#856404"
            else:
                icon, bg, border, text_color = "✗", "#f8d7da", "#dc3545", "#721c24"

            html += f'''
            <div style="background: {bg}; padding: 18px; margin: 12px 0; border-radius: 8px; border-left: 5px solid {border};">
                <h4 style="margin: 0 0 8px 0; color: {text_color}; font-size: 18px;">
                    {icon} {result["requirement"]}
                </h4>
                <p style="margin: 5px 0; color: {text_color}; font-size: 15px;">
                    <strong>{result["message"]}</strong>
                </p>
                <p style="margin: 8px 0 0 0; color: {text_color}; font-size: 14px;">
                    {result["details"]}
                </p>
            </div>
            '''

        html += '</div>'

    # Static footer: official requirements, tips, and the source link.
    html += '''
    <div style="background: #e7f3ff; padding: 20px; border-radius: 8px; border-left: 5px solid #0073aa; margin-top: 30px;">
        <h3 style="margin: 0 0 15px 0; color: #0073aa;">📋 Official U.S. Visa Photo Requirements</h3>
        <ul style="margin: 10px 0; padding-left: 25px; line-height: 1.8;">
            <li><strong>Color:</strong> Must be in color (24-bit)</li>
            <li><strong>Size:</strong> Head must be 50-69% of image height (1 to 1 3/8 inches or 22-35mm from chin to top of head)</li>
            <li><strong>Recent:</strong> Taken within last 6 months to reflect current appearance</li>
            <li><strong>Background:</strong> Plain white or off-white background with no shadows</li>
            <li><strong>Position:</strong> Full-face view directly facing camera</li>
            <li><strong>Expression:</strong> Neutral facial expression with both eyes open</li>
            <li><strong>Clothing:</strong> Everyday clothing (no uniforms except religious clothing worn daily)</li>
            <li><strong>Head Covering:</strong> No hats unless religious head covering worn daily. Full face must be visible, no shadows on face</li>
            <li><strong>Eyeglasses:</strong> Not allowed (policy updated). Exception only for medical reasons with doctor's statement</li>
            <li><strong>Devices:</strong> No headphones, wireless devices, or similar items</li>
            <li><strong>Quality:</strong> Sharp focus, proper lighting, no red-eye, not grainy</li>
        </ul>
        <p style="margin: 15px 0 5px 0; font-size: 14px;">
            <strong>📸 Tips for Best Results:</strong><br>
            • Use a white blanket or sheet as background if wall is not white<br>
            • Ensure even lighting with no shadows on face or background<br>
            • Stand 4-5 feet away from background to avoid shadows<br>
            • Use natural light or diffused indoor lighting<br>
            • Avoid grainy photos - use good quality printer if printing<br>
            • Do not use photos from driver's licenses or copied from other documents<br>
            • No selfies or full-length photos
        </p>
        <p style="margin: 15px 0 0 0; font-size: 13px; color: #666;">
            For complete requirements and photo examples, visit:<br>
            <a href="https://travel.state.gov/content/travel/en/us-visas/visa-information-resources/photos.html"
               target="_blank"
               style="
                   display: inline-block;
                   margin-top: 10px;
                   background: linear-gradient(135deg, #0073aa, #005f8d);
                   color: white;
                   text-decoration: none;
                   padding: 10px 18px;
                   border-radius: 8px;
                   font-weight: 500;
                   box-shadow: 0 4px 10px rgba(0,0,0,0.15);
                   transition: all 0.25s ease;
               "
               onmouseover="this.style.transform='translateY(-3px)'; this.style.boxShadow='0 8px 15px rgba(0,0,0,0.25)';"
               onmouseout="this.style.transform='translateY(0)'; this.style.boxShadow='0 4px 10px rgba(0,0,0,0.15)';">
                U.S. Department of State – Photo Requirements
            </a>
        </p>

    </div>
    '''

    html += '</div>'

    return html
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = Flask(__name__)

# Deployment configuration, all via environment variables.
WORKER_ID = os.getenv('WORKER_ID', 'worker-1')
ORCHESTRATOR_URL = os.getenv('ORCHESTRATOR_URL', '')  # empty => standalone mode
WORKER_URL = os.getenv('WORKER_URL', '')  # public URL reported in heartbeats

# Worker state shared between Flask request handlers and background
# threads; all reads/writes go through status_lock.
current_status = 'idle'   # 'idle' | 'busy'
current_job = None        # unique_id of the job in flight, or None
total_processed = 0       # lifetime counter of completed jobs
status_lock = threading.Lock()

# ---- Import-time initialization (runs once per process) ----

print("\n" + "="*60)
print("PASSPORT PHOTO WORKER - XENOVA VERSION")
print("="*60)

print("[Init] PassportPhotoProcessor...")
processor = PassportPhotoProcessor()
print("[Init] ✓ Processor ready")

print("[Init] PhotoRequirementsChecker...")
checker = PhotoRequirementsChecker()
print("[Init] ✓ Checker ready")

# Eager model load; failures are tolerated because init_face_parser()
# is retried lazily on first use.
print("[Init] Initializing Xenova Face Parsing...")
try:
    if init_face_parser():
        print("[Init] ✓ Xenova model loaded!")
    else:
        print("[Init] ⚠ Will initialize on first use")
except Exception as e:
    print(f"[Init] ⚠ Error: {e}")

print("="*60)
print("WORKER READY")
print("="*60 + "\n")
|
|
|
|
|
@app.route('/ai_status', methods=['GET'])
def ai_status():
    """Report this worker's AI capability flags as JSON.

    Returns whether the face-parsing model and the transformers library
    are available, plus the worker id and its current busy/idle status.
    """
    # Fix: report the model actually loaded by init_face_parser()
    # ("jonathandinu/face-parsing"), not the misleading "Xenova/..." label
    # the endpoint previously returned.
    return jsonify({
        'face_parsing': {
            'available': FACE_PARSING_AVAILABLE,
            'model': 'jonathandinu/face-parsing' if FACE_PARSING_AVAILABLE else 'N/A'
        },
        'transformers_available': TRANSFORMERS_AVAILABLE,
        'worker_id': WORKER_ID,
        'status': current_status
    })
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness endpoint: JSON snapshot of the worker's current state."""
    with status_lock:
        snapshot = {
            'status': current_status,
            'worker_id': WORKER_ID,
            'total_processed': total_processed,
            'current_job': current_job,
            'timestamp': datetime.now().isoformat()
        }
        return jsonify(snapshot)
|
|
|
|
|
@app.route('/process', methods=['POST'])
def process_job():
    """Accept a job and hand it to a background processing thread.

    Responds 503 when the worker is already busy, 400 on a malformed
    request, and 200 once the job has been accepted.
    """
    global current_status, current_job

    # Fast-fail before parsing the payload if we are already busy.
    with status_lock:
        if current_status == 'busy':
            return jsonify({'error': 'Worker busy'}), 503

    data = request.json
    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    unique_id = data['unique_id']
    service_type = data.get('service_type', 'processing')
    image_data = data.get('image_data')

    if not image_data:
        return jsonify({'error': 'Missing image_data'}), 400

    # Fix: re-check and claim the worker atomically. The original code
    # checked 'busy' in one lock section and set it in another, so two
    # concurrent requests could both be accepted (TOCTOU race).
    with status_lock:
        if current_status == 'busy':
            return jsonify({'error': 'Worker busy'}), 503
        current_status = 'busy'
        current_job = unique_id

    thread = threading.Thread(target=process_job_async, args=(unique_id, service_type, image_data))
    thread.daemon = True
    thread.start()

    return jsonify({'message': 'Job accepted', 'worker_id': WORKER_ID}), 200
|
|
|
|
|
def process_job_async(unique_id, service_type, image_data):
    """Run one job in a background thread and report the outcome.

    Dispatches to the processor ('processing') or the checker (any other
    service_type), posts the result or failure to the orchestrator, and
    always resets the worker to idle and emits a heartbeat when done.

    Args:
        unique_id: job identifier echoed back to the orchestrator.
        service_type: 'processing' or anything else for checking.
        image_data: base64-encoded input image.
    """
    global current_status, current_job, total_processed

    try:
        print(f"[{WORKER_ID}] Processing job {unique_id} - Type: {service_type}")

        if service_type == 'processing':
            result = processor.process_image_from_base64(image_data)
        else:
            result = checker.check_image_from_base64(image_data)

        print(f"[{WORKER_ID}] Job {unique_id} completed")
        send_result_to_orchestrator(unique_id, 'completed', result)

        with status_lock:
            total_processed += 1

    except ValueError as ve:
        # Input/validation problems: reported without a traceback.
        error_msg = str(ve)
        print(f"[{WORKER_ID}] Job {unique_id} failed (ValueError): {error_msg}")
        send_result_to_orchestrator(unique_id, 'failed', {
            'error': error_msg,
            'error_type': 'validation_error'
        })

    except Exception as e:
        # `str(e) or ...` covers the empty-message case (the original also
        # compared against '' redundantly).
        error_msg = str(e) or "Processing failed due to an unknown error"

        print(f"[{WORKER_ID}] Job {unique_id} failed: {error_msg}")
        print(traceback.format_exc())

        send_result_to_orchestrator(unique_id, 'failed', {
            'error': error_msg,
            'error_type': 'processing_error'
        })

    finally:
        # Whatever happened, free the worker and tell the orchestrator.
        with status_lock:
            current_status = 'idle'
            current_job = None
        send_heartbeat()
|
def send_result_to_orchestrator(unique_id, status, result):
    """POST a job outcome to the orchestrator's /worker/result endpoint.

    Normalizes failure payloads so `result` is always a dict containing
    an 'error' key, logs the HTTP response, and never raises
    (best-effort delivery). No-op when ORCHESTRATOR_URL is unset.
    """
    if not ORCHESTRATOR_URL:
        print(f"[{WORKER_ID}] No orchestrator URL")
        return

    try:
        payload = {
            'unique_id': unique_id,
            'worker_id': WORKER_ID,
            'status': status,
            'result': result
        }

        if status == 'failed':
            # Fix: also cover result=None / non-dict values — the original
            # handled only dict-without-'error' and str, and would raise on
            # the .get() below for anything else.
            if isinstance(result, str):
                payload['result'] = {'error': result}
            elif not isinstance(result, dict) or 'error' not in result:
                payload['result'] = {'error': 'Processing failed'}

        print(f"[{WORKER_ID}] Sending result for {unique_id} - Status: {status}")
        if status == 'failed':
            print(f"[{WORKER_ID}] Error message: {payload['result'].get('error', 'Unknown')}")

        response = requests.post(
            f"{ORCHESTRATOR_URL}/worker/result",
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            print(f"[{WORKER_ID}] Result sent for {unique_id}")
        else:
            print(f"[{WORKER_ID}] Result send failed: {response.status_code}")
            print(f"[{WORKER_ID}] Response: {response.text}")

    except Exception as e:
        # traceback is already imported at module level; the redundant
        # function-local import was removed.
        print(f"[{WORKER_ID}] Result send error: {e}")
        traceback.print_exc()
|
|
|
|
|
def send_heartbeat():
    """Push this worker's current state to the orchestrator (best-effort)."""
    if not ORCHESTRATOR_URL:
        return

    try:
        # Snapshot the mutable worker state under the lock.
        with status_lock:
            heartbeat_data = {
                'worker_id': WORKER_ID,
                'status': current_status,
                'url': WORKER_URL,
                'total_processed': total_processed,
                'current_job': current_job
            }

        response = requests.post(
            f"{ORCHESTRATOR_URL}/worker/heartbeat",
            json=heartbeat_data,
            timeout=10
        )

        if response.status_code == 200:
            print(f"[{WORKER_ID}] Heartbeat sent - Status: {heartbeat_data['status']}")
        else:
            print(f"[{WORKER_ID}] Heartbeat failed: {response.status_code}")

    except Exception as e:
        print(f"[{WORKER_ID}] Heartbeat error: {e}")
|
|
|
|
|
def periodic_heartbeat():
    """Daemon loop: emit a heartbeat roughly every 30 seconds, forever."""
    def _tick():
        # One heartbeat cycle: wait first, then report.
        time.sleep(30)
        send_heartbeat()

    while True:
        try:
            _tick()
        except Exception as err:
            print(f"[{WORKER_ID}] Periodic heartbeat error: {err}")
            time.sleep(30)
|
|
|
|
|
|
|
|
# Start the heartbeat loop at import time so the worker advertises
# itself even when hosted by a WSGI server (not only under __main__).
print(f"[{WORKER_ID}] Starting heartbeat thread...")
heartbeat_thread = threading.Thread(target=periodic_heartbeat, daemon=True)
heartbeat_thread.start()
|
|
|
|
|
if __name__ == '__main__':
    # Startup banner summarizing this worker's configuration.
    print(f"=" * 60)
    print(f"Worker: {WORKER_ID}")
    print(f"Orchestrator: {ORCHESTRATOR_URL if ORCHESTRATOR_URL else 'Not configured'}")
    print(f"Worker URL: {WORKER_URL if WORKER_URL else 'Not configured'}")
    print(f"=" * 60)

    if ORCHESTRATOR_URL:
        # Announce availability immediately instead of waiting up to ~30s
        # for the first periodic heartbeat.
        print(f"[{WORKER_ID}] Sending initial heartbeat...")
        send_heartbeat()
    else:
        print(f"[{WORKER_ID}] ⚠ Standalone mode")

    # PORT default 7860 (Hugging Face Spaces convention — TODO confirm).
    port = int(os.getenv('PORT', 7860))
    print(f"[{WORKER_ID}] Starting Flask on port {port}...")

    # threaded=True lets /health and /ai_status respond while a job runs.
    app.run(host='0.0.0.0', port=port, threaded=True)