Source code for watex.methods.em

# -*- coding: utf-8 -*-
#   Licence:BSD 3-Clause
#   Author: LKouadio <etanoyau@gmail.com>

"""
:mod:`~watex.methods.em` module is related for a few meter exploration in 
the case of groundwater exploration. Module provides some basics processing 
steps for EMAP data filtering and remove noises. 
"""
from __future__ import annotations 
import os
import re
import functools 
import numpy as np 

from .._watexlog import watexlog
from ..edi import Edi 
from ..exceptions import ( 
    EDIError, 
    TopModuleError, 
    NotFittedError, 
    EMError,
) 
from ..externals.z import Z as EMz 
from ..utils.funcutils import ( 
    _assert_all_types, 
    make_ids, 
    show_stats, 
    fit_by_ll, 
    reshape, 
    smart_strobj_recognition, 
    repr_callable_obj,
    remove_outliers, 
    normalizer
    ) 
from ..utils.exmath import ( 
    scalePosition, 
    fittensor,
    get2dtensor, 
    betaj, 
    interpolate1d,
    interpolate2d, 
    get_full_frequency, 
    rhoa2z, 
    z2rhoa, 
    mu0, 
    
    )
from ..utils.coreutils import ( 
    makeCoords, 
    )
from ..property import (
    IsEdi
    )
from ..site import Location 
from .._typing import ( 
    ArrayLike, 
    Optional, 
    List,
    Tuple, 
    Dict, 
    NDArray, 
    DType,
    EDIO,
    T,
    F, 
    )
from ..utils._dependency import ( 
    import_optional_dependency
    )
from ..utils.validator import ( 
    _validate_tensor, 
    )

_logger = watexlog.get_watex_logger(__name__)

__all__ =['EM',
          'Processing',
          ]

[docs]class EM(IsEdi): """ Create EM object as a collection of EDI-file. Collect edifiles and create an EM object. It sets the properties from audio-magnetotelluric. The two(2) components XY and YX will be set and calculated.Can read MT data instead, however the full handling transfer function like Tipper and Spectra is not completed. Use other MT softwares for a long periods data. Parameters ------------- survey_name: str location name where the date where collected . If surveyname is None can chech on edifiles. Attributes ----------- ediObjs_: Array-like of shape (N,) array of the collection of edifiles read_sucessfully data_: Array-like of shape (N, ) array of all edifiles feed in the `EM` modules whatever sucessuffuly read or not. edinames_: array-like of shape (N,) array of all edi-names sucessfully read edifiles_: array of shape (N, ) array of all edifiles if given. freqs_: array-like of shape (N, ) Array of the frequency range from EDIs refreq_: float, Reference refrequency for data correction. Note the reference frequency is the highest frequency with clean data. Properties ------------ longitude: array-like, shape (N,) longitude coordinate values collected from EDIs latitude: array-like, shape (N, ) Latitude coordinate values collected from EDIs elevation: array-like, shape (N,) Elevation coordinates collected from EDIs """ def __init__(self, survey_name:str =None , verbose=0): self._logging = watexlog.get_watex_logger(self.__class__.__name__) self.survey_name =survey_name self.Location_= Location() self.verbose=verbose self._latitude = None self._longitude=None self._elevation= None @property def latitude(self): return self._latitude @latitude.setter def latitude(self, latitude): self._assertattr ( 'latitude', latitude, self.Location_.lat ) @property def longitude(self): return self._longitude @longitude.setter def longitude(self, longitude): self._assertattr ( 'longitude', longitude, self.Location_.lon ) @property def elevation(self): return self._elevation @elevation.setter def elevation(self, elevation): self._assertattr ('elevation', elevation, self.Location_.elev ) @property def stnames(self): return self._station_names @stnames.setter def stnames (self, edi_stations): try : _assert_all_types(edi_stations, list, tuple, np.ndarray) except : self._station_names = self.id_ else : self._station_names = list( map(lambda n: n.replace('.edi', ''), edi_stations)) if len(set (self._station_names)) ==1 : self._station_names = self.id if len(self._station_names) != len(self.ediObjs_): self._station_names = self.id
[docs] def is_valid (self, obj: str | EDIO )-> Edi : """Assert that the given argument is an EDI -object from modules EDI or EDI from pycsamt and MTpy packages. A TypeError will occurs otherwise. Parameters ------------ obj: str, :class:`pycsamt.core.edi.Edi` or :class:`mtpy.core.edi.Edi` Full path EDI file or `pycsamt`_ or `MTpy`_ objects. Return ------- obj:str, :class:`pycsamt.core.edi.Edi` or :class:`mtpy.core.edi.Edi` Identical object after asserting. """ emsg=("{0!r} object detected while the package is not installed" " yet. To pass ['pycsamt' | 'mtpy'] EDI-object to 'EM'" " class for basic implementation,prior install the {0!r}" " first. Use 'pip' or 'conda' for installation." ) IsEdi.register (Edi) if isinstance(obj, str): obj = Edi().fit(obj) if "pycsamt" in str(obj): try : import_optional_dependency ("pycsamt") except ImportError: raise TopModuleError(emsg.format("pycsamt")) else : #XXX TODO : prior revising the pkg structure # to pycsamt.core since ff subpackage does # no longer exist in pycsamt newest version from pycsamt.ff.core import edi IsEdi.register (edi.Edi ) elif "mtpy" in str(obj): try : import_optional_dependency ("mtpy") except ImportError: raise TopModuleError(emsg.format("mtpy")) else : from mtpy.core import edi IsEdi.register (edi.Edi) try : obj = _assert_all_types ( obj, IsEdi, objname="Wrong Edi-Objects or EDI-path, ") except AttributeError: # force checking instead obj = _assert_all_types (obj, Edi, objname="EDI") return obj
def _assertattr (self, name, value, locprop ): """ Read and set attributes from location object . For instance:: >>> name = 'longitude',object = self.Location.lon >>> self._longitude = np.array ( ... list (map ( lambda o : self.Location.lon , ... longitude))) """ if isinstance(value, (float, int)): value= [value] s=np.zeros_like(value) for i, p in enumerate(value): locprop = p ; s[i] = locprop setattr(self, f'_{name}', s ) @property def inspect (self): """ Inspect object whether is fitted or not""" msg = ( "{obj.__class__.__name__} instance is not fitted yet." " Call 'fit' with appropriate arguments before using" " this method" ) if not hasattr (self, 'ediObjs_'): raise NotFittedError(msg.format( obj=self) ) return 1
[docs] def fit (self, data: str|List[EDIO] )->"EM": """ Assert and make EM object from a collection EDIs. Parameters ----------- data : str, or list or :class:`pycsamt.core.edi.Edi` object Full path to EDI files or collection of EDI-objects Returns -------- self: EM object from a collection EDIs Examples -------- >>> from watex.methods.em import EM >>> emObjs = EM().fit (r'data/edis') >>> emObjs.ediObjs_ ... """ def _fetch_headinfos (cobj, attr): """ Set attribute `attr` from collection object `cobj`.""" return list(map (lambda o: getattr(o, attr), cobj)) self._logging.info ( 'Read, collect EDI-objects from <%s>'% self.__class__.__name__) rfiles =[] # count number of reading files. self.data_ = data # if ediObjs is not None: # self.ediObjs_ = ediObjs if isinstance(self.data_, str): # single edi and get the path if os.path.isfile (self.data_): if not self._assert_edi(self.data_, False): raise EDIError(" Unrecognized SEG- EDI-file. Follow the " "[Wight, D.E., Drive, B., 1988.] to build a" " correct EDI- file.") #edipath = os.path.dirname (self.data_) rfiles=[os.path.dirname (self.data_)] self.ediObjs_ = np.array ([self.is_valid(self.data_)]) #self.data_=[self.data_] elif os.path.dirname (self.data_): # path is given and read rfiles= os.listdir(self.data_) self.data_= sorted ([ os.path.join(self.data_, edi ) for edi in rfiles if edi.endswith ('.edi')]) else : raise EDIError (f"Object {self.data_!r} is not an EDI object.") elif isinstance(self.data_, (tuple, list)): self.data_= sorted(self.data_) rfiles = self.data_.copy() if self.data_ is not None: try : self.ediObjs_ = list(map( lambda o: self.is_valid(o), self.data_)) except: # in the case a single object is given at the param # the list-of edifiles rather than ediObjs self.ediObjs_ = list(map( lambda o: self.is_valid(o), [self.data_])) # for consistency if self.ediObjs_ is not None: if not isinstance (self.ediObjs_,(list,tuple, np.ndarray)): self.ediObjs_ = [self.ediObjs_] rfiles = self.ediObjs_ try: self.ediObjs_ = list(map( lambda o: self.is_valid(o), self.ediObjs_)) except EDIError: raise EMError ("Expect a list of EDI objects not " f"{type(self.ediObjs_[0]).__name__!r}") # sorted ediObjs from latlong self.ediObjs_ , self.edinames = fit_by_ll(self.ediObjs_) # reorganize edis in lon lat order. self.edifiles = list(map(lambda o: o.edifile , self.ediObjs_)) if self.verbose: try: show_stats(rfiles, self.ediObjs_) except: pass #--get coordinates values and correct lon_lat ------------ lat = _fetch_headinfos(self.ediObjs_, 'lat') lon = _fetch_headinfos(self.ediObjs_, 'lon') elev = _fetch_headinfos(self.ediObjs_, 'elev') lon,*_ = scalePosition(lon) if len(self.ediObjs_)> 1 else lon lat,*_ = scalePosition(lat) if len(self.ediObjs_)> 1 else lat # ---> get impedances, phase tensor and # resistivities values form ediobject self._logging.info('Setting impedances and phases tensors and' 'resistivity values from a collection ediobj.') zz= [edi_obj.Z.z for edi_obj in self.ediObjs_] zz_err= [edi_obj.Z.z_err for edi_obj in self.ediObjs_] rho= [edi_obj.Z.resistivity for edi_obj in self.ediObjs_] rho_err= [edi_obj.Z.resistivity_err for edi_obj in self.ediObjs_] phs= [edi_obj.Z.phase for edi_obj in self.ediObjs_] phs_err= [edi_obj.Z.phase_err for edi_obj in self.ediObjs_] # Create the station ids self.id = make_ids(self.ediObjs_, prefix='S') self.longitude= lon self.latitude= lat self.elevation= elev try : self.elevation= self.elevation.astype (np.float64) except :pass # get frequency array from the first value of edifiles. self.freq_array = self.ediObjs_[0].Z.freq #---> set into dictionnary the impdance and phase Tensors self._z = {key:value for key , value in zip (self.id, zz)} self._z_err ={key:value for key , value in zip (self.id, zz_err)} self._res = {key:value for key , value in zip (self.id, rho)} self._res_err ={key:value for key , value in zip (self.id, rho_err)} self._phs ={key:value for key , value in zip (self.id, phs)} self._phs_err ={key:value for key , value in zip (self.id, phs_err)} self.stnames = self.edinames self.freqs_ = self.getfullfrequency () self.refreq_ = self.getreferencefrequency() return self
[docs] def rewrite ( self, *, by: str = 'name', prefix: Optional[str] = None, dataid: Optional[List[str]] =None, savepath: Optional[str] = None, how: str ='py', correct_ll: bool =True, make_coords: bool =False, reflong: Optional[str | float] =None, reflat: Optional[str | float]=None, step: str ='1km', edi_prefix: Optional[str] =None, export: bool =True, **kws )-> "EM": """ Rewrite Edis, correct station coordinates and dipole length. Can rename the dataid, customize sites and correct the positioning latitudes and longitudes. Parameters ------------ dataid: list list of ids to rename the existing EDI-dataid from :class:`Head.dataid`. If given, it should match the length of the collections of `ediObjs`. A ValueError will occurs if the length of ids provided is out of the range of the number of EDis objects by: str Rename according to the inner module Id. Can be ``name``, ``id``, ``number``. Default is ``name``. If :attr:`~.EM.survey_name` is given, the whole survey name should be overwritten. Conversly, the argument ``ix`` outputs the number of formating stations excluding the survey name. prefix: str Prefix the number of the site. It could be the abbreviation of the survey area. correct_ll: bool, Write the scaled positions( longitude and latitude). Default is ``True``. make_coords: bool Useful to hide the real coordinates of the sites by generating a 'fake' coordinates for a specific purposes. When setting to ``True`` be sure to provide the `reflong` and `reflat` values otherwise and error will occurs. reflong: float or string Reference longitude in degree decimal or in DD:MM:SS for the site considered as the origin of the lamdmark. reflat: float or string Reference latitude in degree decimal or in DD:MM:SS for the reference site considered as the landmark origin. step: float or str Offset or the distance of seperation between different sites in meters. If the value is given as string type, except the ``km``, it should be considered as a ``m`` value. Only meters and kilometers are accepables. Default value of seperation between the site is ``1km``. savepath: str Full path of the save directory. If not given, EDIs should be outputed in the created directory. how: str The way to index the stations. Default is the Python indexing i.e. the counting starts by 0. Any other value will start counting the site from 1. export: bool, Export new edi-files kws: dict Additionnal keyword arguments from `~Edi.write_edifile` and :func:`watex.utils.coreutils.makeCoords`. Returns -------- EM: :class:`~.EM` instance Returns ``self`` for easy method chaining. Examples --------- >>> from watex.methods.em import EM >>> edipath = r'data/edis' >>> savepath = r'/Users/Daniel/Desktop/ediout' >>> emObjs = EM().fit(edipath) >>> emObjs.rewrite_edis(by='id', edi_prefix ='b1', savepath =savepath) >>> # >>> # second example to write 7 samples of edi from >>> # Edi objects inner datasets >>> # >>> import watex as wx >>> edi_sample = wx.fetch_data ('edis', key ='edi', samples =7, return_data =True ) >>> emobj = wx.EM ().fit(edi_sample) >>> emobj.rewrite(by='station', prefix='PS') """ def replace_reflatlon ( olist , nval, kind ='reflat'): """ Replace Definemeaseurement Reflat and Reflong by the interpolated values. :param olist: old list composing the read EDI measurement infos. :type olist: list :param nval: New reflat or reflong list. Mostly is the DD:MM:SS value interpolated. :param kind: Type of measurement to write. :type kind:str :return: List of old element replaced. :rtype: list """ try : for ii, comp in enumerate (olist): if comp.strip().find(kind)>=0: olist[ii]= f' {kind}={nval}\n' break except: pass return olist regex = re.compile('\d+', re.IGNORECASE) by = str(by).lower() if by.find('survey')>=0 : by ='name' prefix = str(prefix) self.inspect self.id = make_ids(self.ediObjs_, prefix='S', how= how ) if how !='py': self.id = make_ids(self.ediObjs_, prefix='S', cmode =None) if dataid is None: if prefix !='None' : dataid = list(map(lambda s: s.replace('S', prefix), self.id)) elif by =='name': # get the first name of dataId of the EDI ediObjs and filled # the rename dataId. remove the trail'_' name = self.survey_name or regex.sub( '', self.ediObjs_[0].Head.dataid).replace('_', '') # remove prefix )'S' and keep only the digit dataid = list(map(lambda n: name + n, regex.findall( ''.join(self.id)) )) elif by.find('num')>=0: dataid = regex.findall(''.join(self.id)) elif by =='id': dataid = self.id elif by =='ix': dataid = list(map( lambda x: str(int(x)), regex.findall(''.join(self.id)))) else : dataid = list(map(lambda obj: obj.Head.dataid, self.ediObjs_)) elif dataid is not None: if not np.iterable(dataid): raise ValueError('DataId parameter should be an iterable ' f'object, not {type(dataid).__name__!r}') if len(dataid) != len(self.ediObjs_): raise ValueError ( 'DataId length must have the same length with the number' ' of collected EDIs({0}). But {1} {2} given.'.format( len(self.ediObjs_), len(dataid), f"{'is' if len(dataid)<=1 else 'are'}")) if make_coords: if (reflong or reflat) is None: raise ValueError('Reflong and reflat params must not be None!') self.longitude, self.latitude = makeCoords( reflong = reflong, reflat= reflat, nsites= len(self.ediObjs_), step = step , **kws) # clean the old main Edi section info and # and get the new values if correct_ll or make_coords: londms,*_ = scalePosition(self.longitude) latdms,*_ = scalePosition(self.latitude) # collect new ediObjs cobjs = np.zeros_like (self.ediObjs_, dtype=object ) for k, (obj, did) in enumerate(zip(self.ediObjs_, dataid)): obj.Head.edi_header = None obj.Head.dataid = did obj.Info.ediinfo = None if correct_ll or make_coords: obj.Head.long = float(self.longitude[k]) obj.Head.lat = float(self.latitude[k]) obj.Head.elev = float(self.elevation[k]) oc = obj.DefineMeasurement.define_measurement oc= replace_reflatlon(oc, nval= latdms[k]) oc= replace_reflatlon(oc, nval= londms[k], kind='reflong') oc = replace_reflatlon(oc, nval= self.elevation[k], kind='refelev') obj.DefineMeasurement.define_measurement = oc # Empty the previous MTEMAP infos and # fetch the attribute values newly set. obj.MTEMAP.mtemapsectinfo =None obj.MTEMAP.sectid= did if export: obj.write_edifile( savepath = savepath ,new_edifilename = edi_prefix, **kws) cobjs[k] = obj self.ediObjs_ = cobjs return self
[docs] def getfullfrequency ( self, to_log10:bool =False )-> ArrayLike[DType[float]]: """ Get the frequency with clean data. The full or plain frequency is array frequency with no missing data during the data collection. Note that when using |NSAMT|, some data are missing due to the weak of missing frequency at certain band especially in the attenuation band. Parameters ----------- to_log10: bool, default=False, export frequency to base 10 logarithm Returns -------- f: Arraylike 1d of shape(N, ) frequency with clean data. Out of `attenuation band` if survey is completed with |NSAMT|. See Also -------- watex.utils.exmath.get_full_frequency: Get the complete frequency with no missing signals. Example ------- >>> import watex as wx >>> edi_sample = wx.fetch_data ('edis', return_data=True, samples = 12 ) >>> wx.EM().fit(edi_sample).getfullfrequency(to_log10 =True ) array([4.76937733, 4.71707639, 4.66477553, 4.61247466, 4.56017382, 4.50787287, 4.45557204, 4.40327104, 4.35097021, 4.29866928, 4.24636832, 4.19406761, 4.14176668, 4.08946565, 4.03716465, ... 2.67734228, 2.62504479, 2.57274385, 2.52044423, 2.46814047, 2.41584107, 2.36353677, 2.31124512, 2.25892448, 2.20663701, 2.15433266, 2.10202186, 2.04972182, 1.99743007]) """ self.inspect return get_full_frequency (self.ediObjs_ , to_log10 = to_log10)
[docs] def make2d ( self, out:str = 'resxy', *, kind:str = 'complex' , **kws )-> NDArray[DType[float]]: """ Out 2D resistivity, phase-error and tensor matrix from a collection of EDI-objects. Matrix depends of the number of frequency times number of sites. The function asserts whether all data from all frequencies are available. The missing values should be filled by NaN. Parameters ----------- data: Path-like object or list of pycsamt.core.edi objects Collections of EDI-objects from `pycsamt`_ or full path to EDI files. out: str kind of data to output. Be sure to provide the component to retrieve the attribute from the collection object. Except the `error` and frequency attribute, the missing component to the attribute will raise an error. for instance ``resxy`` for xy component. Default is ``resxy``. kind : bool or str focuses on the tensor output. Note that the tensor is a complex number of ndarray (nfreq, 2,2 ). If set to``modulus`, the modulus of the complex tensor should be outputted. If ``real`` or``imag``, it returns only the specific one. Default is ``complex``. kws: dict Additional keywords arguments from :func:`~.getfullfrequency `. Returns -------- mat2d : np.ndarray(nfreq, nstations) the matrix of number of frequency and number of Edi-collectes which correspond to the number of the stations/sites. Examples --------- >>> from watex.methods.em import EM >>> edipath ='data/edis' >>> emObjs= EM().fit(edipath) >>> phyx = EM().make2d ('phaseyx') >>> phyx ... array([[ 26.42546593, 32.71066454, 30.9222746 ], [ 44.25990541, 40.77911136, 41.0339148 ], ... [ 37.66594686, 33.03375863, 35.75420802], [ nan, nan, 44.04498791]]) >>> phyx.shape ... (55, 3) >>> # get the real number of the yy componet of tensor z >>> zyy_r = make2d (ediObjs, 'zyx', kind ='real') ... array([[ 4165.6 , 8665.64 , 5285.47 ], [ 7072.81 , 11663.1 , 6900.33 ], ... [ 90.7099, 119.505 , 122.343 ], [ nan, nan, 88.0624]]) >>> # get the resistivity error of component 'xy' >>> resxy_err = EM.make2d ('resxy_err') >>> resxy_err ... array([[0.01329037, 0.02942557, 0.0176034 ], [0.0335909 , 0.05238863, 0.03111475], ... [3.33359942, 4.14684926, 4.38562271], [ nan, nan, 4.35605603]]) >>> phyx.shape ,zyy_r.shape, resxy_err.shape ... ((55, 3), (55, 3), (55, 3)) """ self.inspect def fit2dall(objs, attr, comp): """ Read all ediObjs and replace all missing data by NaN value. This is useful to let the arrays at each station to match the length of the complete frequency rather than shrunking up some data. The missing data should be filled by NaN values. """ zl = [getattr( ediObj.Z, f"{attr}")[tuple (_c.get(comp))] for ediObj in objs ] if name =='z': if kind =='modulus': zl = [ np.abs (v) for v in zl] zl = [fittensor(self.freqs_, ediObj.Z._freq, v) for ediObj , v in zip(objs, zl)] if kind in ('real' , 'complex') : zr = [fittensor(self.freqs_, ediObj.Z._freq, v.real) for ediObj , v in zip(objs, zl)] if kind in ('imag', 'complex'): zi= [fittensor(self.freqs_, ediObj.Z._freq, v.imag) for ediObj , v in zip(objs, zl)] if kind =='complex': zl = [ r + 1j * im for r, im in zip (zr, zi)] zl = zl if kind in ('modulus', 'complex') else ( zr if kind =='real' else zi ) else : zl = [fittensor(self.freqs_, ediObj.Z._freq, v) for ediObj , v in zip(objs, zl)] # stacked the z values alomx axis=1. return np.hstack ([ reshape (o, axis=0) for o in zl]) # read/assert edis and get the complete frequency name , m2 = _validate_tensor(out = out , kind = kind, **kws ) #=> slice index for component retreiving purpose _c= { 'xx': [slice (None, len(self.freqs_)), 0 , 0] , 'xy': [slice (None, len(self.freqs_)), 0 , 1], 'yx': [slice (None, len(self.freqs_)), 1 , 0], 'yy': [slice (None, len(self.freqs_)), 1, 1] } #==> returns mat2d freq # if m1 =='_freq': if name =='_freq': f2d = [fittensor(self.freqs_, ediObj.Z._freq, ediObj.Z._freq) for ediObj in self.ediObjs_ ] return np.hstack ([ reshape (o, axis=0) for o in f2d]) # # get the value for exportation (attribute name and components) mat2d = fit2dall(objs= self.ediObjs_, attr= name, comp= m2) return mat2d
[docs] def getreferencefrequency ( self, to_log10: bool =False ): """ Get the reference frequency from collection Edis objects. The highest frequency with clean data should be selected as the reference frequency Parameters ---------- data: list of pycsamt.core.edi.Edi or mtpy.core.edi.Edi objects Collections of EDI-objects from `pycsamt`_ to_log10: bool, outputs the reference frequency into base 10 logarithm in Hz. Returns ------- rf : float the reference frequency at the clean data in Hz Examples --------- >>> from watex.methods.em import EM >>> edipath ='data/3edis' >>> ref = EM().getreferencefrequency(edipath, to_log10=True) >>> ref ... 4.845098040014257 # in Hz References ---------- http://www.zonge.com/legacy/PDF_DatPro/Astatic.pdf """ self.inspect self.freqs_= self.getfullfrequency () # fit z and find all missing data from complete frequency f # we take only the component xy for fitting. zxy = [fittensor(self.freqs_, ediObj.Z._freq, ediObj.Z.z[:, 0, 1].real) for ediObj in self.ediObjs_ ] # stacked the z values alomx axis=1. arr2d = np.hstack ([ reshape (o, axis=0) for o in zxy]) ix_nan = reshape (np.argwhere(np.isnan(arr2d).any(axis =1) )) # create bool array and mask the row of NaN mask = np.full_like (self.freqs_, fill_value = True , dtype=bool) mask[[*ix_nan] ] = False # get the reference frequency and index return self.freqs_ [mask].max() if not to_log10 else np.log10( self.freqs_ [mask].max())
[docs] def exportedis ( self, ediObj: EDIO, new_Z: NDArray[DType[complex]], savepath:str =None, **kws )-> "EDIO" : """ Export new EDI files from the former object with a given new impedance tensors. The export is assumed a new output EDI resulting from multiples corrections applications. Parameters ----------- ediObj: str or :class:`watex.edi.Edi` Full path to Edi file/object or object from `pycsamt`_ or `MTpy`_ new_Z: ndarray (nfreq, 2, 2) Ndarray of impendance tensors Z. The tensor Z is 3D array composed of number of frequency `nfreq`and four components (``xx``, ``xy``, ``yx``, and ``yy``) in 2X2 matrices. The tensor Z is a complex number. Returns -------- ediObj from pycsamt.core.edi.Edi """ ediObj = self.is_valid(ediObj) ediObj.write_new_edifile( new_Z=new_Z, savepath = savepath , **kws) return ediObj
def __repr__(self): """ Pretty format for programmer guidance following the API... """ return repr_callable_obj (self, skip =('edifiles', 'freq_array', 'id')) def __getattr__(self, name): rv = smart_strobj_recognition(name, self.__dict__, deep =True) appender = "" if rv is None else f'. Do you mean {rv!r}' raise AttributeError ( f'{self.__class__.__name__!r} object has no attribute {name!r}' f'{appender}{"" if rv is None else "?"}' )
class _zupdate(EM): """ A decorator for impedance tensor updating. Update a Z object from each EDI object composing the collection objects and output a new EDI-files if `option` is set to ``write``. :param option: str - kind of action to perform with new Z collection. When `option` is set to ``write``. The new EDI -files are exported. Any other values should return only the updated impedance tensors. :returns: A collection of :class:`watex.externals.z.Z` impedance tensor objects. """ def __init__(self, option: str = 'write'): self.option = option def __call__ (self, func:F): @functools.wraps (func) def new_func ( *args, **kws): """ Wrapper to make new Z. The updated Z is a collection object from ':class:`pycsamt.core.z.Z` """ (ediObjs , freq, z_dict), kwargs = func (*args, **kws) # pop the option argument if user provides it option = kwargs.pop('option', None) self.option = str(option).lower() or self.option # create an empty array to collect each new Z object Zc = np.empty((len(ediObjs), ), dtype = object ) for kk in range (len(ediObjs)): # create a new Z object for each Edi Z= self._make_zObj(kk,freq=freq, z_dict = z_dict ) if self.option =='write': self.exportedis(ediObj=ediObjs[kk] , new_Z=Z, **kwargs) Zc[kk] =Z return Zc if self.option !='write' else None return new_func def _make_zObj ( self, kk: int , *, freq: ArrayLike[DType[float]], z_dict: Dict[str, NDArray[DType[complex]]] )-> NDArray[DType[complex]]: """ Make new Z object from frequency and dict tensor component Z. :param kk: int index of routine to retrive the tensor data. It may correspond of the station index. :param freq: array-like full frequency of component :param z_dict: dict, dictionnary of all tensor component. """ Z = EMz( z_array=np.zeros((len(freq ), 2, 2),dtype='complex'), z_err_array=np.zeros((len(freq), 2, 2)), freq=freq ) zxx = z_dict.get('zxx') zxy = z_dict.get('zxy') zyx = z_dict.get('zyx') zyy = z_dict.get('zyy') # dont raise any error if the component # does not exist. if zxx is not None: Z.z[:, 0, 0] = reshape (zxx[:, kk], 1) if zxy is not None: Z.z[:, 0, 1] = reshape (zxy[:, kk], 1) if zyx is not None: Z.z[:, 1, 0] = reshape (zyx[:, kk], 1) if zyy is not None: Z.z[:, 1, 1] = reshape (zyy[:, kk], 1) # set the z_err zxx_err = z_dict.get('zxx_err') zxy_err = z_dict.get('zxy_err') zyx_err = z_dict.get('zyx_err') zyy_err = z_dict.get('zyy_err') if zxx_err is not None: Z.z_err[:, 0, 0] = reshape (zxx_err[:, kk], 1) if zxy_err is not None: Z.z_err[:, 0, 1] = reshape (zxy_err[:, kk], 1) if zyx_err is not None: Z.z_err[:, 1, 0] = reshape (zyx_err[:, kk], 1) if zyy_err is not None: Z.z_err[:, 1, 1] = reshape (zyy_err[:, kk], 1) Z.compute_resistivity_phase() return Z
[docs]class Processing (EM) : """ Base processing of EM object Fast process EMAP and AMT data. Tools are used for data sanitizing, removing noises and filtering. Parameters ---------- data: Path-like object or list of `pycsamt.core.edi.Edi` objects Collections of EDI-objects from `pycsamt`_ freqs: array-like, shape (N) Frequency array. It should be the complete frequency used during the survey area. It can be get using the :func:`getfullfrequency ` No need if ediObjs is provided. window_size : int the length of the window. Must be greater than 1 and preferably an odd integer number. Default is ``5`` component: str field tensors direction. It can be ``xx``, ``xy``,``yx``, ``yy``. If `arr2d`` is provided, no need to give an argument. It become useful when a collection of EDI-objects is provided. If don't specify, the resistivity and phase value at component `xy` should be fetched for correction by default. Change the component value to get the appropriate data for correction. Default is ``xy``. mode: str mode of the border trimming. Should be 'valid' or 'same'.'valid' is used for regular trimimg whereas the 'same' is used for appending the first and last value of resistivity. Any other argument except 'valid' should be considered as 'same' argument. Default is ``same``. method: str, default ``slinear`` Interpolation technique to use. Can be ``nearest``or ``pad``. Refer to the documentation of :doc:`~.interpolate2d`. out : str Value to export. Can be ``sfactor``, ``tensor`` for corrections factor and impedance tensor. Any other values will export the static corrected resistivity. c : int, A window-width expansion factor that must be input to the filter adaptation process to control the roll-off characteristics of the applied Hanning window. It is recommended to select `c` between ``1`` and ``4``. Default is ``2``. Examples -------- >>> import matplotlib.pyplot as plt >>> from watex.methods.em import Processing >>> edipath = 'data/edis' >>> p = Processing().fit(edipath) >>> p.window_size =2 >>> p.component ='yx' >>> rc= p.tma() >>> # get the resistivy value of the third frequency at all stations >>> p.res2d_[3, :] ... array([ 447.05423001, 1016.54352954, 1415.90992189, 536.54293994, 1307.84456036, 65.44806698, 86.66817791, 241.76592273, ... 248.29077039, 247.71452712, 17.03888414]) >>> # get the resistivity value corrected at the third frequency >>> rc [3, :] ... array([ 447.05423001, 763.92416768, 929.33837349, 881.49992091, 404.93382163, 190.58264151, 160.71917654, 163.30034875, 394.2727092 , 679.71542811, 953.2796567 , 1212.42883944, ... 164.58282866, 96.60082159, 17.03888414]) >>> plt.semilogy (np.arange (p.res2d_.shape[1] ), p.res2d_[3, :], '--', np.arange (p.res2d_.shape[1] ), rc[3, :], 'ok--') References ----------- .. [1] http://www.zonge.com/legacy/PDF_DatPro/Astatic.pdf """ def __init__( self, window_size:int =5, component:str ='xy', mode: str ='same', method:str ='slinear', out:str ='srho', c: int =2, **kws ): super().__init__(**kws) self._logging= watexlog.get_watex_logger(self.__class__.__name__) self.window_size=window_size self.component=component self.mode=mode self.method=method self.out=out self.c=c
[docs] def tma ( self, )-> NDArray[DType[float]] : """ A trimmed-moving-average filter to estimate average apparent resistivities at a single static-correction-reference frequency. The TMA filter option estimates near-surface resistivities by averaging apparent resistivities along line at the selected static-correction reference frequency. The highest frequency with clean data should be selected as the reference frequency. Parameters ---------- data: path-like object or list of pycsamt.core.edi.Edi Collections of EDI-objects from `pycsamt`_ Returns ------- rc or cf: np.ndarray, shape (N, M) EMAP apparent resistivity static shift corrected or static correction factor or impedance tensor. References ----------- .. [1] http://www.zonge.com/legacy/PDF_DatPro/Astatic.pdf """ self.inspect # assert filter arguments self.res2d_ , self.phs2d_ , self.freqs_, self.c, self.window_size, \ self.component, self.out = self._make2dblobs () # interpolate resistivity and phases self.phs2d_= interpolate2d( self.phs2d_, method =self.method) self.res2d_= interpolate2d( self.res2d_, method =self.method) # get the index of the reference frequency and collect # the resistivity and phase at that frequency ix_rf = np.int(reshape (np.argwhere (self.freqs_==self.refreq_))) # normalize log frequency and take the normalize ref freq norm_f = (np.log10(self.freqs_) / np.linalg.norm( np.log10(self.freqs_))) # compute the slope at each normalize frequency slope2d = np.arctan( (np.deg2rad(self.phs2d_) / ( np.pi /4 )) -1 ) / (np.pi /2 ) log_rho2d = np.log10 (self.res2d_) + norm_f[:, None] * slope2d # extrapolate up # replace the up frequency thin the index of rf by interpolating up log_rho2d [:ix_rf, :] = np.log10 ( self.res2d_[:ix_rf, : ]) + np.log10( np.sqrt(2)) * slope2d[:ix_rf, :] # make an array of weight factor wf wf = np.zeros_like(log_rho2d) # average logj # For each station collect a group of window-size log(rj ), # #i.e. for window size =5 station index j, i = j-2 to j+2. half_window = self.window_size //2 for ii in range(log_rho2d.shape[1]): if ii ==0 or ii ==log_rho2d.shape[1] -1: w = (log_rho2d[ :, :ii + half_window +1] if ii - half_window < 0 else log_rho2d[:, ii-half_window:] ) if self.mode =='valid' else log_rho2d[:, ii][:, None] elif ii - half_window < 0: w= log_rho2d[ :, :ii + half_window +1] elif ii + half_window +1 > log_rho2d.shape[1]: w= log_rho2d[:, ii-half_window:] else : # Discard the lowest and highest valued log(rj ) # from the group of five and average the remaining # three => avg_logj. w= log_rho2d[:, ii-half_window : half_window + ii + 1 ] try : ls = [ np.delete (w[ii, :] , np.where ( np.logical_or(w[ii, :] ==w[ii, :].max(), w[ii, :] ==w[ii, :].min()) )) for ii in range (len(w))] w = np.vstack (ls) except : # in the case the ls has some array with different length # do the average manually and set as an array of axis 1. ls = [np.sum(w[ii, :])/ len(w[ii, :] ) for ii in range(len(w))] w = np.array(ls)[:, None] # make axis = 1 wf[:, ii] = np.average (w, axis = 1) # compute the correction factor cf cf = np.power(10, wf, dtype =float)/ np. power(10, log_rho2d) rc = self.res2d_ * cf if self.out =='z': rc = rhoa2z(rc, self.phs2d_, self.freq_s) return cf if self.out =='sf' else rc
def _make2dblobs ( self, ): """ Asserts argument of |EMAP| filter and returns useful arguments. :note: created to collect argument of EMAP filters. Refer to functions :func:`~.tma`, :func:`~.flma` and :func:`~.ama` documentation. """ self.component= str(self.component).lower().strip() self.out= str(self.out).lower().strip() try : self.c = int (self.c) except : raise TypeError( f'Expect an integer value not {type(self.c).__name__!r}') if self.out.find ('factor') >= 0 or self.out =='sf': self.out ='sf' elif self.out in ('z', 'impedance', 'tensor'): self.out ='z' if self.component not in ('xx', 'xy', 'yx', 'yy'): raise ValueError(f"Unacceptable component {self.component!r}. " "Expect 'xx', 'xy', 'yx' or 'yy'") self.res2d_= self.make2d(out=f'res{self.component}') self.phs2d_ = self.make2d(out=f'phase{self.component}') if len(self.res2d_) != len(self.freqs_): raise ValueError ("Resistivity and frequency arrays must have a same" f" length. But {len(self.res2d_)} & {len(self.freqs_)}" " were given") if len(self.res2d_) != len(self.phs2d_): raise ValueError ("Resistivity and phase must have the same length." f" But {len(self.res2d_)} & {len(self.phs2d_)} " "were given.") try : self.window_size = int(self.window_size) except ValueError : raise ValueError ( 'Could not convert {type(self.window_size).__name__!r} ' 'to integer: {self.window_size!r}') self.res2d_ = np.array (self.res2d_) if self.window_size > self.res2d_.shape [1]: raise ValueError ("window size might not be less than" f" {str(self.res2d_.shape [1])!r}") return (self.res2d_ , self.phs2d_ , self.freqs_, self.c, self.window_size, self.component, self.out)
[docs] def ama ( self, )-> NDArray[DType[float]] : """ Use an adaptive-moving-average filter to estimate average apparent resistivities at a single static-correction-reference frequency.. The AMA filter estimates static-corrected apparent resistivities at a single reference frequency by calculating a profile of average impedances along the length of the line. Sounding curves are then shifted so that they intersect the averaged profile. Parameters ---------- data: path-like object or list of pycsamt.core.edi.Edi Collections of EDI-objects from `pycsamt`_ Returns ------- rc or z: np.ndarray, shape (N, M) EMAP apparent resistivity static shift corrected or static correction tensor References ----------- .. [1] http://www.zonge.com/legacy/PDF_DatPro/Astatic.pdf .. [2] Torres-Verdin and Bostick, 1992, Principles of spatial surface electric field filtering in magnetotellurics: electromagnetic array profiling (EMAP), Geophysics, v57, p603-622.https://doi.org/10.1190/1.2400625 """ self.inspect # assert filter arguments self.res2d_ , self.phs2d_ , self.freqs_, self.c, self.window_size, \ self.component, self.out = self._make2dblobs () # interpolate resistivity and phases self.phs2d_= interpolate2d(self.phs2d_, method =self.method) self.res2d_= interpolate2d(self.res2d_, method =self.method,) # convert app. resistivity and impedance phase to # impedance values, Zj, for each station omega0 = 2 * np.pi * self.freqs_ zj = np.sqrt(self.res2d_ * omega0[:, None] * mu0 ) * (np.cos ( np.deg2rad(self.phs2d_)) + 1j * np.sin(np.deg2rad(self.phs2d_))) # compute the weight factor for convoluting # L = dipole length = L : 1 is fixed dipole -length w = np.array([betaj (xj = ii, L= 1 , W= self.window_size) for ii in range(self.window_size)]) #print(w) zjr = np.zeros_like(self.res2d_) zji = zjr.copy() for ii in range (len(zj)): w_exp = [ k * self.window_size for k in range(1, self.c +1 )] zcr=list(); zci = list() # compute Zk(xk, w) iteratively # with adpatavive W expanded to 1 to c for wind_k in w_exp : w= np.array([betaj (xj = jj, L= 1, W= wind_k ) for jj in range(wind_k) ]) # block mode to same to keep the same dimensions zcr.append(np.convolve(zj[ii, :].real, w[::-1], 'same')) zci.append(np.convolve(zj[ii, :].imag, w[::-1], 'same')) # and take the average zjr [ii, :] = np.average (np.vstack (zcr), axis = 0) zji[ii, :] = np.average (np.vstack (zci), axis = 0) zjc = zjr + 1j * zji rc = z2rhoa(zjc, self.freqs_) if self.mode =='same': rc[:, 0] = self.res2d_[:, 0] zjc[:, 0] = zj [:, 0] return zjc if self.out =='z' else rc
[docs] def flma ( self, )-> NDArray[DType[float]] : """ A fixed-length-moving-average filter to estimate average apparent resistivities at a single static-correction-reference frequency. The FLMA filter estimates static-corrected apparent resistivities at a single reference frequency by calculating a profile of average impedances along the length of the line. Sounding curves are then shifted so that they intersect the averaged profile. Parameters ---------- data: path-like object or list of pycsamt.core.edi.Edi Collections of EDI-objects from `pycsamt`_ Returns ------- rc or z : np.ndarray, shape (N, M) EMAP apparent resistivity static shift corrected or static correction impedance tensor. References ----------- .. [1] http://www.zonge.com/legacy/PDF_DatPro/Astatic.pdf """ self.inspect # assert filter arguments self.res2d_ , self.phs2d_ , self.freqs_, self.c, self.window_size, \ self.component, self.out = self._make2dblobs () # interpolate resistivity and phases self.phs2d_= interpolate2d(self.phs2d_, method =self.method) self.res2d_= interpolate2d(self.res2d_, method =self.method) # convert app. resistivity and impedance phase to #impedance values, Zj, for each station omega0 = 2 * np.pi * self.freqs_ zj = np.sqrt(self.res2d_ * omega0[:, None] * mu0 ) * (np.cos ( np.deg2rad(self.phs2d_)) + 1j * np.sin(np.deg2rad(self.phs2d_))) # compute the weight factor for convoluting # L = dipole length = L w = np.array([betaj (xj = ii, L= 1 , W= self.window_size) for ii in range(self.window_size)]) zjr = np.zeros_like(self.res2d_) zji = zjr.copy() for ii in range(len(zjr)) : # block mode to same to keep the same array dimensions zjr[ii, :] = np.convolve(zj[ii, :].real, w[::-1], 'same') zji[ii, :] = np.convolve(zj[ii, :].imag, w[::-1], 'same') # recover the static apparent resistivity from reference freq zjc = zjr + 1j * zji rc = z2rhoa (zjc, self.freqs_) #np.abs(zjc)**2 / (omega0[:, None] * mu0 ) if self.mode =='same': rc[:, 0]= self.res2d_[:, 0] zjc[:, 0]= zj [:, 0] return zjc if self.out =='z' else rc
[docs] def skew( self, method:str ='swift', return_skewness:bool=False, suppress_outliers:bool=False, )-> NDArray[DType[float]]: r""" The conventional asymmetry parameter based on the Z magnitude. The EM signal is influenced by several factors such as the dimensionality of the propagation medium and the physical anomalies, which can distort the EM field both locally and regionally. The distortion of Z was determined from the quantification of its asymmetry and the deviation from the conditions that define its dimensionality. The parameters used for this purpose are all rotational invariant because the Z components involved in its definition are independent of the orientation system used. The conventional asymmetry parameter based on the Z magnitude is the skew defined by Swift (1967) as follows: .. math:: skew_{swift}= |\frac{Z_{xx} + Z_{yy}}{ Z_{xy} - Z_{yx}}| When the :math:`skew_{swift}` is close to ``0.``, we assume a 1D or 2D model when the :math:`skew_{swift}` is greater than ``>=0.2``, we assume 3D local anomaly (Bahr, 1991; Reddy et al., 1977). It is generally considered that an electrical structure of :math:`skew < 0.4` can be treated as a 2D medium. Furthermore, Bahr (1988) proposed the phase sensitive skew which calculates the skew taking into account the distortions produced in Z over 2D structures by shallow conductive anomalies and is defined as follows: .. math:: skew_{Bahr} & = & \sqrt{ \frac{|[D_1, S_2] -[S_1, D_2]|}{|D_2|}} \quad \text{where} S_1 & = & Z_{xx} + Z_{yy} \quad ; \quad S_2 = Z_{xy} + Z_{yx} D_1 & = & Z_{xx} - Z_{yy} \quad ; \quad D_2 = Z_{xy} - Z_{yx} Note that The phase differences between two complex numbers :math:`C_1` and :math:`C_2` and the corresponding amplitude products are now abbreviated by the commutators: .. math:: \[C_1, C_2] & = & \text{Im} C_2*C_1^* \[C_1, C_2] & = & \text{Re} C_1 * \text{Im}C_2 - R_e(C_2)* \text{Im}C_1 Indeed, :math:`skew_{Bahr}` measures the deviation from the symmetry condition through the phase differences between each pair of tensor elements,considering that phases are less sensitive to surface distortions(i.e. galvanic distortion). The :math:`skew_{Bahr}` threshold is set at ``0.3`` and higher values mean 3D structures (Bahr, 1991). Parameters --------- data: str of path-like or list of pycsamt.core.edi.Edi EDI data or EDI object with full impedance tensor Z. method: str Kind of correction. Can be ``swift`` for the remove distorsion proposed by Swift in 1967. The value close to 0. assume the 1D and 2D structures and 3D otherwise. Conversly to ``bahr`` for the remove distorsion proposed by Bahr in 1991. The latter threshold is set to 0.3. Above this value the structures is 3D. return_skewness: str, Typically returns the type of skewness. ``'skew'`` or ``mu`` for skew and rotation- all invariant values respectively. Any other value return both skew and rotational invariant. suppress_outliers: bool, default=False, Remove the outliers (if applicable in the data ) before normalizing. .. versionadded:: 0.1.6 Returns --------- skw, mu : Tuple of ndarray-like , shape (N, M ) - Array of skew at each frequency - rotational invariant ``mu`` at each frequency that measures of phase differences in the impedance tensor. See Also --------- watex.utils.plot_skew: For phase sensistive skew visualization - naive plot. watex.view.TPlot.plotSkew: For consistent plot of phase sensitive skew visualization. Allow customize plots. watex.view.TPlot.plot_phase_tensors: Plot skew as ellipsis visualization by turning the `tensor` parameter to ``skew``. Examples -------- >>> from watex.methods.em import Processing >>> edipath = 'data/edis' >>> p = Processing().fit(edipath) >>> sk,_ = p.skew() >>> sk[0:, ] ... array([0.45475527, 0.7876896 , 0.44986397]) References ---------- Bahr, K., 1991. Geological noise in magnetotelluric data: a classification of distortion types. Physics of the Earth and Planetary Interiors 66 (1–2), 24–38. Barcelona, H., Favetto, A., Peri, V.G., Pomposiello, C., Ungarelli, C., 2013. The potential of audiomagnetotellurics in the study of geothermal fields: A case study from the northern segment of the La Candelaria Range, northwestern Argentina. J. Appl. Geophys. 88, 83–93. https://doi.org/10.1016/j.jappgeo.2012.10.004 Swift, C., 1967. A magnetotelluric investigation of an electrical conductivity anomaly in the southwestern United States. Ph.D. Thesis, MIT Press. Cambridge. """ self.inspect self.method = str(method).lower() if self.method not in ('swift', 'bahr'): raise ValueError( f'Expected argument ``swift`` or ``bahr`` not: {self.method!r}') return_skewness= str(return_skewness).lower() if ( 'mu' in return_skewness or 'rot' in return_skewness or 'invariant' in return_skewness): return_skewness ='mu' elif 'skew' in return_skewness: return_skewness ='skew' Zxx= self.make2d('zxx') Zxy = self.make2d('zxy') Zyx = self.make2d('zyx') Zyy= self.make2d('zyy') S1 =Zxx + Zyy; S2 = Zxy + Zyx; D1 =Zxx-Zyy ; D2= Zxy-Zyx D1S2 = (S2 * np.conj(D1)).imag ; S1D2 = (D2 * np.conj(S1)).imag if method =='swift': skw = np.abs ( S1 / D2 ) else : skw = np.sqrt(np.abs( D1S2 - S1D2))/np.abs(D2) mu = np.sqrt(np.abs(D1S2) + np.abs (S1D2))/ np.abs(D2) if suppress_outliers: skw = remove_outliers(skw, fill_value= np.nan ) mu = remove_outliers(mu, fill_value= np.nan ) skw = normalizer(skw) ; mu = normalizer(mu ) return skw if return_skewness=='skew' else ( mu if return_skewness=='mu' else skw, mu)
[docs] def zrestore( self, *, tensor:str=None, component:str=None, buffer: Tuple[float]=None, method:str ='pd', **kws ): """ Fix the weak and missing signal at the 'dead-band`- and recover the missing impedance tensor values. The function uses the complete frequency (frequency with clean data) collected thoughout the survey to recover by inter/extrapolating the missing or weak frequencies thereby restoring the impedance tensors at that 'dead-band'. Note that the 'dead- band' also known as 'attenuation -band' is where the AMT signal is weak or generally abscent. One main problem in collecting |NSAMT| data is the signal level in the 'attenuation band'. Compared to the |CSAMT| method (Wang and Tan, 2017; Zonge and Hughes, 1991),the natural signals are not under our control and suffer from frequency ranges with little or no signal. Most notably, the |NSAMT| 'dead-band' between approximately 1 kHz and 4 kHz, but also a signal low in the vicinityof 1 Hz where the transition to magnetospheric energy sources occurs (Goldak and Olson, 2015). In this band, natural source signals are generally absent. The EM energy is dissipated and often cultural |EM| noise fills the gap (Zonge, 2000). The response is extrapolated from results observed top frequencies( For instance at 20, 40, 250, and 500 Hz).Experience indicates that the natural source signal level at 2000 Hz can be expected to approach 10-6 Ξ³/√Hz (Zheng, 2010; Zonge, 2000). Parameters ---------- tensor: str, optional Name of the :term:`tensor`. It can be [ resistivity|phase|z|frequency]. If the name of tensor is given, function returns the tensor value in two-dimensionals composed of (n_freq , n_sites) where ``n_freq=number of frequency`` and ``n_sations`` number of sites. Note that if the tensor is passed as boolean values ``True``, the ``resistivity`` tensor is exported by default and the ``component`` should be the component passed to :class:`Processing` at initialization. buffer: list [max, min] frequency in Hz list of maximum and minimum frequencies. It must contain only two values. If `None`, the max and min of the clean frequencies are selected. Moreover the [min, max] frequency should not compulsory to fit the frequency range in the data. The given frequency can be interpolated to match the best closest frequencies in the data. method: str, optional Method of interpolation. Can be ``base`` for `scipy.interpolate.interp1d` ``mean`` or ``bff`` for scaling methods and ``pd`` for pandas interpolation methods. Note that the first method is fast and efficient when the number of NaN in the array if relatively few. It is less accurate to use the `base` interpolation when the data is composed of many missing values. Alternatively, the scaled method(the second one) is proposed to be the alternative way more efficient. Indeed, when ``mean`` argument is set, function replaces the NaN values by the nonzeros in the raw array and then uses the mean to fit the data. The result of fitting creates a smooth curve where the index of each NaN in the raw array is replaced by its corresponding values in the fit results. The same approach is used for ``bff`` method. Conversely, rather than averaging the nonzeros values, it uses the backward and forward strategy to fill the NaN before scaling. ``mean`` and ``bff`` are more efficient when the data are composed of a lot of missing values. When the interpolation `method` is set to `pd`, function uses the pandas interpolation but ended the interpolation with forward/backward NaN filling since the interpolation with pandas does not deal with all NaN at the begining or at the end of the array. Default is ``pd``. fill_value: array-like or ``extrapolate``, optional If a ndarray (or float), this value will be used to fill in for requested points outside of the data range. If not provided, then the default is NaN. The array-like must broadcast properly to the dimensions of the non-interpolation axes. If a two-element tuple, then the first element is used as a fill value for x_new < x[0] and the second element is used for x_new > x[-1]. Anything that is not a 2-element tuple (e.g., list or ndarray, regardless of shape) is taken to be a single array-like argument meant to be used for both bounds as below, above = fill_value, fill_value. Using a two-element tuple or ndarray requires bounds_error=False. Default is ``extrapolate``. kws: dict Additional keyword arguments from :func:`~interpolate1d`. Returns -------- Array-like of :class:`watex.external.z.Z` objects Array collection of new Z impedances objects with dead-band tensor recovered. :class:`watex.externals.z..Z` are ndarray (nfreq, 2, 2). 2x2 matrices for components xx, xy and yx, yy. References ---------- Goldak, D.K., Olson, R.W., 2015. New developments in |AMT| explorationβ€―: Case study from Darnley Bay. CSEG Rec. 22–27. Wang, K., Tan, H., 2017. Research on the forward modeling of |CSAMT| in three-dimensional axial anisotropic media. J. Appl. Geophys. 146, 27–36. https://doi.org/10.1016/j.jappgeo.2017.08.007 Zonge, I., 2000. |NSAMT| Imaging. 3322 East Fort Lowell Road, Tucson, AZ 85716 USA. Zonge, L., Hughes, L.J., 1991. |CSAMT|. Soc. Explor. Geophys. 2, 713–809. Examples -------- >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from watex.methods.em import Processing >>> path2edi = 'data/edis' >>> pObjs= Processing().fit(path2edi) >>> # One can specify the frequency buffer like the example below, However >>> # it is not necessaray at least there is a a specific reason to fix the frequencies >>> buffer = [1.45000e+04,1.11500e+01] >>> zobjs_b = pObjs.zrestore(buffer = buffer ) # with buffer """ def z_transform (z , rfq, fq, slice_= None): """ Route to do the same task for real, imaginary and error """ with np.errstate(all='ignore'): z = reshape(z) z = fittensor(compfreq= fq, refreq =rfq, z = z ) z = interpolate1d(arr=z , method = self.method, **kws ) return z [slice_] self.inspect self.method = method # get the frequencies obj zObjs = np.array (list(map(lambda o: o.Z, self.ediObjs_)) , dtype =object) #read all frequency length and take the max frequency # known as the complete frequencies ( That compose all values) freqsize = np.array (list (map (lambda o:len(o._freq), zObjs))) ix_max = np.argmax(freqsize) # get the complete freq cfreq = zObjs[ix_max]._freq # control the buffer and get the the range of frequency buffer = self.controlFrequencyBuffer(cfreq, buffer) ix_buf, = np.where ( np.isin (cfreq, buffer)) ## index for slice the array in the case the buffer is set ix_s , ix_end = ix_buf ; ix_end += 1 ; slice_= slice (ix_s, ix_end) s_cfreq = cfreq [slice_] # slice frequency within the buffer # make a new Z objects # make a new object new_zObjs =np.zeros_like (zObjs, dtype =object ) # loop to correct the Z impedance object values for kk, ediObj in enumerate (self.ediObjs_): new_Z = EMz(z_array=np.zeros((len(s_cfreq), 2, 2), dtype='complex'), z_err_array=np.zeros((len(s_cfreq), 2, 2)), freq=s_cfreq) new_Z = self._tfuncZtransformer( ediObj, new_Z, tfunc= z_transform, cfreq= cfreq, ix_s= ix_s, ix_end= ix_end ) new_zObjs[kk] = new_Z if tensor: tensor = str(tensor).lower() tensor = 'res' if tensor =='true' else tensor new_zObjs = get2dtensor( new_zObjs, tensor = tensor , component = component or self.component, ) return new_zObjs
def _tfuncZtransformer ( self, ediObj: EDIO , new_Z: NDArray [DType[complex]], tfunc: F, cfreq: ArrayLike, slice_: slice =None, ix_s: int = 0 , ix_end: int = -1, )-> NDArray [DType[complex]]: """ Loop and transform the previous tensor to a new tensor from a transform function `tfunc`. :param ediObj: EDI-object from EM. :param new_Z: new tensoor of 2 x2 matrix , complex number :param tfunc: Callable, function for transform the tensor :param cfreq: new interpolate frequency use to initialize the new tensor :param slice_: slice object , to preserve data in the previous tensor :param ix_s: int, index of startting point to read the tensor from previous tensor :param ix_end: int, end point to stop reading the previous tensor. :note: the new tensor is composed of shape (cfreq, 2 , 2 ), 2 x2 matrices for the four component xx, xy, yx, yy . :return: NDArray of shpe (cfreq, 2 * 2 ), dtype = complex """ for ii in range(2): for jj in range(2): # need to look out for zeros in the impedance # get the indicies of non-zero components nz_index = np.nonzero(ediObj.Z.z[:, ii, jj]) if len(nz_index[0]) == 0: continue # get the non_zeros components and interpolate # frequency to recover the component in dead-band frequencies # Use the whole edi with np.errstate(all='ignore'): zfreq = ediObj.Z._freq z_real = reshape(ediObj.Z.z[nz_index, ii, jj].real) z_real = tfunc (z_real, rfq=cfreq, fq=zfreq, slice_=slice_ ) z_imag = reshape(ediObj.Z.z[nz_index, ii, jj].imag) z_imag = tfunc (z_imag, rfq=cfreq, fq=zfreq, slice_=slice_ ) z_err = reshape(ediObj.Z.z_err[nz_index, ii, jj]) z_err = tfunc (z_err, rfq=cfreq, fq=zfreq, slice_=slice_ ) # Use the new dimension of the z and slice z according # the buffer range. make the new index start at 0. new_nz_index = slice ( * np.array([ix_s, ix_end],dtype=np.int32)-ix_s) new_Z.z[new_nz_index, ii, jj] = reshape( z_real, 1) + 1j * reshape(z_imag, 1) new_Z.z_err[new_nz_index, ii, jj] = reshape(z_err, 1) # compute resistivity and phase for new Z object new_Z.compute_resistivity_phase() return new_Z
[docs] @staticmethod def freqInterpolation ( y:ArrayLike[DType[T]] , /, buffer:Optional[Tuple[float]] = None , kind: str ='freq' )-> ArrayLike[DType[T]]: """ Interpolate frequency in frequeny buffer range. :param y: array-like, shape(N, ) - Can be a frequency array or periods note that the frequency is not in log10 Hz. :param buffer: list of maximum and minimum frequency. It should contains only two values. If `None`, the max and min frequencies are used :param kind: str type of given data. Can be 'period' if the value is given as periods or 'frequency' otherwise. Any other value should be considered as a frequency values. :return: array_like, shape (N2, ) New interpolated frequency with N2 size :example: >>> from watex.methods.em import Processing >>> pobj = Processing().fit('data/edis') >>> f = getfullfrequency (pobj.ediObjs_) >>> buffer = [5.86000e+04, 1.6300e+01] >>> f ... array([7.00000e+04, 5.88000e+04, 4.95000e+04, 4.16000e+04, 3.50000e+04, 2.94000e+04, 2.47000e+04, 2.08000e+04, 1.75000e+04, 1.47000e+04, ... 2.75000e+01, 2.25000e+01, 1.87500e+01, 1.62500e+01, 1.37500e+01, 1.12500e+01, 9.37500e+00, 8.12500e+00, 6.87500e+00, 5.62500e+00]) >>> new_f = freqInterpolation(f, buffer = buffer) >>> new_f ... array([5.88000000e+04, 4.93928459e+04, 4.14907012e+04, 3.48527859e+04, 2.92768416e+04, 2.45929681e+04, 2.06584471e+04, 1.73533927e+04, ... 2.74153120e+01, 2.30292565e+01, 1.93449068e+01, 1.62500000e+01]) """ kind =str (kind).lower().strip() if kind.find('peri')>=0 : kind ='periods' y = 1./ np.array (y) if kind =='periods' else np.array (y) buffer = Processing.controlFrequencyBuffer(y, buffer ) ix_s, ix_end = np.argwhere (np.isin(y, buffer)) y = y[slice ( int(ix_s), int(ix_end) +1)] # put frequency in logspace and return # the same order like the input value y = np.log10 (y) if y[0] < y[-1]: f = np.logspace(y.min() ,y.max() , len(y)) else : f = np.logspace(y.min(),y.max() , len(y))[::-1] return f
[docs] @staticmethod def controlFrequencyBuffer ( freq: ArrayLike[DType[T]], buffer:Optional[Tuple[float]] = None )-> ArrayLike[DType[T]] : """ Assert buffer and find the nearest value if the value of the buffer is not in frequency ranges . :param freq: array-like of frequencies :param buffer: list of maximum and minimum frequency. It should contains only two values. If `None`, the max and min frequencies are selected :returns: Buffer frequency range :Example: >>> import numpy as np >>> from watex.methods.em import Processing >>> freq_ = np.linspace(7e7, 1e0, 20) # 20 frequencies as reference >>> buffer = Processing.controlFrequencyBuffer(freq_, buffer =[5.70e7, 2e1]) >>> freq_ ... array([7.00000000e+07, 6.63157895e+07, 6.26315791e+07, 5.89473686e+07, 5.52631581e+07, 5.15789476e+07, 4.78947372e+07, 4.42105267e+07, 4.05263162e+07, 3.68421057e+07, 3.31578953e+07, 2.94736848e+07, 2.57894743e+07, 2.21052638e+07, 1.84210534e+07, 1.47368429e+07, 1.10526324e+07, 7.36842195e+06, 3.68421147e+06, 1.00000000e+00]) >>> buffer ... array([5.52631581e+07, 1.00000000e+00]) """ if buffer is not None: if np.iterable(buffer): if 1 < len(buffer) > 2 : raise ValueError('Frequency buffer expects two values [max, min].' f' But {"is" if len(buffer)==1 else "are"} given ') if len(set(buffer))==1: raise ValueError('Expect distinct values [max, min].' f'{str(buffer[0])!r} is given in twice.') for i, v in enumerate(buffer) : if str(v).lower() =='min': buffer[i] = freq.min() elif str(v).lower() =='max': buffer[i]= freq.max() elif isinstance(v, str): raise ValueError(f"Expect 'min' or 'max' not: {v!r}") # Find the absolute difference with each value # Get the index of the smallest absolute difference arr_diff = np.abs(freq - v ) buffer[i] = freq[arr_diff.argmin()] buffer = np.array(buffer) buffer.sort() ; buffer = buffer [::-1] if buffer is None: buffer = np.array([freq.max(), freq.min()]) if buffer.min() < freq.min(): raise ValueError( f'Given value {round(buffer.min(), 4) } is out of frequency range.' f' Expect a frequency greater than or equal to {round(freq.min(), 4)}' ) if buffer.max() > freq.max() : raise ValueError( f'Given value {round(buffer.max(),4)} is out of frequency range.' f' Expect a frequency less than or equal {round(freq.max(),4)}') return buffer
[docs] def qc ( self, tol: float = .5 , *, return_freq: bool =False, return_ratio:bool=False, to_log10: bool=True, )->Tuple[float, ArrayLike]: """ Check the quality control of the collected EDIs. Analyse the data in the EDI collection and return the quality control value. It indicates how percentage are the data to be representative. :param tol: float, the tolerance parameter. The value indicates the rate from which the data can be consider as meaningful. Preferably it should be less than 1 and greater than 0. At this value. Default is ``.5`` means 50 % :param return_freq: bool return the interpolated frequency if set to ``True``. Default is ``False``. :param return_ratio: bool, default=False, return only the ratio of the representation of the data. .. versionadded:: 0.1.5 :param to_log10:bool, default=False convert the interpolated frequency into a log10. :returns: Tuple (float , index ) or (float, array-like, shape (N, )) return the quality control value and interpolated frequency if `return_freq` is set to ``True`` otherwise return the index of useless data. :Example: >>> from watex.methods.em import Processing >>> pobj = Processing().fit('data/edis') >>> f = pobj.getfullfrequency () >>> # len(f) >>> # ... 55 # 55 frequencies >>> c,_ = pobj.qc ( tol = .4 ) # mean 60% to consider the data as >>> # representatives >>> c # the representative rate in the whole EDI- collection >>> # ... 0.95 # the whole data at all stations is safe to 95%. >>> # now check the interpolated frequency >>> c, freq_new = pobj.qc ( tol=.6 , return_freq =True) """ if isinstance (tol, str): tol = tol.replace('%', '') try : tol = float (tol) except TypeError : raise TypeError (f"Unable to convert {type(tol).__name__!r} " f"to float: {tol}") except ValueError: raise ValueError(f"Expects 'float' not {type(tol).__name__!r}: " f"{(tol)!r}") if tol ==0.: raise ValueError ("Expects a tolerance value greater than " f"'0' and less than '1', got: '{tol}'") if 1 < tol <=100: tol /= 100. if tol > 100: raise ValueError ("Tolerance value should be greater than" f" '0' and less than '1', got: {tol}") self.inspect f=self.freqs_.copy() try : # take a sample of collected edi # and make two day array # all frequency at all stations ar = self.make2d ('freq') except : try : ar = self.make2d( 'zxy') except: ar = self.make2d ('zyx') # compute the ratio of NaN in axis =0 nan_sum =np.nansum(np.isnan(ar), axis =1) rr= np.around ( nan_sum / ar.shape[1] , 2) # compute the ratio ck # ck = 1. - rr[np.nonzero(rr)[0]].sum() / ( # 1 if len(np.nonzero(rr)[0])== 0 else len(np.nonzero(rr)[0])) ck = (1. * len(rr) - len(rr[np.nonzero(rr)[0]]) ) / len(rr) index = reshape (np.argwhere (rr > tol)) ar_new = np.delete (rr , index , axis = 0 ) new_f = np.delete (f[:, None], index, axis =0 ) # interpolate freq if f[0] < f[-1]: f =f[::-1] # reverse the array ar_new = ar_new [::-1] # or np.flipud(np.isnan(ar)) new_f = np.logspace(np.log10(new_f.min()) ,np.log10(new_f.max()), len(new_f))[::-1] if not to_log10: new_f = np.power(10 , new_f) return np.around (ck, 2) if return_ratio else ( np.around (ck, 2), new_f if return_freq else index )
[docs] @_zupdate(option = 'none') def getValidTensors( self, tol:float = .5, **kws )-> NDArray[DType[complex]]: """Select valid tensors from tolerance threshold and write EDI if applicable. Function analyzes the data and keep the good ones. The goodness of the data depends on the `threshold` rate. For instance 50% means to consider an impedance tensor 'z' valid if the quality control shows at least that score at each frequency of all stations. Parameters ---------- data: Path-like object or list of :class:`pycsamt.core.edi.Edi` collections of EDI-objects from `pycsamt`_ . `data` params is passed to :meth:`~.Processing.fit` method. tol : float, tolerance parameter. The value indicates the rate from which the data can be consider as a valid. The valid data selection should be soft when the tolerance parameter is close to '1' and hard otherwise. As the `tol` value decreases, the selection becomes severe. Default is ``.5`` means 50 % kws: dict , Additional keywords arguments for EDI file exporting Returns ------- Zc: :class:`watex.externals.z.Z` impedance tensor objects. Examples -------- >>> from watex.methods.em import Processing >>> pObj = Processing ().fit('data/edis') >>> f= pObj.freqs_ >>> len(f) ... 55 >>> zObjs_hard = pObj.getValidTensors (tol= 0.3 ) # None doesn't export EDI-file >>> len(zObjs_hard[0]._freq) # suppress 3 tensor data ... 52 >>> zObjs_soft = pObj.getValidTensors(p.ediObjs_, tol = 0.6 , option ='write') >>> len(zObjs_soft[0]._freq) # suppress only two ... 53 """ def delete_useless_tensor (z , index , axis = 0): """Remove meningless tensor data""" return np.delete (z, index , axis ) def set_null(freq, objs): """Set null in the case the component doesn't exist""" return np.zeros ((len(f), len(objs)), dtype = np.float32) self.inspect # ediObjs = get_ediObjs(ediObjs) _, no_ix = self.qc(tol=tol ) f = self.freqs_.copy() ff = np.delete (f[:, None], no_ix, 0) # interpolate frequency new_f = Processing.freqInterpolation (reshape (ff)) # gather the 2D z objects # -XX-- try : zxx = delete_useless_tensor( self.make2d (self.ediObjs_, 'zxx'), no_ix) zxx = interpolate2d(zxx) zxx_err = delete_useless_tensor( self.make2d (self.ediObjs_, 'zxx_err') , no_ix ) zxx_err = interpolate2d (zxx_err ) except : zxx = set_null(new_f, self.ediObjs_) zxx_err= zxx.copy() # -XY-- try : zxy = delete_useless_tensor( self.make2d (self.ediObjs_, 'zxy'), no_ix ) zxy= interpolate2d( zxy) zxy_err = delete_useless_tensor( self.make2d (self.ediObjs_, 'zxy_err') , no_ix ) zxy_err = interpolate2d(zxy_err) except: zxy = set_null(new_f, self.ediObjs_) zxy_err= zxy.copy() # -YX-- try: zyx = delete_useless_tensor( self.make2d (self.ediObjs_, 'zyx') , no_ix ) zyx = interpolate2d(zyx) zyx_err = delete_useless_tensor( self.make2d (self.ediObjs_, 'zyx_err') , no_ix ) zyx_err = interpolate2d( zyx_err ) except: zyx = set_null(new_f, self.ediObjs_) zyx_err= zyx.copy() # -YY-- try: zyy = delete_useless_tensor( self.make2d (self.ediObjs_, 'zyy'), no_ix ) zyy = interpolate2d(zyy) zyy_err = delete_useless_tensor( self.make2d (self.ediObjs_, 'zyy_err') , no_ix ) zyy_err = interpolate2d(zyy_err) except : zyy = set_null(new_f, self.ediObjs_) zyy_err= zyy.copy() z_dict = { 'zxx': zxx ,'zxy': zxy , 'zyx': zyx,'zyy': zyy, 'zxx_err': zxx_err ,'zxy_err': zxy_err , 'zyx_err': zyx_err, 'zyy_err': zyy_err } return (self.ediObjs_, new_f , z_dict ), kws