# -*- coding: utf-8 -*-
# Licence:BSD 3-Clause
# Author: LKouadio <etanoyau@gmail.com>
# created date Wed Jul 14 20:00:26 2021
# Edited on Mon Sep 6 17:53:06 2021
"""
Gives some efficient tools for data manipulation and transformation.
"""
from __future__ import division, annotations
import inspect
import warnings
import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder
from ._watexlog import watexlog
from ._typing import F
from .exceptions import EstimatorError
from .utils.funcutils import parse_attrs, to_numeric_dtypes, assert_ratio
from .utils.mlutils import ( discretizeCategoriesforStratification,
stratifiedUsingDiscretedCategories, existfeatures )
from .utils.hydroutils import categorize_flow
from .utils.validator import get_estimator_name
# Docstrings in this module are written in reStructuredText.
__docformat__ = 'restructuredtext'

# Module-level logger obtained from the project logging helper.
_logger = watexlog().get_watex_logger(__name__)

# Names exported by ``from <module> import *``.
__all__ = [
    'KMeansFeaturizer',
    'StratifiedWithCategoryAdder',
    'StratifiedUsingBaseCategory',
    'FrameUnion',
    'DataFrameSelector',
    'CombinedAttributesAdder',
    'featurize_X',
]
[docs]
class KMeansFeaturizer(BaseEstimator, TransformerMixin):
    """Transforms numeric data into k-means cluster memberships.

    This transformer runs k-means on the input data and converts each data
    point into the ID of the closest cluster. If a target variable is
    present, it is scaled and included as input to k-means in order to
    derive clusters that obey the classification boundary as well as group
    similar points together.

    Parameters
    ----------
    n_clusters: int, default=7
        Number of initial clusters.
    target_scale: float, default=5.0
        Factor the target is multiplied by before it is appended to the
        input data given to k-means.
    random_state: int, optional
        Seed forwarded to the randomly initialised k-means models.
    n_components: int, optional
        Number of components kept when the predictor is reduced with PCA
        before clustering. When falsy, no reduction is applied.

    Attributes
    ----------
    km_model_: KMeans
        Fitted k-means model living in the (possibly reduced) feature space.
    cluster_centers_: ndarray
        Cluster centers of ``km_model_``.

    Examples
    --------
    >>> # (1) Use a common dataset
    >>> import matplotlib.pyplot as plt
    >>> from sklearn.datasets import make_moons
    >>> from watex.utils.plotutils import plot_voronoi
    >>> from watex.datasets import load_mxs
    >>> X, y = make_moons(n_samples=5000, noise=0.2)
    >>> kmf_hint = KMeansFeaturizer(n_clusters=50, target_scale=10).fit(X, y)
    >>> kmf_no_hint = KMeansFeaturizer(n_clusters=50, target_scale=0).fit(X, y)
    >>> fig, ax = plt.subplots(2, 1, figsize=(7, 7))
    >>> plot_voronoi(X, y, cluster_centers=kmf_hint.cluster_centers_,
    ...              fig_title='KMeans with hint', ax=ax[0])
    >>> plot_voronoi(X, y, cluster_centers=kmf_no_hint.cluster_centers_,
    ...              fig_title='KMeans No hint', ax=ax[1])
    <AxesSubplot:title={'center':'KMeans No hint'}>
    >>> # (2) Use a concrete data set
    >>> X, y = load_mxs(return_X_y=True, key='numeric')
    >>> # get the most principal components
    >>> from watex.analysis import nPCA
    >>> Xpca = nPCA(X, n_components=2)  # voronoi plot expects 2D data
    >>> kmf_hint = KMeansFeaturizer(n_clusters=7, target_scale=10).fit(Xpca, y)
    >>> kmf_no_hint = KMeansFeaturizer(n_clusters=7, target_scale=0).fit(Xpca, y)
    >>> fig, ax = plt.subplots(2, 1, figsize=(7, 7))
    >>> plot_voronoi(Xpca, y, cluster_centers=kmf_hint.cluster_centers_,
    ...              fig_title='KMeans with hint', ax=ax[0])
    >>> plot_voronoi(Xpca, y, cluster_centers=kmf_no_hint.cluster_centers_,
    ...              fig_title='KMeans No hint', ax=ax[1])
    """

    def __init__(
        self,
        n_clusters=7,
        target_scale=5.0,
        random_state=None,
        n_components=None
    ):
        self.n_clusters = n_clusters
        self.target_scale = target_scale
        self.random_state = random_state
        self.n_components = n_components

    def fit(self, X, y=None):
        """Runs k-means on the input data and finds the centroids.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training vector, where `n_samples` is the number of samples and
            `n_features` is the number of features.
        y : array-like of shape (n_samples,), optional
            Target vector relative to X. When given, it is scaled by
            ``target_scale`` and used as a hint for the clustering.

        Returns
        -------
        self
            Fitted estimator.
        """
        if self.n_components:
            # The hyperparameter is stored as ``n_components``; the former
            # ``self.components`` lookup raised AttributeError.
            X = PCA(n_components=self.n_components).fit_transform(X)

        if y is None:
            # No target variable, just do plain k-means
            km_model = KMeans(n_clusters=self.n_clusters,
                              n_init=20,
                              random_state=self.random_state)
            km_model.fit(X)
            self.km_model_ = km_model
            self.cluster_centers_ = km_model.cluster_centers_
            return self

        # There is target information. Apply appropriate scaling and include
        # it in the input data to k-means. ``np.asarray`` lets lists and
        # pandas Series be used as target as well.
        target = np.asarray(y)
        if target.ndim == 1:
            target = target[:, np.newaxis]
        data_with_target = np.hstack((X, target * self.target_scale))
        # Number of columns that belong to X (everything but the target).
        n_features = data_with_target.shape[1] - target.shape[1]

        # Build a pre-training k-means model on data and target
        km_model_pretrain = KMeans(n_clusters=self.n_clusters,
                                   n_init=20,
                                   random_state=self.random_state)
        km_model_pretrain.fit(data_with_target)

        # Run k-means a second time to get the clusters in the original space
        # without target info. Initialize using centroids found in
        # pre-training. Go through a single iteration of cluster assignment
        # and centroid recomputation.
        km_model = KMeans(
            n_clusters=self.n_clusters,
            init=km_model_pretrain.cluster_centers_[:, :n_features],
            n_init=1,
            max_iter=1)
        km_model.fit(X)

        self.km_model_ = km_model
        self.cluster_centers_ = self.km_model_.cluster_centers_
        return self

    def __repr__(self):
        """ Pretty format for guidance following the API... """
        _t = ("n_clusters", "target_scale", "random_state", "n_components")
        # An Ellipsis value is displayed as ``False``. The string is built
        # directly so that braces inside a value repr cannot break it.
        params = ', '.join(
            f"{k}={False if getattr(self, k) is ... else getattr(self, k)!r}"
            for k in _t)
        return f"<{self.__class__.__name__!r}:{params}>"
[docs]
class StratifiedWithCategoryAdder(BaseEstimator, TransformerMixin):
    """
    Stratified sampling transformer based on a category generated from a
    numerical attribute; returns stratified train set and test set.

    Another way to stratify a dataset is to get insights from the data and
    to add a new category as additional mileage. From this new attribute,
    data can be stratified after categorizing the numerical feature. Once
    data is stratified, the new category is dropped and the stratified
    train set and test set are returned.

    Parameters
    ----------
    base_num_feature: str,
        Numerical feature to categorize.
    threshold_operator: float, default=1.
        The coefficient to divide the numerical feature values by in order
        to normalize the data.
    return_train: bool, default=False
        Return the whole stratified trainset if set to ``True``. Useful
        when the dataset is not large enough: it is convenient to train on
        the whole trainset rather than on a small amount of stratified
        data, since the stratified parts may not be similar to one another
        for small datasets.
    max_category: int, default=3
        Maximum value that fits a max category to gather all values
        greater than it.
    n_splits: int, default=1
        Stored as given on the instance.
    test_size: float, default=0.2
        Stored as given on the instance.
    random_state: int, default=42
        Stored as given on the instance.

    Examples
    --------
    >>> from watex.transformers import StratifiedWithCategoryAdder
    >>> stratifiedNumObj = StratifiedWithCategoryAdder('flow')
    >>> stratifiedNumObj.fit_transform(X=df)
    >>> stats2 = stratifiedNumObj.statistics_

    Usage
    ------
    In this example, we firstly categorize the `flow` attribute using
    the ceil value (see :func:`~discretizeCategoriesforStratification`)
    and group the values greater than the ``max_category`` value into
    ``max_category``, put in a temporary feature. From this feature the
    categorization is performed and the train set and the test set are
    stratified.

    Notes
    ------
    If `base_num_feature` is not given, dataset will be stratified using
    random sampling.
    """

    def __init__(
        self,
        base_num_feature=None,
        threshold_operator=1.,
        return_train=False,
        max_category=3,
        n_splits=1,
        test_size=0.2,
        random_state=42
    ):
        # Logger named after the concrete class.
        self._logging = watexlog().get_watex_logger(type(self).__name__)

        # Hyperparameters, kept exactly as passed.
        self.base_num_feature = base_num_feature
        self.threshold_operator = threshold_operator
        self.return_train = return_train
        self.max_category = max_category
        self.n_splits = n_splits
        self.test_size = test_size
        self.random_state = random_state

        # Inspection attributes, empty until they are computed.
        self.base_items_ = None
        self.statistics_ = None

    def fit(self, X, y=None):
        """
        Does nothing; present only to follow the scikit-learn API.

        Returns
        -------
        self
            The instance itself, for method chaining.
        """
        return self
[docs]
class StratifiedUsingBaseCategory(BaseEstimator, TransformerMixin):
    """
    Transformer to stratify a dataset so that the train set and the test
    set are more representative, especially when data is not large enough.

    Arguments
    ----------
    base_column: str or int,
        Hyperparameter; can be the index of the base mileage (category)
        used for stratification. If `base_column` is None, purely random
        sampling is returned.
    test_size: float, default=0.2
        Size to put in the test set.
    random_state: int, default=42
        Shuffled number of instances in the overall dataset.

    Usage
    ------
    If data is not large enough, especially relative to the number of
    attributes, there is a risk of introducing a significant sampling
    bias. Stratified sampling is therefore a better way to avoid a
    significant bias of the sampling survey. For instance::

        >>> from watex.transformers import StratifiedUsingBaseCategory
        >>> from watex.utils.mlutils import load_data
        >>> df = load_data('data/geo_fdata')
        >>> stratifiedObj = StratifiedUsingBaseCategory(base_column='geol')
        >>> stratifiedObj.fit_transform(X=df)
        >>> stats = stratifiedObj.statistics_

    Notes
    ------
    The :attr:`~.statistics_` inspection attribute is a good way to compare
    the test set generated using purely random sampling and the one from
    stratified sampling. The stratified sampling has category
    ``base_column`` proportions almost identical to those in the full
    dataset whereas the test set generated using purely random sampling
    is quite skewed.
    """

    def __init__(self, base_column=None, test_size=0.2, random_state=42):
        # Logger named after the concrete class.
        self._logging = watexlog().get_watex_logger(type(self).__name__)

        # Hyperparameters, kept exactly as passed.
        self.base_column = base_column
        self.test_size = test_size
        self.random_state = random_state

        # Inspection attributes in their initial state.
        self.base_flag_ = False
        self.statistics_ = None
        self.base_items_ = None

    def fit(self, X, y=None):
        """ Does nothing; present only for API purpose.

        Returns
        -------
        self
            The instance itself, for method chaining.
        """
        return self
[docs]
class CategorizeFeatures(BaseEstimator, TransformerMixin):
    """ Transform numerical features into categorical features and return
    a new array transformed.

    Arguments
    ----------
    *num_columns_properties*: list
        list composed of numerical `features name`, list of
        `features boundaries` with their `categorized names`.

    Notes
    ------
    From the boundaries values including, features values can be transformed.
    `num_columns_properties` is composed of:

        - `feature name` or index equals to 'flow' or index of flow ='12'
        - `features boundaries` equals to ``[0., 1., 3]`` may correspond to:
            - 0: features flow values equal to 0. By default the beginning
              value like 0 is unranged.
            - 0-1: replace values ranged between 0 and 1.
            - 1-3: replace values ranged between 1-3
            - >3 : get all values greater than 3. By default categorize
              values greater than the last value.
          If the default classification is not suitable, create your own
          range values like ``[[0-1], [1-3], 3] (1)``
        - `categorized names`: Be sure that if the value is provided
          without ranging like (1), the number of `categorized values` must
          be the size of the `features boundaries` + 1. For instance, we
          try to replace all numerical values in column `flow` by ::

              -FR0: all flow equal to 0.
              -FR1: flow between 0-1
              -FR2: flow between 1-3
              -FR3: flow greater than 3.

          As you can see the `categorized names`
          ['FR0', 'FR1', 'FR2', 'FR3'] size is equal to the
          `features boundaries` [0., 1., 3] size + 1.

    Usage
    ------
    Can categorize multiple features by setting each component explained
    above as list of tuples. For instance we try to replace both
    numerical features `power` and `flow` in the dataframe by their
    corresponding `features` boundaries. Here is how to set the
    `num_columns_properties` ::

        num_columns_properties = [
            ('flow', ([0, 1, 3], ['FR0', 'FR1', 'FR2', 'FR3'])),
            ('power', ([10, 30, 100], ['pw0', 'pw1', 'pw2', 'pw4']))
            ]

    Examples
    --------
    >>> from watex.transformers import CategorizeFeatures
    >>> from watex.utils.mlutils import load_data
    >>> df = load_data('data/geo_fdata')
    >>> catObj = CategorizeFeatures(
    ...     num_columns_properties=num_columns_properties)
    >>> X = catObj.fit_transform(df)
    >>> catObj.in_values_
    >>> catObj.out_values_
    """

    def __init__(self, num_columns_properties=None):
        self._logging = watexlog().get_watex_logger(self.__class__.__name__)
        self.num_columns_properties = num_columns_properties
        self.base_columns_ = None
        self.in_values_ = None
        self.out_values_ = None
        self.base_columns_ix_ = None

    def fit(self, X, y=None):
        """
        Does nothing; present for scikit-learn API compatibility.

        Parameters
        ----------
        X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``)
            Training set. Not used.
        y: array-like, shape (M, ) ``M=m-samples``,
            Train target. Not used.

        Returns
        --------
        ``self``: Instanced object for methods chaining
        """
        return self

    def transform(self, X, y=None):
        """ Transform the data and return new array. Can straightforwardly
        call :meth:`~.sklearn.TransformerMixin.fit_transform` inherited
        from scikit_learn.

        Parameters
        ----------
        X: Series, DataFrame or array-like
            Data to categorize. One-dimensional data (or a single column)
            is categorized with the first entry of
            `num_columns_properties`. A two-dimensional ndarray is written
            to in place.
        y: ignored

        Returns
        -------
        X: the categorized data as returned by ``categorize_flow`` for
            one-dimensional input, else the array with the selected
            columns replaced.

        Raises
        ------
        TypeError
            If `num_columns_properties` is not provided.
        """
        if self.num_columns_properties is None:
            raise TypeError(
                "`num_columns_properties` must be provided as a list of"
                " (feature, (boundaries, categorized names)) tuples.")

        properties = list(self.num_columns_properties)
        self.base_columns_ = np.array([p_[0] for p_ in properties])
        self.in_values_ = [p_[1][0] for p_ in properties]
        self.out_values_ = [p_[1][1] for p_ in properties]

        # Reset so that a previous call cannot leak stale indexes.
        self.base_columns_ix_ = None
        if np.issubdtype(self.base_columns_.dtype, np.number):
            # Indexes were provided rather than names. The integer cast is
            # kept (its result was previously discarded).
            self.base_columns_ix_ = self.base_columns_.astype(np.int32)

        # Reduce Series / single-column input to a flat array.
        if isinstance(X, pd.Series):
            X, single_column = X.values, True
        elif isinstance(X, pd.DataFrame):
            single_column = X.shape[1] == 1
            if single_column:
                # DataFrame has no ``reshape``; take the lone column.
                X = X.iloc[:, 0].values
        else:
            if not hasattr(X, 'shape'):
                X = np.asarray(X)
            single_column = len(X.shape) < 2
            if not single_column and X.shape[1] == 1:
                X = X.reshape((X.shape[0]),)
                single_column = True

        if single_column:
            X = categorize_flow(X, self.in_values_[0],
                                classes=self.out_values_[0])
            self.base_columns_ix_ = (0,)
            self._freeze_inspection_attributes()
            return X

        if isinstance(X, pd.DataFrame):
            if self.base_columns_ix_ is None:
                # Names were given: resolve them to column positions.
                X = self.ascertain_mumerical_values(X)
            else:
                X = X.values

        if self.base_columns_ix_ is None:
            # Nothing could be resolved; data is returned untouched
            # instead of failing on ``tuple(None)``.
            warnings.warn(
                f"Features {tuple(self.base_columns_)} could not be located"
                " in the data. No feature is categorized.")
            self.base_columns_ix_ = ()

        for ii, ix_ in enumerate(self.base_columns_ix_):
            X[:, ix_] = categorize_flow(X[:, ix_],
                                        self.in_values_[ii],
                                        classes=self.out_values_[ii])
        self._freeze_inspection_attributes()
        return X

    def _freeze_inspection_attributes(self):
        """ Store the inspection attributes as tuples. """
        self.base_columns_ = tuple(self.base_columns_)
        self.in_values_ = tuple(self.in_values_)
        self.out_values_ = tuple(self.out_values_)
        self.base_columns_ix_ = tuple(self.base_columns_ix_)

    def ascertain_mumerical_values(self, X, y=None):
        """ Retrieve indexes of the numerical attributes named in
        `base_columns_` and return the values of `X`.

        The positions found are stored in `base_columns_ix_`. Names are
        matched case-insensitively against the non-object columns of `X`.

        Parameters
        ----------
        X: DataFrame or ndarray
        y: ignored

        Returns
        -------
        ``X.values`` if `X` is a dataframe, else `X` itself.
        """
        values = getattr(X, 'values', X)
        try:
            list_of_numerical_cols = X.select_dtypes(
                exclude=['object']).columns.tolist()
        except AttributeError:
            # 'numpy.ndarray' object has no attribute 'select_dtypes'
            list_of_numerical_cols = []

        # return the values if no numerical columns found
        if not list_of_numerical_cols:
            self._logging.info('`None`numerical columns detected.')
            warnings.warn('No numerical columns detected. The data is'
                          ' returned as is.')
            return values

        # First column wins when two labels differ only by case; ``str``
        # keeps non-string labels from failing on ``.lower()``.
        by_lower = {}
        for col in list_of_numerical_cols:
            by_lower.setdefault(str(col).lower(), col)
        matched = [by_lower.get(str(bcol).lower())
                   for bcol in self.base_columns_]

        if all(m_ is None for m_ in matched):
            self._logging.info(
                f'Numerical features `{self.base_columns_}`not found in'
                f'`{list_of_numerical_cols}`')
            return values

        # get base columns index; a name without a numerical match is
        # looked up as given, so a missing column still raises KeyError.
        self.base_columns_ix_ = [
            int(X.columns.get_loc(bcol if m_ is None else m_))
            for bcol, m_ in zip(self.base_columns_, matched)]
        return values
[docs]
class CombinedAttributesAdder(BaseEstimator, TransformerMixin):
    """ Combined attributes from literal string operators, indexes or names.

    Create a new attribute using features index or literal string operator.
    Inherits from scikit_learn `BaseEstimator` and `TransformerMixin` classes.

    Arguments
    ----------
    *attribute_names* : list of str, optional
        List of features for combination. The new feature values are
        combined according to the `operator` parameter. By default, the
        combination is the ratio of the given numerical features. For
        instance, ``attribute_names=['lwi', 'ohmS']`` will divide the
        feature 'lwi' by 'ohmS'.
    *attribute_indexes* : list of int,
        Index of each feature for the combination. A user warning is
        raised if any index does not match a numerical column.
    *operator*: str, default ='/'
        Type of operation to perform. Can be ['/', '+', '-', '*', '%']

    Returns
    --------
    X : np.ndarray,
        A new array containing the new data from the `attribute_indexes`
        operation. If `attribute_names` and `attribute_indexes` are
        ``None``, the same array is returned.

    Notes
    ------
    A literal string operator can be used. For instance dividing two
    numerical features can be illustrated using the word "per" separated by
    underscores like "_per_". For instance, to create a new feature based on
    the division of the features ``lwi`` and ``ohmS``, the literal string
    operator that holds the ``attribute_names`` could be::

        attribute_names='lwi_per_ohmS'

    The same literal string is valid for multiplication (_mul_),
    subtraction (_sub_), modulo (_mod_) and addition (_add_). However,
    indexes of features can also be used rather than `attribute_names`,
    providing the `operator` parameter.

    It could be the indexes of both features in the array like
    ``attribute_indexes=[10, 9]`` which means the `lwi` and `ohmS` are
    found at index ``10`` and ``9`` respectively. Furthermore, multiple
    operations can be set by adding multiple literal string operators into
    a list like ``attribute_names=['power_per_magnitude', 'ohmS_per_lwi']``.

    Examples
    --------
    >>> import pandas as pd
    >>> from watex.transformers import CombinedAttributesAdder
    >>> from watex.datasets.dload import load_bagoue
    >>> X, y = load_bagoue (as_frame =True )
    >>> cobj = CombinedAttributesAdder (attribute_names='lwi_per_ohmS')
    >>> Xadded = cobj.fit_transform(X)
    >>> cobj.attribute_names_
    ... ['num',
         'name',
         'east',
         'north',
         'power',
         'magnitude',
         'shape',
         'type',
         'sfi',
         'ohmS',
         'lwi',
         'geol',
         'lwi_div_ohmS'] # new attributes with 'lwi'/'ohmS'
    >>> df0 = pd.DataFrame (Xadded, columns = cobj.attribute_names_)
    >>> df0['lwi_div_ohmS']
    ... 0           0.0
        1      0.000002
        2      0.000005
        3      0.000004
        4      0.000008
        426    0.453359
        427    0.382985
        428    0.476676
        429    0.457371
        430    0.379429
        Name: lwi_div_ohmS, Length: 431, dtype: object
    >>> cobj = CombinedAttributesAdder (
    ...     attribute_names=['lwi', 'ohmS', 'power'], operator='+')
    >>> df0 = pd.DataFrame (cobj.fit_transform(X),
    ...                     columns = cobj.attribute_names_)
    >>> df0.iloc [:, -1]
    ... 0      1777.165142
        1      1207.551531
        2         850.5625
        3      1051.943553
        4       844.095833
        426      1708.8585
        427      1705.5375
        428      1568.9825
        429     1570.15625
        430      1666.9185
        Name: lwi_add_ohmS_add_power, Length: 431, dtype: object
    >>> cobj = CombinedAttributesAdder (
    ...     attribute_indexes =[1,6], operator='+')
    >>> df0 = pd.DataFrame (cobj.fit_transform(X),
    ...                     columns = cobj.attribute_names_)
    >>> df0.iloc [:, -1]
    ... 0        b1W
        1        b2V
        2        b3V
        3        b4W
        4        b5W
        426    b427W
        427    b428V
        428    b429V
        429    b430V
        430    b431V
        Name: name_add_shape, Length: 431, dtype: object
    """
    # Canonical operation name -> accepted aliases.
    _op = {'times': ('times', 'prod', 'mul', '*', 'x'),
           'add': ('add', '+', 'plus'),
           'div': ('quot', '/', 'div', 'per'),
           'sub': ('sub', '-', 'less'),
           'mod': ('mod', '%'),
           }

    def __init__(
        self,
        attribute_names=None,
        attribute_indexes=None,
        operator: str = '/',
    ):
        self.attribute_names = attribute_names
        self.attribute_indexes = attribute_indexes
        self.operator = operator
        self.attribute_names_ = None

    def fit(self, X, y=None):
        """
        Does nothing; present for scikit-learn API compatibility.

        Parameters
        ----------
        X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``)
            Training set. Not used.
        y: array-like, shape (M, ) ``M=m-samples``,
            Train target. Not used.

        Returns
        --------
        self: `CombinedAttributesAdder` instance
            returns ``self`` for easy method chaining.
        """
        return self

    def _resolve_operator(self, operator):
        """ Return the canonical name of `operator` (a key of ``_op``) or
        ``None`` when no alias is found in it. Leaves the instance
        untouched. """
        text = str(operator)
        for name, aliases in self._op.items():
            if any(text.find(alias) >= 0 for alias in aliases):
                return name
        return None

    def _get_operator(self, operator):
        """ Get operator for combining attribute.

        When an alias is recognised in `operator`, ``self.operator`` is
        set to the canonical name, which is returned; otherwise ``None``
        is returned and ``self.operator`` is left unchanged.
        """
        name = self._resolve_operator(operator)
        if name is not None:
            self.operator = name
        return name

    def _operate(self, X):
        """ Operate data from indexes.

        The columns selected by ``attribute_indexes`` are combined from
        left to right with ``operator`` and the result is appended to `X`
        as a new last column. If the combination fails, a warning is
        issued and `X` is returned unchanged.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)

        Returns
        -------
        X: ndarray, with one more column when the combination succeeded.
        """
        def weird_division(ix_):
            """ Replace 0. value to 1 in denominator for division
            calculus."""
            return ix_ if ix_ != 0. else 1

        def divide(left, right):
            # Object arrays raise ZeroDivisionError on a zero denominator;
            # in that case the zeros are replaced by 1 and the division
            # is retried.
            try:
                return left / right
            except ZeroDivisionError:
                safe = np.array(list(map(weird_division, right)))
                return left / safe

        operations = {
            'mod': lambda left, right: left % right,
            'add': lambda left, right: left + right,
            'sub': lambda left, right: left - right,
            'div': divide,
            'times': lambda left, right: left * right,
        }
        msg = ("Unsupported operand type(s)! index provided {} doesn't match"
               " any numerical features. Experience combinaison attributes"
               " is not possible.")

        self._isfine = True
        # Accept canonical names as well as raw aliases ('/', 'x', ...)
        # without modifying the ``operator`` hyperparameter.
        combine = operations.get(self._resolve_operator(self.operator))
        if combine is None:
            warnings.warn(
                f"Unknown operator {self.operator!r}. Attributes are not"
                " combined.")
            self._isfine = False
            return X

        Xc = X[:, self.attribute_indexes]
        cb = Xc[:, 0]
        Xc = Xc[:, 1:]
        for k in range(Xc.shape[1]):
            try:
                # Not in place, so a failure cannot leave ``cb`` partially
                # updated and integer columns can be divided.
                cb = combine(cb, Xc[:, k])
            except Exception:
                warnings.warn(msg.format(self.attribute_indexes[1:][k]))
                self._isfine = False

        X = np.c_[X, cb] if self._isfine else X
        return X
[docs]
class DataFrameSelector(BaseEstimator, TransformerMixin):
    """ Select data from specific attributes for column transformer.

    Select only numerical or categorical columns for operations. Works
    the same way as scikit-learn `make_column_transformer`.

    Arguments
    ----------
    *attribute_names*: list or array_like
        List of the main columns to keep the data
    *select_type*: str
        Automatic numerical and categorical selector. If `select_type` is
        ``num``, only numerical values in dataframe are retrieved else
        ``cat`` for categorical attributes.

    Returns
    -------
    X: ndarray
        New array composed of data of selected `attribute_names`.

    Examples
    ---------
    >>> from watex.transformers import DataFrameSelector
    >>> from watex.utils.mlutils import load_data
    >>> df = load_data('data/geo_fdata')
    >>> XObj = DataFrameSelector(attribute_names=['power','magnitude','sfi'],
    ...                          select_type=None)
    >>> cdf = XObj.fit_transform(df)
    """

    def __init__(self, attribute_names=None, select_type=None):
        # Hyperparameters, kept exactly as passed.
        self.attribute_names = attribute_names
        self.select_type = select_type
        # Logger named after the concrete class.
        self._logging = watexlog().get_watex_logger(type(self).__name__)

    def fit(self, X, y=None):
        """
        Does nothing; present for scikit-learn API compatibility.

        Parameters
        ----------
        X: Ndarray ( M x N matrix where ``M=m-samples``, & ``N=n-features``)
            Training set. Not used.
        y: array-like, shape (M, ) ``M=m-samples``,
            Train target. Not used.

        Returns
        --------
        self: `DataFrameSelector` instance
            returns ``self`` for easy method chaining.
        """
        return self

    def __repr__(self):
        """ Represent the object by the name of its class only. """
        return type(self).__name__
[docs]
class FrameUnion(BaseEstimator, TransformerMixin):
    """ Unify categorical and numerical features after the numerical
    features are scaled and the categorical features encoded.

    Use :class:`~watex.transformers.DataFrameSelector` class to define
    the categorical features and numerical features.

    Arguments
    ---------
    num_attributes: list
        List of numerical attributes
    cat_attributes: list
        List of categorical attributes
    scale: bool, default=True
        Features scaling; uses
        :class:`~sklearn.preprocessing.StandardScaler` by default.
    imput_data: bool, default=True
        Replace the missing data using
        :attr:`~sklearn.impute.SimpleImputer.strategy`.
    encode: bool, default=True
        Stored as given on the instance.
    param_search: bool or str, default='auto'
        If `num_attributes` and `cat_attributes` are None, the numerical
        features and categorical features should be found automatically.
    strategy: str, default='median'
        Stored as given on the instance.
    scale_mode: str, default='StandardScaler'
        Mode of data scaling. Can also be ``MinMaxScaler``.
    encode_mode: str, default='OrdinalEncoder'
        Mode of data encoding. Can be ``OneHotEncoder``, which creates a
        sparse matrix; once selected, the new shape of ``X`` should be
        different from the original shape.

    Example
    -------
    >>> from watex.datasets import fetch_data
    >>> from watex.transformers import FrameUnion
    >>> X_ = fetch_data('Bagoue original').get('data=dfy1')
    >>> frameObj = FrameUnion(encode_mode='OneHotEncoder')
    >>> X = frameObj.fit_transform(X_)
    """

    def __init__(
        self,
        num_attributes=None,
        cat_attributes=None,
        scale=True,
        imput_data=True,
        encode=True,
        param_search='auto',
        strategy='median',
        scale_mode='StandardScaler',
        encode_mode='OrdinalEncoder'
    ):
        # Logger named after the concrete class.
        self._logging = watexlog().get_watex_logger(type(self).__name__)

        # Hyperparameters, kept exactly as passed.
        self.num_attributes = num_attributes
        self.cat_attributes = cat_attributes
        self.scale = scale
        self.imput_data = imput_data
        self.encode = encode
        self.param_search = param_search
        self.strategy = strategy
        self.scale_mode = scale_mode
        self.encode_mode = encode_mode

        # Inspection attributes, empty until they are computed.
        self.X_ = self.X_num_ = self.X_cat_ = None
        self.num_attributes_ = self.cat_attributes_ = None
        self.attributes_ = None

    def fit(self, X, y=None):
        """
        Does nothing; present only to follow the scikit-learn API.

        Returns
        -------
        self
            The instance itself, for method chaining.
        """
        return self
[docs]
def featurize_X(
    X,
    y=None, *,
    n_clusters: int = 7,
    target_scale: float = 5,
    random_state: F|int = None,
    n_components: int = None,
    model: F = None,
    split_X_y: bool = False,
    test_ratio: float|str = .2,
    shuffle: bool = True,
    return_model: bool = ...,
    to_sparse: bool = ...,
    sparsity: str = 'coo'
):
    """ Featurize X with the cluster based on the KMeans featurization

    Parameters
    -----------
    X : {array-like, sparse matrix} of shape (n_samples, n_features)
        Training vector, where `n_samples` is the number of samples and
        `n_features` is the number of features.
        Note that when `n_components` is set, sparse matrix for `X` is not
        acceptable.
    y : array-like of shape (n_samples,)
        Target vector relative to X.
    n_clusters: int, default=7
        Number of initial clusters
    target_scale: float, default=5.0
        Apply appropriate scaling and include it in the input data to
        k-means.
    n_components: int, optional
        Number of components for reducing down the predictor X. It uses
        the PCA to reduce down dimension to the importance features.
    model: :class:`KMeansFeaturizer`.
        KMeansFeaturizer model. Model can be provided to featurize the
        test data separated from the train data. A fitted instance is
        used as is; a class is instantiated with `n_clusters`,
        `target_scale` and `random_state` and fitted on the (train) data.

        .. versionadded:: 0.2.4
    random_state: int, Optional
        State for shuffling the data
    split_X_y: bool, default=False,
        Split the X, y into train data and test data according to the test
        size
    test_ratio: float, default=0.2
        ratio to keep for a test data.
    shuffle: bool, default=True
        Shuffling the data set.
    return_model: bool, default=False
        If ``True`` return the KMeans featurization model and the
        transformed X.
    to_sparse: bool, default=False
        Convert X data to sparse matrix, by default the sparse matrix is
        coordinates matrix (COO)
    sparsity: str, default='coo'
        Kind of sparse matrix used to convert `X`. It can be
        ['csr'|'coo']. Any other value returns a coordinates matrix
        unless `to_sparse` is turned to ``False``.

        .. versionadded:: 0.2.4

    Returns
    --------
    X, y : NDArray shape (m_samples, n_features +1) or
        shape (m_samples, n_sparse_features)
        Returns NDArray of m_features plus the clusters features from KMF
        featurization procedures. The `n_sparse_features` is created if
        `to_sparse` is set to ``True``.
    X, y, model: NDarray and KMF models
        Returns transformed array X and y and model if ``return_model`` is
        set to ``True``.
    X, Xtest, y, ytest: NDArray (KMF), ArrayLike
        Split tuple is returned when ``split_X_y=True``. `y` and `ytest`
        are ``None`` when no target is given.

    Raises
    ------
    EstimatorError
        If `model` is not a `KMeansFeaturizer`.

    Note
    -----
    Every time ``return_model=True``, KMF model (:class:`KMeansFeaturizer`)
    is appended to the return results.

    Examples
    --------
    >>> import numpy as np
    >>> from watex.transformers import featurize_X
    >>> X = np.random.randn (12 , 7 ) ; y = np.arange(12 )
    >>> y[ y < 6 ]= 0 ; y [y >0 ]= 1 # for binary data
    >>> Xtransf , _ = featurize_X (X, to_sparse =False)
    >>> X.shape, Xtransf.shape
    ((12, 7), (12, 8))
    >>> Xtransf, y = featurize_X (X,y, to_sparse =True )
    >>> Xtransf , y
    (<12x8 sparse matrix of type '<class 'numpy.float64'>'
        with 93 stored elements in COOrdinate format>,
     array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1]))
    >>> featurize_X (X,y, to_sparse =True, split_X_y=True )
    (<9x8 sparse matrix of type '<class 'numpy.float64'>'
        with 71 stored elements in COOrdinate format>,
     <3x8 sparse matrix of type '<class 'numpy.float64'>'
        with 24 stored elements in COOrdinate format>,
     array([0, 1, 1, 0, 0, 0, 0, 1, 1]),
     array([0, 1, 1]))
    >>> *_, kmf_model = featurize_X (X,y, to_sparse =True, return_model =True)
    >>> kmf_model
    <'KMeansFeaturizer':n_clusters=7, target_scale=5, random_state=None,
    n_components=None>
    """
    # Ellipsis is the "not given" marker of both flags and means False.
    if return_model is ...:
        return_model = False
    if to_sparse is ...:
        to_sparse = False

    # Pick the sparse constructor; anything unknown falls back to COO.
    sparse_func = None
    if to_sparse:
        sparsity = str(sparsity).lower().strip()
        d_sparsity = dict(csr=sparse.csr_matrix,
                          coo=sparse.coo_matrix)
        sparse_func = d_sparsity.get(sparsity, sparse.coo_matrix)

    # reduce down the features.
    if n_components:
        from watex.analysis import nPCA
        X = nPCA(X, n_components=n_components)

    test_data, y_test = None, None
    if split_X_y:
        split_kws = dict(test_size=assert_ratio(test_ratio),
                         random_state=random_state,
                         shuffle=shuffle)
        if y is None:
            # No target: only X can be split.
            X, test_data = train_test_split(X, **split_kws)
        else:
            X, test_data, y, y_test = train_test_split(X, y, **split_kws)

    # create a k-means featurization with hint model
    if model:
        if get_estimator_name(model) != 'KMeansFeaturizer':
            raise EstimatorError(
                "Wrong model estimator. Expect 'KMeansFeaturizer'"
                f" as the valid estimator. Got {get_estimator_name (model)!r}")
        if callable(model):
            # A class was given: build it and fit it, otherwise it cannot
            # transform the data below.
            model = model(n_clusters=n_clusters,
                          target_scale=target_scale,
                          random_state=random_state).fit(X, y)
    else:
        # ``n_components`` is not forwarded: X is already reduced above.
        model = KMeansFeaturizer(
            n_clusters=n_clusters,
            target_scale=target_scale,
            random_state=random_state,
        ).fit(X, y)

    ### Use the k-means featurizer to generate cluster features
    transf_cluster = model.transform(X)
    ### Form new input features with cluster features
    Xkmf = np.concatenate((X, transf_cluster), axis=1)
    if to_sparse:
        Xkmf = sparse_func(Xkmf)

    kmf_data = [Xkmf, y]
    if split_X_y:
        transf_test_cluster = model.transform(test_data)
        test_with_cluster = np.concatenate(
            (test_data, transf_test_cluster), axis=1)
        # ``to_sparse`` is the flag; ``sparse`` is the scipy module and is
        # always truthy.
        if to_sparse:
            test_with_cluster = sparse_func(test_with_cluster)
        kmf_data.insert(1, test_with_cluster)
        kmf_data.append(y_test)

    if return_model:
        kmf_data.append(model)
    return tuple(kmf_data)