Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| import ctypes | |
| import glob | |
| import os | |
| import platform | |
| import sys | |
| import time | |
| from dataclasses import dataclass | |
| import cv2 | |
| import numpy as np | |
| from eve_messages import ModelConfig | |
| from eve_python import eve_sdk as sdk | |
| from eve_python.structs.CFaceIdStructs import ( | |
| EveFaceIdCommand, | |
| EveFaceIdIdentificationStatus, | |
| ) | |
| from log_utils import setup_logger | |
| logger = setup_logger("EveWrapper") | |
| def _pack_cstring(dst, value: str) -> None: | |
| """Write ``value`` as a null-terminated UTF-8 string into a ``c_byte`` array. | |
| ``c_byte`` is signed (-128..127), so bytes above 127 are folded into their | |
| signed two's-complement form — the underlying memory is identical to an | |
| unsigned ``char[]`` on the C side. The array is fully written so any stale | |
| bytes from a recycled options struct are cleared. | |
| Args: | |
| dst: A ctypes ``c_byte`` array (the destination buffer). | |
| value: The string to store; must fit with room for the null terminator. | |
| Raises: | |
| ValueError: If ``value`` does not leave room for a null terminator. | |
| """ | |
| raw = value.encode("utf-8") | |
| capacity = len(dst) | |
| if len(raw) >= capacity: | |
| raise ValueError(f"{value!r} ({len(raw)} bytes) exceeds buffer of {capacity}") | |
| for i in range(capacity): | |
| b = raw[i] if i < len(raw) else 0 | |
| dst[i] = b - 256 if b > 127 else b | |
| def _pack_class_names(dst, names: tuple[str, ...]) -> None: | |
| """Write ``names`` into a 2-D ``c_byte`` class-name table, one row each. | |
| Args: | |
| dst: A ctypes 2-D ``c_byte`` array (rows of fixed-width names). | |
| names: Class labels to store, in output-index order. | |
| Raises: | |
| ValueError: If there are more names than table rows. | |
| """ | |
| if len(names) > len(dst): | |
| raise ValueError(f"{len(names)} class names exceed table of {len(dst)} rows") | |
| for i, name in enumerate(names): | |
| _pack_cstring(dst[i], name) | |
| _is_windows = platform.system() == "Windows" | |
| DO_FAKE_MIRROR = True | |
| # Gate timing instrumentation behind the profiler env var | |
| _TIMING_ENABLED = os.environ.get("ENABLE_PROFILER", "").strip() not in ("", "0", "false") | |
| class CalibrationResult: | |
| """Result of a face ID calibration attempt.""" | |
| success: bool | |
| user_id: int | |
| message: str | |
| class EveWrapper: | |
| def __init__(self, eve_bin_path="", eve_lib_path=""): | |
| self._ensure_config_dir() | |
| eve_bin_path, eve_lib_path = self._resolve_eve_paths(eve_bin_path, eve_lib_path) | |
| self._mirror = False | |
| self._inference_frame_count = 0 | |
| self._last_sent_frame: np.ndarray | None = None | |
| # Per-call timing: {name: [call_count, total_seconds]} | |
| self._timings: dict[str, list[float]] = {} | |
| self._timing_enabled = _TIMING_ENABLED | |
| image_provider = sdk.structs.EveImageProvider.EVE_CLIENT_PROVIDED | |
| self.eve_sdk = self._load_and_create_eve(eve_bin_path, eve_lib_path, image_provider) | |
| if image_provider == sdk.structs.EveImageProvider.EVE_CAMERA: | |
| self._set_camera() | |
| image_request = sdk.structs.EveImageFormatRequest( | |
| location=sdk.structs.EveImageLocation.EVE_CPU, | |
| format=sdk.structs.EveVideoFormat.EVE_BGRA, | |
| ) | |
| self.eve_sdk.EveConfigureProcessedImage(image_request) | |
| err = self.eve_sdk.StartEveWithParameters( | |
| sdk.structs.EveProcessingParameters(type=sdk.structs.EveProcessingPipelineType.EVE_HMI) | |
| ) | |
| if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"StartEveWithParameters error code: {err}") | |
| sys.exit(err) | |
| logger.info("EVE initialized") | |
| def _record_timing(self, name: str, elapsed: float) -> None: | |
| entry = self._timings.get(name) | |
| if entry is None: | |
| self._timings[name] = [1, elapsed] | |
| else: | |
| entry[0] += 1 | |
| entry[1] += elapsed | |
| def get_timing_stats(self, reset: bool = False) -> dict[str, tuple[int, float]]: | |
| """Return accumulated per-call timings as {name: (count, total_seconds)}. | |
| Args: | |
| reset: If True, clear the accumulators after reading. | |
| """ | |
| result = {k: (int(v[0]), v[1]) for k, v in self._timings.items()} | |
| if reset: | |
| self._timings.clear() | |
| return result | |
| def inference(self, image: np.ndarray) -> np.ndarray: | |
| t = self._timing_enabled | |
| if self._mirror and DO_FAKE_MIRROR: | |
| if t: | |
| _t0 = time.perf_counter() | |
| image = cv2.flip(image, 1) | |
| if t: | |
| self._record_timing("flip", time.perf_counter() - _t0) | |
| if t: | |
| _t0 = time.perf_counter() | |
| if not self._send_frame(image): | |
| return image | |
| if t: | |
| self._record_timing("EveSendImageForProcessing", time.perf_counter() - _t0) | |
| if t: | |
| _t0 = time.perf_counter() | |
| processed_image = self.eve_sdk.EveGetProcessedImage() | |
| if t: | |
| self._record_timing("EveGetProcessedImage", time.perf_counter() - _t0) | |
| if processed_image.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveGetProcessedImage() error code: {processed_image.error}") | |
| sys.exit(processed_image.error) | |
| if t: | |
| _t0 = time.perf_counter() | |
| img = np.ctypeslib.as_array( | |
| processed_image.data, | |
| shape=(processed_image.height, processed_image.width, processed_image.channels), | |
| ).copy() | |
| del processed_image | |
| if t: | |
| self._record_timing("as_array+copy", time.perf_counter() - _t0) | |
| if img.shape[2] == 2: | |
| if t: | |
| _t0 = time.perf_counter() | |
| img = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_YUYV) | |
| if t: | |
| self._record_timing("cvtColor", time.perf_counter() - _t0) | |
| elif img.shape[2] == 4: | |
| if t: | |
| _t0 = time.perf_counter() | |
| img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) | |
| if t: | |
| self._record_timing("cvtColor", time.perf_counter() - _t0) | |
| elif img.shape[2] == 3: | |
| if t: | |
| _t0 = time.perf_counter() | |
| img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) | |
| if t: | |
| self._record_timing("cvtColor", time.perf_counter() - _t0) | |
| # SDK requires all outputs to be consumed before the next frame | |
| if t: | |
| _t0 = time.perf_counter() | |
| person_detection_data = self.eve_sdk.EveGetPersonDetectionData() | |
| if t: | |
| self._record_timing("EveGetPersonDetectionData", time.perf_counter() - _t0) | |
| if person_detection_data.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveGetPersonDetectionData() error code: {person_detection_data.error}") | |
| sys.exit(person_detection_data.error) | |
| del person_detection_data | |
| if t: | |
| _t0 = time.perf_counter() | |
| all_faces = self.eve_sdk.EveGetAllFaceData() | |
| if t: | |
| self._record_timing("EveGetAllFaceData", time.perf_counter() - _t0) | |
| if all_faces.errorCode == sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| face_count = all_faces.faceData.contents.detectedFacesCount | |
| if face_count > 0: | |
| self._inference_frame_count += 1 | |
| # Log face ID data periodically to avoid flooding | |
| if self._inference_frame_count % self._INFERENCE_LOG_INTERVAL == 1: | |
| self._log_all_faces("inference", self._inference_frame_count, all_faces) | |
| del all_faces | |
| if img.shape[2] in (1, 3): | |
| return img | |
| print( | |
| f"WRONG FORMAT: {img.shape} " | |
| "(Consider converting it above here, or making EveImageFormatRequest work)" | |
| ) | |
| return None | |
| def reset_pipeline(self) -> None: | |
| """Reset the EVE processing pipeline to a clean state. | |
| Pipeline state (face tracking, ideal-user selection) persists on the | |
| long-lived SDK handle across inference calls. Workers are reused across | |
| offline video jobs, so without a reset the second video inherits the | |
| first run's pipeline state — most visibly, no ideal user is selected. | |
| Call this before processing each new video. | |
| """ | |
| err = self.eve_sdk.EveResetPipeline() | |
| if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.warning(f"EveResetPipeline error code: {err}") | |
| def shutdown(self) -> None: | |
| """Cleanly shut down the Eve SDK instance.""" | |
| try: | |
| self.eve_sdk.ShutdownEve() | |
| logger.info("EVE shut down") | |
| except Exception as exc: | |
| logger.warning(f"EVE shutdown error: {exc}") | |
| def enable_mirror(self, enabled: bool = True) -> None: | |
| """Enable or disable image mirroring.""" | |
| if DO_FAKE_MIRROR: | |
| self._mirror = enabled | |
| return | |
| options = sdk.structs.EveImageManipulationOptions() | |
| options.settings.mirrorImage = 1 if enabled else 0 | |
| result = self.eve_sdk.EveConfigureImageManipulation(options) | |
| if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveConfigureImageManipulation() error code: {result.errorCode}") | |
| sys.exit(result.errorCode) | |
| def enable_object_detection(self, config: "ModelConfig | None" = None) -> None: | |
| """Enable and configure EVE's object detection (MOD) feature. | |
| Pass ``None`` to leave MOD off (the worker simply does not configure | |
| it, matching the SDK's disabled-by-default state). Otherwise: | |
| - ``config.model_path`` empty / ``None`` selects the SDK's bundled | |
| default model (GMOD Base Model) with its default class names. | |
| - a populated ``config.model_path`` loads that ``.tflite`` and, if | |
| given, replaces the class table with ``config.class_names``. | |
| ``modelPath`` and ``classCount`` are rewritten on every call rather | |
| than left to the values returned by | |
| ``EveGetDefaultObjectDetectionOptions``: that getter echoes the | |
| *previously configured* options, not pristine defaults. Without the | |
| unconditional reset, switching from a custom model (e.g. AMOD-8) back | |
| to the bundled default would leave the prior model's class names in | |
| EVE's table. An empty ``modelPath`` with ``classCount == 0`` is the | |
| SDK's documented "use the default model and its default class names". | |
| Args: | |
| config: The resolved MOD model to load, or ``None`` to skip MOD | |
| configuration entirely. | |
| Raises: | |
| SystemExit: If ``EveConfigureObjectDetection`` returns an error. | |
| """ | |
| if config is None: | |
| return | |
| options = self.eve_sdk.EveGetDefaultObjectDetectionOptions() | |
| options.enabled = sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED | |
| _pack_cstring(options.modelPath, config.model_path or "") | |
| if config.model_path and config.class_names: | |
| _pack_class_names(options.classNames, config.class_names) | |
| options.classCount = len(config.class_names) | |
| else: | |
| options.classCount = 0 | |
| if config.nms_threshold is not None: | |
| options.nmsThreshold = config.nms_threshold | |
| if config.iou_threshold is not None: | |
| options.iouThreshold = config.iou_threshold | |
| result = self.eve_sdk.EveConfigureObjectDetection(options) | |
| if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveConfigureObjectDetection() error code: {result.error}") | |
| sys.exit(result.error) | |
| def enable_face_and_person_detection( | |
| self, faceEnabled: bool = True, personEnabled: bool = True | |
| ): | |
| mode = sdk.structs.EveFaceTrackerMinimumMode.EVE_FACETRACKER_MINIMUM_MODE_OFF | |
| params = sdk.structs.EveFaceTrackerOptions( | |
| faceTrackerMode=mode, | |
| enable3DFaceTracking=( | |
| sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED | |
| if faceEnabled | |
| else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED | |
| ), | |
| fitSecondaryUsers=( | |
| sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED | |
| if faceEnabled | |
| else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED | |
| ), | |
| enablePersonDetection=( | |
| sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED | |
| if personEnabled | |
| else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED | |
| ), | |
| enableEyeLandmarks=sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED, | |
| ) | |
| result = self.eve_sdk.EveConfigureFaceTracker(params) | |
| if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveConfigureFaceTracker() error code: {result.error}") | |
| sys.exit(result.error) | |
| def enable_face_id(self, enabled: bool = True, threshold: float = 0.7) -> None: | |
| eve_enabled = ( | |
| sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED | |
| if enabled | |
| else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED | |
| ) | |
| options = sdk.structs.EveFaceIdOptions(enabled=eve_enabled, threshold=threshold) | |
| if enabled: | |
| options.calibrationPoses = ( | |
| sdk.structs.EveFaceIdCalibrationPoseMode.EVE_FACEID_CALIBRATION_FRONTAL_ONLY | |
| ) | |
| result = self.eve_sdk.EveConfigureFaceId(options) | |
| if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveConfigureFaceId() error code: {result.error}") | |
| sys.exit(result.error) | |
| def enable_hand_gesture(self, enabled=True, redetection_delay_ms=0): | |
| eve_enabled = ( | |
| sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED | |
| if enabled | |
| else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED | |
| ) | |
| result = self.eve_sdk.EveConfigureHandGesture( | |
| sdk.structs.EveHandGestureOptions( | |
| enabled=eve_enabled, redetectionDelay=redetection_delay_ms | |
| ) | |
| ) | |
| if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"EveConfigureHandGesture() error code: {result.errorCode}") | |
| sys.exit(result.errorCode) | |
| def _send_face_id_command( | |
| self, command: EveFaceIdCommand | |
| ) -> sdk.structs.EveFaceIdCommandStruct: | |
| cmd = sdk.structs.EveFaceIdCommandStruct(command=command) | |
| return self.eve_sdk.EveSendFaceIdCommand(cmd) | |
| def _log_all_faces(self, context: str, frame_idx: int, all_faces) -> None: | |
| """Log face ID details for every detected face in one frame.""" | |
| face_count = all_faces.faceData.contents.detectedFacesCount | |
| if face_count == 0: | |
| logger.debug(f"[{context}] frame {frame_idx}: no faces detected") | |
| return | |
| for j in range(face_count): | |
| f = all_faces.faceData.contents.faces[j] | |
| fid = f.faceId.faceIdentity | |
| logger.debug( | |
| f"[{context}] frame {frame_idx}: face {j}: " | |
| f"id={fid.id}, confidence={fid.confidence:.4f}, " | |
| f"similarity={fid.similarity:.4f}, " | |
| f"identificationStatus={f.faceId.identificationStatus}, " | |
| f"calibrationStatus={f.faceId.calibrationStatus}" | |
| ) | |
| def _send_frame(self, frame: np.ndarray) -> None: | |
| """Send a single frame through the SDK pipeline (send + consume output).""" | |
| rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # The SDK keeps the input pointer live past EveSendImageForProcessing and | |
| # dereferences it during the matching EveGet*() calls. Retain the ndarray | |
| # on the instance so it outlives every follow-up SDK call until the next | |
| # frame is sent. Dropping the local ref here caused 1080p video runs to | |
| # segfault inside libc memmove (use-after-free on the RGB buffer). | |
| self._last_sent_frame = rgb | |
| eve_image = self._create_eve_input_image(rgb, rgb.shape[1], rgb.shape[0], "RGB") | |
| send_err = self.eve_sdk.EveSendImageForProcessing(eve_image) | |
| if send_err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.warning(f"EveSendImageForProcessing failed: {send_err}") | |
| return False | |
| return True | |
| _FLUSH_FRAME_COUNT = 6 | |
| _CALIBRATION_MAX_ATTEMPTS = 5 | |
| _INFERENCE_LOG_INTERVAL = 30 # log face ID data every N frames during inference | |
| def calibrate_new_user(self, frames: list[np.ndarray]) -> CalibrationResult: | |
| """Register a new face by sending frames through the SDK calibration pipeline. | |
| Retries up to ``_CALIBRATION_MAX_ATTEMPTS`` times because the SDK can | |
| intermittently reject borderline-frontal faces. | |
| Args: | |
| frames: BGR images to use for calibration. Each frame is sent exactly once | |
| per attempt; returns at the first successful attempt. | |
| Returns: | |
| CalibrationResult with success flag, SDK-assigned user ID, and message. | |
| """ | |
| last_result: CalibrationResult | None = None | |
| for attempt in range(self._CALIBRATION_MAX_ATTEMPTS): | |
| last_result = self._calibrate_new_user_once(frames) | |
| if last_result.success: | |
| return last_result | |
| logger.info( | |
| "calibrate_new_user: attempt %d/%d failed: %s", | |
| attempt + 1, | |
| self._CALIBRATION_MAX_ATTEMPTS, | |
| last_result.message, | |
| ) | |
| return last_result # type: ignore[return-value] | |
| def _calibrate_new_user_once(self, frames: list[np.ndarray]) -> CalibrationResult: | |
| """Single calibration attempt — flush, send ADD_NEW_USER + frames, check result.""" | |
| self.enable_face_and_person_detection(faceEnabled=True) | |
| self.enable_face_id(enabled=True) | |
| h, w = frames[0].shape[:2] | |
| blank = np.zeros((h, w, 3), dtype=np.uint8) | |
| for _ in range(self._FLUSH_FRAME_COUNT): | |
| self._send_frame(blank) | |
| success_frame = -1 | |
| for i, frame in enumerate(frames): | |
| result = self._send_face_id_command(EveFaceIdCommand.EVE_FACE_ID_COMMAND_ADD_NEW_USER) | |
| if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| return CalibrationResult( | |
| False, 0, f"ADD_NEW_USER command failed: {result.errorCode}" | |
| ) | |
| self._send_frame(frame) | |
| all_faces = self.eve_sdk.EveGetAllFaceData() | |
| if all_faces.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| return CalibrationResult( | |
| False, 0, f"EveGetAllFaceData failed: {all_faces.errorCode}" | |
| ) | |
| face_count = all_faces.faceData.contents.detectedFacesCount | |
| self._log_all_faces("calibrate frame", i, all_faces) | |
| matched_face = None | |
| for fi in range(face_count): | |
| face = all_faces.faceData.contents.faces[fi] | |
| if ( | |
| face.faceId.faceIdentity.id >= 0 | |
| and face.faceId.identificationStatus | |
| == EveFaceIdIdentificationStatus.EVE_FACE_ID_SUCCESS | |
| ): | |
| matched_face = face | |
| break | |
| if matched_face is not None: | |
| success_frame = i | |
| break | |
| if success_frame > -1: | |
| user_id = matched_face.faceId.faceIdentity.id | |
| confidence = matched_face.faceId.faceIdentity.confidence | |
| similarity = matched_face.faceId.faceIdentity.similarity | |
| logger.debug( | |
| f"calibrate_new_user: SUCCESS on frame {success_frame}, " | |
| f"id={user_id}, confidence={confidence:.4f}, similarity={similarity:.4f}" | |
| ) | |
| if len(frames) == 1: | |
| return CalibrationResult(True, user_id, f"Registered as user {user_id}") | |
| else: | |
| return CalibrationResult( | |
| True, | |
| user_id, | |
| f"Calibration succeeded on frame {success_frame}. " | |
| f"Registered as user {user_id}.", | |
| ) | |
| else: | |
| return CalibrationResult( | |
| False, | |
| 0, | |
| "Calibration did not succeed. " | |
| "Ensure a clear, frontal face is visible in the upload.", | |
| ) | |
| def remove_all_users(self) -> bool: | |
| """Remove all users from the SDK face ID gallery. | |
| Returns: | |
| True if removal succeeded. | |
| """ | |
| result = self._send_face_id_command(EveFaceIdCommand.EVE_FACE_ID_COMMAND_REMOVE_ALL_USERS) | |
| if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.warning(f"REMOVE_ALL_USERS command error: {result.errorCode}") | |
| return False | |
| return True | |
| def restore_gallery(self, frames_per_user: list[list[np.ndarray]]) -> list[CalibrationResult]: | |
| """Wipe the SDK gallery and re-register users from stored frames. | |
| Used before video processing to sync the SDK gallery with a session's | |
| registered users. Sends a frame after removing to flush the command | |
| through the SDK pipeline before re-registering. | |
| Args: | |
| frames_per_user: List of frame lists, one per user to re-register. | |
| Returns: | |
| List of CalibrationResult, one per user. | |
| """ | |
| if not frames_per_user: | |
| return [] | |
| self.enable_face_and_person_detection(faceEnabled=True) | |
| self.enable_face_id(enabled=True) | |
| self.remove_all_users() | |
| # SDK commands are async — flush the remove by sending a frame | |
| self._send_frame(frames_per_user[0][0]) | |
| results = [] | |
| for idx, frames in enumerate(frames_per_user): | |
| r = self.calibrate_new_user(frames) | |
| logger.info( | |
| f"restore_gallery: user {idx}: success={r.success}, " | |
| f"sdk_id={r.user_id}, message='{r.message}'" | |
| ) | |
| results.append(r) | |
| return results | |
| def _ensure_config_dir(): | |
| from pathlib import Path | |
| try: | |
| new_dir_path = Path.home() / ".config" | |
| new_dir_path.mkdir(exist_ok=True) | |
| except Exception as e: | |
| print(f"Error creating directory: {e}") | |
| def _resolve_eve_paths(eve_bin_path="", eve_lib_path=""): | |
| if eve_bin_path and eve_lib_path: | |
| return eve_bin_path, eve_lib_path | |
| if _is_windows: | |
| eve_bin_path = r"C:\TLT_SRC_DIR\EdgeVisionEngine\x64\Release\\" | |
| eve_lib_path = eve_bin_path | |
| else: | |
| eve_dir_default_paths = glob.glob("/opt/EVE-*-Source", recursive=False) | |
| eve_bin_path = eve_bin_path or ( | |
| os.path.join(eve_dir_default_paths[0], "bin") if eve_dir_default_paths else "" | |
| ) | |
| eve_lib_path = eve_lib_path or ( | |
| os.path.join(eve_dir_default_paths[0], "lib") if eve_dir_default_paths else "" | |
| ) | |
| return eve_bin_path, eve_lib_path | |
| def _load_and_create_eve(self, eve_bin_path, eve_lib_path, image_provider): | |
| from pathlib import Path | |
| os.chdir(eve_bin_path) | |
| if _is_windows: | |
| eve_sdk_path = os.path.join(eve_bin_path, "EveSDK.dll") | |
| root = Path(os.path.abspath(__file__)).parent | |
| if not os.path.isfile(eve_sdk_path): | |
| eve_sdk_path = os.path.join(root.parent.parent, eve_bin_path, "EveSDK.dll") | |
| else: | |
| eve_sdk_path = os.path.join(eve_bin_path, "libEveSDK.so") | |
| if not os.path.isfile(eve_sdk_path): | |
| eve_sdk_path = os.path.join(eve_lib_path, "libEveSDK.so") | |
| eve_sdk_instance = sdk.EveSDK(eve_sdk_path) | |
| ByteArray512 = ctypes.c_byte * 512 | |
| encoded = os.path.dirname(eve_bin_path + os.sep).encode("utf-8") # EVE needs os.sep | |
| pathOverride = ByteArray512(*encoded, *([0] * (512 - len(encoded)))) # zero-pad to 512 | |
| startup_options = sdk.structs.EveStartupParameters( | |
| pathOverride=pathOverride, | |
| gpuPreference=sdk.structs.EveGpuPreference.EVE_NO_GPU, | |
| imageProvider=image_provider, | |
| startupType=sdk.structs.EveStartupType.EVE_SYNC, | |
| ) | |
| err = eve_sdk_instance.CreateEve(startup_options) | |
| if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| logger.info(f"CreateEve error code: {err}") | |
| sys.exit(err) | |
| return eve_sdk_instance | |
| def _set_camera(self): | |
| i = 0 | |
| self._metaDataFpgaCameraId = -1 | |
| self._fpgaCameraId = -1 | |
| while True: | |
| cameraInfo = self.eve_sdk.EveGetCamera(i) | |
| if ( | |
| cameraInfo.error == sdk.structs.EveError.EVE_INVALID_CAMERA_ID | |
| or cameraInfo.error == sdk.structs.EveError.EVE_NO_MORE_DATA | |
| ): | |
| break | |
| pid = ctypes.cast(cameraInfo.data.pid, ctypes.c_char_p).value | |
| vid = ctypes.cast(cameraInfo.data.vid, ctypes.c_char_p).value | |
| if cameraInfo.data.isFpgaCamera == 1: | |
| if self._metaDataFpgaCameraId == -1 and vid == b"META" and pid == b"DATA": | |
| self._metaDataFpgaCameraId = i | |
| elif self._fpgaCameraId == -1: | |
| self._fpgaCameraId = i | |
| print(i, self._fpgaCameraId, self._metaDataFpgaCameraId, cameraInfo.error, pid, vid) | |
| if self._fpgaCameraId >= 0 and self._metaDataFpgaCameraId >= 0: | |
| break | |
| i += 1 | |
| if self._fpgaCameraId == -1 and self._metaDataFpgaCameraId == -1: | |
| raise RuntimeError("No FPGA camera found") | |
| print( | |
| f" \n\t\t *** FPGA camera found: {self._fpgaCameraId}, metadata {self._metaDataFpgaCameraId}\n" | |
| ) | |
| useMetadataCamera = False | |
| if useMetadataCamera: | |
| self._usedCameraId = self._metaDataFpgaCameraId | |
| else: | |
| self._usedCameraId = self._fpgaCameraId | |
| cameraFormat = sdk.structs.CCameraFormat() | |
| cameraFormat.resolution.width = 640 | |
| cameraFormat.resolution.height = 480 | |
| cameraFormat.compareResolution = sdk.structs.EveCompare.EVE_AT_MOST | |
| cameraFormat.compareFps = sdk.structs.EveCompare.EVE_AT_LEAST | |
| formats = self.eve_sdk.EveGetFormats(self._usedCameraId, cameraFormat) | |
| f = formats.formats[0] | |
| print( | |
| f"camera selected: ID#{self._usedCameraId}: {f.resolution.width}x{f.resolution.height}, " | |
| f"Format: {f.format} @ {f.fps}FPS" | |
| ) | |
| errorCode = self.eve_sdk.EveSetCamera(self._usedCameraId, f) | |
| if errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR: | |
| raise RuntimeError(f"Could't set camera {errorCode}") | |
| def _create_eve_input_image(image_bin, width, height, encoding): | |
| image = sdk.structs.EveInputImage() | |
| image.data = image_bin.ctypes.data_as(ctypes.POINTER(ctypes.c_ubyte)) | |
| image.width = width | |
| image.height = height | |
| if encoding == "YUY2": | |
| image.encoding = sdk.structs.EveVideoFormat.EVE_YUY2 | |
| elif encoding == "NV12": | |
| image.encoding = sdk.structs.EveVideoFormat.EVE_NV12 | |
| elif encoding == "BGR": | |
| image.encoding = sdk.structs.EveVideoFormat.EVE_BGR | |
| elif encoding == "RGB": | |
| image.encoding = sdk.structs.EveVideoFormat.EVE_RGB | |
| return image | |