Source code for watex.datasets.rload

# -*- coding: utf-8 -*-
#   License: BSD-3-Clause
#   Author: LKouadio <etanoyau@gmail.com> 
#   Created on Sat Oct  1 15:24:33 2022
"""
Remote Loader 
==============

Fetch data online from zenodo record or repository.  
"""
from __future__ import (
    print_function , 
    annotations 
    )
import os 
import time
import sys 
import subprocess 
import concurrent.futures
import shutil  
import zipfile
import warnings 
from six.moves import urllib 

from .._typing import (
    Optional, 
    )
from ..utils.funcutils import (
    is_installing 
)
from ..exceptions import (
    ExtractionError 
)
from ..utils.mlutils import (
    fetchSingleTGZData, 
    subprocess_module_installation
    )
from .._watexlog import  watexlog
_logger = watexlog().get_watex_logger(__name__)

##### config repo data ################################################

_DATA = 'data/geodata/main.bagciv.data.csv'
_ZENODO_RECORD= '10.5281/zenodo.5571534'

_TGZ_DICT = dict (
    # path = 'data/__tar.tgz', 
    tgz_f = 'data/__tar.tgz/fmain.bagciv.data.tar.gz', 
    csv_f = '/__tar.tgz_files__/___fmain.bagciv.data.csv'
 )

_GIT_DICT = dict(
    root  = 'https://raw.githubusercontent.com/WEgeophysics/watex/master/' , 
    repo = 'https://github.com/WEgeophysics/watex' , 
    blob_root = 'https://github.com/WEgeophysics/watex/blob/master/'
 )
_GIT_DICT ['url_tgz'] = _GIT_DICT.get ('root') + _TGZ_DICT.get('tgz_f')


[docs] def loadBagoueDataset (): """Load a Bagoue dataset Example -------- >>> from watex.datasets import Loader >>> loadBagoueDataset () ... dataset: 0%| | 0/1 [00:00<?, ?B/s] ... ### -> Wait while decompressing 'fmain.bagciv.data.tar.gz' file ... ... --- -> Fail to decompress 'fmain.bagciv.data.tar.gz' file ... --- -> 'main.bagciv.data.csv' not found in the local machine ... ### -> Wait while fetching data from 'https://raw.githubusercontent.com/WEgeophysics/watex/master/'... ... +++ -> Load data from 'https://raw.githubusercontent.com/WEgeophysics/watex/master/' successfully done! ... dataset: 100%|##################################| 1/1 [00:03<00:00, 3.38s/B] """ # LOCAL_DIR = 'data/geodata' # DATA_DIR= os.path.join(LOCAL_DIR, 'main.bagciv.data.csv') # DATA_PATH = 'data/__tar.tgz' # TGZ_FILENAME = '/fmain.bagciv.data.tar.gz' # CSV_FILENAME = '/__tar.tgz_files__/___fmain.bagciv.data.csv' # DATA_URL = GIT_ROOT + DATA_PATH + TGZ_FILENAME # blob root = 'https://github.com/WEgeophysics/watex/blob/master/' # GIT_ROOT = 'https://raw.githubusercontent.com/WEgeophysics/watex/master/' # GIT_REPO= 'https://github.com/WEgeophysics/watex' # from Zenodo: 'https://zenodo.org/record/5560937#.YWQBOnzithE' Loader ( zenodo_record= _ZENODO_RECORD, content_url= _GIT_DICT.get('root'), repo_url= _GIT_DICT.get ('repo'), tgz_file=_GIT_DICT.get('url_tgz'), blobcontent_url = _GIT_DICT.get ('blob_root'), zip_or_rar_file= 'BagoueCIV__dataset__main.rar', csv_file = _TGZ_DICT.get('csv_f'), verbose= 10 ).fit(_DATA)
[docs] class Loader: """ Load data from online Parameters ---------- *zenodo_record*: str A zenod digital object identifier (doi) of filepath to zenodo record. *content_url*: str, File path to the repository user content. If your use GitHub where the data is located in default branch for example a master branch, it can be 'https://raw.githubusercontent.com/WEgeophysics/watex/master/' *repo_url*: str A url for repository that host the project *tgzfile*: str, Data can be save in TGZ file format. It that is the case, can provide to fetch the data if all attempt to fetched the file failed. *verbose*: int, Level of verbosity. Higher equals to more messages. *root2blobcontent*: str Root to blob master is a nested way to the convenient way to retrieve raw data in GitHUB *csv_file*: str Path to the main csv file to retreive in the record. """ def __init__(self, zenodo_record:str = None, content_url:str = None, repo_url: str = None, tgz_file:str = None, blobcontent_url:str = None, zip_or_rar_file:str = None, csv_file: str = None, verbose: int =0 , ): self.zenodo_record = zenodo_record self.content_url = content_url self.blobcontent_url = blobcontent_url self.repo_url =repo_url self.tgz_file = tgz_file self.zip_or_rar_file=zip_or_rar_file self.csv_file = csv_file self.verbose = verbose self.f_= None @property def update_zenodo_record (self): return self.zenodo_record @update_zenodo_record.setter def update_zenodo_record(self, uzr): self.zenodo_record = uzr @property def f(self): return self.f_ @f.setter def f (self, file): """ assert the file exists""" self.f_ = file
[docs] def fit(self , f:str = None): """ Retreive Bagoue dataset from Github repository or zenodo record. It will take a while when fetching data for the first time outsite of this repository. Since cloning the repository come with examples dataset located to its appropriate directory. It's probably a rare to fectch using internet unless dataset as well as the tarfile are deleted from its located directory. Parameters ------------ f : str `f` is the reference to the main file containing the data acting like a path -like object. Returns ------- ``self`` :class:`~.Loader` instance Notes --------- Retreiving dataset line Bagoue dataset from Github repository or zenodo record. It could take a while to fetch data for the first time outsite of therepository. Since cloning the repository come with examples dataset located to its appropriate directory, it's probably not useful to fectch the data from internet unless the dataset ( with the tarfileor not ) are deleted from the local directory. Example --------- >>> from watex.datasets.load import Loader >>> loadObj = Loader ( zenodo_record= '10.5281/zenodo.5571534', content_url= 'https://raw.githubusercontent.com/WEgeophysics/watex/master/', repo_url= 'https://github.com/WEgeophysics/watex', tgz_file='https://raw.githubusercontent.com/WEgeophysics/watex/master/data/__tar.tgz/fmain.bagciv.data.tar.gz', blobcontent_url = 'https://github.com/WEgeophysics/watex/blob/master/', zip_or_rar_file= 'BagoueCIV__dataset__main.rar', csv_file = '/__tar.tgz_files__/___fmain.bagciv.data.csv', verbose= 10 ) >>> loadObj.fit('data/geodata/main.bagciv.data.csv') ... ### -> Wait while decompressing 'fmain.bagciv.data.tar.gz' file ... ... --- -> Fail to decompress 'fmain.bagciv.data.tar.gz' file ... --- -> 'main.bagciv.data.csv' not found in the local machine ... ### -> Wait while fetching data from 'https://raw.githubusercontent.com/WEgeophysics/watex/master/'... ... +++ -> Load data from 'https://raw.githubusercontent.com/WEgeophysics/watex/master/' successfully done! dataset: 100%|##################################| 1/1 [00:04<00:00, 4.95s/B] Out[23]: <watex.datasets.load.Loader at 0x2210bedf880> """ #--++++++-------import tqdm package TQDM= False try : import tqdm except ImportError: is_success = is_installing('tqdm' ) if not is_success: warnings.warn("'Auto-install tqdm' failed. Could be installed it manually" " Can get 'tqdm' here <https://pypi.org/project/tqdm/> ") _logger.info ("Failed to install automatically 'tqdm'. Can get the " "package via https://pypi.org/project/tqdm/") else : TQDM = True else: TQDM = True #--++++++------- if f is not None: self.f= f mess =f" Unable to load {os.path.basename(self.f)!r} from " if not TQDM: with concurrent.futures.ThreadPoolExecutor() as executor: modules =[ 'notebook', 'ipywidgets', 'tqdm'] try : is_success =list(executor.map( subprocess_module_installation, modules)) except : results = [executor.submit( subprocess_module_installation, args =[mod, True]) for mod in modules] is_success =[f.result() for f in concurrent.futures.as_completed(results)] # if n all modules were executed successffuly # force tqm TQDM = is_success [0] if len(set(is_success))==1 else False pbar = range(1) if not TQDM else tqdm.tqdm(range(1) ,ascii=True, unit='B', desc ="dataset", ncols =77) for _ in pbar : total , start =0, time.perf_counter() if not os.path.isdir(os.path.dirname (self.f) ): os.makedirs(os.path.dirname (self.f) ) # --> seek local file is_file = self._fromlocal(self.f) if not is_file: if self.verbose > 3: print(f"--- -> {os.path.basename(self.f)!r} not found in the " " local machine ") _logger.info(f"{os.path.basename(self.f)!r} file is missing ") is_file = self._fromgithub() if not is_file : _logger.info(mess + 'Github') is_file = self._fromzenodo() if not is_file : _logger.info(mess + 'Zenodo') _logger.info (f"Unable to fetch {os.path.basename(f)!r} from online") end = time.perf_counter() time.sleep(abs(start -end)) pbar.update(total) return _logger.info(f"{os.path.basename(f)!r} was successfully loaded.") end = time.perf_counter() time.sleep(abs(start -end)) if is_file: total =1 pbar.update(total) return self #f
def _fromzenodo(self, zenodo_record: str = None, # ZENODO_RECORD_ID_OR_DOI, # LOCAL_DIR, f: str = None, zip_or_rar_file : str = None, csv_file : Optional[str]= None, )-> str: """Fetch data from zenodo records with ``zenodo_record`` and ``f`` Here is the way to fetch the main dataset from the record using the module `zenodo_get` Parameters ----------- zenodo_record: str or Zenodo get obj Record of zenodo database. see https://zenodo.org/ f : str Path -like object. f is the main file containing the data zip_or_rar: str Path like object to *.zip or *.rar file. csv_file: str Path to the main csv file to retreive in the record. Returns --------- str : File or record path Here is the way to fetch the main dataset Example -------- >>> from watex.datasets.load import ( _DATA , _ZENODO_RECORD , _TGZ_DICT, _GIT_DICT, Loader) >>> Loader (verbose = 10 )._fromzenodo( f = _DATA, zenodo_record =_ZENODO_RECORD, zip_or_rar_file= 'BagoueCIV__dataset__main.rar', csv_file = _TGZ_DICT.get('csv_f') ) """ if f is not None: self.f = f if zenodo_record is not None: self.zenodo_record= zenodo_record if zip_or_rar_file is not None: self.zip_or_rar_file = zip_or_rar_file if self.zenodo_record is None: raise TypeError ( "Expect a zenodo record <'XXX/zenodo.YYYYY'>, get: 'None'") if not os.path.isdir(os.path.dirname(self.f ) ): os.makedirs(os.path.dirname(self.f ) ) success_import=False try: import zenodo_get except: # this will take a while if the connection is low. # Please be patient. try: if self.verbose : print("--- -> wait while zenodo_get is installing ...") is_ = is_installing ('zenodo_get') if is_ : _logger.info("'+++ -> zenodo_get' installation complete. ") success_import=True if self.verbose > 3 : print("+++ -> zenodo_get' installation complete. ") except : # Connection problem may happens. if self.verbose > 3 : print('--- -> Fail to install Zenodo_get') _logger.info("Fail to install `zenodo_get`") else: success_import=True if not success_import: raise ConnectionError( F"Unable to retrieve data from record= <{self.zenodo_record!r}>.") # if zenodo_get is already installed Then used to # downloaed the record by calling the subprocess methods _logger.info(" 'zenodo_get' package already installed") if self.verbose: print(f"### -> wait while the record {self.zenodo_record!r}" " is downloading...") try : subprocess.check_call([sys.executable, '-m', 'zenodo_get', self.zenodo_record]) except: raise ConnectionError ( f"CalledProcessError: {self.zenodo_record!r} returned " "non-zero exit status 1. Please check your internet!") if self.verbose: print(f"+++ -> Record {self.zenodo_record!r} successfully downloaded.") if not os.path.isdir(os.path.dirname(self.f ) ): os.makedirs(os.path.dirname(self.f ) ) # check whether Archive file is '.rar' or '.zip' _, ex = os.path.splitext (self.zip_or_rar_file) is_zipORrar =os.path.isfile (self.zip_or_rar_file ) if is_zipORrar : ziprar_file = os.path.basename (self.zip_or_rar_file ) # # else: ziprar_file = 'BagoueCIV__dataset__main.zip' # is_zipORrar =os.path.isfile ('BagoueCIV__dataset__main.rar') # if is_zipORrar : ziprar_file = 'BagoueCIV__dataset__main.rar' # else: ziprar_file = 'BagoueCIV__dataset__main.zip' # For consistency add curent work directory and move zip_rar file to # the path =LOCAL_DIR and also move the md5sums file. move_file(os.path.join(os.getcwd(),ziprar_file), os.path.dirname(self.f ) ) if os.path.isfile(os.path.join(os.getcwd(), 'md5sums.txt')): move_file(os.path.join(os.getcwd(), 'md5sums.txt'), os.path.dirname(self.f ) ) if self.verbose > 3 : print(f"### -> Record <{zenodo_record!r}={ziprar_file!r}> found in " f" {os.path.dirname(self.f )!r}.") print(f"### -> Wait while {'unziping' if not is_zipORrar else 'unraring'}" " the record...") #Now unzip file in the LOCAL DIR then move the file to # it right place and rename it. f0=self.unZipFileFetchedFromZenodo( f= os.path.dirname(self.f ) , zip_file =self.zip_or_rar_file, csv_file = self.csv_file , ) try : # if file exists then remove the archive os.remove(os.path.join(self.f , ziprar_file)) except : pass return f0 def _fromgithub( self, f: str=None , content_url:str=None ) -> bool | str: """ Fetch the data from repository if file is hosted there. It creates path to the local matchine and save file. Parameters ----------- *f* : str Path -like object. f is the main file containing the data *content_url*: str, File path to the repository user content. If your use GitHub where the data is located in default branch for example a master branch, it can be 'https://raw.githubusercontent.com/WEgeophysics/watex/master/' *repo_url*: str A url for repository that host the project """ # make a request if f is not None: self.f = f if content_url is not None: self.content_url = content_url if not os.path.isdir(os.path.dirname(self.f)): os.makedirs(os.path.dirname(self.f)) success =False #'https://raw.githubusercontent.com/WEgeophysics/watex/master/data/geo_fdata/main.bagciv.data.csv' rootf = os.path.join(self.content_url, self.f) atp =[f"### -> Wait while fetching data from {self.content_url!r}...", '... ', '... '] for i in range(3): try : # first attemptts to 03 print(atp[i], end ='') urllib.request.urlretrieve(rootf, self.f) except TimeoutError: if i ==2: if self.verbose> 3: print("--- -> Established connection failed because " " connected host has failed to respond.") success =False except:success =False else : success=True if success: break if not success: # CHANGEGIT Root try: if self.verbose > 3 : print("### -> An alternative way using <blob/master>...") rootf0= self.blobcontent_url + self.f urllib.request.urlretrieve(rootf0, self.f ) except :success =False else:success =True if not success: if self.verbose: print("---> Coerce the root instead ...") #'https://github.com/WEgeophysics/watex/blob/master/data/geo_fdata/main.bagciv.data.csv' #second attempts try : with urllib.request.urlopen(rootf) as testfile, open(f, 'w') as fs: fs.write(testfile.read().decode()) except : # third attempts try: import requests response = requests.get(rootf) with open(os.path.join( os.path.dirname (self.f),os.path.basename(self.f)), 'wb') as fs: fs.write(response.content) except: success=False else : if self.verbose: print(f"+++ -> Load data from {self.content_url!r} " "successfully done!") success =True if not success: print(f"--- -> Fail to download data from {self.content_url!r} !") return False if success : # assume the data is locate in current directory # then move to the right place in Local dir if os.path.isfile(os.path.basename (self.f)): move_file(os.path.basename (self.f), os.path.dirname (self.fn)) # print("---> Fetching `main.bagciv.data.csv`from {GIT_REPO!r}" # " was successfully done!") if success: print() print( f"+++ -> Load data from {self.content_url!r} successfully done!") return self.f def _fromlocal (self, f: str = None # DATA_DIR ) -> str : """ Check whether the local file exists and return file name. Turn on all the possibility i.e read the *.tgz and *.tar file if exist in the local machine. Parameters ----------- f : str Path -like object. f is the main file containing the data """ if f is not None: self.f = f is_file =os.path.isfile(self.f) if not is_file and self.tgz_file is not None: tgz= os.path.basename(self.tgz_file) try: if self.verbose > 3 : print() print(f"### -> Wait while decompressing {tgz!r} file ... ") f0=fetchSingleTGZData( self.tgz_file, rename_outfile=os.path.basename(self.f) ) _logger.info(f"Decompressed {tgz!r} successufully done.") except : _logger.info(f"Fail to decompres{tgz!r} ") if self.verbose: print(f"--- -> Fail to decompress {tgz!r} file") return False else : if self.verbose: print(f"+++ -> Decompressed {tgz!r} sucessfully done!") # return new file if file alread created in the local # machine. self.f = f0 return self.f if os.path.isfile (self.f) else False
[docs] def unZipFileFetchedFromZenodo(self, f: str = None , # LOCAL_DIR, #'BagoueCIV__dataset__main.rar', zip_or_rar_file: str = None, # '/__tar.tgz_files__/___fmain.bagciv.data.csv', csv_file:str =None, ): """ Unzip or Unrar the archived file and shift from the local directory created if not exits. Parameters ----------- f : str Path -like object. f is the main file containing the data zip_or_rar: str Path like object to *.zip or *.rar file. csv_file: str Path to the main csv file to retreive in the record. Returns --------- str : path like object to the unzipped File """ # zipORrar_ex = zip_file.replace('BagoueCIV__dataset__main', '') # zip_file=zip_file.replace(zipORrar_ex, '') if f is not None: self.f = f if zip_or_rar_file is not None: self.zip_or_rar_file = zip_or_rar_file if csv_file is not None: self.csv_file = csv_file zipORrar_ex = os.path.splitext(self.zip_or_rar_file )[1] self.zip_or_rar_file=self.zip_or_rar_file.replace(zipORrar_ex, '') # file is in zip #'/__tar.tgz_files__/___fmain.bagciv.data.csv' raw_location = self.zip_or_rar_file + self.csv_file zipdir = os.path.dirname (self.f) if zipORrar_ex=='.zip': try : # CSV_FILENAME[1:]= '__tar.tgz_files__/___fmain.bagciv.data.csv', zip_location= os.path.join(zipdir, self.zip_or_rar_file +'.zip') fetchSingleZIPData(zip_file= zip_location, zipdir = zipdir , file_to_extract=self.csv_file[1:], savepath=zipdir, rename_outfile=os.path.basename (self.f) , verbose= self.verbose ) except : raise OSError(f"Unzip {self.zip_or_rar_file +'.zip'}!r> failed." 'Please try again.') elif zipORrar_ex=='.rar': fetchSingleRARData(zip_file = self.zip_or_rar_file, file_to_extract= raw_location, zipdir =zipdir ) #'/___fmain.bagciv.data.csv'): if os.path.isfile (zipdir + '/' + os.path.basename(self.csv_file)): os.rename(zipdir + '/' + os.path.basename(self.csv_file), zipdir + '/' + os.path.basename (self.f) #'main.bagciv.data.csv' ) # Ascertain the file f0 = self._fromlocal(zipdir + '/' + os.path.basename (self.f)) if f0 ==zipdir + '/' + os.path.basename (self.f): print(f"+++ -> Extraction of {'/' + os.path.basename (self.f)} complete!") return f0
[docs] def fetchSingleRARData( zip_file :str , member_to_extract:str, zipdir: str , verbose: False, )-> None: """ RAR archived file domwloading process.""" rarmsg = ["--> Please wait while using `rarfile` module to " f"<{zip_file}> decompressing...", "--> Please wait while using `unrar` module to " f"<{zip_file}> decompressing..."] for i, name in enumerate('rarfile', 'unrar'): installation_succeeded =False try : if i==0: import rarfile elif i==1: from unrar import rarfile except : try: print(f"---> {name!r} is installing. Please wait ...") is_installing(name) except : print("--> Failed to install {name!r} module !") if name =='unrar': print("---> Couldn't find path to unrar library. Please refer" " to https://pypi.org/project/unrar/ and download the " "UnRAR library. src: http://www.rarlab.com/rar/unrarsrc-5.2.6.tar.gz " "or src(Window): (http://www.rarlab.com/rar/UnRARDLL.exe)." ) raise ExtractionError ( "Fail to install UnrarLibrary!") continue else : _logger.info(f"Intallation of {name!r} was successfully done!") print(f"---> Installing of {name!r} is sucessfully done!") installation_succeeded=True if installation_succeeded : print(f"---> Please wait while `<{zip_file+'.rar'}=" "main.bagciv.data.csv>`is unraring...") # rarfile.RarFile.(os.path.join(zipdir, zip_file +'.rar')) _logger.info("Extract {os.path.basename(CSV_FILENAME)!r}" " from {zip_file + '.rar'} file.") #--------------------------work on the rar extraction since ----------- # rar can not extract larger file excceed fo 50 # we are working to find the way to automatically decompressed rarfile. # and keep it to the local directory. print(rarmsg[i]) decompress_succeed =False try : with rarfile.RarFile(os.path.join(zipdir, zip_file +'.rar'))as rar_ref: rar_ref.extract(member=member_to_extract, path = zipdir) except : print("--> Failed the read enough data: req=33345 got>=52 files.") import warnings warnings.warn("Minimal Rar version needed for decompressing. " "As (major*10 + minor), so 2.9 is 29.RAR3: 10, 20, 29" "RAR5 does not have such field in archive, it’s simply" " set to 50." ) continue else : decompress_succeed=True if decompress_succeed: break if not decompress_succeed: print(f"---> Please unrar the <{zip_file!r}> with an appropriate !" " software. Failed to read enough data more than 50. ") raise ExtractionError ( "Failed the read enough data: req=33345 got>=52 files.") # rarfile.RarFile().extract(member=raw_location, path = zipdir) #---------------------------------------------------------------------- if decompress_succeed: print(f"---> Unraring the `{zip_file}=main.bagciv.data.csv`" "was successfully done.")
[docs] def fetchSingleZIPData( zip_file:str, zipdir:str, **zip_kws )-> None: """ Find only the archived zip file and save to the current directory. Parameters ----------- zip_file: str or Path-like obj Name of archived zip file zipdir : str or Path-like obj Directory where `zip_file` is located. Examples -------- >>> from watex.datasets.property import fetchSingleZIPData >>> fetchSingleZIPData(zip_file= zip_file, zipdir = zipdir, file_to_extract='__tar.tgz_files__/___fmain.bagciv.data.csv', savepath=save_zip_file, rename_outfile='main.bagciv.data.csv') """ is_zip_file = os.path.isfile(zip_file) if not is_zip_file: raise FileNotFoundError(f"{os.path.basename(zip_file)!r} is wrong file!" " Please provide the right file.") #ZipFile.extractall(path=None, members=None, pwd=None) # path: location where zip file needs to be extracted; if not # provided, it will extract the contents in the current # directory. # members: list of files to be extracted. It will extract all # the files in the zip if this argument is not provided. # pwd: If the zip file is encrypted, then pass the password in # this argument default is None. # remove the first '/'--> if not os.path.isfile(zip_file): zip_file=os.path.join(zipdir, zip_file) with zipfile.ZipFile(zip_file,'r') as zip_ref: # try : # extract in the current directory fetchedfile = retrieveZIPmember(zip_ref, **zip_kws )
# except : # raise ExtractionError ( # f"Unable to retreive file from zip {zip_file!r}") # print(f"---> Dataset={os.path.basename(fetchedfile)!r} " # "was successfully retreived.")
[docs] def retrieveZIPmember( zipObj, *, file_to_extract:str ='__tar.tgz_files__/___fmain.bagciv.data.csv', savepath: Optional[str] =None, rename_outfile: str ='main.bagciv.data.csv' ) -> str: """ Retreive member from zip and collapse the extracted directory by " "saving into a new directory Parameters ----------- ZipObj: Obj zip Reference zip object file_to_extract:str or Path-Like Object File to extract existing in zip archived. It should be a name list of archived file. savepath: str or Path-Like obj Destination path after fetching the single data from zip archive. rename_outfile:str or Path-Like obj Rename the `file_to_extract` if think it necessary. Returns -------- The name of path retreived. If file is renamed than shoud take it new names. """ if savepath is None: savepath =os.getcwd() if not os.path.isdir(savepath) : os.makedirs(savepath) if file_to_extract in zipObj.namelist(): member2extract=zipObj.getinfo(file_to_extract) zipObj.extractall(members = [member2extract]) shutil.move (os.path.join(os.getcwd(), file_to_extract), savepath) # destroy the previous path if savepath != os.path.join(os.getcwd(), os.path.dirname(file_to_extract)): # detroy the root if only if the savepath is different # from the raw then extract member into the directory created. shutil.rmtree(os.path.join(os.getcwd(), os.path.dirname(file_to_extract))) if rename_outfile is not None: os.rename(os.path.join(savepath, os.path.basename (file_to_extract)), os.path.join(savepath, rename_outfile)) elif rename_outfile is None: rename_outfile= os.path.basename(file_to_extract) print(f"---> {rename_outfile!r} was successfully decompressed" f" and saved to {savepath!r}" ) return rename_outfile
[docs] def move_file(filename:str , directory:str )-> str: if os.path.isfile(filename): shutil.move(filename, directory)