Adaptation of Critical Infrastructure - Calculate Adaptations#

In this notebook, the goal is to calculate the risk of river flooding under a baseline scenario where no adaptation has happened, and compare it with what would the risk be if different adaptation levels were considered. The measures tested in this example include the following:

  • Level 1 - Hazard level adaptation: Build a floodwall

  • Level 2 - Asset level adaptation: Elevate railway as viaduct

  • Level 3 - Network level adaptation: Create new bridges between assets

  • Level 4 - System level adaptation: Reduce traffic and freight demand

1. Load Packages#

Hide code cell source

# HIDE CODE
import os
# os.chdir('path/to/book/directory')
os.chdir('..')
import sys
from pathlib import Path
sys.path.append(str(Path.cwd()))
from src.ci_adapt_utilities import *

data_path = Path(os.getcwd() + '/data')
interim_data_path = data_path / 'interim' / 'collected_flood_runs'

2. Load Default Configuration and Model Parameters#

config_file=Path(os.getcwd() + '/config_ci_adapt_test.ini')
hazard_type, infra_type, country_code, country_name, hazard_data_subfolders, asset_data, vulnerability_data = load_config(config_file)

3. Load Asset Data and Adaptation Cost Data#

This step includes also loading the results of the Risk Analysis under the baseline scenario, where no adaptation options are considered.

assets, geom_dict, _, return_period_dict, adaptation_unit_costs, rp_spec_priority, average_road_cost_per_ton_km, average_train_cost_per_ton_km, average_train_load_tons = startup_ci_adapt(data_path, config_file)

# Load data from baseline impact assessment
shortest_paths = pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'shortest_paths.pkl', 'rb'))
disrupted_edges_by_basin = pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'disrupted_edges_by_basin.pkl', 'rb'))
graph_r0 = pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'graph_0.pkl', 'rb'))
disrupted_shortest_paths = pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'disrupted_shortest_paths.pkl', 'rb'))
event_impacts = pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'event_impacts.pkl', 'rb'))
full_flood_event=pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'full_flood_event.pkl', 'rb'))
all_disrupted_edges = pickle.load(open(data_path / 'interim' / 'indirect_damages' / 'all_disrupted_edges.pkl', 'rb'))
collect_output = pickle.load(open(data_path / 'interim' / 'collected_flood_runs' / 'sample_collected_run.pkl', 'rb'))
print('Loaded data from baseline impact assessment')
graph_v0=create_virtual_graph(graph_r0)
graph_v=graph_v0.copy()
675 assets loaded.
Loaded data from baseline impact assessment
Creating virtual graph...
Success: only int type values

4. Define Adaptations#

adaptations={}
adaptations['baseline'] = {'l1_l2_adapt_path': None, 'added_links':[], 'l4_adapt_path': None}
adaptations['l1_trib'] = {'l1_l2_adapt_path': data_path/Path('input/adaptations/l1_tributary.geojson'), 'added_links':[], 'l4_adapt_path': None}
adaptations['l2_trib'] = {'l1_l2_adapt_path': data_path/Path('input/adaptations/l2_tributary.geojson'), 'added_links':[], 'l4_adapt_path': None}
adaptations['l3_trib'] = {'l1_l2_adapt_path': None, 'added_links':[(219651487, 111997047)], 'l4_adapt_path': None}
adaptations['l4_trib'] = {'l1_l2_adapt_path': None, 'added_links':[], 'l4_adapt_path': data_path/Path('input/adaptations/l4_tributary.geojson')}

# Other adaptations can be added
# adaptations['l1_rhine'] = {'l1_l2_adapt_path': data_path/r'input\adaptations\l1_rhine.geojson', 'added_links':[], 'l4_adapt_path': None}
# adaptations['l2_rhine'] = {'l1_l2_adapt_path': data_path/r'input\adaptations\l2_rhine.geojson', 'added_links':[], 'l4_adapt_path': None}
# adaptations['l3_rhine'] = {'l1_l2_adapt_path': None, 'added_links':[(112044105, 110947346)], 'l4_adapt_path': None}
# adaptations['l4_rhine'] = {'l1_l2_adapt_path': None, 'added_links':[], 'l4_adapt_path': data_path/r'input\adaptations\l4_rhine.geojson'}

5. Apply Adaptations and Recalculate Risk#

# Store results in dictionaries
direct_damages_adapted_dict = {}
indirect_damages_adapted_dict = {}
indirect_damages_adapted_full_dict = {}
adaptation_costs={}
adapted_assets_dict = {}

# Print adaptations that will be run
print(f"Processing {len(adaptations)} scenarios:")
for adapt_id in adaptations.keys():
    print('- ',adapt_id)

for adapt_id in adaptations.keys():
    adaptations_df_path = data_path / 'interim' / 'adaptations' / f'{adapt_id}_adaptations.csv'

    if adaptations_df_path.exists():
        print(f"Adaptation {adapt_id} already processed. Skipping.")
        continue

    # Reset graph
    graph_v=graph_v0.copy()

    # Load adaptations dictionary to the relevant variables
    l1_l2_adapt_path = adaptations[adapt_id]['l1_l2_adapt_path']
    added_links = adaptations[adapt_id]['added_links']  
    l4_adapt_path = adaptations[adapt_id]['l4_adapt_path']

    # Load adaptation data
    if l1_l2_adapt_path is not None:
        adapted_area = gpd.read_file(l1_l2_adapt_path).to_crs(3857)
    else:
        adapted_area = None
    if l4_adapt_path is not None:
        adapted_route_area = gpd.read_file(l4_adapt_path).to_crs(3857)
    else:
        adapted_route_area = None

    # Apply adaptations
    adapted_assets, adaptations_df, demand_reduction_dict, l3_adaptation_costs = apply_adaptations(adapted_area, assets, collect_output, interim_data_path, rp_spec_priority, adaptation_unit_costs, shortest_paths, graph_v, added_links, adapted_route_area)

    # Calculate l1 adaptation costs
    local_haz_path=Path(data_path / 'Floods/Germany/basin_intersections')
    l1_adaptation_costs=calculate_l1_costs(local_haz_path, interim_data_path, adapted_area, adaptation_unit_costs, adapted_assets) 

    # Run adapted damages for individual hazard maps
    direct_damages_adapted, indirect_damages_adapted, adaptation_run_full, l2_adaptation_costs, overlay_assets_lists = run_adapted_damages(data_path, config_file, collect_output, disrupted_edges_by_basin, interim_data_path, assets, geom_dict, adapted_assets, adaptations_df, rp_spec_priority, adaptation_unit_costs, shortest_paths, graph_v, average_train_load_tons, average_train_cost_per_ton_km, average_road_cost_per_ton_km, demand_reduction_dict)

    # Run adapted damages for full flood event
    indirect_damages_adapted_full = calculate_indirect_dmgs_fullflood(full_flood_event, overlay_assets_lists, adaptation_run_full, assets, all_disrupted_edges, shortest_paths, graph_v, average_train_load_tons, average_train_cost_per_ton_km, average_road_cost_per_ton_km, demand_reduction_dict)

    # Fill in missing values in dictionaries
    for hazard_map in collect_output.keys():
        if direct_damages_adapted[hazard_map]=={}:
            direct_damages_adapted[hazard_map]=collect_output[hazard_map]
        if indirect_damages_adapted[hazard_map]=={}:
            indirect_damages_adapted[hazard_map]=event_impacts[hazard_map] if hazard_map in event_impacts.keys() else 0.0
    
    # Store results in dictionaries
    direct_damages_adapted_dict[adapt_id] = direct_damages_adapted
    indirect_damages_adapted_dict[adapt_id] = indirect_damages_adapted
    indirect_damages_adapted_full_dict[adapt_id] = indirect_damages_adapted_full
    adapted_assets_dict[adapt_id] = adapted_assets
    adaptation_costs[adapt_id] = {'l1': l1_adaptation_costs, 
                                  'l2': l2_adaptation_costs, 
                                  'l3': l3_adaptation_costs}
    adaptations_df.to_csv(data_path / 'interim' / 'adaptations' / f'{adapt_id}_adaptations.csv')
  
Processing 5 scenarios:
-  baseline
-  l1_trib
-  l2_trib
-  l3_trib
-  l4_trib
Adaptation baseline already processed. Skipping.
Adaptation l1_trib already processed. Skipping.
Adaptation l2_trib already processed. Skipping.
Adaptation l3_trib already processed. Skipping.
Adaptation l4_trib already processed. Skipping.

6. Save Raw Output to Files#

# Report output DataFrame
output_df = pd.DataFrame.from_dict([direct_damages_adapted_dict, indirect_damages_adapted_dict, indirect_damages_adapted_full_dict, adapted_assets_dict, adaptation_costs])
output_df.to_csv(data_path / 'output' / 'adaptations' / 'adaptations_output.csv')
output_df
0
1
2
3
4
for adapt_id in adaptations.keys():
    if not adapt_id in direct_damages_adapted_dict.keys():
        continue
    direct_damages_adapted_path = data_path / 'interim' / f'adapted_direct_damages_{adapt_id}.pkl'
    indirect_damages_adapted_path = data_path / 'interim' / f'adapted_indirect_damages_{adapt_id}.pkl'
    indirect_damages_adapted_full_path = data_path / 'interim' / f'adapted_indirect_damages_full_{adapt_id}.pkl'
    # adaptations_df_path = data_path / 'output' / f'adaptations_{adapt_id}.csv'
    adapted_assets_path = data_path / 'interim' / f'adapted_assets_{adapt_id}.pkl'
    adaptation_costs_path = data_path / 'interim' / f'adaptation_costs_{adapt_id}.pkl'
    adaptations_csv_path = data_path / 'interim' / 'adaptations' / f'{adapt_id}_adaptations.csv'


    adaptations_df = pd.DataFrame.from_dict(adaptations[adapt_id])
    

    with open(direct_damages_adapted_path, 'wb') as f:
        pickle.dump(direct_damages_adapted_dict[adapt_id], f)
    with open(indirect_damages_adapted_path, 'wb') as f:
        pickle.dump(indirect_damages_adapted_dict[adapt_id], f)
    with open(indirect_damages_adapted_full_path, 'wb') as f:
        pickle.dump(indirect_damages_adapted_full_dict[adapt_id], f)    
    with open(adapted_assets_path, 'wb') as f:
        pickle.dump(adapted_assets_dict[adapt_id], f)
    with open(adaptation_costs_path, 'wb') as f:
        pickle.dump(adaptation_costs[adapt_id], f)
    print(f'Saved results for adaptation: {adapt_id}')