# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
from __future__ import (
annotations ,
print_function
)
import os
import copy
import shutil
from six.moves import urllib
import numpy as np
import pandas as pd
from .._typing import (
Any,
List,
NDArray,
DataFrame,
)
from .funcutils import (
is_iterable,
ellipsis2false,
smart_format,
sPath
)
from ._dependency import (
import_optional_dependency
)
[docs]
def adjust_phase_range(phase_array, value_range=None, mod_base=360):
"""
Adjust phase values to a specified range, accommodating for negative phases
and allowing for normalization based on a specified base (180 or 360 degrees).
Parameters
----------
phase_array : np.ndarray
An array of phase values, which can be negative or positive, and can
have any shape.
value_range : tuple, list, single value, or None, optional
The range of values to adjust the phase to. It can be:
- None: No adjustment is made, and the original phase array is returned.
- A single value (int or float): Considered as the maximum value with
a minimum of 0.
- A tuple or list of two values: Specifies the minimum and maximum range.
mod_base : int, optional
The base used for normalizing phase values through modulo operation.
It allows adjusting phase values within a specific periodicity
(commonly 180 or 360 degrees). Default is 360.
Returns
-------
np.ndarray
The adjusted phase values within the specified range.
Examples
--------
>>> phase_array = np.array([-540, -180, 0, 180, 360, 540])
>>> adjust_phase_range(phase_array, value_range=(0, 180), mod_base=180)
array([0, 0, 0, 180, 0, 0])
>>> adjust_phase_range(phase_array, value_range=(-90, 90), mod_base=360)
array([-90, -90, 0, 90, 90, -90])
>>> adjust_phase_range(phase_array, mod_base=360)
array([-180, -180, 0, 180, 360, 180])
"""
phase_array = is_iterable(phase_array, exclude_string= True, transform =True )
# Validate phase_array is a NumPy array
if not isinstance(phase_array, np.ndarray):
# Convert input to numpy array if necessary
phase_array = np.asarray(phase_array)
# Validate mod_base
if mod_base not in (90, 180, 270, 360 ):
raise ValueError("mod_base must be either 90, 180, 270 or 360.")
# If value_range is None, normalize phase values and return without further adjustment
if value_range is None:
return np.mod(phase_array + mod_base, mod_base)
# Validate and unpack value_range
if np.isscalar(value_range):
min_val, max_val = 0, value_range
elif isinstance(value_range, (tuple, list)) and len(value_range) == 2:
min_val, max_val = value_range
else:
raise ValueError("value_range must be None, a scalar, or a tuple/list of two values.")
# Ensure min_val is less than max_val
min_val, max_val = min(min_val, max_val), max(min_val, max_val)
# Validate value_range contents
if not (np.isscalar(min_val) and np.isscalar(max_val) and min_val < max_val):
raise ValueError("Invalid value_range: min_val must be less than max_val.")
# Normalize phase values using mod_base
normalized_phase = np.mod(phase_array, mod_base)
normalized_phase[normalized_phase < 0] += mod_base
# Scale and shift normalized phase values to the desired range
scale_factor = (max_val - min_val) / mod_base
adjusted_phase = normalized_phase * scale_factor + min_val
return adjusted_phase
[docs]
def array2hdf5 (
filename: str, /,
arr: NDArray=None ,
dataname: str='data',
task: str='store',
as_frame: bool =...,
columns: List[str, ...]=None,
)-> NDArray | DataFrame:
""" Load or write array to hdf5
Parameters
-----------
arr: Arraylike ( m_samples, n_features)
Data to load or write
filename: str,
Hdf5 disk file name whether to write or to load
task: str, {"store", "load", "save", default='store'}
Action to perform. user can use ['write'|'store'] interchnageably. Both
does the same task.
as_frame: bool, default=False
Concert loaded array to data frame. `Columns` can be supplied
to construct the datafame.
columns: List, Optional
Columns used to construct the dataframe. When its given, it must be
consistent with the shape of the `arr` along axis 1
Returns
---------
None| data: ArrayLike or pd.DataFrame
Examples
----------
>>> import numpy as np
>>> from watex.utils.baseutils import array2hdf5
>>> data = np.random.randn (100, 27 )
>>> array2hdf5 ('test.h5', data )
>>> load_data = array2hdf5 ( 'test.h5', data, task ='load')
>>> load_data.shape
Out[177]: (100, 27)
"""
import_optional_dependency("h5py")
import h5py
arr = is_iterable( arr, exclude_string =True, transform =True )
act = copy.deepcopy(task)
task = str(task).lower().strip()
if task in ("write", "store", "save"):
task ='store'
assert task in {"store", "load"}, ("Expects ['store'|'load'] as task."
f" Got {act!r}")
# for consistency
arr = np.array ( arr )
h5fname = str(filename).replace ('.h5', '')
if task =='store':
if arr is None:
raise TypeError ("Array cannot be None when the task"
" consists to write a file.")
with h5py.File(h5fname + '.h5', 'w') as hf:
hf.create_dataset(dataname, data=arr)
elif task=='load':
with h5py.File(h5fname +".h5", 'r') as hf:
data = hf[dataname][:]
if ellipsis2false( as_frame )[0]:
data = pd.DataFrame ( data , columns = columns )
return data if task=='load' else None
[docs]
def lowertify (*values, strip = True, return_origin: bool =... ):
""" Strip and convert value to lowercase.
:param value: str , value to convert
:return: value in lowercase and original value.
:Example:
>>> from watex.utils.baseutils import lowertify
>>> lowertify ( 'KIND')
Out[19]: ('kind',)
>>> lowertify ( "KIND", return_origin =True )
Out[20]: (('kind', 'KIND'),)
>>> lowertify ( "args1", 120 , 'ArG3')
Out[21]: ('args1', '120', 'arg3')
>>> lowertify ( "args1", 120 , 'ArG3', return_origin =True )
Out[22]: (('args1', 'args1'), ('120', 120), ('arg3', 'ArG3'))
>>> (kind, kind0) , ( task, task0 ) = lowertify(
"KIND", "task ", return_origin =True )
>>> kind, kind0, task, task0
Out[23]: ('kind', 'KIND', 'task', 'task ')
"""
raw_values = copy.deepcopy(values )
values = [ str(val).lower().strip() if strip else str(val).lower()
for val in values]
return tuple (zip ( values, raw_values)) if ellipsis2false (
return_origin)[0] else tuple (values)
[docs]
def save_or_load(
fname:str, /,
arr: NDArray=None,
task: str='save',
format: str='.txt',
compressed: bool=...,
comments: str="#",
delimiter: str=None,
**kws
):
"""Save or load Numpy array.
Parameters
-----------
fname: file, str, or pathlib.Path
File or filename to which the data is saved.
- >.npy , .npz: If file is a file-object, then the filename is unchanged.
If file is a string or Path, a .npy extension will be appended to the
filename if it does not already have one.
- >.txt: If the filename ends in .gz, the file is automatically saved in
compressed gzip format. loadtxt understands gzipped files transparently.
arr: 1D or 2D array_like
Data to be saved to a text, npy or npz file.
task: str {"load", "save"}
Action to perform. "Save" for storing file into the format
".txt", "npy", ".npz". "load" for loading the data from storing files.
format: str {".txt", ".npy", ".npz"}
The kind of format to save and load. Note that when loading the
compressed data saved into `npz` format, it does not return
systematically the array rather than `np.lib.npyio.NpzFile` files.
Use either `files` attributes to get the list of registered files
or `f` attribute dot the data name to get the loaded data set.
compressed: bool, default=False
Compressed the file especially when file format is set to `.npz`.
comments: str or sequence of str or None, default='#'
The characters or list of characters used to indicate the start
of a comment. None implies no comments. For backwards compatibility,
byte strings will be decoded as 'latin1'. This is useful when `fname`
is in `txt` format.
delimiter: str, optional
The character used to separate the values. For backwards compatibility,
byte strings will be decoded as 'latin1'. The default is whitespace.
kws: np.save ,np.savetext, np.load , np.loadtxt
Additional keywords arguments for saving and loading data.
Return
------
None| data: ArrayLike
Examples
----------
>>> import numpy as np
>>> from watex.utils.baseutils import save_or_load
>>> data = np.random.randn (2, 7)
>>> # save to txt
>>> save_or_load ( "test.txt" , data)
>>> save_or_load ( "test", data, format='.npy')
>>> save_or_load ( "test", data, format='.npz')
>>> save_or_load ( "test_compressed", data, format='.npz', compressed=True )
>>> # load files
>>> save_or_load ( "test.txt", task ='load')
Out[36]:
array([[ 0.69265852, 0.67829574, 2.09023489, -2.34162127, 0.48689125,
-0.04790965, 1.36510779],
[-1.38349568, 0.63050939, 0.81771051, 0.55093818, -0.43066737,
-0.59276321, -0.80709192]])
>>> save_or_load ( "test.npy", task ='load')
Out[39]: array([-2.34162127, 0.55093818])
>>> save_or_load ( "test.npz", task ='load')
<numpy.lib.npyio.NpzFile at 0x1b0821870a0>
>>> npzo = save_or_load ( "test.npz", task ='load')
>>> npzo.files
Out[44]: ['arr_0']
>>> npzo.f.arr_0
Out[45]:
array([[ 0.69265852, 0.67829574, 2.09023489, -2.34162127, 0.48689125,
-0.04790965, 1.36510779],
[-1.38349568, 0.63050939, 0.81771051, 0.55093818, -0.43066737,
-0.59276321, -0.80709192]])
>>> save_or_load ( "test_compressed.npz", task ='load')
...
"""
r_formats = {"npy", "txt", "npz"}
(kind, kind0), ( task, task0 ) = lowertify(
format, task, return_origin =True )
assert kind.replace ('.', '') in r_formats, (
f"File format expects {smart_format(r_formats, 'or')}. Got {kind0!r}")
kind = '.' + kind.replace ('.', '')
assert task in {'save', 'load'}, (
"Wrong task {task0!r}. Valid tasks are 'save' or 'load'")
save= {'.txt': np.savetxt, '.npy':np.save,
".npz": np.savez_compressed if ellipsis2false(
compressed)[0] else np.savez
}
if task =='save':
arr = np.array (is_iterable( arr, exclude_string= True,
transform =True ))
save.get(kind) (fname, arr, **kws )
elif task =='load':
ext = os.path.splitext(fname)[1].lower()
if ext not in (".txt", '.npy', '.npz', '.gz'):
raise ValueError ("Unrecognized file format {ext!r}."
" Expect '.txt', '.npy', '.gz' or '.npz'")
if ext in ('.txt', '.gz'):
arr = np.loadtxt ( fname , comments= comments,
delimiter= delimiter, **kws )
else :
arr = np.load(fname,**kws )
return arr if task=='load' else None
#XXX TODO
[docs]
def request_data (
url:str, /,
task: str='get',
data: Any=None,
as_json: bool=...,
as_text: bool = ...,
stream: bool=...,
raise_status: bool=...,
save2file: bool=...,
filename:str =None,
**kws
):
""" Fetch remotely data
Request data remotely
https://docs.python-requests.org/en/latest/user/quickstart/#raw-response-content
r = requests.get('https://api.github.com/user', auth=('user', 'pass'))
r.status_code
200
r.headers['content-type']
'application/json; charset=utf8'
r.encoding
'utf-8'
r.text
'{"type":"User"...'
r.json()
{'private_gists': 419, 'total_private_repos': 77, ...}
"""
import_optional_dependency('requests' )
import requests
as_text, as_json, stream, raise_status, save2file = ellipsis2false(
as_text, as_json, stream, raise_status , save2file)
if task=='post':
r = requests.post(url, data =data , **kws)
else: r = requests.get(url, stream = stream , **kws)
if save2file and stream:
with open(filename, 'wb') as fd:
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
if raise_status:
r.raise_for_status()
return r.text if as_text else ( r.json () if as_json else r )
[docs]
def get_remote_data(
rfile:str, /,
savepath: str=None,
raise_exception: bool =True
):
""" Try to retrieve data from remote.
Parameters
-------------
rfile: str or PathLike-object
Full path to the remote file. It can be the path to the repository
root toward the file name. For instance, to retrieve the file
``'AGSO.csv'`` which is located in ``watex/etc/`` directory then the
full path should be ``'watex/etc/AGSO.csv'``
savepath: str, optional
Full path to place where to downloaded files should be located.
If ``None`` data is saved to the current directory.
raise_exception: bool, default=True
raise exception if connection failed.
Returns
----------
status: bool,
``False`` for failure and ``True`` otherwise i.e. successfully
downloaded.
"""
connect_reason ="""\
ConnectionRefusedError: No connection could be made because the target
machine actively refused it.There are some possible reasons for that:
1. Server is not running as well. Hence it won't listen to that port.
If it's a service you may want to restart the service.
2. Server is running but that port is blocked by Windows Firewall
or other firewall. You can enable the program to go through
firewall in the inbound list.
3. there is a security program on your PC, i.e a Internet Security
or Antivirus that blocks several ports on your PC.
"""
#git_repo , git_root= AGSO_PROPERTIES['GIT_REPO'], AGSO_PROPERTIES['GIT_ROOT']
# usebar bar progression
print(f"---> Please wait while fetching {rfile!r}...")
try: import_optional_dependency ("tqdm")
except:pbar = range(3)
else:
import tqdm
data =os.path.splitext( os.path.basename(rfile))[0]
pbar = tqdm.tqdm (total=3, ascii=True,
desc =f'get-{os.path.basename(rfile)}',
ncols =97
)
status=False
root, rfile = os.path.dirname(rfile), os.path.basename(rfile)
for k in range(3):
try :
urllib.request.urlretrieve(root, rfile )
except:
try :
with urllib.request.urlopen(root) as response:
with open( rfile,'wb') as out_file:
data = response.read() # a `bytes` object
out_file.write(data)
except TimeoutError:
if k ==2:
print("---> Established connection failed because"
"connected host has failed to respond.")
except:pass
else :
status=True
break
try: pbar.update (k+1)
except: pass
if status:
try:
pbar.update (3)
pbar.close ()
except:pass
# print(f"\n---> Downloading {rfile!r} was successfully done.")
else:
print(f"\n---> Failed to download {rfile!r}.")
# now move the file to the right place and create path if dir not exists
if savepath is not None:
if not os.path.isdir(savepath):
sPath (savepath)
shutil.move(os.path.realpath(rfile), savepath )
if not status:
if raise_exception:
raise ConnectionRefusedError(connect_reason.replace (
"ConnectionRefusedError:", "") )
else: print(connect_reason )
return status
[docs]
def download_file(url, local_filename , dstpath =None ):
"""download a remote file.
Parameters
-----------
url: str,
Url to where the file is stored.
loadl_filename: str,
Name of the local file
dstpath: Optional
The destination path to save the downloaded file.
Return
--------
None, local_filename
None if the `dstpath` is supplied and `local_filename` otherwise.
Example
---------
>>> from watex.utils.baseutils import download_file
>>> url = 'https://raw.githubusercontent.com/WEgeophysics/watex/master/watex/datasets/data/h.h5'
>>> local_filename = 'h.h5'
>>> download_file(url, local_filename, test_directory)
"""
import_optional_dependency("requests")
import requests
print("{:-^70}".format(f" Please, Wait while {os.path.basename(local_filename)}"
" is downloading. "))
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
local_filename = os.path.join( os.getcwd(), local_filename)
if dstpath:
move_file_to_directory ( local_filename, dstpath)
print("{:-^70}".format(" ok! "))
return None if dstpath else local_filename
[docs]
def download_file2(url, local_filename, dstpath =None ):
""" Download remote file with a bar progression.
Parameters
-----------
url: str,
Url to where the file is stored.
loadl_filename: str,
Name of the local file
dstpath: Optional
The destination path to save the downloaded file.
Return
--------
None, local_filename
None if the `dstpath` is supplied and `local_filename` otherwise.
Example
--------
>>> from watex.utils.baseutils import download_file2
>>> url = 'https://raw.githubusercontent.com/WEgeophysics/watex/master/watex/datasets/data/h.h5'
>>> local_filename = 'h.h5'
>>> download_file(url, local_filename)
"""
import_optional_dependency("requests")
import requests
try :
import_optional_dependency("tqdm")
from tqdm import tqdm
except:
# if tqm is not install
return download_file (url, local_filename, dstpath )
with requests.get(url, stream=True) as r:
r.raise_for_status()
# Get the total file size from header
total_size_in_bytes = int(r.headers.get('content-length', 0))
block_size = 1024 # 1 Kibibyte
progress_bar = tqdm(total=total_size_in_bytes, unit='iB',
unit_scale=True, ncols=77, ascii=True)
with open(local_filename, 'wb') as f:
for data in r.iter_content(block_size):
progress_bar.update(len(data))
f.write(data)
progress_bar.close()
local_filename = os.path.join( os.getcwd(), local_filename)
if dstpath:
move_file_to_directory ( local_filename, dstpath)
return local_filename
[docs]
def move_file_to_directory(file_path, directory):
""" Move file to a directory.
Create a directory if not exists.
Parameters
-----------
file_path: str,
Path to the local file
directory: str,
Path to locate the directory.
Example
---------
>>> from watex.utils.baseutils import move_file_to_directory
>>> file_path = 'path/to/your/file.txt' # Replace with your file's path
>>> directory = 'path/to/your/directory' # Replace with your directory's path
>>> move_file_to_directory(file_path, directory)
"""
# Create the directory if it doesn't exist
if not os.path.exists(directory):
os.makedirs(directory)
# Move the file to the directory
shutil.move(file_path, os.path.join(directory, os.path.basename(file_path)))
[docs]
def check_file_exists(package, resource):
"""
Check if a file exists in a package's directory with
importlib.resources.
:param package: The package containing the resource.
:param resource: The resource (file) to check.
:return: Boolean indicating if the resource exists.
:example:
>>> from watex.utils.baseutils import check_file_exists
>>> package_name = 'watex.datasets.data' # Replace with your package name
>>> file_name = 'h.h5' # Replace with your file name
>>> file_exists = check_file_exists(package_name, file_name)
>>> print(f"File exists: {file_exists}")
"""
import_optional_dependency("importlib")
import importlib.resources as pkg_resources
return pkg_resources.is_resource(package, resource)