Source code for imputegap.recovery.benchmark

import datetime
import os
import math
import time
from collections import defaultdict
import numpy as np
import matplotlib.pyplot as plt
import xlsxwriter
from imputegap.tools import utils
from imputegap.recovery.manager import TimeSeries
import psutil



class Benchmark:
    """
    A class to evaluate the performance of imputation algorithms through benchmarking across datasets and patterns.

    Methods
    -------
    average_runs_by_names(self, data):
        Average the results of all runs depending on the dataset.
    avg_results():
        Calculate average metrics (e.g., RMSE) across multiple datasets and algorithm runs.
    generate_heatmap():
        Generate and save a heatmap visualization of RMSE scores for datasets and algorithms.
    generate_reports_txt():
        Create detailed text-based reports summarizing metrics and timing results for all evaluations.
    generate_reports_excel():
        Create detailed Excel-based reports summarizing metrics and timing results for all evaluations.
    generate_plots():
        Visualize metrics (e.g., RMSE, MAE) and timing (e.g., imputation, optimization) across patterns and datasets.
    eval():
        Perform a complete benchmarking pipeline, including contamination, imputation, evaluation, and reporting.

    Example
    -------
    output : {'eegalcohol': {'mcar': {'MeanImpute': {'default_params': {'0.05': {'scores': {'RMSE': 1.107394798606378, 'MAE': 0.9036474830477748, 'CORRELATION': nan, 'RUNTIME': 10.07390022277832, 'RUNTIME_LOG': 1.00319764506136}}, '0.1': {'scores': {'RMSE': 0.8569349076796438, 'MAE': 0.6416542359734557, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.2': {'scores': {'RMSE': 0.9924113085421721, 'MAE': 0.7939689811173046, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.4': {'scores': {'RMSE': 1.0058063455061463, 'MAE': 0.8076546785476064, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.6': {'scores': {'RMSE': 0.9891809506243663, 'MAE': 0.7914550709031675, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.8': {'scores': {'RMSE': 0.9927953862507292, 'MAE': 0.7925635744718286, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}}}, 'SoftImpute': {'default_params': {'0.05': {'scores': {'RMSE': 0.4359915238078244, 'MAE': 0.3725965559420608, 'CORRELATION': 0.9530448037164908, 'RUNTIME': 199.30577278137207, 'RUNTIME_LOG': 2.2995198779819055}}, '0.1': {'scores': {'RMSE': 0.3665001858394363, 'MAE': 0.2989983612840734, 'CORRELATION': 0.9049909722894052, 'RUNTIME': 117.54822731018066, 'RUNTIME_LOG': 2.0702160841184516}}, '0.2': {'scores': {'RMSE': 0.39833006221984, 'MAE': 0.30824644022807457, 'CORRELATION': 0.9161465703422209, 'RUNTIME': 317.5652027130127, 'RUNTIME_LOG': 2.5018329084349737}}, '0.4': {'scores': {'RMSE': 0.435591016228979, 'MAE': 0.3335144215651955, 'CORRELATION': 0.9021032587324183, 'RUNTIME': 302.2916316986084, 'RUNTIME_LOG': 2.4804261248244566}}, '0.6': {'scores': {'RMSE': 0.4500113661547204, 'MAE': 0.338085865703361, 'CORRELATION': 0.8893263437029546, 'RUNTIME': 314.93282318115234, 'RUNTIME_LOG': 2.498217926383076}}, '0.8': {'scores': {'RMSE': 0.46554422402146944, 'MAE': 0.3508926604243284, 'CORRELATION': 0.8791443563129441, 'RUNTIME': 311.9697570800781, 'RUNTIME_LOG': 2.4941124947560986}}}}}}}
    """

    def __init__(self):
        """
        Initialize the Benchmark object.
        """
        self.list_results = None
        self.aggregate_results = None
        self.heatmap = None
        self.plots = None

    def _benchmark_exception(self, data, algorithm, pattern, x):
        """
        Check whether a specific algorithm-pattern combination should be excluded from benchmarking.

        This function flags exceptions where benchmarking is not appropriate or known to fail,
        based on the algorithm name, the missingness pattern, and the missingness rate `x`.

        Parameters
        ----------
        data : numpy matrix
            Matrix of data with NaN values.
        algorithm : str
            Name of the imputation algorithm (e.g., 'DEEPMVI', 'PRISTI').
        pattern : str
            Missing data pattern (e.g., 'MCAR', 'ALIGNED').
        x : float
            Proportion of missing values in the data (between 0 and 1).

        Returns
        -------
        bool
            True if the benchmark should be skipped for the given configuration, False otherwise.

        Rules
        -----
        - For DeepMVI with MCAR pattern and x > 0.6, skip benchmarking.
        - For PRISTI, always skip benchmarking.
        """
        if algorithm.upper() == 'DEEPMVI' or algorithm.upper() == 'DEEP_MVI':
            if pattern.lower() == "mcar" or pattern.lower() == "missing_completely_at_random":
                if x > 0.6:
                    print(f"\n(BENCH) The imputation algorithm {algorithm} is not compatible with this configuration {pattern} with a missingness rate greater than 0.6.")
                    return True
            if pattern.lower() == "mp" or pattern.lower() == "aligned":
                if x < 0.15:
                    print(f"\n(BENCH) The imputation algorithm {algorithm} is not compatible with this configuration {pattern} with a missingness rate less than 0.15.")
                    return True

        if algorithm.upper() == 'MPIN':
            print(f"\n(BENCH) The imputation algorithm {algorithm} is not compatible with this setup.")
            return True

        return False

    def _config_optimization(self, opti_mean, ts_test, pattern, algorithm, block_size_mcar):
        """
        Configure and execute optimization for the selected imputation algorithm and pattern.

        Parameters
        ----------
        opti_mean : float
            Mean parameter for contamination.
        ts_test : TimeSeries
            TimeSeries object containing the dataset.
        pattern : str
            Type of contamination pattern (e.g., "mcar", "mp", "blackout", "disjoint", "overlap", "gaussian").
        algorithm : str
            Imputation algorithm to use.
        block_size_mcar : int
            Size of blocks removed in MCAR.

        Returns
        -------
        BaseImputer
            Configured imputer instance with optimal parameters.
        """
        incomp_data = utils.config_contamination(ts=ts_test, pattern=pattern, dataset_rate=opti_mean, series_rate=opti_mean, block_size=block_size_mcar)
        imputer = utils.config_impute_algorithm(incomp_data=incomp_data, algorithm=algorithm)

        return imputer
    def average_runs_by_names(self, data):
        """
        Average the results of all runs depending on the dataset.

        Parameters
        ----------
        data : list
            List of dictionaries containing the results of the benchmark runs.

        Returns
        -------
        list
            List of dictionaries containing the results of the benchmark runs, averaged by dataset.
        """
        results_avg, all_names = [], []

        # Extract dataset names
        for dictionary in data:
            all_keys = list(dictionary.keys())
            dataset_name = all_keys[0]
            all_names.append(dataset_name)

        # Get unique dataset names
        unique_names = sorted(set(all_names))

        # Initialize and populate the split matrix
        split = [[0 for _ in range(all_names.count(name))] for name in unique_names]

        for i, name in enumerate(unique_names):
            x = 0
            for y, match in enumerate(all_names):
                if name == match:
                    split[i][x] = data[y]
                    x += 1

        # Iterate over the split matrix to calculate averages
        for datasets in split:
            tmp = [dataset for dataset in datasets if dataset != 0]
            merged_dict = {}
            count = len(tmp)

            # Process and calculate averages
            for dataset in tmp:
                for outer_key, outer_value in dataset.items():
                    for middle_key, middle_value in outer_value.items():
                        for mean_key, mean_value in middle_value.items():
                            for method_key, method_value in mean_value.items():
                                for level_key, level_value in method_value.items():
                                    # Initialize scores and times if not already initialized
                                    merger = merged_dict.setdefault(outer_key, {}).setdefault(middle_key, {}).setdefault(mean_key, {}).setdefault(method_key, {}).setdefault(level_key, {"scores": {}})

                                    # Add scores and times
                                    for score_key, v in level_value["scores"].items():
                                        if v is None:
                                            v = 0
                                        merger["scores"][score_key] = merger["scores"].get(score_key, 0) + v / count

            results_avg.append(merged_dict)

        return results_avg
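A minimal usage sketch (not part of the module source); `run_storage` is a placeholder for a list of per-run result dictionaries shaped like the example in the class docstring:

    bench = Benchmark()
    averaged = bench.average_runs_by_names(run_storage)  # one merged, averaged dictionary per dataset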
    def avg_results(self, *datasets, metric="RMSE"):
        """
        Calculate the average of all metrics and times across multiple datasets.

        Parameters
        ----------
        datasets : dict
            Multiple dataset dictionaries to be averaged.
        metric : str
            Metric to group.

        Returns
        -------
        List
            Matrix with averaged scores and times for all levels, list of algorithms, list of datasets.
        """
        # Step 1: Compute average RMSE across runs for each dataset and algorithm
        aggregated_data = {}

        for runs in datasets:
            for dataset, dataset_items in runs.items():
                if dataset not in aggregated_data:
                    aggregated_data[dataset] = {}

                for pattern, pattern_items in dataset_items.items():
                    for algo, algo_data in pattern_items.items():
                        if algo not in aggregated_data[dataset]:
                            aggregated_data[dataset][algo] = []

                        for missing_values, missing_values_item in algo_data.items():
                            for param, param_data in missing_values_item.items():
                                rmse = param_data["scores"][metric]
                                aggregated_data[dataset][algo].append(rmse)

        # Step 2: Compute averages using NumPy
        average_rmse_matrix = {}
        for dataset, algos in aggregated_data.items():
            average_rmse_matrix[dataset] = {}
            for algo, rmse_values in algos.items():
                rmse_array = np.array(rmse_values)
                avg_rmse = np.mean(rmse_array)
                average_rmse_matrix[dataset][algo] = avg_rmse

        # Step 3: Create a matrix representation of datasets and algorithms
        datasets_list = list(average_rmse_matrix.keys())
        algorithms = {algo for algos in average_rmse_matrix.values() for algo in algos}
        algorithms_list = sorted(algorithms)

        # Prepare a NumPy matrix
        comprehensive_matrix = np.zeros((len(datasets_list), len(algorithms_list)))
        for i, dataset in enumerate(datasets_list):
            for j, algo in enumerate(algorithms_list):
                comprehensive_matrix[i, j] = average_rmse_matrix[dataset].get(algo, np.nan)

        return comprehensive_matrix, algorithms_list, datasets_list
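For illustration, assuming `run_1` and `run_2` are result dictionaries produced by separate benchmark runs (placeholder names, not library objects), the averaged score matrix can be obtained as follows:

    bench = Benchmark()
    matrix, algorithms, datasets = bench.avg_results(run_1, run_2, metric="RMSE")
    # matrix[i, j] holds the average RMSE of algorithms[j] on datasets[i]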
    def generate_heatmap(self, scores_list, algos, sets, metric="RMSE", save_dir="./reports", display=True):
        """
        Generate and save the score matrix of the selected metric as a heatmap in HD quality.

        Parameters
        ----------
        scores_list : np.ndarray
            2D numpy array containing the metric values.
        algos : list of str
            List of algorithm names (columns of the heatmap).
        sets : list of str
            List of dataset names (rows of the heatmap).
        metric : str, optional
            Metric to extract.
        save_dir : str, optional
            Directory to save the generated plot (default is "./reports").
        display : bool, optional
            Whether to display the plot.

        Returns
        -------
        bool
            True if the heatmap has been generated.
        """
        save_dir = save_dir + "/_heatmaps/"
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        nbr_algorithms = len(algos)
        nbr_datasets = len(sets)

        cell_size = 4.0
        x_size = cell_size * nbr_algorithms
        y_size = cell_size * nbr_datasets

        fig, ax = plt.subplots(figsize=(x_size, y_size))
        fig.canvas.manager.set_window_title("benchmark heatmap, " + metric)

        cmap = plt.cm.Greys
        if metric == "RMSE":
            norm = plt.Normalize(vmin=0, vmax=2)
        elif metric == "CORRELATION":
            norm = plt.Normalize(vmin=-2, vmax=2)
        elif metric == "MAE":
            norm = plt.Normalize(vmin=0, vmax=1)
        elif metric == "MI":
            norm = plt.Normalize(vmin=-1, vmax=1.5)
        elif metric.lower() == "runtime":
            norm = plt.Normalize(vmin=0, vmax=5000)
        elif metric.lower() == "runtime_log":
            norm = plt.Normalize(vmin=-2, vmax=10)
        else:
            norm = plt.Normalize(vmin=0, vmax=2000)

        # Create the heatmap
        heatmap = ax.imshow(scores_list, cmap=cmap, norm=norm, aspect='auto')

        # Add a color bar for reference
        cbar = plt.colorbar(heatmap, ax=ax, orientation='vertical')
        cbar.set_label(metric, rotation=270, labelpad=15)

        # Set the tick labels
        ax.set_xticks(np.arange(nbr_algorithms))
        ax.set_xticklabels(algos)
        ax.set_yticks(np.arange(nbr_datasets))
        ax.set_yticklabels(sets)

        # Add titles and labels
        ax.set_title('ImputeGAP Algorithms Comparison')
        ax.set_xlabel('Algorithms')
        ax.set_ylabel('Datasets')

        # Show values on the heatmap
        for i in range(len(sets)):
            for j in range(len(algos)):
                ax.text(j, i, f"{scores_list[i, j]:.2f}", ha='center', va='center',
                        color="black" if scores_list[i, j] < 1 else "white")  # for visibility

        filename = "benchmarking_" + metric.lower() + ".jpg"
        filepath = os.path.join(save_dir, filename)
        plt.savefig(filepath, dpi=300, bbox_inches='tight')  # Save in HD with tight layout

        # Show the plot
        if display:
            plt.tight_layout()
            plt.show()
            self.heatmap = plt
        else:
            plt.close()

        return True
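A hedged example of rendering the matrix returned by avg_results() in the sketch above; the save directory is arbitrary:

    bench.generate_heatmap(scores_list=matrix, algos=algorithms, sets=datasets,
                           metric="RMSE", save_dir="./reports", display=False)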
    def generate_reports_txt(self, runs_plots_scores, save_dir="./reports", dataset="", metrics=["RMSE"], run=-1, rt=0, verbose=True):
        """
        Generate and save a text report of metrics and timing for each dataset, algorithm, and pattern.

        Parameters
        ----------
        runs_plots_scores : dict
            Dictionary containing scores and timing information for each dataset, pattern, and algorithm.
        save_dir : str, optional
            Directory to save the report files (default is "./reports").
        dataset : str, optional
            Name of the dataset, used in the report file name.
        metrics : list of str, optional
            List of metrics asked for in the report.
        run : int, optional
            Number of the run.
        rt : float, optional
            Total time of the run.
        verbose : bool, optional
            Whether to display the contamination information (default is True).

        Returns
        -------
        None

        Notes
        -----
        The report is saved in a "report.txt" file in `save_dir`, organized in sections with headers and results.
        """
        os.makedirs(save_dir, exist_ok=True)
        metric_unit = "ms"

        if "RMSE" not in metrics:
            to_call = [metrics[0], "RUNTIME"]
        else:
            to_call = ["RMSE", "RUNTIME"]

        new_metrics = np.copy(metrics)
        if metrics is None:
            new_metrics = utils.list_of_metrics()
        else:
            if "RUNTIME" not in new_metrics:
                new_metrics = np.append(new_metrics, "RUNTIME")
            if "RUNTIME_LOG" not in new_metrics:
                new_metrics = np.append(new_metrics, "RUNTIME_LOG")

        # Pick up the optimizer name used in the runs
        opt = None
        for dataset, patterns_items in runs_plots_scores.items():
            for pattern, algorithm_items in patterns_items.items():
                for algorithm, optimizer_items in algorithm_items.items():
                    for optimizer, x_data_items in optimizer_items.items():
                        opt = optimizer
                        break

        list_of_patterns = []
        for dataset, patterns_items in runs_plots_scores.items():
            for pattern, algorithm_items in patterns_items.items():
                list_of_patterns.append(pattern)

                new_dir = save_dir + "/" + pattern.lower() + "/error"
                os.makedirs(new_dir, exist_ok=True)
                save_path = os.path.join(new_dir, f"report_{pattern}_{dataset}.txt")
                current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                with open(save_path, "w") as file:
                    file.write(f"Report for Dataset: {dataset}\n")
                    file.write(f"Generated on: {current_time}\n")
                    file.write(f"Total runtime: {rt} (ms)\n")
                    if run >= 0:
                        file.write(f"Run number: {run}\n")
                    file.write("=" * 120 + "\n\n")

                    for metric in new_metrics:
                        if metric == "RUNTIME":
                            file.write(f"\n{dataset}: {{{pattern}, {metric}[{metric_unit}], {opt}}}")
                        else:
                            file.write(f"\n{dataset}: {{{pattern}, {metric}, {opt}}}")

                        # Collect all algorithms and scores by rate
                        rate_to_scores = defaultdict(dict)
                        all_algorithms = set()

                        for algorithm, optimizer_items in algorithm_items.items():
                            for optimizer, x_data_items in optimizer_items.items():
                                for x, values in x_data_items.items():
                                    score = values.get("scores", {}).get(metric, None)
                                    if score is not None:
                                        rate_to_scores[x][algorithm] = f"{score:.10f}"
                                        all_algorithms.add(algorithm)

                        all_algorithms = sorted(all_algorithms)
                        headers = ["Rate"] + list(all_algorithms)
                        column_widths = [5] + [18] * len(all_algorithms)

                        # Header and separator rows
                        header_row = "".join(f" {header:^{width}} " for header, width in zip(headers, column_widths))
                        separator_row = "".join(f"{'-' * (width + 2)}" for width in column_widths)

                        file.write(f"{separator_row}\n")
                        file.write(f"{header_row}\n")
                        file.write(f"{separator_row}\n")

                        if metric in to_call and verbose:
                            if metric == "RUNTIME":
                                print(f"\n{dataset}: {{{pattern}, {metric}[{metric_unit}], {opt}}}")
                            else:
                                print(f"\n{dataset}: {{{pattern}, {metric}, {opt}}}")
                            print(separator_row)
                            print(f"{header_row}")
                            print(separator_row)

                        # Write each row
                        for rate in sorted(rate_to_scores.keys()):
                            row_values = [rate] + [rate_to_scores[rate].get(algo, "") for algo in all_algorithms]
                            row = "".join(f" {val:^{width}} " for val, width in zip(row_values, column_widths))
                            file.write(f"{row}\n")
                            if metric in to_call and verbose:
                                print(f"{row}")

                        file.write(f"{separator_row}\n\n")
                        if metric in to_call and verbose:
                            print(separator_row + "\n")

                    file.write("Dictionary of Results:\n")
                    file.write(str(runs_plots_scores) + "\n")
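An illustrative call (a sketch, not part of the module), assuming `scores` is a single-dataset result dictionary in the format shown in the class docstring:

    bench.generate_reports_txt(runs_plots_scores=scores, save_dir="./reports",
                               dataset="eegalcohol", metrics=["RMSE"], run=-1, rt=0)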
    def generate_reports_excel(self, runs_plots_scores, save_dir="./reports", dataset="", run=-1, verbose=True):
        """
        Generate and save an Excel report of metrics and timing for each dataset, algorithm, and pattern.

        Parameters
        ----------
        runs_plots_scores : dict
            Dictionary containing scores and timing information for each dataset, pattern, and algorithm.
        save_dir : str, optional
            Directory to save the Excel file (default is "./reports").
        dataset : str, optional
            Name of the dataset, used in the Excel file name.
        run : int, optional
            Number of the run.
        verbose : bool, optional
            Whether to display the contamination information (default is True).

        Returns
        -------
        None
        """
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, f"report_{dataset}.xlsx")

        # Create an Excel workbook
        workbook = xlsxwriter.Workbook(save_path)

        # Add a summary sheet with the header, creation date, dictionary content, and links to other sheets
        summary_sheet = workbook.add_worksheet("Summary")
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        summary_sheet.set_column(0, 1, 50)

        # Title and header
        summary_sheet.write(0, 0, "ImputeGAP, A library of Imputation Techniques for Time Series Data")
        summary_sheet.write(2, 0, "Report for Dataset")
        summary_sheet.write(2, 1, dataset)
        summary_sheet.write(3, 0, "Generated on")
        summary_sheet.write(3, 1, current_time)
        if run >= 0:
            summary_sheet.write(4, 0, "Run Number")
            summary_sheet.write(4, 1, run)

        # Add links to metric sheets
        row = 6
        summary_sheet.write(row, 0, "Metric Sheets:")
        row += 1

        metrics = {
            "RMSE": "Root Mean Square Error - Measures the average magnitude of error.",
            "MAE": "Mean Absolute Error - Measures the average absolute error.",
            "MI": "Mutual Information - Indicates dependency between variables.",
            "CORRELATION": "Correlation Coefficient - Indicates linear relationship between variables."
        }

        for metric in metrics.keys():
            summary_sheet.write_url(row, 0, f"internal:'{metric}'!A1", string=f"Go to {metric} Sheet")
            row += 1

        # Write the dictionary content
        summary_sheet.write(row + 1, 0, "Dictionary of Results")
        row += 2
        for key, value in runs_plots_scores.items():
            summary_sheet.write(row, 0, str(key))
            summary_sheet.write(row, 1, str(value))
            row += 1

        for metric, description in metrics.items():
            # Create a worksheet for each metric
            worksheet = workbook.add_worksheet(metric)

            # Write the metric description at the top and add IMPUTEGAP header
            worksheet.write(0, 0, "ImputeGAP, A library of Imputation Techniques for Time Series Data")
            worksheet.write(2, 0, f"{metric}: {description}")

            # Define consistent column headers and widths
            headers = ["Dataset", "Algorithm", "Optimizer", "Pattern", "X Value", metric]
            column_widths = [15, 15, 15, 15, 12, 20]  # Adjust widths for Excel

            # Write the headers
            for col, (header, width) in enumerate(zip(headers, column_widths)):
                worksheet.set_column(col, col, width)
                worksheet.write(3, col, header)

            # Populate the data (runs_plots_scores is keyed as dataset -> pattern -> algorithm -> optimizer -> rate)
            row = 4
            for dataset, pattern_items in runs_plots_scores.items():
                for pattern, algo_items in pattern_items.items():
                    for algorithm, optimizer_items in algo_items.items():
                        for optimizer, x_data_items in optimizer_items.items():
                            for x, values in x_data_items.items():
                                value = values.get("scores", {}).get(metric, None)
                                if value is not None:
                                    value = f"{value:.10f}"
                                    data = [dataset, algorithm, optimizer, pattern, str(x), value]
                                    for col, cell_value in enumerate(data):
                                        worksheet.write(row, col, cell_value)
                                    row += 1

        # Close the workbook
        workbook.close()
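The Excel report can be produced from the same placeholder dictionary; again a sketch with illustrative arguments:

    bench.generate_reports_excel(runs_plots_scores=scores, save_dir="./reports",
                                 dataset="eegalcohol", run=-1)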
    def generate_plots(self, runs_plots_scores, ticks, metrics=None, subplot=False, y_size=4, title=None, save_dir="./reports", display=False, verbose=True):
        """
        Generate and save plots for each metric and pattern based on the provided scores.

        Parameters
        ----------
        runs_plots_scores : dict
            Dictionary containing scores and timing information for each dataset, pattern, and algorithm.
        ticks : list of float
            List of missing rates for contamination.
        metrics : list of str
            List of metrics used.
        subplot : bool, optional
            If True, generates a single figure with subplots for all metrics (default is False).
        y_size : int, optional
            Default size of the graph (default is 4).
        title : str, optional
            Title of the graph (default is "imputegap benchmark").
        save_dir : str, optional
            Directory to save generated plots (default is "./reports").
        display : bool, optional
            Whether to display the plots (default is False).
        verbose : bool, optional
            Whether to display the contamination information (default is True).

        Returns
        -------
        None

        Notes
        -----
        Saves generated plots in `save_dir`, categorized by dataset, pattern, and metric.
        """
        os.makedirs(save_dir, exist_ok=True)
        print("\nThe plots have been generated...\n")

        new_metrics = np.copy(metrics)
        new_plots = 0
        if metrics is None:
            new_metrics = utils.list_of_metrics()
        else:
            if "RUNTIME_LOG" not in new_metrics:
                new_plots = new_plots + 1
                new_metrics = np.append(new_metrics, "RUNTIME_LOG")

        n_rows = int((len(new_metrics) + new_plots) / 2)
        x_size, title_flag = 16, title

        for dataset, pattern_items in runs_plots_scores.items():
            for pattern, algo_items in pattern_items.items():
                if subplot:
                    fig, axes = plt.subplots(nrows=n_rows, ncols=2, figsize=(x_size * 1.90, y_size * 2.90))  # Adjusted figsize
                    fig.subplots_adjust(left=0.04, right=0.99, top=0.97, bottom=0.05, wspace=0.095, hspace=0.2)
                    if title_flag is None:
                        title = dataset + " : " + pattern + ", benchmark analysis"
                    fig.canvas.manager.set_window_title(title)
                    axes = axes.ravel()  # Flatten the 2D array of axes to a 1D array

                # Iterate over each metric, generating separate plots, including new timing metrics
                for i, metric in enumerate(new_metrics):
                    if subplot:
                        if i < len(axes):
                            ax = axes[i]
                        else:
                            break  # Prevent index out of bounds if metrics exceed subplot slots
                    else:
                        plt.figure(figsize=(x_size, y_size))
                        ax = plt.gca()

                    has_data = False  # Flag to check if any data is added to the plot
                    max_y, min_y = -99999, 99999

                    for algorithm, optimizer_items in algo_items.items():
                        x_vals = []
                        y_vals = []
                        for optimizer, x_data in optimizer_items.items():
                            for x, values in x_data.items():
                                if metric in values["scores"]:
                                    x_vals.append(float(x))
                                    y_vals.append(values["scores"][metric])

                        if x_vals and y_vals:
                            sorted_pairs = sorted(zip(x_vals, y_vals))
                            x_vals, y_vals = zip(*sorted_pairs)

                            # Plot each algorithm as a line with scattered points
                            ax.plot(x_vals, y_vals, label=f"{algorithm}", linewidth=2)
                            ax.scatter(x_vals, y_vals)
                            has_data = True

                            if min_y > min(y_vals):
                                min_y = min(y_vals)
                            if max_y < max(y_vals):
                                max_y = max(y_vals)

                    # Save plot only if there is data to display
                    if has_data:
                        ylabel_metric = {
                            "RUNTIME": "Runtime [ms]",
                            "RUNTIME_LOG": "log₁₀(Runtime [ms])",
                        }.get(metric, metric)

                        ax.set_title(metric)
                        ax.set_xlabel("Rate")
                        ax.set_ylabel(ylabel_metric)
                        ax.set_xlim(0.0, 0.85)

                        if metric == "RMSE" or metric == "MAE":
                            if min_y < 0:
                                min_y = 0
                            if max_y > 3:
                                max_y = 3
                        elif metric == "CORRELATION":
                            if min_y < -1:
                                min_y = -1
                            if max_y > 1:
                                max_y = 1
                        elif metric == "MI":
                            if min_y < 0:
                                min_y = 0
                            if max_y > 2:
                                max_y = 2
                        elif metric == "RUNTIME":
                            if min_y < 0:
                                min_y = 0
                            if max_y > 10000:
                                max_y = 10000
                        elif metric == "RUNTIME_LOG":
                            if min_y < -5:
                                min_y = -5
                            if max_y > 5:
                                max_y = 5

                        diff = (max_y - min_y)
                        y_padding = 0.15 * diff
                        if y_padding is None or y_padding == 0:
                            y_padding = 1

                        ax.set_ylim(min_y - y_padding, max_y + y_padding)  # Set y-axis limits with padding for visibility

                        if metric == "RUNTIME":
                            ax.set_title("Runtime (linear scale)")
                        elif metric == "RUNTIME_LOG":
                            ax.set_title("Runtime (log scale)")
                        elif metric == "CORRELATION":
                            ax.set_title("Pearson Correlation")

                        # Customize x-axis ticks
                        ax.set_xticks(ticks)
                        ax.set_xticklabels([f"{int(tick * 100)}%" for tick in ticks])
                        ax.grid(True, zorder=0)
                        ax.legend(loc='upper left', fontsize=7, frameon=True, fancybox=True, framealpha=0.8)

                    if not subplot:
                        filename = f"{dataset}_{pattern}_{optimizer}_{metric}.jpg"
                        new_dir = save_dir + "/" + pattern
                        os.makedirs(new_dir, exist_ok=True)
                        filepath = os.path.join(new_dir, filename)
                        plt.savefig(filepath)
                        if not display:
                            plt.close()

                if subplot:
                    # plt.tight_layout()
                    new_dir = save_dir + "/" + pattern + "/error"
                    os.makedirs(new_dir, exist_ok=True)
                    filename = f"{dataset}_{pattern}_metrics_subplot.jpg"
                    filepath = os.path.join(new_dir, filename)
                    plt.savefig(filepath)
                    if display:
                        plt.show()
                    else:
                        plt.close()

        self.plots = plt
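A usage sketch with the same missing-rate ticks as the eval() defaults; `scores` is the placeholder result dictionary used in the previous examples:

    bench.generate_plots(runs_plots_scores=scores, ticks=[0.05, 0.1, 0.2, 0.4, 0.6, 0.8],
                         metrics=["RMSE", "MAE"], subplot=True, save_dir="./reports", display=False)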
    def eval(self, algorithms=["cdrec"], datasets=["eeg-alcohol"], patterns=["mcar"], x_axis=[0.05, 0.1, 0.2, 0.4, 0.6, 0.8], optimizers=["default_params"], metrics=["*"], save_dir="./imputegap_assets/benchmark", runs=1, normalizer="z_score", nbr_series=2500, nbr_vals=2500, dl_ratio=0.9, verbose=False):
        """
        Execute a comprehensive evaluation of imputation algorithms over multiple datasets and patterns.

        Parameters
        ----------
        algorithms : list of str
            List of imputation algorithms to test.
        datasets : list of str
            List of dataset names to evaluate.
        patterns : list of str
            List of contamination patterns to apply.
        x_axis : list of float
            List of missing rates for contamination.
        optimizers : list
            List of optimizers with their configurations.
        metrics : list of str
            List of metrics for evaluation.
        save_dir : str, optional
            Directory to save reports and plots (default is "./imputegap_assets/benchmark").
        runs : int, optional
            Number of executions; the results are averaged across runs.
        normalizer : str, optional
            Normalizer to pre-process the data (default is "z_score").
        nbr_series : int, optional
            Maximum number of series to keep from the dataset (default is 2500).
        nbr_vals : int, optional
            Maximum number of values to keep per series (default is 2500).
        dl_ratio : float, optional
            Training ratio for deep learning techniques (default is 0.9).
        verbose : bool, optional
            Whether to display the contamination information (default is False).

        Returns
        -------
        List
            List of all run results, and the matrix with averaged scores and times for all levels.

        Notes
        -----
        Runs contamination, imputation, and evaluation, then generates plots and summary reports.
        """
        os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

        run_storage = []
        not_optimized = ["none"]
        mean_group = ["mean", "MeanImpute", "min", "MinImpute", "zero", "ZeroImpute", "MeanImputeBySeries", "meanimpute", "minimpute", "zeroimpute", "meanimputebyseries"]

        if not isinstance(algorithms, list):
            raise TypeError(f"'algorithms' must be a list, but got {type(algorithms).__name__}")
        if not isinstance(datasets, list):
            raise TypeError(f"'datasets' must be a list, but got {type(datasets).__name__}")
        if not isinstance(patterns, list):
            raise TypeError(f"'patterns' must be a list, but got {type(patterns).__name__}")
        if not isinstance(x_axis, list):
            raise TypeError(f"'x_axis' must be a list, but got {type(x_axis).__name__}")

        if "*" in metrics or "all" in metrics:
            metrics = utils.list_of_metrics()

        if "*" in algorithms or "all" in algorithms:
            all_algs = utils.list_of_algorithms()
            algorithms = [item for item in all_algs if item.upper() != "MPIN"]

        directory_now = datetime.datetime.now()
        directory_time = directory_now.strftime("%y_%m_%d_%H_%M_%S")
        save_dir = save_dir + "/" + "bench_" + directory_time

        benchmark_time = time.time()

        for i_run in range(0, abs(runs)):
            for dataset in datasets:
                runs_plots_scores = {}
                block_size_mcar = 10
                y_p_size = max(4, len(algorithms) * 0.275)

                if verbose:
                    print("\n1. evaluation launch for", dataset, "\n")

                ts_test = TimeSeries()
                default_data = TimeSeries()

                header = False
                if dataset == "eeg-reading":
                    header = True

                reshp = False
                default_data.load_series(data=utils.search_path(dataset), header=header, verbose=False)
                Mdef, Ndef = default_data.data.shape
                if Ndef > nbr_vals or Mdef > nbr_series:
                    reshp = True
                    print(f"\nThe dataset contains a large number of values {default_data.data.shape}, which may be too much for some algorithms to handle efficiently. Consider reducing the number of series or the volume of data.")
                default_data = None

                ts_test.load_series(data=utils.search_path(dataset), nbr_series=nbr_series, nbr_val=nbr_vals, header=header)
                M, N = ts_test.data.shape

                if reshp:
                    print(f"Benchmarking module has reduced the shape to {ts_test.data.shape}.\n")

                if N < 250:
                    print(f"The block size is too high for the number of values per series; reducing it to 2\n")
                    block_size_mcar = 2

                if normalizer in utils.list_of_normalizers():
                    ts_test.normalize(verbose=verbose)

                for pattern in patterns:
                    if verbose:
                        print("\n2. contamination of", dataset, "with pattern", pattern, "\n")

                    for algorithm in algorithms:
                        has_been_optimized = False
                        if verbose:
                            print("\n3. algorithm evaluated", algorithm, "with", pattern, "\n")
                        else:
                            print(f"{algorithm} is tested with {pattern}, started at {time.strftime('%Y-%m-%d %H:%M:%S')}.")

                        for incx, x in enumerate(x_axis):
                            if verbose:
                                print("\n4. missing values (series&values) set to", x, "for x_axis\n")

                            incomp_data = utils.config_contamination(ts=ts_test, pattern=pattern, dataset_rate=x, series_rate=x, block_size=block_size_mcar, verbose=verbose)

                            for optimizer in optimizers:
                                algo = utils.config_impute_algorithm(incomp_data=incomp_data, algorithm=algorithm, verbose=verbose)

                                if isinstance(optimizer, dict):
                                    optimizer_gt = {"input_data": ts_test.data, **optimizer}
                                    optimizer_value = optimizer.get('optimizer')  # or optimizer['optimizer']

                                    if not has_been_optimized and algorithm not in mean_group and algorithm not in not_optimized:
                                        if verbose:
                                            print("\n5. AutoML to set the parameters", optimizer, "\n")
                                        i_opti = self._config_optimization(0.20, ts_test, pattern, algorithm, block_size_mcar)
                                        if utils.check_family("DeepLearning", algorithm):
                                            i_opti.impute(user_def=False, params=optimizer_gt, tr_ratio=0.80)
                                        else:
                                            i_opti.impute(user_def=False, params=optimizer_gt)
                                        utils.save_optimization(optimal_params=i_opti.parameters, algorithm=algorithm, dataset=dataset, optimizer="e")
                                        has_been_optimized = True
                                    else:
                                        if verbose:
                                            print("\n5. AutoML already optimized...\n")

                                    if algorithm not in mean_group and algorithm not in not_optimized:
                                        if i_opti.parameters is None:
                                            opti_params = utils.load_parameters(query="optimal", algorithm=algorithm, dataset=dataset, optimizer="e")
                                            if verbose:
                                                print("\n6. imputation", algorithm, "with optimal parameters from files", *opti_params)
                                        else:
                                            opti_params = i_opti.parameters
                                            if verbose:
                                                print("\n6. imputation", algorithm, "with optimal parameters from object", *opti_params)
                                    else:
                                        if verbose:
                                            print("\n5. No AutoML launched; no optimal params needed for", algorithm, "\n")
                                        opti_params = None
                                else:
                                    if verbose:
                                        print("\n5. Default parameters have been set to", optimizer, "for", algorithm, "\n")
                                    optimizer_value = optimizer
                                    opti_params = None

                                start_time_imputation = time.time()

                                if not self._benchmark_exception(incomp_data, algorithm, pattern, x):
                                    if utils.check_family("DeepLearning", algorithm) or utils.check_family("LLMs", algorithm):
                                        if x > round(1 - dl_ratio, 2):
                                            algo.recov_data = incomp_data
                                        else:
                                            algo.impute(params=opti_params, tr_ratio=dl_ratio)
                                    else:
                                        algo.impute(params=opti_params)
                                else:
                                    algo.recov_data = incomp_data

                                end_time_imputation = time.time()

                                algo.score(input_data=ts_test.data, recov_data=algo.recov_data, verbose=False)

                                if "*" not in metrics and "all" not in metrics:
                                    algo.metrics = {k: algo.metrics[k] for k in metrics if k in algo.metrics}

                                time_imputation = (end_time_imputation - start_time_imputation) * 1000
                                if time_imputation < 1:
                                    time_imputation = 1
                                log_time_imputation = math.log10(time_imputation) if time_imputation > 0 else None

                                algo.metrics["RUNTIME"] = time_imputation
                                algo.metrics["RUNTIME_LOG"] = log_time_imputation

                                dataset_s = dataset
                                if "-" in dataset:
                                    dataset_s = dataset.replace("-", "")

                                save_dir_plot = save_dir + "/" + dataset_s + "/" + pattern + "/recovery/"
                                cont_rate = int(x * 100)

                                ts_test.plot(input_data=ts_test.data, incomp_data=incomp_data, recov_data=algo.recov_data, nbr_series=3, subplot=True, algorithm=algo.algorithm, cont_rate=str(cont_rate), display=False, save_path=save_dir_plot, verbose=False)

                                runs_plots_scores.setdefault(str(dataset_s), {}).setdefault(str(pattern), {}).setdefault(str(algorithm), {}).setdefault(str(optimizer_value), {})[str(x)] = {"scores": algo.metrics}

                        print(f"done!\n\n")

                #save_dir_runs = save_dir + "/_details/run_" + str(i_run) + "/" + dataset
                #if verbose:
                #    print("\nruns saved in : ", save_dir_runs)
                #self.generate_plots(runs_plots_scores=runs_plots_scores, ticks=x_axis, metrics=metrics, subplot=True, y_size=y_p_size, save_dir=save_dir_runs, display=False, verbose=verbose)
                #self.generate_plots(runs_plots_scores=runs_plots_scores, ticks=x_axis, metrics=metrics, subplot=False, y_size=y_p_size, save_dir=save_dir_runs, display=False, verbose=verbose)
                #self.generate_reports_txt(runs_plots_scores=runs_plots_scores, save_dir=save_dir_runs, dataset=dataset, metrics=metrics, run=i_run, verbose=verbose)
                #self.generate_reports_excel(runs_plots_scores, save_dir_runs, dataset, i_run, verbose=verbose)

                run_storage.append(runs_plots_scores)
                plt.close('all')  # Close all open figures

        for x, m in enumerate(reversed(metrics)):
            #tag = True if x == (len(metrics)-1) else False
            scores_list, algos, sets = self.avg_results(*run_storage, metric=m)
            _ = self.generate_heatmap(scores_list=scores_list, algos=algos, sets=sets, metric=m, save_dir=save_dir, display=False)

        run_averaged = self.average_runs_by_names(run_storage)

        benchmark_end = time.time()
        total_time_benchmark = round(benchmark_end - benchmark_time, 4)
        print(f"\n> logs: benchmark - Execution Time: {total_time_benchmark} seconds\n")

        verb = True
        for scores in run_averaged:
            all_keys = list(scores.keys())
            dataset_name = str(all_keys[0])
            save_dir_agg_set = save_dir + "/" + dataset_name

            self.generate_reports_txt(runs_plots_scores=scores, save_dir=save_dir_agg_set, dataset=dataset_name, metrics=metrics, rt=total_time_benchmark, run=-1)
            self.generate_plots(runs_plots_scores=scores, ticks=x_axis, metrics=metrics, subplot=True, y_size=y_p_size, save_dir=save_dir_agg_set, display=verb)

        print("\nThe results are saved in:", save_dir, "\n")

        self.list_results = run_averaged
        self.aggregate_results = scores_list
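End-to-end usage sketch driving the full pipeline; the algorithm and dataset names below are illustrative (taken from the class docstring example), and any supported values may be substituted:

    from imputegap.recovery.benchmark import Benchmark

    bench = Benchmark()
    bench.eval(algorithms=["MeanImpute", "SoftImpute"],
               datasets=["eeg-alcohol"],
               patterns=["mcar"],
               x_axis=[0.05, 0.1, 0.2, 0.4, 0.6, 0.8],
               optimizers=["default_params"],
               metrics=["RMSE", "MAE"],
               save_dir="./imputegap_assets/benchmark",
               runs=1)

    # Averaged per-dataset results and the aggregated score matrix are stored on the object
    results = bench.list_results
    matrix = bench.aggregate_results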