import datetime
import os
import time
import numpy as np
import matplotlib
from scipy.stats import zscore
from sklearn.preprocessing import MinMaxScaler
import importlib.resources
from scipy.stats import norm
from imputegap.tools import utils
# Select the matplotlib backend BEFORE pyplot is imported.
# Without a display (DISPLAY unset) or when running in CI, use the
# non-interactive Agg backend; otherwise try the interactive TkAgg backend.
if os.getenv('DISPLAY') is None or os.getenv('CI') is not None:
    matplotlib.use("Agg")
    print("Running in a headless environment or CI. Using Agg backend.")
else:
    try:
        # `import importlib.resources` does not guarantee that the
        # `importlib.util` submodule is loaded; import it explicitly so that
        # `find_spec` cannot fail with an uncaught AttributeError.
        import importlib.util

        matplotlib.use("TkAgg")
        if importlib.util.find_spec("tkinter") is None:
            print("tkinter is not available.")
    except (ImportError, RuntimeError):
        # TkAgg could not be selected: fall back to the headless backend.
        matplotlib.use("Agg")

from matplotlib import pyplot as plt  # type: ignore
[docs]
class TimeSeries:
"""
Class for managing and manipulating time series data.
This class allows importing, normalizing, and visualizing time series datasets. It also provides methods
to contaminate the datasets with missing values and plot results.
Methods
-------
__init__() :
Initializes the TimeSeries object.
import_matrix(data=None) :
Imports a matrix of time series data.
load_series(data, nbr_series=None, nbr_val=None, header=False, replace_nan=False) :
Loads time series data from a file or predefined dataset.
print(nbr_val=10, nbr_series=7, view_by_series=False) :
Prints a limited number of time series from the dataset.
print_results(metrics, algorithm="", text="Imputation Results of") :
Prints the results of the imputation process.
normalize(normalizer="z_score") :
Normalizes the time series dataset.
plot(input_data, incomp_data=None, recov_data=None, nbr_series=None, nbr_val=None, series_range=None, subplot=False, size=(16, 8), save_path="./imputegap/assets", display=True) :
Plots the time series data, including raw, contaminated, or imputed data.
Contamination :
Class containing methods to contaminate time series data with missing values based on different patterns.
"""
def __init__(self):
    """
    Create an empty TimeSeries container.

    No dataset is attached yet: ``data`` starts as None and ``name`` as "default".
    The lists of available algorithms, patterns, datasets, optimizers and
    downstream models are read once from the ``utils`` helpers and kept as attributes.

    IMPORT FORMAT : (Values,Series) : series are separated by "SPACE" and values by "\\n"
    """
    # Nothing loaded so far.
    self.data, self.name = None, "default"

    # Catalogues provided by the library helpers.
    self.algorithms = utils.list_of_algorithms()
    self.patterns = utils.list_of_patterns()
    self.datasets = utils.list_of_datasets()
    self.optimizers = utils.list_of_optimizers()
    self.downstream_models = utils.list_of_downstreams()
[docs]
def import_matrix(self, data=None):
    """
    Imports a matrix of time series data.

    The data can be provided as a list or a NumPy array. The format is (Series, Values),
    where series are separated by space, and values are separated by newline characters.

    Parameters
    ----------
    data : list or numpy.ndarray, optional
        The matrix of time series data to import. A list is converted with
        ``np.array``; an array is stored as-is (no copy). When None, the data
        currently held by the object is left untouched.

    Returns
    -------
    TimeSeries
        The TimeSeries object itself, so that calls can be chained. It is
        returned on every non-error path, including ``data=None``.

    Raises
    ------
    ValueError
        If ``data`` is neither None, a list nor a numpy array. ``self.data``
        is reset to None before the error is raised.
    """
    if data is None:
        # Nothing to import; still return self so chaining keeps working.
        return self

    if isinstance(data, list):
        self.data = np.array(data)
    elif isinstance(data, np.ndarray):
        self.data = data
    else:
        print("\nThe time series have not been loaded, format unknown\n")
        self.data = None
        raise ValueError("Invalid input for import_matrix")

    return self
[docs]
def load_series(self, data, nbr_series=None, nbr_val=None, header=False, replace_nan=False):
    """
    Load time series data from a text file or from a dataset shipped with the library.

    The file is parsed with ``np.genfromtxt`` using a single space as delimiter, i.e.
    as a (Values, Series) matrix, and the result is transposed before the method
    returns, so ``self.data`` ends up as (Series, Values).

    Parameters
    ----------
    data : str
        The file path or name of a predefined dataset (e.g., 'bafu.txt').
    nbr_series : int, optional
        The maximum number of series to load (number of columns kept from the file).
    nbr_val : int, optional
        The maximum number of values per series (passed as ``max_rows`` to the parser).
    header : bool, optional
        Whether the dataset has a header. Default is False. When True, one line is skipped.
    replace_nan : bool, optional
        When True, NaN values already present in the dataset are replaced
        with ``np.nan_to_num`` (NaN becomes 0).

    Returns
    -------
    TimeSeries
        The TimeSeries object with the loaded data.

    Raises
    ------
    ValueError
        If ``data`` is not None and not a string. ``self.data`` is reset to None first.
    """
    # NOTE(review): the nesting below was reconstructed from a listing that lost its
    # indentation — confirm against the repository, in particular that `return self`
    # is only reached when `data` is not None.
    if data is not None:
        if isinstance(data, str):
            saved_data = data

            # Name of a bundled dataset: remember its name without the last four
            # characters (presumably the ".txt" extension) and resolve the packaged path.
            if data in utils.list_of_datasets(txt=True):
                self.name = data[:-4]
                data = importlib.resources.files('imputegap.dataset').joinpath(data)

            # Path not found: retry with ".." prepended to the original string,
            # then with "." prepended (the leading character of ".." is dropped).
            if not os.path.exists(data):
                data = ".." + saved_data
                if not os.path.exists(data):
                    data = data[1:]

            self.data = np.genfromtxt(data, delimiter=' ', max_rows=nbr_val, skip_header=int(header))

            print("\nThe time series have been loaded from " + str(data) + "\n")

            # Keep only the first nbr_series columns (series) of the parsed matrix.
            if nbr_series is not None:
                self.data = self.data[:, :nbr_series]
        else:
            print("\nThe time series have not been loaded, format unknown\n")
            self.data = None
            raise ValueError("Invalid input for load_series")

        if replace_nan:
            print("\nThe NaN values has been set to zero...\n")
            self.data = np.nan_to_num(self.data)  # Replace NaNs with 0

        # Switch from the file layout (Values, Series) to (Series, Values).
        self.data = self.data.T

        return self
[docs]
def print(self, nbr_val=10, nbr_series=7, view_by_series=False):
    """
    Display a bounded excerpt of the dataset as a fixed-width table.

    Parameters
    ----------
    nbr_val : int, optional
        The number of timestamps to print. Default is 10. Use -1 for no restriction.
    nbr_series : int, optional
        The number of series to print. Default is 7. Use -1 for no restriction.
    view_by_series : bool, optional
        True prints one row per series; False (default) prints one row per timestamp.

    Returns
    -------
    None
    """
    print("\nTime Series set :")

    matrix = self.data
    total_series, total_values = matrix.shape

    # -1 lifts the corresponding restriction.
    if nbr_val == -1:
        nbr_val = total_values
    if nbr_series == -1:
        nbr_series = total_series

    excerpt = matrix[:nbr_series, :nbr_val]
    if view_by_series:
        col_label, row_label = "Timestamp", "Series"
    else:
        excerpt = excerpt.T
        col_label, row_label = "Series", "Timestamp"

    label_fmt = "{:<15}"  # fixed width for row/column labels
    number_fmt = "{:>15.10f}"  # fixed width for values

    # Header line: blank corner cell followed by one label per column.
    column_labels = [label_fmt.format(f"{col_label}_{k + 1}") for k in range(excerpt.shape[1])]
    print(f"{'':<18}" + "".join(column_labels))

    # One line per row of the excerpt.
    for k, row in enumerate(excerpt):
        cells = "".join(number_fmt.format(value) for value in row)
        print(label_fmt.format(f"{row_label} {k + 1}") + cells)

    if nbr_series < total_series:
        print("...")

    print("\nshape of the time series :", self.data.shape, "\n\tnumber of series =", total_series,
          "\n\tnumber of values =", total_values, "\n\n")
[docs]
def print_results(self, metrics, algorithm="", text="Imputation Results of"):
    """
    Print a titled list of metric values.

    Parameters
    ----------
    metrics : dict
        A dictionary containing the imputation metrics to display.
    algorithm : str, optional
        The name of the algorithm used for imputation; omitted from the title when empty.
    text : str, optional
        Output text placed at the start of the title to help the user.

    Returns
    -------
    None
    """
    title = f"\n\n{text} ({algorithm}) :" if algorithm != "" else f"\n\n{text} :"
    print(title)

    # One "name = value" line per metric, names left-aligned on 20 characters.
    for name, score in metrics.items():
        print(f"{name:<20} = {score}")

    print("\n")
[docs]
def normalize(self, normalizer="z_score"):
    """
    Normalize every series of the dataset in place and log the elapsed time.

    Parameters
    ----------
    normalizer : str, optional
        The normalization technique to use:
        "min_max" (NaN-ignoring min/max scaling), "z_lib" (scipy ``zscore``),
        "m_lib" (scikit-learn ``MinMaxScaler``); any other value, including the
        default "z_score", applies a mean / standard-deviation z-score.

    Returns
    -------
    None
        ``self.data`` is replaced by the normalized matrix.
    """
    print("Normalization of the original time series dataset with ", normalizer)

    # Work column-wise: after the transpose each column is one series.
    self.data = self.data.T
    start_time = time.time()

    if normalizer == "min_max":
        lows = np.nanmin(self.data, axis=0)
        highs = np.nanmax(self.data, axis=0)
        spans = highs - lows
        spans[spans == 0] = 1  # constant series: avoid dividing by zero
        self.data = (self.data - lows) / spans
    elif normalizer == "z_lib":
        self.data = zscore(self.data, axis=0)
    elif normalizer == "m_lib":
        self.data = MinMaxScaler().fit_transform(self.data)
    else:
        centers = np.mean(self.data, axis=0)
        spreads = np.std(self.data, axis=0)
        spreads[spreads == 0] = 1  # constant series: avoid dividing by zero
        self.data = (self.data - centers) / spreads

    end_time = time.time()

    # Back to the original orientation.
    self.data = self.data.T

    print(f"\n\t\t> logs, normalization {normalizer} - Execution Time: {(end_time - start_time):.4f} seconds\n")
[docs]
def plot(self, input_data, incomp_data=None, recov_data=None, nbr_series=None, nbr_val=None, series_range=None,
         subplot=False, size=(16, 8), save_path="./imputegap/assets", display=True):
    """
    Plot the time series data, including raw, contaminated, or imputed data.

    Depending on which matrices are given, each selected series is drawn as:
    raw data only (``input_data`` alone), raw (dashed) plus contaminated data
    (``incomp_data`` given), or additionally the recovered values in red
    (``recov_data`` given).

    Parameters
    ----------
    input_data : numpy.ndarray
        The original time series data without contamination.
    incomp_data : numpy.ndarray, optional
        The contaminated time series data. It is dereferenced whenever
        ``subplot`` is True or ``recov_data`` is given.
    recov_data : numpy.ndarray, optional
        The imputed time series data.
    nbr_series : int, optional
        The maximum number of series to plot. None or -1 means all series of ``input_data``.
    nbr_val : int, optional
        The maximum number of values per series to plot. None or -1 means all values.
    series_range : int, optional
        The index of a specific series to plot.
    subplot : bool, optional
        Print one time series by subplot (only series containing NaN in
        ``incomp_data``) or all in the same plot.
    size : tuple, optional
        Size of the plot in inches. Default is (16, 8).
    save_path : str, optional
        Directory in which the plot is saved as "<timestamp>_plot.jpg"; it is
        created if missing. An empty value disables saving.
    display : bool, optional
        Whether to display the plot. Default is True.

    Returns
    -------
    str or None
        The file path of the saved plot, or None when nothing was saved.
    """
    # NOTE(review): nesting reconstructed from a listing that lost its indentation —
    # confirm against the repository before relying on the exact block structure.
    number_of_series = 0

    # None / -1 mean "no limit": fall back to the full extent of input_data.
    if nbr_series is None or nbr_series == -1:
        nbr_series = input_data.shape[0]
    if nbr_val is None or nbr_val == -1:
        nbr_val = input_data.shape[1]

    if subplot:
        # Only series that actually contain missing values get a subplot.
        series_indices = [i for i in range(incomp_data.shape[0]) if np.isnan(incomp_data[i]).any()]
        count_series = [series_range] if series_range is not None else range(min(len(series_indices), nbr_series))
        n_series_to_plot = len(count_series)
    else:
        series_indices = [series_range] if series_range is not None else range(min(input_data.shape[0], nbr_series))
        n_series_to_plot = len(series_indices)

    # NOTE(review): this fallback reads incomp_data.shape even when incomp_data
    # may be None — confirm the intended behaviour.
    if n_series_to_plot == 0:
        n_series_to_plot = min(nbr_series, incomp_data.shape[0])

    if subplot:
        # Grid of at most 3 columns; rows computed by ceiling division.
        n_cols = min(3, n_series_to_plot)
        n_rows = (n_series_to_plot + n_cols - 1) // n_cols

        x_size, y_size = size
        x_size = x_size * n_cols
        y_size = y_size * n_rows

        # For fewer than 4 rows, use 85% of a 1920x1080 canvas expressed in
        # hundreds of pixels instead of the scaled size.
        scale_factor = 0.85
        x_size_screen = (1920 / 100) * scale_factor
        y_size_screen = (1080 / 100) * scale_factor

        if n_rows < 4:
            x_size = x_size_screen
            y_size = y_size_screen

        fig, axes = plt.subplots(n_rows, n_cols, figsize=(x_size, y_size), squeeze=False)
        axes = axes.flatten()
    else:
        plt.figure(figsize=size)
        plt.grid(True, linestyle='--', color='#d3d3d3', linewidth=0.6)

    if input_data is not None:
        colors = utils.load_parameters("default", algorithm="colors")

        # NOTE(review): in subplot mode the loop walks series_indices (all series
        # with NaN) while the axes grid was sized from count_series — verify that
        # idx cannot run past len(axes) when series_range is set.
        for idx, i in enumerate(series_indices):
            # Subplots all use the first color; the shared plot cycles through the palette.
            if subplot:
                color = colors[0]
            else:
                color = colors[i % len(colors)]

            timestamps = np.arange(min(input_data.shape[1], nbr_val))

            # Select the current axes if using subplots
            if subplot:
                ax = axes[idx]
                ax.grid(True, linestyle='--', color='#d3d3d3', linewidth=0.6)
            else:
                ax = plt

            if incomp_data is None and recov_data is None:  # plot only raw matrix
                ax.plot(timestamps, input_data[i, :nbr_val], linewidth=2.5,
                        color=color, linestyle='-', label=f'TS {i + 1}')

            if incomp_data is not None and recov_data is None:  # plot contaminated matrix
                # Dashed line: original values of a series that has missing data.
                if np.isnan(incomp_data[i, :]).any():
                    ax.plot(timestamps, input_data[i, :nbr_val], linewidth=1.5,
                            color=color, linestyle='--', label=f'TS-INCOMP {i + 1}')

                # Solid line: the contaminated series (gaps where values are NaN).
                if np.isnan(incomp_data[i, :]).any() or not subplot:
                    ax.plot(np.arange(min(incomp_data.shape[1], nbr_val)), incomp_data[i, :nbr_val],
                            color=color, linewidth=2.5, linestyle='-', label=f'TS-INPUT {i + 1}')

            if recov_data is not None:  # plot imputed matrix
                if np.isnan(incomp_data[i, :]).any():
                    # Red line: recovered series; dashed line: original values.
                    ax.plot(np.arange(min(recov_data.shape[1], nbr_val)), recov_data[i, :nbr_val],
                            linestyle='-', color="r", label=f'TS-RECOV {i + 1}')

                    ax.plot(timestamps, input_data[i, :nbr_val], linewidth=1.5,
                            linestyle='--', color=color, label=f'TS-INCOM {i + 1}')

                if np.isnan(incomp_data[i, :]).any() or not subplot:
                    ax.plot(np.arange(min(incomp_data.shape[1], nbr_val)), incomp_data[i, :nbr_val],
                            color=color, linewidth=2.5, linestyle='-', label=f'TS-INPUT {i + 1}')

            # Label and legend for subplot
            if subplot:
                ax.set_title('Series ' + str(i+1), fontsize=9)
                ax.set_xlabel('Timestamp', fontsize=7)
                ax.set_ylabel('Values', fontsize=7)
                ax.legend(loc='upper left', fontsize=7)
                plt.tight_layout()

            # Stop once the requested number of series has been drawn.
            number_of_series += 1
            if number_of_series == nbr_series:
                break

    if subplot:
        # Hide the axes of the grid that received no series.
        for idx in range(len(series_indices), len(axes)):
            axes[idx].axis('off')

    if not subplot:
        plt.xlabel('Timestamp')
        plt.ylabel('Values')
        plt.legend(
            loc='upper left',
            fontsize=10,
            frameon=True,
            fancybox=True,
            shadow=True,
            borderpad=1.5,
            bbox_to_anchor=(1.02, 1),  # anchors the legend's upper-left corner at x=1.02 in axes coordinates
        )

    file_path = None
    if save_path:
        os.makedirs(save_path, exist_ok=True)

        # File name is the current local time followed by "_plot.jpg".
        now = datetime.datetime.now()
        current_time = now.strftime("%y_%m_%d_%H_%M_%S")
        file_path = os.path.join(save_path + "/" + current_time + "_plot.jpg")
        plt.savefig(file_path, bbox_inches='tight')
        print("plots saved in ", file_path)

    if display:
        plt.show()

    return file_path
[docs]
class Contamination:
"""
Inner class to apply contamination patterns to the time series data.
Methods
-------
missing_completely_at_random(input_data, rate_dataset=0.2, rate_series=0.2, block_size=10, offset=0.1, seed=True, explainer=False) :
Apply Missing Completely at Random (MCAR) contamination to the time series data.
missing_percentage(input_data, rate_dataset=0.2, rate_series=0.2, offset=0.1) :
Apply missing percentage contamination to the time series data.
percentage_shift(input_data, rate_dataset=0.2, rate_series=0.2, offset=0.1, seed=True) :
Apply missing percentage contamination with a random starting position to the time series data.
blackout(input_data, series_rate=0.2, offset=0.1) :
Apply blackout contamination to the time series data.
gaussian(input_data, rate_dataset=0.2, rate_series=0.2, std_dev=0.2, offset=0.1, seed=True):
Apply Gaussian contamination to the time series data.
distribution(input_data, rate_dataset=0.2, rate_series=0.2, probabilities=None, offset=0.1, seed=True):
Apply any distribution contamination to the time series data based on their probabilities.
disjoint(input_data, rate_series=0.1, limit=1, offset=0.1):
Apply Disjoint contamination to the time series data.
overlap(input_data, rate_series=0.2, limit=1, shift=0.05, offset=0.1):
Apply Overlapping contamination to the time series data.
"""
[docs]
def missing_completely_at_random(input_data, rate_dataset=0.2, rate_series=0.2, block_size=10, offset=0.1, seed=True, explainer=False):
    """
    Apply Missing Completely at Random (MCAR) contamination to the time series data.

    Blocks of ``block_size`` consecutive values are replaced by NaN at random
    positions located after the protected offset. A block that runs past the end
    of the series wraps around to the first position after the offset, and
    positions that are already NaN are skipped.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_dataset : float, optional
        Percentage of series to contaminate (default is 0.2). In explainer mode
        this value is used as the index of the single series to contaminate.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.2).
    block_size : int, optional
        Size of the block of missing data (default is 10).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).
    seed : bool, optional
        Whether to use a seed for reproducibility (default is True). The seed value is 42.
    explainer : bool, optional
        Whether to apply MCAR to specific series for explanation purposes (default is False).
        In this mode the rates are not validated and nothing is logged.

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length, or if the
        missing rate is too small to remove at least one full block.
    """
    # Bind seed_value on every path: the log below reports it even when seeding is disabled.
    seed_value = 42 if seed else None
    if seed:
        np.random.seed(seed_value)

    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    if not explainer:  # use random series
        rate_series = utils.verification_limitation(rate_series)
        rate_dataset = utils.verification_limitation(rate_dataset)
        offset = utils.verification_limitation(offset)

        nbr_series_impacted = int(np.ceil(M * rate_dataset))
        series_selected = [str(idx) for idx in np.random.choice(M, nbr_series_impacted, replace=False)]
    else:  # use fix series
        series_selected = [str(rate_dataset)]

    offset_nbr = int(offset * NS)
    values_nbr = int(NS * rate_series)

    if not explainer:
        print(f"\n\n\tMCAR contamination has been called with :"
              f"\n\t\ta number of series impacted {rate_dataset * 100}%"
              f"\n\t\ta missing rate of {rate_series * 100}%"
              f"\n\t\ta starting position at {offset_nbr}"
              f"\n\t\ta block size of {block_size}"
              f"\n\t\tvalues to remove by series {values_nbr}"
              f"\n\t\twith a seed option set to {seed}"
              f"\n\t\twith a seed value set to {seed_value}"
              f"\n\t\tshape of the set {ts_contaminated.shape}"
              f"\n\t\tthis selection of series {series_selected}\n\n")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series."
            f" ({offset_nbr+values_nbr} must be smaller than {NS}).")

    for series in series_selected:
        S = int(series)
        N = len(ts_contaminated[S])  # number of values in the series
        P = int(N * offset)  # values to protect in the beginning of the series
        W = int(N * rate_series)  # number of data to remove
        B = W // block_size  # number of block to remove

        if B <= 0:
            # Single formatted message (a multi-argument ValueError renders as a tuple).
            raise ValueError(
                "The number of block to remove must be greater than 0. "
                "The dataset or the number of blocks may not be appropriate. "
                f"One series has {N} values, population is {N - P}, "
                f"the number to remove is {W} and the block size is {block_size}.")

        # Random starting points of the blocks, all after the protected offset.
        data_to_remove = np.random.choice(range(P, N), B, replace=False)

        for start_point in data_to_remove:
            for jump in range(block_size):  # remove the block size for each random position
                position = start_point + jump

                if position >= N:  # If block exceeds the series length
                    position = P + (position - N)  # Wrap around to the start after protection

                # Skip positions already removed so each step removes a new value.
                while np.isnan(ts_contaminated[S, position]):
                    position = position + 1

                    if position >= N:  # If block exceeds the series length
                        position = P + (position - N)  # Wrap around to the start after protection

                ts_contaminated[S, position] = np.nan

    return ts_contaminated
[docs]
def missing_percentage(input_data, rate_dataset=0.2, rate_series=0.2, offset=0.1):
    """
    Remove one contiguous run of values, right after the offset, in the first series.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_dataset : float, optional
        Percentage of series to contaminate (default is 0.2); the first series are taken.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.2).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length.
    """
    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    # Validate the three rates through the shared helper.
    rate_series = utils.verification_limitation(rate_series)
    rate_dataset = utils.verification_limitation(rate_dataset)
    offset = utils.verification_limitation(offset)

    nbr_series_impacted = int(np.ceil(M * rate_dataset))
    offset_nbr = int(offset*NS)
    values_nbr = int(NS * rate_series)

    print("\n\n\tMISSING PERCENTAGE contamination has been called with :"
          "\n\t\ta number of series impacted ", rate_dataset * 100, "%",
          "\n\t\ta missing rate of ", rate_series * 100, "%",
          "\n\t\ta starting position at ", offset,
          "\n\t\tshape of the set ", ts_contaminated.shape,
          "\n\t\tthis selection of series : ", 1, "->", nbr_series_impacted,
          "\n\t\tvalues : ", offset_nbr, "->", offset_nbr + values_nbr, "\n\n")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series."
            f" ({offset_nbr+values_nbr} must be smaller than {NS}).")

    for row in range(nbr_series_impacted):
        length = len(ts_contaminated[row])  # number of values in the series
        first = int(length * offset)  # first index after the protected section
        count = int(length * rate_series)  # number of values to remove

        for index in range(first, first + count):
            ts_contaminated[row, index] = np.nan

    return ts_contaminated
[docs]
def percentage_shift(input_data, rate_dataset=0.2, rate_series=0.2, offset=0.1, seed=True):
    """
    Remove one contiguous run of values per series, starting at a random position after the offset.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_dataset : float, optional
        Percentage of series to contaminate (default is 0.2); the first series are taken.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.2).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).
    seed : bool, optional
        Whether to use a seed for reproducibility (default is True). The seed value is 42.

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length.
    """
    if seed:
        np.random.seed(42)

    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    # Validate the three rates through the shared helper.
    rate_series = utils.verification_limitation(rate_series)
    rate_dataset = utils.verification_limitation(rate_dataset)
    offset = utils.verification_limitation(offset)

    nbr_series_impacted = int(np.ceil(M * rate_dataset))
    offset_nbr = int(offset*NS)
    values_nbr = int(NS * rate_series)

    print("\n\n\tMISSING PERCENTAGE AT RANDOM contamination has been called with :"
          "\n\t\ta number of series impacted ", rate_dataset * 100, "%",
          "\n\t\ta missing rate of ", rate_series * 100, "%",
          "\n\t\ta starting position at ", offset,
          "\n\t\tshape of the set ", ts_contaminated.shape,
          "\n\t\tthis selection of series : ", 1, "->", nbr_series_impacted,
          "\n\t\tvalues : ", offset_nbr, "->", offset_nbr + values_nbr, "\n\n")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series."
            f" ({offset_nbr+values_nbr} must be smaller than {NS}).")

    for row in range(nbr_series_impacted):
        length = len(ts_contaminated[row])  # number of values in the series
        protected = int(length * offset)  # values kept at the beginning of the series
        count = int(length * rate_series)  # number of values to remove

        # Number of admissible shifts such that the run still fits in the series.
        shifts = (length - count - protected) + 1
        first = protected + np.random.randint(0, shifts)  # random start position

        for index in range(first, first + count):
            ts_contaminated[row, index] = np.nan

    return ts_contaminated
[docs]
def blackout(input_data, series_rate=0.2, offset=0.1):
    """
    Remove the same run of values from every series of the dataset.

    This is ``missing_percentage`` applied with ``rate_dataset=1``.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate.
    series_rate : float, optional
        Percentage of missing values per series (default is 0.2).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.
    """
    contaminate = TimeSeries.Contamination.missing_percentage
    return contaminate(input_data, rate_dataset=1, rate_series=series_rate, offset=offset)
[docs]
def gaussian(input_data, rate_dataset=0.2, rate_series=0.2, std_dev=0.2, offset=0.1, seed=True):
    """
    Apply contamination with a Gaussian distribution to the time series data.

    For each contaminated series, the positions to remove are drawn without
    replacement among the indices after the offset, weighted by a normal density.
    The density is centred at ``P + mean * (N - P)``, where ``mean`` is the series
    mean clipped to [-1, 1], and has a standard deviation of ``std_dev * (N - P)``.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_dataset : float, optional
        Percentage of series to contaminate (default is 0.2); the first series are taken.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.2).
    std_dev : float, optional
        Standard deviation of the Gaussian distribution for missing values (default is 0.2).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).
    seed : bool, optional
        Whether to use a seed for reproducibility (default is True). The seed value is 42.

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length.
    """
    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    # Bind seed_value on every path: the log below reports it even when seeding is disabled.
    seed_value = 42 if seed else None
    if seed:
        np.random.seed(seed_value)

    # Validation and limitation of input parameters
    rate_series = utils.verification_limitation(rate_series)
    rate_dataset = utils.verification_limitation(rate_dataset)
    offset = utils.verification_limitation(offset)

    nbr_series_impacted = int(np.ceil(M * rate_dataset))
    offset_nbr = int(offset * NS)
    values_nbr = int(NS * rate_series)

    print(f"\n\n\tGAUSSIAN contamination has been called with :"
          f"\n\t\ta number of series impacted {rate_dataset * 100}%"
          f"\n\t\ta missing rate of {rate_series * 100}%"
          f"\n\t\ta starting position at {offset_nbr}"
          f"\n\t\tvalues to remove by series {values_nbr}"
          f"\n\t\twith a seed option set to {seed}"
          f"\n\t\twith a seed value set to {seed_value}"
          f"\n\t\tGaussian std_dev {std_dev}"
          f"\n\t\tshape of the set {ts_contaminated.shape}"
          f"\n\t\tthis selection of series {nbr_series_impacted}\n\n")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series.")

    for S in range(nbr_series_impacted):
        N = len(ts_contaminated[S])  # number of values in the series
        P = int(N * offset)  # values to protect in the beginning of the series
        W = int(N * rate_series)  # number of data points to remove
        R = np.arange(P, N)  # candidate indices (after the protected section)

        # probability density function, centred according to the clipped series mean
        mean = np.mean(ts_contaminated[S])
        mean = max(min(mean, 1), -1)
        probabilities = norm.pdf(R, loc=P + mean * (N - P), scale=std_dev * (N - P))

        # normalizes the probabilities so that their sum equals 1
        probabilities /= probabilities.sum()

        # select the values based on the probability
        missing_indices = np.random.choice(R, size=W, replace=False, p=probabilities)

        # apply missing values
        ts_contaminated[S, missing_indices] = np.nan

    return ts_contaminated
[docs]
def distribution(input_data, rate_dataset=0.2, rate_series=0.2, probabilities=None, offset=0.1, seed=True):
    """
    Apply contamination with a probabilistic distribution to the time series data.

    For each contaminated series, the positions to remove are drawn without
    replacement among the indices after the offset, weighted by the matching
    row of ``probabilities``.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_dataset : float, optional
        Percentage of series to contaminate (default is 0.2); the first series are taken.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.2).
    probabilities : 2-D array-like, optional
        The probabilities of being contaminated associated with each values of a series.
        Must match the shape of input data without the offset, i.e.
        (number of series, number of values - offset) :
        (e.g. [[0.1, 0, 0.3, 0], [0.2, 0.1, 0.2, 0.9]])
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).
    seed : bool, optional
        Whether to use a seed for reproducibility (default is True). The seed value is 42.

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length, or if
        ``probabilities`` (including the default None) does not have the expected shape.
    """
    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    # Bind seed_value on every path: the log below reports it even when seeding is disabled.
    seed_value = 42 if seed else None
    if seed:
        np.random.seed(seed_value)

    # Validation and limitation of input parameters
    rate_series = utils.verification_limitation(rate_series)
    rate_dataset = utils.verification_limitation(rate_dataset)
    offset = utils.verification_limitation(offset)

    nbr_series_impacted = int(np.ceil(M * rate_dataset))
    offset_nbr = int(offset * NS)
    values_nbr = int(NS * rate_series)

    probabilities_shape = np.array(probabilities).shape

    print(f"\n\n\tDISTRIBUTION contamination has been called with :"
          f"\n\t\ta number of series impacted {rate_dataset * 100}%"
          f"\n\t\ta missing rate of {rate_series * 100}%"
          f"\n\t\ta starting position at {offset_nbr}"
          f"\n\t\tvalues to remove by series {values_nbr}"
          f"\n\t\twith a seed option set to {seed}"
          f"\n\t\twith a seed value set to {seed_value}"
          f"\n\t\tshape of the set {ts_contaminated.shape}"
          f"\n\t\tprobabilities list {probabilities_shape}"
          f"\n\t\tthis selection of series {nbr_series_impacted}\n\n")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series.")

    # One row of weights per series, one weight per index after the offset.
    if probabilities_shape != (M, NS - offset_nbr):
        raise ValueError(
            f"\n\tError: The probability list does not match the matrix in input {probabilities_shape} != ({M},{NS-offset_nbr}).")

    for S in range(nbr_series_impacted):
        N = len(ts_contaminated[S])  # number of values in the series
        P = int(N * offset)  # values to protect in the beginning of the series
        W = int(N * rate_series)  # number of data points to remove
        R = np.arange(P, N)  # candidate indices (after the protected section)
        D = probabilities[S]  # weights supplied by the caller for this series

        missing_indices = np.random.choice(R, size=W, replace=False, p=D)

        # apply missing values
        ts_contaminated[S, missing_indices] = np.nan

    return ts_contaminated
[docs]
def disjoint(input_data, rate_series=0.1, limit=1, offset=0.1):
    """
    Remove consecutive, non-overlapping windows, one per series.

    The window of each series starts where the window of the previous series
    ended; contamination stops as soon as the limit index is reached.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.1).
    limit : float, optional
        Percentage expressing the limit index of the end of the contamination (default is 1: all length).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length.
    """
    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    rate_series = utils.verification_limitation(rate_series)
    offset = utils.verification_limitation(offset)

    offset_nbr = int(offset * NS)
    values_nbr = int(NS * rate_series)

    print(f"\n\n\tDISJOINT contamination has been called with :"
          f"\n\t\ta missing rate of {rate_series * 100}%"
          f"\n\t\ta starting position at {offset_nbr}"
          f"\n\t\tvalues to remove by series {values_nbr}"
          f"\n\t\tlimit to stop {limit}"
          f"\n\t\tshape of the set {ts_contaminated.shape}")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series.")

    final_limit = int(NS*limit)-1  # last index that may be contaminated
    cursor = 0  # start of the next window, relative to the offset

    for row in range(M):
        length = len(ts_contaminated[row])  # number of values in the series
        protected = int(length * offset)  # values kept at the beginning of the series
        count = int(length * rate_series)  # number of values to remove
        window_end = cursor + count

        for index in range(protected + cursor, protected + window_end):
            ts_contaminated[row, index] = np.nan
            if index >= final_limit:  # reach the limitation
                return ts_contaminated

        cursor = window_end

    return ts_contaminated
[docs]
def overlap(input_data, rate_series=0.2, limit=1, shift=0.05, offset=0.1):
    """
    Remove consecutive windows, one per series, each overlapping the previous one.

    Works like the disjoint pattern, except that from the second window on the
    start is moved back by ``shift`` (as a fraction of the series length), so
    consecutive windows share some positions.

    Parameters
    ----------
    input_data : numpy.ndarray
        The time series dataset to contaminate. It is copied, not modified.
    rate_series : float, optional
        Percentage of missing values per series (default is 0.2).
    limit : float, optional
        Percentage expressing the limit index of the end of the contamination (default is 1: all length).
    shift : float, optional
        Percentage of shift back into the previous window (default is 0.05).
    offset : float, optional
        Size of the uncontaminated section at the beginning of the series (default is 0.1).

    Returns
    -------
    numpy.ndarray
        The contaminated time series data.

    Raises
    ------
    ValueError
        If offset plus missing values exceed the series length, or if the
        shift (in values) is larger than the offset (in values).
    """
    ts_contaminated = input_data.copy()
    M, NS = ts_contaminated.shape

    rate_series = utils.verification_limitation(rate_series)
    offset = utils.verification_limitation(offset)

    offset_nbr = int(offset * NS)
    values_nbr = int(NS * rate_series)

    print(f"\n\n\tOVERLAP contamination has been called with :"
          f"\n\t\ta missing rate of {rate_series * 100}%"
          f"\n\t\ta offset of {offset*100}%"
          f"\n\t\ta starting position at {offset_nbr}"
          f"\n\t\tvalues to remove by series {values_nbr}"
          f"\n\t\ta shift overlap of {shift * 100} %"
          f"\n\t\ta shift in number {int(shift * NS)}"
          f"\n\t\tlimit to stop {limit}"
          f"\n\t\tshape of the set {ts_contaminated.shape}")

    if offset_nbr + values_nbr > NS:
        raise ValueError(
            f"\n\tError: The sum of offset ({offset_nbr}) and missing values ({values_nbr}) exceeds the limit of of the series.")

    if int(NS*shift) > int(NS*offset):
        raise ValueError(f"Shift too big for this dataset and offset: shift ({int(NS*shift)}), offset ({int(NS*offset)}).")

    final_limit = int(NS * limit) - 1  # last index that may be contaminated
    cursor = 0  # start of the next window, relative to the offset

    for row in range(M):
        length = len(ts_contaminated[row])  # number of values in the series
        protected = int(length * offset)  # values kept at the beginning of the series
        count = int(length * rate_series)  # number of values to remove

        # Every window but the first is pulled back into the previous one.
        if cursor != 0:
            cursor = cursor - int(length * shift)

        window_end = cursor + count

        for index in range(protected + cursor, protected + window_end):
            ts_contaminated[row, index] = np.nan
            if index >= final_limit:  # reach the limitation
                return ts_contaminated

        cursor = window_end

    return ts_contaminated