# -*- coding: utf-8 -*-
# License: BSD-3-Clause
# Author: LKouadio <etanoyau@gmail.com>
from __future__ import (
annotations ,
print_function
)
import os
import copy
import shutil
from six.moves import urllib
import numpy as np
import pandas as pd
from .._typing import (
Any,
List,
NDArray,
DataFrame,
)
from .funcutils import (
is_iterable,
ellipsis2false,
smart_format,
sPath
)
from ._dependency import (
import_optional_dependency
)
def array2hdf5(
    filename: str, /,
    arr: NDArray = None,
    dataname: str = 'data',
    task: str = 'store',
    as_frame: bool = ...,
    columns: List[str, ...] = None,
) -> NDArray | DataFrame:
    """ Load or write array to hdf5

    Parameters
    -----------
    filename: str,
        Hdf5 disk file name whether to write or to load. A trailing
        ``'.h5'`` extension is appended when it is missing.
    arr: Arraylike ( m_samples, n_features)
        Data to write. Required when `task` is ``'store'``; ignored when
        `task` is ``'load'``.
    dataname: str, default='data'
        Name of the dataset inside the hdf5 file.
    task: str, {"store", "load", default='store'}
        Action to perform. ``'write'`` and ``'store'`` can be used
        interchangeably; both perform the same task. The value is
        lower-cased and stripped before being checked.
    as_frame: bool, default=False
        Convert the loaded array to a data frame. `columns` can be supplied
        to construct the dataframe. Only used when loading.
    columns: List, Optional
        Columns used to construct the dataframe. When given, it must be
        consistent with the shape of the loaded array along axis 1.

    Returns
    ---------
    None| data: ArrayLike or pd.DataFrame
        ``None`` after storing; the loaded array (or dataframe) otherwise.

    Raises
    -------
    AssertionError
        If `task` is not one of ``'store'``, ``'write'`` or ``'load'``.
    TypeError
        If `task` is ``'store'`` and `arr` is ``None``.

    Examples
    ----------
    >>> import numpy as np
    >>> from watex.utils.baseutils import array2hdf5
    >>> data = np.random.randn (100, 27 )
    >>> array2hdf5 ('test.h5', data )
    >>> load_data = array2hdf5 ( 'test.h5', task ='load')
    >>> load_data.shape
    Out[177]: (100, 27)
    """
    import_optional_dependency("h5py")
    import h5py

    act = task  # keep the raw value for the error message
    task = str(task).lower().strip()
    if task in ("write", "store"):
        task = 'store'
    assert task in {"store", "load"}, ("Expects ['store'|'load'] as task."
                                       f" Got {act!r}")

    # Only normalise the *suffix*: a blanket ``replace('.h5', '')`` would
    # also mangle any directory or stem that happens to contain '.h5'.
    h5fname = str(filename)
    if not h5fname.endswith('.h5'):
        h5fname += '.h5'

    if task == 'store':
        # The None check must run before any conversion: once the value has
        # gone through ``np.array`` it can never be None again.
        if arr is None:
            raise TypeError("Array cannot be None when the task"
                            " consists to write a file.")
        # for consistency
        arr = np.array(is_iterable(arr, exclude_string=True, transform=True))
        with h5py.File(h5fname, 'w') as hf:
            hf.create_dataset(dataname, data=arr)
        return None

    with h5py.File(h5fname, 'r') as hf:
        data = hf[dataname][:]
    if ellipsis2false(as_frame)[0]:
        data = pd.DataFrame(data, columns=columns)
    return data
def lowertify(*values, strip=True, return_origin: bool = ...):
    """ Convert each value to a lowercase string, optionally stripped.

    Parameters
    -----------
    *values: Any
        Values to convert. Each one is passed through ``str`` first.
    strip: bool, default=True
        Strip leading and trailing whitespace of the lowercase string.
    return_origin: bool, default=False
        If ``True``, each converted value is paired with (a deep copy of)
        its original value.

    Returns
    --------
    tuple
        Lowercase strings, or ``(lowercase, original)`` pairs when
        `return_origin` is set.

    Examples
    ---------
    >>> from watex.utils.baseutils import lowertify
    >>> lowertify ( 'KIND')
    Out[19]: ('kind',)
    >>> lowertify ( "KIND", return_origin =True )
    Out[20]: (('kind', 'KIND'),)
    >>> lowertify ( "args1", 120 , 'ArG3')
    Out[21]: ('args1', '120', 'arg3')
    >>> lowertify ( "args1", 120 , 'ArG3', return_origin =True )
    Out[22]: (('args1', 'args1'), ('120', 120), ('arg3', 'ArG3'))
    >>> (kind, kind0) , ( task, task0 ) = lowertify(
    ...     "KIND", "task ", return_origin =True )
    >>> kind, kind0, task, task0
    Out[23]: ('kind', 'KIND', 'task', 'task ')
    """
    originals = copy.deepcopy(values)

    lowered = []
    for item in values:
        text = str(item).lower()
        if strip:
            text = text.strip()
        lowered.append(text)

    if ellipsis2false(return_origin)[0]:
        return tuple(zip(lowered, originals))
    return tuple(lowered)
def save_or_load(
    fname: str, /,
    arr: NDArray = None,
    task: str = 'save',
    format: str = '.txt',
    compressed: bool = ...,
    comments: str = "#",
    delimiter: str = None,
    **kws
):
    """Save or load Numpy array.

    Parameters
    -----------
    fname: file, str, or pathlib.Path
        File or filename to which the data is saved.
        - >.npy , .npz: If file is a file-object, then the filename is unchanged.
          If file is a string or Path, a .npy extension will be appended to the
          filename if it does not already have one.
        - >.txt: If the filename ends in .gz, the file is automatically saved in
          compressed gzip format. loadtxt understands gzipped files transparently.

    arr: 1D or 2D array_like
        Data to be saved to a text, npy or npz file. Only used when `task`
        is ``'save'``.

    task: str {"load", "save"}
        Action to perform. "Save" for storing file into the format
        ".txt", "npy", ".npz". "load" for loading the data from storing files.
        When loading, the loader is selected from the extension of `fname`
        (``.txt``/``.gz`` -> ``np.loadtxt``, ``.npy``/``.npz`` -> ``np.load``).

    format: str {".txt", ".npy", ".npz"}
        The kind of format to save. The leading dot is optional. Note that
        when loading the compressed data saved into `npz` format, it does not
        return systematically the array rather than `np.lib.npyio.NpzFile`
        files. Use either `files` attributes to get the list of registered
        files or `f` attribute dot the data name to get the loaded data set.

    compressed: bool, default=False
        Compress the file; only effective when the format is `.npz`
        (``np.savez_compressed`` is used instead of ``np.savez``).

    comments: str or sequence of str or None, default='#'
        The characters or list of characters used to indicate the start
        of a comment. None implies no comments. Only forwarded to
        ``np.loadtxt``, i.e. when loading a `txt`/`gz` file.

    delimiter: str, optional
        The character used to separate the values. The default is whitespace.
        Only forwarded to ``np.loadtxt``.

    kws: np.save ,np.savetext, np.load , np.loadtxt
        Additional keywords arguments for saving and loading data.

    Return
    ------
    None| data: ArrayLike
        ``None`` after saving, the loaded data otherwise.

    Raises
    ------
    AssertionError
        If `format` or `task` is not one of the supported values.
    ValueError
        If, when loading, the extension of `fname` is not one of
        ``.txt``, ``.npy``, ``.npz`` or ``.gz``.

    Examples
    ----------
    >>> import numpy as np
    >>> from watex.utils.baseutils import save_or_load
    >>> data = np.random.randn (2, 7)
    >>> # save to txt
    >>> save_or_load ( "test.txt" , data)
    >>> save_or_load ( "test", data, format='.npy')
    >>> save_or_load ( "test", data, format='.npz')
    >>> save_or_load ( "test_compressed", data, format='.npz', compressed=True )
    >>> # load files
    >>> save_or_load ( "test.txt", task ='load').shape
    (2, 7)
    >>> npzo = save_or_load ( "test.npz", task ='load')
    >>> npzo.files
    ['arr_0']
    """
    r_formats = {"npy", "txt", "npz"}

    (kind, kind0), (task, task0) = lowertify(
        format, task, return_origin=True)

    assert kind.replace('.', '') in r_formats, (
        f"File format expects {smart_format(r_formats, 'or')}. Got {kind0!r}")

    # normalise to a dotted extension, e.g. 'npy' -> '.npy'
    kind = '.' + kind.replace('.', '')

    # NOTE: the f-prefix was missing, so the offending value was never shown.
    assert task in {'save', 'load'}, (
        f"Wrong task {task0!r}. Valid tasks are 'save' or 'load'")

    save = {'.txt': np.savetxt, '.npy': np.save,
            ".npz": np.savez_compressed if ellipsis2false(
                compressed)[0] else np.savez
            }
    if task == 'save':
        arr = np.array(is_iterable(arr, exclude_string=True,
                                   transform=True))
        save.get(kind)(fname, arr, **kws)
        return None

    # task == 'load': the loader is driven by the file extension, not `format`
    ext = os.path.splitext(fname)[1].lower()
    if ext not in (".txt", '.npy', '.npz', '.gz'):
        # NOTE: the f-prefix was missing here as well.
        raise ValueError(f"Unrecognized file format {ext!r}."
                         " Expect '.txt', '.npy', '.gz' or '.npz'")
    if ext in ('.txt', '.gz'):
        arr = np.loadtxt(fname, comments=comments,
                         delimiter=delimiter, **kws)
    else:
        arr = np.load(fname, **kws)

    return arr
#XXX TODO
def request_data(
    url: str, /,
    task: str = 'get',
    data: Any = None,
    as_json: bool = ...,
    as_text: bool = ...,
    stream: bool = ...,
    raise_status: bool = ...,
    save2file: bool = ...,
    filename: str = None,
    **kws
):
    """ Fetch data remotely with the `requests` package.

    See https://docs.python-requests.org/en/latest/user/quickstart/ for the
    underlying API.

    Parameters
    -----------
    url: str
        Address to request.
    task: str, default='get'
        ``'post'`` (exact match) issues ``requests.post``; any other value
        issues ``requests.get``.
    data: Any, optional
        Payload forwarded as ``data`` to ``requests.post``. Not used for
        GET requests.
    as_json: bool, default=False
        Return ``r.json()``. Ignored when `as_text` is ``True``.
    as_text: bool, default=False
        Return ``r.text``. Takes precedence over `as_json`.
    stream: bool, default=False
        Forwarded as ``stream`` to ``requests.get``.
    raise_status: bool, default=False
        Call ``r.raise_for_status()`` on the response. The check is done
        before anything is written to disk.
    save2file: bool, default=False
        Write the response body to `filename` in chunks. The file is only
        written when `stream` is also ``True``.
    filename: str, optional
        Destination file; required when both `save2file` and `stream`
        are ``True``.
    kws: dict
        Additional keyword arguments passed to ``requests.get`` or
        ``requests.post``.

    Returns
    --------
    str | Any | requests.Response
        ``r.text`` if `as_text`, else ``r.json()`` if `as_json`, else the
        response object itself.

    Raises
    -------
    TypeError
        If a file must be written but `filename` is ``None``.

    Examples
    ---------
    >>> r = request_data('https://api.github.com/user', auth=('user', 'pass'))
    >>> r.status_code
    200
    >>> r.headers['content-type']
    'application/json; charset=utf8'
    """
    import_optional_dependency('requests')
    import requests

    as_text, as_json, stream, raise_status, save2file = ellipsis2false(
        as_text, as_json, stream, raise_status, save2file)

    write_to_disk = save2file and stream
    # Fail early with a clear message instead of an opaque ``open(None)``
    # error raised only after the request has already been sent.
    if write_to_disk and filename is None:
        raise TypeError("'filename' must be provided when both 'save2file'"
                        " and 'stream' are True.")

    if task == 'post':
        r = requests.post(url, data=data, **kws)
    else:
        r = requests.get(url, stream=stream, **kws)

    # Check the status first so that an HTTP error page is never saved
    # to disk as if it were the requested data.
    if raise_status:
        r.raise_for_status()

    if write_to_disk:
        with open(filename, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=128):
                fd.write(chunk)

    if as_text:
        return r.text
    return r.json() if as_json else r
def get_remote_data(
    rfile: str, /,
    savepath: str = None,
    raise_exception: bool = True
):
    """ Try to retrieve data from remote.

    The download is attempted up to three times. Each attempt first uses
    ``urllib.request.urlretrieve`` and, if that fails, falls back to
    ``urllib.request.urlopen``. The file is written to the current working
    directory under the base name of `rfile` and, on success only, moved
    to `savepath` when one is given.

    Parameters
    -------------
    rfile: str or PathLike-object
        Full URL of the remote file. Its base name is used as the local
        file name.

    savepath: str, optional
        Full path to place where to downloaded files should be located.
        If ``None`` data is saved to the current directory.

    raise_exception: bool, default=True
        raise exception if connection failed.

    Returns
    ----------
    status: bool,
        ``False`` for failure and ``True`` otherwise i.e. successfully
        downloaded.

    Raises
    -------
    ConnectionRefusedError
        If every attempt failed and `raise_exception` is ``True``. The last
        underlying error is chained as the cause.
    """
    connect_reason = (
        "ConnectionRefusedError: No connection could be made because the target\n"
        "machine actively refused it.There are some possible reasons for that:\n"
        "1. Server is not running as well. Hence it won't listen to that port.\n"
        "If it's a service you may want to restart the service.\n"
        "2. Server is running but that port is blocked by Windows Firewall\n"
        "or other firewall. You can enable the program to go through\n"
        "firewall in the inbound list.\n"
        "3. there is a security program on your PC, i.e a Internet Security\n"
        "or Antivirus that blocks several ports on your PC.\n"
    )
    n_attempts = 3
    url = str(rfile)
    # The download target is the file itself; only the *local* name is
    # reduced to the base name (fetching the parent directory was a bug).
    local_name = os.path.basename(url)

    print(f"---> Please wait while fetching {rfile!r}...")

    # The progress bar is optional: silently go without it if tqdm is missing.
    pbar = None
    try:
        import_optional_dependency("tqdm")
        import tqdm
    except Exception:
        pbar = None
    else:
        pbar = tqdm.tqdm(total=n_attempts, ascii=True,
                         desc=f'get-{local_name}',
                         ncols=97
                         )

    status = False
    last_error = None
    for attempt in range(n_attempts):
        try:
            urllib.request.urlretrieve(url, local_name)
            status = True
        except Exception as first_error:
            last_error = first_error
            # Fallback: plain urlopen + manual write.
            try:
                with urllib.request.urlopen(url) as response, \
                        open(local_name, 'wb') as out_file:
                    out_file.write(response.read())  # a `bytes` object
                status = True
            except TimeoutError as err:
                last_error = err
                if attempt == n_attempts - 1:
                    print("---> Established connection failed because"
                          " connected host has failed to respond.")
            except Exception as err:
                # Deliberate best effort: remember the error and retry.
                last_error = err

        if status:
            break
        if pbar is not None:
            pbar.update(1)

    if pbar is not None:
        if status:
            # jump to 100% whatever the number of attempts needed
            pbar.update(pbar.total - pbar.n)
        pbar.close()

    if not status:
        print(f"\n---> Failed to download {local_name!r}.")
        if raise_exception:
            raise ConnectionRefusedError(connect_reason.replace(
                "ConnectionRefusedError:", "")) from last_error
        print(connect_reason)
        return False

    # now move the file to the right place and create path if dir not exists
    # (only reached on success, so a missing file can no longer mask the
    # connection error)
    if savepath is not None:
        if not os.path.isdir(savepath):
            sPath(savepath)
        shutil.move(os.path.realpath(local_name), savepath)

    return True
def download_file(url, local_filename, dstpath=None):
    """Stream a remote file to disk.

    Parameters
    -----------
    url: str,
        Url to where the file is stored.
    local_filename: str,
        Name of the local file the content is written to.
    dstpath: Optional
        The destination directory. When given, the downloaded file is moved
        there with ``move_file_to_directory``.

    Return
    --------
    None, local_filename
        None if the `dstpath` is supplied; otherwise `local_filename`
        joined to the current working directory.

    Example
    ---------
    >>> from watex.utils.baseutils import download_file
    >>> url = 'https://raw.githubusercontent.com/WEgeophysics/watex/master/watex/datasets/data/h.h5'
    >>> local_filename = 'h.h5'
    >>> download_file(url, local_filename)
    """
    import_optional_dependency("requests")
    import requests

    banner = "{:-^70}"
    print(banner.format(f" Please, Wait while {os.path.basename(local_filename)}"
                        " is downloading. "))

    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(local_filename, 'wb') as sink:
            for piece in response.iter_content(chunk_size=8192):
                sink.write(piece)

    saved_path = os.path.join(os.getcwd(), local_filename)

    if dstpath:
        move_file_to_directory(saved_path, dstpath)

    print(banner.format(" ok! "))

    if dstpath:
        return None
    return saved_path
def download_file2(url, local_filename, dstpath=None):
    """ Download remote file with a bar progression.

    Falls back to ``download_file`` (no progress bar) when `tqdm` cannot
    be imported.

    Parameters
    -----------
    url: str,
        Url to where the file is stored.
    local_filename: str,
        Name of the local file
    dstpath: Optional
        The destination path to save the downloaded file.

    Return
    --------
    None, local_filename
        None if the `dstpath` is supplied (the file has been moved there);
        otherwise `local_filename` joined to the current working directory.

    Example
    --------
    >>> from watex.utils.baseutils import download_file2
    >>> url = 'https://raw.githubusercontent.com/WEgeophysics/watex/master/watex/datasets/data/h.h5'
    >>> local_filename = 'h.h5'
    >>> download_file2(url, local_filename)
    """
    import_optional_dependency("requests")
    import requests
    try:
        import_optional_dependency("tqdm")
        from tqdm import tqdm
    except Exception:
        # if tqdm is not installed
        return download_file(url, local_filename, dstpath)

    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        # Get the total file size from header (0 when the header is absent)
        total_size_in_bytes = int(r.headers.get('content-length', 0))
        block_size = 1024  # 1 Kibibyte
        # Context managers guarantee both the bar and the file are closed,
        # even if the transfer is interrupted.
        with tqdm(total=total_size_in_bytes, unit='iB',
                  unit_scale=True, ncols=77, ascii=True) as progress_bar, \
                open(local_filename, 'wb') as f:
            for data in r.iter_content(block_size):
                progress_bar.update(len(data))
                f.write(data)

    local_filename = os.path.join(os.getcwd(), local_filename)

    if dstpath:
        move_file_to_directory(local_filename, dstpath)
        # The file no longer lives at `local_filename`; returning that stale
        # path would be misleading, and the tqdm-less fallback returns None.
        return None

    return local_filename
def move_file_to_directory(file_path, directory):
    """ Move file to a directory.

    Create the directory (including missing parents) if it does not exist.
    The file keeps its base name inside `directory`.

    Parameters
    -----------
    file_path: str,
        Path to the local file
    directory: str,
        Path to locate the directory.

    Returns
    --------
    None

    Example
    ---------
    >>> from watex.utils.baseutils import move_file_to_directory
    >>> file_path = 'path/to/your/file.txt'  # Replace with your file's path
    >>> directory = 'path/to/your/directory'  # Replace with your directory's path
    >>> move_file_to_directory(file_path, directory)
    """
    # ``exist_ok=True`` avoids the check-then-create race of
    # ``if not os.path.exists(...): os.makedirs(...)``.
    os.makedirs(directory, exist_ok=True)

    # Move the file to the directory
    target = os.path.join(directory, os.path.basename(file_path))
    shutil.move(file_path, target)
def check_file_exists(package, resource):
    """
    Check if a file exists in a package's directory with
    importlib.resources.

    :param package: The package containing the resource.
    :param resource: The resource (file) to check.
    :return: Boolean indicating if the resource exists.

    :example:
        >>> from watex.utils.baseutils import check_file_exists
        >>> package_name = 'watex.datasets.data'  # Replace with your package name
        >>> file_name = 'h.h5'  # Replace with your file name
        >>> file_exists = check_file_exists(package_name, file_name)
        >>> print(f"File exists: {file_exists}")
    """
    # importlib is part of the standard library, so no optional-dependency
    # check is needed before importing it.
    import importlib.resources as pkg_resources

    # Prefer the ``files()`` API; ``is_resource`` was deprecated in
    # Python 3.11. Fall back to it on interpreters without ``files()``.
    files = getattr(pkg_resources, "files", None)
    if files is not None:
        return files(package).joinpath(resource).is_file()
    return pkg_resources.is_resource(package, resource)