"""
This file provides a clean serving wrapper around the inference pipeline.
It is designed for backend usage, especially inside FastAPI endpoints later.
The predictor loads configs, initializes the pipeline, and returns structured results.
This keeps API code small and avoids mixing business logic inside route functions.
"""
import base64
from pathlib import Path
import cv2
from src.config.configuration import ConfigurationManager
from src.pipeline.inference_pipeline import InferencePipeline
from src.utils.common import create_directories, save_json
from src.utils.logger import get_logger
def _encode_face_frame(face_frame) -> str | None:
"""
Encode a face frame (numpy BGR image) to a base64 JPEG string.
Returns None if encoding fails or frame is None.
"""
if face_frame is None:
return None
try:
_, jpeg_encoded = cv2.imencode(".jpg", face_frame)
return base64.b64encode(jpeg_encoded.tobytes()).decode("utf-8")
except Exception:
return None
class Predictor:
"""
Service wrapper used by backend/API code.
"""
def __init__(self, log_dir: Path | None = None, log_level: str = "INFO") -> None:
self.config_manager = ConfigurationManager()
self.paths_config = self.config_manager.get_paths_config()
self.mmpose_config = self.config_manager.get_mmpose_config()
self.posture_model_config = self.config_manager.get_posture_model_config()
self.phone_detector_config = self.config_manager.get_phone_detector_config()
self.inference_config = self.config_manager.get_inference_config()
self.logger = get_logger(
self.__class__.__name__,
log_dir=log_dir or self.paths_config.logs_dir,
level=log_level,
)
create_directories(
[
self.paths_config.predictions_dir,
self.paths_config.frontend_result_dir,
self.paths_config.metrics_dir,
]
)
self.pipeline = InferencePipeline(
mmpose_config=self.mmpose_config,
posture_model_config=self.posture_model_config,
phone_detector_config=self.phone_detector_config,
inference_config=self.inference_config,
log_dir=log_dir or self.paths_config.logs_dir,
log_level=log_level,
)
def predict_image(
self, image_path: Path, save_rendered_output: bool = True
) -> dict:
"""
Run full inference on one image and optionally save rendered output.
"""
image_path = Path(image_path)
if not image_path.exists():
raise FileNotFoundError(f"Input image not found: {image_path}")
frame = cv2.imread(str(image_path))
if frame is None:
raise ValueError(f"Could not read image: {image_path}")
result = self.pipeline.run_on_frame(frame=frame, draw_visualizer=False)
saved_result_path = None
if save_rendered_output:
output_name = f"pred_{image_path.stem}.jpg"
output_path = self.paths_config.frontend_result_dir / output_name
save_ok = cv2.imwrite(str(output_path), result["frame"])
if not save_ok:
raise IOError(f"Could not save rendered output image: {output_path}")
saved_result_path = str(output_path)
response = {
"num_persons": result["num_persons"],
"person_results": [
{
"posture": person_result["posture"],
"phone": person_result["phone"],
"state": person_result["state"],
"display_text": person_result["display_text"],
"score_text": person_result["score_text"],
"face_xyxy": person_result.get("face_xyxy"),
"announced_face_frame": _encode_face_frame(
person_result.get("announced_face_frame")
),
}
for person_result in result["person_results"]
],
"saved_result_path": saved_result_path,
}
save_json(self.paths_config.metrics_dir / "latest_prediction.json", response)
self.logger.info("Prediction response: %s", response)
return response
def predict_video(
self, video_path: Path, frame_step: int = 10, save_rendered_output: bool = True
) -> dict:
video_path = Path(video_path)
if not video_path.exists():
raise FileNotFoundError(f"Video not found: {video_path}")
cap = cv2.VideoCapture(str(video_path))
frame_count = 0
saved_count = 0
all_results = []
# Persist runtime_parameters across frames so face announce interval works correctly.
runtime_parameters = {
"time_last_record_framerate": 0.0,
"time_last_announce_face": 0.0,
"path_runtime_handframes": None,
}
while True:
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_step == 0:
result = self.pipeline.run_on_frame(
frame=frame,
draw_visualizer=False,
runtime_parameters=runtime_parameters,
)
if save_rendered_output:
out_name = f"pred_{video_path.stem}_frame{saved_count:04d}.jpg"
out_path = self.paths_config.frontend_result_dir / out_name
cv2.imwrite(str(out_path), result["frame"])
all_results.append(
{
"frame_index": frame_count,
"num_persons": result["num_persons"],
"person_results": [
{
"posture": person_result["posture"],
"phone": person_result["phone"],
"state": person_result["state"],
"display_text": person_result["display_text"],
"score_text": person_result["score_text"],
"face_xyxy": person_result.get("face_xyxy"),
"announced_face_frame": _encode_face_frame(
person_result.get("announced_face_frame")
),
}
for person_result in result["person_results"]
],
}
)
saved_count += 1
frame_count += 1
cap.release()
self.logger.info("Video inference done. Frames processed: %s", saved_count)
return {"total_frames_processed": saved_count, "results": all_results}
|