Skip to content

Feature Selection

MDA

from sklearn.cross_validation import ShuffleSplit
from sklearn.metrics import r2_score
from collections import defaultdict

X = boston["data"]
Y = boston["target"]

rf = RandomForestRegressor()
scores = defaultdict(list)
from joblib import Parallel, delayed

class FeaturePermutationMetric():
    def __init__(self, estimator, scoring, scoring_lower_is_better, feature_names=None, random_feature_baseline=None, cv=None, random_state=None, n_repeats_fit = 3, n_repeats_feature_permute = 100, n_jobs=None):
        self.estimator = estimator
        self.scoring = scoring
        self.diff_lower_is_better = not scoring_lower_is_better
        self.random_feature_baseline = random_feature_baseline if random_feature_baseline is not None else np.random.normal(loc=0, scale=1, size=X.shape[0])
        self.n_repeats_fit = n_repeats_fit
        self.n_repeats_feature_permute = n_repeats_feature_permute
        self.random_state = random_state
        self.cv = cv if cv is not None else KFold(n_splits=5, random_state=self.random_state)
        self.feature_names = feature_names

        self.n_jobs = n_jobs if n_jobs is not None else 1

        self.comparison_metric = "scoring_diff_after_permuting"

    def get_score_after_permute_(self, X, y, train_idx, test_idx):
        score_after_permute_list = []

        X_train, X_test, y_train, y_test = X[train_idx], X[test_idx], y[train_idx], y[test_idx]

        for random_state_fit_i in range(0, self.n_repeats_fit, 1):
            random_state_fit = self.random_state + random_state_fit_i

            self.estimator_ = clone(self.estimator)

            self.estimator_.set_params(random_state=random_state_fit)
            self.estimator_.fit(X_train, y_train)

            score_before_permuting = self.scoring(y_test, self.estimator_.predict(X_test))

            for random_state_feature_permuted_i in range(0, self.n_repeats_feature_permute, 1):
                random_state_feature_permuted = self.random_state + random_state_feature_permuted_i
                permutation = np.random.default_rng(random_state_feature_permuted).permutation

                for feature_permuted in range(X_train.shape[1]):
                    X_test_permuted = X_test.copy()
                    X_test_permuted[:, feature_permuted] = permutation(X_test_permuted[:, feature_permuted])

                    score_after_permuting = self.scoring(y_test, self.estimator_.predict(X_test_permuted))
                    score_diff_after_permuting = (score_before_permuting - score_after_permuting)
                    score_diff_perc_after_permuting = score_diff_after_permuting / score_before_permuting

                    score_after_permute_list.append(
                        [fold, random_state, feature_names[feature_permuted], score_diff_after_permuting, score_diff_perc_after_permuting]
                    )
        return score_after_permute_list

    def fit(self, X, y):
        if self.feature_names is None:
            try:
                self.feature_names = X.columns
            except:
                raise Exception("No feature names passed")
                pass
        if self.random_feature_baseline:
            self.random_feature_baseline_col_name = "random_feature_baseline"
            feature_names = np.append(feature_names, [random_feature_baseline_col_name])
            X[random_feature_baseline_col_name] = self.random_feature_baseline

        with sklearn.config_context(
            assume_finite = True,
            skip_parameter_validation = True
        ):
            cv_results_full = (
                Parallel(n_jobs = self.n_jobs)
                (
                    delayed( get_score_after_permute_ )(X, y, train_idx, test_idx)
                    for fold, (train_idx, test_idx) in enumerate(self.cv.split(X, y))
                )
            )
        self.cv_results_full_ = pd.DataFrame(
            columns = ["fold", "random_state", "feature_name", "score_diff_after_permuting", "score_diff_perc_after_permuting"]
            data = cv_results_full
        )

        cv_results_ = (
            self.cv_results_full_
            .groupby("feature_name")
            ["score_diff_after_permuting", "score_diff_perc_after_permuting"]
            .agg(["median", "std", "min", "max"])
            .sort(self.comparison_metric, ascending=not self.diff_lower_is_better)
        )

        self.important_features_ = self.get_important_features_()

        return self

    def get_important_features_(self):
        variables_diff = self.cv_results_.query(f"feature_name != {self.random_feature_baseline_col_name}")[self.comparison_metric]["median"]
        random_baseline_diff = self.cv_results_.query(f"feature_name = {self.random_feature_baseline_col_name}")[self.comparison_metric]

        random_baseline_diff_best = (
            random_baseline_diff
            [
                "max"
                if not self.diff_lower_is_better
                else "min"
            ]
            .iloc[0]
        )

        important_features_mask = (
            variables_diff > random_baseline_best_score
            if not self.diff_lower_is_better
            else variables_diff < random_baseline_best_score
        )

        return self.cv_results_.eval(important_features_mask)

    def transform(self, X, y):
        check_is_fitted(self, "important_features_")
        return X[self.important_features_], y

    def fit_transform(self, X, y):
        return self.fit(X, y).transform(X, y)
fpm = FeaturePermutationMetric(
    RandomForest(n_estimators=10, max_depth=5, n_jobs=-1),
    scoring = root_mean_squared_error,
    scoring_lower_is_better = True,
    n_repeats_fit = 3,
    n_repeats_feature_permute = 100,
    n_jobs = -1,
    random_state = 0
)
fpm.fit_transform(X, y)
fpm.cv_results_
Last Updated: 2024-12-26 ; Contributors: AhmedThahir, web-flow

Comments