import ctypes
import os
import toml
import importlib.resources
import numpy as __numpy_import
def config_impute_algorithm(incomp_data, algorithm):
"""
Configure and return the imputer instance for the selected imputation algorithm.
Parameters
----------
incomp_data : TimeSeries
TimeSeries object containing the incomplete dataset.
algorithm : str
Name of the imputation algorithm; unrecognized names fall back to MeanImpute.
Returns
-------
BaseImputer
Configured imputer instance for the requested algorithm.
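Examples
--------
A minimal sketch, assuming ``incomp_data`` is a contaminated TimeSeries:
>>> imputer = config_impute_algorithm(incomp_data, "cdrec")    # doctest: +SKIP  # Imputation.MatrixCompletion.CDRec
>>> imputer = config_impute_algorithm(incomp_data, "unknown")  # doctest: +SKIP  # falls back to Imputation.Statistics.MeanImpute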
"""
from imputegap.recovery.imputation import Imputation
# 1st generation
if algorithm == "cdrec" or algorithm == "CDRec":
imputer = Imputation.MatrixCompletion.CDRec(incomp_data)
elif algorithm == "stmvl" or algorithm == "STMVL":
imputer = Imputation.PatternSearch.STMVL(incomp_data)
elif algorithm == "iim" or algorithm == "IIM":
imputer = Imputation.MachineLearning.IIM(incomp_data)
elif algorithm == "mrnn" or algorithm == "MRNN":
imputer = Imputation.DeepLearning.MRNN(incomp_data)
# 2nd generation
elif algorithm == "iterative_svd" or algorithm == "iter_svd" or algorithm == "IterativeSVD":
imputer = Imputation.MatrixCompletion.IterativeSVD(incomp_data)
elif algorithm == "grouse" or algorithm == "GROUSE":
imputer = Imputation.MatrixCompletion.GROUSE(incomp_data)
elif algorithm == "dynammo" or algorithm == "DynaMMo":
imputer = Imputation.PatternSearch.DynaMMo(incomp_data)
elif algorithm == "rosl" or algorithm == "ROSL":
imputer = Imputation.MatrixCompletion.ROSL(incomp_data)
elif algorithm == "soft_impute" or algorithm == "soft_imp" or algorithm == "SoftImpute":
imputer = Imputation.MatrixCompletion.SoftImpute(incomp_data)
elif algorithm == "spirit" or algorithm == "SPIRIT":
imputer = Imputation.MatrixCompletion.SPIRIT(incomp_data)
elif algorithm == "svt" or algorithm == "SVT":
imputer = Imputation.MatrixCompletion.SVT(incomp_data)
elif algorithm == "tkcm" or algorithm == "TKCM":
imputer = Imputation.PatternSearch.TKCM(incomp_data)
elif algorithm == "deep_mvi" or algorithm == "DeepMVI":
imputer = Imputation.DeepLearning.DeepMVI(incomp_data)
elif algorithm == "brits" or algorithm == "BRITS":
imputer = Imputation.DeepLearning.BRITS(incomp_data)
elif algorithm == "mpin" or algorithm == "MPIN":
imputer = Imputation.DeepLearning.MPIN(incomp_data)
elif algorithm == "pristi" or algorithm == "PRISTI":
imputer = Imputation.DeepLearning.PRISTI(incomp_data)
# 3rd generation
elif algorithm == "knn" or algorithm == "KNN":
imputer = Imputation.Statistics.KNN(incomp_data)
elif algorithm == "interpolation" or algorithm == "Interpolation":
imputer = Imputation.Statistics.Interpolation(incomp_data)
elif algorithm == "mean_series" or algorithm == "MeanImputeBySeries":
imputer = Imputation.Statistics.MeanImputeBySeries(incomp_data)
elif algorithm == "min_impute" or algorithm == "MinImpute":
imputer = Imputation.Statistics.MinImpute(incomp_data)
elif algorithm == "zero_impute" or algorithm == "ZeroImpute":
imputer = Imputation.Statistics.ZeroImpute(incomp_data)
elif algorithm == "trmf" or algorithm == "TRMF":
imputer = Imputation.MatrixCompletion.TRMF(incomp_data)
elif algorithm == "mice" or algorithm == "MICE":
imputer = Imputation.MachineLearning.MICE(incomp_data)
elif algorithm == "miss_forest" or algorithm == "MissForest":
imputer = Imputation.MachineLearning.MissForest(incomp_data)
elif algorithm == "xgboost" or algorithm == "XGBOOST":
imputer = Imputation.MachineLearning.XGBOOST(incomp_data)
elif algorithm == "miss_net" or algorithm == "MissNet":
imputer = Imputation.DeepLearning.MissNet(incomp_data)
elif algorithm == "gain" or algorithm == "GAIN":
imputer = Imputation.DeepLearning.GAIN(incomp_data)
elif algorithm == "grin" or algorithm == "GRIN":
imputer = Imputation.DeepLearning.GRIN(incomp_data)
elif algorithm == "bay_otide" or algorithm == "BayOTIDE":
imputer = Imputation.DeepLearning.BayOTIDE(incomp_data)
elif algorithm == "hkmf_t" or algorithm == "HKMF_T":
imputer = Imputation.DeepLearning.HKMF_T(incomp_data)
elif algorithm == "bit_graph" or algorithm == "BitGraph":
imputer = Imputation.DeepLearning.BitGraph(incomp_data)
else:
imputer = Imputation.Statistics.MeanImpute(incomp_data)
return imputer
def config_contamination(ts, pattern, dataset_rate=0.4, series_rate=0.4, block_size=10, offset=0.1, seed=True, limit=1, shift=0.05, std_dev=0, explainer=False, probabilities=None):
"""
Configure and execute the contamination of a time series with the selected pattern.
Parameters
----------
ts : TimeSeries
A TimeSeries object containing the dataset to contaminate.
pattern : str
Type of contamination pattern (e.g., "mcar", "mp", "ps", "disjoint", "overlap", "gaussian", "distribution"); any other value triggers "blackout".
dataset_rate : float, optional
Percentage of series to contaminate (default is 0.4).
series_rate : float, optional
Percentage of missing values injected per contaminated series (default is 0.4).
block_size : int, optional
Size of the blocks removed in MCAR (default is 10).
offset : float, optional
Protected zone at the beginning of each series that is never contaminated (default is 0.1).
seed : bool, optional
Whether to seed the random generator for reproducibility (default is True).
limit : float, optional
Maximum fraction of a series that can be contaminated, used by "disjoint" and "overlap" (default is 1).
shift : float, optional
Shift between overlapping blocks, used by "overlap" (default is 0.05).
std_dev : float, optional
Standard deviation of the noise for the "gaussian" pattern (default is 0).
explainer : bool, optional
Whether the contamination is generated for the explainer module (default is False).
probabilities : list, optional
Probability weights for the "distribution" pattern (default is None).
Returns
-------
TimeSeries
TimeSeries object containing contaminated data.
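Examples
--------
A minimal sketch, assuming ``ts`` is a loaded TimeSeries:
>>> incomp_data = config_contamination(ts, "mcar", dataset_rate=0.4, series_rate=0.4, block_size=10)  # doctest: +SKIP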
"""
if pattern == "mcar" or pattern == "missing_completely_at_random":
incomp_data = ts.Contamination.missing_completely_at_random(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, block_size=block_size, offset=offset, seed=seed, explainer=explainer)
elif pattern == "mp" or pattern == "missing_percentage":
incomp_data = ts.Contamination.missing_percentage(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, offset=offset)
elif pattern == "ps" or pattern == "percentage_shift":
incomp_data = ts.Contamination.percentage_shift(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, offset=offset, seed=seed)
elif pattern == "disjoint":
incomp_data = ts.Contamination.disjoint(input_data=ts.data, rate_series=dataset_rate, limit=limit, offset=offset)
elif pattern == "overlap":
incomp_data = ts.Contamination.overlap(input_data=ts.data, rate_series=dataset_rate, limit=limit, shift=shift, offset=offset)
elif pattern == "gaussian":
incomp_data = ts.Contamination.gaussian(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, std_dev=std_dev, offset=offset, seed=seed)
elif pattern == "distribution":
incomp_data = ts.Contamination.distribution(input_data=ts.data, rate_dataset=dataset_rate, rate_series=series_rate, probabilities=probabilities, offset=offset, seed=seed)
else:
incomp_data = ts.Contamination.blackout(input_data=ts.data, series_rate=dataset_rate, offset=offset)
return incomp_data
def __marshal_as_numpy_column(__ctype_container, __py_sizen, __py_sizem):
"""
Marshal a ctypes container as a numpy column-major array.
Parameters
----------
__ctype_container : ctypes.Array
The input ctypes container (flattened matrix).
__py_sizen : int
The number of rows in the numpy array.
__py_sizem : int
The number of columns in the numpy array.
Returns
-------
numpy.ndarray
A numpy array of shape (__py_sizen, __py_sizem), rebuilt from the column-major flattened data.
"""
__numpy_marshal = __numpy_import.array(__ctype_container).reshape(__py_sizem, __py_sizen).T
return __numpy_marshal
def __marshal_as_native_column(__py_matrix):
"""
Marshal a numpy array as a ctypes flat container for passing to native code.
Parameters
----------
__py_matrix : numpy.ndarray
The input numpy matrix (2D array).
Returns
-------
ctypes.Array
A ctypes array containing the flattened matrix (in column-major order).
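Examples
--------
A sketch of the column-major round trip with __marshal_as_numpy_column:
>>> import numpy as np
>>> m = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])  # 3 rows, 2 columns
>>> flat = __marshal_as_native_column(m)                 # stores 1, 3, 5, 2, 4, 6
>>> np.array_equal(__marshal_as_numpy_column(flat, 3, 2), m)
True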
"""
__py_input_flat = __numpy_import.ndarray.flatten(__py_matrix.T)
__ctype_marshal = __numpy_import.ctypeslib.as_ctypes(__py_input_flat)
return __ctype_marshal
def display_title(title="Master Thesis", aut="Quentin Nater", lib="ImputeGAP", university="University Fribourg"):
"""
Display the title and author information.
Parameters
----------
title : str, optional
The title of the thesis (default is "Master Thesis").
aut : str, optional
The author's name (default is "Quentin Nater").
lib : str, optional
The library or project name (default is "ImputeGAP").
university : str, optional
The university or institution (default is "University Fribourg").
Returns
-------
None
"""
print("=" * 100)
print(f"{title} : {aut}")
print("=" * 100)
print(f" {lib} - {university}")
print("=" * 100)
def search_path(set_name="test"):
"""
Find the accurate path for loading test files.
Parameters
----------
set_name : str, optional
Name of the dataset (default is "test").
Returns
-------
str
The correct file path for the dataset.
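Examples
--------
>>> search_path("chlorine")
'chlorine.txt'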
"""
if set_name in list_of_datasets():
return set_name + ".txt"
else:
filepath = "../imputegap/dataset/" + set_name + ".txt"
if not os.path.exists(filepath):
filepath = filepath[1:]
return filepath
def load_parameters(query: str = "default", algorithm: str = "cdrec", dataset: str = "chlorine", optimizer: str = "b", path=None):
"""
Load default or optimal parameters for algorithms from a TOML file.
Parameters
----------
query : str, optional
'default' or 'optimal' to load default or optimal parameters (default is "default").
algorithm : str, optional
Algorithm to load parameters for (default is "cdrec").
dataset : str, optional
Name of the dataset (default is "chlorine").
optimizer : str, optional
Optimizer type for optimal parameters (default is "b").
path : str, optional
Custom file path for the TOML file (default is None).
Returns
-------
tuple or dict
The loaded parameters for the given algorithm: a tuple for most algorithms, a dict for forecaster entries, a scalar for single-parameter algorithms, or None if the query or algorithm is not found.
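Examples
--------
A minimal sketch; the values come from the TOML files shipped with the package:
>>> rank, epsilon, iterations = load_parameters(query="default", algorithm="cdrec")  # doctest: +SKIP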
"""
if query == "default":
if path is None:
filepath = importlib.resources.files('imputegap.env').joinpath("./default_values.toml")
if not filepath.is_file():
filepath = "./env/default_values.toml"
else:
filepath = path
if not os.path.exists(filepath):
filepath = "./env/default_values.toml"
elif query == "optimal":
if path is None:
filename = "./optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
filepath = importlib.resources.files('imputegap.params').joinpath(filename)
if not filepath.is_file():
filepath = "./params/optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
else:
filepath = path
if not os.path.exists(filepath):
filepath = "./params/optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
else:
print("Query not found for this function ('optimal' or 'default')")
return None
if not os.path.exists(filepath):
filepath = "./params/optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
if not os.path.exists(filepath):
filepath = filepath[1:]
with open(filepath, "r") as file:
config = toml.load(file)
print("\n\t\t\t\t(SYS) Inner files loaded : ", filepath, "\n")
if algorithm == "cdrec":
truncation_rank = int(config[algorithm]['rank'])
epsilon = float(config[algorithm]['epsilon'])
iterations = int(config[algorithm]['iteration'])
return (truncation_rank, epsilon, iterations)
elif algorithm == "stmvl":
window_size = int(config[algorithm]['window_size'])
gamma = float(config[algorithm]['gamma'])
alpha = int(config[algorithm]['alpha'])
return (window_size, gamma, alpha)
elif algorithm == "iim":
learning_neighbors = int(config[algorithm]['learning_neighbors'])
if query == "default":
algo_code = config[algorithm]['algorithm_code']
return (learning_neighbors, algo_code)
else:
return (learning_neighbors,)
elif algorithm == "mrnn":
hidden_dim = int(config[algorithm]['hidden_dim'])
learning_rate = float(config[algorithm]['learning_rate'])
iterations = int(config[algorithm]['iterations'])
if query == "default":
sequence_length = int(config[algorithm]['sequence_length'])
return (hidden_dim, learning_rate, iterations, sequence_length)
else:
return (hidden_dim, learning_rate, iterations)
elif algorithm == "iterative_svd":
truncation_rank = int(config[algorithm]['rank'])
return truncation_rank
elif algorithm == "grouse":
max_rank = int(config[algorithm]['max_rank'])
return max_rank
elif algorithm == "dynammo":
h = int(config[algorithm]['h'])
max_iteration = int(config[algorithm]['max_iteration'])
approximation = bool(config[algorithm]['approximation'])
return (h, max_iteration, approximation)
elif algorithm == "rosl":
rank = int(config[algorithm]['rank'])
regularization = float(config[algorithm]['regularization'])
return (rank, regularization)
elif algorithm == "soft_impute":
max_rank = int(config[algorithm]['max_rank'])
return max_rank
elif algorithm == "spirit":
k = int(config[algorithm]['k'])
w = int(config[algorithm]['w'])
lvalue = float(config[algorithm]['lvalue'])
return (k, w, lvalue)
elif algorithm == "svt":
tau = float(config[algorithm]['tau'])
return tau
elif algorithm == "tkcm":
rank = int(config[algorithm]['rank'])
return rank
elif algorithm == "deep_mvi":
max_epoch = int(config[algorithm]['max_epoch'])
patience = int(config[algorithm]['patience'])
lr = float(config[algorithm]['lr'])
return (max_epoch, patience, lr)
elif algorithm == "brits":
model = str(config[algorithm]['model'])
epoch = int(config[algorithm]['epoch'])
batch_size = int(config[algorithm]['batch_size'])
nbr_features = int(config[algorithm]['nbr_features'])
hidden_layers = int(config[algorithm]['hidden_layers'])
return (model, epoch, batch_size, nbr_features, hidden_layers)
elif algorithm == "mpin":
incre_mode = str(config[algorithm]['incre_mode'])
window = int(config[algorithm]['window'])
k = int(config[algorithm]['k'])
learning_rate = float(config[algorithm]['learning_rate'])
weight_decay = float(config[algorithm]['weight_decay'])
epochs = int(config[algorithm]['epochs'])
num_of_iteration = int(config[algorithm]['num_of_iteration'])
threshold = float(config[algorithm]['threshold'])
base = str(config[algorithm]['base'])
return (incre_mode, window, k, learning_rate, weight_decay, epochs, num_of_iteration, threshold, base)
elif algorithm == "pristi":
target_strategy = str(config[algorithm]['target_strategy'])
unconditional = bool(config[algorithm]['unconditional'])
seed = int(config[algorithm]['seed'])
device = str(config[algorithm]['device'])
return (target_strategy, unconditional, seed, device)
elif algorithm == "knn":
k = int(config[algorithm]['k'])
weights = str(config[algorithm]['weights'])
return (k, weights)
elif algorithm == "interpolation":
method = str(config[algorithm]['method'])
poly_order = int(config[algorithm]['poly_order'])
return (method, poly_order)
elif algorithm == "trmf":
lags = list(config[algorithm]['lags'])
K = int(config[algorithm]['K'])
lambda_f = float(config[algorithm]['lambda_f'])
lambda_x = float(config[algorithm]['lambda_x'])
lambda_w = float(config[algorithm]['lambda_w'])
eta = float(config[algorithm]['eta'])
alpha = float(config[algorithm]['alpha'])
max_iter = int(config[algorithm]['max_iter'])
return (lags, K, lambda_f, lambda_x, lambda_w, eta, alpha, max_iter)
elif algorithm == "mice":
max_iter = int(config[algorithm]['max_iter'])
tol = float(config[algorithm]['tol'])
initial_strategy = str(config[algorithm]['initial_strategy'])
seed = int(config[algorithm]['seed'])
return (max_iter, tol, initial_strategy, seed)
elif algorithm == "miss_forest":
n_estimators = int(config[algorithm]['n_estimators'])
max_iter = int(config[algorithm]['max_iter'])
max_features = str(config[algorithm]['max_features'])
seed = int(config[algorithm]['seed'])
return (n_estimators, max_iter, max_features, seed)
elif algorithm == "xgboost":
n_estimators = int(config[algorithm]['n_estimators'])
seed = int(config[algorithm]['seed'])
return (n_estimators, seed)
elif algorithm == "miss_net":
alpha = float(config[algorithm]['alpha'])
beta = float(config[algorithm]['beta'])
L = int(config[algorithm]['L'])
n_cl = int(config[algorithm]['n_cl'])
max_iter = int(config[algorithm]['max_iter'])
tol = float(config[algorithm]['tol'])
random_init = bool(config[algorithm]['random_init'])
return (alpha, beta, L, n_cl, max_iter, tol, random_init)
elif algorithm == "gain":
batch_size = int(config[algorithm]['batch_size'])
hint_rate = float(config[algorithm]['hint_rate'])
alpha = int(config[algorithm]['alpha'])
epoch = int(config[algorithm]['epoch'])
return (batch_size, hint_rate, alpha, epoch)
elif algorithm == "grin":
d_hidden = int(config[algorithm]['d_hidden'])
lr = float(config[algorithm]['lr'])
batch_size = int(config[algorithm]['batch_size'])
window = int(config[algorithm]['window'])
alpha = int(config[algorithm]['alpha'])
patience = int(config[algorithm]['patience'])
epochs = int(config[algorithm]['epochs'])
workers = int(config[algorithm]['workers'])
return (d_hidden, lr, batch_size, window, alpha, patience, epochs, workers)
elif algorithm == "bay_otide":
K_trend = int(config[algorithm]['K_trend'])
K_season = int(config[algorithm]['K_season'])
n_season = int(config[algorithm]['n_season'])
K_bias = int(config[algorithm]['K_bias'])
time_scale = int(config[algorithm]['time_scale'])
a0 = float(config[algorithm]['a0'])
b0 = float(config[algorithm]['b0'])
v = float(config[algorithm]['v'])
return (K_trend, K_season, n_season, K_bias, time_scale, a0, b0, v)
elif algorithm == "hkmf_t":
tags = config[algorithm]['tags']
data_names = config[algorithm]['data_names']
epoch = int(config[algorithm]['epoch'])
return (tags, data_names, epoch)
elif algorithm == "bit_graph":
node_number = int(config[algorithm]['node_number'])
kernel_set = config[algorithm]['kernel_set']
dropout = float(config[algorithm]['dropout'])
subgraph_size = int(config[algorithm]['subgraph_size'])
node_dim = int(config[algorithm]['node_dim'])
seq_len = int(config[algorithm]['seq_len'])
lr = float(config[algorithm]['lr'])
epoch = int(config[algorithm]['epoch'])
seed = int(config[algorithm]['seed'])
return (node_number, kernel_set, dropout, subgraph_size, node_dim, seq_len, lr, epoch, seed)
elif algorithm == "greedy":
n_calls = int(config[algorithm]['n_calls'])
metrics = config[algorithm]['metrics']
return (n_calls, [metrics])
elif algorithm.lower() in ["bayesian", "bo", "bayesopt"]:
n_calls = int(config['bayesian']['n_calls'])
n_random_starts = int(config['bayesian']['n_random_starts'])
acq_func = str(config['bayesian']['acq_func'])
metrics = config['bayesian']['metrics']
return (n_calls, n_random_starts, acq_func, [metrics])
elif algorithm.lower() in ['pso', "particle_swarm"]:
n_particles = int(config['pso']['n_particles'])
c1 = float(config['pso']['c1'])
c2 = float(config['pso']['c2'])
w = float(config['pso']['w'])
iterations = int(config['pso']['iterations'])
n_processes = int(config['pso']['n_processes'])
metrics = config['pso']['metrics']
return (n_particles, c1, c2, w, iterations, n_processes, [metrics])
elif algorithm.lower() in ['sh', "successive_halving"]:
num_configs = int(config['sh']['num_configs'])
num_iterations = int(config['sh']['num_iterations'])
reduction_factor = int(config['sh']['reduction_factor'])
metrics = config['sh']['metrics']
return (num_configs, num_iterations, reduction_factor, [metrics])
elif algorithm.lower() in ['ray_tune', "ray"]:
metrics = config['ray_tune']['metrics']
n_calls = int(config['ray_tune']['n_calls'])
max_concurrent_trials = int(config['ray_tune']['max_concurrent_trials'])
return ([metrics], n_calls, max_concurrent_trials)
elif algorithm == "forecaster-naive":
strategy = str(config[algorithm]['strategy'])
window_length = int(config[algorithm]['window_length'])
sp = int(config[algorithm]['sp'])
return {"strategy": strategy, "window_length": window_length, "sp": sp}
elif algorithm == "forecaster-exp-smoothing":
trend = str(config[algorithm]['trend'])
seasonal = str(config[algorithm]['seasonal'])
sp = int(config[algorithm]['sp'])
return {"trend": trend, "seasonal": seasonal, "sp": sp}
elif algorithm == "forecaster-prophet":
seasonality_mode = str(config[algorithm]['seasonality_mode'])
n_changepoints = int(config[algorithm]['n_changepoints'])
return {"seasonality_mode": seasonality_mode, "n_changepoints": n_changepoints}
elif algorithm == "colors":
colors = config[algorithm]['plot']
return colors
elif algorithm == "other":
return config
else:
print("\t\t(SYS) Default/Optimal config not found for this algorithm")
return None
def verification_limitation(percentage, low_limit=0.01, high_limit=1.0):
"""
Format and verify that the percentage given by the user is within acceptable bounds.
Parameters
----------
percentage : float
The percentage value to be checked and potentially adjusted.
low_limit : float, optional
The lower limit of the acceptable percentage range (default is 0.01).
high_limit : float, optional
The upper limit of the acceptable percentage range (default is 1.0).
Returns
-------
float
Adjusted percentage based on the limits.
Raises
------
ValueError
If the percentage is outside the accepted limits.
Notes
-----
- If the percentage is between 1 and 100, it will be divided by 100 to convert it to a decimal format.
- If the percentage is outside all accepted ranges, a ValueError is raised.
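Examples
--------
>>> verification_limitation(0.4)
0.4
>>> verification_limitation(40)
The percentage 40 is between 1 and 100. Dividing by 100 to convert to a decimal.
0.4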
"""
if low_limit <= percentage <= high_limit:
return percentage # No modification needed
elif 1 <= percentage <= 100:
print(f"The percentage {percentage} is between 1 and 100. Dividing by 100 to convert to a decimal.")
return percentage / 100
else:
raise ValueError("The percentage is out of the acceptable range.")
def load_share_lib(name="lib_cdrec", lib=True):
"""
Load the shared library based on the operating system.
Parameters
----------
name : str, optional
The name of the shared library (default is "lib_cdrec").
lib : bool, optional
If True, the function loads the library from the default 'imputegap' path; if False, it loads from a local path (default is True).
Returns
-------
ctypes.CDLL
The loaded shared library object.
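Examples
--------
A sketch, assuming the compiled CDRec library is bundled with the package:
>>> lib_cdrec = load_share_lib("lib_cdrec")  # doctest: +SKIP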
"""
if lib:
lib_path = importlib.resources.files('imputegap.algorithms.lib').joinpath("./" + str(name))
else:
local_path_lin = './algorithms/lib/' + name + '.so'
if not os.path.exists(local_path_lin):
local_path_lin = './imputegap/algorithms/lib/' + name + '.so'
lib_path = os.path.join(local_path_lin)
print("\t\t(SYS) lib loaded from:", lib_path)
return ctypes.CDLL(lib_path)
def save_optimization(optimal_params, algorithm="cdrec", dataset="", optimizer="b", file_name=None):
"""
Save the optimization parameters to a TOML file for later use without recomputing.
Parameters
----------
optimal_params : dict
Dictionary of the optimal parameters.
algorithm : str, optional
The name of the imputation algorithm (default is 'cdrec').
dataset : str, optional
The name of the dataset (default is an empty string).
optimizer : str, optional
The name of the optimizer used (default is 'b').
file_name : str, optional
The name of the TOML file to save the results (default is None).
Returns
-------
None
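Examples
--------
A minimal sketch with hypothetical optimal values for cdrec (rank, epsilon, iterations):
>>> save_optimization((10, 0.01, 100), algorithm="cdrec", dataset="chlorine", optimizer="b")  # doctest: +SKIP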
"""
if file_name is None:
file_name = "../params/optimal_parameters_" + str(optimizer) + "_" + str(dataset) + "_" + str(algorithm) + ".toml"
if not os.path.exists(file_name):
file_name = file_name[1:]
dir_name = os.path.dirname(file_name)
if dir_name and not os.path.exists(dir_name):
os.makedirs(dir_name)
if algorithm == "mrnn":
params_to_save = { "hidden_dim": int(optimal_params[0]),
"learning_rate": optimal_params[1],
"num_iter": int(optimal_params[2]),
"seq_len": 7 # Default value
}
elif algorithm == "stmvl":
params_to_save = {
"window_size": int(optimal_params[0]),
"gamma": optimal_params[1],
"alpha": int(optimal_params[2])
}
elif algorithm == "iim":
params_to_save = {
"learning_neighbors": int(optimal_params[0])
}
elif algorithm == "cdrec":
params_to_save = {
"rank": int(optimal_params[0]),
"eps": optimal_params[1],
"iters": int(optimal_params[2])
}
elif algorithm == "iterative_svd":
params_to_save = {
"rank": int(optimal_params[0])
}
elif algorithm == "grouse":
params_to_save = {
"max_rank": int(optimal_params[0])
}
elif algorithm == "rosl":
params_to_save = {
"rank": int(optimal_params[0]),
"regularization": optimal_params[1]
}
elif algorithm == "soft_impute":
params_to_save = {
"max_rank": int(optimal_params[0])
}
elif algorithm == "spirit":
params_to_save = {
"k": int(optimal_params[0]),
"w": int(optimal_params[1]),
"lvalue": optimal_params[2]
}
elif algorithm == "svt":
params_to_save = {
"tau": optimal_params[0],
"delta": optimal_params[1],
"max_iter": int(optimal_params[2])
}
elif algorithm == "dynammo":
params_to_save = {
"h": int(optimal_params[0]),
"max_iteration": int(optimal_params[1]),
"approximation": bool(optimal_params[2])
}
elif algorithm == "tkcm":
params_to_save = {
"rank": int(optimal_params[0])
}
elif algorithm == "brits":
params_to_save = {
"model": optimal_params[0],
"epoch": int(optimal_params[1]),
"batch_size": int(optimal_params[2]),
"hidden_layers": int(optimal_params[3])
}
elif algorithm == "deep_mvi":
params_to_save = {
"max_epoch": int(optimal_params[0]),
"patience": int(optimal_params[1]),
"lr": float(optimal_params[2])
}
elif algorithm == "mpin":
params_to_save = {
"incre_mode": optimal_params[0],
"window": int(optimal_params[1]),
"k": int(optimal_params[2]),
"learning_rate": optimal_params[3],
"weight_decay": optimal_params[4],
"epochs": int(optimal_params[5]),
"num_of_iteration": int(optimal_params[6]),
"threshold": optimal_params[7],
"base": optimal_params[8]
}
elif algorithm == "pristi":
params_to_save = {
"target_strategy": optimal_params[0],
"unconditional": bool(optimal_params[1]),
"seed": 42, # Default seed
"device": "cpu" # Default device
}
elif algorithm == "knn":
params_to_save = {
"k": int(optimal_params[0]),
"weights": str(optimal_params[1])
}
elif algorithm == "interpolation":
params_to_save = {
"method": str(optimal_params[0]),
"poly_order": int(optimal_params[1])
}
elif algorithm == "mice":
params_to_save = {
"max_iter": int(optimal_params[0]),
"tol": float(optimal_params[1]),
"initial_strategy": str(optimal_params[2]),
"seed": 42
}
elif algorithm == "miss_forest":
params_to_save = {
"n_estimators": int(optimal_params[0]),
"max_iter": int(optimal_params[1]),
"max_features": str(optimal_params[2]),
"seed": 42
}
elif algorithm == "xgboost":
params_to_save = {
"n_estimators": int(optimal_params[0]),
"seed": 42
}
elif algorithm == "miss_net":
params_to_save = {
"alpha": float(optimal_params[0]),
"beta": float(optimal_params[1]),
"L": int(optimal_params[2]),
"n_cl": int(optimal_params[3]),
"max_iter": int(optimal_params[4]),
"tol": float(optimal_params[5]),
"random_init": bool(optimal_params[6])
}
elif algorithm == "gain":
params_to_save = {
"batch_size": int(optimal_params[0]),
"hint_rate": float(optimal_params[1]),
"alpha": int(optimal_params[2]),
"epoch": int(optimal_params[3])
}
elif algorithm == "grin":
params_to_save = {
"d_hidden": int(optimal_params[0]),
"lr": float(optimal_params[1]),
"batch_size": int(optimal_params[2]),
"window": int(optimal_params[3]),
"alpha": int(optimal_params[4]),
"patience": int(optimal_params[5]),
"epochs": int(optimal_params[6]),
"workers": int(optimal_params[7])
}
elif algorithm == "grin":
params_to_save = {
"K_trend": int(optimal_params[0]),
"K_season": int(optimal_params[1]),
"n_season": int(optimal_params[2]),
"K_bias": int(optimal_params[3]),
"time_scale": int(optimal_params[4]),
"a0": float(optimal_params[5]),
"b0": float(optimal_params[6]),
"v": float(optimal_params[7])
}
elif algorithm == "hkmf_t":
params_to_save = {
"tags": optimal_params[0],
"data_names": optimal_params[1],
"epoch": int(optimal_params[2]),
}
elif algorithm == "bit_graph":
params_to_save = {
"node_number": int(optimal_params[0]),
"kernel_set": optimal_params[1],
"dropout": float(optimal_params[2]),
"subgraph_size": int(optimal_params[3]),
"node_dim": int(optimal_params[4]),
"seq_len": int(optimal_params[5]),
"lr": float(optimal_params[6]),
"epoch": int(optimal_params[7]),
"seed": int(optimal_params[8]),
}
else:
print(f"\n\t\t(SYS) Algorithm {algorithm} is not recognized.")
return
try:
with open(file_name, 'w') as file:
toml.dump(params_to_save, file)
print(f"\n\t\t(SYS) Optimization parameters successfully saved to {file_name}")
except Exception as e:
print(f"\n\t\t(SYS) An error occurred while saving the file: {e}")
def list_of_algorithms():
return sorted([
"CDRec",
"IterativeSVD",
"GROUSE",
"ROSL",
"SPIRIT",
"SoftImpute",
"SVT",
"TRMF",
"STMVL",
"DynaMMo",
"TKCM",
"IIM",
"XGBOOST",
"MICE",
"MissForest",
"KNN",
"Interpolation",
"MinImpute",
"MeanImpute",
"ZeroImpute",
"MeanImputeBySeries",
"MRNN",
"BRITS",
"DeepMVI",
"MPIN",
"PRISTI",
"MissNet",
"GAIN",
"GRIN",
"BayOTIDE",
"HKMF_T",
"BitGraph"
])
def list_of_patterns():
return sorted([
"missing_completely_at_random",
"missing_percentage",
"percentage_shift",
"disjoint",
"overlap",
"gaussian",
"distribution"
])
def list_of_datasets(txt=False):
datasets = sorted([
"airq",
"bafu",
"chlorine",
"climate",
"drift",
"eeg-alcohol",
"eeg-reading",
"fmri-objectviewing",
"fmri-stoptask",
"meteo",
"electricity",
"motion",
"soccer",
"temperature"
])
if txt:
datasets = [dataset + ".txt" for dataset in datasets]
return datasets
def list_of_optimizers():
return sorted([
"ray_tune",
"bayesian",
"particle_swarm",
"successive_halving",
"greedy"
])
def list_of_downstreams():
return sorted([
"prophet",
"exp-smoothing",
"naive"
])