import ctypes
import os
import numpy as np
import toml
import importlib.resources
import numpy as __numpy_import
import platform
def load_parameters(query: str = "default", algorithm: str = "cdrec", dataset: str = "chlorine", optimizer: str = "b", path=None, verbose=False):
"""
Load default or optimal parameters for algorithms from a TOML file.
Parameters
----------
query : str, optional
'default' or 'optimal' to load default or optimal parameters (default is "default").
algorithm : str, optional
Algorithm to load parameters for (default is "cdrec").
dataset : str, optional
Name of the dataset (default is "chlorine").
    optimizer : str, optional
Optimizer type for optimal parameters (default is "b").
path : str, optional
Custom file path for the TOML file (default is None).
verbose : bool, optional
Whether to display the contamination information (default is False).
    Returns
    -------
    tuple or dict
        The loaded parameters for the given algorithm (a tuple for imputation
        algorithms, a dict for the "forecaster-*" entries).
    """
if query == "default":
if path is None:
filepath = importlib.resources.files('imputegap.env').joinpath("./default_values.toml")
else:
filepath = path
if not os.path.exists(filepath):
print(f"\n\tthe selected path is wrong, auto-redirection...")
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
filepath = os.path.join(here, "env/default_values.toml")
elif query == "optimal":
algorithm = algorithm.lower().replace("-", "").replace("_", "")
dataset = dataset.lower().replace("-", "").replace("_", "")
if path is None:
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
s = "optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
filepath = os.path.join(here, "imputegap_assets/params", s)
else:
filepath = path
else:
raise ValueError("Query not found for this function (expected 'optimal' or 'default')")
if not os.path.exists(filepath):
filepath = "./params/optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
        if not os.path.exists(filepath):  # fall back to the test assets folder
here = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
s = "optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
filepath = os.path.join(here, "tests/imputegap_assets/params", s)
print(f"file not found: {filepath}, load the default folder.\n")
with open(filepath, "r") as _:
config = toml.load(filepath)
if verbose:
print("\n(SYS) Inner files loaded : ", filepath, "\n")
if algorithm == "cdrec":
truncation_rank = int(config[algorithm]['rank'])
epsilon = float(config[algorithm]['epsilon'])
iterations = int(config[algorithm]['iteration'])
return (truncation_rank, epsilon, iterations)
elif algorithm == "stmvl":
window_size = int(config[algorithm]['window_size'])
gamma = float(config[algorithm]['gamma'])
alpha = int(config[algorithm]['alpha'])
return (window_size, gamma, alpha)
elif algorithm == "iim":
learning_neighbors = int(config[algorithm]['learning_neighbors'])
if query == "default":
algo_code = config[algorithm]['algorithm_code']
return (learning_neighbors, algo_code)
else:
return (learning_neighbors,)
elif algorithm == "mrnn":
seq_len = int(config[algorithm]['seq_len'])
epochs = int(config[algorithm]['epochs'])
batch_size = int(config[algorithm]['batch_size'])
sliding_windows = int(config[algorithm]['sliding_windows'])
hidden_layers = int(config[algorithm]['hidden_layers'])
impute_weight = float(config[algorithm]['impute_weight'])
num_workers = int(config[algorithm]['num_workers'])
return (seq_len, epochs, batch_size, sliding_windows, hidden_layers, impute_weight, num_workers)
elif algorithm == "iterative_svd":
truncation_rank = int(config[algorithm]['rank'])
        return (truncation_rank,)
elif algorithm == "grouse":
max_rank = int(config[algorithm]['max_rank'])
        return (max_rank,)
elif algorithm == "dynammo":
h = int(config[algorithm]['h'])
max_iteration = int(config[algorithm]['max_iteration'])
approximation = bool(config[algorithm]['approximation'])
return (h, max_iteration, approximation)
elif algorithm == "rosl":
rank = int(config[algorithm]['rank'])
regularization = float(config[algorithm]['regularization'])
return (rank, regularization)
elif algorithm == "soft_impute":
max_rank = int(config[algorithm]['max_rank'])
        return (max_rank,)
elif algorithm == "spirit":
k = int(config[algorithm]['k'])
w = int(config[algorithm]['w'])
lvalue = float(config[algorithm]['lvalue'])
return (k, w, lvalue)
elif algorithm == "svt":
tau = float(config[algorithm]['tau'])
        return (tau,)
elif algorithm == "tkcm":
rank = int(config[algorithm]['rank'])
        return (rank,)
elif algorithm == "deep_mvi":
max_epoch = int(config[algorithm]['max_epoch'])
patience = int(config[algorithm]['patience'])
lr = float(config[algorithm]['lr'])
batch_size = int(config[algorithm]['batch_size'])
return (max_epoch, patience, lr, batch_size)
elif algorithm == "brits":
model = str(config[algorithm]['model'])
seq_len = int(config[algorithm]['seq_len'])
epochs = int(config[algorithm]['epochs'])
batch_size = int(config[algorithm]['batch_size'])
sliding_windows = int(config[algorithm]['sliding_windows'])
hidden_layers = int(config[algorithm]['hidden_layers'])
impute_weight = float(config[algorithm]['impute_weight'])
num_workers = int(config[algorithm]['num_workers'])
return (model, seq_len, epochs, batch_size, sliding_windows, hidden_layers, impute_weight, num_workers)
elif algorithm == "mpin":
window = int(config[algorithm]['window'])
incre_mode = str(config[algorithm]['incre_mode'])
base = str(config[algorithm]['base'])
epochs = int(config[algorithm]['epochs'])
num_of_iteration = int(config[algorithm]['num_of_iteration'])
k = int(config[algorithm]['k'])
return (window, incre_mode, base, epochs, num_of_iteration, k)
elif algorithm == "knn" or algorithm == "knn_impute":
k = int(config[algorithm]['k'])
weights = str(config[algorithm]['weights'])
return (k, weights)
elif algorithm == "interpolation":
method = str(config[algorithm]['method'])
poly_order = int(config[algorithm]['poly_order'])
return (method, poly_order)
elif algorithm == "trmf":
lags = list(config[algorithm]['lags'])
K = int(config[algorithm]['K'])
lambda_f = float(config[algorithm]['lambda_f'])
lambda_x = float(config[algorithm]['lambda_x'])
lambda_w = float(config[algorithm]['lambda_w'])
eta = float(config[algorithm]['eta'])
alpha = float(config[algorithm]['alpha'])
max_iter = int(config[algorithm]['max_iter'])
return (lags, K, lambda_f, lambda_x, lambda_w, eta, alpha, max_iter)
elif algorithm == "mice":
max_iter = int(config[algorithm]['max_iter'])
tol = float(config[algorithm]['tol'])
initial_strategy = str(config[algorithm]['initial_strategy'])
seed = int(config[algorithm]['seed'])
return (max_iter, tol, initial_strategy, seed)
elif algorithm == "miss_forest":
n_estimators = int(config[algorithm]['n_estimators'])
max_iter = int(config[algorithm]['max_iter'])
max_features = str(config[algorithm]['max_features'])
seed = int(config[algorithm]['seed'])
return (n_estimators, max_iter, max_features, seed)
elif algorithm == "xgboost":
n_estimators = int(config[algorithm]['n_estimators'])
seed = int(config[algorithm]['seed'])
return (n_estimators, seed)
elif algorithm == "miss_net":
n_components = int(config[algorithm]['n_components'])
alpha = float(config[algorithm]['alpha'])
beta = float(config[algorithm]['beta'])
n_cl = int(config[algorithm]['n_cl'])
max_iter = int(config[algorithm]['max_iter'])
tol = float(config[algorithm]['tol'])
random_init = bool(config[algorithm]['random_init'])
return (n_components, alpha, beta, n_cl, max_iter, tol, random_init)
elif algorithm == "gain":
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
alpha = int(config[algorithm]['alpha'])
hint_rate = float(config[algorithm]['hint_rate'])
return (batch_size, epochs, alpha, hint_rate)
elif algorithm == "grin":
seq_len = int(config[algorithm]['seq_len'])
sim_type = str(config[algorithm]['sim_type'])
epochs = int(config[algorithm]['epochs'])
batch_size = int(config[algorithm]['batch_size'])
sliding_windows = int(config[algorithm]['sliding_windows'])
alpha = int(config[algorithm]['alpha'])
patience = int(config[algorithm]['patience'])
num_workers = int(config[algorithm]['num_workers'])
return (seq_len, sim_type, epochs, batch_size, sliding_windows, alpha, patience, num_workers)
elif algorithm == "bay_otide":
K_trend = int(config[algorithm]['K_trend'])
K_season = int(config[algorithm]['K_season'])
n_season = int(config[algorithm]['n_season'])
K_bias = int(config[algorithm]['K_bias'])
time_scale = int(config[algorithm]['time_scale'])
a0 = float(config[algorithm]['a0'])
b0 = float(config[algorithm]['b0'])
v = float(config[algorithm]['v'])
num_fold = int(config[algorithm]['num_fold'])
return (K_trend, K_season, n_season, K_bias, time_scale, a0, b0, v, num_fold)
elif algorithm == "hkmf_t":
tags = config[algorithm]['tags']
seq_len = int(config[algorithm]['seq_len'])
blackouts_begin = int(config[algorithm]['blackouts_begin'])
blackouts_end = int(config[algorithm]['blackouts_end'])
epochs = int(config[algorithm]['epochs'])
return (tags, seq_len, blackouts_begin, blackouts_end, epochs)
elif algorithm == "nuwats":
seq_len = int(config[algorithm]['seq_len'])
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
gpt_layers = int(config[algorithm]['gpt_layers'])
num_workers = int(config[algorithm]['num_workers'])
seed = int(config[algorithm]['seed'])
        return (seq_len, batch_size, epochs, gpt_layers, num_workers, seed)
elif algorithm == "gpt4ts":
seq_len = int(config[algorithm]['seq_len'])
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
gpt_layers = int(config[algorithm]['gpt_layers'])
num_workers = int(config[algorithm]['num_workers'])
seed = int(config[algorithm]['seed'])
return (seq_len, batch_size, epochs, gpt_layers, num_workers, seed)
elif algorithm == "pristi":
seq_len = int(config[algorithm]['seq_len'])
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
sliding_windows = int(config[algorithm]['sliding_windows'])
target_strategy = str(config[algorithm]['target_strategy'])
nsamples = int(config[algorithm]['nsamples'])
num_workers = int(config[algorithm]['num_workers'])
return (seq_len, batch_size, epochs, sliding_windows, target_strategy, nsamples, num_workers)
elif algorithm == "csdi":
seq_len = int(config[algorithm]['seq_len'])
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
sliding_windows = int(config[algorithm]['sliding_windows'])
target_strategy = str(config[algorithm]['target_strategy'])
nsamples = int(config[algorithm]['nsamples'])
num_workers = int(config[algorithm]['num_workers'])
return (seq_len, batch_size, epochs, sliding_windows, target_strategy, nsamples, num_workers)
elif algorithm == "timesnet":
seq_len = int(config[algorithm]['seq_len'])
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
gpt_layers = int(config[algorithm]['gpt_layers'])
num_workers = int(config[algorithm]['num_workers'])
seed = int(config[algorithm]['seed'])
return (seq_len, batch_size, epochs, gpt_layers, num_workers, seed)
elif algorithm == "bit_graph":
seq_len = int(config[algorithm]['seq_len'])
sliding_windows = int(config[algorithm]['sliding_windows'])
kernel_size = int(config[algorithm]['kernel_size'])
kernel_set = config[algorithm]['kernel_set']
epochs = int(config[algorithm]['epochs'])
batch_size = int(config[algorithm]['batch_size'])
subgraph_size = int(config[algorithm]['subgraph_size'])
num_workers = int(config[algorithm]['num_workers'])
return (seq_len, sliding_windows, kernel_size, kernel_set, epochs, batch_size, subgraph_size, num_workers)
elif algorithm == "saits":
seq_len = int(config[algorithm]['seq_len'])
batch_size = int(config[algorithm]['batch_size'])
epochs = int(config[algorithm]['epochs'])
sliding_windows = int(config[algorithm]['sliding_windows'])
n_head = int(config[algorithm]['n_head'])
num_workers = int(config[algorithm]['num_workers'])
return (seq_len, batch_size, epochs, sliding_windows, n_head, num_workers)
elif algorithm == "greedy":
n_calls = int(config[algorithm]['n_calls'])
metrics = config[algorithm]['metrics']
return (n_calls, [metrics])
elif algorithm.lower() in ["bayesian", "bo", "bayesopt"]:
n_calls = int(config['bayesian']['n_calls'])
n_random_starts = int(config['bayesian']['n_random_starts'])
acq_func = str(config['bayesian']['acq_func'])
metrics = config['bayesian']['metrics']
return (n_calls, n_random_starts, acq_func, [metrics])
elif algorithm.lower() in ['pso', "particle_swarm"]:
n_particles = int(config['pso']['n_particles'])
c1 = float(config['pso']['c1'])
c2 = float(config['pso']['c2'])
w = float(config['pso']['w'])
iterations = int(config['pso']['iterations'])
n_processes = int(config['pso']['n_processes'])
metrics = config['pso']['metrics']
return (n_particles, c1, c2, w, iterations, n_processes, [metrics])
elif algorithm.lower() in ['sh', "successive_halving"]:
num_configs = int(config['sh']['num_configs'])
num_iterations = int(config['sh']['num_iterations'])
reduction_factor = int(config['sh']['reduction_factor'])
metrics = config['sh']['metrics']
return (num_configs, num_iterations, reduction_factor, [metrics])
elif algorithm.lower() in ['ray_tune', "ray"]:
metrics = config['ray_tune']['metrics']
n_calls = int(config['ray_tune']['n_calls'])
max_concurrent_trials = int(config['ray_tune']['max_concurrent_trials'])
return ([metrics], n_calls, max_concurrent_trials)
elif algorithm == "forecaster-naive":
strategy = str(config[algorithm]['strategy'])
window_length = int(config[algorithm]['window_length'])
sp = int(config[algorithm]['sp'])
return {"strategy": strategy, "window_length": window_length, "sp": sp}
elif algorithm == "forecaster-exp-smoothing":
trend = str(config[algorithm]['trend'])
seasonal = str(config[algorithm]['seasonal'])
sp = int(config[algorithm]['sp'])
return {"trend": trend, "seasonal": seasonal, "sp": sp}
elif algorithm == "forecaster-prophet":
seasonality_mode = str(config[algorithm]['seasonality_mode'])
n_changepoints = int(config[algorithm]['n_changepoints'])
return {"seasonality_mode": seasonality_mode, "n_changepoints": n_changepoints}
elif algorithm == "forecaster-nbeats":
input_chunk_length = int(config[algorithm]['input_chunk_length'])
output_chunk_length = int(config[algorithm]['output_chunk_length'])
num_blocks = int(config[algorithm]['num_blocks'])
layer_widths = int(config[algorithm]['layer_widths'])
random_state = int(config[algorithm]['random_state'])
n_epochs = int(config[algorithm]['n_epochs'])
pl_trainer_kwargs = str(config[algorithm]['pl_trainer_kwargs'])
if pl_trainer_kwargs == "cpu":
drive = {"accelerator": pl_trainer_kwargs}
else:
drive = {"accelerator": pl_trainer_kwargs, "devices": [0]}
return {"input_chunk_length": input_chunk_length, "output_chunk_length": output_chunk_length, "num_blocks": num_blocks,
"layer_widths": layer_widths, "random_state": random_state, "n_epochs": n_epochs, "pl_trainer_kwargs": drive}
elif algorithm == "forecaster-xgboost":
lags = int(config[algorithm]['lags'])
return {"lags": lags}
elif algorithm == "forecaster-lightgbm":
lags = int(config[algorithm]['lags'])
verbose = int(config[algorithm]['verbose'])
return {"lags": lags, "verbose": verbose}
elif algorithm == "forecaster-lstm":
input_chunk_length = int(config[algorithm]['input_chunk_length'])
model = str(config[algorithm]['model'])
random_state = int(config[algorithm]['random_state'])
n_epochs = int(config[algorithm]['n_epochs'])
pl_trainer_kwargs = str(config[algorithm]['pl_trainer_kwargs'])
if pl_trainer_kwargs == "cpu":
drive = {"accelerator": pl_trainer_kwargs}
else:
drive = {"accelerator": pl_trainer_kwargs, "devices": [0]}
return {"input_chunk_length": input_chunk_length, "model": model, "random_state": random_state, "n_epochs": n_epochs, "pl_trainer_kwargs": drive}
elif algorithm == "forecaster-deepar":
input_chunk_length = int(config[algorithm]['input_chunk_length'])
model = str(config[algorithm]['model'])
random_state = int(config[algorithm]['random_state'])
n_epochs = int(config[algorithm]['n_epochs'])
pl_trainer_kwargs = str(config[algorithm]['pl_trainer_kwargs'])
if pl_trainer_kwargs == "cpu":
drive = {"accelerator": pl_trainer_kwargs}
else:
drive = {"accelerator": pl_trainer_kwargs, "devices": [0]}
return {"input_chunk_length": input_chunk_length, "model": model, "random_state": random_state, "n_epochs": n_epochs, "pl_trainer_kwargs": drive}
elif algorithm == "forecaster-transformer":
input_chunk_length = int(config[algorithm]['input_chunk_length'])
output_chunk_length = int(config[algorithm]['output_chunk_length'])
random_state = int(config[algorithm]['random_state'])
n_epochs = int(config[algorithm]['n_epochs'])
pl_trainer_kwargs = str(config[algorithm]['pl_trainer_kwargs'])
if pl_trainer_kwargs == "cpu":
drive = {"accelerator": pl_trainer_kwargs}
else:
drive = {"accelerator": pl_trainer_kwargs, "devices": [0]}
return {"input_chunk_length": input_chunk_length, "output_chunk_length": output_chunk_length, "random_state": random_state, "n_epochs": n_epochs, "pl_trainer_kwargs": drive}
elif algorithm == "forecaster-hw-add":
sp = int(config[algorithm]['sp'])
trend = str(config[algorithm]['trend'])
seasonal = str(config[algorithm]['seasonal'])
return {"sp": sp, "trend": trend, "seasonal": seasonal}
elif algorithm == "forecaster-arima":
sp = int(config[algorithm]['sp'])
suppress_warnings = bool(config[algorithm]['suppress_warnings'])
start_p = int(config[algorithm]['start_p'])
start_q = int(config[algorithm]['start_q'])
max_p = int(config[algorithm]['max_p'])
max_q = int(config[algorithm]['max_q'])
start_P = int(config[algorithm]['start_P'])
seasonal = int(config[algorithm]['seasonal'])
d = int(config[algorithm]['d'])
D = int(config[algorithm]['D'])
return {"sp": sp, "suppress_warnings": suppress_warnings, "start_p": start_p, "start_q": start_q,
"max_p": max_p, "max_q": max_q, "start_P": start_P, "seasonal": seasonal, "d": d, "D": D}
elif algorithm == "forecaster-sf-arima":
sp = int(config[algorithm]['sp'])
start_p = int(config[algorithm]['start_p'])
start_q = int(config[algorithm]['start_q'])
max_p = int(config[algorithm]['max_p'])
max_q = int(config[algorithm]['max_q'])
start_P = int(config[algorithm]['start_P'])
seasonal = int(config[algorithm]['seasonal'])
d = int(config[algorithm]['d'])
D = int(config[algorithm]['D'])
return {"sp": sp, "start_p": start_p, "start_q": start_q,
"max_p": max_p, "max_q": max_q, "start_P": start_P, "seasonal": seasonal, "d": d, "D": D}
elif algorithm == "forecaster-bats":
sp = int(config[algorithm]['sp'])
use_trend = bool(config[algorithm]['use_trend'])
use_box_cox = bool(config[algorithm]['use_box_cox'])
return {"sp": sp, "use_trend": use_trend, "use_box_cox": use_box_cox}
elif algorithm == "forecaster-ets":
sp = int(config[algorithm]['sp'])
auto = bool(config[algorithm]['auto'])
return {"sp": sp, "auto": auto}
elif algorithm == "forecaster-croston":
smoothing = float(config[algorithm]['smoothing'])
return {"smoothing": smoothing}
elif algorithm == "forecaster-unobs":
level = bool(config[algorithm]['level'])
trend = bool(config[algorithm]['trend'])
sp = int(config[algorithm]['sp'])
return {"level": level, "trend": trend, "seasonal": sp}
elif algorithm == "forecaster-theta":
sp = int(config[algorithm]['sp'])
deseasonalize = bool(config[algorithm]['deseasonalize'])
return {"sp": sp, "deseasonalize": deseasonalize}
elif algorithm == "forecaster-rnn":
input_size = int(config[algorithm]['input_size'])
inference_input_size = int(config[algorithm]['inference_input_size'])
return {"input_size": input_size, "inference_input_size": inference_input_size}
elif algorithm == "colors":
colors = config[algorithm]['plot']
return colors
elif algorithm == "colors_blacks":
colors = config[algorithm]['plot']
return colors
elif algorithm == "other":
return config
# Your own default parameters #contributing
#
#elif algorithm == "your_algo_name":
# param_1 = int(config[algorithm]['param_1'])
# param_2 = config[algorithm]['param_2']
# param_3 = float(config[algorithm]['param_3'])
# return (param_1, param_2, param_3)
else:
print("(SYS) Default/Optimal config not found for this algorithm")
return None
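# --- usage sketch (illustrative only) ----------------------------------------
# A minimal example of how load_parameters() is typically unpacked; it assumes
# the bundled default_values.toml is present, as shipped with the package.
def _example_load_default_cdrec():
    rank, epsilon, iterations = load_parameters(query="default", algorithm="cdrec")
    return rank, epsilon, iterations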
def config_impute_algorithm(incomp_data, algorithm, verbose=True):
"""
Configure and execute algorithm for selected imputation imputer and pattern.
Parameters
----------
incomp_data : TimeSeries
TimeSeries object containing dataset.
algorithm : str
Name of algorithm
verbose : bool, optional
Whether to display the contamination information (default is False).
Returns
-------
BaseImputer
Configured imputer instance with optimal parameters.
"""
from imputegap.recovery.imputation import Imputation
from imputegap.recovery.manager import TimeSeries
alg_low = algorithm.lower()
alg = alg_low.replace('_', '').replace('-', '')
# 1st generation
if alg == "cdrec":
imputer = Imputation.MatrixCompletion.CDRec(incomp_data)
elif alg == "stmvl":
imputer = Imputation.PatternSearch.STMVL(incomp_data)
elif alg == "iim":
imputer = Imputation.MachineLearning.IIM(incomp_data)
elif alg == "mrnn":
imputer = Imputation.DeepLearning.MRNN(incomp_data)
# 2nd generation
elif alg == "iterativesvd" or alg == "itersvd":
imputer = Imputation.MatrixCompletion.IterativeSVD(incomp_data)
elif alg == "grouse":
imputer = Imputation.MatrixCompletion.GROUSE(incomp_data)
elif alg == "dynammo":
imputer = Imputation.PatternSearch.DynaMMo(incomp_data)
elif alg == "rosl":
imputer = Imputation.MatrixCompletion.ROSL(incomp_data)
elif alg == "softimpute" or alg == "softimp":
imputer = Imputation.MatrixCompletion.SoftImpute(incomp_data)
elif alg == "spirit":
imputer = Imputation.MatrixCompletion.SPIRIT(incomp_data)
elif alg == "svt":
imputer = Imputation.MatrixCompletion.SVT(incomp_data)
elif alg == "tkcm":
imputer = Imputation.PatternSearch.TKCM(incomp_data)
elif alg == "deepmvi":
imputer = Imputation.DeepLearning.DeepMVI(incomp_data)
elif alg == "brits":
imputer = Imputation.DeepLearning.BRITS(incomp_data)
elif alg == "mpin":
imputer = Imputation.DeepLearning.MPIN(incomp_data)
elif alg == "pristi":
imputer = Imputation.DeepLearning.PriSTI(incomp_data)
# 3rd generation
elif alg == "knn" or alg == "knnimpute":
imputer = Imputation.Statistics.KNNImpute(incomp_data)
elif alg == "interpolation":
imputer = Imputation.Statistics.Interpolation(incomp_data)
elif alg == "meanseries" or alg == "meanimputebyseries":
imputer = Imputation.Statistics.MeanImputeBySeries(incomp_data)
elif alg == "minimpute":
imputer = Imputation.Statistics.MinImpute(incomp_data)
elif alg == "zeroimpute":
imputer = Imputation.Statistics.ZeroImpute(incomp_data)
elif alg == "trmf":
imputer = Imputation.MatrixCompletion.TRMF(incomp_data)
elif alg == "mice":
imputer = Imputation.MachineLearning.MICE(incomp_data)
elif alg == "missforest":
imputer = Imputation.MachineLearning.MissForest(incomp_data)
elif alg == "xgboost":
imputer = Imputation.MachineLearning.XGBOOST(incomp_data)
elif alg == "missnet":
imputer = Imputation.DeepLearning.MissNet(incomp_data)
elif alg == "gain":
imputer = Imputation.DeepLearning.GAIN(incomp_data)
elif alg == "grin":
imputer = Imputation.DeepLearning.GRIN(incomp_data)
elif alg == "bayotide":
imputer = Imputation.DeepLearning.BayOTIDE(incomp_data)
elif alg == "hkmft" or alg == "hkmf-t" :
imputer = Imputation.DeepLearning.HKMFT(incomp_data)
elif alg == "bitgraph":
imputer = Imputation.DeepLearning.BitGraph(incomp_data)
elif alg == "meanimpute":
imputer = Imputation.Statistics.MeanImpute(incomp_data)
# 4th generation
elif alg == "nuwats":
imputer = Imputation.LLMs.NuwaTS(incomp_data)
elif alg == "gpt4ts":
imputer = Imputation.LLMs.GPT4TS(incomp_data)
# 5th generation
elif alg == "saits":
imputer = Imputation.DeepLearning.SAITS(incomp_data)
elif alg == "timesnet":
imputer = Imputation.DeepLearning.TimesNet(incomp_data)
elif alg == "csdi":
imputer = Imputation.DeepLearning.CSDI(incomp_data)
# your own implementation #contributing
#
#elif alg == "your_algo_name":
# imputer = Imputation.MyFamily.NewAlg(incomp_data)
    else:
        raise ValueError(f"(IMP) Algorithm '{algorithm}' not recognized, please choose your algorithm from this list:\n\t{TimeSeries().algorithms}")
    imputer.verbose = verbose
    return imputer
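# Usage sketch (illustrative only): wiring a contaminated TimeSeries into an
# imputer; the .impute() call below is assumed to be the usual next step in
# ImputeGAP pipelines.
def _example_config_imputer(incomp_data):
    imputer = config_impute_algorithm(incomp_data, algorithm="cdrec", verbose=False)
    imputer.impute()
    return imputer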
def save_optimization(optimal_params, algorithm="cdrec", dataset="", optimizer="b", file_name=None, verbose=True):
"""
Save the optimization parameters to a TOML file for later use without recomputing.
Parameters
----------
optimal_params : dict
Dictionary of the optimal parameters.
algorithm : str, optional
The name of the imputation algorithm (default is 'cdrec').
dataset : str, optional
The name of the dataset (default is an empty string).
optimizer : str, optional
The name of the optimizer used (default is 'b').
    file_name : str, optional
        The name of the TOML file to save the results (default is None).
    verbose : bool, optional
        Whether to display the saving information (default is True).

    Returns
    -------
    str
        The path of the TOML file where the parameters were saved.
    """
    algorithm = algorithm.lower().replace("-", "").replace("_", "")
    dataset = dataset.lower().replace("-", "").replace("_", "")
    if file_name is None:
        here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        s = "optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
        file_name = os.path.join(here, "imputegap_assets/params", s)
    if isinstance(optimal_params, dict):
        optimal_params = tuple(optimal_params.values())
    dir_name = os.path.dirname(file_name)
    if dir_name and not os.path.exists(dir_name):
        os.makedirs(dir_name)
if algorithm == "cdrec":
params_to_save = {
"rank": int(optimal_params[0]),
"epsilon": optimal_params[1],
"iteration": int(optimal_params[2])
}
elif algorithm == "mrnn" or algorithm == "MRNN":
params_to_save = {
"seq_len": int(optimal_params[0]),
"epoch": int(optimal_params[1]),
"batch_size": int(optimal_params[2]),
"sliding_windows": int(optimal_params[3]),
"hidden_layers": int(optimal_params[4]),
"impute_weight": float(optimal_params[5]),
"num_workers": int(optimal_params[6])
}
elif algorithm == "stmvl" or algorithm == "STMVL" or algorithm == "ST-MVL":
params_to_save = {
"window_size": int(optimal_params[0]),
"gamma": optimal_params[1],
"alpha": int(optimal_params[2])
}
elif algorithm == "iim" or algorithm == "IIM":
params_to_save = {
"learning_neighbors": int(optimal_params[0])
}
elif algorithm == "iterative_svd" or algorithm == "iterativesvd":
params_to_save = {
"rank": int(optimal_params[0])
}
elif algorithm == "grouse" or algorithm == "GROUSE":
params_to_save= {
"max_rank": int(optimal_params[0])
}
elif algorithm == "rosl" or algorithm == "ROSL":
params_to_save = {
"rank": int(optimal_params[0]),
"regularization": optimal_params[1]
}
elif algorithm == "soft_impute" or algorithm == "softimpute":
params_to_save = {
"max_rank": int(optimal_params[0])
}
elif algorithm == "spirit" or algorithm == "SPIRIT":
params_to_save = {
"k": int(optimal_params[0]),
"w": int(optimal_params[1]),
"lvalue": optimal_params[2]
}
elif algorithm == "svt" or algorithm == "SVT":
params_to_save = {
"tau": optimal_params[0],
"delta": optimal_params[1],
"max_iter": int(optimal_params[2])
}
elif algorithm == "dynammo" or algorithm == "DynamoMO":
params_to_save = {
"h": int(optimal_params[0]),
"max_iteration": int(optimal_params[1]),
"approximation": bool(optimal_params[2])
}
elif algorithm == "tkcm":
params_to_save = {
"rank": int(optimal_params[0])
}
elif algorithm == "brits":
params_to_save = {
"model": optimal_params[0],
"seq_len": int(optimal_params[1]),
"epoch": int(optimal_params[2]),
"batch_size": int(optimal_params[3]),
"sliding_windows": int(optimal_params[4]),
"hidden_layers": int(optimal_params[5]),
"impute_weight": float(optimal_params[6]),
"num_workers": int(optimal_params[7])
}
elif algorithm == "deep_mvi" or algorithm == "deepmvi":
params_to_save = {
"max_epoch": int(optimal_params[0]),
"patience": int(optimal_params[1]),
"lr": float(optimal_params[2]),
"batch_size": float(optimal_params[3])
}
elif algorithm == "mpin":
params_to_save = {
"window": int(optimal_params[0]),
"incre_mode": optimal_params[1],
"base": optimal_params[2],
"epochs": int(optimal_params[3]),
"num_of_iteration": int(optimal_params[4]),
"k": int(optimal_params[5])
}
elif algorithm == "knn" or algorithm == "knn_impute":
params_to_save = {
"k": int(optimal_params[0]),
"weights": str(optimal_params[1])
}
elif algorithm == "interpolation":
params_to_save = {
"method": str(optimal_params[0]),
"poly_order": int(optimal_params[1])
}
elif algorithm == "mice":
params_to_save = {
"max_iter": int(optimal_params[0]),
"tol": float(optimal_params[1]),
"initial_strategy": str(optimal_params[2]),
"seed": 42
}
elif algorithm == "miss_forest" or algorithm == "missforest":
params_to_save = {
"n_estimators": int(optimal_params[0]),
"max_iter": int(optimal_params[1]),
"max_features": str(optimal_params[2]),
"seed": 42
}
elif algorithm == "xgboost":
params_to_save = {
"n_estimators": int(optimal_params[0]),
"seed": 42
}
elif algorithm == "miss_net" or algorithm == "missnet":
params_to_save = {
"n_components": int(optimal_params[0]),
"alpha": float(optimal_params[1]),
"beta": float(optimal_params[2]),
"n_cl": int(optimal_params[3]),
"max_iter": int(optimal_params[4]),
"tol": float(optimal_params[5]),
"random_init": bool(optimal_params[6])
}
elif algorithm == "gain":
params_to_save = {
"batch_size": int(optimal_params[0]),
"epochs": int(optimal_params[1]),
"alpha": int(optimal_params[2]),
"hint_rate": float(optimal_params[3]),
}
elif algorithm == "grin":
params_to_save = {
"seq_len": int(optimal_params[0]),
"sim_type": str(optimal_params[1]),
"epochs": int(optimal_params[2]),
"batch_size": int(optimal_params[3]),
"sliding_windows": int(optimal_params[4]),
"alpha": int(optimal_params[5]),
"patience": int(optimal_params[6]),
"num_workers": int(optimal_params[7])
}
elif algorithm == "bay_otide" or algorithm == "bayotide":
params_to_save = {
"K_trend": int(optimal_params[0]),
"K_season": int(optimal_params[1]),
"n_season": int(optimal_params[2]),
"K_bias": int(optimal_params[3]),
"time_scale": int(optimal_params[4]),
"a0": float(optimal_params[5]),
"b0": float(optimal_params[6]),
"v": float(optimal_params[7]),
"num_fold": int(optimal_params[8]),
}
elif algorithm == "hkmf_t" or algorithm == "hkmft":
params_to_save = {
"tags": optimal_params[0],
"seq_len": optimal_params[1],
"blackouts_begin": int(optimal_params[2]),
"blackouts_end": int(optimal_params[3]),
"epochs": int(optimal_params[4]),
}
elif algorithm == "bit_graph" or algorithm == "bitgraph":
params_to_save = {
"seq_len": int(optimal_params[0]),
"sliding_windows": int(optimal_params[1]),
"kernel_size": int(optimal_params[2]),
"kernel_set": optimal_params[3],
"epochs": int(optimal_params[4]),
"batch_size": int(optimal_params[5]),
"subgraph_size": int(optimal_params[6]),
"num_workers": int(optimal_params[7])
}
elif algorithm == "nuwats" or algorithm == "NUWATS":
params_to_save = {
"seq_len": int(optimal_params[0]),
"batch_size": float(optimal_params[1]),
"epochs": int(optimal_params[2]),
"gpt_layers": int(optimal_params[3]),
"num_workers": int(optimal_params[4]),
"seed": int(optimal_params[5]),
}
elif algorithm == "gpt4ts" or algorithm == "GPT4TS":
params_to_save = {
"seq_len": int(optimal_params[0]),
"batch_size": float(optimal_params[1]),
"epochs": int(optimal_params[2]),
"gpt_layers": int(optimal_params[3]),
"num_workers": int(optimal_params[4]),
"seed": int(optimal_params[5]),
}
elif algorithm == "timesnet" or algorithm == "TimesNet":
params_to_save = {
"seq_len": int(optimal_params[0]),
"batch_size": float(optimal_params[1]),
"epochs": int(optimal_params[2]),
"gpt_layers": int(optimal_params[3]),
"num_workers": int(optimal_params[4]),
"seed": int(optimal_params[5]),
}
elif algorithm == "pristi":
params_to_save = {
"seq_len": int(optimal_params[0]),
"batch_size": float(optimal_params[1]),
"epochs": int(optimal_params[2]),
"sliding_windows": int(optimal_params[3]),
"target_strategy": str(optimal_params[4]),
"nsamples": int(optimal_params[5]),
"num_workers": int(optimal_params[6]),
}
elif algorithm == "csdi" or algorithm == "CSDI":
params_to_save = {
"seq_len": int(optimal_params[0]),
"batch_size": float(optimal_params[1]),
"epochs": int(optimal_params[2]),
"sliding_windows": int(optimal_params[3]),
"target_strategy": str(optimal_params[4]),
"nsamples": int(optimal_params[5]),
"num_workers": int(optimal_params[6]),
}
elif algorithm == "saits" or algorithm == "SAITS":
params_to_save = {
"seq_len": int(optimal_params[0]),
"batch_size": int(optimal_params[1]),
"epochs": int(optimal_params[2]),
"sliding_windows": int(optimal_params[3]),
"n_head": int(optimal_params[4]),
"num_workers": int(optimal_params[5])
}
# Your own optimal save parameters #contributing
#
#elif algorithm == "your_algo_name":
# params_to_save = {
# "param_1": int(optimal_params[0]),
# "param_2": optimal_params[1],
# "param_3": float(optimal_params[2]),
#}
else:
if verbose:
print(f"\n\t\t(SYS) Algorithm {algorithm} is not recognized.")
return
toml_payload = {algorithm: params_to_save}
try:
with open(file_name, 'w') as file:
toml.dump(toml_payload, file)
if verbose:
print(f"\n(SYS) Optimization parameters successfully saved to {file_name}")
except Exception as e:
print(f"\n(SYS) An error occurred while saving the file: {e}")
return file_name
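# Round-trip sketch (illustrative only, the parameter values are made-up
# placeholders): persist tuned cdrec parameters, then read them back through
# load_parameters() with query="optimal".
def _example_save_and_reload_cdrec():
    path = save_optimization({"rank": 5, "epsilon": 1e-6, "iteration": 100},
                             algorithm="cdrec", dataset="chlorine", optimizer="b",
                             verbose=False)
    return load_parameters(query="optimal", algorithm="cdrec", dataset="chlorine",
                           optimizer="b", path=path)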
def check_family(family="DeepLearning", algorithm=""):
"""
Check whether a given algorithm belongs to a specified family.
Parameters
----------
family : str, optional
Name of the algorithm family to check against (e.g. ``"DeepLearning"``).
Defaults to ``"DeepLearning"``.
algorithm : str
Name of the algorithm to check for membership in the given family.
Matching is case-insensitive and ignores underscores and hyphens.
Returns
-------
bool
``True`` if an algorithm with the given name exists within the
specified family, ``False`` otherwise.
"""
norm_input = algorithm.lower().replace("_", "").replace("-", "")
for full_name in list_of_algorithms_with_families():
if full_name.startswith(family+"."):
suffix = full_name.split(".", 1)[1]
norm_suffix = suffix.lower().replace("_", "").replace("-", "")
if norm_input == norm_suffix:
return True
return False
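# Quick sketch (illustrative only): the membership check is normalization-aware,
# so "bit-graph", "bit_graph", and "BitGraph" all resolve to the same entry of
# list_of_algorithms_with_families().
def _example_check_family():
    return check_family(family="DeepLearning", algorithm="bit-graph")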
def sets_splitter_based_on_training(tr, split=0.66667, verbose=False):
"""
Compute test and validation split ratios based on a given training ratio.
Ensures that the sum of training, validation, and test ratios equals 1.0
after rounding to one decimal place. Raises a ValueError if the resulting
ratios do not sum to 1.0 within tolerance.
Parameters
----------
tr : float
Training ratio (between 0 and 1).
    split : float, optional
        Fraction of the non-training data allocated to the test set. Default is 2/3.
    verbose : bool, optional
        If True, prints the computed ratios for verification. Default is False.

    Returns
    -------
    test_ratio : float
        Fraction of data allocated to the test set.
    val_ratio : float
        Fraction of data allocated to the validation set.
Raises
------
ValueError
If the computed ratios do not sum to 1.0 (after rounding).
"""
test_len = round((1 - tr) * (split), 1)
val_len = round(1 - tr - test_len, 1)
total = round(tr + test_len + val_len, 1)
if total != 1.0 or test_len < 0 or val_len < 0:
raise ValueError(f"Ratios do not sum to 1.0 (train={tr}, test={test_len}, val={val_len}, total={total})")
if verbose:
print(f"\ntraining ratio: {tr}, validation ratio: {val_len}, test ratio: {test_len}")
print(f"\tsum of ratios: {total}\n")
return test_len, val_len
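# Worked example (illustrative only): with tr=0.6 and the default split of 2/3,
# test = round(0.4 * 2/3, 1) = 0.3 and val = round(1 - 0.6 - 0.3, 1) = 0.1,
# so the three ratios sum to 1.0.
def _example_sets_splitter():
    test_ratio, val_ratio = sets_splitter_based_on_training(tr=0.6)
    assert (test_ratio, val_ratio) == (0.3, 0.1)
    return test_ratio, val_ratio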
def config_contamination(ts, pattern, dataset_rate=0.4, series_rate=0.4, block_size=10, offset=0.1, seed=True, limit=1, shift=0.05, std_dev=0.5, explainer=False, probabilities=None, logic_by_series=True, verbose=True):
"""
Configure and execute contamination for selected imputation algorithm and pattern.
Parameters
----------
rate : float
Mean parameter for contamination missing percentage rate.
ts_test : TimeSeries
A TimeSeries object containing dataset.
pattern : str
Type of contamination pattern (e.g., "mcar", "mp", "blackout", "disjoint", "overlap", "gaussian").
block_size_mcar : int
Size of blocks removed in MCAR
Returns
-------
TimeSeries
TimeSeries object containing contaminated data.
"""
from imputegap.recovery.contamination import GenGap
from imputegap.recovery.manager import TimeSeries
pattern_low = pattern.lower()
ptn = pattern_low.replace('_', '').replace('-', '')
if ptn == "mcar" or ptn == "missing_completely_at_random":
incomp_data = GenGap.mcar(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, block_size=block_size, offset=offset, seed=seed, explainer=explainer, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "mp" or ptn == "missingpercentage" or ptn == "aligned":
incomp_data = GenGap.aligned(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, offset=offset, explainer=explainer, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "ps" or ptn == "percentageshift" or ptn == "scattered" or ptn == "scatter":
incomp_data = GenGap.scattered(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, offset=offset, seed=seed, explainer=explainer, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "disjoint":
incomp_data = GenGap.disjoint(input_data=ts.data, rate_series=dataset_rate, limit=1, offset=offset, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "overlap":
incomp_data = GenGap.overlap(input_data=ts.data, rate_series=dataset_rate, limit=limit, shift=shift, offset=offset, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "gaussian":
incomp_data = GenGap.gaussian(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, std_dev=std_dev, offset=offset, seed=seed, explainer=explainer, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "distribution" or pattern == "dist":
incomp_data = GenGap.distribution(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, probabilities_list=probabilities, offset=offset, seed=seed, explainer=explainer, logic_by_series=logic_by_series, verbose=verbose)
elif ptn == "blackout":
incomp_data = GenGap.blackout(input_data=ts.data, rate_series=dataset_rate, offset=offset, logic_by_series=logic_by_series, verbose=verbose)
    else:
        raise ValueError(f"\n(CONT) Pattern '{pattern}' not recognized, please choose your pattern from this list:\n\t{TimeSeries().patterns}\n")
    return incomp_data
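# Usage sketch (illustrative only, assumes a TimeSeries loaded beforehand):
# contaminate 40% of the series, removing 40% of the values in each, with the
# MCAR pattern configured above.
def _example_contaminate_mcar(ts):
    return config_contamination(ts, pattern="mcar", dataset_rate=0.4,
                                series_rate=0.4, verbose=False)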
def config_forecaster(model, params):
"""
Configure and execute forecaster model for downstream analytics
Parameters
----------
model : str
name of the forcaster model
params : list of params
List of paramaters for a forcaster model
Returns
-------
Forecaster object (SKTIME/DART)
Forecaster object for downstream analytics
"""
from imputegap.recovery.manager import TimeSeries
model_low = model.lower()
mdl = model_low.replace('_', '').replace('-', '')
if mdl == "prophet":
from sktime.forecasting.fbprophet import Prophet
forecaster = Prophet(**params)
elif mdl == "expsmoothing":
from sktime.forecasting.exp_smoothing import ExponentialSmoothing
forecaster = ExponentialSmoothing(**params)
elif mdl == "nbeats":
from darts.models import NBEATSModel
forecaster = NBEATSModel(**params)
elif mdl == "xgboost":
from darts.models.forecasting.xgboost import XGBModel
forecaster = XGBModel(**params)
elif mdl == "lightgbm":
from darts.models.forecasting.lgbm import LightGBMModel
forecaster = LightGBMModel(**params)
elif mdl == "lstm":
from darts.models.forecasting.rnn_model import RNNModel
forecaster = RNNModel(**params)
elif mdl == "deepar":
from darts.models.forecasting.rnn_model import RNNModel
forecaster = RNNModel(**params)
elif mdl == "transformer":
from darts.models.forecasting.transformer_model import TransformerModel
forecaster = TransformerModel(**params)
elif mdl == "hwadd":
from sktime.forecasting.exp_smoothing import ExponentialSmoothing
forecaster = ExponentialSmoothing(**params)
elif mdl == "arima":
from sktime.forecasting.arima import AutoARIMA
forecaster = AutoARIMA(**params)
elif mdl == "sf-arima" or mdl == "sfarima":
from sktime.forecasting.statsforecast import StatsForecastAutoARIMA
forecaster = StatsForecastAutoARIMA(**params)
forecaster.set_config(warnings='off')
elif mdl == "bats":
from sktime.forecasting.bats import BATS
forecaster = BATS(**params)
elif mdl == "ets":
from sktime.forecasting.ets import AutoETS
forecaster = AutoETS(**params)
elif mdl == "croston":
from sktime.forecasting.croston import Croston
forecaster = Croston(**params)
elif mdl == "theta":
from sktime.forecasting.theta import ThetaForecaster
forecaster = ThetaForecaster(**params)
elif mdl == "unobs":
from sktime.forecasting.structural import UnobservedComponents
forecaster = UnobservedComponents(**params)
elif mdl == "naive":
from sktime.forecasting.naive import NaiveForecaster
forecaster = NaiveForecaster(**params)
    else:
        raise ValueError(f"\n(DOWN) Forecasting model '{model}' not recognized, please choose your model from this list:\n\t{TimeSeries().forecasting_models}\n")
    return forecaster
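# Usage sketch (illustrative only): the params dict is typically produced by
# load_parameters() with one of the "forecaster-*" entries defined above.
def _example_build_naive_forecaster():
    params = load_parameters(query="default", algorithm="forecaster-naive")
    return config_forecaster(model="naive", params=params)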
def get_resuts_unit_tests(algo_name, loader, verbose=True):
"""
Returns (dataset, rmse, mae) for the given algo name
from loader.toml.
"""
try:
import tomllib # Python 3.11+
with open(loader, "rb") as f:
config = tomllib.load(f)
except ImportError:
import toml
with open(loader, "r", encoding="utf-8") as f:
config = toml.load(f)
section = config[algo_name]
dataset = section["dataset"]
rmse = section["rmse"]
mae = section["mae"]
if verbose:
print(f"\nloaded for {algo_name}: {dataset = }, {rmse = }, {mae = }\n")
return dataset, rmse, mae
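# Minimal sketch (illustrative only, made-up metric values): the loader file
# holds one TOML table per algorithm with dataset / rmse / mae keys.
def _example_read_unit_test_results():
    with open("loader_example.toml", "w") as f:
        f.write('[cdrec]\ndataset = "chlorine"\nrmse = 0.15\nmae = 0.09\n')
    return get_resuts_unit_tests("cdrec", "loader_example.toml", verbose=False)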
def window_truncation(feature_vectors, seq_len, stride=None, info="", verbose=True, deep_verbose=False):
"""
Segment a sequence of feature vectors into fixed-length windows. In ImputeGAP, this is used in deep learning to reshape a 2D univariate dataset into a 3D windowed representation, enabling multivariate-like processing.
See reconstruction_window_based() to restore the imputed matrix to its original shape.
The code was inspired by: https://dl.acm.org/doi/10.1016/j.eswa.2023.119619
Parameters
----------
feature_vectors : np.ndarray
Input array of feature vectors. Windowing is applied along the
first axis (typically the time or sequence dimension).
seq_len : int
Length of each window (number of time steps per segment).
stride : int, optional
Step size between the starting indices of consecutive windows.
Defaults to ``seq_len`` (non-overlapping windows).
info : str, optional
Additional descriptive string to include in the verbose log output.
Defaults to an empty string.
verbose : bool, optional
If True, prints a summary of the computed windows (shape and
configuration). Defaults to True.
deep_verbose : bool, optional
If True, prints the raw start indices used to generate the
windows. Useful for debugging. Defaults to False.
Returns
-------
np.ndarray
Array of shape ``(num_windows, seq_len, features)`` containing the
extracted windows, cast to ``float32``.
"""
stride = seq_len if stride is None else stride
values = feature_vectors.shape[0]
start_indices = np.asarray(range(values // stride)) * stride
if deep_verbose:
print(f"{start_indices = }")
sample_collector = []
for idx in start_indices:
if (idx + seq_len) > values:
break
sample_collector.append(feature_vectors[idx: idx + seq_len])
dataset_strat_windows = np.asarray(sample_collector).astype('float32')
if verbose:
print(f"\t{info} windows have been computed ({seq_len=} | {stride=}): {dataset_strat_windows.shape}")
return dataset_strat_windows
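# Shape sketch (illustrative only): 100 time steps and 5 features with
# seq_len=24 and the default stride yield 4 non-overlapping windows of shape
# (4, 24, 5); the 4 trailing rows that do not fill a window are dropped.
def _example_window_truncation():
    data = np.random.rand(100, 5)
    windows = window_truncation(data, seq_len=24, verbose=False)
    assert windows.shape == (4, 24, 5)
    return windows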
def dataset_add_dimensionality(matrix, seq_length=24, reshapable=True, adding_nans=True, three_dim=True, window=False, verbose=False, deep_verbose=False):
"""
Prepare a 2D matrix for sequence-based models (sample strategy) by padding and optional reshaping to 3D.
Parameters
----------
matrix : np.ndarray
Input 2D array of shape ``(N, M)``, where ``N`` is the number of
time steps (rows) and ``M`` is the number of features (columns).
seq_length : int, optional
Target sequence length (number of time steps per segment). Used
for padding and reshaping. Default is 24.
reshapable : bool, optional
If True, the matrix is padded (if needed) so that its number of
rows is divisible by ``seq_length``. If False, sequences are
extracted in non-overlapping chunks of length ``seq_length``
without padding. Default is True.
adding_nans : {True, False, None}, optional
Controls the padding values:
- None: pad with zeros.
- True: pad with NaNs.
- False: pad with per-column means (ignoring NaNs).
Default is True (pad with NaNs).
three_dim : bool, optional
If True and ``reshapable`` is True, the padded matrix is reshaped
to a 3D array of shape ``(num_sequences, seq_length, M)``.
If False, the function returns the padded 2D matrix.
Ignored when ``window=True`` or ``reshapable=False``.
Default is True.
window : bool, optional
If True, the function only appends a block of ``seq_length`` rows
(using the chosen padding strategy) and returns the resulting 2D
matrix without reshaping. Default is False.
verbose : bool, optional
If True, prints information about padding and the resulting
shape(s). Default is False.
deep_verbose : bool, optional
If True and ``three_dim`` is True, prints the full reshaped
3D matrix for inspection. Default is False.
Returns
-------
np.ndarray
3D array of shape ``(N_padded // seq_length, seq_length, features)``.
"""
if verbose:
print(f"\ndataset is pre-processed for 3 dimensionality, with a sequence length of {seq_length}.")
N, M = matrix.shape
if window:
pad_len = seq_length
        if adding_nans is None:
            pad_block = np.full((pad_len, M), 0)  # pad with zeros
        else:
            pad_block = np.full((pad_len, M), np.nan)  # pad with NaNs
matrix = np.vstack([matrix, pad_block])
if verbose:
print(f"\tThe new shape is {matrix.shape}\n")
return matrix
if reshapable:
# How many rows needed to make it divisible?
remainder = N % seq_length
if remainder != 0:
pad_len = seq_length - remainder
            if adding_nans is None:
                if verbose:
                    print(f"the algorithm has added {pad_len} rows of zeros")
                pad_block = np.full((pad_len, M), 0)  # fill with zeros
else:
if adding_nans:
if verbose:
print(f"the algorithm has added {pad_len} rows of NaNs")
pad_block = np.full((pad_len, M), np.nan) # fill with NaNs
else:
col_mean = np.nanmean(matrix, axis=0)
if verbose:
print(f"the algorithm has added {pad_len} rows of {col_mean}")
pad_block = np.tile(col_mean, (pad_len, 1)) # repeat row of averages
matrix = np.vstack([matrix, pad_block])
N = matrix.shape[0]
if not three_dim:
if verbose:
print(f"\tThe new shape is {matrix.shape}\n")
return matrix
# Now safe to reshape
new_m = matrix.reshape(N // seq_length, seq_length, M)
if verbose:
print(f"\tThe new shape is {new_m.shape}\n")
if deep_verbose:
print(f"\tnew matrix : {new_m}\n")
return new_m
else:
new_m = np.array([matrix[i:i + seq_length] for i in range(0, N - seq_length + 1, seq_length)])
if verbose:
print(f"\ntThe new shape is {new_m.shape}\n")
return new_m
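# Shape sketch (illustrative only): 100 rows with seq_length=24 need 20 extra
# NaN rows (100 % 24 = 4), giving 120 rows reshaped to (5, 24, features).
def _example_add_dimensionality():
    data = np.random.rand(100, 5)
    cube = dataset_add_dimensionality(data, seq_length=24)
    assert cube.shape == (5, 24, 5)
    return cube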
def dataset_reverse_dimensionality(matrix, expected_n: int, verbose: bool = True):
"""
Convert (1, N, T, L) -> (N*T, L) or (N, T, L) -> (N*T, L), then trim to expected_n rows.
Steps:
1) If ndim==4, squeeze axis 0 (requires S==1).
2) Reshape first two dims together -> (N*T, L).
3) Drop the last (N*T - expected_n) rows.
Args:
matrix: np.ndarray of shape (1, N, T, L) or (N, T, L)
expected_n: final number of rows after trimming (e.g., 1000)
verbose: print shapes and removed-row count
Returns:
np.ndarray of shape (expected_n, L)
"""
if not isinstance(matrix, np.ndarray):
raise TypeError(f"'matrix' must be a numpy array, got {type(matrix)}")
if verbose:
print("\nThe dataset will be reverse back to its original dimensionality...")
# 1) Optional squeeze if 4D
if matrix.ndim == 4:
S, N, T, L = matrix.shape
if verbose:
print(f"\tinput: {matrix.shape} (S={S}, N={N}, T={T}, L={L})")
if S != 1:
raise ValueError(f"\tCannot squeeze: expected S==1 on the first dim, got S={S}")
imp = np.squeeze(matrix, axis=0) # (N, T, L)
if verbose:
print(f"\tafter squeeze -> {imp.shape}")
elif matrix.ndim == 3:
N, T, L = matrix.shape
if verbose:
print(f"\tinput: {matrix.shape} (N={N}, T={T}, L={L})")
imp = matrix
else:
raise ValueError(f"\tExpected a 3D or 4D array, got {matrix.ndim}D with shape {matrix.shape}")
# 2) Reshape (N, T, L) -> (N*T, L)
imp = imp.reshape(N * T, L)
if verbose:
print(f"\tafter reshape -> {imp.shape} (N*T={N*T}, L={L})")
# 3) Trim to expected_n rows
total_rows = N * T
if expected_n < 0:
raise ValueError(f"\texpected_n must be non-negative, got {expected_n}")
if expected_n > total_rows:
raise ValueError(f"\texpected_n ({expected_n}) > total rows ({total_rows}) after reshape")
removed = total_rows - expected_n
if removed > 0:
imp = imp[:-removed, :]
if verbose:
print(f"\tafter trim -> {imp.shape} (removed {removed} rows)")
return imp
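# Round-trip sketch (illustrative only): reversing the padded cube and trimming
# to the original row count removes the rows added during padding.
def _example_reverse_dimensionality():
    data = np.random.rand(100, 5)
    cube = dataset_add_dimensionality(data, seq_length=24, adding_nans=False)
    flat = dataset_reverse_dimensionality(cube, expected_n=100, verbose=False)
    assert flat.shape == data.shape
    return flat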
def __marshal_as_numpy_column(__ctype_container, __py_sizen, __py_sizem):
"""
Marshal a ctypes container as a numpy column-major array.
Parameters
----------
__ctype_container : ctypes.Array
The input ctypes container (flattened matrix).
__py_sizen : int
The number of rows in the numpy array.
__py_sizem : int
The number of columns in the numpy array.
Returns
-------
    numpy.ndarray
        A numpy array reshaped and transposed back to the original matrix dimensions.
    """
    __numpy_marshal = __numpy_import.array(__ctype_container).reshape(__py_sizem, __py_sizen).T
    return __numpy_marshal
def __marshal_as_native_column(__py_matrix):
"""
Marshal a numpy array as a ctypes flat container for passing to native code.
Parameters
----------
__py_matrix : numpy.ndarray
The input numpy matrix (2D array).
Returns
-------
ctypes.Array
A ctypes array containing the flattened matrix (in column-major order).
"""
    __py_input_flat = __numpy_import.ndarray.flatten(__py_matrix.T)
    __ctype_marshal = __numpy_import.ctypeslib.as_ctypes(__py_input_flat)
    return __ctype_marshal
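# Round-trip sketch (illustrative only): a matrix survives the native
# marshalling unchanged, since both helpers use column-major order.
def _example_marshal_roundtrip():
    m = np.arange(6, dtype=np.float64).reshape(2, 3)
    native = __marshal_as_native_column(m)
    back = __marshal_as_numpy_column(native, 2, 3)
    assert np.array_equal(back, m)
    return back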
def display_title(title="Master Thesis", aut="Quentin Nater", lib="ImputeGAP", university="University Fribourg"):
"""
Display the title and author information.
Parameters
----------
title : str, optional
The title of the thesis (default is "Master Thesis").
aut : str, optional
The author's name (default is "Quentin Nater").
lib : str, optional
The library or project name (default is "ImputeGAP").
university : str, optional
The university or institution (default is "University Fribourg").
Returns
-------
None
"""
print("=" * 100)
print(f"{title} : {aut}")
print("=" * 100)
print(f" {lib} - {university}")
print("=" * 100)
def auto_seq_llms(data_x, goal="seq", subset=False, high_limit=200, exception=False, b=None, verbose=True, deep_verbose=False):
"""
    Brute-force search for suitable (seq_len, batch_size) pairs.
If subset is False:
data_x: array of shape (T, ...)
If subset is True:
We internally split T into train / test / val using 0.7 / 0.2 / 0.1
and ensure that batch_size is <= num_windows for *each* subset.
Returns:
(seq_len, batch_size)
"""
T = data_x.shape[0]
F = data_x.shape[1]
max_batch = T // 2
if b is not None:
max_batch = F//2
if high_limit > T*0.1 and not exception:
high_limit = int(T*0.1)
if T > 50:
start_batch = 8
else:
start_batch = 2
if b is not None:
start_batch = 2
if subset:
Tr = int(T * 0.7)
sizes = [T, Tr]
else:
Tr = T//3
sizes = [T, Tr]
starting_point = max(2, (T // 2) - 1)
starting_point = min(starting_point, high_limit)
candidates = []
for seq_len in range(starting_point, 1, -1):
list_windows = []
valid_seq = True
        if seq_len % 2 == 1:  # only even sequence lengths are considered
            continue
for s in sizes:
num_windows = s - seq_len + 1
if num_windows < 2:
valid_seq = False
break
list_windows.append(num_windows)
if not valid_seq:
continue
        for batch_size in range(start_batch, max_batch + 1):
            # aggregate the window remainders across every subset size
            remainders = []
            for nw in list_windows:
                r = nw % batch_size
                if nw // batch_size == 1:
                    r += 1  # penalize configurations that yield a single batch
                remainders.append(r)
total_remainder = sum(remainders)
if deep_verbose:
print(f"{seq_len = } | {batch_size = }: {total_remainder = }")
# ------- scoring logic -------
if goal == "seq":
# prefer perfect match first, then larger seq_len
score = 1000 * (total_remainder > 0) - (2*seq_len) + abs(seq_len - batch_size)
elif goal == "batch":
# prefer perfect match first, then larger batch_size
score = 1000 * (total_remainder > 0) - (2*batch_size) + abs(seq_len - batch_size)
else: # "balance"
# keep original spirit, but on aggregated remainder
score = total_remainder + abs(seq_len - batch_size) * 0.1
# store a copy of list_windows for this candidate
candidates.append((score, seq_len, batch_size, list_windows.copy()))
if not candidates:
if verbose:
print("\ncompute pre-processor → No valid (seq_len, batch_size) combinations found.\n")
return None, None
# pick the combination with the lowest score
candidates.sort(key=lambda x: x[0])
best = candidates[0]
score, seq_len, batch_size, best_windows = best
if verbose:
if subset:
print(f"\ncompute pre-processor {score=} → Best seq_len={seq_len}, batch_size={batch_size}, num_windows_per_subset={best_windows}\n")
else:
num_windows = best_windows[0]
print(f"\ncompute pre-processor {score=} → Best seq_len={seq_len}, batch_size={batch_size} (num_windows={num_windows})\n")
return seq_len, batch_size
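# Usage sketch (illustrative only): search for an even seq_len and a matching
# batch size over a (300, 10) matrix; concrete values depend on the scoring.
def _example_auto_seq_llms():
    data = np.random.rand(300, 10)
    seq_len, batch_size = auto_seq_llms(data, goal="seq", verbose=False)
    return seq_len, batch_size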
def auto_seq_sample(matrix, tr_ratio, high_val=98, verbose=True):
"""
Automatically select a suitable sequence length and batch size
based on the dataset size and a predefined batch-size table.
The function iteratively searches for an even `seq_len`, starting from
`high_val` and decreasing by 2, until it is less than or equal to
`small_set`, where:
small_set = int(T * (1 - tr_ratio)) // 2
with `T` being the number of time steps (rows) in `matrix`.
If the search goes below 2, `seq_len` is clamped to 2.
Once `seq_len` is found, the batch size is chosen from a fixed
table `[2, 4, 8, 16, 32, 64, 96]` as the value closest to `seq_len`.
Parameters
----------
matrix : np.ndarray
Input 2D array of shape (T, F), where T is the number of time steps
and F the number of features.
tr_ratio : float
Training ratio in [0, 1]. Used to compute the size of the
"smallest set" (typically validation/test portion) that `seq_len`
should not exceed.
high_val : int, optional
Initial (maximum) candidate sequence length from which the search
starts and decreases by 2. Default is 98.
verbose : bool, optional
If True, prints the selected `seq_len`, `batch_size` and
the computed `small_set`. Default is True.
Returns
-------
seq_len : int
Selected sequence length, guaranteed to be at least 2 and
less than or equal to `small_set`.
batch_size : int
Selected batch size from the fixed table `[2, 4, 8, 16, 32, 64, 96]`
that is closest (in absolute difference) to `seq_len`.
"""
T, F = matrix.shape
found = False
batch_table = [2, 4, 8, 16, 32, 64, 96]
small_set = int(T*(1-tr_ratio))//2
if small_set <= 1000:
high_val = 50
batch_table = [2, 4, 8, 16, 32]
if small_set <= 200:
high_val = 26
batch_table = [2, 4, 8, 16]
seq_len = high_val
    while not found:
        seq_len -= 2
        if seq_len <= small_set:
            found = True
        if seq_len <= 2:
            seq_len = 2
            break
batch_size = min(batch_table, key=lambda b: abs(b - seq_len))
if verbose:
print(f"\nthe seq_len found is {seq_len}, and the batch_size {batch_size}, to match with the smallest set {small_set}\n")
return seq_len, batch_size
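# Worked example (illustrative only): with T=500 and tr_ratio=0.8 the smallest
# set is int(500 * (1 - 0.8)) // 2 = 49 (after floating-point truncation), so
# the search starts at 26, accepts seq_len=24, and picks 16 as the closest batch.
def _example_auto_seq_sample():
    data = np.random.rand(500, 8)
    seq_len, batch_size = auto_seq_sample(data, tr_ratio=0.8, verbose=False)
    assert (seq_len, batch_size) == (24, 16)
    return seq_len, batch_size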
def reconstruction_window_based(preds, nbr_timestamps, sliding_windows=1, verbose=True, deep_verbose=False):
"""
Reconstruct the full time series after window-based imputation. This function restores the original univariate series or 2D matrix from the 3D windowed (multivariate-style) representation used during the deep learning process.
See window_truncation() for the preprocessing transformation applied beforehand.
Parameters
----------
preds : torch.Tensor
Predicted windows of shape ``(N, L, F)``, where:
- ``N`` is the number of windows,
- ``L`` is the window length (sequence length),
- ``F`` is the number of features per time step.
nbr_timestamps : int
Target length ``T`` of the reconstructed time series along the
time dimension (number of time steps).
sliding_windows : int, optional
Step size between the starting indices of consecutive windows in
the original time series. The i-th window is placed starting at
index ``i * sliding_windows``. Default is 1.
verbose : bool, optional
If True, prints a summary of the reconstruction process and basic
completeness statistics. Default is True.
deep_verbose : bool, optional
If True, prints detailed information about the index ranges used
for each window and the internal count matrix. Useful for
debugging. Default is False.
Returns
-------
torch.Tensor
Reconstructed time series of shape ``(T, D)``, where
overlapping windows have been averaged at each time step.
"""
import torch
N, L, F = preds.shape
T = nbr_timestamps
if verbose:
print(f"\nreconstruction of the windows shaped matrix...\n\tsetup : {N =}, {L =}, {F =} -> {T = } : ", sep=" ", end=" ")
recons = torch.zeros(T, F)
counts = torch.zeros(T, F)
for i in range(N):
start = i * sliding_windows
seq = start + preds[i].shape[0]
if deep_verbose:
print(f"{i}-{seq - 1}|", sep="", end="")
recons[start:seq] += preds[i]
counts[start:seq] += 1
if deep_verbose:
print(f"{counts = }")
mask_l = counts > 0
recons[mask_l] = recons[mask_l] / counts[mask_l]
# =sanity check: verify reconstruction completeness=====================================================
row_sums = recons.sum(dim=1)
mask_nonzero = ~torch.isclose(row_sums, torch.tensor(0.0))
full = mask_nonzero.all().item()
rows_all_at_least_one = (counts >= 1).all(dim=1)
num_bad_rows = (~rows_all_at_least_one).sum().item()
bad_values = (~mask_nonzero).sum().item()
if verbose:
if full and num_bad_rows == 0:
print(f"the reconstruction has been done successfully, full recovery matrix.\n"
f"\tnumber of time steps reconstructed: {nbr_timestamps - num_bad_rows}/{nbr_timestamps}, "
f"number of values not handled: {bad_values}")
else:
print(f"the reconstruction has been done, but the recovery matrix is incomplete.\n"
f"\tnumber of time steps reconstructed: {nbr_timestamps - num_bad_rows}/{nbr_timestamps}, "
f"number of values not handled: {bad_values}")
# ======================================================================================================
return recons
[docs]
def check_contamination_series(ts_m, algo="the algorithm", verbose=True):
"""
Verify whether the input time series matrix meets the contamination constraints
required by uni-dimensional algorithms (such as SPIRIT).
Specifically, this function checks if only the first series (column 0) contains
missing (NaN) values. If any other series is contaminated, it reports an
imputation error (optionally printing a message) and returns `True` to signal
that an issue exists.
Parameters
----------
ts_m : np.ndarray
A 2D NumPy array representing the time series matrix, where each column
corresponds to a separate series.
algo : str, optional
The name of the algorithm being validated. Used only for logging in
the printed error message. Default is "the algorithm".
verbose : bool, optional
If True, prints an error message when contamination is detected outside
of series 0. Default is True.
Returns
-------
bool
False if only series 0 is contaminated (valid input).
True if contamination exists in any other series (invalid input).
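Examples
--------
A minimal sketch: only series 0 (column 0) contains NaNs, so the input is
valid for a uni-dimensional algorithm:
>>> import numpy as np
>>> check_contamination_series(np.array([[np.nan, 1.0], [2.0, 3.0]]), algo="SPIRIT")
False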
"""
nan_counts_per_col = np.sum(np.isnan(ts_m), axis=0)
cols_with_nans = np.where(nan_counts_per_col > 0)[0].shape[0]
if nan_counts_per_col[0] > 0 and np.sum(nan_counts_per_col > 0) == 1:
return False
else:
if verbose:
print(f"(IMPUTATION-ERROR) {algo} is a uni-dimensional algorithm and can only operate when series 0 is the sole contaminated one.\n\tThe provided matrix contains {cols_with_nans} contaminated series.\n")
return True
[docs]
def search_path(set_name="test"):
"""
Find the correct path for loading a dataset file.
Parameters
----------
set_name : str, optional
Name of the dataset (default is "test").
Returns
-------
str
The correct file path for the dataset.
"""
if set_name in list_of_datasets():
return set_name + ".txt"
else:
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
filepath = os.path.join(here, "datasets/" + set_name)
if not os.path.exists(filepath):
filepath = "../imputegap/datasets/" + set_name
if not os.path.exists(filepath):
filepath = filepath[1:]
return filepath
[docs]
def get_missing_ratio(incomp_data):
"""
Compute the proportion of missing values in the contaminated data, used to
decide whether it is acceptable for training a deep learning model.
Parameters
----------
incomp_data : np.ndarray
2D matrix containing the contaminated dataset.
Returns
-------
float
The ratio of missing (NaN) values over the total number of values.
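Examples
--------
A minimal sketch: one NaN out of four values gives a ratio of 0.25:
>>> import numpy as np
>>> float(get_missing_ratio(np.array([[np.nan, 1.0], [2.0, 3.0]])))
0.25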
"""
import numpy as np
miss_m = incomp_data
total_values = miss_m.size
missing_values = np.isnan(miss_m).sum()
missing_ratio = missing_values / total_values
return missing_ratio
[docs]
def verification_limitation(percentage, low_limit=0.001, high_limit=1.0):
"""
Format and verify that the percentage given by the user is within acceptable bounds.
Parameters
----------
percentage : float
The percentage value to be checked and potentially adjusted.
low_limit : float, optional
The lower limit of the acceptable percentage range (default is 0.001).
high_limit : float, optional
The upper limit of the acceptable percentage range (default is 1.0).
Returns
-------
float
Adjusted percentage based on the limits.
Raises
------
ValueError
If the percentage is outside the accepted limits.
Notes
-----
- If the percentage is between 1 and 100, it will be divided by 100 to convert it to a decimal format.
- If the percentage is outside both the decimal and the 1-100 ranges, a ValueError is raised.
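Examples
--------
A minimal sketch covering both the decimal and the percentage inputs:
>>> verification_limitation(0.4)
0.4
>>> verification_limitation(40)
The percentage 40 is between 1 and 100. Dividing by 100 to convert to a decimal.
0.4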
"""
if low_limit <= percentage <= high_limit:
return percentage # No modification needed
elif 1 <= percentage <= 100:
print(f"The percentage {percentage} is between 1 and 100. Dividing by 100 to convert to a decimal.")
return percentage / 100
else:
raise ValueError(f"The percentage {percentage} is out of the acceptable range.")
[docs]
def prepare_fixed_testing_set(incomp_m, tr_ratio=0.8, offset=0.05, block_selection=True, verbose=True):
"""
Introduces additional missing values (NaNs) into a data matrix to match a specified training ratio.
This function modifies a copy of the input matrix `incomp_m` by introducing NaNs
such that the proportion of observed (non-NaN) values matches the desired `tr_ratio`.
It returns the modified matrix and the corresponding missing data mask.
Parameters
----------
incomp_m : np.ndarray
A 2D NumPy array with potential pre-existing NaNs representing missing values.
tr_ratio : float
Desired ratio of observed (non-NaN) values in the output matrix. Must be in the range (0, 1).
offset : float
Fraction of leading columns in each series protected from contamination.
block_selection : bool
If True, select the missing values by blocks; if False, randomly.
verbose : bool
Whether to print debug info.
Returns
-------
data_matrix_cont : np.ndarray
The modified matrix with additional NaNs introduced to match the specified training ratio.
new_mask : np.ndarray
A boolean mask of the same shape as `data_matrix_cont` where True indicates missing (NaN) entries.
Raises
------
AssertionError:
If the final observed and missing ratios deviate from the target by more than 1%.
Notes
-----
- The function assumes that the input contains some non-NaN entries.
- NaNs are added in row-major order from the list of available (non-NaN) positions.
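Examples
--------
A minimal sketch; with `tr_ratio=0.8`, a fully observed 20x20 matrix is
topped up to exactly 20% missing values:
>>> import numpy as np
>>> new_m, mask = prepare_fixed_testing_set(np.random.rand(20, 20), tr_ratio=0.8, verbose=False)
>>> float(np.isnan(new_m).mean())
0.2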
"""
import numpy as np
data_matrix_cont = incomp_m.copy()
target_ratio = 1 - tr_ratio
# 1) Target number of NaNs
total_values = data_matrix_cont.size
target_n_nan = int(target_ratio * total_values)
# 2) Current number of NaNs
current_n_nan = np.isnan(data_matrix_cont).sum()
n_new_nans = target_n_nan - current_n_nan
available_mask = ~np.isnan(data_matrix_cont)
offset_vals = int(offset * data_matrix_cont.shape[1])
for row in range(data_matrix_cont.shape[0]):
available_mask[row, :offset_vals] = False # protect leftmost `offset` columns in each row
available_indices = np.argwhere(available_mask)
# 3) Pick indices to contaminate
if n_new_nans > 0:
if block_selection:
chosen_indices = available_indices[:n_new_nans]
else:
np.random.seed(42)
chosen_indices = available_indices[np.random.choice(len(available_indices), n_new_nans, replace=False)]
for i, j in chosen_indices:
data_matrix_cont[i, j] = np.nan
# 4) check ratio
n_total = data_matrix_cont.size
n_nan = np.isnan(data_matrix_cont).sum()
n_not_nan = n_total - n_nan
# Compute actual ratios
missing_ratio = n_nan / n_total
observed_ratio = n_not_nan / n_total
# Check if they match expectations (within a small tolerance)
assert abs(missing_ratio - target_ratio) < 0.01, f"Missing ratio {missing_ratio} is not {target_ratio}"
assert abs(observed_ratio - tr_ratio) < 0.01, f"Observed ratio {observed_ratio} is not {tr_ratio}"
# Create the new mask
new_mask = np.isnan(data_matrix_cont)
new_m = data_matrix_cont.copy()
if verbose:
print(f"(DL): TEST-SET > Test set fixed to {int(round(target_ratio*100))}% of the dataset, for {target_n_nan} values, add test values: {n_new_nans}")
return new_m, new_mask
[docs]
def split_mask_bwt_test_valid(data_matrix, test_rate=0.8, valid_rate=0.2, nan_val=None, verbose=False, seed=42):
"""
Dispatch NaN positions in data_matrix to test and validation masks only.
Parameters
----------
data_matrix : numpy.ndarray
Input matrix containing NaNs to be split.
test_rate : float
Proportion of NaNs to assign to the test set (default is 0.8).
valid_rate : float
Proportion of NaNs to assign to the validation set (default is 0.2).
test_rate + valid_rate must equal 1.0.
nan_val : float, optional
Sentinel value marking missing entries; if None, NaNs are detected with np.isnan (default is None).
verbose : bool
Whether to print debug info.
seed : int, optional
Random seed for reproducibility.
Returns
-------
tuple
test_mask : numpy.ndarray
Binary mask indicating positions of NaNs in the test set.
valid_mask : numpy.ndarray
Binary mask indicating positions of NaNs in the validation set.
n_nan : int
Total number of NaN values found in the input matrix.
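Examples
--------
A minimal sketch with four NaNs split 75/25 between test and validation:
>>> import numpy as np
>>> m = np.array([[np.nan, 1.0], [np.nan, np.nan], [4.0, np.nan]])
>>> t, v, n = split_mask_bwt_test_valid(m, test_rate=0.75, valid_rate=0.25)
>>> n, int(t.sum()), int(v.sum())
(4, 3, 1)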
"""
import numpy as np
assert np.isclose(test_rate + valid_rate, 1.0), "test_rate and valid_rate must sum to 1.0"
if seed is not None:
np.random.seed(seed)
if nan_val is None:
nan_mask = np.isnan(data_matrix)
else:
nan_mask = data_matrix == nan_val
nan_indices = np.argwhere(nan_mask)
np.random.shuffle(nan_indices)
n_nan = len(nan_indices)
n_test = int(n_nan * test_rate)
n_valid = n_nan - n_test
if verbose:
print(f"\n(DL): MASKS > creating mask (testing, validation): Total NaNs = {n_nan}")
print(f"(DL): TEST-MASK > creating mask: Assigned to test = {n_test}")
print(f"(DL): VALID-MASK > creating mask: Assigned to valid = {n_valid}")
test_idx = nan_indices[:n_test]
valid_idx = nan_indices[n_test:]
mask_test = np.zeros_like(data_matrix, dtype=np.uint8)
mask_valid = np.zeros_like(data_matrix, dtype=np.uint8)
mask_test[tuple(test_idx.T)] = 1
mask_valid[tuple(valid_idx.T)] = 1
if verbose:
print(f"(DL): TEST-MASK > Test mask NaNs: {mask_test.sum()}")
print(f"(DL): VALID-MASK > Valid mask NaNs: {mask_valid.sum()}\n")
return mask_test, mask_valid, n_nan
[docs]
def generate_random_mask(gt, mask_test, mask_valid, droprate=0.2, offset=None, series_like=True, verbose=False, seed=42):
"""
Generate a random training mask over the non-NaN entries of gt, excluding positions
already present in the test and validation masks.
Parameters
----------
gt : numpy.ndarray
Ground truth data; any NaN entries are excluded from eligibility.
mask_test : numpy.ndarray
Binary mask indicating test positions.
mask_valid : numpy.ndarray
Binary mask indicating validation positions.
droprate : float
Proportion of eligible entries to include in the training mask.
offset : float, optional
Fraction of leading columns to protect from selection; if None, no offset protection is applied.
series_like : bool
If True, any series (row) already used by the test or validation masks is excluded entirely.
verbose : bool
Whether to print debug info.
seed : int, optional
Random seed for reproducibility.
Returns
-------
numpy.ndarray
Binary mask indicating training positions.
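Examples
--------
A minimal sketch with empty test/validation masks, so all 16 entries are
eligible and `droprate=0.25` selects 4 of them:
>>> import numpy as np
>>> gt = np.arange(16.0).reshape(4, 4)
>>> empty = np.zeros_like(gt, dtype=np.uint8)
>>> int(generate_random_mask(gt, empty, empty, droprate=0.25).sum())
4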
"""
import numpy as np
assert gt.shape == mask_test.shape == mask_valid.shape, "All input matrices must have the same shape"
if seed is not None:
np.random.seed(seed)
mask_test_tmp = mask_test.astype(int)
mask_valid_tmp = mask_valid.astype(int)
# Valid positions: non-NaN and not in test/valid masks
num_offset = 0
mask_offset = np.zeros_like(gt, dtype=np.uint8)
# only cells that are completely free can be picked
if offset is not None:
if offset > droprate:
offset = droprate
mask_offset[:, :int(offset * gt.shape[1])] = 1
num_offset = np.sum(mask_offset)
if series_like:
row_test = mask_test_tmp.any(axis=1)
row_valid = mask_valid_tmp.any(axis=1)
mask_test_tmp[row_test, :] = 1
mask_valid_tmp[row_valid, :] = 1
occupied_mask = (mask_test_tmp + mask_valid_tmp + mask_offset).astype(bool)
eligible_mask = (~np.isnan(gt)) & (~occupied_mask)
eligible_indices = np.argwhere(eligible_mask)
n_train = int(len(eligible_indices) * droprate) + int(num_offset*droprate)
np.random.shuffle(eligible_indices)
selected_indices = eligible_indices[:n_train]
mask_train = np.zeros_like(gt, dtype=np.uint8)
mask_train[tuple(selected_indices.T)] = 1
if verbose:
print(f"(DL): TRAIN-MASK > eligible entries: {len(eligible_indices)}")
print(f"(DL): TRAIN-MASK > selected training entries: {n_train}\n")
# Sanity check: no overlap between training and test masks
overlap = np.logical_and(mask_train, mask_test).sum()
assert overlap == 0, f"Overlap detected between training and test masks: {overlap} entries."
# Sanity check: no overlap between training and validation masks
overlap = np.logical_and(mask_train, mask_valid).sum()
assert overlap == 0, f"Overlap detected between training and validation masks: {overlap} entries."
if verbose:
print(f"(DL): TRAIN-MASK > Train mask NaNs: {mask_train.sum()}\n")
return mask_train
[docs]
def prevent_leakage(matrix, mask, replacement=0, verbose=True):
"""
Replaces missing values in a matrix to prevent data leakage during evaluation.
This function replaces all entries in `matrix` that are marked as missing in `mask`
with a specified `replacement` value (default is 0). It then checks to ensure that
there are no remaining NaNs in the matrix and that at least one replacement occurred.
Parameters
----------
matrix : np.ndarray
A NumPy array potentially containing missing values (NaNs).
mask : np.ndarray
A boolean mask of the same shape as `matrix`, where True indicates positions
to be replaced (typically where original values were NaN).
replacement : float or int, optional
The value to use in place of missing entries. Defaults to 0.
verbose : bool
Whether to print debug info.
Returns
-------
matrix : np.ndarray
The matrix with missing entries replaced by the specified value.
Raises
------
AssertionError:
If any NaNs remain in the matrix after replacement, or if no replacements were made.
Notes
-----
- This function is typically used before evaluation to ensure the model does not
access ground truth values where data was originally missing.
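Examples
--------
A minimal sketch replacing the two NaNs with the default value 0:
>>> import numpy as np
>>> m = np.array([[np.nan, 1.0], [2.0, np.nan]])
>>> prevent_leakage(m, np.isnan(m), verbose=False)
array([[0., 1.],
       [2., 0.]])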
"""
import numpy as np
matrix[mask] = replacement
assert not np.isnan(matrix).any(), "matrix still contains NaNs"
assert (matrix == replacement).any(), f"no values were replaced with {replacement}"
if verbose:
print(f"\n(DL) Reset all testing matrix values to {replacement} to prevent data leakage.")
return matrix
[docs]
def prepare_testing_set(incomp_m, original_missing_ratio, block_selection=True, tr_ratio=0.8, verbose=True):
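"""
Build the fixed testing set used by the deep learning models, topping up the
missing values of `incomp_m` until the testing ratio `1 - tr_ratio` is reached.
Parameters
----------
incomp_m : np.ndarray
A 2D matrix with pre-existing NaN values.
original_missing_ratio : float
Missing ratio of the input matrix before the test set is built.
block_selection : bool, optional
If True, select the additional missing values by blocks; otherwise randomly (default is True).
tr_ratio : float, optional
Desired ratio of observed values kept for training (default is 0.8).
verbose : bool, optional
Whether to print debug info (default is True).
Returns
-------
tuple
new_m (np.ndarray), the matrix whose missing ratio matches `1 - tr_ratio`;
new_mask (np.ndarray), the boolean mask of missing entries in `new_m`;
error (bool), True if the original missing ratio already exceeds `1 - tr_ratio`.
"""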
import numpy as np
error = False
mask_original_nan = np.isnan(incomp_m)
if verbose:
print(f"\n(DL) TEST-SET : testing ratio to reach = {1-tr_ratio:.2%}")
print(f"\n(DL) TEST-SET : original missing ratio = {original_missing_ratio:.2%}")
print(f"(DL) TEST-SET : original missing numbers = {np.sum(mask_original_nan)}")
if original_missing_ratio > 1-tr_ratio:
print(f"\n(ERROR) The proportion of original missing values is too high and will corrupt the training set.\n\tPlease consider reducing the percentage contamination pattern [{original_missing_ratio:.2%}] or decreasing the training ratio [{tr_ratio:.2%}].\n")
return incomp_m, mask_original_nan, True
if abs((1-tr_ratio) - original_missing_ratio) > 0.01:
new_m, new_mask = prepare_fixed_testing_set(incomp_m, tr_ratio, block_selection=block_selection, verbose=verbose)
if verbose:
print(f"(DL) TEST-SET : building of the test set to reach a fix ratio of {1 - tr_ratio:.2%}...")
final_ratio = get_missing_ratio(new_m)
print(f"(DL) TEST-SET : final artificially missing ratio for test set = {final_ratio:.2%}")
print(f"(DL) TEST-SET : final number of rows with NaN values = {np.sum(np.isnan(new_m).any(axis=1))}")
print(f"(DL) TEST-SET : final artificially missing numbers = {np.sum(new_mask)}\n")
else:
new_m = incomp_m
new_mask = mask_original_nan.copy()
return new_m, new_mask, error
"""
def set_dic_position_dl(mask_test, split_idx, verbose=False):
position_dic_imputegap = {} # {original_row_index: "train"|"val"|"test"}
non_test_seen = 0 # position among NON-TEST rows
inc_tr = 0
inc_val = 0
inc_test = 0
for i, is_test in enumerate(mask_test):
if is_test:
position_dic_imputegap[i] = ("test", inc_test)
inc_test += 1
else:
if non_test_seen < split_idx:
position_dic_imputegap[i] = ("train", inc_tr)
inc_tr += 1
else:
position_dic_imputegap[i] = ("val", inc_val)
inc_val += 1
non_test_seen += 1
if verbose:
print("\nIndices ImputeGAP Deep Learning Training with Patterns:")
for k, v in position_dic_imputegap.items():
set, inc = v
print(f"\t{k}\t{set}\t{inc}")
return position_dic_imputegap
def compute_seq_length(M):
seq_length = 1
if M > 5000:
seq_length = 3000
elif M > 3000:
seq_length = 1400
elif M > 2000:
seq_length = 1000
elif M > 1000:
seq_length = 600
elif M > 300:
seq_length = 100
elif M > 30:
seq_length = 16
else:
if M % 5 == 0:
seq_length = M // 5
elif M % 6 == 0:
seq_length = M // 6
elif M % 2 == 0:
seq_length = M // 2 - 2
if seq_length < 1:
seq_length = 1
elif M % 3 == 0:
seq_length = M // 3
return seq_length
def compute_batch_size(data, min_size=4, max_size=16, divisor=2, verbose=True):
M, N = data.shape
batch_size = min(M // divisor, max_size)
if batch_size < min_size:
batch_size = min_size
if batch_size % 2 != 0:
batch_size = batch_size + 1
if batch_size > max_size:
batch_size = batch_size -2
if batch_size < 1:
batch_size = 1
if verbose:
print(f"(Batch-Size) Computed batch size: {batch_size}\n")
return batch_size
"""
[docs]
def load_share_lib(name="lib_cdrec", verbose=True):
"""
Load the shared library based on the operating system.
Parameters
----------
name : str, optional
The name of the shared library (default is "lib_cdrec").
verbose : bool, optional
Whether to print information about the loaded library (default is True).
Returns
-------
ctypes.CDLL
The loaded shared library object.
"""
system = platform.system()
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# mac os ===========================================================================================================
if system == "Darwin":
lib_path = importlib.resources.files('imputegap.algorithms.lib').joinpath("./" + str(name) + ".dylib")
try: # try inner file C++
cpp_wrapper = ctypes.CDLL(lib_path)
if verbose:
print(f"\n(SYS) Wrapper files loaded for C++ : {lib_path}\n")
except Exception:
try:
lib_path = importlib.resources.files('imputegap.algorithms.lib').joinpath("./" + str(name) + "_new.dylib")
cpp_wrapper = ctypes.CDLL(lib_path)
print(f"(SYS-UPD) C++ shared object linked with new version of armadillo: {lib_path}\n")
except Exception:
lib_path = os.path.join(here, 'algorithms/lib/' + name + ".dylib")
cpp_wrapper = ctypes.CDLL(lib_path)
print(f"(SYS-UPD) C++ shared object linked with the user path : {lib_path}\n")
# other ===========================================================================================================
else:
lib_path = importlib.resources.files('imputegap.algorithms.lib').joinpath("./" + str(name) + ".so")
try: # try inner file C++
cpp_wrapper = ctypes.CDLL(lib_path)
if verbose:
print(f"\n(SYS) Wrapper files loaded for C++ : {lib_path}\n")
except Exception:
lib_path = os.path.join(here, 'algorithms/lib/' + name + ".so")
cpp_wrapper = ctypes.CDLL(lib_path)
print(f"(SYS-UPD) C++ shared object linked with the user path : {lib_path}\n")
return cpp_wrapper
[docs]
def control_boundaries(rank, boundary, algorithm="Algorithm", reduction=1):
"""
Ensure that the rank does not exceed the boundary limit.
Parameters
----------
rank : int
The input rank, typically representing the number of components or factors.
boundary : int
The maximum allowed value, usually corresponding to the number of available series.
algorithm : str, optional
The name of the algorithm using this control check (default is "Algorithm").
reduction : int, optional
The amount to reduce the boundary by if the rank exceeds it (default is 1).
Returns
-------
int
The adjusted rank value. If the input rank is valid, it is returned unchanged.
If it exceeds the boundary, a reduced value is returned. If no valid reduction is
possible, returns 1.
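Examples
--------
A minimal sketch of the pass-through case; when `rank` exceeds `boundary`,
a warning is printed and the reduced value is returned instead:
>>> control_boundaries(5, 10)
5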
"""
if rank >= boundary:
new_estimator = boundary - reduction
print(f"(ERROR) {algorithm}\n\trank {rank} is higher than the number of series {boundary}. Reduce to {new_estimator}.\n")
if new_estimator > 0:
return new_estimator
else:
print(f"(ERROR) {algorithm}\n\tNot enough series to impute with this algorithm {boundary} <= 0.\n")
return 1
else:
return rank
[docs]
def list_of_algorithms():
"""
Return the list of available imputation algorithms.
Parameters
----------
None
Returns
-------
list of str
A sorted list of algorithm names supported by the framework.
"""
return sorted([
"CDRec",
"IterativeSVD",
"GROUSE",
"ROSL",
"SPIRIT",
"SoftImpute",
"SVT",
"TRMF",
"STMVL",
"DynaMMo",
"TKCM",
"IIM",
"XGBOOST",
"MICE",
"MissForest",
"KNNImpute",
"Interpolation",
"MinImpute",
"MeanImpute",
"ZeroImpute",
"MeanImputeBySeries",
"MRNN",
"BRITS",
"DeepMVI",
"MPIN",
"PRISTI",
"MissNet",
"GAIN",
"GRIN",
"BayOTIDE",
"HKMFT",
"BitGraph",
"SAITS",
"NuwaTS",
"GPT4TS",
"TimesNet",
"CSDI"
])
[docs]
def list_of_patterns():
"""
Return the list of available imputation patterns.
Parameters
----------
None
Returns
-------
list of str
A sorted list of pattern names supported by the framework.
"""
return sorted([
"aligned",
"disjoint",
"overlap",
"scattered",
"mcar",
"gaussian",
"distribution"
])
[docs]
def list_of_datasets(txt=False):
"""
Return the list of available datasets from ImputeGAP.
Parameters
----------
txt : bool, optional
If True, append the ".txt" extension to each dataset name (default is False).
Returns
-------
list of str
A sorted list of dataset names supported by ImputeGAP.
"""
datasets = sorted([
"airq",
"bafu",
"chlorine",
"climate",
"drift",
"eeg-alcohol",
"eeg-reading",
"electricity",
#"fmri-stoptask",
"forecast-economy",
"meteo",
"motion",
"soccer",
#"solar",
"sport-activity",
"stock-exchange",
"temperature",
"traffic"
])
if txt:
datasets = [dataset + ".txt" for dataset in datasets]
return datasets
[docs]
def list_of_optimizers():
"""
Return the list of available optimizers from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
A sorted list of optimizer names supported by ImputeGAP.
"""
return sorted([
"ray_tune",
"bayesian",
"particle_swarm",
"successive_halving",
"greedy"
])
[docs]
def list_of_downstreams():
"""
Return the list of available downstream models from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
A sorted list of downstream model names supported by ImputeGAP.
"""
return sorted(list_of_downstreams_sktime() + list_of_downstreams_darts())
[docs]
def list_of_downstreams_sktime():
"""
Return the list of available downstream models (sktime) from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
A sorted list of downstream model names supported by ImputeGAP.
"""
return sorted([
"prophet",
"exp-smoothing",
"hw-add",
"arima",
"sf-arima",
#"bats",
"ets",
"croston",
"theta",
"unobs",
"naive"
])
[docs]
def list_of_downstreams_darts():
"""
Return the list of available downstream models (darts) from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
A sorted list of downstream model names supported by ImputeGAP.
"""
return sorted([
"nbeats",
"xgboost",
"lightgbm",
"lstm",
"deepar",
"transformer"
])
[docs]
def list_of_families():
"""
Return the list of available families of imputation techniques from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
A sorted list of imputation technique families supported by ImputeGAP.
"""
return sorted(["DeepLearning", "MatrixCompletion", "PatternSearch", "MachineLearning", "Statistics", "LLMs"])
[docs]
def list_of_metrics():
"""
Return the list of available metrics from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
The list of imputation metrics supported by ImputeGAP.
"""
return ["RMSE", "MAE", "MI", "CORRELATION", "RUNTIME", "RUNTIME_LOG"]
[docs]
def list_of_algorithms_deep_learning():
"""
Returns all imputation algorithms of the Deep Learning family.
"""
return list_of_algorithms_with_families(specify_family="DeepLearning")
[docs]
def list_of_algorithms_matrix_completion():
"""
Returns all imputation algorithms of the Matrix Completion family.
"""
return list_of_algorithms_with_families(specify_family="MatrixCompletion")
[docs]
def list_of_algorithms_pattern_search():
"""
Returns all imputation algorithms of the Pattern Search family.
"""
return list_of_algorithms_with_families(specify_family="PatternSearch")
[docs]
def list_of_algorithms_machine_learning():
"""
Returns all imputation algorithms of the Machine Learning family.
"""
return list_of_algorithms_with_families(specify_family="MachineLearning")
[docs]
def list_of_algorithms_statistics():
"""
Returns all imputation algorithms of the Statistics family.
"""
return list_of_algorithms_with_families(specify_family="Statistics")
[docs]
def list_of_algorithms_llms():
"""
Returns all imputation algorithms of the LLMs family.
"""
return list_of_algorithms_with_families(specify_family="LLMs")
[docs]
def list_of_algorithms_with_families(specify_family=None):
"""
Return the list of available imputation techniques (with families) from ImputeGAP.
Parameters
----------
specify_family : str, optional
If provided, return only the algorithm names of that family, without the family prefix (default is None).
Returns
-------
list of str
A sorted list of imputation techniques (with families) supported by ImputeGAP.
"""
my_list = [
"MatrixCompletion.CDRec",
"MatrixCompletion.IterativeSVD",
"MatrixCompletion.GROUSE",
"MatrixCompletion.ROSL",
"MatrixCompletion.SPIRIT",
"MatrixCompletion.SoftImpute",
"MatrixCompletion.SVT",
"MatrixCompletion.TRMF",
"PatternSearch.STMVL",
"PatternSearch.DynaMMo",
"PatternSearch.TKCM",
"MachineLearning.IIM",
"MachineLearning.XGBOOST",
"MachineLearning.MICE",
"MachineLearning.MissForest",
"Statistics.KNNImpute",
"Statistics.Interpolation",
"Statistics.MinImpute",
"Statistics.MeanImpute",
"Statistics.ZeroImpute",
"Statistics.MeanImputeBySeries",
"DeepLearning.MRNN",
"DeepLearning.BRITS",
"DeepLearning.DeepMVI",
"DeepLearning.MPIN",
"DeepLearning.PRISTI",
"DeepLearning.MissNet",
"DeepLearning.GAIN",
"DeepLearning.GRIN",
"DeepLearning.BayOTIDE",
"DeepLearning.HKMFT",
"DeepLearning.BitGraph",
"DeepLearning.SAITS",
"DeepLearning.CSDI",
"DeepLearning.TimesNet",
"LLMs.NuwaTS",
"LLMs.GPT4TS"
]
def normalize_family(family: str) -> str:
return family.replace(" ", "").lower()
if specify_family is not None:
target = normalize_family(specify_family)
return [algo.split(".")[1] for algo in my_list if normalize_family(algo.split(".")[0]) == target]
else:
return sorted(my_list)
[docs]
def list_of_normalizers():
"""
Return the list of available normalizers from ImputeGAP.
Parameters
----------
None
Returns
-------
list of str
The list of normalizers supported by ImputeGAP.
"""
return ["z_score", "min_max"]
[docs]
def clean_missing_values(raw_data=None, substitute="zero", mask=None):
"""
Replace all NaN values in a 2D matrix by a column-wise substitute.
Parameters
----------
raw_data : np.ndarray
2D input array of shape (N, M) containing missing values encoded as NaN.
substitute : {"mean", "median", "zero"}, optional
Strategy used to replace NaNs per column:
- "mean": replace NaNs with the column-wise mean (ignoring NaNs).
- "median": replace NaNs with the column-wise median (ignoring NaNs).
- "zero": replace NaNs with 0.
Default is "zero".
mask : np.ndarray, optional
Boolean mask of the entries to replace; if None, NaNs are detected automatically.
Returns
-------
np.ndarray
2D array of shape (N, M) with NaNs replaced column-wise.
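Examples
--------
A minimal sketch; the NaN in column 1 is replaced by that column's mean:
>>> import numpy as np
>>> clean_missing_values(np.array([[1.0, np.nan], [3.0, 4.0]]), substitute="mean")
array([[1., 4.],
       [3., 4.]])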
"""
if raw_data is None:
raise ValueError("raw_data must not be None.")
if mask is None:
mask = np.isnan(raw_data)
if not mask.any():
return raw_data.copy()
n_rows, n_cols = raw_data.shape
if substitute == "mean":
col_values = np.nanmean(raw_data, axis=0)
elif substitute == "median":
col_values = np.nanmedian(raw_data, axis=0)
elif substitute == "zero":
col_values = np.zeros(n_cols, dtype=float)
else:
raise ValueError(f"Unknown substitute strategy '{substitute}'. Use 'mean', 'median', or 'zero'.")
filled = raw_data.copy()
rows, cols = np.where(mask)
filled[rows, cols] = col_values[cols]
return filled
[docs]
def prepare_deep_learning_params(incomp_data, seq_len, batch_size, sliding_windows, tr_ratio, verbose):
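"""
Resolve the deep learning hyperparameters (sequence length, batch size,
sliding windows) when they are not provided by the user (-1 values).
Parameters
----------
incomp_data : np.ndarray
2D contaminated matrix.
seq_len : int
Sequence length; -1 to compute it automatically.
batch_size : int
Batch size; -1 to compute it automatically.
sliding_windows : int
Step between consecutive windows; 0 switches to the multivariate setup.
tr_ratio : float
Training ratio forwarded to the automatic search.
verbose : bool
Whether to print debug info.
Returns
-------
tuple
(multivariate, seq_len, batch_size, sliding_windows, error), where `error`
is True when the resolved `seq_len` exceeds the length of `incomp_data`.
"""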
error = False
multivariate = (sliding_windows == 0)
if seq_len == -1 or batch_size == -1:
if multivariate:
seq_len, batch_size = auto_seq_sample(matrix=incomp_data, tr_ratio=tr_ratio, verbose=verbose)
sliding_windows = seq_len
else:
seq_len, batch_size = auto_seq_llms(data_x=incomp_data, goal="seq", subset=True, high_limit=50, verbose=verbose)
if seq_len > len(incomp_data):
print(f"(ERROR) The current seq_length {seq_len} is not adapted to the contaminated matrix {len(incomp_data)} !")
error =True
return multivariate, seq_len, batch_size, sliding_windows, error