Source code for climada.entity.exposures.gdp_asset

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define GDPAsset class.
"""

__all__ = ['GDP2Asset']
import logging
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
import scipy as sp
from climada.entity.tag import Tag
from climada.util.coordinates import pts_to_raster_meta
from climada.util.coordinates import country_iso2natid, get_region_gridpoints, region2isos
from climada.util.constants import RIVER_FLOOD_REGIONS_CSV, SYSTEM_DIR
from .base import Exposures, INDICATOR_IF

LOGGER = logging.getLogger(__name__)

DEF_HAZ_TYPE = 'RF'

CONVERTER = SYSTEM_DIR.joinpath('GDP2Asset_converter_2.5arcmin.nc')


[docs]class GDP2Asset(Exposures):
[docs] def set_countries(self, countries=[], reg=[], ref_year=2000, path=None): """Model countries using values at reference year. If GDP or income group not available for that year, consider the value of the closest available year. Parameters: countries (list): list of country names ISO3 ref_year (int, optional): reference year. Default: 2016 path (string): path to exposure dataset (ISIMIP) """ gdp2a_list = [] tag = Tag() if path is None: LOGGER.error('No path for exposure data set') raise NameError if not Path(path).is_file(): LOGGER.error('Invalid path %s', path) raise NameError try: if not countries: if reg: natISO = region2isos(reg) countries = np.array(natISO) else: LOGGER.error('set_countries requires countries or reg') raise ValueError for cntr_ind in range(len(countries)): gdp2a_list.append(self._set_one_country(countries[cntr_ind], ref_year, path)) tag.description += ("{} GDP2Asset \n").\ format(countries[cntr_ind]) except KeyError: LOGGER.error('Exposure countries: %s or reg %s could not be set, check ISO3 or' ' reference year %s', countries, reg, ref_year) raise tag.description += 'GDP2Asset ' + str(self.ref_year) Exposures.__init__( self, data=Exposures.concat(gdp2a_list).gdf, ref_year=ref_year, tag=tag, value_unit='USD' ) # set meta res = 0.0416666 rows, cols, ras_trans = pts_to_raster_meta((self.gdf.longitude.min(), self.gdf.latitude.min(), self.gdf.longitude.max(), self.gdf.latitude.max()), res) self.meta = {'width': cols, 'height': rows, 'crs': self.crs, 'transform': ras_trans}
@staticmethod def _set_one_country(countryISO, ref_year, path=None): """Extract coordinates of selected countries or region from NatID grid. Parameters: countryISO(str): ISO3 of country ref_year(int): year under consideration path(str): path for gdp-files Raises: KeyError, OSError Returns: GDP2Asset """ natID = country_iso2natid(countryISO) natID_info = pd.read_csv(RIVER_FLOOD_REGIONS_CSV) reg_id, if_rf = _fast_if_mapping(natID, natID_info) lat, lon = get_region_gridpoints(countries=[natID], iso=False, basemap="isimip") coord = np.stack([lat, lon], axis=1) assets = _read_GDP(coord, ref_year, path) reg_id_info = np.full((len(assets),), reg_id) if_rf_info = np.full((len(assets),), if_rf) exp_gdpasset = GDP2Asset() exp_gdpasset.gdf['value'] = assets exp_gdpasset.gdf['latitude'] = coord[:, 0] exp_gdpasset.gdf['longitude'] = coord[:, 1] exp_gdpasset.gdf[INDICATOR_IF + DEF_HAZ_TYPE] = if_rf_info exp_gdpasset.gdf['region_id'] = reg_id_info return exp_gdpasset
def _read_GDP(shp_exposures, ref_year, path=None): """Read GDP-values for the selected area and convert it to asset. Parameters: shp_exposure(2d-array float): coordinates of area ref_year(int): year under consideration path(str): path for gdp-files Raises: KeyError, OSError Returns: np.array """ try: gdp_file = xr.open_dataset(path) asset_converter = xr.open_dataset(CONVERTER) gdp_lon = gdp_file.lon.data gdp_lat = gdp_file.lat.data time = gdp_file.time.dt.year except OSError: LOGGER.error('Problems while reading %s check exposure_file specifications', path) raise OSError try: year_index = np.where(time == ref_year)[0][0] except IndexError: LOGGER.error('No data available for year %s', ref_year) raise KeyError conv_lon = asset_converter.lon.data conv_lat = asset_converter.lat.data gridX, gridY = np.meshgrid(conv_lon, conv_lat) coordinates = np.zeros((gridX.size, 2)) coordinates[:, 0] = gridY.flatten() coordinates[:, 1] = gridX.flatten() gdp = gdp_file.gdp_grid[year_index, :, :].data _test_gdp_centr_match(gdp_lat, gdp_lon, shp_exposures) conv_factors = asset_converter.conversion_factor.data if gdp.shape == conv_factors.shape: asset = gdp * conv_factors asset = sp.interpolate.interpn((gdp_lat, gdp_lon), np.nan_to_num(asset), (shp_exposures[:, 0], shp_exposures[:, 1]), method='nearest', bounds_error=False, fill_value=None) else: conv_factors = sp.interpolate.interpn((conv_lat, conv_lon), np.nan_to_num(conv_factors), (shp_exposures[:, 0], shp_exposures[:, 1]), method='nearest', bounds_error=False, fill_value=None) gdp = sp.interpolate.interpn((gdp_lat, gdp_lon), np.nan_to_num(gdp), (shp_exposures[:, 0], shp_exposures[:, 1]), method='nearest', bounds_error=False, fill_value=None) asset = gdp * conv_factors return asset def _test_gdp_centr_match(gdp_lat, gdp_lon, shp_exposures): if (max(gdp_lat) + 0.5 < max(shp_exposures[:, 0])) or\ (max(gdp_lon) + 0.5 < max(shp_exposures[:, 1])) or\ (min(gdp_lat) - 0.5 > min(shp_exposures[:, 0])) or\ (min(gdp_lon) - 0.5 > min(shp_exposures[:, 1])): LOGGER.error('Asset Data does not match selected country') raise IOError def _fast_if_mapping(countryID, natID_info): """Assign region-ID and impact function id. Parameters: countryID (int) natID_info: dataframe of lookuptable Raises: KeyError Returns: float,float """ nat = natID_info['ID'] if_RF = natID_info['if_RF'] reg_ID = natID_info['Reg_ID'] fancy_if = np.zeros((max(nat) + 1)) fancy_if[:] = np.nan fancy_if[nat] = if_RF fancy_reg = np.zeros((max(nat) + 1)) fancy_reg[:] = np.nan fancy_reg[nat] = reg_ID try: reg_id = fancy_reg[countryID] if_rf = fancy_if[countryID] except KeyError: LOGGER.error('Country ISO unknown') raise KeyError return reg_id, if_rf