#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 25 16:52:13 2022
@author: dboateng
"""
# importing models
from sklearn.model_selection import KFold, LeaveOneOut, LeaveOneGroupOut, RepeatedKFold, TimeSeriesSplit
import numpy as np
[docs]class Splitter():
# try more on how to use the customized splitter with the model fitting
def __init__(self, method, shuffle=False, n_splits=5):
self.method = method
self.shuffle = shuffle
self.n_splits = n_splits
self.random_state = None
if self.method == "Kfold":
self.estimator = KFold(n_splits=self.n_splits, shuffle=self.shuffle, random_state=self.random_state)
elif self.method == "LeaveOneOut":
self.estimator = LeaveOneOut()
elif self.method == "LeaveOneGroupOut":
self.estimator = LeaveOneGroupOut()
elif self.method == "RepeatedKFold":
self.estimator = RepeatedKFold()
elif self.method == "TimeSeriesSplit":
self.estimator = TimeSeriesSplit()
else:
raise ValueError("Invalid splitter method might have been defined")
[docs] def get_n_splits(self, X=None, y=None, groups=None):
return self.method.get_n_splits(X, y, groups)
[docs] def split(self, X, y=None, groups=None):
return self.method.split(X, y,)
[docs]class MonthlyBooststrapper():
def __init__(self, n_splits=500, test_size=0.1, block_size=12):
self.n_splits = n_splits
self.test_size = test_size
self.block_size = int(block_size)
[docs] def split(self, X, y, groups=None):
"""
num_blocks * block_size = test_size*num_samples
--> n_blocks = test_size/block_size*num_samples
Parameters
----------
X : TYPE
DESCRIPTION.
y : TYPE
DESCRIPTION.
groups : TYPE, optional
DESCRIPTION. The default is None.
Returns
-------
None.
"""
num_samples = len(y)
num_blocks = round(self.test_size/self.block_size * num_samples)
for i in range(self.n_splits):
test_mask = np.zeros(num_samples, dtype=np.bool)
for k in range(num_blocks):
train_mask = np.zeros(num_samples, dtype=np.bool)
for j in range(num_samples - self.block_size):
train_mask[j] = not test_mask[j] and not test_mask[j+self.block_size]
train = np.where(train_mask)[0]
rand = np.random.choice(train)
test_mask[rand:rand + 12] = True
train= np.where(~test_mask)
test = np.where(test_mask)
yield train, test
[docs]class YearlyBootstrapper:
"""
Splits data in training and test set by picking complete years. You can use it like this::
X = ...
y = ...
yb = YearlyBootstrapper(10)
for i, (train, test) in enumerate(yb.split(X, y)):
X_train, y_train = X.iloc[train], y.iloc[train]
X_test, y_test = X.iloc[test], y.iloc[test]
...
Parameters
----------
n_splits : int (optional, default: 500)
number of splits
test_size : float (optional, default: 1/3)
Ratio of test years.
min_month_per_year : int (optional, default: 9)
minimum number of months that must be available in a year to use this
year in the test set.
"""
def __init__(self, n_splits=500, test_size=1/3, min_month_per_year=9):
self.n_splits = n_splits
self.test_size = test_size
self.min_month_per_year = min_month_per_year
[docs] def split(self, X, y, groups=None):
"""
Returns ``n_splits`` pairs of indices to training and test set.
Parameters
----------
X : pd.DataFrame
y : pd.Series
groups : dummy
X and y should both have the same DatetimeIndex as index.
Returns
-------
train : array of ints
Array of indices of training data
test : array of ints
Array of indices of test data
"""
if np.any(X.index.values != y.index.values):
raise ValueError("X and y must have the same index")
years = X.index.values.astype('datetime64[Y]').astype(int)
existing_years, counts = np.unique(years, return_counts=True)
# we only use years with at least self.min_month_per_year for the test
# set.
existing_years = existing_years[counts >= self.min_month_per_year]
N = len(existing_years)
size = int(self.test_size*N)
for i in range(self.n_splits):
test_years = np.random.choice(existing_years, size=size,
replace=False)
test_mask = np.zeros(len(years), dtype=np.bool)
for k in test_years:
test_mask = test_mask | (years == k)
train = np.where(~test_mask)
test = np.where(test_mask)
yield train, test