|
|
|
|
|
|
|
|
import json |
|
|
import os |
|
|
|
|
|
import numpy as np |
|
|
from ...core.logging import logger |
|
|
|
|
|
|
|
|
class ConvergenceUtils: |
|
|
|
|
|
def __init__(self, root_path): |
|
|
self.root_path = root_path |
|
|
self.data = None |
|
|
self.rounds = None |
|
|
self.avg_scores, self.stds = None, None |
|
|
|
|
|
def load_data(self, root_path): |
|
|
""" |
|
|
Read JSON file, create a new file if it doesn't exist, then return the data. |
|
|
""" |
|
|
rounds_dir = self.root_path |
|
|
result_file = os.path.join(rounds_dir, "results.json") |
|
|
|
|
|
|
|
|
os.makedirs(rounds_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
if not os.path.exists(result_file): |
|
|
with open(result_file, "w") as file: |
|
|
json.dump([], file) |
|
|
|
|
|
|
|
|
with open(result_file, "r") as file: |
|
|
return json.load(file) |
|
|
|
|
|
def process_rounds(self): |
|
|
""" |
|
|
Organize data by round, return a dictionary of scores by round. |
|
|
""" |
|
|
self.data = self.load_data(root_path=self.root_path) |
|
|
rounds = {} |
|
|
for entry in self.data: |
|
|
round_number = entry["round"] |
|
|
score = entry["score"] |
|
|
if round_number not in rounds: |
|
|
rounds[round_number] = [] |
|
|
rounds[round_number].append(score) |
|
|
return rounds |
|
|
|
|
|
def calculate_avg_and_std(self): |
|
|
""" |
|
|
Calculate average score and standard deviation for each round, return two lists: average scores and standard deviations. |
|
|
""" |
|
|
self.rounds = self.process_rounds() |
|
|
|
|
|
sorted_rounds = sorted(self.rounds.items(), key=lambda x: x[0]) |
|
|
avg_scores = [] |
|
|
stds = [] |
|
|
for round_number, scores in sorted_rounds: |
|
|
avg_scores.append(np.mean(scores)) |
|
|
stds.append(np.std(scores)) |
|
|
return avg_scores, stds |
|
|
|
|
|
def check_convergence(self, top_k=3, z=0, consecutive_rounds=5): |
|
|
""" |
|
|
Check for convergence. z is the z-score corresponding to the confidence level. |
|
|
consecutive_rounds is the number of consecutive rounds that must meet the stop condition. |
|
|
""" |
|
|
|
|
|
self.avg_scores, self.stds = self.calculate_avg_and_std() |
|
|
|
|
|
if len(self.avg_scores) < top_k + 1: |
|
|
return False, None, None |
|
|
convergence_count = 0 |
|
|
previous_y = None |
|
|
sigma_y_previous = None |
|
|
for i in range(len(self.avg_scores)): |
|
|
|
|
|
top_k_indices = np.argsort(self.avg_scores[: i + 1])[::-1][ |
|
|
:top_k |
|
|
] |
|
|
top_k_scores = [self.avg_scores[j] for j in top_k_indices] |
|
|
top_k_stds = [ |
|
|
self.stds[j] for j in top_k_indices |
|
|
] |
|
|
|
|
|
y_current = np.mean(top_k_scores) |
|
|
|
|
|
sigma_y_current = np.sqrt(np.sum([s**2 for s in top_k_stds]) / (top_k**2)) |
|
|
|
|
|
if previous_y is not None: |
|
|
|
|
|
delta_y = y_current - previous_y |
|
|
|
|
|
sigma_delta_y = np.sqrt(sigma_y_current**2 + sigma_y_previous**2) |
|
|
|
|
|
if abs(delta_y) <= z * sigma_delta_y: |
|
|
convergence_count += 1 |
|
|
|
|
|
if convergence_count >= consecutive_rounds: |
|
|
return True, i - consecutive_rounds + 1, i |
|
|
else: |
|
|
|
|
|
convergence_count = 0 |
|
|
|
|
|
previous_y = y_current |
|
|
sigma_y_previous = sigma_y_current |
|
|
|
|
|
return False, None, None |
|
|
|
|
|
def print_results(self): |
|
|
""" |
|
|
Print average score and standard deviation for all rounds. |
|
|
""" |
|
|
self.avg_scores, self.stds = self.calculate_avg_and_std() |
|
|
for i, (avg_score, std) in enumerate(zip(self.avg_scores, self.stds), 1): |
|
|
logger.info(f"Round {i}: Average Score = {avg_score:.4f}, Standard Deviation = {std:.4f}") |
|
|
|
|
|
|