Source code for climada.entity.exposures.base

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define Exposures class.
"""

__all__ = ["Exposures", "add_sea", "INDICATOR_IMPF", "INDICATOR_CENTR"]


import copy
import logging
import warnings
from pathlib import Path

import cartopy.crs as ccrs
import contextily as ctx
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio
from deprecation import deprecated
from geopandas import GeoDataFrame, GeoSeries, points_from_xy
from mpl_toolkits.axes_grid1 import make_axes_locatable
from rasterio.warp import Resampling

import climada.util.coordinates as u_coord
import climada.util.hdf5_handler as u_hdf5
import climada.util.plot as u_plot
from climada import CONFIG
from climada.hazard import Hazard
from climada.util.constants import CMAP_RASTER, DEF_CRS, ONE_LAT_KM

LOGGER = logging.getLogger(__name__)

INDICATOR_IMPF_OLD = "if_"
"""Previously used name of the column containing the impact functions id of specified hazard"""

INDICATOR_IMPF = "impf_"
"""Name of the column containing the impact functions id of specified hazard"""

INDICATOR_CENTR = "centr_"
"""Name of the column containing the centroids id of specified hazard"""

DEF_REF_YEAR = CONFIG.exposures.def_ref_year.int()
"""Default reference year"""

DEF_VALUE_UNIT = "USD"
"""Default value unit"""

DEF_VAR_MAT = {
    "sup_field_name": "entity",
    "field_name": "assets",
    "var_name": {
        "lat": "lat",
        "lon": "lon",
        "val": "Value",
        "ded": "Deductible",
        "cov": "Cover",
        "impf": "DamageFunID",
        "cat": "Category_ID",
        "reg": "Region_ID",
        "uni": "Value_unit",
        "ass": "centroid_index",
        "ref": "reference_year",
    },
}
"""MATLAB variable names"""


[docs] class Exposures: """geopandas GeoDataFrame with metadata and columns (pd.Series) defined in Attributes. Attributes ---------- description : str metadata - description of content and origin of the data ref_year : int metadata - reference year value_unit : str metadata - unit of the exposures values data : GeoDataFrame containing at least the columns 'geometry' and 'value' for locations and assets optionally more, a.o., 'region_id', 'category_id', columns for (hazard specific) assigned centroids and (hazard specific) impact funcitons. """ _metadata = ["description", "ref_year", "value_unit"] """List of attributes, which are by default read, e.g., from hdf5""" vars_oblig = ["value", "geometry"] """Name of the variables needed to compute the impact.""" vars_def = [INDICATOR_IMPF, INDICATOR_IMPF_OLD] """Name of variables that can be computed.""" vars_opt = [ INDICATOR_CENTR, "deductible", "cover", "category_id", "region_id", "geometry", ] """Name of the variables that aren't need to compute the impact.""" @property def crs(self): """Coordinate Reference System, refers to the crs attribute of the inherent GeoDataFrame""" return self.data.geometry.crs @property def gdf(self): """Inherent GeoDataFrame""" return self.data @property def latitude(self): """Latitude array of exposures""" try: return self.data.geometry.y.values except ValueError as valerr: nonpoints = list( self.data[ self.data.geometry.type != "Point" ].geometry.type.drop_duplicates() ) if nonpoints: raise ValueError( "Can only calculate latitude from Points." f" GeoDataFrame contains {', '.join(nonpoints)}." " Please see the lines_polygons module tutorial." ) from valerr raise @property def longitude(self): """Longitude array of exposures""" try: return self.data.geometry.x.values except ValueError as valerr: nonpoints = list( self.data[ self.data.geometry.type != "Point" ].geometry.type.drop_duplicates() ) if nonpoints: raise ValueError( "Can only calculate longitude from Points." f" GeoDataFrame contains {', '.join(nonpoints)}." " Please see the lines_polygons module tutorial." ) from valerr raise @property def geometry(self): """Geometry array of exposures""" return self.data.geometry.values @property def value(self): """Geometry array of exposures""" if "value" in self.data.columns: return self.data["value"].values return None @property def region_id(self): """Region id for each exposure Returns ------- np.array of int """ if "region_id" in self.data.columns: return self.data["region_id"].values return None @property def category_id(self): """Category id for each exposure Returns ------- np.array """ if "category_id" in self.data.columns: return self.data["category_id"].values return None @property def cover(self): """Cover value for each exposures Returns ------- np.array of float """ if "cover" in self.data.columns: return self.data["cover"].values return None @property def deductible(self): """Deductible value for each exposures Returns ------- np.array of float """ if "deductible" in self.data.columns: return self.data["deductible"].values return None
[docs] def hazard_impf(self, haz_type=""): """Get impact functions for a given hazard Parameters ---------- haz_type : str hazard type, as in the hazard's.haz_type which is the HAZ_TYPE constant of the hazard's module Returns ------- np.array of int impact functions for the given hazard """ col_name = self.get_impf_column(haz_type) return self.data[col_name].values
[docs] def hazard_centroids(self, haz_type=""): """Get centroids for a given hazard Parameters ---------- haz_type : str hazard type, as in the hazard's.haz_type which is the HAZ_TYPE constant of the hazard's module Returns ------- np.array of int centroids index for the given hazard """ if haz_type and INDICATOR_CENTR + haz_type in self.data.columns: return self.data[INDICATOR_CENTR + haz_type].values if INDICATOR_CENTR in self.data.columns: return self.data[INDICATOR_CENTR].values raise ValueError("Missing hazard centroids.")
[docs] def derive_raster(self): """Metadata dictionary, containing raster information, derived from the geometry""" if not self.data.size: return None _r, meta = u_coord.points_to_raster(self.data) return meta
@staticmethod def _consolidate( alternative_data, name, value, default=None, equals=lambda x, y: x == y ): """helper function for __init__ for consolidation of arguments. Most arguments of the __init__ function can be provided either explicitly, by themselves or as part of the input data. This method finds the specific argument from these alternative sources. In case of ambiguity it checks for any discrepancy. In case of missing souorces it returns the default. Parameters ---------- alternative_data: container of general data with named items name: name of the item in the alternative_data value: specific data, could be None default: default value in case of both specific and general data don't yield a result equals: the equality function to check for discrepancies """ altvalue = alternative_data.get(name) if value is None and altvalue is None: return default if value is None: return altvalue if altvalue is None: return value try: if all(equals(altvalue, value)): return value except TypeError: if equals(altvalue, value): return value raise ValueError( f"conflicting arguments: the given {name}" " is different from their corresponding value(s) in meta or data" )
[docs] def __init__( self, data=None, index=None, columns=None, dtype=None, copy=False, # pylint: disable=redefined-outer-name geometry=None, crs=None, meta=None, description=None, ref_year=None, value_unit=None, value=None, lat=None, lon=None, ): """ Parameters ---------- data : dict, iterable, DataFrame, GeoDataFrame, ndarray data of the initial DataFrame, see ``pandas.DataFrame()``. Used to initialize values for "region_id", "category_id", "cover", "deductible", "value", "geometry", "impf_[hazard type]". columns : Index or array, optional Columns of the initial DataFrame, see ``pandas.DataFrame()``. To be provided if `data` is an array index : Index or array, optional Columns of the initial DataFrame, see ``pandas.DataFrame()``. can optionally be provided if `data` is an array or for defining a specific row index dtype : dtype, optional data type of the initial DataFrame, see ``pandas.DataFrame()``. Can be used to assign specific data types to the columns in `data` copy : bool, optional Whether to make a copy of the input `data`, see ``pandas.DataFrame()``. Default is False, i.e. by default `data` may be altered by the ``Exposures`` object. geometry : array, optional Geometry column, see ``geopandas.GeoDataFrame()``. Must be provided if `lat` and `lon` are None and `data` has no "geometry" column. crs : value, optional Coordinate Reference System, see ``geopandas.GeoDataFrame()``. meta : dict, optional Metadata dictionary. Default: {} (empty dictionary). May be used to provide any of `description`, `ref_year`, `value_unit` and `crs` description : str, optional Default: None ref_year : int, optional Reference Year. Defaults to the entry of the same name in `meta` or 2018. value_unit : str, optional Unit of the exposed value. Defaults to the entry of the same name in `meta` or 'USD'. value : array, optional Exposed value column. Must be provided if `data` has no "value" column lat : array, optional Latitude column. Can be provided together with `lon`, alternative to `geometry` lon : array, optional Longitude column. Can be provided together with `lat`, alternative to `geometry` """ geodata = GeoDataFrame( data=data, index=index, columns=columns, dtype=dtype, copy=False ) geometry = self._consolidate(geodata, "geometry", geometry) value = self._consolidate(geodata, "value", value) # both column names are accepted, lat and latitude, respectively lon and longitude. lat = self._consolidate(geodata, "latitude", lat) lat = self._consolidate(geodata, "lat", lat) lon = self._consolidate(geodata, "longitude", lon) lon = self._consolidate(geodata, "lon", lon) # if lat then lon and vice versa: not xor if (lat is None) ^ (lon is None): raise ValueError("either provide both, lat and lon, or none of them") # either geometry or lat/lon if (lat is None) and (geometry is None): if geodata.shape[0] == 0: geodata = geodata.set_geometry([]) geometry = geodata.geometry else: raise ValueError("either provide geometry or lat/lon") meta = meta or {} if not isinstance(meta, dict): raise TypeError("meta must be of type dict") self.description = self._consolidate(meta, "description", description) self.ref_year = self._consolidate(meta, "ref_year", ref_year, DEF_REF_YEAR) self.value_unit = self._consolidate( meta, "value_unit", value_unit, DEF_VALUE_UNIT ) crs = self._consolidate(meta, "crs", crs, equals=u_coord.equal_crs) # finalize geometry, set crs if geometry is None: # -> calculate from lat/lon geometry = points_from_xy(x=lon, y=lat, crs=crs or DEF_CRS) elif isinstance(geometry, str): # -> raise exception raise TypeError( "Exposures is not able to handle customized 'geometry' column names." ) elif isinstance(geometry, GeoSeries): # -> set crs if necessary if crs and not u_coord.equal_crs(crs, geometry.crs): geometry = geometry.set_crs(crs, allow_override=True) if not crs and not geometry.crs: geometry = geometry.set_crs(DEF_CRS) else: # e.g. a list of Points -> turn into GeoSeries geometry = GeoSeries(geometry, crs=crs or DEF_CRS) self.data = GeoDataFrame( data=geodata.loc[ :, [ c for c in geodata.columns if c not in ["geometry", "latitude", "longitude", "lat", "lon"] ], ], copy=copy, geometry=geometry, ) # add a 'value' column in case it is not already part of data if value is not None and self.data.get("value") is None: self.data["value"] = value
def __str__(self): return "\n".join( [f"{md}: {self.__dict__[md]}" for md in type(self)._metadata] + [ f"crs: {self.crs}", f"data: ({self.data.shape[0]} entries)", ( str(self.data) if self.data.shape[0] < 10 else str(pd.concat([self.data[:4], self.data[-4:]])) ), ] ) def _access_item(self, *args): raise TypeError( "Since CLIMADA 2.0, Exposures objects are not subscriptable. Data " "fields of Exposures objects are accessed using the `gdf` attribute. " "For example, `expo['value']` is replaced by `expo.gdf['value']`." ) __getitem__ = _access_item __setitem__ = _access_item __delitem__ = _access_item
[docs] def check(self): """Check Exposures consistency. Reports missing columns in log messages. """ # mandatory columns for var in self.vars_oblig: if var not in self.gdf.columns: raise ValueError(f"{var} missing in gdf") # computable columns except impf_* for var in sorted( set(self.vars_def).difference([INDICATOR_IMPF, INDICATOR_IMPF_OLD]) ): if not var in self.gdf.columns: LOGGER.info("%s not set.", var) # special treatment for impf_* default_impf_present = False for var in [INDICATOR_IMPF, INDICATOR_IMPF_OLD]: if var in self.gdf.columns: LOGGER.info("Hazard type not set in %s", var) default_impf_present = True if not default_impf_present and not [ col for col in self.gdf.columns if col.startswith(INDICATOR_IMPF) or col.startswith(INDICATOR_IMPF_OLD) ]: LOGGER.warning("There are no impact functions assigned to the exposures") # optional columns except centr_* for var in sorted(set(self.vars_opt).difference([INDICATOR_CENTR])): if not var in self.gdf.columns: LOGGER.info("%s not set.", var) # special treatment for centr_* if INDICATOR_CENTR in self.gdf.columns: LOGGER.info("Hazard type not set in %s", INDICATOR_CENTR) elif not any([col.startswith(INDICATOR_CENTR) for col in self.gdf.columns]): LOGGER.info("%s not set.", INDICATOR_CENTR)
[docs] def set_crs(self, crs=DEF_CRS): """Set the Coordinate Reference System. If the epxosures GeoDataFrame has a 'geometry' column it will be updated too. Parameters ---------- crs : object, optional anything anything accepted by pyproj.CRS.from_user_input. """ self.data.geometry.set_crs(crs, inplace=True, allow_override=True)
[docs] def set_gdf(self, gdf: GeoDataFrame, crs=None): """Set the `gdf` GeoDataFrame and update the CRS Parameters ---------- gdf : GeoDataFrame crs : object, optional, anything anything accepted by pyproj.CRS.from_user_input, by default None, then `gdf.crs` applies or - if not set - the exposure's current crs """ # check argument type if not isinstance(gdf, GeoDataFrame): raise ValueError("gdf is not a GeoDataFrame") # set the dataframe self.data = Exposures(data=gdf, crs=crs).data
[docs] def get_impf_column(self, haz_type=""): """Find the best matching column name in the exposures dataframe for a given hazard type, Parameters ---------- haz_type : str or None hazard type, as in the hazard's.haz_type which is the HAZ_TYPE constant of the hazard's module Returns ------- str a column name, the first of the following that is present in the exposures' dataframe: * ``impf_[haz_type]`` * ``if_[haz_type]`` * ``impf_`` * ``if_`` Raises ------ ValueError if none of the above is found in the dataframe. """ if INDICATOR_IMPF + haz_type in self.gdf.columns: return INDICATOR_IMPF + haz_type if INDICATOR_IMPF_OLD + haz_type in self.gdf.columns: LOGGER.info( "Impact function column name 'if_%s' is not according to current" " naming conventions. It's suggested to use 'impf_%s' instead.", haz_type, haz_type, ) return INDICATOR_IMPF_OLD + haz_type if INDICATOR_IMPF in self.gdf.columns: LOGGER.info( "No specific impact function column found for hazard %s." " Using the anonymous 'impf_' column.", haz_type, ) return INDICATOR_IMPF if INDICATOR_IMPF_OLD in self.gdf.columns: LOGGER.info( "No specific impact function column found for hazard %s. Using the" " anonymous 'if_' column, which is not according to current naming" " conventions. It's suggested to use 'impf_' instead.", haz_type, ) return INDICATOR_IMPF_OLD raise ValueError(f"Missing exposures impact functions {INDICATOR_IMPF}.")
[docs] def assign_centroids( self, hazard, distance="euclidean", threshold=u_coord.NEAREST_NEIGHBOR_THRESHOLD, overwrite=True, ): """Assign for each exposure coordinate closest hazard coordinate. The Exposures ``gdf`` will be altered by this method. It will have an additional (or modified) column named ``centr_[hazard.HAZ_TYPE]`` after the call. Uses the utility function ``u_coord.match_centroids``. See there for details and parameters. The value -1 is used for distances larger than ``threshold`` in point distances. In case of raster hazards the value -1 is used for centroids outside of the raster. Parameters ---------- hazard : Hazard Hazard to match (with raster or vector centroids). distance : str, optional Distance to use in case of vector centroids. Possible values are "euclidean", "haversine" and "approx". Default: "euclidean" threshold : float If the distance (in km) to the nearest neighbor exceeds `threshold`, the index `-1` is assigned. Set `threshold` to 0, to disable nearest neighbor matching. Default: 100 (km) overwrite: bool If True, overwrite centroids already present. If False, do not assign new centroids. Default is True. See Also -------- climada.util.coordinates.match_grid_points: method to associate centroids to exposure points when centroids is a raster climada.util.coordinates.match_coordinates: method to associate centroids to exposure points Notes ----- The default order of use is: 1. if centroid raster is defined, assign exposures points to the closest raster point. 2. if no raster, assign centroids to the nearest neighbor using euclidian metric Both cases can introduce innacuracies for coordinates in lat/lon coordinates as distances in degrees differ from distances in meters on the Earth surface, in particular for higher latitude and distances larger than 100km. If more accuracy is needed, please use 'haversine' distance metric. This however is slower for (quasi-)gridded data, and works only for non-gridded data. """ haz_type = hazard.haz_type centr_haz = INDICATOR_CENTR + haz_type if centr_haz in self.gdf: LOGGER.info("Exposures matching centroids already found for %s", haz_type) if overwrite: LOGGER.info("Existing centroids will be overwritten for %s", haz_type) else: return LOGGER.info( "Matching %s exposures with %s centroids.", str(self.gdf.shape[0]), str(hazard.centroids.size), ) if not u_coord.equal_crs(self.crs, hazard.centroids.crs): raise ValueError("Set hazard and exposure to same CRS first!") # Note: equal_crs is tested here, rather than within match_centroids(), # because exp.gdf.crs may not be defined, but exp.crs must be defined. assigned_centr = u_coord.match_centroids( self.gdf, hazard.centroids, distance=distance, threshold=threshold ) self.gdf[centr_haz] = assigned_centr
[docs] @deprecated( details="Obsolete method call. As of climada 5.0, geometry points are set during" " object initialization" ) def set_geometry_points(self, scheduler=None): """obsolete and deprecated since climada 5.0"""
[docs] @deprecated( details="latitude and longitude columns are no longer meaningful in Exposures`" " GeoDataFrames. They can be retrieved from Exposures.latitude and .longitude" " properties" ) def set_lat_lon(self): """Set latitude and longitude attributes from geometry attribute.""" LOGGER.info("Setting latitude and longitude attributes.") self.data["latitude"] = self.latitude self.data["longitude"] = self.longitude
[docs] @deprecated( details="The use of Exposures.set_from_raster is deprecated." " Use Exposures.from_raster instead." ) def set_from_raster(self, *args, **kwargs): """This function is deprecated, use Exposures.from_raster instead.""" self.__dict__ = Exposures.from_raster(*args, **kwargs).__dict__
[docs] @classmethod def from_raster( cls, file_name, band=1, src_crs=None, window=None, geometry=None, dst_crs=None, transform=None, width=None, height=None, resampling=Resampling.nearest, ): """Read raster data and set latitude, longitude, value and meta Parameters ---------- file_name : str file name containing values band : int, optional bands to read (starting at 1) src_crs : crs, optional source CRS. Provide it if error without it. window : rasterio.windows.Windows, optional window where data is extracted geometry : list of shapely.geometry, optional consider pixels only within these shape dst_crs : crs, optional reproject to given crs transform : rasterio.Affine affine transformation to apply width : float number of lons for transform height : float number of lats for transform resampling : rasterio.warp,.Resampling optional resampling function used for reprojection to dst_crs returns -------- Exposures """ meta, value = u_coord.read_raster( file_name, [band], src_crs, window, geometry, dst_crs, transform, width, height, resampling, ) ulx, xres, _, uly, _, yres = meta["transform"].to_gdal() lrx = ulx + meta["width"] * xres lry = uly + meta["height"] * yres x_grid, y_grid = np.meshgrid( np.arange(ulx + xres / 2, lrx, xres), np.arange(uly + yres / 2, lry, yres) ) return cls( { "longitude": x_grid.flatten(), "latitude": y_grid.flatten(), "value": value.reshape(-1), }, meta=meta, crs=meta["crs"], )
[docs] def plot_scatter( self, mask=None, ignore_zero=False, pop_name=True, buffer=0.0, extend="neither", axis=None, figsize=(9, 13), adapt_fontsize=True, title=None, **kwargs, ): """Plot exposures geometry's value sum scattered over Earth's map. The plot will we projected according to the current crs. Parameters ---------- mask : np.array, optional mask to apply to eai_exp plotted. ignore_zero : bool, optional flag to indicate if zero and negative values are ignored in plot. Default: False pop_name : bool, optional add names of the populated places, by default True. buffer : float, optional border to add to coordinates. Default: 0.0. extend : str, optional extend border colorbar with arrows. [ 'neither' | 'both' | 'min' | 'max' ] axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize : tuple, optional figure size for plt.subplots adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise the default matplotlib font size is used. Default is True. title : str, optional a title for the plot. If not set `self.description` is used. kwargs : optional arguments for scatter matplotlib function, e.g. cmap='Greys' Returns ------- cartopy.mpl.geoaxes.GeoAxesSubplot """ crs_epsg, _ = u_plot.get_transformation(self.crs) if title is None: title = self.description or "" if mask is None: mask = np.ones((self.gdf.shape[0],), dtype=bool) if ignore_zero: pos_vals = self.gdf["value"][mask].values > 0 else: pos_vals = np.ones((self.gdf["value"][mask].values.size,), dtype=bool) value = self.gdf.value[mask][pos_vals].values coord = np.stack( [ self.gdf.geometry[mask][pos_vals].y.values, self.gdf.geometry[mask][pos_vals].x.values, ], axis=1, ) return u_plot.geo_scatter_from_array( array_sub=value, geo_coord=coord, var_name=f"Value ({self.value_unit})", title=title, pop_name=pop_name, buffer=buffer, extend=extend, proj=crs_epsg, axes=axis, figsize=figsize, adapt_fontsize=adapt_fontsize, **kwargs, )
[docs] def plot_hexbin( self, mask=None, ignore_zero=False, pop_name=True, buffer=0.0, extend="neither", axis=None, figsize=(9, 13), adapt_fontsize=True, title=None, **kwargs, ): """Plot exposures geometry's value sum binned over Earth's map. An other function for the bins can be set through the key reduce_C_function. The plot will we projected according to the current crs. Parameters ---------- mask : np.array, optional mask to apply to eai_exp plotted. ignore_zero : bool, optional flag to indicate if zero and negative values are ignored in plot. Default: False pop_name : bool, optional add names of the populated places, by default True. buffer : float, optional border to add to coordinates. Default: 0.0. extend : str, optional extend border colorbar with arrows. [ 'neither' | 'both' | 'min' | 'max' ] Default is 'neither'. axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize : tuple figure size for plt.subplots Default is (9, 13). adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise the default matplotlib font size is used. Default is True. title : str, optional a title for the plot. If not set `self.description` is used. kwargs : optional arguments for hexbin matplotlib function, e.g. `reduce_C_function=np.average`. Default is `reduce_C_function=np.sum` Returns ------- cartopy.mpl.geoaxes.GeoAxesSubplot """ crs_epsg, _ = u_plot.get_transformation(self.crs) if title is None: title = self.description or "" if "reduce_C_function" not in kwargs: kwargs["reduce_C_function"] = np.sum if mask is None: mask = np.ones((self.gdf.shape[0],), dtype=bool) if ignore_zero: pos_vals = self.gdf["value"][mask].values > 0 else: pos_vals = np.ones((self.gdf["value"][mask].values.size,), dtype=bool) value = self.gdf["value"][mask][pos_vals].values coord = np.stack( [ self.gdf.geometry[mask][pos_vals].y.values, self.gdf.geometry[mask][pos_vals].x.values, ], axis=1, ) return u_plot.geo_bin_from_array( array_sub=value, geo_coord=coord, var_name=f"Value ({self.value_unit})", title=title, pop_name=pop_name, buffer=buffer, extend=extend, proj=crs_epsg, axes=axis, figsize=figsize, adapt_fontsize=adapt_fontsize, **kwargs, )
[docs] def plot_raster( self, res=None, raster_res=None, save_tiff=None, raster_f=lambda x: np.log10((np.fmax(x + 1, 1))), label="value (log10)", scheduler=None, axis=None, figsize=(9, 13), fill=True, adapt_fontsize=True, **kwargs, ): """Generate raster from points geometry and plot it using log10 scale `np.log10((np.fmax(raster+1, 1)))`. Parameters ---------- res : float, optional resolution of current data in units of latitude and longitude, approximated if not provided. raster_res : float, optional desired resolution of the raster save_tiff : str, optional file name to save the raster in tiff format, if provided raster_f : lambda function transformation to use to data. Default: log10 adding 1. label : str colorbar label scheduler : str used for dask map_partitions. “threads”, “synchronous” or “processes” axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize : tuple, optional figure size for plt.subplots fill : bool, optional If false, the areas with no data will be plotted in white. If True, the areas with missing values are filled as 0s. The default is True. adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise the default matplotlib font size is used. Default is True. kwargs : optional arguments for imshow matplotlib function Returns ------- matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot """ raster, meta = u_coord.points_to_raster( points_df=self.data, val_names=["value"], res=res, raster_res=raster_res, crs=self.crs, scheduler=None, ) raster = raster.reshape((meta["height"], meta["width"])) # save tiff if save_tiff is not None: with rasterio.open( fp=save_tiff, mode="w", driver="GTiff", height=meta["height"], width=meta["width"], count=1, dtype=np.float32, crs=self.crs, transform=meta["transform"], ) as ras_tiff: ras_tiff.write(raster.astype(np.float32), 1) # make plot proj_data, _ = u_plot.get_transformation(self.crs) proj_plot = proj_data if isinstance(proj_data, ccrs.PlateCarree): # use different projections for plot and data to shift the central lon in the plot xmin, ymin, xmax, ymax = u_coord.latlon_bounds( lat=self.latitude, lon=self.longitude ) proj_plot = ccrs.PlateCarree(central_longitude=0.5 * (xmin + xmax)) else: xmin, ymin, xmax, ymax = ( self.longitude.min(), self.latitude.min(), self.longitude.max(), self.latitude.max(), ) if not axis: _, axis, fontsize = u_plot.make_map( proj=proj_plot, figsize=figsize, adapt_fontsize=adapt_fontsize ) else: fontsize = None cbar_ax = make_axes_locatable(axis).append_axes( "right", size="6.5%", pad=0.1, axes_class=plt.Axes ) axis.set_extent((xmin, xmax, ymin, ymax), crs=proj_data) u_plot.add_shapes(axis) if not fill: raster = np.where(raster == 0, np.nan, raster) raster_f = lambda x: np.log10((np.maximum(x + 1, 1))) if "cmap" not in kwargs: kwargs["cmap"] = CMAP_RASTER imag = axis.imshow( raster_f(raster), **kwargs, origin="upper", extent=(xmin, xmax, ymin, ymax), transform=proj_data, ) cbar = plt.colorbar(imag, cax=cbar_ax, label=label) plt.colorbar(imag, cax=cbar_ax, label=label) plt.tight_layout() plt.draw() if fontsize: cbar.ax.tick_params(labelsize=fontsize) cbar.ax.yaxis.get_offset_text().set_fontsize(fontsize) for item in [axis.title, cbar.ax.xaxis.label, cbar.ax.yaxis.label]: item.set_fontsize(fontsize) return axis
[docs] def plot_basemap( self, mask=None, ignore_zero=False, pop_name=True, buffer=0.0, extend="neither", zoom=10, url=ctx.providers.CartoDB.Positron, axis=None, **kwargs, ): """Scatter points over satellite image using contextily Parameters ---------- mask : np.array, optional mask to apply to eai_exp plotted. Same size of the exposures, only the selected indexes will be plot. ignore_zero : bool, optional flag to indicate if zero and negative values are ignored in plot. Default: False pop_name : bool, optional add names of the populated places, by default True. buffer : float, optional border to add to coordinates. Default: 0.0. extend : str, optional extend border colorbar with arrows. [ 'neither' | 'both' | 'min' | 'max' ] zoom : int, optional zoom coefficient used in the satellite image url : Any, optional image source, e.g., ``ctx.providers.OpenStreetMap.Mapnik``. Default: ``ctx.providers.CartoDB.Positron`` axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use kwargs : optional arguments for scatter matplotlib function, e.g. cmap='Greys'. Default: 'Wistia' Returns ------- matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot """ crs_ori = self.crs self.to_crs(epsg=3857, inplace=True) axis = self.plot_scatter( mask, ignore_zero, pop_name, buffer, extend, shapes=False, axis=axis, **kwargs, ) ctx.add_basemap(axis, zoom, source=url, origin="upper") axis.set_axis_off() self.to_crs(crs_ori, inplace=True) return axis
[docs] def write_hdf5(self, file_name): """Write data frame and metadata in hdf5 format Parameters ---------- file_name : str (path and) file name to write to. """ LOGGER.info("Writing %s", file_name) store = pd.HDFStore(file_name, mode="w") pandas_df = pd.DataFrame(self.gdf) for col in pandas_df.columns: if str(pandas_df[col].dtype) == "geometry": pandas_df[col] = np.asarray(self.gdf[col]) # Avoid pandas PerformanceWarning when writing HDF5 data with warnings.catch_warnings(): warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning) # Write dataframe store.put("exposures", pandas_df) var_meta = {} for var in type(self)._metadata: var_meta[var] = getattr(self, var) var_meta["crs"] = self.crs store.get_storer("exposures").attrs.metadata = var_meta store.close()
[docs] def read_hdf5(self, *args, **kwargs): """This function is deprecated, use Exposures.from_hdf5 instead.""" LOGGER.warning( "The use of Exposures.read_hdf5 is deprecated." "Use Exposures.from_hdf5 instead." ) self.__dict__ = Exposures.from_hdf5(*args, **kwargs).__dict__
[docs] @classmethod def from_hdf5(cls, file_name): """Read data frame and metadata in hdf5 format Parameters ---------- file_name : str (path and) file name to read from. additional_vars : list list of additional variable names, other than the attributes of the Exposures class, whose values are to be read into the Exposures object class. Returns ------- Exposures """ LOGGER.info("Reading %s", file_name) if not Path(file_name).is_file(): raise FileNotFoundError(str(file_name)) with pd.HDFStore(file_name, mode="r") as store: metadata = store.get_storer("exposures").attrs.metadata # in previous versions of CLIMADA and/or geopandas, the CRS was stored in '_crs'/'crs' crs = metadata.get("crs", metadata.get("_crs")) if crs is None and metadata.get("meta"): crs = metadata["meta"].get("crs") exp = cls(store["exposures"], crs=crs) for key, val in metadata.items(): if key in type(exp)._metadata: # pylint: disable=protected-access setattr(exp, key, val) if key == "tag": # for backwards compatitbility with climada <= 3.x descriptions = [ u_hdf5.to_string(x) for x in getattr(val, "description", []) ] exp.description = "\n".join(descriptions) if descriptions else None return exp
[docs] def read_mat(self, *args, **kwargs): """This function is deprecated, use Exposures.from_mat instead.""" LOGGER.warning( "The use of Exposures.read_mat is deprecated." "Use Exposures.from_mat instead." ) self.__dict__ = Exposures.from_mat(*args, **kwargs).__dict__
[docs] @classmethod def from_mat(cls, file_name, var_names=None): """Read MATLAB file and store variables in exposures. Parameters ---------- file_name : str absolute path file var_names : dict, optional dictionary containing the name of the MATLAB variables. Default: DEF_VAR_MAT. Returns ------- Exposures """ LOGGER.info("Reading %s", file_name) if not var_names: var_names = DEF_VAR_MAT data = u_hdf5.read(file_name) try: data = data[var_names["sup_field_name"]] except KeyError: pass try: data = data[var_names["field_name"]] exposures = dict() _read_mat_obligatory(exposures, data, var_names) _read_mat_optional(exposures, data, var_names) except KeyError as var_err: raise KeyError( f"Variable not in MAT file: {var_names.get('field_name')}" ) from var_err exp = cls(data=exposures) _read_mat_metadata(exp, data, file_name, var_names) return exp
# # Extends the according geopandas method #
[docs] def to_crs(self, crs=None, epsg=None, inplace=False): """Wrapper of the :py:meth:`GeoDataFrame.to_crs` method. Transform geometries to a new coordinate reference system. Transform all geometries in a GeoSeries to a different coordinate reference system. The crs attribute on the current GeoSeries must be set. Either crs in string or dictionary form or an EPSG code may be specified for output. This method will transform all points in all objects. It has no notion or projecting entire geometries. All segments joining points are assumed to be lines in the current projection, not geodesics. Objects crossing the dateline (or other projection boundary) will have undesirable behavior. Parameters ---------- crs : dict or str Output projection parameters as string or in dictionary form. epsg : int EPSG code specifying output projection. inplace : bool, optional, default: False Whether to return a new GeoDataFrame or do the transformation in place. Returns ------- None if inplace is True else a transformed copy of the exposures object """ if crs and epsg: raise ValueError("one of crs or epsg must be None") if inplace: self.data.to_crs(crs, epsg, True) return None exp = self.copy() exp.to_crs(crs, epsg, True) return exp
[docs] def plot(self, *args, **kwargs): """Wrapper of the :py:meth:`GeoDataFrame.plot` method""" self.gdf.plot(*args, **kwargs)
[docs] def copy(self, deep=True): """Make a copy of this Exposures object. Parameters ---------- deep (bool): Make a deep copy, i.e. also copy data. Default True. Returns ------- Exposures """ gdf = self.gdf.copy(deep=deep) metadata = dict( [(md, copy.deepcopy(self.__dict__[md])) for md in type(self)._metadata] ) metadata["crs"] = self.crs return type(self)(gdf, **metadata)
[docs] def write_raster(self, file_name, value_name="value", scheduler=None): """Write value data into raster file with GeoTiff format Parameters ---------- file_name : str name output file in tif format """ raster, meta = u_coord.points_to_raster( self.gdf, [value_name], scheduler=scheduler ) u_coord.write_raster(file_name, raster, meta)
[docs] @staticmethod def concat(exposures_list): """Concatenates Exposures or DataFrame objectss to one Exposures object. Parameters ---------- exposures_list : list of Exposures or DataFrames The list must not be empty with the first item supposed to be an Exposures object. Returns ------- Exposures with the metadata of the first item in the list and the dataframes concatenated. """ exp = exposures_list[0].copy(deep=False) if not isinstance(exp, Exposures): exp = Exposures(exp) exp.check() df_list = [ex.gdf if isinstance(ex, Exposures) else ex for ex in exposures_list] crss = [ ex.crs for ex in exposures_list if isinstance(ex, (Exposures, GeoDataFrame)) and hasattr(ex, "crs") and ex.crs is not None ] if crss: crs = crss[0] if any(not u_coord.equal_crs(c, crs) for c in crss[1:]): raise ValueError("concatenation of exposures with different crs") else: crs = None exp.set_gdf( GeoDataFrame(pd.concat(df_list, ignore_index=True, sort=False)), crs=crs ) return exp
[docs] def centroids_total_value(self, hazard): """Compute value of exposures close enough to be affected by hazard .. deprecated:: 3.3 This method will be removed in a future version. Use :py:meth:`affected_total_value` instead. This method computes the sum of the value of all exposures points for which a Hazard centroid is assigned. Parameters ---------- hazard : Hazard Hazard affecting Exposures Returns ------- float Sum of value of all exposures points for which a centroids is assigned """ nz_mask = (self.gdf["value"].values > 0) & ( self.gdf[hazard.centr_exp_col].values >= 0 ) return np.sum(self.gdf["value"].values[nz_mask])
[docs] def affected_total_value( self, hazard: Hazard, threshold_affected: float = 0, overwrite_assigned_centroids: bool = True, ): """ Total value of the exposures that are affected by at least one hazard event (sum of value of all exposures points for which at least one event has intensity larger than the threshold). Parameters ---------- hazard : Hazard Hazard affecting Exposures threshold_affected : int or float Hazard intensity threshold above which an exposures is considere affected. The default is 0. overwrite_assigned_centroids : boolean Assign centroids from the hazard to the exposures and overwrite existing ones. The default is True. Returns ------- float Sum of value of all exposures points for which a centroids is assigned and that have at least one event intensity above threshold. See Also -------- Exposures.assign_centroids : method to assign centroids. Note ---- The fraction attribute of the hazard is ignored. Thus, for hazards with fraction defined the affected values will be overestimated. """ self.assign_centroids(hazard=hazard, overwrite=overwrite_assigned_centroids) assigned_centroids = self.gdf[hazard.centr_exp_col] nz_mask = (self.gdf["value"].values > 0) & (assigned_centroids.values >= 0) cents = np.unique(assigned_centroids[nz_mask]) cent_with_inten_above_thres = ( hazard.intensity[:, cents].max(axis=0) > threshold_affected ).nonzero()[1] above_thres_mask = np.isin( self.gdf[hazard.centr_exp_col].values, cents[cent_with_inten_above_thres] ) return np.sum(self.gdf["value"].values[above_thres_mask])
[docs] def add_sea(exposures, sea_res, scheduler=None): """Add sea to geometry's surroundings with given resolution. region_id set to -1 and other variables to 0. Parameters ---------- exposures : Exposures the Exposures object without sea surroundings. sea_res : tuple (float,float) (sea_coast_km, sea_res_km), where first parameter is distance from coast to fill with water and second parameter is resolution between sea points scheduler : str, optional used for dask map_partitions. “threads”, “synchronous” or “processes” Returns ------- Exposures """ LOGGER.info( "Adding sea at %s km resolution and %s km distance from coast.", str(sea_res[1]), str(sea_res[0]), ) sea_res = (sea_res[0] / ONE_LAT_KM, sea_res[1] / ONE_LAT_KM) min_lat = max(-90, float(exposures.latitude.min()) - sea_res[0]) max_lat = min(90, float(exposures.latitude.max()) + sea_res[0]) min_lon = max(-180, float(exposures.longitude.min()) - sea_res[0]) max_lon = min(180, float(exposures.longitude.max()) + sea_res[0]) lat_arr = np.arange(min_lat, max_lat + sea_res[1], sea_res[1]) lon_arr = np.arange(min_lon, max_lon + sea_res[1], sea_res[1]) lon_mgrid, lat_mgrid = np.meshgrid(lon_arr, lat_arr) lon_mgrid, lat_mgrid = lon_mgrid.ravel(), lat_mgrid.ravel() on_land = ~u_coord.coord_on_land(lat_mgrid, lon_mgrid) sea_exp_gdf = GeoDataFrame() sea_exp_gdf["latitude"] = lat_mgrid[on_land] sea_exp_gdf["longitude"] = lon_mgrid[on_land] sea_exp_gdf["region_id"] = np.zeros(sea_exp_gdf["latitude"].size, int) - 1 if "geometry" in exposures.gdf.columns: u_coord.set_df_geometry_points( sea_exp_gdf, crs=exposures.crs, scheduler=scheduler ) for var_name in exposures.gdf.columns: if var_name not in ("latitude", "longitude", "region_id", "geometry"): sea_exp_gdf[var_name] = np.zeros( sea_exp_gdf["latitude"].size, exposures.gdf[var_name].dtype ) return Exposures( pd.concat([exposures.gdf, sea_exp_gdf], ignore_index=True, sort=False), crs=exposures.crs, ref_year=exposures.ref_year, value_unit=exposures.value_unit, description=exposures.description, )
def _read_mat_obligatory(exposures, data, var_names): """Fill obligatory variables.""" exposures["value"] = np.squeeze(data[var_names["var_name"]["val"]]) exposures["latitude"] = data[var_names["var_name"]["lat"]].reshape(-1) exposures["longitude"] = data[var_names["var_name"]["lon"]].reshape(-1) exposures[INDICATOR_IMPF] = np.squeeze(data[var_names["var_name"]["impf"]]).astype( int, copy=False ) def _read_mat_optional(exposures, data, var_names): """Fill optional parameters.""" try: exposures["deductible"] = np.squeeze(data[var_names["var_name"]["ded"]]) except KeyError: pass try: exposures["cover"] = np.squeeze(data[var_names["var_name"]["cov"]]) except KeyError: pass try: exposures["category_id"] = np.squeeze( data[var_names["var_name"]["cat"]] ).astype(int, copy=False) except KeyError: pass try: exposures["region_id"] = np.squeeze(data[var_names["var_name"]["reg"]]).astype( int, copy=False ) except KeyError: pass try: assigned = np.squeeze(data[var_names["var_name"]["ass"]]).astype( int, copy=False ) if assigned.size > 0: exposures[INDICATOR_CENTR] = assigned except KeyError: pass def _read_mat_metadata(exposures, data, file_name, var_names): """Fill metadata in DataFrame object""" try: exposures.ref_year = int(np.squeeze(data[var_names["var_name"]["ref"]])) except KeyError: exposures.ref_year = DEF_REF_YEAR try: exposures.value_unit = u_hdf5.get_str_from_ref( file_name, data[var_names["var_name"]["uni"]][0][0] ) except KeyError: exposures.value_unit = DEF_VALUE_UNIT