Source code for climada.util.api_client

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Data API client
"""
from dataclasses import dataclass
from datetime import datetime
import hashlib
import json
import logging
from os.path import commonprefix
from pathlib import Path
from urllib.parse import quote, unquote, urlsplit, urlunsplit
import time

import pandas as pd
from peewee import CharField, DateTimeField, IntegrityError, Model, SqliteDatabase
import requests
import pycountry

from climada import CONFIG
from climada.entity import Exposures
from climada.hazard import Hazard, Centroids
from climada.util.constants import SYSTEM_DIR

LOGGER = logging.getLogger(__name__)

DB = SqliteDatabase(Path(CONFIG.data_api.cache_db.str()).expanduser())

HAZ_TYPES = [ht.str() for ht in CONFIG.data_api.supported_hazard_types.list()]
EXP_TYPES = [et.str() for et in CONFIG.data_api.supported_exposures_types.list()]


class Download(Model):
    """Database entry keeping track of downloaded files from the CLIMADA data API"""

    url = CharField()
    path = CharField(unique=True)
    startdownload = DateTimeField()
    enddownload = DateTimeField(null=True)

    class Meta:
        """SQL database and table definition."""

        database = DB

    class Failed(Exception):
        """The download failed for some reason."""


DB.connect()
DB.create_tables([Download])


@dataclass
class FileInfo:
    """file data from CLIMADA data API."""

    uuid: str
    url: str
    file_name: str
    file_format: str
    file_size: int
    check_sum: str


@dataclass
class DataTypeInfo:
    """data type meta data from CLIMADA data API."""

    data_type: str
    data_type_group: str
    status: str
    description: str
    properties: list  # of dict
    key_reference: list = None
    version_notes: list = None


@dataclass
class DataTypeShortInfo:
    """data type name and group from CLIMADA data API."""

    data_type: str
    data_type_group: str


@dataclass
class DatasetInfo:
    """dataset data from CLIMADA data API."""

    uuid: str
    data_type: DataTypeShortInfo
    name: str
    version: str
    status: str
    properties: dict
    files: list  # of FileInfo
    doi: str
    description: str
    license: str
    activation_date: str
    expiration_date: str

    @staticmethod
    def from_json(jsono):
        """creates a DatasetInfo object from the json object returned by the
        CLIMADA data api server.

        Parameters
        ----------
        jsono : dict

        Returns
        -------
        DatasetInfo
        """
        dataset = DatasetInfo(**jsono)
        dataset.data_type = DataTypeShortInfo(
            data_type=dataset.data_type["data_type"],
            data_type_group=dataset.data_type["data_type_group"],
        )
        dataset.files = [FileInfo(uuid=dataset.uuid, **filo) for filo in dataset.files]
        return dataset


def checksize(local_path, fileinfo):
    """Checks sanity of downloaded file simply by comparing actual and registered size.

    Parameters
    ----------
    local_path : Path
        the downloaded file
    fileinfo : FileInfo
        file information from CLIMADA data API

    Raises
    ------
    Download.Failed
        if the file is not what it's supposed to be
    """
    if not local_path.is_file():
        raise Download.Failed(f"{str(local_path)} is not a file")
    if local_path.stat().st_size != fileinfo.file_size:
        raise Download.Failed(
            f"{str(local_path)} has the wrong size:"
            f" {local_path.stat().st_size} instead of {fileinfo.file_size}"
        )


def checkhash(local_path, fileinfo):
    """Checks sanity of downloaded file by comparing actual and registered check sum.

    Parameters
    ----------
    local_path : Path
        the downloaded file
    fileinfo : FileInfo
        file information from CLIMADA data API

    Raises
    ------
    Download.Failed
        if the file is not what it's supposed to be
    """
    raise NotImplementedError("sanity check by hash sum needs to be implemented yet")
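

# The hash based sanity check above is not implemented. The sketch below shows one
# possible implementation. It assumes that FileInfo.check_sum holds a plain md5 hex
# digest; the format actually used by the CLIMADA data API may differ (e.g. it could
# carry an algorithm prefix), so treat this as an illustration rather than the API's
# contract. hashlib and Download are already imported/defined above.
def _checkhash_sketch(local_path, fileinfo):
    """Hypothetical hash check: compare the md5 digest of the downloaded file
    with the check sum registered in the data API."""
    md5h = hashlib.md5()
    with open(local_path, "rb") as stream:
        # read the file in chunks to keep memory usage low for large files
        for chunk in iter(lambda: stream.read(8192), b""):
            md5h.update(chunk)
    if md5h.hexdigest() != fileinfo.check_sum:
        raise Download.Failed(
            f"{str(local_path)} has the wrong check sum:"
            f" {md5h.hexdigest()} instead of {fileinfo.check_sum}"
        )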


class Cacher:
    """Utility class handling cached results from http requests,
    to enable the API Client to work in offline mode.
    """

    def __init__(self, cache_enabled):
        """Constructor of Cacher.

        Parameters
        ----------
        cache_enabled : bool, None
            Default: None, in this case the value is taken from
            CONFIG.data_api.cache_enabled.
        """
        self.enabled = (
            CONFIG.data_api.cache_enabled.bool()
            if cache_enabled is None
            else cache_enabled
        )
        self.cachedir = CONFIG.data_api.cache_dir.dir() if self.enabled else None

    @staticmethod
    def _make_key(*args, **kwargs):
        as_text = "\t".join(
            [str(a) for a in args] + [f"{k}={kwargs[k]}" for k in sorted(kwargs.keys())]
        )
        md5h = hashlib.md5()
        md5h.update(as_text.encode())
        return md5h.hexdigest()

    def store(self, result, *args, **kwargs):
        """stores the result from an API call to a local file.

        The name of the file is the md5 hash of a string created from the call's
        arguments, the content of the file is the call's result in json format.

        Parameters
        ----------
        result : dict
            will be written in json format to the cached result file
        *args : list of str
        **kwargs : list of dict of (str,str)
        """
        _key = Cacher._make_key(*args, **kwargs)
        try:
            with Path(self.cachedir, _key).open("w", encoding="utf-8") as flp:
                json.dump(result, flp)
        except (OSError, ValueError):
            pass

    def fetch(self, *args, **kwargs):
        """reloads the result from an API call from a local file, created by the
        corresponding call of `self.store`.

        If no call with exactly the same arguments has been made in the past,
        the result is None.

        Parameters
        ----------
        *args : list of str
        **kwargs : list of dict of (str,str)

        Returns
        -------
        dict or None
        """
        _key = Cacher._make_key(*args, **kwargs)
        try:
            with Path(self.cachedir, _key).open(encoding="utf-8") as flp:
                return json.load(flp)
        except (OSError, ValueError):
            return None
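

# Minimal sketch of how the Cacher is used internally (assuming caching is enabled):
# a request's result is stored under a key derived from the request arguments and can
# later be fetched with exactly the same arguments. The function name and arguments
# below are only illustrative and not part of the API client.
def _cacher_roundtrip_example():
    """Hypothetical helper, only for illustration."""
    cacher = Cacher(cache_enabled=True)
    result = {"data_type": "litpop", "status": "active"}
    cacher.store(result, "https://climada.ethz.ch/data-api/v1/dataset/", limit="1")
    # returns the stored dict, or None if no identical request was cached before
    return cacher.fetch("https://climada.ethz.ch/data-api/v1/dataset/", limit="1")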


class Client:
    """Python wrapper around REST calls to the CLIMADA data API server."""

    MAX_WAITING_PERIOD = 6
    UNLIMITED = 100000
    DOWNLOAD_TIMEOUT = 3600
    QUERY_TIMEOUT = 300

    class AmbiguousResult(Exception):
        """Custom Exception for Non-Unique Query Result"""

    class NoResult(Exception):
        """Custom Exception for No Query Result"""

    class NoConnection(Exception):
        """To be raised if there is no internet connection and no cached result."""

    def _online(self) -> bool:
        """Check if this client can connect to the target URL"""
        # Use just the base location
        parse_result = urlsplit(self.url)
        query_url = urlunsplit((parse_result.scheme, parse_result.netloc, "", "", ""))

        try:
            # NOTE: 'timeout' might not work as intended, depending on OS and network status
            return requests.head(query_url, timeout=1).status_code == 200
        except (requests.ConnectionError, requests.Timeout):
            return False

    def __init__(self, cache_enabled=None):
        """Constructor of Client.

        Data API host and chunk_size (for download) are configurable values.
        Default values are 'climada.ethz.ch' and 8096 respectively.

        Parameters
        ----------
        cache_enabled : bool, optional
            This flag controls whether the api calls of this client are going to be
            cached to the local file system (location defined by
            CONFIG.data_api.cache_dir).
            If set to true, the client can reload the results from the cache in case
            there is no internet connection and thus work in offline mode.
            Default: None, in this case the value is taken from
            CONFIG.data_api.cache_enabled.
        """
        self.headers = {"accept": "application/json"}
        self.url = CONFIG.data_api.url.str().rstrip("/")
        self.chunk_size = CONFIG.data_api.chunk_size.int()
        self.cache = Cacher(cache_enabled)
        self.online = self._online()

    def _request_200(self, url, params=None):
        """Helper method, triaging successful and failing requests.

        Returns
        -------
        dict
            loaded from the json object of a successful request.

        Raises
        ------
        NoResult
            if the response status code is different from 200
        """
        # pylint: disable=no-else-return
        if params is None:
            params = dict()

        if self.online:
            page = requests.get(url, params=params, timeout=Client.QUERY_TIMEOUT)
            if page.status_code != 200:
                raise Client.NoResult(page.content.decode())
            result = json.loads(page.content.decode())
            if self.cache.enabled:
                self.cache.store(result, url, **params)
            return result

        else:  # try to restore previous results from an identical request
            if not self.cache.enabled:
                raise Client.NoConnection(
                    "there is no internet connection and the client does"
                    " not cache results."
                )
            cached_result = self.cache.fetch(url, **params)
            if not cached_result:
                raise Client.NoConnection(
                    "there is no internet connection and the client has not"
                    " found any cached result for this request."
                )
            LOGGER.warning(
                "there is no internet connection but the client has stored the results"
                " of this very request sometime in the past."
            )
            return cached_result

    @staticmethod
    def _divide_straight_from_multi(properties):
        straights, multis = dict(), dict()
        for k, _v in properties.items():
            if _v is None:
                straights[k] = ""
            elif isinstance(_v, str):
                straights[k] = _v
            elif isinstance(_v, list):
                multis[k] = _v
            else:
                raise ValueError(
                    "the value of a property must be a string or a list of strings"
                )
        return straights, multis

    @staticmethod
    def _filter_datasets(datasets, multi_props):
        pdf = pd.DataFrame([ds.properties for ds in datasets])
        for prop, selection in multi_props.items():
            pdf = pdf[pdf[prop].isin(selection)]
        return [datasets[i] for i in pdf.index]

    def list_dataset_infos(
        self, data_type=None, name=None, version=None, properties=None, status="active"
    ):
        """Find all datasets matching the given parameters.

        Parameters
        ----------
        data_type : str, optional
            data_type of the dataset, e.g., 'litpop' or 'drought'
        name : str, optional
            the name of the dataset
        version : str, optional
            the version of the dataset, 'any' for all versions,
            'newest' or None for the newest version meeting the requirements
            Default: None
        properties : dict, optional
            search parameters for dataset properties, by default None
            any property has a string for key and can be a string or a list of strings
            for value
        status : str, optional
            valid values are 'preliminary', 'active', 'expired', 'test_dataset' and None
            by default 'active'

        Returns
        -------
        list of DatasetInfo
        """
        url = f"{self.url}/dataset/"
        params = {
            "data_type": data_type,
            "name": name,
            "version": version,
            "status": "" if status is None else status,
            "limit": Client.UNLIMITED,
        }

        if properties:
            straight_props, multi_props = self._divide_straight_from_multi(properties)
        else:
            straight_props, multi_props = None, None

        if straight_props:
            params.update(straight_props)

        datasets = [
            DatasetInfo.from_json(ds) for ds in self._request_200(url, params=params)
        ]

        if datasets and multi_props:
            return self._filter_datasets(datasets, multi_props)
        return datasets
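
    # Illustrative query (a sketch: the property names 'country_name' and 'exponents'
    # are taken from the LitPop convenience method further below and apply to 'litpop'
    # datasets; other data types use other properties). A list value matches datasets
    # whose property equals any of its entries:
    #
    # >>> client = Client()
    # >>> client.list_dataset_infos(
    # ...     data_type="litpop",
    # ...     properties={"country_name": ["Austria", "Switzerland"], "exponents": "(1,1)"},
    # ... )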

    def get_dataset_info(
        self, data_type=None, name=None, version=None, properties=None, status="active"
    ):
        """Find the one dataset that matches the given parameters.

        Parameters
        ----------
        data_type : str, optional
            data_type of the dataset, e.g., 'litpop' or 'drought'
        name : str, optional
            the name of the dataset
        version : str, optional
            the version of the dataset
            Default: newest version meeting the requirements
        properties : dict, optional
            search parameters for dataset properties, by default None
            any property has a string for key and can be a string or a list of strings
            for value
        status : str, optional
            valid values are 'preliminary', 'active', 'expired', 'test_dataset', None
            by default 'active'

        Returns
        -------
        DatasetInfo

        Raises
        ------
        AmbiguousResult
            when there is more than one dataset matching the search parameters
        NoResult
            when there is no dataset matching the search parameters
        """
        jarr = self.list_dataset_infos(
            data_type=data_type,
            name=name,
            version=version,
            properties=properties,
            status=status,
        )
        if len(jarr) > 1:
            shown = 10
            endofmessage = "" if len(jarr) <= shown else f"\nand {len(jarr)-shown} more"
            datasetlist = ",\n* ".join(
                str(jarr[i]) for i in range(min(shown, len(jarr)))
            )
            raise Client.AmbiguousResult(
                f"there are {len(jarr)} datasets meeting the requirements:"
                f"\n* {datasetlist}{endofmessage}."
            )
        if len(jarr) < 1:
            data_info = self.list_dataset_infos(data_type)
            properties = self.get_property_values(data_info)
            raise Client.NoResult(
                "there is no dataset meeting the requirements, the following"
                f" property values are available for {data_type}: {properties}"
            )
        return jarr[0]

    def get_dataset_info_by_uuid(self, uuid):
        """Returns the data from 'https://climada.ethz.ch/data-api/v1/dataset/{uuid}'
        as DatasetInfo object.

        Parameters
        ----------
        uuid : str
            the universal unique identifier of the dataset

        Returns
        -------
        DatasetInfo

        Raises
        ------
        NoResult
            if the uuid is not valid
        """
        url = f"{self.url}/dataset/{uuid}/"
        return DatasetInfo.from_json(self._request_200(url))

    def list_data_type_infos(self, data_type_group=None):
        """Returns all data types from the climada data API belonging to a given data
        type group.

        Parameters
        ----------
        data_type_group : str, optional
            name of the data type group, by default None

        Returns
        -------
        list of DataTypeInfo
        """
        url = f"{self.url}/data_type/"
        params = {"data_type_group": data_type_group} if data_type_group else {}
        return [DataTypeInfo(**jobj) for jobj in self._request_200(url, params=params)]

    def get_data_type_info(self, data_type):
        """Returns the metadata of the data type with the given name from the climada
        data API.

        Parameters
        ----------
        data_type : str
            data type name

        Returns
        -------
        DataTypeInfo

        Raises
        ------
        NoResult
            if there is no such data type registered
        """
        url = f"{self.url}/data_type/{quote(data_type)}/"
        return DataTypeInfo(**self._request_200(url))

    def _download(self, url, path, replace=False):
        """Downloads a file from the given url to a specified location.

        Parameters
        ----------
        url : str
            the link to the file to be downloaded
        path : Path
            download path, if it's a directory the original file name is kept
        replace : bool, optional
            flag to indicate whether a present file with the same name should
            be replaced

        Returns
        -------
        Path
            Path to the downloaded file

        Raises
        ------
        FileExistsError
            in case there is already a file present at the given location
            and replace is False
        """
        if path.is_dir():
            path /= unquote(url.split("/")[-1])
        if path.is_file() and not replace:
            raise FileExistsError(path)
        with requests.get(url, stream=True, timeout=Client.DOWNLOAD_TIMEOUT) as stream:
            stream.raise_for_status()
            with open(path, "wb") as dump:
                for chunk in stream.iter_content(chunk_size=self.chunk_size):
                    dump.write(chunk)
        return path

    def _tracked_download(self, remote_url, local_path):
        if local_path.is_dir():
            raise ValueError(
                "tracked download requires a path to a file not a directory"
            )
        path_as_str = str(local_path.absolute())
        try:
            dlf = Download.create(
                url=remote_url, path=path_as_str, startdownload=datetime.utcnow()
            )
        except IntegrityError as ierr:
            dlf = Download.get(
                Download.path == path_as_str
            )  # path is the table's one unique column
            if not Path(path_as_str).is_file():  # in case the file has been removed
                dlf.delete_instance()  # delete entry from database
                return self._tracked_download(remote_url, local_path)  # and try again
            if dlf.url != remote_url:
                raise RuntimeError(
                    f"this file ({path_as_str}) has been downloaded from another"
                    f" url ({dlf.url}), possibly because it belongs to a dataset with"
                    " a recent version update. Please remove the file or purge the"
                    " entry from data base before trying again"
                ) from ierr
            return dlf
        try:
            self._download(url=remote_url, path=local_path, replace=True)
            dlf.enddownload = datetime.utcnow()
            dlf.save()
        except Exception:
            dlf.delete_instance()
            raise
        return Download.get(Download.path == path_as_str)

    def _download_file(self, local_path, fileinfo, check=checksize, retries=3):
        """Download a file if it is not already present at the target destination.

        Parameters
        ----------
        local_path : Path
            target destination,
            if it is a directory the original file name (fileinfo.file_name) is kept
        fileinfo : FileInfo
            file object as retrieved from the data api
        check : function, optional
            how to check download success, by default checksize
        retries : int, optional
            how many times one should retry in case of failure, by default 3

        Returns
        -------
        Path
            the path to the downloaded file

        Raises
        ------
        Exception
            when number of retries was exceeded or when a download is already running
        """
        try:
            if local_path.is_dir():
                local_path /= fileinfo.file_name
            downloaded = self._tracked_download(
                remote_url=fileinfo.url, local_path=local_path
            )
            if not downloaded.enddownload:
                raise Download.Failed(
                    f"A download of {fileinfo.url} via the API Client has been"
                    " requested before. Either it is still in progress or the"
                    " process got interrupted. In the former case just wait"
                    " until the download has finished and try again, in the"
                    f" latter run `Client.purge_cache_db(Path('{local_path}'))`"
                    " from Python. If unsure, check your internet connection,"
                    " wait for as long as it takes to download a file of size"
                    f" {fileinfo.file_size} and try again. If the problem"
                    " persists, purge the cache db with said call."
                )
            try:
                check(local_path, fileinfo)
            except Download.Failed as dlf:
                local_path.unlink(missing_ok=True)
                self.purge_cache_db(local_path)
                raise dlf
            return local_path
        except Download.Failed as dle:
            if retries < 1:
                raise dle
            LOGGER.warning("Download failed: %s, retrying...", dle)
            time.sleep(Client.MAX_WAITING_PERIOD / retries)
            return self._download_file(
                local_path=local_path,
                fileinfo=fileinfo,
                check=check,
                retries=retries - 1,
            )

    def download_dataset(self, dataset, target_dir=SYSTEM_DIR, organize_path=True):
        """Download all files from a given dataset to a given directory.

        Parameters
        ----------
        dataset : DatasetInfo
            the dataset
        target_dir : Path, optional
            target directory for download, by default
            `climada.util.constants.SYSTEM_DIR`
        organize_path : bool, optional
            if set to True the files will end up in subdirectories of target_dir:
            [target_dir]/[data_type_group]/[data_type]/[name]/[version]
            by default True

        Returns
        -------
        download_dir : Path
            the path to the directory containing the downloaded files,
            will be created if organize_path is True
        downloaded_files : list of Path
            the downloaded files themselves

        Raises
        ------
        Exception
            when one of the files cannot be downloaded
        """
        if not target_dir.is_dir():
            raise ValueError(f"{target_dir} is not a directory")

        if organize_path:
            target_dir = self._organize_path(dataset, target_dir)

        return target_dir, [
            self._download_file(local_path=target_dir, fileinfo=dsfile)
            for dsfile in dataset.files
        ]

    @staticmethod
    def _organize_path(dataset, target_dir):
        if dataset.data_type.data_type_group:
            target_dir /= dataset.data_type.data_type_group
        if dataset.data_type.data_type_group != dataset.data_type.data_type:
            target_dir /= dataset.data_type.data_type
        target_dir /= dataset.name
        if dataset.version:
            target_dir /= dataset.version
        target_dir.mkdir(exist_ok=True, parents=True)
        return target_dir

    @staticmethod
    def purge_cache_db(local_path):
        """Removes the entry from the sqlite database that keeps track of files
        downloaded by `_tracked_download`. This may be necessary in case a previous
        attempt has failed in an uncontrolled way (power outage or the like).

        Parameters
        ----------
        local_path : Path
            target destination of the tracked download
        """
        dlf = Download.get(Download.path == str(local_path.absolute()))
        dlf.delete_instance()
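
    # Illustrative recovery from an interrupted download, as suggested by the error
    # message raised in ``_download_file`` above (the file path is a placeholder):
    #
    # >>> from pathlib import Path
    # >>> Client.purge_cache_db(Path("~/climada/data/<file>.hdf5").expanduser())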

    @staticmethod
    def _multi_version(datasets):
        ddf = pd.DataFrame(datasets)
        gdf = ddf.groupby("name").agg({"version": "nunique"})
        return list(gdf[gdf.version > 1].index)

    def get_hazard(
        self,
        hazard_type,
        name=None,
        version=None,
        properties=None,
        status="active",
        dump_dir=SYSTEM_DIR,
    ):
        """Queries the data api for hazard datasets of the given type, downloads
        associated hdf5 files and turns them into a climada.hazard.Hazard object.

        Parameters
        ----------
        hazard_type : str
            Type of climada hazard.
        name : str, optional
            the name of the dataset
        version : str, optional
            the version of the dataset
            Default: newest version meeting the requirements
        properties : dict, optional
            search parameters for dataset properties, by default None
            any property has a string for key and can be a string or a list of strings
            for value
        status : str, optional
            valid values are 'preliminary', 'active', 'expired', 'test_dataset', None
            by default 'active'
        dump_dir : str, optional
            Directory where the files should be downloaded. Default: SYSTEM_DIR
            If the directory is the SYSTEM_DIR (as configured in climada.conf, e.g.
            ~/climada/data), the eventual target directory is organized into
            dump_dir > hazard_type > dataset name > version

        Returns
        -------
        climada.hazard.Hazard
            The combined hazard object
        """
        if hazard_type not in HAZ_TYPES:
            raise ValueError(
                "Valid hazard types are a subset of CLIMADA hazard types."
                f" Currently these types are supported: {HAZ_TYPES}"
            )
        dataset = self.get_dataset_info(
            data_type=hazard_type,
            name=name,
            version=version,
            properties=properties,
            status=status,
        )
        return self.to_hazard(dataset, dump_dir)

    def to_hazard(self, dataset, dump_dir=SYSTEM_DIR):
        """Downloads the hdf5 files belonging to the given dataset, reads them into
        Hazards and concatenates them into a single climada.Hazard object.

        Parameters
        ----------
        dataset : DatasetInfo
            Dataset to download and read into climada.Hazard object.
        dump_dir : str, optional
            Directory where the files should be downloaded. Default: SYSTEM_DIR
            (as configured in climada.conf, e.g. ~/climada/data).
            If the directory is the SYSTEM_DIR, the eventual target directory is
            organized into dump_dir > hazard_type > dataset name > version

        Returns
        -------
        climada.hazard.Hazard
            The combined hazard object
        """
        target_dir = (
            self._organize_path(dataset, dump_dir)
            if dump_dir == SYSTEM_DIR
            else dump_dir
        )
        hazard_list = [
            Hazard.from_hdf5(self._download_file(target_dir, dsf))
            for dsf in dataset.files
            if dsf.file_format == "hdf5"
        ]
        if not hazard_list:
            raise ValueError("no hdf5 files found in dataset")
        if len(hazard_list) == 1:
            return hazard_list[0]
        hazard_concat = Hazard()
        hazard_concat = hazard_concat.concat(hazard_list)
        hazard_concat.sanitize_event_ids()
        hazard_concat.check()
        return hazard_concat

    def get_exposures(
        self,
        exposures_type,
        name=None,
        version=None,
        properties=None,
        status="active",
        dump_dir=SYSTEM_DIR,
    ):
        """Queries the data api for exposures datasets of the given type, downloads
        associated hdf5 files and turns them into a
        climada.entity.exposures.Exposures object.

        Parameters
        ----------
        exposures_type : str
            Type of climada exposures.
        name : str, optional
            the name of the dataset
        version : str, optional
            the version of the dataset
            Default: newest version meeting the requirements
        properties : dict, optional
            search parameters for dataset properties, by default None
            any property has a string for key and can be a string or a list of strings
            for value
        status : str, optional
            valid values are 'preliminary', 'active', 'expired', 'test_dataset', None
            by default 'active'
        dump_dir : str, optional
            Directory where the files should be downloaded. Default: SYSTEM_DIR
            If the directory is the SYSTEM_DIR, the eventual target directory is
            organized into dump_dir > exposures_type > dataset name > version

        Returns
        -------
        climada.entity.exposures.Exposures
            The combined exposures object
        """
        if exposures_type not in EXP_TYPES:
            raise ValueError(
                "Valid exposures types are a subset of CLIMADA exposures types."
                f" Currently these types are supported: {EXP_TYPES}"
            )
        dataset = self.get_dataset_info(
            data_type=exposures_type,
            name=name,
            version=version,
            properties=properties,
            status=status,
        )
        return self.to_exposures(dataset, dump_dir)

    def to_exposures(self, dataset, dump_dir=SYSTEM_DIR):
        """Downloads the hdf5 files belonging to the given dataset, reads them into
        Exposures and concatenates them into a single climada.Exposures object.

        Parameters
        ----------
        dataset : DatasetInfo
            Dataset to download and read into climada.Exposures objects.
        dump_dir : str, optional
            Directory where the files should be downloaded. Default: SYSTEM_DIR
            (as configured in climada.conf, e.g. ~/climada/data).
            If the directory is the SYSTEM_DIR, the eventual target directory is
            organized into dump_dir > exposures_type > dataset name > version

        Returns
        -------
        climada.entity.exposures.Exposures
            The combined exposures object
        """
        target_dir = (
            self._organize_path(dataset, dump_dir)
            if dump_dir == SYSTEM_DIR
            else dump_dir
        )
        exposures_list = [
            Exposures.from_hdf5(self._download_file(target_dir, dsf))
            for dsf in dataset.files
            if dsf.file_format == "hdf5"
        ]
        if not exposures_list:
            raise ValueError("no hdf5 files found in dataset")
        if len(exposures_list) == 1:
            return exposures_list[0]
        exposures_concat = Exposures()
        exposures_concat = exposures_concat.concat(exposures_list)
        exposures_concat.check()
        return exposures_concat

    def get_litpop(
        self, country=None, exponents=(1, 1), version=None, dump_dir=SYSTEM_DIR
    ):
        """Get a LitPop ``Exposures`` instance on a 150arcsec grid with the default
        parameters: exponents = (1,1) and fin_mode = 'pc'.

        Parameters
        ----------
        country : str, optional
            Country name or iso3 codes for which to create the LitPop object.
            For creating a LitPop object over multiple countries, use ``get_litpop``
            individually and concatenate using ``LitPop.concat``, see Examples.
            If country is None a global LitPop instance is created. Default is None.
        exponents : tuple of two integers, optional
            Defining power with which lit (nightlights) and pop (gpw) go into LitPop.
            To get nightlights^3 without population count: (3, 0).
            To use population count alone: (0, 1).
            Default: (1, 1)
        version : str, optional
            the version of the dataset
            Default: newest version meeting the requirements
        dump_dir : str
            directory where the files should be downloaded. Default: SYSTEM_DIR

        Returns
        -------
        climada.entity.exposures.Exposures
            default litpop Exposures object

        Examples
        --------
        Combined default LitPop object for Austria and Switzerland:

        >>> client = Client()
        >>> litpop_aut = client.get_litpop("AUT")
        >>> litpop_che = client.get_litpop("CHE")
        >>> litpop_comb = LitPop.concat([litpop_aut, litpop_che])
        """
        properties = {
            "exponents": "".join(["(", str(exponents[0]), ",", str(exponents[1]), ")"])
        }
        if country is None:
            properties["spatial_coverage"] = "global"
        elif isinstance(country, str):
            properties["country_name"] = pycountry.countries.lookup(country).name
        elif isinstance(country, list):
            if len(set(country)) > 1:
                raise ValueError(
                    "``get_litpop`` can only query single countries. Download the"
                    " data for multiple countries individually and concatenate the"
                    " objects using ``LitPop.concat``"
                )
            properties["country_name"] = [
                pycountry.countries.lookup(c).name for c in country
            ]
        else:
            raise ValueError("country must be string")
        return self.get_exposures(
            exposures_type="litpop",
            properties=properties,
            version=version,
            dump_dir=dump_dir,
        )

    def get_centroids(
        self,
        res_arcsec_land=150,
        res_arcsec_ocean=1800,
        extent=(-180, 180, -60, 60),
        country=None,
        version=None,
        dump_dir=SYSTEM_DIR,
    ):
        """Get centroids from the API

        Parameters
        ----------
        res_arcsec_land : int
            resolution for land centroids in arcsec. Default is 150
        res_arcsec_ocean : int
            resolution for ocean centroids in arcsec. Default is 1800
        country : str
            country name, numeric code or iso code based on pycountry.
            Default is None (global).
        extent : tuple
            Format (min_lon, max_lon, min_lat, max_lat) tuple.
            If min_lon > lon_max, the extent crosses the antimeridian and is
            [lon_max, 180] + [-180, lon_min]
            Borders are inclusive. Default is (-180, 180, -60, 60).
        version : str, optional
            the version of the dataset
            Default: newest version meeting the requirements
        dump_dir : str
            directory where the files should be downloaded. Default: SYSTEM_DIR

        Returns
        -------
        climada.hazard.centroids.Centroids
            Centroids from the api
        """
        properties = {
            "res_arcsec_land": str(res_arcsec_land),
            "res_arcsec_ocean": str(res_arcsec_ocean),
            "extent": "(-180, 180, -90, 90)",
        }
        dataset = self.get_dataset_info(
            "centroids", version=version, properties=properties
        )
        target_dir = (
            self._organize_path(dataset, dump_dir)
            if dump_dir == SYSTEM_DIR
            else dump_dir
        )
        centroids = Centroids.from_hdf5(
            self._download_file(target_dir, dataset.files[0])
        )
        if country:
            reg_id = pycountry.countries.lookup(country).numeric
            centroids = centroids.select(reg_id=int(reg_id), extent=extent)
        if extent:
            centroids = centroids.select(extent=extent)
        return centroids
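
    # Illustrative call (a sketch; 'CHE' is just an example country): fetch the global
    # centroids dataset at the default resolutions and crop it to Switzerland.
    #
    # >>> client = Client()
    # >>> centroids_che = client.get_centroids(country="CHE")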

    @staticmethod
    def get_property_values(
        dataset_infos, known_property_values=None, exclude_properties=None
    ):
        """Returns a dictionary of possible values for properties of a data type,
        optionally given known property values.

        Parameters
        ----------
        dataset_infos : list of DataSetInfo
            as returned by list_dataset_infos
        known_property_values : dict, optional
            dict {'property1': 'value1', 'property2': 'value2'}, to provide only a
            subset of property values that can be combined with the given properties.
        exclude_properties : list of str, optional
            properties in this list will be excluded from the resulting dictionary,
            e.g., because they are strictly metadata and don't provide any information
            essential to the dataset.
            Default: 'date_creation', 'climada_version'

        Returns
        -------
        dict of possible property values
        """
        if exclude_properties is None:
            exclude_properties = ["date_creation", "climada_version"]

        ppdf = pd.DataFrame([ds.properties for ds in dataset_infos])
        if known_property_values:
            for key, val in known_property_values.items():
                ppdf = ppdf[ppdf[key] == val]

        property_values = dict()
        for col in ppdf.columns:
            if col in exclude_properties:
                continue
            valar = ppdf[col].dropna().drop_duplicates().values
            if valar.size:
                property_values[col] = list(valar)
        return property_values
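
    # Illustrative use (a sketch; 'litpop' and the 'exponents' property are examples
    # taken from this module): list the datasets of one data type, then inspect which
    # property values remain available once some properties are fixed.
    #
    # >>> client = Client()
    # >>> infos = client.list_dataset_infos(data_type="litpop")
    # >>> client.get_property_values(infos, known_property_values={"exponents": "(1,1)"})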

    @staticmethod
    def into_datasets_df(dataset_infos):
        """Convenience function providing a DataFrame of datasets with properties.

        Parameters
        ----------
        dataset_infos : list of DatasetInfo
            as returned by list_dataset_infos

        Returns
        -------
        pandas.DataFrame
            of datasets with properties as found in query by arguments
        """
        dsdf = pd.DataFrame(dataset_infos)
        ppdf = pd.DataFrame([ds.properties for ds in dataset_infos])
        dtdf = pd.DataFrame([pd.Series(dt) for dt in dsdf.data_type])

        return (
            dtdf.loc[
                :, [c for c in dtdf.columns if c not in ["description", "properties"]]
            ]
            .join(
                dsdf.loc[
                    :,
                    [
                        c
                        for c in dsdf.columns
                        if c not in ["data_type", "properties", "files"]
                    ],
                ]
            )
            .join(ppdf)
        )

    @staticmethod
    def into_files_df(dataset_infos):
        """Convenience function providing a DataFrame of files aligned with the input
        datasets.

        Parameters
        ----------
        dataset_infos : list of DatasetInfo
            as returned by list_dataset_infos

        Returns
        -------
        pandas.DataFrame
            of the files' information including dataset information
        """
        return Client.into_datasets_df(dataset_infos).merge(
            pd.DataFrame([dsfile for ds in dataset_infos for dsfile in ds.files])
        )

    def purge_cache(self, target_dir=SYSTEM_DIR, keep_testfiles=True):
        """Removes downloaded dataset files from the given directory if they have been
        downloaded with the API client, if they are beneath the given directory and if
        one of the following is the case:

        - their status is neither 'active' nor 'test_dataset'
        - their status is 'test_dataset' and keep_testfiles is set to False
        - their status is 'active' and they are outdated, i.e., there is a dataset with
          the same data_type and name but a newer version.

        Parameters
        ----------
        target_dir : Path or str, optional
            files downloaded beneath this directory and empty subdirectories will be
            removed.
            default: SYSTEM_DIR
        keep_testfiles : bool, optional
            if set to True, files from datasets with status 'test_dataset' will not be
            removed.
            default: True
        """
        # collect urls from datasets that should not be removed
        test_datasets = (
            self.list_dataset_infos(status="test_dataset") if keep_testfiles else []
        )
        test_urls = set(
            file_info.url for ds_info in test_datasets for file_info in ds_info.files
        )

        active_datasets = self.list_dataset_infos(status="active", version="newest")
        active_urls = set(
            file_info.url for ds_info in active_datasets for file_info in ds_info.files
        )

        not_to_be_removed = test_urls.union(active_urls)

        # make a list of downloaded files that could be removed
        to_be_removed = [d for d in Download.select() if d.url not in not_to_be_removed]

        # normalize target_dir for filtering by path
        target_dir = Path(target_dir).absolute()

        # remove files and sqlite db entries
        for obsolete in to_be_removed:
            opath = Path(obsolete.path)
            if opath.exists() and Path(commonprefix([target_dir, opath])) == target_dir:
                opath.unlink()
                obsolete.delete_instance()

        # clean up: remove all empty directories beneath target_dir
        def rm_empty_dirs(directory: Path):
            for subdir in directory.iterdir():
                if subdir.is_dir():
                    rm_empty_dirs(subdir)
            try:
                directory.rmdir()
            except OSError:  # raised when the directory is not empty
                pass

        rm_empty_dirs(target_dir)

    def get_dataset_file(self, **kwargs):
        """Convenience method. Combines ``get_dataset_info`` and ``download_dataset``.
        Returns the path to a single file if the dataset has only one,
        otherwise throws an error.

        Parameters
        ----------
        **kwargs
            arguments for get_dataset_info and download_dataset

        Returns
        -------
        Path
        """
        download_arguments = {
            "target_dir": kwargs.pop("target_dir", SYSTEM_DIR),
            "organize_path": kwargs.pop("organize_path", True),
        }
        dsi = self.get_dataset_info(**kwargs)
        _, [test_file] = self.download_dataset(dsi, **download_arguments)
        return test_file
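

# Minimal end-to-end sketch (an illustration, assuming internet access and that the
# queried datasets exist on the data API; 'CHE' is just an example country). Guarded
# so it only runs when this module is executed directly.
if __name__ == "__main__":
    client = Client()
    # download (or reuse previously downloaded) default LitPop exposures for Switzerland
    litpop_che = client.get_litpop("CHE")
    # list all data types known to the API, e.g. to explore what else can be queried
    data_types = client.list_data_type_infos()
    print(litpop_che, [dt.data_type for dt in data_types])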