Source code for climada.engine.impact_data

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Functions to merge EMDAT damages to hazard events.
"""
import logging
import pickle
from datetime import datetime
from pathlib import Path
import pandas as pd
import numpy as np
from cartopy.io import shapereader

from climada.util.finance import gdp
from climada.util.constants import DEF_CRS
import climada.util.coordinates as u_coord
from climada.engine import Impact

LOGGER = logging.getLogger(__name__)

# Maps CLIMADA hazard type abbreviations (keys) to the EM-DAT disaster
# *subtype* labels (values) that are selected for that abbreviation.
# NOTE(review): the last 'BF' label ends in 'Pastur' and looks truncated;
# presumably it mirrors the label as stored in EM-DAT exports -- confirm.
PERIL_SUBTYPE_MATCH_DICT = dict(TC=['Tropical cyclone'],
                                FL=['Coastal flood'],
                                EQ=['Ground movement', 'Earthquake'],
                                RF=['Riverine flood', 'Flood'],
                                WS=['Extra-tropical storm', 'Storm'],
                                DR=['Drought'],
                                LS=['Landslide'],
                                BF=['Forest fire', 'Wildfire', 'Land fire (Brush, Bush, Pastur']
                                )

# Maps CLIMADA hazard type abbreviations (keys) to the EM-DAT disaster
# *type* labels (values) that are selected for that abbreviation.
PERIL_TYPE_MATCH_DICT = dict(DR=['Drought'],
                             EQ=['Earthquake'],
                             FL=['Flood'],
                             LS=['Landslide'],
                             VQ=['Volcanic activity'],
                             BF=['Wildfire'],
                             HW=['Extreme temperature']
                             )

# Column names of EM-DAT CSV files per file format version.
# Outer keys are the format versions (an int, the year of the format).
# For each version, the inner dict maps the variable name used for lookups
# inside this module (key) to the column name as it appears in a CSV file of
# that version (value). For 2020 and 2023 keys and values are identical.
VARNAMES_EMDAT = \
    {2018: {'Dis No': 'Disaster No.',
            'Disaster Type': 'Disaster type',
            'Disaster Subtype': 'Disaster subtype',
            'Event Name': 'Disaster name',
            'Country': 'Country',
            'ISO': 'ISO',
            'Location': 'Location',
            'Associated Dis': 'Associated disaster',
            'Associated Dis2': 'Associated disaster2',
            'Dis Mag Value': 'Magnitude value',
            'Dis Mag Scale': 'Magnitude scale',
            'Latitude': 'Latitude',
            'Longitude': 'Longitude',
            'Total Deaths': 'Total deaths',
            'Total Affected': 'Total affected',
            "Insured Damages ('000 US$)": "Insured losses ('000 US$)",
            "Total Damages ('000 US$)": "Total damage ('000 US$)"},
     2020: {'Dis No': 'Dis No',
            'Year': 'Year',
            'Seq': 'Seq',
            'Disaster Group': 'Disaster Group',
            'Disaster Subgroup': 'Disaster Subgroup',
            'Disaster Type': 'Disaster Type',
            'Disaster Subtype': 'Disaster Subtype',
            'Disaster Subsubtype': 'Disaster Subsubtype',
            'Event Name': 'Event Name',
            'Entry Criteria': 'Entry Criteria',
            'Country': 'Country',
            'ISO': 'ISO',
            'Region': 'Region',
            'Continent': 'Continent',
            'Location': 'Location',
            'Origin': 'Origin',
            'Associated Dis': 'Associated Dis',
            'Associated Dis2': 'Associated Dis2',
            'OFDA Response': 'OFDA Response',
            'Appeal': 'Appeal',
            'Declaration': 'Declaration',
            'Aid Contribution': 'Aid Contribution',
            'Dis Mag Value': 'Dis Mag Value',
            'Dis Mag Scale': 'Dis Mag Scale',
            'Latitude': 'Latitude',
            'Longitude': 'Longitude',
            'Local Time': 'Local Time',
            'River Basin': 'River Basin',
            'Start Year': 'Start Year',
            'Start Month': 'Start Month',
            'Start Day': 'Start Day',
            'End Year': 'End Year',
            'End Month': 'End Month',
            'End Day': 'End Day',
            'Total Deaths': 'Total Deaths',
            'No Injured': 'No Injured',
            'No Affected': 'No Affected',
            'No Homeless': 'No Homeless',
            'Total Affected': 'Total Affected',
            "Reconstruction Costs ('000 US$)": "Reconstruction Costs ('000 US$)",
            "Insured Damages ('000 US$)": "Insured Damages ('000 US$)",
            "Total Damages ('000 US$)": "Total Damages ('000 US$)",
            'CPI': 'CPI'},
     2023: {'Dis No': 'Dis No',
            'Year': 'Year',
            'Seq': 'Seq',
            'Glide': 'Glide',
            'Disaster Group': 'Disaster Group',
            'Disaster Subgroup': 'Disaster Subgroup',
            'Disaster Type': 'Disaster Type',
            'Disaster Subtype': 'Disaster Subtype',
            'Disaster Subsubtype': 'Disaster Subsubtype',
            'Event Name': 'Event Name',
            'Country': 'Country',
            'ISO': 'ISO',
            'Region': 'Region',
            'Continent': 'Continent',
            'Location': 'Location',
            'Origin': 'Origin',
            'Associated Dis': 'Associated Dis',
            'Associated Dis2': 'Associated Dis2',
            'OFDA Response': 'OFDA Response',
            'Appeal': 'Appeal',
            'Declaration': 'Declaration',
            "AID Contribution ('000 US$)": "AID Contribution ('000 US$)",
            'Dis Mag Value': 'Dis Mag Value',
            'Dis Mag Scale': 'Dis Mag Scale',
            'Latitude': 'Latitude',
            'Longitude': 'Longitude',
            'Local Time': 'Local Time',
            'River Basin': 'River Basin',
            'Start Year': 'Start Year',
            'Start Month': 'Start Month',
            'Start Day': 'Start Day',
            'End Year': 'End Year',
            'End Month': 'End Month',
            'End Day': 'End Day',
            'Total Deaths': 'Total Deaths',
            'No Injured': 'No Injured',
            'No Affected': 'No Affected',
            'No Homeless': 'No Homeless',
            'Total Affected': 'Total Affected',
            "Reconstruction Costs ('000 US$)": "Reconstruction Costs ('000 US$)",
            "Reconstruction Costs, Adjusted ('000 US$)": "Reconstruction Costs, Adjusted ('000 US$)",
            "Insured Damages ('000 US$)": "Insured Damages ('000 US$)",
            "Insured Damages, Adjusted ('000 US$)": "Insured Damages, Adjusted ('000 US$)",
            "Total Damages ('000 US$)": "Total Damages ('000 US$)",
            "Total Damages, Adjusted ('000 US$)": "Total Damages, Adjusted ('000 US$)",
            'CPI': 'CPI',
            'Adm Level': 'Adm Level',
            'Admin1 Code': 'Admin1 Code',
            'Admin2 Code': 'Admin2 Code',
            'Geo Locations': 'Geo Locations'}}


def assign_hazard_to_emdat(certainty_level, intensity_path_haz, names_path_haz,
                           reg_id_path_haz, date_path_haz, emdat_data,
                           start_time, end_time, keep_checks=False):
    """Link EM-DAT tropical cyclone events to hazard events.

    Parameters
    ----------
    certainty_level : str
        'high' or 'low'
    intensity_path_haz : str
        Path to the pickled sparse intensity matrix (hazards as rows, grid
        points as columns, values only at locations with impacts)
    names_path_haz : str
        Path to the pickled identifier of each hazard (i.e. IBtracID),
        one per row of the matrix
    reg_id_path_haz : str
        Path to the pickled ISO country ID of each grid point (columns of
        the matrix)
    date_path_haz : str
        Path to the pickled start date of each hazard (rows of the matrix)
    emdat_data : pd.DataFrame
        dataframe with EMdat data
    start_time : str
        start date of events to be assigned 'yyyy-mm-dd'
    end_time : str
        end date of events to be assigned 'yyyy-mm-dd'
    keep_checks : bool, optional
        if False (default), the helper columns 'Date_start_EM_ordinal',
        'possible_track' and 'possible_track_all' are dropped from the result

    Returns
    -------
    pd.DataFrame
        EMdat entries linked to a hazard

    Raises
    ------
    ValueError
        if certainty_level is neither 'high' nor 'low'
    """
    valid_levels = ['high', 'low']
    if certainty_level not in valid_levels:
        raise ValueError("Invalid certainty level. Expected one of: %s" % valid_levels)

    # hazard side: which track hit which country
    print("Start preparing hazard set")
    hit_countries = hit_country_per_hazard(intensity_path_haz, names_path_haz,
                                           reg_id_path_haz, date_path_haz)

    # damage side: EM-DAT events inside the requested period
    print("Start preparing damage set")
    lookup = create_lookup(emdat_data, start_time, end_time,
                           disaster_subtype='Tropical cyclone')

    # candidate tracks per event for each time window (in days)
    print("Calculate possible hits")
    time_windows = (5, 10, 15, 25, 50)
    matches = {}
    for step, delta_t in enumerate(time_windows, start=1):
        candidates = emdat_possible_hit(lookup=lookup, hit_countries=hit_countries,
                                        delta_t=delta_t)
        matches[delta_t] = match_em_id(lookup=lookup, poss_hit=candidates)
        print("%d/%d" % (step, len(time_windows)))

    # (narrow window, wide window) pairs; the position in the list is the
    # allocation level that is written to the lookup table
    print("Assign tracks")
    if certainty_level == 'high':
        # assign only tracks with high certainty
        window_pairs = [(10, 50), (15, 50), (25, 50), (10, 25), (15, 25)]
    else:
        # assign all tracks
        window_pairs = [(5, 50), (10, 50), (15, 50),
                        (5, 25), (10, 25), (15, 25),
                        (5, 15), (10, 15), (5, 10),
                        (15, 15), (10, 10), (5, 5)]
    for level, (narrow, wide) in enumerate(window_pairs, start=1):
        lookup = assign_track_to_em(lookup=lookup,
                                    possible_tracks_1=matches[narrow],
                                    possible_tracks_2=matches[wide],
                                    level=level)

    if not keep_checks:
        lookup = lookup.drop(['Date_start_EM_ordinal', 'possible_track',
                              'possible_track_all'], axis=1)
    # result is not used; kept so the function behaves as before
    lookup.groupby('allocation_level').count()
    allocated = len(lookup[lookup.allocation_level.notnull()])
    print('(%d/%s) tracks allocated' % (allocated, len(lookup)))
    return lookup
def hit_country_per_hazard(intensity_path, names_path, reg_id_path, date_path):
    """Create the table of hit countries per hazard from a pickled hazard set.

    Parameters
    ----------
    intensity_path : str
        Path to file containing sparse matrix with hazards as rows and grid
        points as cols, values only at location with impacts
    names_path : str
        Path to file with identifier for each hazard (i.e. IBtracID)
        (rows of the matrix)
    reg_id_path : str
        Path to file with ISO country ID of each grid point
        (cols of the matrix)
    date_path : str
        Path to file with start date of each hazard (rows of the matrix)

    Returns
    -------
    pd.DataFrame
        one row per (hazard, hit country) with the columns 'hit_country'
        (ISO3 alpha code), 'Date_start' and 'ibtracsID'
    """
    def _load(path):
        # NOTE(security): unpickling executes code stored in the file; only
        # use files from a trusted source.
        with open(path, 'rb') as filef:
            return pickle.load(filef)

    inten = _load(intensity_path)
    names = _load(names_path)
    reg_id = _load(reg_id_path)
    date = _load(date_path)

    # Collect plain records and build the frame once: DataFrame.append was
    # removed in pandas 2.0 and copied the whole frame on every call.
    records = []
    for track, name in enumerate(names):
        # column indices of the grid points with non-zero intensity
        hit_points = inten[track, ].nonzero()[1]
        # countries of these grid points, without duplicates
        for region in set(reg_id[hit_points]):
            records.append({'hit_country': u_coord.country_to_iso(region, "alpha3"),
                            'Date_start': date[track],
                            'ibtracsID': name})
    return pd.DataFrame(records, columns=['hit_country', 'Date_start', 'ibtracsID'])
def create_lookup(emdat_data, start, end, disaster_subtype='Tropical cyclone'):
    """Prepare a lookup table of EMdat events to which hazards can be assigned.

    Parameters
    ----------
    emdat_data : pd.DataFrame
        EMdat data with the columns 'Disaster_subtype', 'ISO',
        'Date_start_clean' ('yyyy-mm-dd' strings), 'Disaster_name' and
        'Disaster_No'
    start : str
        start date of events to be assigned 'yyyy-mm-dd' (exclusive)
    end : str
        end date of events to be assigned 'yyyy-mm-dd' (exclusive)
    disaster_subtype : str
        EMdat disaster subtype

    Returns
    -------
    pd.DataFrame
        one row per selected event with the columns 'hit_country',
        'Date_start_EM', 'Date_start_EM_ordinal', 'Disaster_name', 'EM_ID'
        and the still empty columns 'ibtracsID', 'allocation_level',
        'possible_track', 'possible_track_all'
    """
    data = emdat_data[emdat_data['Disaster_subtype'] == disaster_subtype]
    start_dates = data['Date_start_clean'].to_numpy()
    # proleptic Gregorian ordinals make the dates comparable as plain numbers
    ordinals = np.array(
        [datetime.strptime(day, '%Y-%m-%d').toordinal() for day in start_dates],
        dtype='int64')

    def _placeholder():
        # object dtype on purpose: these columns later receive strings, ints
        # and lists element by element
        return pd.Series([np.nan] * len(data), dtype=object)

    # Build the table in one go instead of assigning element-wise through
    # ``lookup.column[i] = ...``, which is a chained assignment and is not
    # guaranteed to write into the frame.
    lookup = pd.DataFrame({
        'hit_country': data['ISO'].to_numpy(),
        'Date_start_EM': start_dates,
        'Date_start_EM_ordinal': ordinals,
        'Disaster_name': data['Disaster_name'].to_numpy(),
        'EM_ID': data['Disaster_No'].to_numpy(),
        'ibtracsID': _placeholder(),
        'allocation_level': _placeholder(),
        'possible_track': _placeholder(),
        'possible_track_all': _placeholder(),
    })

    # keep events strictly inside the requested period
    emdat_start = datetime.strptime(start, '%Y-%m-%d').toordinal()
    emdat_end = datetime.strptime(end, '%Y-%m-%d').toordinal()
    in_period = (ordinals > emdat_start) & (ordinals < emdat_end)
    return lookup[in_period]
# Function to relate EM disaster to IBtrack using hit countries and time
def emdat_possible_hit(lookup, hit_countries, delta_t):
    """Relate EM disasters to hazards using hit countries and time.

    Parameters
    ----------
    lookup : pd.DataFrame
        table relating EMdatID to hazard; the columns 'EM_ID', 'hit_country'
        and 'Date_start_EM_ordinal' are read
    hit_countries : pd.DataFrame
        hazards per hit country; the columns 'hit_country', 'Date_start' and
        'ibtracsID' are read
    delta_t : numeric
        max time difference of start of EMdat event and hazard

    Returns
    -------
    list
        for every row of lookup, the list of hazard IDs that hit the same
        country and started at most ``delta_t`` (exclusive) before the event
    """
    event_countries = lookup.hit_country.values
    event_ordinals = lookup.Date_start_EM_ordinal.values

    hits_per_event = []
    for row, _ in enumerate(lookup.EM_ID.values):
        # hazards that hit the country of this event
        same_country = hit_countries[hit_countries['hit_country'] == event_countries[row]]
        track_ids = same_country.ibtracsID.values
        track_starts = same_country.Date_start.values
        # keep hazards starting on the event date or less than delta_t before
        hits_per_event.append([
            track_id
            for track_id, track_start in zip(track_ids, track_starts)
            if (event_ordinals[row] - track_start) < delta_t
            and (event_ordinals[row] - track_start) >= 0
        ])
    return hits_per_event
# function to check if EM_ID has been assigned already
def match_em_id(lookup, poss_hit):
    """Combine the possible hits of all lookup rows sharing an EMdat ID.

    For every row of lookup, the possible hits of that row are repeated once
    for each row of lookup (including the row itself) that carries the same
    EM_ID.

    Parameters
    ----------
    lookup : pd.DataFrame
        table relating EMdatID to hazard; the column 'EM_ID' is read
    poss_hit : list
        list with possible hits, one entry per row of lookup

    Returns
    -------
    list
        list with all possible hits per EMdat ID
    """
    em_ids = lookup.EM_ID.values
    combined = []
    for row, em_id in enumerate(em_ids):
        # number of rows (this one included) that carry the same EMdat ID
        n_same_id = sum(1 for other_id in em_ids if em_id == other_id)
        combined.append([poss_hit[row]] * n_same_id)
    return combined
def assign_track_to_em(lookup, possible_tracks_1, possible_tracks_2, level):
    """Assign a hazard to an EMdat event.

    To get some confidence into the procedure, hazards get only assigned if
    there is no other hazard occurring at a bigger time interval in that
    country. Thus a track of possible_tracks_1 gets only assigned if there
    are no other tracks in possible_tracks_2.
    The confidence can be expressed with a certainty level.

    Parameters
    ----------
    lookup : pd.DataFrame
        table relating EMdatID to hazard; the columns 'allocation_level',
        'hit_country', 'ibtracsID', 'possible_track' and 'possible_track_all'
        are read and/or written
    possible_tracks_1 : list
        list of possible hits with smaller time horizon
    possible_tracks_2 : list
        list of possible hits with larger time horizon
    level : int
        level of confidence, written to 'allocation_level' on assignment

    Returns
    -------
    pd.DataFrame
        lookup with assigned tracks and possible hits
    """
    # NOTE(review): all writes below go through ``lookup.<column>.values[i]``,
    # i.e. they only reach the DataFrame if ``.values`` exposes the frame's
    # own data rather than a copy -- confirm for the pandas version in use.
    # NOTE(review): the nesting was reconstructed from a flattened listing;
    # the final ``else`` is assumed to belong to the ``np.isnan`` check --
    # confirm against the original file.
    for i, _ in enumerate(possible_tracks_1):
        # only events without an allocation so far are considered
        if np.isnan(lookup.allocation_level.values[i]):
            number_emdat_id = len(possible_tracks_1[i])
            # print(number_emdat_id)
            for j in range(0, number_emdat_id):
                # check that number of possible track stays the same at given
                # time difference and that list is not empty
                if len(possible_tracks_1[i][j]) == len(possible_tracks_2[i][j]) == 1 \
                        and possible_tracks_1[i][j] != []:
                    # check that all tracks are the same
                    if all(possible_tracks_1[i][0] == possible_tracks_1[i][k]
                           for k in range(0, len(possible_tracks_1[i]))):
                        # check that track ID has not been assigned to that country already
                        ctry_lookup = lookup[lookup['hit_country']
                                             == lookup.hit_country.values[i]]
                        if possible_tracks_1[i][0][0] not in ctry_lookup.ibtracsID.values:
                            lookup.ibtracsID.values[i] = possible_tracks_1[i][0][0]
                            lookup.allocation_level.values[i] = level
                elif possible_tracks_1[i][j] != []:
                    # ambiguous but non-empty: remember the candidates
                    lookup.possible_track.values[i] = possible_tracks_1[i]
        else:
            # already allocated: store the candidates of this time horizon
            lookup.possible_track_all.values[i] = possible_tracks_1[i]
    return lookup
def check_assigned_track(lookup, checkset):
    """Compare the assigned tracks of lookup with a set of checked assignments.

    The shares of correctly assigned, wrongly assigned and not assigned
    tracks are printed; nothing is returned.

    Parameters
    ----------
    lookup : pd.DataFrame
        dataframe to relate EMdatID to hazard, with the columns
        'hit_country', 'EM_ID' and 'ibtracsID'
    checkset : pd.DataFrame
        dataframe with already checked hazards, with the columns
        'hit_country', 'EM_ID' and 'IBtracsID_checked'
    """
    # merge checkset and lookup
    check = pd.merge(checkset, lookup[['hit_country', 'EM_ID', 'ibtracsID']],
                     on=['hit_country', 'EM_ID'])
    check_size = len(check.ibtracsID.values)
    if check_size == 0:
        # no common events: the percentages below would divide by zero
        print('No common entries in checkset and lookup, nothing to check')
        return
    # not assigned values
    not_assigned = check.ibtracsID.isnull().sum(axis=0)
    # correct assigned values
    correct = sum(check.ibtracsID.values == check.IBtracsID_checked.values)
    # wrongly assigned values
    wrong = check_size - not_assigned - correct
    print('%.1f%% tracks assigned correctly, %.1f%% wrongly, %.1f%% not assigned'
          % (correct / check_size * 100,
             wrong / check_size * 100,
             not_assigned / check_size * 100))
def clean_emdat_df(emdat_file, countries=None, hazard=None, year_range=None,
                   target_version=None):
    """
    Get a clean and standardized DataFrame from EM-DAT-CSV-file

    (1) load EM-DAT data from CSV to DataFrame and remove header/footer,
    (2) handle version, clean up, and add columns, and
    (3) filter by country, hazard type and year range (if any given)

    Parameters
    ----------
    emdat_file : str, Path, or DataFrame
        Either string with full path to CSV-file or
        pandas.DataFrame loaded from EM-DAT CSV
    countries : list of str
        country ISO3-codes or names, e.g. ['JAM', 'CUB'].
        countries=None for all countries (default).
        The given list is not modified.
    hazard : list or str
        List of Disaster (sub-)type according to EMDAT terminology, i.e.:
        Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
        Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
        Storm, Volcanic activity, Wildfire;
        Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
        Tsunami, etc.;
        OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
    year_range : list or tuple
        Year range to be extracted, e.g. (2000, 2015);
        (only min and max are considered)
    target_version : int
        required EM-DAT data format version (i.e. year of download),
        changes naming of columns/variables,
        default: newest available version in ``VARNAMES_EMDAT`` that matches
        the given emdat_file

    Returns
    -------
    df_data : pd.DataFrame
        DataFrame containing cleaned and filtered EM-DAT impact data

    Raises
    ------
    TypeError
        if emdat_file is neither a path nor a DataFrame
    ValueError
        if the columns of emdat_file match no version in ``VARNAMES_EMDAT``
    """
    # (1) load EM-DAT data from CSV to DataFrame, skipping the header:
    if isinstance(emdat_file, (str, Path)):
        # the header row is searched for in the first 11 lines of the file
        for header_row in range(11):
            df_emdat = pd.read_csv(emdat_file, encoding="ISO-8859-1", header=header_row)
            if 'Country' in df_emdat.columns and 'ISO' in df_emdat.columns:
                break
    elif isinstance(emdat_file, pd.DataFrame):
        df_emdat = emdat_file
    else:
        raise TypeError('emdat_file needs to be str or DataFrame')
    # keep only rows with at least 9 non-NaN values (drops e.g. the footer):
    df_emdat = df_emdat.dropna(thresh=9)

    # (2) handle version, clean up, and add columns:
    # (2.1) identify underlying EMDAT version of csv
    #       (the newest matching version wins):
    version = None
    file_columns = list(df_emdat.columns)
    for vers in sorted(VARNAMES_EMDAT.keys()):
        if len(file_columns) >= len(VARNAMES_EMDAT[vers]) and \
                all(item in file_columns for item in VARNAMES_EMDAT[vers].values()):
            version = vers
    if not version:
        raise ValueError("the given emdat_file contains unexpected columns and cannot be"
                         " associated with any known EM-DAT file structure")

    # (2.2) create new DataFrame df_data with column names as target version
    target_version = target_version or version
    target_names = VARNAMES_EMDAT[target_version]
    source_names = VARNAMES_EMDAT[version]
    df_data = pd.DataFrame(index=df_emdat.index.values,
                           columns=list(target_names.values()))
    if 'Year' not in df_data.columns:  # make sure column "Year" exists
        df_data['Year'] = np.nan
    for col in df_data.columns:
        if col in source_names:
            df_data[col] = df_emdat[source_names[col]]
        elif col in df_emdat.columns:
            df_data[col] = df_emdat[col]
        elif col == 'Year' and version <= 2018:
            # old files carry the year only as first 4 digits of the disaster number
            df_data[col] = [
                int(disaster_no[0:4]) if isinstance(disaster_no, str) else np.nan
                for disaster_no in df_emdat[source_names['Dis No']]
            ]

    if version <= 2018 and target_version >= 2020:
        # create 'Start Year', -Month' and -Day' from 'Start date'
        # ignore 'End date'
        # replace NaN with None in 'Disaster Subtype', 'Disaster Type' and 'Country'
        # Fallback date is 1 January of 'Year' (year 1 if unknown). The year
        # is cast to int because a float year would not parse as '%Y'.
        date_list = [
            datetime(1, 1, 1) if pd.isna(year) else datetime(int(year), 1, 1)
            for year in df_data['Year']
        ]
        day_list = list()
        month_list = list()
        year_list = list()
        boolean_warning = True
        for idx, datestr in enumerate(list(df_emdat['Start date'])):
            # first month/year only, then the full date; the last format
            # that parses determines the stored date
            try:
                date_list[idx] = datetime.strptime(datestr[-7:], '%m/%Y')
            except ValueError:
                if boolean_warning:
                    LOGGER.warning('EM_DAT CSV contains invalid time formats')
                    boolean_warning = False
            try:
                date_list[idx] = datetime.strptime(datestr, '%d/%m/%Y')
            except ValueError:
                if boolean_warning:
                    LOGGER.warning('EM_DAT CSV contains invalid time formats')
                    boolean_warning = False
            day_list.append(date_list[idx].day)
            month_list.append(date_list[idx].month)
            year_list.append(date_list[idx].year)
        df_data['Start Month'] = np.array(month_list, dtype='int')
        df_data['Start Day'] = np.array(day_list, dtype='int')
        df_data['Start Year'] = np.array(year_list, dtype='int')
        for var in ['Disaster Subtype', 'Disaster Type', 'Country']:
            # assign back instead of an in-place fillna on a column selection,
            # which is not guaranteed to modify df_data
            df_data[target_names[var]] = df_data[target_names[var]].fillna('None')

    # (3) Filter by countries, year range, and disaster type
    # (3.1) Countries:
    if countries and isinstance(countries, str):
        countries = [countries]
    if countries and isinstance(countries, list):
        # convert countries to iso3 alpha code (in a new list, so that the
        # list of the caller stays untouched):
        iso3_codes = [u_coord.country_to_iso(country, "alpha3") for country in countries]
        df_data = df_data[df_data['ISO'].isin(iso3_codes)].reset_index(drop=True)

    # (3.2) Year range:
    if year_range:
        # fill missing years of every row from the start year, if available
        start_year_col = target_names.get('Start Year')
        year_missing = df_data['Year'].isna()
        if year_missing.any() and start_year_col in df_data.columns:
            df_data.loc[year_missing, 'Year'] = df_data.loc[year_missing, start_year_col]
        df_data = df_data[(df_data['Year'] >= min(year_range)) &
                          (df_data['Year'] <= max(year_range))]

    # (3.3) Disaster type:
    if hazard and isinstance(hazard, str):
        hazard = [hazard]
    if hazard and isinstance(hazard, list):
        type_col = target_names['Disaster Type']
        subtype_col = target_names['Disaster Subtype']
        known_types = df_data[type_col].unique()
        known_subtypes = df_data[subtype_col].unique()
        disaster_types = list()
        disaster_subtypes = list()
        for haz in hazard:
            # EM-DAT terminology given directly
            if haz in known_types:
                disaster_types.append(haz)
            if haz in known_subtypes:
                disaster_subtypes.append(haz)
            # CLIMADA hazard type abbreviation
            if haz in PERIL_TYPE_MATCH_DICT:
                disaster_types += PERIL_TYPE_MATCH_DICT[haz]
            if haz in PERIL_SUBTYPE_MATCH_DICT:
                disaster_subtypes += PERIL_SUBTYPE_MATCH_DICT[haz]
        df_data = df_data[
            (df_data[type_col].isin(disaster_types)) |
            (df_data[subtype_col].isin(disaster_subtypes))]
    return df_data.reset_index(drop=True)
def emdat_countries_by_hazard(emdat_file_csv, hazard=None, year_range=None):
    """Return all countries exposed to a chosen hazard type according to EMDAT data.

    Parameters
    ----------
    emdat_file_csv : str, Path, or DataFrame
        Either string with full path to CSV-file or
        pandas.DataFrame loaded from EM-DAT CSV
    hazard : list or str
        List of Disaster (sub-)type according to EMDAT terminology, i.e.:
        Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
        Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
        Storm, Volcanic activity, Wildfire;
        Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
        Tsunami, etc.;
        OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
    year_range : list or tuple
        Year range to be extracted, e.g. (2000, 2015);
        (only min and max are considered)

    Returns
    -------
    countries_iso3a : list
        List of ISO3-codes of countries impacted by the disaster (sub-)types
    countries_names : list
        List of names of countries impacted by the disaster (sub-)types,
        'NA' where no name is found for an ISO3-code
    """
    def _country_name(iso3a):
        # codes without a known country name are reported as 'NA'
        try:
            return u_coord.country_to_iso(iso3a, "name")
        except LookupError:
            return 'NA'

    filtered = clean_emdat_df(emdat_file_csv, hazard=hazard, year_range=year_range)
    countries_iso3a = list(filtered.ISO.unique())
    countries_names = [_country_name(code) for code in countries_iso3a]
    return countries_iso3a, countries_names
def scale_impact2refyear(impact_values, year_values, iso3a_values, reference_year=None):
    """Scale given impact values proportional to GDP to the according value
    in a reference year (for normalization of monetary values)

    Parameters
    ----------
    impact_values : list or array
        Impact values to be scaled.
    year_values : list or array
        Year of each impact (same length as impact_values)
    iso3a_values : list or array
        ISO3alpha code of country for each impact (same length as impact_values)
    reference_year : int, optional
        Impact is scaled proportional to GDP to the value of the reference year.
        No scaling for reference_year=None (default) or 0

    Returns
    -------
    list or np.ndarray
        list of scaled impact values if a reference year is given,
        otherwise the unscaled impact values as array

    Raises
    ------
    ValueError
        if reference_year is given but not numeric
    """
    impact_values = np.array(impact_values)
    year_values = np.array(year_values)
    iso3a_values = np.array(iso3a_values)
    if not reference_year:
        return impact_values
    # np.integer is listed explicitly: NumPy integers (e.g. a year taken from
    # an array) are no instances of the builtin int
    if not isinstance(reference_year, (int, float, np.integer)):
        raise ValueError('Invalid reference_year')
    reference_year = int(reference_year)

    gdp_ref = dict()
    gdp_years = dict()
    for country in np.unique(iso3a_values):
        # get reference GDP value for each country:
        gdp_ref[country] = gdp(country, reference_year)[1]
        # get GDP value for each country and year:
        gdp_years[country] = {
            year: gdp(country, year)[1]
            for year in np.unique(year_values[iso3a_values == country])
        }

    # scale in a float array, so that results are not truncated when the
    # impacts are given as integers
    scaled = impact_values.astype(float)
    for idx, val in enumerate(scaled):
        country = iso3a_values[idx]
        scaled[idx] = val * gdp_ref[country] / gdp_years[country][year_values[idx]]
    return list(scaled)
def emdat_impact_yearlysum(emdat_file_csv, countries=None, hazard=None, year_range=None,
                           reference_year=None, imp_str="Total Damages ('000 US$)",
                           version=None):
    """function to load EM-DAT data and sum impact per year

    Parameters
    ----------
    emdat_file_csv : str or DataFrame
        Either string with full path to CSV-file or
        pandas.DataFrame loaded from EM-DAT CSV
    countries : list of str
        country ISO3-codes or names, e.g. ['JAM', 'CUB'].
        countries=None for all countries (default)
    hazard : list or str
        List of Disaster (sub-)type according to EMDAT terminology, i.e.:
        Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
        Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
        Storm, Volcanic activity, Wildfire;
        Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
        Tsunami, etc.;
        OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
    year_range : list or tuple
        Year range to be extracted, e.g. (2000, 2015);
        (only min and max are considered)
    reference_year : int, optional
        Impact is scaled proportional to GDP to the value of the reference
        year. No scaling for None (default) or 0
    imp_str : str
        Name of the impact metric as used for the keys of ``VARNAMES_EMDAT``,
        default = "Total Damages ('000 US$)"
    version : int, optional
        required EM-DAT data format version (i.e. year of download),
        changes naming of columns/variables,
        default: newest available version in ``VARNAMES_EMDAT``

    Returns
    -------
    out : pd.DataFrame
        DataFrame with summed impact and scaled impact per year and country,
        with the columns "year", "impact", "impact_scaled", "reference_year",
        "ISO" and "region_id". Empty if no event matches the filters.
    """
    version = version or max(VARNAMES_EMDAT.keys())
    imp_str = VARNAMES_EMDAT[version][imp_str]
    scaled_str = imp_str + " scaled"
    df_data = clean_emdat_df(emdat_file_csv, countries=countries, hazard=hazard,
                             year_range=year_range, target_version=version)
    df_data[scaled_str] = scale_impact2refyear(df_data[imp_str].values,
                                               df_data.Year.values,
                                               df_data.ISO.values,
                                               reference_year=reference_year)

    out_columns = ["year", "impact", "impact_scaled", "reference_year", "ISO", "region_id"]
    if df_data.empty:
        # nothing to sum up; pd.concat would raise on an empty list
        return pd.DataFrame(columns=out_columns)

    # every country is reported for the full range of years in the data
    all_years = np.arange(min(df_data.Year), max(df_data.Year) + 1)
    # EMDAT provides damage data in 1000 USD
    in_thousands = "000 US" in imp_str

    country_frames = []
    for data_iso in df_data.ISO.unique():
        country = u_coord.country_to_iso(data_iso, "alpha3")
        df_country = df_data.loc[df_data.ISO == country]
        if not df_country.size:
            continue

        # Retrieve impact data for all years
        records = []
        for year in all_years:
            df_year = df_country[df_country.Year.isin([year])]
            records.append((year,
                            np.nansum(df_year[imp_str]),
                            np.nansum(df_year[scaled_str])))
        data_out = pd.DataFrame.from_records(
            records, columns=["year", "impact", "impact_scaled"])

        # Add static data
        data_out["reference_year"] = reference_year
        data_out["ISO"] = country
        data_out["region_id"] = u_coord.country_to_iso(country, "numeric")

        if in_thousands:
            data_out["impact"] = data_out["impact"] * 1e3
            data_out["impact_scaled"] = data_out["impact_scaled"] * 1e3
        country_frames.append(data_out)

    if not country_frames:
        return pd.DataFrame(columns=out_columns)
    return pd.concat(country_frames, ignore_index=True)
def emdat_impact_event(emdat_file_csv, countries=None, hazard=None, year_range=None,
                       reference_year=None, imp_str="Total Damages ('000 US$)",
                       version=None):
    """Load EM-DAT data and return the impact per event.

    Parameters
    ----------
    emdat_file_csv : str or DataFrame
        Either string with full path to CSV-file or
        pandas.DataFrame loaded from EM-DAT CSV
    countries : list of str
        country ISO3-codes or names, e.g. ['JAM', 'CUB'].
        default: countries=None for all countries
    hazard : list or str
        List of Disaster (sub-)type according to EMDAT terminology, i.e.:
        Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
        Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
        Storm, Volcanic activity, Wildfire;
        Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
        Tsunami, etc.;
        OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
    year_range : list or tuple
        Year range to be extracted, e.g. (2000, 2015);
        (only min and max are considered)
    reference_year : int
        reference year of exposures. Impact is scaled proportional to GDP to
        the value of the reference year. Default: No scaling for 0
    imp_str : str
        Column name of impact metric in EMDAT CSV,
        default = "Total Damages ('000 US$)"
    version : int, optional
        EM-DAT version to take variable/column names from,
        default: newest available version in ``VARNAMES_EMDAT``

    Returns
    -------
    out : pd.DataFrame
        EMDAT DataFrame with the new columns "year", "reference_year",
        "impact", "impact_scaled" and "region_id". The impact per event has
        the same unit as the chosen impact, but is multiplied by 1000 if the
        impact is given as 1000 US$.
    """
    emdat_version = version or max(VARNAMES_EMDAT.keys())
    impact_col = VARNAMES_EMDAT[emdat_version][imp_str]
    events = clean_emdat_df(emdat_file_csv, hazard=hazard, year_range=year_range,
                            countries=countries, target_version=emdat_version)

    events['year'] = events['Year']
    events['reference_year'] = reference_year
    events['impact'] = events[impact_col]
    events['impact_scaled'] = scale_impact2refyear(events[impact_col].values,
                                                   events.Year.values,
                                                   events.ISO.values,
                                                   reference_year=reference_year)

    # numeric country code per event; stays NaN where the code is unknown
    events['region_id'] = np.nan
    for iso3a in events.ISO.unique():
        try:
            events.loc[events.ISO == iso3a, 'region_id'] = \
                u_coord.country_to_iso(iso3a, "numeric")
        except LookupError:
            LOGGER.warning('ISO3alpha code not found in iso_country: %s', iso3a)

    # EMDAT provides damage data in 1000 USD
    if '000 US' in impact_col:
        for column in ('impact', 'impact_scaled'):
            events[column] *= 1e3
    return events.reset_index(drop=True)
def emdat_to_impact(emdat_file_csv, hazard_type_climada, year_range=None, countries=None,
                    hazard_type_emdat=None,
                    reference_year=None, imp_str="Total Damages"):
    """Load EM-DAT data and return it as an Impact instance with one event per disaster.

    Parameters
    ----------
    emdat_file_csv : str or pd.DataFrame
        Either string with full path to CSV-file or
        pandas.DataFrame loaded from EM-DAT CSV
    hazard_type_climada : str
        CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
    year_range : list or tuple, optional
        Year range to be extracted, e.g. (2000, 2015);
        (only min and max are considered). If not given, the range of years
        found in the extracted events is used to derive the event frequency.
    countries : list of str or str, optional
        country ISO3-codes or names, e.g. ['JAM', 'CUB'].
        default: countries=None for all countries
    hazard_type_emdat : list or str, optional
        List of Disaster (sub-)types according to EMDAT terminology, i.e.:
        Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
        Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
        Storm, Volcanic activity, Wildfire;
        Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
        Tsunami, etc.
        Default: ``[hazard_type_climada]``
    reference_year : int, optional
        reference year of exposures. Impact is scaled
        proportional to GDP to the value of the reference year.
        Default: no scaling (None or 0)
    imp_str : str
        Impact metric, default = "Total Damages". Any string containing
        "Total Damages", "Insured Damages" or "Reconstruction Costs" is mapped
        to the corresponding "... ('000 US$)" column name; other strings are
        used as given.

    Returns
    -------
    impact_instance : climada.engine.Impact
        Impact object with one event per EM-DAT entry. ``at_event`` holds the
        impact per event (scaled to reference_year if reference_year is given),
        with missing impacts set to 0.
        impact_instance.eai_exp holds expected impact for each country
        (within 1/frequency_unit).
        impact_instance.coord_exp holds rough central coordinates for each
        country (NaN if the country is not found in the Natural Earth data).
    countries : list of str
        Countries in the same order as in impact_instance.eai_exp
    """
    if "Total Damages" in imp_str:
        imp_str = "Total Damages ('000 US$)"
    elif "Insured Damages" in imp_str:
        imp_str = "Insured Damages ('000 US$)"
    elif "Reconstruction Costs" in imp_str:
        imp_str = "Reconstruction Costs ('000 US$)"

    # use the newest version of EMDAT varnames
    version = max(VARNAMES_EMDAT.keys())
    # NOTE(review): emdat_impact_event maps imp_str through
    # VARNAMES_EMDAT[version] again, so this presumably relies on the newest
    # mapping being the identity - confirm.
    imp_str = VARNAMES_EMDAT[version][imp_str]

    if not hazard_type_emdat:
        hazard_type_emdat = [hazard_type_climada]
    if reference_year == 0:
        reference_year = None

    # Initiate Impact-instance:
    impact_instance = Impact(haz_type=hazard_type_climada)

    # Load EM-DAT impact data by event:
    em_data = emdat_impact_event(emdat_file_csv, countries=countries,
                                 hazard=hazard_type_emdat, year_range=year_range,
                                 reference_year=reference_year, imp_str=imp_str,
                                 version=version)

    if isinstance(countries, str):
        countries = [countries]
    elif not countries:
        countries = emdat_countries_by_hazard(emdat_file_csv, year_range=year_range,
                                              hazard=hazard_type_emdat)[0]

    if em_data.empty:
        return impact_instance, countries

    impact_instance.event_id = np.array(em_data.index, int)
    impact_instance.event_name = list(em_data[VARNAMES_EMDAT[version]['Dis No']])

    # Event dates: January 1st of the event year by default, refined with the
    # start date where a start year is available (missing month/day become 1).
    date_list = [datetime(int(year), 1, 1).toordinal() for year in em_data['Year']]
    start_columns = ('Start Year', 'Start Month', 'Start Day')
    if all(column in em_data.columns for column in start_columns):
        start_dates = zip(em_data['Start Year'], em_data['Start Month'],
                          em_data['Start Day'])
        for idx, (year, month, day) in enumerate(start_dates):
            if np.isnan(year):
                continue
            month = 1 if np.isnan(month) else int(month)
            day = 1 if np.isnan(day) else int(day)
            date_list[idx] = datetime(int(year), month, day).toordinal()
    impact_instance.date = np.array(date_list, int)
    impact_instance.crs = DEF_CRS

    # scaled impacts are only used if a reference year is given
    impact_column = "impact_scaled" if reference_year else "impact"
    impact_instance.at_event = np.array(em_data[impact_column])
    impact_instance.at_event[np.isnan(impact_instance.at_event)] = 0

    if not year_range:
        year_range = [em_data['Year'].min(), em_data['Year'].max()]
    # Only min and max of year_range are considered, independent of its length
    # and ordering; every event gets the same frequency of 1 / number of years.
    n_years = 1 + max(year_range) - min(year_range)
    impact_instance.frequency = np.ones(em_data.shape[0]) / n_years
    impact_instance.frequency_unit = '1/year'
    impact_instance.tot_value = 0
    impact_instance.aai_agg = np.nansum(impact_instance.at_event * impact_instance.frequency)
    impact_instance.unit = 'USD'
    impact_instance.imp_mat = []

    # init rough exposure with central point per country
    shp = shapereader.natural_earth(resolution='110m',
                                    category='cultural',
                                    name='admin_0_countries')
    # index the Natural Earth records once by ISO3 code (first record wins)
    # instead of scanning all records again for every country
    records_by_iso3 = {}
    for rec in shapereader.Reader(shp).records():
        records_by_iso3.setdefault(rec.attributes['ADM0_A3'].casefold(), rec)

    countries_lat = []
    countries_lon = []
    impact_instance.eai_exp = np.zeros(len(countries))  # empty: damage at exposure
    iso_column = em_data[VARNAMES_EMDAT[version]['ISO']]
    for idx, cntry in enumerate(countries):
        try:
            cntry = u_coord.country_to_iso(cntry, "alpha3")
        except LookupError:
            # keep the name as given and try to match it anyway
            LOGGER.warning('Country not found in iso_country: %s', cntry)

        rec = records_by_iso3.get(cntry.casefold())
        if rec is None:
            countries_lat.append(np.nan)
            countries_lon.append(np.nan)
        else:
            # centre of the bounding box (minx, miny, maxx, maxy)
            bbox = rec.geometry.bounds
            countries_lat.append(np.mean([bbox[1], bbox[3]]))
            countries_lon.append(np.mean([bbox[0], bbox[2]]))

        df_tmp = em_data[iso_column.str.contains(cntry)]
        # nansum: events without impact value count as 0, consistent with
        # at_event and aai_agg (a plain sum would turn the country total NaN)
        impact_instance.eai_exp[idx] = np.nansum(
            np.array(df_tmp[impact_column]) * impact_instance.frequency[0])

    impact_instance.coord_exp = np.stack([countries_lat, countries_lon], axis=1)
    return impact_instance, countries