Source code for watex.edi

# -*- coding: utf-8 -*-
#  License: BSD-3-Clause
#  Author: L. Kouadio
"""
:term:`EDI` stands for Electrical Data Interchange. The :class:`Edi` class read and write an EDI file(*.edi)
file as the standard :term:`MT`/:term:`EMAP` Data Interchange. It does the basic tensors operations. Each section 
of the :term:`EDI` file belongs to a class object, thus the elements of each section are attributes 
for easy access. :term:`Edi` is outputted  following the :term:`SEG` documentation. 
"""

from __future__ import annotations 
import os
import re
import warnings
import datetime
import shutil
import time
import numpy as np 

from . import __version__ 
from ._typing import NDArray 
from ._watexlog import watexlog
from .exceptions import ( 
    EDIError, NotFittedError
    )
from .externals.z import Z, Tipper  
from .utils.funcutils import ( 
    cpath, 
    interp_import, 
    spi, 
    strip_item
    )
from .utils.gistools import ( 
    convert_position_float2str ,
    convert_position_str2float, 
    assert_elevation_value, 
    assert_lon_value, 
    assert_lat_value
    )
from .property import ( 
    _EDI, 
    _ZRP_COMPS, 
    _TIP_COMPS, 
    IsEdi,
    Software, 
    Copyright, 
    Person, 
   ) 
from .site import  Location

_logger = watexlog.get_watex_logger(__name__)
_wx_version_ = __version__[:-2] 

__all__=["Edi"]


[docs] class Edi : _block_size= 6 _data_head_com='>!****{0}****!\n' _bloc_num_format=' 15.6e' _z_comps= [zz.lower().strip('>') for zz in _EDI if re.match("^>Z", zz) ] _t_comps= [tt.lower().strip('>') for tt in _EDI if ( re.match("^>T", tt) is not None and tt.find('EXP')>0) ] _res_comps= [res.lower().strip('>') for res in _EDI if re.match( "^>RHO", res)] _phs_comps= [phs.lower().strip('>') for phs in _EDI if re.match( "^>PHS", phs) is not None ] def __init__( self, verbose:int=0, ): self._logging=watexlog.get_watex_logger(self.__class__.__name__) self.Head=Head() self.Info=Info() self.DefineMeasurement=DefineMeasurement() self.MTEMAP=MTEMAP() self.Z=Z() self.Tip=Tipper() self.verbose=verbose @property def dataid (self) : return self.Head.dataid @dataid.setter def dataid (self, did): self.Head.dataid = did @property def lon(self): return self.Head.long or self.DefineMeasurement.reflong @lon.setter def lon (self, longitude): self.Head.long =longitude @property def lat(self): return self.Head.lat or self.DefineMeasurement.reflat @lat.setter def lat (self, latitude): self.Head.lat=latitude @property def elev(self): return self.Head.elev or self.DefineMeasurement.refelev @elev.setter def elev (self, elevation): self.Head.elev =elevation @property def inspect (self): """ Inspect object whether is fitted or not""" msg = ( "{obj.__class__.__name__} instance is not fitted yet." " Call 'fit' with appropriate arguments before using" " this method" ) if not hasattr (self, 'edifile'): raise NotFittedError(msg.format( obj=self) ) return 1
[docs] def fit (self, edifile: str ): """ fit edifile and populate attribute to each data section of edi. Parameters ----------- edifile: str, full path to edifile Returns -------- ``self``: :class:`watex.edi.Edi` instanciated object returns ``self`` for chaining methods. """ self._logging.info ("Reading <{0}> edifile.".format(edifile)) if edifile is not None : self.edifile = edifile #---> read each section and populate attribute IsEdi._assert_edi(self.edifile) self.Head = Head.get_header_list_from_edi(edi_fn=self.edifile ) self.Info = Info.get_info_list_from_edi(edi_fn=self.edifile) self.DefineMeasurement = \ DefineMeasurement.get_DefineMeasurement_info( edi_fn=self.edifile) self.MTEMAP = MTEMAP.get_mtemap_section_list(edi_fn= self.edifile ) self.edi_data_sectionline_ = MTEMAP.start_data_lines_num #read data section self._get_specific_comp(edifile = self.edifile ) return self
def __repr__ (self ): """ Represents the EDI object """ return "{}( verbose={!r} )".format( self.__class__.__name__, self.verbose) def _get_specific_comp ( self, edifile:str =None, data_sect_line: list =None ): """ Method to get a specific components present on the data set . put all components into dictionnary and data like : {'zxxr' :<list of data zxxr>, 'zxyr':<list of data zxyr>, ....,zyyi':<list data of zyyi>} Parameters ----------- * edifile : str full path to edifile * data_sect_line: int number of line where data section start. default is None': fill automatycally Returns -------- dict dictionnary of all component values get on edifiles. Notes ------ data_sect_line parameter is optional. """ self._logging.info('Read <get_specific_comp> in edi_data' ' section :edifile :{0}.'.format( edifile)) if edifile is not None : self.edifile = edifile # we assume that the file will check above , #just to get to microsec running IsEdi._assert_edi(self.edifile, deep= False ) with open (self.edifile, 'r', encoding ='utf8') as fedi : edilines =fedi.readlines() #get all data section of edi edi_data_section =edilines [self.edi_data_sectionline_ :] # read data section and and put data on list of dict. self.comp_dict , _flag={}, 0 for ii, datalines in enumerate (edi_data_section) : keylines =datalines.strip() if '>!' not in keylines and '>' in keylines : edi_lines = keylines[1:].strip().split() # out the '> if edi_lines==[] or edi_lines[0] ==''or edi_lines[0] ==None : continue # these assement to be sure that we met a none line compkey =edi_lines[0].lower() if compkey in self._z_comps or compkey in self._t_comps or\ compkey =='freq' or compkey in self._res_comps or\ compkey in self._phs_comps: _flag =1 # turn on to find data # create dict of key and ready to fill value on list self.comp_dict [compkey]=[] # turn off to find data else : _flag = 0 elif _flag ==1 and (keylines.find('>!') < 0 or keylines.find('>') < 0 ): data_lines =keylines.strip().split() for jj, data in enumerate(data_lines): try : data_lines [jj]= float(data) # be sure to set all none value of ** value to 0.0 if data_lines [jj] == 1.0e32 : data_lines[jj] =.0 except: data_lines [jj] = 0.0 self.comp_dict[compkey].extend(data_lines) self._fill_data_array(data_dict=self.comp_dict) def _fill_data_array ( self, data_dict: dict =None ): """ Method to fill Impedance Data bloc and Tipper blocks and Resistivities and data blocks if provided User dont need to provided data on dictionnay provided that your are sure that data providing respect SEG instructions. If not fill automatically by reading edifile . :param data_dict: dictionnary of data < get from reading edifiles > *default* is None , fill automatically :type data_dict: dict """ #frequency arranged to highest to lowest . #if 1 mean is lower to highest flag_freqOrder = 0 if data_dict is not None : self.comp_dict = data_dict elif data_dict is None : raise EDIError( 'None value found. Can not read data') # get frequency array and initialise z_array and Z_error freq_array = np.array(self.comp_dict['freq'], dtype =np.float64) z_array , z_error_array = np.zeros( (freq_array.size , 2 , 2),dtype =np.complex128),\ np.zeros ((freq_array.size , 2 , 2), dtype =np.float64) if freq_array [-1] > freq_array [0] : self._logging.info ( 'Data originally come to lower to highest frequency.' ' We gonna set to Highest to lower frequency') # set flag to 1 mean value arerange to lowest to hightest flag_freqOrder = 1 for key in self.comp_dict.keys(): if key == 'zxxr' : z_array[: , 0 , 0] = np.array( self.comp_dict[key]) + 1j * np.array( self.comp_dict[key[:-1]+'i'] ) z_error_array [: , 0 , 0]= np.sqrt( np.array(self.comp_dict[key[:-1]+'.var'])) if key == 'zxyr' : z_array[: , 0 , 1] = np.array( self.comp_dict[key]) + 1j * np.array( self.comp_dict[key[:-1]+'i']) z_error_array [: , 0 , 1]= np.sqrt( np.array(self.comp_dict[key[:-1]+'.var'])) if key == 'zyxr' : z_array[: , 1 , 0] = np.array( self.comp_dict[key] ) + 1j * np.array( self.comp_dict[key[:-1]+'i']) z_error_array [: , 1 , 0]= np.sqrt( np.array(self.comp_dict[key[:-1]+'.var'])) if key == 'zyyr' : z_array[: , 1, 1] = np.array( self.comp_dict[key]) + 1j * np.array( self.comp_dict[key[:-1]+'i']) z_error_array [: , 1 , 1]= np.sqrt( np.array(self.comp_dict[key[:-1]+'.var'])) if flag_freqOrder == 1 : #--> return matrice to high to low order z_array , z_error_array, freq_array = z_array [::-1],\ z_error_array[::-1], freq_array[::-1] #---> call Zmodule and populate attributes self.Z._freq, self.Z._z_err , self.Z._z ,=\ freq_array ,z_error_array, z_array if 'zrot' in self.comp_dict.keys() : self.Z.rotation_angle =np.array (self.comp_dict['zrot']) else : # fill zeros value as Zrot self.Z.rotation_angle = np.zeros_like (freq_array, dtype =np.float64) self.Z.compute_resistivity_phase() # compute resistivity and phase _flagTip = False # for key in list(self.comp_dict.keys()): if key in self._t_comps : _flagTip =True break if _flagTip ==True : tip_array =np.zeros ((freq_array.size, 1, 2), dtype =np.complex) tip_error_array =np.zeros ((freq_array.size , 1, 2), dtype =np.float64) if 'trot' in self.comp_dict.keys(): self.Tip.rotation_angle=np.array(self.comp_dict['trot']) elif 'zrot' in self.comp_dict.keys(): self.Tip.rotation_angle = self.comp_dict ['zrot'] else : self.Tipper.rotation_angle = np.zeros_like (freq_array, np.float64) for tkey in self.comp_dict.keys (): if tkey == 'txr.exp': tip_array [:, 0, 0 ] =np.array ( self.comp_dict[tkey])+ 1j * np.array( self.comp_dict[key.replace('r', 'i')])# txi.exp tip_array [:, 0, 1] = np.array ( self.comp_dict[tkey.replace('txr', 'tyr')])\ +1j * np.array( self.comp_dict [tkey.replace('txr', 'tyi')]) tip_error_array [:, 0, 0] = np.sqrt( np.array (self.comp_dict [tkey.replace('txr', 'txvar')])) tip_error_array [:, 0, 1] = np.sqrt( np.array (self.comp_dict [tkey.replace('txr', 'tyvar')])) if flag_freqOrder ==1 : tip_error_array, tip_array, = tip_error_array[::-1], tip_array[::-1] self.Tip._freq ,self.Tip._tipper = freq_array , tip_array self.Tip._tipper_err = tip_error_array self.Tip.compute_amp_phase (), self.Tip.compute_mag_direction()
[docs] def write_edifile ( self, edifile:str=None, new_edifilename:str=None, datatype:str=None , savepath:str=None, filtered_array:NDArray =None, prefix_edi:str='new_', ): """ Method to write edifiles from data setting oin attribute of Edi or from existing file. Can write also EMAP data are filled attribute of EDI. Parameters ------------- edifile: str, Path-like object Full path to the edi-file. Should be the old EDI expected to be rewritten. new_edifilename: str, Optional new edifile name .If None , will write edi using station_name plus type of survey (MT of EMAP) plus year of writing as< S00_emap.2021.edi> or <S00_mt.2021.edi> datatype: str, type of file , "mt" or "emap" if None , program will detect which file is provided . If datatype is set , program will be force to rewrite edi into given format. savepath: str, optional Path to save edifile. If ``None`` save to your current work directory filtered_array: Array-like of shape (nfreq, 2, 2) write filtered array especially if EDI edifile is EMAP section data,if add filter is provided , will recompute rho. Returns ------- new_edifile: str, full path to the new edifile Examples -------- >>> from watex.edi import Edi >>> e=Edi().fit('data/edis/new_E1_1.edi').write_edifile () """ f=0 if edifile is not None : self.edifile =edifile self.inspect if savepath is not None: self.savepath =savepath if not hasattr(self, 'savepath'): self.savepath =None if new_edifilename is not None: try: new_edifilename += '.{0}.edi' f=2 except: new_edifilename =None f=0 if new_edifilename is None : if self.edifile is not None : new_edifilename = '{0}{1}'.format( f'{prefix_edi}', os.path.basename(self.edifile)) else : f = 1 new_edifilename = '{0}_{1}.{2}.edi' if self.Head.dataid is None : self.read_edi() # write info, definemeasurement and mtsection or emapsection edi_header_infolines = self.Head.write_head_info() edi_info_infolines =self.Info.write_edi_info() edi_definemeasurement_infolines =\ self.DefineMeasurement.write_define_measurement() edi_mtemap_infolines =\ self.MTEMAP.write_mtemap_section(nfreq=self.Z.freq.size) edi_mtemap_infolines.append('\n') #---> try to force the program to write either emap or mt section format if datatype is None : # if self.typefile is None : self.typefile = edi_mtemap_infolines[0][2:-5].lower() elif datatype is not None : if re.match(datatype.lower(), 'emapsect') is None \ and re.match(datatype.lower(), 'mtsect') is None : warnings.warn ( 'Currently <watex> can write ">=MTSECT" or ' ' ">=EMAPSECT". The only acceptables datatype keys ' ' are either "mt" or "emap".') raise EDIError( 'Datatype provided is not acceptable .' 'Please try "mt" or "emap".') self.typefile= datatype if f==1 : new_edifilename = new_edifilename.format( self.Head.dataid, self.typefile, datetime.datetime.now().year) # set the name of new_filename elif f==2: new_edifilename=new_edifilename.format(self.Head.dataid) # write frequency >!****FREQUENCIES****! edi_freq_infolines = [self._data_head_com.format('frequencies'.upper())] edi_freq_infolines =edi_freq_infolines + self._write_components_blocks( edi_datacomp= self.Z.freq, comp_key='freq') # print(self.Z.rotation_angle) if self.typefile == 'mt' or 'mt' in self.typefile: self.typefile = 'mt' # write impedance rotation angle : #>!****IMPEDANCE ROTATION ANGLES****! #if rotation angle is =0 , for consistency, #fill zeros value as Zrot try : if isinstance(self.Z.rotation_angle, float) or \ len(self.Z.rotation_angle) ==1: self.Z.rotation_angle =np.full_like(self.Z.freq, self.Z.rotation_angle, dtype=np.float64) # self.Z.rotation_angle = np.zeros_like (self.Z.freq, # dtype =np.float64) except : self._logging.error('Error in "edi-file" while setting ' 'rotation angle. ') else : if self.verbose: warnings.warn('Rotation angle is set to {0}'.format( self.Z.rotation_angle[0])) self._logging.info('Rotation angle is = {0}.'.format( self.Z.rotation_angle[0])) edi_zrot_infolines =[self._data_head_com.format( 'impedance rotation angles'.upper())] edi_zrot_infolines = edi_zrot_infolines +\ self._write_components_blocks(edi_datacomp= self.Z.rotation_angle, comp_key = 'zrot') if self.typefile =='emap' or 'emap' in self.typefile : self.typefile ='emap' #--> Write impedance data and tipper # for consistency , may replace if exist nan number by np.nan self.Z.z =np.nan_to_num(self.Z.z) self.Z.z_err = np.nan_to_num (self.Z.z_err) self.Z.resistivity = np.nan_to_num(self.Z.resistivity) self.Z.phase = np.nan_to_num (self.Z.phase) #---> >!****IMPEDANCES****! edi_z_data_infolines = [self._data_head_com.format('impedances'.upper())] for ii in range (2): for jj in range (2): # if np.all(self.Z.z[:, ii, jj].real ==0.0) \ # or np.mean (self.Z.z[:, ii, jj].imag)==0.0 or # np.mean(self.Z.z_err[:, ii, jj])==0. :pass # dont write this none value #else : zreal_datalines =self._write_components_blocks( edi_datacomp = self.Z.z[:, ii, jj].real , comp_key =_ZRP_COMPS [0][ii *2 + jj][0]) zimag_datalines =self._write_components_blocks( edi_datacomp = self.Z.z[:, ii, jj].imag, comp_key =_ZRP_COMPS [0][ii *2 + jj][1]) z_error_values =self.Z.z_err[:, ii, jj] **2 zvariance_datalines =self._write_components_blocks( edi_datacomp =z_error_values, comp_key =_ZRP_COMPS [0][ii *2 + jj][2]) zreal_datalines.append('\n'), zimag_datalines.append('\n') zvariance_datalines.append('\n') edi_z_data_infolines.extend(zreal_datalines) edi_z_data_infolines.extend(zimag_datalines) edi_z_data_infolines.extend(zvariance_datalines) # write EMAP : #--------------------------------------------------------------- if self.typefile =='emap': # define rho and phase array edi_rhophs_infolines =[self._data_head_com.format( 'Resistivities and phases'.upper())] rho = np.zeros ((self.Z.freq.size , 2, 4), dtype=np.float64) phs = np.zeros ((self.Z.freq.size , 2, 4), dtype=np.float64) if filtered_array is not None : frho = np.zeros ((self.Z.freq.size , 2, 4), dtype=np.float64) frho [:, 1 , 0] = filtered_array[:, 0, 1] frho [:, 0 , 1] = filtered_array[:, 1, 0] rho [:, 0, 0] = self.Z.res_xx rho [:, 0, 1] = self.Z.res_err_xx **2 rho [:, 0, 2] = self.Z.res_err_xx rho [:, 1, 0] = self.Z.res_xy rho [:, 1, 1] = self.Z.res_err_xy**2 rho [:, 1, 2] = self.Z.res_err_xy phs [:, 0, 0] = self.Z.phase_xx phs [:, 0, 1] = self.Z.phase_err_xx **2 phs [:, 0, 2] = self.Z.phase_err_xx phs [:, 1, 0] = self.Z.phase_xy phs [:, 1, 1] = self.Z.phase_err_xy**2 phs [:, 1, 2] = self.Z.phase_err_xy # convert np.nan to number rho , phs = np.nan_to_num(rho), np.nan_to_num(phs) for ii in range(2): for jj in range (4) : if np.all(rho[:, ii , jj]==0 ) or\ np.all (phs[:, ii, jj]==0): continue else : res_datalines = self._write_components_blocks( edi_datacomp = rho[:, ii, jj], comp_key = _ZRP_COMPS[1][ii][jj]) phs_datalines = self._write_components_blocks( edi_datacomp = phs[:, ii, jj], comp_key = _ZRP_COMPS[2][ii][jj]) res_datalines.append('\n') phs_datalines.append('\n') edi_rhophs_infolines.extend(res_datalines) edi_rhophs_infolines.extend(phs_datalines) if filtered_array is not None : fres_datalines = self._write_components_blocks( edi_datacomp = frho[:, ii, jj], comp_key = _ZRP_COMPS[3][ii][jj]) fres_datalines.append('\n') edi_rhophs_infolines.extend(fres_datalines) # delete one more return edi_z_data_infolines = edi_z_data_infolines[:-1] # write EMAP pass :***TIPPER *** #--------------------------------------------------------------- if np.all(self.Tip.tipper ==0 ) and self.Tip.tipper is not None : edi_tip_data_infolines , edi_tip_rot_infolines =[''], [''] else : #>!****TIPPER ROTATION ANGLES****! try : edi_tip_rot_infolines =[self._data_head_com.format( 'tipper rotation angles'.upper())] if self.Tip.rotation_angle.dtype ==np.float64 : #fill tip_rot_value tip_rotation_value = np.repeat ( self.Tip.rotation_angle , self.Tip.freq.size) else : tip_rotation_value = np.repeat (self.Tip.rotation_angle) edi_tip_rot_infolines = edi_tip_rot_infolines+ \ self._write_components_blocks( edi_datacomp= np.array(tip_rotation_value), comp_key = 'trot') # write Tipper data # >!****TIPPER PARAMETERS****! _TIP_LABELS edi_tip_data_infolines =[self._data_head_com.format( 'tipper parameters'.upper())] for ss in range (2) : tipreal_lines = self._write_components_blocks( edi_datacomp= self.Tip.tipper[:, 0, jj].real, comp_key=_TIP_COMPS[ss][0]) tipimag_lines = self._write_components_blocks( edi_datacomp= self.Tip.tipper[:, 0, jj].imag, comp_key=_TIP_COMPS[ss][1]) tipvariance_lines = self._write_components_blocks( edi_datacomp= self.Tip.tipper[:, 0, jj]**2, comp_key=_TIP_COMPS[ss][2]) edi_tip_data_infolines.extend( tipreal_lines ) edi_tip_data_infolines.extend( tipimag_lines ) edi_tip_data_infolines.extend( tipvariance_lines) except : edi_tip_data_infolines , edi_tip_rot_infolines =[''], [''] #WRITE EDI write_edilines = edi_header_infolines + edi_info_infolines +\ edi_definemeasurement_infolines + edi_mtemap_infolines if self.typefile =='mt': for ilines in [edi_freq_infolines , edi_zrot_infolines, edi_z_data_infolines, edi_tip_rot_infolines, edi_tip_data_infolines ]: if ilines== [''] : continue else : write_edilines += ilines +['\n'] if self.typefile =='emap': for ilines in [edi_freq_infolines , edi_z_data_infolines, edi_rhophs_infolines]: write_edilines +=ilines +['\n'] write_edilines.append('>END') # write file : with open (new_edifilename , 'w+', encoding = 'utf8') as fw : fw.writelines(write_edilines) # make a default path. dpath= ( "_ediout.{}".format(datetime.datetime.utcnow().strftime( '%Y.%m.%d.%H.%M') ) ) self.savepath = cpath(self.savepath, dpath) try : shutil.move(new_edifilename, self.savepath ) except : pass write_edilines=[] if self.verbose >0 : print('-'*77) print('---> EDI-file <{0}> wrote successfully to <{1}>'. format(os.path.basename(new_edifilename), self.savepath ) ) return new_edifilename
def _write_components_blocks ( self, edi_datacomp , comp_key, datatype=None): """ Method to write blocks with data components keys . Parameters ------------ edi_datacomp: ndarray (nfreq, 2, 2), array of data components, :comp_key: str, component to write in edifile . :datatype: str, default ='mt' *mt* or *eamap*, *default* is "mt" Returns -------- data_list: list, list of line to write in edifile Notes ------- We assume that comp_key provided is found on the edifile before using this method. """ self._logging.info ( 'Ready to write edi data component blocks info list !') block_rot =['ROT=ZROT', 'ROT=TROT', 'ROT=NONE'] if datatype is None : datatype =self.typefile if comp_key.lower() =='freq': comp_block_line=['>FREQ //{0}\n'.format(edi_datacomp.size)] if datatype.lower() in ["mt", '>=mtsect', 'mtsect'] : if comp_key.lower() =='zrot' or comp_key.lower()=='trot': comp_block_line=['>{0} //{1}\n'.format(comp_key.upper(), edi_datacomp.size)] elif comp_key.lower().find('z') >=0 \ and comp_key.lower() not in ['zrot', 'trot']: comp_block_line = ['>{0} {1} //{2}\n'.format( comp_key.upper(),block_rot[0],edi_datacomp.size)] elif comp_key.lower().find('t')>=0 \ and comp_key.lower() not in ['zrot', 'trot']: comp_block_line = ['>{0} {1} //{2}\n'.format( comp_key.upper(),block_rot[1],edi_datacomp.size)] else : if comp_key.lower()!='freq': raise EDIError( 'Could not write the component key <%s>'% comp_key) elif datatype in ['emap', '>=emapsect','emapsect']: if comp_key.lower()!='freq': comp_block_line = ['>{0} {1} //{2}\n'.format( comp_key.upper(), block_rot[2],edi_datacomp.size)] # else : raise EDIError( #'Could not write the component key <%s>'% comp_key) else :#datatype.lower() not in ['mt' , '>=mtsect'] #and datatype.lower() not in ['emap', '>=emapsect']: warnings.warn ( 'Two Types of data can be read either MTsection or EMAPsection.' 'The dataType provided <{0}> does not match any of them.' ' We suggest to use "mt" or "emap".as datatype key, if not ' 'please refer to SEG-Edile ''write principles.'.format(datatype)) raise EDIError( 'DataType <{0}> provided is wrong!' ' please use "MT"or "EMAP".'.format(datatype)) #-- > read value form component edi_data_comp for index_data , datacomp in enumerate (edi_datacomp , 1) : if datacomp == 0.0 and comp_key.lower() not in ['zrot', 'trot']: datacomp = 1.0E32 # if datacomp.mean() == self.Head.empty : #continue dont write this data when all are empty # else : format_value ="{0:{1}}".format(datacomp, self._bloc_num_format) format_value =format_value.upper() # 1.000000E+32 if index_data % self._block_size == 0 : format_value =format_value + '\n' # ---> when block is at the end if index_data == edi_datacomp.size : format_value +='\n' #join block value to its corresponding comp_block_line.append(format_value) return comp_block_line @property def station (self): return self.Head.dataid @property def processingsoftware (self): return self.Info.Processing.ProcessingSoftware.name @station.setter def station(self, set_station_name): if not isinstance(set_station_name,str): set_station_name = str(set_station_name) self.Head.dataid = set_station_name self.MTEMAP.sectid = set_station_name @processingsoftware.setter def processingsoftware (self, sofware_name): self.Info.Processing.ProcessingSoftware.name = sofware_name
[docs] def interpolateZ(self, new_freq_array, interp_type='slinear', bounds_error=True, period_buffer=None): """ Interpolate the impedance tensor onto different frequencies. :param new_freq_array: a 1-d array of frequencies to interpolate on to. Must be with in the bounds of the existing frequency range, anything outside and an error will occur. :type new_freq_array: np.ndarray :param period_buffer: maximum ratio of a data period and the closest interpolation period. Any points outside this ratio will be excluded from the interpolated impedance array. :returns: a new impedance object with the corresponding frequencies and components. :rtype: watex.core.z.Z :Interpolate: :: >>> import watex.edi as CSedi >>> edi_fn = r"/home/edi_files/cs_01.edi" >>> edi_obj = CSedi.Edi(edi_fn) >>> # create a new frequency range to interpolate onto >>> new_freq = np.logspace(-3, 3, 24) >>> new_z_object= edi_obj.interpolate(new_freq) >>> edi_obj.write_new_edifile(new_edi_fn=r"/home/edi_files/cs_01_interp.edi", >>> ... new_Z_obj=new_z_object, >>> ... ) """ # if the interpolation module has not been loaded return if interp_import is False: raise ImportError('could not interpolate, need to install scipy') # make sure the input is a numpy array if not isinstance(new_freq_array, np.ndarray): new_freq_array = np.array(new_freq_array) new_freq_array = np.around (new_freq_array, 5) if period_buffer is not None: if 0. < period_buffer < 1.: period_buffer += 1. print("Warning: period buffer must be > 1. Updating to", period_buffer) # check the bounds of the new frequency array if bounds_error: # logger.debug("new freq array %s", new_freq_array) if self.Z.freq.min() > new_freq_array.min(): raise ValueError( 'New frequency minimum of {0:.5g}'.format(new_freq_array.min()) + \ ' is smaller than old frequency minimum of {0:.5g}'.format( self.Z.freq.min()) + \ '. The new frequency range needs to be within the ' + 'bounds of the old one.') if self.Z.freq.max() < new_freq_array.max(): raise ValueError( 'New frequency maximum of {0:.5g}'.format(new_freq_array.max()) + \ ' is larger than old frequency maximum of {0:.5g}'.format( self.Z.freq.max()) + \ '. The new frequency range needs to be within the ' + 'bounds of the old one.') # make a new Z object new_Z = Z(z_array=np.zeros((new_freq_array.shape[0], 2, 2), dtype='complex'), z_err_array=np.zeros((new_freq_array.shape[0], 2, 2)), freq=new_freq_array) # interpolate the impedance tensor for ii in range(2): for jj in range(2): # need to look out for zeros in the impedance # get the indicies of non-zero components nz_index = np.nonzero(self.Z.z[:, ii, jj]) if len(nz_index[0]) == 0: continue # get the non-zero components z_real = self.Z.z[nz_index, ii, jj].real z_imag = self.Z.z[nz_index, ii, jj].imag z_err = self.Z.z_err[nz_index, ii, jj] # get the frequencies of non-zero components f = self.Z.freq[nz_index] # get frequencies to interpolate on to, making sure the # bounds are with in non-zero components new_nz_index = np.where((new_freq_array >= f.min()) & (new_freq_array <= f.max()))[0] new_f = new_freq_array[new_nz_index] # apply period buffer if type(period_buffer) in [float, int]: new_f_update = [] new_nz_index_update = [] for ifidx,ifreq in enumerate(new_f): # find nearest data period difference = np.abs(np.log10(ifreq) - np.log10(f)) fidx = np.where(difference == np.amin(difference))[0][0] if max(f[fidx] / ifreq, ifreq / f[fidx]) < period_buffer: new_f_update.append(ifreq) new_nz_index_update.append(new_nz_index[ifidx]) new_f = np.array(new_f_update) new_nz_index = np.array(new_nz_index_update) # create a function that does 1d interpolation z_func_real = spi.interp1d(f, z_real, kind=interp_type) z_func_imag = spi.interp1d(f, z_imag, kind=interp_type) z_func_err = spi.interp1d(f, z_err, kind=interp_type) # interpolate onto new frequency range new_Z.z[new_nz_index, ii, jj] = z_func_real( new_f) + 1j * z_func_imag(new_f) new_Z.z_err[new_nz_index, ii, jj] = z_func_err(new_f) # compute resistivity and phase for new Z object new_Z.compute_resistivity_phase() return new_Z
# --> write new edi file
[docs] def write_new_edifile(self, new_edi_fn=None, new_Z=None, **kws): """ write a new edi file if things have changed. Note if new_Z is not None, they are not changed in `Edi` object, you need to change them manually if you want them to be changed. Similarly, the new function name does not change the `Edi` object `edi_filename` attribute but does change Edi.edi_object.edi_filename attribute. :param edi_fn: full path to new edi file :type new_edi_fn: string :param new_Z: new Z object :type new_Z: watex.core.z.Z :param new_Tipper: new Tipper object :type new_Tipper: watex.core.z.Tipper :returns edi_fn: full path to edi file written :rtype edi_fn: string """ # get header information, mostly from site edi_obj = Edi().fit(edifile=self.edifile) if new_Z is not None: edi_obj.Z = new_Z else: edi_obj.Z = self._Z # --> write edi file edi_fn = edi_obj.write_edifile(new_edifilename= new_edi_fn, **kws) return edi_fn
Edi.__doc__="""\ Electrical Data Interchange class. Each section of the .edi file belongs to a class object, thus the elements of each section are attributes for easy access. Edi is outputted following the SEG documentation and rules of EMAP (Electromagnetic Array Profiling) and MT sections [1]_. Ediclass is for especialy dedicated to .edi files, mainly reading and writing which are meant to follow the archaic EDI format put forward by SEG. Can read impedance, Tipper but not spectra. To read spectra format please consult MTpy documentation https://mtpy2.readthedocs.io/en/develop/ The Edi class contains a class for each major section of the .edi file. Parameters ----------- edifile: str, Full path to SEG-EDI file. Attributes ----------- MTEMAP: :class:`watex.edi.MTEMAP` MT section or EMAP DataSection class, contains basic information on the data collected . Can read MT and EMAP section DefineMeasurement: :class:`watex.edi.DefineMeasurement` DefineMeasurement class, contains information on how the data was collected. HeaD: :class:`watex.edi.Head` Head class, contains metadata on where, when, and who collected the data Information class. Info: :class:`watex.edi.Info` contains information on how the data was processed and how the transfer functions where estimated. Z: :class:`watex.external.z.Z` Z class, contains the impedance data _block_size: int, defaut=7 number of data in one line. _data_header_com: str, Default='>!****{0}****!' header string for each of the data section. _bloc_num_format: str, Default=' 16.6e' string format of data, Default is' 16.6e' _t_comps: list, components for tipper blocks _z_comps: list components for impedance blocks _res_comps:list resistivities components _phs_comps:list phase components Notes ------ Frequency and components are ordered from highest to lowest frequency. References ---------- .. [1] Wight, D.E., Drive, B., 1988. MT/EMAP Data Interchange Standard, 1rt ed. Society of Exploration Geophysicists, Texas 7831, USA. Examples ------------ >>> from watex.edi import Edi >>> e=Edi().fit('data/edis/new_E1_1.edi') >>> e.Z.phase [0, 0, 1] Out[6]: 26.42546593360611 """ class Head (object): """ The edi head block contains a series of options which (1) identity the data set, (2) describe whn , where and by whoom was acquired , and (3) describe when , how and by whom it was written. Arguments ---------- **edi_fn** :str Full path to edi path > HEAD - DATAID="kap012" - ACQBY="Phoenix" - FILEBY="EMTF FCU" - FILEDATE=01/02/18 - LAT=-30:52:05.62 - LONG=21:44:35.00 - ELEV=1166 - STDVERS=SEG 1.0 - PROGVERS="4.0" - PROGDATE=06/20/11 - MAXSECT=999 - EMPTY=1.0e+32 ============== =============================== ============= =========== Attribute Description Restriction Default ============== =============================== ============= =========== dataid Identifier for data set str Required acqby Name of contractor or str otherparty Required fileby Name of contractor of other party str Required acqdate Date of (start of) data acquisition date Required enddate Date of end of data acq date "" filedate Date EDI was written Date Required country Name of country of acq. str "" state state(s) of province(s) of acquisition str "" county Name of country of acq. str "" prospect Name of associated prospect str "" loc Description of location str "" lat avg.(approx) latitude of acq. str "" long avg.(approx)longitude of avq. str "" elev avg.(approx)elevation of acq. str "" units Units for elevation "m"|"ft" "m" stdvers Version of EDI format for this file str Required progvers Version ID for prog. written str Required progdate Last revision of prog writing file str Required coordsys coordinate system [geographic|geomagnetic] str Geog.North declination geomagnetic declination float "10." maxsect Maximum data section in EDI file int>=1 "16" bindata if not "", tag for binary data str of "" "" file empty Value which represents"nodata" float "1.0E32" ============== =============================== ============= =========== """ head_keys =[ 'dataid', 'acqby', 'fileby', 'acqdate', 'enddate', 'filedate', 'country', 'state', 'county', 'prospect', 'loc', 'lat', 'long', 'elev', 'declination', 'datum', 'units', 'stdvers', 'coordsys', 'progvers', 'progdate', 'maxsect', 'bindata', 'project', 'survey', 'empty' ] def __init__(self, edi_header_list=None , **kwargs): self.logging = watexlog.get_watex_logger(self.__class__.__name__) self.Location =Location () self.dataid =None self.acqby =None self.fileby =None self.acqdate =None self.enddate =None self.filedate =datetime.datetime.utcnow().strftime( '%Y/%m/%d %H:%M:%S UTC') self.country =None self.state =None self.county =None self.prospect =None self.loc =None self.units ='m' self.stdvers='SEG 1.0' self.progvers=f'watex {_wx_version_}' self.progdate =datetime.datetime.utcnow().strftime('%Y/%m/%d') self.coordsys ='Geomagnetic North' self.declination =None self.datum ='WGS84' self.maxsect =None self.bindata =None self.project =None self.survey =None self.empty =1.0E32 self.edi_header = edi_header_list for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) if self.edi_header is not None : self.read_head() @property def lat (self): return self.Location.latitude @lat.setter def lat (self, lat): try : float(lat) except : self.Location.latitude =convert_position_str2float(lat) self.logging.info ( 'Converted string "dms" input latitude to decimal degree.') else : self.Location.latitude =float(lat) @property def long (self): return self.Location.longitude @long.setter def long (self, long): try :float (long) except : self.Location.longitude= convert_position_str2float(long) self.logging.info ( 'Converted string "dms" input longitude to decimal degree.') else : self.Location.longitude = float(long) @property def elev(self): return self.Location.elevation @elev.setter def elev(self, elev): self.Location.elevation =elev @classmethod def get_header_list_from_edi (cls, edi_fn=None): """ Class method to return edi_head_list . :paramedi_fn: full path to edifile :type edi_fn: str """ _logger.info ('Getting <%s> Head info'% edi_fn) if edi_fn is None : raise EDIError( 'Edile file not found ! Please check your right path.') markhead , ediheadlist=0,[] IsEdi._assert_edi(edi_fn) with open(edi_fn , 'r', encoding ='utf8') as fh : head_lines = fh.readlines() for hh, headitems in enumerate (head_lines): if '>HEAD' in headitems or \ re.match(r'>HEAD', headitems) is not None: markhead = hh if ('>INFO' in headitems or\ re.match(r'>INFO', headitems) is not None) or ( ###BUG # sometimes >INFO data are missing so for safety # we try to use for stop the next EDI section which # >=DEFINEMEAS' #re.match(r'>=DEFINEMEAS', headitems) is not None headitems.find('>=DEFINEMEAS')>=0 ): ediheadlist.extend(head_lines[markhead:hh]) break return cls(edi_header_list = ediheadlist) def read_head (self, edi_header_list =None): """ read_header_list and set attributes values :param edi_header_list: list of edifile header infos :type edi_header_list: list :Example: >>> from watex.edi import Head >>>> file_edi= 'S00_ss.edi' >>>> path = os.path.join(os.environ["watex"], ... 'watex','data', file_edi) >>> edihead= Head.get_header_list_from_edi(edi_fn=path) >>> print(edihead.lat) >>> print(edihead.long) >>> print(edihead.elev) >>> print(edihead.acqby) """ self.logging.info ('Reading info list.') if edi_header_list is not None : self.edi_header = edi_header_list if self.edi_header is None : raise EDIError('None items of Header found to read.') new_header =[] if self.edi_header is not None : for ii, item in enumerate(self.edi_header) : item= item.strip().replace('"','') # replace any blank line for key in self.head_keys : # some Edifile use LON instead of LONG , # must take into accountfor parsing if key=='long' : key='lon' # set attribute that exist in headkeys if key.upper() in item: #for more consistency , #try to clean possible space before setting attribute # function return a list of clean value keyi =strip_item( item_to_clean=item.lower().split('=')[0])[0] if keyi=='lon': keyi= 'long' if keyi =='coordsys' :keyi =='coordinate_system' try : value = strip_item( item_to_clean=item.split('=')[1])[0] except: value ='' self.__setattr__(keyi.lower(), value) new_header.append(''.join([keyi.upper(), '=', value])) self.edi_header = new_header return self def write_head_info (self , head_list_infos =None): """ Write list info . Can read edi and rewrite list or to provide input as ['key_01=value_01', 'key_02=value_02', ...,'key_nn=value_nn'] .. Note:: If value is None , don't need to write the key . :param head_list_infos: list , list of head info :type head_list_infos: list :returns: write_info list , list ready to write to let EDI file more visible . :rtype: list :Example: >>> from watex.edi import Head >>> path = os.path.join(os.environ["watex"], ... 'watex','data', S00_ss.edi) >>> edihead= Head.get_header_list_from_edi(edi_fn=path) >>> print(edihead.write_head_info( ... head_list_infos = edihead.edi_header)) """ self.logging.info ('Writing Edifile info .') write_header =['>HEAD\n'] if head_list_infos is not None : self.edi_header = head_list_infos if self.edi_header is None: for key in self.head_keys : keyvalue =getattr(self, key) if keyvalue == None : continue#keyvalue='' elif keyvalue !=None : if key =='long' or key=='lat': keyvalue =convert_position_float2str(keyvalue) if key =='dataid' : keyvalue ='"{0}"'.format(keyvalue) # put "" if key =='stdvers' : keyvalue ='"{0}"'.format(keyvalue) if key =='progvers': keyvalue ='"{0}"'.format(keyvalue) write_header.append(''.join([' {0}'.format( key.upper()),'=',str(keyvalue),'\n'])) else : write_header.append(''.join([' {0}'.format( key.upper()),'=',str(keyvalue).upper(),'\n'])) elif self.edi_header is not None : #for consistency rewrite . for items in self.edi_header : item, itemvalue= items.strip().split('=') if item.lower() in self.head_keys : if itemvalue in ['None' , ' ', '',None]:pass else : # be sure to let item with no space. item =strip_item(item_to_clean=item.lower())[0] if item =='dataid' : itemvalue ='"{0}"'.format(itemvalue) # put "" if item =='stdvers' : itemvalue ='"{0}"'.format(itemvalue) if item =='progvers': itemvalue ='"{0}"'.format( getattr(Software(**{ 'name':f'watex {_wx_version_}'.upper()}), 'name')) write_header.append(''.join( [' {0}'.format(item.upper()),'=', itemvalue,'\n'])) else : write_header.append(''.join( [' {0}'.format(item.upper()),'=', itemvalue.upper(),'\n'])) write_header.append('\n') return write_header class Info : """ Class EDI info class , collect information of the survey. > INFO - MAXINFO=999 - PROJECT=SAMTEX - SURVEY=Kaapvaal 2003 - YEAR=2003 - PROCESSEDBY=SAMTEX team - PROCESSINGSOFTWARE=JONES 2.3 - PROCESSINGTAG= - SITENAME=South Africa - RUNLIST= - REMOTEREF= - REMOTESITE= - SIGNCONVENTION=exp(+ i\omega t) ================ ================ ======================================= Attributes Type Explanation ================ ================ ======================================= maxrun int>=1 maximum number of text lines in info text(maybe less) Source class obj Porvenace of data to rewrite Processing Processing obj How data where processed Notes Note class info additions ================ ================ ======================================= """ infokeys =['maxinfo', 'project', 'survey', 'creationdate', 'processedby','processingsoftware', 'processingtag','sitename', 'runlist', 'remoteref', 'remotesite','signconvention'] def __init__(self, edi_info_list =None , **kwargs ): self.logging =watexlog.get_watex_logger(self.__class__.__name__) self.ediinfo =edi_info_list self.filter =None #use for EMEAP section self.maxinfo =999 self.Source=Source () self.Processing=Processing() self.Head=Head() self.Copyright =Copyright(conditions_of_use = ( "Data located in directory <data/> cannot be used for commercial" " purposes and can be distributed to a third party. However, user" " can go IRIS website: http://ds.iris.edu/ds/tags/magnetotelluric-data/" " to download metadata available free of charge and may be copied " " freely, duplicated and further distributed provided this data" " is cited as the reference." ) ) for key in list(kwargs.keys()): self.__setattar__(key, kwargs[key]) if self.ediinfo is not None : self.read_info() @classmethod def get_info_list_from_edi (cls, edi_fn =None): """ Class to get edinfo from edifiles :param edi_fn: full path to edifile :type edi_fn: str :returns: edi_info_list :rtype: list """ _logger.info ( 'subclass :Reading <%s> Ediinfo and return class.'% cls.__name__) info_mark=0 if edi_fn is None : raise EDIError( 'None infos to read! Please provide the right path .') # we do this simultaneous assement to get a ms of computation . IsEdi._assert_edi(edi_fn, deep= True ) with open(edi_fn, 'r', encoding ='utf8') as fi: edi_lines =fi.readlines() for ii, info in enumerate(edi_lines): if '>info'.upper()in info or info.find('>INF')>= 0 or\ re.match(r'>INFO', info) is not None : info_mark=ii elif info == ['\n'] :continue elif '>=' in info or re.match(r'>=DEFINEMEAS', info) or\ info.find('>=DEFINEMEAS')>=0: #get the info plus one list_info = edi_lines[info_mark+1:ii] break return cls (edi_info_list=list_info) def read_info (self, edi_info_list =None ): """ readinformation and populate attaribute info can set other attributes once read and not present on the file. :param edi_info_list: list of infos files :type edi_info_list: list """ listinfo=[] if edi_info_list is not None : self.ediinfo =edi_info_list if self.ediinfo is None : raise TypeError('NoneType of EDI-info cannot be read.') for ii , iteminfo in enumerate(self.ediinfo ): iteminfo=iteminfo.replace('"','') if iteminfo =='\n':continue #continue elif iteminfo !='\n': try : item , infovalue = iteminfo.strip().split('=') except : #Try to set attribute for consistency #wheter no value is provide as info value .[''] if len(iteminfo.strip().split('='))==1 : item , infovalue= iteminfo.strip().split('=')[0], None pass if item =='' or infovalue == '' or len(item)==0 or\ item ==[] or infovalue ==None :continue else : item , infovalue = strip_item( item_to_clean=item.lower())[0], strip_item(item_to_clean=infovalue)[0] if 'project' in item or 'survey' in item or\ 'date' in item or 'sitename'in item: self.Source.__setattr__(item, infovalue) listinfo.append(''.join([' {0}'.format(item.upper()), '=', infovalue])) elif 'process' in item or 'run' in item or\ 'remo' in item or 'signconv' in item : if item =='processingsoftware': self.Processing.ProcessingSoftware.__setattr__('name', infovalue) listinfo.append(''.join([' {0}'.format( item.upper()), '=', infovalue])) else : self.Processing.__setattr__(item, infovalue) listinfo.append(''.join([' {0}'.format( item.upper()), '=', infovalue])) elif 'creating_application' in item : if self.Processing.ProcessingSoftware.name is None : self.Processing.ProcessingSoftware.__setattr__('name', infovalue) else : self.__setattr__(item, infovalue) listinfo.append(''.join([' {0}'.format( item.upper()), '=', infovalue])) self.ediinfo = listinfo return self def write_edi_info (self, edi_info_list =None ): """ Write edi information info . Can read edi and rewrite list or to provide input as ['key_01=value_01', 'key_02=value_02', ...,'key_nn=value_nn' ] Note : If value is absent i.e None , don't write the key . Info write method add somefield notes informations from other softwares if exists. :param edi_info_list: list of infos contain in info sections :type edi_info_list: list :returns: list of info :rtype: list :Example: >>> from watex.edi import Info >>> file_edi_2='SAMTEX.edi_2.edi' >>> file_edi= 'S00_ss.edi' >>> path = os.path.join(os.environ["watex"], ... 'watex','data', file_edi_2) >>> info_obj =Info.get_info_list_from_edi(edi_fn=path) >>> print(info_obj.write_edi_info()) """ self.logging.info ( 'writing ediinfo from <%s> class.' % self.__class__.__name__) write_info =['>INFO\n'] if edi_info_list is not None : self.ediinfo = edi_info_list if self.ediinfo is None : for key in self.infokeys : if key in ['project' , 'survey' ,'creationdate', 'sitename' ]: valueinfo =getattr(self.Source, key) elif 'process' in key or 'run' in key or\ 'rem'in key or 'signconv' in key : if key == 'processingsoftware': valueinfo = getattr(self.Processing.ProcessingSoftware, 'name') elif key =='processedby': valueinfo = getattr(self.Processing, 'processedby') else : valueinfo = getattr(self.Processing, key) else : valueinfo = getattr(self, key) if key == 'processedby': write_info.append(''.join([' {0}'.format(key.upper()), '=', '"{0}"'.format(str(valueinfo)), '\n'])) else : write_info.append(''.join([' {0}'.format(key.upper()), '=', str(valueinfo).upper(), '\n'])) #add softwares info write_info.append(''.join([' {0}'.format('CREATINGSOFTWARE'), '=', getattr(self.Source, 'creatingsoftware'), '\n'])) # write_info.append(''.join([' {0}'.format('CREATIONDATE'), '=', # getattr(self.Source, 'creationdate'), '\n'])) if self.filter is not None : write_info.append(''.join([' {0}'.format('FILTER'), '=', str(getattr(self, 'filter')).upper(), '\n'])) elif self.ediinfo is not None : #rewrite for consistency for infok in self.infokeys : if infok in ['project' , 'survey' , 'creationdate' , 'sitename' ]: valueinf =getattr(self.Source, infok) if infok == 'creationdate' : valueinf = getattr(self.Source, infok) elif 'process' in infok or 'run' in infok or\ 'rem'in infok or 'signconv' in infok : if infok == 'processedby' : valueinf = '"{0}"'.format(self.Source.creatingsoftware) elif infok == 'processingsoftware' : valueinf ='"{0}"'.format( getattr(self.Processing.ProcessingSoftware, 'name')) else : valueinf =getattr(self.Processing, infok) elif self.filter is not None : write_info.append(''.join( [' {0}'.format('FILTER'), '=', getattr(self, 'filter'), '\n'])) else : valueinf =getattr(self, infok) if valueinf is None :continue else : write_info.append(''.join( [' {0}'.format(infok.upper()), '=', str(valueinf), '\n'])) for value in self.ediinfo: #add others info from procesing files. keyi, valueinf = value.strip().split('=') keyi = strip_item(item_to_clean =keyi)[0] valueinf = strip_item(item_to_clean =valueinf)[0] if keyi.lower().find('lat')>=0: write_info.append(''.join( [' {0}'.format(keyi), '=', valueinf, '\n'])) elif keyi.lower().find('azimuth')>=0: write_info.append(''.join( [' {0}'.format(keyi), '=', valueinf, '\n'])) elif keyi.lower().find ('lon')>=0: write_info.append(''.join( [' {0}'.format(keyi), '=', valueinf, '\n'])) elif keyi.lower().find('h')>=0 or keyi.lower( ).find('elev')>=0: write_info.append(''.join( [' {0}'.format(keyi), '=', valueinf, '\n'])) elif 'notes' in keyi.lower() : #add some other fieldnotes : write_info.append(''.join( [' {0}'.format(keyi), '=', valueinf, '\n'])) write_info.append('\n') return write_info class DefineMeasurement: """ Begins Measurement definition data section . Defines Location of sensors and parameters pertainning to runs for each measurments . Arguments ----------- **param defineMeas_list**: list list for define measurement infos >=DEFINEMEAS - MAXCHAN=7 - MAXRUN=999 - MAXMEAS=9999 - UNITS=M - REFTYPE=CART - REFLAT=-30:52:05.62 - REFLONG=21:44:35.00 - REFELEV=1166 >!****CHANNELS USING ORIGINAL SITE LAYOUT. FOR ROTATIONS SEE ZROT****! - >HMEAS ID=1001.001 CHTYPE=HX X= 0.0 Y= 0.0 Z= 0.0 AZM= 0.0 - >HMEAS ID=1002.001 CHTYPE=HY X= 0.0 Y= 0.0 Z= 0.0 AZM= 90.0 - >HMEAS ID=1003.001 CHTYPE=HZ X= 0.0 Y= 0.0 Z= 0.0 AZM= 0.0 - >EMEAS ID=1004.001 CHTYPE=EX X= -50.0 Y= 0.0 Z= 0.0 X2= 50.0 Y2= 0.0 AZM= 0.0 - >EMEAS ID=1005.001 CHTYPE=EY X= 0.0 Y= -50.0 Z= 0.0 X2= 0.0 Y2= 50.0 AZM= 90.0 ================ =================================== ======= =========== Attributes Description Default Restriction ================ =================================== ======= =========== defineMeas_list list of definemeasurment None no maxchan Maximum number of channels measured None yes maxmeas Maximum number of measurements 9999 yes maxrun Maximum number of measurement runs 999 yes meas_#### Hmeasurement or EmEasurment object None yes defining the measurement made refelev Reference elevation (m) None yes reflat Reference latitude None yes refloc Reference location None yes reflon Reference longituted None yes reftype Reference coordinate system 'cart' yes units Units of length m yes ================ =================================== ======= =========== .. note:: To get the list for define measurement it's better to call the classmethod <get_define_measurement_info> to full path to .edi file to read in. ... """ definemeasurementkeys = ['maxchan', 'maxrun', 'maxmeas', 'units', 'reftype', 'reflat', 'reflong', 'refelev'] definemeasurement_comment = '>!***channels using original site layout. For rotations see zrot***!' nchan =0 def __init__(self, defineMeas_list=None , **kwargs) : self.logging=watexlog.get_watex_logger(self.__class__.__name__) self.define_measurement=defineMeas_list self.maxchan =None self.maxrun =999 self.maxmeas =9999 self.units ='m' self._reflat =None self._reflong= None self._refelev =None self.reftype ='CART' for key in list(kwargs.keys()): self.__setattr__(key, kwargs[key]) if self.define_measurement is not None : self.read_define_measurement() @property def reflat (self): return self._reflat @reflat.setter def reflat (self, reflat): self._reflat = assert_lat_value(reflat) @property def reflong (self): return self._reflong @reflong.setter def reflong (self, reflong): self._reflong = assert_lon_value(reflong) @property def refelev (self): return self._refelev @refelev.setter def refelev (self, elevation): self._refelev = assert_elevation_value(elevation) @classmethod def get_DefineMeasurement_info (cls, edi_fn=None): """ Class method to get definemeasurement list. :param edi_fn: full path to edifiles. :type edi_fn: str :returns: new class with infos list :rtype: list """ _logger.info( 'Reading < %s> file from <%s>' % (os.path.basename(edi_fn), cls.__name__)) _flagmeas =0 dfmeasurementlist = [] if edi_fn is None : raise EDIError( 'Can not read edifile .None value is found.') IsEdi._assert_edi(edi_fn) with open(edi_fn , 'r', encoding= 'utf8') as fdefm : edilines =fdefm.readlines () for dd , dmvalue in enumerate(edilines) : if '>=definemeas'.upper() in dmvalue or dmvalue.find( '>=DEFINEMEAS')>=0 or\ re.match(r'>=DEFINEMEAS',dmvalue) is not None: _flagmeas= dd if _flagmeas > 0 : if dmvalue.find('>=mtsect'.upper())>=0 or dmvalue.find( '>=emapsect'.upper())>=0 : if '>=MTSECT' in dmvalue or '>=EMAPSECT' in dmvalue: dfmeasurementlist= edilines[_flagmeas+ 1:dd] break return cls (defineMeas_list=dfmeasurementlist) def read_define_measurement (self, define_measurement_list =None ): """ readmeasurement inedilist and populate attributes . :param define_measurement_list: list of measurement data can be [key_01=value_01 , ..., key_xx = value_xx] Emeas and Hmeas will be set on dictionnary and call the class to populate attribute *default* is None :type define_measurement_list: list :Example: >>> from watex.edi import DefineMeasurement >>> file_edi= 'S00_ss.edi' >>> path = os.path.join(os.environ["watex"], ... 'watex','data', file_edi_2) >>> definemeas =DefineMeasurement.get_measurement_info(edi_fn=path) >>> print(definemeas.define_measurement) >>> print(definemeas.meas_ex) .. note:: to get measurement_hx or measurement_ex for instance get attribute <id> of Emeasurement, run the following script :Example: >>> from watex.edi import DefineMeasurement >>> definemeas =DefineMeasurement.get_measurement_info(edi_fn=path) >>> print(definemeas.meas_ex.id) ... 1004. """ self.logging.info ( 'Reading DefineMeasurement info <%s>' % self.__class__.__name__) xmeas_list =[] if define_measurement_list is not None : self.define_measurement= define_measurement_list if self.define_measurement is None : raise EDIError( 'Can not find DefineMeasurment list to read.' 'Please provide the list of definemeasurment.' ' e:g:[key_01=value_01 , ..., key_xx = value_xx] ') for keys in self.define_measurement : if re.match('r^{0}'.format('hmeas'.upper()), keys) is None or\ re.match('r^{0}'.format('emeas'.upper()),keys) is None : try : dfmkey, dfmvalue =keys.strip().split('=') except : pass else : if 'maxchan' in dfmkey.lower(): if len(dfmvalue) ==0 or dfmvalue =='' : dfmvalue = '7' if len(dfmvalue) == 0 : dfmvalue == 'None' dfmkey , dfmvalue = strip_item( item_to_clean=dfmkey.lower()) [0], strip_item( item_to_clean=str(dfmvalue)) [0] if dfmkey in self.definemeasurementkeys : #set attribute and convert to degree decimal lon and lat if "reflat" in dfmkey: self.reflat= dfmvalue self.__setattr__(dfmkey, self.reflat) xmeas_list.append(''.join( [' {0}'.format(dfmkey), '=', str(convert_position_float2str( self.reflat)), '\n' ])) elif dfmkey == 'reflon' or dfmkey =='reflong': if dfmkey =='reflon': #add "g" to get reflong defalut attribute dfmkey +='g' self.reflong = dfmvalue self.__setattr__(dfmkey, self.reflong) xmeas_list.append(''.join( [' {0}'.format(dfmkey), '=', str(convert_position_float2str( self.reflong)), '\n' ])) elif 'refelev' in dfmkey : self.refelev= dfmvalue self.__setattr__(dfmkey.lower(), self.refelev) xmeas_list.append(''.join([' {0}'.format(dfmkey), '=', str(self.refelev), '\n' ])) else : self.__setattr__(dfmkey, dfmvalue) xmeas_list.append(''.join([' {0}'.format(dfmkey), '=', dfmvalue, '\n' ])) #elif re.match('r^{0}'.format('hmeas'.upper()), #keys) is not None or re.match('r^{0}'.format( #'emeas'.upper()),keys) is not None : # set the meas into dictionar of Emas and Hmeas if 'hmeas'.upper() in keys : self.nchan +=1 keyH=keys.strip().split() keyH= gather_measurement_key_value_with_str_parser ( old_measurement_list=keyH) hdict={} for ss in keyH : if '=' in ss : hmeask, hmeasv = ss.split('=') hdict [hmeask.lower()]= hmeasv.lower() xmeas_list.append(hdict) elif 'emeas'.upper() in keys : self.nchan +=1 #get number of channels keyE =keys.strip().split() keyE= gather_measurement_key_value_with_str_parser ( old_measurement_list=keyE) edict ={} for ss in keyE : if '=' in ss : emeask, emeasv = ss.split('=') edict[emeask.lower()] =emeasv.lower() xmeas_list.append(edict) #create attribute Emeas and Hmeas fonction to chtype value for dict_meas_eh in xmeas_list : if isinstance(dict_meas_eh, dict): newkey ="meas_{0}".format(dict_meas_eh['chtype'].lower()) if 'h' in newkey : measvalue = Hmeasurement(**dict_meas_eh)# self.meas_hx etc.. if 'e' in newkey[5:] : measvalue = Emeasurement(**dict_meas_eh) # self.meas_ex etc.. try : self.__setattr__(newkey, measvalue) except : AttributeError pass self.logging.info ( 'Number of channels "emeas" and "hmeas" found is <%s>.' % self.nchan) self.define_measurement = xmeas_list return self def write_define_measurement(self, define_measurement_list = None): """ Write definemeasurement method ,intend to write and rewrite measurements infos into list. informations must be on list as possible if not may set attribute manually i.e [key01=value02 , ..., keynn=valuenn + dictXX ]dictXX ={meas_e}, {meas_hx}, {meas_ey}{meas_hx}, {meas_hy} {meas_hz} :param define_measurement_list: list of define measuremenent :type define_measurement_list: list :returns: new list of define_measurement :rtype: list . .notes :: If no edifiles is provided , can write definemeasurement by creating dict of Eand H measurment. :Example: >>> ex_dict ={'id':1002, 'chtype':'Ex', 'x':0, 'y':0, 'z':0, 'x2':-50,'y2':0, 'z2':0, ... 'acqchan':0,'filter':'hanning', 'sensor':'ex', 'gain':None} >>> ey_dict ={'id':1003.1, 'chtype':'Ey', 'x':0, 'y':50, 'z':0, 'x2':0,'y2':0, 'z2':0, ... 'acqchan':0} >>> hy_dict ={'id':1003, 'chtype':'Hy', 'x':0, 'y':50, 'z':90, 'azm':0.0,'dip':35, ... 'acqchan':'hy','filter':'Hanning', 'sensor':None, 'gain':0, 'measdate':''} >>> definemeas =DefineMeasurement() >>> ex =Emeasurement(**ex_dict) >>> ey =Emeasurement(**ey_dict) >>> hy=Hmeasurement(**hy_dict) >>> definemeas.__setattr__('meas_ex', ex) >>> definemeas.__setattr__('meas_ey', ey) >>> definemeas.__setattr__('meas_hy', hy) >>> print(definemeas.write_define_measurement()) """ write_dfmeasurements, dictlist=['>=DEFINEMEAS\n'],[] self.logging.info ( 'write defineMeasurement infos <%s>.' % self.__class__.__name__) if define_measurement_list is not None : self.define_measurement =define_measurement_list if self.define_measurement is not None : if minimum_parser_to_write_edi( edilines =self.define_measurement) is None : warnings.warn( '.edi_parser provided is wrong.Definemeasurement ' 'list provided cannot be write.Please provide key values' ' as :[key01=value02 , ..., keynn=valuenn + dictXX ]' 'with dictXX ={meas_e}, {meas_hx}, {meas_ey}{meas_hx},' ' {meas_hy} {meas_hz}') self.logging.warn( 'Please check your definemeasurement ' 'list provided. Can not be written.') raise EDIError( "Can not write the items on the definemeasurement " "list provided. It seems no right parser is found.") for ii, itemkey in enumerate(self.define_measurement): if not isinstance(itemkey, dict) : keydfmeas , valuedfmeas = itemkey.strip().split("=") keydfmeas , valuedfmeas =strip_item( item_to_clean=keydfmeas)[0],strip_item( item_to_clean=valuedfmeas)[0] write_dfmeasurements.append(''.join( [' {0}'.format(keydfmeas.upper()), '=', valuedfmeas, '\n'])) elif isinstance(itemkey, dict): dictlist.append(itemkey) write_dfmeasurements.append( self.definemeasurement_comment.upper() +'\n') for itemkey in dictlist: if 'h' in itemkey['chtype'] : write_dfmeasurements.append('>hmeas'.upper()) # valuemeas= getattr(self, 'meas_{}'.format(itemkey['chtype'])) elif 'e' in itemkey['chtype'] : write_dfmeasurements.append('>emeas'.upper()) for keym, valm in itemkey.items(): if valm in ['' , 'None' , None] : valm= str("{0:>7.2f}".format(.0)) write_dfmeasurements.append(''.join( [' {0}'.format(keym.upper()), '=', '{0:>7}'.format(valm.upper())])) write_dfmeasurements.append('\n') # for consistency , all options to right file are welcome . elif self.define_measurement is None : chtype = ['ex', 'ey', 'hx', 'hy', 'hz'] for keydefm in self.definemeasurementkeys: measvalues = getattr(self, keydefm) if keydefm =='reflong': measvalues =convert_position_float2str(measvalues) if keydefm =='reflat': measvalues =convert_position_float2str(measvalues) if measvalues ==None or measvalues=='': measvalues='' write_dfmeasurements.append(''.join( [' {0}'.format(keydefm.upper()), '=', str(measvalues).upper()+'\n'])) write_dfmeasurements.append( self.definemeasurement_comment.upper() +'\n') # then write Hmeasurement and Emeasurements throught their attributes. for meastype in chtype : if hasattr(self, 'meas_{0}'.format(meastype)): if 'e' in meastype : write_dfmeasurements.append('>emeas'.upper()) for eattr in Emeasurement.emeasurementkey : try :tempattr =getattr(self, 'meas_{0}'.format(meastype)) except :pass else : valuemeasxx = tempattr.__dict__["{0}".format(eattr)] if valuemeasxx ==None : valuemeasxx='' write_dfmeasurements.append(''.join( [' {0}'.format(eattr.upper()), '=', '{0:>10}'.format(str(valuemeasxx).upper())])) write_dfmeasurements.append('\n') elif 'h' in meastype : write_dfmeasurements.append('>hmeas'.upper()) for hattr in Hmeasurement.hmeasurementkey : try: tempattr =getattr(self, 'meas_{0}'.format(meastype)) except : AttributeError pass else : valuemeasyy = tempattr.__dict__["{0}".format(hattr)] if valuemeasyy ==None : valuemeasyy ='' write_dfmeasurements.append(''.join( [' {0}'.format(hattr.upper()), '=', '{0:>10}'.format(str(valuemeasyy).upper())])) write_dfmeasurements.append('\n') write_dfmeasurements.append('\n') return write_dfmeasurements class Hmeasurement(object): """ Define the sensor location and orientation , and run parameters for a magnetic field measurement.HMeasurement contains metadata for a magnetic field measurement. ===================== ==================================================== Attributes Description ===================== ==================================================== id Measurement ID , channel number chtype Type of Hmeasurment[ HX | HY | HZ | RHX | RHY ] x x (m) north offset from reference sensor y y (m) offest from ref sensor azm angle of sensor relative to north = 0 acqchan name of the channel acquired usually same as chtype dip dip angle for sensor ("") filter description of sensor to run ("") gain gain used for run ("") measdate date of run ("") ===================== ==================================================== To fill Metadata, let get a look of this example :Example: >>> import watex.edi as csedi >>> hmeas_dict = {'id': '1000.3', 'chtype':'hx', 'x':0, ... 'y':0, 'azm':0, 'sensor':'None'} >>> hmeas = csedi.Hmeasurement(**hmeas_dict ) >>> print(hmeas.chtype) >>> print(hmeas.azm) >>> print(hmeas.measdate) >>> print(hmeas.sensor) """ hmeasurementkey = ['id', 'chtype', 'x', 'y', 'z', 'azm','dip', 'acqchan','filter', 'sensor', 'gain', 'measdate'] def __init__(self, **kws) : for hmeaskey in self.hmeasurementkey: #initialise attribute if hmeaskey =='measdate' : self.__setattr__(hmeaskey, time.strftime( '%Y-%m-%d %H:%M:%S', time.gmtime())) elif 'x' in hmeaskey or 'y' in hmeaskey or 'z' in hmeaskey : self.__setattr__(hmeaskey, '{:<7.2f}'.format(.0)) else : self.__setattr__(hmeaskey, None) for keys in list (kws.keys()): #set attribute coming from dict if keys in self.hmeasurementkey: try : self.__setattr__(keys,'{:<10.3f}'.format( float(kws[keys]))) except :self.__setattr__(keys, kws[keys]) class Emeasurement(object): """ Define the electrode location , and run parameters for electric field measurement. EMeasurement contains metadata for an electric field measurement. ===================== ==================================================== Attributes Description (Restriction) ===================== ==================================================== id Measurement ID , channel number ('required ') chtype Type of Hmeasurement[ Ex | Ey ](required) x x (m) north offset from first electrode (reauired) y y (m) offest from ref for first electrode(reauired) z z offset from the ref for first electrode(reauired) x2 x offset from the 2nd electrode ('required') y2 y offset from the 2nd electrode (required) z2 z offset from the 2nd electrode ("") acqchan name of the channel acquired usually same as chtype filter description of sensor to run ("") gain gain used for run ("") measdate date of run ("") ===================== ==================================================== To Fill Metadata , let take this example :Example: >>> import watex.edi as csedi >>> emeas_dict = {'id': '1000.4', 'chtype':'ex', 'x':0, ... 'y':0, 'azm':0, 'acqchan':'ex', } >>> emeas = csedi.Hmeasurement(**emeas_dict) ... print(emeas.chtype) ... print(emeas.azm) ... print(emeas.measdate) """ emeasurementkey = ['id', 'chtype', 'x', 'y', 'z', 'x2','y2', 'z2', 'acqchan','filter', 'sensor', 'gain', 'measdate'] def __init__(self, **kws) : for emeaskey in self.emeasurementkey: #initialise attribute if emeaskey =='measdate' : self.__setattr__(emeaskey, time.strftime( '%Y-%m-%d %H:%M:%S', time.gmtime())) elif 'x' in emeaskey or 'y' in emeaskey or 'z' in emeaskey : self.__setattr__(emeaskey,'{:<7.2f}'.format(.0)) else : self.__setattr__(emeaskey, None) for keys in list (kws.keys()): if keys in self.emeasurementkey: try : self.__setattr__(keys,'{:<10.3f}'.format( float(kws[keys]))) except :self.__setattr__(keys, kws[keys]) class TSeries(object): """ .. _MTpy::`MTpy <https://github.com/MTgeophysics/mtpy>` .. Future plan:: We will call MTpy directy for Tseries Begin a times series data section. Defines the set of measurments for which times series data are presented. refer to MTpy software :ref:`MTpy` """ pass class Spectra(object): """ .. Future plan:: We will call MTpy directy for Spectra Begin a spectra data section. Defines the set of measurments for which spectra data are presented. Refer to :ref:`MTpy<https://github.com/MTgeophysics/mtpy>` """ pass class MTEMAP (object): """ Begins an MT and EMAP data section .Defines the default measurement for MT sounding and Defines the measurments which makeup an EMAP lines. =========================== ============================================== >=MTSECT >=EMAPSECT =========================== ============================================== - SECTID="" - SECTID=S00 - NFREQ=** - NCHAN=4 - HX= 1001.001 - MAXBLKS=999 - HY= 1002.001 - NDIPOLE =47 - HZ= 1003.001 - NFREQ=17 - EX= 1004.001 - HX= 0. - EY= 1005.001 - HY= .0 - HZ= NONE - CHKSUM =None =========================== ============================================== :param mt_or_emap_section_list: mt and emap section can read ediflies by calling class method <'get_mtemap_section_list'> :type mt_or_emap_section_list: list ============== ========================== ============= ================ Attributes Description Default Restriction ============== ========================== ============= ================ sectid Name of this section None str or"" nfreq Number of frequencies required int >=1 maxblks maximum number of blocks of this section None int>=1 ndipole Number of dipoles in the EMAP line required type Descrip. of spatial filter type used None str or "" hx Meas ID for Hx measurement None Def Meas Id or " hy Meas ID for Hy measurement None Def Meas Id or " hz Meas ID for Hz measurement None Def Meas Id or " ex Meas ID for Ex measurement None Def Meas Id or " ey Meas ID for Ey measurement None Def Meas Id or " rx Meas ID for Rx measurement None Def Meas Id or " ry Meas ID for Ry measurement None Def Meas Id or " chksum checksum total for dvalues None Num of "" ============== ========================== ============= ================ .. note :: MTEMAP can recognize function to value provided with type of data acquired either MT or EMAP. More attributes can be added by inputing a key word dictionary. ... :Example: >>> MTEMAP(**{'ex':'0011.001', 'hx':'0013.001','sectid':'Niable','nfreq':47, ... 'ey':'0012.001', 'hy':0014.001 , 'hz': 0015.001 }) >>> MTEMAP (**{'sectid':'yobkro','nfreq':18 , 'maxblks':100, 'hx':"1003.1", ... 'ex':'1005.4','hz':'1006.3', 'ey':1002.3 , 'hy':'1000.2','chksum':47, ... 'ndipole':47, 'type':'hann'}) """ mtemapsectkey =['sectid', 'nfreq', 'maxblks', 'hx', 'hy', 'hz', 'ex','ey','rx', 'ry', 'ndipole', 'type', 'chksum'] start_data_lines_num = None temp_sectid = None def __init__(self, mt_or_emap_section_list =None, **kwargs): self._logging =watexlog.get_watex_logger(self.__class__.__name__) self.mtemapsectinfo = mt_or_emap_section_list for keys in self.mtemapsectkey: self.__setattr__(keys, None) for keys in list(kwargs.keys()):setattr(self, keys, kwargs[keys]) if self.mtemapsectinfo is not None: self.read_mtemap_section() @classmethod def get_mtemap_section_list (cls, edi_fn =None ): """ MT or EMAP section_classmethod to get special MT info or EMAP info in edifile :param edi_fn: full path to edifile :type edi_fn: str :returns: newclass contained a list of mtemap infos :rtype: list """ _logger.info ( "Read MT Section on edifile <%s>: %s" % ( os.path.basename(edi_fn),cls.__name__)) if edi_fn is None : raise TypeError('NoneType can not be None. Please provide your' ' right edipath.') mtflag, mtsection = 0, [] gmtsectmeasurement=[] #check whether it is an edifile. IsEdi._assert_edi (edi_fn) with open (edi_fn , 'r',encoding ='utf8') as fmt : edilines = fmt.readlines() for ii, mtitems in enumerate(edilines) : if 'dataid'.upper()in mtitems or \ re.match(r'DATAID',mtitems ) is not None : mitems = mtitems.replace('"','') try : forsectid = mitems.split('=') except : pass else : cls.temp_sectid = forsectid[-1] if '>=mtsect'.upper() in mtitems or \ '>=emapsect'.upper() in mtitems or\ mtitems.find('>=mtsect')>=0 : mtflag =ii if (mtflag > 0) and ('>!' in mtitems or\ re.match(r'>!', mtitems) is not None or\ '>FREQ' in mtitems) : mtsection = edilines[mtflag+1:ii] # get the number where start data section . cls.start_data_lines_num =ii break # rebuild the list for consistent to be sure to # take all params whatever it's messy ss = gather_measurement_key_value_with_str_parser( old_measurement_list=mtsection) gmtsectmeasurement.extend(ss) return cls (mt_or_emap_section_list =gmtsectmeasurement) def read_mtemap_section (self, mt_or_emap_section_list =None): """ Read mtsection and set attribute . values can be set as key [key01=value01, ..., keynn=valuenn] :param mt_or_emap_section_list: `mt` or `emap` section list :type mt_or_emap_section_list: list :Example: >>> import watex.edi as csedi >>> mtsection_obj= csedi.MTEMAP.get_mtemap_section_list(edi_fn =path) >>> info = mtsection_obj.read_mtemap_section() >>> print(mtsection_obj.sectid) """ mtemapsectionlist, fmt =[],"{:>12}" self._logging.info ('Reading MTEMAP section and populate attributes') if mt_or_emap_section_list is not None : self.mtsectinfo =mt_or_emap_section_list if self.mtemapsectinfo is None : warnings.warn( 'Nonelist can not be read. Please ' 'provide the right MT section list info.') raise EDIError( 'Nonelist found. cannot read MTsection info.' ' Please provide the riht path.') for ii , mtinfo in enumerate (self.mtemapsectinfo): try : mtinfo =mtinfo.replace('"','') mtkey, mtvalue = mtinfo.strip().split('=') if mtvalue =='' or mtvalue =='**' : mtvalue ='{:.2f}'.format(.0) except :pass else : mtkey, mtvalue =strip_item(item_to_clean=mtkey)[0], strip_item( item_to_clean=mtvalue)[0] self.__setattr__(mtkey.lower(), mtvalue) if mtkey.lower()=='sectid' or mtkey.lower()=='nfreq' : fmt ='{:>7}' mtemapsectionlist.append(''.join([' {0}'.format(mtkey.lower()), '=', fmt.format(str(mtvalue))])) #check the mtsection and fill it is None or 0.00 for nn, olditem in enumerate (mtemapsectionlist): if 'sectid' in olditem : newitem = olditem.split('=') try :float(newitem[-1]) except : pass else : # for consistency try to leave return if exists mtemapsectionlist[nn]=''.join( ['sectid','=',self.temp_sectid]).strip() #--> rewrite the list for consistency self._logging.info ( 'FileType with DataType supposed to be written' ' is on <%s>_sect format ' % mtemapsectionlist[0]) self.mtemapsectinfo =mtemapsectionlist return self def write_mtemap_section (self, mt_or_emap_section_list =None , nfreq=None): """ Method to write MT or EMAP section into list by providing list as ['key01= value01', ..., keyxx=valuexx ]. Method can recognize whether edifile provided is MT or EMAP then can read file according to. :param mt_or_emap_section_list: list of mt or eamp sectiionvalidate by egal ('=') :type mt_or_emap_section_list: list :Example: >>> from watex.edi import MTEMAP >>> mtemapinfo ={'sectid':'yobkro','nfreq':18 , 'maxblks':100, 'hx':"1003.1", ... 'ex':'1005.4','hz':'1006.3', 'ey':1002.3 , ... 'hy':'1000.2'}#'chksum':18, 'ndipole':47, 'type':'hann' >>> mtemapsection_obj = MTEMAP(**mtinfo) >>> writeinfomtemapsect = mtemapsection_obj.write_mtemap_section() >>> print(writeinfomtemapsect) """ self._logging.info ('Writing MT or EMAP section') fmt='{:>7}' write_lines, mtemapList =['>=MTSECT\n'],[] infoType =None #flag to check whether edifile is MT or EMAP if mt_or_emap_section_list is not None : self.mtemapsectinfo = mt_or_emap_section_list if self.mtemapsectinfo is not None : for keyitem in self.mtemapsectinfo: key, itemval = keyitem.strip().split('=') if key.lower() in self.mtemapsectkey : if key.lower() in ['type', 'ndipole', 'chksum']: if itemval not in ['', 'None',' ', None] : infoType ='>=EMAPSECT\n' write_lines.append("".join( [' {0:<7}'.format(key.upper()), '=',fmt.format(str(itemval).upper()), '\n' ])) else :continue elif key.lower() in ['ex', 'ey', 'hx', 'hy', 'hz']: fmt='{:>12}' write_lines.append(''.join( [' {0}'.format(key.upper()),'=', fmt.format(str(itemval).upper()),'\n'])) else : if key.lower()=='nfreq': if nfreq is not None : itemval= nfreq write_lines.append("".join( [' {0:<7}'.format(key.upper()), '=', fmt.format(str(itemval).upper()), '\n' ])) # the case to convert file , set attribute elif self.mtemapsectinfo is None : for keyi in self.mtemapsectkey : mtvalue =getattr(self, keyi) if keyi in ['type', 'ndipole', 'chksum']: if mtvalue not in ['', 'None',' ', None] : infoType = '>=EMAPSECT\n' else :continue if mtvalue ==None : mtvalue = '' if keyi =='nfreq': if nfreq is not None : mtvalue = nfreq if keyi in ['ex', 'ey', 'hx', 'hy', 'hz']: fmt='{:>12}' write_lines .append("".join([' {0}'.format( keyi.upper()), '=',fmt.format( str(mtvalue).upper()), '\n' ])) else : write_lines .append("".join( [' {0:<}'.format(keyi.upper()), '=',fmt.format(str(mtvalue).upper()), '\n' ])) if infoType is None :return write_lines if infoType is not None : write_lines[0]=infoType for newitem in write_lines :# reduce unuseless components for EMAP if newitem.find('EX')> 0 or newitem.find('EY')> 0 or\ newitem.find('HZ') >0 :pass else : mtemapList.append(newitem) return mtemapList class Source(object): """ Information of the file history, how it was made Holds the following information: ===================== ========== ======================================== Attributes Type Explanation ===================== ========== ======================================== project string where the project have been done sitename string where the survey have been taken place creationdate string creation time of file YYYY-MM-DD,hh:mm:ss creatingsoftware string name of program creating the file author Person person whom created the file submitter Person person whom is submitting file for archiving ===================== ========== ======================================== More attributes can be added by inputing a key word dictionary :Example: >>> from watex.edi import Source >>> Source(**{'archive':'IRIS', ... 'reprocessed_by':'grad_student'}) """ def __init__(self, **kwargs): for key in ['project', 'survey', 'sitename']: self.__setattr__(key, None) self.creationdate= time.strftime('%Y-%M-%D - %H:%M:%S', time.gmtime()) self.creatingsoftware = f'WATEX {_wx_version_}' self.Author = Person() self.Recipient =Person() for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) class Processing(object): """ Information for a Edi processing Holds the following information: =================== ============= ======================================= Attributes Type Explanation =================== ============= ======================================= ProcessingSoftware class Software obj : Input software info. processedby str name handler of dataprocessing processingtag str specifictag runlist list --- remoteref str reference point for remoting remotesite str reference site name signconvention str convention sign provide default =================== ============= ======================================= More attributes can be added by inputing a key word dictionary """ def __init__(self, **kwargs): for key in ['processedby', 'processingtag', 'runlist', 'remoteref', 'remotesite']:self.__setattr__(key, None) self.ProcessingSoftware = Software() self.signconvention = 'exp(+i \omega t)' for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) def minimum_parser_to_write_edi (edilines, parser =None ): """ This function validates edifile for writing , string with egal. We assume that dictionnary in list will be for definemeasurment E and H fieds. Parameters ----------- edilines: list, list of items to parse :type edilines: list parser: str, parser edifile section DefineMeasurement, can be change. *default* is egal (=) Examples ---------- >>> from watex.edi import DefineMeasurement >>> file_edi= 'S00_ss.edi' >>> path = os.path.join(os.environ["watex"], ... 'watex','data', file_edi_2) >>> definemeas =DefineMeasurement.get_measurement_info(edi_fn=path) >>> minimparser = minimum_parser_to_write_edi( ... edilines =definemeas.define_measurement) >>> print(minimparser) """ if parser is None :parser ='=' elif parser is not None : if type(parser) is not str : raise TypeError('Parser must a string value like <"=", ...>') if not isinstance(edilines,list): if isinstance(edilines , tuple) : edilines =list(edilines) else : raise TypeError( '<edilines> argument must be on list of string with egal') for ii, lines in enumerate(edilines) : if isinstance(lines, dict):continue elif lines.find(parser) <0 : warnings.warn ( 'None <"="> found on this item<{0}> of ' ' the edilines list. list can not be parsed.' ' Please put egal between key and value '.format(edilines[ii])) return None return edilines def gather_measurement_key_value_with_str_parser ( old_measurement_list, parser =None): """ fonction to rebuild xmeasurement list , to solder list with egal. In the case where no value is found at the last item, we will add "None" . :param old_measurement_list: measurement list to solder :type old_measurement_list: list :param parser: can be egal or all you want, *Default* is None mean parser '='. :type parser: str :returns: list solded with egal like <key=value> :rtype: list :Example: >>> from watex.edi import gather_measurement_key_value_with_str_parser >>> measm = [ ['>HMEAS', 'ID=1001.001', 'CHTYPE=HX', 'X=', '0.0', 'Y=', ... '0.0', 'Z=', '0.0', 'AZM=', '0.0', 'TS='], ... ['>HMEAS', 'ID=1002.001', 'CHTYPE=HY', 'X=', '0.0', ... 'Y=', '0.0', 'Z=', '0.0', 'AZM=', '90.0'], ... ['>HMEAS', 'ID=1003.001', 'CHTYPE=HZ', 'X=', '0.0', 'Y=', ... '0.0', 'Z=', '0.0', 'AZM=', '0.0', 'TS=', '']] >>> for item in measm: ... print(gather_measurement_key_value_with_str_parser(old_measurement_list=item) ) ... ['ID=1001.001', 'CHTYPE=HX', 'X=0.0', 'Y=0.0', 'Z=0.0', 'AZM=0.0', 'TS=None'] ... ['ID=1002.001', 'CHTYPE=HY', 'X=0.0', 'Y=0.0', 'Z=0.0', 'AZM=90.0'] ... ['ID=1003.001', 'CHTYPE=HZ', 'X=0.0', 'Y=0.0', 'Z=0.0', 'AZM=0.0', 'TS=None'] """ # print(old_measurement_list) new_list, temp=[[] for ii in range(2)] if parser is None :parser ='=' for ii, item in enumerate(old_measurement_list): if item.count(parser)> 1 : # where there is many egal = temp.extend(item.split()) #split assume that there is space old_measurement_list=old_measurement_list[:ii] old_measurement_list.extend(temp) break # chech whether there is eg: [X=,''] for ii, item in enumerate (old_measurement_list): if item.count(parser) ==1 : itemf = item.split(parser) if itemf[-1] !='' or len(itemf[-1]) !=0: new_list.append(item) else : try : addvalue = float(old_measurement_list[ii+1]) except : new_list.append(''.join([item, 'None'])) pass else : new_list.append(''.join([item, str(addvalue)])) return new_list