import types
import copy
import pandas as pd
from sklearn.preprocessing import StandardScaler
import numpy as np
import pickle
import warnings
from memoization import cached
# There is a bug that prevents a pandas.DataFrame from being correctly memoized.
# Thus, all functions that use the @cached decorator must accept serialized dataframes (pickle is a good option).
from ._config import _cache_max_items
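
# Illustrative note (not part of the original API): pickled bytes are hashable,
# which is what lets @cached key on them, whereas a DataFrame itself is not.
# A minimal sketch of the pattern used by the helpers below:
# >>> frame = pd.DataFrame({'a': [1.0, 2.0]})
# >>> serialized = pickle.dumps(frame)
# >>> hash(serialized) == hash(serialized)
# True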
DATAFRAME_METADATA_CONTAINER_FROM_SPY = 'spy'
DATAFRAME_PREPROCESSING_CONTAINER = 'preprocessing'
FUNC_PROP = 'func'
GRID_PROP = 'grid'
SUMMARY_PROP = 'summary'
QUERY_DF_PROP = 'query_df'
KWARGS_PROP = 'kwargs'
START_PROP = 'start'
END_PROP = 'end'
TZ_PROP = 'tz_convert'
STATUS_PROP = 'status'
def _reformat_time_delta(time_delta: pd.Timedelta) -> str:
days = time_delta.days
hours, rem = divmod(time_delta.seconds, 3600)
minutes, seconds = divmod(rem, 60)
total_hours = days * 24 + hours
return f"{total_hours:02}:{minutes:02}:{seconds:02}"
def _validate_df(df):
if not isinstance(df, pd.DataFrame):
raise TypeError("Expecting a dataframe, {} passed".format(type(df)))
if len(df) <= 0:
raise ValueError("Dataframe is empty")
if len(df.columns) == 0:
raise ValueError("There are zero signals in the dataframe")
    is_number = np.vectorize(lambda x: np.issubdtype(x, np.number))
    col_types = is_number(df.dtypes)
    non_numeric_cols = list(df.columns[~col_types])
    if not col_types.all():
        raise ValueError(f"All columns in dataframe must be numeric. Check columns={non_numeric_cols}")
if hasattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY) and \
getattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY) is None:
delattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY)
if not hasattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY):
        df_spy_properties = {
            GRID_PROP: getattr(df, GRID_PROP, None),
            QUERY_DF_PROP: getattr(df, QUERY_DF_PROP, None),
            KWARGS_PROP: getattr(df, KWARGS_PROP, None),
            FUNC_PROP: getattr(df, FUNC_PROP, None),
            START_PROP: getattr(df, START_PROP, None),
            END_PROP: getattr(df, END_PROP, None),
            TZ_PROP: getattr(df, TZ_PROP, None),
            STATUS_PROP: getattr(df, STATUS_PROP, None),
        }
properties = types.SimpleNamespace(**df_spy_properties)
setattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY, properties)
# noinspection PyProtectedMember
if DATAFRAME_METADATA_CONTAINER_FROM_SPY not in df._metadata:
# noinspection PyProtectedMember
df._metadata.append(DATAFRAME_METADATA_CONTAINER_FROM_SPY)
namespace = getattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY)
if not namespace.grid:
warnings.warn(
"'DataFrame' object has 'spy.grid=None'. An attempt to infer the sampling frequency will be made")
if not pd.infer_freq(df.index):
raise ValueError("Sampling period could not be inferred. "
"It looks like the dataframe does not have a uniform grid. Try pulling the data using"
"spy.pull with a `grid` value and make sure that there are no invalid values in the"
"time period of interest")
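
# Usage sketch for _validate_df (hypothetical signals): numeric frames with an
# inferable grid pass (with a warning when spy.grid is unset); non-numeric
# columns raise.
# >>> idx = pd.date_range('2021-01-01', periods=3, freq='1min')
# >>> _validate_df(pd.DataFrame({'temp': [1.0, 2.0, 3.0]}, index=idx))  # OK
# >>> _validate_df(pd.DataFrame({'state': ['on', 'off', 'on']}, index=idx))
# ValueError: All columns in dataframe must be numeric. Check columns=['state']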
def attach_summary(df, summary):
"""
    This function adds the summary of a pre-processing operation as a property
    of the DataFrame that contains the data.
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
summary: pandas.DataFrame
A DataFrame of exactly one column with the summary of the
pre-processing step and signal names as index.
Returns
-------
df: pandas.DataFrame
The input DataFrame with a preprocessing container attached as metadata.
Notes
------
This function modifies the input DataFrame by adding or updating the
`preprocessing.summary` property.
"""
if len(df.columns) > len(summary):
columns = df.columns
else:
columns = summary.index
try:
namespace = getattr(df, DATAFRAME_PREPROCESSING_CONTAINER)
if namespace.summary is not None:
summary_df = copy.deepcopy(namespace.summary)
cols = list(summary_df.columns)
else:
summary_df = pd.DataFrame(columns=[], index=columns)
cols = []
except AttributeError:
summary_df = pd.DataFrame(columns=[], index=columns)
cols = []
if len(summary.columns) != 1:
raise ValueError("Expecting a dataframe of exactly one column")
if summary.columns[0] in summary_df:
new_summary = copy.deepcopy(summary_df)
new_summary.update(summary)
else:
new_summary = summary_df.join(summary)
if summary.columns[0] not in cols:
cols += [summary.columns[0]]
new_summary.replace('', '-', inplace=True)
new_summary.fillna('-', inplace=True)
# noinspection PyProtectedMember
if DATAFRAME_PREPROCESSING_CONTAINER not in df._metadata:
# noinspection PyProtectedMember
df._metadata.append(DATAFRAME_PREPROCESSING_CONTAINER)
# cols helps to keep the order of the columns
summary_namespace = types.SimpleNamespace(summary=new_summary[cols])
setattr(df, DATAFRAME_PREPROCESSING_CONTAINER, summary_namespace)
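
# Usage sketch for attach_summary (hypothetical signal names): each call joins
# one summary column into `df.preprocessing.summary`.
# >>> idx = pd.date_range('2021-01-01', periods=2, freq='1h')
# >>> data = pd.DataFrame({'a': [1.0, 2.0], 'b': [3.0, 4.0]}, index=idx)
# >>> note = pd.DataFrame({'Flat Signal?': ['No', 'No']}, index=['a', 'b'])
# >>> attach_summary(data, note)
# >>> data.preprocessing.summary
#   Flat Signal?
# a           No
# b           No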
@cached(max_size=_cache_max_items)
def _pickled_standardization(df_serialized):
df = _pickled_non_numeric(df_serialized, True)
if df.empty:
return pd.DataFrame()
names = df.columns
index = df.index
scaler = StandardScaler()
scaled_df = scaler.fit_transform(df)
scaled_df = pd.DataFrame(scaled_df, index=index, columns=names)
    # Copy spy/preprocessing metadata from the input frame onto the scaled one
    # noinspection PyProtectedMember
    for x in df._metadata:
        if x in df.__dict__:
            setattr(scaled_df, x, getattr(df, x))
            # noinspection PyProtectedMember
            if x not in scaled_df._metadata:
                # noinspection PyProtectedMember
                scaled_df._metadata += [x]
return scaled_df
@cached(max_size=_cache_max_items)
def _pickled_interpolate(df_serialized, consecutivenans):
df = pickle.loads(df_serialized)
if df.empty:
return pd.DataFrame()
percentages = df.isna().mean()
limit = int(len(df) * consecutivenans)
if limit == 0:
interpolated_df = copy.deepcopy(df)
else:
interpolated_df = df.interpolate(method='linear', limit_direction='both', limit=limit)
new_percentages = interpolated_df.isna().mean()
percentages.name = 'Initial Invalid (%)'
new_percentages.name = 'Invalid after Interpolation (%)'
attach_summary(interpolated_df, np.round(percentages.to_frame() * 100, 2))
attach_summary(interpolated_df, np.round(new_percentages.to_frame() * 100, 2))
return interpolated_df
@cached(max_size=_cache_max_items)
def _pickled_flat_lines(df_serialized):
df = _pickled_non_numeric(df_serialized, True)
if df.empty:
return pd.DataFrame()
    stds = df.describe().loc['std']
    keep = list(stds[stds != 0].index)
    remove = list(stds[stds == 0].index)
if len(remove) > 0:
warnings.warn("The signals {} appear to be flat lines and have been removed from the analysis".format(remove))
df_out = df[keep]
summary = ["Yes (removed)" if x in remove else "No" for x in df.columns]
local_summary = pd.DataFrame(data=summary, columns=['Flat Signal?'], index=df.columns)
attach_summary(df_out, local_summary)
return df_out
@cached(max_size=_cache_max_items)
def _pickled_gaps(df_serialized, percent_nan):
df = pickle.loads(df_serialized)
if df.empty:
return pd.DataFrame()
percentages = df.isna().mean()
marked_for_removal = percentages[percentages > percent_nan]
cols_to_remove = list(marked_for_removal.index)
dff = copy.deepcopy(df.drop(labels=cols_to_remove, axis=1))
if len(cols_to_remove) > 0:
warnings.warn(f"The signals {cols_to_remove} have been removed from the analysis since they have "
f"more than {percent_nan * 100:.2f}% invalid values")
summary = ["Yes (removed)" if x in cols_to_remove else "No" for x in df.columns]
local_summary = pd.DataFrame(data=summary, columns=['Gaps in Signal?'], index=df.columns)
attach_summary(dff, local_summary)
return dff
@cached(max_size=_cache_max_items)
def _pickled_non_numeric(df_serialized, auto_remove):
df = pickle.loads(df_serialized)
if df.empty:
return pd.DataFrame()
    inferred_dtypes = df.infer_objects().dtypes  # avoid shadowing the `types` module
    types_dict = inferred_dtypes.to_dict()
    non_numeric_signals = [k for k, v in types_dict.items() if v.name == 'object']
if auto_remove:
summary = ["No (removed)" if v.name == 'object' else "Yes" for k, v in types_dict.items()]
dff = copy.deepcopy(df.drop(labels=non_numeric_signals, axis=1))
if len(non_numeric_signals) > 0:
warnings.warn(
f"The signals {non_numeric_signals} have been removed from the analysis since they are non-numeric")
else:
summary = ["No (retained)" if v.name == 'object' else "Yes" for k, v in types_dict.items()]
dff = copy.deepcopy(df)
if len(non_numeric_signals) > 0:
            warnings.warn(
                f"The signals {non_numeric_signals} have been detected as non-numeric but have not been removed. "
                f"Use `auto_remove=True` to remove them from the dataframe")
local_summary = pd.DataFrame(data=summary, columns=['Numeric Signal?'], index=df.columns)
attach_summary(dff, local_summary)
return dff
@cached(max_size=_cache_max_items)
def _pickled_duplicated_column_names(df_serialized):
df = pickle.loads(df_serialized)
_validate_df(df)
if df.empty:
return pd.DataFrame()
cols = pd.Series(df.columns)
for dup in cols[cols.duplicated()].unique():
cols[cols[cols == dup].index.values.tolist()] = [dup + '.' + str(i) if i != 0 else dup for i in
range(sum(cols == dup))]
if hasattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY):
namespace = getattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY)
if namespace.query_df is not None:
if list(df.columns) == list(namespace.query_df['Name'].values):
namespace.query_df['New Name'] = cols
# rename the columns with the cols list.
df.columns = cols
return df
@cached(max_size=_cache_max_items)
def _pickled_sampling_info(df_serialized, sampling_ratio_threshold, remove=False):
keyword = 'tagged'
if remove:
keyword = 'removed'
df = pickle.loads(df_serialized)
_validate_df(df)
if df.empty:
return pd.DataFrame()
dff = copy.deepcopy(df)
if hasattr(dff, DATAFRAME_METADATA_CONTAINER_FROM_SPY):
namespace = getattr(df, DATAFRAME_METADATA_CONTAINER_FROM_SPY)
        if namespace.kwargs is None or namespace.query_df is None or 'items' not in namespace.kwargs \
                or 'New Name' not in namespace.query_df.columns:
            return dff
if 'Estimated Sample Period' not in namespace.kwargs['items']:
return dff
summary_sampling_period = pd.DataFrame(data=namespace.kwargs['items']['Estimated Sample Period'].values,
columns=['Original Sampling Period'],
index=namespace.query_df['New Name'].values)
summary_sampling_period['Original Sampling Period'] = summary_sampling_period['Original Sampling Period'].apply(
_reformat_time_delta)
attach_summary(dff, summary_sampling_period)
if hasattr(namespace, GRID_PROP):
if namespace.grid is None:
return dff
grid = pd.Timedelta(namespace.grid)
ratio_sampling_periods = pd.DataFrame(
data=namespace.kwargs['items']['Estimated Sample Period'].values / grid,
columns=['Ratio of sampling periods (raw/gridded)'], index=namespace.query_df['New Name'].values)
remove_signals = ratio_sampling_periods['Ratio of sampling periods (raw/gridded)'][
(ratio_sampling_periods['Ratio of sampling periods (raw/gridded)'] > sampling_ratio_threshold) |
(ratio_sampling_periods['Ratio of sampling periods (raw/gridded)'] < 1 / sampling_ratio_threshold)
].index
remove_signals = [x for x in remove_signals if x in dff.columns]
            def _tag(index, ratio):
                if index in remove_signals:
                    return f'{ratio} ({keyword})'
                if index not in dff.columns:
                    return f'{ratio} (previously removed)'
                return f'{ratio}'

            summary = [_tag(index, row['Ratio of sampling periods (raw/gridded)'])
                       for index, row in ratio_sampling_periods.iterrows()]
            ratio_sampling_periods['Ratio of sampling periods (raw/gridded)'] = summary
if remove:
dff = copy.deepcopy(dff.drop(labels=remove_signals, axis=1))
attach_summary(dff, ratio_sampling_periods)
return dff
def standardization(df):
"""
Scales the data in the DataFrame to zero mean and unit variance
Parameters
----------
    df: pandas.DataFrame
        A DataFrame that contains a set of signals as columns and date-time
        as the index. It is pickled internally before being passed to the
        cached helper.
Returns
--------
    scaled_df: pandas.DataFrame
        A DataFrame that contains the scaled signals as columns and
        date-time as the index.
Notes
------
A summary of how this function modified or tagged signals in the dataframe
can be accessed in the property `scaled_df.preprocessing.summary`.
"""
return _pickled_standardization(pickle.dumps(df))
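
# Usage sketch for standardization (hypothetical data): each column is scaled
# to zero mean and unit variance, keeping the original index and column names.
# >>> idx = pd.date_range('2021-01-01', periods=4, freq='1h')
# >>> raw = pd.DataFrame({'a': [1.0, 2.0, 3.0, 4.0]}, index=idx)
# >>> standardization(raw)['a'].round(3).tolist()
# [-1.342, -0.447, 0.447, 1.342]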
def interpolate_nans(df, consecutivenans=0.05):
"""
Interpolates invalid values (linearly) per signal only if the percentage of
consecutive invalid values with respect to the total number of samples is
less than the specified threshold.
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time
as the index.
    consecutivenans: float, default 0.05
        Maximum run of consecutive invalid values that the interpolator will
        fill, expressed as a fraction of the total number of samples.
Returns
--------
interpolated_df: pandas.DataFrame
A DataFrame with the interpolated values (if applicable)
Notes
------
A summary of how this function modified or tagged signals in the dataframe
can be accessed in the property `interpolated_df.preprocessing.summary`.
"""
return _pickled_interpolate(pickle.dumps(df), consecutivenans)
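
# Usage sketch for interpolate_nans (hypothetical data): with 5 samples and
# consecutivenans=0.4 the fill limit is int(5 * 0.4) = 2 consecutive NaNs.
# >>> idx = pd.date_range('2021-01-01', periods=5, freq='1h')
# >>> gappy = pd.DataFrame({'a': [1.0, np.nan, 3.0, np.nan, 5.0]}, index=idx)
# >>> interpolate_nans(gappy, consecutivenans=0.4)['a'].tolist()
# [1.0, 2.0, 3.0, 4.0, 5.0]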
def remove_flat_lines(df):
"""
Find signals with zero variance (flat lines) and remove them from the
input DataFrame
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
Returns
---------
df_out: pandas.DataFrame
A DataFrame without the signals that are flat lines
Notes
------
A summary of how this function modified or tagged signals in the dataframe
can be accessed in the property `df_out.preprocessing.summary`
"""
return _pickled_flat_lines(pickle.dumps(df))
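
# Usage sketch for remove_flat_lines (hypothetical data): zero-variance
# columns are dropped with a warning.
# >>> idx = pd.date_range('2021-01-01', periods=3, freq='1h')
# >>> mixed = pd.DataFrame({'varies': [1.0, 2.0, 3.0],
# ...                       'flat': [5.0, 5.0, 5.0]}, index=idx)
# >>> list(remove_flat_lines(mixed).columns)
# ['varies']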
def remove_signals_with_missing_data(df, percent_nan=0.4):
"""
Removes columns from the dataframe that have a high percentage of missing
data.
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
percent_nan: float, default 0.4
Maximum percentage of invalid values (from the total number of samples)
allowed in order to keep the signal in the DataFrame.
Returns
--------
dff: pandas.DataFrame
A DataFrame of the signals that have less than percent_nan of missing
data.
Notes
------
A summary of how this function modified or tagged signals in the dataframe
can be accessed in the property `dff.preprocessing.summary`.
"""
return _pickled_gaps(pickle.dumps(df), percent_nan)
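
# Usage sketch for remove_signals_with_missing_data (hypothetical data): 'bad'
# is 75% NaN, above the 50% threshold, so it is dropped with a warning.
# >>> idx = pd.date_range('2021-01-01', periods=4, freq='1h')
# >>> gappy = pd.DataFrame({'good': [1.0, 2.0, 3.0, 4.0],
# ...                       'bad': [1.0, np.nan, np.nan, np.nan]}, index=idx)
# >>> list(remove_signals_with_missing_data(gappy, percent_nan=0.5).columns)
# ['good']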
def remove_non_numeric(df, auto_remove=True):
"""
Removes non-numeric signals from the input DataFrame.
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
auto_remove: bool, default True
Removes the non-numeric signals from the DataFrame if set to True.
Otherwise, it just tags the signals.
Returns
--------
dff: pandas.DataFrame
A DataFrame of the signals that do not have non-numeric data
Notes
------
A summary of how this function modified or tagged signals in the dataframe
can be accessed in the property `dff.preprocessing.summary`.
"""
return _pickled_non_numeric(pickle.dumps(df), auto_remove)
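
# Usage sketch for remove_non_numeric (hypothetical data): object-typed
# columns are dropped (auto_remove=True) or only tagged (auto_remove=False).
# >>> idx = pd.date_range('2021-01-01', periods=2, freq='1h')
# >>> mixed = pd.DataFrame({'temp': [1.0, 2.0], 'state': ['on', 'off']}, index=idx)
# >>> list(remove_non_numeric(mixed).columns)
# ['temp']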
def duplicated_column_names(df):
"""
Renames columns that have the same column name
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
Returns
--------
dff: pandas.DataFrame
A DataFrame of the signals with unique column names
"""
return _pickled_duplicated_column_names(pickle.dumps(df))
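
# Usage sketch for duplicated_column_names (hypothetical data): repeated names
# receive a numeric suffix.
# >>> idx = pd.date_range('2021-01-01', periods=3, freq='1min')
# >>> dup = pd.DataFrame([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
# ...                    index=idx, columns=['a', 'a'])
# >>> list(duplicated_column_names(dup).columns)
# ['a', 'a.1']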
def sampling_info(df, sampling_ratio_threshold, remove):
"""
Attaches sampling period information if available and removes or tags
signals that have a high or low ratio of sampling period to gridded period.
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
    sampling_ratio_threshold: float
        Signals with a sampling period ratio (raw/gridded) above
        sampling_ratio_threshold or below 1/sampling_ratio_threshold will be
        removed from (or tagged in) the dataframe.
remove: bool
Removes signals whose sampling period is too different (set by
sampling_ratio_threshold) from the median of sampling rates.
Returns
--------
    dff: pandas.DataFrame
        A DataFrame with sampling-period information attached and, when
        remove=True, disparately sampled signals removed
"""
return _pickled_sampling_info(pickle.dumps(df), sampling_ratio_threshold, remove)
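
# Usage sketch for sampling_info: it relies on SPy metadata being present
# (`df.spy.kwargs['items']` with an 'Estimated Sample Period' column and
# `df.spy.query_df` with a 'New Name' column), so a self-contained example is
# not possible here; a hypothetical flow after spy.pull would look like:
# >>> # tagged = sampling_info(pulled_df, sampling_ratio_threshold=4, remove=False)
# >>> # tagged.preprocessing.summary['Ratio of sampling periods (raw/gridded)']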
def default_preprocessing_wrapper(df, consecutivenans=0.04, percent_nan=0.0, remove_disparate_sampled=False,
sampling_ratio_threshold=4, bypass_processing=False):
"""
Parameters
----------
df: pandas.DataFrame
A DataFrame that contains a set of signals as columns and date-time as
the index.
    consecutivenans: float, default 0.04
        Maximum run of consecutive invalid values that the interpolator will
        fill, expressed as a fraction of the total number of samples.
percent_nan: float, default 0.0
Maximum percentage of invalid values (from the total number of samples)
allowed in order to keep the signal in the DataFrame.
remove_disparate_sampled: bool, default False
Removes signals whose sampling period is too different (set by
sampling_ratio_threshold) from the median of sampling rates
    sampling_ratio_threshold: float, default 4
        Signals with a sampling period ratio (raw/gridded) above
        sampling_ratio_threshold or below 1/sampling_ratio_threshold will be
        removed from the dataframe
    bypass_processing: bool, default False
        If True, the pre-processing routines in this wrapper are bypassed.
        However, _validate_df is still run to check that the dataframe is
        valid even if bypass_processing is set to True.
Returns
--------
dff: pandas.DataFrame
A DataFrame of the cleansed signals
Notes
------
A summary of how this function modified or tagged signals in the dataframe can be accessed in
the property `dff.preprocessing.summary`
"""
df = copy.deepcopy(df)
df = duplicated_column_names(df)
df = remove_non_numeric(df)
if not bypass_processing:
df = interpolate_nans(df, consecutivenans=consecutivenans)
df = remove_signals_with_missing_data(df, percent_nan=percent_nan)
df = remove_flat_lines(df)
df = standardization(df)
else:
df = remove_flat_lines(df)
df = standardization(df)
df = sampling_info(df, sampling_ratio_threshold, remove=remove_disparate_sampled)
_validate_df(df)
return df
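
# Usage sketch for default_preprocessing_wrapper (hypothetical upstream pull):
# run the full cleansing chain and inspect the per-step summary afterwards.
# >>> # data = spy.pull(items, start='2021-01-01', end='2021-01-02', grid='15min')
# >>> # clean = default_preprocessing_wrapper(data, percent_nan=0.1)
# >>> # clean.preprocessing.summary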