import utils  # local helper module (provides round_f); see note below
from sklearn.base import BaseEstimator
# from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.model_selection import TimeSeriesSplit  # needed by timeseries_cv_score below
from sklearn.metrics import (
    mean_absolute_percentage_error as mape,
    # mean_squared_error as mse,
    root_mean_squared_error as rmse,
    mean_absolute_error as mae,
    r2_score as r2,
)
import numpy as np
import pandas as pd
from scipy import optimize as o, stats
from inspect import getfullargspec, getsource
# import copy
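# NOTE: `utils` is a local module that is not part of this listing; the only
# attribute used here is `utils.round_f(value, n)`. A minimal, hypothetical
# stand-in (assuming it just rounds to `n` decimal places for display) would be:
#
#     def round_f(value, n):
#         return round(float(value), n)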
class CustomRegression(BaseEstimator):
    """
    All fitted attributes inside the class should end with an underscore
    (scikit-learn convention).
    """

    def __str__(self):
        return str(self.model)

    def __repr__(self):
        return str(self)

    def rmse(self, pred, true, sample_weight):
        error = pred - true
        loss = error ** 2
        cost = np.sqrt(
            # np.median would be more robust to outliers than np.mean here
            np.mean(sample_weight * loss)
        )
        return cost

    def loss(self, pred, true):
        return self.error(pred, true, self.sample_weight)

    def l1(self, params):
        return np.sum(np.abs(params - self.model.param_initial_guess))

    def l2(self, params):
        return np.sum((params - self.model.param_initial_guess) ** 2)

    def l3(self, params, alpha=0.5):
        return alpha * self.l1(params) + (1 - alpha) * self.l2(params)

    def reg(self, params, penalty_type="l3", lambda_reg_weight=1.0):
        """
        lambda_reg_weight = coefficient of the regularization penalty
        """
        if penalty_type == "l1":
            penalty = self.l1(params)
        elif penalty_type == "l2":
            penalty = self.l2(params)
        elif penalty_type == "l3":
            penalty = self.l3(params)
        else:
            raise ValueError(f"Unknown penalty_type: {penalty_type}")
        return lambda_reg_weight * penalty / self.sample_size

    def cost(self, params, X, y):
        pred = self.model.equation(X, *params)
        return self.loss(pred, true=y)  # + self.reg(params)  # regularization requires standardised parameters

    def check_enough_samples(self):
        return self.enough_samples

    def check_optimization_success(self):
        return self.optimization.success

    def fit(self, X, y, model, method="Nelder-Mead", error=None, sample_weight=None, alpha=0.05):
        check_X_y(X, y)  # using self.X, self.y = check_X_y(self.X, self.y) would remove column names
        self.X = X
        self.y = y
        self.n_features_in_ = self.X.shape[1]
        if sample_weight is None or len(sample_weight) <= 1:
            # sometimes a scalar sample weight is given, the same for all samples
            self.sample_size = self.X.shape[0]
        else:
            self.sample_size = sample_weight[sample_weight > 0].shape[0]
        self.sample_weight = (
            sample_weight
            if sample_weight is not None
            else np.full(self.sample_size, 1)  # sample_weight defaults to 1
        )
        self.error = error if error is not None else self.rmse
        self.model = model  # copy.deepcopy(model)
        params = getfullargspec(self.model.equation).args
        params = [param for param in params if param not in ["self", "x"]]
        self.optimization = o.minimize(
            self.cost,
            x0=self.model.param_initial_guess,
            args=(self.X, self.y),
            method=method,  # "L-BFGS-B", "Nelder-Mead", "SLSQP",
            constraints=self.model.constraints,
            bounds=self.model.param_bounds,
            # [
            #     (-1, None) for param in params  # variables must be positive
            # ]
            options=dict(
                maxiter=1_000,
                maxfev=1_000,
                # xatol=1e-4,
                # fatol=1e-4,
                # adaptive=False,
            ),
        )
        self.dof = self.sample_size - self.model.k - 1  # n - k - 1
        if self.dof <= 0:
            self.enough_samples = False
            # self.popt = [0 for param in params]
            # st.warning("Not enough samples")
            return self
        else:
            self.enough_samples = True
        # success = self.optimization.success
        # if success is False:
        #     st.warning("Did not converge!")
        self.popt = self.optimization.x
        # stored as rmse_ so the fitted value does not shadow the rmse() method
        self.rmse_ = rmse(self.output(self.X), self.y, sample_weight=self.sample_weight)
        cl = 1 - (alpha / 2)
        if "hess_inv" in self.optimization:
            # hess_inv is a LinearOperator for L-BFGS-B; todense() materialises it
            self.covx = self.optimization.hess_inv.todense()
            self.pcov = list(np.diag(self.rmse_ * np.sqrt(self.covx)))
            self.popt_with_uncertainty = [
                f"""{{ ({utils.round_f(popt, 4)} ±{utils.round_f(stats.t.ppf(cl, self.dof) * pcov.round(2), 2)} )}}"""
                for popt, pcov in zip(self.popt, self.pcov)
            ]
        else:
            self.popt_with_uncertainty = [
                f"""{{{utils.round_f(popt, 4)}}}""" for popt in self.popt
            ]
        self.model.set_fitted_coeff(*self.popt_with_uncertainty)
        return self

    def output(self, X):
        return self.model.equation(X, *self.popt)

    def get_se_x_cent(self, X_cent):
        return self.rmse_ * np.sqrt((1 / self.sample_size) + (X_cent.T).dot(self.covx).dot(X_cent))

    def get_pred_se(self, X):
        if False:  # self.covx is not None:  # this seems to be abnormal,
            # check this
            X_cent = X - self.X.mean()
            se = X_cent.apply(self.get_se_x_cent, axis=1)
        else:
            se = self.rmse_
        return se

    def predict(self, X, alpha=0.05):
        check_is_fitted(self)  # verify that .fit() has been called
        check_array(X)  # X = check_array(X) would remove column names
        pred = self.output(X).astype(np.float32)
        se = self.get_pred_se(X)
        cl = 1 - (alpha / 2)
        ci = stats.t.ppf(cl, self.dof) * se
        return pd.concat([pred, pred + ci, pred - ci], axis=1)


class CustomRegressionGrouped(CustomRegression):
    def __str__(self):
        x = ""
        for key, value in self.get_params().items():
            x += f"{str(key)}: {str([utils.round_f(v, 4) for v in list(value)])}\n\n"
        return str(x)

    def __init__(self, group):
        super().__init__()
        self.group = group
        self.optimization = dict()
        self.model = dict()
        self.enough_samples = dict()
        self.sample_size = 0

    def get_params(self, how="dict"):
        params = dict()
        for key, value in self.model.items():
            params[key] = value.popt
        if how == "df":
            params = pd.DataFrame(params).T
        return params

    def model_group(self, X, y, model, solver):
        return CustomRegression().fit(
            X=X,
            y=y,
            model=model,  # copy.deepcopy(model)
            method=solver,
        )

    def check_enough_samples(self):
        enough_samples = True
        for e in self.enough_samples.values():
            if not e:
                enough_samples = False
        return enough_samples

    def check_optimization_success(self):
        success = True
        for opt in self.optimization.values():  # renamed from `o` to avoid shadowing scipy.optimize
            if not opt.success:
                success = False
        return success

    def apply_model_multiple_group(self, X, y, group, model, solver):
        for g in X[self.group].unique():
            mask = X.eval(f"{self.group} == {g!r}")  # !r so string group labels are quoted
            m = self.model_group(X[mask], y[mask], model, solver)
            self.model[g] = m
            self.enough_samples[g] = m.enough_samples
            self.optimization[g] = m.optimization
            self.sample_size += m.sample_size

    def fit(self, X, y, model, method="Nelder-Mead", error=None, sample_weight=None, alpha=0.05):
        self.apply_model_multiple_group(X, y, self.group, model, method)
        return self

    def predict(self, X, alpha=0.05):
        Xs = []
        preds = pd.DataFrame()
        for g in X[self.group].unique():
            if g not in self.model.keys():
                return  # unseen group: no fitted model to predict with
            else:
                Xg = X.query(f"{self.group} == {g!r}")
                pred = self.model[g].predict(X=Xg)
                Xs.append(Xg)
                preds = pd.concat([preds, pred])
        return preds.sort_index()
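# Neither `First_Order` nor `regression.LogCosh`, used in the example below, is
# defined in this listing. Based on the attributes CustomRegression actually
# touches (`equation`, `param_initial_guess`, `param_bounds`, `constraints`,
# `k`, `set_fitted_coeff`, `__str__`) and the `error(pred, true, sample_weight)`
# call signature, minimal hypothetical sketches could look like this; the real
# classes may differ.

class First_Order:
    """Hypothetical one-parameter model: y = a * x."""

    k = 1                          # number of fitted parameters (dof = n - k - 1)
    param_initial_guess = [1.0]    # x0 passed to scipy.optimize.minimize
    param_bounds = [(None, None)]  # one (low, high) pair per parameter
    constraints = ()               # empty; Nelder-Mead ignores constraints anyway

    def __init__(self):
        self.fitted_coeff = "a"

    def equation(self, x, a):
        # x is the feature DataFrame; returns a prediction Series
        return a * x.iloc[:, 0]

    def set_fitted_coeff(self, a):
        # receives the formatted "{value ± ci}" display strings
        self.fitted_coeff = a

    def __str__(self):
        return f"y = {self.fitted_coeff} x"


class LogCosh:
    """Hypothetical smooth, outlier-robust loss: log(cosh(residual))."""

    def cost(self, pred, true, sample_weight=1.0):
        # behaves like 0.5 * x**2 near zero and like |x| for large residuals
        return np.mean(sample_weight * np.log(np.cosh(pred - true)))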
curve_fit = CustomRegression()
print(curve_fit)  # prints the model equation as latex
curve_fit.fit(
    X_train,
    y_train,
    model=First_Order(),
    error=regression.LogCosh().cost,
    method="Nelder-Mead",
)
print(curve_fit)  # prints the latex with coefficient values
pred = curve_fit.predict(X_test)
class HoltWinters(BaseEstimator):
    """Scikit-learn like interface for the Holt-Winters method."""

    def __init__(self, season_len=24, alpha=0.5, beta=0.5, gamma=0.5):
        self.beta = beta
        self.alpha = alpha
        self.gamma = gamma
        self.season_len = season_len

    def fit(self, series):
        # note that unlike scikit-learn's fit method, this doesn't learn
        # the optimal model parameters alpha, beta, gamma; instead it takes
        # whatever values the user specified and produces the predicted time
        # series. This could of course be changed.
        beta = self.beta
        alpha = self.alpha
        gamma = self.gamma
        season_len = self.season_len
        seasonals = self._initial_seasonal(series)

        # initial values
        predictions = []
        smooth = series[0]
        trend = self._initial_trend(series)
        predictions.append(smooth)

        for i in range(1, len(series)):
            value = series[i]
            previous_smooth = smooth
            seasonal = seasonals[i % season_len]
            smooth = alpha * (value - seasonal) + (1 - alpha) * (previous_smooth + trend)
            trend = beta * (smooth - previous_smooth) + (1 - beta) * trend
            seasonals[i % season_len] = gamma * (value - smooth) + (1 - gamma) * seasonal
            predictions.append(smooth + trend + seasonals[i % season_len])

        self.trend_ = trend
        self.smooth_ = smooth
        self.seasonals_ = seasonals
        self.predictions_ = predictions
        return self

    def _initial_trend(self, series):
        season_len = self.season_len
        total = 0.0
        for i in range(season_len):
            total += (series[i + season_len] - series[i]) / season_len

        trend = total / season_len
        return trend

    def _initial_seasonal(self, series):
        season_len = self.season_len
        n_seasons = len(series) // season_len

        season_averages = np.zeros(n_seasons)
        for j in range(n_seasons):
            start_index = season_len * j
            end_index = start_index + season_len
            season_average = np.sum(series[start_index:end_index]) / season_len
            season_averages[j] = season_average

        seasonals = np.zeros(season_len)
        seasons = np.arange(n_seasons)
        index = seasons * season_len
        for i in range(season_len):
            seasonal = np.sum(series[index + i] - season_averages) / n_seasons
            seasonals[i] = seasonal

        return seasonals

    def predict(self, n_preds=10):
        """
        Parameters
        ----------
        n_preds : int, default 10
            Prediction horizon. e.g. if the original input time series to
            the .fit method has a length of 50, then specifying n_preds = 10
            will generate predictions for the next 10 steps, resulting in a
            prediction length of 60.
        """
        # copy so repeated predict() calls don't keep appending to the fitted state
        predictions = list(self.predictions_)
        original_series_len = len(predictions)
        for i in range(original_series_len, original_series_len + n_preds):
            m = i - original_series_len + 1
            prediction = self.smooth_ + m * self.trend_ + self.seasonals_[i % self.season_len]
            predictions.append(prediction)

        return predictions
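# A minimal usage sketch on synthetic data (the series shape, smoothing
# constants and horizon below are made up for illustration). The series must
# span at least two full seasons so _initial_trend can difference one season
# against the next.
rng = np.random.default_rng(0)
t = np.arange(24 * 20)
demo_series = 10 + 0.05 * t + 5 * np.sin(2 * np.pi * t / 24) + rng.normal(0, 0.5, len(t))

hw = HoltWinters(season_len=24, alpha=0.3, beta=0.1, gamma=0.2)
hw.fit(demo_series)
forecast = hw.predict(n_preds=24)  # in-sample fit plus the next 24 steps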
def timeseries_cv_score(params, series, loss_function, season_len=24, n_splits=3):
    """
    Iterating over folds, train the model on each fold's training set,
    then forecast and calculate the error on each fold's test set.
    """
    errors = []
    alpha, beta, gamma = params
    time_series_split = TimeSeriesSplit(n_splits=n_splits)
    for train, test in time_series_split.split(series):
        model = HoltWinters(season_len, alpha, beta, gamma)
        model.fit(series[train])

        # evaluate the prediction on the test set only
        predictions = model.predict(n_preds=len(test))
        test_predictions = predictions[-len(test):]
        test_actual = series[test]
        error = loss_function(test_actual, test_predictions)
        errors.append(error)

    return np.mean(errors)
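# The usual next step is to pick alpha/beta/gamma by minimizing the
# cross-validated error above. A sketch using scipy (already imported as `o`),
# with each smoothing constant bounded to [0, 1]; the initial guess and the
# choice of mae as the loss are illustrative, and demo_series is the synthetic
# series from the HoltWinters sketch above.
opt_result = o.minimize(
    timeseries_cv_score,
    x0=np.array([0.1, 0.1, 0.1]),
    args=(demo_series, mae),  # any loss_function(actual, predicted) works
    method="L-BFGS-B",
    bounds=[(0, 1), (0, 1), (0, 1)],
)
alpha_opt, beta_opt, gamma_opt = opt_result.x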