tiena2cva committed on
Commit
79624b7
·
1 Parent(s): 95fec78

fix(renderer): preserve video colors in annotations

Browse files
src/pozify/steps/annotated_renderer.py CHANGED
@@ -1,6 +1,9 @@
1
  from __future__ import annotations
2
 
 
3
  from pathlib import Path
 
 
4
  from typing import Any
5
 
6
  import cv2
@@ -24,6 +27,186 @@ SKELETON_EDGES = [
24
  ]
25
 
26
  PREFERRED_VIDEO_CODECS = ("avc1", "H264", "mp4v")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
 
29
  def _frame_landmark_points(
@@ -103,7 +286,12 @@ def _draw_overlays(
103
  )
104
 
105
 
106
- def _open_video_writer(output_path: Path, fps: float, width: int, height: int) -> tuple[cv2.VideoWriter | None, str | None]:
 
 
 
 
 
107
  for codec in PREFERRED_VIDEO_CODECS:
108
  writer = cv2.VideoWriter(
109
  str(output_path),
@@ -127,9 +315,21 @@ def run(
127
  if not manifest.analysis_allowed or not manifest.video_path:
128
  return manifest.video_path
129
 
130
- capture = cv2.VideoCapture(manifest.video_path)
 
 
 
 
 
 
 
 
 
 
131
  if not capture.isOpened():
132
  capture.release()
 
 
133
  return manifest.video_path
134
 
135
  fps = manifest.fps if manifest.fps > 0 else 30.0
@@ -140,9 +340,12 @@ def run(
140
  return manifest.video_path
141
 
142
  output_path = run_dir / "annotated_video.mp4"
143
- writer, _codec = _open_video_writer(output_path, fps, width, height)
 
144
  if writer is None:
145
  capture.release()
 
 
146
  return manifest.video_path
147
 
148
  pose_by_frame = {frame.frame_index: frame for frame in pose_sequence.frames}
@@ -180,5 +383,13 @@ def run(
180
  finally:
181
  writer.release()
182
  capture.release()
 
 
 
 
 
 
 
 
183
 
184
  return str(output_path)
 
1
  from __future__ import annotations
2
 
3
+ import json
4
  from pathlib import Path
5
+ import shutil
6
+ import subprocess
7
  from typing import Any
8
 
9
  import cv2
 
27
  ]
28
 
29
# FourCC candidates for the OpenCV writer; _open_video_writer iterates them
# in this order.
PREFERRED_VIDEO_CODECS = ("avc1", "H264", "mp4v")

# ffprobe colour values (lower-cased) that make _needs_sdr_conversion request
# an SDR transcode of the input before rendering.
HDR_TRANSFERS = {"arib-std-b67", "smpte2084"}
HDR_PRIMARIES = {"bt2020"}

# ffmpeg output options, as flag/value pairs, that tag an encoded stream as
# BT.709; spliced into the ffmpeg command lines built in this module.
BT709_COLOR_ARGS = (
    "-color_primaries", "bt709",
    "-color_trc", "bt709",
    "-colorspace", "bt709",
)
40
+
41
+
42
+ def _tool_path(name: str) -> str | None:
43
+ return shutil.which(name)
44
+
45
+
46
def _video_color_metadata(video_path: str) -> dict[str, str]:
    """Probe the first video stream of *video_path* for colour metadata.

    Runs ``ffprobe`` asking for the ``color_space``, ``color_transfer``,
    ``color_primaries`` and ``color_range`` entries of stream ``v:0`` in JSON
    form.

    Args:
        video_path: Path of the video file handed to ffprobe.

    Returns:
        A mapping from entry name to its lower-cased string value. Entries
        whose value is null, a nested container, or the string "unknown" are
        left out. An empty dict is returned when ffprobe is not installed,
        exits with an error, exceeds the 10 second timeout, or produces
        output that is not the expected JSON object.
    """
    ffprobe = _tool_path("ffprobe")
    if ffprobe is None:
        return {}

    command = [
        ffprobe,
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=color_space,color_transfer,color_primaries,color_range",
        "-of",
        "json",
        video_path,
    ]
    try:
        result = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            timeout=10,
        )
        payload = json.loads(result.stdout)
    except (subprocess.SubprocessError, ValueError, OSError):
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError that text=True decoding can raise, so any
        # unreadable probe output degrades to "no metadata".
        return {}

    # json.loads may legally yield a list/str/number; only an object can
    # carry the "streams" key.
    if not isinstance(payload, dict):
        return {}
    streams = payload.get("streams")
    if not isinstance(streams, list) or not streams:
        return {}
    stream = streams[0]
    if not isinstance(stream, dict):
        return {}

    metadata: dict[str, str] = {}
    for key, value in stream.items():
        if value is None or isinstance(value, (dict, list)):
            continue
        normalized_value = str(value).lower()
        if normalized_value == "unknown":
            continue
        metadata[key] = normalized_value
    return metadata
90
+
91
+
92
def _needs_sdr_conversion(color_metadata: dict[str, str]) -> bool:
    """Report whether the probed colour metadata calls for an SDR transcode.

    True when the ``color_transfer`` entry is one of ``HDR_TRANSFERS`` or the
    ``color_primaries`` entry is one of ``HDR_PRIMARIES``; a missing entry is
    treated as an empty string and therefore never matches.
    """
    if color_metadata.get("color_transfer", "") in HDR_TRANSFERS:
        return True
    return color_metadata.get("color_primaries", "") in HDR_PRIMARIES
96
+
97
+
98
def _sdr_filter(color_metadata: dict[str, str]) -> str:
    """Build the ffmpeg filter-chain string used for the SDR transcode.

    The input transfer, primaries and matrix are taken from *color_metadata*
    when they are recognised values; otherwise they fall back to
    ``arib-std-b67``, ``bt2020`` and ``bt2020nc`` respectively. The chain
    linearises the input with ``zscale``, applies the ``hable`` tonemap,
    converts to BT.709 limited range and ends with ``format=yuv420p``.
    """

    def _pick(key: str, allowed: set[str], fallback: str) -> str:
        # Missing or unrecognised metadata collapses to the fallback value.
        candidate = color_metadata.get(key, fallback)
        return candidate if candidate in allowed else fallback

    source_transfer = _pick("color_transfer", HDR_TRANSFERS, "arib-std-b67")
    source_primaries = _pick("color_primaries", HDR_PRIMARIES, "bt2020")
    source_matrix = _pick("color_space", {"bt2020nc", "bt2020c"}, "bt2020nc")

    stages = [
        f"zscale=transfer=linear:transferin={source_transfer}:"
        f"primariesin={source_primaries}:matrixin={source_matrix}:npl=100",
        "tonemap=tonemap=hable:desat=0",
        "zscale=transfer=bt709:primaries=bt709:matrix=bt709:range=tv",
        "format=yuv420p",
    ]
    return ",".join(stages)
116
+
117
+
118
def _transcode_hdr_to_sdr(
    input_path: Path,
    output_path: Path,
    color_metadata: dict[str, str],
) -> bool:
    """Transcode *input_path* to a BT.709 H.264 file at *output_path*.

    Invokes ffmpeg with the filter chain from ``_sdr_filter``, drops audio
    (``-an``), encodes with libx264 and tags the result using
    ``BT709_COLOR_ARGS``. The ffmpeg call is limited to 120 seconds.

    Args:
        input_path: Source video to convert.
        output_path: Destination file; overwritten if present (``-y``).
        color_metadata: Probed colour metadata forwarded to ``_sdr_filter``.

    Returns:
        True when ffmpeg succeeded and left a non-empty file at
        *output_path*. False when ffmpeg is unavailable, fails, times out,
        or produces no usable output; in the failure cases any partial
        output file is removed so it does not linger on disk.
    """
    ffmpeg = _tool_path("ffmpeg")
    if ffmpeg is None:
        return False

    command = [
        ffmpeg,
        "-y",
        "-v",
        "error",
        "-i",
        str(input_path),
        "-vf",
        _sdr_filter(color_metadata),
        "-an",
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-crf",
        "18",
        "-pix_fmt",
        "yuv420p",
        *BT709_COLOR_ARGS,
        str(output_path),
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, timeout=120)
        succeeded = output_path.exists() and output_path.stat().st_size > 0
    except (subprocess.SubprocessError, OSError):
        succeeded = False

    if not succeeded:
        # A failed or timed-out ffmpeg run can leave a truncated file behind;
        # callers only track the path on success, so clean it up here.
        try:
            output_path.unlink(missing_ok=True)
        except OSError:
            # Best-effort cleanup: the transcode already failed, and a
            # leftover file must not turn that into an exception.
            pass
    return succeeded
153
+
154
+
155
def _encode_bt709_output(
    raw_video_path: Path,
    output_path: Path,
    audio_source_path: Path | None,
) -> bool:
    """Re-encode the rendered video as BT.709-tagged H.264.

    The video stream comes from *raw_video_path*. When *audio_source_path* is
    given it is added as a second input, its audio is mapped with
    ``1:a?`` (the trailing ``?`` marks the mapping as optional for ffmpeg)
    and encoded as 128k AAC with ``-shortest``; otherwise audio is disabled
    with ``-an``. The ffmpeg call is limited to 120 seconds.

    Returns:
        True when ffmpeg succeeded and *output_path* exists with a non-zero
        size; False when ffmpeg is unavailable, fails or times out.
    """
    ffmpeg = _tool_path("ffmpeg")
    if ffmpeg is None:
        return False

    input_args = ["-i", str(raw_video_path)]
    map_args = ["-map", "0:v:0"]
    if audio_source_path is None:
        audio_args = ["-an"]
    else:
        input_args += ["-i", str(audio_source_path)]
        map_args += ["-map", "1:a?"]
        audio_args = ["-c:a", "aac", "-b:a", "128k", "-shortest"]

    video_args = [
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-crf",
        "18",
        "-vf",
        "setparams=color_primaries=bt709:color_trc=bt709:colorspace=bt709,format=yuv420p",
        "-pix_fmt",
        "yuv420p",
        *BT709_COLOR_ARGS,
    ]

    # Argument order: global flags, inputs, stream maps, video options,
    # audio options, then the output file.
    command = [
        ffmpeg,
        "-y",
        "-v",
        "error",
        *input_args,
        *map_args,
        *video_args,
        *audio_args,
        str(output_path),
    ]

    try:
        subprocess.run(command, check=True, capture_output=True, timeout=120)
    except (subprocess.SubprocessError, OSError):
        return False

    if not output_path.exists():
        return False
    return output_path.stat().st_size > 0
210
 
211
 
212
  def _frame_landmark_points(
 
286
  )
287
 
288
 
289
+ def _open_video_writer(
290
+ output_path: Path,
291
+ fps: float,
292
+ width: int,
293
+ height: int,
294
+ ) -> tuple[cv2.VideoWriter | None, str | None]:
295
  for codec in PREFERRED_VIDEO_CODECS:
296
  writer = cv2.VideoWriter(
297
  str(output_path),
 
315
  if not manifest.analysis_allowed or not manifest.video_path:
316
  return manifest.video_path
317
 
318
+ source_path = Path(manifest.video_path)
319
+ color_metadata = _video_color_metadata(manifest.video_path)
320
+ render_input_path = source_path
321
+ temporary_paths: list[Path] = []
322
+ if _needs_sdr_conversion(color_metadata):
323
+ sdr_input_path = run_dir / "renderer_sdr_input.mp4"
324
+ if _transcode_hdr_to_sdr(source_path, sdr_input_path, color_metadata):
325
+ render_input_path = sdr_input_path
326
+ temporary_paths.append(sdr_input_path)
327
+
328
+ capture = cv2.VideoCapture(str(render_input_path))
329
  if not capture.isOpened():
330
  capture.release()
331
+ for temporary_path in temporary_paths:
332
+ temporary_path.unlink(missing_ok=True)
333
  return manifest.video_path
334
 
335
  fps = manifest.fps if manifest.fps > 0 else 30.0
 
340
  return manifest.video_path
341
 
342
  output_path = run_dir / "annotated_video.mp4"
343
+ raw_output_path = run_dir / "annotated_video_raw.mp4" if _tool_path("ffmpeg") else output_path
344
+ writer, _codec = _open_video_writer(raw_output_path, fps, width, height)
345
  if writer is None:
346
  capture.release()
347
+ for temporary_path in temporary_paths:
348
+ temporary_path.unlink(missing_ok=True)
349
  return manifest.video_path
350
 
351
  pose_by_frame = {frame.frame_index: frame for frame in pose_sequence.frames}
 
383
  finally:
384
  writer.release()
385
  capture.release()
386
+ for temporary_path in temporary_paths:
387
+ temporary_path.unlink(missing_ok=True)
388
+
389
+ if raw_output_path != output_path:
390
+ encoded = _encode_bt709_output(raw_output_path, output_path, source_path)
391
+ if not encoded:
392
+ raw_output_path.replace(output_path)
393
+ raw_output_path.unlink(missing_ok=True)
394
 
395
  return str(output_path)
tests/test_annotated_renderer.py CHANGED
@@ -95,6 +95,26 @@ class AnnotatedRendererTests(unittest.TestCase):
95
  self.assertIsNotNone(output_path)
96
  self.assertTrue(Path(str(output_path)).exists())
97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
 
99
  if __name__ == "__main__":
100
  unittest.main()
 
95
  self.assertIsNotNone(output_path)
96
  self.assertTrue(Path(str(output_path)).exists())
97
 
98
def test_hdr_metadata_requires_sdr_conversion(self) -> None:
    """bt2020/arib-std-b67 metadata needs conversion; all-bt709 does not."""
    hdr_metadata = {
        "color_space": "bt2020nc",
        "color_transfer": "arib-std-b67",
        "color_primaries": "bt2020",
    }
    sdr_metadata = {
        "color_space": "bt709",
        "color_transfer": "bt709",
        "color_primaries": "bt709",
    }

    needs_for_hdr = annotated_renderer._needs_sdr_conversion(hdr_metadata)
    self.assertTrue(needs_for_hdr)

    needs_for_sdr = annotated_renderer._needs_sdr_conversion(sdr_metadata)
    self.assertFalse(needs_for_sdr)
117
+
118
 
119
  if __name__ == "__main__":
120
  unittest.main()