# Copilot
# Add portfolio pages and overlay playback
# commit 4a9e84e
from __future__ import annotations
import json
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
import cv2
import numpy as np
# Maximum upload size (MB) accepted by the Streamlit server / Space.
STREAMLIT_SERVER_MAX_UPLOAD_MB = 4096
# Uploads above this size skip the inline browser preview.
INLINE_VIDEO_PREVIEW_MAX_MB = 256
# Disk headroom (MB) that must remain free after staging an upload;
# overridable via the SPACE_MIN_FREE_SPACE_BUFFER_MB environment variable.
MIN_FREE_WORKSPACE_BUFFER_MB = int(os.getenv("SPACE_MIN_FREE_SPACE_BUFFER_MB", "2048"))
# Chunk size (8 MiB) used when spooling uploads to disk.
COPY_CHUNK_BYTES = 8 * 1024 * 1024
# File extensions offered by the video uploader widget.
SUPPORTED_VIDEO_TYPES = ["mp4", "mov", "avi", "mkv", "webm", "m4v"]
# Accent color per predicted phase, in OpenCV's BGR channel order.
PHASE_OVERLAY_BGR = {
    "idle": (139, 125, 96),
    "marking": (255, 140, 79),
    "injection": (255, 198, 104),
    "dissection": (107, 107, 255),
    "unknown": (248, 250, 252),
}
# HUD panel and text colors (BGR).
HUD_BACKGROUND_BGR = (20, 10, 6)
HUD_TEXT_BGR = (252, 250, 248)
HUD_MUTED_BGR = (215, 200, 191)
def format_bytes(num_bytes: int) -> str:
    """Render a byte count as a human-readable string, e.g. ``"1.5 KB"``."""
    remaining = float(num_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        if remaining < 1024.0:
            return f"{remaining:.1f} {unit}"
        remaining /= 1024.0
    # Anything at terabyte scale (or beyond) is reported in TB.
    return f"{remaining:.1f} TB"
def format_duration(seconds: float | None) -> str:
if seconds is None or seconds <= 0:
return "Unknown"
whole_seconds = int(round(seconds))
hours, remainder = divmod(whole_seconds, 3600)
minutes, secs = divmod(remainder, 60)
if hours > 0:
return f"{hours:d}:{minutes:02d}:{secs:02d}"
return f"{minutes:d}:{secs:02d}"
def recommended_frame_stride(duration_seconds: float | None) -> int:
if duration_seconds is None or duration_seconds <= 0:
return 5
if duration_seconds < 600:
return 1
if duration_seconds < 1800:
return 2
if duration_seconds < 3600:
return 3
return 5
def get_upload_size_bytes(uploaded_file) -> int:
    """Return the size in bytes of an uploaded file-like object.

    Prefers the object's ``size`` attribute (Streamlit UploadedFile exposes
    one); otherwise seeks to the end to measure, then restores the stream
    to its original position.
    """
    declared_size = getattr(uploaded_file, "size", None)
    if declared_size is not None:
        return int(declared_size)
    saved_position = uploaded_file.tell()
    uploaded_file.seek(0, os.SEEK_END)
    measured_size = uploaded_file.tell()
    uploaded_file.seek(saved_position)
    return int(measured_size)
def should_show_inline_preview(file_size_bytes: int) -> bool:
    """Whether an upload is small enough to preview inline in the browser."""
    preview_limit_bytes = INLINE_VIDEO_PREVIEW_MAX_MB * 1024 * 1024
    return file_size_bytes <= preview_limit_bytes
def get_workspace_free_bytes(temp_dir: Path | None = None) -> int:
target_dir = Path(temp_dir or tempfile.gettempdir())
target_dir.mkdir(parents=True, exist_ok=True)
return shutil.disk_usage(target_dir).free
def create_temp_video_path(
*, suffix: str = ".mp4", prefix: str = "portfolio-overlay-", temp_dir: Path | None = None
) -> Path:
target_dir = Path(temp_dir or tempfile.gettempdir())
target_dir.mkdir(parents=True, exist_ok=True)
fd, temp_name = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=target_dir)
os.close(fd)
return Path(temp_name)
def create_overlay_video_writer(
    *, frame_size: tuple[int, int], fps: float, temp_dir: Path | None = None
) -> tuple[cv2.VideoWriter, Path]:
    """Open the first OpenCV video writer that works in this environment.

    Tries a fixed list of codec/container pairs in order and returns the
    opened writer together with the temp file it writes to.

    Raises:
        RuntimeError: when none of the candidate codecs can be opened.
    """
    # Guard against clips that report a zero/invalid frame rate.
    playback_fps = fps if fps > 0 else 24.0
    attempted: list[str] = []
    for fourcc_code, container_suffix in (
        ("mp4v", ".mp4"),
        ("avc1", ".mp4"),
        ("MJPG", ".avi"),
        ("XVID", ".avi"),
    ):
        output_path = create_temp_video_path(
            prefix="portfolio-overlay-raw-",
            suffix=container_suffix,
            temp_dir=temp_dir,
        )
        candidate = cv2.VideoWriter(
            str(output_path),
            cv2.VideoWriter_fourcc(*fourcc_code),
            playback_fps,
            frame_size,
        )
        if candidate.isOpened():
            return candidate, output_path
        # Codec unavailable: discard the writer and its empty file, move on.
        candidate.release()
        output_path.unlink(missing_ok=True)
        attempted.append(f"{fourcc_code}{container_suffix}")
    raise RuntimeError(
        "Unable to create annotated playback video for this environment. "
        f"Tried {', '.join(attempted)}."
    )
def spool_uploaded_video(uploaded_file, suffix: str | None = None, temp_dir: Path | None = None) -> Path:
    """Stage an uploaded video on disk in chunks and return the temp path.

    Validates the upload against the Space upload limit and checks there is
    enough free working storage (upload size plus a safety buffer) before
    copying. The upload stream is rewound to position 0 afterwards.

    Raises:
        RuntimeError: when the upload is too large or disk space is short.
    """
    upload_bytes = get_upload_size_bytes(uploaded_file)
    if upload_bytes > STREAMLIT_SERVER_MAX_UPLOAD_MB * 1024 * 1024:
        raise RuntimeError(
            f"This upload exceeds the {STREAMLIT_SERVER_MAX_UPLOAD_MB} MB Space limit."
        )
    staging_dir = Path(temp_dir) if temp_dir else Path(tempfile.gettempdir())
    staging_dir.mkdir(parents=True, exist_ok=True)
    available_bytes = get_workspace_free_bytes(staging_dir)
    needed_bytes = upload_bytes + (MIN_FREE_WORKSPACE_BUFFER_MB * 1024 * 1024)
    if available_bytes < needed_bytes:
        raise RuntimeError(
            "Not enough working storage to stage this upload. "
            f"Need at least {format_bytes(needed_bytes)} free, only {format_bytes(available_bytes)} available."
        )
    # Keep the caller's suffix, else the upload's own extension, else .mp4.
    chosen_suffix = suffix or Path(getattr(uploaded_file, "name", "")).suffix or ".mp4"
    with tempfile.NamedTemporaryFile(delete=False, suffix=chosen_suffix, dir=staging_dir) as spool:
        uploaded_file.seek(0)
        shutil.copyfileobj(uploaded_file, spool, length=COPY_CHUNK_BYTES)
        staged_path = Path(spool.name)
    uploaded_file.seek(0)
    return staged_path
def draw_prediction_overlay(
    frame: np.ndarray,
    *,
    phase: str,
    confidence: float,
    model_label: str,
    frame_index: int,
    fps: float,
    total_frames: int | None = None,
    sampled_frame: bool = True,
) -> np.ndarray:
    """Render the prediction HUD onto a copy of *frame* and return it.

    Draws three elements: a phase/confidence panel in the bottom-left, a
    model/timestamp header in the top-right, and (when *total_frames* is
    known) a thin progress bar along the bottom edge. The input frame is
    not modified.

    Args:
        frame: BGR image to annotate (only ``shape[:2]`` is inspected here;
            assumed (H, W, 3) — confirm against callers).
        phase: Predicted phase name; selects the accent color.
        confidence: Prediction confidence, formatted as a percentage.
        model_label: Short model identifier shown in the header.
        frame_index: Zero-based index of this frame within the video.
        fps: Frame rate; values <= 0 suppress the timestamps.
        total_frames: Total frame count when known; enables total time and
            the progress bar.
        sampled_frame: True when the model ran on this exact frame, False
            when the overlay is carried forward from an earlier frame.
    """
    annotated = frame.copy()
    overlay = annotated.copy()
    height, width = annotated.shape[:2]
    # Layout scales with frame size so the HUD stays readable at any resolution.
    margin = max(16, width // 64)
    available_width = max(120, width - (margin * 2))
    available_height = max(72, height - (margin * 2))
    panel_width = min(max(int(width * 0.44), 240), available_width)
    panel_height = min(max(int(height * 0.16), 88), min(140, available_height))
    accent = _phase_overlay_color_bgr(phase)
    panel_x1 = margin
    panel_y1 = max(margin, height - panel_height - margin)
    panel_x2 = panel_x1 + panel_width
    panel_y2 = panel_y1 + panel_height
    cv2.rectangle(overlay, (panel_x1, panel_y1), (panel_x2, panel_y2), HUD_BACKGROUND_BGR, -1)
    cv2.rectangle(overlay, (panel_x1, panel_y1), (panel_x2, panel_y2), accent, 2)
    cv2.rectangle(overlay, (panel_x1 + 12, panel_y1 + 12), (panel_x1 + 18, panel_y2 - 12), accent, -1)
    # Blend the translucent panel into the frame.
    cv2.addWeighted(overlay, 0.68, annotated, 0.32, 0, annotated)
    base_scale = max(0.52, min(1.0, width / 1200.0))
    phase_label = phase.title() if phase and phase.lower() != "unknown" else "--"
    status_label = "Sampled frame" if sampled_frame else "Carry-forward overlay"
    _draw_shadowed_text(
        annotated,
        f"Phase: {phase_label}",
        (panel_x1 + 34, panel_y1 + 32),
        accent,
        font_scale=base_scale * 0.92,
        thickness=2,
    )
    # FIX: the confidence and status label were concatenated with no separator
    # (rendered as "Confidence 85.0%Sampled frame").
    _draw_shadowed_text(
        annotated,
        f"Confidence {confidence:.1%} | {status_label}",
        (panel_x1 + 34, panel_y1 + 58),
        HUD_TEXT_BGR,
        font_scale=base_scale * 0.62,
        thickness=1,
    )
    current_time = format_duration(frame_index / fps if fps > 0 else None)
    total_time = format_duration(total_frames / fps if fps > 0 and total_frames else None)
    header_text = model_label
    # FIX: the model label and timestamp were fused with no separator.
    if current_time != "Unknown" and total_time != "Unknown":
        header_text = f"{header_text} | {current_time} / {total_time}"
    elif current_time != "Unknown":
        header_text = f"{header_text} | {current_time}"
    header_scale = max(0.46, min(0.62, width / 1600.0))
    text_width, text_height = cv2.getTextSize(
        header_text, cv2.FONT_HERSHEY_SIMPLEX, header_scale, 1
    )[0]
    header_width = min(text_width + 28, available_width)
    header_height = text_height + 18
    header_x2 = width - margin
    header_x1 = max(margin, header_x2 - header_width)
    header_y1 = margin
    header_y2 = header_y1 + header_height
    header_overlay = annotated.copy()
    cv2.rectangle(header_overlay, (header_x1, header_y1), (header_x2, header_y2), HUD_BACKGROUND_BGR, -1)
    cv2.rectangle(header_overlay, (header_x1, header_y1), (header_x2, header_y2), accent, 1)
    cv2.addWeighted(header_overlay, 0.62, annotated, 0.38, 0, annotated)
    _draw_shadowed_text(
        annotated,
        header_text,
        (header_x1 + 14, header_y1 + text_height + 5),
        HUD_MUTED_BGR,
        font_scale=header_scale,
        thickness=1,
    )
    if total_frames and total_frames > 0:
        # Thin progress bar hugging the bottom edge of the frame.
        progress_ratio = min(max((frame_index + 1) / total_frames, 0.0), 1.0)
        bar_x1 = margin
        bar_y1 = height - 8
        bar_x2 = width - margin
        bar_y2 = height - 4
        cv2.rectangle(annotated, (bar_x1, bar_y1), (bar_x2, bar_y2), (60, 67, 82), -1)
        cv2.rectangle(
            annotated,
            (bar_x1, bar_y1),
            (bar_x1 + int((bar_x2 - bar_x1) * progress_ratio), bar_y2),
            accent,
            -1,
        )
    return annotated
def transcode_video_for_streamlit(
    video_path: str | Path, *, temp_dir: Path | None = None
) -> tuple[Path, str | None]:
    """Re-encode an overlay clip into browser-friendly H.264 MP4 via ffmpeg.

    Returns ``(playable_path, warning)``. On success the source clip is
    deleted and the transcoded path is returned with ``warning=None``; on
    any failure (ffmpeg missing, timeout, non-zero exit) the original path
    is returned together with an explanatory warning string.
    """
    source = Path(video_path)
    ffmpeg_binary = shutil.which("ffmpeg")
    if ffmpeg_binary is None:
        return source, "ffmpeg is unavailable, so the raw overlay clip is being used for playback."
    destination = create_temp_video_path(prefix="portfolio-overlay-final-", suffix=".mp4", temp_dir=temp_dir)
    # -an drops audio; faststart moves the moov atom up for streaming playback.
    ffmpeg_args = [
        ffmpeg_binary,
        "-y",
        "-loglevel", "error",
        "-i", str(source),
        "-an",
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
        str(destination),
    ]
    try:
        completed = subprocess.run(ffmpeg_args, capture_output=True, text=True, timeout=600, check=False)
    except subprocess.TimeoutExpired:
        destination.unlink(missing_ok=True)
        return source, "ffmpeg timed out while transcoding the overlay clip, so the raw overlay video is being used."
    if completed.returncode != 0:
        destination.unlink(missing_ok=True)
        stderr_text = completed.stderr.strip()
        error_line = stderr_text.splitlines()[-1] if stderr_text else "unknown ffmpeg error"
        return (
            source,
            f"ffmpeg could not transcode the overlay clip for browser-friendly playback ({error_line}). "
            "Using the directly encoded clip instead.",
        )
    # Success: drop the bulky raw clip and hand back the web-ready file.
    source.unlink(missing_ok=True)
    return destination, None
def probe_video_info(video_path: str | Path) -> dict:
    """Gather playback metadata (size, dimensions, duration, fps) for a video.

    Tries OpenCV first; when OpenCV cannot open the file or reports a
    missing/zero duration, fps, or frame count, falls back to ffprobe and
    lets ffprobe's values override OpenCV's.

    Returns:
        dict with path/name/size fields, width/height, duration_seconds,
        fps, frame_count, plus human-readable duration and resolution labels.

    Raises:
        RuntimeError: when the file is missing, empty, or its dimensions
            cannot be determined by either backend.
    """
    path = Path(video_path)
    if not path.exists():
        raise RuntimeError(f"Video file not found: {path}")
    file_size_bytes = path.stat().st_size
    if file_size_bytes <= 0:
        raise RuntimeError(f"Video file is empty: {path}")
    info = {
        "path": str(path),
        "name": path.name,
        "file_size_bytes": file_size_bytes,
        "file_size_label": format_bytes(file_size_bytes),
    }
    capture = cv2.VideoCapture(str(path))
    open_cv_info = {}
    if capture.isOpened():
        # OpenCV returns 0 for properties it cannot read; `or` maps those to defaults.
        fps = float(capture.get(cv2.CAP_PROP_FPS) or 0.0)
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
        height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
        capture.release()
        # Only trust OpenCV when it produced real dimensions.
        if width > 0 and height > 0:
            duration_seconds = frame_count / fps if fps > 0 and frame_count > 0 else 0.0
            open_cv_info = {
                "duration_seconds": duration_seconds,
                "fps": fps,
                "frame_count": frame_count,
                "width": width,
                "height": height,
            }
    else:
        capture.release()
    ffprobe_info = {}
    # Fall back to ffprobe when OpenCV produced no (or implausible) numbers.
    if (
        not open_cv_info
        or open_cv_info.get("duration_seconds", 0.0) <= 0
        or open_cv_info.get("fps", 0.0) <= 0
        or open_cv_info.get("frame_count", 0) <= 0
    ):
        ffprobe_info = _probe_video_info_ffprobe(path)
    # Merge order matters: later dicts win, so ffprobe overrides OpenCV.
    merged = {**info, **open_cv_info, **ffprobe_info}
    if merged.get("width", 0) <= 0 or merged.get("height", 0) <= 0:
        raise RuntimeError(f"Could not determine video dimensions for {path.name}")
    merged["duration_label"] = format_duration(merged.get("duration_seconds"))
    merged["resolution_label"] = f"{merged['width']}x{merged['height']}"
    return merged
def _probe_video_info_ffprobe(video_path: Path) -> dict:
    """Probe *video_path* with ffprobe and return a metadata dict.

    Returns an empty dict when ffprobe is not installed (caller treats the
    probe as best-effort).

    Raises:
        RuntimeError: when ffprobe exists but fails, times out, or produces
            output without a usable video stream.
    """
    ffprobe_path = shutil.which("ffprobe")
    if ffprobe_path is None:
        return {}
    command = [
        ffprobe_path,
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height,avg_frame_rate,r_frame_rate,nb_frames",
        "-show_entries", "format=duration,format_name",
        "-of", "json",
        str(video_path),
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=30, check=False)
    except subprocess.TimeoutExpired as exc:
        # FIX: a hung ffprobe previously leaked subprocess.TimeoutExpired to
        # callers that only expect RuntimeError from probing failures.
        raise RuntimeError(f"ffprobe timed out while probing {video_path.name}") from exc
    if result.returncode != 0:
        error_message = result.stderr.strip() or "Unknown ffprobe error"
        raise RuntimeError(f"ffprobe failed for {video_path.name}: {error_message}")
    try:
        probe_data = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Unable to parse ffprobe output for {video_path.name}") from exc
    streams = probe_data.get("streams", [])
    if not streams:
        raise RuntimeError(f"No video stream found in {video_path.name}")
    stream = streams[0]
    format_info = probe_data.get("format", {})
    width = int(stream.get("width") or 0)
    height = int(stream.get("height") or 0)
    duration_seconds = float(format_info.get("duration") or 0.0)
    # Frame rate is reported as a fraction like "30000/1001".
    fps = _parse_fraction(stream.get("avg_frame_rate") or stream.get("r_frame_rate") or "0/1")
    frame_count = int(stream.get("nb_frames") or 0)
    # Some containers omit nb_frames; estimate it from duration * fps.
    if frame_count <= 0 and duration_seconds > 0 and fps > 0:
        frame_count = int(round(duration_seconds * fps))
    return {
        "duration_seconds": duration_seconds,
        "fps": fps,
        "frame_count": frame_count,
        "width": width,
        "height": height,
        "format_name": format_info.get("format_name"),
    }
def _parse_fraction(value: str) -> float:
if "/" in value:
numerator, denominator = value.split("/", 1)
try:
denominator_value = float(denominator)
if denominator_value == 0:
return 0.0
return float(numerator) / denominator_value
except ValueError:
return 0.0
try:
return float(value)
except ValueError:
return 0.0
def _phase_overlay_color_bgr(phase: str | None) -> tuple[int, int, int]:
    """Look up the HUD accent color (BGR) for a phase; unknown names fall back."""
    normalized = "" if phase is None else phase.strip().lower()
    return PHASE_OVERLAY_BGR.get(normalized, PHASE_OVERLAY_BGR["unknown"])
def _draw_shadowed_text(
    frame: np.ndarray,
    text: str,
    origin: tuple[int, int],
    color: tuple[int, int, int],
    *,
    font_scale: float,
    thickness: int,
) -> None:
    """Draw *text* onto *frame* in place, with a 1px black drop shadow.

    The shadow pass is drawn first (offset down-right, thicker, black) so
    the colored foreground pass stays legible on any background.
    """
    origin_x, origin_y = origin
    cv2.putText(
        frame,
        text,
        (origin_x + 1, origin_y + 1),
        cv2.FONT_HERSHEY_SIMPLEX,
        font_scale,
        (0, 0, 0),
        thickness + 2,
        cv2.LINE_AA,
    )
    cv2.putText(
        frame,
        text,
        origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        font_scale,
        color,
        thickness,
        cv2.LINE_AA,
    )