| import argparse
|
| import json
|
| import tempfile
|
| from pathlib import Path
|
| from typing import List, Tuple, Dict
|
| import urllib.request
|
| import urllib.parse
|
| import urllib.error
|
|
|
| import cv2
|
| import numpy as np
|
|
|
| from miner1 import TVFrameResult, BoundingBox
|
| from keypoint_evaluation import (
|
| load_template_from_file,
|
| )
|
| from test_predict_batch import (
|
| evaluate_keypoints_batch,
|
| visualize_keypoint_evaluation,
|
| )
|
|
|
|
|
def fetch_json_data(url: str, timeout: float = 30.0) -> dict:
    """Fetch a JSON document from ``url`` and return the decoded payload.

    The request carries browser-like ``User-Agent``/``Accept`` headers
    (presumably because the host rejects the default urllib client; see the
    403 hint printed below).

    Args:
        url: Location of the JSON document.
        timeout: Seconds to wait on a blocking network operation before
            giving up, so a stalled server cannot hang the script forever.

    Returns:
        The decoded JSON payload, returned unchanged.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: The URL could not be opened.
        ValueError: The response body is not valid UTF-8 encoded JSON.
    """
    print(f"Fetching data from {url}...")

    req = urllib.request.Request(url)
    req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    req.add_header('Accept', 'application/json, text/plain, */*')
    req.add_header('Accept-Language', 'en-US,en;q=0.9')

    # Keep the try body limited to the network call and decoding.
    try:
        with urllib.request.urlopen(req, timeout=timeout) as response:
            data = json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.reason}")
        if e.code == 403:
            print("403 Forbidden: The server is blocking the request. This might require authentication or different headers.")
        raise
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}")
        raise

    # The frame count is only informational: a payload with a null or missing
    # "predictions"/"frames" entry must not make the fetch itself fail.
    predictions = data.get('predictions') if isinstance(data, dict) else None
    frames_list = predictions.get('frames') if isinstance(predictions, dict) else None
    frame_total = len(frames_list) if isinstance(frames_list, list) else 0
    print(f"Successfully fetched data with {frame_total} frames")
    return data
|
|
|
|
|
def download_video(video_url: str, output_path: Path, timeout: float = 60.0) -> Path:
    """Download ``video_url`` to ``output_path`` and return that path.

    The body is streamed in chunks into a sibling ``<name>.part`` file which
    is renamed onto ``output_path`` only after the transfer completed, so an
    interrupted download never leaves a truncated video at the final path.
    The request sends the same browser-like ``User-Agent`` that
    ``fetch_json_data`` uses.

    Args:
        video_url: Location of the video.
        output_path: Destination file; missing parent directories are created.
        timeout: Seconds to wait on a blocking network operation.

    Returns:
        ``output_path``.

    Raises:
        urllib.error.URLError: The URL could not be opened (``HTTPError`` and
            ``ContentTooShortError`` are subclasses).
    """
    print(f"Downloading video from {video_url}...")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    request = urllib.request.Request(video_url)
    request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

    chunk_size = 1024 * 1024
    partial_path = output_path.with_name(output_path.name + ".part")
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response, \
                open(partial_path, "wb") as sink:
            declared_size = response.headers.get("Content-Length")
            received = 0
            for chunk in iter(lambda: response.read(chunk_size), b""):
                sink.write(chunk)
                received += len(chunk)
            # Same truncation check urlretrieve performs.
            if declared_size is not None and received < int(declared_size):
                raise urllib.error.ContentTooShortError(
                    "retrieval incomplete: got only %i out of %i bytes"
                    % (received, int(declared_size)),
                    (str(output_path), response.headers),
                )
        partial_path.replace(output_path)
    finally:
        # After a successful rename the partial file no longer exists; on any
        # failure this removes the incomplete data.
        if partial_path.exists():
            partial_path.unlink()

    print(f"Video downloaded to {output_path}")
    return output_path
|
|
|
|
|
def extract_frames_from_video(video_path: Path, frame_ids: List[int] = None) -> Dict[int, np.ndarray]:
    """Read frames from a video, optionally keeping only specific frame IDs.

    Frames are numbered from 0 in the order ``cv2.VideoCapture.read`` yields
    them.

    Args:
        video_path: Video file to open.
        frame_ids: Frame indices to keep. ``None`` keeps every frame.

    Returns:
        Mapping from frame index to the frame returned by OpenCV. Requested
        indices that the video does not contain are simply absent.

    Raises:
        RuntimeError: The video could not be opened.
    """
    print(f"Extracting frames from {video_path}...")
    cap = cv2.VideoCapture(str(video_path))

    frames = {}
    # try/finally guarantees the capture handle is released even if opening
    # or decoding raises.
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Unable to open video: {video_path}")

        # Build the lookup once: set membership is O(1) per decoded frame.
        wanted = None if frame_ids is None else set(frame_ids)
        frame_index = 0

        # Stop as soon as every requested frame has been collected instead of
        # decoding the rest of the video for nothing.
        while wanted is None or len(frames) < len(wanted):
            ret, frame = cap.read()
            if not ret:
                break

            if wanted is None or frame_index in wanted:
                frames[frame_index] = frame

            frame_index += 1
    finally:
        cap.release()

    print(f"Extracted {len(frames)} frames from video")
    return frames
|
|
|
|
|
def convert_keypoints_format(json_keypoints: List[List[int]]) -> List[Tuple[int, int]]:
    """Turn JSON-style ``[x, y]`` keypoints into a list of ``(x, y)`` tuples.

    Only the first two entries of each keypoint are read, and each is passed
    through ``int``.
    """
    converted = []
    for point in json_keypoints:
        x_value = int(point[0])
        y_value = int(point[1])
        converted.append((x_value, y_value))
    return converted
|
|
|
|
|
def convert_json_to_tvframe_results(
    json_data: dict,
    frames: Dict[int, np.ndarray],
) -> List[TVFrameResult]:
    """
    Convert JSON data to TVFrameResult objects.

    Entries whose ``frame_id`` is not a key of ``frames`` are skipped with a
    warning. ``predictions``, ``frames``, ``boxes`` and ``keypoints`` may each
    be missing or JSON ``null``; both are treated as empty.

    Args:
        json_data: JSON data containing predictions with frames, boxes, and keypoints
        frames: Dictionary mapping frame_id to frame image

    Returns:
        List of TVFrameResult objects
    """
    # `.get(key, default)` still returns None when the key is present with a
    # null value, so fall back with `or` instead of a default argument.
    predictions = json_data.get('predictions') or {}
    frames_data = predictions.get('frames') or []

    results = []
    for frame_data in frames_data:
        frame_id = frame_data.get('frame_id')
        if frame_id not in frames:
            print(f"Warning: Frame {frame_id} not found in extracted frames, skipping")
            continue

        boxes = [
            BoundingBox(
                x1=int(box_data.get('x1', 0)),
                y1=int(box_data.get('y1', 0)),
                x2=int(box_data.get('x2', 0)),
                y2=int(box_data.get('y2', 0)),
                cls_id=int(box_data.get('cls_id', 0)),
                conf=float(box_data.get('conf', 0.0)),
            )
            for box_data in frame_data.get('boxes') or []
        ]

        keypoints = convert_keypoints_format(frame_data.get('keypoints') or [])

        results.append(
            TVFrameResult(
                frame_id=frame_id,
                boxes=boxes,
                keypoints=keypoints,
            )
        )

    return results
|
|
|
|
|
def evaluate_keypoints_from_json(
    json_data: dict,
    frames: Dict[int, np.ndarray],
    template_image: np.ndarray,
    template_keypoints: List[Tuple[int, int]],
    visualization_output_dir: Path = None,
) -> Dict[str, float]:
    """
    Evaluate keypoint accuracy from JSON data using evaluate_keypoints_batch.

    The JSON predictions are first converted with
    ``convert_json_to_tvframe_results``; the resulting statistics are printed
    and returned.

    Args:
        json_data: JSON data containing predictions with frames and keypoints
        frames: Dictionary mapping frame_id to frame image
        template_image: Template image for evaluation
        template_keypoints: Template keypoints
        visualization_output_dir: Optional directory to save visualization images

    Returns:
        Dictionary with keypoint evaluation statistics. When no frame could be
        converted, a zeroed dictionary is returned that carries every key this
        function reports on the normal path, so callers can read the same
        keys in both cases.
    """
    results = convert_json_to_tvframe_results(json_data, frames)

    if not results:
        print("No valid frames found in JSON data")
        return {
            "keypoint_avg_score": 0.0,
            "keypoint_valid_frames": 0,
            "keypoint_total_frames": 0,
            # Mirror the keys printed below for the non-empty case.
            "keypoint_max_score": 0.0,
            "keypoint_min_score": 0.0,
            "keypoint_frames_above_0.5": 0,
            "keypoint_frames_above_0.7": 0,
        }

    print(f"Evaluating {len(results)} frames using evaluate_keypoints_batch...")

    stats = evaluate_keypoints_batch(
        results=results,
        original_frames=frames,
        template_image=template_image,
        template_keypoints=template_keypoints,
        visualization_output_dir=visualization_output_dir,
    )

    print("\n=== Keypoint Evaluation Results ===")
    print(f"Total frames: {stats['keypoint_total_frames']}")
    print(f"Valid frames: {stats['keypoint_valid_frames']}")
    print(f"Average score: {stats['keypoint_avg_score']:.3f}")
    print(f"Max score: {stats['keypoint_max_score']:.3f}")
    print(f"Min score: {stats['keypoint_min_score']:.3f}")
    print(f"Frames with score > 0.5: {stats['keypoint_frames_above_0.5']}")
    print(f"Frames with score > 0.7: {stats['keypoint_frames_above_0.7']}")

    return stats
|
|
|
|
|
def parse_args(argv: List[str] = None) -> argparse.Namespace:
    """Parse the command-line options of the evaluation script.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what the script entry point uses;
            passing a list allows the parser to be driven programmatically.

    Returns:
        Namespace with ``url`` (str), ``template_image`` (Path),
        ``output_dir`` (Path) and ``delete_video`` (bool).
    """
    parser = argparse.ArgumentParser(
        description="Fetch video and keypoint data from URL, evaluate keypoints, and visualize results."
    )
    parser.add_argument(
        "--url",
        type=str,
        default="https://pub-7b4130b6af75472f800371248bca15b6.r2.dev/scorevision/results_soccer/5Fnhz5fDihvno4DfssfRogL84VFvdDRRsgu19grbqEDPbJGv/responses/007115302-f9bd4226d1f4248c782a3179764e3203ce2fc520642eed4f7b02c40e61db55eb.json",
        help="URL to fetch JSON data containing video_url and predictions.",
    )
    # Path defaults are given as real Path objects so the resulting type does
    # not depend on argparse converting string defaults through `type`.
    parser.add_argument(
        "--template-image",
        type=Path,
        default=Path('football_pitch_template.png'),
        help="Path to football pitch template image.",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path('outputs/url_evaluation'),
        help="Directory to save visualizations and downloaded video.",
    )
    parser.add_argument(
        "--delete-video",
        action="store_true",
        help="Delete downloaded video file after processing (default: keep video).",
    )
    return parser.parse_args(argv)
|
|
|
|
|
def main() -> None:
    """Run the full pipeline: fetch JSON, download video, evaluate keypoints.

    Steps: parse CLI options, fetch the prediction JSON, download the video
    named by its ``video_url`` into the output directory, extract the frames
    referenced by the predictions, load the pitch template, and run the
    keypoint evaluation with visualizations written under the output
    directory.

    Raises:
        ValueError: The JSON payload has no ``video_url``.
    """
    args = parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)

    json_data = fetch_json_data(args.url)

    video_url = json_data.get('video_url')
    if not video_url:
        raise ValueError("No video_url found in JSON data")

    # Derive the local filename from the last URL path component. A path that
    # ends in ".." would point outside the output directory, so it is treated
    # like a missing name.
    video_filename = Path(urllib.parse.urlparse(video_url).path).name
    if not video_filename or video_filename == "..":
        video_filename = "video.mp4"
    video_path = args.output_dir / video_filename

    video_name_without_ext = Path(video_filename).stem
    visualization_dir = args.output_dir / f"visualizations_{video_name_without_ext}"

    try:
        download_video(video_url, video_path)

        # Tolerate a null "predictions"/"frames" entry as well as a missing one.
        predictions = json_data.get('predictions') or {}
        frames_data = predictions.get('frames') or []
        frame_ids = [frame_data.get('frame_id') for frame_data in frames_data]

        frames = extract_frames_from_video(video_path, frame_ids=frame_ids if frame_ids else None)

        template_image, template_keypoints = load_template_from_file(str(args.template_image))

        evaluate_keypoints_from_json(
            json_data=json_data,
            frames=frames,
            template_image=template_image,
            template_keypoints=template_keypoints,
            visualization_output_dir=visualization_dir,
        )
    finally:
        # Honour --delete-video even when a step above failed, and never let a
        # missing file mask the original error.
        if args.delete_video:
            if video_path.is_file():
                video_path.unlink()
                print(f"Deleted video file: {video_path}")
        elif video_path.is_file():
            print(f"Video saved at: {video_path}")

    print(f"\nResults saved to: {args.output_dir}")
    print(f"Visualizations saved to: {visualization_dir}")
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|
|