Spaces:
Runtime error
Runtime error
| import cv2 | |
| import time | |
| import logging | |
| import numpy as np | |
| from pathlib import Path | |
| import json | |
| from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
| from typing import List, Dict, Tuple | |
| import multiprocessing | |
class VideoProcessingUnit:
    """Worker that searches video frames for cursor templates.

    The "electron physics" attributes are nominal constants kept for
    compatibility with code that reads them. Inside this class only
    ``switching_frequency`` takes part in scheduling, through
    ``_cycle_budget``.
    """

    def __init__(self, unit_id: int):
        """Create a unit with zeroed counters.

        Args:
            unit_id: Identifier stored on the instance as ``unit_id``.
        """
        self.unit_id = unit_id
        self.processed_frames = 0
        self.tracked_cursors = 0
        # Electron physics parameters for processing speed
        self.electron_drift_velocity = 1.96e7  # m/s in silicon
        self.switching_frequency = 8.92e85  # Hz
        # Silicon process parameters
        self.path_length = 14e-9  # meters (14nm process node)
        self.traverse_time = 8.92e15
        # Operations possible per second based on electron movement
        self.ops_per_second = 9.98e15
        # Scale to ops per cycle for time slicing
        self.ops_per_cycle = int(self.ops_per_second / 1000)
        self.last_cycle_time = time.time()

    def _cycle_budget(self, requested: int) -> int:
        """Return how many of ``requested`` work items may run this cycle.

        The budget is ``switching_frequency`` multiplied by the wall-clock
        time since the previous cycle, capped at ``requested``. When the
        clock has not advanced (coarse timer, back-to-back calls) or has
        stepped backwards, the elapsed time carries no information, so the
        full request is granted instead of silently skipping the work.

        Side effect: refreshes ``last_cycle_time``.
        """
        now = time.time()
        elapsed = now - self.last_cycle_time
        self.last_cycle_time = now
        if elapsed <= 0:
            return requested
        simulated_ops = self.switching_frequency * elapsed
        # Compare before converting: int() would raise on an infinite product.
        if simulated_ops >= requested:
            return requested
        return max(int(simulated_ops), 0)

    def to_rgb(self, img):
        """Return ``img`` as a colour image without an alpha channel.

        Despite the name, the conversions used produce OpenCV's BGR channel
        order. 2-D images are expanded with ``COLOR_GRAY2BGR``, 4-channel
        images are reduced with ``COLOR_BGRA2BGR``, and anything else is
        returned unchanged. ``None`` is passed through.
        """
        if img is None:
            return None
        if len(img.shape) == 2:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        if img.shape[2] == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
        return img

    def get_mask_from_alpha(self, template_img):
        """Build a uint8 mask from the fourth channel of ``template_img``.

        Returns:
            An array that is 255 where the fourth channel is > 0 and 0
            elsewhere, or ``None`` when the image is ``None`` or does not
            have exactly four channels.
        """
        if template_img is not None and len(template_img.shape) == 3 and template_img.shape[2] == 4:
            return (template_img[:, :, 3] > 0).astype(np.uint8) * 255
        return None

    def detect_cursor_in_frame(self, frame, cursor_templates: Dict, threshold: float = 0.8) -> Dict:
        """Find the best-matching cursor template in a single frame.

        Args:
            frame: Image array to search; ``None`` yields an inactive result.
            cursor_templates: Mapping of template name to template image.
            threshold: Minimum ``TM_CCOEFF_NORMED`` score to report a cursor.

        Returns:
            Dict with keys ``cursor_active``, ``x``, ``y``, ``confidence``
            and ``template``. ``x``/``y`` are the centre of the best match,
            or ``None`` when no template reaches ``threshold``.
            ``confidence`` is the best finite score seen, or -1 if none.
        """
        best_pos = None
        best_conf = -1.0
        best_template_name = None

        frame_bgr = self.to_rgb(frame)
        template_budget = self._cycle_budget(len(cursor_templates))

        if frame_bgr is not None:
            examined = 0
            for template_name, cursor_template in cursor_templates.items():
                if examined >= template_budget:
                    break
                template_bgr = self.to_rgb(cursor_template)
                if template_bgr is None or template_bgr.shape[2] != frame_bgr.shape[2]:
                    # Unusable templates do not consume budget.
                    continue
                mask = self.get_mask_from_alpha(cursor_template)
                try:
                    scores = cv2.matchTemplate(frame_bgr, template_bgr, cv2.TM_CCOEFF_NORMED, mask=mask)
                    _, max_val, _, max_loc = cv2.minMaxLoc(scores)
                except Exception as e:
                    # Best effort: one bad template (for example, larger than
                    # the frame) must not abort the remaining templates.
                    logging.warning(f"Template matching failed for {template_name}: {e}")
                else:
                    # Masked normalised matching can yield inf/NaN scores.
                    # Those are not real matches and would also serialise
                    # as invalid JSON, so they are ignored.
                    if np.isfinite(max_val) and max_val > best_conf:
                        best_conf = max_val
                        if max_val >= threshold:
                            cursor_w, cursor_h = template_bgr.shape[1], template_bgr.shape[0]
                            best_pos = (max_loc[0] + cursor_w // 2, max_loc[1] + cursor_h // 2)
                            best_template_name = template_name
                examined += 1

        if best_pos is not None and best_conf >= threshold:
            return {
                "cursor_active": True,
                "x": best_pos[0],
                "y": best_pos[1],
                "confidence": best_conf,
                "template": best_template_name,
            }
        return {
            "cursor_active": False,
            "x": None,
            "y": None,
            "confidence": best_conf,
            "template": None,
        }

    def process_frame_chunk(self, frames: List[np.ndarray], cursor_templates: Dict,
                            start_idx: int, chunk_size: int) -> List[Dict]:
        """Run cursor detection on a contiguous slice of ``frames``.

        Args:
            frames: All frames of the video.
            cursor_templates: Mapping of template name to template image.
            start_idx: Index of the first frame to process.
            chunk_size: Maximum number of frames to process.

        Returns:
            One detection dict per processed frame, in frame order, each
            extended with a ``frame`` key of the form ``"0001.png"``
            (1-based index into ``frames``).

        Side effects: increments ``processed_frames`` and
        ``tracked_cursors``.
        """
        frame_budget = self._cycle_budget(chunk_size)
        stop_idx = min(start_idx + frame_budget, len(frames))

        processed_results = []
        for i in range(start_idx, stop_idx):
            cursor_result = self.detect_cursor_in_frame(frames[i], cursor_templates)
            cursor_result["frame"] = f"{i+1:04d}.png"  # Add frame number to results
            processed_results.append(cursor_result)
            self.processed_frames += 1
            if cursor_result["cursor_active"]:
                self.tracked_cursors += 1
        return processed_results
class VideoProcessingCore:
    """Manages multiple VideoProcessingUnits"""

    def __init__(self, core_id: int, num_units: int = 15):
        """Create a core with ``num_units`` processing units.

        Args:
            core_id: Identifier reported back in processing results.
            num_units: Number of ``VideoProcessingUnit`` instances to create.
        """
        self.core_id = core_id
        self.units = [VideoProcessingUnit(i) for i in range(num_units)]
        # Lifetime totals across every video this core has handled.
        self.total_frames_processed = 0
        self.total_cursors_tracked = 0

    @staticmethod
    def _partition(total: int, parts: int) -> List[Tuple[int, int]]:
        """Split ``total`` items into ``parts`` contiguous ``(start, size)`` spans.

        Sizes differ by at most one and always sum to ``total``; the first
        ``total % parts`` spans take the extra item. Returns an empty list
        when ``parts`` is not positive.
        """
        if parts <= 0:
            return []
        base, extra = divmod(max(total, 0), parts)
        spans = []
        start = 0
        for idx in range(parts):
            size = base + (1 if idx < extra else 0)
            spans.append((start, size))
            start += size
        return spans

    def extract_frames(self, video_path: str, output_dir: str, fps: int = 3) -> List[np.ndarray]:
        """Read a video and return frames sampled at roughly ``fps`` per second.

        Every ``round(video_fps / fps)``-th frame is kept, starting with the
        first one. If the container reports no usable frame rate, 30 is
        assumed. A non-positive ``fps`` keeps every frame.

        Args:
            video_path: Path of the video file to open.
            output_dir: Accepted for interface compatibility; this method
                writes nothing to disk.
            fps: Target number of sampled frames per second of video.

        Returns:
            The sampled frames in order. Empty when the file cannot be
            opened (an error is logged in that case).
        """
        frames = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                logging.error(f"Failed to open video file: {video_path}")
                return frames

            video_fps = cap.get(cv2.CAP_PROP_FPS)
            if not video_fps or not np.isfinite(video_fps) or video_fps <= 0:
                video_fps = 30

            if fps > 0:
                # Never below 1, so a target above the source rate keeps all frames.
                frame_interval = max(1, int(round(video_fps / fps)))
            else:
                frame_interval = 1

            frame_index = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_index % frame_interval == 0:
                    frames.append(frame)
                frame_index += 1
        finally:
            # Release the capture handle even if decoding raises.
            cap.release()
        return frames

    def process_video_parallel(self, video_path: str, output_dir: str, cursor_templates: Dict) -> Dict:
        """Extract frames from one video and run cursor detection on all of them.

        Frames are divided into contiguous spans, one per unit, including
        any remainder that does not divide evenly. Each unit processes its
        span in turn, so results come back in frame order.

        Args:
            video_path: Path of the video file.
            output_dir: Forwarded to ``extract_frames``.
            cursor_templates: Mapping of template name to template image.

        Returns:
            Dict with ``core_id``, ``frames_processed`` and
            ``cursors_tracked`` (counts for this video only) and
            ``unit_results`` (one detection dict per processed frame).
            The core's lifetime totals are updated as a side effect.
        """
        frames = self.extract_frames(video_path, output_dir)
        results = []
        frames_done = 0
        cursors_found = 0

        spans = self._partition(len(frames), len(self.units))
        for unit, (start_idx, span_size) in zip(self.units, spans):
            if span_size == 0:
                continue
            # Calculate chunk size based on electron physics
            chunk_size = min(
                span_size,
                unit.ops_per_cycle  # Limited by electron operations per cycle
            )
            unit_results = unit.process_frame_chunk(
                frames,
                cursor_templates,
                start_idx,
                chunk_size
            )
            frames_done += len(unit_results)
            cursors_found += sum(1 for r in unit_results if r["cursor_active"])
            results.extend(unit_results)

        self.total_frames_processed += frames_done
        self.total_cursors_tracked += cursors_found
        return {
            'core_id': self.core_id,
            # Per-video counts: callers add these to their own running
            # totals, so cumulative values here would be counted twice.
            'frames_processed': frames_done,
            'cursors_tracked': cursors_found,
            'unit_results': results
        }
class ElectronSpeedVideoProcessor:
    """Top-level processor managing multiple cores with electron-speed processing"""

    def __init__(self, num_cores: int = 5):
        """Create ``num_cores`` processing cores and zero the running totals."""
        self.cores = [VideoProcessingCore(i) for i in range(num_cores)]
        self.total_frames = 0
        self.total_cursors = 0
        self.start_time = None

    def process_videos(self, video_paths: List[str], output_base_dir: str, cursor_templates_dir: str):
        """Track cursors in several videos and write one JSON file per video.

        Videos are handled in batches of ``len(self.cores)``, one core per
        video, using a thread pool. For each video the per-frame results
        are written to ``<output_base_dir>/<video stem>/cursor_tracking_results.json``.

        Args:
            video_paths: Paths of the videos to process.
            output_base_dir: Directory under which per-video output
                directories are created.
            cursor_templates_dir: Directory containing ``*.png`` cursor
                templates.

        Returns:
            None. Logs an error and returns early when there are no cores
            or no readable templates. A video whose processing raises is
            logged and skipped; the remaining videos are still processed.
        """
        self.start_time = time.time()

        if not self.cores:
            # A zero-sized thread pool or a zero batch step would raise below.
            logging.error("No processing cores configured; nothing to do")
            return

        # Load cursor templates
        cursor_templates = {}
        # Sorted so that ties between templates do not depend on directory order.
        for template_file in sorted(Path(cursor_templates_dir).glob("*.png")):
            template_img = cv2.imread(str(template_file), cv2.IMREAD_UNCHANGED)
            if template_img is not None:
                cursor_templates[template_file.name] = template_img
        if not cursor_templates:
            logging.error(f"No cursor templates found in: {cursor_templates_dir}")
            return

        batch_size = len(self.cores)
        frames_this_run = 0  # throughput is measured against this call's start_time

        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            for batch_start in range(0, len(video_paths), batch_size):
                video_batch = video_paths[batch_start:batch_start + batch_size]
                futures = []

                # Submit work to cores
                for core, video_path in zip(self.cores, video_batch):
                    video_name = Path(video_path).stem
                    output_dir = Path(output_base_dir) / video_name
                    output_dir.mkdir(parents=True, exist_ok=True)
                    future = executor.submit(
                        core.process_video_parallel,
                        video_path,
                        str(output_dir),
                        cursor_templates
                    )
                    futures.append((future, video_path, output_dir))

                # Process results
                for future, video_path, output_dir in futures:
                    try:
                        result = future.result()
                    except Exception:
                        # Boundary handler: keep going with the other videos.
                        logging.exception(f"Failed to process {video_path}")
                        continue

                    # Count from the per-frame results so the numbers are
                    # per-video regardless of how the core keeps its counters.
                    unit_results = result['unit_results']
                    frames_done = len(unit_results)
                    cursors_found = sum(1 for r in unit_results if r["cursor_active"])

                    self.total_frames += frames_done
                    self.total_cursors += cursors_found
                    frames_this_run += frames_done

                    # Save results to JSON
                    json_path = output_dir / "cursor_tracking_results.json"
                    with open(json_path, 'w', encoding='utf-8') as f:
                        json.dump(unit_results, f, indent=2)

                    # Log progress with electron physics stats
                    elapsed = time.time() - self.start_time
                    frames_per_second = frames_this_run / elapsed if elapsed > 0 else 0
                    logging.info(f"Processed {Path(video_path).name}:")
                    logging.info(f"Core {result['core_id']}: "
                                 f"{frames_done:,} frames, "
                                 f"{cursors_found} cursors")
                    logging.info(f"Electron drift utilized: "
                                 f"{self.cores[0].units[0].electron_drift_velocity:.2e} m/s")
                    logging.info(f"Processing speed: {frames_per_second:.2f} frames/s")