AMontiB
Your original commit message (now includes LFS pointer)
9c4b1c4
import os
import json
import sys
import yaml
import subprocess
from datetime import datetime
def get_gpus():
    """Inspect GPUs by parsing the text output of ``nvidia-smi``.

    Returns:
        tuple: ``(free, memory)`` where

        * ``free`` is a list of GPU indices (as strings, e.g. ``['0', '2']``)
          that have no compute (type ``C``) process listed in the process
          table, and
        * ``memory`` is a 1-D integer numpy array with one entry per GPU
          holding ``total - used`` memory in MiB, in the order the GPUs are
          listed by ``nvidia-smi``.

        When ``nvidia-smi`` is unavailable or prints no recognisable GPU
        table, ``([], <empty int array>)`` is returned so callers can fall
        back to the CPU.
    """
    import re
    from numpy import asarray, diff, empty

    no_gpus = ([], empty(0, dtype=int))

    # Close the pipe deterministically; a missing binary simply yields no lines.
    with os.popen('nvidia-smi') as pipe:
        smi = pipe.readlines()

    # --- GPU summary table -------------------------------------------------
    # The table body sits between the first "|===+===+===|" header rule and
    # the last "+---+---+---+" rule.
    udiv = re.compile(r'[|]={3,}[+]={3,}[+]={3,}[|]')
    ldiv = re.compile(r'[+]-{3,}[+]-{3,}[+]-{3,}[+]')
    upper_rows = [i for i, line in enumerate(smi) if udiv.match(line)]
    lower_rows = [i for i, line in enumerate(smi) if ldiv.match(line)]
    if not upper_rows or not lower_rows:
        return no_gpus

    gpu_rows = [
        line for line in smi[upper_rows[0] + 1:lower_rows[-1]]
        if '%' in line and 'MiB' in line
    ]
    if not gpu_rows:
        return no_gpus

    # Third '|'-separated cell looks like "  500MiB / 11178MiB " -> ['500', '11178'].
    used_total = [
        row.split('|')[2].replace(' ', '').replace('MiB', '').split('/')
        for row in gpu_rows
    ]
    # diff over [used, total] gives the free memory; reshape(-1) keeps the
    # result 1-D even for a single GPU (squeeze() would collapse it to 0-d,
    # which breaks len() in callers).
    memory = diff(asarray(used_total).astype(int), axis=1).reshape(-1)

    # --- Process table -----------------------------------------------------
    # The process rows sit between the last two single-cell rules
    # ("|=====|" under the header and the closing "+-----+").
    div = re.compile(r'[+]-{3,}[+]|[|]={3,}[|]')
    divider_rows = [i for i, line in enumerate(smi) if div.match(line)]
    busy = set()
    if len(divider_rows) >= 2:
        start, end = divider_rows[-2:]
        busy = {
            line.split()[1]
            for line in smi[start + 1:end]
            if ' C ' in line
        }

    # Derive the candidate indices from the number of GPUs actually listed
    # instead of assuming a two-GPU machine.
    free = [str(idx) for idx in range(len(gpu_rows)) if str(idx) not in busy]
    return free, memory
def load_config(config_path):
    """Load configuration from a YAML file.

    Args:
        config_path: Path to the YAML file to read.

    Returns:
        The object produced by ``yaml.safe_load``. An empty document (which
        ``safe_load`` reports as ``None``) is returned as ``{}`` so callers
        can use ``.get(...)`` on the result directly.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Read as UTF-8 explicitly rather than relying on the platform default
    # encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    return {} if data is None else data
def _resolve_detect_device(args, global_config):
    """Choose the device string for detect mode.

    Priority: ``args.device`` -> ``device_override`` from the config's global
    section -> the GPU with the most free memory as reported by
    ``get_gpus()`` -> ``"cpu"``.
    """
    device = getattr(args, 'device', None)
    if device:
        return device

    device = global_config.get('device_override')
    if device and device != "null":
        return device

    try:
        _, memory = get_gpus()
    except (IndexError, ValueError):
        # get_gpus() parses `nvidia-smi` text; missing or unparsable output
        # means no usable GPU was found.
        return "cpu"

    # Use .size rather than len(): a 0-d array (single GPU) has no len().
    if getattr(memory, 'size', 0) > 0:
        return f"cuda:{int(memory.argmax())}"
    return "cpu"


def run_detect(args):
    """Run single image detection by invoking the detector's ``detect.py``.

    Args:
        args: Parsed command-line namespace. Reads ``image``, ``config_dir``,
            ``detector``, ``output``, ``weights``, ``dry_run`` and, when
            present, ``device``. ``args.output`` is filled in with a
            timestamped path under ``detection_results/<detector>/detect``
            when it is empty.

    Raises:
        ValueError: If ``args.image`` is not set.
        FileNotFoundError: If the image, the detector's YAML configuration,
            or the detector's ``detect.py`` script does not exist.
    """
    if not args.image:
        raise ValueError("--image is required for detect mode")

    if not os.path.exists(args.image):
        raise FileNotFoundError(f"Image not found: {args.image}")

    # Load detector config
    config_path = os.path.join(args.config_dir, f'{args.detector}.yaml')
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    # Tolerate an empty config file or a null "global" section.
    config = load_config(config_path) or {}
    global_config = config.get('global') or {}

    # Set device
    device = _resolve_detect_device(args, global_config)

    # Set up output path for results
    if not args.output:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Extract image filename without extension
        image_name = os.path.splitext(os.path.basename(args.image))[0]
        output_dir = os.path.join('detection_results', args.detector, 'detect')
        os.makedirs(output_dir, exist_ok=True)
        args.output = os.path.join(output_dir, f'{timestamp}_{image_name}.json')

    # Call detector's detect.py
    detector_path = os.path.join('detectors', args.detector)
    detect_script = os.path.join(detector_path, 'detect.py')

    if not os.path.exists(detect_script):
        raise FileNotFoundError(f"Detector {args.detector} does not support single image detection")

    # Pass an argument list (no shell) so paths containing spaces, quotes or
    # shell metacharacters reach the script verbatim.
    cmd_args = [
        sys.executable,
        detect_script,
        '--image', args.image,
        '--device', str(device),
        '--output', args.output,
    ]

    # Add model path if specified
    if args.weights:
        cmd_args.extend(['--model', args.weights])

    print(f"Running detection with {args.detector}...")
    if args.dry_run:
        return

    # A failing detector is reported but does not abort the caller.
    completed = subprocess.run(cmd_args, check=False)
    if completed.returncode != 0:
        print(
            f"Warning: {detect_script} exited with code {completed.returncode}",
            file=sys.stderr,
        )

    # Print results if available
    if os.path.exists(args.output):
        with open(args.output, 'r') as f:
            result = json.load(f)
        print("\nDetection Results:")
        print(f"Prediction: {result['prediction']}")
        print(f"Confidence: {result['confidence']:.4f}")
        print(f"Time: {result['elapsed_time']:.3f}s")
        print(f"\nFull results saved to: {args.output}")