beaupreda's picture
Upload sensAI-Generic-Object-Detection with upload_repo.py
0cb83bc verified
Raw
History Blame Contribute Delete
27.7 kB
import ctypes
import glob
import os
import platform
import sys
import time
from dataclasses import dataclass
import cv2
import numpy as np
from eve_messages import ModelConfig
from eve_python import eve_sdk as sdk
from eve_python.structs.CFaceIdStructs import (
EveFaceIdCommand,
EveFaceIdIdentificationStatus,
)
from log_utils import setup_logger
logger = setup_logger("EveWrapper")
def _pack_cstring(dst, value: str) -> None:
"""Write ``value`` as a null-terminated UTF-8 string into a ``c_byte`` array.
``c_byte`` is signed (-128..127), so bytes above 127 are folded into their
signed two's-complement form — the underlying memory is identical to an
unsigned ``char[]`` on the C side. The array is fully written so any stale
bytes from a recycled options struct are cleared.
Args:
dst: A ctypes ``c_byte`` array (the destination buffer).
value: The string to store; must fit with room for the null terminator.
Raises:
ValueError: If ``value`` does not leave room for a null terminator.
"""
raw = value.encode("utf-8")
capacity = len(dst)
if len(raw) >= capacity:
raise ValueError(f"{value!r} ({len(raw)} bytes) exceeds buffer of {capacity}")
for i in range(capacity):
b = raw[i] if i < len(raw) else 0
dst[i] = b - 256 if b > 127 else b
def _pack_class_names(dst, names: tuple[str, ...]) -> None:
"""Write ``names`` into a 2-D ``c_byte`` class-name table, one row each.
Args:
dst: A ctypes 2-D ``c_byte`` array (rows of fixed-width names).
names: Class labels to store, in output-index order.
Raises:
ValueError: If there are more names than table rows.
"""
if len(names) > len(dst):
raise ValueError(f"{len(names)} class names exceed table of {len(dst)} rows")
for i, name in enumerate(names):
_pack_cstring(dst[i], name)
_is_windows = platform.system() == "Windows"
DO_FAKE_MIRROR = True
# Gate timing instrumentation behind the profiler env var
_TIMING_ENABLED = os.environ.get("ENABLE_PROFILER", "").strip() not in ("", "0", "false")
@dataclass
class CalibrationResult:
"""Result of a face ID calibration attempt."""
success: bool
user_id: int
message: str
class EveWrapper:
def __init__(self, eve_bin_path="", eve_lib_path=""):
self._ensure_config_dir()
eve_bin_path, eve_lib_path = self._resolve_eve_paths(eve_bin_path, eve_lib_path)
self._mirror = False
self._inference_frame_count = 0
self._last_sent_frame: np.ndarray | None = None
# Per-call timing: {name: [call_count, total_seconds]}
self._timings: dict[str, list[float]] = {}
self._timing_enabled = _TIMING_ENABLED
image_provider = sdk.structs.EveImageProvider.EVE_CLIENT_PROVIDED
self.eve_sdk = self._load_and_create_eve(eve_bin_path, eve_lib_path, image_provider)
if image_provider == sdk.structs.EveImageProvider.EVE_CAMERA:
self._set_camera()
image_request = sdk.structs.EveImageFormatRequest(
location=sdk.structs.EveImageLocation.EVE_CPU,
format=sdk.structs.EveVideoFormat.EVE_BGRA,
)
self.eve_sdk.EveConfigureProcessedImage(image_request)
err = self.eve_sdk.StartEveWithParameters(
sdk.structs.EveProcessingParameters(type=sdk.structs.EveProcessingPipelineType.EVE_HMI)
)
if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"StartEveWithParameters error code: {err}")
sys.exit(err)
logger.info("EVE initialized")
def _record_timing(self, name: str, elapsed: float) -> None:
entry = self._timings.get(name)
if entry is None:
self._timings[name] = [1, elapsed]
else:
entry[0] += 1
entry[1] += elapsed
def get_timing_stats(self, reset: bool = False) -> dict[str, tuple[int, float]]:
"""Return accumulated per-call timings as {name: (count, total_seconds)}.
Args:
reset: If True, clear the accumulators after reading.
"""
result = {k: (int(v[0]), v[1]) for k, v in self._timings.items()}
if reset:
self._timings.clear()
return result
def inference(self, image: np.ndarray) -> np.ndarray:
t = self._timing_enabled
if self._mirror and DO_FAKE_MIRROR:
if t:
_t0 = time.perf_counter()
image = cv2.flip(image, 1)
if t:
self._record_timing("flip", time.perf_counter() - _t0)
if t:
_t0 = time.perf_counter()
if not self._send_frame(image):
return image
if t:
self._record_timing("EveSendImageForProcessing", time.perf_counter() - _t0)
if t:
_t0 = time.perf_counter()
processed_image = self.eve_sdk.EveGetProcessedImage()
if t:
self._record_timing("EveGetProcessedImage", time.perf_counter() - _t0)
if processed_image.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveGetProcessedImage() error code: {processed_image.error}")
sys.exit(processed_image.error)
if t:
_t0 = time.perf_counter()
img = np.ctypeslib.as_array(
processed_image.data,
shape=(processed_image.height, processed_image.width, processed_image.channels),
).copy()
del processed_image
if t:
self._record_timing("as_array+copy", time.perf_counter() - _t0)
if img.shape[2] == 2:
if t:
_t0 = time.perf_counter()
img = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_YUYV)
if t:
self._record_timing("cvtColor", time.perf_counter() - _t0)
elif img.shape[2] == 4:
if t:
_t0 = time.perf_counter()
img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
if t:
self._record_timing("cvtColor", time.perf_counter() - _t0)
elif img.shape[2] == 3:
if t:
_t0 = time.perf_counter()
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
if t:
self._record_timing("cvtColor", time.perf_counter() - _t0)
# SDK requires all outputs to be consumed before the next frame
if t:
_t0 = time.perf_counter()
person_detection_data = self.eve_sdk.EveGetPersonDetectionData()
if t:
self._record_timing("EveGetPersonDetectionData", time.perf_counter() - _t0)
if person_detection_data.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveGetPersonDetectionData() error code: {person_detection_data.error}")
sys.exit(person_detection_data.error)
del person_detection_data
if t:
_t0 = time.perf_counter()
all_faces = self.eve_sdk.EveGetAllFaceData()
if t:
self._record_timing("EveGetAllFaceData", time.perf_counter() - _t0)
if all_faces.errorCode == sdk.structs.EveError.EVE_ERROR_NO_ERROR:
face_count = all_faces.faceData.contents.detectedFacesCount
if face_count > 0:
self._inference_frame_count += 1
# Log face ID data periodically to avoid flooding
if self._inference_frame_count % self._INFERENCE_LOG_INTERVAL == 1:
self._log_all_faces("inference", self._inference_frame_count, all_faces)
del all_faces
if img.shape[2] in (1, 3):
return img
print(
f"WRONG FORMAT: {img.shape} "
"(Consider converting it above here, or making EveImageFormatRequest work)"
)
return None
def reset_pipeline(self) -> None:
"""Reset the EVE processing pipeline to a clean state.
Pipeline state (face tracking, ideal-user selection) persists on the
long-lived SDK handle across inference calls. Workers are reused across
offline video jobs, so without a reset the second video inherits the
first run's pipeline state — most visibly, no ideal user is selected.
Call this before processing each new video.
"""
err = self.eve_sdk.EveResetPipeline()
if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.warning(f"EveResetPipeline error code: {err}")
def shutdown(self) -> None:
"""Cleanly shut down the Eve SDK instance."""
try:
self.eve_sdk.ShutdownEve()
logger.info("EVE shut down")
except Exception as exc:
logger.warning(f"EVE shutdown error: {exc}")
def enable_mirror(self, enabled: bool = True) -> None:
"""Enable or disable image mirroring."""
if DO_FAKE_MIRROR:
self._mirror = enabled
return
options = sdk.structs.EveImageManipulationOptions()
options.settings.mirrorImage = 1 if enabled else 0
result = self.eve_sdk.EveConfigureImageManipulation(options)
if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveConfigureImageManipulation() error code: {result.errorCode}")
sys.exit(result.errorCode)
def enable_object_detection(self, config: "ModelConfig | None" = None) -> None:
"""Enable and configure EVE's object detection (MOD) feature.
Pass ``None`` to leave MOD off (the worker simply does not configure
it, matching the SDK's disabled-by-default state). Otherwise:
- ``config.model_path`` empty / ``None`` selects the SDK's bundled
default model (GMOD Base Model) with its default class names.
- a populated ``config.model_path`` loads that ``.tflite`` and, if
given, replaces the class table with ``config.class_names``.
``modelPath`` and ``classCount`` are rewritten on every call rather
than left to the values returned by
``EveGetDefaultObjectDetectionOptions``: that getter echoes the
*previously configured* options, not pristine defaults. Without the
unconditional reset, switching from a custom model (e.g. AMOD-8) back
to the bundled default would leave the prior model's class names in
EVE's table. An empty ``modelPath`` with ``classCount == 0`` is the
SDK's documented "use the default model and its default class names".
Args:
config: The resolved MOD model to load, or ``None`` to skip MOD
configuration entirely.
Raises:
SystemExit: If ``EveConfigureObjectDetection`` returns an error.
"""
if config is None:
return
options = self.eve_sdk.EveGetDefaultObjectDetectionOptions()
options.enabled = sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED
_pack_cstring(options.modelPath, config.model_path or "")
if config.model_path and config.class_names:
_pack_class_names(options.classNames, config.class_names)
options.classCount = len(config.class_names)
else:
options.classCount = 0
if config.nms_threshold is not None:
options.nmsThreshold = config.nms_threshold
if config.iou_threshold is not None:
options.iouThreshold = config.iou_threshold
result = self.eve_sdk.EveConfigureObjectDetection(options)
if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveConfigureObjectDetection() error code: {result.error}")
sys.exit(result.error)
def enable_face_and_person_detection(
self, faceEnabled: bool = True, personEnabled: bool = True
):
mode = sdk.structs.EveFaceTrackerMinimumMode.EVE_FACETRACKER_MINIMUM_MODE_OFF
params = sdk.structs.EveFaceTrackerOptions(
faceTrackerMode=mode,
enable3DFaceTracking=(
sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED
if faceEnabled
else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED
),
fitSecondaryUsers=(
sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED
if faceEnabled
else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED
),
enablePersonDetection=(
sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED
if personEnabled
else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED
),
enableEyeLandmarks=sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED,
)
result = self.eve_sdk.EveConfigureFaceTracker(params)
if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveConfigureFaceTracker() error code: {result.error}")
sys.exit(result.error)
def enable_face_id(self, enabled: bool = True, threshold: float = 0.7) -> None:
eve_enabled = (
sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED
if enabled
else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED
)
options = sdk.structs.EveFaceIdOptions(enabled=eve_enabled, threshold=threshold)
if enabled:
options.calibrationPoses = (
sdk.structs.EveFaceIdCalibrationPoseMode.EVE_FACEID_CALIBRATION_FRONTAL_ONLY
)
result = self.eve_sdk.EveConfigureFaceId(options)
if result.error != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveConfigureFaceId() error code: {result.error}")
sys.exit(result.error)
def enable_hand_gesture(self, enabled=True, redetection_delay_ms=0):
eve_enabled = (
sdk.structs.EveOptionEnabled.EVE_OPTION_ENABLED
if enabled
else sdk.structs.EveOptionEnabled.EVE_OPTION_DISABLED
)
result = self.eve_sdk.EveConfigureHandGesture(
sdk.structs.EveHandGestureOptions(
enabled=eve_enabled, redetectionDelay=redetection_delay_ms
)
)
if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"EveConfigureHandGesture() error code: {result.errorCode}")
sys.exit(result.errorCode)
def _send_face_id_command(
self, command: EveFaceIdCommand
) -> sdk.structs.EveFaceIdCommandStruct:
cmd = sdk.structs.EveFaceIdCommandStruct(command=command)
return self.eve_sdk.EveSendFaceIdCommand(cmd)
def _log_all_faces(self, context: str, frame_idx: int, all_faces) -> None:
"""Log face ID details for every detected face in one frame."""
face_count = all_faces.faceData.contents.detectedFacesCount
if face_count == 0:
logger.debug(f"[{context}] frame {frame_idx}: no faces detected")
return
for j in range(face_count):
f = all_faces.faceData.contents.faces[j]
fid = f.faceId.faceIdentity
logger.debug(
f"[{context}] frame {frame_idx}: face {j}: "
f"id={fid.id}, confidence={fid.confidence:.4f}, "
f"similarity={fid.similarity:.4f}, "
f"identificationStatus={f.faceId.identificationStatus}, "
f"calibrationStatus={f.faceId.calibrationStatus}"
)
def _send_frame(self, frame: np.ndarray) -> None:
"""Send a single frame through the SDK pipeline (send + consume output)."""
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# The SDK keeps the input pointer live past EveSendImageForProcessing and
# dereferences it during the matching EveGet*() calls. Retain the ndarray
# on the instance so it outlives every follow-up SDK call until the next
# frame is sent. Dropping the local ref here caused 1080p video runs to
# segfault inside libc memmove (use-after-free on the RGB buffer).
self._last_sent_frame = rgb
eve_image = self._create_eve_input_image(rgb, rgb.shape[1], rgb.shape[0], "RGB")
send_err = self.eve_sdk.EveSendImageForProcessing(eve_image)
if send_err != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.warning(f"EveSendImageForProcessing failed: {send_err}")
return False
return True
_FLUSH_FRAME_COUNT = 6
_CALIBRATION_MAX_ATTEMPTS = 5
_INFERENCE_LOG_INTERVAL = 30 # log face ID data every N frames during inference
def calibrate_new_user(self, frames: list[np.ndarray]) -> CalibrationResult:
"""Register a new face by sending frames through the SDK calibration pipeline.
Retries up to ``_CALIBRATION_MAX_ATTEMPTS`` times because the SDK can
intermittently reject borderline-frontal faces.
Args:
frames: BGR images to use for calibration. Each frame is sent exactly once
per attempt; returns at the first successful attempt.
Returns:
CalibrationResult with success flag, SDK-assigned user ID, and message.
"""
last_result: CalibrationResult | None = None
for attempt in range(self._CALIBRATION_MAX_ATTEMPTS):
last_result = self._calibrate_new_user_once(frames)
if last_result.success:
return last_result
logger.info(
"calibrate_new_user: attempt %d/%d failed: %s",
attempt + 1,
self._CALIBRATION_MAX_ATTEMPTS,
last_result.message,
)
return last_result # type: ignore[return-value]
def _calibrate_new_user_once(self, frames: list[np.ndarray]) -> CalibrationResult:
"""Single calibration attempt — flush, send ADD_NEW_USER + frames, check result."""
self.enable_face_and_person_detection(faceEnabled=True)
self.enable_face_id(enabled=True)
h, w = frames[0].shape[:2]
blank = np.zeros((h, w, 3), dtype=np.uint8)
for _ in range(self._FLUSH_FRAME_COUNT):
self._send_frame(blank)
success_frame = -1
for i, frame in enumerate(frames):
result = self._send_face_id_command(EveFaceIdCommand.EVE_FACE_ID_COMMAND_ADD_NEW_USER)
if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
return CalibrationResult(
False, 0, f"ADD_NEW_USER command failed: {result.errorCode}"
)
self._send_frame(frame)
all_faces = self.eve_sdk.EveGetAllFaceData()
if all_faces.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
return CalibrationResult(
False, 0, f"EveGetAllFaceData failed: {all_faces.errorCode}"
)
face_count = all_faces.faceData.contents.detectedFacesCount
self._log_all_faces("calibrate frame", i, all_faces)
matched_face = None
for fi in range(face_count):
face = all_faces.faceData.contents.faces[fi]
if (
face.faceId.faceIdentity.id >= 0
and face.faceId.identificationStatus
== EveFaceIdIdentificationStatus.EVE_FACE_ID_SUCCESS
):
matched_face = face
break
if matched_face is not None:
success_frame = i
break
if success_frame > -1:
user_id = matched_face.faceId.faceIdentity.id
confidence = matched_face.faceId.faceIdentity.confidence
similarity = matched_face.faceId.faceIdentity.similarity
logger.debug(
f"calibrate_new_user: SUCCESS on frame {success_frame}, "
f"id={user_id}, confidence={confidence:.4f}, similarity={similarity:.4f}"
)
if len(frames) == 1:
return CalibrationResult(True, user_id, f"Registered as user {user_id}")
else:
return CalibrationResult(
True,
user_id,
f"Calibration succeeded on frame {success_frame}. "
f"Registered as user {user_id}.",
)
else:
return CalibrationResult(
False,
0,
"Calibration did not succeed. "
"Ensure a clear, frontal face is visible in the upload.",
)
def remove_all_users(self) -> bool:
"""Remove all users from the SDK face ID gallery.
Returns:
True if removal succeeded.
"""
result = self._send_face_id_command(EveFaceIdCommand.EVE_FACE_ID_COMMAND_REMOVE_ALL_USERS)
if result.errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.warning(f"REMOVE_ALL_USERS command error: {result.errorCode}")
return False
return True
def restore_gallery(self, frames_per_user: list[list[np.ndarray]]) -> list[CalibrationResult]:
"""Wipe the SDK gallery and re-register users from stored frames.
Used before video processing to sync the SDK gallery with a session's
registered users. Sends a frame after removing to flush the command
through the SDK pipeline before re-registering.
Args:
frames_per_user: List of frame lists, one per user to re-register.
Returns:
List of CalibrationResult, one per user.
"""
if not frames_per_user:
return []
self.enable_face_and_person_detection(faceEnabled=True)
self.enable_face_id(enabled=True)
self.remove_all_users()
# SDK commands are async — flush the remove by sending a frame
self._send_frame(frames_per_user[0][0])
results = []
for idx, frames in enumerate(frames_per_user):
r = self.calibrate_new_user(frames)
logger.info(
f"restore_gallery: user {idx}: success={r.success}, "
f"sdk_id={r.user_id}, message='{r.message}'"
)
results.append(r)
return results
@staticmethod
def _ensure_config_dir():
from pathlib import Path
try:
new_dir_path = Path.home() / ".config"
new_dir_path.mkdir(exist_ok=True)
except Exception as e:
print(f"Error creating directory: {e}")
@staticmethod
def _resolve_eve_paths(eve_bin_path="", eve_lib_path=""):
if eve_bin_path and eve_lib_path:
return eve_bin_path, eve_lib_path
if _is_windows:
eve_bin_path = r"C:\TLT_SRC_DIR\EdgeVisionEngine\x64\Release\\"
eve_lib_path = eve_bin_path
else:
eve_dir_default_paths = glob.glob("/opt/EVE-*-Source", recursive=False)
eve_bin_path = eve_bin_path or (
os.path.join(eve_dir_default_paths[0], "bin") if eve_dir_default_paths else ""
)
eve_lib_path = eve_lib_path or (
os.path.join(eve_dir_default_paths[0], "lib") if eve_dir_default_paths else ""
)
return eve_bin_path, eve_lib_path
def _load_and_create_eve(self, eve_bin_path, eve_lib_path, image_provider):
from pathlib import Path
os.chdir(eve_bin_path)
if _is_windows:
eve_sdk_path = os.path.join(eve_bin_path, "EveSDK.dll")
root = Path(os.path.abspath(__file__)).parent
if not os.path.isfile(eve_sdk_path):
eve_sdk_path = os.path.join(root.parent.parent, eve_bin_path, "EveSDK.dll")
else:
eve_sdk_path = os.path.join(eve_bin_path, "libEveSDK.so")
if not os.path.isfile(eve_sdk_path):
eve_sdk_path = os.path.join(eve_lib_path, "libEveSDK.so")
eve_sdk_instance = sdk.EveSDK(eve_sdk_path)
ByteArray512 = ctypes.c_byte * 512
encoded = os.path.dirname(eve_bin_path + os.sep).encode("utf-8") # EVE needs os.sep
pathOverride = ByteArray512(*encoded, *([0] * (512 - len(encoded)))) # zero-pad to 512
startup_options = sdk.structs.EveStartupParameters(
pathOverride=pathOverride,
gpuPreference=sdk.structs.EveGpuPreference.EVE_NO_GPU,
imageProvider=image_provider,
startupType=sdk.structs.EveStartupType.EVE_SYNC,
)
err = eve_sdk_instance.CreateEve(startup_options)
if err != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
logger.info(f"CreateEve error code: {err}")
sys.exit(err)
return eve_sdk_instance
def _set_camera(self):
i = 0
self._metaDataFpgaCameraId = -1
self._fpgaCameraId = -1
while True:
cameraInfo = self.eve_sdk.EveGetCamera(i)
if (
cameraInfo.error == sdk.structs.EveError.EVE_INVALID_CAMERA_ID
or cameraInfo.error == sdk.structs.EveError.EVE_NO_MORE_DATA
):
break
pid = ctypes.cast(cameraInfo.data.pid, ctypes.c_char_p).value
vid = ctypes.cast(cameraInfo.data.vid, ctypes.c_char_p).value
if cameraInfo.data.isFpgaCamera == 1:
if self._metaDataFpgaCameraId == -1 and vid == b"META" and pid == b"DATA":
self._metaDataFpgaCameraId = i
elif self._fpgaCameraId == -1:
self._fpgaCameraId = i
print(i, self._fpgaCameraId, self._metaDataFpgaCameraId, cameraInfo.error, pid, vid)
if self._fpgaCameraId >= 0 and self._metaDataFpgaCameraId >= 0:
break
i += 1
if self._fpgaCameraId == -1 and self._metaDataFpgaCameraId == -1:
raise RuntimeError("No FPGA camera found")
print(
f" \n\t\t *** FPGA camera found: {self._fpgaCameraId}, metadata {self._metaDataFpgaCameraId}\n"
)
useMetadataCamera = False
if useMetadataCamera:
self._usedCameraId = self._metaDataFpgaCameraId
else:
self._usedCameraId = self._fpgaCameraId
cameraFormat = sdk.structs.CCameraFormat()
cameraFormat.resolution.width = 640
cameraFormat.resolution.height = 480
cameraFormat.compareResolution = sdk.structs.EveCompare.EVE_AT_MOST
cameraFormat.compareFps = sdk.structs.EveCompare.EVE_AT_LEAST
formats = self.eve_sdk.EveGetFormats(self._usedCameraId, cameraFormat)
f = formats.formats[0]
print(
f"camera selected: ID#{self._usedCameraId}: {f.resolution.width}x{f.resolution.height}, "
f"Format: {f.format} @ {f.fps}FPS"
)
errorCode = self.eve_sdk.EveSetCamera(self._usedCameraId, f)
if errorCode != sdk.structs.EveError.EVE_ERROR_NO_ERROR:
raise RuntimeError(f"Could't set camera {errorCode}")
@staticmethod
def _create_eve_input_image(image_bin, width, height, encoding):
image = sdk.structs.EveInputImage()
image.data = image_bin.ctypes.data_as(ctypes.POINTER(ctypes.c_ubyte))
image.width = width
image.height = height
if encoding == "YUY2":
image.encoding = sdk.structs.EveVideoFormat.EVE_YUY2
elif encoding == "NV12":
image.encoding = sdk.structs.EveVideoFormat.EVE_NV12
elif encoding == "BGR":
image.encoding = sdk.structs.EveVideoFormat.EVE_BGR
elif encoding == "RGB":
image.encoding = sdk.structs.EveVideoFormat.EVE_RGB
return image