Source code for watex.geology.drilling

# -*- coding: utf-8 -*-
#   License: BSD-3-Clause
#   Author: LKouadio <etanoyau@gmail.com>
#   Created on Thu Sep 29 08:30:12 2022 

from __future__ import print_function , annotations
import os 
from warnings import warn 
import numpy as np 
import pandas as pd 

from .core import get_agso_properties 
from .geology import Geology
from .._typing import NDArray, DataFrame
from ..exceptions import NotFittedError 
from ..site import Profile 
from ..utils._dependency import import_optional_dependency 
from ..utils.box import data2Box 
from ..utils.coreutils import _is_readable, makeCoords 
from ..utils.funcutils import ( 
    _assert_all_types, 
    to_numeric_dtypes , 
    smart_strobj_recognition, 
    convert_value_in 
    )
from ..utils.geotools import get_random_thickness
from ..utils.validator import check_array 

[docs]class DSBoreholes : """ Class deals with many boreholes dataset. DSBoreholes works with the data set composed of multiple borehole data. The data columns are the all attributes of the object and any non-alphateic character is replaced by ``_``. For instance, a column name ``layer thickness`` should have an attribute named ``layer_thickness``. Each borehole (row) data become its own object which encompasses all columns as attributes. To have full control of how data must be retrieved, ``holeid`` parameter must be set. For instance, to retrieve the borehole with ID equals to `bx02`, after fitting the class with appropriate parameters, attibute `hole depth` ( if exist in the data) can be retrieved as ``self.hole.bx02.hole_depth``. By default if the projection is given as latitude/longitude Parameters ------------ area: str Name of area where the data collection is made. holeid: str, optional The name of column of the boreholes collections ID. Note that if given, it should exist in the borehole datasets. lon, lat: ArrayLike 1d /str , optional One dimensional arrays. `xlon` can be consider as the abscissa of the landmark and `ylat` as ordinates array. If `xlon` or `ylat` is passed as string argument, `data` must be passed as `fit_params` keyword arguments and the name of `xlon` and `y` must be a column name of the `data`. By default `xlon` and `ylat` are considered as `longitude` and `latitude` when ``dms`` or ``ll`` coordinate system is passed. utm_zone: Optional, string zone number and 'S' or 'N' e.g. '55S'. Default to the centre point of coordinates points in the survey area. It should be a string (##N or ##S) in the form of number and North or South hemisphere, 10S or 03N projection: str, ['utm'|'dms'|'ll'] The coordinate system in which the data points for the profile is collected. If not given, the auto-detection will be triggered and find the suitable coordinate system. However, it is recommended to provide it for consistency. Note that if `x` and `y` are composed of value less than 180 degrees for longitude and 90 degrees for latitude, it should be considered as longitude-latitude (``ll``) coordinates system. If `x` and `y` are degree-minutes-second (``dms`` or ``dd:mm:ss``) data, they must be specify as coordinate system in order to accept the non-numerical data before transforming to ``ll``. If ``data`` is passed to the :meth:`.fit` method and ``dms`` is not specify, `x` and `y` values should be discarded. datum: string, default = 'WGS84' well known datum ex. WGS84, NAD27, NAD83, etc. epsg: Optional, int epsg number defining projection ( see http://spatialreference.org/ref/ for moreinfo) Overrides utm_zone if both are provided. encoding: str, default ='utf8' Default encoding for parsing data. Can also be ['utf-16-be'] for reading bytes characters. interp_coords: bool, default=False Interpolate position coordinates. reference_ellipsoid: int, default=23 reference ellipsoids is derived from Peter H. Dana's website- http://www.utexas.edu/depts/grg/gcraft/notes/datum/elist.html Department of Geography, University of Texas at Austin Internet: pdana@mail.utexas.edu . Default is ``23`` constrained to WGS84. verbose: int, default=0 Output messages. Attributes ---------- lon_, lat_: Arraylike, longitude/latitude of coordinates arrays. `hole.<holeid>.<data_column>`: :class:`~watex.utils.box.Boxspace` Each borehole, commonly which ID correspond to each row. Each row can be fetched as 'holeID'. If `holeid` is nt specified, the string literal `hole+index of data` composed the borehole object. Notes ------ When `data` is supplied and `lon` and `lat` are given by their names existing in the dataframe columns, by default, the non-numerical data are removed. However, if `y` and `x` are given in DD:MM:SS in the dataframe, the coordinate system must explicitly set to ``dms` to keep the non-numerical values in the data. """ def __init__( self, area:str=None, holeid:str=None, lat:str=None, lon:str=None, projection:str ='ll', utm_zone:str=None, datum:str='WGS84', epsg:int=None, encoding:str='utf-8', interp_coords:bool=False, reference_ellipsoide:int=23, verbose:bool=False ): self.area =area self.holeid=holeid self.projection= projection self.utm_zone=utm_zone self.reference_ellipsoide= reference_ellipsoide self.datum=datum self.encoding= encoding self.epsg =epsg self.interp_coords=interp_coords self.lon=lon self.lat=lat self.verbose= verbose
[docs] def fit ( self, data, **fit_params): """ Fit Hole data set and populate attributes. Parameters ---------- data: Path-like Object or DataFrame Hole data. fit_params: dict, Keyword arguments passed to :func:`watex.to_numeric_dtypes` to sanitize the data. Return ------ self: :class:`DSBoreholes` Instanced object for chaining methods. """ columns = fit_params.pop ("columns", None ) data = _is_readable(data, as_frame =True, input_name= 'b', columns = columns, encoding =self.encoding ) data = check_array ( data, force_all_finite= "allow-nan", dtype =object , input_name="Boreholes data", to_frame=True, ) self.lon_=None; self.lat_=None if ( self.lon is not None and self.lat is not None ): p = Profile (utm_zone = self.utm_zone , coordinate_system= self.projection, datum= self.datum , epsg= self.epsg, reference_ellipsoid=self.reference_ellipsoid ) p.fit (x = self.lon, y = self.lat, data = data ) if self.interp_coords: p.interpolate () self.lon_= p.x self.lat_= p.y # For consistency, Check the datatype, sanitize columns # and drop all NaN columns and row values data, nf, cf = to_numeric_dtypes( data , return_feature_types= True, verbose =self.verbose, sanitize_columns= True, fill_pattern='_', **fit_params ) self.feature_names_in_ = nf + cf if len(cf )!=0: # sanitize the categorical values for c in cf : data[c] = data[c].str.strip() for name in data.columns : setattr (self, name, data[name]) # set depth attributes if 'depth' in self.feature_names_in_: self.depth_= data['depth'] self.data_ = data.copy() use_col =False if self.holeid is not None: use_col = True else: self.holeid ='hole' self.hole = data2Box ( self.data_ , name =self.holeid, use_colname= use_col ) return self
[docs] def set_coordinates ( self, reflong, reflat, step ='5m', todms=False, r= 45, **kws ): """ Generate longitude and latitude coordinates for boreholes. It assumes boreholes are aligned along the same axis. Parameters ----------- reflong: float or string or list of [start, stop] Reference longitude in degree decimal or in DD:MM:SS for the first site considered as the origin of the landmark. reflat: float or string or list of [start, stop] Reference latitude in degree decimal or in DD:MM:SS for the reference site considered as the landmark origin. If value is given in a list, it can containt the start point and the stop point. step: float or str Offset or the distance of seperation between different sites in meters. If the value is given as string type, except the ``km``, it should be considered as a ``m`` value. Only meters and kilometers are accepables. r: float or int The rotate angle in degrees. Rotate the angle features toward the direction of the projection profile. Default value use the :meth:`~.bearing` value in degrees. todms: bool, Default=False Reconvert the longitude/latitude degree decimal values into the DD:MM:SS. kws: dict, Additional keywords of :func:`~watex.utils.exmath.makeCoords`. Returns -------- self: Instanced object Instanced object for method chaining. """ self.inspect nsites = len(self.data_ ) isutm = False if self.projection =='ll' else True utm_zone = kws.pop ('utm_zone', None ) or self.utm_zone self.lon_, self.lat_= makeCoords( reflong, reflat, nsites =nsites, r= r , step =step , todms=todms, utm_zone= utm_zone, is_utm= isutm, datum=self.datum, espg=self.epsg, **kws ) return self
def __repr__(self): """ Pretty format for programmer guidance following the API... """ _t = ("name", "dname", "projection", "utm_zone", "encoding", "datum", "epsg", "reference_ellipsoid" ,"interp_coords", "verbose") outm = ( '<{!r}:' + ', '.join( [f"{k}={ False if getattr(self, k)==... else getattr(self, k)!r}" for k in _t]) + '>' ) return outm.format(self.__class__.__name__) def __getattr__(self, name): rv = smart_strobj_recognition(name, self.__dict__, deep =True) appender = "" if rv is None else f'. Do you mean {rv!r}' err_msg = f'{appender}{"" if rv is None else "?"}' raise AttributeError ( f'{self.__class__.__name__!r} object has no attribute {name!r}' f'{err_msg}' ) @property def inspect (self): """ Inspect object whether is fitted or not""" msg = ( "{obj.__class__.__name__} instance is not fitted yet." " Call 'fit' with appropriate arguments before using" " this method" ) if not hasattr (self, 'hole'): raise NotFittedError(msg.format( obj=self) ) return 1
[docs]class DSBorehole: """ Class delas with Borehole datasets. :class:`watex.geology.drilling.DSBorehole` works with data collected in a single borehole. For instance, it could follow the arrangement of ``h502`` data in :func:`watex.datasets.load_hlogs` Parameters ------------ name: str Name or ID of the borehole. dname: str, optional Depth column name. If `depth` is specify an attribute `depth_` should be created. Depth specification is usefull for log plotting of machine training. utm_zone: Optional, string zone number and 'S' or 'N' e.g. '55S'. Default to the centre point of coordinates points in the survey area. It should be a string (##N or ##S) in the form of number and North or South hemisphere, 10S or 03N projection: str, ['utm'|'dms'|'ll'] The coordinate system in which the data points for the profile is collected. If not given, the auto-detection will be triggered and find the suitable coordinate system. However, it is recommended to provide it for consistency. Note that if `x` and `y` are composed of value less than 180 degrees for longitude and 90 degrees for latitude, it should be considered as longitude-latitude (``ll``) coordinates system. If `x` and `y` are degree-minutes-second (``dms`` or ``dd:mm:ss``) data, they must be specify as coordinate system in order to accept the non-numerical data before transforming to ``ll``. If ``data`` is passed to the :meth:`.fit` method and ``dms`` is not specify, `x` and `y` values should be discarded. datum: string, default = 'WGS84' well known datum ex. WGS84, NAD27, NAD83, etc. epsg: Optional, int epsg number defining projection ( see http://spatialreference.org/ref/ for moreinfo) Overrides utm_zone if both are provided. reference_ellipsoid: int, default=23 reference ellipsoids is derived from Peter H. Dana's website- http://www.utexas.edu/depts/grg/gcraft/notes/datum/elist.html Department of Geography, University of Texas at Austin Internet: pdana@mail.utexas.edu . Default is ``23`` constrained to WGS84. encoding: str, default ='utf8' Default encoding for parsing data. Can also be ['utf-16-be'] for reading bytes characters. lonlat: Tuple, Optional longitude/latitude for borehole coordinates. The location where the borehole is performed. verbose: int, default=0 Output messages. Attributes ----------- depth_: Series Depth array if `dname` is specified. data_: Pandas DataFrame Sanitized dataframe. Note ------ Each columns of the dataframe is an attribute. Note that all the non- alphabetic letters is removed and replace by '_'. """ def __init__ ( self, name:str=None, dname: str=None, projection:str='ll', utm_zone:str=None, datum:str ='WGS84', epsg:int=None, reference_ellipsoid:int=23, encoding:str ='utf-8', lonlat:tuple =None, verbose:int= 0, ): self.name=name self.dname=dname self.projection= projection self.utm_zone=utm_zone self.reference_ellipsoid= reference_ellipsoid self.datum=datum self.encoding= encoding self.epsg =epsg self.verbose= verbose
[docs] def fit(self, data, **fit_params ): """ Fit Borehole data and populate attribute data. By default if the projection is given as latitude/longitude xlon, ylat are longitude and latitude respectively. Parameters ------------ data: pd.DataFrame or Path-like object. Data containing `xlon` and `y` values as series. Then if `xlon` and `y` are given as string argument, their names must be included in the data columns. Otherwise an error will raise. Return --------- self : Instanced object Instanced object for chaining method. """ columns = fit_params.pop ("columns", None ) data = _is_readable(data, as_frame =True, input_name= 'b', columns = columns, encoding =self.encoding ) data = check_array ( data, force_all_finite= "allow-nan", dtype =object , input_name="Borehe data", to_frame=True, ) data, nf, cf = to_numeric_dtypes( data , return_feature_types= True, verbose =self.verbose, sanitize_columns= True, fill_pattern='_', **fit_params ) self.feature_names_in_ = nf + cf if len(cf )!=0: # sanitize the categorical values for c in cf : data[c] = data[c].str.strip() for name in data.columns : setattr (self, name, data[name]) # set depth attributes self.depth_= None if self.dname is not None: if self.dname not in self.feature_names_in_: self.dname ='depth' if self.dname in self.feature_names_in_: self.depth_= data[self.dname] self.data_ = data.copy() return self
[docs] def set_depth ( self ,z0=0., max_depth =None, ): """ Set the a random depth if depth is not given. To fetch the depth, use attribute `depth_`. Note that if the depth exist, calling `set_depth` will arase the former depth value. Use in cautioness. Parameters ----------- z0: float, default=0. The surface reference. Preferably, it is set to null. max_depth: float, default=700. The maximum depth. Depth size must fit the length of the data in meters. Default depth is fixed to 700 meters. Return ------- self: Instanced object Instanced object for chaining method. """ self.inspect z0 = convert_value_in (z0 ) max_depth = 700. if not max_depth else max_depth max_depth = float( _assert_all_types ( max_depth, int, float, objname = 'Maximum-depth')) self.depth_ = pd.Series ( np.linspace ( z0, max_depth, len(self.data_) ), name ='depth') # append depth data # self.data_.insert (0 , 'depth', self.depth_, allow_duplicates =True) self.data_ = pd.concat ([ self.depth_, self.data_], axis = 1, ignore_index =True ) return self
[docs] def set_thickness(self, h0= 1 , **kws ): """ Set a random layer thickness from borehole refering to the depth. To fetch the thickness, use attribute `layer_thickness_`. Parameters ----------- h0: int, default='1m' Thickness of the first layer. shuffle: bool, default=True Shuffle the random generated thicknesses. dirichlet_dis: bool, default=False Draw samples from the Dirichlet distribution. A Dirichlet-distributed random variable can be seen as a multivariate generalization of a Beta distribution. The Dirichlet distribution is a conjugate prior of a multinomial distribution in Bayesian inference. random_state: int, array-like, BitGenerator, np.random.RandomState, \ np.random.Generator, optional If int, array-like, or BitGenerator, seed for random number generator. If np.random.RandomState or np.random.Generator, use as given. unit: str, default='m' The reference unit for generated layer thicknesses. Default is ``meters`` Return ------- self: Instanced object Instanced object for chaining method. """ self.inspect if self.depth_ is None: self.set_depth () thickness = get_random_thickness ( self.depth_, **kws) self.layer_thickness_= pd.Series (thickness, name='layer_thickness' ) self.data_ = pd.concat ([ self.data_, self.layer_thickness_], axis = 1, ignore_index =True ) return self
def __repr__(self): """ Pretty format for programmer guidance following the API... """ _t = ("name", "dname", "projection", "utm_zone", "encoding", "datum", "epsg", "reference_ellipsoid" ,"interp_coords", "verbose") outm = ( '<{!r}:' + ', '.join( [f"{k}={ False if getattr(self, k)==... else getattr(self, k)!r}" for k in _t]) + '>' ) return outm.format(self.__class__.__name__) def __getattr__(self, name): rv = smart_strobj_recognition(name, self.__dict__, deep =True) appender = "" if rv is None else f'. Do you mean {rv!r}' err_msg = f'{appender}{"" if rv is None else "?"}' raise AttributeError ( f'{self.__class__.__name__!r} object has no attribute {name!r}' f'{err_msg}' ) @property def inspect (self): """ Inspect object whether is fitted or not""" msg = ( "{obj.__class__.__name__} instance is not fitted yet." " Call 'fit' with appropriate arguments before using" " this method" ) if not hasattr (self, 'data_'): raise NotFittedError(msg.format( obj=self) ) return 1
[docs]class Borehole(Geology): """ Focused on Wells and `Borehole` offered to the population. To use the data for prediction purpose, each `Borehole` provided must be referenced on coordinates values or provided the same as the one used on `ves` or `erp` file. """ def __init__( self, lat:float = None, lon:float = None, area:str = None, status:str =None, depth:float = None, base_depth:float =None, geol:str=None, staticlevel:float =None, airlift:float =None, id=None, qmax =None, **kwds ): super().__init__(**kwds) self.lat=lat self.lon=lon self.area=area self.status=status self.depth=depth self.base_depth=base_depth self.geol=geol self.staticlevel=staticlevel self.airlift =airlift self.id=id self.qmax=qmax for key in list(kwds.keys()): setattr (self, key, kwds[key])
[docs] def fit(self, data: str |DataFrame | NDArray )-> object: """ Fit Borehole data and populate the corrsponding attributes""" self._logging.info ("fit {self.__class__.__name__!r} for corresponding" "attributes. ") return self
[docs]class Drill(Geology): """ This class is focus on well logs . How to generate well Log for Oasis: Arguments ----------- **well_filename** : string , The well filename. 02 options is set : 1rst option is to build well data manually and the program will generate a report. 2nd option is to send to the program a typical file type to be parsed . the programm parses only the typical well datafile. If None , the program will redirect to build mannually option . **auto** : bool option to automatically well data . set to True if you want to build manually a well data . *default* is False ==================== ========== ========================================= Key Words/Attributes Type Description ==================== ========== ========================================= utm_zone str utm WGS84 zone. should be N or S. *default* is 49N . compute_azimuth bool if no azimuth is provided. set to True to letprogram to compute azimuth .*Default* is False. Drill_dip float The dip of drill hole.*default* is 90 Drill_buttom float The average bottom of drill , can be filled during the well buiding . *default* is None mask int the mask of DrillHole(DH) data. *Default * is 1. ==================== ========== ========================================= ================== ======================================================= Methods Description ================== ======================================================= _collar build _collar data *return* collar log dataframe format dhGeology build DH log geology *return* geology log dataframe. dhSample build DH Geochemistry-Strutural sample, *return* Sample log dataframe dhSurveyElevAz build DH Elevation & Azimuth logs.*return * Elevation & Azimuth dataframes writeDHDATA output log :* return * the right log to output for Oasis Montaj ================== ======================================================= :Example: >>> from watex.geoloy.drilling import Drill >>> parser_file ='nbleDH.csv' >>> drill_obj=Drill(well_filename='data/drill/drill_example_files') >>> scollar=drill._collar(DH_Top=None) >>> sgeo=drill.dhGeology() >>> ssam=drill.dhSample() >>> selevaz=drill.dhSurveyElevAz( add_elevation=None, ... add_azimuth=None) >>> swrite=drill.writeDHData(data2write ="*", savepath =None) """ import_optional_dependency ("openpyxl") def __init__(self, well_filename=None , auto=True, **kwargs): self.wfilename=well_filename self.auto=auto self.mask=kwargs.pop("mask",1) self.utm_zone=kwargs.pop("utm_zone","49N") self.compute_azimuth=kwargs.pop("compute_azimuth",False) self.dip =kwargs.pop("Drill_dip",90) self.buttom=kwargs.pop("Drill_buttom", None) self.savepath =kwargs.pop('savepath', None ) self.easts=None self.norths= None self.wellnames= None self._f=None #populate attribute later self.wdico={"DH_Hole" :None, "DH_East":None, "DH_North":None, "Mask": None, "DH_RH":None, 'DH_From':None , "DH_To": None , "Rock": None , "DH_Azimuth":None , 'DH_Top':None, 'DH_Bottom':None, 'DH_PlanDepth':None, 'DH_Decr':None, 'Sample':None, 'DH_Dip': None, 'Elevation':None, 'DH_RL':None, } # if self.auto is False and self.wfilename is None : # self.daTA=func.build_wellData (add_azimuth=self.compute_azimuth, # utm_zone=self.utm_zone, # report_path = self.savepath, # ) # self.wdata=self.daTA[1] # self.wdico["DH_East"] = self.wdata[:,1] # self.wdico["DH_North"] = self.wdata[:,2] # self.wdico["DH_Hole"] = self.wdata[:,0] # self.wdico['DH_Dip'] = self.wdata[:,4] # self.wdico['DH_Bottom'] = self.wdata[:,3] # self.wdico['DH_Decr'] = self.wdata[:,7] # self.wdico['DH_PlanDepth'] = self.wdata[:,6] # self.wdico["DH_Azimuth"] = self.wdata[:,5] # self._f=0 # elif self.wfilename is not None : # self.daTA=func.parse_wellData(filename=self.wfilename, # include_azimuth=False, # utm_zone=self.utm_zone) # self.wdata=self.daTA[1] # self.wdico.__setitem__("DH_East", self.wdata[:,1]) # self.wdico.__setitem__("DH_North", self.wdata[:,2]) # self.wdico.__setitem__("DH_Hole", self.wdata[:,0]) # self.wdico.__setitem__('DH_Dip', self.wdata[:,3]) # self.wdico.__setitem__('DH_PlanDepth', self.wdata[:,8]) # self.wdico.__setitem__("DH_Azimuth", self.wdata[:,5]) # self.wdico.__setitem__('DH_Decr', self.wdata[:,9]) # self.wdico.__setitem__('DH_Bottom', self.wdata[:,7]) # self._f=1 #set Mask and set dr_rh self.mask=np.full((self.wdata.shape[0]),self.mask,dtype='<U12') # print(self.mask.shape) self.wdico.__setitem__("Mask", self.mask) self.dh_rh=np.zeros((self.wdata.shape[0])) self.wdico.__setitem__("DH_RH", self.dh_rh) for keys in kwargs.keys(): self.__setattr__(keys, kwargs[keys]) def _collar(self, DH_Top=None,add_elevation =None ): """ Method to build Collar Data Parameters ---------- * DH_Top : np.ndarray , it's the Top of data for each Hole Name. ndaray (number of DH , 1) *Default* is None. Returns ------- pd.DataFrme collar Drillhole log """ if DH_Top is None : DH_Top=np.zeros((self.wdata.shape[0])) elif type(DH_Top) is float or type(DH_Top) is int : DH_Top=np.full((self.wdata.shape[0]),DH_Top,dtype='<U12') elif DH_Top is not None : if type(DH_Top)==list: DH_Top=np.array(DH_Top) assert DH_Top.shape[0]==self.wdata.shape[0],'the input DH_Top '\ 'shape doesnt match. The convenience '\ ' shape is %d.'%self.wdata.shape[0] # print(DH_Top) self.wdico.__setitem__('DH_Top',DH_Top) if self._f == 0 : if add_elevation is None : #No topography is added , set to 0 add_elevation=np.full((len(self.wdico['DH_East']),1),0, dtype='<U12') elif add_elevation is not None : if type(add_elevation ) is list : add_elevation =np.array(add_elevation) assert add_elevation.shape[0]==\ self.wdico['DH_East'].shape[0],"INDEXERROR:"\ " The the current dimention of Elevation data is {0}.It's must be"\ " the size {1}.".format( add_elevation.shape[0],self.wdico['DH_East'].shape[0]) self.wdico.__setitem__("Elevation", add_elevation) elif self._f == 1 : if add_elevation is not None: if type(add_elevation ) is list : add_elevation =np.array(add_elevation) try : np.concat((add_elevation,self.wdico['DH_East'])) except Exception : mess =''.join([ 'SIZEERROR! Try to set the elevation dimentional as ', 'same size like the collar data']) self._logging.error(mess) warn(mess) elif add_elevation is None : add_elevation=self.daTA [1][:,4] self.wdico.__setitem__("Elevation", add_elevation) collarKeys=["DH_Hole", "DH_East", "DH_North", "DH_RH", "DH_Dip", "Elevation", "DH_Azimuth","DH_Top", "DH_Bottom", "DH_PlanDepth", "DH_Decr", "Mask"] # print(self.wdico) collar=self.wdico[collarKeys[0]] collar=collar.reshape((collar.shape[0],1)) for ss, collk in enumerate(collarKeys[1:]): # print(collk) for key , value in self.wdico.items(): if key == collk : value=value.reshape((value.shape[0],1)) collar=np.concatenate((collar,value), axis=1) self.coLLAR=pd.DataFrame(data=collar, columns=collarKeys) return self.coLLAR
[docs] def dhGeology (self, dh_geomask=None): """ Method to build geology drillhole log. The name of input rock must feell exaction accordinag to a convention AGSO file . If not sure for the name of rock and Description and label. You may consult the geocode folder before building the well_filename. If the entirely rock name is given , program will search on the AGSO file the corresponding Label and code . If the rock name is founc then it will take its CODE else it will generate exception. Parameters ---------- * dh_geomask : np.ndarray, optional geology mask. send mask value can take exactly the np.ndarray(num_of_geology set ,). The better way to set geology maskis to fill on the wellfilename. if not , programm will take the general mask value. The *default* is None. Returns ------- pd.DataFrame geology drillhole log. """ geolKeys=["DH_Hole","DH_From", "DH_To","Rock", "Sample", "East", "DH_North", "DH_RH", "Mask"] wgeo=self.daTA[2] # print(wgeo) self.wdico.__setitem__('DH_From', wgeo[:,1]) self.wdico.__setitem__('DH_To', wgeo[:,2]) self.wdico.__setitem__("Rock",wgeo[:,3]) dhgeopseudosamp=np.zeros((wgeo.shape[0])) ###### FIND AGSO MODULE ####### #Try to check the name of rocks and their acronym geoelm= get_agso_properties() # #extract elem with their acronym geolemDico_AGSO={key:value for key , value in \ zip (geoelm["CODE"],geoelm['__DESCRIPTION'])} # elemgeo_AGSO=sorted(geolemDico.items()) for ii, elm in enumerate (self.wdico['Rock']): if elm.upper() in geolemDico_AGSO.keys(): pass elif elm.upper() not in geolemDico_AGSO.keys(): if elm.lower() in geolemDico_AGSO.values(): for key, values in geolemDico_AGSO.items(): if elm.lower() == values : self.wdico['Rock'][ii]=key else : mess=''.join(['The Geological Name ({0})' ' given in is wrong'.format(elm), 'Please provide a right name the right Name.', 'Please consult the AGSO file in _geocodes folder', 'without changing anything.']) self._logging.warn(mess) warn(mess) ######END AGS0 ######## self.dh_geoleast=np.zeros((wgeo.shape[0])) self.dh_geol_norths=np.zeros((wgeo.shape[0])) for ss , value in enumerate(self.dh_geoleast): for indix, val in enumerate(self.wdico["DH_East"]): if wgeo[:,0][ss] in self.wdico["DH_Hole"]: value=val self.dh_geoleast[ss] =value self.dh_geol_norths[ss]=self.wdico["DH_North"][indix] dhgeopseudosamp=np.zeros((wgeo.shape[0])) if dh_geomask == None : dh_geomask =self.mask[0] maskgeo= np.full((wgeo.shape[0]),dh_geomask,dtype='<U12') dhrhgeo=np.array([ -1* np.float(ii) for ii in self.wdico['DH_From']]) dhGeol=np.concatenate((wgeo[:,0].reshape(wgeo[:,0].shape[0],1), self.wdico['DH_From'].reshape(( self.wdico['DH_From'].shape[0],1)), self.wdico['DH_To'].reshape(( self.wdico['DH_To'].shape[0],1)), self.wdico['Rock'].reshape(( self.wdico['Rock'].shape[0],1)), dhgeopseudosamp.reshape(( dhgeopseudosamp.shape[0],1)), self.dh_geoleast.reshape(( self.dh_geoleast.shape[0],1)), self.dh_geol_norths.reshape(( self.dh_geol_norths.shape[0],1)), dhrhgeo.reshape((dhrhgeo.shape[0],1)), maskgeo.reshape((maskgeo.shape[0],1))),axis=1) self.geoDHDATA=pd.DataFrame(data=dhGeol, columns=geolKeys) return self.geoDHDATA
[docs] def dhSample (self,path_to_agso_codefile=None, dh_sampmask=None): """ Method to build Sample log. This method focuses on the sample obtained during the DH trip.it may georeferenced as the well_filename needed. A main thing is to set the AGSO_STCODES file. AGSO_STCODES is the conventional code of structurals sample. If you have an own AGSO_STCODES , you may provide the path * kwargs=path_to_ags_codefile * . the program will read and generate logs according to the DESCRIPTION and STCODES figured. if None, the program will take it STCODES and set the samplelogs. When you set the Sample code aor sample name , make sur that the name match the same name on STCODES. If not , program will raises an error. Parameters ---------- * path_to_agso_codefile : str, optional path to conventional AGSO_STRUCTURAL CODES. The *default* is None. * dh_sampmask : np.ndarray, optional Structural mask. The default is None. Returns ------- pd.DataFrame Sample DH log. """ sampKeys=["DH_Hole","DH_From", "DH_To","Rock", "Sample", "East", "DH_North", "DH_RH", "Mask"] wsamp=self.daTA[3] # print(wgeo) if wsamp is None : self.sampleDHDATA = None return # mean no geochemistry sample is provided self.wdico.__setitem__('DH_From', wsamp[:,1]) self.wdico.__setitem__('DH_To', wsamp[:,2]) self.wdico.__setitem__("Sample",wsamp[:,3]) dhsampseudorock=np.zeros((wsamp.shape[0])) ###### FIND AGSO MODULE (AGSO_STCODES) ####### #Try to check the name of sample and their acronym if path_to_agso_codefile is None: path_to_agso_codefile=os.path.join(os.path.abspath('.'), 'watex/etc/_geocodes' ) sampelm= get_agso_properties( config_file = os.path.join(path_to_agso_codefile, 'AGSO_STCODES.csv') ) # #extrcat elem with their acronym sampelmDico_AGSO={key:value for key , value in \ zip (sampelm["CODE"],sampelm['__DESCRIPTION'])} # elemgeo_AGSO=sorted(geolemDico.items()) for ii, elm in enumerate (self.wdico['Sample']): if elm.lower() in sampelmDico_AGSO.keys(): pass elif elm.lower() not in sampelmDico_AGSO.keys(): if elm in sampelmDico_AGSO.values(): for key, values in sampelmDico_AGSO.items(): if elm == values : self.wdico['Sample'][ii]=key else : mess=''.join([ 'The Sample Name({0}) given in is wrong'.format(elm), 'Please provide a right name the right Name.', 'Please consult the AGSO_STCODES.csv file located in ', '<watex/etc/_geocodes> dir. Please keep the' ' header safe and untouchable.']) self._logging.warn(mess) warn(mess) ######END AGS0_STCODES ######## dh_sampeast=np.zeros((wsamp.shape[0])) dh_sampnorths=np.zeros((wsamp.shape[0])) for ss , value in enumerate(dh_sampeast): for indix, val in enumerate(self.wdico["DH_East"]): if wsamp[:,0][ss] in self.wdico["DH_Hole"]: value=val dh_sampeast[ss] =value dh_sampnorths[ss]=self.wdico["DH_North"][indix] dhsampseudorock=np.zeros((wsamp.shape[0])) if dh_sampmask == None : dh_sampmask =self.mask[0] masksamp= np.full((wsamp.shape[0]),dh_sampmask,dtype='<U12') dhrhsamp=np.array([ -1* np.float(ii) for ii in self.wdico['DH_From']]) dhSample=np.concatenate((wsamp[:,0].reshape(wsamp[:,0].shape[0],1), self.wdico['DH_From'].reshape( (self.wdico['DH_From'].shape[0],1)), self.wdico['DH_To'].reshape( (self.wdico['DH_To'].shape[0],1)), dhsampseudorock.reshape( (dhsampseudorock.shape[0],1)), self.wdico['Sample'].reshape( (self.wdico['Sample'].shape[0],1)), dh_sampeast.reshape( (dh_sampeast.shape[0],1)), dh_sampnorths.reshape( (dh_sampnorths.shape[0],1)), dhrhsamp.reshape((dhrhsamp.shape[0],1)), masksamp.reshape((masksamp.shape[0],1))),axis=1) self.sampleDHDATA=pd.DataFrame(data=dhSample, columns=sampKeys) return self.sampleDHDATA
[docs] def dhSurveyElevAz(self, add_elevation=None, add_azimuth=None, **kwargs): """ Method to build Elevation & Azimuth DH logs. if add_elevation and . add_azimuth are set . The programm will ignore the computated azimuth, and it will replace to the new azimuth provided . all elevation will be ignore and set by the new elevation . *kwargs arguments {add_elevation , add-azimuth } must match the same size like the number of Drillholes . Each one must be on ndarray(num_of_holes, 1). Parameters ---------- * add_elevation : np.nadarray , optional elevation data (num_of_holes, 1) The *default* is None. * add_azimuth : np.ndarray , optional azimuth data (num_of_holes,1). The *default* is None. * DH_RL :np.float or np.ndarray(num_of_hole,1), if not provided , it's set to 0. means No topography is added'. Returns ------- pd.Dataframe Elevation DH log . pd.DataFrame Azimuth DH log. """ dh_rl=kwargs.pop("DH_RL",None) # sizep=self.wdico['DH_East'].shape[0] if self._f == 0 : if add_elevation is None : #No topography is added , set to 0 add_elevation=np.full((len(self.wdico['DH_East']),1),0, dtype='<U12') elif add_elevation is not None : if type(add_elevation ) is list : add_elevation =np.array(add_elevation) assert add_elevation.shape[0]==self.wdico[ 'DH_East'].shape[0],"INDEXERROR:"\ " The the current dimention of Elevation data is {0}.It's must be"\ " the size {1}.".format( add_elevation.shape[0],self.wdico['DH_East'].shape[0]) self.wdico.__setitem__("Elevation", add_elevation) elif self._f == 1 : if add_elevation is not None: if type(add_elevation ) is list : add_elevation =np.array(add_elevation) try : np.concat((add_elevation,self.wdico['DH_East'])) except : mess= ''.join([ 'SIZEERROR! Try to set the elevation dimentional. ', 'same like the collar data ']) self._logging.error(mess) warn(mess) elif add_elevation is None : add_elevation=self.daTA [1][:,4] self.wdico.__setitem__("Elevation", add_elevation) #set DH_RL if dh_rl is not None : if type (dh_rl) is list : dh_rl=np.array (dh_rl) assert dh_rl.shape[0]==self.data.shape[0]," DH_RL data size is out"\ " of the range.Must be {0}".format(self.data.shape[0]) self.wdico.__setitem__("DH_RL",dh_rl) elif dh_rl is None : #if None set DH_RL to None : self.wdico.__setitem__("DH_RL",np.full( (self.daTA[1].shape[0]),0,dtype='<U12')) #set azimuth if add_azimuth is not None : if type(add_azimuth) ==list : add_azimuth=np.array(add_azimuth) assert add_azimuth.shape[0]==self.data.shape[0]," Azimuth data size is out"\ " of the range.Must be {0}".format(self.data.shape[0]) self.wdico.__setitem__("DH_Azimuth",add_azimuth) elif add_azimuth is None : pass elevazKeys=['DH_Hole', 'Depth','DH_East', 'DH_North','Elevation','DH_RL','DH_Dip'] self.wdico.__setitem__("DH_RL",np.full( (self.daTA[1].shape[0]),0,dtype='<U12')) # add Hole and Depth surveyELEV =np.concatenate((self.wdico['DH_Hole'].reshape( (self.wdico['DH_Hole'].shape[0],1)), self.wdico["DH_Bottom"].reshape( (self.wdico["DH_Bottom"].shape[0],1))), axis=1) surveyAZIM=np.concatenate((self.wdico['DH_Hole'].reshape( (self.wdico['DH_Hole'].shape[0],1)), self.wdico["DH_Bottom"].reshape( (self.wdico["DH_Bottom"].shape[0],1))), axis=1) for ss , elm in enumerate (elevazKeys[2:]): for key, values in self.wdico.items(): if elm==key : values=values.reshape((values.shape[0],1)) if elm =='DH_RL'or elm=='DH_Dip': # print(values) surveyAZIM=np.concatenate((surveyAZIM,values),axis=1) elif elm=='Elevation': surveyELEV =np.concatenate((surveyELEV,values),axis=1) else: surveyAZIM=np.concatenate((surveyAZIM,values),axis=1) if ss < elevazKeys.index('Elevation')-1: surveyELEV =np.concatenate((surveyELEV,values),axis=1) self.surveyDHELEV=pd.DataFrame( data=surveyELEV, columns=elevazKeys[:5]) # pop the elevation elm on the list [elevazKeys.pop(ii) for ii, elm in enumerate(elevazKeys) if elm=='Elevation'] self.surveyDHAZIM=pd.DataFrame(data=surveyAZIM, columns=elevazKeys) return (self.surveyDHELEV, self.surveyDHAZIM)
[docs] def writeDHData (self, data2write =None ,**kwargs): """ Method to write allDH logs. It depends to the users to sort which data want to export and which format. the program support only two format (.xlsx and .csv) if one is set , it will ouptput the convenience format. Users can give a list of the name of log he want to export. Program is dynamic and flexible. It tolerates quite symbols number to extract data logs. Parameters ---------- * data2write : str or list , optional the search key. The default is None. * datafn :str savepath to exported file *Default* is current work directory. * write_index_on_sheet : bool, choice to write the sheet with pandas.Dataframe index. * writeType : str , file type . its may *.csv or *.xlsx . *Default* is *.xlsx * add_header : bool, add head on exported sheet. set False to mask heads. *Default* is True. * csv_separateType : str , Indicated for csv exported files , the type of comma delimited . defaut is ','. """ savepath =kwargs.pop("savepath",None ) writeIndex=kwargs.pop('write_index_on_sheet',False) writeType =kwargs.pop('writeType', 'xlsx') # csvencoding =kwargs.pop('encoding','utf-8') csvsetHeader =kwargs.pop('add_header',True) csvsep =kwargs.pop('csv_separateType',',') wDATA ={"collar": self._collar, "geology": self.dhGeology, 'sample':self.dhSample, 'elevazim':self.dhSurveyElevAz} _all=['5',"all","__all__",'CollGeoSampElevAz','CGSAZ','cgsaz', ['Collar','Geology','Sample','Elevation','Azimuth'], 'colgeosamelevaz','alldata','*'] df_collar=wDATA['collar']() df_geology=wDATA['geology']() df_sample=wDATA['sample']() df_elevation,df_azimuth=wDATA['elevazim']() # for df_ in [df_collar, df_geology, df_sample, # df_elevation,df_azimuth]: # df_.set_index(setIndex) # this is unnecessary _dHDico ={'collar': [['1','c'], df_collar], 'geology':[['2','g'],df_geology], 'sample': [['3','s'],df_sample], 'survey_elevation':[['4','elev', 'topo','topography','e'], df_elevation], 'survey_azimuth': [['5','-1','azim','a'],df_azimuth]} # skip the sample building geochemistry doesnt exists if self.sampleDHDATA is None : data2write =['1','2','4','5'] if data2write is None or data2write in _all : # write all with pd.ExcelWriter(''.join([self.daTA[0][:-1],'.xlsx'])) as writer : for keys, df_ in _dHDico.items(): df_[1].to_excel(writer,sheet_name=keys, index =writeIndex) elif data2write is not None : if type(data2write) is not list: data2write=str(data2write) try : if writeType in ['xlsx','.xlsx', 'excell', 'Excell','excel','Excel','*.xlsx']: for keys, df in _dHDico.items(): if data2write ==keys or data2write.lower( ) in keys or data2write in df[0]: df[1].to_excel('.'.join( [self.daTA[0][:-1],'xlsx']), sheet_name=keys,index =writeIndex) elif writeType in ['csv','.csv', 'comma delimited','*.csv', 'comma-separated-value', 'comma seperated value', 'comsepval']: # print('passed') for keys, df_ in _dHDico.items(): if data2write == keys or data2write.lower( ) in keys or data2write in df_[0]: df_[1].to_csv(''.join( [self.daTA[0][:-1],'.csv']), header=csvsetHeader, index =writeIndex,sep=csvsep) except Exception as error : self._logging.error ( 'The type you provide as WriteType argument is wrong.' ' Support only *.xlsx and *.csv format',error) warn ( 'Argument writeType support only [xlsx or csv] format.' ' Must change your *.{0} format'.format(writeType)) elif type(data2write) is list : data2write=[str(elm) for elm in data2write] # check the string format with pd.ExcelWriter(''.join( [self.daTA[0][:-1],'xlsx'])) as writer : for ii, df in enumerate (data2write): for keys, df__ in _dHDico.items(): if df.lower() in keys or df in df__[0] : df__[1].to_excel( writer,sheet_name=keys, index =writeIndex) else : self._logging.error ( 'The key you provide as agrument of data2write is wrong. ' 'the data2write argument should be either [collar, geology,' ' sample, elevation, azimuth] or all (*). ') warn ( 'Wrong format of input data2write ! Argument dataType is str,' ' or list of string element choosen among [collar, geology,' 'sample, elevation, azimuth] or all (*),' ' not {0}'.format(data2write)) # export to savepath if savepath is not None : self.savepath = savepath # create a folder in your current work directory if self.savepath is None : try : self.savepath = os.path.join(os.getcwd(), '_outputDH_') if not os.path.isdir(self.savepath): os.mkdir(self.savepath)# mode =0o666) except : warn("It seems the path already exists !") if self.savepath is not None : import shutil if writeType in ['csv','.csv', 'comma delimited', 'comma-separated-value','comma sperated value', 'comsepval']: shutil.move ( os.path.join(os.getcwd(), ''.join( [self.daTA[0][:-1],'csv'])), self.savepath) print('---> Borehole output <{0}> has been written to {1}.'.\ format(os.path.basename( ''.join([self.daTA[0][:-1],'.csv'])), self.savepath)) elif writeType in ['xlsx','.xlsx', 'excell','Excell','excel','Excel']: try : shutil.move (os.path.join(os.getcwd(), '.'.join([self.daTA[0][:-1],'xlsx'])), self.savepath) except: print("--> It seems the destination path " f"{self.savepath} already exists") print('---> Borehole output <{0}> has been written to {1}.'.\ format(os.path.basename( '.'.join([self.daTA[0][:-1],'xlsx'])), self.savepath))