#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Nov 21 00:55:22 2021
@author: dboateng
"""
import sys
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from copy import copy
from collections import OrderedDict
import scipy.stats as stats
try:
from standardizer import MonthlyStandardizer, NoStandardizer
from feature_selection import RecursiveFeatureElimination, TreeBasedSelection, SequentialFeatureSelection
from models import Regressors
from ensemble_models import EnsembleRegressor
from metrics import Evaluate
from MLR_model import BootstrappedForwardSelection, MultipleLSRegression
except:
from .standardizer import MonthlyStandardizer, NoStandardizer
from .feature_selection import RecursiveFeatureElimination, TreeBasedSelection, SequentialFeatureSelection
from .models import Regressors
from .ensemble_models import EnsembleRegressor
from .metrics import Evaluate
from .MLR_model import BootstrappedForwardSelection, MultipleLSRegression
[docs]class PredictandTimeseries():
def __init__(self, data, transform=None, standardizer=None):
self.data = data
self.transform = None
self.standardizer = None
if transform is not None:
self.set_transform(transform)
self.set_standardizer(standardizer)
[docs] def get(self, daterange=None, anomalies=True):
if anomalies:
if daterange is not None:
return self.data_st.loc[daterange]
else:
return copy(self.data_st)
else:
if daterange is not None:
return self.data.loc[daterange]
else:
return copy(self.data)
[docs] def set_standardizer(self, standardizer):
if standardizer is not None:
self.standardizer = standardizer
else:
self.standardizer = NoStandardizer()
if self.transform is not None:
self.data_st = self.transform(self.data)
self.data_st = self.standardizer.fit_transform(self.data_st)
else:
self.data_st = self.standardizer.fit_transform(self.data)
[docs] def set_model(self, method, ensemble_learning=False, estimators=None, cv=10, final_estimator_name=None,
daterange=None, predictor_dataset=None, fit_predictors=True,
scoring=["r2", "neg_root_mean_squared_error"], MLR_learning=False, **predictor_kwargs):
self.cv = cv
self.scoring = scoring
if ensemble_learning == True:
if method not in ["Stacking", "Voting"]:
raise ValueError("ensemble method used either stacking or voting")
if estimators is None:
raise ValueError("...estimators list must be provided for ensemble models")
regressors = []
for i in range(len(estimators)) :
regressor = Regressors(method=estimators[i], cv=self.cv, scoring=self.scoring)
regressor.set_model()
if estimators[i] in ["MLP", "SVR"]:
X = self._get_predictor_data(daterange=daterange, dataset=predictor_dataset,
fit_predictors = fit_predictors,
**predictor_kwargs)
y = self.get(daterange = daterange, anomalies=fit_predictors)
X = X.loc[~np.isnan(y)]
y = y.dropna()
regressor.fit(X.values, y.values)
regressors.append((estimators[i], regressor.estimator))
self.model = EnsembleRegressor(estimators=regressors, cv=self.cv, method=method,
final_estimator_name=final_estimator_name,
scoring=self.scoring)
elif MLR_learning == True:
if method == "OLSForward":
self.model = BootstrappedForwardSelection(MultipleLSRegression(), cv=self.cv)
else:
self.model = Regressors(method=method, cv=self.cv, scoring=self.scoring)
self.model.set_model()
[docs] def set_predictors(self, predictors):
self.predictors = OrderedDict()
for p in predictors:
self.predictors[p.name] = p
def _get_predictor_data(self, daterange, dataset, fit_predictors=True, **predictor_kwargs):
Xs = []
for p in self.predictors:
Xs.append(self.predictors[p].get(daterange, dataset, fit=fit_predictors))
return pd.concat(Xs, axis=1)
[docs] def predictor_correlation(self, daterange, predictor_dataset, fit_predictors=True, fit_predictand=True,
method="pearson", use_scipy=False, **predictor_kwargs):
X = self._get_predictor_data(daterange, predictor_dataset, fit_predictors=fit_predictors,
**predictor_kwargs)
y = self.get(daterange, anomalies=fit_predictand)
# dropna values
X = X.loc[~np.isnan(y)]
y = y.dropna()
if use_scipy:
df_results = pd.DataFrame(index=np.arange(2), columns=X.columns)
for column in X.columns:
if method =="pearson":
corr = stats.pearsonr(y, X[column])
df_results[column][0] = corr[0]
df_results[column][1] = corr[1]
elif method == "spearman":
corr = stats.spearmanr(y, X[column])
df_results[column][0] = corr[0]
df_results[column][1] = corr[1]
else:
raise ValueError("The defined method is not accurate")
return df_results
else:
corr = X.corrwith(other=y, axis=0, drop=True, method=method)
corr = corr.to_frame()
return corr.T # change the code with scipy stats in order to estimate the significance
[docs] def fit_predictor(self, name, daterange, predictor_dataset):
if type(name) == list:
for n in name:
self.predictors[n].fit(daterange, predictor_dataset)
else:
self.predictors[name].fit(daterange, predictor_dataset)
[docs] def fit(self, daterange, predictor_dataset, fit_predictors=True , predictor_selector=True, selector_method="Recursive",
selector_regressor="Ridge", num_predictors=None, selector_direction=None, cal_relative_importance=False,
fit_predictand=True, impute=False, impute_method=None, impute_order=None,
**predictor_kwargs):
# checking attributes required before fitting
if not hasattr(self, "model"):
raise ValueError("...set model before fitting...")
if not hasattr(self, "predictors"):
raise ValueError("-----define predictor set first with set_predictors method....")
X = self._get_predictor_data(daterange, predictor_dataset, fit_predictors=fit_predictors, **predictor_kwargs)
y = self.get(daterange, anomalies=fit_predictand)
# dropna values
if impute == False:
X = X.loc[~np.isnan(y)]
y = y.dropna()
else:
if impute_method is None:
raise ValueError("Enter the imputation method, i.e is either linear or spline ..")
else:
y = y.fillna(y.interpolate(method = impute_method,
order= impute_order))
if predictor_selector ==True:
if selector_method == "Recursive":
self.selector = RecursiveFeatureElimination(regressor_name=selector_regressor)
elif selector_method == "TreeBased":
self.selector = TreeBasedSelection(regressor_name=selector_regressor)
elif selector_method == "Sequential":
if num_predictors == None and selector_direction == None:
self.selector = SequentialFeatureSelection(regressor_name=selector_regressor)
else:
self.selector = SequentialFeatureSelection(regressor_name=selector_regressor,
n_features=num_predictors, direction=selector_direction)
else:
raise ValueError("....selector method not recognized .....")
X = X.loc[~np.isnan(y)] # because the imputation won't fill series of nan for more years (it just interpolate)
y = y.dropna()
self.selector.fit(X, y)
self.selector.print_selected_features(X)
X_selected = self.selector.transform(X)
self.model.fit(X_selected, y)
else:
self.model.fit(X, y)
# explained variance for OLS model
if hasattr(self.model, "regressor"):
if hasattr(self.model.regressor, "explained_variances"):
self.explained_variance_predictors = self.model.regressor.explained_variances
if cal_relative_importance == True:
if not hasattr(self.model, "coef"):
raise ValueError("The estimator should have coef_attributes..or must be fitted before this method....")
else:
coef_ = self.model.coef()
if predictor_selector == True:
score = self.model.score(X_selected,y)
residual = np.sqrt(1 - score)
normalized_coef_ = coef_ * np.std(X_selected, axis=0)
else:
score = self.model.score(X,y)
residual = np.sqrt(1 - score)
normalized_coef_ = coef_ * np.std(X, axis=0)
total = residual + np.sum(np.abs(normalized_coef_))
self.predictor_relative_contribution = normalized_coef_ / total
[docs] def predict(self, daterange, predictor_dataset, fit_predictors=True, fit_predictand=True,
**predictor_kwargs):
X = self._get_predictor_data(daterange, predictor_dataset, fit_predictors, **predictor_kwargs)
if not hasattr(self, "selector"):
yhat = pd.Series(data=self.model.predict(X), index=daterange)
else:
X_selected = self.selector.transform(X)
yhat = pd.Series(data=self.model.predict(X_selected), index=daterange)
if fit_predictand == False:
if self.standardizer is not None:
yhat = self.standardizer.inverse_transform(yhat)
if self.transform is not None:
yhat = self.backtransform(yhat)
return yhat
[docs] def cross_validate_and_predict(self, daterange, predictor_dataset, fit_predictand=True, return_cv_scores=False,
**predictor_kwargs):
X = self._get_predictor_data(daterange, predictor_dataset, **predictor_kwargs)
y = self.get(daterange, anomalies=fit_predictand)
X = X.loc[~np.isnan(y)]
y = y.dropna()
if hasattr(self, "selector"):
X_selected = self.selector.transform(X)
val_score = self.model.cross_validate(X_selected, y)
fit_score = self.model.score(X_selected, y)
y_pred = self.model.cross_val_predict(X_selected, y)
else:
fit_score = self.model.score(X, y)
val_score = self.model.cross_validate(X, y)
y_pred = self.model.cross_val_predict(X, y)
# write code to work for every scoring variable
if "r2" in self.scoring:
scores = {"test_r2": np.mean(val_score["test_r2"]),
"test_r2_std": np.std(val_score["test_r2"]),
"train_r2": fit_score,
"test_rmse": -np.mean(val_score["test_neg_root_mean_squared_error"]),
"test_rmse_std": np.std(val_score["test_neg_root_mean_squared_error"]),
"test_mae": -np.mean(val_score["test_neg_mean_absolute_error"]),
"test_mae_std": np.std(val_score["test_neg_mean_absolute_error"]),
}
else:
scores = {"train_r2": fit_score,
"test_rmse": -np.mean(val_score["test_neg_root_mean_squared_error"]),
"test_rmse_std": np.std(val_score["test_neg_root_mean_squared_error"]),
"test_mae": -np.mean(val_score["test_neg_mean_absolute_error"]),
"test_mae_std": np.std(val_score["test_neg_mean_absolute_error"]),
"test_mae_precent": -np.mean(val_score["test_neg_mean_absolute_percentage_error"]),
"test_mae_precent_std": np.std(val_score["test_neg_mean_absolute_percentage_error"]),
}
y_pred = pd.DataFrame({"obs": y,
"prediction" : y_pred})
if return_cv_scores == True:
return scores, y_pred, val_score
else:
return scores, y_pred
[docs] def evaluate(self, daterange, predictor_dataset, fit_predictand=True, **predictor_kwargs):
y_true = self.get(daterange, anomalies=fit_predictand)
y_pred = self.predict(daterange, predictor_dataset, anomalies=fit_predictand, **predictor_kwargs)
y_pred = y_pred.loc[~np.isnan(y_true)]
y_true = y_true.dropna()
self.evaluate = Evaluate(y_true, y_pred)
rmse = self.evaluate.RMSE()
nse = self.evaluate.NSE()
mse = self.evaluate.MSE()
mae = self.evaluate.MAE()
exp_var = self.evaluate.explained_variance()
r2 = self.evaluate.R2_score()
max_error = self.evaluate.max_error()
adj_r2 = self.evaluate.adjusted_r2()
scores = {"RMSE": rmse,
"MSE": mse,
"NSE": nse,
"MAE": mae,
"explained_variance": exp_var,
"r2": r2,
"max_error": max_error,
"adj_r2": adj_r2}
return scores
[docs] def relative_predictor_importance(self):
if not hasattr(self,"predictor_relative_contribution"):
raise ValueError("The relative varince must be calculated during fit")
return self.predictor_relative_contribution
[docs] def selected_names(self):
if not hasattr(self, "selector"):
raise ValueError("Predictor selection must be defined when fitting the model")
names = self.selector.select_names
names = names.to_frame()
return names.T
[docs] def tree_based_feature_importance(self, daterange, predictor_dataset, fit_predictand=True, plot=False, **predictor_kwargs):
if not hasattr(self, "selector"):
raise ValueError("Predictor selection must be defined when fitting the model")
# if not hasattr(self.selector, "feature_importance"):
# raise TypeError("the feature selector must be treebased")
X = self._get_predictor_data(daterange, predictor_dataset, **predictor_kwargs)
y = self.get(daterange, anomalies=fit_predictand)
X = X.loc[~np.isnan(y)]
y = y.dropna()
return self.selector.feature_importance(X,y, plot=plot)
[docs] def tree_based_feature_permutation_importance(self, daterange, predictor_dataset, fit_predictand=True,
plot=False, **predictor_kwargs):
if not hasattr(self, "selector"):
raise ValueError("Predictor selection must be defined when fitting the model")
# if not hasattr(self.selector.estimator, "feature_importances_"):
# raise TypeError("the feature selector must be treebased")
X = self._get_predictor_data(daterange, predictor_dataset, **predictor_kwargs)
y = self.get(daterange, anomalies=fit_predictand)
X = X.loc[~np.isnan(y)]
y = y.dropna()
return self.selector.feature_importance(X,y, plot=plot)
[docs] def climate_score(self, fit_period, score_period, predictor_dataset, **predictor_kwargs):
"""
How much better the prediction for the given period is then the
annual mean.
Parameters
----------
fit_period : pd.DatetimeIndex
Range of data that should will be used for creating the reference prediction.
score_period : pd.DatetimeIndex
Range of data for that the prediction score is evaluated
predictor_dataset : stat_downscaling_tools.Dataset
The dataset that should be used to calculate the predictors
predictor_kwargs : keyword arguments
These arguments are passed to the predictor's get function
Returns
-------
cscore : double
Climate score (similar to rho squared). 1 for perfect fit, 0 for no
skill, negative for even worse skill than mean prediction.
"""
if isinstance(self.standardizer, MonthlyStandardizer):
# the reference prediction is the seasonal cycle
y_fit_period = self.get(fit_period, anomalies=False)
new_standardizer = copy(self.standardizer)
new_standardizer.fit(y_fit_period)
zero_prediction = pd.Series(np.zeros(len(y_fit_period)), index=fit_period)
mean_prediction = new_standardizer.inverse_transform(zero_prediction)
climate_mse = np.mean((y_fit_period - mean_prediction).dropna()**2)
y_predict_period = self.get(score_period, anomalies=False)
# Anomalies need to be predicted and the anomalies converted using
# the new standardizer, otherwise the prediction has more info than the reference
yhat_anomalies = self.predict(score_period, predictor_dataset, anomalies=True, **predictor_kwargs)
yhat = new_standardizer.inverse_transform(yhat_anomalies)
prediction_mse = np.mean((y_predict_period - yhat).dropna()**2)
else:
# don't use this together with a transform
y_fit = self.get(fit_period, anomalies=True).dropna()
y_predict = self.get(score_period, anomalies=True)
yhat = self.predict(score_period, predictor_dataset, anomalies=True, **predictor_kwargs)
error = (y_predict - yhat).dropna()
prediction_mse = np.mean(error**2)
climate_mse = np.mean((np.mean(y_fit) - y_fit)**2)
if climate_mse == 0:
return 0.0 # identifier for such a case (avoid dividing by 0)
else:
return 1 - prediction_mse/climate_mse