import utils
from sklearn.base import BaseEstimator
# from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.metrics import (
    mean_absolute_percentage_error as mape,
    # mean_squared_error as mse,
    root_mean_squared_error as rmse,
    mean_absolute_error as mae,
    r2_score as r2,
)
import numpy as np
import pandas as pd
from scipy import optimize as o, stats
from inspect import getfullargspec, getsource
import copy  # needed for copy.deepcopy() in CustomRegression.fit()
class CustomRegression(BaseEstimator):
    """
    All variables inside the Class should end with underscore
    """

    def __init__(self, model, method="Nelder-Mead", cost=None,
                 alpha=0, l1_ratio=0.5, maxiter=500, maxfev=1_000):
        self.model = model
        self.method = method
        self.cost = cost if cost is not None else self.mse
        self.alpha = alpha
        self.l1_ratio = l1_ratio
        self.maxiter = maxiter
        self.maxfev = maxfev

    def __str__(self):
        # fall back to the unfitted model so printing works before .fit()
        return str(getattr(self, "model_", self.model))

    def __repr__(self):
        return str(self)

    def mse(self, pred, true, sample_weight):
        error = pred - true
        cost = error ** 2
        cost = np.mean(sample_weight * cost)  # a median would be more robust to outliers than the mean
        return cost

    def l1(self, params, l1_ratio=None):  # l1_ratio accepted for interface compatibility; unused
        return np.sum(np.abs(params - self.model_.param_initial_guess))

    def l2(self, params, l1_ratio=None):  # l1_ratio accepted for interface compatibility; unused
        return np.sum((params - self.model_.param_initial_guess) ** 2)

    def l3(self, params, l1_ratio=0.5):
        return l1_ratio * self.l1(params) + (1 - l1_ratio) * self.l2(params)

    def regularization_cost(self, params, alpha, penalty_type="l3"):
        """
        Regularization requires standardised parameters
        """
        penalty = getattr(self, penalty_type)(params, self.l1_ratio)
        return (alpha * penalty) / self.sample_size_

    def cost_total(self, params, X, y):
        pred = self.model_.equation(X, *params)
        cost = self.cost(pred, y, self.sample_weight)  # cost signature: (pred, true, sample_weight)
        if self.alpha != 0:
            cost += self.regularization_cost(params, self.alpha)
        return cost

    def check_enough_samples(self):
        return self.enough_samples_

    def fit(self, X, y, sample_weight=None):
        check_X_y(X, y)  # using self.X, self.y = check_X_y(X, y) removes column names
        self.X, self.y = X, y
        self.n_features_in_ = self.X.shape[1]
        self.sample_size_ = self.X.shape[0]
        self.sample_weight = (
            sample_weight
            if sample_weight is not None
            else np.full(self.sample_size_, 1)  # sample weight defaults to 1
        )
        self.sample_size_ = self.sample_weight[self.sample_weight > 0].shape[0]

        self.model_ = copy.deepcopy(self.model)
        self.dof_ = self.sample_size_ - self.model_.k  # - 1 # n - k - 1
        if self.dof_ <= 0:
            self.success_ = False
            self.enough_samples_ = False
            # raise Exception("Not enough samples")
            return self
        else:
            self.enough_samples_ = True

        params_all = getfullargspec(self.model_.equation).args
        params = [param for param in params_all if param not in ["self", "x"]]

        self.optimization_ = o.minimize(
            self.cost_total,
            x0=self.model_.param_initial_guess,
            args=(self.X, self.y),
            method=self.method,  # "L-BFGS-B", "Nelder-Mead", "SLSQP"
            constraints=self.model_.constraints,
            bounds=self.model_.param_bounds,
            # [(-1, None) for param in params]  # variables must be positive
            options=dict(
                maxiter=self.maxiter,
                maxfev=self.maxfev,
                # xatol=1e-4,
                # fatol=1e-4,
                # adaptive=False,
            ),
        )
        self.success_ = self.optimization_.success
        self.fitted_coeff_ = self.optimization_.x
        self.fitted_coeff_formatted_ = [
            f"""{{{utils.round_f(popt, 4)}}}""" for popt in self.fitted_coeff_
        ]
        self.model_.set_fitted_coeff(*self.fitted_coeff_)
        self.model_.set_fitted_coeff_formatted(*self.fitted_coeff_formatted_)
        self.rmse_response = rmse(y, self.output(X))
        self.rmse_link = rmse(y, self.output(X))  # identical to rmse_response here; no link function is applied
        return self

    def output(self, X):
        return np.array(self.model_.equation(X, *self.fitted_coeff_))

    def get_pred_se(self, X):
        return self.rmse_response

    def predict(self, X):
        check_is_fitted(self, "fitted_coeff_")  # verify that .fit() has been called
        check_array(X)
        # X = check_array(X)  # removes column names
        pred = self.output(X).astype(np.float32)
        return pred

    def predict_quantile(self, X, q):
        return self.model_.quantile(
            X,
            self.X,
            self.y,
            link_distribution_dof=self.dof_,
            link_distribution_q=q,
        )


class CustomRegressionGrouped(CustomRegression):
    def __str__(self):
        x = ""
        for key, value in self.get_params().items():
            coeffs = [utils.round_f(v, 4) for v in list(value)] if value is not None else None
            x += f"{str(key)}: {str(coeffs)}\n\n"
        return str(x)

    def __init__(self, model, cost, method="Nelder-Mead", group=None):
        super().__init__(model=model, cost=cost, method=method)
        self.group = group
        self.model = model
        self.method = method
        self.cost = cost
        self.optimization_ = dict()
        self.model_ = dict()
        self.enough_samples_ = dict()
        self.dof_ = 0
        self.sample_size_ = 0

    def get_params(self, how="dict"):
        params = dict()
        for key, value in self.model_.items():
            popt = "fitted_coeff_"
            if popt in dir(value):
                params[key] = getattr(value, popt)
            else:
                params[key] = None
        if how == "df":
            params = pd.DataFrame(params).T
        return params

    def model_group(self, X, y, model, method, cost, sample_weight):
        return CustomRegression(
            model=self.model,
            cost=self.cost,
            method=self.method,
        ).fit(X, y, sample_weight)

    def check_enough_samples(self, how="all"):
        if how == "all":
            enough_samples = all(self.enough_samples_.values())
        elif how == "any":
            enough_samples = any(self.enough_samples_.values())
        else:
            enough_samples = self.enough_samples_  # per-group dict
        return enough_samples

    def apply_model_multiple_group(self, X, y, group, model, method, cost, sample_weight):
        for g in X[self.group].unique():
            mask = X.eval(f"{self.group} == {g}")
            m = self.model_group(
                X[mask],
                y[mask],
                model,
                method,
                cost,
                sample_weight[mask] if sample_weight is not None else sample_weight,
            )
            if m.success_:
                self.model_[g] = m
                self.enough_samples_[g] = m.enough_samples_
                self.optimization_[g] = m.optimization_
                self.sample_size_ += m.sample_size_
        self.success_ = all(opt.success for opt in self.optimization_.values())

    def fit(self, X, y, sample_weight=None):
        self.model_ = dict()
        self.apply_model_multiple_group(X, y, self.group, self.model, self.method, self.cost, sample_weight)
        return self

    def predict(self, X):
        preds = pd.DataFrame()
        for g in X[self.group].unique():
            if g not in self.model_.keys():
                return  # no model trained for this group
            Xg = X.query(f"{self.group} == {g}")
            index = Xg.index
            pred = self.model_[g].predict(X=Xg.copy().reset_index(drop=True))
            preds = pd.concat([preds, pd.Series(pred, index=index)])
        return preds.sort_index()

    def predict_quantile(self, X, q):
        preds = pd.DataFrame()
        for g in X[self.group].unique():
            if g not in self.model_.keys():
                raise Exception(f"Model not trained for {g}")
            Xg = X.query(f"{self.group} == {g}")
            index = Xg.index
            pred = self.model_[g].predict_quantile(X=Xg.copy().reset_index(drop=True), q=q)
            preds = pd.concat([preds, pd.Series(pred, index=index)])
        return preds.sort_index()
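# A minimal sketch of the model-specification object that CustomRegression
# expects. This class is hypothetical (not part of the original code); it only
# illustrates the interface used above: .equation(), .param_initial_guess,
# .param_bounds, .constraints, .k, and the set_fitted_coeff* setters. A
# .quantile() method would also be needed for predict_quantile(); it is
# omitted here.
class PowerLawModel:
    def __init__(self):
        self.k = 2                                     # number of free parameters (used for dof_)
        self.param_initial_guess = [1.0, 1.0]
        self.param_bounds = [(0, None), (None, None)]  # a >= 0, b unconstrained
        self.constraints = ()                          # passed straight through to scipy.optimize.minimize

    def equation(self, x, a, b):
        # y = a * x^b, evaluated on the first feature column
        return a * np.asarray(x)[:, 0] ** b

    def set_fitted_coeff(self, a, b):
        self.fitted_coeff = (a, b)

    def set_fitted_coeff_formatted(self, a, b):
        self.fitted_coeff_formatted = (a, b)

    def __str__(self):
        a, b = getattr(self, "fitted_coeff_formatted", ("a", "b"))
        return f"y = {a} x^{b}"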
curve_fit = CustomRegression(
    model=model_selected,
    cost=regression.LogCosh().cost,
    method=solver,
)
print(curve_fit)  # prints LaTeX

curve_fit.fit(
    X_train,
    y_train,
    sample_weight=df_train["Sample_Weight"],
)
print(curve_fit)  # prints LaTeX with coefficient values

pred = curve_fit.predict(X_test)
ll = curve_fit.predict_quantile(X_test, q=0.025)
ul = curve_fit.predict_quantile(X_test, q=0.975)
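# Hypothetical grouped counterpart of the fit above: CustomRegressionGrouped
# fits one CustomRegression per value of a grouping column. Assumes X_train
# and X_test contain a numeric "store_id" column (a made-up name); the class
# builds its masks with X.eval(f"{group} == {g}"), so group values must be
# numeric.
grouped_fit = CustomRegressionGrouped(
    model=model_selected,
    cost=regression.LogCosh().cost,
    method=solver,
    group="store_id",
)
grouped_fit.fit(X_train, y_train, sample_weight=df_train["Sample_Weight"])
print(grouped_fit)                  # fitted coefficients per group
pred = grouped_fit.predict(X_test)  # predictions re-aligned to X_test.index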
class HoltWinters(BaseEstimator):
    """Scikit-learn like interface for the Holt-Winters method."""

    def __init__(self, season_len=24, alpha=0.5, beta=0.5, gamma=0.5):
        self.beta = beta
        self.alpha = alpha
        self.gamma = gamma
        self.season_len = season_len

    def fit(self, series):
        # Note that unlike scikit-learn's fit method, this doesn't learn the
        # optimal model parameters alpha, beta, gamma; instead it takes
        # whatever values the user specified and produces the predicted time
        # series. This, of course, can be changed.
        beta = self.beta
        alpha = self.alpha
        gamma = self.gamma
        season_len = self.season_len
        seasonals = self._initial_seasonal(series)

        # initial values
        predictions = []
        smooth = series[0]
        trend = self._initial_trend(series)
        predictions.append(smooth)

        for i in range(1, len(series)):
            value = series[i]
            previous_smooth = smooth
            seasonal = seasonals[i % season_len]
            smooth = alpha * (value - seasonal) + (1 - alpha) * (previous_smooth + trend)
            trend = beta * (smooth - previous_smooth) + (1 - beta) * trend
            seasonals[i % season_len] = gamma * (value - smooth) + (1 - gamma) * seasonal
            predictions.append(smooth + trend + seasonals[i % season_len])

        self.trend_ = trend
        self.smooth_ = smooth
        self.seasonals_ = seasonals
        self.predictions_ = predictions
        return self

    def _initial_trend(self, series):
        season_len = self.season_len
        total = 0.0
        for i in range(season_len):
            total += (series[i + season_len] - series[i]) / season_len
        trend = total / season_len
        return trend

    def _initial_seasonal(self, series):
        season_len = self.season_len
        n_seasons = len(series) // season_len

        season_averages = np.zeros(n_seasons)
        for j in range(n_seasons):
            start_index = season_len * j
            end_index = start_index + season_len
            season_average = np.sum(series[start_index:end_index]) / season_len
            season_averages[j] = season_average

        seasonals = np.zeros(season_len)
        seasons = np.arange(n_seasons)
        index = seasons * season_len
        for i in range(season_len):
            seasonal = np.sum(series[index + i] - season_averages) / n_seasons
            seasonals[i] = seasonal

        return seasonals

    def predict(self, n_preds=10):
        """
        Parameters
        ----------
        n_preds : int, default 10
            Prediction horizon. e.g. if the original input time series to the
            .fit method has a length of 50, then specifying n_preds = 10 will
            generate predictions for the next 10 steps, resulting in a
            prediction of length 60.
        """
        # copy so repeated predict calls don't grow the stored list
        predictions = list(self.predictions_)
        original_series_len = len(predictions)
        for i in range(original_series_len, original_series_len + n_preds):
            m = i - original_series_len + 1
            prediction = self.smooth_ + m * self.trend_ + self.seasonals_[i % self.season_len]
            predictions.append(prediction)
        return predictions
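# A minimal sketch of fitting HoltWinters on a synthetic hourly series with a
# daily (24-step) season. All data below is made up for illustration.
rng = np.random.default_rng(0)
hours = np.arange(24 * 10)
series = (10 + 0.05 * hours                     # upward trend
          + 3 * np.sin(2 * np.pi * hours / 24)  # daily seasonality
          + rng.normal(0, 0.5, hours.size))     # noise

hw = HoltWinters(season_len=24, alpha=0.3, beta=0.05, gamma=0.2)
hw.fit(series)
forecast = hw.predict(n_preds=24)  # in-sample predictions plus the next 24 steps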
from sklearn.model_selection import TimeSeriesSplit


def timeseries_cv_score(params, series, loss_function, season_len=24, n_splits=3):
    """
    Iterating over folds, train model on each fold's training set,
    forecast and calculate error on each fold's test set.
    """
    errors = []
    alpha, beta, gamma = params
    time_series_split = TimeSeriesSplit(n_splits=n_splits)
    for train, test in time_series_split.split(series):
        model = HoltWinters(season_len, alpha, beta, gamma)
        model.fit(series[train])

        # evaluate the prediction on the test set only
        predictions = model.predict(n_preds=len(test))
        test_predictions = predictions[-len(test):]
        test_actual = series[test]
        error = loss_function(test_actual, test_predictions)
        errors.append(error)

    return np.mean(errors)
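# The smoothing parameters can then be tuned by minimizing the cross-validated
# error. A sketch, assuming the synthetic `series` from above and mean squared
# error as the loss; the initial guess and the solver choice are arbitrary.
from scipy.optimize import minimize
from sklearn.metrics import mean_squared_error

x0 = np.array([0.1, 0.1, 0.1])  # initial alpha, beta, gamma
result = minimize(
    timeseries_cv_score,
    x0=x0,
    args=(series, mean_squared_error),
    method="TNC",
    bounds=((0, 1), (0, 1), (0, 1)),  # smoothing parameters live in [0, 1]
)
alpha_opt, beta_opt, gamma_opt = result.x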
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.utils.validation import check_array
from scipy.special import softmax
import numpy as np


def _log_odds_ratio_scale(X):
    X = np.clip(X, 1e-8, 1 - 1e-8)  # numerical stability
    X = np.log(X / (1 - X))         # transform to log-odds-ratio space
    return X


class FuzzyTargetClassifier(ClassifierMixin, BaseEstimator):
    def __init__(self, regressor):
        """
        Fits regressor in the log-odds-ratio space (inverse cross-entropy) of
        the target variable. During prediction, rescales back to probability
        space with the softmax function.

        Parameters
        ----------
        regressor : sklearn regressor
            Base regressor to fit in log-odds-ratio space. Any valid sklearn
            regressor can be used here.
        """
        self.regressor = regressor

    def fit(self, X, y=None, **kwargs):
        # ensure the passed y is one-hot-encoded-like
        y = check_array(y, accept_sparse=True, dtype="numeric", ensure_min_features=1)
        self.regressors_ = [clone(self.regressor) for _ in range(y.shape[1])]
        for i in range(y.shape[1]):
            self._fit_single_regressor(self.regressors_[i], X, y[:, i], **kwargs)
        return self

    def _fit_single_regressor(self, regressor, X, ysub, **kwargs):
        ysub = _log_odds_ratio_scale(ysub)
        regressor.fit(X, ysub, **kwargs)
        return regressor

    def decision_function(self, X):
        all_results = []
        for reg in self.regressors_:
            results = reg.predict(X)
            if results.ndim < 2:
                results = results.reshape(-1, 1)
            all_results.append(results)
        results = np.hstack(all_results)
        return results

    def predict_proba(self, X):
        results = self.decision_function(X)
        results = softmax(results, axis=1)
        return results

    def predict(self, X):
        results = self.decision_function(X)
        results = results.argmax(1)
        return results
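# A minimal sketch of using FuzzyTargetClassifier with soft (fuzzy) targets.
# The data is made up: each row of y_soft is a probability distribution over
# 3 classes rather than a hard label.
from sklearn.ensemble import RandomForestRegressor

rng = np.random.default_rng(0)
X_demo = rng.random((100, 4))
y_soft = rng.dirichlet(alpha=[2.0, 2.0, 2.0], size=100)  # rows sum to 1

clf = FuzzyTargetClassifier(RandomForestRegressor(n_estimators=50, random_state=0))
clf.fit(X_demo, y_soft)
proba = clf.predict_proba(X_demo)  # softmax-rescaled probabilities
labels = clf.predict(X_demo)       # argmax class indices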