| import importlib.util |
| import traceback |
| import json |
| import os |
| import sys |
| from pathlib import Path |
|
|
| |
| parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| sys.path.insert(0, parent_dir) |
| from utils import * |
| from simulator import * |
| from broadcast import * |
| import networkx as nx |
|
|
|
|
def _find_path_error(partition_paths, source_node, dst, G):
    """Return a description of the first defect in one partition's path.

    The path is an ordered list of edges. Each edge must be a sized sequence
    of at least three items whose first two items are the edge's source and
    destination node. The edges must exist in ``G``, chain from
    ``source_node`` without gaps, and end at ``dst``.

    Returns:
        A human-readable problem string, or ``None`` if the path is valid.
    """
    current = source_node

    for edge in partition_paths:
        # A non-sized entry (e.g. None or a number) is just as malformed as a
        # too-short one; report it instead of letting len() raise TypeError.
        try:
            malformed = len(edge) < 3
        except TypeError:
            malformed = True
        if malformed:
            return "edge format invalid"

        edge_src, edge_dst = edge[0], edge[1]

        if not G.has_edge(edge_src, edge_dst):
            return f"edge {edge_src}->{edge_dst} not in graph"

        # Every edge has to start where the previous one ended.
        if current != edge_src:
            return f"path discontinuity: expected {current}, got {edge_src}"

        current = edge_dst

    if current != dst:
        return f"path does not reach destination: ends at {current}, expected {dst}"

    return None


def validate_broadcast_topology(bc_t, source_node, terminal_nodes, num_partitions, G):
    """
    Validate that the broadcast topology is complete and correct.

    Checks, in order:
      1. ``bc_t.dsts`` contains exactly the nodes in ``terminal_nodes``.
      2. ``bc_t.src`` equals ``source_node``.
      3. For every destination and every partition index in
         ``range(num_partitions)`` (looked up under the key
         ``str(partition_id)``), ``bc_t.paths`` holds a non-empty list of
         edges forming a continuous path in ``G`` from the source to that
         destination.

    Checks 1 and 2, and a destination missing from ``bc_t.paths`` altogether,
    fail immediately. Partition-level problems are collected across all
    destinations and reported together.

    Args:
        bc_t: Topology object exposing ``src``, ``dsts`` and ``paths``.
        source_node: Expected broadcast source.
        terminal_nodes: Expected destination nodes.
        num_partitions: Number of partitions each destination must receive.
        G: Graph object exposing ``has_edge(u, v)``.

    Returns:
        (is_valid, error_message) tuple; ``error_message`` is ``None`` when
        the topology is valid.
    """
    expected_dsts = set(terminal_nodes)
    actual_dsts = set(bc_t.dsts)
    if actual_dsts != expected_dsts:
        missing_dsts = expected_dsts - actual_dsts
        extra_dsts = actual_dsts - expected_dsts
        return False, f"Destination mismatch: missing={missing_dsts}, extra={extra_dsts}"

    if bc_t.src != source_node:
        return False, f"Source mismatch: expected={source_node}, got={bc_t.src}"

    missing_partitions = []
    empty_partitions = []
    invalid_paths = []

    for dst in terminal_nodes:
        if dst not in bc_t.paths:
            return False, f"Missing destination '{dst}' in paths"

        dst_paths = bc_t.paths[dst]

        for partition_id in range(num_partitions):
            partition_key = str(partition_id)

            if partition_key not in dst_paths:
                missing_partitions.append((dst, partition_id))
                continue

            partition_paths = dst_paths[partition_key]

            if partition_paths is None or len(partition_paths) == 0:
                empty_partitions.append((dst, partition_id))
                continue

            problem = _find_path_error(partition_paths, source_node, dst, G)
            if problem is not None:
                invalid_paths.append((dst, partition_id, problem))

    errors = []
    if missing_partitions:
        errors.append(f"Missing partitions: {missing_partitions}")
    if empty_partitions:
        errors.append(f"Empty partitions: {empty_partitions}")
    if invalid_paths:
        errors.append(f"Invalid paths: {invalid_paths}")

    if errors:
        return False, "Validation failed: " + "; ".join(errors)

    # No separate partition recount is needed: reaching this point means every
    # (destination, partition) pair was present and non-empty, so the count
    # necessarily equals len(terminal_nodes) * num_partitions.
    return True, None
|
|
|
|
def evaluate(program_path):
    """
    Evaluate the evolved broadcast optimization program across multiple configurations.

    The program at ``program_path`` is loaded as a module and must define
    ``search_algorithm(source_node, terminal_nodes, G, num_partitions)``.
    For each configuration file found under ``examples/config`` next to this
    evaluator, the returned topology is validated, written to
    ``paths/<config>/search_algorithm.json`` and scored with ``BCSimulator``
    (outputs go to ``evals/<config>``). Both output directories are relative
    to the current working directory.

    Any invalid topology or per-configuration exception aborts the run with a
    zero score. The final score is ``1 / (1 + total_cost)``, so a lower total
    cost gives a higher score.

    Args:
        program_path: Path to the evolved program file

    Returns:
        Dictionary with evaluation metrics including required 'combined_score'.
        Failure results carry 'combined_score', 'runs_successfully' (both 0.0)
        and an 'error' message.
    """
    try:
        spec = importlib.util.spec_from_file_location("program", program_path)
        # spec_from_file_location returns None when no loader matches the
        # path (e.g. an unrecognised suffix); report that explicitly rather
        # than as an AttributeError on None.
        if spec is None or spec.loader is None:
            return {
                "combined_score": 0.0,
                "runs_successfully": 0.0,
                "error": f"Cannot load program from {program_path}",
            }
        program = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(program)

        if not hasattr(program, "search_algorithm"):
            return {
                "combined_score": 0.0,
                "runs_successfully": 0.0,
                "error": "Missing search_algorithm function",
            }

        evaluator_dir = os.path.dirname(os.path.abspath(__file__))
        relative_configs = [
            "examples/config/intra_aws.json",
            "examples/config/intra_azure.json",
            "examples/config/intra_gcp.json",
            "examples/config/inter_agz.json",
            "examples/config/inter_gaz2.json",
        ]
        config_files = [os.path.join(evaluator_dir, rel) for rel in relative_configs]

        # NOTE(review): configs missing on disk are skipped silently, so the
        # total cost is summed over whichever subset exists.
        existing_configs = [f for f in config_files if os.path.exists(f)]

        if not existing_configs:
            return {
                "combined_score": 0.0,
                "runs_successfully": 0.0,
                "error": f"No configuration files found. Checked: {config_files}",
            }

        num_vms = 2
        total_cost = 0.0
        successful_configs = 0

        for jsonfile in existing_configs:
            config_file_name = os.path.basename(jsonfile)
            try:
                print(f"Processing config: {config_file_name}")

                config_name = config_file_name.split(".")[0]
                with open(jsonfile, "r", encoding="utf-8") as f:
                    config = json.load(f)

                G = make_nx_graph(num_vms=int(num_vms))

                source_node = config["source_node"]
                terminal_nodes = config["dest_nodes"]

                directory = f"paths/{config_name}"
                Path(directory).mkdir(parents=True, exist_ok=True)

                num_partitions = config["num_partitions"]
                bc_t = program.search_algorithm(source_node, terminal_nodes, G, num_partitions)
                bc_t.set_num_partitions(num_partitions)

                is_valid, validation_error = validate_broadcast_topology(
                    bc_t, source_node, terminal_nodes, num_partitions, G
                )

                if not is_valid:
                    print(f"Validation failed for {config_name}: {validation_error}")
                    return {
                        "combined_score": 0.0,
                        "runs_successfully": 0.0,
                        "error": f"Invalid broadcast topology: {validation_error}",
                    }

                # The simulator consumes the topology from this JSON file.
                outf = f"{directory}/search_algorithm.json"
                with open(outf, "w", encoding="utf-8") as outfile:
                    json.dump(
                        {
                            "algo": "search_algorithm",
                            "source_node": bc_t.src,
                            "terminal_nodes": bc_t.dsts,
                            "num_partitions": bc_t.num_partitions,
                            "generated_path": bc_t.paths,
                        },
                        outfile,
                    )

                output_dir = f"evals/{config_name}"
                Path(output_dir).mkdir(parents=True, exist_ok=True)

                simulator = BCSimulator(int(num_vms), output_dir)
                _, cost = simulator.evaluate_path(outf, config)

                total_cost += cost
                successful_configs += 1

                print(f"Config {config_name}: cost={cost:.2f}")

            except Exception as e:
                # A single failing configuration invalidates the whole run.
                # Surface the cause instead of a bare generic message.
                print(f"Failed to process {config_file_name}: {str(e)}")
                print(traceback.format_exc())
                return {
                    "combined_score": 0.0,
                    "runs_successfully": 0.0,
                    "error": (
                        "1 or more configuration files failed to process: "
                        f"{config_file_name}: {e}"
                    ),
                }

        # Every failure path returned above, so all existing configs succeeded
        # and successful_configs is at least 1 here.
        failed_configs = 0
        avg_cost = total_cost / successful_configs
        success_rate = successful_configs / (successful_configs + failed_configs)

        print(f"Summary: {successful_configs} successful, {failed_configs} failed")
        print(f"Total cost: {total_cost:.2f}")

        # Map cost into (0, 1]: lower total cost gives a higher score.
        cost_score = 1.0 / (1.0 + total_cost)
        combined_score = cost_score

        return {
            "combined_score": combined_score,
            "runs_successfully": success_rate,
            "total_cost": total_cost,
            "avg_cost": avg_cost,
            "successful_configs": successful_configs,
            "failed_configs": failed_configs,
            "cost_score": cost_score,
            "success_rate": success_rate,
        }

    except Exception as e:
        print(f"Evaluation failed: {str(e)}")
        print(traceback.format_exc())
        return {
            "combined_score": 0.0,
            "runs_successfully": 0.0,
            "error": str(e),
        }
|
|
|
|
if __name__ == "__main__":
    # Script entry point: hand the evaluate function to the project's wrapper
    # runner. The import is local so that importing this module as a library
    # does not require the wrapper module to be present.
    from wrapper import run as run_evaluator

    run_evaluator(evaluate)