Spaces:
Running
Running
| import os | |
| import json | |
| import sys | |
| import yaml | |
| import subprocess | |
| from datetime import datetime | |
def get_gpus():
    """Query ``nvidia-smi`` for idle GPUs and free memory per GPU.

    Returns:
        tuple: ``(free, memory)`` where

        * ``free`` is a list of GPU indices (as strings, e.g. ``['0', '2']``)
          that have no compute (type ``C``) process listed in the
          ``nvidia-smi`` process table, and
        * ``memory`` is a 1-D integer numpy array holding, for each GPU row
          in the table, total minus used memory in MiB.

        When ``nvidia-smi`` cannot be started or its table cannot be
        located, ``([], <empty int array>)`` is returned instead of raising,
        so callers can fall back to the CPU.
    """
    import re

    from numpy import asarray, diff

    no_memory = asarray([], dtype=int)

    # Argument list (no shell): a missing binary surfaces as OSError rather
    # than as an empty read from a pipe that is never closed.
    try:
        completed = subprocess.run(
            ['nvidia-smi'],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            universal_newlines=True,
        )
    except OSError:
        return [], no_memory
    smi = completed.stdout.splitlines()

    # --- GPU table: rows between the '|===+===+===|' header rule and the
    # last '+---+---+---+' rule.
    udiv = re.compile('[|]={3,}[+]={3,}[+]={3,}[|]')
    ldiv = re.compile('[+]-{3,}[+]-{3,}[+]-{3,}[+]')
    upper = [idx for idx, line in enumerate(smi) if udiv.match(line)]
    lower = [idx for idx, line in enumerate(smi) if ldiv.match(line)]
    if not upper or not lower:
        return [], no_memory

    rows = [
        line for line in smi[upper[0] + 1:lower[-1]]
        if '%' in line and 'MiB' in line
    ]
    if not rows:
        return [], no_memory

    # Third '|'-separated field looks like '  123MiB / 15109MiB '; reduce it
    # to ['123', '15109'] (used, total).
    usage = [
        row.split('|')[2].replace(' ', '').replace('MiB', '').split('/')
        for row in rows
    ]
    # reshape(-1) keeps the result 1-D even for a single GPU; squeeze() would
    # collapse it to a 0-d array on which len() fails.
    memory = diff(asarray(usage).astype(int), axis=1).reshape(-1)

    # --- Process table: lines between the last two full-width rules.
    div = re.compile('[+]-{3,}[+]|[|]={3,}[|]')
    dividers = [idx for idx, line in enumerate(smi) if div.match(line)][-2:]
    busy = set()
    if len(dividers) == 2:
        busy = {
            line.split()[1]
            for line in smi[dividers[0] + 1:dividers[1]]
            if ' C ' in line
        }

    # Free = every listed GPU index without a compute process. Derived from
    # the number of GPU rows instead of a hard-coded pair of indices.
    free = [str(idx) for idx in range(len(memory)) if str(idx) not in busy]
    return free, memory
def load_config(config_path):
    """Read the YAML file at ``config_path`` and return its parsed content."""
    with open(config_path, 'r') as stream:
        parsed = yaml.safe_load(stream)
    return parsed
def run_detect(args):
    """Run single image detection by invoking the detector's ``detect.py``.

    Args:
        args: Parsed command-line namespace. Reads ``image``, ``config_dir``,
            ``detector``, ``output``, ``weights`` and ``dry_run``; ``device``
            is optional. When ``args.output`` is empty it is replaced with a
            generated path under ``detection_results/<detector>/detect``.

    Raises:
        ValueError: ``args.image`` is empty.
        FileNotFoundError: the image, the detector's YAML configuration or
            the detector's ``detect.py`` script does not exist.
    """
    if not args.image:
        raise ValueError("--image is required for detect mode")
    if not os.path.exists(args.image):
        raise FileNotFoundError(f"Image not found: {args.image}")

    # Load detector config
    config_path = os.path.join(args.config_dir, f'{args.detector}.yaml')
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    # An empty YAML document parses to None; treat it as "no settings".
    config = load_config(config_path) or {}
    global_config = config.get('global') or {}

    # Device priority: explicit CLI value, then config override, then
    # auto-detection of the GPU with the most free memory.
    device = getattr(args, 'device', None)
    if not device:
        device = global_config.get('device_override')
        if not device or device == "null":
            try:
                _, memory = get_gpus()
            except (IndexError, ValueError):
                # GPU probing could not parse any nvidia-smi output
                # (e.g. the tool is not installed): no usable GPU.
                memory = None
            # .size also works for a 0-d array, which len() rejects.
            if memory is not None and memory.size > 0:
                device = f"cuda:{memory.argmax()}"
            else:
                device = "cpu"

    # Set up output path for results
    if not args.output:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Image filename without directory or extension
        image_name = os.path.splitext(os.path.basename(args.image))[0]
        output_dir = os.path.join('detection_results', args.detector, 'detect')
        os.makedirs(output_dir, exist_ok=True)
        args.output = os.path.join(output_dir, f'{timestamp}_{image_name}.json')

    # Locate the detector's detect.py
    detector_path = os.path.join('detectors', args.detector)
    detect_script = os.path.join(detector_path, 'detect.py')
    if not os.path.exists(detect_script):
        raise FileNotFoundError(f"Detector {args.detector} does not support single image detection")

    # Argument list instead of a shell string: paths containing quotes,
    # spaces or shell metacharacters are passed through verbatim.
    cmd = [
        sys.executable,
        detect_script,
        '--image', str(args.image),
        '--device', str(device),
        '--output', str(args.output),
    ]
    # Add model path if specified
    if args.weights:
        cmd += ['--model', str(args.weights)]

    print(f"Running detection with {args.detector}...")
    if args.dry_run:
        return

    # A failing detector is reported but does not raise (no check=True).
    completed = subprocess.run(cmd)
    if completed.returncode != 0:
        print(f"Detector exited with status {completed.returncode}", file=sys.stderr)

    # Print results if available
    if os.path.exists(args.output):
        with open(args.output, 'r') as f:
            result = json.load(f)
        print("\nDetection Results:")
        print(f"Prediction: {result['prediction']}")
        print(f"Confidence: {result['confidence']:.4f}")
        print(f"Time: {result['elapsed_time']:.3f}s")
        print(f"\nFull results saved to: {args.output}")