Source code for watex.decorators

# -*- coding: utf-8 -*-
#   License: BSD-3-Clause
#   Author: LKouadio alias @Daniel <etanoyau@gmail.com>

from __future__ import print_function 
import functools
import inspect
import os
import sys
import copy 
import shutil
import warnings
import datetime 
from contextlib import contextmanager
import numpy as np
import pandas as pd
import  matplotlib.pyplot as plt 
import matplotlib as mpl 
import matplotlib.cm as cm 
import matplotlib.colorbar as mplcb

from ._typing import (
    Iterable,
    Optional,
    Callable,
    T,
    F
)
from ._watexlog import watexlog
_logger = watexlog.get_watex_logger(__name__)

__docformat__='restructuredtext'

class export_data:
    """Decorator to export data into different kinds of format.

    When ``type`` is set to ``frame`` the file format must be supplied by
    the decorated function so that the data is exported accordingly.

    Parameters
    ----------
    type: str, {"frame", "text"}
        Kind of data to export. ``"frame"`` dispatches to :meth:`out_frame`;
        any other value dispatches to :meth:`out_others`.
    encoding: str, optional
        Encoding used when writing text files, defaults to ``"utf8"``.
    **kws: dict
        Extra keyword arguments, stored as attributes on the decorator.

    Notes
    -----
    The decorated function must return the 6-tuple
    ``(dfs, fname, file_format, savepath, nameof, kws)``.
    The wrapper itself returns ``None``.
    """

    def __init__(self, type='frame', encoding: str = 'utf8', **kws):
        self.type = type
        self.encoding = encoding
        for key, value in kws.items():
            setattr(self, key, value)

    def __call__(self, func):
        self._func = func

        @functools.wraps(self._func)
        def wrapper_func(*args, **kwds):
            """ Decorated function for writing data."""
            from .utils.funcutils import move_cfile

            dfs, fname, file_format, savepath, nameof, kws = self._func(
                *args, **kwds)
            # Export to csv when no format is given.
            file_format = file_format or '.csv'
            # When the file name already carries an extension, that
            # extension becomes the format. The stem is kept as the name
            # because the writers append the extension themselves;
            # otherwise the output would be named e.g. "data.csv.csv".
            name, ext = os.path.splitext(fname)
            if ext:
                file_format = ext.lower()
                fname = name

            if str(self.type).lower() == 'frame':
                writerfunc = self.out_frame
            else:
                writerfunc = self.out_others

            fnames = writerfunc(
                dfs, fname, file_format, savepath, nameof, **(kws or {}))
            for written in fnames:
                move_cfile(written, savepath, dpath='_out')

        return wrapper_func

    def out_frame(self, dfs, fname, file_format, savepath,
                  nameof=None, **kwds):
        """Export dataframes.

        Parameters
        -----------
        dfs: pd.DataFrame or list
            Dataframe or list of dataframes. Items that do not look like a
            frame (no ``__array__`` or no ``columns``) are discarded.
        fname: str,
            Name of the file to export, without extension.
        file_format: str,
            Format of the expected file to write.
        savepath: str,
            Path to the directory to save the file. Not used here; the
            files are moved by the decorator wrapper.
        nameof: list of str, optional
            Names of the data to write. Used as sheet names for ``.xlsx``
            and as file-name suffixes when several frames are written to
            another format. Missing names default to ``data<i>``.
        **kwds: dict
            Passed to the underlying writer.

        Returns
        -------
        fnames: list of str
            Names of the files written.

        Raises
        ------
        TypeError
            If no valid frame is left to write.
        """
        from .property import Config
        from .utils.funcutils import key_search

        if not isinstance(dfs, (list, tuple)):
            dfs = [dfs]
        # Keep only frame-like objects.
        dfs = [df for df in dfs
               if hasattr(df, "__array__") and hasattr(df, 'columns')]
        if not dfs:
            raise TypeError("NoneType cannot be written.")

        # `nameof` may be omitted or shorter than `dfs`; pad it so that
        # zip() below never raises on None nor silently drops frames.
        if nameof is None:
            names = []
        elif isinstance(nameof, str):
            names = [nameof]
        else:
            names = list(nameof)
        names += [f"data{i}" for i in range(len(names), len(dfs))]

        valid_formats = list(Config.writers(pd.DataFrame).keys())
        file_format = key_search(file_format,
                                 default_keys=valid_formats,
                                 parse_keys=False,
                                 raise_exception=True)[0]
        fnames = []
        if file_format == ".xlsx":
            fname += '.xlsx'
            with pd.ExcelWriter(fname) as writer:
                for df, name in zip(dfs, names):
                    df.to_excel(writer, sheet_name=name, **kwds)
            # A single workbook is produced: report it once only.
            fnames.append(fname)
        else:
            several = len(dfs) > 1
            for df, name in zip(dfs, names):
                # Append the data name to differentiate the files.
                suffix = f".{name}{file_format}" if several else file_format
                target = fname + suffix
                Config.writers(df).get(file_format)(target, **kwds)
                fnames.append(target)

        return fnames

    def out_others(self, dfs, fname, file_format, savepath,
                   nameof=None, **kwds):
        """Export other files: text or any other sequence format.

        Parameters
        ----------
        dfs: str or iterable
            Content to write. Items exposing ``__array__`` are written
            with ``writelines``; other items are written with ``write``.
        fname: str
            Name of the file to write, without extension.
        file_format: str
            Extension of the file, with or without the leading dot.
        savepath, nameof, **kwds:
            Accepted for signature parity with :meth:`out_frame`; unused.

        Returns
        -------
        fnames: list of str
            One-item list holding the name of the file written.
        """
        file_format = str(file_format).replace(".", "")
        if isinstance(dfs, str):
            dfs = [dfs]
        outfile = f"{fname}.{file_format}"
        with open(outfile, mode='w', encoding=self.encoding) as f:
            for df in dfs:
                if hasattr(df, '__array__'):
                    f.writelines(df)
                else:
                    f.write(df)
        return [outfile]
class temp2d:
    """ Two dimensional plot template.

    Parameters
    ----------
    reason: str, Any
        Does nothing. But if supplied, it should be the purpose of the plot.

    Note
    ------
    The decorated function must return a sequence whose last item is a
    mapping of plot arguments (those of :class:`watex.property.BasePlot`);
    the preceding items are forwarded to :meth:`plot2d`. Every key of that
    mapping is set as an attribute on the decorator. Reading a plot
    argument that was not supplied raises an ``AttributeError``.
    """

    def __init__(self, reason=None, **kws):
        self.reason = reason

    def __call__(self, func):
        self._func = func

        @functools.wraps(self._func)
        def new_func(*args, **kwargs):
            _args = self._func(*args, **kwargs)
            base_plot_kws = _args[-1]
            for key, value in base_plot_kws.items():
                setattr(self, key, value)
            return self.plot2d(*_args[:-1])

        return new_func

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, i.e. when a
        # plot argument was not provided by the decorated function.
        msg = ("{0!r} has no attribute {1!r}. Note that {0!r} uses the"
               " plot arguments from `watex.property.BasePlot`. Plot arguments"
               " must be supplied as a keyword argument at the last parameters"
               " i.e the last value of return (output) of the function to be"
               " decorated."
               )
        raise AttributeError(msg.format(self.__class__.__name__, name))

    def plot2d(self, arr2d, y=None, x=None, posix=None):
        """ Template for 2D plot.

        Parameters
        ------------
        arr2d : ndarray , shape (N, M)
            2D array for plotting.
        y: array-like, optional
            Y-coordinates, of length N (the rows of ``arr2d``). Defaults
            to the row indices.
        x: array-like, optional
            X-coordinates, of length M (the columns of ``arr2d``).
            Defaults to the column indices.
        posix: list of str, optional
            Station names used as tick labels of the top axis when fewer
            than 12 x-positions are given. When omitted, generated
            ``S01, S02, ...`` labels are used instead.

        Returns
        -------
        axe: Matplotlib axis

        Raises
        ------
        ValueError
            If ``plt_style`` is neither ``'pcolormesh'`` nor ``'imshow'``.
        """

        def _format_ticks(value, tick_number, fmt='S{:02}', nskip=3):
            """ Tick formatter meant for ``plt.FuncFormatter``.

            :param value: tick value to format
            :param tick_number: tick position (unused)
            :param fmt: str, default='S{:02}', label template
            :param nskip: int, default=3, label one tick every `nskip`
            """
            if value % nskip == 0:
                return fmt.format(int(value) + 1)
            return ''

        n_rows, n_cols = np.shape(arr2d)[:2]
        y = np.arange(n_rows) if y is None else np.asarray(y)
        x = np.arange(n_cols) if x is None else np.asarray(x)
        # Ignore NaN when computing the axis and colour limits.
        x_valid = x[~np.isnan(x)]
        y_valid = y[~np.isnan(y)]

        fig, axe = plt.subplots(1,
                                figsize=self.fig_size,
                                num=self.fig_num,
                                dpi=self.fig_dpi,
                                )
        cmap = plt.get_cmap(self.cmap)

        if self.plt_style == 'pcolormesh':
            X, Y = np.meshgrid(x, y)
            z_valid = arr2d[~np.isnan(arr2d)]
            axr = axe.pcolormesh(X, Y, arr2d,
                                 vmax=z_valid.max(),
                                 vmin=z_valid.min(),
                                 shading='gouraud',
                                 cmap=cmap,
                                 )
        elif self.plt_style == 'imshow':
            axr = axe.imshow(arr2d,
                             interpolation=self.imshow_interp,
                             cmap=cmap,
                             aspect=self.fig_aspect,
                             origin='upper',
                             extent=(x_valid.min(), x_valid.max(),
                                     y_valid.min(), y_valid.max())
                             )
        else:
            plt.close(fig)
            raise ValueError(
                "plt_style must be 'pcolormesh' or 'imshow', got "
                f"{self.plt_style!r}")

        axe.set_ylim(y_valid.min(), y_valid.max())
        axe.set_xlabel(self.xlabel or 'Distance(m)',
                       fontdict={'size': self.font_size,
                                 'weight': self.font_weight})
        axe.set_ylabel(self.ylabel or 'log10(Frequency)[Hz]',
                       fontdict={'size': self.font_size,
                                 'weight': self.font_weight})
        axe.tick_params(axis='both', labelsize=self.font_size)

        if self.show_grid:
            axe.minorticks_on()
            axe.grid(color=self.gc,
                     ls=self.gls,
                     lw=self.glw,
                     alpha=self.galpha,
                     which=self.gwhich)

        labex = self.cb_label or '$log10(App.Res)[Ω.m]$'
        cb = fig.colorbar(axr, ax=axe)
        cb.ax.yaxis.tick_left()
        cb.ax.tick_params(axis='y', direction='in', pad=2.,
                          labelsize=self.font_size)
        cb.set_label(labex, fontdict={'size': 1.5 * self.font_size,
                                      'style': self.font_style})

        # --> set second axis
        axe2 = axe.twiny()
        # Text properties cannot be passed to set_xticks without labels;
        # the label size is therefore set through tick_params.
        axe2.set_xticks(range(len(x)), minor=False)
        axe2.tick_params(axis='x', labelsize=self.font_size)
        if len(x) >= 12 or posix is None:
            axe2.xaxis.set_major_formatter(plt.FuncFormatter(_format_ticks))
        else:
            axe2.set_xticklabels(posix, rotation=self.rotate_xlabel,
                                 fontsize=self.font_size)

        axe2.set_xlabel('Stations',
                        fontdict={'style': self.font_style,
                                  'size': 1.5 * self.font_size,
                                  'weight': self.font_weight},
                        )
        fig.suptitle(self.fig_title, ha='left',
                     fontsize=15 * self.fs,
                     verticalalignment='center',
                     style=self.font_style,
                     bbox=dict(boxstyle='round', facecolor='moccasin'))

        plt.tight_layout()
        if self.savefig is not None:
            fig.savefig(self.savefig, dpi=self.fig_dpi,
                        orientation=self.orient)
            plt.close(fig=fig)
        else:
            plt.show()

        return axe
class donothing:
    """ Decorator that leaves the decorated callable's behaviour untouched.

    The decorated object is wrapped in a plain pass-through function that
    forwards every positional and keyword argument and returns the result.
    The `reason` parameter is only stored; it documents why the decorator
    is there (e.g. the skipping reason).
    """

    def __init__(self, reason=None):
        self.reason = reason

    def __call__(self, cls_or_func):
        def forward(*call_args, **call_kwargs):
            outcome = cls_or_func(*call_args, **call_kwargs)
            return outcome

        # Same effect as the ``@functools.wraps`` decorator form.
        return functools.update_wrapper(forward, cls_or_func)
class refAppender(object):
    """ Append reStructuredText references to a docstring.

    When a function or class is decorated, `docref` is appended to its
    docstring, separated by a newline. The decorated object itself is
    returned (it is not wrapped), so that Sphinx can resolve the inline
    substitutions defined in `docref`.

    Parameters
    ----------
    docref: str, optional
        Reference text to append, for instance substitution definitions::

            .. |VES| replace:: Vertical Electrical Sounding
            .. |ERP| replace:: Electrical Resistivity Profiling

        When omitted, only the newline separator is appended.

    Examples
    ---------
    >>> from watex.documentation import __doc__
    >>> from watex.tools import decorators
    >>> def donothing ():
    ...     ''' I am here just to have `|VES|` and `|ERP|` replaced
    ...     by their real meanings.'''
    ...     pass
    >>> decorated_donothing = decorators.refAppender(__doc__)(donothing)
    >>> decorated_donothing.__doc__
    ... # new docstring appended; `|VES|` and `|ERP|` are replaced by
    ... # "Vertical Electrical Sounding" and "Electrical Resistivity
    ... # Profiling" when the documentation is built.
    """

    def __init__(self, docref=None):
        self.docref = docref

    def __call__(self, cls_or_func):
        return self.nfunc(cls_or_func)

    def nfunc(self, f):
        """Append ``self.docref`` to the docstring of `f` and return `f`.

        A missing docstring on `f` and a missing `docref` are both
        treated as empty strings.
        """
        f.__doc__ = (f.__doc__ or '') + "\n" + (self.docref or '')
        return f
class deprecated(object):
    """ Mark functions, methods and classes as deprecated.

    Calling the decorated object emits a ``DeprecationWarning`` carrying
    the given reason, then forwards the call to the original object.

    Decorator based on https://stackoverflow.com/a/40301488 .

    Author: YingzhiGou
    Date: 20/06/2017
    """

    def __init__(self, reason):  # pragma: no cover
        # Used as ``@deprecated`` without arguments, `reason` would be
        # the decorated object itself: reject that usage.
        if inspect.isfunction(reason) or inspect.isclass(reason):
            raise TypeError("Reason for deprecation must be supplied")
        self.reason = reason

    def __call__(self, cls_or_func):  # pragma: no cover
        if inspect.isfunction(cls_or_func):
            code = cls_or_func.__code__
            template = "Call to deprecated function or method {name} ({reason})."
            origin_file = code.co_filename
            origin_line = code.co_firstlineno + 1
        elif inspect.isclass(cls_or_func):
            template = "Call to deprecated class {name} ({reason})."
            origin_file = cls_or_func.__module__
            origin_line = 1
        else:
            raise TypeError(type(cls_or_func))

        message = template.format(name=cls_or_func.__name__,
                                  reason=self.reason)

        @functools.wraps(cls_or_func)
        def new_func(*args, **kwargs):  # pragma: no cover
            # Force the warning to be shown on every call ...
            warnings.simplefilter('always', DeprecationWarning)
            warnings.warn_explicit(message,
                                   category=DeprecationWarning,
                                   filename=origin_file,
                                   lineno=origin_line)
            # ... then put the 'default' filter back.
            warnings.simplefilter('default', DeprecationWarning)
            return cls_or_func(*args, **kwargs)

        return new_func
class gdal_data_check(object):
    """Decorator checking the GDAL data installation once per class.

    GDAL stands for Geospatial Data Abstraction Library. The decorator
    should only be used on functions that require gdal and gdal-data.

    It checks whether ``GDAL_DATA`` is set and whether the path it points
    to exists. If ``GDAL_DATA`` is not set, the external program
    ``gdal-config --datadir`` is used to find out where the data files
    are installed. The outcome of the first check is cached on the class
    and reused by every later decoration.

    :param func: function to be decorated
    :param raise_error: when True, raise ``ImportError`` if the GDAL data
        could not be found; otherwise the failure is silent.
    :param verbose: when truthy, log the progress of the check.
    """
    # Class-level cache of the check, shared by all decorated functions.
    _has_checked = False
    _gdal_data_found = False
    _gdal_data_variable_resources = 'https://trac.osgeo.org/gdal/wiki/FAQInstallationAndBuilding#HowtosetGDAL_DATAvariable '
    _gdal_wheel_resources = 'https://www.lfd.uci.edu/~gohlke/pythonlibs/#gdal'
    _gdal_installation_guide = 'https://opensourceoptions.com/blog/how-to-install-gdal-for-python-with-pip-on-windows/'

    def __init__(self, func, raise_error=False, verbose=0):
        # Expose the name and docstring of the wrapped function.
        functools.update_wrapper(self, func)
        self._func = func
        self.verbose = verbose

        cls = type(self)
        if not cls._has_checked:
            # Store the result on the class, not on the instance,
            # otherwise the check is run again for every decoration.
            cls._gdal_data_found = self._check_gdal_data()
            cls._has_checked = True
        if not cls._gdal_data_found and raise_error:
            raise ImportError(
                "GDAL is NOT installed correctly. "
                f"GDAL wheel can be downloaded from {self._gdal_wheel_resources}"
                " and use `pip install <path-to-wheel-file.whl>`"
                "for installing. Get more details here: "
                f" {self._gdal_installation_guide}."
            )

    def __call__(self, *args, **kwargs):  # pragma: no cover
        return self._func(*args, **kwargs)

    def _check_gdal_data(self):
        """Return True when the GDAL data directory is usable.

        When ``GDAL_DATA`` is unset, it is set from the output of
        ``gdal-config --datadir`` if that path exists. When it is set,
        the path must exist and the ``osgeo`` package must be importable.
        """
        if 'GDAL_DATA' not in os.environ:
            # gdal data not defined, try to define
            from subprocess import Popen, PIPE
            if self.verbose:
                _logger.warning("GDAL_DATA environment variable is not set "
                                f" Please see {self._gdal_data_variable_resources}")
            try:
                # try to find out gdal_data path using gdal-config
                if self.verbose:
                    _logger.info("Trying to find gdal-data path ...")
                process = Popen(['gdal-config', '--datadir'], stdout=PIPE)
                output, _ = process.communicate()
                exit_code = process.returncode
                # The pipe yields bytes while os.environ only accepts str.
                output = os.fsdecode(output).strip()
                if exit_code == 0 and os.path.exists(output):
                    os.environ['GDAL_DATA'] = output
                    _logger.info("Found gdal-data path: {}".format(output))
                    return True
                _logger.error(
                    "\tCannot find gdal-data path. Please find the"
                    " gdal-data path of your installation and set it to"
                    "\"GDAL_DATA\" environment variable. Please see "
                    f"{self._gdal_data_variable_resources} for "
                    "more information.")
                return False
            except Exception:
                # Best effort: e.g. `gdal-config` is not installed.
                return False

        if os.path.exists(os.environ['GDAL_DATA']):
            if self.verbose:
                _logger.info("GDAL_DATA is set to: {}".
                             format(os.environ['GDAL_DATA']))
            try:
                from .utils._dependency import import_optional_dependency
                import_optional_dependency('osgeo')
            except Exception:
                # failed to import GDAL
                return False
            return True

        if self.verbose:
            _logger.error("GDAL_DATA is set to: {},"
                          " but the path does not exist.".
                          format(os.environ['GDAL_DATA']))
        return False
class redirect_cls_or_func(object):
    """Redirect a function or class to a replacement.

    The decorated (old) function, method or class is replaced by the new
    function or class given to the decorator: decorating returns the
    replacement object itself, and an informative message carrying the
    redirection reason is logged at decoration time.

    The positional arguments must contain one ``str`` (the reason) and
    one non-``str`` object (the replacement function or class), in any
    order. When several are given, the first of each kind is used.

    :raises TypeError: if the reason or the replacement is missing.

    Author: LKouadio~@Daniel03
    Date: 18/10/2020
    """

    def __init__(self, *args, **kwargs):
        self._reason = next(
            (arg for arg in args if isinstance(arg, str)), None)
        if self._reason is None:
            raise TypeError(" Redirected reason must be supplied")

        self._new_func_or_cls = next(
            (arg for arg in args if not isinstance(arg, str)), None)
        if self._new_func_or_cls is None:
            raise TypeError(
                " At least one argument must be a func_method_or class."
                " but it's %s." % type(self._new_func_or_cls))

    def __call__(self, cls_or_func):  # pragma :no cover
        target = self._new_func_or_cls
        if inspect.isfunction(target):
            lineno = target.__code__.co_firstlineno
            fmt = "redirected decorated func/methods .<{reason}> "\
                "see line {lineno}."
        elif inspect.isclass(target):
            lineno = 1
            fmt = "redirected decorated class :<{reason}> "\
                "see line {lineno}."
        else:
            # No code object to inspect: report the current line.
            lineno = inspect.getframeinfo(inspect.currentframe())[1]
            fmt = "redirected decorated method :<{reason}> "\
                "see line {lineno}."

        msg = fmt.format(reason=self._reason, lineno=lineno)
        _logger.info(msg)

        # The decorated object is dropped in favour of the replacement.
        return target
class writef2(object):
    """
    Decorator exporting the dataframe returned by a function to a file.

    The decorated function is called once and its result is returned
    unchanged. When `reason` contains ``write`` and `from_` is ``df``,
    the function must return the 5-tuple
    ``(df, to_, refout_, savepath_, windex)`` and ``df`` is written to
    the current directory as `.csv` or `.xlsx`, then moved to the save
    directory.

    :param reason: Explain the "What to do?". Can be `write` or
        `convert`. When None, a hint is printed and nothing is exported.
    :param from_: Kind of data returned by `func`. Only ``df`` triggers
        an export.
    :type from_: str
    :param to: Stored on the decorator; the extension actually used is
        the ``to_`` item returned by `func` (``.csv`` when it is None).
    :param savepath: Directory where the written file is moved, unless
        `func` returns its own ``savepath_``. When no existing directory
        is available, a ``_watex<time>_`` directory is created in the
        current directory.
    :param kws: ``refout`` (base name of the output file) and
        ``writeindex`` are extracted; any other key is set as an
        attribute.

    *Author: LKouadio ~ @Daniel03*
    *Date: 09/07/2021*
    """

    def __init__(
        self,
        reason: Optional[str] = None,
        from_: Optional[str] = None,
        to: Optional[str] = None,
        savepath: Optional[str] = None,
        **kws
    ):
        self._logging = watexlog().get_watex_logger(self.__class__.__name__)
        self.reason = reason
        self.from_ = from_
        self.to = to
        self.refout = kws.pop('refout', None)
        self.writedfIndex = kws.pop('writeindex', False)
        self.savepath = savepath
        for key, value in kws.items():
            setattr(self, key, value)

    def __call__(self, func):
        """ Call function and return new function decorated"""

        @functools.wraps(func)
        def decorated_func(*args, **kwargs):
            """ New decorated function holding `func` args and kwargs.

            :params args: positional arguments of `func`
            :param kwargs: keywords arguments of `func`.
            """
            self._logging.info('Func <{}> decorated !'.format(func.__name__))
            for addf in ('savepath', 'filename'):
                if not hasattr(self, addf):
                    setattr(self, addf, None)

            if self.reason is None:
                print('--> No reason is set. What do you want to do?'
                      ' `write` file or `convert` file into other format?')
                return func(*args, **kwargs)

            # Run `func` once only and hand its result back to the caller.
            result = func(*args, **kwargs)
            if self.from_ == 'df' and 'write' in self.reason.lower():
                df, to_, refout_, savepath_, windex = result
                self._export_frame(df, to_, refout_, savepath_, windex)
            return result

        return decorated_func

    def _export_frame(self, df, to_, refout_, savepath_, windex):
        """Write `df` in the current directory then move it to the save path."""
        self.df = df
        self.writedfIndex = windex

        ext = '.csv' if to_ is None else '.' + to_.replace('.', '')
        # Work on local names: the decorator configuration must not be
        # altered, else extensions pile up on `refout` at every call.
        basename = refout_ if refout_ is not None else self.refout
        if basename is None:
            now = datetime.datetime.now()
            basename = 'w-{0}_{1}'.format(now.date(), now.time())
        filename = basename.replace(':', '-') + ext

        if ext == '.csv':
            df.to_csv(filename, header=True, index=windex)
        elif ext == '.xlsx':
            df.to_excel(filename,
                        sheet_name=filename[: int(len(filename) / 2)],
                        index=windex)

        savepath = savepath_ if savepath_ is not None else self.savepath
        if savepath is None or not os.path.isdir(savepath):
            savepath = self._make_output_dir()

        try:
            shutil.move(os.path.join(os.getcwd(), filename),
                        os.path.join(savepath, filename))
        except OSError:
            # Also raised when nothing was written (unsupported format).
            self._logging.debug("We don't find any path to save file.")
        else:
            print(
                '--> reference output file <{0}> is well exported to {1}'.
                format(filename, savepath))

    @staticmethod
    def _make_output_dir():
        """Create a time-stamped directory in the current one; return its path."""
        generatedfile = '_watex{}_'.format(
            datetime.datetime.now().time()).replace(':', '.')
        path = os.path.join(os.getcwd(), generatedfile)
        os.makedirs(path, exist_ok=True)
        return path


class writef(writef2):
    """
    Decorator exporting the dataframe returned by a function to a file.

    Same behaviour and parameters as :class:`writef2`, from which it
    inherits; only the name of the logger differs.

    *Author: LKouadio ~ @Daniel03*
    *Date: 09/07/2021*
    """
@deprecated('Replaced by :class:`watex.utils.decorators.catmapflow2`')
def catmapflow(cat_classes: Iterable[str] = ['FR0', 'FR1', 'FR2', 'FR3', 'FR4']):
    """
    Decorator factory categorizing the values returned by `func`.

    The decorated `func` must return the 3-tuple
    ``(cat_range_values, target_values, cat_classes)`` like::

        cat_range_values= [0.0, [0.0, 3.0], [3.0, 6.0], [6.0, 10.0], 10.0]
        target_values =[1, 2., 3., 6., 7., 9., 15., 25, ...]

    The decorated function returns the target values converted into the
    categorical `cat_classes`. For instance in groundwater exploration::

        - FR0 --> `flow` is equal to ``0.``m3/h
        - FR1 --> `flow` is ``0 < FR ≤ 3`` m3/h
        - FR2 --> `flow` is ``3 < FR ≤ 6`` m3/h
        - FR3 --> `flow` is ``6 < FR ≤ 10`` m3/h
        - FR4 --> `flow` is ``10.+`` in m3/h

    :param cat_classes: default class labels, used when `func` returns
        None as its third item.
    :return: Iterable object with new categorized values converted
        into `cat_classes`.

    Author: LKouadio ~ @Daniel03
    Date: 13/07/2021
    """
    def categorized_dec(func):
        """
        Decorator that can be adapted to other categorization problems by
        changing the `cat_classes` argument, for instance::

            cat_classes=['dry', 'HV', 'IHV', 'IVH+', 'UH']

        :Note:
            If the third item returned by `func` is not None, it replaces
            the `cat_classes` given to the decorator.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            """
            Function dealing with the categorized flow values.

            :param args: positional arguments of `func`
            :param kwargs: optional arguments of `func`

            :return `new_target_array`: Iterable object categorized.
            """
            cat_range_values, target_array, catfc = func(*args, **kwargs)
            # Use a distinct local name: rebinding `cat_classes` here
            # would make it local to `wrapper` and hide the decorator
            # default whenever `catfc` is None.
            classes = cat_classes if catfc is None else catfc

            def mapf(crval, nfval=cat_range_values, fc=classes):
                """
                Return the class of `crval` according to the
                `cat_range_values` provided, or None when no range
                matches.

                :param crval: value to be categorized
                :param nfval: array of `cat_range_values`
                :param fc: classes replacing `crval`
                """
                for ii, val in enumerate(nfval):
                    try:
                        if isinstance(val, (float, int)):
                            if crval == nfval[0]:
                                return fc[0]
                            elif crval >= nfval[-1]:
                                return fc[-1]
                        elif isinstance(val, (list, tuple)):
                            if len(val) > 1:
                                if val[0] < crval <= val[-1]:
                                    return fc[ii]
                    except (TypeError, ValueError, IndexError):
                        if crval == 0.:
                            return fc[0]
                        elif crval >= nfval[-1]:
                            return fc[-1]

            if len(cat_range_values) != len(classes):
                _logger.error(
                    'Length of `cat_range_values` and `cat_classes` provided '
                    'must be the same length not ``{0}`` and'
                    ' ``{1}`` respectively.'.format(len(cat_range_values),
                                                    len(classes)))
            try:
                new_target_array = np.array(list(map(mapf, target_array)))
            except (TypeError, ValueError, IndexError):
                # Fall back to an element-wise fill of a preallocated array.
                new_target_array = np.zeros_like(target_array)
                for ii, ff in enumerate(target_array):
                    new_target_array[ii] = mapf(crval=ff,
                                                nfval=cat_range_values,
                                                fc=classes)
            return new_target_array
        return wrapper
    return categorized_dec
class visualize_valearn_curve:
    """
    Decorator to visualize the validation curve or the learning curve.

    The decorated function is called once and its result is returned
    unchanged. It must return a 7-tuple whose order depends on `reason`:

    - ``val``: ``(train_score, val_score, switch, param_range, pname,
      val_kws, train_kws)``
    - ``learn``: ``(param_range, train_score, val_score, switch, pname,
      val_kws, train_kws)``

    :param reason: what_going_there? validation curve or learning curve.
        - ``val`` for validation curve
        - ``learn`` for learning curve
    :param turn: Continue the plotting or switch off the plot and return
        the function result. Default is `off` else `on`. A non-None
        ``switch`` returned by the function overrides it.
    :param kwargs:
        Could be the keywords arguments for `matplotlib.pyplot`
        library::

            train_kws={c:'r', s:10, marker:'s', alpha :0.5}
            val_kws= {c:'blue', s:10, marker:'h', alpha :1}

        ``grid_kws`` accepts the ``g``-prefixed names ``galpha``,
        ``glw``, ``gwhich``, ``gls`` and ``gc`` as well as plain
        matplotlib grid keywords.
    """
    # Translation of the g-prefixed grid names to matplotlib keywords.
    _GRID_ALIASES = {'galpha': 'alpha', 'glw': 'lw', 'gwhich': 'which',
                     'gls': 'ls', 'gc': 'color'}

    def __init__(self, reason='valcurve', turn='off', **kwargs):
        self.reason = reason
        self.turn = turn
        self.fig_size = kwargs.pop('fig_size', (8, 6))
        self.font_size = kwargs.pop('font_size', 18.)
        self.plotStyle = kwargs.pop('plot_style', 'scatter')
        self.train_kws = kwargs.pop('train_kws', {'c': 'r', 'marker': 's',
                                                  'alpha': 0.5,
                                                  'label': 'Training curve'})
        self.val_kws = kwargs.pop('val_kws', {'c': 'blue', 'marker': 'h',
                                              'alpha': 1,
                                              'label': 'Validation curve'})
        self.k = kwargs.pop('k', np.arange(1, 220, 20))
        self.xlabel = kwargs.pop('xlabel', {'xlabel': 'Evaluation of parameter',
                                            'fontsize': self.font_size})
        self.ylabel = kwargs.pop('ylabel', {'ylabel': 'Performance in %',
                                            'fontsize': self.font_size})
        self.savefig = kwargs.pop('savefig', None)
        self.grid_kws = kwargs.pop('grid_kws', {
            'galpha': 0.2,              # grid alpha
            'glw': .5,                  # grid line width
            'gwhich': 'major',          # minor ticks
        })
        self.show_grid = kwargs.pop('show_grid', False)
        self.error_plot = False
        self.scatterplot = True
        self.lineplot = False

        if self.plotStyle.lower() == 'both':
            self.lineplot = True
        elif self.plotStyle.lower().find('line') >= 0:
            self.lineplot = True
            self.scatterplot = False
        elif self.plotStyle == 'scatter':
            self.scatterplot = True

        for key, value in kwargs.items():
            setattr(self, key, value)

    def __call__(self, func):
        """ Call function and decorate `validation curve`"""

        @functools.wraps(func)
        def viz_val_decorated(*args, **kwargs):
            """ Decorated function for visualization """
            reason = self.reason.lower()
            if 'val' not in reason and 'learn' not in reason:
                raise ValueError(
                    "reason must contain 'val' or 'learn', got "
                    f"{self.reason!r}")

            # Run `func` once only and hand its result back to the caller.
            result = func(*args, **kwargs)
            if 'val' in reason:
                self.reason = 'val'
                train_score, val_score, switch, param_range,\
                    pname, val_kws, train_kws = result
            else:
                self.reason = 'learn'
                param_range, train_score, val_score, switch,\
                    pname, val_kws, train_kws = result

            if val_kws is not None:
                self.val_kws = val_kws
            if train_kws is not None:
                self.train_kws = train_kws

            # add the name of parameters.
            if pname != '':
                self.xlabel = {'xlabel': 'Evaluation of parameter %s' % pname,
                               'fontsize': self.font_size}
            if switch is not None:
                self.turn = switch

            if self.turn in ['on', 1, True]:
                # Create the figure only when something is drawn on it.
                plt.figure(figsize=self.fig_size)
                self._plot_train_val_score(train_score, val_score,
                                           k=param_range)
            return result

        return viz_val_decorated

    def _mpl_grid_kws(self):
        """Return ``grid_kws`` with g-prefixed names mapped to matplotlib's."""
        return {self._GRID_ALIASES.get(key, key): value
                for key, value in self.grid_kws.items()}

    def _plot_train_val_score(self, train_score, val_score, k):
        """ Plot the mean train and validation scores against `k`.

        Scores are averaged along axis 1 and, unless plotting a learning
        curve, expressed in percent. The given arrays are left untouched.
        The figure is saved (when `savefig` is set) before it is shown.
        """
        if not isinstance(train_score, dict):
            train_score = {'_': train_score}
            val_score = {'_': val_score}

        scale = 1 if self.reason == 'learn' else 100
        flawed_kws = (AttributeError, TypeError, ValueError)
        for trainkey, trainval in train_score.items():
            train_mean = np.asarray(trainval).mean(axis=1) * scale
            val_mean = np.asarray(val_score[trainkey]).mean(axis=1) * scale
            try:
                if self.scatterplot:
                    plt.scatter(k, val_mean, **self.val_kws)
                    plt.scatter(k, train_mean, **self.train_kws)
            except flawed_kws:
                # The keywords may not suit scatter: try the line plot.
                plt.plot(k, val_mean, **self.val_kws)
                plt.plot(k, train_mean, **self.train_kws)

            try:
                if self.lineplot:
                    plt.plot(k, val_mean, **self.val_kws)
                    plt.plot(k, train_mean, **self.train_kws)
            except flawed_kws:
                plt.scatter(k, val_mean, **self.val_kws)
                plt.scatter(k, train_mean, **self.train_kws)

        if isinstance(self.xlabel, dict):
            plt.xlabel(**self.xlabel)
        else:
            plt.xlabel(self.xlabel)

        if isinstance(self.ylabel, dict):
            plt.ylabel(**self.ylabel)
        else:
            plt.ylabel(self.ylabel)

        plt.tick_params(axis='both', labelsize=self.font_size)

        if self.show_grid is True:
            plt.grid(self.show_grid, **self._mpl_grid_kws())

        plt.legend()
        # Save before showing: the figure may be gone once shown.
        if self.savefig is not None:
            if isinstance(self.savefig, dict):
                plt.savefig(**self.savefig)
            else:
                plt.savefig(self.savefig)
        plt.show()
class predplot:
    """
    Decorator to quick plot the prediction.

    The decorated function must return the 3-tuple
    ``(y_true, y_pred, switch)``; it is called once and its result is
    returned unchanged. When plotting is on, ``y_true`` and ``y_pred``
    are scatter-plotted against their ``index`` attribute.

    :param turn: Continue the plotting or switch off the plot and return
        the function result. Default is `off` else `on`. It is
        overridden at each call by the ``switch`` returned by the
        function (None means `off`).
    :param kws: Could be the keywords arguments for `matplotlib.pyplot`
        library:

        - ``fig_size``: figure size
        - ``ypred_kws`` / ``yobs_kws``: scatter keywords of the predicted
          and observed points
        - ``tick_params``, ``xlabel``, ``ylabel``: keywords of the
          matching `pyplot` functions
        - ``ObsLine``: pair ``(turn, which)``; when ``turn`` is on, a line
          is drawn through the observed (``'obs'``/``'true'``) or the
          predicted (``'pred'``) values
        - ``l_``: keywords of that line
        - ``savefig``: path, or dict of `pyplot.savefig` keywords

    Author: LKouadio alias @Daniel
    Date: 23/07/2021
    """
    # Values accepted as "plotting is on" (note that ``1 == True``).
    _ON = ('on', True)

    def __init__(self, turn='off', **kws):
        self.turn = turn
        self.fig_size = kws.pop('fig_size', (16, 8))
        self.yPred_kws = kws.pop('ypred_kws', {'c': 'r', 's': 200, 'alpha': 1,
                                               'label': 'Predicted flow:y_pred'})
        self.yObs_kws = kws.pop('yobs_kws', {'c': 'blue', 's': 100, 'alpha': 0.8,
                                             'label': 'Observed flow:y_true'})
        self.tick_params = kws.pop('tick_params', {'axis': 'x', 'labelsize': 10,
                                                   'labelrotation': 90})
        self.xlab = kws.pop('xlabel', {'xlabel': 'Boreholes tested'})
        self.ylab = kws.pop('ylabel', {'ylabel': 'Flow rates(FR) classes'})
        self.obs_line = kws.pop('ObsLine', None)
        self.l_kws = kws.pop('l_', {'c': 'blue', 'ls': '--', 'lw': 1,
                                    'alpha': 0.5})
        self.savefig = kws.pop('savefig', None)
        if self.obs_line is None:
            self.obs_line = ('off', 'Obs')

    def __call__(self, func: "Callable[..., T]"):
        """ Call the function to be decorated """

        @functools.wraps(func)
        def pred_decorated(*args, **kwargs):
            """Function to be decorated"""
            # Run `func` once only and hand its result back to the caller.
            result = func(*args, **kwargs)
            y_true, y_pred, switch = result
            self.turn = 'off' if switch is None else switch

            if self.turn in self._ON:
                plt.figure(figsize=self.fig_size)
                plt.scatter(y_pred.index, y_pred, **self.yPred_kws)
                plt.scatter(y_true.index, y_true, **self.yObs_kws)

                if self.obs_line[0] in self._ON:
                    which = self.obs_line[1].lower()
                    if 'true' in which or which == 'obs':
                        plt.plot(y_true, **self.l_kws)
                    elif 'pred' in which:
                        plt.plot(y_pred, **self.l_kws)

                plt.tick_params(**self.tick_params)
                plt.xlabel(**self.xlab)
                plt.ylabel(**self.ylab)
                plt.legend()

                if self.savefig is not None:
                    if isinstance(self.savefig, dict):
                        plt.savefig(**self.savefig)
                    else:
                        plt.savefig(self.savefig)

            return result

        return pred_decorated
class pfi:
    """Decorator to plot Permutation Feature Importance (or a dendrogram).

    The decorated function must return the 10-tuple
    ``(X, result, tree_indices, clf, tree_importance_sorted_idx,
    data_columns, perm_sorted_idx, pfi_type, switch, savefig)``. The three
    last items, when not ``None``, override `reason`, `turn` and `savefig`.

    :param reason: kind of plot.
        - ``pfi`` for permutation feature importance before and after
          shuffling trees
        - ``dendro`` for dendrogram plot
    :param turn: Continue the plotting or switch off the plot and return
        the function. default is `off` else `on`.
    :param kws: Could be the keywords arguments for `matplotlib.pyplot`
        library (``fig_size``, ``savefig``, ``xlabel``, ``ylabel``,
        ``fig_title``).
    :param barh_kws: matplotlib.pyplot.barh keywords arguments.
        Refer to https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.barh.html
    :param box_kws: ``plt.boxplot`` keyword arguments.
        Refer to https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.boxplot.html
    :param dendro_kws: scipy.cluster.hierarchy.dendrogram keywords.
        See https://docs.scipy.org/doc/scipy/reference/generated/scipy.cluster.hierarchy.dendrogram.html
    """

    def __init__(self, reason='pfi', turn='off', **kwargs):
        self._logging = watexlog().get_watex_logger(self.__class__.__name__)
        self.reason = reason
        self.turn = turn
        self.fig_size = kwargs.pop('fig_size', (9, 3))
        self.savefig = kwargs.pop('savefig', None)
        self.xlab = kwargs.pop('xlabel', {'xlabel': 'Importance'})
        self.ylab = kwargs.pop('ylabel', {'ylabel': 'Features'})
        self.barh_kws = kwargs.pop('barh_kws', {'color': 'blue',
                                                'edgecolor': 'k',
                                                'linewidth': 2})
        self.box_kws = kwargs.pop('box_kws', {'vert': False,
                                              'patch_artist': False})
        self.dendro_kws = kwargs.pop('dendro_kws', {'leaf_rotation': 90,
                                                    # 'orientation':'right'
                                                    })
        self.fig_title = kwargs.pop('fig_title',
                                    'matplotlib.axes.Axes.barh Example')

    def __call__(self, func: Callable[..., T]):
        @functools.wraps(func)
        def feat_importance_dec(*args, **kwargs):
            """ Decorated pfi and dendrogram diagram """
            # Run the wrapped function once; its result is both plotted
            # and returned (the original invoked it a second time).
            outputs = func(*args, **kwargs)
            (X, result, tree_indices, clf, tree_importance_sorted_idx,
             data_columns, perm_sorted_idx, pfi_type, switch,
             savefig) = outputs

            if pfi_type is not None:
                self.reason = pfi_type
            if switch is not None:
                self.turn = switch
            if savefig is not None:
                self.savefig = savefig

            # ``== ('on' or True or 1)`` only ever compared with 'on'.
            if self.turn in ('on', True, 1):
                fig, axes = plt.subplots(1, 2, figsize=self.fig_size)
                self._plot_barh_or_spearman(
                    X, clf, func, fig, axes, self.reason,
                    tree_indices, tree_importance_sorted_idx,
                    data_columns, result, perm_sorted_idx, **kwargs)

                # Save before showing: once the window opened by
                # ``plt.show`` is closed the figure may be blank.
                if self.savefig is not None:
                    if isinstance(self.savefig, dict):
                        plt.savefig(**self.savefig)
                    else:
                        plt.savefig(self.savefig)
                plt.show()

            return outputs
        return feat_importance_dec

    def _plot_barh_or_spearman(self, X, clf, func, fig, axes, reason,
                               *args, **kwargs):
        """ Plot bar histogram and spearman correlation.

        :param X: training features, required for ``reason='dendro'``.
        :param clf: fitted estimator exposing ``feature_importances_``.
        :param func: decorated function (kept for interface compatibility).
        :param fig: figure holding both axes.
        :param axes: pair of axes ``(ax1, ax2)``.
        :param reason: ``'pfi'`` or ``'dendro'``.
        :param args: ``(tree_indices, tree_importance_sorted_idx,
            data_columns, result, perm_sorted_idx)``.
        """
        ax1, ax2 = axes
        (tree_indices, tree_importance_sorted_idx, data_columns,
         result, perm_sorted_idx) = args

        if reason == 'pfi':
            ax1.barh(tree_indices,
                     clf.feature_importances_[tree_importance_sorted_idx] * 100,
                     height=0.7, **self.barh_kws)
            # Tick positions must exist before their labels are assigned.
            ax1.set_yticks(tree_indices)
            ax1.set_yticklabels(data_columns[tree_importance_sorted_idx])
            ax1.set_ylim((0, len(clf.feature_importances_)))
            ax2.boxplot(result.importances[perm_sorted_idx].T * 100,
                        labels=data_columns[perm_sorted_idx],
                        **self.box_kws)

            ax1.set_xlabel(**{k: v + ' before shuffling (%)'
                              for k, v in self.xlab.items()})
            ax1.set_ylabel(**self.ylab)
            ax2.set_xlabel(**{k: v + ' after shuffling (%)'
                              for k, v in self.xlab.items()})
            try:
                ax1.set_title(self.fig_title + ' using ' +
                              clf.__class__.__name__)
            except (TypeError, AttributeError):
                ax1.set_title(self.fig_title)

            fig.tight_layout()

        if reason == 'dendro':
            from scipy.stats import spearmanr
            from scipy.cluster import hierarchy

            if X is None:
                self._logging.debug('Please provide the train features !')
                warnings.warn(
                    ' Parameter `X` is missing. '
                    ' Could not plot the dendromarc diagram')
                # The original re-called `func` here with the plotting
                # arguments; the caller ignores the value, so just stop.
                return None

            corr = spearmanr(X).correlation * 100
            corr_linkage = hierarchy.ward(corr)
            dendro = hierarchy.dendrogram(corr_linkage,
                                          labels=data_columns, ax=ax1,
                                          **self.dendro_kws)
            dendro_idx = np.arange(0, len(dendro['ivl']))

            ax2.imshow(corr[dendro['leaves'], :][:, dendro['leaves']])
            ax2.set_xticks(dendro_idx)
            ax2.set_yticks(dendro_idx)
            ax2.set_xticklabels(dendro['ivl'], rotation='vertical')
            ax2.set_yticklabels(dendro['ivl'])
            ax1.set_ylabel(**{k: v + ' linkage matrix'
                              for k, v in self.ylab.items()})
            fig.tight_layout()
class catmapflow2:
    """Decorator turning numeric target values into categorical classes.

    The decorated function must return the 3-tuple
    ``(cat_range_values, target_values, cat_classes)`` like::

        cat_range_values= [0.0, [0.0, 3.0], [3.0, 6.0], [6.0, 10.0], 10.0]
        target_values =[1, 2., 3., 6., 7., 9., 15., 25, ...]

    Each entry of `cat_range_values` is paired by position with a class of
    `cat_classes`. For instance in groundwater exploration:

        - FR0 --> `flow` is equal to ``0.``m3/h
        - FR1 --> `flow` is ``0 < FR ≤ 3`` m3/h
        - FR2 --> `flow` is ``3 < FR ≤ 6`` m3/h
        - FR3 --> `flow` is ``6 < FR ≤ 10`` m3/h
        - FR4 --> `flow` is ``10.+`` in m3/h

    :param cat_classes: class labels; defaults to
        ``['FR0', 'FR1', 'FR2', 'FR3', 'FR4']``.
    :return: Iterable object with new categorized values converted
        into `cat_classes`.

    Author: @Daniel03
    Date: 13/07/2021
    """

    _DEFAULT_CLASSES = ('FR0', 'FR1', 'FR2', 'FR3', 'FR4')

    def __init__(self, cat_classes: Iterable[str] = None):
        self._logging = watexlog().get_watex_logger(self.__class__.__name__)
        # A fresh list per instance avoids sharing one mutable default.
        self.cat_classes = (list(self._DEFAULT_CLASSES)
                            if cat_classes is None else cat_classes)

    def __call__(self, func):
        self._func = func
        return self.categorized_dec(self._func)

    def categorized_dec(self, func):
        """ Build the categorizing wrapper around `func`.

        The decorator can be adapted to other categorization problems by
        changing the `cat_classes` argument, for instance::

            cat_classes=['dry', 'HV', 'IHV', 'IVH+', 'UH']

        :Note: when `func` returns a non-``None`` third item, it replaces
            the `cat_classes` given at construction.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            """ Categorize the target values returned by `func`.

            :param args: positional argumnent of `func`
            :param kwargs: Optional argument of `func`
            :return `new_target_array`: numpy array of class labels.
            """
            self.cat_range_values, target_array, catfc = func(
                *args, **kwargs)
            if catfc is not None:
                self.cat_classes = catfc

            if len(self.cat_range_values) != len(self.cat_classes):
                # Kept as a logged error (not an exception) as before.
                self._logging.error(
                    "Length of categorical `values` and `classes` provided"
                    " must be consistent; '{0}' and '{1}' are given "
                    "respectively.".format(len(self.cat_range_values),
                                           len(self.cat_classes)))

            # The former bare-except fallback wrote labels into a numeric
            # ``zeros_like`` array and could not succeed; it is dropped.
            return np.array([self.mapf(value) for value in target_array])
        return wrapper

    def mapf(self, crval):
        """ Return the class of `crval` according to `cat_range_values`.

        Closed-right intervals ``[low, high]`` are tested first
        (``low < crval <= high``), so a value sitting on an upper bound
        (e.g. 10 with ``[6.0, 10.0]``) falls in that interval rather than
        in the open-ended last class. Scalar bounds are tested next: a
        value equal to the first scalar gets the first class, a value
        greater than or equal to the last scalar gets the last class.

        :param crval: value to be categorized
        :return: the matching item of `cat_classes`, or ``None`` when no
            range matches.
        """
        nfval = list(self.cat_range_values)

        for ii, val in enumerate(nfval):
            if isinstance(val, (list, tuple)) and len(val) > 1:
                if val[0] < crval <= val[-1]:
                    return self.cat_classes[ii]

        if not nfval:
            return None
        first, last = nfval[0], nfval[-1]
        if isinstance(first, (float, int)) and crval == first:
            return self.cat_classes[0]
        if isinstance(last, (float, int)) and crval >= last:
            return self.cat_classes[-1]
        return None
class docstring:
    """ Generate a new docstring of a function or class by inserting a part
    of the docstring of another object.

    Sometimes two functions share the same parameters and repeating their
    description is redundant. This decorator takes the text of
    ``func0.__doc__`` from `start` up to (excluding) `end` and inserts it
    in the decorated object's docstring where `start` is found. The
    decorated object's own text between `start` and `end` follows, with
    the `start` heading and every ``-`` character removed.

    Parameters
    -----------
    func0: callable,
        object whose docstring is borrowed.
    start: str
        text from which the borrowed part starts. ``None`` means the
        beginning of the docstring.
    end: str
        text at which the borrowed part stops. ``None`` means the end of
        the docstring.

    Examples
    --------
    >>> from watex.decorators import writedf , predPlot, docstring
    >>> docs = docstring(writedf, start ='param reason',
    ...                  end='param to_')(predPlot)
    >>> docs.__doc__

    *Author: @Daniel03*
    *Date: 18/09/2021*
    """

    def __init__(self, func0, start='Parameters', end=None):
        self.func0 = func0
        self.start = start
        self.end = end

    def __call__(self, func):
        self._func = func
        return self._decorator(self._func)

    def _decorator(self, func):
        """ Collect the docstring of `func0` from `start` to `end` and
        merge it into the docstring of `func`.

        :param func: object to update in place.
        :return: `func`, unchanged when either docstring is missing.
        """
        donor_doc = self.func0.__doc__
        target_doc = func.__doc__
        if donor_doc is None or target_doc is None:
            # Nothing to borrow, or nothing to extend.
            return func

        # ---- part of the donor docstring to borrow --------------------
        # Indexes are validated BEFORE slicing: slicing with the -1
        # returned by ``find`` silently produced a wrong excerpt.
        start_ix = 0 if self.start is None else donor_doc.find(self.start)
        if start_ix < 0:
            warnings.warn(f'`{self.start}` not find in the given '
                          f'{self.func0.__name__!r} doctring` function will '
                          f'append all the doctring of {self.func0.__name__!r}'
                          ' by default.')
            start_ix = 0

        end_ix = (-1 if self.end is None
                  else donor_doc.find(self.end, start_ix))
        if end_ix < 0:
            if self.end is not None:
                warnings.warn(f'`{self.end} not found in the given'
                              f' {self.func0.__name__!r} doctring` function will '
                              f'append all the doctring of {self.func0.__name__!r}'
                              ' thin the end by default.')
            substring = donor_doc[start_ix:]
        else:
            substring = donor_doc[start_ix:end_ix]

        # ---- where to insert it in the target docstring ----------------
        if self.start is None:
            param_ix = 0
        else:
            param_ix = target_doc.find(self.start)
            if param_ix < 0:
                # No such section in the target: append at the end.
                param_ix = len(target_doc)

        example_ix = (-1 if self.end is None
                      else target_doc.find(self.end, param_ix))
        if example_ix >= 0:
            str_betw_param_example = target_doc[param_ix:example_ix]
            tail = target_doc[example_ix:]
        else:
            str_betw_param_example = target_doc[param_ix:]
            tail = ''

        # Drop the target's own heading and its underline: the borrowed
        # part already carries them.
        if self.start is not None:
            str_betw_param_example = str_betw_param_example.replace(
                self.start + '\n', '')
        str_betw_param_example = str_betw_param_example.replace(
            '-\n', '').replace('-', '')
        # Remove the leading spaces, unless there is nothing else.
        without_lead = str_betw_param_example.lstrip(' ')
        if without_lead:
            str_betw_param_example = without_lead

        # The tail is cut from the ORIGINAL docstring; the previous code
        # re-sliced the already modified one at a stale index.
        func.__doc__ = (target_doc[:param_ix] + substring
                        + str_betw_param_example + tail)
        return func
class docAppender:
    """ Decorator to generate a new docstring by appending a part of
    another object's docstring.

    The part of ``func0.__doc__`` located between `from_` and `to` is
    inserted in the docstring of the decorated object, two lines after the
    first line containing `insertfrom` (i.e. after the heading and its
    underline). This avoids repeating parameter definitions.

    Parameters
    -----------
    func0: callable,
        Function or class to collect the docstring from.
    from_: str
        Reference word or expression starting the collected part.
        The *default* is ``Parameters``. ``None`` falls back to the first
        word of the `func0` docstring.
    to: str
        Reference word ending the collected part. The *default* is
        ``Returns``. ``None`` collects up to the end.
    insertfrom: str,
        Reference word locating the insertion point in the decorated
        object's docstring. If not found nothing is inserted.
        The *default* is ``Parameters``.
    remove: bool,
        Remove the first occurrence of `from_` and every ``-`` character
        from the collected part. The *default* is ``True``.

    Examples
    ---------
    >>> from watex.decorators import docAppender
    >>> def func0 (*args, **kwargs):
    ...     '''Im here so share my doctring.
    ...
    ...     Parameters
    ...     -----------
    ...     * args: list,
    ...         Collection of the positional arguments
    ...     ** kwargs: dict
    ...         Collection of keywords arguments
    ...     Returns
    ...     -------
    ...     None: nothing
    ...     '''
    ...     pass
    >>> def func(s, k=0):
    ...     ''' Im here to append the docstring from func0
    ...     Parameters
    ...     ----------
    ...     s: str ,
    ...         Any string value
    ...     k: dict,
    ...         first keyword arguments
    ...
    ...     Returns
    ...     --------
    ...     None, I return nothing
    ...     '''
    >>> deco = docAppender(func0 , from_='Parameters',
    ...                    to='Returns', insertfrom='Parameters')(func)
    >>> deco.__doc__

    Warnings
    --------
    Be sure to append two docstrings with the same format. One may choose
    either the sphinx or the numpy doc formats, not a mix of both.
    """
    insert_ = ('parameters', 'returns', 'raises', 'examples', 'notes',
               'references', 'see also', 'warnings')

    def __init__(self,
                 func0: Callable[[F], F],
                 from_: str = 'Parameters',
                 to: str = 'Returns',
                 insertfrom: str = 'Parameters',
                 remove=True):
        self.func0 = func0
        self.from_ = from_
        self.to = to
        self.remove = remove
        self.insert = insertfrom

    def __call__(self, func):
        self._func = copy.deepcopy(func)
        return self.make_newdoc(self._func)

    def make_newdoc(self, func):
        """ Make a new docstring for the given class or function.

        :param func: object whose ``__doc__`` is rewritten in place.
        :return: `func`; unchanged when it, or `func0`, has no docstring.
        """
        def sanitize_docstring(strv):
            """Sanitize string values and force the string to be on the
            same level for parameters and the arguments of the parameters.

            :param strv: str or list of lines
            :return: the sanitized text as a single string.
            """
            if isinstance(strv, str):
                strv = strv.split('\n')
            # remove the '' in the first string
            if strv[0].strip() == '':
                strv = strv[1:]
            # get the first occurence for parameters definitions
            ix_ = 0
            for ix, value in enumerate(strv):
                if (value.lower().find(':param') >= 0) or (
                        value.lower().find('parameters') >= 0):
                    ix_ = ix
                    break
            # Put all explanations in the same level
            # before the parameters
            for k in range(ix_ + 1):
                strv[k] = strv[k].strip()

            for ii, initem in enumerate(strv):
                for v in self.insert_:
                    if initem.lower().find(v) >= 0:
                        initem = initem.strip()
                        strv[ii] = initem
                        break
                # NOTE(review): nesting of the two tests below (inside or
                # after the keyword loop) and the width of the ' ' indent
                # cannot be recovered from the collapsed source - confirm
                # against the real file.
                if '--' in initem or (':' in initem and len(initem) < 50):
                    strv[ii] = initem.strip()
                elif (initem.lower() not in self.insert_) and ii > ix_:
                    strv[ii] = ' ' + initem.strip()

            return '\n'.join(strv)

        # get the docstring from the main func0
        func0_dstr = self.func0.__doc__
        if func0_dstr is None or func.__doc__ is None:
            # Nothing to collect, or nothing to extend.
            return func

        # select the first occurence and remove '----' if exists
        if self.from_ is None:
            warnings.warn('Argument `from_` is missing. Should be the first'
                          f' word of {self.func0.__name__!r} doctring.')
            self.from_ = func0_dstr.split()[0]

        # ``find`` returns -1 when `from_` is absent; slicing from -1 kept
        # only the last character, so start from 0 in that case.
        from_ix = max(func0_dstr.find(self.from_), 0)
        func0_dstr = func0_dstr[from_ix:]
        # remove the first occurence of the from_ value and --- under if
        # exists. In the case where from_=':param' remove can be False.
        if self.remove:
            func0_dstr = func0_dstr.replace(
                self.from_, '', 1).replace('-', '')
        # ``to=None`` means "up to the end" (``None >= 0`` raised before).
        to_ix = func0_dstr.find(self.to) if self.to is not None else -1
        func0_dstr = func0_dstr[:to_ix if to_ix >= 0 else None]

        if self.insert.lower() not in self.insert_:
            warnings.warn(f"It's seems the given {self.insert!r} for docstring"
                          f" insertion is missing to {self.insert_} list")

        in_ix = func.__doc__.lower().find(self.insert.lower())
        # assert whether the given value insert from exists .
        if in_ix < 0:
            # The message was a plain string with an unclosed brace.
            warnings.warn(
                f"Insert {self.insert!r} value is not found in the "
                f"{'class' if inspect.isclass(func) else 'function'}")

        # split the string with `\n` and loop to find the first occurence;
        # skip the next item, which could be '----', and insert after it.
        func0_dstr = func0_dstr.split('\n')
        finalstr = func.__doc__.split('\n')
        # `rpop` is defined elsewhere in this module; presumably it trims
        # the list in place (its return value is unused) - TODO confirm.
        rpop(func0_dstr)
        func0_dstr = '\n'.join(func0_dstr)
        for ii, oc in enumerate(finalstr):
            if oc.lower().find(self.insert.lower()) >= 0:
                finalstr.insert(ii + 2, func0_dstr)
                finalstr = '\n'.join(finalstr)
                break

        setattr(func, '__doc__', sanitize_docstring(finalstr))

        return func
class docSanitizer:
    """Decorator to clean the docstring and set all values of sections to
    the same level.

    It sanitizes the docstring for the use of sphinx documentation: the
    description placed before the parameters section is left-stripped,
    section headings are left-aligned and preceded by a blank line, and
    the remaining lines of the parameters section get a leading space.

    Examples
    --------
    >>> from watex.decorators import docSanitizer
    >>> def messdocfunc():
    ...     '''My doctring is mess. I need to be polished and well arranged.
    ...
    ...         Im here to sanitize the mess doctring.
    ...
    ...     Parameters
    ...     ----------
    ...         * args: list,
    ...             Collection of the positional arguments
    ...         ** kwargs: dict
    ...             Collection of keywords arguments
    ...
    ...     Warnings
    ...     --------
    ...     Let check for warnings string
    ...     '''
    ...     pass
    >>> cleandocfunc = docSanitizer()(messdocfunc)
    >>> print(cleandocfunc.__doc__)
    """

    insert_ = ('parameters', 'returns', 'raises', 'examples', 'notes',
               'references', 'see also', 'warnings', ':param', ':rtype',
               )

    def __call__(self, func):
        """Sanitize ``func.__doc__`` in place and return `func`.

        :param func: function or class to clean.
        :return: `func`; unchanged when it has no docstring.
        """
        func = copy.deepcopy(func)
        docstring = copy.deepcopy(func.__doc__)
        if docstring is None:
            # Nothing to sanitize (indexing ``None`` raised before).
            return func

        if isinstance(docstring, str):
            docstring = docstring.split('\n')
        # remove the '' in the first string
        if docstring[0].strip() == '':
            docstring = docstring[1:]
        # get the first occurence for parameters definitions
        # and separate the doctring into two parts: descriptions
        # and corpus doctring as the remainings
        ix_ = 0
        for ix, value in enumerate(docstring):
            if (value.lower().find(':param') >= 0) or (
                    value.lower().find('parameters') >= 0):
                ix_ = ix
                break

        # --> sanitize the descriptions part, before the parameters
        description = [line.strip() for line in docstring[:ix_]]
        # remove the blank line ending the description. The guard covers
        # a docstring with no text before the parameters section, which
        # raised IndexError on ``description[-1]``.
        if description and description[-1].strip() == '':
            description = description[:-1]

        # --> work with the corpus docstrings
        # get indexes for other sections and removes spaces
        docstring = docstring[ix_:]
        # `rpop` is defined elsewhere in this module; presumably it trims
        # the list in place (its return value is unused) - TODO confirm.
        rpop(docstring)
        ixb = len(docstring)
        for ind, values in enumerate(docstring):
            if values.lower().strip() in (
                    'examples', 'see also', 'warnings',
                    'notes', 'references'):
                ixb = ind
                break
        # all values in same level
        for k in range(ixb, len(docstring)):
            docstring[k] = docstring[k].strip()

        for ii, initem in enumerate(docstring):
            for v in self.insert_:
                if initem.lower().find(v) >= 0:
                    initem = initem.strip()
                    docstring[ii] = initem
                    break
            # NOTE(review): nesting of the two tests below (inside or
            # after the keyword loop) and the width of the ' ' indent
            # cannot be recovered from the collapsed source - confirm
            # against the real file.
            if '--' in initem or (
                    ':' in initem and len(initem) < 50) or ix_ >= ixb:
                docstring[ii] = initem.strip()
            elif (initem.lower() not in self.insert_) and ix_ < ii < ixb:
                docstring[ii] = ' ' + initem.strip()

        # add blanck line from indexes list ixs
        ixs = list()
        for k, item in enumerate(docstring):
            for param in self.insert_[:-2]:
                if item.lower().strip() == param:
                    ixs.append(k)
                    break
        ki = 0
        for k in ixs:
            docstring.insert(k + ki, '')
            ki += 1  # add number of insertions

        # --> combine the descriptions and docstring and set attributes
        setattr(func, '__doc__', '\n'.join(description + docstring))

        return func
class gplot2d(object):
    """ Decorator class to plot geological models.

    The decorated function is called once per invocation; its result is
    plotted and then returned unchanged. Its expected return depends on
    `reason`:

    - ``'model'`` (default when ``None``): the 9-tuple ``(resistivity,
      station_names, station_offsets, depth_offsets, doi, depth_scale,
      model_rms, model_roughness, plot_misfit)``;
    - ``'misfit'``: the 6-tuple ``(misfit, station_names,
      station_offsets, depth_offsets, model_rms, model_roughness)``.

    Arguments
    ----------
    reason: str
        type of plot, can be `misfit` or `model`. If ``None``, will plot
        `model`.
    kws: Matplotlib properties and model properties. Keywords read by the
        constructor (any other keyword is stored as an attribute):

    ====================== ===============================================
    keywords               Description
    ====================== ===============================================
    fs                     scale factor of the title font size
    fig_num                number of figure instance
    fig_size               size of figure in inches (width, height)
    fig_aspect             aspect ratio between width and height
    fig_dpi                resolution of figure in dots-per-inch
    font_size              size of axes tick labels, axes labels is +2
    font_style             style of the title and 'Stations' label
    font_weight            weight of the 'Stations' label
    orientation            figure orientation
    cb_pad                 padding between axes edge and color bar
    cb_orientation         orientation of the color bar
    cb_shrink              percentage to shrink the color bar
    cb_position            position of the color bar
    climits                limits of the color scale (min, max)
    cmap                   name of color map
    station_label_rotation angle of station label in degrees
    imshow_interp          interpolation used by the 'imshow' style
    ms                     size of station marker
    lw                     line width
    station_font_color     color of station label
    station_marker         station marker
    station_color          color of station marker
    xpad, ypad             padding in x and y directions
    depth_scale            [ 'km' | 'm' ] scale of plot
    doi                    depth of investigation
    savefig                path of the figure to save
    model_rms              RMS shown in the title
    model_roughness        roughness shown in the title
    plot_style             [ 'pcolormesh' | 'imshow' ]
    alpha                  transparency of the grid
    show_grid              draw the grid when True
    show_station_id        write the station names when True
    ====================== ===============================================
    """

    def __init__(self, reason=None, **kws):
        self._logging = watexlog().get_watex_logger(self.__class__.__name__)
        self.reason = reason
        self.fs = kws.pop('fs', 0.7)
        self.fig_num = kws.pop('fig_num', 1)
        self.fig_size = kws.pop('fig_size', [7, 7])
        self.fig_aspect = kws.pop('fig_aspect', 'auto')
        self.fig_dpi = kws.pop('fig_dpi', 300)
        self.font_size = kws.pop('font_size', 7)
        self.aspect = kws.pop('aspect', 'auto')
        self.font_style = kws.pop('font_style', 'italic')
        self.orient = kws.pop('orientation', 'landscape')
        self.cb_pad = kws.pop('cb_pad', .0375)
        self.cb_orientation = kws.pop('cb_orientation', 'vertical')
        self.cb_shrink = kws.pop('cb_shrink', .75)
        self.cb_position = kws.pop('cb_position', None)
        self.climits = kws.pop('climits', (0, 4))
        self.station_label_rotation = kws.pop('station_label_rotation', 45)
        self.imshow_interp = kws.pop('imshow_interp', 'bicubic')
        self.ms = kws.pop('ms', 2)
        self.lw = kws.pop('lw', 2)
        self.fw = kws.pop('font_weight', 'bold')
        self.station_font_color = kws.pop('station_font_color', 'k')
        self.station_marker = kws.pop('station_marker',
                                      r"$\blacktriangledown$")
        self.station_color = kws.pop('station_color', 'k')
        self.xpad = kws.pop('xpad', 1.0)
        self.ypad = kws.pop('ypad', 1.0)
        self.cmap = kws.pop('cmap', 'jet_r')
        self.depth_scale = kws.pop('depth_scale', None)
        self.doi = kws.pop('doi', 1000)
        self.savefig = kws.pop('savefig', None)
        self.model_rms = kws.pop('model_rms', None)
        self.model_roughness = kws.pop('model_roughness', None)
        self.plot_style = kws.pop('plot_style', 'pcolormesh')
        self.grid_alpha = kws.pop('alpha', 0.5)
        self.show_grid = kws.pop('show_grid', True)
        self.set_station_label = kws.pop('show_station_id', True)

        for keys in list(kws.keys()):
            setattr(self, keys, kws[keys])

    def __call__(self, func):
        """ Model decorator to hold the input function with arguments

        :param func: function to be decorated
        :type func: object
        """
        return self.plot2DModel(func)

    def plot2DModel(self, func):
        """Return `func` wrapped so that its output is plotted.

        :param func: function returning the model or misfit data.
        :return: the decorated function; it returns what `func` returned.
        :raises ValueError: when `reason` is neither ``'model'`` nor
            ``'misfit'``.
        """
        @functools.wraps(func)
        def new_func(*args, **kwargs):
            """ new decorated function . Plot model data and misfit data

            :param args: positional arguments of the decorated function
            :type args: list
            :param kwargs: keyword arguments of the decorated function
            :type kwargs: dict
            :return: the output of the decorated function
            """
            self._logging.info(
                ' Plot decorated {0}.'.format(func.__name__))

            # flag to separated strata model misfit and occam model misfit
            # from occamResponse file
            _f = 0

            if self.reason is None:
                self.reason = 'model'  # by default
            if self.reason not in ('model', 'misfit'):
                # Previously this ended in a NameError further down.
                raise ValueError(
                    "`reason` expects 'model' or 'misfit'."
                    " Got {!r}".format(self.reason))

            if self.depth_scale is not None:
                self.depth_scale = str(self.depth_scale).lower()
            # NOTE(review): validation is applied to ``None`` as well so
            # that `dz` is always defined - confirm against the real file.
            if self.depth_scale not in ("km", "m"):
                mess = ("Depth scale expects 'm' or 'km'. By default 'm'"
                        " is used instead. Got {}").format(self.depth_scale)
                self.depth_scale = "m"
                self._logging.debug(mess)
            dz = 1000. if self.depth_scale == 'km' else 1.

            # figure configuration
            self.fig = plt.figure(self.fig_num, self.fig_size,
                                  dpi=self.fig_dpi)
            plt.clf()
            self.fig_aspect = 'auto'
            axm = self.fig.add_subplot(1, 1, 1, aspect=self.fig_aspect)

            # Call the decorated function ONCE: the same result feeds the
            # plot and is returned (it used to be computed twice).
            outputs = func(*args, **kwargs)

            # ----populate special attributes from model or misfit -------
            if self.reason == 'model':
                (occam_model_resistiviy_obj, occam_data_station_names,
                 occam_data_station_offsets, occam_model_depth_offsets,
                 self.doi, self.depth_scale, self.model_rms,
                 self.model_roughness, plot_misfit) = outputs

                self.doi = occam_model_depth_offsets.max()
                # --> check doi value provided, and convert to
                # default unit {meters}
                self.doi = assert_doi(doi=self.doi)

                # set boundaries of stations offsets and depth;
                # assume that depth starts at 0 then shift it upward to
                # get space for the station names text
                spec_f = -(self.doi / 5) / dz
                if self.climits is None:
                    self.climits = (0, 4)
                if plot_misfit is True:
                    _f = 2
                self.ylimits = (spec_f, self.doi / dz)

            else:  # 'misfit'
                occam_model_resistiviy_obj, occam_data_station_names, *m = \
                    outputs
                occam_data_station_offsets, occam_model_depth_offsets, *rg = m
                self.model_rms, self.model_roughness = rg

                # check if "plotmisfit" refers to the 'geoStrata' model of
                # the geodrill module, then keep the doi and set `spec_f`
                if 'geodtype' in list(kwargs.keys()):
                    # means plot `misfit` from geostrata model
                    spec_f = -(self.doi / 5) / dz
                    self.ylimits = (spec_f, self.doi / dz)
                else:
                    # frequency are in log10; new doi is set accordingly
                    self.doi = occam_model_depth_offsets.max()
                    spec_f = - 0.
                    _f = 1
                    self.ylimits = (self.doi,
                                    occam_model_depth_offsets.min())

            # ------------- manage stations and climits ------------------
            occam_data_station_offsets = np.array(
                occam_data_station_offsets)
            # station separation and get xpad . ex dl=50 then xpad =25
            dl = occam_data_station_offsets.max() / (
                len(occam_data_station_offsets) - 1)
            self.xpad = (dl / 2) / dz

            self.xlimits = (
                occam_data_station_offsets.min() / dz - self.xpad,
                occam_data_station_offsets.max() / dz + self.xpad)

            # configure climits
            if self.reason == 'misfit':
                if self.climits is None:
                    self.climits = (-3, 3)
                elif 'min' in self.climits or 'max' in self.climits:
                    self.climits = (occam_model_resistiviy_obj.min(),
                                    occam_model_resistiviy_obj.max())

            if _f == 2:
                self.reason = 'misfit'
            self._logging.info('Ready to plot {0}'
                               ' with matplotlib "{1}" style.'.
                               format(self.reason, self.plot_style))

            # -------------- check dimensionnality -----------------------
            occam_model_resistiviy_obj, *dm = self._check_dimensionality(
                occam_model_resistiviy_obj,
                occam_model_depth_offsets,
                occam_data_station_offsets)
            occam_model_depth_offsets, occam_data_station_offsets = dm

            if self.plot_style.lower() == 'pcolormesh':
                mesh_x, mesh_z = np.meshgrid(occam_data_station_offsets,
                                             occam_model_depth_offsets)
                vmin = self.climits[0]
                vmax = self.climits[1]

                axm.pcolormesh(mesh_x / dz,
                               mesh_z / dz,
                               occam_model_resistiviy_obj,
                               vmin=vmin,
                               vmax=vmax,
                               shading='auto',
                               cmap=self.cmap,
                               alpha=None,
                               )

            if self.plot_style.lower() == 'imshow':
                mesh_x, mesh_z = np.meshgrid(occam_data_station_offsets,
                                             occam_model_depth_offsets)

                axm.imshow(occam_model_resistiviy_obj,
                           vmax=self.climits[1],
                           vmin=self.climits[0],
                           interpolation=self.imshow_interp,
                           cmap=self.cmap,
                           aspect=self.fig_aspect,
                           origin='upper',
                           extent=(self.xlimits[0],
                                   self.xlimits[1],
                                   self.ylimits[1],
                                   self.ylimits[0] - spec_f),
                           )

            # get colormap for making a colorbar; ``plt.get_cmap`` is
            # used because ``cm.get_cmap`` was removed from matplotlib.
            if isinstance(self.cmap, str):
                self.cmap = plt.get_cmap(self.cmap)

            axm.set_xlim([self.xlimits[0], self.xlimits[1]])
            axm.set_ylim([self.ylimits[1], self.ylimits[0]])

            # create twin axis to set ticks to the top station
            axe2 = axm.twiny()
            axe2.xaxis.set_visible(False)  # let keep only the axe lines

            # set axis and set boundaries
            if self.reason == 'model' or _f == 2:
                ydown_stiteslbls = self.ylimits[0] / 5
                ydown_stationlbls = self.ylimits[0] - (self.ylimits[0] / 3)
                xhorizontal_lbs = (occam_data_station_offsets.max() / dz) / 2

            elif self.reason == 'misfit':
                ydown_stiteslbls = self.ylimits[0] + 0.1 * self.ylimits[1]
                ydown_stationlbls = self.ylimits[0] + \
                    self.ylimits[1] / self.ylimits[0]
                xhorizontal_lbs = (occam_data_station_offsets.max() -
                                   occam_data_station_offsets.min()) / 2

            for offset, names in zip(occam_data_station_offsets,
                                     occam_data_station_names):
                # plot the station marker ' black triangle down '
                # always plots at the surface.
                axm.text(offset / dz,
                         self.ylimits[0] - spec_f,
                         s=self.station_marker,
                         horizontalalignment='center',
                         verticalalignment='baseline',
                         fontdict={'size': self.ms * 5,
                                   'color': self.station_color},
                         )

                if self.set_station_label is True:  # then plot label id
                    axm.text(offset / dz,
                             ydown_stiteslbls,
                             s=names,
                             horizontalalignment='center',
                             verticalalignment='baseline',
                             fontdict={'size': self.ms * 3,
                                       'color': self.station_color},
                             rotation=self.station_label_rotation,
                             )

            if self.set_station_label is True:
                axm.text(xhorizontal_lbs,
                         ydown_stationlbls,
                         s='Stations',
                         horizontalalignment='center',
                         verticalalignment='baseline',
                         fontdict={'size': self.ms * 5,
                                   'color': 'k',
                                   'style': self.font_style,
                                   'weight': self.fw},
                         )

            # -------------------- manage grid and colorbar --------------
            self.g2dgridandcbManager(axm, _f)
            # ------------------------------------------------------------
            # initialize the reason to keep the default reason
            self.reason = None

            if self.savefig is not None:
                plt.savefig(self.savefig, dpi=self.fig_dpi)

            plt.show()

            return outputs

        return new_func

    def _check_dimensionality(self, data, z, x):
        """ Check dimensionality of data and fix it.

        `z` and `x` are truncated to the rows/columns of `data`, and
        `data` is cropped to the (possibly shorter) `z` and `x`.

        :param data: 2D array
        :param z: sequence of depth offsets (rows)
        :param x: sequence of station offsets (columns)
        :return: ``(data, z, x)`` with consistent shapes.
        """
        def reduce_shape(Xshape, x, axis_name=None):
            """ Reduce shape to keep the same shape"""
            mess = "`{0}` shape({1}) {2} than the data shape `{0}` = ({3})."
            ox = len(x)
            dsh = Xshape
            if len(x) > Xshape:
                x = x[: int(Xshape)]
                self._logging.debug(''.join([
                    f"Resize {axis_name!r}={ox!r} to {Xshape!r}.",
                    mess.format(axis_name, len(x), 'more', Xshape)]))

            elif len(x) < Xshape:
                Xshape = len(x)
                self._logging.debug(''.join([
                    f"Resize {axis_name!r}={dsh!r} to {Xshape!r}.",
                    mess.format(axis_name, len(x), 'less', Xshape)]))
            return int(Xshape), x

        sz0, z = reduce_shape(data.shape[0], x=z, axis_name='Z')
        sx0, x = reduce_shape(data.shape[1], x=x, axis_name='X')
        # crop the data to the retained rows and columns
        data = data[:sz0, :sx0]

        return data, z, x

    def g2dgridandcbManager(self, axm, _f=None):
        """ Plot2d model by configure grid and colorbar.

        :param axm: 2d axis plot
        :param _f: resize flag; 2 when the misfit comes along a model,
            1 for a frequency-based misfit.
        :return: the decorator instance.
        """
        # put a grid on if set to True
        if self.show_grid is True:
            axm.minorticks_on()
            axm.grid(color='k', ls=':', lw=0.5,
                     alpha=self.grid_alpha, which='major')

        # set color bar properties
        cbx = mplcb.make_axes(axm, shrink=self.cb_shrink,
                              pad=self.cb_pad, location='right')
        cb = mplcb.ColorbarBase(cbx[0],
                                cmap=self.cmap,
                                norm=mpl.colors.Normalize(
                                    vmin=self.climits[0],
                                    vmax=self.climits[1]))

        # Raw string: '\O' and '\c' are invalid escape sequences; the
        # runtime value is unchanged.
        cb.set_label(r'Resistivity ($\Omega \cdot$m)',
                     fontdict={'size': self.font_size + 1,
                               'weight': 'bold'})

        if self.reason == 'model':
            cb.set_ticks(np.arange(int(self.climits[0]),
                                   int(self.climits[1]) + 1))
            cb.set_ticklabels(['10$^{0}$'.format('{' + str(nn) + '}')
                               for nn in np.arange(int(self.climits[0]),
                                                   int(self.climits[1]) + 1)])
        else:
            cb.set_ticks(np.linspace(self.climits[0], self.climits[1], 5))
            cb.set_ticklabels(['{0}'.format(str(round(nn, 2))) for nn in
                               np.linspace(self.climits[0],
                                           self.climits[1], 5)])
            cb.set_label('misfitvalue(%)',
                         fontdict={'size': self.font_size + 1,
                                   'weight': 'bold'})

        # set axes labels
        axm.set_xlabel('Distance ({0})'.format(self.depth_scale),
                       fontdict={'size': self.font_size + 2,
                                 'weight': 'bold'})

        if self.reason == 'misfit':
            if _f == 2:
                ylabel = 'Depth ({0})'.format(self.depth_scale)
            else:
                ylabel = 'Log10Frequency(Hz)'
            mesT = 'Plot Misfit'

        elif self.reason == 'model':
            ylabel = 'Depth ({0})'.format(self.depth_scale)
            mesT = 'Plot strata model'

        axm.set_ylabel(ylabel, fontdict={
            'size': self.font_size + 2, 'weight': 'bold'})

        self.fig.suptitle('{0}- DataType = {1} :RMS={2}, Roughness={3}'.
                          format(mesT, self.reason, self.model_rms,
                                 self.model_roughness),
                          ha='center',
                          fontsize=7 * self.fs,
                          verticalalignment='center',
                          style=self.font_style,
                          bbox=dict(boxstyle='round',
                                    facecolor='moccasin'),
                          y=0.95 if self.reason == 'model' else 0.98)

        return self
class _M:
    """Throwaway class used only to obtain the bound-method type."""

    def _m(self):
        pass


# Type of a bound method, taken from a live instance.
MethodType = type(_M()._m)


class _AvailableIfDescriptor:
    """Implements a conditional property using the descriptor protocol.

    Accessing the decorated method on an instance raises an
    ``AttributeError`` if ``check(instance)`` returns a falsey value, so
    ``hasattr`` reports ``False`` in that case. If ``check`` itself raises
    ``AttributeError``, ``hasattr`` reports ``False`` as well.

    See https://docs.python.org/3/howto/descriptor.html for an explanation
    of descriptors.

    Parameters
    ----------
    fn : callable
        The function to expose conditionally.
    check : callable
        Called with the instance; a truthy result makes ``fn`` available.
    attribute_name : str
        Name reported in the ``AttributeError`` message.
    """

    def __init__(self, fn, check, attribute_name):
        self.fn = fn
        self.check = check
        self.attribute_name = attribute_name

        # update the docstring of the descriptor
        functools.update_wrapper(self, fn)

    def _attribute_error(self, owner, obj=None):
        """Build the error raised when the attribute is unavailable."""
        if owner is None:
            # ``owner`` is optional in the descriptor protocol; fall back
            # to the instance's class so the message can still be built.
            owner = type(obj)
        return AttributeError(
            f"This {repr(owner.__name__)} has no attribute "
            f"{repr(self.attribute_name)}"
        )

    def __get__(self, obj, owner=None):
        if obj is not None:
            # delegate only on instances, not the classes.
            # this is to allow access to the docstrings.
            if not self.check(obj):
                raise self._attribute_error(owner, obj)
            return MethodType(self.fn, obj)

        # This makes it possible to use the decorated method as an unbound
        # method, for instance when monkeypatching.
        @functools.wraps(self.fn)
        def out(*args, **kwargs):
            if not self.check(args[0]):
                raise self._attribute_error(owner, args[0])
            return self.fn(*args, **kwargs)

        return out
@contextmanager
def nullify_output(suppress_stdout=True, suppress_stderr=True):
    """Context manager that silences stdout and/or stderr.

    While the ``with`` body runs, the selected streams are redirected to
    ``os.devnull``. On exit the previous stream objects are put back and
    the devnull handle is closed, even if the body raised.

    Adapted from
    https://www.codeforests.com/2020/11/05/python-suppress-stdout-and-stderr/

    :param suppress_stdout: redirect ``sys.stdout`` when True.
    :param suppress_stderr: redirect ``sys.stderr`` when True.
    """
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    # ``with`` guarantees the devnull handle is released after the streams
    # have been restored by the inner ``finally``.
    with open(os.devnull, "w") as devnull:
        try:
            if suppress_stdout:
                sys.stdout = devnull
            if suppress_stderr:
                sys.stderr = devnull
            yield
        finally:
            if suppress_stdout:
                sys.stdout = saved_stdout
            if suppress_stderr:
                sys.stderr = saved_stderr
class suppress_output:
    """Context manager that redirects stdout and/or stderr to ``os.devnull``.

    Useful around calls into third-party code that prints messages which
    cannot be turned off at the source.

    Both flags default to ``False``, so a stream is silenced only when it
    is requested explicitly. The original stream objects are restored and
    the devnull handle is closed when the ``with`` block exits.

    Parameters
    ----------
    suppress_stdout : bool, default=False
        Redirect ``sys.stdout`` for the duration of the block.
    suppress_stderr : bool, default=False
        Redirect ``sys.stderr`` for the duration of the block.
    """

    def __init__(self, suppress_stdout=False, suppress_stderr=False):
        self.suppress_stdout = suppress_stdout
        self.suppress_stderr = suppress_stderr
        self._stdout = None
        self._stderr = None
        self._devnull = None

    def __enter__(self):
        # Only open the sink when there is something to redirect.
        if self.suppress_stdout or self.suppress_stderr:
            self._devnull = open(os.devnull, "w")
        if self.suppress_stdout:
            self._stdout = sys.stdout
            sys.stdout = self._devnull
        if self.suppress_stderr:
            self._stderr = sys.stderr
            sys.stderr = self._devnull
        return self

    def __exit__(self, *args):
        if self.suppress_stdout:
            sys.stdout = self._stdout
        if self.suppress_stderr:
            sys.stderr = self._stderr
        # Release the file handle once no stream points at it any more.
        if self._devnull is not None:
            self._devnull.close()
            self._devnull = None
@contextmanager
def suppress_stdout():
    """Context manager that sends ``sys.stdout`` to ``os.devnull``.

    The stream that was active on entry is reinstated when the block
    exits, whether or not the body raised, and the devnull handle is
    closed afterwards.
    """
    sink = open(os.devnull, "w")
    with sink:
        previous_stream = sys.stdout
        sys.stdout = sink
        try:
            yield
        finally:
            sys.stdout = previous_stream
def available_if(check):
    """Make a method available only when ``check`` returns a truthy value.

    Parameters
    ----------
    check : callable
        Called with the object owning the decorated method. It should
        return a truthy value if the attribute is available, and either
        return False or raise an AttributeError if it is not.

    Returns
    -------
    callable
        A decorator that wraps the method in an ``_AvailableIfDescriptor``
        registered under the method's own ``__name__``.

    Examples
    --------
    >>> from watex.decorators import available_if
    >>> class HelloIfEven:
    ...    def __init__(self, x):
    ...        self.x = x
    ...
    ...    def _x_is_even(self):
    ...        return self.x % 2 == 0
    ...
    ...    @available_if(_x_is_even)
    ...    def say_hello(self):
    ...        print("Hello")
    ...
    >>> obj = HelloIfEven(1)
    >>> hasattr(obj, "say_hello")
    False
    >>> obj.x = 2
    >>> hasattr(obj, "say_hello")
    True
    >>> obj.say_hello()
    Hello
    """

    def decorator(fn):
        return _AvailableIfDescriptor(fn, check, attribute_name=fn.__name__)

    return decorator
# decorators utilities
def rpop(listitem):
    """Remove every blank entry from ``listitem`` in place.

    An entry is blank when ``item.strip()`` is the empty string.

    :param listitem: list of strings; it is modified in place.
    :returns: always ``False`` (kept for backward compatibility); the
        useful effect is the in-place removal.
    """
    # Rebuild the contents in a single pass via slice assignment instead
    # of popping while iterating, which skips the element that follows
    # each removal and needed repeated recursive passes to compensate.
    listitem[:] = [item for item in listitem if item.strip() != '']
    return False
def assert_doi(doi):
    """Convert a depth of investigation to a float value in meters.

    :param doi: depth of investigation. A number is taken as meters. A
        string may carry the suffix ``'km'`` (value is multiplied by 1000)
        or ``'m'``; surrounding whitespace and letter case are ignored.
    :type doi: str|float

    :returns doi: value in meters
    :rtype: float

    :raises TypeError: if the value cannot be interpreted as a number.
    """
    if isinstance(doi, str):
        text = doi.strip().lower()
        factor = 1.0
        if text.endswith('km'):
            # Scale numerically; appending '000' to the digits would turn
            # '1.5km' into 1.5 instead of 1500.
            text, factor = text[:-2], 1000.0
        elif text.endswith('m'):
            text = text[:-1]
        try:
            return float(text) * factor
        except ValueError:
            raise TypeError(" Unrecognized value. Expect value in 'km' "
                            f"or 'm' not: {doi!r}") from None

    try:
        return float(doi)
    except (TypeError, ValueError):
        raise TypeError("Depth of investigation must be a float number "
                        f"not: {type(doi).__name__!r}") from None