Source code for watex.transformers

# -*- coding: utf-8 -*-
#   Licence:BSD 3-Clause
#   Author: LKouadio <etanoyau@gmail.com>
#   created date Wed Jul 14 20:00:26 2021
#   Edited on Mon Sep  6 17:53:06 2021
"""
Gives some efficient tools for data manipulation and transformation.
"""
from __future__ import division, annotations  

import inspect
import warnings 
import numpy as np 
import pandas as pd 
from scipy import sparse 

from sklearn.base import BaseEstimator, TransformerMixin 
from sklearn.cluster import KMeans 
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit 
from sklearn.decomposition import PCA 
from sklearn.preprocessing import StandardScaler, MinMaxScaler 
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder 

from ._watexlog import watexlog 
from ._typing import F 
from .exceptions import EstimatorError
from .utils.funcutils import parse_attrs, to_numeric_dtypes, assert_ratio 
from .utils.mlutils import ( discretizeCategoriesforStratification, 
      stratifiedUsingDiscretedCategories, existfeatures )
from .utils.hydroutils import categorize_flow 
from .utils.validator import get_estimator_name 

__docformat__='restructuredtext'

_logger = watexlog().get_watex_logger(__name__)


__all__= ['KMeansFeaturizer',
          'StratifiedWithCategoryAdder',
          'StratifiedUsingBaseCategory', 
          'FrameUnion', 
          'DataFrameSelector',
          'CombinedAttributesAdder', 
          'featurize_X'
          ]

[docs] class KMeansFeaturizer ( BaseEstimator, TransformerMixin) : """Transforms numeric data into k-means cluster memberships. This transformer runs k-means on the input data and converts each data point into the ID of the closest cluster. If a target variable is present, it is scaled and included as input to k-means in order to derive clusters that obey the classification boundary as well as group similar points together. Parameters ------------- n_clusters: int, default=7 Number of initial clusters target_scale: float, default=5.0 Apply appropriate scaling and include it in the input data to k-means. n_components: int, optional Number of components for reducted down the predictor. It uses the PCA to reduce down dimension to the importance components. random_state: int, Optional State for shuffling the data Attributes ----------- km_model: KMeans featurization model used to transform Examples -------- >>> # (1) Use a common dataset >>> import matplotlib.pyplot as plt >>> from sklearn.datasets import make_moons >>> from watex.utils.plotutils import plot_voronoi >>> from watex.datasets import load_mxs >>> X, y = make_moons(n_samples=5000, noise=0.2) >>> kmf_hint = KMeansFeaturizer(n_clusters=50, target_scale=10).fit(X,y) >>> kmf_no_hint = KMeansFeaturizer(n_clusters=50, target_scale=0).fit(X, y) >>> fig, ax = plt.subplots(2,1, figsize =(7, 7)) >>> plot_voronoi ( X, y ,cluster_centers=kmf_hint.cluster_centers_, fig_title ='KMeans with hint', ax = ax [0] ) >>> plot_voronoi ( X, y ,cluster_centers=kmf_no_hint.cluster_centers_, fig_title ='KMeans No hint' , ax = ax[1]) <AxesSubplot:title={'center':'KMeans No hint'}> >>> # (2) Use a concrete data set >>> X, y = load_mxs ( return_X_y =True, key ='numeric' ) >>> # get the most principal components >>> from watex.analysis import nPCA >>> Xpca =nPCA (X, n_components = 2 ) # veronoi plot expect two dimensional data >>> kmf_hint = KMeansFeaturizer(n_clusters=7, target_scale=10).fit(Xpca,y) >>> kmf_no_hint = KMeansFeaturizer(n_clusters=7, target_scale=0).fit(Xpca, y) >>> fig, ax = plt.subplots(2,1, figsize =(7, 7)) >>> plot_voronoi ( Xpca, y ,cluster_centers=kmf_hint.cluster_centers_, fig_title ='KMeans with hint', ax = ax [0] ) >>> plot_voronoi ( Xpca, y ,cluster_centers=kmf_no_hint.cluster_centers_, fig_title ='KMeans No hint' , ax = ax[1]) """ def __init__( self, n_clusters=7, target_scale=5.0, random_state=None, n_components=None ): self.n_clusters = n_clusters self.target_scale = target_scale self.random_state = random_state self.n_components=n_components
[docs] def fit(self, X, y=None): """Runs k-means on the input data and finds and updated centroids. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where `n_samples` is the number of samples and `n_features` is the number of features. y : array-like of shape (n_samples,) Target vector relative to X. Returns ------- self Fitted estimator. """ if self.n_components: X = PCA(n_components = self.components ).fit_transform (X ) if y is None: # No target variable, just do plain k-means km_model = KMeans(n_clusters=self.n_clusters, n_init=20, random_state=self.random_state) km_model.fit(X) self.km_model_ = km_model self.cluster_centers_ = km_model.cluster_centers_ return self # There is target information. Apply appropriate scaling and include # it in the input data to k-means. data_with_target = np.hstack((X, y[:, np.newaxis] * self.target_scale) ) # Build a pre-training k-means model on data and target km_model_pretrain = KMeans(n_clusters=self.n_clusters,n_init=20, random_state=self.random_state) km_model_pretrain.fit(data_with_target) # Run k-means a second time to get the clusters in the original space # without target info. Initialize using centroids found in pre-training. # Go through a single iteration of cluster assignment and centroid # recomputation. km_model = KMeans(n_clusters=self.n_clusters, init=km_model_pretrain.cluster_centers_[:,:-1] , #[:, :-1] n_init=1, max_iter=1) km_model.fit(X) self.km_model_ = km_model self.cluster_centers_ = self.km_model_.cluster_centers_ return self
[docs] def transform(self, X): """Outputs the closest cluster ID for each input data point. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) New data to predict. Returns ------- labels : ndarray of shape (n_samples,) Index of the cluster each sample belongs to. """ clusters = self.km_model_.predict(X) return clusters[:,np.newaxis]
[docs] def fit_transform(self, X, y=None): """ Fit and transform the data X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where `n_samples` is the number of samples and `n_features` is the number of features. y : array-like of shape (n_samples,) Target vector relative to X. Returns ------- labels : ndarray of shape (n_samples,) Index of the cluster each sample belongs to. """ self.fit(X, y) return self.transform(X, y)
def __repr__(self): """ Pretty format for guidance following the API... """ _t = ("n_clusters", "target_scale", "random_state", "n_components") outm = ( '<{!r}:' + ', '.join( [f"{k}={ False if getattr(self, k)==... else getattr(self, k)!r}" for k in _t]) + '>' ) return outm.format(self.__class__.__name__)
[docs] class StratifiedWithCategoryAdder( BaseEstimator, TransformerMixin ): """ Stratified sampling transformer based on new generated category from numerical attributes and return stratified trainset and test set. Parameters ---------- base_num_feature: str, Numerical features to categorize. threshold_operator: float, The coefficient to divised the numerical features value to normalize the data max_category: Maximum value fits a max category to gather all value greather than. return_train: bool, Return the whole stratified trainset if set to ``True``. usefull when the dataset is not enough. It is convenient to train all the whole trainset rather than a small amount of stratified data. Sometimes all the stratified data are not the similar equal one to another especially when the dataset is not enough. Another way to stratify dataset is to get insights from the dataset and to add a new category as additional mileage. From this new attributes, data could be stratified after categorizing numerical features. Once data is tratified, the new category will be drop and return the train set and testset stratified. For instance:: >>> from watex.transformers import StratifiedWithCategoryAdder >>> stratifiedNumObj= StratifiedWithCatogoryAdder('flow') >>> stratifiedNumObj.fit_transform(X=df) >>> stats2 = stratifiedNumObj.statistics_ Usage ------ In this example, we firstly categorize the `flow` attribute using the ceilvalue (see :func:`~discretizeCategoriesforStratification`) and groupby other values greater than the ``max_category`` value to the ``max_category`` andput in the temporary features. From this features the categorization is performed and stratified the trainset and the test set. Notes ------ If `base_num_feature` is not given, dataset will be stratified using random sampling. """ def __init__( self, base_num_feature=None, threshold_operator = 1., return_train=False, max_category=3, n_splits=1, test_size=0.2, random_state=42 ): self._logging= watexlog().get_watex_logger(self.__class__.__name__) self.base_num_feature= base_num_feature self.return_train= return_train self.threshold_operator= threshold_operator self.max_category = max_category self.n_splits = n_splits self.test_size = test_size self.random_state = random_state self.base_items_ =None self.statistics_=None
[docs] def fit(self, X, y=None): """ Does nothin just for scikit-learn API purpose. """ return self
[docs] def transform(self, X, y=None): """Transform data and populate inspections attributes from hyperparameters. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) New data to predict. y: Ignored Keep just for API purpose. Returns ------- X : {array-like, sparse matrix} of shape (n_samples, n_features) New data transformed. """ if self.base_num_feature is not None: in_c= 'temf_' # discretize the new added category from the threshold value X = discretizeCategoriesforStratification( X, in_cat=self.base_num_feature, new_cat=in_c, divby =self.threshold_operator, higherclass = self.max_category ) self.base_items_ = list( X[in_c].value_counts().index.values) split = StratifiedShuffleSplit(n_splits =self.n_splits, test_size =self.test_size, random_state =self.random_state) for train_index, test_index in split.split(X, X[in_c]): strat_train_set = X.loc[train_index] strat_test_set = X.loc[test_index] #keep a copy of all stratified trainset. strat_train_set_copy = X.loc[ np.delete(X.index, test_index)] train_set, test_set = train_test_split( X, test_size = self.test_size, random_state= self.random_state) if self.base_num_feature is None or self.n_splits==0: self._logging.info('Stratification not applied! Train and test sets' 'were purely generated using random sampling.') return train_set , test_set if self.base_num_feature is not None: # get statistic from `in_c` category proportions into the # the overall dataset o_ =X[in_c].value_counts() /len(X) r_ = test_set[in_c].value_counts()\ /len(test_set) s_ = strat_test_set[in_c].value_counts()\ /len( strat_test_set) r_error , s_error = ((r_/ o_)-1)*100, ((s_/ o_)-1)*100 self.statistics_ = np.c_[np.array(self.base_items_), o_, r_, s_, r_error, s_error ] self.statistics_ = pd.DataFrame(data = self.statistics_, columns =[in_c, 'Overall', 'Random', 'Stratified', 'Rand. %error', 'strat. %error' ]) # set a pandas dataframe for inspections attributes `statistics`. self.statistics_.set_index(in_c, inplace=True) # remove the add category into the set for set in(strat_train_set_copy, strat_train_set, strat_test_set): set.drop([in_c], axis=1, inplace =True) if self.return_train: strat_train_set = strat_train_set_copy # force to remove the temporary features for splitting in # the original dataset if in_c in X.columns: X.drop([in_c], axis =1, inplace=True) return strat_train_set, strat_test_set
[docs] class StratifiedUsingBaseCategory( BaseEstimator, TransformerMixin ): """ Transformer to stratified dataset to have data more representativce into the trainset and the test set especially when data is not large enough. Arguments ---------- base_column: str or int, Hyperparameters and can be index of the base mileage(category) for stratifications. If `base_column` is None, will return the purely random sampling. test_size: float Size to put in the test set. random_state: shuffled number of instance in the overall dataset. default is ``42``. Usage ------ If data is not large enough especially relative number of attributes if much possible to run therisk of introducing a significant sampling biais.Therefore strafied sampling is a better way to avoid a significant biais of sampling survey. For instance:: >>> from watex.transformers import StratifiedUsingBaseCategory >>> from watex.utils.mlutils import load_data >>> df = load_data('data/geo_fdata') >>> stratifiedObj = StratifiedUsingBaseCategory(base_column='geol') >>> stratifiedObj.fit_transform(X=df) >>> stats= stratifiedObj.statistics_ Notes ------ An :attr:`~.statictics_` inspection attribute is good way to observe the test set generated using purely random and the stratified sampling. The stratified sampling has category ``base_column`` proportions almost indentical to those in the full dataset whereas the test set generated using purely random sampling is quite skewed. """ def __init__(self, base_column =None,test_size=0.2, random_state=42): self._logging= watexlog().get_watex_logger(self.__class__.__name__) self.base_column = base_column self.test_size = test_size self.random_state = random_state #create inspection attributes self.statistics_=None self.base_flag_ =False self.base_items_= None
[docs] def fit(self, X, y=None): """ Does nothing , just for API purpose. """ return self
[docs] def transform(self, X, y=None): """ Split dataset into `trainset` and `testset` using stratified sampling. If `base_column` not given will return the `trainset` and `testset` using purely random sampling. {array-like, sparse matrix} of shape (n_samples, n_features) New data to predict. Parameters ----------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where `n_samples` is the number of samples and `n_features` is the number of features. y : Ignored Returns -------- strat_train_set, strat_test_set : NDArray, ( n_samples , n_features) train set and test set stratified """ if self.base_column is None: self.stratified = False self._logging.debug( f'Base column is not given``{self.base_column}``.Test set' ' will be generated using purely random sampling.') train_set, test_set = train_test_split( X, test_size = self.test_size ,random_state= self.random_state ) if self.base_column is not None: if isinstance(self.base_column, (int, float)): # use index to find # base colum name. self.base_column = X.columns[int(self.base_column)] self.base_flag_ =True elif isinstance(self.base_column, str): # check wether the column exist into dataframe for elm in X.columns: if elm.find(self.base_column.lower())>=0 : self.base_column = elm self.base_flag_=True break if not self.base_flag_: self._logging.debug( f'Base column ``{self.base_column}`` not found ' f'in `{X.columns}`') warnings.warn( f'Base column ``{self.base_column}`` not found in ' f'{X.columns}.Test set is generated using purely ' 'random sampling.') self.base_items_ = list( X[self.base_column].value_counts().index.values) if self.base_flag_: strat_train_set, strat_test_set = \ stratifiedUsingDiscretedCategories(X, self.base_column) # get statistic from `basecolumn category proportions into the # the whole dataset, in the testset generated using purely random # sampling and the test set generated using the stratified sampling. o_ =X[self.base_column].value_counts() /len(X) r_ = test_set [self.base_column].value_counts()/len(test_set) s_ = strat_test_set[self.base_column].value_counts()/len( strat_test_set) r_error , s_error = ((r_/ o_)-1)*100, ((s_/ o_)-1)*100 self.statistics_ = np.c_[np.array(self.base_items_), o_, r_, s_, r_error, s_error] self.statistics_ = pd.DataFrame(data = self.statistics_, columns =[self.base_column, 'Overall', 'Random', 'Stratified', 'Rand. %error', 'strat. %error' ]) # set a pandas dataframe for inspections attributes `statistics`. self.statistics_.set_index(self.base_column, inplace=True) return strat_train_set, strat_test_set if not self.base_flag_: return train_set, test_set
[docs] class CategorizeFeatures(BaseEstimator, TransformerMixin ): """ Transform numerical features into categorical features and return a new array transformed. Arguments ---------- *num_columns_properties*: list list composed ofnumerical `features name`, list of `features boundaries` with their `categorized names`. Notes ------ From the boundaries values including, features values can be transformed. `num_columns_properties` is composed of: - `feature name` or index equals to 'flow`' or index of flow ='12' - `features boundaries` equals to ``[0., 1., 3]`` may correspond to: - 0: features flow values with equal to 0. By default the begining value like 0 is unranged. - 0-1: replace values ranged between 0 and 1. - 1-3:replace values ranged between 1-3 - >3 : get all values greater than 3. by default categorize values greater than the last values. If the default classification is not suitable, create your own range values like ``[[0-1], [1-3], 3] (1)`` - `categorized names`: Be sure that if the value is provided as without ranging like (1). The number of `categorized values` must be the size of the `features boundaries` +1. For instance, we try to replace all numerical values in column `flow` by :: -FR0 : all fllow egal to 0. -FR1: flow between 0-1 -FR2: flow between 1-3 -FR3: flow greater than 3. As you can see the `features boundaries` [0., 1., 3]size is equal to `categorized name`['FR0', 'FR1', 'FR2', 'FR3'] size +1. Usage ------ Can categorize multiples features by setting each component explained above as list of tuples. For instance we try to replace the both numerical features `power` and `flow` in the dataframe by their corresponding `features` boundaries. Here is how to set the `num_columns_properties` :: num_columns_porperties =[ ('flow', ([0, 1, 3], ['FR0', 'FR1', 'FR2', 'FR3'])), ('power', ([10, 30, 100], ['pw0', 'pw1', 'pw2', 'pw4'])) ] Examples -------- >>> from watex.transformers import CategorizeFeatures >>> from watex.utils.mlutils import load_data >>> df= mlfunc.load_data('data/geo_fdata') >>> catObj = CategorizeFeatures( num_columns_properties=num_columns_porperties ) >>> X= catObj.fit_transform(df) >>> catObj.in_values_ >>> catObj.out_values_ """ def __init__(self, num_columns_properties=None): self._logging= watexlog().get_watex_logger(self.__class__.__name__) self.num_columns_properties=num_columns_properties self.base_columns_=None self.in_values_ = None self.out_values_ = None self.base_columns_ix_=None def fit(self, X, y=None): """ Parameters ---------- X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``) Training set; Denotes data that is observed at training and prediction time, used as independent variables in learning. When a matrix, each sample may be represented by a feature vector, or a vector of precomputed (dis)similarity with each training sample. :code:`X` may also not be a matrix, and may require a feature extractor or a pairwise metric to turn it into one before learning a model. y: array-like, shape (M, ) ``M=m-samples``, train target; Denotes data that may be observed at training time as the dependent variable in learning, but which is unavailable at prediction time, and is usually the target of prediction. Returns -------- ``Self`: Instanced object for methods chaining """ return self def transform(self, X, y=None) : """ Transform the data and return new array. Can straightforwardly call :meth:`~.sklearn.TransformerMixin.fit_transform` inherited from scikit_learn. """ self.base_columns_ = [n_[0] for n_ in self.num_columns_properties] self.in_values_ = [n_[1][0] for n_ in self.num_columns_properties] self.out_values_ = [n_[1][1] for n_ in self.num_columns_properties] X_dtype ='' if isinstance(self.base_columns_, (list, tuple)): self.base_columns_ =np.array(self.base_columns_) if np.array(self.base_columns_).dtype in ['int', 'float']: self.base_columns_.astype(np.int32) #in the case indexes are provided self.base_columns_ix_ = self.base_columns_ # check whether X is unique array or array_like. try: X.shape[1] except IndexError: if isinstance(X, pd.Series): # if X.__class__.__name__ =='Series': X= X.values X_dtype = 'unik__' except RuntimeError : # handle other possible errors. X_dtype = 'unik__' else : if X.shape[1]==1: X=X.reshape((X.shape[0]),) X_dtype ='unik__' if X_dtype =='unik__': X = categorize_flow(X, self.in_values_[0], classes=self.out_values_[0] ) self.base_columns_ix_ =(0,) return X # now if isinstance(X, pd.DataFrame): X= self.ascertain_mumerical_values(X) if self.base_columns_ix_ is not None: for ii, ix_ in enumerate(self.base_columns_ix_): X[:, ix_]=categorize_flow(X[:, ix_], self.in_values_[ii], classes=self.out_values_[ii] ) self.base_columns_ =tuple(self.base_columns_) self.in_values_ = tuple(self.in_values_) self.out_values_ = tuple(self.out_values_) self.base_columns_ix_ = tuple(self.base_columns_ix_) return X def ascertain_mumerical_values(self, X, y=None): """ Retreive indexes from mumerical attributes and return a dataframe values especially if `X` is dataframe else returns values of array.""" # ascertain dataframe whether there is an categorial values. try: # if isinstance(X, pd.DataFrame) list_of_numerical_cols= X.select_dtypes( exclude=['object']).columns.tolist() except AttributeError: # if 'numpy.ndarray' object has no attribute 'select_dtypes' list_of_numerical_cols= [] t_=[] # return X if no numeruical columns found if len(list_of_numerical_cols) ==0 : self._logging.info('`None`numerical columns detected.') warnings.warn('None numerical columns detected.It seems') return X.values for bcol in self.base_columns_: for dfcols in list_of_numerical_cols: if dfcols.lower() == bcol.lower(): t_.append(dfcols) break if len(t_) ==0: self._logging.info( f'Numerical features `{self.base_columns_}`not found in'\ '`{list_of_numerical_cols}`') return X.values # get base columns index self.base_columns_ix_ =[ int(X.columns.get_loc(col_n)) for col_n in self.base_columns_] return X.values
[docs] class CombinedAttributesAdder(BaseEstimator, TransformerMixin ): """ Combined attributes from litteral string operators, indexes or names. Create a new attribute using features index or litteral string operator. Inherits from scikit_learn `BaseEstimator`and `TransformerMixin` classes. Arguments ---------- *attribute_names* : list of str , optional List of features for combinaison. Decide to combine new feature values by from `operator` parameters. By default, the combinaison it is ratio of the given attribute/numerical features. For instance, ``attribute_names=['lwi', 'ohmS']`` will divide the feature 'lwi' by 'ohmS'. *attributes_indexes* : list of int, index of each feature/feature for experience combinaison. User warning should raise if any index does match the dataframe of array columns. *operator*: str, default ='/' Type of operation to perform. Can be ['/', '+', '-', '*', '%'] Returns -------- X : np.ndarray, A new array contained the new data from the `attrs_indexes` operation. If `attr_names` and attr_indexes is ``None``, will return the same array like beginning. Notes ------ A litteral string operator can be used. For instance dividing two numerical features can be illustrated using the word "per" separated by underscore like "_per_" For instance, to create a new feature based on the division of the features ``lwi`` and ``ohmS``, the litteral string operator that holds the ``attribute_names`` could be:: attribute_names='lwi_per_ohmS' The same litteral string is valid for multiplication (_mul_) , substraction (_sub_) , modulo (_mod_) and addition (_add_). However, indexes of features can also use rather than `attribute_names` providing the `operator` parameters. Or it could be the indexes of both features in the array like ``attributes_ix =[(10, 9)]`` which means the `lwi` and `ohmS` are found at index ``10`` and ``9``respectively. Furthermore, multiples operations can be set by adding mutiples litteral string operator into a list like ``attributes_ix = [ 'power_per_magnitude', 'ohmS_per_lwi']``. Examples -------- >>> import pandas as pd >>> from watex.transformers import CombinedAttributesAdder >>> from watex.datasets.dload import load_bagoue >>> X, y = load_bagoue (as_frame =True ) >>> cobj = CombinedAttributesAdder (attribute_names='lwi_per_ohmS') >>> Xadded = cobj.fit_transform(X) >>> cobj.attribute_names_ ... ['num', 'name', 'east', 'north', 'power', 'magnitude', 'shape', 'type', 'sfi', 'ohmS', 'lwi', 'geol', 'lwi_div_ohmS'] # new attributes with 'lwi'/'ohmS' >>> df0 = pd.DataFrame (Xadded, columns = cobj.attribute_names_) >>> df0['lwi_div_ohmS'] ... 0 0.0 1 0.000002 2 0.000005 3 0.000004 4 0.000008 426 0.453359 427 0.382985 428 0.476676 429 0.457371 430 0.379429 Name: lwi_div_ohmS, Length: 431, dtype: object >>> cobj = CombinedAttributesAdder ( attribute_names=['lwi', 'ohmS', 'power'], operator='+') >>> df0 = pd.DataFrame (cobj.fit_transform(X), columns = cobj.attribute_names_) >>> df0.iloc [:, -1] ... 0 1777.165142 1 1207.551531 2 850.5625 3 1051.943553 4 844.095833 426 1708.8585 427 1705.5375 428 1568.9825 429 1570.15625 430 1666.9185 Name: lwi_add_ohmS_add_power, Length: 431, dtype: object >>> cobj = CombinedAttributesAdder ( attribute_indexes =[1,6], operator='+') >>> df0 = pd.DataFrame (cobj.fit_transform(X), columns = cobj.attribute_names_) >>> df0.iloc [:, -1] ... 0 b1W 1 b2V 2 b3V 3 b4W 4 b5W 426 b427W 427 b428V 428 b429V 429 b430V 430 b431V Name: name_add_shape, Length: 431, dtype: object """ _op ={'times': ('times', 'prod', 'mul', '*', 'x'), 'add': ('add', '+', 'plus'), 'div': ('quot', '/', 'div', 'per'), 'sub': ('sub', '-', 'less'), 'mod': ('mod', '%'), } def __init__( self, attribute_names =None, attribute_indexes = None, operator: str='/', ): self.attribute_names=attribute_names self.attribute_indexes= attribute_indexes self.operator=operator self.attribute_names_=None
[docs] def fit(self, X, y=None ): """ Parameters ---------- X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``) Training set; Denotes data that is observed at training and prediction time, used as independent variables in learning. When a matrix, each sample may be represented by a feature vector, or a vector of precomputed (dis)similarity with each training sample. :code:`X` may also not be a matrix, and may require a feature extractor or a pairwise metric to turn it into one before learning a model. y: array-like, shape (M, ) ``M=m-samples``, train target; Denotes data that may be observed at training time as the dependent variable in learning, but which is unavailable at prediction time, and is usually the target of prediction. Returns -------- self: `CombinedAttributesAdder` instance returns ``self`` for easy method chaining. """ return self
[docs] def transform(self, X): """ Tranform X and return new array with experience attributes combinaison. Parameters ---------- X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``) Training set; Denotes data that is observed at training and prediction time, used as independent variables in learning. When a matrix, each sample may be represented by a feature vector, or a vector of precomputed (dis)similarity with each training sample. :code:`X` may also not be a matrix, and may require a feature extractor or a pairwise metric to turn it into one before learning a model. Returns -------- X: NDarray, Ndarray ( M x N+1 matrix) returns X transformed (``M=m-samples``, & ``N=n+1-features``) with attribute combined. .. versionadded:: 0.1.3 """ columns =[] self.operator = self._get_operator ( self.operator or self.attribute_names) if self.operator is None: warnings.warn("None or Invalid operator cannot be use for " "attribute combinaisons.") if isinstance (self.attribute_names, str): self.attribute_names_ = parse_attrs(self.attribute_names) elif isinstance(self.attribute_names, (list, tuple, np.ndarray) ): self.attribute_names_ = self.attribute_names if isinstance(X, pd.DataFrame) : # asset wether attributes exists # no raise errors, return the dataframe if self.attribute_names_ : existfeatures(X, self.attribute_names_ ) # get the index of attributes from dataframe if self.attribute_names_: self.attribute_indexes = list(map ( lambda o: list(X.columns).index (o), self.attribute_names_) ) elif self.attribute_indexes : # try : self.attribute_names_ = list(map ( lambda ix: list(X.columns)[ix], self.attribute_indexes) ) # except IndexError: # raise IndexError("List of index is out the range.") columns = X.columns X= to_numeric_dtypes(X) X= X.values if self.attribute_indexes: X = self._operate(X) if self.attribute_names_ is not None: self.attribute_names_ = list(columns) + ([ f'_{self.operator}_'.join([ v for v in self.attribute_names_ ]) ] if self._isfine else []) return X
def _get_operator (self, operator): """ Get operator for combining attribute """ for k, v in self._op.items() : for o in v: if operator.find(o) >=0 : self.operator = k return self.operator return def _operate (self, X): """ Operate data from indexes """ def weird_division(ix_): """ Replace 0. value to 1 in denominator for division calculus.""" return ix_ if ix_!=0. else 1 msg=("Unsupported operand type(s)! index provided {} doesn't match" " any numerical features. Experience combinaison attributes" " is not possible.") self._isfine=True Xc =X[:, self.attribute_indexes] cb= Xc[:, 0 ] ; Xc=Xc[:, 1: ] for k in range (Xc.shape[1]): try : if self.operator =='mod': cb %= Xc[:, k] if self.operator =='add': cb += Xc[:, k] if self.operator =='sub': cb -= Xc[:, k] if self.operator =='div': # if the denominator contain nan or 0 # a weird division is triggered and replace # the denominator by 1 try : cb /= Xc[:, k] except ZeroDivisionError: wv= np.array( list(map(weird_division, Xc[:, k]))) cb /=wv except ( TypeError, RuntimeError, RuntimeWarning): warnings.warn(msg.format( self.attribute_indexes[1:][k])) if self.operator =='x': cb *= Xc[:, k] except: warnings.warn(msg.format(self.attribute_indexes[1:][k])) self._isfine =False X = np.c_[X, cb ] if self._isfine else X return X
[docs] class DataFrameSelector(BaseEstimator, TransformerMixin): """ Select data from specific attributes for column transformer. Select only numerical or categorial columns for operations. Work as the same like sckit-learn `make_colum_tranformer` Arguments ---------- *attribute_names*: list or array_like List of the main columns to keep the data *select_type*: str Automatic numerical and categorial selector. If `select_type` is ``num``, only numerical values in dataframe are retrieved else ``cat`` for categorials attributes. Returns ------- X: ndarray New array with composed of data of selected `attribute_names`. Examples --------- >>> from watex.transformers import DataFrameSelector >>> from watex.utils.mlutils import load_data >>> df = mlfunc.load_data('data/geo_fdata') >>> XObj = DataFrameSelector(attribute_names=['power','magnitude','sfi'], ... select_type=None) >>> cdf = XObj.fit_transform(df) """ def __init__(self, attribute_names=None, select_type =None): self._logging= watexlog().get_watex_logger(self.__class__.__name__) self.attribute_names = attribute_names self.select_type = select_type
[docs] def fit(self, X, y=None): """ Select the Data frame Parameters ---------- X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``) Training set; Denotes data that is observed at training and prediction time, used as independent variables in learning. When a matrix, each sample may be represented by a feature vector, or a vector of precomputed (dis)similarity with each training sample. :code:`X` may also not be a matrix, and may require a feature extractor or a pairwise metric to turn it into one before learning a model. y: array-like, shape (M, ) ``M=m-samples``, train target; Denotes data that may be observed at training time as the dependent variable in learning, but which is unavailable at prediction time, and is usually the target of prediction. Returns -------- self: `DataFrameSelector` instance returns ``self`` for easy method chaining. """ return self
[docs] def transform(self, X): """ Transform data and return numerical or categorial values. X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where `n_samples` is the number of samples and `n_features` is the number of features. """ if isinstance(self.attribute_names, str): self.attribute_names =[self.attribute_names] if self.attribute_names is not None: t_= [] for in_attr in self.attribute_names: for attr_ in X.columns: if in_attr.lower()== attr_.lower(): t_.append(attr_) break if len(t_)==0: self._logging.warn(f' `{self.attribute_names}` not found in the' '`{X.columns}`.') warnings.warn('None attribute in the dataframe match' f'`{self.attribute_names}.') if len(t_) != len(self.attribute_names): mm_= set(self.attribute_names).difference(set(t_)) warnings.warn( f'Value{"s" if len(mm_)>1 else""} {list(mm_)} not found.' f" Only `{t_}`match{'es' if len(t_) <1 else ''}" " the dataframe features.") self._logging.warning( f'Only `{t_}` can be considered as dataframe attributes.') self.attribute_names =t_ return X[self.attribute_names].values try: if self.select_type.lower().find('num')>=0: self.select_type =='num' elif self.select_type.lower().find('cat')>=0: self.select_type =='cat' else: self.select_type =None except: warnings.warn(f'`Select_type`` given argument ``{self.select_type}``' ' seems to be wrong. Should defaultly return the ' 'Dataframe value.', RuntimeWarning) self._logging.warnings('A given argument `select_type`seems to be' 'wrong %s. Use ``cat`` or ``num`` for ' 'categorical or numerical attributes ' 'respectively.'% inspect.signature(self.__init__)) self.select_type =None if self.select_type is None: warnings.warn('Arguments of `%s` arguments %s are all None. Should' ' returns the dataframe values.'% (repr(self), inspect.signature (self.__init__))) self._logging.warning('Object arguments are None.' ' Should return the dataframe values.') return X.values if self.select_type =='num': obj_columns= X.select_dtypes(include='number').columns.tolist() elif self.select_type =='cat': obj_columns= X.select_dtypes(include=['object']).columns.tolist() self.attribute_names = obj_columns return X[self.attribute_names].values
def __repr__(self): return self.__class__.__name__
[docs] class FrameUnion (BaseEstimator, TransformerMixin) : """ Unified categorial and numerical features after scaling and and categorial features encoded. Use :class:`~watex.tranformers.DataframeSelector` class to define the categorial features and numerical features. Arguments --------- num_attributes: list List of numerical attributes cat_attributes: list list of categorial attributes scale: bool Features scaling. Default is ``True`` and use `:class:~sklearn.preprocessing.StandarScaler` imput_data: bool , Replace the missing data. Default is ``True`` and use :attr:`~sklearn.impute.SimpleImputer.strategy`. param_search: bool, If `num_attributes` and `cat_attributes`are None, the numerical features and categorial features` should be found automatically. Default is ``True`` scale_mode:bool, Mode of data scaling. Default is ``StandardScaler``but can be a ``MinMaxScaler`` encode_mode: bool, Mode of data encoding. Default is ``OrdinalEncoder`` but can be ``OneHotEncoder`` but creating a sparse matrix. Once selected, the new shape of ``X`` should be different from the original shape. Example ------- >>> from watex.datasets import fetch_data >>> from watex.utils.transformers import FrameUnion >>> X_= fetch_data ('Bagoue original').get('data=dfy1') >>> frameObj = FrameUnion(X_, encoding =OneHotEncoder) >>> X= frameObj.fit_transform(X_) """ def __init__( self, num_attributes =None , cat_attributes =None, scale =True, imput_data=True, encode =True, param_search ='auto', strategy ='median', scale_mode ='StandardScaler', encode_mode ='OrdinalEncoder' ): self._logging = watexlog().get_watex_logger(self.__class__.__name__) self.num_attributes = num_attributes self.cat_attributes = cat_attributes self.param_search = param_search self.imput_data = imput_data self.strategy =strategy self.scale = scale self.encode = encode self.scale_mode = scale_mode self.encode_mode = encode_mode self.X_=None self.X_num_= None self.X_cat_ =None self.num_attributes_=None self.cat_attributes_=None self.attributes_=None
[docs] def fit(self, X, y=None): """ Does nothing. Just for scikit-learn purpose. """ return self
[docs] def transform(self, X): """ Transform data and return X numerical and categorial encoded values. Parameters ----------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where `n_samples` is the number of samples and `n_features` is the number of features. Returns -------- X : {array-like, sparse matrix} of shape (n_samples, n_features) transformed arraylike, where `n_samples` is the number of samples and `n_features` is the number of features. """ if self.scale_mode.lower().find('stand')>=0: self.scale_mode = 'StandardScaler' elif self.scale_mode.lower().find('min')>=0: self.scale_mode = 'MinMaxScaler' if self.encode_mode.lower().find('ordinal')>=0: self.encode_mode = 'OrdinalEncoder' elif self.encode_mode.lower().find('hot') >=0: self.encode_mode = 'OneHotEncoder' numObj = DataFrameSelector(attribute_names= self.num_attributes, select_type='num') catObj =DataFrameSelector(attribute_names= self.cat_attributes, select_type='cat') num_arrayObj = numObj.fit_transform(X) cat_arrayObj = catObj.fit_transform(X) self.num_attributes_ = numObj.attribute_names self.cat_attributes_ = catObj.attribute_names self.attributes_ = self.num_attributes_ + self.cat_attributes_ self.X_num_= num_arrayObj.copy() self.X_cat_ =cat_arrayObj.copy() self.X_ = np.c_[self.X_num_, self.X_cat_] if self.imput_data : from sklearn.impute import SimpleImputer imputer_obj = SimpleImputer(missing_values=np.nan, strategy=self.strategy) num_arrayObj =imputer_obj.fit_transform(num_arrayObj) if self.scale : if self.scale_mode == 'StandardScaler': scaler = StandardScaler() if self.scale_mode =='MinMaxScaler': scaler = MinMaxScaler() num_arrayObj = scaler.fit_transform(num_arrayObj) if self.encode : if self.encode_mode =='OrdinalEncoder': encoder = OrdinalEncoder() elif self.encode_mode =='OneHotEncoder': encoder = OneHotEncoder(sparse_output=True) cat_arrayObj= encoder.fit_transform(cat_arrayObj ) # sparse matrix of type class <'numpy.float64'>' stored # element in compressed sparses raw format . To convert the sense # matrix to numpy array , we need to just call 'to_array()'. warnings.warn(f'Sparse matrix `{cat_arrayObj.shape!r}` is converted' ' in dense Numpy array.', UserWarning) # cat_arrayObj= cat_arrayObj.toarray() try: X= np.c_[num_arrayObj,cat_arrayObj] except ValueError: # For consistency use the np.concatenate rather than np.c_ X= np.concatenate((num_arrayObj,cat_arrayObj), axis =1) if self.encode_mode =='OneHotEncoder': warnings.warn('Use `OneHotEncoder` to encode categorial features' ' generates a Sparse matrix. X is henceforth ' ' composed of sparse matrix. The new dimension is' ' {0} rather than {1}.'.format(X.shape, self.X_.shape), UserWarning) self._logging.info('X become a spared matrix. The new shape is' '{X.shape!r} against the orignal ' '{self.X_shape!r}') return X
[docs] def featurize_X ( X, y =None, *, n_clusters:int=7, target_scale:float= 5 , random_state:F|int=None, n_components: int=None, model: F =None, split_X_y:bool = False, test_ratio:float|str= .2 , shuffle:bool=True, return_model:bool=..., to_sparse: bool=..., sparsity:str ='coo' ): """ Featurize X with the cluster based on the KMeans featurization Parameters ----------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where `n_samples` is the number of samples and `n_features` is the number of features. Note that when `n_components` is set, sparse matrix for `X` is not acceptable. y : array-like of shape (n_samples,) Target vector relative to X. n_clusters: int, default=7 Number of initial clusters target_scale: float, default=5.0 Apply appropriate scaling and include it in the input data to k-means. n_components: int, optional Number of components for reduced down the predictor X. It uses the PCA to reduce down dimension to the importance features. model: :class:`KMeansFeaturizer`. KMeasFeaturizer model. Model can be provided to featurize the test data separated from the train data. .. versionadded:: 0.2.4 random_state: int, Optional State for shuffling the data split_X_y: bool, default=False, Split the X, y into train data and test data according to the test size test_ratio: int, default=0.2 ratio to keep for a test data. shuffle: bool, default=True Suffling the data set. return_model: bool, default =False If ``True`` return the KMeans featurization mode and the transformed X. to_sparse: bool, default=False Convert X data to sparse matrix, by default the sparse matrix is coordinates matrix (COO) sparsity:str, default='coo' Kind of sparse matrix use to convert `X`. It can be ['csr'|'coo']. Any other values with return a coordinates matrix unless `to_sparse` is turned to ``False``. .. versionadded:: 0.2.4 Returns -------- X, y : NDArray shape (m_samples, n_features +1) or \ shape (m_samples, n_sparse_features) Returns NDArray of m_features plus the clusters features from KMF feturization procedures. The `n_sparse_features` is created if `to_sparse` is set to ``True``. X, y, model: NDarray and KMF models Returns transformed array X and y and model if ``return_model`` is set to ``True``. Array like train data X transformed and test if `split_X_y` is set to ``True``. X, Xtest, y, ytest: NDArray (KMF), ArrayLike Split tuple is returned when `split_X_y=True``. Note ----- Everytimes ``return_model=True``, KMF model (:class:`KMeansFeaturizer`) is appended to the return results. Examples -------- >>> import numpy as np >>> from watex.transformers import featurize_X >>> X = np.random.randn (12 , 7 ) ; y = np.arange(12 ) >>> y[ y < 6 ]= 0 ; y [y >0 ]= 1 # for binary data >>> Xtransf , _ = featurize_X (X, to_sparse =False) >>> X.shape, Xtransf.shape ((12, 7), (12, 8)) >>> Xtransf, y = featurize_X (X,y, to_sparse =True ) >>> Xtransf , y (<12x8 sparse matrix of type '<class 'numpy.float64'>' with 93 stored elements in COOrdinate format>, array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1])) >>> featurize_X (X,y, to_sparse =True, split_X_y=True ) (<9x8 sparse matrix of type '<class 'numpy.float64'>' with 71 stored elements in COOrdinate format>, <3x8 sparse matrix of type '<class 'numpy.float64'>' with 24 stored elements in COOrdinate format>, array([0, 1, 1, 0, 0, 0, 0, 1, 1]), array([0, 1, 1])) >>> *_, kmf_model = featurize_X (X,y, to_sparse =True, return_model =True) >>> kmf_model <'KMeansFeaturizer':n_clusters=7, target_scale=5, random_state=None, n_components=None> """ # set False to value use # ellipsis... if return_model is ...: return_model =False if to_sparse is ...: to_sparse =False # if sparse convert X to sparse matrix if to_sparse: sparsity= str(sparsity).lower().strip() d_sparsity = dict ( csr = sparse.csr_matrix , coo= sparse.coo_matrix ) sparse_func = sparse.coo_matrix if sparsity not in ( 'coo', 'csr') else d_sparsity.get (sparsity ) # reduce down feature to two. kmf_data = [] if n_components: from watex.analysis import nPCA X =nPCA (X, n_components = n_components ) if split_X_y: X, test_data , y, y_test = train_test_split ( X, y ,test_size = assert_ratio(test_ratio) , random_state = random_state , shuffle =shuffle) # create a kmeaturization with hint model if model: if get_estimator_name(model ) !='KMeansFeaturizer': raise EstimatorError( "Wrong model estimator. Expect 'KMeansFeaturizer'" f" as the valid estimator. Got {get_estimator_name (model)!r}") if callable ( model ): model = model (n_clusters=n_clusters, target_scale=target_scale, random_state = random_state) else: model = KMeansFeaturizer( n_clusters=n_clusters, target_scale=target_scale, random_state = random_state, ).fit(X,y) ### Use the k-means featurizer to generate cluster features transf_cluster = model.transform(X) ### Form new input features with cluster features # training_with_cluster Xkmf = np.concatenate ( (X, transf_cluster), axis =1 ) if to_sparse: Xkmf= sparse_func(Xkmf ) kmf_data.append(Xkmf) kmf_data.append(y) if split_X_y: transf_test_cluster= model.transform(test_data) test_with_cluster = np.concatenate ( (test_data, transf_test_cluster),axis =1 ) if sparse: test_with_cluster= sparse_func(test_with_cluster) kmf_data.insert(1,test_with_cluster ) kmf_data.append( y_test) return tuple (kmf_data ) + (model, ) \ if return_model else tuple(kmf_data )