Source code for climada.util.files_handler
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Functions to deal with files.
"""
# Names exported by a star-import of this module. Other functions defined in
# this file are not listed and must be imported explicitly by name.
__all__ = [
'to_list',
'get_file_names',
]
import glob
import logging
import math
import urllib
from pathlib import Path
import requests
from tqdm import tqdm
from climada.util.config import CONFIG
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
class DownloadProgressBar(tqdm):
    """Progress bar used while downloading; `update_to` can serve as a report hook."""

    def update_to(self, blocks=1, bsize=1, tsize=None):
        """Move the progress bar to the position given by ``blocks * bsize``.

        Parameters:
            blocks (int, optional): Number of blocks transferred so far [default: 1].
            bsize (int, optional): Size of each block (in tqdm units) [default: 1].
            tsize (int, optional): Total size (in tqdm units). If None [default],
                the total remains unchanged.
        """
        total_is_known = tsize is not None
        if total_is_known:
            self.total = tsize
        # the arguments describe an absolute position, while update() is given
        # the difference to the current counter `self.n`
        transferred = blocks * bsize
        self.update(transferred - self.n)
def download_file(url, download_dir=None, overwrite=True):
    """Download file from url to given target folder and provide full path of the downloaded file.

    Parameters
    ----------
    url : str
        url containing data to download
    download_dir : Path or str, optional
        the parent directory of the eventually downloaded file,
        by default the directory given by ``CONFIG.local_data.save_dir``
    overwrite : bool, optional
        whether or not an already existing file at the target location should be overwritten,
        by default True

    Returns
    -------
    str
        the full path to the eventually downloaded file

    Raises
    ------
    ValueError
        if the url does not end in a file name or the response status is not 2xx
    FileExistsError
        if the target path exists and is not a regular file, or exists and
        `overwrite` is False
    IOError
        re-raised if the request itself fails
    """
    file_name = url.split('/')[-1]
    if file_name.strip() == '':
        raise ValueError(f"cannot download {url} as a file")

    download_path = CONFIG.local_data.save_dir.dir() if download_dir is None else Path(download_dir)
    file_path = download_path.absolute().joinpath(file_name)
    if file_path.exists():
        if not file_path.is_file() or not overwrite:
            raise FileExistsError(f"cannot download to {file_path}")

    try:
        req_file = requests.get(url, stream=True)
    except IOError:
        LOGGER.error('Connection error: check url and internet connection.')
        raise

    # with stream=True the connection stays open until the body is consumed,
    # so make sure it is released even if checking or writing fails
    try:
        if req_file.status_code < 200 or req_file.status_code > 299:
            LOGGER.error('Error loading page %s.', url)
            raise ValueError(f'Error loading page {url}\n'
                             + f' Status: {req_file.status_code}\n'
                             + f' Content: {req_file.content}')

        total_size = int(req_file.headers.get('content-length', 0))
        block_size = 1024

        LOGGER.info('Downloading %s to file %s', url, file_path)
        with file_path.open('wb') as file:
            # true division: a trailing partial block counts as one more iteration
            for data in tqdm(req_file.iter_content(block_size),
                             total=math.ceil(total_size / block_size),
                             unit='KB', unit_scale=True):
                file.write(data)
    finally:
        req_file.close()

    return str(file_path)
def download_ftp(url, file_name):
    """Download the content of an (ftp) url into a local file.

    Parameters:
        url (str): url containing data to download
        file_name (str): path of the local file the data is written to
    Raises:
        ValueError: if retrieving the url fails for any reason; the original
            exception is chained as the cause
    """
    # `import urllib` by itself does not guarantee that the `request` submodule
    # is loaded; import it explicitly instead of relying on other imports.
    import urllib.request

    LOGGER.info('Downloading file %s', file_name)
    try:
        with DownloadProgressBar(unit='B', unit_scale=True, miniters=1,
                                 desc=url.split('/')[-1]) as prog_bar:
            urllib.request.urlretrieve(url, file_name, reporthook=prog_bar.update_to)
    except Exception as exc:
        raise ValueError(
            f'{exc.__class__} - "{exc}": failed to retrieve {url} into {file_name}'
        ) from exc
def to_list(num_exp, values, val_name):
    """Return `values` as a list of `num_exp` elements.

    A non-list value, or a list holding a single element, is repeated
    `num_exp` times. A list that already has `num_exp` elements is returned
    unchanged (the same object). Any other list length is reported through
    the module logger and an empty list is returned.

    Parameters:
        num_exp (int): number of expected list elements
        values (object or list(object)): values to check and transform
        val_name (str): name of the variable values, used in the log message
    Returns:
        list
    """
    if not isinstance(values, list):
        return num_exp * [values]

    n_given = len(values)
    if n_given == num_exp:
        return values
    if n_given == 1:
        return num_exp * [values[0]]

    # neither one nor num_exp elements: log the mismatch, hand back nothing
    logging.getLogger(__name__).error('Provide one or %s %s.', num_exp, val_name)
    return []
def get_file_names(file_name):
    """Collect the files denoted by one or several paths or glob patterns.

    Parameters:
        file_name (str or list(str)): Either a single string or a list of
            strings that are either
            - a file path
            - or the path of the folder containing the files
            - or a globbing pattern.
    Returns:
        list(str)
    """
    def _expand_glob(expression):
        # paths matching the glob expression, passed through Path for normalization
        return [str(Path(match)) for match in glob.glob(expression)]

    patterns = file_name if isinstance(file_name, list) else [file_name]
    collected = []
    for pattern in patterns:
        try:
            location = Path(pattern)
            if location.is_file():
                matches = [str(pattern)]
            elif location.is_dir():
                # only the files directly inside the folder, no sub-folders
                matches = [str(entry) for entry in location.iterdir() if entry.is_file()]
            else:
                matches = _expand_glob(pattern)
        except OSError:
            # the file system checks failed on this pattern: treat it as a glob
            matches = _expand_glob(pattern)
        collected.extend(matches)
    return collected
def get_extension(file_name):
    """Get file without extension and its extension (e.g. ".nc", ".grd.gz").

    The extension is the concatenation of all suffixes of the file name, and
    the first returned element is the path with exactly that extension
    removed. Names starting with a dot (e.g. ".bashrc") keep their leading
    part instead of being reduced to the parent folder.

    Parameters:
        file_name (str): file name (with or without path)
    Returns:
        str, str: path without extension, extension (empty string if none)
    """
    file_path = Path(file_name)
    extension = "".join(file_path.suffixes)
    name = file_path.name
    # the joined suffixes always form the tail of the name; cutting by length
    # keeps a leading dot part that splitting on '.' would drop
    stem = name[:len(name) - len(extension)]
    return str(file_path.parent.joinpath(stem)), extension