Source code for imputegap.recovery.optimization

import os
import time
from itertools import product
import numpy as np

from imputegap.recovery.imputation import Imputation
from imputegap.tools import utils
from imputegap.tools.algorithm_parameters import SEARCH_SPACES, ALL_ALGO_PARAMS, PARAM_NAMES, SEARCH_SPACES_PSO, RAYTUNE_PARAMS
import imputegap.tools.algorithm_parameters as sh_params

# RAY TUNE IMPORT
from ray import tune
import ray

# PSO IMPORT
from functools import partial
import pyswarms as ps

# BAYESIAN IMPORT
import skopt
from skopt.space import Integer

from pyswarms.utils.reporter import Reporter
reporter = Reporter()

class BaseOptimizer:
    """
    A base class for optimization of imputation algorithm hyperparameters.

    Provides structure and common functionality for different optimization strategies.

    Methods
    -------
    _objective(**kwargs):
        Abstract method to evaluate the imputation algorithm with the provided parameters.
        Must be implemented by subclasses.
    optimize(input_data, incomp_data, metrics, algorithm, **kwargs):
        Abstract method for the main optimization process. Must be implemented by subclasses.
    """

    def __init__(self):
        pass

    def _objective(self, **kwargs):
        """
        Abstract objective function for optimization.

        This method evaluates the imputation algorithm with the provided parameters and computes
        the error across the selected metrics. The exact implementation depends on the
        optimization method. Since different optimization methods (e.g., Particle Swarm, Bayesian)
        may require different inputs, the parameters of this function are passed as keyword
        arguments (**kwargs). Subclasses should implement this method with the required
        parameters for the specific optimization.

        Parameters
        ----------
        **kwargs : dict
            Parameters needed to evaluate the imputation algorithm, such as:

            - input_data : numpy.ndarray
                The ground truth time series dataset.
            - contamination : numpy.ndarray
                The contaminated time series dataset to impute.
            - algorithm : str
                The imputation algorithm name.
            - metrics : list of str
                List of selected metrics for optimization.
            - params : dict or list
                Parameter values for the optimization.

        Returns
        -------
        float
            Mean error for the selected metrics.
        """
        raise NotImplementedError("Subclasses must implement the _objective method")
    def optimize(self, input_data, incomp_data, metrics, algorithm, **kwargs):
        """
        Abstract method for optimization. Must be implemented in subclasses.

        This method performs the optimization of hyperparameters for a given imputation
        algorithm. Each subclass implements a different optimization strategy (e.g., Greedy,
        Bayesian, Particle Swarm) and uses the `_objective` function to evaluate the parameters.

        Parameters
        ----------
        input_data : numpy.ndarray
            The ground truth time series dataset.
        incomp_data : numpy.ndarray
            The contaminated time series dataset to impute.
        metrics : list of str
            List of selected metrics for optimization.
        algorithm : str
            The imputation algorithm to optimize.
        **kwargs : dict
            Additional parameters specific to the optimization strategy (e.g., number of
            iterations, particles, etc.).

        Returns
        -------
        tuple
            A tuple containing the best parameters and their corresponding score.
        """
        raise NotImplementedError("Subclasses must implement the optimize method")
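
# --- Subclassing sketch (illustrative, not part of the module) ---
# The `_objective` / `optimize` pair above is the whole contract a new strategy
# must satisfy. A hypothetical random-search subclass, reusing ALL_ALGO_PARAMS
# and Imputation.evaluate_params the same way the built-in strategies do, could
# look like this:
#
#     import random
#
#     class RandomSearch(BaseOptimizer):
#
#         def _objective(self, input_data, incomp_data, algorithm, metrics, params):
#             errors = Imputation.evaluate_params(input_data, incomp_data, params, algorithm)
#             return np.mean([errors[metric] for metric in metrics])
#
#         def optimize(self, input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=25):
#             param_ranges = ALL_ALGO_PARAMS[algorithm]
#             best_params, best_score = None, float('inf')
#             for _ in range(n_calls):
#                 params = {name: random.choice(list(values)) for name, values in param_ranges.items()}
#                 score = self._objective(input_data, incomp_data, algorithm, metrics, params)
#                 if score < best_score:
#                     best_params, best_score = params, score
#             return best_params, best_score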
class Optimization:
    """
    A class for performing optimization of imputation algorithm hyperparameters.

    This class groups nested optimizer classes implementing different strategies, such as
    Greedy, Bayesian, Particle Swarm, Successive Halving, and Ray Tune, used to find the best
    parameters for different imputation algorithms.

    Methods
    -------
    Greedy.optimize(input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=250):
        Perform greedy optimization for hyperparameters.
    Bayesian.optimize(input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=100, n_random_starts=50, acq_func='gp_hedge'):
        Perform Bayesian optimization for hyperparameters.
    ParticleSwarm.optimize(input_data, incomp_data, metrics, algorithm, n_particles, c1, c2, w, iterations, n_processes):
        Perform Particle Swarm Optimization (PSO) for hyperparameters.
    SuccessiveHalving.optimize(input_data, incomp_data, metrics, algorithm, num_configs, num_iterations, reduction_factor):
        Perform Successive Halving optimization for hyperparameters.
    RayTune.optimize(input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=1, max_concurrent_trials=-1):
        Perform Ray Tune optimization for hyperparameters.
    """
    class Greedy(BaseOptimizer):
        """
        Greedy optimization strategy for hyperparameters: an exhaustive sweep over the
        algorithm's parameter grid, capped at `n_calls` evaluations.
        """

        def _objective(self, input_data, incomp_data, algorithm, metrics, params):
            """
            Objective function for Greedy optimization.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            algorithm : str
                The imputation algorithm name.
            metrics : list of str
                List of selected metrics for optimization.
            params : dict
                The parameters for the imputation algorithm.

            Returns
            -------
            float
                Mean error for the selected metrics.
            """
            errors = Imputation.evaluate_params(input_data, incomp_data, params, algorithm)

            if not isinstance(metrics, list):
                metrics = [metrics]

            return np.mean([errors[metric] for metric in metrics])
        def optimize(self, input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=250):
            """
            Perform greedy optimization for hyperparameters.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            metrics : list of str, optional
                List of selected metrics for optimization (default is ["RMSE"]).
            algorithm : str, optional
                The imputation algorithm to optimize (default is 'cdrec').
            n_calls : int, optional
                Number of calls to the objective function (default is 250).

            Returns
            -------
            tuple
                A tuple containing the best parameters and their corresponding score.
            """
            start_time = time.time()  # Record start time

            # Map the parameter ranges to the algorithm-specific search space
            param_ranges = ALL_ALGO_PARAMS[algorithm]

            # Extract parameter names and their ranges for the selected algorithm
            param_names = list(param_ranges.keys())
            param_values = list(param_ranges.values())

            # Generate all combinations of parameters in the search space
            param_combinations = list(product(*param_values))  # Cartesian product of all parameter values

            # Placeholder for the best parameters and their score
            best_params = None
            best_score = float('inf')  # Assuming we are minimizing the objective function
            run_count = 0

            # Conduct greedy optimization over parameter combinations
            for params in param_combinations:
                if n_calls is not None and run_count >= n_calls:
                    break

                # Convert params to a dictionary for compatibility
                params_dict = {name: value for name, value in zip(param_names, params)}

                # Calculate the score for the current set of parameters
                score = self._objective(input_data, incomp_data, algorithm, metrics, params_dict)

                # Update the best parameters if the current score is better
                if score < best_score:
                    best_score = score
                    best_params = params_dict

                # Increment the run counter
                run_count += 1

            end_time = time.time()
            print(f"\n\t\t> logs, optimization greedy - Execution Time: {(end_time - start_time):.4f} seconds\n")

            return best_params, best_score
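
    # --- Usage sketch (illustrative, not executed by this module) ---
    # A minimal way to run the greedy sweep directly, assuming `gt` and `miss`
    # are numpy arrays of the same shape, where `miss` is the contaminated copy
    # of the ground truth `gt`:
    #
    #     from imputegap.recovery.optimization import Optimization
    #
    #     best_params, best_score = Optimization.Greedy().optimize(
    #         input_data=gt, incomp_data=miss,
    #         metrics=["RMSE"], algorithm="cdrec", n_calls=50)
    #     print(best_params, best_score)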
    class Bayesian(BaseOptimizer):
        """
        Bayesian optimization strategy for hyperparameters.
        """

        def _objective(self, input_data, incomp_data, algorithm, metrics, params):
            """
            Objective function for Bayesian optimization.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            algorithm : str
                The imputation algorithm name.
            metrics : list of str
                List of selected metrics for optimization.
            params : dict or list
                Parameter values for the optimization.

            Returns
            -------
            float
                Mean error for the selected metrics.
            """
            # Check if params is a dictionary or a list
            if isinstance(params, dict):
                param_values = tuple(params.values())  # Convert dictionary to tuple of values
            else:
                param_values = tuple(params)

            if not isinstance(metrics, list):
                metrics = [metrics]

            errors = Imputation.evaluate_params(input_data, incomp_data, param_values, algorithm)

            return np.mean([errors[metric] for metric in metrics])
        def optimize(self, input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=100, n_random_starts=50, acq_func='gp_hedge'):
            """
            Perform Bayesian optimization for hyperparameters.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            metrics : list of str, optional
                List of selected metrics for optimization (default is ["RMSE"]).
            algorithm : str, optional
                The imputation algorithm to optimize (default is 'cdrec').
            n_calls : int, optional
                Number of calls to the objective function (default is 100).
            n_random_starts : int, optional
                Number of random initial points (default is 50).
            acq_func : str, optional
                Acquisition function for the Gaussian process surrogate (default is 'gp_hedge').

            Returns
            -------
            tuple
                A tuple containing the best parameters and their corresponding score.
            """
            start_time = time.time()  # Record start time

            search_spaces = SEARCH_SPACES

            # Adjust the search space for 'cdrec' based on incomp_data
            if algorithm == 'cdrec':
                max_rank = incomp_data.shape[1] - 1
                search_spaces['cdrec'][0] = Integer(0, min(9, max_rank), name='rank')  # Update the rank range

            # Define the search space
            space = search_spaces[algorithm]

            # Conduct Bayesian optimization
            optimizer = skopt.Optimizer(dimensions=space, n_initial_points=n_random_starts, acq_func=acq_func)
            for i in range(n_calls):
                suggested_params = optimizer.ask()
                score = self._objective(input_data, incomp_data, algorithm, metrics, suggested_params)
                optimizer.tell(suggested_params, score)

            # Optimal parameters
            optimal_params = optimizer.Xi[np.argmin(optimizer.yi)]
            optimal_params_dict = {name: value for name, value in zip([dim.name for dim in space], optimal_params)}

            end_time = time.time()
            print(f"\n\t\t> logs, optimization bayesian - Execution Time: {(end_time - start_time):.4f} seconds\n")

            return optimal_params_dict, np.min(optimizer.yi)
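
    # --- Usage sketch (illustrative, not executed by this module) ---
    # Running the Bayesian optimizer with a smaller budget than the defaults;
    # `gt` and `miss` are assumed to be the ground-truth and contaminated arrays:
    #
    #     best_params, best_score = Optimization.Bayesian().optimize(
    #         input_data=gt, incomp_data=miss, metrics=["RMSE"], algorithm="cdrec",
    #         n_calls=30, n_random_starts=10, acq_func='gp_hedge')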
    class ParticleSwarm(BaseOptimizer):
        """
        Particle Swarm Optimization (PSO) strategy for hyperparameters.
        """

        def _format_params(self, particle_params, algorithm):
            """
            Format parameters for the given algorithm.

            Parameters
            ----------
            particle_params : list
                List of particle parameters.
            algorithm : str
                The imputation algorithm name.

            Returns
            -------
            list
                Formatted list of parameters.
            """
            if algorithm == 'cdrec':
                particle_params = [int(particle_params[0]), particle_params[1], int(particle_params[2])]
            elif algorithm == 'iim':
                particle_params = [int(particle_params[0])]
            elif algorithm == 'mrnn':
                particle_params = [int(particle_params[0]), particle_params[1], int(particle_params[2])]
            elif algorithm == 'stmvl':
                particle_params = [int(particle_params[0]), particle_params[1], int(particle_params[2])]

            return particle_params

        def _objective(self, input_data, incomp_data, algorithm, metrics, params):
            """
            Objective function for Particle Swarm Optimization.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            algorithm : str
                The imputation algorithm name.
            metrics : list of str
                List of selected metrics for optimization.
            params : numpy.ndarray
                Parameter values for the optimization.

            Returns
            -------
            numpy.ndarray
                Array of error values for each particle.
            """
            n_particles = params.shape[0]  # Get the number of particles

            # Initialize array to hold the errors for each particle
            errors_for_all_particles = np.zeros(n_particles)

            for i in range(n_particles):  # Iterate over each particle
                particle_params = self._format_params(params[i], algorithm)  # Get the parameters for this particle
                errors = Imputation.evaluate_params(input_data, incomp_data, tuple(particle_params), algorithm)
                errors_for_all_particles[i] = np.mean([errors[metric] for metric in metrics])

            return errors_for_all_particles
        def optimize(self, input_data, incomp_data, metrics, algorithm, n_particles, c1, c2, w, iterations, n_processes):
            """
            Perform Particle Swarm Optimization for hyperparameters.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            metrics : list of str
                List of selected metrics for optimization.
            algorithm : str
                The imputation algorithm to optimize.
            n_particles : int
                Number of particles used in PSO.
            c1 : float
                PSO parameter, personal learning coefficient.
            c2 : float
                PSO parameter, global learning coefficient.
            w : float
                PSO parameter, inertia weight.
            iterations : int
                Number of iterations for the optimization.
            n_processes : int
                Number of processes during optimization.

            Returns
            -------
            tuple
                A tuple containing the best parameters and their corresponding score.
            """
            start_time = time.time()  # Record start time

            if not isinstance(metrics, list):
                metrics = [metrics]

            # Define the search space
            search_space = SEARCH_SPACES_PSO

            if algorithm == 'cdrec':
                max_rank = incomp_data.shape[1] - 1
                search_space['cdrec'][0] = (search_space['cdrec'][0][0], min(search_space['cdrec'][0][1], max_rank))

            # Select the correct search space based on the algorithm
            bounds = search_space[algorithm]

            # Convert search space to PSO-friendly format (two lists: one for min and one for max values for each parameter)
            lower_bounds, upper_bounds = zip(*bounds)
            bounds = (np.array(lower_bounds), np.array(upper_bounds))

            # Call instance of PSO
            optimizer = ps.single.GlobalBestPSO(n_particles=n_particles, dimensions=len(bounds[0]),
                                                options={'c1': c1, 'c2': c2, 'w': w}, bounds=bounds)

            # Perform optimization
            objective_with_args = partial(self._objective, input_data, incomp_data, algorithm, metrics)
            cost, pos = optimizer.optimize(objective_with_args, iters=iterations, n_processes=n_processes)

            param_names = PARAM_NAMES
            optimal_params = self._format_params(pos, algorithm)
            optimal_params_dict = {param_name: value for param_name, value in zip(param_names[algorithm], optimal_params)}

            end_time = time.time()
            print(f"\n\t\t> logs, optimization pso - Execution Time: {(end_time - start_time):.4f} seconds\n")

            return optimal_params_dict, cost
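
    # --- Usage sketch (illustrative, not executed by this module) ---
    # All PSO arguments are required; the coefficients below (c1 = c2 = 1.5,
    # w = 0.7) are common textbook settings, not library defaults:
    #
    #     best_params, best_score = Optimization.ParticleSwarm().optimize(
    #         input_data=gt, incomp_data=miss, metrics=["RMSE"], algorithm="cdrec",
    #         n_particles=20, c1=1.5, c2=1.5, w=0.7, iterations=10, n_processes=1)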
    class SuccessiveHalving(BaseOptimizer):
        """
        Successive Halving optimization strategy for hyperparameters.
        """

        def _objective(self, errors_dict, metrics):
            """
            Objective function for Successive Halving optimization.

            Parameters
            ----------
            errors_dict : dict
                Dictionary containing error metrics.
            metrics : list of str
                List of selected metrics for optimization.

            Returns
            -------
            float
                Mean error for the selected metrics.
            """
            selected_errors = [errors_dict[metric] for metric in metrics]
            return np.mean(selected_errors)
        def optimize(self, input_data, incomp_data, metrics, algorithm, num_configs, num_iterations, reduction_factor):
            """
            Perform Successive Halving optimization for hyperparameters.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            metrics : list of str
                List of selected metrics for optimization.
            algorithm : str
                The imputation algorithm to optimize.
            num_configs : int
                Number of configurations to try.
            num_iterations : int
                Number of iterations for the optimization.
            reduction_factor : int
                Reduction factor for the number of configurations kept after each iteration.

            Returns
            -------
            tuple
                A tuple containing the best parameters and their corresponding score.
            """
            start_time = time.time()  # Record start time

            if not isinstance(metrics, list):
                metrics = [metrics]

            # Define the parameter names for each algorithm
            param_names = PARAM_NAMES

            data_length = len(input_data)
            chunk_size = data_length // num_iterations

            # Prepare configurations for each algorithm separately
            if algorithm == 'cdrec':
                max_rank = incomp_data.shape[1] - 1
                temp_rank_range = [i for i in sh_params.CDREC_RANK_RANGE if i < max_rank]

                if not temp_rank_range:
                    raise ValueError("No suitable rank found within CDREC_RANK_RANGE for the given matrix shape!")

                configs = [(np.random.choice(temp_rank_range),
                            np.random.choice(sh_params.CDREC_EPS_RANGE),
                            np.random.choice(sh_params.CDREC_ITERS_RANGE)) for _ in range(num_configs)]
            elif algorithm == 'iim':
                configs = [(np.random.choice(sh_params.IIM_LEARNING_NEIGHBOR_RANGE)) for _ in range(num_configs)]
            elif algorithm == 'mrnn':
                configs = [(np.random.choice(sh_params.MRNN_HIDDEN_DIM_RANGE),
                            np.random.choice(sh_params.MRNN_LEARNING_RATE_CHANGE),
                            np.random.choice(sh_params.MRNN_NUM_ITER_RANGE)) for _ in range(num_configs)]
            elif algorithm == 'stmvl':
                configs = [(np.random.choice(sh_params.STMVL_WINDOW_SIZE_RANGE),
                            np.random.choice(sh_params.STMVL_GAMMA_RANGE),
                            np.random.choice(sh_params.STMVL_ALPHA_RANGE)) for _ in range(num_configs)]
            else:
                raise ValueError(f"Invalid algorithm: {algorithm}")

            for i in range(num_iterations):
                # Calculate how much data to use in this iteration
                end_idx = (i + 1) * chunk_size
                partial_input_data = input_data[:end_idx]
                partial_obfuscated = incomp_data[:end_idx]

                scores = [self._objective(
                    Imputation.evaluate_params(partial_input_data, partial_obfuscated, config, algorithm), metrics)
                    for config in configs]

                top_configs_idx = np.argsort(scores)[:max(1, len(configs) // reduction_factor)]
                configs = [configs[idx] for idx in top_configs_idx]

                if len(configs) <= 1:
                    break  # Exit the loop if only 1 configuration remains

            if not configs:
                raise ValueError("No configurations left after successive halving.")

            if algorithm == 'iim':
                best_config = min(configs, key=lambda single_config: self._objective(
                    Imputation.evaluate_params(input_data, incomp_data, [single_config], algorithm), metrics))
            else:
                best_config = min(configs, key=lambda config: self._objective(
                    Imputation.evaluate_params(input_data, incomp_data, config, algorithm), metrics))

            best_score = self._objective(
                Imputation.evaluate_params(input_data, incomp_data, best_config, algorithm), metrics)

            # Check the size of param_names[algorithm]
            if len(param_names[algorithm]) == 1:
                # If only one parameter name, wrap best_config in a list if it's not already
                best_config = [best_config] if not isinstance(best_config, list) else best_config

            # Create the dictionary using zip
            best_config_dict = {name: value for name, value in zip(param_names[algorithm], best_config)}

            end_time = time.time()
            print(f"\n\t\t> logs, optimization sh - Execution Time: {(end_time - start_time):.4f} seconds\n")

            return best_config_dict, best_score
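
    # --- Usage sketch (illustrative, not executed by this module) ---
    # Successive halving evaluates `num_configs` random configurations on growing
    # prefixes of the data and keeps the top 1/reduction_factor after each round,
    # e.g. 16 -> 8 -> 4 -> 2 over four rounds with reduction_factor=2:
    #
    #     best_params, best_score = Optimization.SuccessiveHalving().optimize(
    #         input_data=gt, incomp_data=miss, metrics=["RMSE"], algorithm="cdrec",
    #         num_configs=16, num_iterations=4, reduction_factor=2)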
    class RayTune(BaseOptimizer):
        """
        RayTune optimization strategy for hyperparameters.
        """

        def _objective(self, params, input_data, incomp_data, algorithm, used_metric):
            """
            Objective function for RayTune optimization.
            """
            print("\n\n\nPARAMS ", params)

            imputer = utils.config_impute_algorithm(incomp_data, algorithm)
            imputer.impute(user_def=True, params=params)
            imputer.score(input_data=input_data)
            score = imputer.metrics.get(used_metric, "Key not found")

            return score
        def optimize(self, input_data, incomp_data, metrics=["RMSE"], algorithm="cdrec", n_calls=1, max_concurrent_trials=-1):
            """
            Perform Ray Tune optimization for hyperparameters.

            Parameters
            ----------
            input_data : numpy.ndarray
                The ground truth time series dataset.
            incomp_data : numpy.ndarray
                The contaminated time series dataset to impute.
            metrics : list of str, optional
                List of selected metrics for optimization (default is ["RMSE"]); only the first
                metric is used as the tuning objective.
            algorithm : str, optional
                The imputation algorithm to optimize (default is 'cdrec').
            n_calls : int, optional
                Number of calls to the objective function (default is 1).
            max_concurrent_trials : int, optional
                Number of trials run in parallel, bounded by your total memory/CPU/GPU. With the
                default of -1, the value is derived from the resources Ray reports; increase it
                if you have more resources.

            Returns
            -------
            dict
                The best configuration found for the selected metric.
            """
            if not ray.is_initialized():
                ray.init()

            used_metric = metrics[0]

            if max_concurrent_trials == -1:
                total_cpus = max(1, sum(node["Resources"].get("CPU", 0) for node in ray.nodes() if node["Alive"]) - 1)
                total_memory_gb = sum(node["Resources"].get("memory", 0) for node in ray.nodes() if node["Alive"]) / (1024 ** 3)

                print(f"\n\t\t(OPTI) > Ray Total accessible CPU cores for parallelization: {total_cpus}")
                print(f"\n\t\t(OPTI) > Ray Total accessible memory for parallelization: {total_memory_gb:.2f} GB")

                max_concurrent_trials = min(int(total_memory_gb // 2), total_cpus)

            print(f"\n\t\t(OPTI) > Ray tune max_concurrent_trials {max_concurrent_trials}, for {n_calls} calls and metric {used_metric}\n")

            start_time = time.time()  # Record start time

            search_space = RAYTUNE_PARAMS[algorithm]
            print(f"\n\t\t(OPTI) > Ray tune - SEARCH SPACE: {search_space}\n")

            def objective_wrapper(config):
                params = {key: config[key] for key in config}
                try:
                    score = self._objective(params, input_data, incomp_data, algorithm, used_metric)
                    if score is None or not isinstance(score, (int, float)):
                        raise ValueError("\n\n\n\t\t\t(RAY_TUNE OBJECTIVE ERROR) >> Invalid score returned from _objective")
                except Exception as e:
                    print(f"\n\n\n\t\t\t(RAY_TUNE OBJECTIVE ERROR) >> Error in objective function: {e}")
                    score = float("inf")  # Return worst possible score
                return {used_metric: score}  # Ensures correct format

            analysis = tune.run(
                objective_wrapper,
                config=search_space,
                metric=used_metric,
                mode="min",
                num_samples=n_calls,
                max_concurrent_trials=max_concurrent_trials
            )

            print(f"\n\t\t(OPTI) > Ray tune - BEST CONFIG: {analysis.best_config}\n")

            end_time = time.time()
            print(f"\n\t\t> logs, optimization ray tune - Execution Time: {(end_time - start_time):.4f} seconds\n")

            ray.shutdown()
            return analysis.best_config
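
    # --- Usage sketch (illustrative, not executed by this module) ---
    # Ray Tune manages its own worker pool; leaving max_concurrent_trials at -1
    # lets the method size the pool from the CPU and memory Ray reports:
    #
    #     best_config = Optimization.RayTune().optimize(
    #         input_data=gt, incomp_data=miss, metrics=["RMSE"],
    #         algorithm="cdrec", n_calls=10)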