# idealpolyhedra/examples/analyze_delaunay_parallel.py
# Author: igriv — commit e0ef700 ("Add HuggingFace Spaces support")
# NOTE: the lines above were HuggingFace file-listing artifacts; they have been
# converted to comments so the file parses as Python.
#!/usr/bin/env python3
"""
Parallelized analysis of Delaunay realizability fraction.
Uses multiprocessing to test triangulations on multiple CPUs simultaneously.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from ideal_poly_volume_toolkit.plantri_interface import find_plantri_executable
from ideal_poly_volume_toolkit.rivin_delaunay import check_delaunay_realizability
from ideal_poly_volume_toolkit.planar_utils import extract_faces_from_planar_embedding
import subprocess
import json
import time
import multiprocessing as mp
from collections import defaultdict
from typing import Optional, List, Tuple
def get_triangulations_text(n_vertices: int, min_connectivity: int = 3) -> list:
    """
    Generate triangulations in ASCII format by invoking plantri.

    Args:
        n_vertices: Number of vertices
        min_connectivity: Minimum connectivity (3 or 4)

    Returns:
        List of triangulations, each a list of faces as produced by
        extract_faces_from_planar_embedding.

    Raises:
        RuntimeError: if plantri is not installed or exits with an error.
    """
    plantri = find_plantri_executable()
    if plantri is None:
        raise RuntimeError("plantri not found")

    # Use -p for polytopes, -c# for connectivity, -a for ASCII output
    args = [plantri, f'-pc{min_connectivity}', '-a', str(n_vertices)]
    result = subprocess.run(args, capture_output=True, text=True)
    # Fail loudly instead of silently returning an empty list when plantri
    # errors out (bad flags, unsupported n, etc.).
    if result.returncode != 0:
        raise RuntimeError(
            f"plantri failed (exit {result.returncode}): {result.stderr.strip()}"
        )

    # Parse ASCII format: each line is "n adj_list"
    triangulations = []
    for line in result.stdout.split('\n'):
        line = line.strip()
        # Skip blank lines and plantri's '>' status/header lines
        if not line or line.startswith('>'):
            continue
        parts = line.split(maxsplit=1)
        if len(parts) != 2:
            continue
        n = int(parts[0])
        adj_str = parts[1]
        # Build adjacency dict; vertices are encoded as letters 'a', 'b', ...
        # (this limits the format to 26 vertices, fine for the n used here)
        adj = {}
        for v_idx, neighbor_str in enumerate(adj_str.split(',')):
            adj[v_idx] = [ord(letter) - ord('a') for letter in neighbor_str]
        # Extract faces from planar embedding (adjacency lists are in cyclic order)
        triangles = extract_faces_from_planar_embedding(n, adj)
        if triangles:
            triangulations.append(triangles)
    return triangulations
def remove_vertex_to_planar(triangles: list, vertex_to_remove: int) -> list:
    """Return the triangles not incident to *vertex_to_remove*.

    Dropping every face that touches one vertex of a closed (sphere)
    triangulation yields a planar (disk) triangulation.
    """
    kept = []
    for triangle in triangles:
        if vertex_to_remove not in triangle:
            kept.append(triangle)
    return kept
def test_chunk(args):
    """Worker entry point: test one chunk of triangulations.

    Args:
        args: (chunk, worker_id, strict) tuple, as packed by the dispatcher.

    Returns:
        Tuple (realizable, non_realizable, errors) of counts for the chunk.
    """
    # Import inside the worker so every spawned process loads the module itself
    from ideal_poly_volume_toolkit.rivin_delaunay import check_delaunay_realizability

    chunk, _worker_id, strict = args
    tally = {'realizable': 0, 'non_realizable': 0, 'errors': 0}
    for triangulation in chunk:
        try:
            outcome = check_delaunay_realizability(triangulation, verbose=False, strict=strict)
        except Exception:
            # Count failures but keep the worker alive for the rest of the chunk
            tally['errors'] += 1
            continue
        if outcome['realizable']:
            tally['realizable'] += 1
        else:
            tally['non_realizable'] += 1
    return (tally['realizable'], tally['non_realizable'], tally['errors'])
def count_triangulations(n: int, min_connectivity: int = 3) -> int:
    """Count closed triangulations with n vertices.

    Known values are answered from a hard-coded cache; anything uncached is
    computed by generating the triangulations and counting them.
    NOTE(review): the original comment attributed these tables to OEIS
    A000109 / A000108, which looks wrong (A000108 is the Catalan numbers) —
    the numbers themselves come from prior runs; verify the OEIS ids.
    """
    cached_counts = {
        3: {4: 1, 5: 1, 6: 7, 7: 34, 8: 257, 9: 2606,
            10: 32300, 11: 440564, 12: 6384634},
        4: {4: 1, 5: 0, 6: 1, 7: 1, 8: 8, 9: 36,
            10: 257, 11: 1734, 12: 13391},
    }
    table = cached_counts.get(min_connectivity, {})
    if n in table:
        return table[n]
    # Cache miss: fall back to explicit generation
    return len(get_triangulations_text(n, min_connectivity))
def analyze_single_n_parallel(n: int,
                              n_workers: int = 30,
                              min_connectivity: int = 3,
                              verbose: bool = True,
                              strict: bool = False) -> dict:
    """
    Analyze triangulations with n vertices using parallel processing.

    Pipeline: generate closed triangulations via plantri, derive a planar
    triangulation from each by deleting vertex 0, then fan the realizability
    tests out over a multiprocessing pool.

    Args:
        n: Number of vertices
        n_workers: Number of parallel workers
        min_connectivity: Minimum connectivity (3 or 4)
        verbose: Print progress
        strict: Use strict realizability (dihedral < π)

    Returns:
        Statistics dictionary (counts, fraction realizable, timings, settings)
    """
    mode_str = "strict" if strict else "standard"
    if verbose:
        print(f"\n{'='*70}")
        print(f"Analyzing n={n} vertices ({min_connectivity}-connected, {mode_str} mode)")
        print(f"{'='*70}")

    # Count and generate closed triangulations
    start_time = time.time()
    closed_tris = get_triangulations_text(n, min_connectivity)
    gen_time = time.time() - start_time
    n_closed = len(closed_tris)

    if verbose:
        print(f"Generated {n_closed} closed triangulations in {gen_time:.2f}s")
        print(f"Creating {n_closed} planar versions (remove vertex 0)")

    # Convert to planar triangulations (only remove vertex 0)
    # NOTE(review): only vertex 0 is removed, i.e. one planar triangulation is
    # sampled per closed one — confirm other vertex choices are not needed.
    planar_tris = []
    for closed_tri in closed_tris:
        planar_tri = remove_vertex_to_planar(closed_tri, 0)
        if planar_tri:
            planar_tris.append(planar_tri)
    n_planar = len(planar_tris)

    if verbose:
        print(f"Total planar triangulations to test: {n_planar}")
        print(f"Using {n_workers} parallel workers")

    # Split into chunks for parallel processing; the trailing remainder
    # becomes one extra (smaller) chunk.
    chunk_size = max(1, len(planar_tris) // n_workers)
    chunks = []
    for i in range(0, len(planar_tris), chunk_size):
        chunk = planar_tris[i:i+chunk_size]
        chunks.append((chunk, i // chunk_size, strict))

    # Parallel testing
    if verbose:
        print(f"Split into {len(chunks)} chunks of ~{chunk_size} triangulations each")
        print(f"Testing realizability...")

    start_time = time.time()
    with mp.Pool(n_workers) as pool:
        results = pool.map(test_chunk, chunks)
    test_time = time.time() - start_time

    # Aggregate (realizable, non_realizable, errors) tuples from all workers
    total_realizable = sum(r[0] for r in results)
    total_non_realizable = sum(r[1] for r in results)
    total_errors = sum(r[2] for r in results)
    tested = total_realizable + total_non_realizable
    fraction = total_realizable / tested if tested > 0 else 0.0

    stats = {
        'n_vertices': n,
        'min_connectivity': min_connectivity,
        'closed_triangulations': n_closed,
        'total_planar_triangulations': n_planar,
        'tested': tested,
        'realizable': total_realizable,
        'non_realizable': total_non_realizable,
        'errors': total_errors,
        'fraction_realizable': fraction,
        'generation_time': gen_time,
        'testing_time': test_time,
        'n_workers': n_workers,
        'strict_mode': strict,
    }

    if verbose:
        print(f"\nResults:")
        print(f"  Tested: {tested}/{n_planar}")
        print(f"  Realizable: {total_realizable} ({100*fraction:.1f}%)")
        print(f"  Non-realizable: {total_non_realizable} ({100*(1-fraction):.1f}%)")
        if total_errors > 0:
            print(f"  Errors: {total_errors}")
        if tested > 0:
            print(f"  Testing time: {test_time:.2f}s ({test_time/tested*1000:.2f}ms per triangulation)")
            # Rough per-item cost if all worker time had been spent sequentially
            print(f"  Speedup: {n_workers * test_time / tested * 1000:.2f}ms sequential equivalent")
        else:
            print(f"  Testing time: {test_time:.2f}s (no successful tests)")

    return stats
def main():
    """Run parallelized analysis.

    Parses CLI options, analyzes each vertex count in [min_n, max_n] under a
    wall-clock budget, writes JSON results, and prints a summary table.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Parallel Delaunay realizability analysis')
    parser.add_argument('--min-n', type=int, default=4, help='Minimum vertices')
    parser.add_argument('--max-n', type=int, default=15, help='Maximum vertices')
    parser.add_argument('--workers', type=int, default=30, help='Number of parallel workers')
    parser.add_argument('--connectivity', type=int, default=3, choices=[3, 4],
                        help='Minimum connectivity (3 or 4)')
    parser.add_argument('--time-limit', type=float, default=7200,
                        help='Wall-clock time limit in seconds (default: 2 hours)')
    parser.add_argument('--output', type=str, default='delaunay_parallel_results.json',
                        help='Output file')
    parser.add_argument('--strict', action='store_true',
                        help='Use strict realizability mode (dihedral < π)')
    args = parser.parse_args()

    mode_desc = "STRICT (dihedral < π)" if args.strict else "STANDARD (dihedral ≤ π)"
    print("="*70)
    print(f"PARALLEL DELAUNAY REALIZABILITY ANALYSIS")
    print("="*70)
    print(f"\nParameters:")
    print(f"  Vertex range: {args.min_n} to {args.max_n}")
    print(f"  Parallel workers: {args.workers}")
    print(f"  Min connectivity: {args.connectivity}")
    print(f"  Mode: {mode_desc}")
    print(f"  Time limit: {args.time_limit/3600:.1f} hours")

    # Run analysis
    results = []
    start_time = time.time()
    for n in range(args.min_n, args.max_n + 1):
        # Check time limit before starting another (exponentially larger) n
        elapsed = time.time() - start_time
        if elapsed > args.time_limit:
            print(f"\n⏱ Time limit reached ({elapsed/3600:.2f} hours)")
            break
        try:
            # Estimate if we have enough time: assume the next n costs roughly
            # twice the average of the runs so far
            if results:
                avg_time = sum(r['generation_time'] + r['testing_time'] for r in results) / len(results)
                if elapsed + avg_time * 2 > args.time_limit:
                    print(f"\n⏱ Approaching time limit, stopping at n={n-1}")
                    break
            stats = analyze_single_n_parallel(
                n,
                n_workers=args.workers,
                min_connectivity=args.connectivity,
                verbose=True,
                strict=args.strict
            )
            results.append(stats)
        except KeyboardInterrupt:
            # Partial results are still saved below
            print(f"\n\n⚠ Interrupted by user at n={n}")
            break
        except Exception as e:
            print(f"\n✗ Error at n={n}: {e}")
            import traceback
            traceback.print_exc()
            break

    # Save results (parameters + per-n statistics)
    output_data = {
        'parameters': {
            'min_n': args.min_n,
            'max_n': args.max_n,
            'workers': args.workers,
            'min_connectivity': args.connectivity,
            'strict_mode': args.strict,
            'time_limit': args.time_limit,
            'actual_runtime': time.time() - start_time,
        },
        'results': results
    }
    with open(args.output, 'w') as f:
        json.dump(output_data, f, indent=2)

    # Summary table
    print(f"\n{'='*70}")
    print("SUMMARY")
    print(f"{'='*70}")
    print(f"\n{'n':<4} {'Closed':<8} {'Planar':<8} {'Realizable':<12} {'%Real':<8} {'Time(s)':<8}")
    print("-" * 70)
    for r in results:
        total_time = r['generation_time'] + r['testing_time']
        print(f"{r['n_vertices']:<4} {r['closed_triangulations']:<8} "
              f"{r['total_planar_triangulations']:<8} {r['realizable']:<12} "
              f"{100*r['fraction_realizable']:>6.1f}% {total_time:>7.1f}")

    print(f"\nTotal runtime: {(time.time() - start_time)/3600:.2f} hours")
    print(f"Results saved to: {args.output}")
# Script entry point
if __name__ == '__main__':
    main()