File size: 6,922 Bytes
d2885a7
 
 
 
 
 
 
ba73462
d2885a7
 
 
 
 
 
 
 
 
 
ba73462
 
 
 
 
 
 
 
 
 
 
 
 
 
d2885a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba73462
 
 
 
d2885a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba73462
 
 
 
 
 
 
d2885a7
 
 
 
 
ba73462
 
 
 
 
d2885a7
 
 
 
ba73462
d2885a7
 
 
 
ba73462
 
 
 
 
 
 
 
 
 
 
 
 
 
d2885a7
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""
This file provides a clean serving wrapper around the inference pipeline.
It is designed for backend usage, especially inside FastAPI endpoints later.
The predictor loads configs, initializes the pipeline, and returns structured results.
This keeps API code small and avoids mixing business logic inside route functions.
"""

import base64
from pathlib import Path

import cv2

from src.config.configuration import ConfigurationManager
from src.pipeline.inference_pipeline import InferencePipeline
from src.utils.common import create_directories, save_json
from src.utils.logger import get_logger


def _encode_face_frame(face_frame) -> str | None:
    """
    Encode a face frame (numpy BGR image) to a base64 JPEG string.
    Returns None if encoding fails or frame is None.
    """
    if face_frame is None:
        return None
    try:
        _, jpeg_encoded = cv2.imencode(".jpg", face_frame)
        return base64.b64encode(jpeg_encoded.tobytes()).decode("utf-8")
    except Exception:
        return None


class Predictor:
    """
    Service wrapper used by backend/API code.
    """

    def __init__(self, log_dir: Path | None = None, log_level: str = "INFO") -> None:
        self.config_manager = ConfigurationManager()

        self.paths_config = self.config_manager.get_paths_config()
        self.mmpose_config = self.config_manager.get_mmpose_config()
        self.posture_model_config = self.config_manager.get_posture_model_config()
        self.phone_detector_config = self.config_manager.get_phone_detector_config()
        self.inference_config = self.config_manager.get_inference_config()

        self.logger = get_logger(
            self.__class__.__name__,
            log_dir=log_dir or self.paths_config.logs_dir,
            level=log_level,
        )

        create_directories(
            [
                self.paths_config.predictions_dir,
                self.paths_config.frontend_result_dir,
                self.paths_config.metrics_dir,
            ]
        )

        self.pipeline = InferencePipeline(
            mmpose_config=self.mmpose_config,
            posture_model_config=self.posture_model_config,
            phone_detector_config=self.phone_detector_config,
            inference_config=self.inference_config,
            log_dir=log_dir or self.paths_config.logs_dir,
            log_level=log_level,
        )

    def predict_image(
        self, image_path: Path, save_rendered_output: bool = True
    ) -> dict:
        """
        Run full inference on one image and optionally save rendered output.
        """
        image_path = Path(image_path)

        if not image_path.exists():
            raise FileNotFoundError(f"Input image not found: {image_path}")

        frame = cv2.imread(str(image_path))
        if frame is None:
            raise ValueError(f"Could not read image: {image_path}")

        result = self.pipeline.run_on_frame(frame=frame, draw_visualizer=False)

        saved_result_path = None
        if save_rendered_output:
            output_name = f"pred_{image_path.stem}.jpg"
            output_path = self.paths_config.frontend_result_dir / output_name
            save_ok = cv2.imwrite(str(output_path), result["frame"])
            if not save_ok:
                raise IOError(f"Could not save rendered output image: {output_path}")
            saved_result_path = str(output_path)

        response = {
            "num_persons": result["num_persons"],
            "person_results": [
                {
                    "posture": person_result["posture"],
                    "phone": person_result["phone"],
                    "state": person_result["state"],
                    "display_text": person_result["display_text"],
                    "score_text": person_result["score_text"],
                    "face_xyxy": person_result.get("face_xyxy"),
                    "announced_face_frame": _encode_face_frame(
                        person_result.get("announced_face_frame")
                    ),
                }
                for person_result in result["person_results"]
            ],
            "saved_result_path": saved_result_path,
        }

        save_json(self.paths_config.metrics_dir / "latest_prediction.json", response)
        self.logger.info("Prediction response: %s", response)
        return response

    def predict_video(
        self, video_path: Path, frame_step: int = 10, save_rendered_output: bool = True
    ) -> dict:
        video_path = Path(video_path)
        if not video_path.exists():
            raise FileNotFoundError(f"Video not found: {video_path}")

        cap = cv2.VideoCapture(str(video_path))
        frame_count = 0
        saved_count = 0
        all_results = []

        # Persist runtime_parameters across frames so face announce interval works correctly.
        runtime_parameters = {
            "time_last_record_framerate": 0.0,
            "time_last_announce_face": 0.0,
            "path_runtime_handframes": None,
        }

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_step == 0:
                result = self.pipeline.run_on_frame(
                    frame=frame,
                    draw_visualizer=False,
                    runtime_parameters=runtime_parameters,
                )
                if save_rendered_output:
                    out_name = f"pred_{video_path.stem}_frame{saved_count:04d}.jpg"
                    out_path = self.paths_config.frontend_result_dir / out_name
                    cv2.imwrite(str(out_path), result["frame"])

                all_results.append(
                    {
                        "frame_index": frame_count,
                        "num_persons": result["num_persons"],
                        "person_results": [
                            {
                                "posture": person_result["posture"],
                                "phone": person_result["phone"],
                                "state": person_result["state"],
                                "display_text": person_result["display_text"],
                                "score_text": person_result["score_text"],
                                "face_xyxy": person_result.get("face_xyxy"),
                                "announced_face_frame": _encode_face_frame(
                                    person_result.get("announced_face_frame")
                                ),
                            }
                            for person_result in result["person_results"]
                        ],
                    }
                )
                saved_count += 1
            frame_count += 1

        cap.release()
        self.logger.info("Video inference done. Frames processed: %s", saved_count)
        return {"total_frames_processed": saved_count, "results": all_results}