Source code for climada.util.interpolation

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define interpolation and extrapolation functions for calculating (local) exceedance frequencies and return periods
"""

import logging

import numpy as np
from scipy import interpolate

LOGGER = logging.getLogger(__name__)


[docs] def preprocess_and_interpolate_ev( test_frequency, test_values, frequency, values, log_frequency=False, log_values=False, value_threshold=None, method="interpolate", y_asymptotic=np.nan, ): """Wrapper function to first preprocess (frequency, values) data and and then inter- and extrapolate to test frequencies or test values. Parameters ---------- test_frequency : array_like 1-D array of test frequencies for which values (e.g., intensities or impacts) should be assigned. test_values : array_like 1-D array of test values (e.g., intensities or impacts) for which frequencies should be assigned. frequency : array_like 1-D array of frequencies to be interpolated. values : array_like 1-D array of values (e.g., intensities or impacts) to be interpolated. log_frequency : bool, optional If set to True, frequencies are interpolated on log scale. Defaults to False. log_values : bool, optional If set to True, values (e.g., intensities) are interpolated on log scale. Defaults to False. value_threshold : float, optional Lower threshold to filter values (e.g., intensities or impacts). Defaults to None. method : str, optional Method to interpolate to test x values. Currently available are "interpolate", "extrapolate", "extrapolate_constant" and "stepfunction". If set to "interpolate", test x values outside the range of the given x values will be assigned NaN. If set to "extrapolate_constant" or "stepfunction", test x values larger than given x values will be assigned largest given y value, and test x values smaller than the given x values will be assigned y_asymtotic. If set to "extrapolate", values will be extrapolated (and interpolated). Defaults to "interpolate". y_asymptotic : float, optional Has no effect if method is "interpolate". Else, provides return value and if for test x values larger than given x values, if size < 2 or if method is set to "extrapolate_constant" or "stepfunction". Defaults to np.nan. Returns ------- np.array interpolated (and extrapolated) values or frequencies for given test frequencies or test values, respectively. Raises ------ ValueError If both test frequencies and test values are given or none of them. """ # check that only test frequencies or only test values are given if test_frequency is not None and test_values is not None: raise ValueError("Both test frequencies and test values are given.") elif test_frequency is None and test_values is None: raise ValueError("No test values or frequencies are given.") # sort values and frequencies sorted_idxs = np.argsort(values) values = np.squeeze(values[sorted_idxs]) frequency = frequency[sorted_idxs] # group similar values together frequency, values = group_frequency(frequency, values) # transform frequencies to cummulative frequencies frequency = np.cumsum(frequency[::-1])[::-1] # if test frequencies are provided if test_frequency is not None: if method == "stepfunction": return stepfunction_ev( test_frequency, frequency[::-1], values[::-1], y_threshold=value_threshold, y_asymptotic=y_asymptotic, ) else: extrapolation = None if method == "interpolate" else method return interpolate_ev( test_frequency, frequency[::-1], values[::-1], logx=log_frequency, logy=log_values, y_threshold=value_threshold, extrapolation=extrapolation, y_asymptotic=y_asymptotic, ) # if test values are provided else: if method == "stepfunction": return stepfunction_ev( test_values, values, frequency, x_threshold=value_threshold, y_asymptotic=y_asymptotic, ) else: extrapolation = None if method == "interpolate" else method return interpolate_ev( test_values, values, frequency, logx=log_values, logy=log_frequency, x_threshold=value_threshold, extrapolation=extrapolation, )
[docs] def interpolate_ev( x_test, x_train, y_train, logx=False, logy=False, x_threshold=None, y_threshold=None, extrapolation=None, y_asymptotic=np.nan, ): """ Util function to interpolate (and extrapolate) training data (x_train, y_train) to new points x_test with several options (log scale, thresholds) Parameters ---------- x_test : array_like 1-D array of x-values for which training data should be interpolated x_train : array_like 1-D array of x-values of training data y_train : array_like 1-D array of y-values of training data logx : bool, optional If set to True, x_values are converted to log scale. Defaults to False. logy : bool, optional If set to True, y_values are converted to log scale. Defaults to False. x_threshold : float, optional Lower threshold to filter x_train. Defaults to None. y_threshold : float, optional Lower threshold to filter y_train. Defaults to None. extrapolation : str, optional If set to 'extrapolate', values will be extrapolated. If set to 'extrapolate_constant', x_test values smaller than x_train will be assigned y_train[0] (x_train must be sorted in ascending order), and x_test values larger than x_train will be assigned y_asymptotic. If set to None, x_test values outside of the range of x_train will be assigned np.nan. Defaults to None. y_asymptotic : float, optional Has no effect if extrapolation is None. Else, provides return value and if for x_test values larger than x_train, for x_train.size < 2 or if extrapolation is set to 'extrapolate_constant'. Defaults to np.nan. Returns ------- np.array interpolated values y_test for the test points x_test """ # preprocess interpolation data x_test, x_train, y_train = _preprocess_interpolation_data( x_test, x_train, y_train, logx, logy, x_threshold, y_threshold ) # handle case of small training data sizes if x_train.size < 2: if not extrapolation: return np.full_like(x_test, np.nan) return _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic) # calculate fill values if extrapolation == "extrapolate": fill_value = "extrapolate" elif extrapolation == "extrapolate_constant": if not all(sorted(x_train) == x_train): raise ValueError("x_train array must be sorted in ascending order.") fill_value = (y_train[0], np.log10(y_asymptotic) if logy else y_asymptotic) else: fill_value = np.nan interpolation = interpolate.interp1d( x_train, y_train, fill_value=fill_value, bounds_error=False ) y_test = interpolation(x_test) # adapt output scale if logy: y_test = np.power(10.0, y_test) return y_test
[docs] def stepfunction_ev( x_test, x_train, y_train, x_threshold=None, y_threshold=None, y_asymptotic=np.nan ): """ Util function to interpolate and extrapolate training data (x_train, y_train) to new points x_test using a step function Parameters ---------- x_test : array_like 1-D array of x-values for which training data should be interpolated x_train : array_like 1-D array of x-values of training data y_train : array_like 1-D array of y-values of training data x_threshold : float, optional Lower threshold to filter x_train. Defaults to None. y_threshold : float, optional Lower threshold to filter y_train. Defaults to None. y_asymptotic : float, optional Return value if x_test > x_train. Defaults to np.nan. Returns ------- np.array interpolated values y_test for the test points x_test """ # preprocess interpolation data x_test, x_train, y_train = _preprocess_interpolation_data( x_test, x_train, y_train, None, None, x_threshold, y_threshold ) # handle case of small training data sizes if x_train.size < 2: return _interpolate_small_input(x_test, x_train, y_train, None, y_asymptotic) # find indices of x_test if sorted into x_train if not all(sorted(x_train) == x_train): raise ValueError("Input array x_train must be sorted in ascending order.") indx = np.searchsorted(x_train, x_test) y_test = y_train[indx.clip(max=len(x_train) - 1)] y_test[indx == len(x_train)] = y_asymptotic return y_test
def _preprocess_interpolation_data( x_test, x_train, y_train, logx, logy, x_threshold, y_threshold ): """ helper function to preprocess interpolation training and test data by filtering data below thresholds and converting to log scale if required """ if x_train.shape != y_train.shape: raise ValueError( f"Incompatible shapes of input data, x_train {x_train.shape} " f"and y_train {y_train.shape}. Should be the same" ) # transform input to float arrays x_test, x_train, y_train = ( np.array(x_test).astype(float), np.array(x_train).astype(float), np.array(y_train).astype(float), ) # cut x and y above threshold if x_threshold or x_threshold == 0: x_th = np.asarray(x_train > x_threshold).squeeze() x_train = x_train[x_th] y_train = y_train[x_th] if y_threshold or y_threshold == 0: y_th = np.asarray(y_train > y_threshold).squeeze() x_train = x_train[y_th] y_train = y_train[y_th] # convert to log scale if logx: x_train, x_test = np.log10(x_train), np.log10(x_test) if logy: y_train = np.log10(y_train) return (x_test, x_train, y_train) def _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic): """ helper function to handle if interpolation data is small (empty or one point) """ # return y_asymptotic if x_train and y_train empty if x_train.size == 0: return np.full_like(x_test, y_asymptotic) # reconvert logarithmic y_train to original y_train if logy: y_train = np.power(10.0, y_train) # if only one (x_train, y_train), return stepfunction with # y_train if x_test < x_train and y_asymtotic if x_test > x_train y_test = np.full_like(x_test, y_train[0]) y_test[np.squeeze(x_test) > np.squeeze(x_train)] = y_asymptotic return y_test
[docs] def group_frequency(frequency, value, n_sig_dig=2): """ Util function to aggregate (add) frequencies for equal values Parameters ---------- frequency : array_like Frequency array value : array_like Value array in ascending order n_sig_dig : int number of significant digits for value when grouping frequency. Defaults to 2. Returns ------- tuple (frequency array after aggregation, unique value array in ascending order) """ frequency, value = np.array(frequency), np.array(value) if frequency.size == 0 and value.size == 0: return ([], []) # round values and group them value = round_to_sig_digits(value, n_sig_dig) value_unique, start_indices = np.unique(value, return_index=True) if value_unique.size != frequency.size: if not all(sorted(start_indices) == start_indices): raise ValueError("Value array must be sorted in ascending order.") # add frequency for equal value start_indices = np.insert(start_indices, value_unique.size, frequency.size) frequency = np.add.reduceat(frequency, start_indices[:-1]) return frequency, value_unique return frequency, value
[docs] def round_to_sig_digits(x, n_sig_dig): """round each element array to a number of significant digits Parameters ---------- x : array-like array to be rounded n_sig_dig : int number of significant digits. Returns ------- np.array rounded array """ x = np.asarray(x) x_positive = np.where(np.isfinite(x) & (x != 0), np.abs(x), 10 ** (n_sig_dig - 1)) mags = 10 ** (n_sig_dig - 1 - np.floor(np.log10(x_positive))) return np.round(x * mags) / mags