Source code for climada.hazard.base

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define Hazard.
"""

__all__ = ['Hazard']

import copy
import datetime as dt
import itertools
import logging
import pathlib
from typing import Union, Optional, Callable, Dict, Any, List
import warnings

import geopandas as gpd
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathos.pools import ProcessPool as Pool
import rasterio
from rasterio.features import rasterize
from rasterio.warp import reproject, Resampling, calculate_default_transform
import sparse as sp
from scipy import sparse
import xarray as xr

from climada.hazard.centroids.centr import Centroids
import climada.util.plot as u_plot
import climada.util.checker as u_check
import climada.util.dates_times as u_dt
from climada import CONFIG
import climada.util.hdf5_handler as u_hdf5
import climada.util.coordinates as u_coord
from climada.util.constants import ONE_LAT_KM, DEF_CRS, DEF_FREQ_UNIT
from climada.util.coordinates import NEAREST_NEIGHBOR_THRESHOLD

LOGGER = logging.getLogger(__name__)

DEF_VAR_EXCEL = {'sheet_name': {'inten': 'hazard_intensity',
                                'freq': 'hazard_frequency'
                                },
                 'col_name': {'cen_id': 'centroid_id/event_id',
                              'even_id': 'event_id',
                              'even_dt': 'event_date',
                              'even_name': 'event_name',
                              'freq': 'frequency',
                              'orig': 'orig_event_flag'
                              },
                 'col_centroids': {'sheet_name': 'centroids',
                                   'col_name': {'cen_id': 'centroid_id',
                                                'lat': 'latitude',
                                                'lon': 'longitude'
                                                }
                                   }
                 }
"""Excel variable names"""

DEF_VAR_MAT = {'field_name': 'hazard',
               'var_name': {'per_id': 'peril_ID',
                            'even_id': 'event_ID',
                            'ev_name': 'name',
                            'freq': 'frequency',
                            'inten': 'intensity',
                            'unit': 'units',
                            'frac': 'fraction',
                            'comment': 'comment',
                            'datenum': 'datenum',
                            'orig': 'orig_event_flag'
                            },
               'var_cent': {'field_names': ['centroids', 'hazard'],
                            'var_name': {'cen_id': 'centroid_ID',
                                         'lat': 'lat',
                                         'lon': 'lon'
                                         }
                            }
               }
"""MATLAB variable names"""

DEF_COORDS = dict(event="time", longitude="longitude", latitude="latitude")
"""Default coordinates when reading Hazard data from an xarray Dataset"""

DEF_DATA_VARS = ["fraction", "frequency", "event_id", "event_name", "date"]
"""Default keys for optional Hazard attributes when reading from an xarray Dataset"""

[docs] class Hazard(): """ Contains events of some hazard type defined at centroids. Loads from files with format defined in FILE_EXT. Attributes ---------- haz_type : str two-letters hazard-type string, e.g., "TC" (tropical cyclone), "RF" (river flood) or "WF" (wild fire). Note: The acronym is used as reference to the hazard when centroids of multiple hazards are assigned to an ``Exposures`` object. units : str units of the intensity centroids : Centroids centroids of the events event_id : np.array id (>0) of each event event_name : list(str) name of each event (default: event_id) date : np.array integer date corresponding to the proleptic Gregorian ordinal, where January 1 of year 1 has ordinal 1 (ordinal format of datetime library) orig : np.array flags indicating historical events (True) or probabilistic (False) frequency : np.array frequency of each event frequency_unit : str unit of the frequency (default: "1/year") intensity : sparse.csr_matrix intensity of the events at centroids fraction : sparse.csr_matrix fraction of affected exposures for each event at each centroid. If empty (all 0), it is ignored in the impact computations (i.e., is equivalent to fraction is 1 everywhere). """ intensity_thres = 10 """Intensity threshold per hazard used to filter lower intensities. To be set for every hazard type""" vars_oblig = {'units', 'centroids', 'event_id', 'frequency', 'intensity', 'fraction' } """Name of the variables needed to compute the impact. Types: scalar, str, list, 1dim np.array of size num_events, scipy.sparse matrix of shape num_events x num_centroids, Centroids.""" vars_def = {'date', 'orig', 'event_name', 'frequency_unit' } """Name of the variables used in impact calculation whose value is descriptive and can therefore be set with default values. Types: scalar, string, list, 1dim np.array of size num_events. """ vars_opt = set() """Name of the variables that aren't need to compute the impact. Types: scalar, string, list, 1dim np.array of size num_events."""
[docs] def __init__(self, haz_type: str = "", pool: Optional[Pool] = None, units: str = "", centroids: Optional[Centroids] = None, event_id: Optional[np.ndarray] = None, frequency: Optional[np.ndarray] = None, frequency_unit: str = DEF_FREQ_UNIT, event_name: Optional[List[str]] = None, date: Optional[np.ndarray] = None, orig: Optional[np.ndarray] = None, intensity: Optional[sparse.csr_matrix] = None, fraction: Optional[sparse.csr_matrix] = None): """ Initialize values. Parameters ---------- haz_type : str, optional acronym of the hazard type (e.g. 'TC'). pool : pathos.pool, optional Pool that will be used for parallel computation when applicable. Default: None units : str, optional units of the intensity. Defaults to empty string. centroids : Centroids, optional centroids of the events. Defaults to empty Centroids object. event_id : np.array, optional id (>0) of each event. Defaults to empty array. event_name : list(str), optional name of each event (default: event_id). Defaults to empty list. date : np.array, optional integer date corresponding to the proleptic Gregorian ordinal, where January 1 of year 1 has ordinal 1 (ordinal format of datetime library). Defaults to empty array. orig : np.array, optional flags indicating historical events (True) or probabilistic (False). Defaults to empty array. frequency : np.array, optional frequency of each event. Defaults to empty array. frequency_unit : str, optional unit of the frequency (default: "1/year"). intensity : sparse.csr_matrix, optional intensity of the events at centroids. Defaults to empty matrix. fraction : sparse.csr_matrix, optional fraction of affected exposures for each event at each centroid. Defaults to empty matrix. Examples -------- Initialize using keyword arguments: >>> haz = Hazard('TC', intensity=sparse.csr_matrix(np.zeros((2, 2)))) Take hazard values from file: >>> haz = Hazard.from_mat(HAZ_DEMO_MAT, 'demo') """ self.haz_type = haz_type self.units = units self.centroids = centroids if centroids is not None else Centroids() # following values are defined for each event self.event_id = event_id if event_id is not None else np.array([], int) self.frequency = frequency if frequency is not None else np.array( [], float) self.frequency_unit = frequency_unit self.event_name = event_name if event_name is not None else list() self.date = date if date is not None else np.array([], int) self.orig = orig if orig is not None else np.array([], bool) # following values are defined for each event and centroid self.intensity = intensity if intensity is not None else sparse.csr_matrix( np.empty((0, 0))) # events x centroids self.fraction = fraction if fraction is not None else sparse.csr_matrix( self.intensity.shape) # events x centroids self.pool = pool if self.pool: LOGGER.info('Using %s CPUs.', self.pool.ncpus)
[docs] @classmethod def get_default(cls, attribute): """Get the Hazard type default for a given attribute. Parameters ---------- attribute : str attribute name Returns ------ Any """ return { 'frequency_unit': DEF_FREQ_UNIT, }.get(attribute)
[docs] def clear(self): """Reinitialize attributes (except the process Pool).""" for (var_name, var_val) in self.__dict__.items(): if isinstance(var_val, np.ndarray) and var_val.ndim == 1: setattr(self, var_name, np.array([], dtype=var_val.dtype)) elif isinstance(var_val, sparse.csr_matrix): setattr(self, var_name, sparse.csr_matrix(np.empty((0, 0)))) elif not isinstance(var_val, Pool): setattr(self, var_name, self.get_default(var_name) or var_val.__class__())
[docs] def check(self): """Check dimension of attributes. Raises ------ ValueError """ self.centroids.check() self._check_events()
[docs] @classmethod def from_raster(cls, files_intensity, files_fraction=None, attrs=None, band=None, haz_type=None, pool=None, src_crs=None, window=None, geometry=None, dst_crs=None, transform=None, width=None, height=None, resampling=Resampling.nearest): """Create Hazard with intensity and fraction values from raster files If raster files are masked, the masked values are set to 0. Files can be partially read using either window or geometry. Additionally, the data is reprojected when custom dst_crs and/or transform, width and height are specified. Parameters ---------- files_intensity : list(str) file names containing intensity files_fraction : list(str) file names containing fraction attrs : dict, optional name of Hazard attributes and their values band : list(int), optional bands to read (starting at 1), default [1] haz_type : str, optional acronym of the hazard type (e.g. 'TC'). Default: None, which will use the class default ('' for vanilla `Hazard` objects, and hard coded in some subclasses) pool : pathos.pool, optional Pool that will be used for parallel computation when applicable. Default: None src_crs : crs, optional source CRS. Provide it if error without it. window : rasterio.windows.Windows, optional window where data is extracted geometry : list of shapely.geometry, optional consider pixels only within these shapes dst_crs : crs, optional reproject to given crs transform : rasterio.Affine affine transformation to apply wdith : float, optional number of lons for transform height : float, optional number of lats for transform resampling : rasterio.warp.Resampling, optional resampling function used for reprojection to dst_crs Returns ------- Hazard """ if isinstance(files_intensity, (str, pathlib.Path)): files_intensity = [files_intensity] if isinstance(files_fraction, (str, pathlib.Path)): files_fraction = [files_fraction] if not attrs: attrs = {} if not band: band = [1] if files_fraction is not None and len(files_intensity) != len(files_fraction): raise ValueError('Number of intensity files differs from fraction files:' f'{len(files_intensity)} != {len(files_fraction)}') # List all parameters for initialization here (missing ones will be default) hazard_kwargs = dict() if haz_type is not None: hazard_kwargs["haz_type"] = haz_type centroids = Centroids.from_raster_file( files_intensity[0], src_crs=src_crs, window=window, geometry=geometry, dst_crs=dst_crs, transform=transform, width=width, height=height, resampling=resampling) if pool: chunksize = max(min(len(files_intensity) // pool.ncpus, 1000), 1) inten_list = pool.map( centroids.values_from_raster_files, [[f] for f in files_intensity], itertools.repeat(band), itertools.repeat(src_crs), itertools.repeat(window), itertools.repeat(geometry), itertools.repeat(dst_crs), itertools.repeat(transform), itertools.repeat(width), itertools.repeat(height), itertools.repeat(resampling), chunksize=chunksize) intensity = sparse.vstack(inten_list, format='csr') if files_fraction is not None: fract_list = pool.map( centroids.values_from_raster_files, [[f] for f in files_fraction], itertools.repeat(band), itertools.repeat(src_crs), itertools.repeat(window), itertools.repeat(geometry), itertools.repeat(dst_crs), itertools.repeat(transform), itertools.repeat(width), itertools.repeat(height), itertools.repeat(resampling), chunksize=chunksize) fraction = sparse.vstack(fract_list, format='csr') else: intensity = centroids.values_from_raster_files( files_intensity, band=band, src_crs=src_crs, window=window, geometry=geometry, dst_crs=dst_crs, transform=transform, width=width, height=height, resampling=resampling) if files_fraction is not None: fraction = centroids.values_from_raster_files( files_fraction, band=band, src_crs=src_crs, window=window, geometry=geometry, dst_crs=dst_crs, transform=transform, width=width, height=height, resampling=resampling) if files_fraction is None: fraction = intensity.copy() fraction.data.fill(1) hazard_kwargs.update(cls._attrs_to_kwargs(attrs, num_events=intensity.shape[0])) return cls(centroids=centroids, intensity=intensity, fraction=fraction, **hazard_kwargs)
[docs] def set_raster(self, *args, **kwargs): """This function is deprecated, use Hazard.from_raster.""" LOGGER.warning("The use of Hazard.set_raster is deprecated." "Use Hazard.from_raster instead.") self.__dict__ = Hazard.from_raster(*args, **kwargs).__dict__
[docs] def set_vector(self, *args, **kwargs): """This function is deprecated, use Hazard.from_vector.""" LOGGER.warning("The use of Hazard.set_vector is deprecated." "Use Hazard.from_vector instead.") self.__dict__ = Hazard.from_vector(*args, **kwargs).__dict__
[docs] @classmethod def from_xarray_raster_file( cls, filepath: Union[pathlib.Path, str], *args, **kwargs ): """Read raster-like data from a file that can be loaded with xarray This wraps :py:meth:`~Hazard.from_xarray_raster` by first opening the target file as xarray dataset and then passing it to that classmethod. Use this wrapper as a simple alternative to opening the file yourself. The signature is exactly the same, except for the first argument, which is replaced by a file path here. Additional (keyword) arguments are passed to :py:meth:`~Hazard.from_xarray_raster`. Parameters ---------- filepath : Path or str Path of the file to read with xarray. May be any file type supported by xarray. See https://docs.xarray.dev/en/stable/user-guide/io.html Returns ------- hazard : climada.Hazard A hazard object created from the input data Examples -------- >>> hazard = Hazard.from_xarray_raster_file("path/to/file.nc", "", "") Notes ----- If you have specific requirements for opening a data file, prefer opening it yourself and using :py:meth:`~Hazard.from_xarray_raster`, following this pattern: >>> open_kwargs = dict(engine="h5netcdf", chunks=dict(x=-1, y="auto")) >>> with xarray.open_dataset("path/to/file.nc", **open_kwargs) as dset: ... hazard = Hazard.from_xarray_raster(dset, "", "") """ with xr.open_dataset(filepath, chunks="auto") as dset: return cls.from_xarray_raster(dset, *args, **kwargs)
[docs] @classmethod def from_xarray_raster( cls, data: xr.Dataset, hazard_type: str, intensity_unit: str, *, intensity: str = "intensity", coordinate_vars: Optional[Dict[str, str]] = None, data_vars: Optional[Dict[str, str]] = None, crs: str = DEF_CRS, rechunk: bool = False, ): """Read raster-like data from an xarray Dataset This method reads data that can be interpreted using three coordinates: event, latitude, and longitude. The names of the coordinates to be read from the dataset can be specified via the ``coordinate_vars`` parameter. The data and the coordinates themselves may be organized in arbitrary dimensions (e.g. two dimensions 'year' and 'altitude' for the coordinate 'event'). See Notes and Examples if you want to load single-event data that does not contain an event dimension. The only required data is the intensity. For all other data, this method can supply sensible default values. By default, this method will try to find these "optional" data in the Dataset and read it, or use the default values otherwise. Users may specify the variables in the Dataset to be read for certain Hazard object entries, or may indicate that the default values should be used although the Dataset contains appropriate data. This behavior is controlled via the ``data_vars`` parameter. If this method succeeds, it will always return a "consistent" Hazard object, meaning that the object can be used in all CLIMADA operations without throwing an error due to missing data or faulty data types. Use :py:meth:`~Hazard.from_xarray_raster_file` to open a file on disk and load the resulting dataset with this method in one step. Parameters ---------- data : xarray.Dataset The dataset to read from. hazard_type : str The type identifier of the hazard. Will be stored directly in the hazard object. intensity_unit : str The physical units of the intensity. intensity : str, optional Identifier of the `xarray.DataArray` containing the hazard intensity data. coordinate_vars : dict(str, str), optional Mapping from default coordinate names to coordinate names used in the data to read. The default is ``dict(event="time", longitude="longitude", latitude="latitude")``, as most of the commonly used hazard data happens to have a "time" attribute but no "event" attribute. data_vars : dict(str, str), optional Mapping from default variable names to variable names used in the data to read. The default names are ``fraction``, ``hazard_type``, ``frequency``, ``event_name``, ``event_id``, and ``date``. If these values are not set, the method tries to load data from the default names. If this fails, the method uses default values for each entry. If the values are set to empty strings (``""``), no data is loaded and the default values are used exclusively. See examples for details. Default values are: * ``date``: The ``event`` coordinate interpreted as date or ordinal, or ones if that fails (which will issue a warning). * ``fraction``: ``None``, which results in a value of 1.0 everywhere, see :py:meth:`Hazard.__init__` for details. * ``hazard_type``: Empty string * ``frequency``: 1.0 for every event * ``event_name``: String representation of the event date or empty strings if that fails (which will issue a warning). * ``event_id``: Consecutive integers starting at 1 and increasing with time crs : str, optional Identifier for the coordinate reference system of the coordinates. Defaults to ``EPSG:4326`` (WGS 84), defined by ``climada.util.constants.DEF_CRS``. See https://pyproj4.github.io/pyproj/dev/api/crs/crs.html#pyproj.crs.CRS.from_user_input for further information on how to specify the coordinate system. rechunk : bool, optional Rechunk the dataset before flattening. This might have serious performance implications. Rechunking in general is expensive, but it might be less expensive than stacking a poorly-chunked array. One event being stored in one chunk would be the optimal configuration. If ``rechunk=True``, this will be forced by rechunking the data. Ideally, you would select the chunks in that manner when opening the dataset before passing it to this function. Defaults to ``False``. Returns ------- hazard : climada.Hazard A hazard object created from the input data See Also -------- :py:meth:`~Hazard.from_xarray_raster_file` Use this method if you want CLIMADA to open and read a file on disk for you. Notes ----- * Single-valued coordinates given by ``coordinate_vars``, that are not proper dimensions of the data, are promoted to dimensions automatically. If one of the three coordinates does not exist, use ``Dataset.expand_dims`` (see https://docs.xarray.dev/en/stable/generated/xarray.Dataset.expand_dims.html and Examples) before loading the Dataset as Hazard. * Single-valued data for variables ``frequency``. ``event_name``, and ``event_date`` will be broadcast to every event. * The ``event`` coordinate may take arbitrary values. In case these values cannot be interpreted as dates or date ordinals, the default values for ``Hazard.date`` and ``Hazard.event_name`` are used, see the ``data_vars``` parameter documentation above. * To avoid confusion in the call signature, several parameters are keyword-only arguments. * The attributes ``Hazard.haz_type`` and ``Hazard.unit`` currently cannot be read from the Dataset. Use the method parameters to set these attributes. * This method does not read coordinate system metadata. Use the ``crs`` parameter to set a custom coordinate system identifier. Examples -------- The use of this method is straightforward if the Dataset contains the data with expected names. >>> dset = xr.Dataset( ... dict( ... intensity=( ... ["time", "latitude", "longitude"], ... [[[0, 1, 2], [3, 4, 5]]], ... ) ... ), ... dict( ... time=[datetime.datetime(2000, 1, 1)], ... latitude=[0, 1], ... longitude=[0, 1, 2], ... ), ... ) >>> hazard = Hazard.from_xarray_raster(dset, "", "") For non-default coordinate names, use the ``coordinate_vars`` argument. >>> dset = xr.Dataset( ... dict( ... intensity=( ... ["day", "lat", "longitude"], ... [[[0, 1, 2], [3, 4, 5]]], ... ) ... ), ... dict( ... day=[datetime.datetime(2000, 1, 1)], ... lat=[0, 1], ... longitude=[0, 1, 2], ... ), ... ) >>> hazard = Hazard.from_xarray_raster( ... dset, "", "", coordinate_vars=dict(event="day", latitude="lat") ... ) Coordinates can be different from the actual dataset dimensions. The following loads the data with coordinates ``longitude`` and ``latitude`` (default names): >>> dset = xr.Dataset( ... dict(intensity=(["time", "y", "x"], [[[0, 1, 2], [3, 4, 5]]])), ... dict( ... time=[datetime.datetime(2000, 1, 1)], ... y=[0, 1], ... x=[0, 1, 2], ... longitude=(["y", "x"], [[0.0, 0.1, 0.2], [0.0, 0.1, 0.2]]), ... latitude=(["y", "x"], [[0.0, 0.0, 0.0], [0.1, 0.1, 0.1]]), ... ), ... ) >>> hazard = Hazard.from_xarray_raster(dset, "", "") Optional data is read from the dataset if the default keys are found. Users can specify custom variables in the data, or that the default keys should be ignored, with the ``data_vars`` argument. >>> dset = xr.Dataset( ... dict( ... intensity=( ... ["time", "latitude", "longitude"], ... [[[0, 1, 2], [3, 4, 5]]], ... ), ... fraction=( ... ["time", "latitude", "longitude"], ... [[[0.0, 0.1, 0.2], [0.3, 0.4, 0.5]]], ... ), ... freq=(["time"], [0.4]), ... event_id=(["time"], [4]), ... ), ... dict( ... time=[datetime.datetime(2000, 1, 1)], ... latitude=[0, 1], ... longitude=[0, 1, 2], ... ), ... ) >>> hazard = Hazard.from_xarray_raster( ... dset, ... "", ... "", ... data_vars=dict( ... # Load frequency from 'freq' array ... frequency="freq", ... # Ignore 'event_id' array and use default instead ... event_id="", ... # 'fraction' array is loaded because it has the default name ... ), ... ) >>> np.array_equal(hazard.frequency, [0.4]) and np.array_equal( ... hazard.event_id, [1] ... ) True If your read single-event data your dataset probably will not have a time dimension. As long as a time *coordinate* exists, however, this method will automatically promote it to a dataset dimension and load the data: >>> dset = xr.Dataset( ... dict( ... intensity=( ... ["latitude", "longitude"], ... [[0, 1, 2], [3, 4, 5]], ... ) ... ), ... dict( ... time=[datetime.datetime(2000, 1, 1)], ... latitude=[0, 1], ... longitude=[0, 1, 2], ... ), ... ) >>> hazard = Hazard.from_xarray_raster(dset, "", "") # Same as first example If one coordinate is missing altogehter, you must add it or expand the dimensions before loading the dataset: >>> dset = xr.Dataset( ... dict( ... intensity=( ... ["latitude", "longitude"], ... [[0, 1, 2], [3, 4, 5]], ... ) ... ), ... dict( ... latitude=[0, 1], ... longitude=[0, 1, 2], ... ), ... ) >>> dset = dset.expand_dims(time=[numpy.datetime64("2000-01-01")]) >>> hazard = Hazard.from_xarray_raster(dset, "", "") """ # Check data type for better error message if not isinstance(data, xr.Dataset): if isinstance(data, (pathlib.Path, str)): raise TypeError("Passing a path to this classmethod is not supported. " "Use Hazard.from_xarray_raster_file instead.") raise TypeError("This method only supports xarray.Dataset as input data") # Initialize Hazard object hazard_kwargs = dict(haz_type=hazard_type, units=intensity_unit) # Update coordinate identifiers coords = copy.deepcopy(DEF_COORDS) coordinate_vars = coordinate_vars if coordinate_vars is not None else {} unknown_coords = [co for co in coordinate_vars if co not in coords] if unknown_coords: raise ValueError( f"Unknown coordinates passed: '{unknown_coords}'. Supported " f"coordinates are {list(coords.keys())}." ) coords.update(coordinate_vars) # Retrieve dimensions of coordinates try: dims = dict( event=data[coords["event"]].dims, longitude=data[coords["longitude"]].dims, latitude=data[coords["latitude"]].dims, ) # Handle KeyError for better error message except KeyError as err: key = err.args[0] raise RuntimeError( f"Dataset is missing dimension/coordinate: {key}. Dataset dimensions: " f"{list(data.dims.keys())}" ) from err # Try promoting single-value coordinates to dimensions for key, val in dims.items(): if not val: coord = coords[key] LOGGER.debug("Promoting Dataset coordinate '%s' to dimension", coord) data = data.expand_dims(coord) dims[key] = data[coord].dims # Try to rechunk the data to optimize the stack operation afterwards. if rechunk: # We want one event to be contained in one chunk chunks = {dim: -1 for dim in dims["longitude"]} chunks.update({dim: -1 for dim in dims["latitude"]}) # Chunks can be auto-sized along the event dimensions chunks.update({dim: "auto" for dim in dims["event"]}) data = data.chunk(chunks=chunks) # Stack (vectorize) the entire dataset into 2D (time, lat/lon) # NOTE: We want the set union of the dimensions, but Python 'set' does not # preserve order. However, we want longitude to run faster than latitude. # So we use 'dict' without values, as 'dict' preserves insertion order # (dict keys behave like a set). data = data.stack( event=dims["event"], lat_lon=dict.fromkeys(dims["latitude"] + dims["longitude"]), ) # Transform coordinates into centroids centroids = Centroids.from_lat_lon( data[coords["latitude"]].values, data[coords["longitude"]].values, crs=crs, ) def to_csr_matrix(array: xr.DataArray) -> sparse.csr_matrix: """Store a numpy array as sparse matrix, optimizing storage space The CSR matrix stores NaNs explicitly, so we set them to zero. """ array = array.where(array.notnull(), 0) array = xr.apply_ufunc( sp.COO.from_numpy, array, dask="parallelized", output_dtypes=[array.dtype] ) sparse_coo = array.compute().data # Load into memory return sparse_coo.tocsr() # Convert sparse.COO to scipy.sparse.csr_matrix # Read the intensity data LOGGER.debug("Loading Hazard intensity from DataArray '%s'", intensity) intensity_matrix = to_csr_matrix(data[intensity]) # Define accessors for xarray DataArrays def default_accessor(array: xr.DataArray) -> np.ndarray: """Take a DataArray and return its numpy representation""" return array.values def strict_positive_int_accessor(array: xr.DataArray) -> np.ndarray: """Take a positive int DataArray and return its numpy representation Raises ------ TypeError If the underlying data type is not integer ValueError If any value is zero or less """ if not np.issubdtype(array.dtype, np.integer): raise TypeError(f"'{array.name}' data array must be integers") if not (array > 0).all(): raise ValueError(f"'{array.name}' data must be larger than zero") return array.values def date_to_ordinal_accessor( array: xr.DataArray, strict: bool = True ) -> np.ndarray: """Take a DataArray and transform it into ordinals""" try: if np.issubdtype(array.dtype, np.integer): # Assume that data is ordinals return strict_positive_int_accessor(array) # Try transforming to ordinals return np.array(u_dt.datetime64_to_ordinal(array.values)) # Handle access errors except (ValueError, TypeError) as err: if strict: raise err LOGGER.warning( "Failed to read values of '%s' as dates or ordinals. Hazard.date " "will be ones only", array.name, ) return np.ones(array.shape) def year_month_day_accessor( array: xr.DataArray, strict: bool = True ) -> np.ndarray: """Take an array and return am array of YYYY-MM-DD strings""" try: return array.dt.strftime("%Y-%m-%d").values # Handle access errors except (ValueError, TypeError, AttributeError) as err: if strict: raise err LOGGER.warning( "Failed to read values of '%s' as dates. Hazard.event_name will be " "empty strings", array.name, ) return np.full(array.shape, "") def maybe_repeat(values: np.ndarray, times: int) -> np.ndarray: """Return the array or repeat a single-valued array If ``values`` has size 1, return an array that repeats this value ``times`` times. If the size is different, just return the array. """ if values.size == 1: return np.array(list(itertools.repeat(values.flat[0], times))) return values # Create a DataFrame storing access information for each of data_vars # NOTE: Each row will be passed as arguments to # `load_from_xarray_or_return_default`, see its docstring for further # explanation of the DataFrame columns / keywords. num_events = data.sizes["event"] data_ident = pd.DataFrame( data=dict( # The attribute of the Hazard class where the data will be stored hazard_attr=DEF_DATA_VARS, # The identifier and default key used in this method default_key=DEF_DATA_VARS, # The key assigned by the user user_key=None, # The default value for each attribute default_value=[ None, np.ones(num_events), np.array(range(num_events), dtype=int) + 1, list( year_month_day_accessor( data[coords["event"]], strict=False ).flat ), date_to_ordinal_accessor(data[coords["event"]], strict=False), ], # The accessor for the data in the Dataset accessor=[ to_csr_matrix, lambda x: maybe_repeat(default_accessor(x), num_events), strict_positive_int_accessor, lambda x: list(maybe_repeat(default_accessor(x), num_events).flat), lambda x: maybe_repeat(date_to_ordinal_accessor(x), num_events), ], ) ) # Check for unexpected keys data_vars = data_vars if data_vars is not None else {} default_keys = data_ident["default_key"] unknown_keys = [ key for key in data_vars.keys() if not default_keys.str.contains(key).any() ] if unknown_keys: raise ValueError( f"Unknown data variables passed: '{unknown_keys}'. Supported " f"data variables are {list(default_keys)}." ) # Update with keys provided by the user # NOTE: Keys in 'default_keys' missing from 'data_vars' will be set to 'None' # (which is exactly what we want) and the result is written into # 'user_key'. 'default_keys' is not modified. data_ident["user_key"] = default_keys.map(data_vars) def load_from_xarray_or_return_default( user_key: Optional[str], default_key: str, hazard_attr: str, accessor: Callable[[xr.DataArray], Any], default_value: Any, ) -> Any: """Load data for a single Hazard attribute or return the default value Does the following based on the ``user_key``: * If the key is an empty string, return the default value * If the key is a non-empty string, load the data for that key and return it. * If the key is ``None``, look for the ``default_key`` in the data. If it exists, return that data. If not, return the default value. Parameters ---------- user_key : str or None The key set by the user to identify the DataArray to read data from. default_key : str The default key identifying the DataArray to read data from. hazard_attr : str The name of the attribute of ``Hazard`` where the data will be stored in. accessor : Callable A callable that takes the DataArray as argument and returns the data structure that is required by the ``Hazard`` attribute. default_value The default value/array to return in case the data could not be found. Returns ------- The object that will be stored in the ``Hazard`` attribute ``hazard_attr``. Raises ------ KeyError If ``user_key`` was a non-empty string but no such key was found in the data RuntimeError If the data structure loaded has a different shape than the default data structure """ # User does not want to read data if user_key == "": LOGGER.debug( "Using default values for Hazard.%s per user request", hazard_attr ) return default_value if not pd.isna(user_key): # Read key exclusively LOGGER.debug( "Reading data for Hazard.%s from DataArray '%s'", hazard_attr, user_key, ) val = accessor(data[user_key]) else: # Try default key try: val = accessor(data[default_key]) LOGGER.debug( "Reading data for Hazard.%s from DataArray '%s'", hazard_attr, default_key, ) except KeyError: LOGGER.debug( "Using default values for Hazard.%s. No data found", hazard_attr ) return default_value def vshape(array): """Return a shape tuple for any array-like type we use""" if isinstance(array, list): return len(array) if isinstance(array, sparse.csr_matrix): return array.get_shape() return array.shape # Check size for read data if default_value is not None and not np.array_equal( vshape(val), vshape(default_value) ): raise RuntimeError( f"'{user_key if user_key else default_key}' must have shape " f"{vshape(default_value)}, but shape is {vshape(val)}" ) # Return the data return val # Set the Hazard attributes for _, ident in data_ident.iterrows(): hazard_kwargs[ident["hazard_attr"] ] = load_from_xarray_or_return_default(**ident) # Done! LOGGER.debug("Hazard successfully loaded. Number of events: %i", num_events) return cls(centroids=centroids, intensity=intensity_matrix, **hazard_kwargs)
[docs] @classmethod def from_vector(cls, files_intensity, files_fraction=None, attrs=None, inten_name=None, frac_name=None, dst_crs=None, haz_type=None): """Read vector files format supported by fiona. Each intensity name is considered an event. Parameters ---------- files_intensity : list(str) file names containing intensity, default: ['intensity'] files_fraction : (list(str)) file names containing fraction, default: ['fraction'] attrs : dict, optional name of Hazard attributes and their values inten_name : list(str), optional name of variables containing the intensities of each event frac_name : list(str), optional name of variables containing the fractions of each event dst_crs : crs, optional reproject to given crs haz_type : str, optional acronym of the hazard type (e.g. 'TC'). default: None, which will use the class default ('' for vanilla `Hazard` objects, hard coded in some subclasses) Returns ------- haz : climada.hazard.Hazard Hazard from vector file """ if not attrs: attrs = {} if not inten_name: inten_name = ['intensity'] if not frac_name: inten_name = ['fraction'] if files_fraction is not None and len(files_intensity) != len(files_fraction): raise ValueError('Number of intensity files differs from fraction files:' f' {len(files_intensity)} != {len(files_fraction)}') hazard_kwargs = {} if haz_type is not None: hazard_kwargs["haz_type"] = haz_type if len(files_intensity) > 0: centroids = Centroids.from_vector_file(files_intensity[0], dst_crs=dst_crs) elif files_fraction is not None and len(files_fraction) > 0: centroids = Centroids.from_vector_file(files_fraction[0], dst_crs=dst_crs) else: centroids = Centroids() intensity = centroids.values_from_vector_files( files_intensity, val_names=inten_name, dst_crs=dst_crs) if files_fraction is None: fraction = intensity.copy() fraction.data.fill(1) else: fraction = centroids.values_from_vector_files( files_fraction, val_names=frac_name, dst_crs=dst_crs) hazard_kwargs.update(cls._attrs_to_kwargs(attrs, num_events=intensity.shape[0])) return cls( centroids=centroids, intensity=intensity, fraction=fraction, **hazard_kwargs)
@staticmethod def _attrs_to_kwargs(attrs: Dict[str, Any], num_events: int) -> Dict[str, Any]: """Transform attributes to init kwargs or use default values If attributes are missing from ``attrs``, this method will use a sensible default value. Parameters ---------- attrs : dict Attributes for a new Hazard object num_events : int Number of events stored in a new Hazard object. Used for determining default values if Hazard object attributes are missing from ``attrs``. Returns ------- kwargs : dict Keywords arguments to be passed to a Hazard constructor """ kwargs = dict() if 'event_id' in attrs: kwargs["event_id"] = attrs['event_id'] else: kwargs["event_id"] = np.arange(1, num_events + 1) if 'frequency' in attrs: kwargs["frequency"] = attrs['frequency'] else: kwargs["frequency"] = np.ones(kwargs["event_id"].size) if 'frequency_unit' in attrs: kwargs["frequency_unit"] = attrs['frequency_unit'] if 'event_name' in attrs: kwargs["event_name"] = attrs['event_name'] else: kwargs["event_name"] = list(map(str, kwargs["event_id"])) if 'date' in attrs: kwargs["date"] = np.array([attrs['date']]) else: kwargs["date"] = np.ones(kwargs["event_id"].size) if 'orig' in attrs: kwargs["orig"] = np.array([attrs['orig']]) else: kwargs["orig"] = np.ones(kwargs["event_id"].size, bool) if 'unit' in attrs: kwargs["units"] = attrs['unit'] return kwargs
[docs] def reproject_raster(self, dst_crs=False, transform=None, width=None, height=None, resampl_inten=Resampling.nearest, resampl_fract=Resampling.nearest): """Change current raster data to other CRS and/or transformation Parameters ---------- dst_crs: crs, optional reproject to given crs transform: rasterio.Affine affine transformation to apply wdith: float number of lons for transform height: float number of lats for transform resampl_inten: rasterio.warp,.Resampling optional resampling function used for reprojection to dst_crs for intensity resampl_fract: rasterio.warp,.Resampling optional resampling function used for reprojection to dst_crs for fraction """ if not self.centroids.meta: raise ValueError('Raster not set') if not dst_crs: dst_crs = self.centroids.meta['crs'] if transform and not width or transform and not height: raise ValueError('Provide width and height to given transformation.') if not transform: transform, width, height = calculate_default_transform( self.centroids.meta['crs'], dst_crs, self.centroids.meta['width'], self.centroids.meta['height'], self.centroids.meta['transform'][2], (self.centroids.meta['transform'][5] + self.centroids.meta['height'] * self.centroids.meta['transform'][4]), (self.centroids.meta['transform'][2] + self.centroids.meta['width'] * self.centroids.meta['transform'][0]), self.centroids.meta['transform'][5]) dst_meta = self.centroids.meta.copy() dst_meta.update({'crs': dst_crs, 'transform': transform, 'width': width, 'height': height }) intensity = np.zeros((self.size, dst_meta['height'], dst_meta['width'])) fraction = np.zeros((self.size, dst_meta['height'], dst_meta['width'])) kwargs = {'src_transform': self.centroids.meta['transform'], 'src_crs': self.centroids.meta['crs'], 'dst_transform': transform, 'dst_crs': dst_crs, 'resampling': resampl_inten} for idx_ev, inten in enumerate(self.intensity.toarray()): reproject( source=np.asarray(inten.reshape((self.centroids.meta['height'], self.centroids.meta['width']))), destination=intensity[idx_ev, :, :], **kwargs) kwargs.update(resampling=resampl_fract) for idx_ev, fract in enumerate(self.fraction.toarray()): reproject( source=np.asarray( fract.reshape((self.centroids.meta['height'], self.centroids.meta['width']))), destination=fraction[idx_ev, :, :], **kwargs) self.centroids.meta = dst_meta self.intensity = sparse.csr_matrix( intensity.reshape(self.size, dst_meta['height'] * dst_meta['width'])) self.fraction = sparse.csr_matrix( fraction.reshape(self.size, dst_meta['height'] * dst_meta['width'])) self.check()
[docs] def reproject_vector(self, dst_crs, scheduler=None): """Change current point data to a a given projection Parameters ---------- dst_crs : crs reproject to given crs scheduler : str, optional used for dask map_partitions. “threads”, “synchronous” or “processes” """ self.centroids.set_geometry_points(scheduler) self.centroids.geometry = self.centroids.geometry.to_crs(dst_crs) self.centroids.lat = self.centroids.geometry[:].y self.centroids.lon = self.centroids.geometry[:].x self.check()
[docs] def raster_to_vector(self): """Change current raster to points (center of the pixels)""" self.centroids.set_meta_to_lat_lon() self.centroids.meta = dict() self.check()
[docs] def vector_to_raster(self, scheduler=None): """Change current point data to a raster with same resolution Parameters ---------- scheduler : str, optional used for dask map_partitions. “threads”, “synchronous” or “processes” """ points_df = gpd.GeoDataFrame() points_df['latitude'] = self.centroids.lat points_df['longitude'] = self.centroids.lon val_names = ['val' + str(i_ev) for i_ev in range(2 * self.size)] for i_ev, inten_name in enumerate(val_names): if i_ev < self.size: points_df[inten_name] = np.asarray(self.intensity[i_ev, :].toarray()).reshape(-1) else: points_df[inten_name] = np.asarray(self.fraction[i_ev - self.size, :].toarray()). \ reshape(-1) raster, meta = u_coord.points_to_raster(points_df, val_names, crs=self.centroids.geometry.crs, scheduler=scheduler) self.intensity = sparse.csr_matrix(raster[:self.size, :, :].reshape(self.size, -1)) self.fraction = sparse.csr_matrix(raster[self.size:, :, :].reshape(self.size, -1)) self.centroids = Centroids(meta=meta) self.check()
[docs] def read_mat(self, *args, **kwargs): """This function is deprecated, use Hazard.from_mat.""" LOGGER.warning("The use of Hazard.read_mat is deprecated." "Use Hazard.from_mat instead.") self.__dict__ = Hazard.from_mat(*args, **kwargs).__dict__
[docs] @classmethod def from_mat(cls, file_name, var_names=None): """Read climada hazard generate with the MATLAB code in .mat format. Parameters ---------- file_name : str absolute file name var_names : dict, optional name of the variables in the file, default: DEF_VAR_MAT constant Returns ------- haz : climada.hazard.Hazard Hazard object from the provided MATLAB file Raises ------ KeyError """ # pylint: disable=protected-access if not var_names: var_names = DEF_VAR_MAT LOGGER.info('Reading %s', file_name) try: data = u_hdf5.read(file_name) try: data = data[var_names['field_name']] except KeyError: pass centroids = Centroids.from_mat(file_name, var_names=var_names['var_cent']) attrs = cls._read_att_mat(data, file_name, var_names, centroids) haz = cls(haz_type=u_hdf5.get_string(data[var_names['var_name']['per_id']]), centroids=centroids, **attrs ) except KeyError as var_err: raise KeyError("Variable not in MAT file: " + str(var_err)) from var_err return haz
[docs] def read_excel(self, *args, **kwargs): """This function is deprecated, use Hazard.from_excel.""" LOGGER.warning("The use of Hazard.read_excel is deprecated." "Use Hazard.from_excel instead.") self.__dict__ = Hazard.from_excel(*args, **kwargs).__dict__
[docs] @classmethod def from_excel(cls, file_name, var_names=None, haz_type=None): """Read climada hazard generated with the MATLAB code in Excel format. Parameters ---------- file_name : str absolute file name var_names (dict, default): name of the variables in the file, default: DEF_VAR_EXCEL constant haz_type : str, optional acronym of the hazard type (e.g. 'TC'). Default: None, which will use the class default ('' for vanilla `Hazard` objects, and hard coded in some subclasses) Returns ------- haz : climada.hazard.Hazard Hazard object from the provided Excel file Raises ------ KeyError """ # pylint: disable=protected-access if not var_names: var_names = DEF_VAR_EXCEL LOGGER.info('Reading %s', file_name) hazard_kwargs = {} if haz_type is not None: hazard_kwargs["haz_type"] = haz_type try: centroids = Centroids.from_excel(file_name, var_names=var_names['col_centroids']) hazard_kwargs.update(cls._read_att_excel(file_name, var_names, centroids)) except KeyError as var_err: raise KeyError("Variable not in Excel file: " + str(var_err)) from var_err return cls(centroids=centroids, **hazard_kwargs)
[docs] def select(self, event_names=None, event_id=None, date=None, orig=None, reg_id=None, extent=None, reset_frequency=False): """Select events matching provided criteria The frequency of events may need to be recomputed (see `reset_frequency`)! Parameters ---------- event_names : list of str, optional Names of events. event_id : list of int, optional Id of events. Default is None. date : array-like of length 2 containing str or int, optional (initial date, final date) in string ISO format ('2011-01-02') or datetime ordinal integer. orig : bool, optional Select only historical (True) or only synthetic (False) events. reg_id : int, optional Region identifier of the centroids' region_id attibute. extent: tuple(float, float, float, float), optional Extent of centroids as (min_lon, max_lon, min_lat, max_lat). The default is None. reset_frequency : bool, optional Change frequency of events proportional to difference between first and last year (old and new). Default: False. Returns ------- haz : Hazard or None If no event matching the specified criteria is found, None is returned. """ # pylint: disable=unidiomatic-typecheck if type(self) is Hazard: haz = Hazard(self.haz_type) else: haz = self.__class__() #filter events sel_ev = np.ones(self.event_id.size, dtype=bool) # filter events by date if date is not None: date_ini, date_end = date if isinstance(date_ini, str): date_ini = u_dt.str_to_date(date[0]) date_end = u_dt.str_to_date(date[1]) sel_ev &= (date_ini <= self.date) & (self.date <= date_end) if not np.any(sel_ev): LOGGER.info('No hazard in date range %s.', date) return None # filter events hist/synthetic if orig is not None: sel_ev &= (self.orig.astype(bool) == orig) if not np.any(sel_ev): LOGGER.info('No hazard with %s original events.', str(orig)) return None # filter events based on name sel_ev = np.argwhere(sel_ev).reshape(-1) if event_names is not None: filtered_events = [self.event_name[i] for i in sel_ev] try: new_sel = [filtered_events.index(n) for n in event_names] except ValueError as err: name = str(err).replace(" is not in list", "") LOGGER.info('No hazard with name %s', name) return None sel_ev = sel_ev[new_sel] # filter events based on id if event_id is not None: # preserves order of event_id sel_ev = np.array([ np.argwhere(self.event_id == n)[0,0] for n in event_id if n in self.event_id[sel_ev] ]) # filter centroids sel_cen = self.centroids.select_mask(reg_id=reg_id, extent=extent) if not np.any(sel_cen): LOGGER.info('No hazard centroids within extent and region') return None sel_cen = sel_cen.nonzero()[0] for (var_name, var_val) in self.__dict__.items(): if isinstance(var_val, np.ndarray) and var_val.ndim == 1 \ and var_val.size > 0: setattr(haz, var_name, var_val[sel_ev]) elif isinstance(var_val, sparse.csr_matrix): setattr(haz, var_name, var_val[sel_ev, :][:, sel_cen]) elif isinstance(var_val, list) and var_val: setattr(haz, var_name, [var_val[idx] for idx in sel_ev]) elif var_name == 'centroids': if reg_id is None and extent is None: new_cent = var_val else: new_cent = var_val.select(sel_cen=sel_cen) setattr(haz, var_name, new_cent) else: setattr(haz, var_name, var_val) # reset frequency if date span has changed (optional): if reset_frequency: if self.frequency_unit not in ['1/year', 'annual', '1/y', '1/a']: LOGGER.warning("Resetting the frequency is based on the calendar year of given" " dates but the frequency unit here is %s. Consider setting the frequency" " manually for the selection or changing the frequency unit to %s.", self.frequency_unit, DEF_FREQ_UNIT) year_span_old = np.abs(dt.datetime.fromordinal(self.date.max()).year - dt.datetime.fromordinal(self.date.min()).year) + 1 year_span_new = np.abs(dt.datetime.fromordinal(haz.date.max()).year - dt.datetime.fromordinal(haz.date.min()).year) + 1 haz.frequency = haz.frequency * year_span_old / year_span_new haz.sanitize_event_ids() return haz
[docs] def select_tight(self, buffer=NEAREST_NEIGHBOR_THRESHOLD/ONE_LAT_KM, val='intensity'): """ Reduce hazard to those centroids spanning a minimal box which contains all non-zero intensity or fraction points. Parameters ---------- buffer : float, optional Buffer of box in the units of the centroids. The default is approximately equal to the default threshold from the assign_centroids method (works if centroids in lat/lon) val: string, optional Select tight by non-zero 'intensity' or 'fraction'. The default is 'intensity'. Returns ------- Hazard Copy of the Hazard with centroids reduced to minimal box. All other hazard properties are carried over without changes. See also -------- self.select: Method to select centroids by lat/lon extent util.coordinates.match_coordinates: algorithm to match centroids. """ if val == 'intensity': cent_nz = (self.intensity != 0).sum(axis=0).nonzero()[1] if val == 'fraction': cent_nz = (self.fraction != 0).sum(axis=0).nonzero()[1] lon_nz = self.centroids.lon[cent_nz] lat_nz = self.centroids.lat[cent_nz] return self.select(extent=u_coord.toggle_extent_bounds( u_coord.latlon_bounds(lat=lat_nz, lon=lon_nz, buffer=buffer) ))
[docs] def local_exceedance_inten(self, return_periods=(25, 50, 100, 250)): """Compute exceedance intensity map for given return periods. Parameters ---------- return_periods : np.array return periods to consider Returns ------- inten_stats: np.array """ # warn if return period is above return period of rarest event: for period in return_periods: if period > 1 / self.frequency.min(): LOGGER.warning('Return period %1.1f exceeds max. event return period.', period) LOGGER.info('Computing exceedance intenstiy map for return periods: %s', return_periods) num_cen = self.intensity.shape[1] inten_stats = np.zeros((len(return_periods), num_cen)) cen_step = CONFIG.max_matrix_size.int() // self.intensity.shape[0] if not cen_step: raise ValueError('Increase max_matrix_size configuration parameter to >' f' {self.intensity.shape[0]}') # separte in chunks chk = -1 for chk in range(int(num_cen / cen_step)): self._loc_return_inten( np.array(return_periods), self.intensity[:, chk * cen_step:(chk + 1) * cen_step].toarray(), inten_stats[:, chk * cen_step:(chk + 1) * cen_step]) self._loc_return_inten( np.array(return_periods), self.intensity[:, (chk + 1) * cen_step:].toarray(), inten_stats[:, (chk + 1) * cen_step:]) # set values below 0 to zero if minimum of hazard.intensity >= 0: if np.min(inten_stats) < 0 <= self.intensity.min(): LOGGER.warning('Exceedance intenstiy values below 0 are set to 0. \ Reason: no negative intensity values were found in hazard.') inten_stats[inten_stats < 0] = 0 return inten_stats
[docs] def plot_rp_intensity(self, return_periods=(25, 50, 100, 250), smooth=True, axis=None, figsize=(9, 13), adapt_fontsize=True, **kwargs): """Compute and plot hazard exceedance intensity maps for different return periods. Calls local_exceedance_inten. Parameters ---------- return_periods: tuple(int), optional return periods to consider smooth: bool, optional smooth plot to plot.RESOLUTIONxplot.RESOLUTION axis: matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize: tuple, optional figure size for plt.subplots kwargs: optional arguments for pcolormesh matplotlib function used in event plots Returns ------- axis, inten_stats: matplotlib.axes._subplots.AxesSubplot, np.ndarray intenstats is return_periods.size x num_centroids """ self._set_coords_centroids() inten_stats = self.local_exceedance_inten(np.array(return_periods)) colbar_name = 'Intensity (' + self.units + ')' title = list() for ret in return_periods: title.append('Return period: ' + str(ret) + ' years') axis = u_plot.geo_im_from_array(inten_stats, self.centroids.coord, colbar_name, title, smooth=smooth, axes=axis, figsize=figsize, adapt_fontsize=adapt_fontsize, **kwargs) return axis, inten_stats
[docs] def plot_intensity(self, event=None, centr=None, smooth=True, axis=None, adapt_fontsize=True, **kwargs): """Plot intensity values for a selected event or centroid. Parameters ---------- event: int or str, optional If event > 0, plot intensities of event with id = event. If event = 0, plot maximum intensity in each centroid. If event < 0, plot abs(event)-largest event. If event is string, plot events with that name. centr: int or tuple, optional If centr > 0, plot intensity of all events at centroid with id = centr. If centr = 0, plot maximum intensity of each event. If centr < 0, plot abs(centr)-largest centroid where higher intensities are reached. If tuple with (lat, lon) plot intensity of nearest centroid. smooth: bool, optional Rescale data to RESOLUTIONxRESOLUTION pixels (see constant in module `climada.util.plot`) axis: matplotlib.axes._subplots.AxesSubplot, optional axis to use kwargs: optional arguments for pcolormesh matplotlib function used in event plots or for plot function used in centroids plots Returns ------- matplotlib.axes._subplots.AxesSubplot Raises ------ ValueError """ self._set_coords_centroids() col_label = f'Intensity ({self.units})' crs_epsg, _ = u_plot.get_transformation(self.centroids.geometry.crs) if event is not None: if isinstance(event, str): event = self.get_event_id(event) return self._event_plot(event, self.intensity, col_label, smooth, crs_epsg, axis, adapt_fontsize=adapt_fontsize, **kwargs) if centr is not None: if isinstance(centr, tuple): _, _, centr = self.centroids.get_closest_point(centr[0], centr[1]) return self._centr_plot(centr, self.intensity, col_label, axis, **kwargs) raise ValueError("Provide one event id or one centroid id.")
[docs] def plot_fraction(self, event=None, centr=None, smooth=True, axis=None, **kwargs): """Plot fraction values for a selected event or centroid. Parameters ---------- event: int or str, optional If event > 0, plot fraction of event with id = event. If event = 0, plot maximum fraction in each centroid. If event < 0, plot abs(event)-largest event. If event is string, plot events with that name. centr: int or tuple, optional If centr > 0, plot fraction of all events at centroid with id = centr. If centr = 0, plot maximum fraction of each event. If centr < 0, plot abs(centr)-largest centroid where highest fractions are reached. If tuple with (lat, lon) plot fraction of nearest centroid. smooth: bool, optional Rescale data to RESOLUTIONxRESOLUTION pixels (see constant in module `climada.util.plot`) axis: matplotlib.axes._subplots.AxesSubplot, optional axis to use kwargs: optional arguments for pcolormesh matplotlib function used in event plots or for plot function used in centroids plots Returns ------- matplotlib.axes._subplots.AxesSubplot Raises ------ ValueError """ self._set_coords_centroids() col_label = 'Fraction' if event is not None: if isinstance(event, str): event = self.get_event_id(event) return self._event_plot(event, self.fraction, col_label, smooth, axis, **kwargs) if centr is not None: if isinstance(centr, tuple): _, _, centr = self.centroids.get_closest_point(centr[0], centr[1]) return self._centr_plot(centr, self.fraction, col_label, axis, **kwargs) raise ValueError("Provide one event id or one centroid id.")
[docs] def sanitize_event_ids(self): """Make sure that event ids are unique""" if np.unique(self.event_id).size != self.event_id.size: LOGGER.debug('Resetting event_id.') self.event_id = np.arange(1, self.event_id.size + 1)
[docs] def get_event_id(self, event_name): """Get an event id from its name. Several events might have the same name. Parameters ---------- event_name: str Event name Returns ------- list_id: np.array(int) """ list_id = self.event_id[[i_name for i_name, val_name in enumerate(self.event_name) if val_name == event_name]] if list_id.size == 0: raise ValueError(f"No event with name: {event_name}") return list_id
[docs] def get_event_name(self, event_id): """Get the name of an event id. Parameters ---------- event_id: int id of the event Returns ------- str Raises ------ ValueError """ try: return self.event_name[np.argwhere( self.event_id == event_id)[0][0]] except IndexError as err: raise ValueError(f"No event with id: {event_id}") from err
[docs] def get_event_date(self, event=None): """Return list of date strings for given event or for all events, if no event provided. Parameters ---------- event: str or int, optional event name or id. Returns ------- l_dates: list(str) """ if event is None: l_dates = [u_dt.date_to_str(date) for date in self.date] elif isinstance(event, str): ev_ids = self.get_event_id(event) l_dates = [ u_dt.date_to_str(self.date[np.argwhere(self.event_id == ev_id)[0][0]]) for ev_id in ev_ids] else: ev_idx = np.argwhere(self.event_id == event)[0][0] l_dates = [u_dt.date_to_str(self.date[ev_idx])] return l_dates
[docs] def calc_year_set(self): """From the dates of the original events, get number yearly events. Returns ------- orig_yearset: dict key are years, values array with event_ids of that year """ orig_year = np.array([dt.datetime.fromordinal(date).year for date in self.date[self.orig]]) orig_yearset = {} for year in np.unique(orig_year): orig_yearset[year] = self.event_id[self.orig][orig_year == year] return orig_yearset
[docs] def remove_duplicates(self): """Remove duplicate events (events with same name and date).""" events = list(zip(self.event_name, self.date)) set_ev = set(events) if len(set_ev) == self.event_id.size: return unique_pos = sorted([events.index(event) for event in set_ev]) for var_name, var_val in vars(self).items(): if isinstance(var_val, sparse.csr_matrix): setattr(self, var_name, var_val[unique_pos, :]) elif isinstance(var_val, np.ndarray) and var_val.ndim == 1: setattr(self, var_name, var_val[unique_pos]) elif isinstance(var_val, list) and len(var_val) > 0: setattr(self, var_name, [var_val[p] for p in unique_pos])
[docs] def set_frequency(self, yearrange=None): """Set hazard frequency from yearrange or intensity matrix. Parameters ---------- yearrange: tuple or list, optional year range to be used to compute frequency per event. If yearrange is not given (None), the year range is derived from self.date """ if self.frequency_unit not in ['1/year', 'annual', '1/y', '1/a']: LOGGER.warning("setting the frequency on a hazard object who's frequency unit" "is %s and not %s will most likely lead to unexpected results", self.frequency_unit, DEF_FREQ_UNIT) if not yearrange: delta_time = dt.datetime.fromordinal(int(np.max(self.date))).year - \ dt.datetime.fromordinal(int(np.min(self.date))).year + 1 else: delta_time = max(yearrange) - min(yearrange) + 1 num_orig = self.orig.nonzero()[0].size if num_orig > 0: ens_size = self.event_id.size / num_orig else: ens_size = 1 self.frequency = np.ones(self.event_id.size) / delta_time / ens_size
@property def size(self): """Return number of events.""" return self.event_id.size
[docs] def write_raster(self, file_name, intensity=True): """Write intensity or fraction as GeoTIFF file. Each band is an event Parameters ---------- file_name: str file name to write in tif format intensity: bool if True, write intensity, otherwise write fraction """ variable = self.intensity if not intensity: variable = self.fraction if self.centroids.meta: u_coord.write_raster(file_name, variable.toarray(), self.centroids.meta) else: pixel_geom = self.centroids.calc_pixels_polygons() profile = self.centroids.meta profile.update(driver='GTiff', dtype=rasterio.float32, count=self.size) with rasterio.open(file_name, 'w', **profile) as dst: LOGGER.info('Writing %s', file_name) for i_ev in range(variable.shape[0]): raster = rasterize( [(x, val) for (x, val) in zip(pixel_geom, np.array(variable[i_ev, :].toarray()).reshape(-1))], out_shape=(profile['height'], profile['width']), transform=profile['transform'], fill=0, all_touched=True, dtype=profile['dtype'], ) dst.write(raster.astype(profile['dtype']), i_ev + 1)
[docs] def write_hdf5(self, file_name, todense=False): """Write hazard in hdf5 format. Parameters ---------- file_name: str file name to write, with h5 format todense: bool if True write the sparse matrices as hdf5.dataset by converting them to dense format first. This increases readability of the file for other programs. default: False """ LOGGER.info('Writing %s', file_name) with h5py.File(file_name, 'w') as hf_data: str_dt = h5py.special_dtype(vlen=str) for (var_name, var_val) in self.__dict__.items(): if var_name == 'centroids': self.centroids.write_hdf5(hf_data.create_group(var_name)) elif isinstance(var_val, sparse.csr_matrix): if todense: hf_data.create_dataset(var_name, data=var_val.toarray()) else: hf_csr = hf_data.create_group(var_name) hf_csr.create_dataset('data', data=var_val.data) hf_csr.create_dataset('indices', data=var_val.indices) hf_csr.create_dataset('indptr', data=var_val.indptr) hf_csr.attrs['shape'] = var_val.shape elif isinstance(var_val, str): hf_str = hf_data.create_dataset(var_name, (1,), dtype=str_dt) hf_str[0] = var_val elif isinstance(var_val, list) and var_val and isinstance(var_val[0], str): hf_str = hf_data.create_dataset(var_name, (len(var_val),), dtype=str_dt) for i_ev, var_ev in enumerate(var_val): hf_str[i_ev] = var_ev elif var_val is not None and var_name != 'pool': try: hf_data.create_dataset(var_name, data=var_val) except TypeError: LOGGER.warning( "write_hdf5: the class member %s is skipped, due to its " "type, %s, for which writing to hdf5 " "is not implemented. Reading this H5 file will probably lead to " "%s being set to its default value.", var_name, var_val.__class__.__name__, var_name )
[docs] def read_hdf5(self, *args, **kwargs): """This function is deprecated, use Hazard.from_hdf5.""" LOGGER.warning("The use of Hazard.read_hdf5 is deprecated." "Use Hazard.from_hdf5 instead.") self.__dict__ = self.__class__.from_hdf5(*args, **kwargs).__dict__
[docs] @classmethod def from_hdf5(cls, file_name): """Read hazard in hdf5 format. Parameters ---------- file_name: str file name to read, with h5 format Returns ------- haz : climada.hazard.Hazard Hazard object from the provided MATLAB file """ LOGGER.info('Reading %s', file_name) # NOTE: This is a stretch. We instantiate one empty object to iterate over its # attributes. But then we create a new one with the attributes filled! haz = cls() hazard_kwargs = dict() with h5py.File(file_name, 'r') as hf_data: for (var_name, var_val) in haz.__dict__.items(): if var_name not in hf_data.keys(): continue if var_name == 'centroids': hazard_kwargs["centroids"] = Centroids.from_hdf5( hf_data.get(var_name)) elif isinstance(var_val, np.ndarray) and var_val.ndim == 1: hazard_kwargs[var_name] = np.array(hf_data.get(var_name)) elif isinstance(var_val, sparse.csr_matrix): hf_csr = hf_data.get(var_name) if isinstance(hf_csr, h5py.Dataset): hazard_kwargs[var_name] = sparse.csr_matrix(hf_csr) else: hazard_kwargs[var_name] = sparse.csr_matrix( (hf_csr['data'][:], hf_csr['indices'][:], hf_csr['indptr'][:]), hf_csr.attrs['shape']) elif isinstance(var_val, str): hazard_kwargs[var_name] = u_hdf5.to_string( hf_data.get(var_name)[0]) elif isinstance(var_val, list): hazard_kwargs[var_name] = [x for x in map( u_hdf5.to_string, np.array(hf_data.get(var_name)).tolist())] else: hazard_kwargs[var_name] = hf_data.get(var_name) # Now create the actual object we want to return! return cls(**hazard_kwargs)
def _set_coords_centroids(self): """If centroids are raster, set lat and lon coordinates""" if self.centroids.meta and not self.centroids.coord.size: self.centroids.set_meta_to_lat_lon() def _events_set(self): """Generate set of tuples with (event_name, event_date)""" ev_set = set() for ev_name, ev_date in zip(self.event_name, self.date): ev_set.add((ev_name, ev_date)) return ev_set def _event_plot(self, event_id, mat_var, col_name, smooth, crs_espg, axis=None, figsize=(9, 13), adapt_fontsize=True, **kwargs): """Plot an event of the input matrix. Parameters ---------- event_id: int or np.array(int) If event_id > 0, plot mat_var of event with id = event_id. If event_id = 0, plot maximum mat_var in each centroid. If event_id < 0, plot abs(event_id)-largest event. mat_var: sparse matrix Sparse matrix where each row is an event col_name: sparse matrix Colorbar label smooth: bool, optional smooth plot to plot.RESOLUTIONxplot.RESOLUTION axis: matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize: tuple, optional figure size for plt.subplots kwargs: optional arguments for pcolormesh matplotlib function Returns ------- matplotlib.figure.Figure, matplotlib.axes._subplots.AxesSubplot """ if not isinstance(event_id, np.ndarray): event_id = np.array([event_id]) array_val = list() l_title = list() for ev_id in event_id: if ev_id > 0: try: event_pos = np.where(self.event_id == ev_id)[0][0] except IndexError as err: raise ValueError(f'Wrong event id: {ev_id}.') from err im_val = mat_var[event_pos, :].toarray().transpose() title = f'Event ID {self.event_id[event_pos]}: {self.event_name[event_pos]}' elif ev_id < 0: max_inten = np.asarray(np.sum(mat_var, axis=1)).reshape(-1) event_pos = np.argpartition(max_inten, ev_id)[ev_id:] event_pos = event_pos[np.argsort(max_inten[event_pos])][0] im_val = mat_var[event_pos, :].toarray().transpose() title = (f'{np.abs(ev_id)}-largest Event. ID {self.event_id[event_pos]}:' f' {self.event_name[event_pos]}') else: im_val = np.max(mat_var, axis=0).toarray().transpose() title = f'{self.haz_type} max intensity at each point' array_val.append(im_val) l_title.append(title) return u_plot.geo_im_from_array(array_val, self.centroids.coord, col_name, l_title, smooth=smooth, axes=axis, figsize=figsize, proj=crs_espg, adapt_fontsize=adapt_fontsize, **kwargs) def _centr_plot(self, centr_idx, mat_var, col_name, axis=None, **kwargs): """Plot a centroid of the input matrix. Parameters ---------- centr_id: int If centr_id > 0, plot mat_var of all events at centroid with id = centr_id. If centr_id = 0, plot maximum mat_var of each event. If centr_id < 0, plot abs(centr_id)-largest centroid where highest mat_var are reached. mat_var: sparse matrix Sparse matrix where each column represents a centroid col_name: sparse matrix Colorbar label axis: matplotlib.axes._subplots.AxesSubplot, optional axis to use kwargs: optional arguments for plot matplotlib function Returns ------- matplotlib.figure.Figure, matplotlib.axes._subplots.AxesSubplot """ coord = self.centroids.coord if centr_idx > 0: try: centr_pos = centr_idx except IndexError as err: raise ValueError(f'Wrong centroid id: {centr_idx}.') from err array_val = mat_var[:, centr_pos].toarray() title = f'Centroid {centr_idx}: ({coord[centr_pos, 0]}, {coord[centr_pos, 1]})' elif centr_idx < 0: max_inten = np.asarray(np.sum(mat_var, axis=0)).reshape(-1) centr_pos = np.argpartition(max_inten, centr_idx)[centr_idx:] centr_pos = centr_pos[np.argsort(max_inten[centr_pos])][0] array_val = mat_var[:, centr_pos].toarray() title = (f'{np.abs(centr_idx)}-largest Centroid. {centr_pos}:' f' ({coord[centr_pos, 0]}, {coord[centr_pos, 1]})') else: array_val = np.max(mat_var, axis=1).toarray() title = f'{self.haz_type} max intensity at each event' if not axis: _, axis = plt.subplots(1) if 'color' not in kwargs: kwargs['color'] = 'b' axis.set_title(title) axis.set_xlabel('Event number') axis.set_ylabel(str(col_name)) axis.plot(range(len(array_val)), array_val, **kwargs) axis.set_xlim([0, len(array_val)]) return axis def _loc_return_inten(self, return_periods, inten, exc_inten): """Compute local exceedence intensity for given return period. Parameters ---------- return_periods: np.array return periods to consider cen_pos: int centroid position Returns ------- np.array """ # sorted intensity sort_pos = np.argsort(inten, axis=0)[::-1, :] columns = np.ones(inten.shape, int) # pylint: disable=unsubscriptable-object # pylint/issues/3139 columns *= np.arange(columns.shape[1]) inten_sort = inten[sort_pos, columns] # cummulative frequency at sorted intensity freq_sort = self.frequency[sort_pos] np.cumsum(freq_sort, axis=0, out=freq_sort) for cen_idx in range(inten.shape[1]): exc_inten[:, cen_idx] = self._cen_return_inten( inten_sort[:, cen_idx], freq_sort[:, cen_idx], self.intensity_thres, return_periods) def _check_events(self): """Check that all attributes but centroids contain consistent data. Put default date, event_name and orig if not provided. Check not repeated events (i.e. with same date and name) Raises ------ ValueError """ num_ev = len(self.event_id) num_cen = self.centroids.size if np.unique(self.event_id).size != num_ev: raise ValueError("There are events with the same identifier.") u_check.check_oligatories(self.__dict__, self.vars_oblig, 'Hazard.', num_ev, num_ev, num_cen) u_check.check_optionals(self.__dict__, self.vars_opt, 'Hazard.', num_ev) self.event_name = u_check.array_default(num_ev, self.event_name, 'Hazard.event_name', list(self.event_id)) self.date = u_check.array_default(num_ev, self.date, 'Hazard.date', np.ones(self.event_id.shape, dtype=int)) self.orig = u_check.array_default(num_ev, self.orig, 'Hazard.orig', np.zeros(self.event_id.shape, dtype=bool)) if len(self._events_set()) != num_ev: raise ValueError("There are events with same date and name.") @staticmethod def _cen_return_inten(inten, freq, inten_th, return_periods): """From ordered intensity and cummulative frequency at centroid, get exceedance intensity at input return periods. Parameters ---------- inten: np.array sorted intensity at centroid freq: np.array cummulative frequency at centroid inten_th: float intensity threshold return_periods: np.array return periods Returns ------- np.array """ inten_th = np.asarray(inten > inten_th).squeeze() inten_cen = inten[inten_th] freq_cen = freq[inten_th] if not inten_cen.size: return np.zeros((return_periods.size,)) try: with warnings.catch_warnings(): warnings.simplefilter("ignore") pol_coef = np.polyfit(np.log(freq_cen), inten_cen, deg=1) except ValueError: pol_coef = np.polyfit(np.log(freq_cen), inten_cen, deg=0) inten_fit = np.polyval(pol_coef, np.log(1 / return_periods)) wrong_inten = (return_periods > np.max(1 / freq_cen)) & np.isnan(inten_fit) inten_fit[wrong_inten] = 0. return inten_fit @staticmethod def _read_att_mat(data, file_name, var_names, centroids): """Read MATLAB hazard's attributes.""" attrs = dict() attrs["frequency"] = np.squeeze(data[var_names['var_name']['freq']]) try: attrs["frequency_unit"] = u_hdf5.get_string( data[var_names['var_name']['freq_unit']]) except KeyError: pass attrs["orig"] = np.squeeze( data[var_names['var_name']['orig']]).astype(bool) attrs["event_id"] = np.squeeze( data[var_names['var_name']['even_id']].astype(int, copy=False)) try: attrs["units"] = u_hdf5.get_string( data[var_names['var_name']['unit']]) except KeyError: pass n_cen = centroids.size n_event = len(attrs["event_id"]) try: attrs["intensity"] = u_hdf5.get_sparse_csr_mat( data[var_names['var_name']['inten']], (n_event, n_cen)) except ValueError as err: raise ValueError('Size missmatch in intensity matrix.') from err try: attrs["fraction"] = u_hdf5.get_sparse_csr_mat( data[var_names['var_name']['frac']], (n_event, n_cen)) except ValueError as err: raise ValueError('Size missmatch in fraction matrix.') from err except KeyError: attrs["fraction"] = sparse.csr_matrix( np.ones(attrs["intensity"].shape, dtype=float)) # Event names: set as event_id if no provided try: attrs["event_name"] = u_hdf5.get_list_str_from_ref( file_name, data[var_names['var_name']['ev_name']]) except KeyError: attrs["event_name"] = list(attrs["event_id"]) try: datenum = data[var_names['var_name']['datenum']].squeeze() attrs["date"] = np.array([ (dt.datetime.fromordinal(int(date)) + dt.timedelta(days=date % 1) - dt.timedelta(days=366)).toordinal() for date in datenum]) except KeyError: pass return attrs @staticmethod def _read_att_excel(file_name, var_names, centroids): """Read Excel hazard's attributes.""" dfr = pd.read_excel(file_name, var_names['sheet_name']['freq']) num_events = dfr.shape[0] attrs = dict() attrs["frequency"] = dfr[var_names['col_name']['freq']].values attrs["orig"] = dfr[var_names['col_name']['orig']].values.astype(bool) attrs["event_id"] = dfr[var_names['col_name'] ['even_id']].values.astype(int, copy=False) attrs["date"] = dfr[var_names['col_name'] ['even_dt']].values.astype(int, copy=False) attrs["event_name"] = dfr[var_names['col_name'] ['even_name']].values.tolist() dfr = pd.read_excel(file_name, var_names['sheet_name']['inten']) # number of events (ignore centroid_ID column) # check the number of events is the same as the one in the frequency if dfr.shape[1] - 1 is not num_events: raise ValueError('Hazard intensity is given for a number of events ' 'different from the number of defined in its frequency: ' f'{dfr.shape[1] - 1} != {num_events}') # check number of centroids is the same as retrieved before if dfr.shape[0] is not centroids.size: raise ValueError('Hazard intensity is given for a number of centroids ' 'different from the number of centroids defined: ' f'{dfr.shape[0]} != {centroids.size}') attrs["intensity"] = sparse.csr_matrix( dfr.values[:, 1:num_events + 1].transpose()) attrs["fraction"] = sparse.csr_matrix( np.ones(attrs["intensity"].shape, dtype=float)) return attrs
[docs] def append(self, *others): """Append the events and centroids to this hazard object. All of the given hazards must be of the same type and use the same units as self. The centroids of all hazards must have the same CRS. The following kinds of object attributes are processed: - All centroids are combined together using `Centroids.union`. - Lists, 1-dimensional arrays (NumPy) and sparse CSR matrices (SciPy) are concatenated. Sparse matrices are concatenated along the first (vertical) axis. For any other type of attribute: A ValueError is raised if an attribute of that name is not defined in all of the non-empty hazards at least. However, there is no check that the attribute value is identical among the given hazard objects. The initial attribute value of `self` will not be modified. Note: Each of the hazard's `centroids` attributes might be modified in place in the sense that missing properties are added, but existing ones are not overwritten. In case of raster centroids, conversion to point centroids is applied so that raster information (meta) is lost. For more information, see `Centroids.union`. Parameters ---------- others : one or more climada.hazard.Hazard objects Hazard instances to append to self Raises ------ TypeError, ValueError See Also -------- Hazard.concat : concatenate 2 or more hazards Centroids.union : combine centroids """ # pylint: disable=no-member, protected-access if len(others) == 0: return haz_list = [self] + list(others) haz_list_nonempty = [haz for haz in haz_list if haz.size > 0] for haz in haz_list: haz._check_events() # check type, unit, and attribute consistency among hazards haz_types = {haz.haz_type for haz in haz_list if haz.haz_type != ''} if len(haz_types) > 1: raise ValueError(f"The given hazards are of different types: {haz_types}. " "The hazards are incompatible and cannot be concatenated.") self.haz_type = haz_types.pop() haz_classes = {type(haz) for haz in haz_list} if len(haz_classes) > 1: raise TypeError(f"The given hazards are of different classes: {haz_classes}. " "The hazards are incompatible and cannot be concatenated.") freq_units = {haz.frequency_unit for haz in haz_list} if len(freq_units) > 1: raise ValueError(f"The given hazards have different frequency units: {freq_units}. " "The hazards are incompatible and cannot be concatenated.") self.frequency_unit = freq_units.pop() units = {haz.units for haz in haz_list if haz.units != ''} if len(units) > 1: raise ValueError(f"The given hazards use different units: {units}. " "The hazards are incompatible and cannot be concatenated.") if len(units) == 0: units = {''} self.units = units.pop() attributes = sorted(set.union(*[set(vars(haz).keys()) for haz in haz_list])) for attr_name in attributes: if not all(hasattr(haz, attr_name) for haz in haz_list_nonempty): raise ValueError(f"Attribute {attr_name} is not shared by all hazards. " "The hazards are incompatible and cannot be concatenated.") # map individual centroids objects to union centroids = Centroids.union(*[haz.centroids for haz in haz_list]) hazcent_in_cent_idx_list = [ u_coord.match_coordinates(haz.centroids.coord, centroids.coord, threshold=0) for haz in haz_list_nonempty ] # concatenate array and list attributes of non-empty hazards for attr_name in attributes: attr_val_list = [getattr(haz, attr_name) for haz in haz_list_nonempty] if isinstance(attr_val_list[0], sparse.csr_matrix): # map sparse matrix onto centroids setattr(self, attr_name, sparse.vstack([ sparse.csr_matrix( (matrix.data, cent_idx[matrix.indices], matrix.indptr), shape=(matrix.shape[0], centroids.size) ) for matrix, cent_idx in zip(attr_val_list, hazcent_in_cent_idx_list) ], format='csr')) elif isinstance(attr_val_list[0], np.ndarray) and attr_val_list[0].ndim == 1: setattr(self, attr_name, np.hstack(attr_val_list)) elif isinstance(attr_val_list[0], list): setattr(self, attr_name, sum(attr_val_list, [])) self.centroids = centroids self.sanitize_event_ids()
[docs] @classmethod def concat(cls, haz_list): """ Concatenate events of several hazards of same type. This function creates a new hazard of the same class as the first hazard in the given list and then applies the `append` method. Please refer to the docs of `Hazard.append` for caveats and limitations of the concatenation procedure. For centroids, lists, arrays and sparse matrices, the remarks in `Hazard.append` apply. All other attributes are copied from the first object in `haz_list`. Note that `Hazard.concat` can be used to concatenate hazards of a subclass. The result's type will be the subclass. However, calling `concat([])` (with an empty list) is equivalent to instantiation without init parameters. So, `Hazard.concat([])` is equivalent to `Hazard()`. If `HazardB` is a subclass of `Hazard`, then `HazardB.concat([])` is equivalent to `HazardB()` (unless `HazardB` overrides the `concat` method). Parameters ---------- haz_list : list of climada.hazard.Hazard objects Hazard instances of the same hazard type (subclass). Returns ------- haz_concat : instance of climada.hazard.Hazard This will be of the same type (subclass) as all the hazards in `haz_list`. See Also -------- Hazard.append : append hazards to a hazard in place Centroids.union : combine centroids """ if len(haz_list) == 0: return cls() haz_concat = haz_list[0].__class__() haz_concat.haz_type = haz_list[0].haz_type for attr_name, attr_val in vars(haz_list[0]).items(): # to save memory, only copy simple attributes like # "units" that are not explicitly handled by Hazard.append if not (isinstance(attr_val, (list, np.ndarray, sparse.csr_matrix)) or attr_name in ["centroids"]): setattr(haz_concat, attr_name, copy.deepcopy(attr_val)) haz_concat.append(*haz_list) return haz_concat
[docs] def change_centroids(self, centroids, threshold=NEAREST_NEIGHBOR_THRESHOLD): """ Assign (new) centroids to hazard. Centoids of the hazard not in centroids are mapped onto the nearest point. Fails if a point is further than threshold from the closest centroid. The centroids must have the same CRS as self.centroids. Parameters ---------- haz: Hazard Hazard instance centroids: Centroids Centroids instance on which to map the hazard. threshold: int or float Threshold (in km) for mapping haz.centroids not in centroids. Argument is passed to climada.util.coordinates.match_coordinates. Default: 100 (km) Returns ------- haz_new_cent: Hazard Hazard projected onto centroids Raises ------ ValueError See Also -------- util.coordinates.match_coordinates: algorithm to match centroids. """ # define empty hazard haz_new_cent = copy.deepcopy(self) haz_new_cent.centroids = centroids # indices for mapping matrices onto common centroids if centroids.meta: new_cent_idx = u_coord.match_grid_points( self.centroids.lon, self.centroids.lat, centroids.meta['width'], centroids.meta['height'], centroids.meta['transform']) if -1 in new_cent_idx: raise ValueError("At least one hazard centroid is out of" "the raster defined by centroids.meta." " Please choose a larger raster.") else: new_cent_idx = u_coord.match_coordinates( self.centroids.coord, centroids.coord, threshold=threshold ) if -1 in new_cent_idx: raise ValueError("At least one hazard centroid is at a larger " f"distance than the given threshold {threshold} " "from the given centroids. Please choose a " "larger threshold or enlarge the centroids") if np.unique(new_cent_idx).size < new_cent_idx.size: raise ValueError("At least two hazard centroids are mapped to the same " "centroids. Please make sure that the given centroids " "cover the same area like the original centroids and " "are not of lower resolution.") # re-assign attributes intensity and fraction for attr_name in ["intensity", "fraction"]: matrix = getattr(self, attr_name) setattr(haz_new_cent, attr_name, sparse.csr_matrix( (matrix.data, new_cent_idx[matrix.indices], matrix.indptr), shape=(matrix.shape[0], centroids.size) )) return haz_new_cent
@property def centr_exp_col(self): """ Name of the centroids columns for this hazard in an exposures Returns ------- String centroids string indicator with hazard type defining column in an exposures gdf. E.g. "centr_TC" """ from climada.entity.exposures import INDICATOR_CENTR # pylint: disable=import-outside-toplevel # import outside toplevel is necessary for it not being circular return INDICATOR_CENTR + self.haz_type
[docs] def get_mdr(self, cent_idx, impf): """ Return Mean Damage Ratio (mdr) for chosen centroids (cent_idx) for given impact function. Parameters ---------- cent_idx : array-like array of indices of chosen centroids from hazard impf : ImpactFunc impact function to compute mdr Returns ------- sparse.csr_matrix sparse matrix (n_events x len(cent_idx)) with mdr values See Also -------- get_paa: get the paa ffor the given centroids """ uniq_cent_idx, indices = np.unique(cent_idx, return_inverse=True) mdr = self.intensity[:, uniq_cent_idx] if impf.calc_mdr(0) == 0: mdr.data = impf.calc_mdr(mdr.data) else: LOGGER.warning("Impact function id=%d has mdr(0) != 0." "The mean damage ratio must thus be computed for all values of" "hazard intensity including 0 which can be very time consuming.", impf.id) mdr_array = impf.calc_mdr(mdr.toarray().ravel()).reshape(mdr.shape) mdr = sparse.csr_matrix(mdr_array) return mdr[:, indices]
[docs] def get_paa(self, cent_idx, impf): """ Return Percentage of Affected Assets (paa) for chosen centroids (cent_idx) for given impact function. Note that value as intensity = 0 are ignored. This is different from get_mdr. Parameters ---------- cent_idx : array-like array of indices of chosen centroids from hazard impf : ImpactFunc impact function to compute mdr Returns ------- sparse.csr_matrix sparse matrix (n_events x len(cent_idx)) with paa values See Also -------- get_mdr: get the mean-damage ratio for the given centroids """ uniq_cent_idx, indices = np.unique(cent_idx, return_inverse=True) paa = self.intensity[:, uniq_cent_idx] paa.data = np.interp(paa.data, impf.intensity, impf.paa) return paa[:, indices]
def _get_fraction(self, cent_idx=None): """ Return fraction for chosen centroids (cent_idx). Parameters ---------- cent_idx : array-like array of indices of chosen centroids from hazard Default is None (full fraction is returned) Returns ------- sparse.csr_matrix or None sparse matrix (n_events x len(cent_idx)) with fraction values None if fraction is empty. (When calculating the impact, an empty fraction is equivalent to the identity under multiplication, i.e. a uniform matrix with value 1 everywhere.) """ if self.fraction.nnz == 0: return None if cent_idx is None: return self.fraction return self.fraction[:, cent_idx]