# Source code for imputegap.recovery.downstream

import datetime
import os

import numpy as np
import matplotlib.pyplot as plt

from imputegap.tools import utils



class Downstream:
    """
    A class to evaluate the performance of imputation algorithms using downstream analysis.

    This class provides tools to assess the quality of imputed time series data by
    analyzing the performance of downstream forecasting models. It computes metrics
    such as Mean Absolute Error (MAE) and Mean Squared Error (MSE) and visualizes
    the results for better interpretability.

    ImputeGAP downstream models for forecasting:
        ['arima', 'bats', 'croston', 'deepar', 'ets', 'exp-smoothing', 'hw-add',
         'lightgbm', 'lstm', 'naive', 'nbeats', 'prophet', 'sf-arima', 'theta',
         'transformer', 'unobs', 'xgboost']

    Attributes
    ----------
    input_data : numpy.ndarray
        The original time series without contamination (ground truth).
    recov_data : numpy.ndarray
        The imputed time series to evaluate.
    incomp_data : numpy.ndarray
        The time series with contamination (NaN values).
    algorithm : str
        Name of the imputation algorithm being analysed.
    downstream : dict
        Configuration for the downstream analysis, including the evaluator, model,
        and parameters.
    sktime_models : list
        Downstream model names served through the sktime backend (others use darts).
    verbose : bool
        Whether to display information during processing.

    Methods
    -------
    __init__(input_data, recov_data, incomp_data, algorithm, downstream, verbose=True)
        Initializes the Downstream class with the provided data and configuration.
    downstream_analysis()
        Performs downstream analysis, computes metrics, and optionally visualizes results.
    _plot_downstream(y_train, y_test, y_pred, incomp_data, ...)
        Static method to plot ground truth vs. predictions for contaminated series.
    """

    def __init__(self, input_data, recov_data, incomp_data, algorithm, downstream, verbose=True):
        """
        Initialize the Downstream class.

        Parameters
        ----------
        input_data : numpy.ndarray
            The original time series without contamination.
        recov_data : numpy.ndarray
            The imputed time series.
        incomp_data : numpy.ndarray
            The time series with contamination (NaN values).
        algorithm : str
            Name of the algorithm to analyse.
        downstream : dict
            Information about the model to launch with its parameters.
        verbose : bool, optional
            Display or not information (default: True).
        """
        self.input_data = input_data
        self.recov_data = recov_data
        self.incomp_data = incomp_data
        self.downstream = downstream
        self.algorithm = algorithm
        # Models handled via the sktime backend; anything else goes through darts.
        self.sktime_models = utils.list_of_downstreams_sktime()
        self.verbose = verbose
[docs] def downstream_analysis(self): """ Compute a set of evaluation metrics with a downstream analysis ImputeGAP downstream models for forcasting : ['arima', 'bats', 'croston', 'deepar', 'ets', 'exp-smoothing', 'hw-add', 'lightgbm', 'lstm', 'naive', 'nbeats', 'prophet', 'sf-arima', 'theta', 'transformer', 'unobs', 'xgboost'] Returns ------- dict or None Metrics from the downstream analysis or None if no valid evaluator is provided. """ evaluator = self.downstream.get("task", "forecast") model = self.downstream.get("model", "naive") params = self.downstream.get("params", None) plots = self.downstream.get("plots", True) baseline = self.downstream.get("baseline", None) split = self.downstream.get("split", 0.8) selected_series = self.downstream.get("selected_series", None) if baseline is None: baseline = self.downstream.get("comparator", None) plt = None model = model.lower() evaluator = evaluator.lower() if selected_series is None: nan_cols = np.isnan(self.incomp_data).any(axis=0) series_indices = int(np.argmax(nan_cols)) if nan_cols.any() else -1 selected_series = [series_indices] elif selected_series == -1: selected_series = range(self.input_data.shape[1]) if not params: print("\n\n(DOWNSTREAM) Default parameters of the downstream model loaded.") loader = "forecaster-" + str(model) params = utils.load_parameters(query="default", algorithm=loader) print(f"\n(DOWNSTREAM) Analysis launched !\n\ttask: {evaluator}\n\tmodel: {model}\n\tparams: {params}\n\tbase algorithm: {str(self.algorithm).lower()}\n\treference algorithm: {str(baseline).lower()}\n\tselected series: {selected_series}\n") if evaluator in ["forecast", "forecaster", "forecasting"]: y_train_all, y_test_all, y_pred_all = [], [], [] mae, mse, smape = [], [], [] for x in range(3): # Iterate over recov_data, input_data, and mean_impute if x == 0: data = self.input_data elif x == 1: data = self.recov_data elif x == 2: from imputegap.recovery.imputation import Imputation if baseline is not None: impt = 
utils.config_impute_algorithm(self.incomp_data, algorithm=baseline) impt.impute() data = impt.recov_data else: baseline = "zero-impute" zero_impute = Imputation.Statistics.ZeroImpute(self.incomp_data).impute() data = zero_impute.recov_data time_len = data.shape[0] train_len = int(time_len * split) y_train = data[:train_len, :] y_test = data[train_len:, :] forecaster = utils.config_forecaster(model, params) y_pred = np.zeros_like(y_test, dtype=float) if model in self.sktime_models: # --- SKTIME APPROACH --- from sklearn.metrics import mean_absolute_error, mean_squared_error from sktime.forecasting.base import ForecastingHorizon from sktime.performance_metrics.forecasting import MeanAbsolutePercentageError mae_list, mse_list, smape_list = [], [], [] fh = np.arange(1, y_test.shape[0] + 1) # Forecast horizon for series_idx in selected_series: series_train = y_train[:, series_idx] if model == "ltsf" or model == "rnn": forecaster.fit(series_train, fh=ForecastingHorizon(fh)) series_pred = forecaster.predict() else: forecaster.fit(series_train) series_pred = forecaster.predict(fh=fh) y_pred[:, series_idx] = np.asarray(series_pred).ravel() # Compute metrics using sktime mae_list.append(mean_absolute_error(y_test, y_pred)) mse_list.append(mean_squared_error(y_test, y_pred)) scoring_m = MeanAbsolutePercentageError(symmetric=True) smape_list.append(scoring_m.evaluate(y_test, y_pred)*100) # Compute SMAPE else: # --- DARTS APPROACH --- from darts import TimeSeries from darts.metrics import mae as darts_mae, mse as darts_mse from darts.metrics import smape as darts_smape mae_list, mse_list, smape_list = [], [], [] fh = y_test.shape[0] for series_idx in selected_series: train_ts = TimeSeries.from_values(y_train[:, series_idx]) test_ts = TimeSeries.from_values(y_test[:, series_idx]) forecaster.fit(train_ts) pred_ts = forecaster.predict(n=fh) y_pred[:, series_idx] = pred_ts.values().ravel() # Ensure pred_ts has the same components as y_test_ts pred_ts = 
pred_ts.with_columns_renamed(pred_ts.components, test_ts.components) # Shift time index to match if pred_ts.start_time() != test_ts.start_time(): pred_ts = pred_ts.shift(test_ts.start_time() - pred_ts.start_time()) mae_list.append(darts_mae(test_ts, pred_ts)) mse_list.append(darts_mse(test_ts, pred_ts)) smape_list.append(darts_smape(test_ts, pred_ts)) mae.append(float(np.mean(mae_list))) mse.append(float(np.mean(mse_list))) smape.append(float(np.mean(smape_list))) # Store for plotting y_train_all.append(y_train) y_test_all.append(y_test) y_pred_all.append(y_pred) # Save metrics in a dictionary al_name = "MSE_" + self.algorithm.lower() al_name_s = "sMAPE_" + self.algorithm.lower() al_name_c = "MSE_" + baseline.lower() al_name_cs = "sMAPE_" + baseline.lower() metrics = {"MSE_original": mse[0], al_name: mse[1], al_name_c: mse[2], "sMAPE_original": smape[0], al_name_s: smape[1], al_name_cs: smape[2] } if plots: # Global plot with all rows and columns here = os.path.dirname(os.path.dirname(__file__)) save_path = os.path.join(here, "imputegap_assets/downstream") plt = self._plot_downstream(y_train=y_train_all, y_test=y_test_all, y_pred=y_pred_all, incomp_data=self.incomp_data, algorithm=self.algorithm, comparison=baseline, model=model, type=evaluator, save_path=save_path) return metrics, plt else: print("\tNo evaluator found... list possible : 'forecaster'" + "*" * 30 + "\n") return None
@staticmethod def _plot_downstream(y_train, y_test, y_pred, incomp_data, algorithm, comparison, model=None, type=None, title="", max_series=1, save_path="./imputegap_assets/downstream"): """ Plot ground truth vs. predictions for contaminated series (series with NaN values). Parameters ---------- y_train : np.ndarray Training data array of shape (n_series, train_len). y_test : np.ndarray Testing data array of shape (n_series, test_len). y_pred : np.ndarray Forecasted data array of shape (n_series, test_len). incomp_data : np.ndarray Incomplete data array of shape (n_series, total_len), used to identify contaminated series. model : str Name of the current model used algorithm : str Name of the current algorithm used comparison : str Name of the current algorithm used as comparison type : str Name of the current type used title : str Title of the plot. max_series : int Maximum number of series to plot (default is 9). Returns ------- plt Return the plots object. """ # Create a 3x3 subplot grid (3 rows for data types, 3 columns for valid series) x_size = max_series * 5 if max_series == 1: x_size = 24 fig, axs = plt.subplots(3, max_series, figsize=(x_size, 15)) fig.canvas.manager.set_window_title("downstream evaluation") fig.suptitle(title, fontsize=16) # Find indices of the first 4 valid (non-NaN) series valid_indices = [i for i in range(incomp_data.shape[1]) if np.isnan(incomp_data[:, i]).any()][:max_series] # Iterate over the three data types (recov_data, input_data, mean_impute) for row_idx in range(len(y_train)): for col_idx, series_idx in enumerate(valid_indices): ax = axs[row_idx, col_idx] if max_series > 1 else axs[row_idx] # Extract the corresponding data for this data type and series s_y_train = y_train[row_idx] s_y_test = y_test[row_idx] s_y_pred = y_pred[row_idx] train_series = s_y_train[:, series_idx] test_series = s_y_test[:, series_idx] pred_series = s_y_pred[:, series_idx] # Combine training and testing data for visualization full_series = 
np.concatenate([train_series, test_series]) # Plot training data ax.plot(range(len(train_series)), train_series, color="green") # Plot ground truth (testing data) ax.plot( range(len(train_series), len(full_series)), test_series, label="ground truth", color="green" ) label = type + " " + model # Plot forecasted data ax.plot( range(len(train_series), len(full_series)), pred_series, label=label, linestyle="--", marker=None, color="red" ) # Add a vertical line at the split point ax.axvline(x=len(train_series), color="orange", linestyle="--") # Add labels, title, and grid if row_idx == 0: ax.set_title(f"original data, series_{series_idx+1}") elif row_idx == 1: ax.set_title(f"{algorithm.lower()} imputation, series_{series_idx+1}") else: ax.set_title(f"{comparison.lower()} imputation, series_{series_idx+1}") ax.set_xlabel("Timestamp") ax.set_ylabel("Value") ax.legend(loc='upper left', fontsize=7, frameon=True, fancybox=True, framealpha=0.8) ax.grid() # Adjust layout plt.tight_layout(rect=[0, 0.03, 1, 0.95]) fig.subplots_adjust(top=0.92, hspace=0.4) if save_path: os.makedirs(save_path, exist_ok=True) now = datetime.datetime.now() current_time = now.strftime("%y_%m_%d_%H_%M_%S") file_path = os.path.join(save_path + "/" + current_time + "_" + type + "_" + model + "_downstream.jpg") plt.savefig(file_path, bbox_inches='tight') print("plots saved in: ", save_path) plt.show() return plt