import datetime
import os
import math
import time
from collections import defaultdict
import numpy as np
import matplotlib.pyplot as plt
from imputegap.tools import utils
from imputegap.recovery.manager import TimeSeries
import itertools
[docs]
class Benchmark:
"""
A class to evaluate the performance of imputation algorithms through benchmarking across datasets and patterns.
Methods
-------
average_runs_by_names(self, data):
Average the results of all runs depending on the dataset.
avg_results():
Calculate average metrics (e.g., RMSE) across multiple datasets and algorithm runs.
generate_heatmap():
Generate and save a heatmap visualization of RMSE scores for datasets and algorithms.
generate_reports_txt():
Create detailed text-based reports summarizing metrics and timing results for all evaluations.
generate_reports_excel():
Create detailed excel-based reports summarizing metrics and timing results for all evaluations.
generate_plots():
Visualize metrics (e.g., RMSE, MAE) and timing (e.g., imputation, optimization) across patterns and datasets.
eval():
Perform a complete benchmarking pipeline, including contamination, imputation, evaluation, and reporting.
Example
-------
output : {'eegalcohol': {'mcar': {'MeanImpute': {'default_params': {'0.05': {'scores': {'RMSE': 1.107394798606378, 'MAE': 0.9036474830477748, 'CORRELATION': nan, 'RUNTIME': 10.07390022277832, 'RUNTIME_LOG': 1.00319764506136}}, '0.1': {'scores': {'RMSE': 0.8569349076796438, 'MAE': 0.6416542359734557, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.2': {'scores': {'RMSE': 0.9924113085421721, 'MAE': 0.7939689811173046, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.4': {'scores': {'RMSE': 1.0058063455061463, 'MAE': 0.8076546785476064, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.6': {'scores': {'RMSE': 0.9891809506243663, 'MAE': 0.7914550709031675, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}, '0.8': {'scores': {'RMSE': 0.9927953862507292, 'MAE': 0.7925635744718286, 'CORRELATION': nan, 'RUNTIME': 1.0, 'RUNTIME_LOG': 0.0}}}}, 'SoftImpute': {'default_params': {'0.05': {'scores': {'RMSE': 0.4359915238078244, 'MAE': 0.3725965559420608, 'CORRELATION': 0.9530448037164908, 'RUNTIME': 199.30577278137207, 'RUNTIME_LOG': 2.2995198779819055}}, '0.1': {'scores': {'RMSE': 0.3665001858394363, 'MAE': 0.2989983612840734, 'CORRELATION': 0.9049909722894052, 'RUNTIME': 117.54822731018066, 'RUNTIME_LOG': 2.0702160841184516}}, '0.2': {'scores': {'RMSE': 0.39833006221984, 'MAE': 0.30824644022807457, 'CORRELATION': 0.9161465703422209, 'RUNTIME': 317.5652027130127, 'RUNTIME_LOG': 2.5018329084349737}}, '0.4': {'scores': {'RMSE': 0.435591016228979, 'MAE': 0.3335144215651955, 'CORRELATION': 0.9021032587324183, 'RUNTIME': 302.2916316986084, 'RUNTIME_LOG': 2.4804261248244566}}, '0.6': {'scores': {'RMSE': 0.4500113661547204, 'MAE': 0.338085865703361, 'CORRELATION': 0.8893263437029546, 'RUNTIME': 314.93282318115234, 'RUNTIME_LOG': 2.498217926383076}}, '0.8': {'scores': {'RMSE': 0.46554422402146944, 'MAE': 0.3508926604243284, 'CORRELATION': 0.8791443563129441, 'RUNTIME': 311.9697570800781, 'RUNTIME_LOG': 2.4941124947560986}}}}}}}
"""
def __init__(self):
    """
    Initialize an empty Benchmark.

    All result holders start as None; they are populated later by the
    evaluation / reporting pipeline.
    """
    # Raw per-run results and their aggregation across runs.
    self.list_results = self.aggregate_results = None
    # Figure handles produced by the reporting helpers.
    self.heatmap = self.subplots = None
def _benchmark_exception(self, data, algorithm, pattern, x, N, F):
    """
    Check whether a specific algorithm-pattern combination should be excluded from benchmarking.
    This function flags exceptions where benchmarking is not appropriate or known to fail,
    based on the algorithm name, the missingness pattern, and the missingness rate `x`.

    Parameters
    ----------
    data : str
        Dataset used
    algorithm : str
        Name of the imputation algorithm (e.g., 'DEEPMVI', 'PRISTI').
    pattern : str
        Missing data pattern (e.g., 'MCAR', 'ALIGNED').
    x : float
        Proportion of missing values in the data (between 0 and 1).
    N : int
        Number of values
    F : int
        Number of series

    Returns
    -------
    bool
        True if the benchmark should be skipped for the given configuration, False otherwise.

    Rules
    -----
    - For DeepMVI with MCAR pattern and x > 0.6, skip benchmarking.
    - For PRISTI, always skip benchmarking.
    """
    #if M < 5 or N < 5:
    # print(f"\n(BENCH) The imputation algorithm {algorithm} has not enough data to proceed ({M}, {N})")
    # NOTE(review): this early return disables every exception rule below — the
    # whole DeepMVI/meteo logic is currently unreachable dead code. If that is
    # intentional (exceptions switched off), consider deleting the dead branches;
    # otherwise remove this line. Also: the docstring's PRISTI rule has no
    # corresponding check anywhere in the body.
    return False
    if algorithm.upper() == 'DEEPMVI' or algorithm.upper() == 'DEEP_MVI':
        if pattern.lower() == "mcar" or pattern.lower() == "missing_completely_at_random":
            if x > 0.6:
                print(f"\n(BENCH) The imputation algorithm {algorithm} is not compatible with this configuration {pattern} with missingness rate more than 0.6.")
                return True
        if pattern.lower() == "mp" or pattern.lower() == "aligned":
            if x < 0.15:
                print(f"\n(BENCH) The imputation algorithm {algorithm} is not compatible with this configuration {pattern} with missingness rate less then 0.15.")
                return True
            # NOTE(review): nesting reconstructed from a source with stripped
            # indentation — unverifiable, but behavior is moot (unreachable).
            if data == "meteo":
                return True
    # NOTE(review): second, near-duplicate "meteo" check; the first one above
    # already returns unconditionally for meteo within its branch — confirm
    # which of the two checks the author intended to keep.
    if data == "meteo":
        if x >= 0.8:
            print(f"\n(BENCH) The imputation algorithm {algorithm} is not compatible with this configuration {data}. Not enough series to train the model.")
            return True
    return False
def _config_optimization(self, opti_mean, ts_test, pattern, algorithm, block_size_mcar):
    """
    Build an imputer configured for one optimization run.

    Contaminates the test series with the requested pattern/rate, then wraps
    the resulting incomplete data in the chosen imputation algorithm.

    Parameters
    ----------
    opti_mean : float
        Mean contamination parameter (used as both dataset and series rate).
    ts_test : TimeSeries
        TimeSeries object containing the dataset.
    pattern : str
        Contamination pattern (e.g., "mcar", "mp", "blackout", "disjoint", "overlap", "gaussian").
    algorithm : str
        Imputation algorithm to use.
    block_size_mcar : int
        Size of the blocks removed under MCAR.

    Returns
    -------
    BaseImputer
        Imputer instance bound to the contaminated data.
    """
    contaminated = utils.config_contamination(
        ts=ts_test,
        pattern=pattern,
        dataset_rate=opti_mean,
        series_rate=opti_mean,
        block_size=block_size_mcar,
    )
    return utils.config_impute_algorithm(incomp_data=contaminated, algorithm=algorithm)
[docs]
def average_runs_by_names(self, data):
    """
    Average the benchmark results of all runs that share the same dataset name.

    Parameters
    ----------
    data : list
        List of result dictionaries, one per run; each dictionary's first
        (and only) top-level key is the dataset name.

    Returns
    -------
    list
        One merged dictionary per distinct dataset (sorted by name), where
        every metric value is the mean over that dataset's runs
        (None scores count as 0).
    """
    # Group the run dictionaries by their dataset name, preserving run order.
    groups = defaultdict(list)
    for run in data:
        groups[next(iter(run))].append(run)

    averaged = []
    for name in sorted(groups):
        runs = groups[name]
        n_runs = len(runs)
        merged = {}
        # Walk the 5-level hierarchy: dataset -> pattern -> algorithm
        # -> optimizer -> missing-rate -> {"scores": {...}}.
        for run in runs:
            for ds_key, patterns in run.items():
                for pat_key, algos in patterns.items():
                    for algo_key, optimizers in algos.items():
                        for opt_key, rates in optimizers.items():
                            for rate_key, payload in rates.items():
                                slot = (merged.setdefault(ds_key, {})
                                              .setdefault(pat_key, {})
                                              .setdefault(algo_key, {})
                                              .setdefault(opt_key, {})
                                              .setdefault(rate_key, {"scores": {}}))
                                # Accumulate each metric's contribution to the mean.
                                for metric, value in payload["scores"].items():
                                    if value is None:
                                        value = 0
                                    slot["scores"][metric] = slot["scores"].get(metric, 0) + value / n_runs
        averaged.append(merged)
    return averaged
[docs]
def avg_results(self, *datasets, metric="RMSE"):
    """
    Average one metric across runs for every (dataset, algorithm) pair.

    Parameters
    ----------
    datasets : dict
        One or more run-result dictionaries
        (dataset -> pattern -> algorithm -> optimizer -> rate -> scores).
    metric : str
        Name of the score to aggregate (default "RMSE").

    Returns
    -------
    tuple
        (matrix, algorithms, datasets) where `matrix` is a 2D NumPy array of
        averaged scores (rows = datasets, columns = sorted algorithm names);
        cells with no data hold NaN.
    """
    # Pass 1: gather every raw score per dataset and algorithm.
    collected = {}
    for run in datasets:
        for ds_name, ds_content in run.items():
            per_algo = collected.setdefault(ds_name, {})
            for pattern_content in ds_content.values():
                for algo_name, algo_content in pattern_content.items():
                    bucket = per_algo.setdefault(algo_name, [])
                    for opt_content in algo_content.values():
                        for payload in opt_content.values():
                            bucket.append(payload["scores"][metric])

    # Pass 2: reduce each bucket to its mean.
    averages = {
        ds_name: {algo: np.mean(np.asarray(values)) for algo, values in per_algo.items()}
        for ds_name, per_algo in collected.items()
    }

    # Pass 3: lay the means out as a dense matrix (NaN where an algorithm
    # was never run on a dataset).
    dataset_names = list(averages.keys())
    algorithm_names = sorted({algo for per_algo in averages.values() for algo in per_algo})
    matrix = np.zeros((len(dataset_names), len(algorithm_names)))
    for row, ds_name in enumerate(dataset_names):
        for col, algo in enumerate(algorithm_names):
            matrix[row, col] = averages[ds_name].get(algo, np.nan)
    return matrix, algorithm_names, dataset_names
[docs]
def generate_heatmap(self, scores_list, algos, sets, metric="RMSE", save_dir="./reports", display=True):
    """
    Render the benchmark score matrix as a heatmap and save it as a JPEG.

    Parameters
    ----------
    scores_list : np.ndarray
        2D array of scores; one row per dataset, one column per algorithm.
    algos : list of str
        Algorithm names (heatmap columns).
    sets : list of str
        Dataset names (heatmap rows).
    metric : str, optional
        Metric being displayed; drives the color-scale range and file name.
    save_dir : str, optional
        Base directory for the output file (default is "./reports").
    display : bool, optional
        When True, show the figure interactively; otherwise close it.

    Returns
    -------
    bool
        True once the heatmap has been written.
    """
    target_dir = save_dir + "/_heatmaps/"
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    n_algos, n_sets = len(algos), len(sets)
    cell_size = 4.0
    fig, ax = plt.subplots(figsize=(cell_size * n_algos, cell_size * n_sets))
    fig.canvas.manager.set_window_title("benchmark heatmap, " + metric)

    # Truncated grey colormap: skip the extreme white/black ends for readability.
    import matplotlib.colors as mcolors
    cmap = mcolors.LinearSegmentedColormap.from_list(
        f"trunc({plt.cm.Greys.name},{0.3:.2f},{0.9:.2f})",
        plt.cm.Greys(np.linspace(0.3, 0.9, 256)),
    )

    # Per-metric color-scale bounds; unknown metrics fall back to (0, 2000).
    norm_ranges = {"RMSE": (0, 2), "CORRELATION": (-2, 2), "MAE": (0, 1.5), "MI": (-1, 1.5), "runtime": (0, 5000), "runtime_log": (-2, 10), }
    key = metric if metric in norm_ranges else metric.lower()
    vmin, vmax = norm_ranges.get(key, (0, 2000))
    norm = plt.Normalize(vmin=vmin, vmax=vmax)

    image = ax.imshow(scores_list, cmap=cmap, norm=norm, aspect='auto')
    colorbar = plt.colorbar(image, ax=ax, orientation='vertical')
    colorbar.set_label(metric, rotation=270, labelpad=15)

    # Axis labels: algorithms along x, datasets along y.
    ax.set_xticks(np.arange(n_algos))
    ax.set_xticklabels(algos)
    ax.set_yticks(np.arange(n_sets))
    ax.set_yticklabels(sets)
    ax.set_title('ImputeGAP Algorithms Comparison')
    ax.set_xlabel('Algorithms')
    ax.set_ylabel('Datasets')

    # Annotate every cell; flip text color so values stay readable on dark cells.
    for row in range(n_sets):
        for col in range(n_algos):
            ax.text(col, row, f"{scores_list[row, col]:.2f}",
                    ha='center', va='center',
                    color="black" if scores_list[row, col] < 1 else "white")

    filepath = os.path.join(target_dir, "benchmarking_" + metric.lower() + ".jpg")
    plt.savefig(filepath, dpi=300, bbox_inches='tight')  # Save in HD with tight layout

    if display:
        plt.tight_layout()
        plt.show()
    else:
        plt.close()
    return True
[docs]
def generate_reports_summary(self, run_of_values, save_dir="./reports", dataset="", metrics=["RMSE"], run=-1, rt=0, title="", verbose=True):
    """
    Generate and save a text report of metrics and timing for each dataset, algorithm, and pattern for the whole experiment.

    Parameters
    ----------
    run_of_values : dict
        Dictionary containing scores and timing information for each dataset, pattern, and algorithm.
    save_dir : str, optional
        Directory to save the reports file (default is "./reports").
    dataset : str, optional
        Name of the data for the report name.
    metrics : str, optional
        List of metrics asked for in the report.
    run : int, optional
        Number of the run.
    rt : float, optional
        Total time of the run.
    title : str, optional
        Title of the report (default is "").
    verbose : bool, optional
        Whether to display the contamination information (default is True).

    Returns
    -------
    None

    Notes
    -----
    The report is saved in a "report_<title>.log" file in `save_dir`, organized in sections with headers and results.
    NOTE(review): the `dataset` and `title` parameters are both shadowed/overwritten
    inside the loops below — the original parameter values are effectively unused
    past the report filename (`title`) — confirm this is intentional.
    """
    os.makedirs(save_dir, exist_ok=True)
    metric_unit = "ms"
    # Metrics that are also echoed to stdout: the primary metric plus runtime.
    if "RMSE" not in metrics:
        to_call = [metrics[0], "RUNTIME"]
    else:
        to_call = ["RMSE", "RUNTIME"]
    # Copy so the caller's list is not mutated; runtime metrics are always appended.
    new_metrics = np.copy(metrics)
    if metrics is None:
        new_metrics = utils.list_of_metrics()
    else:
        if "RUNTIME" not in new_metrics:
            new_metrics = np.append(new_metrics, "RUNTIME")
        if "RUNTIME_LOG" not in new_metrics:
            new_metrics = np.append(new_metrics, "RUNTIME_LOG")
    # 1) scan all runs once to learn the set of patterns, which algorithms
    # appear under each pattern, and the first optimizer name encountered.
    opt = None
    all_patterns = set()
    patterns_to_algos = defaultdict(set)
    for scores in run_of_values:
        for dataset, patterns_items in scores.items():  # NOTE: shadows the `dataset` parameter
            for pattern, algorithm_items in patterns_items.items():
                all_patterns.add(pattern)
                for algorithm, optimizer_items in algorithm_items.items():
                    patterns_to_algos[pattern].add(algorithm)
                    if opt is None:
                        # grab the first optimizer name we see
                        for optimizer in optimizer_items.keys():
                            opt = optimizer
                            break
    # 2) open the report ONCE (not inside any dataset loop)
    os.makedirs(save_dir, exist_ok=True)  # NOTE(review): redundant — already created above
    title_report = "report_" + title + ".log"
    save_path = os.path.join(save_dir, title_report)
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(save_path, "w") as file:
        file.write(f"Generated on: {current_time}\n")
        file.write(f"Total runtime: {rt} (ms)\n")
        if run >= 0:
            file.write(f"Run number: {run}\n")
        file.write("=" * 120 + "\n\n")
        # 3) one big table per (pattern, metric) including ALL datasets
        for pattern in sorted(all_patterns):
            algos = sorted(patterns_to_algos[pattern])
            for metric in new_metrics:
                # nice title (avoid Python set display)
                if metric == "RUNTIME":
                    title = "{" + f"{pattern}, {metric}[{metric_unit}], {opt}" + "}"
                else:
                    title = "{" + f"{pattern}, {metric}, {opt}" + "}"
                file.write(title + "\n")
                # build rows across ALL datasets in runs_plots_scores
                row_map = defaultdict(dict)  # (dataset, rate) -> {algo: score_str}
                for scores in run_of_values:
                    for dataset, patterns_items in scores.items():
                        if pattern not in patterns_items:
                            continue
                        for algorithm, optimizer_items in patterns_items[pattern].items():
                            for optimizer, x_data_items in optimizer_items.items():
                                for rate, payload in x_data_items.items():
                                    val = payload.get("scores", {}).get(metric, None)
                                    if val is not None:
                                        row_map[(dataset, rate)][algorithm] = f"{val:.10f}"
                if not row_map:
                    file.write("[no results]\n\n")
                    continue
                # headers & widths (dataset/rate columns sized to fit their longest entry)
                headers = ["Dataset", "Rate"] + list(algos)
                ds_width = max(12, max((len(ds) for ds, _ in row_map.keys()), default=0) + 2)
                rate_width = max(6, max((len(str(r)) for _, r in row_map.keys()), default=0) + 2)
                algo_width = 18
                col_widths = [ds_width, rate_width] + [algo_width] * len(algos)
                def fmt_row(vals):
                    # Center each value within its column width.
                    return "".join(f" {str(v):^{w}} " for v, w in zip(vals, col_widths))
                header_row = fmt_row(headers)
                sep_row = "-" * len(header_row)
                file.write(sep_row + "\n")
                file.write(header_row + "\n")
                file.write(sep_row + "\n")
                if verbose and metric in to_call:
                    print("\n" + title)
                    print(sep_row)
                    print(header_row)
                    print(sep_row)
                def row_key(k):
                    ds, rate = k
                    # sort numerics first by value, then non-numerics by string;
                    # within the same rate, sort by dataset name
                    try:
                        rf = float(rate)
                        return (0, rf, ds)
                    except Exception:
                        return (1, str(rate), ds)
                for key in sorted(row_map.keys(), key=row_key):
                    ds, rate = key
                    row_vals = [ds, rate] + [row_map[key].get(a, "") for a in algos]
                    line = fmt_row(row_vals)
                    file.write(line + "\n")
                    if verbose and metric in to_call:
                        print(line)
                file.write(sep_row + "\n\n")
                if verbose and metric in to_call:
                    print(sep_row + "\n")
        # optional: dump raw dict(s)
        file.write("Dictionary of Results:\n")
        file.write(str(run_of_values) + "\n")
[docs]
def generate_reports_txt(self, runs_plots_scores, save_dir="./reports", dataset="", metrics=["RMSE"], run=-1, rt=0, verbose=True):
    """
    Generate and save a text report of metrics and timing for each dataset, algorithm, and pattern.

    Parameters
    ----------
    runs_plots_scores : dict
        Dictionary containing scores and timing information for each dataset, pattern, and algorithm.
    save_dir : str, optional
        Directory to save the reports file (default is "./reports").
    dataset : str, optional
        Name of the data for the report name.
    metrics : str, optional
        List of metrics asked for in the report.
    run : int, optional
        Number of the run.
    rt : float, optional
        Total time of the run.
    verbose : bool, optional
        Whether to display the contamination information (default is True).

    Returns
    -------
    None

    Notes
    -----
    One report file is written per (pattern, dataset) under
    `save_dir/<pattern>/error/report_<pattern>_<dataset>.txt`.
    """
    os.makedirs(save_dir, exist_ok=True)
    metric_unit = "ms"
    # Metrics that are also echoed to stdout.
    if "RMSE" not in metrics:
        to_call = [metrics[0], "RUNTIME"]
    else:
        to_call = ["RMSE", "RUNTIME"]
    new_metrics = np.copy(metrics)
    if metrics is None:
        new_metrics = utils.list_of_metrics()
    else:
        if "RUNTIME" not in new_metrics:
            new_metrics = np.append(new_metrics, "RUNTIME")
        if "RUNTIME_LOG" not in new_metrics:
            new_metrics = np.append(new_metrics, "RUNTIME_LOG")
    # NOTE(review): the `break` below only exits the innermost loop, so `opt`
    # ends up holding the optimizer of the LAST algorithm of the LAST pattern
    # of the LAST dataset — confirm "first optimizer" wasn't intended.
    opt = None
    for dataset, patterns_items in runs_plots_scores.items():  # NOTE: shadows the `dataset` parameter
        for pattern, algorithm_items in patterns_items.items():
            for algorithm, optimizer_items in algorithm_items.items():
                for optimizer, x_data_items in optimizer_items.items():
                    opt = optimizer
                    break
    list_of_patterns = []
    for dataset, patterns_items in runs_plots_scores.items():
        for pattern, algorithm_items in patterns_items.items():
            list_of_patterns.append(pattern)
            new_dir = save_dir + "/" + pattern.lower() + "/error"
            os.makedirs(new_dir, exist_ok=True)
            save_path = os.path.join(new_dir, f"report_{pattern}_{dataset}.txt")
            current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            with open(save_path, "w") as file:
                file.write(f"Report for Dataset: {dataset}\n")
                file.write(f"Generated on: {current_time}\n")
                file.write(f"Total runtime: {rt} (ms)\n")
                file.write(f"Run number: {run}\n")
                file.write("=" * 120 + "\n\n")
                # One table per metric: rows are missing rates, columns are algorithms.
                for metric in new_metrics:
                    if metric == "RUNTIME":
                        file.write(f"\n{dataset}: {{{pattern}, {metric}[{metric_unit}], {opt}}}")
                    else:
                        file.write(f"\n{dataset}: {{{pattern}, {metric}, {opt}}}")
                    # Collect all algorithms and scores by rate
                    rate_to_scores = defaultdict(dict)
                    all_algorithms = set()
                    for algorithm, optimizer_items in algorithm_items.items():
                        for optimizer, x_data_items in optimizer_items.items():
                            for x, values in x_data_items.items():
                                score = values.get("scores", {}).get(metric, None)
                                if score is not None:
                                    rate_to_scores[x][algorithm] = f"{score:.10f}"
                                    all_algorithms.add(algorithm)
                    all_algorithms = sorted(all_algorithms)
                    headers = ["Rate"] + list(all_algorithms)
                    column_widths = [5] + [18] * len(all_algorithms)
                    # Header and separator rows
                    header_row = "".join(f" {header:^{width}} " for header, width in zip(headers, column_widths))
                    # NOTE(review): this expression joins empty strings, so the
                    # separator row is always "" — it looks like the original
                    # separator characters (e.g. box-drawing dashes) were lost;
                    # confirm against the upstream source before changing.
                    separator_row = "" + "".join(f"{'' * (width + 2)}" for width in column_widths) + ""
                    file.write(f"{separator_row}\n")
                    file.write(f"{header_row}\n")
                    file.write(f"{separator_row}\n")
                    if metric in to_call and verbose:
                        if metric == "RUNTIME":
                            print(f"\n{dataset}: {{{pattern}, {metric}[{metric_unit}], {opt}}}")
                        else:
                            print(f"\n{dataset}: {{{pattern}, {metric}, {opt}}}")
                        print(separator_row)
                        print(f"{header_row}")
                        print(separator_row)
                    # Write each row
                    for rate in sorted(rate_to_scores.keys()):
                        row_values = [rate] + [rate_to_scores[rate].get(algo, "") for algo in all_algorithms]
                        row = "".join(f" {val:^{width}} " for val, width in zip(row_values, column_widths))
                        file.write(f"{row}\n")
                        if metric in to_call and verbose:
                            print(f"{row}")
                    file.write(f"{separator_row}\n\n")
                    if metric in to_call and verbose:
                        print(separator_row + "\n")
                file.write("Dictionary of Results:\n")
                file.write(str(runs_plots_scores) + "\n")
"""
def generate_reports_excel(self, runs_plots_scores, save_dir="./reports", dataset="", run=-1, verbose=True):
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, f"report_{dataset}.xlsx")
# Create an Excel workbook
workbook = xlsxwriter.Workbook(save_path)
# Add a summary sheet with the header, creation date, dictionary content, and links to other sheets
summary_sheet = workbook.add_worksheet("Summary")
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
summary_sheet.set_column(0, 1, 50)
# Title and header
summary_sheet.write(0, 0, "ImputeGAP, A library of Imputation Techniques for Time Series Data")
summary_sheet.write(2, 0, "Report for Dataset")
summary_sheet.write(2, 1, dataset)
summary_sheet.write(3, 0, "Generated on")
summary_sheet.write(3, 1, current_time)
if run >= 0:
summary_sheet.write(4, 0, "Run Number")
summary_sheet.write(4, 1, run)
# Add links to metric sheets
row = 6
summary_sheet.write(row, 0, "Metric Sheets:")
row += 1
metrics = {
"RMSE": "Root Mean Square Error - Measures the average magnitude of error.",
"MAE": "Mean Absolute Error - Measures the average absolute error.",
"MI": "Mutual Information - Indicates dependency between variables.",
"CORRELATION": "Correlation Coefficient - Indicates linear relationship between variables."
}
for metric in metrics.keys():
summary_sheet.write_url(row, 0, f"internal:'{metric}'!A1", string=f"Go to {metric} Sheet")
row += 1
# Write the dictionary content
summary_sheet.write(row + 1, 0, "Dictionary of Results")
row += 2
for key, value in runs_plots_scores.items():
summary_sheet.write(row, 0, str(key))
summary_sheet.write(row, 1, str(value))
row += 1
for metric, description in metrics.items():
# Create a worksheet for each metric
worksheet = workbook.add_worksheet(metric)
# Write the metric description at the top and add IMPUTEGAP header
worksheet.write(0, 0, "ImputeGAP, A library of Imputation Techniques for Time Series Data")
worksheet.write(2, 0, f"{metric}: {description}")
# Define consistent column headers and widths
headers = ["Dataset", "Algorithm", "Optimizer", "Pattern", "X Value", metric]
column_widths = [15, 15, 15, 15, 12, 20] # Adjust widths for Excel
# Write the headers
for col, (header, width) in enumerate(zip(headers, column_widths)):
worksheet.set_column(col, col, width)
worksheet.write(3, col, header)
# Populate the data
row = 4
for dataset, algo_items in runs_plots_scores.items():
for algorithm, optimizer_items in algo_items.items():
for optimizer, pattern_data in optimizer_items.items():
for pattern, x_data_items in pattern_data.items():
for x, values in x_data_items.items():
value = values.get("scores", {}).get(metric, None)
if value is not None:
value = f"{value:.10f}"
data = [dataset, algorithm, optimizer, pattern, str(x), value]
for col, cell_value in enumerate(data):
worksheet.write(row, col, cell_value)
row += 1
# Close the workbook
workbook.close()
"""
[docs]
def generate_plots(self, runs_plots_scores, ticks, metrics=None, subplot=False, y_size=8, title=None, save_dir="./reports", display=False, verbose=True):
    """
    Generate and save plots for each metric and pattern based on provided scores.

    Parameters
    ----------
    runs_plots_scores : dict
        Dictionary containing scores and timing information for each dataset, pattern, and algorithm.
    ticks : list of float
        List of missing rates for contamination.
    metrics : list of string
        List of metrics used.
    subplot : bool, optional
        If True, generates a single figure with subplots for all metrics (default is False).
    y_size : int, optional
        Default size of the graph (default is 8).
    title : str, optional
        Title of the graph (default is "<dataset> : <pattern>, benchmark analysis").
    save_dir : str, optional
        Directory to save generated plots (default is "./reports").
    display : bool, optional
        Display or not the plots (default is False).
    verbose : bool, optional
        Whether to display the contamination information (default is True).

    Returns
    -------
    None

    Notes
    -----
    Saves generated plots in `save_dir`, categorized by dataset, pattern, and metric.
    """
    os.makedirs(save_dir, exist_ok=True)
    # One marker per algorithm, assigned on first appearance and reused across plots.
    markers = itertools.cycle(["o", "s", "D", "^", "v", "<", ">", "P", "X", "*", "h", "p", "8"])
    marker_by_algo = {}
    print("\nThe plots have been generated...\n")
    new_metrics = np.copy(metrics)
    new_plots = 0
    if metrics is None:
        new_metrics = utils.list_of_metrics()
    else:
        # RUNTIME_LOG is always plotted in addition to the requested metrics.
        if "RUNTIME_LOG" not in new_metrics:
            new_plots = new_plots + 1
            new_metrics = np.append(new_metrics, "RUNTIME_LOG")
    nbr_metrics = len(new_metrics)
    n_rows = int((len(new_metrics) + new_plots) / 2)
    x_size, title_flag = 16, title
    # x-axis window: pad slightly around the observed tick range, clamped to [0, 1].
    if ticks and len(ticks) > 0:
        tick_min = float(min(ticks))
        tick_max = float(max(ticks))
    else:
        tick_min, tick_max = 0.0, 1.0  # fallback
    x_pad = 0.025  # 5% points (because rates are in [0,1])
    x_left = max(0.0, tick_min - x_pad)
    x_right = min(1.0, tick_max + x_pad)
    for dataset, pattern_items in runs_plots_scores.items():
        for pattern, algo_items in pattern_items.items():
            if subplot:
                # NOTE(review): x_size/y_size are mutated here and never reset,
                # so they keep growing on every (dataset, pattern) iteration
                # when n_rows >= 4 — confirm this is intended.
                x_size = x_size * 2
                y_size = y_size * round(nbr_metrics // 2)
                # Fit the figure to a typical 1920x1080 screen (85% scale).
                scale_factor = 0.85
                x_size_screen = (1920 / 100) * scale_factor
                y_size_screen = (1080 / 100) * scale_factor
                if n_rows < 4:
                    x_size = x_size_screen
                    y_size = y_size_screen
                ncols = 2
                # Odd metric count: fall back to a single taller column.
                if nbr_metrics % 2 == 1:
                    ncols, n_rows, y_size = 1, (n_rows * 2) - 1, y_size * 1.25
                fig, axes = plt.subplots(nrows=n_rows, ncols=ncols, figsize=(x_size, y_size))  # Adjusted figsize
                axes = axes.ravel()
                fig.subplots_adjust(
                    left=0.04,
                    right=0.99,
                    top=0.97,
                    bottom=0.05,
                    wspace=0.095,
                    hspace=0.35
                )
                if title_flag is None:
                    title = dataset + " : " + pattern + ", benchmark analysis"
                fig.canvas.manager.set_window_title(title)
            # Iterate over each metric, generating separate plots, including new timing metrics
            for i, metric in enumerate(new_metrics):
                if subplot:
                    if i < len(axes):
                        ax = axes[i]
                    else:
                        break  # Prevent index out of bounds if metrics exceed subplot slots
                else:
                    plt.figure(figsize=(x_size, y_size))
                    ax = plt.gca()
                has_data = False  # Flag to check if any data is added to the plot
                max_y, min_y = -99999, 99999
                for algorithm, optimizer_items in algo_items.items():
                    x_vals = []
                    y_vals = []
                    for optimizer, x_data in optimizer_items.items():
                        for x, values in x_data.items():
                            if metric in values["scores"]:
                                x_vals.append(float(x))
                                y_vals.append(values["scores"][metric])
                    if x_vals and y_vals:
                        # Sort points by missing rate so the line is drawn left-to-right.
                        sorted_pairs = sorted(zip(x_vals, y_vals))
                        x_vals, y_vals = zip(*sorted_pairs)
                        if algorithm not in marker_by_algo:
                            marker_by_algo[algorithm] = next(markers)
                        m = marker_by_algo[algorithm]
                        # Plot each algorithm as a line with scattered points
                        ax.plot(x_vals, y_vals, label=f"{algorithm}", linewidth=2, marker=m, markersize=6)
                        ax.scatter(x_vals, y_vals, marker=m, s=35)
                        #ax.plot(x_vals, y_vals, label=f"{algorithm}", linewidth=2)
                        #ax.scatter(x_vals, y_vals)
                        has_data = True
                        # Track the global y-range across all algorithms for this metric.
                        if min_y > min(y_vals):
                            min_y = min(y_vals)
                        if max_y < max(y_vals):
                            max_y = max(y_vals)
                # Save plot only if there is data to display
                if has_data:
                    ylabel_metric = {
                        "RUNTIME": "Runtime [ms]",
                        "RUNTIME_LOG": "log₁₀(Runtime [ms])",
                    }.get(metric, metric)
                    ax.set_title(metric)
                    ax.set_xlabel("Rate")
                    ax.set_ylabel(ylabel_metric)
                    #ax.set_xlim(0.0, 0.85)
                    ax.set_xlim(x_left, x_right)
                    # Clamp the y-range to sensible per-metric bounds before padding.
                    bounds = {"RMSE": (0, 3), "MAE": (0, 3), "CORRELATION": (-1, 1), "MI": (0, 2), "RUNTIME": (0, 10000), "RUNTIME_LOG": (-5, 5), }
                    if metric in bounds:
                        lo, hi = bounds[metric]
                        min_y = max(min_y, lo)
                        max_y = min(max_y, hi)
                    diff = (max_y - min_y)
                    y_padding = 0.15 * diff
                    if y_padding is None or y_padding == 0:
                        y_padding = 1
                    ax.set_ylim(min_y - y_padding, max_y + y_padding)
                    # Set y-axis limits with padding below 0 for visibility
                    if metric == "RUNTIME":
                        ax.set_title("Runtime (linear scale)")
                    elif metric == "RUNTIME_LOG":
                        ax.set_title("Runtime (log scale)")
                    elif metric == "CORRELATION":
                        ax.set_title("Pearson Correlation")
                    # Customize x-axis ticks
                    ax.set_xticks(ticks)
                    ax.set_xticklabels([f"{int(tick * 100)}%" for tick in ticks])
                    ax.grid(True, zorder=0)
                    ax.legend(loc='upper left', fontsize=7, frameon=True, fancybox=True, framealpha=0.8, ncol=len(ax.get_legend_handles_labels()[0]))
                    if not subplot:
                        new_dir = save_dir + "/" + pattern
                        os.makedirs(new_dir, exist_ok=True)
                        # NOTE(review): `optimizer` leaks out of the algorithm loop
                        # above — the filename uses whatever optimizer was iterated
                        # last, and raises NameError if algo_items is empty; confirm.
                        filepath = os.path.join(new_dir, f"{dataset}_{pattern}_{optimizer}_{metric}.jpg")
                        plt.savefig(filepath)
                        if not display:
                            plt.close()
            if subplot:
                #plt.tight_layout()
                new_dir = save_dir + "/" + pattern + "/error"
                os.makedirs(new_dir, exist_ok=True)
                filename = f"{dataset}_{pattern}_metrics_subplot.jpg"
                filepath = os.path.join(new_dir, filename)
                plt.savefig(filepath)
                if display:
                    plt.show()
    # Keep a handle on pyplot so callers can reuse/show the last figure set.
    self.subplots = plt
[docs]
def eval(self, algorithms=["cdrec"], datasets=["eeg-alcohol"], patterns=["mcar"], x_axis=[0.05, 0.1, 0.2, 0.4, 0.6, 0.8], optimizer="default_params", metrics=["*"], save_dir="./imputegap_assets/benchmark", runs=1, normalizer="z_score", report_title="", nbr_series=200, nbr_vals=2000, dl_ratio=None, verbose=False):
    # NOTE(review): every list default above is a mutable default argument shared
    # across calls; callers that mutate them would leak state between eval() calls.
    # Consider `None` sentinels — left unchanged here to preserve the signature.
    """
    Execute a comprehensive evaluation of imputation algorithms over multiple datasets and patterns.

    Parameters
    ----------
    algorithms : list of str
        List of imputation algorithms to test.
    datasets : list of str
        List of dataset names to evaluate.
    patterns : list of str
        List of contamination patterns to apply.
    x_axis : list of float
        List of missing rates for contamination.
    optimizer : str, dict
        Name of the optimizer (str) or optimizer with their configurations (dict).
    metrics : list of str
        List of metrics for evaluation.
    save_dir : str, optional
        Directory to save reports and plots (default is "./reports").
    runs : int, optional
        Number of executions with a view to averaging them
    normalizer : str, optional
        Normalizer to pre-process the data (default is "z_score").
    report_title : str, optional
        Title of the report (default is "").
    nbr_series : int, optional
        Number of series to take inside the dataset (default is 200 (as the max values)). Set to None to remove the limitation.
    nbr_vals : int, optional
        Number of values to take inside the series (default is 2500 (as the max values)). Set to None to remove the limitation.
    dl_ratio : float, optional
        Training ratio for Deep Learning techniques (default is 0.8)
    verbose : bool, optional
        Whether to display the contamination information (default is False).

    Returns
    -------
    List
        List of all runs results, matrix with averaged scores and times for all levels

    Raises
    ------
    TypeError
        If `algorithms`, `datasets`, `patterns`, or `x_axis` is not a list, or
        `optimizer` is neither a str nor a dict.
    ValueError
        If a loaded dataset contains no series.

    Notes
    -----
    Runs contamination, imputation, and evaluation, then generates plots and a summary reports.
    """
    # Silence TensorFlow C++ logging (3 = errors only) before any DL algorithm loads it.
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

    run_storage = []          # one nested scores dict appended per (run, dataset)
    not_optimized = ["none"]  # algorithms that never go through AutoML
    # Algorithms whose parameters are trivial — skipped by the optimizer as well.
    mean_group = ["mean", "MeanImpute", "min", "MinImpute", "zero", "ZeroImpute", "MeanImputeBySeries", "meanimpute", "minimpute", "zeroimpute", "meanimputebyseries"]

    if optimizer is None:
        optimizer = "default_params"

    # --- argument validation -------------------------------------------------
    if not isinstance(algorithms, list):
        raise TypeError(f"'algorithms' must be a list, but got {type(algorithms).__name__}")
    if not isinstance(datasets, list):
        raise TypeError(f"'datasets' must be a list, but got {type(datasets).__name__}")
    if not isinstance(patterns, list):
        raise TypeError(f"'patterns' must be a list, but got {type(patterns).__name__}")
    if not isinstance(x_axis, list):
        raise TypeError(f"'x_axis' must be a list, but got {type(x_axis).__name__}")
    if not isinstance(optimizer, str) and not isinstance(optimizer, dict):
        raise TypeError(f"'optimizer' must be a str or dict, but got {type(optimizer).__name__}")

    # Expand wildcard selections.
    if "*" in metrics or "all" in metrics:
        metrics = utils.list_of_metrics()
    # NOTE(review): this condition re-tests `"*" in metrics` (always False once the
    # line above replaced `metrics`) instead of `"*" in algorithms` — so
    # algorithms=["*"] is never expanded. Looks like a typo; confirm intent.
    if "*" in metrics or "all" in algorithms:
        algorithms = utils.list_of_algorithms()
    # For a str optimizer, any value containing "default" is normalized.
    # (When optimizer is a dict, `"default" in optimizer` checks its keys but the
    # isinstance guard makes the branch a no-op.)
    if "default" in optimizer and isinstance(optimizer, str):
        optimizer = "default_params"

    # Timestamped output directory so repeated benchmarks never overwrite each other.
    directory_now = datetime.datetime.now()
    directory_time = directory_now.strftime("%y_%m_%d_%H_%M_%S")
    save_dir = save_dir + "/" + "benchmark_" + report_title + "_" + directory_time

    # None means "no limit": use a bound larger than any realistic dataset.
    if nbr_series is None:
        nbr_series = 10000000
    if nbr_vals is None:
        nbr_vals = 10000000

    benchmark_time = time.time()
    definition_of_exp = f"\nThe benchmark has been called:\n\talgorithms: {algorithms}\n\tdatasets: {datasets}\n\tpatterns: {patterns}\n\tmissing_percentages: {x_axis}\n\toptimizer: {optimizer}\n\tnormalizer: {normalizer}\n\truns: {runs}\n\tnumber max series: {nbr_series}\n\tnumber max values: {nbr_vals}\n\n"
    print(definition_of_exp)

    for i_run in range(0, abs(runs)):
        for dataset in datasets:
            runs_plots_scores = {}   # nested dataset→pattern→algorithm→optimizer→rate dict
            block_size_mcar = 10     # default MCAR block size; shrunk below for short series
            # Plot height grows with the number of algorithms so legends stay readable.
            y_p_size = max(4, len(algorithms)*0.275)

            if verbose:
                print("\n1. evaluation launch for", dataset, "\n")
            ts_test = TimeSeries(verbose=False)

            # First load the raw dataset only to check its size against the limits.
            default_data = TimeSeries(verbose=False)
            header = False
            if dataset == "eeg-reading" or dataset == "eegreading":
                header = True  # this dataset ships with a header row
            reshp = False
            default_data.load_series(data=utils.search_path(dataset), header=header, verbose=False)
            Ndef, Mdef = default_data.data.shape
            if Ndef > nbr_vals or Mdef > nbr_series:
                reshp = True
                print(f"\nThe dataset {dataset} contains a large number of values {default_data.data.shape}, which may be too much for some algorithms to handle efficiently. Consider reducing the number of series or the volume of data.")
            default_data = None  # release the probe copy before the real (truncated) load

            # Real load: truncated to the limits and normalized.
            ts_test.load_series(data=utils.search_path(dataset), nbr_series=nbr_series, nbr_val=nbr_vals, header=header, normalizer=normalizer, verbose=verbose)
            N, M = ts_test.data.shape
            if M <= 0:
                raise ValueError(f"The dataset loaded has no series (series {M}).")
            if reshp:
                print(f"Benchmarking module has reduced the shape to {ts_test.data.shape}.\n")
            if N < 250:
                print(f"The block size is too high for the number of values per series, reduce to 2\n")
                block_size_mcar = 2

            for pattern in patterns:
                if verbose:
                    print("\n2. contamination of", dataset, "with pattern", pattern, "\n")
                for algorithm in algorithms:
                    # AutoML runs at most once per (algorithm, pattern); flag tracks it.
                    has_been_optimized = False
                    if verbose:
                        print(f"3. {algorithm} is tested with {pattern} on {dataset}, started at {time.strftime('%Y-%m-%d %H:%M:%S')}.")
                    else:
                        print(f"{algorithm} is tested with {pattern} on {dataset}, started at {time.strftime('%Y-%m-%d %H:%M:%S')}.")

                    for incx, x in enumerate(x_axis):
                        if verbose:
                            print("\n4. missing values (series&values) set to", x, "for x_axis\n")
                        # Contaminate both series rate and value rate with the same x.
                        incomp_data = utils.config_contamination(ts=ts_test, pattern=pattern, dataset_rate=x, series_rate=x, block_size=block_size_mcar, verbose=verbose)
                        opt_imp = optimizer  # local copy: may be rewritten to a dict below
                        try:
                            algo = utils.config_impute_algorithm(incomp_data=incomp_data, algorithm=algorithm, verbose=verbose)

                            # Normalize a named optimizer (str) into the dict form.
                            if not isinstance(opt_imp, dict) and opt_imp != "default_params":
                                if opt_imp == "ray-tune":
                                    opt_imp = "ray_tune"
                                opt_imp = {"optimizer": opt_imp}

                            if isinstance(opt_imp, dict):
                                # AutoML branch: optimize once, then reuse parameters.
                                optimizer_gt = {"input_data": ts_test.data, **opt_imp}
                                optimizer_value = opt_imp.get('optimizer')  # or optimizer['optimizer']
                                if not has_been_optimized and algorithm not in mean_group and algorithm not in not_optimized:
                                    if verbose:
                                        print("\n5. AutoML to set the parameters", opt_imp, "\n")
                                    # Optimize on a fixed 20% contamination, independent of x.
                                    i_opti = self._config_optimization(0.20, ts_test, pattern, algorithm, block_size_mcar)
                                    if utils.check_family("DeepLearning", algorithm):
                                        if dl_ratio is None:
                                            i_opti.impute(user_def=False, params=optimizer_gt)
                                        else:
                                            i_opti.impute(user_def=False, params=optimizer_gt, tr_ratio=dl_ratio)
                                    else:
                                        i_opti.impute(user_def=False, params=optimizer_gt)
                                    # Persist the found parameters so later rates can reload them.
                                    optimal_params_path = utils.save_optimization(optimal_params=i_opti.parameters, algorithm=algorithm, dataset=dataset, optimizer="e", verbose=verbose)
                                    has_been_optimized = True
                                else:
                                    if verbose:
                                        print("\n5. AutoML already optimized...\n")
                                if algorithm not in mean_group and algorithm not in not_optimized:
                                    # `i_opti` / `optimal_params_path` were bound by the
                                    # optimization pass above (same algorithm, earlier x).
                                    if i_opti.parameters is None:
                                        opti_params = utils.load_parameters(query="optimal", algorithm=algorithm, dataset=dataset, optimizer="e", path=optimal_params_path, verbose=verbose)
                                        if verbose:
                                            print("\n6. load imputation", algorithm, "with optimal parameters from files", *opti_params)
                                    else:
                                        opti_params = i_opti.parameters
                                        if verbose:
                                            print("\n6. set imputation", algorithm, "with optimal parameters from object", *opti_params)
                                else:
                                    if verbose:
                                        print("\n5. No AutoML launches without optimal params for", algorithm, "\n")
                                    opti_params = None
                            else:
                                # "default_params" branch: no optimization at all.
                                if verbose:
                                    print("\n5. Default parameters have been set the parameters", opt_imp, "for", algorithm, "\n")
                                optimizer_value = opt_imp
                                opti_params = None

                            # --- timed imputation ----------------------------
                            start_time_imputation = time.time()
                            if not self._benchmark_exception(dataset, algorithm, pattern, x, N, M):
                                if (utils.check_family("DeepLearning", algorithm) or utils.check_family("LLMs", algorithm)) and dl_ratio is not None:
                                    # Not enough clean data left to train on: skip imputation
                                    # and score the contaminated matrix as-is.
                                    if x > round(1-dl_ratio, 2):
                                        algo.recov_data = incomp_data
                                    else:
                                        algo.impute(params=opti_params, tr_ratio=dl_ratio)
                                else:
                                    algo.impute(params=opti_params)
                            else:
                                # Known-incompatible (dataset, algorithm, pattern, rate) combo.
                                algo.recov_data = incomp_data
                            end_time_imputation = time.time()

                            algo.score(input_data=ts_test.data, recov_data=algo.recov_data, verbose=False)
                            if "*" not in metrics and "all" not in metrics:
                                # Keep only the requested metrics.
                                algo.metrics = {k: algo.metrics[k] for k in metrics if k in algo.metrics}

                            time_imputation = (end_time_imputation - start_time_imputation) * 1000  # ms
                            if time_imputation < 1:
                                time_imputation = 1  # 1 ms floor keeps log10 >= 0
                            log_time_imputation = math.log10(time_imputation) if time_imputation > 0 else None
                            algo.metrics["RUNTIME"] = time_imputation
                            algo.metrics["RUNTIME_LOG"] = log_time_imputation

                            # Dashes are stripped from dataset names for paths and keys.
                            dataset_s = dataset
                            if "-" in dataset:
                                dataset_s = dataset.replace("-", "")
                            save_dir_plot = save_dir + "/" + dataset_s + "/" + pattern + "/recovery/"
                            cont_rate = int(x*100)
                            ts_test.plot(input_data=ts_test.data, incomp_data=incomp_data, recov_data=algo.recov_data, nbr_series=6, subplot=True, algorithm=algo.algorithm, cont_rate=str(cont_rate), display=False, save_path=save_dir_plot, verbose=False)

                            # Record scores under dataset→pattern→algorithm→optimizer→rate.
                            runs_plots_scores.setdefault(str(dataset_s), {}).setdefault(str(pattern), {}).setdefault(str(algorithm), {}).setdefault(str(optimizer_value), {})[str(x)] = {"scores": algo.metrics}
                        except Exception as e:
                            # A failed run is recorded as NaN scores so averaging and
                            # plotting still work, and the error is appended to error.log.
                            dataset_s = dataset
                            if "-" in dataset:
                                dataset_s = dataset.replace("-", "")
                            print(f"Error during benchmark for {algorithm}, with {dataset_s}, and {x}%: {e}")
                            algo.metrics = {
                                "RMSE": np.nan,
                                "MAE": np.nan,
                                "MI": np.nan,
                                "CORRELATION": np.nan,
                                "RUNTIME": np.nan,
                                "RUNTIME_LOG": np.nan,
                            }
                            # Recover a printable optimizer name for the log/keys.
                            if isinstance(opt_imp, dict):
                                val_opt = opt_imp.get("optimizer")
                            if isinstance(opt_imp, str):
                                val_opt = opt_imp
                            if val_opt is None:
                                val_opt = ""
                            runs_plots_scores.setdefault(str(dataset_s), {}).setdefault(str(pattern), {}).setdefault(str(algorithm), {}).setdefault(str(val_opt), {})[str(x)] = {"scores": algo.metrics}
                            os.makedirs(save_dir, exist_ok=True)
                            save_path = os.path.join(save_dir, f"error.log")
                            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            with open(save_path, "a") as file:
                                file.write(f"{timestamp} | Error during benchmark for {algorithm}, with {dataset_s} - for a shape of ({N}, {M}) - ({pattern}/{val_opt}), and {x}%: {e}\n\n")
                    print(f"done!\n\n")
            run_storage.append(runs_plots_scores)

    plt.close('all')  # Close all open figures

    # One aggregated heatmap per metric across all runs.
    for x, m in enumerate(reversed(metrics)):
        #tag = True if x == (len(metrics)-1) else False
        scores_list, algos, sets = self.avg_results(*run_storage, metric=m)
        _ = self.generate_heatmap(scores_list=scores_list, algos=algos, sets=sets, metric=m, save_dir=save_dir, display=False)

    # Average the per-run dicts by dataset name.
    run_averaged = self.average_runs_by_names(run_storage)

    benchmark_end = time.time()
    total_time_benchmark = round(benchmark_end - benchmark_time, 4)
    print(f"\n> logs: benchmark - Execution Time: {total_time_benchmark} seconds\n")

    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f"runtime.log")
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(save_path, "a") as file:
        file.write(f"{timestamp} | logs: benchmark - Execution Time: {total_time_benchmark} seconds\n")

    # Per-dataset text reports and metric plots.
    verb = False
    for scores in run_averaged:
        all_keys = list(scores.keys())
        dataset_name = str(all_keys[0])  # each averaged dict is keyed by a single dataset
        save_dir_agg_set = save_dir + "/" + dataset_name
        self.generate_reports_txt(runs_plots_scores=scores, save_dir=save_dir_agg_set, dataset=dataset_name, metrics=metrics, rt=total_time_benchmark, run=-1)
        self.generate_plots(runs_plots_scores=scores, ticks=x_axis, metrics=metrics, subplot=True, y_size=y_p_size, save_dir=save_dir_agg_set, display=verb)

    self.generate_reports_summary(run_of_values=run_averaged, save_dir=save_dir, metrics=metrics, rt=total_time_benchmark, run=-1, title=report_title)
    print("\nThe results are saved in : ", save_dir, "\n")

    # Expose results on the instance for programmatic access.
    self.list_results = run_averaged
    self.aggregate_results = scores_list  # from the last metric iterated above

    # Persist the experiment definition alongside the results.
    save_def = os.path.join(save_dir, f"experimentation_setup.log")
    with open(save_def, "w") as file:
        file.write(definition_of_exp)