|
|
import cv2
|
|
|
import time
|
|
|
import logging
|
|
|
import numpy as np
|
|
|
from pathlib import Path
|
|
|
import json
|
|
|
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
|
|
|
from typing import List, Dict, Tuple
|
|
|
import multiprocessing
|
|
|
|
|
|
class VideoProcessingUnit:
    """Worker that locates a cursor in video frames by template matching.

    The "electron speed" attributes set in ``__init__`` are fixed nominal
    figures exposed as public attributes. This class does not use them to
    limit how much work a call performs.
    """

    def __init__(self, unit_id: int):
        """Create a unit.

        Args:
            unit_id: Identifier stored on the instance as ``unit_id``.
        """
        self.unit_id = unit_id

        # Running totals maintained by process_frame_chunk().
        self.processed_frames = 0
        self.tracked_cursors = 0

        # Nominal constants (reported in logs by callers).
        self.electron_drift_velocity = 1.96e7
        self.switching_frequency = 8.92e85
        self.path_length = 14e-9
        self.traverse_time = 8.92e15
        self.ops_per_second = 9.98e15
        self.ops_per_cycle = int(self.ops_per_second / 1000)

        # Wall-clock time of the most recent detect/chunk call.
        self.last_cycle_time = time.time()

    def to_rgb(self, img):
        """Return ``img`` as a 3-channel image, or ``None`` if ``img`` is ``None``.

        Despite the name, the conversions use OpenCV's BGR codes
        (``COLOR_GRAY2BGR`` / ``COLOR_BGRA2BGR``), so the channel order stays
        BGR. Images that are neither 2-D nor 4-channel are returned unchanged.
        """
        if img is None:
            return None
        if len(img.shape) == 2:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        if img.shape[2] == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
        return img

    def get_mask_from_alpha(self, template_img):
        """Build a matching mask from a template's alpha channel.

        Returns:
            A ``uint8`` array holding 255 where alpha > 0 and 0 elsewhere when
            ``template_img`` has exactly four channels; otherwise ``None``.
        """
        if template_img is not None and len(template_img.shape) == 3 and template_img.shape[2] == 4:
            return (template_img[:, :, 3] > 0).astype(np.uint8) * 255
        return None

    def _match_template(self, frame_rgb, template_name, cursor_template):
        """Match one template against a frame.

        Returns:
            ``(score, (center_x, center_y))`` for the best match location, or
            ``None`` when the template is unusable or matching fails.
        """
        template_rgb = self.to_rgb(cursor_template)
        if template_rgb is None or template_rgb.shape[2] != frame_rgb.shape[2]:
            return None

        mask = self.get_mask_from_alpha(cursor_template)
        try:
            result = cv2.matchTemplate(frame_rgb, template_rgb, cv2.TM_CCOEFF_NORMED, mask=mask)
            # Normalised scores can come back as inf/NaN where the denominator
            # is zero (e.g. flat image regions); treat those as the worst
            # possible score so they can neither win nor reach the JSON output.
            result[~np.isfinite(result)] = -1.0
            _, max_val, _, max_loc = cv2.minMaxLoc(result)
        except Exception as e:
            # Best effort: a template that cannot be matched (for example one
            # larger than the frame) must not abort the remaining templates.
            logging.warning(f"Template matching failed for {template_name}: {e}")
            return None

        cursor_h, cursor_w = template_rgb.shape[0], template_rgb.shape[1]
        center = (max_loc[0] + cursor_w // 2, max_loc[1] + cursor_h // 2)
        return max_val, center

    def detect_cursor_in_frame(self, frame, cursor_templates: Dict, threshold: float = 0.8) -> Dict:
        """Find the best-matching cursor template in a single frame.

        Every template in ``cursor_templates`` is tried. Tying the number of
        templates to elapsed wall-clock time would skip all of them whenever
        the clock had not advanced (or had stepped backwards) since the
        previous call, so no such limit is applied.

        Args:
            frame: Image array, or ``None``.
            cursor_templates: Mapping of template name to template image.
            threshold: Minimum match score for a detection.

        Returns:
            Dict with keys ``cursor_active``, ``x``, ``y``, ``confidence`` and
            ``template``. ``x``/``y`` are the centre of the matched template;
            they and ``template`` are ``None`` when nothing reached
            ``threshold``. ``confidence`` is the best score seen, or ``-1`` if
            no template could be matched.
        """
        # Kept current for readers of the attribute; it does not gate work.
        self.last_cycle_time = time.time()

        best_pos = None
        best_conf = -1
        best_template_name = None

        frame_rgb = self.to_rgb(frame)
        if frame_rgb is not None:
            for template_name, cursor_template in cursor_templates.items():
                match = self._match_template(frame_rgb, template_name, cursor_template)
                if match is None:
                    continue
                score, center = match
                if score > best_conf:
                    best_conf = score
                    if score >= threshold:
                        best_pos = center
                        best_template_name = template_name

        # best_pos is only set together with a score >= threshold, so testing
        # it directly also avoids indexing None when threshold <= -1.
        if best_pos is not None:
            return {
                "cursor_active": True,
                "x": best_pos[0],
                "y": best_pos[1],
                "confidence": best_conf,
                "template": best_template_name
            }
        return {
            "cursor_active": False,
            "x": None,
            "y": None,
            "confidence": best_conf,
            "template": None
        }

    def process_frame_chunk(self, frames: List[np.ndarray], cursor_templates: Dict,
                            start_idx: int, chunk_size: int) -> List[Dict]:
        """Run cursor detection over ``frames[start_idx:start_idx + chunk_size]``.

        The range is clipped at ``len(frames)``. Each result dict from
        ``detect_cursor_in_frame`` gains a ``"frame"`` key holding the
        1-based frame number formatted as ``NNNN.png``. ``processed_frames``
        and ``tracked_cursors`` are updated as a side effect.

        Returns:
            One result dict per processed frame, in frame order.
        """
        # Kept current for readers of the attribute; it does not gate work.
        self.last_cycle_time = time.time()

        processed_results = []
        stop_idx = min(start_idx + chunk_size, len(frames))
        for i in range(start_idx, stop_idx):
            cursor_result = self.detect_cursor_in_frame(frames[i], cursor_templates)
            cursor_result["frame"] = f"{i+1:04d}.png"
            processed_results.append(cursor_result)

            self.processed_frames += 1
            if cursor_result["cursor_active"]:
                self.tracked_cursors += 1

        return processed_results
|
|
|
|
|
|
class VideoProcessingCore:
    """Owns a list of VideoProcessingUnits and spreads a video's frames over them."""

    def __init__(self, core_id: int, num_units: int = 15):
        """Create a core with ``num_units`` units numbered from 0.

        Args:
            core_id: Identifier echoed back in ``process_video_parallel`` results.
            num_units: Number of ``VideoProcessingUnit`` instances to create.
        """
        self.core_id = core_id
        self.units = [VideoProcessingUnit(i) for i in range(num_units)]
        # Cumulative totals across every process_video_parallel() call.
        self.total_frames_processed = 0
        self.total_cursors_tracked = 0

    @staticmethod
    def _chunk_bounds(total: int, parts: int) -> List[Tuple[int, int]]:
        """Split ``range(total)`` into ``parts`` contiguous ``(start, size)`` runs.

        Sizes differ by at most one and always sum to ``total``, so no index
        is left unassigned. Returns an empty list when ``parts`` is not
        positive.
        """
        if parts <= 0:
            return []
        base, extra = divmod(max(total, 0), parts)
        bounds = []
        start = 0
        for idx in range(parts):
            size = base + 1 if idx < extra else base
            bounds.append((start, size))
            start += size
        return bounds

    def extract_frames(self, video_path: str, output_dir: str, fps: int = 3) -> List[np.ndarray]:
        """Decode a video and return frames sampled at roughly ``fps`` per second.

        Every ``round(video_fps / fps)``-th frame is kept, starting with the
        first. Which frames are kept depends only on their position in the
        video, never on wall-clock timing.

        Args:
            video_path: Path of the video to open.
            output_dir: Accepted for interface compatibility; this method does
                not write anything to it.
            fps: Target sampling rate. A non-positive value keeps every frame.

        Returns:
            The sampled frames in order; an empty list if the video cannot be
            opened.
        """
        frames = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                logging.error(f"Failed to open video file: {video_path}")
                return frames

            video_fps = cap.get(cv2.CAP_PROP_FPS)
            # Also catches NaN, for which every comparison is False.
            if not (video_fps and video_fps > 0):
                video_fps = 30

            if fps and fps > 0:
                # At least 1 so a target above the source rate keeps all frames.
                frame_interval = max(1, int(round(video_fps / fps)))
            else:
                frame_interval = 1

            frame_index = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_index % frame_interval == 0:
                    frames.append(frame)
                frame_index += 1
        finally:
            # Release the capture even if decoding raises.
            cap.release()

        return frames

    def process_video_parallel(self, video_path: str, output_dir: str, cursor_templates: Dict) -> Dict:
        """Extract a video's frames and run cursor detection on all of them.

        Frames are divided into contiguous runs, one per unit, with the
        remainder spread over the first units so that every frame is
        processed. Units are invoked one after another in this method.

        Args:
            video_path: Path of the video to process.
            output_dir: Forwarded to ``extract_frames``.
            cursor_templates: Mapping of template name to template image.

        Returns:
            Dict with ``core_id``, ``frames_processed`` and ``cursors_tracked``
            for this video only, and ``unit_results`` (per-frame result dicts
            in frame order). The cumulative totals live on
            ``total_frames_processed`` / ``total_cursors_tracked``.
        """
        frames = self.extract_frames(video_path, output_dir)
        results = []

        if frames and not self.units:
            logging.warning(f"Core {self.core_id} has no units; skipping {video_path}")

        bounds = self._chunk_bounds(len(frames), len(self.units))
        for unit, (start_idx, size) in zip(self.units, bounds):
            if size == 0:
                continue

            # ops_per_cycle remains an upper bound on a unit's chunk.
            chunk_size = min(size, unit.ops_per_cycle)
            if chunk_size < size:
                logging.warning(
                    f"Unit {unit.unit_id} ops_per_cycle limits chunk to "
                    f"{chunk_size} of {size} frames"
                )

            results.extend(unit.process_frame_chunk(
                frames,
                cursor_templates,
                start_idx,
                chunk_size
            ))

        frames_processed = len(results)
        cursors_tracked = sum(1 for r in results if r["cursor_active"])
        self.total_frames_processed += frames_processed
        self.total_cursors_tracked += cursors_tracked

        # Report per-video counts: callers add these up themselves, so
        # returning the running totals would count earlier videos again.
        return {
            'core_id': self.core_id,
            'frames_processed': frames_processed,
            'cursors_tracked': cursors_tracked,
            'unit_results': results
        }
|
|
|
|
|
|
class ElectronSpeedVideoProcessor:
    """Top-level driver that hands videos to cores and writes their results."""

    def __init__(self, num_cores: int = 5):
        """Create ``num_cores`` cores numbered from 0 and zero the totals."""
        self.cores = [VideoProcessingCore(i) for i in range(num_cores)]
        self.total_frames = 0
        self.total_cursors = 0
        self.start_time = None

    @staticmethod
    def _count_results(frame_results) -> Tuple[int, int]:
        """Return ``(frame_count, active_cursor_count)`` for per-frame results."""
        frame_count = len(frame_results)
        cursor_count = sum(1 for r in frame_results if r["cursor_active"])
        return frame_count, cursor_count

    def process_videos(self, video_paths: List[str], output_base_dir: str, cursor_templates_dir: str):
        """Track the cursor in each video and write one JSON file per video.

        Videos are handled in batches of ``len(self.cores)``; within a batch
        each video is submitted to its own core on a thread pool, and the
        batch is fully collected before the next one starts. For every video
        the per-frame results are written to
        ``<output_base_dir>/<video stem>/cursor_tracking_results.json`` and a
        summary is logged. ``total_frames`` / ``total_cursors`` accumulate
        across videos.

        Args:
            video_paths: Videos to process.
            output_base_dir: Directory under which per-video folders are created.
            cursor_templates_dir: Directory searched for ``*.png`` templates.

        Returns:
            ``None``. Logs an error and returns early when there are no cores
            or no readable templates. Exceptions raised by a core propagate
            from ``future.result()``.
        """
        self.start_time = time.time()

        # A pool with zero workers (and a batch step of zero) would raise.
        if not self.cores:
            logging.error("No processing cores configured")
            return

        # Sorted so that template iteration order, and therefore which
        # template wins a tie, does not depend on directory listing order.
        cursor_templates = {}
        for template_file in sorted(Path(cursor_templates_dir).glob("*.png")):
            template_img = cv2.imread(str(template_file), cv2.IMREAD_UNCHANGED)
            if template_img is not None:
                cursor_templates[template_file.name] = template_img

        if not cursor_templates:
            logging.error(f"No cursor templates found in: {cursor_templates_dir}")
            return

        batch_size = len(self.cores)
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            for batch_start in range(0, len(video_paths), batch_size):
                video_chunk = video_paths[batch_start:batch_start + batch_size]
                futures = []

                # zip stops at the shorter sequence: one core per video.
                for core, video_path in zip(self.cores, video_chunk):
                    output_dir = Path(output_base_dir) / Path(video_path).stem
                    output_dir.mkdir(parents=True, exist_ok=True)

                    future = executor.submit(
                        core.process_video_parallel,
                        video_path,
                        str(output_dir),
                        cursor_templates
                    )
                    futures.append((future, video_path, output_dir))

                for future, video_path, output_dir in futures:
                    result = future.result()

                    # Count from the per-frame list rather than the core's
                    # summary fields, so a core that has already handled an
                    # earlier video cannot inflate the totals.
                    frames_done, cursors_found = self._count_results(result['unit_results'])
                    self.total_frames += frames_done
                    self.total_cursors += cursors_found

                    json_path = output_dir / "cursor_tracking_results.json"
                    with open(json_path, 'w', encoding='utf-8') as f:
                        json.dump(result['unit_results'], f, indent=2)

                    elapsed = time.time() - self.start_time
                    frames_per_second = self.total_frames / elapsed if elapsed > 0 else 0

                    logging.info(f"Processed {Path(video_path).name}:")
                    logging.info(f"Core {result['core_id']}: "
                                 f"{frames_done:,} frames, "
                                 f"{cursors_found} cursors")
                    logging.info(f"Electron drift utilized: "
                                 f"{self.cores[0].units[0].electron_drift_velocity:.2e} m/s")
                    logging.info(f"Processing speed: {frames_per_second:.2f} frames/s")