import ctypes import glob import os import platform import sys import time from dataclasses import dataclass import cv2 import numpy as np from eve_messages import ModelConfig from eve_python import eve_sdk as sdk from eve_python.structs.CFaceIdStructs import ( EveFaceIdCommand, EveFaceIdIdentificationStatus, ) from log_utils import setup_logger logger = setup_logger("EveWrapper") def _pack_cstring(dst, value: str) -> None: """Write ``value`` as a null-terminated UTF-8 string into a ``c_byte`` array. ``c_byte`` is signed (-128..127), so bytes above 127 are folded into their signed two's-complement form — the underlying memory is identical to an unsigned ``char[]`` on the C side. The array is fully written so any stale bytes from a recycled options struct are cleared. Args: dst: A ctypes ``c_byte`` array (the destination buffer). value: The string to store; must fit with room for the null terminator. Raises: ValueError: If ``value`` does not leave room for a null terminator. """ raw = value.encode("utf-8") capacity = len(dst) if len(raw) >= capacity: raise ValueError(f"{value!r} ({len(raw)} bytes) exceeds buffer of {capacity}") for i in range(capacity): b = raw[i] if i < len(raw) else 0 dst[i] = b - 256 if b > 127 else b def _pack_class_names(dst, names: tuple[str, ...]) -> None: """Write ``names`` into a 2-D ``c_byte`` class-name table, one row each. Args: dst: A ctypes 2-D ``c_byte`` array (rows of fixed-width names). names: Class labels to store, in output-index order. Raises: ValueError: If there are more names than table rows. """ if len(names) > len(dst): raise ValueError(f"{len(names)} class names exceed table of {len(dst)} rows") for i, name in enumerate(names): _pack_cstring(dst[i], name) _is_windows = platform.system() == "Windows" DO_FAKE_MIRROR = True # Gate timing instrumentation behind the profiler env var _TIMING_ENABLED = os.environ.get("ENABLE_PROFILER", "").strip() not in ("", "0", "false") @dataclass class CalibrationResult: """Result of a face ID calibration attempt.""" success: bool user_id: int message: str class EveWrapper: def __init__(self, eve_bin_path="", eve_lib_path=""): self._ensure_config_dir() eve_bin_path, eve_lib_path = self._resolve_eve_paths(eve_bin_path, eve_lib_path) self._mirror = False self._inference_frame_count = 0 self._last_sent_frame: np.ndarray | None = None # Per-call timing: {name: [call_count, total_seconds]} self._timings: dict[str, list[float]] = {} self._timing_enabled = _TIMING_ENABLED image_provider = sdk.structs.EveImageProvider.EVE_CLIENT_PROVIDED self.eve_sdk = self._load_and_create_eve(eve_bin_path, eve_lib_path, image_provider) if image_provider == sdk.structs.EveImageProvider.EVE_CAMERA: self._set_camera() image_request = sdk.structs.EveImageFormatRequest( location=sdk.structs.EveImageLocation.EVE_CPU, format=sdk.structs.EveVideoFormat.EVE_BGRA, ) self.eve_sdk.EveConfigureProcessedImage(image_request) err = self.eve_sdk.StartEveWithParameters( sdk.structs.EveProcessingParameters(type=sdk.structs.EveProcessingPipelineType.EVE_HMI) ) if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"StartEveWithParameters error code: {err}") sys.exit(err) logger.info("EVE initialized") def _record_timing(self, name: str, elapsed: float) -> None: entry = self._timings.get(name) if entry is None: self._timings[name] = [1, elapsed] else: entry[0] += 1 entry[1] += elapsed def get_timing_stats(self, reset: bool = False) -> dict[str, tuple[int, float]]: """Return accumulated per-call timings as {name: (count, total_seconds)}. Args: reset: If True, clear the accumulators after reading. """ result = {k: (int(v[0]), v[1]) for k, v in self._timings.items()} if reset: self._timings.clear() return result def inference(self, image: np.ndarray) -> np.ndarray: t = self._timing_enabled if self._mirror and DO_FAKE_MIRROR: if t: _t0 = time.perf_counter() image = cv2.flip(image, 1) if t: self._record_timing("flip", time.perf_counter() - _t0) if t: _t0 = time.perf_counter() if not self._send_frame(image): return image if t: self._record_timing("EveSendImageForProcessing", time.perf_counter() - _t0) if t: _t0 = time.perf_counter() processed_image = self.eve_sdk.EveGetProcessedImage() if t: self._record_timing("EveGetProcessedImage", time.perf_counter() - _t0) if processed_image.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveGetProcessedImage() error code: {processed_image.error}") sys.exit(processed_image.error) if t: _t0 = time.perf_counter() img = np.ctypeslib.as_array( processed_image.data, shape=(processed_image.height, processed_image.width, processed_image.channels), ).copy() del processed_image if t: self._record_timing("as_array+copy", time.perf_counter() - _t0) if img.shape[2] == 2: if t: _t0 = time.perf_counter() img = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_YUYV) if t: self._record_timing("cvtColor", time.perf_counter() - _t0) elif img.shape[2] == 4: if t: _t0 = time.perf_counter() img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) if t: self._record_timing("cvtColor", time.perf_counter() - _t0) elif img.shape[2] == 3: if t: _t0 = time.perf_counter() img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) if t: self._record_timing("cvtColor", time.perf_counter() - _t0) # SDK requires all outputs to be consumed before the next frame if t: _t0 = time.perf_counter() person_detection_data = self.eve_sdk.EveGetPersonDetectionData() if t: self._record_timing("EveGetPersonDetectionData", time.perf_counter() - _t0) if person_detection_data.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveGetPersonDetectionData() error code: {person_detection_data.error}") sys.exit(person_detection_data.error) del person_detection_data if t: _t0 = time.perf_counter() all_faces = self.eve_sdk.EveGetAllFaceData() if t: self._record_timing("EveGetAllFaceData", time.perf_counter() - _t0) if all_faces.errorCode == sdk.structs.EveError.EVE_ERROR_NO_ERROR: face_count = all_faces.faceData.contents.detectedFacesCount if face_count > 0: self._inference_frame_count += 1 # Log face ID data periodically to avoid flooding if self._inference_frame_count % self._INFERENCE_LOG_INTERVAL == 1: self._log_all_faces("inference", self._inference_frame_count, all_faces) del all_faces if img.shape[2] in (1, 3): return img print( f"WRONG FORMAT: {img.shape} " "(Consider converting it above here, or making EveImageFormatRequest work)" ) return None def reset_pipeline(self) -> None: """Reset the EVE processing pipeline to a clean state. Pipeline state (face tracking, ideal-user selection) persists on the long-lived SDK handle across inference calls. Workers are reused across offline video jobs, so without a reset the second video inherits the first run's pipeline state — most visibly, no ideal user is selected. Call this before processing each new video. """ err = self.eve_sdk.EveResetPipeline() if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.warning(f"EveResetPipeline error code: {err}") def shutdown(self) -> None: """Cleanly shut down the Eve SDK instance.""" try: self.eve_sdk.ShutdownEve() logger.info("EVE shut down") except Exception as exc: logger.warning(f"EVE shutdown error: {exc}") def enable_mirror(self, enabled: bool = True) -> None: """Enable or disable image mirroring.""" if DO_FAKE_MIRROR: self._mirror = enabled return options = sdk.structs.EveImageManipulationOptions() options.settings.mirrorImage = 1 if enabled else 0 result = self.eve_sdk.EveConfigureImageManipulation(options) if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveConfigureImageManipulation() error code: {result.errorCode}") sys.exit(result.errorCode) def enable_object_detection(self, config: "ModelConfig | None" = None) -> None: """Enable and configure EVE's object detection (MOD) feature. Pass ``None`` to leave MOD off (the worker simply does not configure it, matching the SDK's disabled-by-default state). Otherwise: - ``config.model_path`` empty / ``None`` selects the SDK's bundled default model (GMOD Base Model) with its default class names. - a populated ``config.model_path`` loads that ``.tflite`` and, if given, replaces the class table with ``config.class_names``. ``modelPath`` and ``classCount`` are rewritten on every call rather than left to the values returned by ``EveGetDefaultObjectDetectionOptions``: that getter echoes the *previously configured* options, not pristine defaults. Without the unconditional reset, switching from a custom model (e.g. AMOD-8) back to the bundled default would leave the prior model's class names in EVE's table. An empty ``modelPath`` with ``classCount == 0`` is the SDK's documented "use the default model and its default class names". Args: config: The resolved MOD model to load, or ``None`` to skip MOD configuration entirely. Raises: SystemExit: If ``EveConfigureObjectDetection`` returns an error. """ if config is None: return options = self.eve_sdk.EveGetDefaultObjectDetectionOptions() options.enabled = sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED _pack_cstring(options.modelPath, config.model_path or "") if config.model_path and config.class_names: _pack_class_names(options.classNames, config.class_names) options.classCount = len(config.class_names) else: options.classCount = 0 if config.nms_threshold is not None: options.nmsThreshold = config.nms_threshold if config.iou_threshold is not None: options.iouThreshold = config.iou_threshold result = self.eve_sdk.EveConfigureObjectDetection(options) if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveConfigureObjectDetection() error code: {result.error}") sys.exit(result.error) def enable_face_and_person_detection( self, faceEnabled: bool = True, personEnabled: bool = True ): mode = sdk.structs.EveFaceTrackerMinimumMode.EVE_FACETRACKER_MINIMUM_MODE_OFF params = sdk.structs.EveFaceTrackerOptions( faceTrackerMode=mode, enable3DFaceTracking=( sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED if faceEnabled else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED ), fitSecondaryUsers=( sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED if faceEnabled else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED ), enablePersonDetection=( sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED if personEnabled else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED ), enableEyeLandmarks=sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED, ) result = self.eve_sdk.EveConfigureFaceTracker(params) if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveConfigureFaceTracker() error code: {result.error}") sys.exit(result.error) def enable_face_id(self, enabled: bool = True, threshold: float = 0.7) -> None: eve_enabled = ( sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED if enabled else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED ) options = sdk.structs.EveFaceIdOptions(enabled=eve_enabled, threshold=threshold) if enabled: options.calibrationPoses = ( sdk.structs.EveFaceIdCalibrationPoseMode.EVE_FACEID_CALIBRATION_FRONTAL_ONLY ) result = self.eve_sdk.EveConfigureFaceId(options) if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveConfigureFaceId() error code: {result.error}") sys.exit(result.error) def enable_hand_gesture(self, enabled=True, redetection_delay_ms=0): eve_enabled = ( sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED if enabled else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED ) result = self.eve_sdk.EveConfigureHandGesture( sdk.structs.EveHandGestureOptions( enabled=eve_enabled, redetectionDelay=redetection_delay_ms ) ) if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"EveConfigureHandGesture() error code: {result.errorCode}") sys.exit(result.errorCode) def _send_face_id_command( self, command: EveFaceIdCommand ) -> sdk.structs.EveFaceIdCommandStruct: cmd = sdk.structs.EveFaceIdCommandStruct(command=command) return self.eve_sdk.EveSendFaceIdCommand(cmd) def _log_all_faces(self, context: str, frame_idx: int, all_faces) -> None: """Log face ID details for every detected face in one frame.""" face_count = all_faces.faceData.contents.detectedFacesCount if face_count == 0: logger.debug(f"[{context}] frame {frame_idx}: no faces detected") return for j in range(face_count): f = all_faces.faceData.contents.faces[j] fid = f.faceId.faceIdentity logger.debug( f"[{context}] frame {frame_idx}: face {j}: " f"id={fid.id}, confidence={fid.confidence:.4f}, " f"similarity={fid.similarity:.4f}, " f"identificationStatus={f.faceId.identificationStatus}, " f"calibrationStatus={f.faceId.calibrationStatus}" ) def _send_frame(self, frame: np.ndarray) -> None: """Send a single frame through the SDK pipeline (send + consume output).""" rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # The SDK keeps the input pointer live past EveSendImageForProcessing and # dereferences it during the matching EveGet*() calls. Retain the ndarray # on the instance so it outlives every follow-up SDK call until the next # frame is sent. Dropping the local ref here caused 1080p video runs to # segfault inside libc memmove (use-after-free on the RGB buffer). self._last_sent_frame = rgb eve_image = self._create_eve_input_image(rgb, rgb.shape[1], rgb.shape[0], "RGB") send_err = self.eve_sdk.EveSendImageForProcessing(eve_image) if send_err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.warning(f"EveSendImageForProcessing failed: {send_err}") return False return True _FLUSH_FRAME_COUNT = 6 _CALIBRATION_MAX_ATTEMPTS = 5 _INFERENCE_LOG_INTERVAL = 30 # log face ID data every N frames during inference def calibrate_new_user(self, frames: list[np.ndarray]) -> CalibrationResult: """Register a new face by sending frames through the SDK calibration pipeline. Retries up to ``_CALIBRATION_MAX_ATTEMPTS`` times because the SDK can intermittently reject borderline-frontal faces. Args: frames: BGR images to use for calibration. Each frame is sent exactly once per attempt; returns at the first successful attempt. Returns: CalibrationResult with success flag, SDK-assigned user ID, and message. """ last_result: CalibrationResult | None = None for attempt in range(self._CALIBRATION_MAX_ATTEMPTS): last_result = self._calibrate_new_user_once(frames) if last_result.success: return last_result logger.info( "calibrate_new_user: attempt %d/%d failed: %s", attempt + 1, self._CALIBRATION_MAX_ATTEMPTS, last_result.message, ) return last_result # type: ignore[return-value] def _calibrate_new_user_once(self, frames: list[np.ndarray]) -> CalibrationResult: """Single calibration attempt — flush, send ADD_NEW_USER + frames, check result.""" self.enable_face_and_person_detection(faceEnabled=True) self.enable_face_id(enabled=True) h, w = frames[0].shape[:2] blank = np.zeros((h, w, 3), dtype=np.uint8) for _ in range(self._FLUSH_FRAME_COUNT): self._send_frame(blank) success_frame = -1 for i, frame in enumerate(frames): result = self._send_face_id_command(EveFaceIdCommand.EVE_FACE_ID_COMMAND_ADD_NEW_USER) if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: return CalibrationResult( False, 0, f"ADD_NEW_USER command failed: {result.errorCode}" ) self._send_frame(frame) all_faces = self.eve_sdk.EveGetAllFaceData() if all_faces.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: return CalibrationResult( False, 0, f"EveGetAllFaceData failed: {all_faces.errorCode}" ) face_count = all_faces.faceData.contents.detectedFacesCount self._log_all_faces("calibrate frame", i, all_faces) matched_face = None for fi in range(face_count): face = all_faces.faceData.contents.faces[fi] if ( face.faceId.faceIdentity.id >= 0 and face.faceId.identificationStatus == EveFaceIdIdentificationStatus.EVE_FACE_ID_SUCCESS ): matched_face = face break if matched_face is not None: success_frame = i break if success_frame > -1: user_id = matched_face.faceId.faceIdentity.id confidence = matched_face.faceId.faceIdentity.confidence similarity = matched_face.faceId.faceIdentity.similarity logger.debug( f"calibrate_new_user: SUCCESS on frame {success_frame}, " f"id={user_id}, confidence={confidence:.4f}, similarity={similarity:.4f}" ) if len(frames) == 1: return CalibrationResult(True, user_id, f"Registered as user {user_id}") else: return CalibrationResult( True, user_id, f"Calibration succeeded on frame {success_frame}. " f"Registered as user {user_id}.", ) else: return CalibrationResult( False, 0, "Calibration did not succeed. " "Ensure a clear, frontal face is visible in the upload.", ) def remove_all_users(self) -> bool: """Remove all users from the SDK face ID gallery. Returns: True if removal succeeded. """ result = self._send_face_id_command(EveFaceIdCommand.EVE_FACE_ID_COMMAND_REMOVE_ALL_USERS) if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.warning(f"REMOVE_ALL_USERS command error: {result.errorCode}") return False return True def restore_gallery(self, frames_per_user: list[list[np.ndarray]]) -> list[CalibrationResult]: """Wipe the SDK gallery and re-register users from stored frames. Used before video processing to sync the SDK gallery with a session's registered users. Sends a frame after removing to flush the command through the SDK pipeline before re-registering. Args: frames_per_user: List of frame lists, one per user to re-register. Returns: List of CalibrationResult, one per user. """ if not frames_per_user: return [] self.enable_face_and_person_detection(faceEnabled=True) self.enable_face_id(enabled=True) self.remove_all_users() # SDK commands are async — flush the remove by sending a frame self._send_frame(frames_per_user[0][0]) results = [] for idx, frames in enumerate(frames_per_user): r = self.calibrate_new_user(frames) logger.info( f"restore_gallery: user {idx}: success={r.success}, " f"sdk_id={r.user_id}, message='{r.message}'" ) results.append(r) return results @staticmethod def _ensure_config_dir(): from pathlib import Path try: new_dir_path = Path.home() / ".config" new_dir_path.mkdir(exist_ok=True) except Exception as e: print(f"Error creating directory: {e}") @staticmethod def _resolve_eve_paths(eve_bin_path="", eve_lib_path=""): if eve_bin_path and eve_lib_path: return eve_bin_path, eve_lib_path if _is_windows: eve_bin_path = r"C:\TLT_SRC_DIR\EdgeVisionEngine\x64\Release\\" eve_lib_path = eve_bin_path else: eve_dir_default_paths = glob.glob("/opt/EVE-*-Source", recursive=False) eve_bin_path = eve_bin_path or ( os.path.join(eve_dir_default_paths[0], "bin") if eve_dir_default_paths else "" ) eve_lib_path = eve_lib_path or ( os.path.join(eve_dir_default_paths[0], "lib") if eve_dir_default_paths else "" ) return eve_bin_path, eve_lib_path def _load_and_create_eve(self, eve_bin_path, eve_lib_path, image_provider): from pathlib import Path os.chdir(eve_bin_path) if _is_windows: eve_sdk_path = os.path.join(eve_bin_path, "EveSDK.dll") root = Path(os.path.abspath(__file__)).parent if not os.path.isfile(eve_sdk_path): eve_sdk_path = os.path.join(root.parent.parent, eve_bin_path, "EveSDK.dll") else: eve_sdk_path = os.path.join(eve_bin_path, "libEveSDK.so") if not os.path.isfile(eve_sdk_path): eve_sdk_path = os.path.join(eve_lib_path, "libEveSDK.so") eve_sdk_instance = sdk.EveSDK(eve_sdk_path) ByteArray512 = ctypes.c_byte * 512 encoded = os.path.dirname(eve_bin_path + os.sep).encode("utf-8") # EVE needs os.sep pathOverride = ByteArray512(*encoded, *([0] * (512 - len(encoded)))) # zero-pad to 512 startup_options = sdk.structs.EveStartupParameters( pathOverride=pathOverride, gpuPreference=sdk.structs.EveGpuPreference.EVE_NO_GPU, imageProvider=image_provider, startupType=sdk.structs.EveStartupType.EVE_SYNC, ) err = eve_sdk_instance.CreateEve(startup_options) if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: logger.info(f"CreateEve error code: {err}") sys.exit(err) return eve_sdk_instance def _set_camera(self): i = 0 self._metaDataFpgaCameraId = -1 self._fpgaCameraId = -1 while True: cameraInfo = self.eve_sdk.EveGetCamera(i) if ( cameraInfo.error == sdk.structs.EveError.EVE_INVALID_CAMERA_ID or cameraInfo.error == sdk.structs.EveError.EVE_NO_MORE_DATA ): break pid = ctypes.cast(cameraInfo.data.pid, ctypes.c_char_p).value vid = ctypes.cast(cameraInfo.data.vid, ctypes.c_char_p).value if cameraInfo.data.isFpgaCamera == 1: if self._metaDataFpgaCameraId == -1 and vid == b"META" and pid == b"DATA": self._metaDataFpgaCameraId = i elif self._fpgaCameraId == -1: self._fpgaCameraId = i print(i, self._fpgaCameraId, self._metaDataFpgaCameraId, cameraInfo.error, pid, vid) if self._fpgaCameraId >= 0 and self._metaDataFpgaCameraId >= 0: break i += 1 if self._fpgaCameraId == -1 and self._metaDataFpgaCameraId == -1: raise RuntimeError("No FPGA camera found") print( f" \n\t\t *** FPGA camera found: {self._fpgaCameraId}, metadata {self._metaDataFpgaCameraId}\n" ) useMetadataCamera = False if useMetadataCamera: self._usedCameraId = self._metaDataFpgaCameraId else: self._usedCameraId = self._fpgaCameraId cameraFormat = sdk.structs.CCameraFormat() cameraFormat.resolution.width = 640 cameraFormat.resolution.height = 480 cameraFormat.compareResolution = sdk.structs.EveCompare.EVE_AT_MOST cameraFormat.compareFps = sdk.structs.EveCompare.EVE_AT_LEAST formats = self.eve_sdk.EveGetFormats(self._usedCameraId, cameraFormat) f = formats.formats[0] print( f"camera selected: ID#{self._usedCameraId}: {f.resolution.width}x{f.resolution.height}, " f"Format: {f.format} @ {f.fps}FPS" ) errorCode = self.eve_sdk.EveSetCamera(self._usedCameraId, f) if errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: raise RuntimeError(f"Could't set camera {errorCode}") @staticmethod def _create_eve_input_image(image_bin, width, height, encoding): image = sdk.structs.EveInputImage() image.data = image_bin.ctypes.data_as(ctypes.POINTER(ctypes.c_ubyte)) image.width = width image.height = height if encoding == "YUY2": image.encoding = sdk.structs.EveVideoFormat.EVE_YUY2 elif encoding == "NV12": image.encoding = sdk.structs.EveVideoFormat.EVE_NV12 elif encoding == "BGR": image.encoding = sdk.structs.EveVideoFormat.EVE_BGR elif encoding == "RGB": image.encoding = sdk.structs.EveVideoFormat.EVE_RGB return image