Source code for watex.geology.stratigraphic

# -*- coding: utf-8 -*-
#   License: BSD-3-Clause
#   Author: LKouadio <etanoyau@gmail.com>
#   Created on Thu Sep 22 12:21:11 2022

"""
Stratigraphic 
==============
Construct layers model from given layers properties such as density , porosity 
permeability, transmissivity, resistivity , patches and so on ... 
Build Statigraphic model from Inversion models blocks. This should be used to 
predict the log under each station as well as the thicknesses from the collected 
true boreholes and well resistivity data in the survey area. 

"""
import os
import warnings
import copy
from importlib import resources
import numpy as np

from .._typing import NDArray 
from .._watexlog import watexlog 
from ..decorators import (
    gplot2d
    )
from ..exceptions import NotFittedError
from .core import (
    Base 
    )
from .database import ( 
    GeoDataBase,
    )
from ..utils.funcutils import ( 
    serialize_data, 
    load_serialized_data,
    smart_format, 
    concat_array_from_list,
    parse_json, 
    parse_yaml,
    )
from ..utils.geotools import (
    _sanitize_db_items,
    _assert_model_type,
    assert_len_lns_tres, 
    assert_station, 
    fit_rocks, 
    fit_stratum_property, 
    get_s_thicknesses, 
    print_running_line_prop, 
    pseudostratigraphic_log,
    get_closest_gap,
    lns_and_tres_split, 
    
    )
from ..utils._dependency import ( 
    import_optional_dependency
    )

TQDM= False 
try : 
    import_optional_dependency("tqdm")
except ImportError: 
    pass 
else: 
    import tqdm 
    TQDM = True 
_logger = watexlog().get_watex_logger(__name__ )

__all__=["GeoStrataModel"]

#XXXTODO: add MODEM construction block  in progress 
[docs]class GeoStrataModel(Base): def __init__( self, beta=5, ptol=0.1 , n_epochs=100, tres=None, eta=1e-4, kind='linear', degree=1, build=False, **kwargs ): super().__init__( **kwargs) self._beta =beta self._ptol =ptol self._n_epochs = n_epochs self._tres = tres self._eta = eta self._kind =kind self._degree = degree self._b = build self.s0 =None self._zmodel =None self.nm= None self.z =None self.nmSites=None self.crmSites=None for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
[docs] def fit(self, crm: NDArray =None, beta =5 , ptol= 0.1, **kws): """ Fit, populate attributes and construct the new stratigraphic model (NM) Parameters ------------ crm : ndarray of shape(n_vertical_nodes, n_horizontal_nodes ), Array-like of inversion two dimensional model blocks. Note that the `n_vertical_nodes` is the node from the surface to the depth while `n_horizontal_nodes` must include the station location (sites/stations) beta: int, Value to divide into the CRM blocks to improve the computation times. default is`5` n_epochs: int, Number of iterations. default is `100` ptols: float, Existing tolerance error between the `tres` values given and the calculated resistivity in `crm` Return -------- ``self``: :class:`watex.geology.stratigraphic.GeoStrataModel`. return `self` for methods chaining. """ for key in list(kws.keys()): setattr(self, key, kws[key]) if crm is not None: self.crm=crm # expect the bock is build from # external modeling softwares if hasattr (self, "input_resistivities"): if self.input_resistivities: self.tres = self.input_resistivities if hasattr (self, 'model_res'): if self.model_res is not None : self.crm = self.model_res self.s0= np.zeros_like(self.model_res) if self.crm is not None: self._makeBlock() self.build return self
@property def inspect (self): """ Inspect object whether is fitted or not""" msg = ( "{obj.__class__.__name__} instance is not fitted yet." " Call 'fit' with appropriate arguments before using" " this method" ) if not hasattr (self, 'crm'): raise NotFittedError(msg.format( obj=self) ) return 1 @property def n_epochs(self): """ Iteration numbers""" return self._n_epochs @n_epochs.setter def n_epochs(self, n_iterations): """ n_epochs must be in integers value and greater than 0""" try : self._n_epochs = int(n_iterations) except: TypeError('Iteration number must be `integer`') else: if self._n_epochs <=0: self._logging.debug( " Unaceptable iteration value! Must be a positive value not " f"{'a negative value.' if self._n_epochs <0 else 'equal to 0.'}") warnings.warn(f" {self._n_epochs} is unaceptable value." " Could be resset to the default value=100.") self._n_epochs = 100 @property def beta (self): """ Block constructor param""" return self._beta @beta.setter def beta(self, beta0 ): """ Block constructor must be integer value.""" try : self._beta = int(beta0) except Exception: raise TypeError else: if self._beta <=0 : self._logging.debug( f'{self._beta} is unaceptable. Could resset to 5.') warnings.warn( f'`{self._beta}` is unaceptable. Could resset to 5.') self._beta= 5 @property def ptol(self) : """ Tolerance parameter """ return self._ptol @ptol.setter def ptol(self, ptol0): """ Tolerance parameter must be different to zero and includes between 0 and 1""" try : self._ptol =float(ptol0) except Exception : raise TypeError ('Tolerance parameter `ptol` should be ' f'a float number not {type (ptol0)!r}.') else : if 0 >= self._ptol >1: self._logging.debug(f"Tolerance value `{self._ptol}` is " "{'greater' if self._ptol >1 else 'is unacceptable value'}`." "Could resset to 10%") warnings.warn( f'Tolerance value `{self._ptol}` is unacceptable value.' 'Could resset to 10%') self._ptol = 0.1 @property def tres(self): """ Input true resistivity""" return self._tres @tres.setter def tres(self, ttres): """ Convert Tres to log 10 resistivity """ try : self._tres =[np.log10(t) for t in ttres] except : raise ValueError('Unable to convert TRES values') @property def build (self): """ Trigger the NM build and return the NM building option """ ntres ='True resistivity values' nln ='collected layer names (True)' mes =''.join([ '{0} {1} not defined. Unable to triggered the NM construction. ' 'Please, provide the list/array of {2} of survey area.']) if self._b: if (self.tres and self.input_layers ) is None: warnings.warn(mes.format( 'TRES and LN', 'are', ntres +'and'+nln)) self._b=False elif self.tres is None and self.input_layers is not None: warnings.warn(mes.format('TRES', 'is', ntres)) self._b=False elif self.input_layers is None and self.tres is not None: warnings.warn(mes.format('LN', 'is', nln)) self._b=False if not self._b: self._logging.debug ( "Build is set to TRUE, however," '{0}'.mes.format( f'{"TRES" if self.tres is None else "LN"}', 'is', f'{ntres if self.tres is None else nln}') ) if self._b: self._createNM() def _createNM(self, crm =None, beta =5 , ptol= 0.1, **kws): """ Create NM through the differents steps of NM creatings. - step 1 : soft minimal computing - step2 : model function computing - step 3: add automatic layers - step 4: use ANN to find likehood layers :param crm: calculated resistivity model blocks :param beta: number of block to build. :param ptol: Error tolerance parameters """ def s__auto_rocks (listOfauto_rocks): """ Automatick rocks collected during the step 3 :param listOfauto_rocks: List of automatic rocks from differents subblocks. :returns:rocks sanitized and resistivities. """ listOfauto_rocks= np.concatenate((listOfauto_rocks), axis =1) rho_= listOfauto_rocks[1, :] rho_=np.array([float(ss) for ss in rho_]) r_= list(set(listOfauto_rocks[0, :])) hres= np.zeros((len(r_), 1)) h_= [] for ii, rock in enumerate(r_): for jj, ro in enumerate(listOfauto_rocks[0, :]): if rock == ro: h_.append(rho_[jj]) m_= np.array(h_) hres[ii]= m_.mean() h_=[] return r_, hres iln =kws.pop('input_layers', None) tres =kws.pop('tres', None) subblocks =kws.pop('subblocks', None) disp= kws.pop('display_infos', True) n_epochs = kws.pop('n_epochs', None) hinfos =kws.pop('headerinfos', ' Layers [auto=automatic]') if subblocks is not None: self.subblocks = subblocks if iln is not None: self.input_layers = iln if tres is not None: self.tres = tres if crm is not None: self.crm = crm if beta is not None: self.beta = beta if ptol is not None: self.ptol = ptol if n_epochs is not None: self.n_epochs = n_epochs self.s0 , errors=[], [] #step1 : SOFMINERROR if TQDM : pbar =tqdm.tqdm(total= 3, ascii=True,unit='B', desc ='geostrata', ncols =77) for ii in range(len(self.subblocks)): s1, error = self._softMinError(subblocks= self.subblocks[ii]) self.s0.append(s1) errors.append(error) if TQDM : pbar.update(1) #step2 : MODELFUNCTION USING DESCENT GRADIENT for ii in range(len(self.s0)): if 0 in self.s0[ii][:, :]: s2, error = self._hardMinError(subblocks =self.subblocks[ii], s0= self.s0[ii]) self.s0[ii]=s2 errors[ii]= error if TQDM : pbar.update(2) arp_=[] #Step 3: USING DATABASE for ii in range(len(self.s0)): if 0 in self.s0[ii][:, :]: s3, autorock_properties= self._createAutoLayer( subblocks=self.subblocks[ii], s0=self.s0[ii] ) arp_.append(autorock_properties) self.s0[ii]=s3 # Assembly the blocks self.nm = np.concatenate((self.s0)) self.z=self.nm[:, 0] self.nm = self.nm[:, 1:] if TQDM : pbar.update(3) print(' process completed') pbar.close() # make site blocks self.nmSites= makeBlockSites(x_nodes=self.model_x_nodes, station_location= self.station_location, block_model=self.nm ) self.crmSites = makeBlockSites(x_nodes=self.model_x_nodes, station_location= self.station_location, block_model=self.model_res) #Update TRES and LN gammaL, gammarho = s__auto_rocks(arp_) if self.input_layers is not None: print_layers = self.input_layers + [ ' {0} (auto)'.format(l) for l in gammaL ] self.input_layers = self.input_layers + gammaL # keep the auto_layer found self.auto_layers =gammaL self.tres = list(np.power(10,self._tres)) + list (np.power(10, np.array([float(rv) for rv in gammarho]))) # display infos if disp: display_infos(infos=print_layers, header= hinfos) #STEP 4: Train ANN: (still in development) to predict your #layer: No need to plot the NM # copy main attributes for pseudostratigraphic plot purpose import copy for name , attrval in zip(['TRES', 'LNS'], [self.tres , self.input_layers]): setattr(self, name, copy.deepcopy(attrval)) # memorize data _ps_memory_management(self) return self.nm def _softMinError(self, subblocks=None, **kws ): """ Replace the calculated resistivity by the true resistivity using the soft minimal error (ξ) :param crm: Is the calculated resistivity model from Occam2D inversion results """ buffer =self.ptol +1 #bufferr error _z = subblocks[:, 0] subblocks = subblocks[:, 1:] # Hold the columns of depth values s0 = np.zeros_like(subblocks.T) error =[] for ii in range(subblocks.shape[1]): # hnodes N for jj in range(subblocks.shape[0]): # znodes V for k in range(len(self.tres)) : sfme_k = (subblocks.T[ii, jj]-self.tres[k])**2\ /subblocks.T[ii, jj] **2 error.append(sfme_k ) if sfme_k <= self.ptol : if sfme_k < buffer : # keep the best minimum buffer = sfme_k s0[ii, jj] = self.tres[k] buffer = self.ptol +1 # initilize buffer s0= np.concatenate((_z.reshape(_z.shape[0], 1), s0.T), axis =1) return s0, error def _hardMinError(self, tres=None, subblocks=None, s0=None, ptol = None, kind='linear', **kwargs ): """The second step introduces the model function F=W∙Z where W contains the weights of parameters number and Z is V×2 matrix that contains a “bias” column. If the parameter number P equal to two, the model function f(z)=∑_(p=1)^P▒〖w_(p-1) z^(p-1) 〗 becomes a linear function with 〖f_1〗^((1) ) (z)= wz+r_0 with w_1=w and w_0=r_0 he gradient descent algorithm is used to find the best parameters w and r_0 that minimizes the MSE loss function J . :param subblocks: `crm` block :param s0: blocks from the first step :meth:`~._sofminError` :param kind: Type of model function to apply. Can also be a `polynomial` by specifying the `degree` into argument `degree`. :Example: >>> from watex.geology.stratigraphic import GeoStrataModel >>> geosObj = GeoStrataModel().fit(**inversion_files, input_resistivities=input_resistivity_values) >>> ss0, error = geosObj._hardMinError(subblocks=geosObj.subblocks[0], s0=geosObj.s0[0]) """ if tres is not None: self.tres = tres if ptol is not None: self.ptol = ptol eta = kwargs.pop('eta', None) if eta is not None: self._eta = eta n_epochs =kwargs.pop('n_epochs', None) if n_epochs is not None: self.n_epochs = n_epochs kind = kwargs.pop('kind', None) if kind is not None: self._kind = kind degree = kwargs.pop('degree', None) if degree is not None: self._degree = degree buffer =self.ptol +1 #bufferr error _z= s0[:, 0] s0 = s0[:, 1:].T subblocks=subblocks[:, 1:].T error =[] for ii in range(s0.shape[0]): # hnodes N F, *_= self.gradient_descent(z=_z,s=subblocks[ii,:], alpha= self._eta, n_epochs= self.n_epochs, kind= self._kind) for jj in range(s0.shape[1]): # znodes V if s0[ii, jj] ==0. : rp =F[jj] for k in range(len(self.tres)) : with np.errstate(all='ignore'): sfme_k = (rp -self.tres[k])**2\ /rp**2 _ermin = abs(rp-subblocks[ii, jj]) error.append(sfme_k) if sfme_k <= self.ptol and _ermin<= self.ptol: if sfme_k < buffer : # keep the best minimum buffer = sfme_k s0[ii, jj]= self.tres[k] buffer = self.ptol +1 # initialize buffer s0= np.concatenate((_z.reshape(_z.shape[0], 1), s0.T), axis =1) return s0, error
[docs] @staticmethod def gradient_descent(z, s, alpha, n_epochs, **kws): """ Gradient descent algorithm to fit the best model parameter. :param z: vertical nodes containing the values of depth V :param s: vertical vector containin the resistivity values :param alpha: step descent parameter or learning rate. *Default* is ``0.01` :param n_epochs: number of iterations. *Default* is ``100`` Can be changed to other values :returns: - `F`: New model values with the best `W` parameters found. - `W`: vector containing the parameters fits - `cost_history`: Containing the error at each Itiretaions. :Example: >>> z= np.array([0, 6, 13, 20, 29 ,39, 49, 59, 69, 89, 109, 129, 149, 179]) >>> res= np.array( [1.59268,1.59268,2.64917,3.30592,3.76168, 4.09031,4.33606, 4.53951,4.71819,4.90838, 5.01096,5.0536,5.0655,5.06767]) >>> fz, weights, cost_history = gradient_descent(z=z, s=res, n_epochs=10, alpha=1e-8, degree=2) >>> import matplotlib.pyplot as plt >>> plt.scatter (z, res) >>> plt.plot(z, fz) """ kind_=kws.pop('kind', 'linear') kind_degree = kws.pop('degree', 1) if kind_degree >1 : kind_='poly' if kind_.lower() =='linear': kind_degree = 1 elif kind_.lower().find('poly')>=0 : if kind_degree <=1 : _logger.debug( 'The model function is set to `Polynomial`. ' 'The degree must be greater than 1. Degree wil reset to 2.') warnings.warn('Polynomial degree must be greater than 1.' 'Value is ressetting to `2`.') kind_degree = 2 try : kind_degree= int(kind_degree) except Exception : raise ValueError(f'Could not `{kind_degree}` convert to integer.') def kindOfModel(degree, x, y) : """ Generate kind of model. If degree is``1`` The linear subset function will use. If `degree` is greater than 2, Matrix will generate using the polynomail function. :param x: X values must be the vertical nodes values :param y: S values must be the resistivity of subblocks at node x """ c= [] deg = degree w = np.zeros((degree+1, 1)) # initialize weights def init_weights (x, y): """ Init weights by calculating the scope of the function along the vertical nodes axis for each columns. """ for j in range(x.shape[1]-1): a= (y.max()-y.min())/(x[:, j].max()-x[:, j].min()) w[j]=a w[-1] = y.mean() return w # return weights for i in range(degree): c.append(x ** deg) deg= deg -1 if len(c)> 1: x= concat_array_from_list(c, concat_axis=1) x= np.concatenate((x, np.ones((x.shape[0], 1))), axis =1) else: x= np.vstack((x, np.ones(x.shape))).T # initialize z to V*2 w= init_weights(x=x, y=y) return x, w # Return the matrix x and the weights vector w def model(Z, W): """ Model function F= Z.W where `Z` id composed of vertical nodes values and `bias` columns and `W` is weights numbers.""" return Z.dot(W) # generate function with degree Z, W = kindOfModel(degree=kind_degree, x=z, y=s) # Compute the gradient descent cost_history = np.zeros(n_epochs) s=s.reshape((s.shape[0], 1)) for ii in range(n_epochs): with np.errstate(all='ignore'): # rather than divide='warn' #https://numpy.org/devdocs/reference/generated/numpy.errstate.html W= W - (Z.T.dot(Z.dot(W)-s)/ Z.shape[0]) * alpha cost_history[ii]= (1/ 2* Z.shape[0]) * np.sum((Z.dot(W) -s)**2) F= model(Z=Z, W=W) # generate the new model with the best weights return F,W, cost_history
[docs] @classmethod def geoArgumentsParser(cls, config_file =None): """ Read and parse the `GeoStrataModel` arguments files from the config [JSON|YAML] file. :param config_file: configuration file. Can be [JSON|YAML] :Example: >>> GeoStrataModel.geoArgumentsParser( 'data/saveJSON/cj.data.json') >>> GeoStrataModel.geoArgumentsParser( 'data/saveYAML/cy.data.yml') """ if config_file.endswith('json'): args = parse_json(config_file) elif config_file.endswith('yaml') or config_file.endswith('yml'): args = parse_yaml(config_file) else: raise ValueError('Can only parse JSON and YAML data.') return cls(**args)
def _makeBlock (self): """ Construct the differnt block based on `beta` param. Separate blocks from number of vertical nodes generated by the first `beta` value applied to the `crm`.""" self.zmodel = np.concatenate((self.geo_depth.reshape( self.geo_depth.shape[0], 1), self.model_res), axis =1) vv = self.zmodel[-1, 0] / self.beta for ii, nodev in enumerate(self.zmodel[:, 0]): if nodev >= vv: npts = ii # collect number of points got. break self._subblocks =[] bp, jj =npts, 0 if len(self.zmodel[:, 0]) <= npts: self._subblocks.append(self.zmodel) else: for ii , row in enumerate(self.zmodel) : if ii == bp: _tp = self.zmodel[jj:ii, :] self._subblocks.append(_tp ) bp +=npts jj=ii if len(self.zmodel[jj:, 0])<= npts: self._subblocks.append(self.zmodel[jj:, :]) break return self._subblocks @property def subblocks(self): """ Model subblocks divised by `beta`""" return self._subblocks @subblocks.setter def subblocks(self, subblks): """ keep subblocks as :class:`~GeoStrataModel` property""" self._subblocks = subblks def _createAutoLayer(self, subblocks=None, s0=None, ptol = None,**kws): """ The third step of replacement using the geological database. The third step consists to find the rock γ_L in the Γ with the ceiled mean value γ_ρ in E_props column is close to the calculated resistivity r_11. Once the rock γ_L is found,the calculated resistivity r_11 is replaced by γ_ρ. Therefore, the rock γ_L is considered as an automatic layer. At the same time,the TRES and LN is updated by adding GeoStratigraphy_ρ and γ_L respectively to the existing given data. """ db_properties = kws.pop('properties',['electrical_props', '__description'] ) tres = kws.pop('tres', None) disp = kws.pop('display_infos', False) hinfos = kws.pop('header', 'Automatic layers') if tres is not None : self.tres = tres if ptol is not None: self.ptol = ptol def _findGeostructures(_res): """ Find the layer from database and keep the ceiled value of `_res` calculated resistivities""" structures = self.get_structure(_res) if len(structures) !=0 or structures is not None: if structures[0].find('/')>=0 : ln = structures[0].split('/')[0].lower() else: ln = structures[0].lower() return ln, _res else: valEpropsNames = self._getProperties(db_properties) indeprops = db_properties.index('electrical_props') for ii, elecp_value in enumerate(valEpropsNames[indeprops]): if elecp_value ==0.: continue elif elecp_value !=0 : try : iter(elecp_value) except : pass else : if min(elecp_value)<= _res<= max(elecp_value): ln= valEpropsNames[indeprops][ii] return ln, _res def _normalizeAutoresvalues(listOfstructures,listOfvalues): """ Find the different structures that exist and harmonize value. and return an array of originated values and the harmonize values and the number of automatics layer found as well as their harmonized resistivity values. """ autolayers = list(set(listOfstructures)) hvalues= np.zeros((len(autolayers,))) temp=[] for ii , autol in enumerate(autolayers): for jj, _alay in enumerate(listOfstructures): if _alay ==autol: temp.append(listOfvalues[jj]) hvalues[ii]= np.array(list(set(temp))).mean() temp=[] # build values array containes the res and the harmonize values h= np.zeros((len(listOfvalues),)) for ii, (name, values) in enumerate(zip (listOfstructures, listOfvalues)): for jj, hnames in enumerate(autolayers) : if name == hnames: h[ii]= hvalues[jj] finalres= np.vstack((np.array(listOfvalues),h) ) finalln = np.vstack((np.array(autolayers), hvalues)) return finalln, finalres _z= s0[:, 0] s0 = s0[:, 1:].T _temptres , _templn =[], [] subblocks=subblocks[:, 1:].T for ii in range(s0.shape[0]): # hnodes N for jj in range(s0.shape[1]): # znodes V if s0[ii, jj] ==0. : lnames, lcres =_findGeostructures( np.power(10, subblocks[ii, jj])) _temptres.append(np.log10(lcres)) _templn.append(lnames) auto_rocks_names_res, automatics_resistivities =\ _normalizeAutoresvalues(_templn,_temptres ) for ii in range(s0.shape[0]): # hnodes N for jj in range(s0.shape[1]): # znodes V if s0[ii, jj] ==0. : for k in range(automatics_resistivities.shape[1]): subblocks[ii, jj] == automatics_resistivities[0,:][k] s0[ii, jj]= automatics_resistivities[1,:][k] break s0= np.concatenate((_z.reshape(_z.shape[0], 1), s0.T), axis =1) # display infos if disp: display_infos(infos=self.input_layers, header= hinfos) return s0, auto_rocks_names_res @staticmethod def _getProperties(properties =['electrical_props', '__description'], sproperty ='electrical_props'): """ Connect database and retrieve the 'Eprops'columns and 'LayerNames' :param properties: DataBase columns. :param sproperty : property to sanitize. Mainly used for the properties in database composed of double parenthesis. Property value should be removed and converted to tuple of float values. :returns: - `_gammaVal`: the `properties` values put on list. The order of the retrieved values is function of the `properties` disposal. """ def _fs (v): """ Sanitize value and put on list :param v: value :Example: >>> _fs('(416.9, 100000.0)')) ...[416.9, 100000.0] """ try : v = float(v) except : v = tuple([float (ss) for ss in v.replace('(', '').replace(')', '').split(',')]) return v # connect to geodataBase try : _dbObj = GeoDataBase() except: _logger.debug('Connection to database failed!') else: _gammaVal = _dbObj._retreive_databasecolumns(properties) if sproperty in properties: indexEprops = properties.index(sproperty ) try: _gammaVal [indexEprops] = list(map(lambda x:_fs(x), _gammaVal[indexEprops])) except TypeError: _gammaVal= list(map(lambda x:_fs(x), _gammaVal)) return _gammaVal
[docs] @gplot2d(reason='model',cmap='jet_r', plot_style ='pcolormesh', show_grid=False ) def strataModel(self, kind ='nm', **kwargs): """ Visualize the `strataModel` after `nm` creating using decorator from :class:'~.geoplot2d'. :param kind: can be : - `nm` mean new model plots after inputs the `tres` - `crm` means calculated resistivity from occam model blocks *default* is `nm`. :param plot_misft: Set to ``True`` if you want to visualise the error between the `nm` and `crm`. :param scale: Can be ``m`` or ``km`` for plot scale :param in_percent`: Set to `True` to see your plot map scaled in %. :Example: >>> from watex.geology.stratigraphic import GeostrataModel >>> geosObj = GeostrataModel().fit(**inversion_files, input_resistivities=input_resistivity_values, input_layers=input_layer_names) >>> geosObj.strataModel(kind='nm', misfit_G =False) """ m_='watex.geology.GeostrataModel.strataModel' def compute_misfit(rawb, newb, percent=True): """ Compute misfit with calculated block and new model block """ m_misfit = .01* np.sqrt ( (rawb - newb)**2 /rawb**2 ) if percent is True: m_misfit= m_misfit *100. return m_misfit if isinstance(kind, bool): kind ='nm' if kind else 'crm' depth_scale = kwargs.pop('scale', 'm') misfit_G =kwargs.pop('misfit_G', False) misfit_percentage = kwargs.pop('in_percent', True) kind = _assert_model_type(kind) if self.nm is None: self._createNM() if kind =='nm': data = self.nmSites if kind =='crm': data = self.crmSites # compute model_misfit if misfit_G is True : if kind =='crm': warnings.warn( "By default, the plot should be the stratigraphic" " misfit<misfit_G>.") self._logging.info('Visualize the stratigraphic misfit.') data = compute_misfit(rawb=self.crmSites , newb= self.nmSites, percent = misfit_percentage) print('{0:-^77}'.format('StrataMisfit info')) print('** {0:<37} {1} {2} {3}'.format( 'Misfit max ','=',data.max()*100., '%' )) print('** {0:<37} {1} {2} {3}'.format( 'Misfit min','=',data.min()*100., '%' )) print('-'*77) warnings.warn( f'Data stored from {m_!r} should be moved on binary drive and' ' method arguments should be keywordly only.', FutureWarning) return (data, self.station_names, self.station_location, self.geo_depth, self.doi, depth_scale, self.model_rms, self.model_roughness, misfit_G )
@staticmethod def _strataPropertiesOfSite(obj, station =None, display_s=True): """ Build all properties of strata under each station. Parameters ---------- station: str or int Use normal count to identify the number of site to plot or use the name of station preceed of letter `S`. For instance site 1 matches the station `S00` litterally display_s:bool Display the log layer infos as well as layers thicknesses Examples -------- >>> from watex.geology.stratigraphic import GeoStrataModel >>> import watex.utils.geotools as GU >>> geosObj = GeoStrataModel().fit( input_resistivities=TRES, ... input_layers=LNS,**INVERS_KWS) >>> geosObj._strataPropertiesOfSite(geosObj,station= 'S05') """ def stamping_ignored_rocks(fittedrocks, lns): """ Stamping the pseudo rocks and ignored them during plot.""" ir= set(fittedrocks).difference(set(lns)) for k in range( len(fittedrocks)): if fittedrocks[k] in ir : fittedrocks[k] ='$(i)$' return fittedrocks # assert the station, get it appropriate index and take the tres # at that index if station is None: stns = ["S{:02}".format(i) for i in range(obj.nmSites.shape[1])] obj._logging.error('None station is found. Please select one station' f' between {smart_format(stns)}') warnings.warn("NoneType can not be read as station name." " Please provide your station name. list of sites" f" are {smart_format(stns)}") raise ValueError("NoneType can not be read as station name." " Please provide your station name.") if obj.nmSites is None: obj._createNM() try : id0= int(station.lower().replace('s', '')) except : id_ =assert_station(id= station, nm = obj.nmSites) station_ = 'S{0:02}'.format(id_) else : id_ =assert_station(id= id0 + 1, nm = obj.nmSites) station_ = 'S{0:02}'.format(id0) obj.logS = obj.nmSites[:, id_] # assert the input given layers and tres is_the_same_length, msg = assert_len_lns_tres( obj.LNS , obj.TRES) if is_the_same_length : # then finf pslns = obj.LNS pstres= obj.TRES ps_lnstres = [(a, b) for a , b in zip(obj.LNS, obj.TRES)] if not is_the_same_length: # find the pseudoTRES and LNS for unknowrocks or layers msg += "Unknow layers should be ignored." _logger.debug(msg) warnings.warn(msg) pslns, pstres, ps_lnstres = fit_tres( obj.LNS, obj.TRES, obj.auto_layers) # now build the fitting rocks fitted_rocks =fit_rocks(logS_array= obj.nmSites[:,id_], lns_=pslns , tres_=pstres) # set the raws fitted rocks import copy setattr(obj, 'fitted_rocks_r', copy.deepcopy(fitted_rocks) ) # change the pseudo-rocks located in fitted rocks par ignored $i$ # and get the stamped rocks setattr(obj, 'fitted_rocks',stamping_ignored_rocks( fitted_rocks, obj.LNS ) ) # fit stratum property sg, _, zg, _= fit_stratum_property (obj.fitted_rocks, obj.z, obj.logS) obj.log_thicknesses, obj.log_layers,\ obj.coverall = get_s_thicknesses( zg, sg,display_s= display_s, station = station_ ) # set the dfault layers properties hatch and colors from database obj.hatch , obj.color = fit_default_layer_properties( obj.log_layers) return obj
[docs] @staticmethod def plotPseudostratigraphic(station, zoom=None, annotate_kws=None, **kws): """ Build the pseudostratigraphic log. :param station: station to visualize the plot. :param zoom: float represented as visualization ratio ex: 0.25 --> 25% view from top =0.to 0.25* investigation depth or a list composed of list [top, bottom]. :Example: >>> input_resistivity_values =[10, 66, 700, 1000, 1500, 2000, 3000, 7000, 15000 ] >>> input_layer_names =['river water', 'fracture zone', 'granite'] # Run it to create your model block alfter that you can only use # `plotPseudostratigraphic` only # >>> obj= quick_read_geomodel(lns = input_layer_names, # tres = input_resistivity_values) >>> plotPseudostratigraphic(station ='S00') """ if annotate_kws is None: annotate_kws = {'fontsize':12} if not isinstance(annotate_kws, dict): annotate_kws=dict() obj = _ps_memory_management(option ='get' ) obj = GeoStrataModel._strataPropertiesOfSite (obj,station=station, **kws ) # plot the logs with attributes pseudostratigraphic_log (obj.log_thicknesses, obj.log_layers,station, hatch =obj.hatch ,zoom=zoom, color =obj.color, **annotate_kws) print_running_line_prop(obj) return obj
def _ps_memory_management(obj=None, option='set'): """ Manage the running times for stratigraphic model construction. The script allows to avoid running several times the GeoStrataModel model construction to retrieve the pseudostratigraphic (PS) log at each station. It memorizes the model data for the first run and used it when calling it to visualize the PS log at each station. Be aware to edit this script. """ DMOD = 'watex.etc' memory, memorypath='__memory.pkl', 'watex/etc' with resources.path (DMOD, memory) as p : memory_file = str(p) # for consistency mkeys= ('set', 'get', 'recover', 'fetch', set) if option not in mkeys: raise ValueError('Wrong `option` argument. Acceptable ' f'values are {smart_format(mkeys[:-1])}.') if option in ('set', set): if obj is None: raise TypeError('NoneType object can not be set.') psobj_token = __build_ps__token(obj) data = (psobj_token, list(obj.__dict__.items())) serialize_data ( data, memory, savepath= memorypath ) return elif option in ('get', 'recover', 'fetch'): try: memory_exists = os.path.isfile(os.path.join(memorypath, memory)) except: memory_exists = os.path.isfile(memory_file) if not memory_exists: _logger.error('No memory found. Run the GeoStrataModel class ' 'beforehand to create your first model.') warnings.warn("No memory found. You need to build your " " GeoStrataModel model by running the class first.") raise MemoryError("Memory not found. Use the GeoStrataModel class to " "create your model first.") try: psobj_token, data_ = load_serialized_data( os.path.join(memorypath, memory)) except: psobj_token, data_ = load_serialized_data(memory_file ) data = dict(data_) # create PseudoStratigraphicObj from metaclass and inherits from # dictattributes of GeoStrataModel class psobj = type ('PseudoStratigraphic', (), { k:v for k, v in data.items()}) psobj.__token = psobj_token return psobj def makeBlockSites(station_location, x_nodes, block_model): """ Build block that contains only the station locations values :param station_location: array of stations locations. Must be self contains on the horizontal nodes (x_nodes) :param x_nodes: Number of nodes in horizontal :param block_model: Resistivity blocks model :return: - `stationblocks`: Block that contains only the station location values. :Example: >>> from watex.geology.stratigraphic import makeBlockSite >>> mainblocks= get_location_value( station_location=geosObj.makeBlockSite, x_nodes=geosObj.model_x_nodes, block_model=geosObj.model_res ) """ index_array =np.zeros ((len(station_location), ), dtype =np.int32) for ii, distance in enumerate(station_location): for jj , nodes in enumerate(x_nodes): if nodes == distance : index_array [ii]= jj break elif nodes> distance: min_= np.abs(distance-x_nodes[jj-1]) max_= np.abs(distance - x_nodes[jj+1]) if min_<max_: index_array [ii]= jj-1 else: index_array [ii]=jj break _tema=[] for ii in range(len(index_array )): a_= block_model[:, int(index_array [ii])] _tema.append(a_.reshape((a_.shape[0], 1))) stationblock = np.concatenate((_tema), axis=1) return stationblock def display_infos(infos, **kws): """ Display unique element on list of array infos. :param infos: Iterable object to display. :param header: Change the `header` to other names. :Example: >>> from watex.geology.stratigraphic import display_infos >>> ipts= ['river water', 'fracture zone', 'granite', 'gravel', 'sedimentary rocks', 'massive sulphide', 'igneous rocks', 'gravel', 'sedimentary rocks'] >>> display_infos('infos= ipts,header='TestAutoRocks', size =77, inline='~') """ inline =kws.pop('inline', '-') size =kws.pop('size', 70) header =kws.pop('header', 'Automatic rocks') if isinstance(infos, str ): infos =[infos] infos = list(set(infos)) print(inline * size ) mes= '{0}({1:02})'.format(header.capitalize(), len(infos)) mes = '{0:^70}'.format(mes) print(mes) print(inline * size ) am='' for ii in range(len(infos)): if (ii+1) %2 ==0: am = am + '{0:>4}.{1:<30}'.format(ii+1, infos[ii].capitalize()) print(am) am='' else: am ='{0:>4}.{1:<30}'.format(ii+1, infos[ii].capitalize()) if ii ==len(infos)-1: print(am) print(inline * size ) def fit_default_layer_properties(layers, dbproperties_= ['hatch', 'colorMPL']): """ Get the default layers properties implemented in database. For instance get the hatches and colors from given layers implemented in the database by given the database `dbproperties_`. :param layers: str or list of layers to retrieve its properties If specific property is missing , ``'none'`` will be return :param db_properties_: str, list or database properties :return: property items sanitized :Example: >>> import watex.geology.stratigraphic as GS >>> GS.fit_default_layer_properties( ... ['tuff', 'granite', 'evaporite', 'saprock'])) ... (['none', 'none', 'none', 'none'], ... [(1.0, 1.0, 0.0), (1.0, 0.0, 1.0), (0.0, 0.0, 1.0), ... (1.0, 0.807843137254902, 1.0)]) """ # for consistency check again and keep the DB properties untouchable. dbproperties_= ['colorMPL' if g.lower().find('mpl') >=0 else 'FGDC' if g.lower()=='fgdc'else g.lower() for g in dbproperties_] if isinstance(layers , str): layers =[layers] assert_gl = ['yes' if isinstance(ll, str) else 'no' for ll in layers] if not len(set(assert_gl)) ==1: raise TypeError("Wrong given layers. Names should be a string!") if 'name' or'__description' not in dbproperties_: dbproperties_.insert(0, 'name') __gammaProps = GeoStrataModel._getProperties(dbproperties_) r_props =[['none' for i in layers] for j in range(len(__gammaProps)-1)] for k , l in enumerate(layers): if l in __gammaProps[0] : ix= __gammaProps[0].index(l) for kk, gg in enumerate(r_props) : gg[k]= __gammaProps[1:][kk][ix] r_props = [_sanitize_db_items(r_props[k], force=True ) for k in range(len (r_props))] return tuple(r_props) def __build_ps__token(obj): """ Build a special token for each GeoStrataModel model. Please don't edit anything here. Force editing is your own risk.""" import random random.seed(42) __c =''.join([ i for i in [''.join([str(c) for c in obj.crmSites.shape]), ''.join([str(n) for n in obj.nmSites.shape]), ''.join([l for l in obj.input_layers]) + str(len(obj.input_layers)), str(len(obj.tres))] + [''.join( [str(i) for i in [obj._eta, obj.beta, obj.doi,obj.n_epochs, obj.ptol, str(obj.z.max())]])]]) __c = ''.join(random.sample(__c, len(__c))).replace(' ', '') n= ''.join([str(getattr(obj, f'{l}'+'_fn')) for l in ['model', 'iter', 'mesh', 'data']]) n = ''.join([s.lower() for s in random.sample(n, len(n))] ).replace('/', '').replace('\\', '') return ''.join([n, __c]).replace('.', '') def fit_tres(lns, tres, autorocks, force=False, **kws): """ Read and get the resistivity values from tres that match the the given layers. Find the layers and their corresponding resistivity values from the database especially when values in the TRES and LN are not the same length. It's not possible to match each value to its correspinding layer name. Therefore the best approach is to read the TRES and find the layer name in the database based on the closest value. :param lns: list of input layers :param tres: list of input true resistivity values :param autorocks: list of the autorocks found when building the new model. :param force: bool, force fitting resistivity value with the rocks in the database whenever the size of rocks match perfectly the number of the rocks. Don't do that if your are sure that the TRES provided fit the layers in LNS. :param kws: is database column property. Default is `['electrical_props', '__description']` :returns: new pseudolist contains the values of rocks retrived from database as well as it closest value in TRES. """ def flip_back_to_tuple(value , substitute_value, index=1): """convert to tuple to list before assign values and reconvert to tuple after assignment for consistency. `flip_back_to_tuple` in line in this code is the the same like : newTRES[ii] = list(newTRES[ii]) newTRES[ii][1] = val newTRES[ii] = tuple (newTRES[ii]) """ value = list(value) if index is not None: value[index] = substitute_value else : value = substitute_value return tuple (value) ix = len(autorocks) lns0, tres0, rlns, rtres= lns_and_tres_split(ix, lns, tres) if len(lns0) > len(tres0): msg= ''.join(['Number of given layers `{0}` should not be greater ', ' than the number of given resistivity values` {1}`.']) msg= msg.format(len(lns0), len(tres0)) n_rock2drop = len(tres0)-len(lns0) msg += f" Layer{'s' if abs(n_rock2drop)>1 else ''} "\ f"{smart_format(lns0[n_rock2drop:])} should be ignored." lns0 = lns0[: n_rock2drop] warnings.warn(msg) _logger.debug(msg) if sorted([n.lower() for n in lns0] ) == sorted([n.lower() for n in lns]): if not force: return lns0, tres0, [(a, b) for a , b in zip(lns0, tres0)] r0 =copy.deepcopy(tres0) # for consistency, lowercase the layer name # get the properties [name and electrical properties] # from geoDataBase try to build new list with none values # loop for all layer and find their index then # their elctrical values # if name exist in database then: # loop DB layers names # if layer is found then get it index lns0 =[ln.lower().replace('_', ' ') for ln in lns0 ] _gammaRES, _gammaLN = GeoStrataModel._getProperties(**kws) newTRES =[None for i in tres0] temp=list() for ii, name in enumerate(lns0) : if name in _gammaLN: ix = _gammaLN.index (name) temp.append((name,_gammaRES[ix])) # keep the lns0 rocks that exists in the database # and replace the database value by the one given #in tres0 and remove the tres value with # unknow layer by its corresponding value. if len(temp)!=0: for name, value in temp: ix, val = get_closest_gap (value= value, iter_obj=tres0) newTRES[ix]= (name, val) tres0.pop(ix) # try to set the values of res of layer found in # the database is not set = 0 by their corresponding # auto -layers. if value is in TRES. We consider #that the rocks does not exist and set to None for ii, nvalue in enumerate(newTRES): try: iter(nvalue[1]) except: if nvalue is not None and nvalue[1]==0. : newTRES[ii]= None continue else: # if iterable get the index and value of layers # remove this values in the tres ix, val = get_closest_gap (value=nvalue[1], iter_obj=tres0) newTRES[ii] = flip_back_to_tuple (newTRES[ii], val, 1) tres0.pop(ix) for ii, nvalue in enumerate(tres0): ix,_val= get_closest_gap (value=nvalue,status ='isoff', iter_obj=_gammaRES, condition_status =True, skip_value =0 ) # get the index of this values in tres index = r0.index (_val) newTRES[index] = (_gammaLN[ix], nvalue) # create for each tres its pseudorock name # and pseudorock value pseudo_lns = [a [0] for a in newTRES] + rlns pseudo_tres = [b[1] for b in newTRES] + rtres newTRES += [(a, b) for a , b in zip(rlns, rtres)] return pseudo_lns , pseudo_tres , newTRES def quick_read_geomodel(lns=None, tres=None): """Quick read and build the geostratigraphy model (NM) :param lns: list of input layers :param tres: list of input true resistivity values :Example: >>> import watex.geology.stratigraphic as GM >>> obj= GM.quick_read_geomodel() >>> GC.fit_tres(obj.input_layers, obj.tres, obj.auto_layer_names) """ PATH = 'data/occam2D' k_ =['model', 'iter', 'mesh', 'data'] try : INVERS_KWS = { s +'_fn':os.path.join(PATH, file) for file in os.listdir(PATH) for s in k_ if file.lower().find(s)>=0 } except : INVERS_KWS=dict() TRES=[10, 66, 70, 100, 1000, 3000]# 7000] #[10, 70, 100, 1000, 3000] #[10, 66, 70, 100, 1000, 2000, 3000, 7000, 15000 ] LNS =['river water','fracture zone', 'MWG', 'LWG', 'granite', 'igneous rocks', 'basement rocks'] lns = lns or LNS tres= tres or TRES if len(INVERS_KWS) ==0: _logger.error("NoneType can not be read! Need the basics Occam2D" f" inversion {smart_format(k_)} files.") raise ValueError("NoneType can not be read! Need the basics Occam2D" f" inversion {smart_format(k_)} files.") geosObj = GeoStrataModel( input_resistivities=tres, input_layers=lns,**INVERS_KWS) geosObj._createNM() return geosObj GeoStrataModel.__doc__="""\ Create a stratigraphic model from inversion models blocks. The Inversion model block is two dimensional array of shape (n_vertical_nodes, n_horizontal_nodes ). Can use external packages to build blocks and provide the 2Dblock into `crm` parameter. The challenge of this class is firstly to delineate with much accuracy the existing layer boundary (top and bottom) and secondly, to predict the stratigraphy log before the drilling operations at each station. Moreover, it’s a better way to select the right drilling location and also to estimate the thickness of existing layer such as water table layer as well as to figure out the water reservoir rock in the case of groundwater exploration. Note that if the model blocks is build from externam softwares. You may as in the keywordsr argumments of `GeoStrataModel` the following attributes: - model_res : 2D resitivity model of (n_vertical_nodes, n_horizontal_nodes) If `crm` is given , no need to provided it. - geo_depth: Is the depth the surface to the bottom of each layer that composed the pseudo-boreholes. Note the N-vertical nodes values - input_resistivities: list of input resistivities. If the `tres` is passed not need to given. Parameters ---------- crm : ndarray of shape(n_vertical_nodes, n_horizontal_nodes ), Array-like of inversion two dimensional model blocks. Note that the `n_vertical_nodes` is the node from the surface to the depth while `n_horizontal_nodes` must include the station location (sites/stations) beta: int, Value to divide into the CRM blocks to improve the computation times. default is`5` n_epochs: int, Number of iterations. default is `100` tres: array_like, Truth values of resistivities. Refer to :class:`~.geodrill.Geodrill` for more details ptols: float, Existing tolerance error between the `tres` values given and the calculated resistivity in `crm` input_layers: list or array_like True input_layers names : geological informations of collected in the area. kind: str Kind of model function to compute the best fit model to replace the value in `crm` . Can be 'linear' or 'polynomial'. if `polynomial` is set, specify the `degree. Default is 'linear'. alpha: float , Learning rate for gradient descent computing. *Default* is ``1e+4`` for linear. If `kind` is set to `polynomial` the default value should be `1e-8`. degree: int, Polynomial function degree to implement gradient descent algorithm. If `kind` is set to `Polynomial` the default `degree` is ``3.`` and details sequences **nm**: ndarray The NM matrix with the same dimension with `crm` model blocks. Examples ---------- >>> from watex.geology.stratigraphic import GeoStrataModel >>> # Works with occam2d inversion files if 'pycsamt' or 'mtpy' is installed >>> # will call the module Geodrill from pycsamt to make occam2d 2D resistivity >>> # block for our demo. It presumes pycsamt is installed. >>> from pycsamt.geodrill.geocore import Geodrill >>> path=r'data/inversfiles/inver_res/K4' # path to inversion files >>> inversion_files = {'model_fn':'Occam2DModel', ... 'mesh_fn': 'Occam2DMesh', ... "iter_fn":'ITER27.iter', ... 'data_fn':'OccamDataFile.dat' ... } >>> input_resistivity_values =[10, 66, 70, 180, 1000, 2000, ... 3000, 50, 7] >>> input_resistivity_values =[10, 66, 70, 180, 1000, 2000, ... 3000, 7000, 15000 ] >>> input_layer_names =['river water', 'fracture zone', 'granite'] >>> inversion_files = {key:os.path.join(path, vv) for key, vv in inversion_files.items()} >>> gdrill= Geodrill (**inversion_files, input_resistivities=input_resistivity_values ) >>> # we can collect the 'model_res' and 'geo_depth_attributes' from >>> # `gdrill object` and passed to 'GeoStrataModel' fit method as >>> geosObj = GeoStrataModel(ptol =0.1).fit(crm = model_res , input_resistivities=gdrill.input_resistivity_values geo_depth= gdrill.geo_depth ) >>> zmodel = geosObj._zmodel >>> geosObj.nm # resistivity 2D model block is constructed Notes ------ Modules work properly with occam2d inversion files if 'pycsamt' or 'mtpy' is installed and inherits the `Base package` which works with occam2d model. Occam2d inversion files are also acceptables for building model blocks. However the MODEM resistivity files development is still ongoing """ # def assert_len_layers_with_resistivities( # real_layer_names:str or list, real_layer_resistivities: float or list ): # """ # Assert the length of of the real resistivites with their # corresponding layers. If the length of resistivities is larger than # the layer's names list of array, the best the remained resistivities # should be topped up to match the same length. Otherwise if the length # of layers is larger than the resistivities array or list, layer'length # should be reduced to fit the length of the given resistivities. # Parameters # ---------- # * real_layer_names: array_like, list # list of input layer names as real # layers names encountered in area # * real_layer_resistivities :array_like , list # list of resistivities get on survey area # Returns # -------- # list # real_layer_names, new list of input layers # """ # # for consistency put on string if user provide a digit # real_layer_names =[str(ly) for ly in real_layer_names] # if len(real_layer_resistivities) ==len(real_layer_names): # return real_layer_names # elif len(real_layer_resistivities) > len(real_layer_names): # # get the last value of resistivities to match the structures # # names and its resistivities # sec_res = real_layer_resistivities[len(real_layer_names):] # # fetch the name of structure # geos =Geodrill.get_structure(resistivities_range=sec_res) # if len(geos)>1 : tm = 's' # else :tm ='' # print(f"---> Temporar{'ies' if tm=='s' else 'y'} " # f"{len(geos)} geological struture{tm}." # f" {'were' if tm =='s' else 'was'} added." # " Uncertained layers should be ignored.") # real_layer_names.extend(geos) # return real_layer_names # elif len(real_layer_names) > len(real_layer_resistivities): # real_layer_names = real_layer_names[:len(real_layer_resistivities)] # return real_layer_names