# Source code for climada.util.files_handler
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Functions to deal with files.
"""
# Names exported by a star-import of this module; every other function defined
# here remains importable by its explicit name.
__all__ = [
'to_list',
'get_file_names',
]
import glob
import logging
import math
import urllib
from pathlib import Path
import requests
from tqdm import tqdm
from climada.util.config import CONFIG
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
class DownloadProgressBar(tqdm):
    """Progress bar that can be driven by absolute transfer counts while downloading.

    The signature of `update_to` matches the ``reporthook`` callback of
    ``urllib.request.urlretrieve``.
    """

    def update_to(self, blocks=1, bsize=1, tsize=None):
        """Move the progress bar to the absolute position ``blocks * bsize``.

        Parameters
        ----------
        blocks : int, optional
            Number of blocks transferred so far [default: 1].
        bsize : int, optional
            Size of each block (in tqdm units) [default: 1].
        tsize : int, optional
            Total size (in tqdm units). If None [default], the total
            remains unchanged.
        """
        if tsize is not None:
            self.total = tsize
        # tqdm.update expects an increment, hence the subtraction of the
        # current position from the absolute amount transferred.
        transferred = blocks * bsize
        self.update(transferred - self.n)
def download_file(url, download_dir=None, overwrite=True):
    """Download file from url to given target folder and provide full path of the downloaded file.

    Parameters
    ----------
    url : str
        url containing data to download
    download_dir : Path or str, optional
        the parent directory of the eventually downloaded file
        default: local_data.save_dir as defined in climada.conf
    overwrite : bool, optional
        whether or not an already existing file at the target location should be overwritten,
        by default True

    Returns
    -------
    str
        the full path to the eventually downloaded file

    Raises
    ------
    ValueError
        if the last path segment of the url is blank, or if the server answers
        with a status code outside the 2xx range
    FileExistsError
        if the target path exists and is not a regular file, or if it exists and
        `overwrite` is False
    IOError
        any IOError (sub)class raised while opening the connection is re-raised
        as the same type with a hint prepended to its message
    """
    # The last path segment of the url is used as the local file name.
    file_name = url.split('/')[-1]
    if file_name.strip() == '':
        raise ValueError(f"cannot download {url} as a file")

    download_path = CONFIG.local_data.save_dir.dir() if download_dir is None else Path(download_dir)
    file_path = download_path.absolute().joinpath(file_name)
    if file_path.exists() and (not file_path.is_file() or not overwrite):
        raise FileExistsError(f"cannot download to {file_path}")

    try:
        req_file = requests.get(url, stream=True)
    except IOError as ioe:
        raise type(ioe)('Check URL and internet connection: ' + str(ioe)) from ioe

    # A streamed response keeps its connection open until it is closed, so make
    # sure it is released on every exit path (including the error below).
    with req_file:
        if req_file.status_code < 200 or req_file.status_code > 299:
            raise ValueError(f'Error loading page {url}\n'
                             f' Status: {req_file.status_code}\n'
                             f' Content: {req_file.content}')

        # A missing content-length header results in a total of 0 blocks.
        total_size = int(req_file.headers.get('content-length', 0))
        block_size = 1024

        LOGGER.info('Downloading %s to file %s', url, file_path)
        with file_path.open('wb') as file:
            # True division is required here: flooring first would drop the
            # final, partially filled block from the progress bar total.
            for data in tqdm(req_file.iter_content(block_size),
                             total=math.ceil(total_size / block_size),
                             unit='KB', unit_scale=True):
                file.write(data)

    return str(file_path)
def download_ftp(url, file_name):
    """Download file from ftp in current folder.

    Parameters
    ----------
    url : str
        url containing data to download
    file_name : str
        name of the file to download

    Raises
    ------
    ValueError
        if anything goes wrong while retrieving the file; the original
        exception is attached as the cause
    """
    # `import urllib` alone does not guarantee that the `request` submodule is
    # loaded; import it explicitly so the call below cannot fail with an
    # AttributeError that would then be reported as a download failure.
    import urllib.request

    LOGGER.info('Downloading file %s', file_name)
    try:
        with DownloadProgressBar(unit='B', unit_scale=True, miniters=1,
                                 desc=url.split('/')[-1]) as prog_bar:
            urllib.request.urlretrieve(url, file_name, reporthook=prog_bar.update_to)
    except Exception as exc:
        raise ValueError(
            f'{exc.__class__} - "{exc}": failed to retrieve {url} into {file_name}'
        ) from exc
# [docs]
def to_list(num_exp, values, val_name):
    """Return `values` as a list of `num_exp` elements.

    A non-list value, or a list holding a single element, is repeated
    `num_exp` times. A list that already has `num_exp` elements is returned
    as is.

    Parameters
    ----------
    num_exp : int
        expected number of list elements
    values : object or list(object)
        values to check and transform
    val_name : str
        name of the variable values, used in the error message

    Returns
    -------
    list

    Raises
    ------
    ValueError
        if `values` is a list whose length is neither 1 nor `num_exp`
    """
    item = values
    if isinstance(values, list):
        n_given = len(values)
        if n_given == num_exp:
            return values
        if n_given != 1:
            raise ValueError(f'Provide one or {num_exp} {val_name}.')
        item = values[0]
    return [item] * num_exp
# [docs]
def get_file_names(file_name):
    """Collect the files designated by one or several paths or glob patterns.

    Parameters
    ----------
    file_name : str or list(str)
        Either a single string or a list of
        strings that are either
        - a file path
        - or the path of the folder containing the files
        - or a globbing pattern.

    Returns
    -------
    list(str)

    Raises
    ------
    ValueError
        if a directory contains no files or a pattern matches no file
    """
    entries = file_name if isinstance(file_name, list) else [file_name]
    entries = list(map(Path, entries))

    found = []
    for entry in entries:
        if entry.is_file():
            found.append(str(entry))
            continue

        if entry.is_dir():
            # only the files directly inside the folder, no recursion
            matches = [str(child) for child in entry.iterdir() if child.is_file()]
            failure = f'there are no files in directory "{entry}"'
        else:
            # neither an existing file nor a folder: treat it as a glob pattern
            matches = [hit for hit in glob.glob(str(entry)) if Path(hit).is_file()]
            failure = f'cannot find the file "{entry}"'

        if not matches:
            raise ValueError(failure)
        found += matches
    return found
def get_extension(file_name):
    """Get file without extension and its extension (e.g. ".nc", ".grd.gz").

    All suffixes reported by ``pathlib`` are joined into the extension. The
    returned path without extension is the remainder of the file name, so
    that concatenating both results gives back the input path. Names that
    start with a dot and have no suffix (e.g. ".bashrc") are kept entirely
    in the first element.

    Parameters
    ----------
    file_name : str
        file name (with or without path)

    Returns
    -------
    str, str
        path without extension, extension (empty string if there is none)
    """
    file_path = Path(file_name)
    extension = "".join(file_path.suffixes)
    # Strip the suffixes from the end instead of splitting on the first dot:
    # the latter yields an empty stem for names with a leading dot.
    name = file_path.name
    stem = name[:len(name) - len(extension)]
    return str(file_path.parent.joinpath(stem)), extension