#!/usr/bin/env python3
"""
Parallelized analysis of Delaunay realizability fraction.

Uses multiprocessing to test triangulations on multiple CPUs simultaneously.
"""

import argparse
import json
import multiprocessing as mp
import subprocess
import sys
import time
import traceback
from collections import defaultdict
from pathlib import Path
from typing import Optional, List, Tuple

# Make the directory above this script's directory importable so the
# toolkit package resolves when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from ideal_poly_volume_toolkit.plantri_interface import find_plantri_executable
from ideal_poly_volume_toolkit.rivin_delaunay import check_delaunay_realizability
from ideal_poly_volume_toolkit.planar_utils import extract_faces_from_planar_embedding


def get_triangulations_text(n_vertices: int, min_connectivity: int = 3) -> list:
    """
    Generate triangulations by running plantri with ASCII output.

    Args:
        n_vertices: Number of vertices
        min_connectivity: Minimum connectivity (3 or 4)

    Returns:
        List with one entry per graph emitted by plantri for which
        ``extract_faces_from_planar_embedding`` returned a non-empty result;
        each entry is that result.

    Raises:
        RuntimeError: If plantri cannot be found, or if it exits with a
            non-zero status.
    """
    plantri = find_plantri_executable()
    if plantri is None:
        raise RuntimeError("plantri not found")

    # Use -p for polytopes, -c# for connectivity
    args = [plantri, f'-pc{min_connectivity}', '-a', str(n_vertices)]
    result = subprocess.run(args, capture_output=True, text=True)
    if result.returncode != 0:
        # Without this check a failed run would look like "zero
        # triangulations" and yield silently empty statistics downstream.
        raise RuntimeError(
            f"plantri exited with status {result.returncode}: "
            f"{result.stderr.strip()}"
        )

    # Parse ASCII format: "n adj_list"
    triangulations = []
    for raw_line in result.stdout.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('>'):
            continue

        parts = line.split(maxsplit=1)
        if len(parts) != 2:
            continue

        n = int(parts[0])
        adj_str = parts[1]

        # Build adjacency dict: each comma-separated group lists one vertex's
        # neighbours as letters, with 'a' standing for vertex 0.
        adj = {
            v_idx: [ord(letter) - ord('a') for letter in neighbor_str]
            for v_idx, neighbor_str in enumerate(adj_str.split(','))
        }

        # Extract faces from planar embedding (adjacency lists are in cyclic order)
        triangles = extract_faces_from_planar_embedding(n, adj)
        if triangles:
            triangulations.append(triangles)

    return triangulations


def remove_vertex_to_planar(triangles: list, vertex_to_remove: int) -> list:
    """
    Remove a vertex to create planar triangulation.

    Args:
        triangles: Faces of the closed triangulation, each a collection of
            vertex indices
        vertex_to_remove: Vertex whose incident faces are dropped

    Returns:
        New list containing only the faces that do not contain
        ``vertex_to_remove``; the input list is not modified.
    """
    return [tri for tri in triangles if vertex_to_remove not in tri]


def test_chunk(args):
    """
    Test a chunk of triangulations (for parallel processing).

    Args:
        args: Tuple ``(chunk, worker_id, strict)``; ``chunk`` is the list of
            triangulations to test, ``worker_id`` is carried along but not
            used, and ``strict`` is forwarded to
            ``check_delaunay_realizability``.

    Returns:
        Tuple ``(realizable, non_realizable, errors)`` of counts. A
        triangulation whose check raises is counted under ``errors`` so one
        bad input does not abort the whole chunk.
    """
    # Import here to ensure each worker has the module
    from ideal_poly_volume_toolkit.rivin_delaunay import check_delaunay_realizability

    chunk, _worker_id, strict = args

    realizable = 0
    non_realizable = 0
    errors = 0

    for tri in chunk:
        try:
            result = check_delaunay_realizability(tri, verbose=False, strict=strict)
            if result['realizable']:
                realizable += 1
            else:
                non_realizable += 1
        except Exception:
            # Deliberate best-effort: count the failure and keep going.
            errors += 1

    return (realizable, non_realizable, errors)


def count_triangulations(n: int, min_connectivity: int = 3) -> int:
    """
    Count closed triangulations (cached for known values).

    Args:
        n: Number of vertices
        min_connectivity: Minimum connectivity (3 or 4)

    Returns:
        The tabulated count when ``(n, min_connectivity)`` is in the built-in
        tables, otherwise ``len(get_triangulations_text(n, min_connectivity))``.
    """
    # NOTE(review): these tables were originally labelled "OEIS A000109
    # (3-connected) and A000108 (4-connected)". A000108 is the Catalan
    # numbers, and the 3-connected values below do not follow A000109
    # (triangulations of the sphere: 1, 1, 2, 5, 14, 50, ... for n = 4, 5, ...);
    # from n = 6 on they look like A000944 (3-connected planar graphs).
    # The values are left untouched here -- verify them against the actual
    # plantri output before relying on the cached path.
    counts_3conn = {4: 1, 5: 1, 6: 7, 7: 34, 8: 257, 9: 2606, 10: 32300, 11: 440564, 12: 6384634}
    counts_4conn = {4: 1, 5: 0, 6: 1, 7: 1, 8: 8, 9: 36, 10: 257, 11: 1734, 12: 13391}

    if min_connectivity == 3 and n in counts_3conn:
        return counts_3conn[n]
    elif min_connectivity == 4 and n in counts_4conn:
        return counts_4conn[n]

    # Compute by generating and counting
    tris = get_triangulations_text(n, min_connectivity)
    return len(tris)


def analyze_single_n_parallel(n: int, n_workers: int = 30, min_connectivity: int = 3,
                              verbose: bool = True, strict: bool = False) -> dict:
    """
    Analyze triangulations with n vertices using parallel processing.

    Args:
        n: Number of vertices
        n_workers: Number of parallel workers (must be at least 1)
        min_connectivity: Minimum connectivity (3 or 4)
        verbose: Print progress
        strict: Use strict realizability (dihedral < π)

    Returns:
        Statistics dictionary

    Raises:
        ValueError: If ``n_workers`` is less than 1.
        RuntimeError: Propagated from ``get_triangulations_text``.
    """
    if n_workers < 1:
        # Would otherwise surface as a ZeroDivisionError when sizing chunks.
        raise ValueError(f"n_workers must be at least 1, got {n_workers}")

    mode_str = "strict" if strict else "standard"
    if verbose:
        print(f"\n{'='*70}")
        print(f"Analyzing n={n} vertices ({min_connectivity}-connected, {mode_str} mode)")
        print(f"{'='*70}")

    # Count and generate closed triangulations
    start_time = time.time()
    closed_tris = get_triangulations_text(n, min_connectivity)
    gen_time = time.time() - start_time

    n_closed = len(closed_tris)
    if verbose:
        print(f"Generated {n_closed} closed triangulations in {gen_time:.2f}s")
        print(f"Creating {n_closed} planar versions (remove vertex 0)")

    # Convert to planar triangulations (only remove vertex 0); results that
    # end up with no faces at all are dropped.
    planar_tris = []
    for closed_tri in closed_tris:
        planar_tri = remove_vertex_to_planar(closed_tri, 0)
        if planar_tri:
            planar_tris.append(planar_tri)

    n_planar = len(planar_tris)
    if verbose:
        print(f"Total planar triangulations to test: {n_planar}")
        print(f"Using {n_workers} parallel workers")

    # Split into chunks for parallel processing
    chunk_size = max(1, n_planar // n_workers)
    chunks = [
        (planar_tris[start:start + chunk_size], chunk_idx, strict)
        for chunk_idx, start in enumerate(range(0, n_planar, chunk_size))
    ]

    # Parallel testing
    if verbose:
        print(f"Split into {len(chunks)} chunks of ~{chunk_size} triangulations each")
        print("Testing realizability...")

    start_time = time.time()
    if chunks:
        # Never start more processes than there are chunks to hand out.
        with mp.Pool(min(n_workers, len(chunks))) as pool:
            results = pool.map(test_chunk, chunks)
    else:
        # Nothing to test: skip spawning worker processes entirely.
        results = []
    test_time = time.time() - start_time

    # Aggregate results
    total_realizable = sum(r[0] for r in results)
    total_non_realizable = sum(r[1] for r in results)
    total_errors = sum(r[2] for r in results)

    tested = total_realizable + total_non_realizable
    fraction = total_realizable / tested if tested > 0 else 0.0
    # Computed directly so that "nothing tested" reports 0%, not 100%.
    non_fraction = total_non_realizable / tested if tested > 0 else 0.0

    stats = {
        'n_vertices': n,
        'min_connectivity': min_connectivity,
        'closed_triangulations': n_closed,
        'total_planar_triangulations': n_planar,
        'tested': tested,
        'realizable': total_realizable,
        'non_realizable': total_non_realizable,
        'errors': total_errors,
        'fraction_realizable': fraction,
        'generation_time': gen_time,
        'testing_time': test_time,
        'n_workers': n_workers,
        'strict_mode': strict,
    }

    if verbose:
        print("\nResults:")
        print(f"  Tested: {tested}/{n_planar}")
        print(f"  Realizable: {total_realizable} ({100*fraction:.1f}%)")
        print(f"  Non-realizable: {total_non_realizable} ({100*non_fraction:.1f}%)")
        if total_errors > 0:
            print(f"  Errors: {total_errors}")
        if tested > 0:
            print(f"  Testing time: {test_time:.2f}s ({test_time/tested*1000:.2f}ms per triangulation)")
            print(f"  Speedup: {n_workers * test_time / tested * 1000:.2f}ms sequential equivalent")
        else:
            print(f"  Testing time: {test_time:.2f}s (no successful tests)")

    return stats


def main():
    """
    Run parallelized analysis.

    Parses the command line, analyzes each vertex count from ``--min-n`` to
    ``--max-n`` until the range is exhausted, the time limit is reached, the
    user interrupts, or an error occurs, then writes whatever results were
    collected to the ``--output`` JSON file and prints a summary table.
    """
    parser = argparse.ArgumentParser(description='Parallel Delaunay realizability analysis')
    parser.add_argument('--min-n', type=int, default=4, help='Minimum vertices')
    parser.add_argument('--max-n', type=int, default=15, help='Maximum vertices')
    parser.add_argument('--workers', type=int, default=30, help='Number of parallel workers')
    parser.add_argument('--connectivity', type=int, default=3, choices=[3, 4],
                        help='Minimum connectivity (3 or 4)')
    parser.add_argument('--time-limit', type=float, default=7200,
                        help='Wall-clock time limit in seconds (default: 2 hours)')
    parser.add_argument('--output', type=str, default='delaunay_parallel_results.json',
                        help='Output file')
    parser.add_argument('--strict', action='store_true',
                        help='Use strict realizability mode (dihedral < π)')

    args = parser.parse_args()
    if args.workers < 1:
        # Fail at the command line instead of on the first analysis step.
        parser.error('--workers must be at least 1')

    mode_desc = "STRICT (dihedral < π)" if args.strict else "STANDARD (dihedral ≤ π)"

    print("="*70)
    print("PARALLEL DELAUNAY REALIZABILITY ANALYSIS")
    print("="*70)
    print("\nParameters:")
    print(f"  Vertex range: {args.min_n} to {args.max_n}")
    print(f"  Parallel workers: {args.workers}")
    print(f"  Min connectivity: {args.connectivity}")
    print(f"  Mode: {mode_desc}")
    print(f"  Time limit: {args.time_limit/3600:.1f} hours")

    # Run analysis
    results = []
    start_time = time.time()

    for n in range(args.min_n, args.max_n + 1):
        # Check time limit
        elapsed = time.time() - start_time
        if elapsed > args.time_limit:
            print(f"\n⏱ Time limit reached ({elapsed/3600:.2f} hours)")
            break

        try:
            # Estimate if we have enough time: stop when twice the average
            # per-n cost so far would no longer fit in the remaining budget.
            if results:
                avg_time = sum(r['generation_time'] + r['testing_time'] for r in results) / len(results)
                if elapsed + avg_time * 2 > args.time_limit:
                    print(f"\n⏱ Approaching time limit, stopping at n={n-1}")
                    break

            stats = analyze_single_n_parallel(
                n,
                n_workers=args.workers,
                min_connectivity=args.connectivity,
                verbose=True,
                strict=args.strict
            )
            results.append(stats)

        except KeyboardInterrupt:
            print(f"\n\n⚠ Interrupted by user at n={n}")
            break
        except Exception as e:
            print(f"\n✗ Error at n={n}: {e}")
            traceback.print_exc()
            break

    # Save results (partial results are still written after an early stop)
    output_data = {
        'parameters': {
            'min_n': args.min_n,
            'max_n': args.max_n,
            'workers': args.workers,
            'min_connectivity': args.connectivity,
            'strict_mode': args.strict,
            'time_limit': args.time_limit,
            'actual_runtime': time.time() - start_time,
        },
        'results': results
    }

    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2)

    # Summary
    print(f"\n{'='*70}")
    print("SUMMARY")
    print(f"{'='*70}")
    print(f"\n{'n':<4} {'Closed':<8} {'Planar':<8} {'Realizable':<12} {'%Real':<8} {'Time(s)':<8}")
    print("-" * 70)

    for r in results:
        total_time = r['generation_time'] + r['testing_time']
        print(f"{r['n_vertices']:<4} {r['closed_triangulations']:<8} "
              f"{r['total_planar_triangulations']:<8} {r['realizable']:<12} "
              f"{100*r['fraction_realizable']:>6.1f}% {total_time:>7.1f}")

    print(f"\nTotal runtime: {(time.time() - start_time)/3600:.2f} hours")
    print(f"Results saved to: {args.output}")


if __name__ == '__main__':
    main()